parquet_s3_fdw: PostgreSQL 访问 Parquet S3 的外部数据包装器

五月 9, 2024

摘要parquet_s3_fdw扩展是一个外部数据包装器(FDW),用于访问本地文件系统和 Amazon S3 上的 Parquet 文件。

目录

此版本的 parquet_s3_fdw 适用于 PostgreSQL 13、14、15 和 16。

只读模式下的 Apache Parquet 外部数据包装器,支持 PostgreSQL 访问 S3 存储。

用法

加载扩展

CREATE EXTENSION parquet_s3_fdw;

创建服务器

CREATE SERVER parquet_s3_srv FOREIGN DATA WRAPPER parquet_s3_fdw;

如果要使用 MinIO 而不是 AWS S3,请使用 use_minio 选项创建服务器。

CREATE SERVER parquet_s3_srv FOREIGN DATA WRAPPER parquet_s3_fdw OPTIONS (use_minio 'true');

创建用户映射

如果要访问 Amazon S3,则必须指定用户名和密码。

CREATE USER MAPPING FOR public SERVER parquet_s3_srv OPTIONS (user 's3user', password 's3password');

创建外表

现在你应该能够从 Parquet 文件创建外表了。目前parquet_s3_fdw支持下面这些数据列类型(还将会增加):

Arrow 类型 SQL 类型
INT8 INT2
INT16 INT2
INT32 INT4
INT64 INT8
FLOAT FLOAT4
DOUBLE FLOAT8
TIMESTAMP TIMESTAMP
DATE32 DATE
STRING TEXT
BINARY BYTEA
LIST ARRAY
MAP JSONB

目前parquet_s3_fdw不支持结构体和嵌套列表。

支持以下选项:

  • filename - 要读取的 Parquet 文件的路径列表,以空格分隔。您可以用s3://开始来指定 AWS S3 上的路径。不支持混合使用本地路径和 S3 路径;
  • dirname - 具有要读取的 Parquet 文件的目录路径;
  • sorted - 用来预排序 Parquet 文件的,空格分隔的列列表;当使用ORDER BY子句运行查询,或在其他情况下带有预排序的列集会有用(Group Aggregate,Merge Join)时,这将有助于 postgres 避免冗余的排序;
  • files_in_order - 要求以filename指定或由files_func返回的文件,根据sorted选项进行排序,并且在范围上没有交叉;这允许在并行多文件扫描节点上使用Gather Merge节点(默认值为false);
  • use_mmap - 是否使用内存映射操作,而不是文件读取操作(默认值为false);
  • use_threads - 启用 Apache Arrow 的并行列解码/解压(默认值为false);
  • files_func - 用户定义的函数,由 parquet_s3_fdw 在每次查询时用于检索 parquet 文件列表;函数必须接受一个JSONB参数,并返回 parquet 文件完整路径的文本数组;
  • files_func_arg - 由 files_func 指定的函数的参数。
  • max_open_files - 同时打开的 Parquet 文件的数量限制。
  • region - 用于连接到的 AWS 区域的值(默认值为ap-northeast-1)。
  • endpoint - 用于连接的地址和端口(默认值为127.0.0.1:9000)。

可以为单个和一组 Parquet 文件创建外部表。也可以指定一个用户定义的函数,该函数会返回一个文件路径列表。根据文件数量和表选项,parquet_s3_fdw可以使用以下的一种执行策略:

策略 描述
Single File 基本的单文件读取器
Multifile 按顺序逐个处理 Parquet 文件的读取器
Multifile Merge 该读取器会合并预排序的 Parquet 文件,以便生成的结果也是有序的;在指定了sorted选项,并且查询计划需要排序(例如包含了ORDER BY子句)时会使用
Caching Multifile Merge Multifile Merge相同,但会限制同时打开的文件数;当指定的 Parquet 文件数超过max_open_files时会使用

GUC 变量:

  • parquet_fdw.use_threads - 允许用户启用或禁用线程的全局开关(默认值为true);
  • parquet_fdw.enable_multifile - 启用多文件读取器(默认值为true)。
  • parquet_fdw.enable_multifile_merge - 启用多文件合并读取器(默认值为true)。

