DZone
Thanks for visiting DZone today,
Edit Profile
  • Manage Email Subscriptions
  • How to Post to DZone
  • Article Submission Guidelines
Sign Out View Profile
  • Post an Article
  • Manage My Drafts
Over 2 million developers have joined DZone.
Log In / Join
Please enter at least three characters to search
Refcards Trend Reports
Events Video Library
Refcards
Trend Reports

Events

View Events Video Library

Zones

Culture and Methodologies Agile Career Development Methodologies Team Management
Data Engineering AI/ML Big Data Data Databases IoT
Software Design and Architecture Cloud Architecture Containers Integration Microservices Performance Security
Coding Frameworks Java JavaScript Languages Tools
Testing, Deployment, and Maintenance Deployment DevOps and CI/CD Maintenance Monitoring and Observability Testing, Tools, and Frameworks
Culture and Methodologies
Agile Career Development Methodologies Team Management
Data Engineering
AI/ML Big Data Data Databases IoT
Software Design and Architecture
Cloud Architecture Containers Integration Microservices Performance Security
Coding
Frameworks Java JavaScript Languages Tools
Testing, Deployment, and Maintenance
Deployment DevOps and CI/CD Maintenance Monitoring and Observability Testing, Tools, and Frameworks

Because the DevOps movement has redefined engineering responsibilities, SREs now have to become stewards of observability strategy.

Apache Cassandra combines the benefits of major NoSQL databases to support data management needs not covered by traditional RDBMS vendors.

The software you build is only as secure as the code that powers it. Learn how malicious code creeps into your software supply chain.

Generative AI has transformed nearly every industry. How can you leverage GenAI to improve your productivity and efficiency?

Related

  • Using PostgreSQL pgoutput Plugin for Change Data Capture With Debezium
  • Salesforce Change Data Capture Streaming Data With Kafka and Snowflake Data Warehouse
  • Building a Real-Time Change Data Capture Pipeline With Debezium, Kafka, and PostgreSQL
  • Comparing Managed Postgres Options on The Azure Marketplace

Trending

  • Manual Sharding in PostgreSQL: A Step-by-Step Implementation Guide
  • Can You Run a MariaDB Cluster on a $150 Kubernetes Lab? I Gave It a Shot
  • Building a Real-Time Audio Transcription System With OpenAI’s Realtime API
  • AI Speaks for the World... But Whose Humanity Does It Learn From?
  1. DZone
  2. Data Engineering
  3. Databases
  4. How to Set Up and Run PostgreSQL Change Data Capture

How to Set Up and Run PostgreSQL Change Data Capture

PostgreSQL offers a Logical Decoding Method to make log-based change data capture possible. Learn how to set up and run Postgres CDC in a few steps.

By 
Dmitry Narizhnykh user avatar
Dmitry Narizhnykh
DZone Core CORE ·
May. 02, 22 · Tutorial
Likes (4)
Comment
Save
Tweet
Share
4.5K Views

Join the DZone community and get the full member experience.

Join For Free

The architecture of modern web applications consists of several software components such as dashboards, analytics, databases, data lakes, caches, search, etc.

The database is usually the core part of any application. Real-time data updates keep disparate data systems in continuous sync and respond quickly to new information. So how to keep your application ecosystem in sync? How do these other components get information about changes in the database? Change Data Capture or CDC refers to any solution that identifies new or changed data.

This post is about PostgreSQL CDCand the ways to achieve this.
Change data capture (CDC) is an approach to data integration to detect, capture, and deliver the changes made to database data sources.

In General, CDC-based data integration consists of the following steps:

  1. Capture change data in a source database.
  2. Convert the changed data to a format that your consumers can accept.
  3. Publish the data to consumers or target database.

PostgreSQL offers two built-in ways to make CDC possible:

  • From transaction logs, PostgreSQL WALs, aka Write Ahead Logs.
  • Database Triggers.

Let's briefly discuss the pros and cons of using Transaction logs (WALs) and Triggers to capture data changes.

Triggers

Trigger-based methods involve creating audit triggers on the database to capture all the events related to INSERT, UPDATE and DELETE methods.

Triggers can be attached to tables (partitioned or not) or views.

Triggers can also fire for TRUNCATE statements. If a trigger event occurs, the trigger's function is called at the appropriate time to handle the event.

  • The most important advantage of this method is that all of this can be done at the SQL level, unlike transaction logs.
  • However, the use of triggers has a significant impact on the performance of the source database, because these triggers need to be run on the application database when changes are made to the data.

