Redrock Postgres 搜索 英文
版本: 9.4 / 9.5 / 9.6 / 10 / 11 / 12 / 13 / 14 / 15 / 16 / 17

47.6. 逻辑解码输出插件 #

47.6.1. 初始化函数
47.6.2. 功能
47.6.3. 输出模式
47.6.4. 输出插件回调
47.6.5. 生成输出的函数

可以在 PostgreSQL 源树的 contrib/test_decoding 子目录中找到一个示例输出插件。

47.6.1. 初始化函数 #

通过使用输出插件名称作为库基本名称动态加载共享库的方式来加载输出插件。使用正常的库搜索路径来定位库。要提供必需的输出插件回调,并指示该库实际上是输出插件,则需要提供名为 _PG_output_plugin_init 的函数。该函数传递了一个结构,需要用个体操作的回调函数指针填充该结构。

typedef struct OutputPluginCallbacks
{
    LogicalDecodeStartupCB startup_cb;
    LogicalDecodeBeginCB begin_cb;
    LogicalDecodeChangeCB change_cb;
    LogicalDecodeTruncateCB truncate_cb;
    LogicalDecodeCommitCB commit_cb;
    LogicalDecodeMessageCB message_cb;
    LogicalDecodeFilterByOriginCB filter_by_origin_cb;
    LogicalDecodeShutdownCB shutdown_cb;
    LogicalDecodeFilterPrepareCB filter_prepare_cb;
    LogicalDecodeBeginPrepareCB begin_prepare_cb;
    LogicalDecodePrepareCB prepare_cb;
    LogicalDecodeCommitPreparedCB commit_prepared_cb;
    LogicalDecodeRollbackPreparedCB rollback_prepared_cb;
    LogicalDecodeStreamStartCB stream_start_cb;
    LogicalDecodeStreamStopCB stream_stop_cb;
    LogicalDecodeStreamAbortCB stream_abort_cb;
    LogicalDecodeStreamPrepareCB stream_prepare_cb;
    LogicalDecodeStreamCommitCB stream_commit_cb;
    LogicalDecodeStreamChangeCB stream_change_cb;
    LogicalDecodeStreamMessageCB stream_message_cb;
    LogicalDecodeStreamTruncateCB stream_truncate_cb;
} OutputPluginCallbacks;

typedef void (*LogicalOutputPluginInit) (struct OutputPluginCallbacks *cb);

需要 begin_cbchange_cbcommit_cb 回调,而 startup_cbtruncate_cbmessage_cbfilter_by_origin_cbshutdown_cb 是可选的。如果未设置 truncate_cb 但要解码 TRUNCATE,则该操作将被忽略。

输出插件也可定义函数来支持正在进行的大事务的流传输。需要 stream_start_cbstream_stop_cbstream_abort_cbstream_commit_cbstream_change_cb,而 stream_message_cbstream_truncate_cb 是可选的。如果输出插件还支持两阶段提交,则还需要 stream_prepare_cb

输出插件也可定义函数来支持两阶段提交,这允许在 PREPARE TRANSACTION 上解码操作。需要 begin_prepare_cbprepare_cbcommit_prepared_cbrollback_prepared_cb 回调,而 filter_prepare_cb 是可选的。如果输出插件还支持正在进行的大事务的流传输,则还需要 stream_prepare_cb

47.6.2. 功能 #

要解码、格式化和输出变更,输出插件可以使用后端的正常基础设施的大多数功能,包括调用输出函数。只要访问的是 initdbpg_catalog 模式中创建的关系,或者已经使用以下内容将其标记为用户提供的目录表,则允许以只读方式访问关系:

ALTER TABLE user_catalog_table SET (user_catalog_table = true);
CREATE TABLE another_catalog_table(data text) WITH (user_catalog_table = true);

注意,必须通过 systable_* 扫描 API 访问输出插件中的用户目录表或常规系统目录表。通过 heap_* 扫描 API 访问将出错。此外,禁止导致事务 ID 分配的任何操作。其中包括但不限于向表写入、执行 DDL 更改和调用 pg_current_xact_id()

47.6.3. 输出模式 #

输出插件回调可以将数据几乎以任意格式传递给使用者。对于某些用例(例如通过 SQL 查看更改),返回包含任意数据的数据类型(例如 bytea)中的数据很麻烦。如果输出插件仅以服务器编码输出文本数据,它可以通过在 启动回调中将 OutputPluginOptions.output_type 设置为 OUTPUT_PLUGIN_TEXTUAL_OUTPUT(而不是 OUTPUT_PLUGIN_BINARY_OUTPUT)来声明这一点。在那种情况下,所有数据都必须采用服务器编码,以便 text 数据项可以包含它。这会在启用断言的编译中进行检查。