示例:

CREATE FOREIGN TABLE userdata (
    id           int,
    first_name   text,
    last_name    text
)
SERVER parquet_s3_srv
OPTIONS (
    filename 's3://bucket/dir/userdata1.parquet'
);

访问外表

SELECT * FROM userdata;

并行查询

parquet_s3_fdw还支持并行查询执行(注意不要与 Apache Arrow 的多线程解码功能混淆)。

导入

parquet_s3_fdw还支持 IMPORT FOREIGN SCHEMA 命令,来发现文件系统上指定目录中的 parquet 文件,并根据这些文件创建外部表。它可以像这样使用:

IMPORT FOREIGN SCHEMA "/path/to/directory"
FROM SERVER parquet_s3_srv
INTO public;

重要的是,这里的remote_schema是一个本地文件系统的目录路径,并且用双引号引起来。

将 parquet 文件导入到外部表的另一种方法是,使用import_parquet_s3import_parquet_s3_explicit

CREATE FUNCTION import_parquet_s3(
    tablename   text,
    schemaname  text,
    servername  text,
    userfunc    regproc,
    args        jsonb,
    options     jsonb)

CREATE FUNCTION import_parquet_s3_explicit(
    tablename   text,
    schemaname  text,
    servername  text,
    attnames    text[],
    atttypes    regtype[],
    userfunc    regproc,
    args        jsonb,
    options     jsonb)

import_parquet_s3import_parquet_s3_explicit之间的唯一区别是,后者允许指定一组要导入的属性/列。attnamesatttypes分别是属性名称和属性类型的数组(参见下面的示例)。

userfunc是一个用户自定义函数。它必须接受一个jsonb参数,并返回一个要导入的 parquet 文件的文件系统路径的文本数组。args是用户指定的 jsonb 对象,以作为参数传递给userfunc。这种函数的简单实现和用法,可以如下面这样:

CREATE FUNCTION list_parquet_s3_files(args jsonb)
RETURNS text[] AS
$$
BEGIN
    RETURN array_agg(args->>'dir' || '/' || filename)
           FROM pg_ls_dir(args->>'dir') AS files(filename)
           WHERE filename ~~ '%.parquet';
END
$$
LANGUAGE plpgsql;

SELECT import_parquet_s3_explicit(
    'abc',
    'public',
    'parquet_srv',
    array['one', 'three', 'six'],
    array['int8', 'text', 'bool']::regtype[],
    'list_parquet_files',
    '{"dir": "/path/to/directory"}',
    '{"sorted": "one"}'
);

特性

  • 支持在本地文件系统或 Amazon S3 上对 parquet 文件进行 SELECT 操作。
  • 支持 INSERT、DELETE、UPDATE(外部修改)。
  • 支持 MinIO 访问,以替代 Amazon S3。
  • 允许控制外部服务器在事务完成后是否保持连接打开状态。这由 keep_connections 控制,默认为 on。
  • 支持 parquet_s3_fdw 的 parquet_s3_fdw_get_connections() 函数,列出打开的外部服务器连接。

无结构模式

  • 该功能将使用户能够使用无结构的能力:

    • 每个 parquet 文件没有特定的外部表结构(列定义)。
    • 无结构的外表只有一个 jsonb 列,用于根据以下规则表示 parquet 文件中的数据:
      • Jsonb 键:parquet 列名称。
      • Jsonb 值:parquet 列数据。
  • 使用无结构模式,会有几个好处:

    • parquet 文件数据结构的灵活性:通过将所有列数据合并到一个 jsonb 列中,无结构的外表可以查询任何 parquet 文件,文件中的所有列都能映射到 postgres 类型。
    • 没有预定义的外部表结构(列定义)。缺少结构意味着外部表会查询 parquet 文件中的所有列,包括用户还未使用的列。