Transaction Logs

On the other hand, for modern DBMS, transaction logs (WAL for PostgreSQL) are commonly used for transaction logging and replication.

In PostgreSQL, all transactions like INSERT, UPDATE, and DELETE are written to the WAL before a client receives a transaction result.

  • The advantage of this approach is that it does not affect the performance of the database in any way.
  • It also requires no modification to DB tables or the application. There is no need to create additional tables in the source database.
  • Log-based CDC is generally considered the superior approach to change data capture applicable to all possible scenarios, including systems with extremely high transaction volumes.
Please note that currently most DDL statements like CREATE, DROP, ALTER are not tracked. However, the TRUNCATE command is in the logical replication stream.

If you want row-by-row streaming of Postgres data changes as they happen, you'll need Logical Decoding or Postgres logical replication feature.

Using Postgres Logical Decoding

Logical decoding is the official name of PostgreSQL's log-based CDC (Logical Replication).

Logical decoding uses the content of the PostgreSQL Write-Ahead Log to store all of the activities that occur in the database. Write Ahead Log is an internal log that describes database changes on a storage level.

1. The first step in using logical decoding is to set the following parameters in the Postgres configuration `postgresql.conf`.

wal_level = logical
max_replication_slots = 5
max_wal_senders = 10
  • Setting wal_level to logical allows the WAL to record information needed for logical decoding.
  • Ensure that your max_replication_slots value is equal to or higher than the number of PostgreSQL connectors that use WAL plus the number of other replication slots your database uses.
  • Ensure that the max_wal_senders parameter, which specifies the maximum number of concurrent connections to the WAL, is at least twice the number of logical replication slots. For example, if your database uses 5 replication slots in total, the max_wal_senders value must be 10 or greater.

Restart your Postgres server to apply the changes.

2. The second step is to set up logical replication using the output plugin test_decoding

Create a logical replication slot for the database you want to sync by running the following command.

SELECT pg_create_logical_replication_slot('replication_slot', 'test_decoding');

Note: Each replication slot has a name, which can contain lower-case letters, numbers, and the underscore character.

To verify the slot is created successfully run the following command.

SELECT slot_name, plugin, slot_type, database, active, restart_lsn, confirmed_flush_lsn FROM pg_replication_slots;

3. In the next step, create a publication for all your tables or only specific ones. If you specify tables, you add or remove tables from the publication later.

CREATE PUBLICATION pub FOR ALL TABLES;

or

CREATE PUBLICATION pub FOR TABLE table1, table2, table3;

Optionally you can choose which operations to include in the publication. For example, the following publication includes only INSERT and UPDATE operations for table1.

CREATE PUBLICATION insert_update_only_pub FOR TABLE table1 WITH (publish = 'INSERT, UPDATE');

4. Verify that your chosen tables are in the publication.

psql-stream=# SELECT * FROM pg_publication_tables WHERE pubname='pub';
Output
pubname | schemaname | tablename
---------+------------+-----------
pub     | public     | table1
pub     | public     | table2
pub     | public     | table3
(3 rows)

Since then, our publication pub tracks all tables' changes in the psql-stream database.

5. Let's create an abstract table t and fill it with some records.

create table t (id int, name text);
INSERT INTO t(id, name) SELECT g.id, k.name FROM generate_series(1, 10) as g(id), substr(md5(random()::text), 0, 25) as k(name);

As a result, we have 10 records in the table t.

psql-stream=# SELECT count(*) FROM t;
count
-------
10
(1 row)

6. Finally, it is time to check if our Logical Replication works.

Run the following command in the PostgreSQL console to see Postgres WAL entries.

SELECT * FROM pg_logical_slot_get_changes('replication_slot', NULL, NULL);

As a result, you get something like:

    lsn    | xid  |                          data                          
