五月 9, 2024
摘要:parquet_s3_fdw
扩展是一个外部数据包装器(FDW),用于访问本地文件系统和 Amazon S3 上的 Parquet 文件。
目录
此版本的 parquet_s3_fdw 适用于 PostgreSQL 13、14、15 和 16。
只读模式下的 Apache Parquet 外部数据包装器,支持 PostgreSQL 访问 S3 存储。
用法
加载扩展
CREATE EXTENSION parquet_s3_fdw;
创建服务器
CREATE SERVER parquet_s3_srv FOREIGN DATA WRAPPER parquet_s3_fdw;
如果要使用 MinIO 而不是 AWS S3,请使用 use_minio 选项创建服务器。
CREATE SERVER parquet_s3_srv FOREIGN DATA WRAPPER parquet_s3_fdw OPTIONS (use_minio 'true');
创建用户映射
如果要访问 Amazon S3,则必须指定用户名和密码。
CREATE USER MAPPING FOR public SERVER parquet_s3_srv OPTIONS (user 's3user', password 's3password');
创建外表
现在你应该能够从 Parquet 文件创建外表了。目前parquet_s3_fdw
支持下面这些数据列类型(还将会增加):
Arrow 类型 | SQL 类型 |
---|---|
INT8 | INT2 |
INT16 | INT2 |
INT32 | INT4 |
INT64 | INT8 |
FLOAT | FLOAT4 |
DOUBLE | FLOAT8 |
TIMESTAMP | TIMESTAMP |
DATE32 | DATE |
STRING | TEXT |
BINARY | BYTEA |
LIST | ARRAY |
MAP | JSONB |
目前parquet_s3_fdw
不支持结构体和嵌套列表。
支持以下选项:
- filename - 要读取的 Parquet 文件的路径列表,以空格分隔。您可以用
s3://
开始来指定 AWS S3 上的路径。不支持混合使用本地路径和 S3 路径; - dirname - 具有要读取的 Parquet 文件的目录路径;
- sorted - 用来预排序 Parquet 文件的,空格分隔的列列表;当使用
ORDER BY
子句运行查询,或在其他情况下带有预排序的列集会有用(Group Aggregate,Merge Join)时,这将有助于 postgres 避免冗余的排序; - files_in_order - 要求以
filename
指定或由files_func
返回的文件,根据sorted
选项进行排序,并且在范围上没有交叉;这允许在并行多文件扫描节点上使用Gather Merge
节点(默认值为false
); - use_mmap - 是否使用内存映射操作,而不是文件读取操作(默认值为
false
); - use_threads - 启用 Apache Arrow 的并行列解码/解压(默认值为
false
); - files_func - 用户定义的函数,由 parquet_s3_fdw 在每次查询时用于检索 parquet 文件列表;函数必须接受一个
JSONB
参数,并返回 parquet 文件完整路径的文本数组; - files_func_arg - 由 files_func 指定的函数的参数。
- max_open_files - 同时打开的 Parquet 文件的数量限制。
- region - 用于连接到的 AWS 区域的值(默认值为
ap-northeast-1
)。 - endpoint - 用于连接的地址和端口(默认值为
127.0.0.1:9000
)。
可以为单个和一组 Parquet 文件创建外部表。也可以指定一个用户定义的函数,该函数会返回一个文件路径列表。根据文件数量和表选项,parquet_s3_fdw
可以使用以下的一种执行策略:
策略 | 描述 |
---|---|
Single File | 基本的单文件读取器 |
Multifile | 按顺序逐个处理 Parquet 文件的读取器 |
Multifile Merge | 该读取器会合并预排序的 Parquet 文件,以便生成的结果也是有序的;在指定了sorted 选项,并且查询计划需要排序(例如包含了ORDER BY 子句)时会使用 |
Caching Multifile Merge | 与Multifile Merge 相同,但会限制同时打开的文件数;当指定的 Parquet 文件数超过max_open_files 时会使用 |
GUC 变量:
- parquet_fdw.use_threads - 允许用户启用或禁用线程的全局开关(默认值为
true
); - parquet_fdw.enable_multifile - 启用多文件读取器(默认值为
true
)。 - parquet_fdw.enable_multifile_merge - 启用多文件合并读取器(默认值为
true
)。
示例:
CREATE FOREIGN TABLE userdata (
id int,
first_name text,
last_name text
)
SERVER parquet_s3_srv
OPTIONS (
filename 's3://bucket/dir/userdata1.parquet'
);
访问外表
SELECT * FROM userdata;
并行查询
parquet_s3_fdw
还支持并行查询执行(注意不要与 Apache Arrow 的多线程解码功能混淆)。
导入
parquet_s3_fdw
还支持 IMPORT FOREIGN SCHEMA 命令,来发现文件系统上指定目录中的 parquet 文件,并根据这些文件创建外部表。它可以像这样使用:
IMPORT FOREIGN SCHEMA "/path/to/directory"
FROM SERVER parquet_s3_srv
INTO public;
重要的是,这里的remote_schema
是一个本地文件系统的目录路径,并且用双引号引起来。
将 parquet 文件导入到外部表的另一种方法是,使用import_parquet_s3
或import_parquet_s3_explicit
:
CREATE FUNCTION import_parquet_s3(
tablename text,
schemaname text,
servername text,
userfunc regproc,
args jsonb,
options jsonb)
CREATE FUNCTION import_parquet_s3_explicit(
tablename text,
schemaname text,
servername text,
attnames text[],
atttypes regtype[],
userfunc regproc,
args jsonb,
options jsonb)
import_parquet_s3
和import_parquet_s3_explicit
之间的唯一区别是,后者允许指定一组要导入的属性/列。attnames
和atttypes
分别是属性名称和属性类型的数组(参见下面的示例)。
userfunc
是一个用户自定义函数。它必须接受一个jsonb
参数,并返回一个要导入的 parquet 文件的文件系统路径的文本数组。args
是用户指定的 jsonb 对象,以作为参数传递给userfunc
。这种函数的简单实现和用法,可以如下面这样:
CREATE FUNCTION list_parquet_s3_files(args jsonb)
RETURNS text[] AS
$$
BEGIN
RETURN array_agg(args->>'dir' || '/' || filename)
FROM pg_ls_dir(args->>'dir') AS files(filename)
WHERE filename ~~ '%.parquet';
END
$$
LANGUAGE plpgsql;
SELECT import_parquet_s3_explicit(
'abc',
'public',
'parquet_srv',
array['one', 'three', 'six'],
array['int8', 'text', 'bool']::regtype[],
'list_parquet_files',
'{"dir": "/path/to/directory"}',
'{"sorted": "one"}'
);
特性
- 支持在本地文件系统或 Amazon S3 上对 parquet 文件进行 SELECT 操作。
- 支持 INSERT、DELETE、UPDATE(外部修改)。
- 支持 MinIO 访问,以替代 Amazon S3。
- 允许控制外部服务器在事务完成后是否保持连接打开状态。这由 keep_connections 控制,默认为 on。
- 支持 parquet_s3_fdw 的 parquet_s3_fdw_get_connections() 函数,列出打开的外部服务器连接。
无结构模式
-
该功能将使用户能够使用无结构的能力:
-
- 每个 parquet 文件没有特定的外部表结构(列定义)。
- 无结构的外表只有一个 jsonb 列,用于根据以下规则表示 parquet 文件中的数据:
- Jsonb 键:parquet 列名称。
- Jsonb 值:parquet 列数据。
-
使用无结构模式,会有几个好处:
- parquet 文件数据结构的灵活性:通过将所有列数据合并到一个 jsonb 列中,无结构的外表可以查询任何 parquet 文件,文件中的所有列都能映射到 postgres 类型。
- 没有预定义的外部表结构(列定义)。缺少结构意味着外部表会查询 parquet 文件中的所有列,包括用户还未使用的列。
无结构模式用法
-
无结构模式由
schemaless
选项来启用:schemaless
选项是true
:启用无结构模式。schemaless
选项是false
:禁用无结构模式(我们称之为non-schemaless
模式)。- 如果未配置
schemaless
选项,则默认值为 false。 CREATE FOREIGN TABLE
、IMPORT FOREIGN SCHEMA
、import_parquet_s3()
和import_parquet_s3_explicit()
中均支持schemaless
选项。
-
无结构外表需要至少一个 jsonb 列来表示数据:
- 如果有 1 个以上的 jsonb 列,则仅填充一列,所有其他列都使用 NULL 值处理。
- 如果没有 jsonb 列,则所有列都使用 NULL 值处理。
- 示例:
CREATE FOREIGN TABLE example_schemaless ( id int, v jsonb ) OPTIONS (filename '/path/to/parquet_file', schemaless 'true'); SELECT * FROM example_schemaless; id | v ----+--------------------------------------------------------------------------------------------------------------------------------- | {"one": 1, "six": "t", "two": [1, 2, 3], "five": "2018-01-01", "four": "2018-01-01 00:00:00", "seven": 0.5, "three": "foo"} | {"one": 2, "six": "f", "two": [null, 5, 6], "five": "2018-01-02", "four": "2018-01-02 00:00:00", "seven": null, "three": "bar"} (2 rows)
-
创建外部表:使用
IMPORT FOREIGN SCHEMA
,import_parquet_s3()
和import_parquet_s3_explicit()
,外部表将以固定的列定义进行创建,如下所示:CREATE FOREIGN TABLE example ( v jsonb ) OPTIONS (filename '/path/to/parquet_file', schemaless 'true');
-
查询数据:
-- non-schemaless mode SELECT * FROM example; one | two | three | four | five | six | seven -----+------------+-------+---------------------+------------+-----+------- 1 | {1,2,3} | foo | 2018-01-01 00:00:00 | 2018-01-01 | t | 0.5 2 | {NULL,5,6} | bar | 2018-01-02 00:00:00 | 2018-01-02 | f | (2 rows) -- schemaless mode SELECT * FROM example_schemaless; v --------------------------------------------------------------------------------------------------------------------------------- {"one": 1, "six": "t", "two": [1, 2, 3], "five": "2018-01-01", "four": "2018-01-01 00:00:00", "seven": 0.5, "three": "foo"} {"one": 2, "six": "f", "two": [null, 5, 6], "five": "2018-01-02", "four": "2018-01-02 00:00:00", "seven": null, "three": "bar"} (2 rows)
-
在 jsonb 表达式中获取值:
-
使用
->>
jsonb 箭头操作符,返回文本类型。用户可以强制转换 jsonb 表达式的类型,以获得相应的数据表示。 -
例如,获取
col
值的表达式v->>'col'
,将是 parquet 文件中的列名col
,我们称之为schemaless variable
或slvar
。SELECT v->>'two', sqrt((v->>'one')::int) FROM example_schemaless; ?column? | sqrt --------------+-------------------- [1, 2, 3] | 1 [null, 5, 6] | 1.4142135623730951 (2 rows)
-
-
某些功能与
non-schemaless
模式不同-
行组过滤器支持:在无结构模式下,parquet_s3_fdw 可以通过一些如下的
WHERE
条件,支持对行组进行过滤:slvar::type {operator} const
。例如:(v->>'int64_col')::int8 = 100
const {operator} slvar ::type
。例如:100 = (v->>'int64_col')::int8
slvar::boolean is true/false
。例如:(v->>'bool_col')::boolean is false
!(slvar::boolean)
。例如:!(v->>'bool_col')::boolean
- Jsonb
exist
运算符:((v->>'col')::jsonb) ? element
、(v->'col') ? element
和v ? 'col'
- 转换函数必须映射 parquet 列类型,否则会跳过过滤器。
-
要使用 parquet 文件的预排序列,用户必须是:
-
在
sorted
选项中定义列名,与non-schemaless mode
相同 -
在
ORDER BY
子句中使用slvar
代替列名。 -
如果排序的 parquet 列不是文本列,请将此列显式地强制转换到映射类型。
-
例如:
CREATE FOREIGN TABLE example_sorted (v jsonb) SERVER parquet_s3_srv OPTIONS (filename '/path/to/example1.parquet /path/to/example2.parquet', sorted 'int64_col', schemaless 'true'); EXPLAIN (COSTS OFF) SELECT * FROM example_sorted ORDER BY (v->>'int64_col')::int8; QUERY PLAN -------------------------------- Foreign Scan on example_sorted Reader: Multifile Merge Row groups: example1.parquet: 1, 2 example2.parquet: 1 (5 rows)
-
-
支持对嵌套列表和映射表使用箭头运算符:这些类型将被视为嵌套的 jsonb 值,可以通过
->
操作符访问。例如:SELECT * FROM example_schemaless; v ---------------------------------------------------------------------------- {"array_col": [19, 20], "jsonb_col": {"1": "foo", "2": "bar", "3": "baz"}} {"array_col": [21, 22], "jsonb_col": {"4": "test1", "5": "test2"}} (2 rows) SELECT v->'array_col'->1, v->'jsonb_col'->'1' FROM example3; ?column? | ?column? ----------+---------- 20 | "foo" 22 | (2 rows)
-
Postgres 计算
(jsonb->>'col')::type
的成本,比在non-schemaless
模式下直接获取列要大得多,在一些复杂的查询中,schemaless
模式的查询计划可能与non-schemaless
模式不同。
-
-
对于其他功能,
schemaless
模式与non-schemaless
模式工作相同。
可写的 FDW
用户可以对已设置键列的外表,执行 insert、update 和 delete 语句。
键列
- 在结构化模式下:可以通过使用 OPTIONS (key ’true’) 创建 parquet_s3_fdw 外表对象,来设置键列:
CREATE FOREIGN TABLE userdata (
id1 int OPTIONS(key 'true'),
id2 int OPTIONS(key 'true'),
first_name text,
last_name text
) SERVER parquet_s3_srv
OPTIONS (
filename 's3://bucket/dir/userdata1.parquet'
);
- 在无结构模式下,可以在创建 parquet_s3_fdw 外部表对象时,使用
key_columns
选项设置键列:
CREATE FOREIGN TABLE userdata (
v JSONB
) SERVER parquet_s3_srv
OPTIONS (
filename 's3://bucket/dir/userdata1.parquet',
schemaless 'true',
key_columns 'id1 id2'
);
key_columns
选项可用于 IMPORT FOREIGN SCHEMA 功能:
-- in schemaless mode
IMPORT FOREIGN SCHEMA 's3://data/' FROM SERVER parquet_s3_srv INTO tmp_schema
OPTIONS (sorted 'c1', schemaless 'true', key_columns 'id1 id2');
-- corresponding CREATE FOREIGN TABLE
CREATE FOREIGN TABLE tbl1 (
v jsonb
) SERVER parquet_s3_srv
OPTIONS (filename 's3://data/tbl1.parquet', sorted 'c1', schemaless 'true', key_columns 'id1 id2');
-- in non-schemaless mode
IMPORT FOREIGN SCHEMA 's3://data/' FROM SERVER parquet_s3_srv INTO tmp_schema
OPTIONS (sorted 'c1', schemaless 'true', key_columns 'id1 id2');
-- corresponding CREATE FOREIGN TABLE
CREATE FOREIGN TABLE tbl1 (
id1 INT OPTIONS (key 'true'),
id2 INT OPTIONS (key 'true'),
c1 TEXT,
c2 FLOAT
) SERVER parquet_s3_srv
OPTIONS (filename 's3://data/tbl1.parquet', sorted 'c1');
insert_file_selector 选项
parquet_s3_fdw 用来在 INSERT 查询中检索目标 parquet 文件的用户定义函数签名:
CREATE FUNCTION insert_file_selector_func(one INT8, dirname text)
RETURNS TEXT AS
$$
SELECT (dirname || '/example7.parquet')::TEXT;
$$
LANGUAGE SQL;
CREATE FOREIGN TABLE example_func (one INT8 OPTIONS (key 'true'), two TEXT)
SERVER parquet_s3_srv
OPTIONS (
insert_file_selector 'insert_file_selector_func(one, dirname)',
dirname '/tmp/data_local/data/test',
sorted 'one');
- insert_file_selector 函数签名规格:
- 语法:
[function name]([arg name] , [arg name] ...)
- 默认返回类型为
TEXT
(parquet 文件的完整路径) [arg name]
:必须是外部表的列名或dirname
- args 值:
dirname
arg:dirname 选项的值。column
args:按名称从插入槽位中获取。
- 语法:
排序列:
parquet_s3_fdw 支持在修改功能中保持排序列的排序状态。
Parquet 文件结构:
基本上,parquet 文件结构是根据一组列名和相应的类型定义的,但在 parquet_s3_fdw 的扫描中,它假定所有具有相同名称的列都具有相同的类型。因此,在修改功能中,也会使用该假设。
从 postgres 类型到 arrow 类型的映射:
-
基础类型映射:
SQL 类型 Arrow 类型 BOOL BOOL INT2 INT16 INT4 INT32 INT8 INT64 FLOAT4 FLOAT FLOAT8 DOUBLE TIMESTAMP/TIMESTAMPTZ TIMESTAMP DATE DATE32 TEXT STRING BYTEA BINARY -
arrow::TIMESTAMP 的默认时间精度为 UTC 时区的微秒级。
-
LIST 是由它的元素类型创建的,对于元素只支持基础类型。
-
MAP 由其 jsonb 元素的类型来创建的:
jsonb 类型 Arrow 类型 text STRING numeric FLOAT8 boolean BOOL null STRING 其他类型 STRING -
在无结构模式下:
-
在结构化模式下,基础的 jsonb 类型的映射与 MAP 相同。
-
对于无结构模式下的第一个嵌套的 jsonb:
jsonb 类型 Arrow 类型 array LIST object MAP -
在结构化模式下,LIST 和 MAP 的元素类型与 MAP 类型相同。
-
INSERT
-- non-schemaless mode
CREATE FOREIGN TABLE example_insert (
c1 INT2 OPTIONS (key 'true'),
c2 TEXT,
c3 BOOLEAN
) SERVER parquet_s3_srv OPTIONS (filename 's3://data/example_insert.parquet');
INSERT INTO example_insert VALUES (1, 'text1', true), (2, DEFAULT, false), ((select 3), (select i from (values('values are fun!')) as foo (i)), true);
INSERT 0 3
SELECT * FROM example_insert;
c1 | c2 | c3
----+-----------------+----
1 | text1 | t
2 | | f
3 | values are fun! | t
(3 rows)
-- schemaless mode
CREATE FOREIGN TABLE example_insert_schemaless (
v JSONB
) SERVER parquet_s3_srv OPTIONS (filename 's3://data/example_insert.parquet', schemaless 'true', key_column 'c1');
INSERT INTO example_insert_schemaless VALUES ('{"c1": 1, "c2": "text1", "c3": true}'), ('{"c1": 2, "c2": null, "c3": false}'), ('{"c1": 3, "c2": "values are fun!", "c3": true}');
SELECT * FROM example_insert_schemaless;
v
-----------------------------------------------
{"c1": 1, "c2": "text1", "c3": "t"}
{"c1": 2, "c2": null, "c3": "f"}
{"c1": 3, "c2": "values are fun!", "c3": "t"}
(3 rows)
- 选择要插入的文件:
- 如果存在选项
insert_file_selector
,目标文件就是该函数的结果。- 如果目标文件不存在,则创建与目标文件同名的新文件。
- 如果目标文件存在,但其结构与插入记录的列不匹配,则会引发错误消息。
- 如果选项
insert_file_selector
不存在:- 目标文件是第一个其结构与插入记录匹配(插入记录的所有列都存在于目标文件中)的文件。
- 如果没有符合其结构的文件与插入记录的列匹配,并且已指定
dirname
选项。创建新文件,文件名格式为:[foreign_table_name]_[date_time].parquet
- 否则,会引发错误消息。
- 如果存在选项
- 新文件的结构:
- 在结构化模式下,新文件将所有列都存在于外部表中。
- 在无结构模式下,新文件将在 jsonb 值中带上所有列。
- 列信息:
- 从现有文件列表中获取。
- 如果在任何文件中都不存在列:根据预定义的映射类型创建基础文件。
UPDATE/DELETE
-- non-schemaless mode
CREATE FOREIGN TABLE example (
c1 INT2 OPTIONS (key 'true'),
c2 TEXT,
c3 BOOLEAN
) SERVER parquet_s3_srv OPTIONS (filename 's3://data/example.parquet');
SELECT * FROM example;
c1 | c2 | c3
----+-----------------+----
1 | text1 | t
2 | | f
3 | values are fun! | t
(3 rows)
UPDATE example SET c3 = false WHERE c2 = 'text1';
UPDATE 1
SELECT * FROM example;
c1 | c2 | c3
----+-----------------+----
1 | text1 | f
2 | | f
3 | values are fun! | t
(3 rows)
DELETE FROM example WHERE c1 = 2;
DELETE 1
SELECT * FROM example;
c1 | c2 | c3
----+-----------------+----
1 | text1 | f
3 | values are fun! | t
(2 rows)
-- schemaless mode
CREATE FOREIGN TABLE example_schemaless (
v JSONB
) SERVER parquet_s3_srv OPTIONS (filename 's3://data/example.parquet', schemaless 'true', key_columns 'c1');
SELECT * FROM example_schemaless;
v
-----------------------------------------------
{"c1": 1, "c2": "text1", "c3": "t"}
{"c1": 2, "c2": null, "c3": "f"}
{"c1": 3, "c2": "values are fun!", "c3": "t"}
(3 rows)
UPDATE example_schemaless SET v='{"c3":false}' WHERE v->>'c2' = 'text1';
UPDATE 1
SELECT * FROM example_schemaless;
v
-----------------------------------------------
{"c1": 1, "c2": "text1", "c3": "f"}
{"c1": 2, "c2": null, "c3": "f"}
{"c1": 3, "c2": "values are fun!", "c3": "t"}
(3 rows)
DELETE FROM example_schemaless WHERE (v->>'c1')::int = 2;
DELETE 1
SELECT * FROM example_schemaless;
v
-----------------------------------------------
{"c1": 1, "c2": "text1", "c3": "f"}
{"c1": 3, "c2": "values are fun!", "c3": "t"}
(2 rows)
限制
-
不支持事务。
-
无法同时在文件系统和 Amazon S3 上使用 parquet 文件创建单个外部表。
-
import_parquet_s3_explicit()
函数的第 4 和第 5 个参数,在schemaless
模式下没有意义。- 这些参数应该定义为
NULL
值。 - 如果这些参数不是 NULL 值,则会出现下面的
WARNING
:
WARNING: parquet_s3_fdw: attnames and atttypes are expected to be NULL. They are meaningless for schemaless table. HINT: Schemaless table imported always contain "v" column with "jsonb" type.
- 这些参数应该定义为
-
schemaless
模式不支持通过CREATE TABLE parent_tbl (v jsonb) PARTITION BY RANGE((v->>'a')::int)
创建分区表。 -
在修改功能中:
parquet_s3_fdw
修改 parquet 文件的方法是,从目标 parquet 文件创建可修改的缓存数据,并覆盖旧文件:- 对于大文件,性能不太好。
- 当完全相同的文件同时修改时,结果会出现不一致。
- 不支持 WITH CHECK OPTION、ON CONFLICT 和 RETURNING。
sorted
列仅支持这些类型:int2
、int4
、int8
、date
、timestamp
、float4
、float8
。key
列仅支持这些类型:int2
、int4
、int8
、date
、timestamp
、float4
、float8
和text
。key
列的值必须是唯一的,parquet_s3_fdw
不支持检查键列的唯一值,用户必须做好检查。key
列仅用于 UPDATE/UPDATE。