47.6.4. 输出插件回调 #

通过各种需要提供的回调通知输出插件正在进行的更改。

并发事务按提交顺序解码,并且 begincommit 回调之间仅解码属于特定事务的更改。明确或隐式回滚的事务永远不会被解码。成功保存点将折叠到包含它们的事务中,按照他们在该事务中执行的顺序折叠。如果使用 PREPARE TRANSACTION 为两阶段提交准备了事务,并且提供了解码所需输出插件回调,那么它也将被解码。正在解码的当前已准备事务可能通过 ROLLBACK PREPARED 命令并发中止。在这种情况下,此事务的逻辑解码也将被中止。一旦检测到中止并且调用 prepare_cb 回调,则将跳过此类事务的所有更改。因此,即使在并发中止的情况下,也会向输出插件提供足够的信息,以便在解码后正确处理 ROLLBACK PREPARED

备注

将仅解码已安全刷新到磁盘的事务。当 synchronous_commit 设置为 off 时,会导致在随后紧跟的 pg_logical_slot_get_changes() 中不能立即解码 COMMIT

47.6.4.1. 启动回调 #

无论要输出的更改数目如何,每当创建复制槽或请求流式更改时,将调用可选的 startup_cb 回调。

typedef void (*LogicalDecodeStartupCB) (struct LogicalDecodingContext *ctx,
                                        OutputPluginOptions *options,
                                        bool is_init);

创建复制槽时,is_init 参数为 true,否则为 false。options 指向输出插件可以设置的选项的结构

typedef struct OutputPluginOptions
{
    OutputPluginOutputType output_type;
    bool        receive_rewrites;
} OutputPluginOptions;

output_type 必须设置成 OUTPUT_PLUGIN_TEXTUAL_OUTPUTOUTPUT_PLUGIN_BINARY_OUTPUT。另请参阅 第 47.6.3 节。如果 receive_rewrites 为 true,则输出插件还会针对某些 DDL 操作期间堆重写所做的更改被调用。DDL 复制处理插件对这些更改感兴趣,但它们需要特殊处理。

启动回调应验证 ctx->output_plugin_options 中存在的选项。如果输出插件需要有状态,则可以使用 ctx->output_plugin_private 进行存储。

47.6.4.2. 关闭回调 #

每当不再使用以前激活的复制槽,且可以用来释放输出插件私有资源时,将调用可选的 shutdown_cb 回调。不过这并不一定会丢弃该槽,而仅仅停止流式传输。

typedef void (*LogicalDecodeShutdownCB) (struct LogicalDecodingContext *ctx);

47.6.4.3. 事务开始回调 #

每当已提交事务的开始被解码时,都会调用必需的 begin_cb 回调。中止的事务及其内容绝不会被解码。

typedef void (*LogicalDecodeBeginCB) (struct LogicalDecodingContext *ctx,
                                      ReorderBufferTXN *txn);

txn 参数包含有关该事务的元信息,例如其已提交的时间戳和其 XID。

47.6.4.4. 事务结束回调 #

每当事务提交已解码时,将调用必需的 commit_cb 回调。如果有任何已修改的行,将在该回调之前已调用所有修改行对应的 change_cb 回调。

typedef void (*LogicalDecodeCommitCB) (struct LogicalDecodingContext *ctx,
                                       ReorderBufferTXN *txn,
                                       XLogRecPtr commit_lsn);

47.6.4.5. 更改回调 #

对于事务中每个单独的行修改,都会调用必需的 change_cb 回调,该修改可能是 INSERTUPDATEDELETE。即使原始命令一次性修改了多行,也会对每一行单独调用回调。回调 change_cb 可以访问系统或用户目录表,以帮助输出行修改详情。如果解码已准备(但尚未提交)的事务或未提交的事务,则由于该事务本身体现时回滚,这个变更回调也可能出错。在该情况下,会正常停止已中止事务的逻辑解码。

typedef void (*LogicalDecodeChangeCB) (struct LogicalDecodingContext *ctx,
                                       ReorderBufferTXN *txn,
                                       Relation relation,
                                       ReorderBufferChange *change);

参数 ctxtxn 的内容与回调 begin_cbcommit_cb 的相同,但此外还会传入关系描述符 relation(其指向行所属的关系)和描述行修改的结构 change

备注

只能使用逻辑解码提取未记录在案(参见 UNLOGGED)且非临时(参见 TEMPORARYTEMP)的用户定义表中的变更。

47.6.4.6. 截断回调 #

对于 TRUNCATE 命令,会调用可选回调 truncate_cb

typedef void (*LogicalDecodeTruncateCB) (struct LogicalDecodingContext *ctx,
                                         ReorderBufferTXN *txn,
                                         int nrelations,
                                         Relation relations[],
                                         ReorderBufferChange *change);

