November 9, 2024
Summary: pg_parquet
is a PostgreSQL extension that allows you to read and write Parquet files.
Table of Contents
Introduction
pg_parquet
is a PostgreSQL extension that allows you to read and write Parquet files, which are located in S3
or file system, from PostgreSQL via COPY TO/FROM
commands. It depends on Apache Arrow project to read and write Parquet files and pgrx project to extend PostgreSQL’s COPY
command.
-- Copy a query result into Parquet in S3
COPY (SELECT * FROM table) TO 's3://mybucket/data.parquet' WITH (format 'parquet');
-- Load data from Parquet in S3
COPY table FROM 's3://mybucket/data.parquet' WITH (format 'parquet');
Installation from source
After installing PostgreSQL (in 14 or newer version), you need to set up rustup
, cargo-pgrx
to build the extension.
# install rustup
> curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh
# install cargo-pgrx
> cargo install cargo-pgrx
# configure pgrx
> cargo pgrx init --pg17 $(which pg_config)
# append the extension to shared_preload_libraries in ~/.pgrx/data-17/postgresql.conf
> echo "shared_preload_libraries = 'pg_parquet'" >> ~/.pgrx/data-17/postgresql.conf
# run cargo-pgrx to build and install the extension
> cargo pgrx run
# create the extension in the database
psql> "CREATE EXTENSION pg_parquet;"
Usage
There are mainly 3 things that you can do with pg_parquet
:
- You can export Postgres tables/queries to Parquet files,
- You can ingest data from Parquet files to Postgres tables,
- You can inspect the schema and metadata of Parquet files.
Copy to/from Parquet files from/to tables
You can use PostgreSQL’s COPY
command to read and write Parquet files. Below is an example of how to write a PostgreSQL table, with complex types, into a Parquet file and then to read the Parquet file content back into the same table.
-- create composite types
CREATE TYPE product_item AS (id INT, name TEXT, price float4);
CREATE TYPE product AS (id INT, name TEXT, items product_item[]);
-- create a table with complex types
CREATE TABLE product_example (
id int,
product product,
products product[],
created_at TIMESTAMP,
updated_at TIMESTAMPTZ
);
-- insert some rows into the table
insert into product_example values (
1,
ROW(1, 'product 1', ARRAY[ROW(1, 'item 1', 1.0), ROW(2, 'item 2', 2.0), NULL]::product_item[])::product,
ARRAY[ROW(1, NULL, NULL)::product, NULL],
now(),
'2022-05-01 12:00:00-04'
);
-- copy the table to a parquet file
COPY product_example TO '/tmp/product_example.parquet' (format 'parquet', compression 'gzip');
-- show table
SELECT * FROM product_example;
-- copy the parquet file to the table
COPY product_example FROM '/tmp/product_example.parquet';
-- show table
SELECT * FROM product_example;
Inspect Parquet schema
You can call SELECT * FROM parquet.schema(<uri>)
to discover the schema of the Parquet file at given uri.
SELECT * FROM parquet.schema('/tmp/product_example.parquet') LIMIT 10;
uri | name | type_name | type_length | repetition_type | num_children | converted_type | scale | precision | field_id | logical_type
------------------------------+--------------+------------+-------------+-----------------+--------------+----------------+-------+-----------+----------+--------------
/tmp/product_example.parquet | arrow_schema | | | | 5 | | | | |
/tmp/product_example.parquet | id | INT32 | | OPTIONAL | | | | | 0 |
/tmp/product_example.parquet | product | | | OPTIONAL | 3 | | | | 1 |
/tmp/product_example.parquet | id | INT32 | | OPTIONAL | | | | | 2 |
/tmp/product_example.parquet | name | BYTE_ARRAY | | OPTIONAL | | UTF8 | | | 3 | STRING
/tmp/product_example.parquet | items | | | OPTIONAL | 1 | LIST | | | 4 | LIST
/tmp/product_example.parquet | list | | | REPEATED | 1 | | | | |
/tmp/product_example.parquet | items | | | OPTIONAL | 3 | | | | 5 |
/tmp/product_example.parquet | id | INT32 | | OPTIONAL | | | | | 6 |
/tmp/product_example.parquet | name | BYTE_ARRAY | | OPTIONAL | | UTF8 | | | 7 | STRING
(10 rows)
Inspect Parquet metadata
You can call SELECT * FROM parquet.metadata(<uri>)
to discover the detailed metadata of the Parquet file, such as column statistics, at given uri.
SELECT uri, row_group_id, row_group_num_rows, row_group_num_columns, row_group_bytes, column_id, file_offset, num_values, path_in_schema, type_name FROM parquet.metadata('/tmp/product_example.parquet') LIMIT 1;
uri | row_group_id | row_group_num_rows | row_group_num_columns | row_group_bytes | column_id | file_offset | num_values | path_in_schema | type_name
------------------------------+--------------+--------------------+-----------------------+-----------------+-----------+-------------+------------+----------------+-----------
/tmp/product_example.parquet | 0 | 1 | 13 | 842 | 0 | 0 | 1 | id | INT32
(1 row)
SELECT stats_null_count, stats_distinct_count, stats_min, stats_max, compression, encodings, index_page_offset, dictionary_page_offset, data_page_offset, total_compressed_size, total_uncompressed_size FROM parquet.metadata('/tmp/product_example.parquet') LIMIT 1;
stats_null_count | stats_distinct_count | stats_min | stats_max | compression | encodings | index_page_offset | dictionary_page_offset | data_page_offset | total_compressed_size | total_uncompressed_size
------------------+----------------------+-----------+-----------+--------------------+--------------------------+-------------------+------------------------+------------------+-----------------------+-------------------------
0 | | 1 | 1 | GZIP(GzipLevel(6)) | PLAIN,RLE,RLE_DICTIONARY | | 4 | 42 | 101 | 61
(1 row)
You can call SELECT * FROM parquet.file_metadata(<uri>)
to discover file level metadata of the Parquet file, such as format version, at given uri.
SELECT * FROM parquet.file_metadata('/tmp/product_example.parquet')
uri | created_by | num_rows | num_row_groups | format_version
------------------------------+------------+----------+----------------+----------------
/tmp/product_example.parquet | pg_parquet | 1 | 1 | 1
(1 row)
You can call SELECT * FROM parquet.kv_metadata(<uri>)
to query custom key-value metadata of the Parquet file at given uri.
SELECT uri, encode(key, 'escape') as key, encode(value, 'escape') as value FROM parquet.kv_metadata('/tmp/product_example.parquet');
uri | key | value
------------------------------+--------------+---------------------
/tmp/product_example.parquet | ARROW:schema | /////5gIAAAQAAAA ...
(1 row)
Object store support
pg_parquet
supports reading and writing Parquet files from/to S3
object store. Only the uris with s3://
scheme is supported.
The simplest way to configure object storage is by creating the standard ~/.aws/credentials
and ~/.aws/config
files:
$ cat ~/.aws/credentials
[default]
aws_access_key_id = AKIAIOSFODNN7EXAMPLE
aws_secret_access_key = wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY
$ cat ~/.aws/config
[default]
region = eu-central-1
Alternatively, you can use the following environment variables when starting postgres to configure the S3 client:
AWS_ACCESS_KEY_ID
: the access key ID of the AWS accountAWS_SECRET_ACCESS_KEY
: the secret access key of the AWS accountAWS_REGION
: the default region of the AWS accountAWS_SHARED_CREDENTIALS_FILE
: an alternative location for the credentials fileAWS_CONFIG_FILE
: an alternative location for the config fileAWS_PROFILE
: the name of the profile from the credentials and config file (default profile name isdefault
)
NOTE: To be able to write into a object store location, you need to grant
parquet_object_store_write
role to your current postgres user. Similarly, to read from an object store location, you need to grantparquet_object_store_read
role to your current postgres user.
Copy options
pg_parquet
supports the following options in the COPY TO
command:
format parquet
: you need to specify this option to read or write Parquet files which does not end with.parquet[.<compression>]
extension. (This is the only option thatCOPY FROM
command supports.),row_group_size <int>
: the number of rows in each row group while writing Parquet files. The default row group size is122880
,row_group_size_bytes <int>
: the total byte size of rows in each row group while writing Parquet files. The default row group size bytes isrow_group_size * 1024
,compression <string>
: the compression format to use while writing Parquet files. The supported compression formats areuncompressed
,snappy
,gzip
,brotli
,lz4
,lz4raw
andzstd
. The default compression format issnappy
. If not specified, the compression format is determined by the file extension.compression_level <int>
: the compression level to use while writing Parquet files. The supported compression levels are only supported forgzip
,zstd
andbrotli
compression formats. The default compression level is6
forgzip (0-10)
,1
forzstd (1-22)
and1
forbrotli (0-11)
.
Configuration
There is currently only one GUC parameter to enable/disable the pg_parquet
:
pg_parquet.enable_copy_hooks
: you can set this parameter toon
oroff
to enable or disable thepg_parquet
extension. The default value ison
.
Supported types
pg_parquet
has rich type support, including PostgreSQL’s primitive, array, and composite types. Below is the table of the supported types in PostgreSQL and their corresponding Parquet types.
PostgreSQL Type | Parquet Physical Type | Logical Type |
---|---|---|
bool |
BOOLEAN | |
smallint |
INT16 | |
integer |
INT32 | |
bigint |
INT64 | |
real |
FLOAT | |
oid |
INT32 | |
double |
DOUBLE | |
numeric [1] |
FIXED_LEN_BYTE_ARRAY(16) | DECIMAL(128) |
text |
BYTE_ARRAY | STRING |
json |
BYTE_ARRAY | STRING |
bytea |
BYTE_ARRAY | |
date [2] |
INT32 | DATE |
timestamp |
INT64 | TIMESTAMP_MICROS |
timestamptz [3] |
INT64 | TIMESTAMP_MICROS |
time |
INT64 | TIME_MICROS |
timetz [3] |
INT64 | TIME_MICROS |
geometry [4] |
BYTE_ARRAY | |
composite |
GROUP | STRUCT |
array |
element’s physical type | LIST |
WARNING:
(1) The numeric
types with <= 38
precision is represented as FIXED_LEN_BYTE_ARRAY(16)
with DECIMAL(128)
logical type. The numeric
types with > 38
precision is represented as BYTE_ARRAY
with STRING
logical type.
(2) The date
type is represented according to Unix epoch
when writing to Parquet files. It is converted back according to PostgreSQL epoch
when reading from Parquet files.
(3) The timestamptz
and timetz
types are adjusted to UTC
when writing to Parquet files. They are converted back with UTC
timezone when reading from Parquet files.
(4) The geometry
type is represented as BYTE_ARRAY
encoded as WKB
when postgis
extension is created. Otherwise, it is represented as BYTE_ARRAY
with STRING
logical type.
(5) Any type that does not have a corresponding Parquet type will be represented, as a fallback mechanism, as BYTE_ARRAY
with STRING
logical type. e.g. enum
.