Real-Time Streaming ETL Using Apache Kafka, Kafka Connect, Debezium, and ksqlDB

Clarify why we need to transfer data from one point to another, look at traditional approaches, and describe how to build a real-time streaming ETL process.

By Dursun Koç · Sep. 21, 22 · Tutorial

As most of you already know, ETL stands for Extract-Transform-Load: the process of moving data from a source system to a target system and reshaping it along the way. First, we will clarify why we need to move data from one place to another; second, we will look at traditional approaches; finally, we will describe how to build a real-time streaming ETL process using Apache Kafka, Kafka Connect, Debezium, and ksqlDB.

When we build business applications, we design the data model around the functional requirements of the application, and we usually do not take operational or analytical reporting requirements into account. A data model for reporting should be denormalized, whereas the data model for an application's operations is mostly normalized. So, for reporting or any other analytical purpose, we need to convert our data model into a denormalized form.

To reshape our data, we need to move it to another database. One may argue that we can reshape the data within the same database using views or materialized views, but a reporting database is usually configured differently from an operational one: an operational database is typically configured for OLTP (transactional) workloads, and a reporting database for OLAP (analytical) workloads. Moreover, running reporting queries on an operational database slows down business transactions and, in turn, the business process, which will make your businesspeople unhappy. TL;DR: if you need to prepare reports or run analytical studies on the data in your operational database, you should move that data to another database.

In the industry, people mostly extract data from the source system in batches at regular intervals, usually daily, though it can be hourly or once every two or three days. Keeping the interval short may cause higher resource usage in the source system and frequent interruptions in the target system; keeping it long leaves the target system with stale data. So we need something that has minimal effect on the source system's performance and updates the target system at shorter intervals, or even in real time.

Now let's see the proposed architecture. You can find the full source code of this demo project in my GitHub repository: https://github.com/dursunkoc/ksqlwithconnect. We will use the Debezium source connector to extract data changes from the source system.

Debezium does not extract data by running SQL queries. It reads the database log files to track the changes in the database, so it has minimal effect on the source system. For more information about Debezium, please visit their website.
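To make the log-based idea concrete, here is a minimal Python sketch that rebuilds a table's state by replaying change events instead of querying the table. The `op`, `before`, and `after` fields follow the shape of Debezium change events, but the sample rows, the `apply_change` and `replay` helpers, and the in-memory dictionary are illustrative assumptions, not part of the demo project.

```python
from typing import Any, Dict, List

Row = Dict[str, Any]


def apply_change(table: Dict[int, Row], event: Dict[str, Any]) -> None:
    """Apply one change event to an in-memory copy of a table keyed by id."""
    op = event["op"]
    if op in ("c", "u", "r"):  # create, update, snapshot read
        row = event["after"]
        table[row["id"]] = row
    elif op == "d":  # delete: only the "before" image is populated
        table.pop(event["before"]["id"], None)


def replay(events: List[Dict[str, Any]]) -> Dict[int, Row]:
    """Rebuild the table by applying the events in log order."""
    table: Dict[int, Row] = {}
    for event in events:
        apply_change(table, event)
    return table


# Hypothetical change log for a customers table (made-up sample data).
change_log = [
    {"op": "c", "before": None,
     "after": {"id": 1001, "email": "sally@example.com"}},
    {"op": "u", "before": {"id": 1001, "email": "sally@example.com"},
     "after": {"id": 1001, "email": "sally.thomas@example.com"}},
    {"op": "c", "before": None,
     "after": {"id": 1002, "email": "george@example.com"}},
    {"op": "d", "before": {"id": 1002, "email": "george@example.com"},
     "after": None},
]

customers = replay(change_log)
print(customers)
```

The consumer never queries the source table; it only reads the ordered list of changes, which is why this style of extraction puts so little load on the source database.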

After the data is extracted, we need Kafka Connect to stream it into Apache Kafka, where we can work with it and reshape it as required. We will use ksqlDB to reshape the raw data into the form required by the target system. Let's consider a simple ordering system database with a customers table, a products table, and an orders table.

 

Now, let's say we need to present a report on orders that shows the purchaser's email and the name of the product on the same row. So we need a table with a customer column and a product column alongside the order data.

The customer column will contain the customer's email, which resides in the email field of the customers table, and the product column will contain the product's name, which resides in the name field of the products table.
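As a rough illustration of the target shape, the following Python sketch denormalizes one order by looking up the customer's email and the product's name. The column names match the ones used in the ksqlDB statements later in this article; the sample rows and the `enrich` helper are made up for the example.

```python
from typing import Any, Dict, Optional

Row = Dict[str, Any]

# Made-up rows standing in for the normalized source tables.
customers = {1001: {"id": 1001, "first_name": "Sally", "last_name": "Thomas",
                    "email": "sally@example.com"}}
products = {101: {"id": 101, "name": "scooter",
                  "description": "Small 2-wheel scooter", "weight": 3.14}}
orders = [{"order_number": 10001, "purchaser": 1001,
           "quantity": 1, "product_id": 101}]


def enrich(order: Row, customers: Dict[int, Row], products: Dict[int, Row]) -> Row:
    """Build one denormalized report row for an order."""
    customer: Optional[Row] = customers.get(order["purchaser"])
    product: Optional[Row] = products.get(order["product_id"])
    return {
        "order_number": order["order_number"],
        "quantity": order["quantity"],
        "product": product["name"] if product else None,
        "customer": customer["email"] if customer else None,
    }


report = [enrich(order, customers, products) for order in orders]
print(report)
```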

First, we need to create a source connector to extract the data from the source database. In our sample case, the source database is a MySQL database, so we will use the Debezium MySQL source connector, as below:

 
CREATE SOURCE CONNECTOR `mysql-connector` WITH(
    "connector.class"= 'io.debezium.connector.mysql.MySqlConnector',
    "tasks.max"= '1',
    "database.hostname"= 'mysql',
    "database.port"= '3306',
    "database.user"= 'root',
    "database.password"= 'debezium',
    "database.server.id"= '184054',
    "database.server.name"= 'dbserver1',
    "database.whitelist"= 'inventory',
    "table.whitelist"= 'inventory.customers,inventory.products,inventory.orders',
    "database.history.kafka.bootstrap.servers"= 'kafka:9092',
    "database.history.kafka.topic"= 'schema-changes.inventory',
    "transforms"= 'unwrap',
    "transforms.unwrap.type"= 'io.debezium.transforms.ExtractNewRecordState',
    "key.converter"= 'org.apache.kafka.connect.json.JsonConverter',
    "key.converter.schemas.enable"= 'false',
    "value.converter"= 'org.apache.kafka.connect.json.JsonConverter',
    "value.converter.schemas.enable"= 'false');

Now we will have Kafka topics for the customers, products, and orders tables from the source system.

 
ksql> show topics;

 Kafka Topic                   | Partitions | Partition Replicas
-----------------------------------------------------------------
 dbserver1                     | 1          | 1
 dbserver1.inventory.customers | 1          | 1
 dbserver1.inventory.orders    | 1          | 1
 dbserver1.inventory.products  | 1          | 1
 default_ksql_processing_log   | 1          | 1
 my_connect_configs            | 1          | 1
 my_connect_offsets            | 25         | 1
 my_connect_statuses           | 5          | 1
 schema-changes.inventory      | 1          | 1
-----------------------------------------------------------------
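The table topics in the listing follow a server.database.table pattern, and the `unwrap` transform in the connector configuration flattens each change event so that only the new row state is written to the topic. The Python sketch below mimics both ideas. It is a simplification under stated assumptions: `topic_name` and `unwrap` are hypothetical helpers, and the real `ExtractNewRecordState` transform has further options (for deletes and metadata, for example) that are ignored here.

```python
from typing import Any, Dict, Optional

Row = Dict[str, Any]


def topic_name(server_name: str, database: str, table: str) -> str:
    """Compose a topic name in the pattern seen in the listing above."""
    return f"{server_name}.{database}.{table}"


def unwrap(envelope: Dict[str, Any]) -> Optional[Row]:
    """Keep only the new row state of a change event.

    Returns None when there is no "after" image (a delete), which this
    sketch treats as "nothing to emit".
    """
    return envelope.get("after")


# Made-up change event for a customers row.
event = {
    "op": "u",
    "before": {"id": 1001, "email": "sally@example.com"},
    "after": {"id": 1001, "email": "sally.thomas@example.com"},
}

print(topic_name("dbserver1", "inventory", "customers"))
print(unwrap(event))
```

Because the records are flattened this way, the ksqlDB streams that follow can declare plain columns such as ID and EMAIL instead of navigating a nested structure.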

Now, with the following scripts, we will create a ksqlDB stream for orders that joins the customer and product data to the order data.

 
CREATE STREAM S_CUSTOMER (ID INT,
                       FIRST_NAME string,
                       LAST_NAME string,
                       EMAIL string)
                 WITH (KAFKA_TOPIC='dbserver1.inventory.customers',
                       VALUE_FORMAT='json');

 

CREATE TABLE T_CUSTOMER
AS
    SELECT id,
           latest_by_offset(first_name) as first_name,
           latest_by_offset(last_name) as last_name,
           latest_by_offset(email) as email
    FROM s_customer
    GROUP BY id
    EMIT CHANGES;
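The T_CUSTOMER statement above turns a stream of row versions into a table that holds the most recent value of each column per id. A minimal Python sketch of that idea follows; the `materialize_latest` helper and the sample events are hypothetical, and the sketch assumes that null values are skipped, which is my understanding of the default behavior of `latest_by_offset` and should be checked against the ksqlDB documentation.

```python
from typing import Any, Dict, Iterable, List

Row = Dict[str, Any]


def materialize_latest(events: Iterable[Row], key: str,
                       columns: List[str]) -> Dict[Any, Row]:
    """Keep the latest non-null value of each column per key.

    Events must be supplied in offset order (oldest first).
    """
    table: Dict[Any, Row] = {}
    for event in events:
        row = table.setdefault(event[key], {col: None for col in columns})
        for col in columns:
            value = event.get(col)
            if value is not None:  # assumption: nulls do not overwrite
                row[col] = value
    return table


# Made-up customer row versions, in the order they appear on the topic.
customer_events = [
    {"id": 1001, "first_name": "Sally", "email": "sally@example.com"},
    {"id": 1002, "first_name": "George", "email": "george@example.com"},
    {"id": 1001, "first_name": "Sally", "email": "sally.thomas@example.com"},
]

t_customer = materialize_latest(customer_events, "id", ["first_name", "email"])
print(t_customer[1001]["email"])
```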

 

CREATE STREAM S_PRODUCT (ID INT,
                       NAME string,
                       description string,
                       weight DOUBLE)
                 WITH (KAFKA_TOPIC='dbserver1.inventory.products',
                       VALUE_FORMAT='json');

 

CREATE TABLE T_PRODUCT
AS
    SELECT id,
           latest_by_offset(name) as name,
           latest_by_offset(description) as description,
           latest_by_offset(weight) as weight
    FROM s_product
    GROUP BY id
    EMIT CHANGES;

 

CREATE STREAM s_order (
    order_number integer,
    order_date timestamp,
    purchaser integer,
    quantity integer,
    product_id integer)
    WITH (KAFKA_TOPIC='dbserver1.inventory.orders',VALUE_FORMAT='json');

 

CREATE STREAM SA_ENRICHED_ORDER WITH (VALUE_FORMAT='AVRO') AS
    select o.order_number, o.quantity, p.name as product, c.email as customer, p.id as product_id, c.id as customer_id
    from s_order as o
    left join t_product as p on o.product_id = p.id
    left join t_customer as c on o.purchaser = c.id
    partition by o.order_number
    emit changes;
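The join above is a stream-to-table left join: each order event is enriched with whatever the product and customer tables contain at the moment the order is processed. The Python sketch below simulates that behavior on a single interleaved sequence of events. The event tuples, the `run` helper, and the sample data are assumptions made for illustration; real ksqlDB processing involves partitions and timestamps that this sketch leaves out.

```python
from typing import Any, Dict, List, Tuple

Row = Dict[str, Any]


def run(events: List[Tuple[str, Row]]) -> List[Row]:
    """Process events in order; only order events produce output rows."""
    products: Dict[int, Row] = {}
    customers: Dict[int, Row] = {}
    enriched: List[Row] = []
    for kind, record in events:
        if kind == "product":
            products[record["id"]] = record      # table update, no output
        elif kind == "customer":
            customers[record["id"]] = record     # table update, no output
        elif kind == "order":
            p = products.get(record["product_id"])
            c = customers.get(record["purchaser"])
            enriched.append({
                "order_number": record["order_number"],
                "quantity": record["quantity"],
                "product": p["name"] if p else None,      # left join: may be null
                "customer": c["email"] if c else None,
                "product_id": p["id"] if p else None,
                "customer_id": c["id"] if c else None,
            })
    return enriched


events = [
    ("customer", {"id": 1001, "email": "sally@example.com"}),
    ("product", {"id": 101, "name": "scooter"}),
    ("order", {"order_number": 10001, "purchaser": 1001,
               "quantity": 1, "product_id": 101}),
    # Product 102 is not known yet when this order is processed.
    ("order", {"order_number": 10002, "purchaser": 1001,
               "quantity": 2, "product_id": 102}),
    ("product", {"id": 102, "name": "car battery"}),
]

enriched_orders = run(events)
for row in enriched_orders:
    print(row)
```

In this sketch the second order keeps its empty product columns even after product 102 arrives, because only events on the stream side trigger output. That is worth keeping in mind when the order of arrival between the orders topic and the lookup topics is not guaranteed.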

Finally, with the help of a JDBC sink connector, we will push our enriched orders into the PostgreSQL database.

 
CREATE SINK CONNECTOR `postgres-sink` WITH(
    "connector.class"= 'io.confluent.connect.jdbc.JdbcSinkConnector',
    "tasks.max"= '1',
    "dialect.name"= 'PostgreSqlDatabaseDialect',
    "table.name.format"= 'ENRICHED_ORDER',
    "topics"= 'SA_ENRICHED_ORDER',
    "connection.url"= 'jdbc:postgresql://postgres:5432/inventory?user=postgresuser&password=postgrespw',
    "auto.create"= 'true',
    "insert.mode"= 'upsert',
    "pk.fields"= 'ORDER_NUMBER',
    "pk.mode"= 'record_key',
    "key.converter"= 'org.apache.kafka.connect.converters.IntegerConverter',
    "key.converter.schemas.enable"= 'false',
    "value.converter"= 'io.confluent.connect.avro.AvroConverter',
    "value.converter.schemas.enable"= 'true',
    "value.converter.schema.registry.url"= 'http://schema-registry:8081'
);
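With `insert.mode` set to upsert and the record key used as the primary key, a later record for the same order number updates the existing row instead of adding a second one. The following Python sketch shows that effect using the standard library's `sqlite3` module as a stand-in for PostgreSQL. The table layout, the `upsert` helper, and the sample records are assumptions for illustration, the SQL is not what the connector itself generates, and the `ON CONFLICT` clause needs SQLite 3.24 or newer.

```python
import sqlite3
from typing import Any, Dict

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE enriched_order ("
    "order_number INTEGER PRIMARY KEY, quantity INTEGER, "
    "product TEXT, customer TEXT)"
)

UPSERT_SQL = """
INSERT INTO enriched_order (order_number, quantity, product, customer)
VALUES (?, ?, ?, ?)
ON CONFLICT(order_number) DO UPDATE SET
    quantity = excluded.quantity,
    product = excluded.product,
    customer = excluded.customer
"""


def upsert(connection: sqlite3.Connection, key: int, value: Dict[str, Any]) -> None:
    """Insert the record, or update the row that already has this key."""
    connection.execute(
        UPSERT_SQL,
        (key, value["QUANTITY"], value["PRODUCT"], value["CUSTOMER"]),
    )


# Made-up records as (key, value) pairs; order 10001 appears twice.
records = [
    (10001, {"QUANTITY": 1, "PRODUCT": "scooter", "CUSTOMER": "sally@example.com"}),
    (10002, {"QUANTITY": 2, "PRODUCT": "hammer", "CUSTOMER": "george@example.com"}),
    (10001, {"QUANTITY": 3, "PRODUCT": "scooter", "CUSTOMER": "sally@example.com"}),
]

for key, value in records:
    upsert(conn, key, value)

rows = conn.execute(
    "SELECT order_number, quantity, product, customer "
    "FROM enriched_order ORDER BY order_number"
).fetchall()
print(rows)
```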

You can find the full source code of this demo project on my GitHub repository: https://github.com/dursunkoc/ksqlwithconnect.

Published at DZone with permission of Dursun Koç. See the original article here.