-----------+------+--------------------------------------------------------
 0/19EA2C0 | 1045 | BEGIN 1045
 0/19EA2C0 | 1045 | table public.t: INSERT: id[integer]:1 name[text]:51459cbc211647e7b31c8720
 0/19EA300 | 1045 | table public.t: INSERT: id[integer]:2 name[text]:51459cbc211647e7b31c8720
 0/19EA340 | 1045 | table public.t: INSERT: id[integer]:3 name[text]:51459cbc211647e7b31c8720
 0/19EA380 | 1045 | table public.t: INSERT: id[integer]:4 name[text]:51459cbc211647e7b31c8720
 0/19EA3C0 | 1045 | table public.t: INSERT: id[integer]:5 name[text]:51459cbc211647e7b31c8720
 0/19EA400 | 1045 | table public.t: INSERT: id[integer]:6 name[text]:51459cbc211647e7b31c8720
 0/19EA440 | 1045 | table public.t: INSERT: id[integer]:7 name[text]:51459cbc211647e7b31c8720
 0/19EA480 | 1045 | table public.t: INSERT: id[integer]:8 name[text]:51459cbc211647e7b31c8720
 0/19EA4C0 | 1045 | table public.t: INSERT: id[integer]:9 name[text]:51459cbc211647e7b31c8720
 0/19EA500 | 1045 | table public.t: INSERT: id[integer]:10 name[text]:51459cbc211647e7b31c8720
 0/19EA5B0 | 1045 | COMMIT 1045
(13 rows)


pg_logical_slot_peek_changes is another PostgreSQL command to peek changes from WAL entries without consuming them.  So calling pg_logical_slot_peek_changes multiple times returns the same result each time.

On the other hand  pg_logical_slot_get_changes  only returns results the first time. The following calls of pg_logical_slot_get_changes return empty result sets. This means when get command is executed, the results are served and deleted, which greatly enhances our ability to write the logic for using these events to create a replica of the table.


7. Remember to destroy a slot you no longer need to stop it from consuming

SELECT pg_drop_replication_slot('replication_slot');


Output Plugins

We've already talked about test_decoding output plugin available on Postgres 9.4+. Though created as an example of an output plugin, it is still useful if your consumer supports it.

Along with the test_decoding plugin, another pgoutput plugin is shipped with PostgreSQL natively. pgoutput is available since Postgres 10. Some consumers support it for decoding (e.g. Debezium).

Run the following command to create the plugin based on pgoutput as in step 2 above.

SELECT * FROM pg_create_logical_replication_slot('replication_slot', 'pgoutput');

The following command consumes data changes similar to those described in step 6.

psql-stream=# SELECT * FROM pg_logical_slot_peek_binary_changes('replication_slot', null, null, 'proto_version', '1', 'publication_names', 'pub');
    lsn    | xid  |                                           data                                           
-----------+------+------------------------------------------------------------------------------------------
 0/19A15F8 | 1038 | \x4200000000019a1d9000027de20a91a0ea0000040e
 0/19A15F8 | 1038 | \x52000080387075626c69630074006400020169640000000017ffffffff006e616d650000000019ffffffff
 0/19A15F8 | 1038 | \x49000080384e0002740000000234306e
 0/19A1890 | 1038 | \x49000080384e0002740000000234316e
 0/19A1910 | 1038 | \x49000080384e0002740000000234326e
 0/19A1990 | 1038 | \x49000080384e0002740000000234336e
 0/19A1A10 | 1038 | \x49000080384e0002740000000234346e
 0/19A1A90 | 1038 | \x49000080384e0002740000000234356e
 0/19A1B10 | 1038 | \x49000080384e0002740000000234366e
 0/19A1B90 | 1038 | \x49000080384e0002740000000234376e
 0/19A1C10 | 1038 | \x49000080384e0002740000000234386e
 0/19A1C90 | 1038 | \x49000080384e0002740000000234396e
 0/19A1DC0 | 1038 | \x430000000000019a1d9000000000019a1dc000027de20a91a0ea
(13 rows)

Here you can notice that the results are returned in binary format. pgoutput plugin produces binary output.

wal2json is another popular output plugin for logical decoding.

Here is a sample output from wal2json plugin

JSON
 
{
      "change":[
         {
            "kind":"insert",
            "schema":"public",
            "table":"t",
            "columnnames":[
               "id",
               "name"
            ],
            "columntypes":[
               "integer",
               "character varying(255)"
            ],
            "columnvalues":[
               1,
               ""
            ]
         }
      ]
   }
   {
      "change":[
         {
            "kind":"update",
            "schema":"public",
            "table":"t",
            "columnnames":[
               "id",
               "name"
            ],
            "columntypes":[
               "integer",
               "character varying(255)"
            ],
            "columnvalues":[
               1,
               "New Value"
            ],
            "oldkeys":{
               "keynames":[
                  "id"
               ],
               "keytypes":[
                  "integer"
               ],
               "keyvalues":[
                  1
               ]
            }
         }
      ]
   }
   {
      "change":[
         {
            "kind":"delete",
            "schema":"public",
            "table":"t",
            "oldkeys":{
               "keynames":[
                  "id"
               ],
               "keytypes":[
                  "integer"
               ],
               "keyvalues":[
                  1
               ]
            }
         }
      ]
   }


