
How To Use Change Data Capture With Apache Kafka and ScyllaDB

Learn how to use the ScyllaDB CDC source connector to push row-level change events from the tables of a ScyllaDB NoSQL cluster to a Kafka server.

By Guy Shtub · Feb. 22, 2023 · Tutorial

In this hands-on lab from ScyllaDB University, you will learn how to use the ScyllaDB CDC source connector to push row-level change events from the tables of a ScyllaDB cluster to a Kafka server.

What Is ScyllaDB CDC?

To recap, Change Data Capture (CDC) is a feature that allows you to not only query the current state of a database’s table but also query the history of all changes made to the table. CDC is production-ready (GA) starting from ScyllaDB Enterprise 2021.1.1 and ScyllaDB Open Source 4.3.

In ScyllaDB, CDC is optional and enabled on a per-table basis. The history of changes made to a CDC-enabled table is stored in a separate associated table.

You can enable CDC when creating or altering a table using the CDC option, for example:

 
CREATE TABLE ks.t (pk int, ck int, v int, PRIMARY KEY (pk, ck, v)) WITH cdc = {'enabled':true};
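
The underlying change log is itself queryable. As a sketch (assuming ScyllaDB's default naming scheme, in which the associated log table gets a _scylla_cdc_log suffix), you could read the change history of ks.t directly:

 
SELECT * FROM ks.t_scylla_cdc_log;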


ScyllaDB CDC Source Connector

ScyllaDB CDC Source Connector is a source connector that captures row-level changes in the tables of a ScyllaDB cluster. It is a Debezium connector, compatible with Kafka Connect (with Kafka 2.6.0+). The connector reads the CDC log for specified tables and produces Kafka messages for each row-level INSERT, UPDATE, or DELETE operation. The connector is fault-tolerant, retrying reads from ScyllaDB in case of failure. It periodically saves the current position in the ScyllaDB CDC log using Kafka Connect offset tracking. Each generated Kafka message contains information about the source, such as the timestamp and the table name.

Note: At the time of writing, there is no support for collection types (LIST, SET, MAP) or UDTs; columns with those types are omitted from generated messages. Stay up to date on this enhancement request and other developments in the GitHub project.

Confluent and Kafka Connect

Confluent is a full-scale data streaming platform that enables you to easily access, store, and manage data as continuous, real-time streams. It expands the benefits of Apache Kafka with enterprise-grade features, making it easy to build modern, event-driven applications and providing a universal data pipeline that supports scalability, performance, and reliability.

Kafka Connect is a tool for scalably and reliably streaming data between Apache Kafka and other data systems. It makes it simple to define connectors that move large data sets in and out of Kafka. It can ingest entire databases or collect metrics from application servers into Kafka topics, making the data available for stream processing with low latency.

Kafka Connect includes two types of connectors:

  1. Source connector: Source connectors ingest entire databases and stream table updates to Kafka topics. They can also collect metrics from application servers into Kafka topics, making the data available for stream processing with low latency.
  2. Sink connector: Sink connectors deliver data from Kafka topics to secondary indexes, such as Elasticsearch, or batch systems, such as Hadoop, for offline analysis.

Service Setup With Docker

In this lab, you’ll use Docker.

Please ensure that your environment meets the following prerequisites:

  • Docker for Linux, Mac, or Windows.
    • Note: Running ScyllaDB in Docker is recommended only for evaluating and trying out ScyllaDB.
  • ScyllaDB open source. For the best performance, a regular install is recommended.
  • 8 GB of RAM or greater for Kafka and ScyllaDB services.
  • docker-compose
  • Git

ScyllaDB Install and Init Table

First, you’ll launch a three-node ScyllaDB cluster and create a table with CDC enabled.

If you haven’t done so yet, clone the example repository:

 
git clone https://github.com/scylladb/scylla-code-samples.git
cd scylla-code-samples/CDC_Kafka_Lab


This is the docker-compose file you’ll use. It starts a three-node ScyllaDB cluster:

 
version: "3"

services:
  scylla-node1:
    container_name: scylla-node1
    image: scylladb/scylla:5.0.0
    ports:
      - 9042:9042
    restart: always
    command: --seeds=scylla-node1,scylla-node2 --smp 1 --memory 750M --overprovisioned 1 --api-address 0.0.0.0

  scylla-node2:
    container_name: scylla-node2
    image: scylladb/scylla:5.0.0
    restart: always
    command: --seeds=scylla-node1,scylla-node2 --smp 1 --memory 750M --overprovisioned 1 --api-address 0.0.0.0

  scylla-node3:
    container_name: scylla-node3
    image: scylladb/scylla:5.0.0
    restart: always
    command: --seeds=scylla-node1,scylla-node2 --smp 1 --memory 750M --overprovisioned 1 --api-address 0.0.0.0


Launch the ScyllaDB cluster:

 
docker-compose -f docker-compose-scylladb.yml up -d


Wait for a minute or so, and check that the ScyllaDB cluster is up and in normal status:

 
docker exec scylla-node1 nodetool status


Next, you’ll use cqlsh to interact with ScyllaDB. Create a keyspace and a table with CDC enabled, then insert a row into the table:

 
docker exec -ti scylla-node1 cqlsh 
CREATE KEYSPACE ks WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor' : 1}; 
CREATE TABLE ks.my_table (pk int, ck int, v int, PRIMARY KEY (pk, ck, v)) WITH cdc = {'enabled':true}; 
INSERT INTO ks.my_table(pk, ck, v) VALUES (1, 1, 20); 
exit


The full session, from launching the cluster to inserting the row, should look similar to this:

 
[guy@fedora cdc_test]$ docker-compose -f docker-compose-scylladb.yml up -d

Creating scylla-node1 ... done
Creating scylla-node2 ... done
Creating scylla-node3 ... done
[guy@fedora cdc_test]$ docker exec  scylla-node1 nodetool status
Datacenter: datacenter1
=======================
Status=Up/Down
|/ State=Normal/Leaving/Joining/Moving
--  Address     Load       Tokens       Owns    Host ID                               Rack
UN  172.19.0.3  ?          256          ?       4d4eaad4-62a4-485b-9a05-61432516a737  rack1
UN  172.19.0.2  496 KB     256          ?       bec834b5-b0de-4d55-b13d-a8aa6800f0b9  rack1
UN  172.19.0.4  ?          256          ?       2788324e-548a-49e2-8337-976897c61238  rack1

Note: Non-system keyspaces don't have the same replication settings, effective ownership information is meaningless
[guy@fedora cdc_test]$ docker exec -ti scylla-node1 cqlsh
Connected to  at 172.19.0.2:9042.
[cqlsh 5.0.1 | Cassandra 3.0.8 | CQL spec 3.3.1 | Native protocol v4]
Use HELP for help.
cqlsh> CREATE KEYSPACE ks WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor' : 1};
cqlsh> CREATE TABLE ks.my_table (pk int, ck int, v int, PRIMARY KEY (pk, ck, v)) WITH cdc = {'enabled':true};
cqlsh> INSERT INTO ks.my_table(pk, ck, v) VALUES (1, 1, 20);
cqlsh> exit
[guy@fedora cdc_test]$ 


Confluent Setup and Connector Configuration

To launch a Kafka server, you’ll use the Confluent platform, which provides a user-friendly web GUI to track topics and messages. Confluent provides a docker-compose.yml file that sets up the services.

Note: This is not how you would run Apache Kafka in production; the setup is useful for training and development purposes only. Get the file:

 
wget -O docker-compose-confluent.yml https://raw.githubusercontent.com/confluentinc/cp-all-in-one/7.3.0-post/cp-all-in-one/docker-compose.yml


Next, download the ScyllaDB CDC connector:

 
wget -O scylla-cdc-plugin.jar https://github.com/scylladb/scylla-cdc-source-connector/releases/download/scylla-cdc-source-connector-1.0.1/scylla-cdc-source-connector-1.0.1-jar-with-dependencies.jar


Add the ScyllaDB CDC connector to the Confluent Connect service’s plugin directory using a Docker volume. Edit docker-compose-confluent.yml to add the two lines below (marked with +), replacing <directory> with the directory of your scylla-cdc-plugin.jar file.

 
    image: cnfldemos/cp-server-connect-datagen:0.5.3-7.1.0
    hostname: connect
    container_name: connect
+   volumes:
+     - <directory>/scylla-cdc-plugin.jar:/usr/share/java/kafka/plugins/scylla-cdc-plugin.jar
    depends_on:
      - broker
      - schema-registry


Launch the Confluent services:

 
docker-compose -f docker-compose-confluent.yml up -d


Wait a minute or so, then access http://localhost:9021 for the Confluent web GUI.
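
Before using the GUI, you can optionally confirm that the Connect worker picked up the plugin through the Kafka Connect REST API. This is a quick sketch; port 8083 is the Connect REST port in the cp-all-in-one compose file and may differ in your setup:

 
curl -s http://localhost:8083/connector-plugins | grep -i scylla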

Add the Scylla connector using the Confluent dashboard:

[Screenshot: Confluent dashboard]

Click the Scylla connector plugin to add it:

[Screenshot: connector plugins]

Fill in “Hosts” with the IP address of one of the ScyllaDB nodes (shown in the output of the nodetool status command) and port 9042, which the ScyllaDB service listens on.

The “Namespace” is the keyspace you created before in ScyllaDB.

Notice that it might take a minute or so for ks.my_table to appear:

[Screenshots: connector configuration showing ks.my_table]
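
If you prefer to script this step rather than use the GUI, an equivalent configuration can be posted to the Kafka Connect REST API. The sketch below is based on the connector’s documented properties at the time of writing; the cluster name matches the topic prefix used later in this lab, but the node IP address is an example taken from the nodetool output above, and property names can change between connector versions:

 
# Hypothetical values: point scylla.cluster.ip.addresses at one of your ScyllaDB nodes.
curl -X POST -H "Content-Type: application/json" http://localhost:8083/connectors -d '{
  "name": "ScyllaCDCSourceConnector",
  "config": {
    "connector.class": "com.scylladb.cdc.debezium.connector.ScyllaConnector",
    "scylla.name": "MyScyllaCluster",
    "scylla.cluster.ip.addresses": "172.19.0.2:9042",
    "scylla.table.names": "ks.my_table",
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter"
  }
}'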

Test Kafka Messages

You can see that MyScyllaCluster.ks.my_table is the topic created by the ScyllaDB CDC connector.

Now, check for Kafka messages from the Topics panel:

[Screenshot: Topics panel]

Select the topic, which is the same as the keyspace and table name that you created in ScyllaDB:

[Screenshot: topic list]

From the “Overview” tab, you can see the topic info. At the bottom, it shows this topic is on partition 0.

A partition is the smallest storage unit that holds a subset of the records owned by a topic. Each partition is a single log file to which records are written in an append-only fashion. Each record in a partition is assigned a sequential identifier called the offset, which is unique within the partition. The offset is an incremental and immutable number maintained by Kafka.

As you already know, the ScyllaDB CDC messages are sent to the MyScyllaCluster.ks.my_table topic, and the partition id of the topic is 0. Next, go to the “Messages” tab and enter 0 in the “offset” field for partition 0:

[Screenshot: topic messages]
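
You can also read the same messages from the command line instead of the GUI. A minimal sketch, assuming the broker container name and internal listener (broker:29092) from the cp-all-in-one compose file:

 
docker exec -it broker kafka-console-consumer \
  --bootstrap-server broker:29092 \
  --topic MyScyllaCluster.ks.my_table \
  --from-beginning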

You can see from the Kafka topic messages that the ScyllaDB INSERT event and its data were transferred into Kafka messages by the Scylla CDC Source Connector. Click on a message to view the full message info:

[Screenshot: message details]

The message contains the ScyllaDB table and keyspace names along with the timestamp, as well as the state of the data before and after the operation. Since this is an insert operation, the data before the insert is null.
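
The exact payload depends on your converter settings, but the message value follows the Debezium envelope, with before and after images of the row. A simplified, illustrative sketch of the value for the first inserted row (field names abridged, timestamp made up):

 
{
  "before": null,
  "after": { "pk": 1, "ck": 1, "v": 20 },
  "source": { "name": "MyScyllaCluster", "keyspace_name": "ks", "table_name": "my_table" },
  "op": "c",
  "ts_ms": 1677000000123
}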

Next, insert another row into the ScyllaDB table:

 
docker exec -ti scylla-node1 cqlsh 
INSERT INTO ks.my_table(pk, ck, v) VALUES (200, 50, 70);


Back in Kafka, wait a few seconds and you can see the details of the new message:

[Screenshot: new message details]

Cleanup

Once you are done working on this lab, you can stop and remove the Docker containers.

To view a list of all container IDs:

 
docker container ls -aq


Then you can stop and remove the containers you are no longer using:

 
docker stop <ID_or_Name> 
docker rm <ID_or_Name>


Later, if you want to rerun the lab, you can follow the steps and use docker-compose as before.
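
Alternatively, since both stacks were started with docker-compose, you can tear each one down in a single step using the compose files from this lab:

 
docker-compose -f docker-compose-confluent.yml down
docker-compose -f docker-compose-scylladb.yml down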

Summary

With the CDC source connector, a Kafka plugin compatible with Kafka Connect, you can capture all the ScyllaDB table row-level changes (INSERT, UPDATE, or DELETE) and convert those events to Kafka messages. You can then consume the data from other applications or perform any other operation with Kafka.


Published at DZone with permission of Guy Shtub.
