Using PostgreSQL pgoutput Plugin for Change Data Capture With Debezium

In this article, see a quick walkthrough on how to use the PostgreSQL pgoutput plugin for change data capture with Debezium.

By Abhishek Gupta · Aug. 21, 2020 · Tutorial

Change Data Capture Architecture Using Debezium, Postgres, and Kafka was a tutorial on how to use Debezium for change data capture from Azure PostgreSQL and send the change events to Azure Event Hubs for Kafka; it used the wal2json output plugin.

What About the pgoutput Plugin?

This blog provides a quick walkthrough of how to use the pgoutput plugin. I will not repeat a lot of details, and I will use containerized versions (via Docker Compose) of Kafka Connect, Kafka, and ZooKeeper to keep things simple. So, the only thing you need is Azure PostgreSQL, which you can set up using a variety of options, including the Azure Portal, Azure CLI, Azure PowerShell, or an ARM template.
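For example, here is a minimal Azure CLI sketch for creating the server; the resource group, server name, admin credentials, SKU, and location below are placeholders of my own, not values from this walkthrough:

Shell
# create a resource group (name and location are placeholders)
az group create --name myresourcegroup --location eastus

# create the Azure Database for PostgreSQL server (placeholder values)
az postgres server create --resource-group myresourcegroup --name mydemoserver --location eastus --admin-user myadmin --admin-password <secure password> --sku-name GP_Gen5_2 --version 11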

The resources are available on GitHub - https://github.com/abhirockzz/debezium-postgres-pgoutput

Using the Right publication.autocreate.mode

With the pgoutput plugin, it's important that you use the appropriate value for publication.autocreate.mode. If you're using all_tables (which is the default), you need to ensure that the publication is created up-front for the specific table(s) you want to configure for change data capture. If the publication is not found, the connector will try to create one using CREATE PUBLICATION <publication_name> FOR ALL TABLES;, which will fail on Azure PostgreSQL due to lack of superuser permissions.

The other two options work as expected:

  • disabled: you need to ensure that the publication is created up-front. The connector will not attempt to create the publication if it isn't found upon startup; it will throw an exception and stop.
  • filtered: you can (optionally) choose to create the publication up-front. If the publication is not found, the connector will create a new publication for all those tables matching the current filter configuration.

This has been highlighted in the docs https://debezium.io/documentation/reference/1.3/connectors/postgresql.html#postgresql-on-azure
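For reference, the relevant part of a pgoutput connector configuration looks roughly like the sketch below. The property names are standard Debezium PostgreSQL connector options; the host, credentials, and table are placeholders, and the repo's pg-source-connector.json is the authoritative version, which may differ slightly:

JSON
{
  "name": "inventory-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "database.hostname": "<DBNAME>.postgres.database.azure.com",
    "database.port": "5432",
    "database.user": "<DBUSER>@<DBNAME>",
    "database.password": "<password>",
    "database.dbname": "postgres",
    "database.server.name": "myserver",
    "plugin.name": "pgoutput",
    "publication.name": "mytestpub",
    "publication.autocreate.mode": "filtered",
    "table.include.list": "public.inventory"
  }
}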

Let's Try the Different Scenarios

Before that, clone the GitHub repo:

Shell
git clone https://github.com/abhirockzz/debezium-postgres-pgoutput && cd debezium-postgres-pgoutput

Start Kafka, Zookeeper and Kafka Connect containers:

Shell
export DEBEZIUM_VERSION=1.2
docker-compose up


It might take a while to pull the container images the first time.

Once all the containers are up and running, connect to Azure PostgreSQL and create a table:

Shell
psql -h <DBNAME>.postgres.database.azure.com -p 5432 -U <DBUSER>@<DBNAME> -W -d postgres --set=sslmode=require

# for example:
psql -h abhishgu-pg.postgres.database.azure.com -p 5432 -U abhishgu@abhishgu-pg -W -d postgres --set=sslmode=require

# at the psql prompt:
CREATE TABLE inventory (id SERIAL, item VARCHAR(30), qty INT, PRIMARY KEY(id));



When publication.autocreate.mode is set to filtered

This works well with Azure PostgreSQL: it does not require superuser permissions, because the connector creates the publication only for the specific table(s) that match the filter (include-list) configuration.

Update the connector config file (pg-source-connector.json) with the details of your Azure PostgreSQL instance, then create the connector:

Shell
curl -X POST -H "Content-Type: application/json" --data @pg-source-connector.json http://localhost:8083/connectors



Notice the logs (in the docker compose terminal):

Plain Text
Creating new publication 'mytestpub' for plugin 'PGOUTPUT'   [io.debezium.connector.postgresql.connection.PostgresReplicationConnection]



Once the connector starts, check the publications in PostgreSQL.
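One way to do this (my addition, assuming you are still connected via psql) is to query the pg_publication_tables catalog view:

SQL
SELECT pubname, schemaname, tablename FROM pg_publication_tables;

You should see something like this: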

Plain Text
  pubname  | schemaname | tablename 
-----------+------------+-----------
 mytestpub | public     | inventory



Does it work?

Insert a couple of records into the inventory table:

Shell
psql -h <DBNAME>.postgres.database.azure.com -p 5432 -U <DBUSER>@<DBNAME> -W -d postgres --set=sslmode=require

# at the psql prompt:
INSERT INTO inventory (item, qty) VALUES ('apples', '100');
INSERT INTO inventory (item, qty) VALUES ('oranges', '42');

select * from inventory;



The connector should push the change events from the PostgreSQL WAL (write-ahead log) to Kafka. Check the messages in the corresponding Kafka topic:

Shell
# exec into the Kafka docker container
docker exec -it debezium-postgres-pgoutput_kafka_1 bash

# start a console consumer on the topic for the inventory table
cd bin && ./kafka-console-consumer.sh --topic myserver.public.inventory --bootstrap-server kafka:9092 --from-beginning



You should see a couple of change event payloads (corresponding to the two INSERTs). Yes, they are verbose, since the schema is included in each payload.
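For illustration, the payload portion of one of these events looks roughly like the sketch below; it is trimmed, the timestamp and LSN are made-up values, and the surrounding schema block is omitted:

JSON
{
  "before": null,
  "after": { "id": 1, "item": "apples", "qty": 100 },
  "source": {
    "version": "1.2.0.Final",
    "connector": "postgresql",
    "name": "myserver",
    "db": "postgres",
    "schema": "public",
    "table": "inventory",
    "lsn": 23456789
  },
  "op": "c",
  "ts_ms": 1598000000000
}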

Change publication.autocreate.mode to disabled

For this mode, we need a publication created up-front. Since we already have one (mytestpub), just use it. All you need to do is update the publication.autocreate.mode in pg-source-connector.json to disabled.

Re-create the connector:

Shell
# delete the existing connector
curl -X DELETE localhost:8083/connectors/inventory-connector

# create it again with the updated config
curl -X POST -H "Content-Type: application/json" --data @pg-source-connector.json http://localhost:8083/connectors



Test it end to end using the same steps as in the previous section - everything should work just fine!

Just to confirm, update publication.name in the connector config to one that does not exist. The connector will fail to start due to the missing publication (as expected).
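If you want to see the failure details, you can check the connector status using the Kafka Connect REST API (the connector name matches the one used earlier in this post); the task should show up as FAILED along with the error trace:

Shell
curl -s http://localhost:8083/connectors/inventory-connector/status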


Try publication.autocreate.mode = all_tables

Set publication.autocreate.mode to all_tables, publication.name to one that does not exist (e.g. testpub1) and create the connector:

Shell
curl -X POST -H "Content-Type: application/json" --data @pg-source-connector.json http://localhost:8083/connectors



As expected, it will fail with an error similar to this:

Plain Text
....
INFO Creating new publication 'testpub1' for plugin 'PGOUTPUT' (io.debezium.connector.postgresql.connection.PostgresReplicationConnection:127)
ERROR WorkerSourceTask{id=inventory-connector-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:179)
io.debezium.jdbc.JdbcConnectionException: ERROR: must be superuser to create FOR ALL TABLES publication
....



Notice the part "must be superuser to create FOR ALL TABLES publication": as previously mentioned, CREATE PUBLICATION <publication_name> FOR ALL TABLES; failed due to lack of superuser permissions.

As I mentioned earlier, you need to work around this by creating the publication manually, for specific tables only.
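For example (this statement is my addition, not part of the original walkthrough), you could create the publication yourself before starting the connector:

SQL
CREATE PUBLICATION testpub1 FOR TABLE public.inventory;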

Clean Up

To clean up, delete the Azure PostgreSQL instance using az postgres server delete and remove the containers:

Shell
az postgres server delete -g <resource group> -n <server name>
docker-compose down -v


That's it for this short blog post. Stay tuned for more!


Opinions expressed by DZone contributors are their own.