Important Tips About Slots

Keep the following in mind when working with slots:

  • Each slot has only one output plugin (you choose which one).
  • Each slot provides changes from only one database.
  • One database can have multiple slots.
  • Each data change is typically is emitted once per slot.
  • But a slot may re-emit the changes when the Postgres instance is restarted. A consumer must deal with this situation.
  • An unconsumed slot is a threat to the availability of your Postgres instance. Postgres will save all WAL files for these unconsumed changes. This can lead to storage overflow.

PostgreSQL WAL Consumers

A consumer is any application that can ingest the Postgres logical decoding stream.  pg_recvlogical is a PostgreSQL application that can manage slots and consume the stream from them. It is included in the Postgres distribution,  so it's probably already installed with PostgreSQL.

Golang Sample Code

The following Golang code example shows how to get started creating your own Postgress WAL consumer. It uses PostgreSQL-10.x logical replication to stream database changes (decoded WAL messages) from the source database.

Go
 
package main

import (
    "context"
    "fmt"
    "os"
    "os/signal"
    "strings"
    "time"

    "github.com/jackc/pgconn"
    "github.com/jackc/pglogrepl"
    "github.com/jackc/pgproto3/v2"
)

// Note that runtime parameter "replication=database" in connection string is obligatory
// replicaiton slot will not be created if replication=database is omitted

const CONN = "postgres://postgres:postgres@localhost/psql-streamer?replication=database"
const SLOT_NAME = "replication_slot"
const OUTPUT_PLUGIN = "pgoutput"
const INSERT_TEMPLATE = "create table t (id int, name text);"

var Event = struct {
    Relation string
    Columns  []string
}{}

