PostgreSQL Change data capture (CDC) + golang sample code.

Published Jul 13, 2022

Originally posted at DBConvert Blog

PostgreSQL offers logical decoding, which makes log-based change data capture possible. This post shows how to set up and run CDC in several steps.

The architecture of modern web applications consists of several software components such as dashboards, analytics, databases, data lakes, caches, search, etc.

The database is usually the core part of any application. Real-time data updates keep disparate data systems in continuous sync and let them respond quickly to new information. So how do you keep your application ecosystem in sync? How do these other components get information about changes in the database? Change Data Capture, or CDC, refers to any solution that identifies new or changed data.

This post is about PostgreSQL CDC and the ways to achieve it.

Change data capture (CDC) is an approach to data integration that detects, captures, and delivers the changes made to database data sources.

In general, CDC-based data integration consists of the following steps:

  1. Capture change data in a source database.
  2. Convert the changed data to a format that your consumers can accept.
  3. Publish the data to consumers or target database.
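
The three steps above can be sketched as a small Go pipeline. This is only an illustration under stated assumptions: the Change type and the capture, convert, and publish functions are hypothetical names invented for this sketch, the capture stage replays an in-memory slice instead of reading from a database, and JSON is just one possible target format.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Change is a hypothetical, simplified change record (not a PostgreSQL type).
type Change struct {
	Op    string            `json:"op"` // "insert", "update" or "delete"
	Table string            `json:"table"`
	Row   map[string]string `json:"row"`
}

// capture stands in for step 1. A real implementation would read the changes
// from the source database instead of replaying a slice.
func capture(changes []Change) <-chan Change {
	out := make(chan Change)
	go func() {
		defer close(out)
		for _, c := range changes {
			out <- c
		}
	}()
	return out
}

// convert is step 2: encode each change in a format the consumer accepts.
func convert(in <-chan Change) <-chan []byte {
	out := make(chan []byte)
	go func() {
		defer close(out)
		for c := range in {
			b, err := json.Marshal(c)
			if err != nil {
				continue // skip records that cannot be encoded
			}
			out <- b
		}
	}()
	return out
}

// publish is step 3: deliver the converted records. Here they are only
// collected; a real consumer would write to a queue or a target database.
func publish(in <-chan []byte) []string {
	var delivered []string
	for b := range in {
		delivered = append(delivered, string(b))
	}
	return delivered
}

func main() {
	src := []Change{
		{Op: "insert", Table: "t", Row: map[string]string{"id": "1", "name": "a"}},
		{Op: "delete", Table: "t", Row: map[string]string{"id": "1"}},
	}
	for _, line := range publish(convert(capture(src))) {
		fmt.Println(line)
	}
}
```

Each stage only knows about its input and output channel, so any of them can be swapped out without touching the others.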

PostgreSQL offers two built-in ways to make CDC possible:

  • From transaction logs, PostgreSQL WALs, aka Write Ahead Logs.
  • Database Triggers.

Let's briefly discuss the pros and cons of using Transaction logs (WALs) and Triggers to capture data changes.

Triggers.

Trigger-based methods involve creating audit triggers on the database to capture all the events related to INSERT, UPDATE, and DELETE statements.

Triggers can be attached to tables (partitioned or not) or views.

Triggers can also fire for TRUNCATE statements. If a trigger event occurs, the trigger's function is called at the appropriate time to handle the event.

😄 The most important advantage of this method is that all of this can be done at the SQL level, unlike transaction logs.
😕 However, the use of triggers has a significant impact on the performance of the source database, because these triggers need to be run on the application database when changes are made to the data.

Transaction Logs

On the other hand, for modern DBMS, transaction logs (WAL for PostgreSQL) are commonly used for transaction logging and replication.

In PostgreSQL, all changes made by statements such as INSERT, UPDATE, and DELETE are written to the WAL before the client receives the result of the transaction.

  • The advantage of this approach is that it adds very little overhead to the source database, because PostgreSQL writes the WAL anyway.
  • It also requires no modification to DB tables or the application. There is no need to create additional tables in the source database.
  • Log-based CDC is generally considered the superior approach to change data capture and suits most scenarios, including systems with extremely high transaction volumes.

Please note that DDL statements such as CREATE, DROP, and ALTER are currently not tracked. However, the TRUNCATE command is included in the logical replication stream (since PostgreSQL 11).

If you want row-by-row streaming of Postgres data changes as they happen, you'll need logical decoding or the Postgres logical replication feature.

Using Postgres Logical Decoding.

Logical decoding is the official name of PostgreSQL's log-based CDC mechanism. The built-in logical replication feature is built on top of it.

Logical decoding reads the PostgreSQL Write-Ahead Log, which records all of the activity that occurs in the database, and decodes it into a stream of row-level changes. The Write-Ahead Log is an internal log that describes database changes at the storage level.

  1. The first step in using logical decoding is to set the following parameters in the Postgres configuration file, postgresql.conf.

wal_level = logical
max_replication_slots = 5
max_wal_senders = 10

  • Setting wal_level to logical allows the WAL to record information needed for logical decoding.
  • Ensure that your max_replication_slots value is equal to or higher than the number of PostgreSQL connectors that use WAL plus the number of other replication slots your database uses.
  • Ensure that the max_wal_senders parameter, which specifies the maximum number of concurrent WAL sender (replication) connections, is at least twice the number of logical replication slots. For example, if your database uses 5 replication slots in total, max_wal_senders should be 10 or greater.

Restart your Postgres server to apply the changes.
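
The sizing rules from this step can be written down as a small check. This is a minimal sketch, not something PostgreSQL provides: the Settings type and the check function are hypothetical names, and the "twice the number of slots" rule is the guideline from the text above rather than a limit enforced by the server.

```go
package main

import "fmt"

// Settings mirrors the three postgresql.conf parameters discussed above.
type Settings struct {
	WalLevel            string
	MaxReplicationSlots int
	MaxWalSenders       int
}

// check returns a list of problems for a database that needs slotsNeeded
// replication slots in total. An empty result means the guidelines are met.
func check(s Settings, slotsNeeded int) []string {
	var problems []string
	if s.WalLevel != "logical" {
		problems = append(problems, "wal_level must be logical")
	}
	if s.MaxReplicationSlots < slotsNeeded {
		problems = append(problems,
			fmt.Sprintf("max_replication_slots should be at least %d", slotsNeeded))
	}
	if s.MaxWalSenders < 2*slotsNeeded {
		problems = append(problems,
			fmt.Sprintf("max_wal_senders should be at least %d", 2*slotsNeeded))
	}
	return problems
}

func main() {
	// The values from the configuration shown above, with 5 slots in use.
	s := Settings{WalLevel: "logical", MaxReplicationSlots: 5, MaxWalSenders: 10}
	fmt.Println(len(check(s, 5)))

	// Lowering max_wal_senders breaks the guideline.
	s.MaxWalSenders = 8
	fmt.Println(check(s, 5))
}
```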

  2. The second step is to set up logical replication using the output plugin test_decoding.

Create a logical replication slot for the database you want to sync by running the following command.

SELECT pg_create_logical_replication_slot('replication_slot', 'test_decoding');

Note: Each replication slot has a name, which can contain lower-case letters, numbers, and the underscore character.

To verify that the slot was created successfully, run the following command.

SELECT slot_name, plugin, slot_type, database, active, restart_lsn, confirmed_flush_lsn FROM pg_replication_slots;
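
If slot names are generated by an application, the naming rule from the note above can be checked before the name is sent to the server. The validSlotName helper below is a hypothetical name used for illustration; it encodes only the character rule stated in the note and nothing else.

```go
package main

import (
	"fmt"
	"regexp"
)

// slotNameRe encodes the rule from the note above: lower-case letters,
// digits and the underscore character only.
var slotNameRe = regexp.MustCompile(`^[a-z0-9_]+$`)

// validSlotName reports whether name uses only the allowed characters.
func validSlotName(name string) bool {
	return slotNameRe.MatchString(name)
}

func main() {
	for _, name := range []string{"replication_slot", "Replication-Slot"} {
		fmt.Printf("%s: %v\n", name, validSlotName(name))
	}
}
```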

  3. In the next step, create a publication for all your tables or only for specific ones. If you specify tables, you can add or remove tables from the publication later (with ALTER PUBLICATION).

CREATE PUBLICATION pub FOR ALL TABLES;

or

CREATE PUBLICATION pub FOR TABLE table1, table2, table3;

Optionally you can choose which operations to include in the publication. For example, the following publication includes only INSERT and UPDATE operations for table1.

CREATE PUBLICATION insert_update_only_pub FOR TABLE table1 WITH (publish = 'INSERT, UPDATE');

  4. Verify that your chosen tables are in the publication.

psql-stream=# SELECT * FROM pg_publication_tables WHERE pubname='pub';
Output
pubname | schemaname | tablename
---------+------------+-----------
pub     | public     | table1
pub     | public     | table2
pub     | public     | table3
(3 rows)

From now on, the publication pub tracks changes to all of these tables in the psql-stream database.

  5. Let's create a sample table t and fill it with some records.

create table t (id int, name text);
INSERT INTO t(id, name) SELECT g.id, k.name FROM generate_series(1, 10) as g(id), substr(md5(random()::text), 0, 25) as k(name);

As a result, we have 10 records in table t.

psql-stream=# SELECT count(*) FROM t;
count
-------
10
(1 row)

  6. Finally, it is time to check whether logical replication works.

Run the following command in the PostgreSQL console to see Postgres WAL entries.

SELECT * FROM pg_logical_slot_get_changes('replication_slot', NULL, NULL);

As a result, you get something like:

    lsn    | xid  |                          data                          
-----------+------+--------------------------------------------------------
 0/19EA2C0 | 1045 | BEGIN 1045
 0/19EA2C0 | 1045 | table public.t: INSERT: id[integer]:1 name[text]:51459cbc211647e7b31c8720
 0/19EA300 | 1045 | table public.t: INSERT: id[integer]:2 name[text]:51459cbc211647e7b31c8720
 0/19EA340 | 1045 | table public.t: INSERT: id[integer]:3 name[text]:51459cbc211647e7b31c8720
 0/19EA380 | 1045 | table public.t: INSERT: id[integer]:4 name[text]:51459cbc211647e7b31c8720
 0/19EA3C0 | 1045 | table public.t: INSERT: id[integer]:5 name[text]:51459cbc211647e7b31c8720
 0/19EA400 | 1045 | table public.t: INSERT: id[integer]:6 name[text]:51459cbc211647e7b31c8720
 0/19EA440 | 1045 | table public.t: INSERT: id[integer]:7 name[text]:51459cbc211647e7b31c8720
 0/19EA480 | 1045 | table public.t: INSERT: id[integer]:8 name[text]:51459cbc211647e7b31c8720
 0/19EA4C0 | 1045 | table public.t: INSERT: id[integer]:9 name[text]:51459cbc211647e7b31c8720
 0/19EA500 | 1045 | table public.t: INSERT: id[integer]:10 name[text]:51459cbc211647e7b31c8720
 0/19EA5B0 | 1045 | COMMIT 1045
(12 rows)

pg_logical_slot_peek_changes is another PostgreSQL function. It lets you peek at changes from WAL entries without consuming them, so calling it multiple times returns the same result each time.

On the other hand, pg_logical_slot_get_changes returns a given change only once. Subsequent calls return only the changes made since the previous call, or an empty result set if there are none. In other words, when the get function is executed, the changes are returned and consumed, and the slot moves forward. This makes it much easier to write logic that uses these events to build a replica of the table.
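
A consumer of test_decoding output has to turn these text lines back into structured data. The following Go sketch shows one way to do that for simple rows like the ones above. The RowChange and Column types and the parseLine function are hypothetical names for this illustration. The parser assumes plain column names, accepts both unquoted values and single-quoted values, and does not separate old and new tuples in UPDATE lines.

```go
package main

import (
	"fmt"
	"regexp"
	"strings"
)

// Column is one "name[type]:value" item of a test_decoding line.
type Column struct {
	Name, Type, Value string
}

// RowChange is one row-level line of test_decoding output.
type RowChange struct {
	Schema, Table, Op string
	Columns           []Column
}

var (
	// e.g. "table public.t: INSERT: id[integer]:1 name[text]:abc"
	headerRe = regexp.MustCompile(`^table ([^.]+)\.([^:]+): (INSERT|UPDATE|DELETE): (.*)$`)
	// The value is either a single-quoted string ('' escapes a quote) or a bare token.
	columnRe = regexp.MustCompile(`(\w+)\[([^\]]+)\]:('(?:[^']|'')*'|\S+)`)
)

// parseLine parses one line. ok is false for BEGIN, COMMIT and any other
// line that is not a row-level change.
func parseLine(line string) (rc RowChange, ok bool) {
	m := headerRe.FindStringSubmatch(line)
	if m == nil {
		return RowChange{}, false
	}
	rc = RowChange{Schema: m[1], Table: m[2], Op: m[3]}
	for _, c := range columnRe.FindAllStringSubmatch(m[4], -1) {
		val := c[3]
		if len(val) >= 2 && strings.HasPrefix(val, "'") && strings.HasSuffix(val, "'") {
			// Strip the surrounding quotes and unescape doubled quotes.
			val = strings.ReplaceAll(val[1:len(val)-1], "''", "'")
		}
		rc.Columns = append(rc.Columns, Column{Name: c[1], Type: c[2], Value: val})
	}
	return rc, true
}

func main() {
	lines := []string{
		"BEGIN 1045",
		"table public.t: INSERT: id[integer]:1 name[text]:51459cbc211647e7b31c8720",
		"COMMIT 1045",
	}
	for _, l := range lines {
		if rc, ok := parseLine(l); ok {
			fmt.Printf("%s %s.%s %v\n", rc.Op, rc.Schema, rc.Table, rc.Columns)
		}
	}
}
```

For anything beyond experiments, a structured output plugin is usually easier to consume than parsing this text format.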

  7. Remember to destroy a slot you no longer need, so that it stops consuming server resources.

SELECT pg_drop_replication_slot('replication_slot');

Output plugins.

We've already talked about the test_decoding output plugin, available on Postgres 9.4+. Though created as an example of an output plugin, it is still useful if your consumer supports it.

Along with test_decoding, PostgreSQL natively ships another output plugin, pgoutput. It has been available since Postgres 10. Some consumers support it for decoding (e.g. Debezium).

Run the following command to create a replication slot based on pgoutput, as in step 2 above.

SELECT * FROM pg_create_logical_replication_slot('replication_slot', 'pgoutput');

The following command peeks at data changes, similar to the command described in step 6.

psql-stream=# SELECT * FROM pg_logical_slot_peek_binary_changes('replication_slot', null, null, 'proto_version', '1', 'publication_names', 'pub');
    lsn    | xid  |                                           data                                           
-----------+------+------------------------------------------------------------------------------------------
 0/19A15F8 | 1038 | \x4200000000019a1d9000027de20a91a0ea0000040e
 0/19A15F8 | 1038 | \x52000080387075626c69630074006400020169640000000017ffffffff006e616d650000000019ffffffff
 0/19A15F8 | 1038 | \x49000080384e0002740000000234306e
 0/19A1890 | 1038 | \x49000080384e0002740000000234316e
 0/19A1910 | 1038 | \x49000080384e0002740000000234326e
 0/19A1990 | 1038 | \x49000080384e0002740000000234336e
 0/19A1A10 | 1038 | \x49000080384e0002740000000234346e
 0/19A1A90 | 1038 | \x49000080384e0002740000000234356e
 0/19A1B10 | 1038 | \x49000080384e0002740000000234366e
 0/19A1B90 | 1038 | \x49000080384e0002740000000234376e
 0/19A1C10 | 1038 | \x49000080384e0002740000000234386e
 0/19A1C90 | 1038 | \x49000080384e0002740000000234396e
 0/19A1DC0 | 1038 | \x430000000000019a1d9000000000019a1dc000027de20a91a0ea
(13 rows)

Note that the results are returned in binary format: the pgoutput plugin produces binary output.

wal2json is another popular output plugin for logical decoding.

Here is a sample output from the wal2json plugin:

   {
      "change":[
         {
            "kind":"insert",
            "schema":"public",
            "table":"t",
            "columnnames":[
               "id",
               "name"
            ],
            "columntypes":[
               "integer",
               "character varying(255)"
            ],
            "columnvalues":[
               1,
               ""
            ]
         }
      ]
   }
   {
      "change":[
         {
            "kind":"update",
            "schema":"public",
            "table":"t",
            "columnnames":[
               "id",
               "name"
            ],
            "columntypes":[
               "integer",
               "character varying(255)"
            ],
            "columnvalues":[
               1,
               "New Value"
            ],
            "oldkeys":{
               "keynames":[
                  "id"
               ],
               "keytypes":[
                  "integer"
               ],
               "keyvalues":[
                  1
               ]
            }
         }
      ]
   }
   {
      "change":[
         {
            "kind":"delete",
            "schema":"public",
            "table":"t",
            "oldkeys":{
               "keynames":[
                  "id"
               ],
               "keytypes":[
                  "integer"
               ],
               "keyvalues":[
                  1
               ]
            }
         }
      ]
   }
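
JSON like this is straightforward to consume from Go with the standard library. The sketch below assumes exactly the shape shown in the sample above. The Transaction, Change, and OldKeys types and the parse and row helpers are hypothetical names for this illustration, not part of wal2json.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// OldKeys identifies the affected row for updates and deletes.
type OldKeys struct {
	KeyNames  []string      `json:"keynames"`
	KeyTypes  []string      `json:"keytypes"`
	KeyValues []interface{} `json:"keyvalues"`
}

// Change is one entry of the "change" array.
type Change struct {
	Kind         string        `json:"kind"`
	Schema       string        `json:"schema"`
	Table        string        `json:"table"`
	ColumnNames  []string      `json:"columnnames"`
	ColumnTypes  []string      `json:"columntypes"`
	ColumnValues []interface{} `json:"columnvalues"`
	OldKeys      *OldKeys      `json:"oldkeys"` // nil for inserts
}

// Transaction is one top-level JSON object from the sample above.
type Transaction struct {
	Change []Change `json:"change"`
}

// row pairs the column names of a change with their values.
func (c Change) row() map[string]interface{} {
	out := make(map[string]interface{}, len(c.ColumnNames))
	for i, name := range c.ColumnNames {
		if i < len(c.ColumnValues) {
			out[name] = c.ColumnValues[i]
		}
	}
	return out
}

// parse decodes one JSON object into a Transaction.
func parse(data []byte) (Transaction, error) {
	var tx Transaction
	err := json.Unmarshal(data, &tx)
	return tx, err
}

func main() {
	sample := []byte(`{"change":[{"kind":"update","schema":"public","table":"t",
		"columnnames":["id","name"],
		"columntypes":["integer","character varying(255)"],
		"columnvalues":[1,"New Value"],
		"oldkeys":{"keynames":["id"],"keytypes":["integer"],"keyvalues":[1]}}]}`)
	tx, err := parse(sample)
	if err != nil {
		panic(err)
	}
	for _, c := range tx.Change {
		fmt.Println(c.Kind, c.Schema+"."+c.Table, c.row()["name"], c.OldKeys.KeyValues[0])
	}
}
```

Note that encoding/json decodes JSON numbers into float64 when the target is interface{}, so numeric keys and values need a conversion before they are used as integers.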

Important tips about slots.

Keep the following in mind when working with slots:

  • Each slot has only one output plugin (you choose which one).
  • Each slot provides changes from only one database.
  • One database can have multiple slots.
  • Each data change is typically emitted once per slot.
  • However, a slot may re-emit changes when the Postgres instance is restarted. A consumer must deal with this situation.
  • An unconsumed slot is a threat to the availability of your Postgres instance. Postgres retains all WAL files needed for these unconsumed changes, which can eventually fill up the disk.
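
One common way for a consumer to deal with re-emitted changes is to remember the commit position of the last transaction it applied and to skip anything at or before that position. The Go sketch below illustrates the idea under stated assumptions: the Tx and Applier types are hypothetical, transactions are assumed to arrive in commit order, and the last position is kept in memory, whereas a real consumer would have to persist it together with the applied data.

```go
package main

import "fmt"

// parseLSN converts the textual LSN form "X/Y" (two hex numbers) to a uint64.
func parseLSN(s string) (uint64, error) {
	var hi, lo uint32
	if _, err := fmt.Sscanf(s, "%X/%X", &hi, &lo); err != nil {
		return 0, fmt.Errorf("invalid LSN %q: %w", s, err)
	}
	return uint64(hi)<<32 | uint64(lo), nil
}

// Tx is a hypothetical decoded transaction: its commit LSN and its changes.
type Tx struct {
	CommitLSN string
	Changes   []string
}

// Applier applies transactions at most once, based on their commit LSN.
type Applier struct {
	lastCommit uint64   // assumption: persisted atomically with the data in real code
	Applied    []string // stands in for the target system
}

// Apply reports whether the transaction was applied or skipped as a duplicate.
func (a *Applier) Apply(tx Tx) (bool, error) {
	lsn, err := parseLSN(tx.CommitLSN)
	if err != nil {
		return false, err
	}
	if lsn <= a.lastCommit {
		return false, nil // already applied: a re-emitted transaction
	}
	a.Applied = append(a.Applied, tx.Changes...)
	a.lastCommit = lsn
	return true, nil
}

func main() {
	a := &Applier{}
	tx := Tx{CommitLSN: "0/19EA5B0", Changes: []string{"INSERT id=1", "INSERT id=2"}}
	for i := 0; i < 2; i++ { // the second call simulates a re-emitted transaction
		applied, err := a.Apply(tx)
		if err != nil {
			panic(err)
		}
		fmt.Println(applied, len(a.Applied))
	}
}
```

The first call applies both changes and the second call is skipped, so the target ends up with two rows rather than four.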

PostgreSQL WAL consumers.

A consumer is any application that can ingest the Postgres logical decoding stream. pg_recvlogical is a PostgreSQL application that can manage slots and consume the stream from them. It is included in the Postgres distribution, so it's probably already installed with PostgreSQL.

Golang sample code.

The following Golang code example shows how to get started creating your own Postgres WAL consumer. It uses PostgreSQL 10.x logical replication to stream database changes (decoded WAL messages) from the source database.

package main

import (
  "context"
  "fmt"
  "os"
  "os/signal"
  "strings"
  "time"

  "github.com/jackc/pgconn"
  "github.com/jackc/pglogrepl"
  "github.com/jackc/pgproto3/v2"
)

// Note that the runtime parameter "replication=database" in the connection string is obligatory:
// the replication slot will not be created if replication=database is omitted.

const CONN = "postgres://postgres:postgres@localhost/psql-streamer?replication=database"
const SLOT_NAME = "replication_slot"
const OUTPUT_PLUGIN = "pgoutput"
const CREATE_TABLE = "create table if not exists t (id int, name text);"

// Event remembers the most recent relation (table) description sent by the server.
var Event = struct {
  Relation string
  Columns  []string
}{}

func main() {
  ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt)
  defer cancel()
  conn, err := pgconn.Connect(ctx, CONN)
  if err != nil {
    panic(err)
  }
  defer conn.Close(ctx)

  // 1. Create the table (if it does not exist yet)
  if _, err := conn.Exec(ctx, CREATE_TABLE).ReadAll(); err != nil {
    panic(fmt.Errorf("failed to create table: %w", err))
  }

  // 2. Ensure the publication exists
  if _, err := conn.Exec(ctx, "DROP PUBLICATION IF EXISTS pub;").ReadAll(); err != nil {
    panic(fmt.Errorf("failed to drop publication: %w", err))
  }

  if _, err := conn.Exec(ctx, "CREATE PUBLICATION pub FOR ALL TABLES;").ReadAll(); err != nil {
    panic(fmt.Errorf("failed to create publication: %w", err))
  }

  // 3. Create a temporary replication slot on the server
  if _, err = pglogrepl.CreateReplicationSlot(ctx, conn, SLOT_NAME, OUTPUT_PLUGIN, pglogrepl.CreateReplicationSlotOptions{Temporary: true}); err != nil {
    panic(fmt.Errorf("failed to create a replication slot: %w", err))
  }

  var msgPointer pglogrepl.LSN
  pluginArguments := []string{"proto_version '1'", "publication_names 'pub'"}

  // 4. Start replication
  err = pglogrepl.StartReplication(ctx, conn, SLOT_NAME, msgPointer, pglogrepl.StartReplicationOptions{PluginArgs: pluginArguments})
  if err != nil {
    panic(fmt.Errorf("failed to start replication: %w", err))
  }

  var pingTime time.Time
  for ctx.Err() == nil {
    if time.Now().After(pingTime) {
      if err = pglogrepl.SendStandbyStatusUpdate(ctx, conn, pglogrepl.StandbyStatusUpdate{WALWritePosition: msgPointer}); err != nil {
        panic(fmt.Errorf("failed to send standby update: %w", err))
      }
      pingTime = time.Now().Add(10 * time.Second)
      //fmt.Println("client: please standby")
    }

    // Wait up to 10 seconds for the next message. The per-iteration context is
    // cancelled right away; a defer inside the loop would pile up until main returns.
    recvCtx, recvCancel := context.WithTimeout(ctx, time.Second*10)
    msg, err := conn.ReceiveMessage(recvCtx)
    recvCancel()
    if pgconn.Timeout(err) {
      continue
    }
    if err != nil {
      if ctx.Err() != nil {
        break // interrupted by the user
      }
      panic(fmt.Errorf("something went wrong while listening for message: %w", err))
    }

    switch msg := msg.(type) {
    case *pgproto3.CopyData:
      switch msg.Data[0] {
      case pglogrepl.PrimaryKeepaliveMessageByteID:
        // server keepalive; nothing to do here
        // fmt.Println("server: confirmed standby")

      case pglogrepl.XLogDataByteID:
        walLog, err := pglogrepl.ParseXLogData(msg.Data[1:])
        if err != nil {
          fmt.Fprintf(os.Stderr, "failed to parse logical WAL log: %v\n", err)
          continue
        }
        // Remember how far we have read, so the next status update reports progress.
        msgPointer = walLog.WALStart + pglogrepl.LSN(len(walLog.WALData))

        logicalMsg, err := pglogrepl.Parse(walLog.WALData)
        if err != nil {
          fmt.Fprintf(os.Stderr, "failed to parse logical replication message: %v\n", err)
          continue
        }
        switch m := logicalMsg.(type) {
        case *pglogrepl.RelationMessage:
          Event.Columns = []string{}
          for _, col := range m.Columns {
            Event.Columns = append(Event.Columns, col.Name)
          }
          Event.Relation = m.RelationName
        case *pglogrepl.InsertMessage:
          var sb strings.Builder
          sb.WriteString(fmt.Sprintf("INSERT %s(", Event.Relation))
          for i := 0; i < len(Event.Columns); i++ {
            sb.WriteString(fmt.Sprintf("%s: %s ", Event.Columns[i], string(m.Tuple.Columns[i].Data)))
          }
          sb.WriteString(")")
          fmt.Println(sb.String())
        case *pglogrepl.UpdateMessage:
          var sb strings.Builder
          sb.WriteString(fmt.Sprintf("UPDATE %s(", Event.Relation))
          for i := 0; i < len(Event.Columns); i++ {
            sb.WriteString(fmt.Sprintf("%s: %s ", Event.Columns[i], string(m.NewTuple.Columns[i].Data)))
          }
          sb.WriteString(")")
          fmt.Println(sb.String())
        case *pglogrepl.DeleteMessage:
          var sb strings.Builder
          sb.WriteString(fmt.Sprintf("DELETE %s(", Event.Relation))
          for i := 0; i < len(Event.Columns); i++ {
            sb.WriteString(fmt.Sprintf("%s: %s ", Event.Columns[i], string(m.OldTuple.Columns[i].Data)))
          }
          sb.WriteString(")")
          fmt.Println(sb.String())
        case *pglogrepl.TruncateMessage:
          fmt.Println("ALL GONE (TRUNCATE)")
        }
      }
    default:
      fmt.Printf("received unexpected message: %T\n", msg)
    }
  }
}

This code just logs incoming events, but in a production environment you can easily send them to a message queue or a target database.

Conclusion

Logical decoding in PostgreSQL provides an efficient way for other application components to stay up-to-date with data changes in your Postgres database.

Traditionally, the pull notification model has been used, in which each application component queries Postgres at a certain interval. Logical decoding uses the push notification model, where Postgres notifies other parts of the application of every change as soon as it happens.

Data change events can now be sent to consumers in milliseconds without querying the database. With logical decoding, the PostgreSQL database becomes a central part of your modern dynamic real-time application.

Discover and read more posts from Dmitry Narizhnykh