PostgreSQL Tutorial: Synchronize data to Elasticsearch using Logstash

April 7, 2024

Summary: In this tutorial, you will learn how to synchronize Elasticsearch with a PostgreSQL database using Logstash.

Diagram showing Logstash Pipeline

To take advantage of the powerful search capabilities offered by Elasticsearch, many businesses deploy Elasticsearch alongside existing relational databases. In such a scenario, Elasticsearch must be kept synchronized with the data stored in the associated relational database. In this tutorial, I will show how Logstash can be used to efficiently copy records and synchronize updates from a relational database into Elasticsearch.

A high-level overview of the synchronization steps

For this tutorial we use Logstash with the JDBC input plugin to keep Elasticsearch synchronized with PostgreSQL. Conceptually, Logstash’s JDBC input plugin runs a loop that periodically polls PostgreSQL for records that were inserted or modified since the last iteration of this loop. In order for this to work correctly, the following conditions must be satisfied:

  1. As documents in PostgreSQL are written into Elasticsearch, the “_id” field in Elasticsearch must be set to the “id” field from PostgreSQL. This provides a direct mapping between the PostgreSQL record and the Elasticsearch document. If a record is updated in PostgreSQL, then the entire associated document will be overwritten in Elasticsearch. Note that overwriting a document in Elasticsearch is just as efficient as an update operation would be, because internally, an update would consist of deleting the old document and then indexing an entirely new document.
  2. When a record is inserted or updated in PostgreSQL, that record must have a field that contains the update or insertion time. This field is used to allow Logstash to request only documents that have been modified or inserted since the last iteration of its polling loop. Each time Logstash polls PostgreSQL, it stores the update or insertion time of the last record that it has read from PostgreSQL. On its next iteration, Logstash knows that it only needs to request records with an update or insertion time that is newer than the last record that was received in the previous iteration of the polling loop.

If the above conditions are satisfied, we can configure Logstash to periodically request all new or modified records from PostgreSQL and then write them into Elasticsearch. The Logstash code for this is presented later in this tutorial.
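To make the loop concrete, here is a minimal sketch in plain Python (an illustrative model, not the actual Logstash implementation). Records are dicts with an integer "last_update" timestamp, and a dict keyed by id stands in for Elasticsearch, with the key playing the role of "_id". The boundary conditions of the query are refined later, in the section on the correctness of the SELECT statement.

```python
def poll(table, sql_last_value):
    """Return records modified since the last poll, oldest first."""
    return sorted(
        (r for r in table if r["last_update"] > sql_last_value),
        key=lambda r: r["last_update"],
    )

def sync(table, index, sql_last_value):
    """Copy new or updated records into the index and advance the tracker."""
    batch = poll(table, sql_last_value)
    for record in batch:
        # Writing under the same key overwrites the whole document,
        # which is how updates reach Elasticsearch (condition 1).
        index[record["id"]] = record
    if batch:
        # Remember the newest modification time we have seen (condition 2).
        sql_last_value = batch[-1]["last_update"]
    return sql_last_value
```

Each call to sync plays the role of one iteration of Logstash's polling loop.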

PostgreSQL setup

The PostgreSQL database and table can be configured as follows:


CREATE TABLE es_table (
  id bigint PRIMARY KEY,
  client_name varchar(50) UNIQUE NOT NULL,
  created_at timestamp NOT NULL DEFAULT current_timestamp,
  last_update timestamp NOT NULL DEFAULT current_timestamp
);

CREATE FUNCTION update_row_modtime() RETURNS trigger AS $$
BEGIN
    NEW.last_update = now();
    RETURN NEW;
END;
$$ LANGUAGE plpgsql;

CREATE TRIGGER trig_update_modtime
BEFORE UPDATE ON es_table
FOR EACH ROW
EXECUTE PROCEDURE update_row_modtime();

There are a few interesting parameters in the above PostgreSQL configuration:

  • es_table: This is the name of the PostgreSQL table that records will be read from and then synchronized to Elasticsearch.
  • id: This is the unique identifier for this record. Notice that “id” is defined as the PRIMARY KEY. This guarantees each “id” only appears once in the current table. This will be translated to “_id” for updating or inserting the document into Elasticsearch.
  • client_name: This is a field that represents the user-defined data that will be stored in each record. To keep this tutorial simple, we only have a single field with user-defined data, but more fields could be easily added. This is the field that we will modify to show that not only are newly inserted PostgreSQL records copied to Elasticsearch, but that updated records are also correctly propagated to Elasticsearch.
  • created_at: This field is mostly for demonstration purposes and is not strictly necessary for synchronization to work correctly. We use it to track when a record was originally inserted into PostgreSQL.
  • last_update: This field is defined so that any insertion or update of a record in PostgreSQL will cause its value to be set to the time of the modification. This modification time allows us to pull out any records that have been modified since the last time Logstash requested documents from PostgreSQL.

PostgreSQL Operations

Given the above configuration, records can be written to PostgreSQL as follows:

INSERT INTO es_table (id, client_name) VALUES (<id>, <client name>);

Records in PostgreSQL can be updated using the following command:

UPDATE es_table SET client_name = <new client name> WHERE id = <id>;

PostgreSQL upserts can be done as follows:

INSERT INTO es_table (id, client_name) VALUES (<id>, <client name when created>)
ON CONFLICT (id) DO UPDATE SET client_name = <client name when updated>;
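If you want to experiment with the upsert semantics without a running PostgreSQL instance, the same INSERT ... ON CONFLICT ... DO UPDATE syntax is also supported by SQLite (version 3.24 and later, as bundled with recent Python releases). The table and values below are illustrative stand-ins for the real schema:

```python
import sqlite3

# In-memory SQLite database as a lightweight stand-in for PostgreSQL.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE es_table (id INTEGER PRIMARY KEY, client_name TEXT)")

upsert = (
    "INSERT INTO es_table (id, client_name) VALUES (?, ?) "
    "ON CONFLICT (id) DO UPDATE SET client_name = excluded.client_name"
)
conn.execute(upsert, (1, "created"))  # id 1 does not exist yet: plain insert
conn.execute(upsert, (1, "updated"))  # id 1 exists: becomes an update
name = conn.execute("SELECT client_name FROM es_table WHERE id = 1").fetchone()[0]
```

Note that this variant uses excluded.client_name to reuse the proposed value in the update branch, rather than a separate literal as in the template above.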

Synchronization code

The following Logstash pipeline implements the synchronization code that is described in the previous section:

input {
    jdbc {
        jdbc_driver_library => "<path>/postgresql-42.7.3.jar"
        jdbc_driver_class => "org.postgresql.Driver"
        jdbc_connection_string => "jdbc:postgresql://<PostgreSQL Host>:5432/datasync"
        jdbc_user => "datasync"
        jdbc_password => "ra5hoxetRami5"
        jdbc_paging_enabled => true
        use_column_value => true
        tracking_column => "last_update"
        tracking_column_type => "timestamp"
        schedule => "*/10 * * * * *"
        statement => "SELECT * FROM public.es_table WHERE last_update > :sql_last_value AND last_update < CURRENT_TIMESTAMP ORDER BY last_update ASC"
    }
}
filter {
    mutate {
        copy => { "id" => "[@metadata][_id]"}
        remove_field => ["@version", "@timestamp"]
    }
}
output {
    # stdout { codec => "rubydebug" }
    elasticsearch {
        hosts => ["http://elasticsearch:9200"]
        index => "rdbms_sync_idx"
        document_id => "%{[@metadata][_id]}"
    }
}

There are a few areas from the above pipeline that should be highlighted:

  • tracking_column: This field specifies the “last_update” column of the “es_table” table, which is used for tracking the last document read by Logstash from PostgreSQL, and is stored on disk in .logstash_jdbc_last_run. This value will be used to determine the starting value for documents that Logstash will request in the next iteration of its polling loop. The value stored in .logstash_jdbc_last_run can be accessed in the SELECT statement as “:sql_last_value”.
  • sql_last_value: This is a built-in parameter that contains the starting point for the current iteration of Logstash’s polling loop, and it is referenced in the SELECT statement line of the above jdbc input configuration. This is set to the most recent value of “last_update”, which is read from .logstash_jdbc_last_run. This is used as the starting point for documents to be returned by the PostgreSQL query that is executed in Logstash’s polling loop. Including this variable in the query guarantees that insertions or updates that have previously been propagated to Elasticsearch will not be re-sent to Elasticsearch.
  • schedule: This uses cron syntax to specify how often Logstash should poll PostgreSQL for changes. The specification of "*/10 * * * * *" tells Logstash to contact PostgreSQL every 10 seconds.
  • last_update < CURRENT_TIMESTAMP: This portion of the SELECT is one of the more difficult concepts to explain and therefore it is explained in detail in the next section.
  • filter: In this section we simply copy the value of “id” from the PostgreSQL record into a metadata field called “_id”, which we will later reference in the output to ensure that each document is written into Elasticsearch with the correct “_id” value. Using a metadata field ensures that this temporary value does not cause a new field to be created. We also remove the “@version” and “@timestamp” fields from the document, as we do not wish for them to be written to Elasticsearch.
  • output: In this section we specify that each document should be written to Elasticsearch, and should be assigned an “_id” which is pulled from the metadata field that we created in the filter section. There is also a commented-out rubydebug output that can be enabled to help with debugging.
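As a rough sketch of what the filter and output sections accomplish for each event, here is the same transformation in plain Python (this mimics the mutate filter; it is not Logstash code). The PostgreSQL "id" becomes the Elasticsearch "_id", while "@version" and "@timestamp" are dropped from the document body:

```python
def to_es_action(row):
    """Turn a PostgreSQL row (as a dict) into an Elasticsearch index action."""
    doc = {k: v for k, v in row.items() if k not in ("@version", "@timestamp")}
    # "_id" lives outside "_source", so it never creates a field in the document.
    return {"_index": "rdbms_sync_idx", "_id": str(row["id"]), "_source": doc}
```

Note that "id" remains inside the document body as well; only the Logstash-internal fields are removed.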

Analysis of the correctness of the SELECT statement

In this section we give a description of why including last_update < CURRENT_TIMESTAMP in the SELECT statement is important. To help explain this concept, we first give counter-examples that demonstrate why the two most intuitive approaches will not work correctly. This is followed by an explanation of how including last_update < CURRENT_TIMESTAMP overcomes problems with the intuitive approaches.

Intuitive scenario one

In this section we show what happens if the WHERE clause does not include last_update < CURRENT_TIMESTAMP, and instead only specifies last_update > :sql_last_value. In this case, the SELECT statement would look as follows:

statement => "SELECT * FROM public.es_table WHERE last_update > :sql_last_value ORDER BY last_update ASC"

At first glance, the above approach appears that it should work correctly, but there are edge cases where it may miss some documents. For example, let’s consider a case where PostgreSQL is inserting 2 documents per second and where Logstash is executing its SELECT statement every 5 seconds. This is demonstrated in the following diagram, where each second is represented by T0 to T10, and the records in PostgreSQL are represented by R1 through R22. We assume that the first iteration of Logstash’s polling loop takes place at T5 and it reads documents R1 through R11, as represented by the cyan boxes. The value stored in sql_last_value is now T5 as this is the timestamp on the last record (R11) that was read. We also assume that just after Logstash has read documents from PostgreSQL, another document R12 is inserted into PostgreSQL with a timestamp of T5.

Diagram showing records are off by one

In the next iteration of the above SELECT statement, we only pull documents where the time is greater than T5 (as instructed by WHERE last_update > :sql_last_value), which means that record R12 will be skipped. This is shown in the diagram below, where the cyan boxes represent the records that are read by Logstash in the current iteration, and the grey boxes represent the records that were previously read by Logstash.

Diagram showing record R12 is never written

Notice that with this scenario’s SELECT, the record R12 will never be written to Elasticsearch.
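This behavior can be verified with a small Python simulation of the polling loop (an illustrative model, not Logstash itself). Timestamps are integers T0 to T6, with roughly two records per second as in the diagrams:

```python
def poll_gt(records, sql_last_value):
    """Poll using only last_update > :sql_last_value (scenario one)."""
    batch = sorted(
        (r for r in records if r["last_update"] > sql_last_value),
        key=lambda r: r["last_update"],
    )
    new_last = batch[-1]["last_update"] if batch else sql_last_value
    return batch, new_last

records = [{"id": i, "last_update": i // 2} for i in range(1, 12)]  # R1..R11, T0..T5
seen = set()
batch, last = poll_gt(records, -1)             # first poll at T5 reads R1..R11
seen |= {r["id"] for r in batch}
records.append({"id": 12, "last_update": 5})   # R12 inserted at T5, just after the poll
records.append({"id": 13, "last_update": 6})   # R13 inserted at T6
batch, last = poll_gt(records, last)           # next poll: 5 > 5 is false, R12 is skipped
seen |= {r["id"] for r in batch}
```

After the second poll, every record except R12 has been seen; no later poll can ever recover it, because its timestamp is never strictly greater than the stored sql_last_value.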

Intuitive scenario two

To remedy the above issue, one may decide to change the WHERE clause to greater than or equals as follows:

statement => "SELECT * FROM public.es_table WHERE last_update >= :sql_last_value ORDER BY last_update ASC"

However, this implementation is also not ideal. In this case, the problem is that the most recent document(s) read from PostgreSQL in the most recent time interval will be re-sent to Elasticsearch. While this does not cause any issues with respect to correctness of the results, it does create unnecessary work. As in the previous section, after the initial Logstash polling iteration, the diagram below shows which documents have been read from PostgreSQL.

Diagram again showing records read are off by one

When we execute the subsequent Logstash polling iteration, we pull all documents where the time is greater than or equal to T5. This is demonstrated in the following diagram. Notice that record R11 (shown in purple) will be sent to Elasticsearch again.

Diagram showing record in purple (R11) will be sent again
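The same style of Python simulation (an illustrative model with integer timestamps, as in the diagrams) shows the duplication. No record is lost, but the records stamped with the newest timestamp from the previous poll are re-read:

```python
def poll_gte(records, sql_last_value):
    """Poll using last_update >= :sql_last_value (scenario two)."""
    batch = sorted(
        (r for r in records if r["last_update"] >= sql_last_value),
        key=lambda r: r["last_update"],
    )
    new_last = batch[-1]["last_update"] if batch else sql_last_value
    return batch, new_last

records = [{"id": i, "last_update": i // 2} for i in range(1, 12)]  # R1..R11, T0..T5
first, last = poll_gte(records, 0)      # first poll at T5 reads R1..R11
second, last = poll_gte(records, last)  # next poll re-reads the records stamped T5
```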

Neither of the previous two scenarios is ideal. In the first scenario data can be lost, and in the second scenario redundant data is read from PostgreSQL and sent to Elasticsearch.

How to fix the intuitive approaches

Given that neither of the previous two scenarios is ideal, an alternative approach should be employed. By specifying last_update > :sql_last_value AND last_update < CURRENT_TIMESTAMP, we send each document to Elasticsearch exactly once.

This is demonstrated by the following diagram, where the current Logstash poll is executed at T5. Notice that because last_update < CURRENT_TIMESTAMP must be satisfied, only documents up to, but not including, those in the period T5 will be read from PostgreSQL. Since we have retrieved all documents from T4 and no documents from T5, we know that sql_last_value will then be set to T4 for the next Logstash polling iteration.

Diagram showing correct number of records read

The diagram below demonstrates what happens on the next iteration of the Logstash poll. Since last_update > :sql_last_value, and because sql_last_value is set to T4, we know that only documents starting from T5 will be retrieved. Additionally, because only documents that satisfy last_update < CURRENT_TIMESTAMP will be retrieved, only documents up to and including T9 will be retrieved. Again, this means that all documents in T9 are retrieved, and that sql_last_value will be set to T9 for the next iteration. Therefore this approach eliminates the risk of retrieving only a subset of PostgreSQL documents from any given time interval.
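The exactly-once behavior can be checked with the same style of Python simulation (an illustrative model; timestamps are integers as in the diagrams, and CURRENT_TIMESTAMP is passed in explicitly as the poll time):

```python
def poll_correct(records, sql_last_value, current_timestamp):
    """Poll using last_update > :sql_last_value AND last_update < CURRENT_TIMESTAMP."""
    batch = sorted(
        (r for r in records
         if sql_last_value < r["last_update"] < current_timestamp),
        key=lambda r: r["last_update"],
    )
    new_last = batch[-1]["last_update"] if batch else sql_last_value
    return batch, new_last

records = [{"id": i, "last_update": i // 2} for i in range(1, 12)]  # R1..R11, T0..T5
delivered = []
batch, last = poll_correct(records, -1, 5)    # poll at T5: reads only up to T4
delivered += [r["id"] for r in batch]
records.append({"id": 12, "last_update": 5})  # R12 arrives during T5
batch, last = poll_correct(records, last, 10) # poll at T10: picks up all of T5
delivered += [r["id"] for r in batch]
```

Every record, including R12, is delivered exactly once: the first poll stops just short of the in-progress second T5, and the second poll starts right after T4.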

Diagram showing second set of records read correctly

Testing the system

Simple tests can be used to demonstrate that our implementation performs as desired. We can write records into PostgreSQL as follows:

INSERT INTO es_table (id, client_name) VALUES (1, 'Jim Carrey');
INSERT INTO es_table (id, client_name) VALUES (2, 'Mike Myers');
INSERT INTO es_table (id, client_name) VALUES (3, 'Bryan Adams');

Once the JDBC input schedule has triggered, Logstash reads the new records from PostgreSQL and writes them to Elasticsearch. We can then execute the following Elasticsearch query to see the documents in Elasticsearch:

GET rdbms_sync_idx/_search

which returns a response similar to the following, abbreviated here to show only the first of the three hits:

"hits" : {
    "total" : {
      "value" : 3,
      "relation" : "eq"
    "max_score" : 1.0,
    "hits" : [
        "_index" : "rdbms_sync_idx",
        "_type" : "_doc",
        "_id" : "1",
        "_score" : 1.0,
        "_source" : {
          "created_at" : "2019-06-18T12:58:56.000Z",
          "@timestamp" : "2019-06-18T13:04:27.436Z",
          "last_update" : "2019-06-18T12:58:56.000Z",
          "client_name" : "Jim Carrey"

We can then update the document that corresponds to _id = 1 in PostgreSQL as follows:

UPDATE es_table SET client_name = 'Jimbo Kerry' WHERE id = 1;

After Logstash's next polling iteration, this change is propagated to the document identified by an _id of 1. We can look directly at the document in Elasticsearch by executing the following:

GET rdbms_sync_idx/_doc/1

which returns a document that looks as follows:

  "_index" : "rdbms_sync_idx",
  "_type" : "_doc",
  "_id" : "1",
  "_version" : 2,
  "_seq_no" : 3,
  "_primary_term" : 1,
  "found" : true,
  "_source" : {
    "created_at" : "2019-06-18T12:58:56.000Z",
    "@timestamp" : "2019-06-18T13:09:30.300Z",
    "last_update" : "2019-06-18T13:09:28.000Z",
    "client_name" : "Jimbo Kerry"

Notice that the _version is now set to 2, that the last_update is now different than the created_at, and that the client_name field has been correctly updated to the new value. The @timestamp field is not particularly interesting for this example, and is added by Logstash by default.

An upsert into PostgreSQL can be done as follows, and the reader of this tutorial can verify that the correct information will be reflected in Elasticsearch:

INSERT INTO es_table (id, client_name) VALUES (4, 'Bob is new')
ON CONFLICT (id) DO UPDATE SET client_name = 'Bob exists already';

What about deleted documents?

The astute reader may have noticed that if a document is deleted from PostgreSQL, then that deletion will not be propagated to Elasticsearch. The following approaches may be considered to address this issue:

  1. PostgreSQL records could include an “is_deleted” field, which indicates that they are no longer valid. This is known as a “soft delete”. As with any other update to a record in PostgreSQL, the “is_deleted” field will be propagated to Elasticsearch through Logstash. If this approach is implemented, then Elasticsearch and PostgreSQL queries would need to be written so as to exclude records/documents where “is_deleted” is true. Eventually, background jobs can remove such documents from both PostgreSQL and Elasticsearch.
  2. Another alternative is to ensure that any system that is responsible for deletion of records in PostgreSQL must also subsequently execute a command to directly delete the corresponding documents from Elasticsearch.
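As a sketch of the soft-delete approach (option 1 above), here is a small Python example using an in-memory SQLite database as a stand-in for PostgreSQL. The trigger and last_update column from the real schema are omitted for brevity, and the names and values are illustrative:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("""CREATE TABLE es_table (
    id INTEGER PRIMARY KEY,
    client_name TEXT,
    is_deleted INTEGER NOT NULL DEFAULT 0)""")
conn.execute("INSERT INTO es_table (id, client_name) VALUES (1, 'Jim Carrey')")
conn.execute("INSERT INTO es_table (id, client_name) VALUES (2, 'Mike Myers')")

# A "delete" is just an UPDATE, so in the real schema it bumps last_update
# and is propagated to Elasticsearch like any other change.
conn.execute("UPDATE es_table SET is_deleted = 1 WHERE id = 2")

# All queries (against PostgreSQL and Elasticsearch alike) must exclude
# soft-deleted records.
visible = conn.execute(
    "SELECT id FROM es_table WHERE is_deleted = 0").fetchall()
```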