
Real-Time Streaming ETL Using Apache Kafka, Kafka Connect, Debezium, and ksqlDB

Clarify why we need to transfer data from one point to another, look at traditional approaches, and describe how to build a real-time streaming ETL process.

By Dursun Koç · Sep. 21, 2022 · Tutorial

As most of you already know, ETL stands for Extract, Transform, Load and is the process of moving data from one system to another. First, we will clarify why we need to transfer data from one point to another; second, we will look at traditional approaches; finally, we will describe how to build a real-time streaming ETL process using Apache Kafka, Kafka Connect, Debezium, and ksqlDB.

When we build business applications, we design the data model around the application's functional requirements; we do not take into account operational or analytical reporting requirements. A data model for reporting needs to be denormalized, whereas the data model for an application's day-to-day operations is mostly normalized. So, for reporting or any other analytical purpose, we need to convert our data model into a denormalized form.

In order to reshape our data, we need to move it to another database. One may argue that we can reshape the data within the same database using views or materialized views, but a reporting database is usually configured differently from an operational one: an operational database is tuned for OLTP (transactional) workloads, while a reporting database is tuned for OLAP (analytical) workloads. Moreover, running reporting queries on the operational database slows down business transactions, which slows down the business process itself, so your businesspeople will not be happy. TL;DR: if you need to prepare reports or run analytical studies on your operational data, you should move that data to another database.
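To make the in-database alternative concrete, here is a minimal sketch of such a reporting view (hypothetical table and column names, matching the demo schema used later in this article). Every query against it still joins the live transactional tables and competes with the OLTP workload for the same resources.

-- Hypothetical reporting view inside the operational (OLTP) database.
-- Each query against it joins the live transactional tables.
CREATE VIEW order_report AS
SELECT o.order_number,
       o.quantity,
       c.email AS customer,
       p.name  AS product
FROM orders o
JOIN customers c ON c.id = o.purchaser
JOIN products  p ON p.id = o.product_id;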

In the industry, people mostly extract data from the source system in batches at regular intervals, usually daily, but it can be hourly or once every two or three days. Keeping the interval short causes higher resource usage on the source system and frequent interruptions on the target system; keeping it long leaves the target system out of date. So we need something that has minimal impact on the source system's performance and updates the target system at much shorter intervals, or even in real time.

Now let's look at the proposed architecture. You can find the full source code of this demo project in my GitHub repository: https://github.com/dursunkoc/ksqlwithconnect. We will be using the Debezium source connector to extract data changes from the source system.

Debezium does not extract data using SQL. It reads the database's log files to track changes, so it has minimal impact on the source system. For more information about Debezium, please visit their website.

After the data is extracted, we need Kafka Connect to stream it into Apache Kafka so that we can work with it and reshape it as required, and we will use ksqlDB to transform the raw data into the shape the target system needs. Let's consider a simple ordering system database in which we have a customers table, a products table, and an orders table, as sketched below.
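Roughly, the source schema looks like this (a sketch reconstructed from the stream definitions further down; the exact column types may differ):

-- Sketch of the source (MySQL) schema; column types are assumptions.
CREATE TABLE customers (
  id         INT PRIMARY KEY,
  first_name VARCHAR(255),
  last_name  VARCHAR(255),
  email      VARCHAR(255)
);

CREATE TABLE products (
  id          INT PRIMARY KEY,
  name        VARCHAR(255),
  description VARCHAR(512),
  weight      FLOAT
);

CREATE TABLE orders (
  order_number INT PRIMARY KEY,
  order_date   DATE,
  purchaser    INT,  -- references customers.id
  quantity     INT,
  product_id   INT   -- references products.id
);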

 

Now, let's say we need to produce a report on orders that shows the purchaser's email and the name of the product on the same row, so we need a table like the one described below.

The customer column will contain the customer's email, which resides in the email field of the customers table, and the product column will contain the product's name, which resides in the name field of the products table.
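For illustration only, a couple of hypothetical rows in such a table might look like this (the values are made up; your data will differ):

 ORDER_NUMBER | QUANTITY | PRODUCT     | CUSTOMER
--------------+----------+-------------+------------------------
 10001        | 1        | scooter     | sally.thomas@acme.com
 10002        | 2        | car battery | gbailey@foobar.com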

First, we need to create a source connector to extract the data from the source database. In our sample case, the source database is MySQL, so we will use the Debezium MySQL source connector, as shown below:

 
CREATE SOURCE CONNECTOR `mysql-connector` WITH(
    "connector.class"= 'io.debezium.connector.mysql.MySqlConnector',
    "tasks.max"= '1',
    "database.hostname"= 'mysql',
    "database.port"= '3306',
    "database.user"= 'root',
    "database.password"= 'debezium',
    "database.server.id"= '184054',
    "database.server.name"= 'dbserver1',
    "database.whitelist"= 'inventory',
    "table.whitelist"= 'inventory.customers,inventory.products,inventory.orders',
    "database.history.kafka.bootstrap.servers"= 'kafka:9092',
    "database.history.kafka.topic"= 'schema-changes.inventory',
    "transforms"= 'unwrap',
    "transforms.unwrap.type"= 'io.debezium.transforms.ExtractNewRecordState',
    "key.converter"= 'org.apache.kafka.connect.json.JsonConverter',
    "key.converter.schemas.enable"= 'false',
    "value.converter"= 'org.apache.kafka.connect.json.JsonConverter',
    "value.converter.schemas.enable"= 'false');

Now we have Kafka topics for the customers, products, and orders tables from the source system.

 
ksql> show topics;

 Kafka Topic                   | Partitions | Partition Replicas
-----------------------------------------------------------------
 dbserver1                     | 1          | 1
 dbserver1.inventory.customers | 1          | 1
 dbserver1.inventory.orders    | 1          | 1
 dbserver1.inventory.products  | 1          | 1
 default_ksql_processing_log   | 1          | 1
 my_connect_configs            | 1          | 1
 my_connect_offsets            | 25         | 1
 my_connect_statuses           | 5          | 1
 schema-changes.inventory      | 1          | 1
-----------------------------------------------------------------
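If you want to peek at the change events landing on one of these topics, you can print them straight from the ksqlDB CLI (a quick sanity check; the exact output depends on your data). Because the connector applies the ExtractNewRecordState transform, each message value is a flat JSON document holding the current state of the row.

ksql> PRINT 'dbserver1.inventory.customers' FROM BEGINNING LIMIT 3;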

Now, with the following scripts, we will create ksqlDB streams and tables and, finally, an enriched order stream that joins customer and product data with the order data.

 
CREATE STREAM S_CUSTOMER (ID INT,
                       FIRST_NAME string,
                       LAST_NAME string,
                       EMAIL string)
                 WITH (KAFKA_TOPIC='dbserver1.inventory.customers',
                       VALUE_FORMAT='json');

CREATE TABLE T_CUSTOMER
AS
    SELECT id,
           latest_by_offset(first_name) as first_name,
           latest_by_offset(last_name) as last_name,
           latest_by_offset(email) as email
    FROM s_customer
    GROUP BY id
    EMIT CHANGES;

CREATE STREAM S_PRODUCT (ID INT,
                       NAME string,
                       description string,
                       weight DOUBLE)
                 WITH (KAFKA_TOPIC='dbserver1.inventory.products',
                       VALUE_FORMAT='json');

CREATE TABLE T_PRODUCT
AS
    SELECT id,
           latest_by_offset(name) as name,
           latest_by_offset(description) as description,
           latest_by_offset(weight) as weight
    FROM s_product
    GROUP BY id
    EMIT CHANGES;

CREATE STREAM s_order (
    order_number integer,
    order_date timestamp,
    purchaser integer,
    quantity integer,
    product_id integer)
    WITH (KAFKA_TOPIC='dbserver1.inventory.orders', VALUE_FORMAT='json');

CREATE STREAM SA_ENRICHED_ORDER WITH (VALUE_FORMAT='AVRO') AS
    SELECT o.order_number, o.quantity, p.name AS product, c.email AS customer, p.id AS product_id, c.id AS customer_id
    FROM s_order AS o
    LEFT JOIN t_product AS p ON o.product_id = p.id
    LEFT JOIN t_customer AS c ON o.purchaser = c.id
    PARTITION BY o.order_number
    EMIT CHANGES;
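Before wiring up the sink, it is worth verifying that the enrichment works. A simple push query (the column list and limit here are just for illustration) shows enriched orders as they arrive:

ksql> SELECT order_number, quantity, product, customer FROM SA_ENRICHED_ORDER EMIT CHANGES LIMIT 5;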

Finally, with the help of a JDBC sink connector, we will push our enriched orders into a PostgreSQL database.

 
CREATE SINK CONNECTOR `postgres-sink` WITH(
    "connector.class"= 'io.confluent.connect.jdbc.JdbcSinkConnector',
    "tasks.max"= '1',
    "dialect.name"= 'PostgreSqlDatabaseDialect',
    "table.name.format"= 'ENRICHED_ORDER',
    "topics"= 'SA_ENRICHED_ORDER',
    "connection.url"= 'jdbc:postgresql://postgres:5432/inventory?user=postgresuser&password=postgrespw',
    "auto.create"= 'true',
    "insert.mode"= 'upsert',
    "pk.fields"= 'ORDER_NUMBER',
    "pk.mode"= 'record_key',
    "key.converter"= 'org.apache.kafka.connect.converters.IntegerConverter',
    "key.converter.schemas.enable" = 'false',
    "value.converter"= 'io.confluent.connect.avro.AvroConverter',
    "value.converter.schemas.enable" = 'true',
    "value.converter.schema.registry.url"= 'http://schema-registry:8081'
);
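Once the sink connector is running, the enriched orders should show up in PostgreSQL. The database name and credentials come from the connection URL above; note that, depending on your JDBC sink's identifier-quoting settings, the auto-created table name may be case-sensitive and need quoting:

inventory=# SELECT * FROM "ENRICHED_ORDER" LIMIT 5;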

You can find the full source code of this demo project on my GitHub repository: https://github.com/dursunkoc/ksqlwithconnect.


Published at DZone with permission of Dursun Koç, DZone MVB. See the original article here.

Opinions expressed by DZone contributors are their own.
