
Kafka Connectors Without Kafka


Increase CDC simplicity with Debezium PostgreSQL.


In this article, you will find basic information about change data capture (CDC) and a high-level view of Kafka Connect. Then, see how one of the connectors (Debezium PostgreSQL) can work in standalone mode (without the platform), moving CDC to another level of simplicity.

Starting from the beginning: if you somehow slept through the last couple of years and missed Kafka and the "events being complementary to state" revolution, here's a quick summary:

  • Things happen  =>  it's good to think about them as events  =>  we have Kafka to handle this.
  • Much of the world's description is still state-based  =>  many times, it would be great if it were event-based  =>  we have Kafka Connect to handle this.

When state is kept in databases, turning every database change into an event naturally leads to CDC (Change Data Capture) techniques.

The idea is that in most databases, data is already, at some level, stored as events, and the trick is to connect to this store.

In the case of databases, this store is most likely:

  • some kind of journal, like PostgreSQL's WAL (Write-Ahead Log), where all database operations are sequentially logged; only after, and based on, this journal are changes materialized in the destination tables (a mechanism that provides recovery after crashes and reduces the number of disk writes).
  • or, the other way around, some replication mechanism, like Mongo's Oplog (operations log), which stores all of the replica master's changes so that the slaves can use them to sync.

Either way, we have a means of being informed anytime the database state changes.

Kafka and Kafka Connect Platform

To put this knowledge into practice, we can use Kafka as a destination event log and populate it with Kafka Connect, reading database changes from either a journal or an oplog, as described above. This is easily summarized by a diagram from Confluent (the company behind Kafka).

Kafka Connect Platform 

In my description, I've purposely focused on reading the changes, as this is where the magic (CDC) happens.
Of course, there is also the twin writing side, which is just… writing.

The Kafka Connect Platform is built in a pluggable way: Confluent provides the platform and API, and anybody can provide connectors that read/write data from different data sources (file, PostgreSQL, MongoDB, AWS S3, ActiveMQ, etc.).

Among the many, a popular choice is Debezium, an open source project developed by Red Hat that provides connectors for MySQL, PostgreSQL, SQL Server, and MongoDB (with Oracle being incubated at the time of writing).

It all may look easy enough, but in many cases, it isn't. The Kafka pipeline is Kafka, Zookeeper, and Kafka Connect, which is basically another Kafka cluster on steroids.

While installing all of them locally is super simple thanks to the Docker images provided by Confluent, doing it on prod might be tricky, especially in a cloud; some DevOps maturity is expected.

Of course, as Jay Kreps wrote on Twitter, you don’t need to do it by yourself when they (Confluent) offer Kafka as a service for a few cents per GB of writes.

The thing is, sometimes it's just not your decision, and in some organizations, these decisions take time.
Sometimes, you just need to prototype and play with CDC, and the question is: "Can you do it without the platform and still use the abstraction provided by Kafka Connectors?"

It appears you can.

CDC With No Platform

It turns out that the Debezium connectors (mentioned above) can work in an embedded mode, which basically means that you can add two dependencies and some configuration, and your app will level up, gaining CDC ability.

Let's see it in action with a small example using Spring Boot. To focus our attention, let's assume that we have a PostgreSQL orders table, and we want to react to all CRUD operations performed on it.


So, every time anyone performs any CRUD operation on the orders table, it's first reflected in the WAL (by PostgreSQL's inner workings). Inside the demo app, we have a Debezium connector monitoring the WAL and notifying the app about every change.

The steps to make the Debezium PostgreSQL connector work in our app are:

  • Add two Debezium dependencies:

dependencies {
    implementation("io.debezium:debezium-embedded:0.9.5.Final")
    implementation("io.debezium:debezium-connector-postgres:0.9.5.Final")
}


  • Prepare the connector configuration:

Configuration debeziumConfiguration() {
    return io.debezium.config.Configuration.create()
        // the connector implementation to run
        .with("connector.class", "io.debezium.connector.postgresql.PostgresConnector")
        // where to keep track of the last WAL position read (more on this below)
        .with("offset.storage", "org.apache.kafka.connect.storage.MemoryOffsetBackingStore")
        .with("offset.flush.interval.ms", 60000)
        // logical names of this connector and of the database server
        .with("name", "orders-postgres-connector")
        .with("database.server.name", "orders")
        // connection details
        .with("database.hostname", "localhost")
        .with("database.port", 5432)
        .with("database.user", "postgres")
        .with("database.password", "postgres")
        .with("database.dbname", "demo")
        // capture changes only from the orders table
        .with("table.whitelist", "public.orders")
        // .with("snapshot.mode", "never")
        .build();
}


You can probably figure out most of the params by yourself; if not, you can check here. The commented-out snapshot.mode parameter controls whether the connector first reads a consistent snapshot of the existing data before streaming changes from the WAL; setting it to "never" skips that initial snapshot.

  • Create the EmbeddedEngine with this configuration and run it on an executor:

EmbeddedEngine engine = EmbeddedEngine.create()
    .using(debeziumConfiguration)
    .notifying(this::handleEvent)  // callback invoked for every captured change
    .build();
executor.execute(engine);  // EmbeddedEngine is a Runnable; it runs until stopped

This provides it with a reference to the handleEvent method, which will be executed every time a database change is captured.

  • From there, decide what operations to react to. For example:
private void handleEvent(SourceRecord sourceRecord) {
    Struct sourceRecordValue = (Struct) sourceRecord.value();
    // Envelope.FieldName.OPERATION ("op") says whether it was a create, update, or delete
    Operation operation = Operation.forCode((String) sourceRecordValue.get(OPERATION));
    if (operation != Operation.CREATE) {
        log.error("unknown operation");
        return;
    }
    // Envelope.FieldName.AFTER ("after") holds the state of the row after the change
    Struct after = (Struct) sourceRecordValue.get(AFTER);
    changesCaptured.add(after);
}
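
From the after struct, you can read individual column values with the standard Kafka Connect Struct getters. A minimal sketch (the id and description columns are assumptions about the demo schema, not taken from the article):

import org.apache.kafka.connect.data.Struct;

private void logNewOrder(Struct after) {
    // Column names and types below are hypothetical; adjust to your schema.
    Integer id = after.getInt32("id");                    // e.g. a SERIAL primary key
    String description = after.getString("description");  // e.g. a text column
    log.info("order created: id={}, description={}", id, description);
}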


The whole demo is here, where the mechanism is easily tested with a Spock spec, DebeziumCDCDemo.groovy, which inserts a new record into the Orders table and checks whether the OrdersWatch component reacted to the change.

The spec uses the Testcontainers framework with the PostgreSQL Docker image, so the demo works on its own; no external database setup is needed.
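
One detail the snippets above gloss over: the engine keeps running until told otherwise, so it should be stopped when the application goes down. A minimal sketch, assuming engine and executor are fields of the same Spring component (EmbeddedEngine.stop() signals the engine to finish):

import javax.annotation.PreDestroy;

@PreDestroy
public void shutdown() {
    if (engine != null) {
        engine.stop();   // ask the engine to stop reading the WAL
    }
    executor.shutdown(); // let the engine's task finish; accept no new tasks
}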

A Few Things to Keep in Mind

Enabling Replication in PostgreSQL

The WAL, our journal of all operations, is not publicly available by default. PostgreSQL provides the infrastructure to stream modifications performed via SQL to external consumers through a mechanism called logical decoding. To make it work for us, we need to provide some configuration and register a proper plugin (both operations on the PostgreSQL side). I'm using the Docker image provided by Debezium, which already does all of this for me, but one can easily inspect the image or read "Setting up PostgreSQL" on the Debezium page to do it yourself. This needs to be done whether you use the platform or not.
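For reference, when setting it up yourself, the PostgreSQL side boils down to installing a logical decoding plugin (protobuf-based decoderbufs or wal2json in this Debezium era) and a few postgresql.conf entries; a sketch based on Debezium's setup guide, with example numbers to tune:

# postgresql.conf
shared_preload_libraries = 'decoderbufs,wal2json'  # the installed decoding plugin(s)
wal_level = logical            # expose the WAL through logical decoding
max_wal_senders = 4            # processes allowed to stream the WAL
max_replication_slots = 4      # slots available for consumers like Debezium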

Offset Tracking

The Debezium connector needs to keep track of the last change it read from the WAL. This is done by keeping offsets; in the demo example, we store them in memory:

.with("offset.storage", "org.apache.kafka.connect.storage.MemoryOffsetBackingStore")

Of course, this is okay for tests, but it will result in the connector reading the WAL from the beginning every time the app is restarted. You can also use

org.apache.kafka.connect.storage.FileOffsetBackingStore

but it may not work properly in the cloud or for more than one instance of an app. Then there is, of course,

org.apache.kafka.connect.storage.KafkaOffsetBackingStore

but we are trying to do it without a platform.
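
For completeness: for a single app instance with a persistent disk, the file-based store is wired in with two configuration entries (offset.storage.file.filename is the standard Kafka Connect property; the path is just an example):

.with("offset.storage", "org.apache.kafka.connect.storage.FileOffsetBackingStore")
.with("offset.storage.file.filename", "/var/data/orders-connector-offsets.dat")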

If none of these stores works for you, you can easily implement your own backing store, tracking the offset in PostgreSQL itself. Basically, it means implementing two methods, get() and set(), from the OffsetBackingStore interface, which read and save offsets to the store of your choice.
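
As a sketch of that idea (not part of the article's demo): the simplest route is the one FileOffsetBackingStore itself takes, extending Kafka's MemoryOffsetBackingStore and persisting its protected in-memory map, here to a hypothetical connect_offsets key/value table accessed through an invented OffsetDao:

import java.nio.ByteBuffer;
import java.util.Map;
import org.apache.kafka.connect.storage.MemoryOffsetBackingStore;

// Sketch: keep Kafka Connect offsets in PostgreSQL, e.g. in a table like
// CREATE TABLE connect_offsets (key BYTEA PRIMARY KEY, value BYTEA).
public class PostgresOffsetBackingStore extends MemoryOffsetBackingStore {

    /** Hypothetical DAO, e.g. plain JDBC against connect_offsets. */
    public interface OffsetDao {
        Map<byte[], byte[]> loadAll();
        void upsert(byte[] key, byte[] value);
    }

    // The engine instantiates the store reflectively from the class name in
    // "offset.storage", so dependencies are easiest handed over statically.
    private static OffsetDao dao;

    public static void setDao(OffsetDao offsetDao) {
        dao = offsetDao;
    }

    @Override
    public void start() {
        super.start();
        // Load previously saved offsets into the inherited in-memory map.
        dao.loadAll().forEach((key, value) ->
            data.put(ByteBuffer.wrap(key), ByteBuffer.wrap(value)));
    }

    @Override
    protected void save() {
        // MemoryOffsetBackingStore calls save() after every set(); flush the map.
        for (Map.Entry<ByteBuffer, ByteBuffer> entry : data.entrySet()) {
            dao.upsert(entry.getKey().array(), entry.getValue().array());
        }
    }
}

You would then point the connector at it with .with("offset.storage", "com.example.PostgresOffsetBackingStore").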

Overusing Might Not Be a Good Idea

This mechanism is powerful, and while it has its uses, it's easy to overdose; you shouldn't end up with every microservice reading each other's database logs.

Don’t read the state; focus on business events. Don’t couple yourself to models that don’t belong to your service. 

I believe it is safe enough to use as a transitional solution, such as when extracting a microservice from a bigger monolithic app. For other cases, proceed with caution.
