DZone
Thanks for visiting DZone today,
Edit Profile
  • Manage Email Subscriptions
  • How to Post to DZone
  • Article Submission Guidelines
Sign Out View Profile
  • Post an Article
  • Manage My Drafts
Over 2 million developers have joined DZone.
Log In / Join
Refcards Trend Reports
Events Video Library
Refcards
Trend Reports

Events

View Events Video Library

Related

  • How TBMQ Uses Redis for Persistent Message Storage
  • Hybrid Search Using Postgres DB
  • Modify JSON Data in Postgres and Hibernate 6
  • Optimizing Java Applications: Parallel Processing and Result Aggregation Techniques

Trending

  • DevOps and Platform Engineering Readiness Checklist: Everything Needed for a Scalable, Secure, High-Velocity Delivery Platform
  • Architecting Sub-Microsecond HFT Systems With C++ and Zero-Copy IPC
  • Stop Writing Dialect-Specific SQL: A Unified Query Builder for Node.js
  • How to Format Articles for DZone
  1. DZone
  2. Data Engineering
  3. Data
  4. Debezium Server With PostgreSQL and Redis Stream

Debezium Server With PostgreSQL and Redis Stream

Stream your PostgreSQL Table changes.

By 
Emmanouil Gkatziouras user avatar
Emmanouil Gkatziouras
DZone Core CORE ·
Oct. 17, 22 · Tutorial
Likes (5)
Comment
Save
Tweet
Share
10.9K Views

Join the DZone community and get the full member experience.

Join For Free

Debezium is a great tool for capturing the row-level changes that happen on a Database and streaming those changes to a broker of our choice.

Since this functionality stays within the boundaries of a Database, it helps on keeping our applications simple. For example, there is no need for an application to emit events on any database interactions. Debezium will monitor the row changes and will cast the events. Based on the broker solution used with Debezium a consumer can subscribe to the broker and thus receive the changes.

PostgreSQL is a popular SQL database is supported by Debezium.

Our goal would be to listen to PostgreSQL changes and stream them to a Redis stream through a Debezium Server. It is common to use Debizum with Kafka, in cases where Kafka is not present in a team’s Tech stack we can use other brokers.

In our case, we would keep things lightweight by using Redis Streams.

Redis will be set up without any extra configurations.

In order to use PostgreSQL with Debezium, it is essential to alter the configuration of PostgreSQL.

The configuration we shall use on PostgreSQL will be the following

Shell
 
listen_addresses = '*'
port = 5432
max_connections = 20
shared_buffers = 128MB
temp_buffers = 8MB
work_mem = 4MB
wal_level = logical
max_wal_senders = 3


As we can see we use the logical_decoding from PostgreSQL.
From the documentation:

Logical decoding is the process of extracting all persistent changes to a database’s tables into a coherent, easy to understand format which can be interpreted without detailed knowledge of the database’s internal state.

In PostgreSQL, logical decoding is implemented by decoding the contents of the write-ahead log, which describe changes on a storage level, into an application-specific form such as a stream of tuples or SQL statements.

We will also create a namespace and a table for PostgreSQL. The namespace and the table will be the ones to listen to for changes.


PLSQL
 
#!/bin/bash
set -e
psql -v ON_ERROR_STOP=1 --username "$POSTGRES_USER" --dbname "$POSTGRES_DB" <<-EOSQL
  create schema test_schema;
  create table test_schema.employee(
          id  SERIAL PRIMARY KEY,
          firstname   TEXT    NOT NULL,
          lastname    TEXT    NOT NULL,
          email       TEXT    not null,
          age         INT     NOT NULL,
          salary         real,
          unique(email)
      );
EOSQL


Debezium will have to be able to interact with the PostgreSQL server as well as the Redis server.
The configuration should be the following.

Shell
 
debezium.sink.type=redis
debezium.sink.redis.address=redis:6379
debezium.source.connector.class=io.debezium.connector.postgresql.PostgresConnector
debezium.source.offset.storage.file.filename=data/offsets.dat
debezium.source.offset.flush.interval.ms=0
debezium.source.database.hostname=postgres
debezium.source.database.port=5432
debezium.source.database.user=postgres
debezium.source.database.password=postgres
debezium.source.database.dbname=postgres
debezium.source.database.server.name=tutorial
debezium.source.schema.whitelist=test_schema
debezium.source.plugin.name=pgoutput


