Kafka Connectors Without Kafka

In this article, you will find basic information about change data capture and a high-level overview of Kafka Connect. We'll then see how one of the connectors (Debezium PostgreSQL) can work in standalone mode (without the platform) — moving CDC to another level of simplicity.

But let's start from the beginning.

If you somehow slept through the last couple of years and missed the Kafka/'Events being complementary to state' revolution, the quick summary is:

  • things happen => it's good to think about them as events => we have Kafka to handle this
  • a lot of the world's descriptions are still state based => it would be great if they were event based => and we have Kafka Connect to handle this.

When state is kept in databases, turning every db change into an event naturally leads to CDC (Change Data Capture) techniques.

The idea is that most databases already store data, at some level, as events, and the trick is to connect to that store. In the case of databases, this store is most likely:

  • a kind of journal, like PostgreSQL's WAL (Write-Ahead Log), where all db operations are logged sequentially and only afterwards, based on this journal, are the changes "materialized" in the destination tables (a mechanism that provides recovery after crashes and reduces the number of disk writes),
  • or, the other way around, a mechanism built for replication, like Mongo's Oplog (operations log), which stores all of the replica master's changes so that the slaves can use them to sync.

Either way, we have a way to be informed whenever the db state changes.
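
To make the journal idea concrete, here is a toy sketch in plain Java. It is not how PostgreSQL or MongoDB implement their logs; the class and method names (JournalSketch, append, materialize, readFrom) are made up for illustration. It only shows the two roles described above: table state is derived by replaying the log, and a reader can pick up changes from any offset.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class JournalSketch {
    /** One logged operation; op is "insert", "update" or "delete". */
    static final class Change {
        final String op;
        final int id;
        final String description;

        Change(String op, int id, String description) {
            this.op = op;
            this.id = id;
            this.description = description;
        }
    }

    // append-only journal; the list index plays the role of a log offset
    private final List<Change> journal = new ArrayList<>();

    /** Every write is logged first; returns the offset of the new entry. */
    int append(Change change) {
        journal.add(change);
        return journal.size() - 1;
    }

    /** Rebuilds ("materializes") the table state by replaying the journal. */
    Map<Integer, String> materialize() {
        Map<Integer, String> table = new LinkedHashMap<>();
        for (Change change : journal) {
            if (change.op.equals("delete")) {
                table.remove(change.id);
            } else {
                table.put(change.id, change.description);
            }
        }
        return table;
    }

    /** What a CDC reader does: fetch all changes from a given offset onwards. */
    List<Change> readFrom(int offset) {
        return new ArrayList<>(journal.subList(offset, journal.size()));
    }

    public static void main(String[] args) {
        JournalSketch db = new JournalSketch();
        db.append(new Change("insert", 1, "first order"));
        db.append(new Change("insert", 2, "second order"));
        db.append(new Change("update", 1, "first order (amended)"));
        db.append(new Change("delete", 2, null));

        System.out.println(db.materialize());       // {1=first order (amended)}
        System.out.println(db.readFrom(2).size());  // 2
    }
}
```

The materialized map is the "state" view, while readFrom is the "events" view of the same data.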

Kafka and the Kafka Connect Platform

To put this knowledge into practice, we can use Kafka as the destination event log and populate it with Kafka Connect, which reads db changes from either a journal or an oplog as described above. This is easily summarized by a diagram from the website of Confluent (the company behind Kafka):

Kafka Connect Platform

In my description, I've purposely focused on reading the changes, as this is where the magic happens (CDC).
Of course, there is also the twin - writing side, which is just... writing.

The Kafka Connect Platform is built in a pluggable way: Confluent provides the platform and API, and anybody can provide connectors that read/write data from different data sources (file, PostgreSQL, MongoDB, AWS S3, ActiveMQ, etc.).
One popular choice among many is Debezium, an open source project developed by Red Hat, which provides connectors for MySQL, PostgreSQL, SQL Server and MongoDB (Oracle is being incubated at the time of writing).

It may all look easy enough, but in many cases it isn't. A Kafka pipeline is Kafka and ZooKeeper and Kafka Connect, so it is basically another Kafka cluster on steroids. Installing all of them locally for dev purposes is super simple thanks to the Docker images provided by Confluent:

https://hub.docker.com/u/confluentinc
https://github.com/confluentinc/cp-docker-images

Doing so in prod, however, might be tricky, especially in a cloud, and some DevOps maturity is expected.
Of course, as Jay Kreps wrote on Twitter, you don't need to do it by yourself when they (Confluent) offer Kafka as a service for a few cents per GB of writes.

The thing is - sometimes it's just not your decision and in some organizations these decisions take time.
And sometimes you just need to prototype and play with CDC and the question is — can you do it without a platform and still use the abstraction provided by Kafka Connectors? It appears you can.

CDC With No Platform

It turns out that Debezium connectors (mentioned above) can work in embedded mode, which basically means that you can add two dependencies and some configuration, and your app will level up, gaining the CDC ability.

Let's see it in action with a small example using Spring Boot.

To focus our attention, let's assume that we have a PostgreSQL table

orders(id, description)

and we want to react to all CRUD operations performed on it, like here:

CRUD Operations

So every time anyone performs any CRUD operation on the orders table, it is first reflected in the WAL (by PostgreSQL's inner workings). Inside the Demo App, we have the Debezium Connector monitoring the WAL and notifying the app about every change.

The steps to get the PostgreSQL Debezium Connector working for our app are:

  • Add the two dependencies (Gradle):

dependencies {
	implementation("io.debezium:debezium-embedded:0.9.5.Final")
	implementation("io.debezium:debezium-connector-postgres:0.9.5.Final")
}

  • Provide the connector configuration:

Configuration debeziumConfiguration() {
        return io.debezium.config.Configuration.create()
            // which connector the embedded engine should run
            .with("connector.class", "io.debezium.connector.postgresql.PostgresConnector")
            // keep offsets in memory only (they are lost on restart)
            .with("offset.storage", "org.apache.kafka.connect.storage.MemoryOffsetBackingStore")
            .with("offset.flush.interval.ms", 60000)
            .with("name", "orders-postgres-connector")
            .with("database.server.name", "orders")
            // connection details of the monitored database
            .with("database.hostname", "localhost")
            .with("database.port", 5432)
            .with("database.user", "postgres")
            .with("database.password", "postgres")
            .with("database.dbname", "demo")
            // capture changes from the orders table only
            .with("table.whitelist", "public.orders")
           // .with("snapshot.mode", "never")
            .build();
}
  • You can probably figure out most of the parameters by yourself; if not, you can check here.

  • With the above configuration, create and start the engine, passing it a reference to the handleEvent method, which will be executed every time a db change is captured.

EmbeddedEngine engine = EmbeddedEngine.create()
		.using(debeziumConfiguration)
		.notifying(this::handleEvent)
		.build();
// EmbeddedEngine is a Runnable, so it can be handed to an executor
executor.execute(engine);

private void handleEvent(SourceRecord sourceRecord) {
	Struct sourceRecordValue = (Struct) sourceRecord.value();
	// OPERATION and AFTER name the envelope fields holding the operation code and the new row state
	Operation operation = Operation.forCode((String) sourceRecordValue.get(OPERATION));
	// the demo only reacts to inserts; any other operation is logged and skipped
	if (operation != Operation.CREATE) {
		log.error("unknown operation");
		return;
	}
	Struct after = (Struct) sourceRecordValue.get(AFTER);
	changesCaptured.add(after);
}
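
The handler above reacts to inserts only. As a rough sketch of what handling the other operations could look like, the snippet below applies change events to an in-memory view. It deliberately uses plain Java maps instead of Debezium's Struct so that it runs without any dependencies; the event shape (the op code plus before and after rows) mirrors the Debezium envelope, while the class and helper names (ChangeDispatchSketch, row, event, apply) are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

class ChangeDispatchSketch {
    /** Builds a row with the two columns of the orders table. */
    static Map<String, Object> row(int id, String description) {
        Map<String, Object> row = new HashMap<>();
        row.put("id", id);
        row.put("description", description);
        return row;
    }

    /** Builds a simplified change event; before or after may be null. */
    static Map<String, Object> event(String op,
                                     Map<String, Object> before,
                                     Map<String, Object> after) {
        Map<String, Object> event = new HashMap<>();
        event.put("op", op);
        event.put("before", before);
        event.put("after", after);
        return event;
    }

    /** Applies one change event to a view keyed by order id. */
    @SuppressWarnings("unchecked")
    static void apply(Map<String, Object> event, Map<Integer, String> view) {
        String op = (String) event.get("op");
        Map<String, Object> before = (Map<String, Object>) event.get("before");
        Map<String, Object> after = (Map<String, Object>) event.get("after");
        switch (op) {
            case "c": // create
            case "r": // read (row emitted by an initial snapshot)
            case "u": // update
                view.put((Integer) after.get("id"), (String) after.get("description"));
                break;
            case "d": // delete
                view.remove((Integer) before.get("id"));
                break;
            default:
                throw new IllegalArgumentException("unknown operation: " + op);
        }
    }

    public static void main(String[] args) {
        Map<Integer, String> view = new HashMap<>();
        apply(event("c", null, row(1, "first")), view);
        apply(event("u", row(1, "first"), row(1, "amended")), view);
        apply(event("c", null, row(2, "second")), view);
        apply(event("d", row(2, "second"), null), view);
        System.out.println(view); // {1=amended}
    }
}
```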

The whole demo can be checked here, where the mechanism is easily tested with a Spock Spec, DebeziumCDCDemo.groovy, which inserts a new record into the orders table and checks whether the OrdersWatch component reacted to the change.

The Spec uses the Testcontainers framework with a PostgreSQL Docker image, so the demo works on its own and no external database setup is needed.

A Few Things to Keep in Mind

Enabling Replication in PostgreSQL

The WAL, our journal of all operations, is not publicly available by default. PostgreSQL provides infrastructure to stream the modifications performed via SQL to external consumers through a mechanism called logical decoding. To make it work for us, we need to provide some configuration and register the proper plugin (both operations on the PostgreSQL side). I'm using a Docker image provided by Debezium, which already does all of this for me, but you can easily inspect the image or read "Setting up PostgreSQL" on the Debezium page to do it yourself. This needs to be done whether you use the Platform or not.
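
As a rough illustration of that server-side configuration, a postgresql.conf fragment along the following lines is what the Debezium setup guide describes for this connector generation. The plugin names and values here are assumptions to verify against the Docker image or the documentation for your version; they are not taken from the demo.

```
# postgresql.conf (sketch, values are assumptions)

# load the logical decoding output plugin(s) used by the connector
shared_preload_libraries = 'decoderbufs,wal2json'

# write enough information to the WAL for logical decoding
wal_level = logical

# allow at least one WAL sender process and one replication slot
max_wal_senders = 1
max_replication_slots = 1
```

The database user the connector logs in with also needs permission to open a replication connection.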

Offset Tracking

The Debezium Connector needs to keep track of the last change it read from the WAL.
This is done by keeping offsets, and in the demo we store them in memory:

.with("offset.storage", "org.apache.kafka.connect.storage.MemoryOffsetBackingStore")

Of course, this is okay for tests, but it will result in the connector reading the WAL from the beginning every time the app is restarted.

You can also use

org.apache.kafka.connect.storage.FileOffsetBackingStore

but it may not work properly in the cloud or with more than one instance of an app.
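
For a single local instance, switching from the in-memory store to the file store comes down to two settings. The sketch below only assembles them with java.util.Properties so that it runs without any dependencies; the same key/value pairs could be passed through the .with(...) calls of the configuration shown earlier. The file path and the names FileOffsetConfigSketch and fileOffsetSettings are made up for illustration.

```java
import java.util.Properties;

class FileOffsetConfigSketch {
    /** Builds the two offset-related settings for a file-backed store. */
    static Properties fileOffsetSettings(String offsetFile) {
        Properties props = new Properties();
        props.setProperty("offset.storage",
                "org.apache.kafka.connect.storage.FileOffsetBackingStore");
        // location on local disk where the store keeps the offsets
        props.setProperty("offset.storage.file.filename", offsetFile);
        return props;
    }

    public static void main(String[] args) {
        // hypothetical path, chosen for illustration only
        Properties props = fileOffsetSettings("/tmp/orders-offsets.dat");
        props.forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```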

Then there is of course

org.apache.kafka.connect.storage.KafkaOffsetBackingStore

but we are trying to do it without the platform.

If none of these stores works for you, you can easily implement your own backing store, for example one that tracks the offset in PostgreSQL itself. Basically, it means implementing two methods, get() and set(), from the OffsetBackingStore interface, which read and save the offset in the store of your choice.
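
The get()/set() idea can be sketched without any Kafka classes. The OffsetStore interface below is a hypothetical, much simplified stand-in and not Kafka Connect's OffsetBackingStore (whose real methods work with byte buffers and callbacks), and a properties file stands in for the PostgreSQL table one might use in practice. The point is only that an offset written before a restart is visible to a fresh instance afterwards.

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Optional;
import java.util.Properties;

class OffsetStoreSketch {
    /** Hypothetical minimal contract: read and save one offset per connector. */
    interface OffsetStore {
        Optional<Long> get(String connectorName);
        void set(String connectorName, long offset);
    }

    /** Durable store backed by a properties file (stand-in for a db table). */
    static final class FileOffsetStore implements OffsetStore {
        private final Path file;

        FileOffsetStore(Path file) {
            this.file = file;
        }

        private Properties load() {
            Properties props = new Properties();
            if (Files.exists(file)) {
                try (InputStream in = Files.newInputStream(file)) {
                    props.load(in);
                } catch (IOException e) {
                    throw new UncheckedIOException(e);
                }
            }
            return props;
        }

        @Override
        public Optional<Long> get(String connectorName) {
            String value = load().getProperty(connectorName);
            return value == null ? Optional.empty() : Optional.of(Long.parseLong(value));
        }

        @Override
        public void set(String connectorName, long offset) {
            Properties props = load();
            props.setProperty(connectorName, Long.toString(offset));
            try (OutputStream out = Files.newOutputStream(file)) {
                props.store(out, "connector offsets");
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        }
    }

    public static void main(String[] args) {
        Path file = Paths.get(System.getProperty("java.io.tmpdir"),
                "offsets-" + System.nanoTime() + ".properties");
        new FileOffsetStore(file).set("orders-postgres-connector", 42L);
        // a fresh instance simulates the app being restarted
        System.out.println(new FileOffsetStore(file).get("orders-postgres-connector")); // Optional[42]
    }
}
```

A database-backed variant would keep the same two methods and replace the file access with a select and an upsert.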

Overuse Might Not Be a Good Idea

This mechanism is powerful, and while it has its uses, it's easy to overdo it: you shouldn't end up with every microservice reading every other service's db logs.

Don't read the state, focus on business events, and don't couple yourself to models that don't belong to your service. Basically, read "Turning the database inside-out with Apache Samza" by Martin Kleppmann and "Designing Event-Driven Systems" by Ben Stopford.

I believe it is safe enough to use as a transitional solution, for example when extracting a microservice from a bigger monolithic app. For other cases, proceed with caution.


Published at DZone with permission of
