Debezium Server With PostgreSQL and Redis Stream

Stream your PostgreSQL table changes.

By Emmanouil Gkatziouras
Oct. 17, 22 · Tutorial
Debezium is a great tool for capturing the row-level changes that happen in a database and streaming those changes to a broker of our choice.

Since this functionality stays within the boundaries of the database, it helps keep our applications simple. For example, there is no need for an application to emit events on every database interaction. Debezium monitors the row changes and emits the events. Depending on the broker used with Debezium, a consumer can subscribe to the broker and thus receive the changes.

PostgreSQL is a popular SQL database that is supported by Debezium.

Our goal is to listen to PostgreSQL changes and stream them to a Redis stream through Debezium Server. It is common to use Debezium with Kafka, but when Kafka is not part of a team’s tech stack, we can use other brokers.

In our case, we will keep things lightweight by using Redis Streams.

Redis will be set up without any extra configuration.

In order to use PostgreSQL with Debezium, it is essential to alter the configuration of PostgreSQL.

The configuration we shall use on PostgreSQL is the following:

Shell
 
listen_addresses = '*'
port = 5432
max_connections = 20
shared_buffers = 128MB
temp_buffers = 8MB
work_mem = 4MB
wal_level = logical
max_wal_senders = 3


As we can see, we set wal_level to logical, which enables PostgreSQL's logical decoding.
From the documentation:

Logical decoding is the process of extracting all persistent changes to a database’s tables into a coherent, easy to understand format which can be interpreted without detailed knowledge of the database’s internal state.

In PostgreSQL, logical decoding is implemented by decoding the contents of the write-ahead log, which describe changes on a storage level, into an application-specific form such as a stream of tuples or SQL statements.
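
As a quick sanity check before starting the stack, the settings that matter for logical decoding can be verified from the configuration text. The sketch below is only an illustration and not part of the original setup: the function names `parse_postgresql_conf` and `logical_decoding_problems` are hypothetical, and the parser handles only plain `name = value` lines.

```python
def parse_postgresql_conf(text):
    """Parse simple `name = value` lines from a postgresql.conf-style text.

    Simplification (assumption): everything after '#' is treated as a comment,
    so values that contain '#' inside quotes are not supported.
    """
    settings = {}
    for raw in text.splitlines():
        line = raw.split("#", 1)[0].strip()
        if not line or "=" not in line:
            continue
        name, value = line.split("=", 1)
        settings[name.strip()] = value.strip().strip("'")
    return settings


def logical_decoding_problems(settings):
    """Return a list of human-readable problems; an empty list means OK."""
    problems = []
    # PostgreSQL defaults wal_level to 'replica', so a missing entry is a problem.
    if settings.get("wal_level") != "logical":
        problems.append("wal_level must be 'logical' for logical decoding")
    # Only flag max_wal_senders when it is explicitly set too low.
    senders = settings.get("max_wal_senders")
    if senders is not None and int(senders) < 1:
        problems.append("max_wal_senders must be at least 1")
    return problems


if __name__ == "__main__":
    conf = "wal_level = logical\nmax_wal_senders = 3\n"
    print(logical_decoding_problems(parse_postgresql_conf(conf)))
```

Running the check against the configuration shown above should report no problems, since it sets both wal_level and max_wal_senders.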

We will also create a schema and a table in PostgreSQL. These are the schema and the table we will listen to for changes.


Shell
 
#!/bin/bash
set -e
psql -v ON_ERROR_STOP=1 --username "$POSTGRES_USER" --dbname "$POSTGRES_DB" <<-EOSQL
  -- Schema and table that Debezium will watch for changes.
  CREATE SCHEMA test_schema;
  CREATE TABLE test_schema.employee(
          id          SERIAL PRIMARY KEY,
          firstname   TEXT    NOT NULL,
          lastname    TEXT    NOT NULL,
          email       TEXT    NOT NULL,
          age         INT     NOT NULL,
          salary      REAL,
          UNIQUE(email)
      );
EOSQL


Debezium will have to be able to interact with the PostgreSQL server as well as the Redis server.
The configuration should be the following.

Properties
 
debezium.sink.type=redis
debezium.sink.redis.address=redis:6379
debezium.source.connector.class=io.debezium.connector.postgresql.PostgresConnector
debezium.source.offset.storage.file.filename=data/offsets.dat
debezium.source.offset.flush.interval.ms=0
debezium.source.database.hostname=postgres
debezium.source.database.port=5432
debezium.source.database.user=postgres
debezium.source.database.password=postgres
debezium.source.database.dbname=postgres
debezium.source.database.server.name=tutorial
debezium.source.schema.whitelist=test_schema
debezium.source.plugin.name=pgoutput


Examining the configuration, we can see that it has the information Debezium needs to communicate with the PostgreSQL database. We also specify the schema we created previously, so only changes from that schema will be streamed. We can make things more restrictive, for example by whitelisting specific tables. Note that these property names match the Debezium 1.9 release used here; newer releases use schema.include.list in place of schema.whitelist.

Since this demo involves three different software components, Docker Compose will come in handy.

YAML
 
version: '3.1'
services:
  redis:
    image: redis
    ports:
      - 6379:6379
    depends_on:
      - postgres
  postgres:
    image: postgres
    restart: always
    environment:
      POSTGRES_USER: postgres
      POSTGRES_PASSWORD: postgres
    volumes:
      - ./postgresql.conf:/etc/postgresql/postgresql.conf
      - ./init:/docker-entrypoint-initdb.d
    command:
      - "-c"
      - "config_file=/etc/postgresql/postgresql.conf"
    ports:
      - 5432:5432
  debezium:
    image: debezium/server
    volumes:
      - ./conf:/debezium/conf
      - ./data:/debezium/data
    depends_on:
      - redis


Using Compose, we can spin up three different software components on the same network. This lets the components interact with each other by using the service names specified in the Compose file as DNS names. Also, the configuration files we created previously are mounted into the Docker containers. Docker Compose V2 is out there with many good features; you can find more about it in the book I authored, A Developer’s Essential Guide to Docker Compose.

In order to get the stack running, we shall execute the following command:

Shell
 
$ docker compose up


Once the stack is up and running, we can start listening to events.

We shall log in to Redis and start listening for any database updates.

Shell
 
$ docker exec -it debezium-example-redis-1 redis-cli
> xread block 1000000 streams tutorial.test_schema.employee $


This call blocks for up to 1,000,000 milliseconds until an event arrives on the stream. The $ ID means that only entries added after the call was issued are returned.
If we examine the stream name, we see the pattern {server-name}.{schema}.{table}. This allows consumers to subscribe only to the changes of interest.
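
The naming pattern above can be captured in a few small helpers. This is a minimal sketch under stated assumptions: the function names are hypothetical, the `department` table in the example does not exist in this tutorial, and splitting assumes that none of the three parts contains a dot.

```python
def stream_name(server_name, schema, table):
    """Build a stream name following {server-name}.{schema}.{table}."""
    return f"{server_name}.{schema}.{table}"


def split_stream_name(name):
    """Split a stream name back into (server_name, schema, table).

    Assumption: none of the parts contains a '.' character.
    """
    server_name, schema, table = name.split(".")
    return server_name, schema, table


def xread_streams(server_name, schema, tables, start_id="$"):
    """Build the {stream: id} mapping for reading several tables at once."""
    return {stream_name(server_name, schema, t): start_id for t in tables}


if __name__ == "__main__":
    print(stream_name("tutorial", "test_schema", "employee"))
    # 'department' is a hypothetical second table, used only for illustration.
    print(xread_streams("tutorial", "test_schema", ["employee", "department"]))
```

A consumer interested in a single table subscribes to one stream name, while a consumer interested in several tables passes several stream names to the same XREAD call.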

Next, we will insert a row.

Shell
 
$ docker exec -it debezium-example-postgres-1 psql postgres postgres
> insert into test_schema.employee (firstname,lastname,email,age,salary) values ('John','Doe 1','john1@doe.com',18,1234.23);
> \q


If we check the Redis session, we should see that we received an event from the Redis stream:

Shell
 
127.0.0.1:6379> xread block 1000000 streams tutorial.test_schema.employee $
1) 1) "tutorial.test_schema.employee"
   2) 1) 1) "1663796657336-0"
         2) 1) "{\"schema\":{\"type\":\"struct\",\"fields\":[{\"type\":\"int32\",\"optional\":false,\"default\":0,\"field\":\"id\"}],\"optional\":false,\"name\":\"tutorial.test_schema.employee.Key\"},\"payload\":{\"id\":1}}"
            2) "{\"schema\":{\"type\":\"struct\",\"fields\":[{\"type\":\"struct\",\"fields\":[{\"type\":\"int32\",\"optional\":false,\"default\":0,\"field\":\"id\"},{\"type\":\"string\",\"optional\":false,\"field\":\"firstname\"},{\"type\":\"string\",\"optional\":false,\"field\":\"lastname\"},{\"type\":\"string\",\"optional\":false,\"field\":\"email\"},{\"type\":\"int32\",\"optional\":false,\"field\":\"age\"},{\"type\":\"float\",\"optional\":true,\"field\":\"salary\"}],\"optional\":true,\"name\":\"tutorial.test_schema.employee.Value\",\"field\":\"before\"},{\"type\":\"struct\",\"fields\":[{\"type\":\"int32\",\"optional\":false,\"default\":0,\"field\":\"id\"},{\"type\":\"string\",\"optional\":false,\"field\":\"firstname\"},{\"type\":\"string\",\"optional\":false,\"field\":\"lastname\"},{\"type\":\"string\",\"optional\":false,\"field\":\"email\"},{\"type\":\"int32\",\"optional\":false,\"field\":\"age\"},{\"type\":\"float\",\"optional\":true,\"field\":\"salary\"}],\"optional\":true,\"name\":\"tutorial.test_schema.employee.Value\",\"field\":\"after\"},{\"type\":\"struct\",\"fields\":[{\"type\":\"string\",\"optional\":false,\"field\":\"version\"},{\"type\":\"string\",\"optional\":false,\"field\":\"connector\"},{\"type\":\"string\",\"optional\":false,\"field\":\"name\"},{\"type\":\"int64\",\"optional\":false,\"field\":\"ts_ms\"},{\"type\":\"string\",\"optional\":true,\"name\":\"io.debezium.data.Enum\",\"version\":1,\"parameters\":{\"allowed\":\"true,last,false,incremental\"},\"default\":\"false\",\"field\":\"snapshot\"},{\"type\":\"string\",\"optional\":false,\"field\":\"db\"},{\"type\":\"string\",\"optional\":true,\"field\":\"sequence\"},{\"type\":\"string\",\"optional\":false,\"field\":\"schema\"},{\"type\":\"string\",\"optional\":false,\"field\":\"table\"},{\"type\":\"int64\",\"optional\":true,\"field\":\"txId\"},{\"type\":\"int64\",\"optional\":true,\"field\":\"lsn\"},{\"type\":\"int64\",\"optional\":true,\"field\":\"xmin\"}],\"optional\":false,\"name\":\"io.debezium.connector.postgresql.Source\",\"field\":\"source\"},{\"type\":\"string\",\"optional\":false,\"field\":\"op\"},{\"type\":\"int64\",\"optional\":true,\"field\":\"ts_ms\"},{\"type\":\"struct\",\"fields\":[{\"type\":\"string\",\"optional\":false,\"field\":\"id\"},{\"type\":\"int64\",\"optional\":false,\"field\":\"total_order\"},{\"type\":\"int64\",\"optional\":false,\"field\":\"data_collection_order\"}],\"optional\":true,\"field\":\"transaction\"}],\"optional\":false,\"name\":\"tutorial.test_schema.employee.Envelope\"},\"payload\":{\"before\":null,\"after\":{\"id\":1,\"firstname\":\"John\",\"lastname\":\"Doe 1\",\"email\":\"john1@doe.com\",\"age\":18,\"salary\":1234.23},\"source\":{\"version\":\"1.9.5.Final\",\"connector\":\"postgresql\",\"name\":\"tutorial\",\"ts_ms\":1663796656393,\"snapshot\":\"false\",\"db\":\"postgres\",\"sequence\":\"[null,\\\"24289128\\\"]\",\"schema\":\"test_schema\",\"table\":\"employee\",\"txId\":738,\"lsn\":24289128,\"xmin\":null},\"op\":\"c\",\"ts_ms\":1663796657106,\"transaction\":null}}"
(10.17s)
127.0.0.1:6379>
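
The event value is verbose because it carries both a schema and a payload. Most consumers only need the payload: the operation code in op, the row state in before and after, and the origin in source. The following sketch pulls those fields out with the standard library. The function name `summarize_change` and the `OPERATIONS` mapping are illustrative choices, and the fallback for events without a payload wrapper is an assumption about setups where schemas are disabled.

```python
import json

# Debezium operation codes found in the "op" field of a change event.
OPERATIONS = {"c": "create", "u": "update", "d": "delete", "r": "read (snapshot)"}


def summarize_change(value_json):
    """Return the operation, table, and row images from a change event value."""
    event = json.loads(value_json)
    # With schemas enabled (as in the output above) the data sits under
    # "payload"; otherwise assume the event itself is the payload.
    payload = event.get("payload", event)
    source = payload["source"]
    return {
        "operation": OPERATIONS.get(payload["op"], payload["op"]),
        "table": f'{source["schema"]}.{source["table"]}',
        "before": payload["before"],
        "after": payload["after"],
    }


if __name__ == "__main__":
    # Trimmed-down version of the event shown above.
    sample = json.dumps({
        "payload": {
            "before": None,
            "after": {"id": 1, "firstname": "John", "lastname": "Doe 1"},
            "source": {"schema": "test_schema", "table": "employee"},
            "op": "c",
        }
    })
    print(summarize_change(sample))
```

For the insert above, before is null and op is c, which is what we expect for a newly created row.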


How cool is that? We can now start streaming our database changes to the broker of our choice.

You can find the source code on GitHub.


Published at DZone with permission of Emmanouil Gkatziouras, DZone MVB. See the original article here.