无结构模式用法

  • 无结构模式由schemaless选项来启用:

    • schemaless选项是true:启用无结构模式。
    • schemaless选项是false:禁用无结构模式(我们称之为non-schemaless模式)。
    • 如果未配置schemaless选项,则默认值为 false。
    • CREATE FOREIGN TABLEIMPORT FOREIGN SCHEMAimport_parquet_s3()import_parquet_s3_explicit()中均支持schemaless选项。
  • 无结构外表需要至少一个 jsonb 列来表示数据:

    • 如果有 1 个以上的 jsonb 列,则仅填充一列,所有其他列都使用 NULL 值处理。
    • 如果没有 jsonb 列,则所有列都使用 NULL 值处理。
    • 示例:
    CREATE FOREIGN TABLE example_schemaless (
      id int,
      v jsonb
    ) OPTIONS (filename '/path/to/parquet_file', schemaless 'true');
    SELECT * FROM example_schemaless;
    id |                                                                v
    ----+---------------------------------------------------------------------------------------------------------------------------------
        | {"one": 1, "six": "t", "two": [1, 2, 3], "five": "2018-01-01", "four": "2018-01-01 00:00:00", "seven": 0.5, "three": "foo"}
        | {"one": 2, "six": "f", "two": [null, 5, 6], "five": "2018-01-02", "four": "2018-01-02 00:00:00", "seven": null, "three": "bar"}
    (2 rows)
    
  • 创建外部表:使用IMPORT FOREIGN SCHEMAimport_parquet_s3()import_parquet_s3_explicit(),外部表将以固定的列定义进行创建,如下所示:

    CREATE FOREIGN TABLE example (
      v jsonb
    ) OPTIONS (filename '/path/to/parquet_file', schemaless 'true');
    
  • 查询数据:

    -- non-schemaless mode
    SELECT * FROM example;
     one |    two     | three |        four         |    five    | six | seven
    -----+------------+-------+---------------------+------------+-----+-------
       1 | {1,2,3}    | foo   | 2018-01-01 00:00:00 | 2018-01-01 | t   |   0.5
       2 | {NULL,5,6} | bar   | 2018-01-02 00:00:00 | 2018-01-02 | f   |
    (2 rows)
    -- schemaless mode
    SELECT * FROM example_schemaless;
                                                                      v
    ---------------------------------------------------------------------------------------------------------------------------------
     {"one": 1, "six": "t", "two": [1, 2, 3], "five": "2018-01-01", "four": "2018-01-01 00:00:00", "seven": 0.5, "three": "foo"}
     {"one": 2, "six": "f", "two": [null, 5, 6], "five": "2018-01-02", "four": "2018-01-02 00:00:00", "seven": null, "three": "bar"}
    (2 rows)
    
  • 在 jsonb 表达式中获取值:

    • 使用 ->> jsonb 箭头操作符,返回文本类型。用户可以强制转换 jsonb 表达式的类型,以获得相应的数据表示。

    • 例如,获取col值的表达式v->>'col',将是 parquet 文件中的列名col,我们称之为schemaless variableslvar

      SELECT v->>'two', sqrt((v->>'one')::int) FROM example_schemaless;
        ?column?   |        sqrt
      --------------+--------------------
      [1, 2, 3]    |                  1
      [null, 5, 6] | 1.4142135623730951
      (2 rows)
      
  • 某些功能与non-schemaless模式不同

    • 行组过滤器支持:在无结构模式下,parquet_s3_fdw 可以通过一些如下的WHERE条件,支持对行组进行过滤:

      • slvar::type {operator} const。例如:(v->>'int64_col')::int8 = 100
      • const {operator} slvar ::type。例如:100 = (v->>'int64_col')::int8
      • slvar::boolean is true/false。例如:(v->>'bool_col')::boolean is false
      • !(slvar::boolean)。例如:!(v->>'bool_col')::boolean
      • Jsonb exist 运算符:((v->>'col')::jsonb) ? element(v->'col') ? elementv ? 'col'
      • 转换函数必须映射 parquet 列类型,否则会跳过过滤器。
    • 要使用 parquet 文件的预排序列,用户必须是:

      • sorted选项中定义列名,与non-schemaless mode相同

      • ORDER BY子句中使用slvar代替列名。

      • 如果排序的 parquet 列不是文本列,请将此列显式地强制转换到映射类型。

      • 例如:

        CREATE FOREIGN TABLE example_sorted (v jsonb)
        SERVER parquet_s3_srv
        OPTIONS (filename '/path/to/example1.parquet /path/to/example2.parquet', sorted 'int64_col', schemaless 'true');
        EXPLAIN (COSTS OFF) SELECT * FROM example_sorted ORDER BY (v->>'int64_col')::int8;
                  QUERY PLAN
        --------------------------------
        Foreign Scan on example_sorted
          Reader: Multifile Merge
          Row groups:
            example1.parquet: 1, 2
            example2.parquet: 1
        (5 rows)
        
    • 支持对嵌套列表和映射表使用箭头运算符:这些类型将被视为嵌套的 jsonb 值,可以通过->操作符访问。例如:

      SELECT * FROM example_schemaless;
                                        v
      ----------------------------------------------------------------------------
      {"array_col": [19, 20], "jsonb_col": {"1": "foo", "2": "bar", "3": "baz"}}
      {"array_col": [21, 22], "jsonb_col": {"4": "test1", "5": "test2"}}
      (2 rows)
      
      SELECT v->'array_col'->1, v->'jsonb_col'->'1' FROM example3;
      ?column? | ?column?
      ----------+----------
      20       | "foo"
      22       |
      (2 rows)
      
    • Postgres 计算(jsonb->>'col')::type的成本,比在non-schemaless模式下直接获取列要大得多,在一些复杂的查询中,schemaless模式的查询计划可能与non-schemaless模式不同。

  • 对于其他功能,schemaless模式与non-schemaless模式工作相同。

