
Enriching Kafka Applications With Contextual Data

Kafka is great for event streaming architectures, continuous data integration (ETL), and serving as a messaging system of record. Learn how to enhance your Kafka applications.

By Fawaz Ghali, PhD · May 12, 2023 · Tutorial

Developing high-performance, large-scale stream processing applications is a challenging task. Choosing the right tool(s) is crucial to getting the job done; as developers, we tend to focus on performance, simplicity, and cost. However, the cost becomes relatively high if we end up with two or more tools doing the same task. Simply put, you must multiply development time, deployment time, and maintenance costs by the number of tools.

Kafka is great for event streaming architectures, continuous data integration (ETL), and serving as a messaging system of record (a database of sorts). However, Kafka has some challenges: its architecture is complex with many moving parts, it can't be embedded, and it's centralized middleware, just like a database. Moreover, Kafka does not offer batch processing, and all intermediate steps are materialized to disk, which leads to enormous disk space usage.

Hazelcast is a real-time stream processing platform that can enhance Kafka (and many other sources). Hazelcast addresses the Kafka challenges mentioned above by simplifying deployment and operations, offering ultra-low latency, and using a lightweight architecture, which also makes it a great tool for edge (restricted) environments. This blog post aims to take your Kafka applications to the next level: Hazelcast can process real-time and batch data in one platform and enrich your Kafka applications with "context."

Prerequisites

  • If you are new to Kafka or you’re just getting started, I recommend you start with Kafka Documentation.
  • If you are new to Hazelcast or you’re just getting started, I recommend you start with Hazelcast Documentation.
  • For Kafka, you need to download Kafka, start the environment, create a topic to store events, write some events to your topic, and finally read these events. Here's a Kafka Quick Start; a minimal command-line sketch follows this list.
  • For Hazelcast, you can use either Hazelcast Platform or Hazelcast Cloud. I will use a local cluster.
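
If you run Kafka yourself rather than through the Docker image used later in this post, creating the trades topic and writing/reading a few events looks roughly like this (a sketch assuming a standard Kafka distribution and a local broker on localhost:9092):

bin/kafka-topics.sh --create --topic trades --bootstrap-server localhost:9092
bin/kafka-console-producer.sh --topic trades --bootstrap-server localhost:9092
bin/kafka-console-consumer.sh --topic trades --from-beginning --bootstrap-server localhost:9092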

Step 1

Start a local Hazelcast cluster. This will run a Hazelcast cluster in client/server mode and an instance of Management Center on your local network.

 
brew tap hazelcast/hz
brew install hazelcast@5.2.3
hz -V
hz start


To add more members to your cluster, open another terminal window and rerun the start command. 

Optional: The Management Center is a user interface for managing and monitoring your cluster. It is a handy tool that you can use to check clusters/nodes, memory, and jobs.

 
brew tap hazelcast/hz
brew install hazelcast-management-center@5.2.3
hz-mc -V
hz-mc start


We will use the SQL shell, the easiest way to run SQL queries on a cluster. You can use SQL to query data in maps and Kafka topics, and the results can be sent directly to the client or inserted into maps or Kafka topics. Start the SQL shell by running the following command:

 
bin/hz-cli sql


We need a Kafka broker. I'm using a Docker image to run it (on the same device as my Hazelcast member). Note that the --network flag below assumes a Docker network named hazelcast-network already exists.

 
docker run --name kafka --network hazelcast-network --rm hazelcast/hazelcast-quickstart-kafka


Step 2

Once we have all components up and running, we need to create a Kafka mapping to allow Hazelcast to access messages in the trades topic.

SQL
 
CREATE MAPPING trades (
    id BIGINT,
    ticker VARCHAR,
    price DECIMAL,
    amount BIGINT)
TYPE Kafka
OPTIONS (
    'valueFormat' = 'json-flat',
    'bootstrap.servers' = '127.0.0.1:9092'
);

 

Here, you configure the connector to read JSON values with the fields id, ticker, price, and amount. A sample message (using the values of the first row inserted below) looks like this:

JSON
{
  "id": 1,
  "ticker": "ABCD",
  "price": 5.5,
  "amount": 10
}


You can write a streaming query to filter messages from Kafka:

SQL
 
SELECT ticker, ROUND(price * 100) AS price_cents, amount
  FROM trades
  WHERE price * amount > 100;


This will initially return an empty table; we need to insert some data:

SQL
 
INSERT INTO trades VALUES
  (1, 'ABCD', 5.5, 10),
  (2, 'EFGH', 14, 20);


Go back to the terminal where you created the streaming query. You should see that Hazelcast has executed the query and filtered the results.
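
Given the two rows above, only the second trade passes the filter (5.5 * 10 = 55 is not greater than 100, while 14 * 20 = 280 is), so the output should look roughly like this (exact layout varies by client):

ticker | price_cents | amount
EFGH   | 1400        | 20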

Step 3

While the previous step could be done with Kafka alone, this step enriches the data in the Kafka messages, taking your Kafka processing to the next level. Kafka messages are often small and carry minimal data to reduce network latency. For example, the trades topic does not contain any information about the company associated with a given ticker. To get deeper insights from data in Kafka topics, you can join query results with data in other mappings. To do this, we create a mapping to a new map that stores the company information used to enrich results from the trades topic, and then add some entries to the companies map.

SQL
 
CREATE MAPPING companies (
    __key BIGINT,
    ticker VARCHAR,
    company VARCHAR,
    marketcap BIGINT)
TYPE IMap
OPTIONS (
    'keyFormat' = 'bigint',
    'valueFormat' = 'json-flat');

INSERT INTO companies VALUES
  (1, 'ABCD', 'The ABCD', 100000),
  (2, 'EFGH', 'The EFGH', 5000000);


Use the JOIN clause to merge results from the companies map and trades topic so you can see which companies are being traded.

SQL
 
SELECT trades.ticker, companies.company, trades.amount
FROM trades
JOIN companies
ON companies.ticker = trades.ticker;


In another SQL shell, publish some messages to the trades topic.

SQL
 
INSERT INTO trades VALUES
  (1, 'ABCD', 5.5, 10),
  (2, 'EFGH', 14, 20);


Go back to the terminal where you created the streaming query that merges results from the companies map and trades topic.
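
Because the join runs continuously, the newly published trades should appear enriched with the matching company names, roughly like this:

ticker | company  | amount
ABCD   | The ABCD | 10
EFGH   | The EFGH | 20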

Step 4

Finally, we will ingest the query results into a Hazelcast map. First, we create a mapping to a new map that will receive the streaming query results.

SQL
 
CREATE MAPPING trade_map (
    __key BIGINT,
    ticker VARCHAR,
    company VARCHAR,
    amount BIGINT)
TYPE IMap
OPTIONS (
    'keyFormat' = 'bigint',
    'valueFormat' = 'json-flat');


Submit a streaming job to your cluster that will monitor your trades topic for changes and store them in the map (you can check running jobs with SHOW JOBS, shown below):

SQL
 
CREATE JOB ingest_trades AS
SINK INTO trade_map
SELECT trades.id, trades.ticker, companies.company, trades.amount
FROM trades
JOIN companies
ON companies.ticker = trades.ticker;

INSERT INTO trades VALUES
  (1, 'ABCD', 5.5, 10),
  (2, 'EFGH', 14, 20);
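
To confirm the job is up, list the running jobs:

SQL
SHOW JOBS;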


Now you can query your trade_map map to see that the Kafka messages have been added to it.

SQL
SELECT * FROM trade_map;
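
The map should now hold the enriched records, keyed by trade id, roughly:

__key | ticker | company  | amount
1     | ABCD   | The ABCD | 10
2     | EFGH   | The EFGH | 20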


The following diagram explains our demo setup. We have a Kafka topic called trades, which contains a collection of trades that are ingested into a Hazelcast cluster, and a companies map representing companies' data stored in the Hazelcast cluster. The ingest_trades job joins trades with companies and writes the results into the trade_map map. We used SQL, but you could also send the results to a web server or client.

Hazelcast demo setup

Summary

So here you have it! Hazelcast can be used to enrich Kafka applications with contextual data. This can be done programmatically, using the command line, or through SQL, as demonstrated in this blog post. Hazelcast can process real-time and batch data in one platform, making it a great companion for Kafka applications by providing them with "context." We look forward to your feedback and comments about this blog post; don't hesitate to share your experience with us in our community GitHub repository.


Published at DZone with permission of Fawaz Ghali, PhD. See the original article here.