该参数类似于回调 change_cb。然而,由于通过外键相连接的表上的 TRUNCATE 操作需要一起执行,因此该回调接收的关系数组,而不仅是一个关系。有关详细信息,请参阅对 TRUNCATE 语句的描述。

47.6.4.7. 来源筛选回调 #

会调用可选回调 filter_by_origin_cb,以确定从 origin_id 重播的数据是否与输出插件相关。

typedef bool (*LogicalDecodeFilterByOriginCB) (struct LogicalDecodingContext *ctx,
                                               RepOriginId origin_id);

参数 ctx 的内容与其他回调相同。除了来源之外,没有其他信息可用。若要表明传入节点上发生的更改不相关,请返回 true,以便将其筛选掉;否则返回 false。对于已筛选掉的交易和更改,不会调用其他回调。

在实现级联或多向复制方案时,这非常有用。在这样的设置中,按来源进行筛选可防止重复来回复制相同的更改。虽然事务和更改也携带有关来源的信息,但通过此回调进行筛选明显更有效率。

47.6.4.8. 通用消息回调 #

每当已解码逻辑解码消息时,就会调用可选的 message_cb 回调。

typedef void (*LogicalDecodeMessageCB) (struct LogicalDecodingContext *ctx,
                                        ReorderBufferTXN *txn,
                                        XLogRecPtr message_lsn,
                                        bool transactional,
                                        const char *prefix,
                                        Size message_size,
                                        const char *message);

txn 参数包含有关事务的元信息,例如已提交的时间戳和 XID。但是,当消息是非事务性的并且尚未在记录消息的事务中分配 XID 时,此参数可能为 NULL。 lsn 有消息的 WAL 位置。 transactional 表示消息是否已作为事务性消息发送。与更改回调类似,在解码已准备(但尚未提交)的事务或解码未提交的事务的情况下,此消息回调也可能会因同时回滚同一事务而出错。在此情况下,将正常停止已中止事务的逻辑解码。 prefix 是任意空终止的前缀,可用于识别当前插件的有趣消息。最后, message 参数保存实际消息,其中 message_size 为大小。

应特别小心,以确保输出插件认为有趣的这个前缀是唯一的。使用扩展的名称或输出插件本身通常是一个不错的选择。

47.6.4.9. 准备筛选回调 #

调用可选的 filter_prepare_cb 回调来确定在当前两阶段提交事务中作为其中一部分的数据在准备阶段应考虑解码,还是以后作为 COMMIT PREPARED 时间的常规单阶段事务考虑解码。要标记应跳过解码,返回 true;否则,返回 false。如果未定义回调,则假定为 false(即不进行筛选,所有使用两阶段提交的事务也分两个阶段进行解码)。

typedef bool (*LogicalDecodeFilterPrepareCB) (struct LogicalDecodingContext *ctx,
                                              TransactionId xid,
                                              const char *gid);

ctx 参数的内容与其他回调相同。参数 xidgid 提供两种不同的方式来识别事务。后来的 COMMIT PREPAREDROLLBACK PREPARED 同时携带这两个标识符,为输出插件提供要使用哪一个的选择。

每次事务进行解码时,可多次调用回调,必须提供给定的xidgid的静态应答,每次调用都是如此。

47.6.4.10. 事务开始准备回调#

当已解码某个已准备事务开始时,将调用必需的begin_prepare_cb回调。txn参数的一部分gid字段可用于此回调,以检查插件是否已接收此PREPARE,在这种情况下,它可以出错或跳过事务的剩余更改。

typedef void (*LogicalDecodeBeginPrepareCB) (struct LogicalDecodingContext *ctx,
                                             ReorderBufferTXN *txn);

47.6.4.11. 事务准备回调#

当已准备进行两阶段提交的事务已解码时,将调用必需的prepare_cb回调。如果已修改任何行,则在此之前将会调用所有已修改行的change_cb回调。此回调中可以使用txn参数的一部分gid字段。

typedef void (*LogicalDecodePrepareCB) (struct LogicalDecodingContext *ctx,
                                        ReorderBufferTXN *txn,
                                        XLogRecPtr prepare_lsn);

47.6.4.12. 事务提交准备回调#

当已解码事务COMMIT PREPARED时,将调用必需的commit_prepared_cb回调。此回调中可以使用txn参数的一部分gid字段。

typedef void (*LogicalDecodeCommitPreparedCB) (struct LogicalDecodingContext *ctx,
                                               ReorderBufferTXN *txn,
                                               XLogRecPtr commit_lsn);

47.6.4.13. 事务回滚准备回调#