func main() {
    ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt)
    defer cancel()
    conn, err := pgconn.Connect(ctx, CONN)
    if err != nil {
        panic(err)
    }
    defer conn.Close(ctx)

    // 1. Create table
    if _, err := conn.Exec(ctx, INSERT_TEMPLATE).ReadAll(); err != nil {
        fmt.Errorf("failed to create table: %v", err)
    }

    // 2. ensure publication exists
    if _, err := conn.Exec(ctx, "DROP PUBLICATION IF EXISTS pub;").ReadAll(); err != nil {
        fmt.Errorf("failed to drop publication: %v", err)
    }

    if _, err := conn.Exec(ctx, "CREATE PUBLICATION pub FOR ALL TABLES;").ReadAll(); err != nil {
        fmt.Errorf("failed to create publication: %v", err)
    }

    // 3. create temproary replication slot server
    if _, err = pglogrepl.CreateReplicationSlot(ctx, conn, SLOT_NAME, OUTPUT_PLUGIN, pglogrepl.CreateReplicationSlotOptions{Temporary: true}); err != nil {
        fmt.Errorf("failed to create a replication slot: %v", err)
    }

    var msgPointer pglogrepl.LSN
    pluginArguments := []string{"proto_version '1'", "publication_names 'pub'"}

    // 4. establish connection
    err = pglogrepl.StartReplication(ctx, conn, SLOT_NAME, msgPointer, pglogrepl.StartReplicationOptions{PluginArgs: pluginArguments})
    if err != nil {
        fmt.Errorf("failed to establish start replication: %v", err)
    }

    var pingTime time.Time
    for ctx.Err() != context.Canceled {
        if time.Now().After(pingTime) {
            if err = pglogrepl.SendStandbyStatusUpdate(ctx, conn, pglogrepl.StandbyStatusUpdate{WALWritePosition: msgPointer}); err != nil {
                fmt.Errorf("failed to send standby update: %v", err)
            }
            pingTime = time.Now().Add(10 * time.Second)
            //fmt.Println("client: please standby")
        }

        ctx, cancel := context.WithTimeout(ctx, time.Second*10)
        defer cancel()

        msg, err := conn.ReceiveMessage(ctx)
        if pgconn.Timeout(err) {
            continue
        }
        if err != nil {
            fmt.Errorf("something went wrong while listening for message: %v", err)
        }

        switch msg := msg.(type) {
        case *pgproto3.CopyData:
            switch msg.Data[0] {
            case pglogrepl.PrimaryKeepaliveMessageByteID:
            //    fmt.Println("server: confirmed standby")

            case pglogrepl.XLogDataByteID:
                walLog, err := pglogrepl.ParseXLogData(msg.Data[1:])
                if err != nil {
                    fmt.Errorf("failed to parse logical WAL log: %v", err)
                }

                var msg pglogrepl.Message
                if msg, err = pglogrepl.Parse(walLog.WALData); err != nil {
                    fmt.Errorf("failed to parse logical replication message: %v", err)
                }
                switch m := msg.(type) {
                case *pglogrepl.RelationMessage:
                    Event.Columns = []string{}
                    for _, col := range m.Columns {
                        Event.Columns = append(Event.Columns, col.Name)
                    }
                    Event.Relation = m.RelationName
                case *pglogrepl.InsertMessage:
                    var sb strings.Builder
                    sb.WriteString(fmt.Sprintf("INSERT %s(", Event.Relation))
                    for i := 0; i < len(Event.Columns); i++ {
                        sb.WriteString(fmt.Sprintf("%s: %s ", Event.Columns[i], string(m.Tuple.Columns[i].Data)))
                    }
                    sb.WriteString(")")
                    fmt.Println(sb.String())
                case *pglogrepl.UpdateMessage:
                    var sb strings.Builder
                    sb.WriteString(fmt.Sprintf("UPDATE %s(", Event.Relation))
                    for i := 0; i < len(Event.Columns); i++ {
                        sb.WriteString(fmt.Sprintf("%s: %s ", Event.Columns[i], string(m.NewTuple.Columns[i].Data)))
                    }
                    sb.WriteString(")")
                    fmt.Println(sb.String())
                case *pglogrepl.DeleteMessage:
                    var sb strings.Builder
                    sb.WriteString(fmt.Sprintf("DELETE %s(", Event.Relation))
                    for i := 0; i < len(Event.Columns); i++ {
                        sb.WriteString(fmt.Sprintf("%s: %s ", Event.Columns[i], string(m.OldTuple.Columns[i].Data)))
                    }
                    sb.WriteString(")")
                    fmt.Println(sb.String())
                case *pglogrepl.TruncateMessage:
                    fmt.Println("ALL GONE (TRUNCATE)")
                }
            }
        default:
            fmt.Printf("received unexpected message: %T", msg)
        }
    }
}


This code just logs incoming events, but in a production environment, you can easily send them to a message queue or a target database.

Conclusion

Logical decoding in PostgreSQL provides an efficient way for other application components to stay up-to-date with data changes in your Postgres database.

Traditionally, the pull notification model has been used, in which each application component queries Postgres at a certain interval. Logical encoding uses the push notification model, where Postgres notifies other parts of the application of every change as soon as it happens.

Data change events can now be sent to consumers in milliseconds without querying the database. With logic decoding, the PostgreSQL database becomes a central part of your modern dynamic real-time application.

Change data capture Data integration Database PostgreSQL

Published at DZone with permission of Dmitry Narizhnykh. See the original article here.

Opinions expressed by DZone contributors are their own.

Related

  • Using PostgreSQL pgoutput Plugin for Change Data Capture With Debezium
  • Salesforce Change Data Capture Streaming Data With Kafka and Snowflake Data Warehouse
  • Building a Real-Time Change Data Capture Pipeline With Debezium, Kafka, and PostgreSQL
  • Comparing Managed Postgres Options on The Azure Marketplace

Partner Resources

×

Comments
Oops! Something Went Wrong

The likes didn't load as expected. Please refresh the page and try again.

ABOUT US

  • About DZone
  • Support and feedback
  • Community research
  • Sitemap

ADVERTISE

  • Advertise with DZone

CONTRIBUTE ON DZONE

  • Article Submission Guidelines
  • Become a Contributor
  • Core Program
  • Visit the Writers' Zone

LEGAL

  • Terms of Service
  • Privacy Policy

CONTACT US

  • 3343 Perimeter Hill Drive
  • Suite 100
  • Nashville, TN 37211
  • support@dzone.com

Let's be friends:

Likes
There are no likes...yet! 👀
Be the first to like this post!
It looks like you're not logged in.
Sign in to see who liked this post!