PGSync: 将 PostgreSQL 数据同步到 Elasticsearch

四月 2, 2024

摘要PGSync是一个中间件,可以轻松地将数据从 PostgreSQL 同步到 Elasticsearch/OpenSearch。

PGSync

概述

它允许您将 PostgreSQL 作为真实数据源,并在 Elasticsearch/OpenSearch 中呈现出结构化非规范化的文档。

对嵌套实体的更改会传播到 Elasticsearch。然后,PGSync 的高级查询构建器,会根据您的结构动态生成优化的 SQL 查询。PGSync 的推荐模型允许您快速移动和转换大量数据,同时保持关系完整性。

只需用 JSON 描述您的文档结构或模式,PGSync 就会持续捕获数据上的更改,并将其加载到 Elasticsearch 中,而无需编写任何代码。PGSync 可将您的关系数据转换为结构化文档格式。

它允许您直接从 PostgreSQL 利用 Elasticsearch 的处理能力和可扩展性。您不必编写复杂查询和转换通道。PGSync 轻巧、灵活、快速。

Elasticsearch 更适合作为一个辅助性非规范化的搜索引擎,以配合更传统的规范化数据存储。此外,您不应将主要数据存储在 Elasticsearch 中。

那么,首先如何将数据导入 Elasticsearch?像 Logstash 和 Kafka 这样的工具可以帮助完成这项任务,但它们仍然需要一些工程和开发。

提取、转换、加载(ETL)和数据更改捕获(CDC)工具可能会很复杂,并且需要投入大量的工程工作。

PGSync 的其他优点还包括:

  • 实时分析
  • 可靠的主数据存储/真实数据源
  • 按需扩展
  • 轻松连接多个嵌套表

为什么选择 PGSync?

在高层面上,您在 PostgreSQL 数据库中拥有数据,并且您希望在 Elasticsearch 中镜像它。这意味着对数据的每个更改(插入,更新,删除和截断语句)都需要复制到 Elasticsearch。一开始,这似乎很容易,后面就不容易了。简单添加一些代码,在更新数据库后将数据复制到 Elasticsearch(或所谓的双写)。跨越多个表并涉及多个关系的 SQL 查询很难编写。检测嵌套文档中的更改也可能相当困难。当然,如果你的数据从来没有改变过,那么你可以及时创建一个快照,并将其加载到 Elasticsearch 中,作为一次性操作。

PGSync 适用于以下情况:

  • PostgreSQL 是您真实的读/写数据源,而 Elasticsearch 是您的只读搜索层。
  • 您需要将关系数据非规范化为 NoSQL 数据源。
  • 您的数据在不断变化。
  • 您在关系数据库(如 PostgreSQL)中有现有数据,并且需要一个辅助的 NoSQL 数据库(如 Elasticsearch),进行基于文本的查询或自动完成的查询,以镜像现有数据,而无需应用程序执行双向写入。
  • 您希望在不影响关系数据安全性的情况下,通过提供数据访问视图,保持现有数据不变,同时利用 Elasticsearch 的搜索能力。
  • 或者,您只是想提供关系数据的访问视图,以进行搜索。

怎么工作的

PGSync 是用 Python(支持 3.8 以上版本)编写的,软件栈由 Redis,Elasticsearch,PostgreSQL 和 SQLAlchemy 组成。

PGSync 利用 PostgreSQL 的逻辑解码功能(在 PostgreSQL 9.4 中引入),来捕获连续的更改数据流。此功能需要在 PostgreSQL 配置文件中启用,方法是在 postgresql.conf 文件中设置:

wal_level = logical

您可以选择任何数据透视表作为文档的来源。

PGSync 的查询生成器可以针对您的数据结构动态构建高级查询。

PGSync 通过为数据库中的表创建触发器来处理通知事件,以运行在一个事件驱动的模型中。

这是 PGSync 唯一一次对您的数据库进行更改。

注意:如果您更改了 PGSync 模式配置的结构,则需要重新构建 Elasticsearch 索引。未来计划支持零停机迁移以简化此过程。

快速入门

有几种方法可以安装和试用 PGSync

在 Docker 中运行

要使用 docker 启动所有服务。运行:

$ docker-compose up

显示 Elasticsearch 中的内容

$ curl -X GET http://[Elasticsearch host]:9201/reservations/_search?pretty=true

手动配置

  • 设置

    • 确保数据库用户是超级用户

    • 启用逻辑解码。您还需要在 postgresql.conf 中设置至少两个参数

      wal_level = logical

      max_replication_slots = 1

    • 要防止服务器日志增长过大,例如在云基础设施上运行时,会产生成本影响。您可以选择使用 max_slot_wal_keep_size,限制复制槽上保留的数据量

      max_slot_wal_keep_size = 100GB

  • 安装

    • 使用 pip 从 pypi 安装 PGSync

      $ pip install pgsync
      
    • 为文档表示形式创建一个 schema.json

    • 引导数据库(仅限一次)

      $ bootstrap --config schema.json
      
    • 使用下面命令运行程序

      $ pgsync --config schema.json
      
    • 或以守护进程运行

      $ pgsync --config schema.json -d
      