By examining the configuration we can see that we have the necessary information for Debezium to communicate to the PostgreSQL database, also we specify the schema we created previously. Therefore only changes from that schema will be streamed. We can also make things more restrictive for example whitelisting tables.

Since this demo will involve three different software Components docker-compose will come in handy.

Python
 
version: '3.1'
services:
  redis:
    image: redis
    ports:
      - 6379:6379
    depends_on:
      - postgres
  postgres:
    image: postgres
    restart: always
    environment:
      POSTGRES_USER: postgres
      POSTGRES_PASSWORD: postgres
    volumes:
      - ./postgresql.conf:/etc/postgresql/postgresql.conf
      - ./init:/docker-entrypoint-initdb.d
    command:
      - "-c"
      - "config_file=/etc/postgresql/postgresql.conf"
    ports:
      - 5432:5432
  debezium:
    image: debezium/server
    volumes:
      - ./conf:/debezium/conf
      - ./data:/debezium/data
    depends_on:
      - redis


Using Compose, we could spin up three different software components on the same network. This helps the components to interact with each other by using the DNS names of the services as specified on Compose. Also, the configuration files we created previously are mounted to the Docker containers. Docker Compose V2 is out there with many good features, you can find more about it in the book I authored - A Developer’s Essential Guide to Docker Compose.

In order to get the stack running, we shall execute the following command

Shell
 
$ docker compose up


Since it is up and running, we can now start listening to events.

We shall log in to Redis and start listening for any possible database updates.

Shell
 
$ docker exec -it debezium-example-redis-1 redis-cli
> xread block 1000000 streams tutorial.test_schema.employee $


This will make blocking possible until we receive one event from the stream.
If we examine the stream name we should see the pattern of {server-name}.{schema}.{table}. This would allow consumers to subscribe only to the changes of interest.

Onwards we will make an entry.

Shell
 
$ docker exec -it debezium-example-postgres-1 psql postgres postgres
> insert into test_schema.employee (firstname,lastname,email,age,salary) values ('John','Doe 1','[email protected]',18,1234.23);
> \q


If we check the Redis session we should see that we received an event from the Redis stream

Shell
 