当已解码事务ROLLBACK PREPARED时,将调用必需的rollback_prepared_cb回调。此回调中可以使用txn参数 جزءgid字段。使用参数prepare_end_lsnprepare_time可以检查插件是否已接收此PREPARE TRANSACTION,如果是这样,则它可以应用回滚,否则它可以跳过回滚操作。仅gid是不够的,因为下游节点可以有具有相同标识符的已准备事务。

typedef void (*LogicalDecodeRollbackPreparedCB) (struct LogicalDecodingContext *ctx,
                                                 ReorderBufferTXN *txn,
                                                 XLogRecPtr prepare_end_lsn,
                                                 TimestampTz prepare_time);

47.6.4.14. 流启动回调#

打开正在进行的事务的流式更改块时,将调用必需的stream_start_cb回调。

typedef void (*LogicalDecodeStreamStartCB) (struct LogicalDecodingContext *ctx,
                                            ReorderBufferTXN *txn);

47.6.4.15. 流停止回调#

关闭正在进行的事务的流式更改块时,将调用必需的stream_stop_cb回调。

typedef void (*LogicalDecodeStreamStopCB) (struct LogicalDecodingContext *ctx,
                                           ReorderBufferTXN *txn);

47.6.4.16. 流中止回调#

必须调用的 stream_abort_cb 回调用于中止先前流式传输的事务。

typedef void (*LogicalDecodeStreamAbortCB) (struct LogicalDecodingContext *ctx,
                                            ReorderBufferTXN *txn,
                                            XLogRecPtr abort_lsn);

47.6.4.17. 流准备回调 #

在两阶段提交过程中,会调用 stream_prepare_cb 回调来准备先前流式传输的事务。当输出插件既支持大规模进行中的事务流式传输,又支持两阶段提交时,则需要使用此回调。

typedef void (*LogicalDecodeStreamPrepareCB) (struct LogicalDecodingContext *ctx,
                                              ReorderBufferTXN *txn,
                                              XLogRecPtr prepare_lsn);

47.6.4.18. 流提交回调 #

必须调用的 stream_commit_cb 回调用于提交先前流式传输的事务。

typedef void (*LogicalDecodeStreamCommitCB) (struct LogicalDecodingContext *ctx,
                                             ReorderBufferTXN *txn,
                                             XLogRecPtr commit_lsn);

47.6.4.19. 流更改回调 #

发送已流式传输更改块中某项更改时,会调用 stream_change_cb 回调(通过 stream_start_cbstream_stop_cb 调用进行分界)。事务可能会在稍后中止,并且我们不会对中止的事务解码更改,因此未显示实际更改。

typedef void (*LogicalDecodeStreamChangeCB) (struct LogicalDecodingContext *ctx,
                                             ReorderBufferTXN *txn,
                                             Relation relation,
                                             ReorderBufferChange *change);

47.6.4.20. 流消息回调 #

发送已流式传输更改块中某条常规消息时,会调用可选的 stream_message_cb 回调(通过 stream_start_cbstream_stop_cb 调用进行分界)。有关事务性消息的消息内容不会显示,因为事务可能会在稍后中止,并且我们不会对中止的事务解码更改。

typedef void (*LogicalDecodeStreamMessageCB) (struct LogicalDecodingContext *ctx,
                                              ReorderBufferTXN *txn,
                                              XLogRecPtr message_lsn,
                                              bool transactional,
                                              const char *prefix,
                                              Size message_size,
                                              const char *message);

47.6.4.21. 流截断回调 #

已流式传输更改块中出现了 TRUNCATE 命令时,将会调用可选的 stream_truncate_cb 回调(通过 stream_start_cbstream_stop_cb 调用进行分界)。

typedef void (*LogicalDecodeStreamTruncateCB) (struct LogicalDecodingContext *ctx,
                                               ReorderBufferTXN *txn,
                                               int nrelations,
                                               Relation relations[],
                                               ReorderBufferChange *change);

这些参数类似于 stream_change_cb 回调。但是,由于需要对通过外键连接的表上的 TRUNCATE 操作同时执行,因此此回调会接收一个关系数组,而不仅仅接收一个关系。有关详细信息,请参阅 TRUNCATE 语句的说明。

47.6.5. 用于生成输出的函数 #

要实际生成输出,输出插件可以在 begin_cbcommit_cbchange_cb 回调中写入到 ctx->out 中的 StringInfo 输出缓冲区。在写输出缓冲区之前,必须调用 OutputPluginPrepareWrite(ctx, last_write),并且在完成向缓冲区写入之后,必须调用 OutputPluginWrite(ctx, last_write) 来执行写入。last_write 参数会指出某次特定写入是否是回调的最后一次写入。

以下示例展示了如何向输出插件的使用者输出数据

OutputPluginPrepareWrite(ctx, true);
appendStringInfo(ctx->out, "BEGIN %u", txn->xid);
OutputPluginWrite(ctx, true);