特性

PGSync 的关键特性有:

  • 轻松对关系数据进行非规范化处理。
  • 适用于任何 PostgreSQL 数据库(版本 9.6 或更高版本)。
  • 对数据库性能的影响可以忽略不计。
  • 输出到 Elasticsearch 中的数据满足事务一致性。这意味着:仅当写入提交到数据库时才会出现,插入、更新和删除操作的出现顺序与提交顺序相同(而不是最终一致性)。
  • 容错:即使进程崩溃或网络中断等,也不会丢失数据。进程可以从最后一个检查点恢复。
  • 为了加快速度,可直接从数据库中以 PostgreSQL JSON 的形式返回数据。
  • 支持复合主键和外键。
  • 支持视图和物化视图。
  • 支持任意嵌套层次的实体,即具有很长依赖关系链的表。
  • 支持 PostgreSQL JSON 数据字段。这意味着:我们可以将数据库表中的 JSON 字段提取成文档中的单独字段。
  • 可自定义的文档结构。

要求

  • Python 3.8+
  • PostgreSQL 9.6+
  • Redis 3.1.0
  • Elasticsearch 6.3.1+ 或 OpenSearch 1.3.7+
  • SQLAlchemy 1.3.4+

示例

请考虑以下图书馆数据库的示例。

Book

isbn (PK) title description
9785811243570 Charlie and the chocolate factory Willy Wonka’s famous chocolate factory is opening at last!
9788374950978 Kafka on the Shore Kafka on the Shore is a 2002 novel by Japanese author Haruki Murakami.
9781471331435 1984 1984 was George Orwell’s chilling prophecy about the dystopian future.

Author

id (PK) name
1 Roald Dahl
2 Haruki Murakami
3 Philip Gabriel
4 George Orwell

BookAuthor

id (PK) book_isbn author_id
1 9785811243570 1
2 9788374950978 2
3 9788374950978 3
4 9781471331435 4

使用 PGSync,我们可以简单地定义这个 JSON 模式,其中 book 表是枢轴。一个 枢轴 表指示了文档的基础。

{
    "table": "book",
    "columns": [
        "isbn",
        "title",
        "description"
    ],
    "children": [
        {
            "table": "author",
            "columns": [
                "name"
            ]
        }
    ]
}

要获取 Elasticsearch 中此文档的结构

[
  {
      "isbn": "9785811243570",
      "title": "Charlie and the chocolate factory",
      "description": "Willy Wonka’s famous chocolate factory is opening at last!",
      "authors": ["Roald Dahl"]
  },
  {
      "isbn": "9788374950978",
      "title": "Kafka on the Shore",
      "description": "Kafka on the Shore is a 2002 novel by Japanese author Haruki Murakami",
      "authors": ["Haruki Murakami", "Philip Gabriel"]
  },
  {
      "isbn": "9781471331435",
      "title": "1984",
      "description": "1984 was George Orwell’s chilling prophecy about the dystopian future",
      "authors": ["George Orwell"]
  }
]

在幕后,PGSync 在为您生成高级查询,例如。

SELECT 
       JSON_BUILD_OBJECT(
          'isbn', book_1.isbn, 
          'title', book_1.title, 
          'description', book_1.description,
          'authors', anon_1.authors
       ) AS "JSON_BUILD_OBJECT_1",
       book_1.id
FROM book AS book_1
LEFT OUTER JOIN
  (SELECT 
          JSON_AGG(anon_2.anon) AS authors,
          book_author_1.book_isbn AS book_isbn
   FROM book_author AS book_author_1
   LEFT OUTER JOIN
     (SELECT 
             author_1.name AS anon,
             author_1.id AS id
      FROM author AS author_1) AS anon_2 ON anon_2.id = book_author_1.author_id
   GROUP BY book_author_1.book_isbn) AS anon_1 ON anon_1.book_isbn = book_1.isbn

您还可以通过模式配置,配置 PGSync 对属性重命名,例如

  {
      "isbn": "9781471331435",
      "this_is_a_custom_title": "1984",
      "desc": "1984 was George Orwell’s chilling prophecy about the dystopian future",
      "contributors": ["George Orwell"]
  }

PGSync 解决了以下挑战:

  • 如果我们在数据库中更新作者的姓名会怎样?
  • 如果我们想为现有书籍添加另一位作者怎么办?
  • 如果我们已经有很多文档是同一作者的,我们想更改作者姓名怎么办?
  • 如果我们删除或更新作者会怎样?
  • 如果我们截断整个表会怎样?

优点

  • PGSync 是可用于数据变更捕获的,一种简单易用的开箱即用解决方案。
  • PGSync 可处理数据删除。
  • PGSync 几乎不需要开发工作。您只需定义一个描述数据的模式配置。
  • PGSync 会直接生成与您的模式匹配的高级查询。
  • PGSync 允许您在模式更改时轻松重建索引。
  • 您可以在 Elasticsearch 中只公开所需要的数据。
  • 支持用于多租户应用的多个 Postgres 模式。