127.0.0.1:6379> xread block 1000000 streams tutorial.test_schema.employee $
1) 1) "tutorial.test_schema.employee"
   2) 1) 1) "1663796657336-0"
         2) 1) "{\"schema\":{\"type\":\"struct\",\"fields\":[{\"type\":\"int32\",\"optional\":false,\"default\":0,\"field\":\"id\"}],\"optional\":false,\"name\":\"tutorial.test_schema.employee.Key\"},\"payload\":{\"id\":1}}"
            2) "{\"schema\":{\"type\":\"struct\",\"fields\":[{\"type\":\"struct\",\"fields\":[{\"type\":\"int32\",\"optional\":false,\"default\":0,\"field\":\"id\"},{\"type\":\"string\",\"optional\":false,\"field\":\"firstname\"},{\"type\":\"string\",\"optional\":false,\"field\":\"lastname\"},{\"type\":\"string\",\"optional\":false,\"field\":\"email\"},{\"type\":\"int32\",\"optional\":false,\"field\":\"age\"},{\"type\":\"float\",\"optional\":true,\"field\":\"salary\"}],\"optional\":true,\"name\":\"tutorial.test_schema.employee.Value\",\"field\":\"before\"},{\"type\":\"struct\",\"fields\":[{\"type\":\"int32\",\"optional\":false,\"default\":0,\"field\":\"id\"},{\"type\":\"string\",\"optional\":false,\"field\":\"firstname\"},{\"type\":\"string\",\"optional\":false,\"field\":\"lastname\"},{\"type\":\"string\",\"optional\":false,\"field\":\"email\"},{\"type\":\"int32\",\"optional\":false,\"field\":\"age\"},{\"type\":\"float\",\"optional\":true,\"field\":\"salary\"}],\"optional\":true,\"name\":\"tutorial.test_schema.employee.Value\",\"field\":\"after\"},{\"type\":\"struct\",\"fields\":[{\"type\":\"string\",\"optional\":false,\"field\":\"version\"},{\"type\":\"string\",\"optional\":false,\"field\":\"connector\"},{\"type\":\"string\",\"optional\":false,\"field\":\"name\"},{\"type\":\"int64\",\"optional\":false,\"field\":\"ts_ms\"},{\"type\":\"string\",\"optional\":true,\"name\":\"io.debezium.data.Enum\",\"version\":1,\"parameters\":{\"allowed\":\"true,last,false,incremental\"},\"default\":\"false\",\"field\":\"snapshot\"},{\"type\":\"string\",\"optional\":false,\"field\":\"db\"},{\"type\":\"string\",\"optional\":true,\"field\":\"sequence\"},{\"type\":\"string\",\"optional\":false,\"field\":\"schema\"},{\"type\":\"string\",\"optional\":false,\"field\":\"table\"},{\"type\":\"int64\",\"optional\":true,\"field\":\"txId\"},{\"type\":\"int64\",\"optional\":true,\"field\":\"lsn\"},{\"type\":\"int64\",\"optional\":true,\"field\":\"xmin\"}],\"optional\":false,\"name\":\"io.debezium.connector.postgresql.Source\",\"field\":\"source\"},{\"type\":\"string\",\"optional\":false,\"field\":\"op\"},{\"type\":\"int64\",\"optional\":true,\"field\":\"ts_ms\"},{\"type\":\"struct\",\"fields\":[{\"type\":\"string\",\"optional\":false,\"field\":\"id\"},{\"type\":\"int64\",\"optional\":false,\"field\":\"total_order\"},{\"type\":\"int64\",\"optional\":false,\"field\":\"data_collection_order\"}],\"optional\":true,\"field\":\"transaction\"}],\"optional\":false,\"name\":\"tutorial.test_schema.employee.Envelope\"},\"payload\":{\"before\":null,\"after\":{\"id\":1,\"firstname\":\"John\",\"lastname\":\"Doe 1\",\"email\":\"[email protected]\",\"age\":18,\"salary\":1234.23},\"source\":{\"version\":\"1.9.5.Final\",\"connector\":\"postgresql\",\"name\":\"tutorial\",\"ts_ms\":1663796656393,\"snapshot\":\"false\",\"db\":\"postgres\",\"sequence\":\"[null,\\\"24289128\\\"]\",\"schema\":\"test_schema\",\"table\":\"employee\",\"txId\":738,\"lsn\":24289128,\"xmin\":null},\"op\":\"c\",\"ts_ms\":1663796657106,\"transaction\":null}}"
(10.17s)
127.0.0.1:6379>


How cool is that? We can now start streaming our database changes to the broker of our choice.

You can find the source code on GitHub.

Redis (company) Stream (computing) PostgreSQL Data Types

Published at DZone with permission of Emmanouil Gkatziouras. See the original article here.

Opinions expressed by DZone contributors are their own.

Related

  • How TBMQ Uses Redis for Persistent Message Storage
  • Hybrid Search Using Postgres DB
  • Modify JSON Data in Postgres and Hibernate 6
  • Optimizing Java Applications: Parallel Processing and Result Aggregation Techniques

Partner Resources

×

Comments

The likes didn't load as expected. Please refresh the page and try again.

  • RSS
  • X
  • Facebook

ABOUT US

  • About DZone
  • Support and feedback
  • Community research

ADVERTISE

  • Advertise with DZone

CONTRIBUTE ON DZONE

  • Article Submission Guidelines
  • Become a Contributor
  • Core Program
  • Visit the Writers' Zone

LEGAL

  • Terms of Service
  • Privacy Policy

CONTACT US

  • 3343 Perimeter Hill Drive
  • Suite 215
  • Nashville, TN 37211
  • [email protected]

Let's be friends:

  • RSS
  • X
  • Facebook