可写的 FDW

用户可以对已设置键列的外表,执行 insert、update 和 delete 语句。

键列

  • 在结构化模式下:可以通过使用 OPTIONS (key ’true’) 创建 parquet_s3_fdw 外表对象,来设置键列:
CREATE FOREIGN TABLE userdata (
    id1          int OPTIONS(key 'true'),
    id2          int OPTIONS(key 'true'),
    first_name   text,
    last_name    text
) SERVER parquet_s3_srv
OPTIONS (
    filename 's3://bucket/dir/userdata1.parquet'
);
  • 在无结构模式下,可以在创建 parquet_s3_fdw 外部表对象时,使用key_columns选项设置键列:
CREATE FOREIGN TABLE userdata (
    v JSONB
) SERVER parquet_s3_srv
OPTIONS (
    filename 's3://bucket/dir/userdata1.parquet',
    schemaless 'true',
    key_columns 'id1 id2'
);
  • key_columns选项可用于 IMPORT FOREIGN SCHEMA 功能:
-- in schemaless mode
IMPORT FOREIGN SCHEMA 's3://data/' FROM SERVER parquet_s3_srv INTO tmp_schema
OPTIONS (sorted 'c1', schemaless 'true', key_columns 'id1 id2');
-- corresponding CREATE FOREIGN TABLE
CREATE FOREIGN TABLE tbl1 (
      v jsonb
) SERVER parquet_s3_srv
OPTIONS (filename 's3://data/tbl1.parquet', sorted 'c1', schemaless 'true', key_columns 'id1 id2');

-- in non-schemaless mode
IMPORT FOREIGN SCHEMA 's3://data/' FROM SERVER parquet_s3_srv INTO tmp_schema
OPTIONS (sorted 'c1', schemaless 'true', key_columns 'id1 id2');
-- corresponding CREATE FOREIGN TABLE
CREATE FOREIGN TABLE tbl1 (
      id1 INT OPTIONS (key 'true'),
      id2 INT OPTIONS (key 'true'),
      c1  TEXT,
      c2  FLOAT
) SERVER parquet_s3_srv
OPTIONS (filename 's3://data/tbl1.parquet', sorted 'c1');

insert_file_selector 选项

parquet_s3_fdw 用来在 INSERT 查询中检索目标 parquet 文件的用户定义函数签名:

CREATE FUNCTION insert_file_selector_func(one INT8, dirname text)
RETURNS TEXT AS
$$
    SELECT (dirname || '/example7.parquet')::TEXT;
$$
LANGUAGE SQL;

