可以在 PostgreSQL 源树的 contrib/test_decoding
子目录中找到一个示例输出插件。
通过使用输出插件名称作为库基本名称动态加载共享库的方式来加载输出插件。使用正常的库搜索路径来定位库。要提供必需的输出插件回调,并指示该库实际上是输出插件,则需要提供名为 _PG_output_plugin_init
的函数。该函数传递了一个结构,需要用个体操作的回调函数指针填充该结构。
typedef struct OutputPluginCallbacks { LogicalDecodeStartupCB startup_cb; LogicalDecodeBeginCB begin_cb; LogicalDecodeChangeCB change_cb; LogicalDecodeTruncateCB truncate_cb; LogicalDecodeCommitCB commit_cb; LogicalDecodeMessageCB message_cb; LogicalDecodeFilterByOriginCB filter_by_origin_cb; LogicalDecodeShutdownCB shutdown_cb; LogicalDecodeFilterPrepareCB filter_prepare_cb; LogicalDecodeBeginPrepareCB begin_prepare_cb; LogicalDecodePrepareCB prepare_cb; LogicalDecodeCommitPreparedCB commit_prepared_cb; LogicalDecodeRollbackPreparedCB rollback_prepared_cb; LogicalDecodeStreamStartCB stream_start_cb; LogicalDecodeStreamStopCB stream_stop_cb; LogicalDecodeStreamAbortCB stream_abort_cb; LogicalDecodeStreamPrepareCB stream_prepare_cb; LogicalDecodeStreamCommitCB stream_commit_cb; LogicalDecodeStreamChangeCB stream_change_cb; LogicalDecodeStreamMessageCB stream_message_cb; LogicalDecodeStreamTruncateCB stream_truncate_cb; } OutputPluginCallbacks; typedef void (*LogicalOutputPluginInit) (struct OutputPluginCallbacks *cb);
需要 begin_cb
、change_cb
和 commit_cb
回调,而 startup_cb
、truncate_cb
、message_cb
、filter_by_origin_cb
和 shutdown_cb
是可选的。如果未设置 truncate_cb
但要解码 TRUNCATE
,则该操作将被忽略。
输出插件也可定义函数来支持正在进行的大事务的流传输。需要 stream_start_cb
、stream_stop_cb
、stream_abort_cb
、stream_commit_cb
和 stream_change_cb
,而 stream_message_cb
和 stream_truncate_cb
是可选的。如果输出插件还支持两阶段提交,则还需要 stream_prepare_cb
。
输出插件也可定义函数来支持两阶段提交,这允许在 PREPARE TRANSACTION
上解码操作。需要 begin_prepare_cb
、prepare_cb
、commit_prepared_cb
和 rollback_prepared_cb
回调,而 filter_prepare_cb
是可选的。如果输出插件还支持正在进行的大事务的流传输,则还需要 stream_prepare_cb
。
要解码、格式化和输出变更,输出插件可以使用后端的正常基础设施的大多数功能,包括调用输出函数。只要访问的是 initdb
在 pg_catalog
模式中创建的关系,或者已经使用以下内容将其标记为用户提供的目录表,则允许以只读方式访问关系:
ALTER TABLE user_catalog_table SET (user_catalog_table = true); CREATE TABLE another_catalog_table(data text) WITH (user_catalog_table = true);
注意,必须通过 systable_*
扫描 API 访问输出插件中的用户目录表或常规系统目录表。通过 heap_*
扫描 API 访问将出错。此外,禁止导致事务 ID 分配的任何操作。其中包括但不限于向表写入、执行 DDL 更改和调用 pg_current_xact_id()
。
输出插件回调可以将数据几乎以任意格式传递给使用者。对于某些用例(例如通过 SQL 查看更改),返回包含任意数据的数据类型(例如 bytea
)中的数据很麻烦。如果输出插件仅以服务器编码输出文本数据,它可以通过在 启动回调中将 OutputPluginOptions.output_type
设置为 OUTPUT_PLUGIN_TEXTUAL_OUTPUT
(而不是 OUTPUT_PLUGIN_BINARY_OUTPUT
)来声明这一点。在那种情况下,所有数据都必须采用服务器编码,以便 text
数据项可以包含它。这会在启用断言的编译中进行检查。
通过各种需要提供的回调通知输出插件正在进行的更改。
并发事务按提交顺序解码,并且 begin
和 commit
回调之间仅解码属于特定事务的更改。明确或隐式回滚的事务永远不会被解码。成功保存点将折叠到包含它们的事务中,按照他们在该事务中执行的顺序折叠。如果使用 PREPARE TRANSACTION
为两阶段提交准备了事务,并且提供了解码所需输出插件回调,那么它也将被解码。正在解码的当前已准备事务可能通过 ROLLBACK PREPARED
命令并发中止。在这种情况下,此事务的逻辑解码也将被中止。一旦检测到中止并且调用 prepare_cb
回调,则将跳过此类事务的所有更改。因此,即使在并发中止的情况下,也会向输出插件提供足够的信息,以便在解码后正确处理 ROLLBACK PREPARED
。
将仅解码已安全刷新到磁盘的事务。当 synchronous_commit
设置为 off
时,会导致在随后紧跟的 pg_logical_slot_get_changes()
中不能立即解码 COMMIT
。
无论要输出的更改数目如何,每当创建复制槽或请求流式更改时,将调用可选的 startup_cb
回调。
typedef void (*LogicalDecodeStartupCB) (struct LogicalDecodingContext *ctx, OutputPluginOptions *options, bool is_init);
创建复制槽时,is_init
参数为 true,否则为 false。options
指向输出插件可以设置的选项的结构
typedef struct OutputPluginOptions { OutputPluginOutputType output_type; bool receive_rewrites; } OutputPluginOptions;
output_type
必须设置成 OUTPUT_PLUGIN_TEXTUAL_OUTPUT
或 OUTPUT_PLUGIN_BINARY_OUTPUT
。另请参阅 第 47.6.3 节。如果 receive_rewrites
为 true,则输出插件还会针对某些 DDL 操作期间堆重写所做的更改被调用。DDL 复制处理插件对这些更改感兴趣,但它们需要特殊处理。
启动回调应验证 ctx->output_plugin_options
中存在的选项。如果输出插件需要有状态,则可以使用 ctx->output_plugin_private
进行存储。
每当不再使用以前激活的复制槽,且可以用来释放输出插件私有资源时,将调用可选的 shutdown_cb
回调。不过这并不一定会丢弃该槽,而仅仅停止流式传输。
typedef void (*LogicalDecodeShutdownCB) (struct LogicalDecodingContext *ctx);
每当已提交事务的开始被解码时,都会调用必需的 begin_cb
回调。中止的事务及其内容绝不会被解码。
typedef void (*LogicalDecodeBeginCB) (struct LogicalDecodingContext *ctx, ReorderBufferTXN *txn);
txn
参数包含有关该事务的元信息,例如其已提交的时间戳和其 XID。
每当事务提交已解码时,将调用必需的 commit_cb
回调。如果有任何已修改的行,将在该回调之前已调用所有修改行对应的 change_cb
回调。
typedef void (*LogicalDecodeCommitCB) (struct LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr commit_lsn);
对于事务中每个单独的行修改,都会调用必需的 change_cb
回调,该修改可能是 INSERT
、UPDATE
或 DELETE
。即使原始命令一次性修改了多行,也会对每一行单独调用回调。回调 change_cb
可以访问系统或用户目录表,以帮助输出行修改详情。如果解码已准备(但尚未提交)的事务或未提交的事务,则由于该事务本身体现时回滚,这个变更回调也可能出错。在该情况下,会正常停止已中止事务的逻辑解码。
typedef void (*LogicalDecodeChangeCB) (struct LogicalDecodingContext *ctx, ReorderBufferTXN *txn, Relation relation, ReorderBufferChange *change);
参数 ctx
和 txn
的内容与回调 begin_cb
和 commit_cb
的相同,但此外还会传入关系描述符 relation
(其指向行所属的关系)和描述行修改的结构 change
。
只能使用逻辑解码提取未记录在案(参见 UNLOGGED
)且非临时(参见 TEMPORARY
或 TEMP
)的用户定义表中的变更。
对于 TRUNCATE
命令,会调用可选回调 truncate_cb
。
typedef void (*LogicalDecodeTruncateCB) (struct LogicalDecodingContext *ctx, ReorderBufferTXN *txn, int nrelations, Relation relations[], ReorderBufferChange *change);
该参数类似于回调 change_cb
。然而,由于通过外键相连接的表上的 TRUNCATE
操作需要一起执行,因此该回调接收的关系数组,而不仅是一个关系。有关详细信息,请参阅对 TRUNCATE 语句的描述。
会调用可选回调 filter_by_origin_cb
,以确定从 origin_id
重播的数据是否与输出插件相关。
typedef bool (*LogicalDecodeFilterByOriginCB) (struct LogicalDecodingContext *ctx, RepOriginId origin_id);
参数 ctx
的内容与其他回调相同。除了来源之外,没有其他信息可用。若要表明传入节点上发生的更改不相关,请返回 true,以便将其筛选掉;否则返回 false。对于已筛选掉的交易和更改,不会调用其他回调。
在实现级联或多向复制方案时,这非常有用。在这样的设置中,按来源进行筛选可防止重复来回复制相同的更改。虽然事务和更改也携带有关来源的信息,但通过此回调进行筛选明显更有效率。
每当已解码逻辑解码消息时,就会调用可选的 message_cb
回调。
typedef void (*LogicalDecodeMessageCB) (struct LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr message_lsn, bool transactional, const char *prefix, Size message_size, const char *message);
txn
参数包含有关事务的元信息,例如已提交的时间戳和 XID。但是,当消息是非事务性的并且尚未在记录消息的事务中分配 XID 时,此参数可能为 NULL。 lsn
有消息的 WAL 位置。 transactional
表示消息是否已作为事务性消息发送。与更改回调类似,在解码已准备(但尚未提交)的事务或解码未提交的事务的情况下,此消息回调也可能会因同时回滚同一事务而出错。在此情况下,将正常停止已中止事务的逻辑解码。 prefix
是任意空终止的前缀,可用于识别当前插件的有趣消息。最后, message
参数保存实际消息,其中 message_size
为大小。
应特别小心,以确保输出插件认为有趣的这个前缀是唯一的。使用扩展的名称或输出插件本身通常是一个不错的选择。
调用可选的 filter_prepare_cb
回调来确定在当前两阶段提交事务中作为其中一部分的数据在准备阶段应考虑解码,还是以后作为 COMMIT PREPARED
时间的常规单阶段事务考虑解码。要标记应跳过解码,返回 true
;否则,返回 false
。如果未定义回调,则假定为 false
(即不进行筛选,所有使用两阶段提交的事务也分两个阶段进行解码)。
typedef bool (*LogicalDecodeFilterPrepareCB) (struct LogicalDecodingContext *ctx, TransactionId xid, const char *gid);
ctx
参数的内容与其他回调相同。参数 xid
和 gid
提供两种不同的方式来识别事务。后来的 COMMIT PREPARED
或 ROLLBACK PREPARED
同时携带这两个标识符,为输出插件提供要使用哪一个的选择。
每次事务进行解码时,可多次调用回调,必须提供给定的xid
和gid
的静态应答,每次调用都是如此。
当已解码某个已准备事务开始时,将调用必需的begin_prepare_cb
回调。txn
参数的一部分gid
字段可用于此回调,以检查插件是否已接收此PREPARE
,在这种情况下,它可以出错或跳过事务的剩余更改。
typedef void (*LogicalDecodeBeginPrepareCB) (struct LogicalDecodingContext *ctx, ReorderBufferTXN *txn);
当已准备进行两阶段提交的事务已解码时,将调用必需的prepare_cb
回调。如果已修改任何行,则在此之前将会调用所有已修改行的change_cb
回调。此回调中可以使用txn
参数的一部分gid
字段。
typedef void (*LogicalDecodePrepareCB) (struct LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr prepare_lsn);
当已解码事务COMMIT PREPARED
时,将调用必需的commit_prepared_cb
回调。此回调中可以使用txn
参数的一部分gid
字段。
typedef void (*LogicalDecodeCommitPreparedCB) (struct LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr commit_lsn);
当已解码事务ROLLBACK PREPARED
时,将调用必需的rollback_prepared_cb
回调。此回调中可以使用txn
参数 جزءgid
字段。使用参数prepare_end_lsn
和prepare_time
可以检查插件是否已接收此PREPARE TRANSACTION
,如果是这样,则它可以应用回滚,否则它可以跳过回滚操作。仅gid
是不够的,因为下游节点可以有具有相同标识符的已准备事务。
typedef void (*LogicalDecodeRollbackPreparedCB) (struct LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr prepare_end_lsn, TimestampTz prepare_time);
打开正在进行的事务的流式更改块时,将调用必需的stream_start_cb
回调。
typedef void (*LogicalDecodeStreamStartCB) (struct LogicalDecodingContext *ctx, ReorderBufferTXN *txn);
关闭正在进行的事务的流式更改块时,将调用必需的stream_stop_cb
回调。
typedef void (*LogicalDecodeStreamStopCB) (struct LogicalDecodingContext *ctx, ReorderBufferTXN *txn);
必须调用的 stream_abort_cb
回调用于中止先前流式传输的事务。
typedef void (*LogicalDecodeStreamAbortCB) (struct LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr abort_lsn);
在两阶段提交过程中,会调用 stream_prepare_cb
回调来准备先前流式传输的事务。当输出插件既支持大规模进行中的事务流式传输,又支持两阶段提交时,则需要使用此回调。
typedef void (*LogicalDecodeStreamPrepareCB) (struct LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr prepare_lsn);
必须调用的 stream_commit_cb
回调用于提交先前流式传输的事务。
typedef void (*LogicalDecodeStreamCommitCB) (struct LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr commit_lsn);
发送已流式传输更改块中某项更改时,会调用 stream_change_cb
回调(通过 stream_start_cb
和 stream_stop_cb
调用进行分界)。事务可能会在稍后中止,并且我们不会对中止的事务解码更改,因此未显示实际更改。
typedef void (*LogicalDecodeStreamChangeCB) (struct LogicalDecodingContext *ctx, ReorderBufferTXN *txn, Relation relation, ReorderBufferChange *change);
发送已流式传输更改块中某条常规消息时,会调用可选的 stream_message_cb
回调(通过 stream_start_cb
和 stream_stop_cb
调用进行分界)。有关事务性消息的消息内容不会显示,因为事务可能会在稍后中止,并且我们不会对中止的事务解码更改。
typedef void (*LogicalDecodeStreamMessageCB) (struct LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr message_lsn, bool transactional, const char *prefix, Size message_size, const char *message);
已流式传输更改块中出现了 TRUNCATE
命令时,将会调用可选的 stream_truncate_cb
回调(通过 stream_start_cb
和 stream_stop_cb
调用进行分界)。
typedef void (*LogicalDecodeStreamTruncateCB) (struct LogicalDecodingContext *ctx, ReorderBufferTXN *txn, int nrelations, Relation relations[], ReorderBufferChange *change);
这些参数类似于 stream_change_cb
回调。但是,由于需要对通过外键连接的表上的 TRUNCATE
操作同时执行,因此此回调会接收一个关系数组,而不仅仅接收一个关系。有关详细信息,请参阅 TRUNCATE 语句的说明。
要实际生成输出,输出插件可以在 begin_cb
、commit_cb
或 change_cb
回调中写入到 ctx->out
中的 StringInfo
输出缓冲区。在写输出缓冲区之前,必须调用 OutputPluginPrepareWrite(ctx, last_write)
,并且在完成向缓冲区写入之后,必须调用 OutputPluginWrite(ctx, last_write)
来执行写入。last_write
参数会指出某次特定写入是否是回调的最后一次写入。
以下示例展示了如何向输出插件的使用者输出数据
OutputPluginPrepareWrite(ctx, true); appendStringInfo(ctx->out, "BEGIN %u", txn->xid); OutputPluginWrite(ctx, true);