
Change Data Capture (CDC) From MySQL Database to Kafka With Kafka Connect and Debezium

In this post, we look at CDC from MySQL to Kafka topics using the Debezium MySQL connector.

By Ramu kakarla · Jun. 08, 20 · Tutorial

Introduction

Debezium is an open-source project developed by Red Hat that aims to simplify change data capture by allowing you to extract changes from various database systems (e.g., MySQL, PostgreSQL, MongoDB) and push them to Kafka.

Architecture


Debezium Connectors

Debezium has a library of connectors that capture changes from a variety of databases and produce events with very similar structures, making it easier for applications to consume and respond to the events regardless of where the changes originated. Debezium currently has the following connectors:

  • MySQL Connector
  • MongoDB Connector
  • PostgreSQL Connector
  • SQL Server Connector
  • Oracle Connector (Incubating)
  • Db2 Connector (Incubating)
  • Cassandra Connector (Incubating)

In this article, we are going to capture change events from a MySQL database into Kafka topics.

The Debezium MySQL connector tracks the structure of the tables, performs snapshots, transforms binlog events into Debezium change events, and records those events in Kafka.

Prerequisites:

  1. Kafka installed on OpenShift
  2. Kafka connect installed on OpenShift
  3. MySQL Server installed and set up

I have installed a one-node Kafka cluster with Kafka Connect:

Shell
[kkakarla@kkakarla amq-streams-1.4.0-ocp-install-examples]$ oc get pods
NAME                                          READY   STATUS      RESTARTS   AGE
my-cluster-entity-operator-759bfbc89d-s68wx   3/3     Running     0          9m
my-cluster-kafka-0                            2/2     Running     2          9m55s
my-cluster-zookeeper-0                        2/2     Running     0          11m
my-cluster-zookeeper-1                        2/2     Running     0          11m
my-cluster-zookeeper-2                        2/2     Running     0          11m
my-connect-cluster-connect-1-build            0/1     Completed   0          6m4s
my-connect-cluster-connect-1-deploy           0/1     Completed   0          5m27s
my-connect-cluster-connect-1-ltphz            1/1     Running     0          5m25s
strimzi-cluster-operator-64d9cf9bc7-wsf4q     1/1     Running     0          15m

Installing the MySQL Connector

Installing the Debezium MySQL connector is simple: download the connector archive, extract it into the Kafka Connect environment, and make sure the plugin’s parent directory is specified in your Kafka Connect plugin path.

  1. Download the Debezium MySQL connector.
  2. Extract the files into your Kafka Connect environment.
  3. Add the plugin’s parent directory to your Kafka Connect plugin path.
  4. Create a directory with the Kafka Connect plug-ins:
Shell
[kkakarla@kkakarla upstream-plugins]$ tree
.
└── my-plugins
    └── debezium-connector-mysql
        ├── antlr4-runtime-4.7.2.jar
        ├── CHANGELOG.md
        ├── CONTRIBUTE.md
        ├── COPYRIGHT.txt
        ├── debezium-api-1.1.2.Final.jar
        ├── debezium-connector-mysql-1.1.2.Final.jar
        ├── debezium-core-1.1.2.Final.jar
        ├── debezium-ddl-parser-1.1.2.Final.jar
        ├── LICENSE-3rd-PARTIES.txt
        ├── LICENSE.txt
        ├── mysql-binlog-connector-java-0.19.1.jar
        ├── mysql-connector-java-8.0.16.jar
        └── README.md

2 directories, 13 files


The Kafka Connect S2I image takes your binaries (with plug-ins and connectors) and stores them in the /tmp/kafka-plugins/s2i directory. It creates a new Kafka Connect image from this directory, which can then be used with the Kafka Connect deployment. When started using the enhanced image, Kafka Connect loads any third-party plug-ins from the /tmp/kafka-plugins/s2i directory.

  • Start a new build of the image
Shell
[kkakarla@kkakarla upstream-plugins]$ oc start-build my-connect-cluster-connect --from-dir ./my-plugins/
Uploading directory "my-plugins" as binary input for the build ...
....................
Uploading finished
build.build.openshift.io/my-connect-cluster-connect-2 started

  • Once the build has completed, the new image is used automatically by the Kafka Connect deployment
Shell
[kkakarla@kkakarla upstream-plugins]$ oc get build
NAME                           TYPE     FROM     STATUS     STARTED          DURATION
my-connect-cluster-connect-1   Source            Complete   59 minutes ago   36s
my-connect-cluster-connect-2   Source   Binary   Complete   6 minutes ago    2m7s


Installing MySQL

  • Install MySQL using the command oc new-app --name=mysql debezium/example-mysql:1.0:


Shell
[kkakarla@kkakarla upstream-plugins]$ oc new-app --name=mysql debezium/example-mysql:1.0
--> Found container image d8322b2 (3 days old) from Docker Hub for "debezium/example-mysql:1.0"

    * An image stream tag will be created as "mysql:1.0" that will track this image
    * This image will be deployed in deployment config "mysql"
    * Ports 3306/tcp, 33060/tcp will be load balanced by service "mysql"
      * Other containers can access this service through the hostname "mysql"
    * This image declares volumes and will default to use non-persistent, host-local storage.
      You can add persistent volumes later by running 'oc set volume dc/mysql --add ...'
    * WARNING: Image "debezium/example-mysql:1.0" runs as the 'root' user which may not be permitted by your cluster administrator

--> Creating resources ...
    imagestream.image.openshift.io "mysql" created
    deploymentconfig.apps.openshift.io "mysql" created
    service "mysql" created
--> Success
    Application is not exposed. You can expose services to the outside world by executing one or more of the commands below:
     'oc expose svc/mysql'
    Run 'oc status' to view your app.

  • Specify one of MYSQL_ROOT_PASSWORD, MYSQL_ALLOW_EMPTY_PASSWORD, or MYSQL_RANDOM_ROOT_PASSWORD, along with the MySQL user credentials:
Shell
[kkakarla@kkakarla upstream-plugins]$ oc set env dc/mysql MYSQL_ROOT_PASSWORD=debezium MYSQL_USER=mysqluser MYSQL_PASSWORD=mysqlpw
deploymentconfig.apps.openshift.io/mysql updated

[kkakarla@kkakarla upstream-plugins]$ oc get pods -l app=mysql
NAME            READY   STATUS    RESTARTS   AGE
mysql-2-v2gbg   1/1     Running   0          64s


  • Connect to MySQL database
Shell
oc exec mysql-2-v2gbg -it -- mysql -u mysqluser -pmysqlpw inventory

mysql> show databases;
+--------------------+
| Database           |
+--------------------+
| information_schema |
| inventory          |
| mysql              |
| performance_schema |
| sys                |
+--------------------+
5 rows in set (0.00 sec)

mysql> use inventory
Reading table information for completion of table and column names
You can turn off this feature to get a quicker startup with -A

Database changed
mysql> show tables;
+---------------------+
| Tables_in_inventory |
+---------------------+
| addresses           |
| customers           |
| geom                |
| orders              |
| products            |
| products_on_hand    |
+---------------------+
6 rows in set (0.00 sec)

mysql> select * from customers;
+------+------------+-----------+-----------------------+
| id   | first_name | last_name | email                 |
+------+------------+-----------+-----------------------+
| 1001 | Sally      | Thomas    | sally.thomas@acme.com |
| 1002 | George     | Bailey    | gbailey@foobar.com    |
| 1003 | Edward     | Walker    | ed@walker.com         |
| 1004 | Anne       | Kretchmar | annek@noanswer.org    |
+------+------------+-----------+-----------------------+
4 rows in set (0.00 sec)


  • Create a MySQL connector instance to monitor the inventory database
YAML
apiVersion: kafka.strimzi.io/v1beta1
kind: KafkaConnector
metadata:
  name: inventory-connector
  labels:
    strimzi.io/cluster: my-connect-cluster
spec:
  class: io.debezium.connector.mysql.MySqlConnector
  tasksMax: 1
  config:
    database.hostname: mysql
    database.port: 3306
    database.user: debezium
    database.password: dbz
    database.server.id: 184054
    database.server.name: dbserver1
    database.whitelist: inventory
    database.history.kafka.bootstrap.servers: my-cluster-kafka-bootstrap:9092
    database.history.kafka.topic: schema-changes.inventory

Inventory-connector.yaml 

  • Apply the YAML with oc apply -f inventory-connector.yaml, then verify that inventory-connector was created and has started monitoring the inventory database:
Shell
oc logs $(oc get pods -o name -l strimzi.io/name=my-connect-cluster-connect)


Capture Data Change Events

Insert

SQL
INSERT INTO `inventory`.`customers`
    (`first_name`, `last_name`, `email`)
VALUES
    ('Ramu', 'kakarla', 'Ramu@gmail.com');

JSON
"payload":{
   "before":null,
   "after":{
      "id":1005,
      "first_name":"Ramu",
      "last_name":"kakarla",
      "email":"Ramu@gmail.com"
   },
   "source":{
      "version":"1.1.2.Final",
      "connector":"mysql",
      "name":"dbserver1",
      "ts_ms":1591593372000,
      "snapshot":"false",
      "db":"inventory",
      "table":"customers",
      "server_id":223344,
      "gtid":null,
      "file":"mysql-bin.000003",
      "pos":364,
      "row":0,
      "thread":3,
      "query":null
   },
   "op":"c",
   "ts_ms":1591593372420,
   "transaction":null
}

The before object is null while the after object shows the newly inserted value. Notice that the op attribute value is c, meaning it’s a CREATE event.
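Since every connector emits the same envelope shape, the op codes can be handled generically. A hedged Python sketch (the helper names are mine, not part of any Debezium API) that classifies a payload like the one above:

```python
# Map Debezium "op" codes to the event types described in this article
# (plus "r", which Debezium uses for snapshot reads).
OP_NAMES = {"c": "CREATE", "u": "UPDATE", "d": "DELETE", "r": "SNAPSHOT READ"}

def describe(payload: dict) -> str:
    """Summarize a Debezium change-event payload in one line."""
    op = OP_NAMES.get(payload["op"], "UNKNOWN")
    table = payload["source"]["table"]
    return f"{op} on {table}: before={payload['before']} after={payload['after']}"

# The INSERT event from the article, trimmed to the fields describe() reads.
insert_event = {
    "before": None,
    "after": {"id": 1005, "first_name": "Ramu", "last_name": "kakarla",
              "email": "Ramu@gmail.com"},
    "source": {"table": "customers"},
    "op": "c",
}
print(describe(insert_event))
```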

Update

SQL
mysql> UPDATE customers SET email='ramu@123@msn.com' WHERE id=1005;
Query OK, 1 row affected (0.02 sec)
Rows matched: 1  Changed: 1  Warnings: 0

JSON
"payload":{
   "before":{
      "id":1005,
      "first_name":"Ramu",
      "last_name":"kakarla",
      "email":"Ramu@gmail.com"
   },
   "after":{
      "id":1005,
      "first_name":"Ramu",
      "last_name":"kakarla",
      "email":"ramu@123@msn.com"
   },
   "source":{
      "version":"1.1.2.Final",
      "connector":"mysql",
      "name":"dbserver1",
      "ts_ms":1591594059000,
      "snapshot":"false",
      "db":"inventory",
      "table":"customers",
      "server_id":223344,
      "gtid":null,
      "file":"mysql-bin.000003",
      "pos":673,
      "row":0,
      "thread":8,
      "query":null
   },
   "op":"u",
   "ts_ms":1591594059965,
   "transaction":null
}


The before object shows the state of the row prior to the update, while the after object shows its current state. Notice that the op attribute value is u, meaning it’s an UPDATE event.

Delete

SQL
mysql> DELETE FROM `inventory`.`customers` WHERE id = 1005;
Query OK, 1 row affected (0.01 sec)

JSON
"payload":{
   "before":{
      "id":1005,
      "first_name":"Ramu",
      "last_name":"kakarla",
      "email":"ramu@123@msn.com"
   },
   "after":null,
   "source":{
      "version":"1.1.2.Final",
      "connector":"mysql",
      "name":"dbserver1",
      "ts_ms":1591594605000,
      "snapshot":"false",
      "db":"inventory",
      "table":"customers",
      "server_id":223344,
      "gtid":null,
      "file":"mysql-bin.000003",
      "pos":1018,
      "row":0,
      "thread":8,
      "query":null
   },
   "op":"d",
   "ts_ms":1591594605296,
   "transaction":null
}


The before object is not null while the after object is null. Notice that the op attribute value is d, meaning it’s a DELETE event.
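Taken together, the three events above contain enough information for a consumer to materialize the table's current state. A minimal sketch; the replay logic is my own illustration, not a Debezium API:

```python
def apply_event(state: dict, payload: dict) -> None:
    """Apply one Debezium change-event payload to an in-memory row store keyed by id."""
    op = payload["op"]
    if op in ("c", "r", "u"):   # create, snapshot read, and update all set "after"
        row = payload["after"]
        state[row["id"]] = row
    elif op == "d":             # delete removes the row identified by "before"
        del state[payload["before"]["id"]]

# The article's three events, trimmed to the fields apply_event() reads.
events = [
    {"op": "c", "before": None,
     "after": {"id": 1005, "email": "Ramu@gmail.com"}},
    {"op": "u", "before": {"id": 1005, "email": "Ramu@gmail.com"},
     "after": {"id": 1005, "email": "ramu@123@msn.com"}},
    {"op": "d", "before": {"id": 1005, "email": "ramu@123@msn.com"},
     "after": None},
]

state: dict = {}
for e in events:
    apply_event(state, e)
print(state)  # the insert, update, and delete cancel out: {}
```

Replaying from the beginning of the topic like this is how downstream systems (caches, search indexes, replicas) stay consistent with the source database.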

Enjoy!



Opinions expressed by DZone contributors are their own.