CREATE FOREIGN TABLE example_func (one INT8 OPTIONS (key 'true'), two TEXT)
SERVER parquet_s3_srv
OPTIONS (
    insert_file_selector 'insert_file_selector_func(one, dirname)',
    dirname '/tmp/data_local/data/test',
    sorted 'one');
  • insert_file_selector 函数签名规格:
    • 语法:[function name]([arg name] , [arg name] ...)
    • 默认返回类型为TEXT(parquet 文件的完整路径)
    • [arg name]:必须是外部表的列名或dirname
    • args 值:
      • dirname arg:dirname 选项的值。
      • column args:按名称从插入槽位中获取。

排序列:

parquet_s3_fdw 支持在修改功能中保持排序列的排序状态。

Parquet 文件结构:

基本上,parquet 文件结构是根据一组列名和相应的类型定义的,但在 parquet_s3_fdw 的扫描中,它假定所有具有相同名称的列都具有相同的类型。因此,在修改功能中,也会使用该假设。

从 postgres 类型到 arrow 类型的映射:

  • 基础类型映射:

    SQL 类型 Arrow 类型
    BOOL BOOL
    INT2 INT16
    INT4 INT32
    INT8 INT64
    FLOAT4 FLOAT
    FLOAT8 DOUBLE
    TIMESTAMP/TIMESTAMPTZ TIMESTAMP
    DATE DATE32
    TEXT STRING
    BYTEA BINARY
  • arrow::TIMESTAMP 的默认时间精度为 UTC 时区的微秒级。

  • LIST 是由它的元素类型创建的,对于元素只支持基础类型。

  • MAP 由其 jsonb 元素的类型来创建的:

    jsonb 类型 Arrow 类型
    text STRING
    numeric FLOAT8
    boolean BOOL
    null STRING
    其他类型 STRING
  • 在无结构模式下:

    • 在结构化模式下,基础的 jsonb 类型的映射与 MAP 相同。

    • 对于无结构模式下的第一个嵌套的 jsonb:

      jsonb 类型 Arrow 类型
      array LIST
      object MAP
    • 在结构化模式下,LIST 和 MAP 的元素类型与 MAP 类型相同。

INSERT

-- non-schemaless mode
CREATE FOREIGN TABLE example_insert (
    c1 INT2 OPTIONS (key 'true'),
    c2 TEXT,
    c3 BOOLEAN
) SERVER parquet_s3_srv OPTIONS (filename 's3://data/example_insert.parquet');

INSERT INTO example_insert VALUES (1, 'text1', true), (2, DEFAULT, false), ((select 3), (select i from (values('values are fun!')) as foo (i)), true);
INSERT 0 3

SELECT * FROM example_insert;
 c1 |       c2        | c3
----+-----------------+----
  1 | text1           | t
  2 |                 | f
  3 | values are fun! | t
(3 rows)

-- schemaless mode
CREATE FOREIGN TABLE example_insert_schemaless (
    v JSONB
) SERVER parquet_s3_srv OPTIONS (filename 's3://data/example_insert.parquet', schemaless 'true', key_column 'c1');

INSERT INTO example_insert_schemaless VALUES ('{"c1": 1, "c2": "text1", "c3": true}'), ('{"c1": 2, "c2": null, "c3": false}'), ('{"c1": 3, "c2": "values are fun!", "c3": true}');

SELECT * FROM example_insert_schemaless;
                       v
-----------------------------------------------
 {"c1": 1, "c2": "text1", "c3": "t"}
 {"c1": 2, "c2": null, "c3": "f"}
 {"c1": 3, "c2": "values are fun!", "c3": "t"}
(3 rows)
  • 选择要插入的文件:
    • 如果存在选项insert_file_selector,目标文件就是该函数的结果。
      • 如果目标文件不存在,则创建与目标文件同名的新文件。
      • 如果目标文件存在,但其结构与插入记录的列不匹配,则会引发错误消息。
    • 如果选项insert_file_selector不存在:
      • 目标文件是第一个其结构与插入记录匹配(插入记录的所有列都存在于目标文件中)的文件。
      • 如果没有符合其结构的文件与插入记录的列匹配,并且已指定dirname选项。创建新文件,文件名格式为:[foreign_table_name]_[date_time].parquet
      • 否则,会引发错误消息。
  • 新文件的结构:
    • 在结构化模式下,新文件将所有列都存在于外部表中。
    • 在无结构模式下,新文件将在 jsonb 值中带上所有列。
    • 列信息:
      • 从现有文件列表中获取。
      • 如果在任何文件中都不存在列:根据预定义的映射类型创建基础文件。

UPDATE/DELETE

-- non-schemaless mode
CREATE FOREIGN TABLE example (
    c1 INT2 OPTIONS (key 'true'),
    c2 TEXT,
    c3 BOOLEAN
) SERVER parquet_s3_srv OPTIONS (filename 's3://data/example.parquet');

SELECT * FROM example;
 c1 |       c2        | c3
----+-----------------+----
  1 | text1           | t
  2 |                 | f
  3 | values are fun! | t
(3 rows)

UPDATE example SET c3 = false WHERE c2 = 'text1';
UPDATE 1

SELECT * FROM example;
 c1 |       c2        | c3
----+-----------------+----
  1 | text1           | f
  2 |                 | f
  3 | values are fun! | t
(3 rows)

DELETE FROM example WHERE c1 = 2;
DELETE 1

SELECT * FROM example;
 c1 |       c2        | c3
----+-----------------+----
  1 | text1           | f
  3 | values are fun! | t
(2 rows)

-- schemaless mode
CREATE FOREIGN TABLE example_schemaless (
    v JSONB
) SERVER parquet_s3_srv OPTIONS (filename 's3://data/example.parquet', schemaless 'true', key_columns 'c1');

SELECT * FROM example_schemaless;
                       v
-----------------------------------------------
 {"c1": 1, "c2": "text1", "c3": "t"}
 {"c1": 2, "c2": null, "c3": "f"}
 {"c1": 3, "c2": "values are fun!", "c3": "t"}
(3 rows)

UPDATE example_schemaless SET v='{"c3":false}' WHERE v->>'c2' = 'text1';
UPDATE 1

SELECT * FROM example_schemaless;
                       v
-----------------------------------------------
 {"c1": 1, "c2": "text1", "c3": "f"}
 {"c1": 2, "c2": null, "c3": "f"}
 {"c1": 3, "c2": "values are fun!", "c3": "t"}
(3 rows)

DELETE FROM example_schemaless WHERE (v->>'c1')::int = 2;
DELETE 1

SELECT * FROM example_schemaless;
                       v
-----------------------------------------------
 {"c1": 1, "c2": "text1", "c3": "f"}
 {"c1": 3, "c2": "values are fun!", "c3": "t"}
(2 rows)

限制

  • 不支持事务。

  • 无法同时在文件系统和 Amazon S3 上使用 parquet 文件创建单个外部表。

  • import_parquet_s3_explicit()函数的第 4 和第 5 个参数,在schemaless模式下没有意义。

    • 这些参数应该定义为NULL值。
    • 如果这些参数不是 NULL 值,则会出现下面的WARNING
    WARNING: parquet_s3_fdw: attnames and atttypes are expected to be NULL. They are meaningless for schemaless table.
    HINT: Schemaless table imported always contain "v" column with "jsonb" type.
    
  • schemaless模式不支持通过CREATE TABLE parent_tbl (v jsonb) PARTITION BY RANGE((v->>'a')::int)创建分区表。

  • 在修改功能中:

    • parquet_s3_fdw修改 parquet 文件的方法是,从目标 parquet 文件创建可修改的缓存数据,并覆盖旧文件:
      • 对于大文件,性能不太好。
      • 当完全相同的文件同时修改时,结果会出现不一致。
    • 不支持 WITH CHECK OPTION、ON CONFLICT 和 RETURNING。
    • sorted列仅支持这些类型:int2int4int8datetimestampfloat4float8
    • key列仅支持这些类型:int2int4int8datetimestampfloat4float8text
    • key列的值必须是唯一的,parquet_s3_fdw不支持检查键列的唯一值,用户必须做好检查。
    • key列仅用于 UPDATE/UPDATE。

了解更多

parquet_s3_fdw 项目