
Take a Deep Dive Into the Kafka Producer API

This article introduces the Kafka Producer API and demonstrates how to construct a Kafka producer.

By Jashan Goyal · Sep. 10, 19 · Tutorial

[Image: Deep diving into the Kafka Producer API]

In this article, we are going to learn about the Kafka Producer API. If you are new to Kafka, I recommend getting a basic idea of how it works from the Kafka quickstart guide first.

There are many reasons an application might need to write messages to Kafka: recording metrics, storing log messages, buffering information before writing to a database, recording data that comes from sensors, and much more.

You may also like: Kafka Producer Overview

Producer Flow


[Diagram: Producer API flow]

We start producing messages to Kafka by creating a ProducerRecord, which must include the topic we want to send the record to and a value; we can also specify a key and/or a partition. Once we send the ProducerRecord, the first thing the producer does is serialize the key and value objects to byte arrays so they can be sent over the network. The data is then handed to a partitioner.

If we specified a partition in the ProducerRecord, the partitioner doesn't do anything and simply returns the partition we specified. If we didn't, the partitioner will choose a partition for us, usually based on the ProducerRecord key. Once a partition is selected, the producer knows which topic and partition the record will go to. It then adds the record to a batch of records that will also be sent to the same topic and partition.
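For illustration, here is a minimal sketch of pinning a record to a specific partition, using the ProducerRecord constructor that takes a partition number (the topic, partition, key, and value below are just examples):

// Send this record to partition 0 of the "CustomerCountry" topic,
// bypassing the partitioner's key-based choice
ProducerRecord<String, String> record =
    new ProducerRecord<>("CustomerCountry", 0, "Precision Products", "France");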

A separate thread is responsible for sending those batches of records to the appropriate Kafka brokers. When the broker receives the messages, it sends back a response. If the messages were successfully written to Kafka, it will return a RecordMetadata object with the topic, partition, and the offset of the record within the partition. If the broker failed to write the messages, it will return an error. When the producer receives an error, it may retry sending the message a few more times before giving up and returning an error.
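The retry behavior itself is tunable through standard producer settings; a brief sketch (the values here are illustrative, and kafkaProps is the Properties object we will build in the next section):

// Retry a failed send up to 3 times, waiting 100 ms between attempts
kafkaProps.put("retries", "3");
kafkaProps.put("retry.backoff.ms", "100");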

Constructing a Kafka Producer

The first step in writing messages to Kafka is to create a producer object with the properties you want to pass to the producer. A Kafka producer has three mandatory properties:

1. bootstrap.servers
List of host:port pairs of brokers that the producer will use to establish the initial connection to the Kafka cluster. This list doesn’t need to include all brokers since the producer will get more information after the initial connection.

2. key.serializer
The name of a class that will be used to serialize the keys of the records we will produce to Kafka. Kafka brokers expect byte arrays as the keys and values of messages. key.serializer should be set to the name of a class that implements the org.apache.kafka.common.serialization.Serializer interface; the producer will use this class to serialize the key object to a byte array.

3. value.serializer
The name of a class that will be used to serialize the values of the records we will produce to Kafka. Just as you set key.serializer to the name of a class that serializes the message key object to a byte array, you set value.serializer to a class that serializes the message value object.

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;

Properties kafkaProps = new Properties();
// Brokers used only for the initial connection; the rest of the cluster is discovered from them
kafkaProps.put("bootstrap.servers", "broker1:9092,broker2:9092");
// Serialize both message keys and values as strings
kafkaProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
kafkaProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
KafkaProducer<String, String> producer = new KafkaProducer<>(kafkaProps);


  • We start with a Properties object.
  • Since we plan on using strings for message key and value, we use the built-in StringSerializer.
  • Here, we create a new producer by setting the appropriate key and value types and passing the Properties object.
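One more note: when the application is finished producing, the producer should be closed so that buffered records are flushed and network resources are released. A short sketch:

// Ensure all buffered records have been sent, then release the producer's resources
producer.flush();
producer.close();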

Start Sending Messages

We can send messages in three ways:

1. Fire-and-forget
We send a message to the server and don’t really care if it arrives successfully or not. Most of the time, it will arrive successfully, since Kafka is highly available and the producer will retry sending messages automatically. However, some messages will get lost using this method.

ProducerRecord<String, String> record =
    new ProducerRecord<>("CustomerCountry", "Precision Products", "France");
try {
    // send() returns immediately; we never check whether the broker accepted the record
    producer.send(record);
} catch (Exception e) {
    // Only errors raised before the send (e.g., serialization failures) are caught here
    e.printStackTrace();
}


2. Synchronous send
We send a message, the send() method returns a Future object, and we use get() to wait on the future and see whether the send() was successful.

ProducerRecord<String, String> record =
    new ProducerRecord<>("CustomerCountry", "Precision Products", "France");
try {
    // get() blocks until the broker responds, surfacing any send failure as an exception
    producer.send(record).get();
} catch (Exception e) {
    e.printStackTrace();
}


Here, we are using Future.get() to wait for a reply from Kafka.
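The Future returned by send() resolves to the RecordMetadata object described earlier. A small sketch of inspecting it, reusing the record and producer from above:

try {
    // Read back where Kafka stored the record
    RecordMetadata metadata = producer.send(record).get();
    System.out.printf("topic=%s, partition=%d, offset=%d%n",
        metadata.topic(), metadata.partition(), metadata.offset());
} catch (Exception e) {
    e.printStackTrace();
}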

3. Asynchronous send
We call the send() method with a callback function, which is triggered when the producer receives a response from the Kafka broker.

private class DemoProducerCallback implements Callback {
    @Override
    public void onCompletion(RecordMetadata recordMetadata, Exception e) {
        if (e != null) {
            // The send failed after all retries; handle or log the error
            e.printStackTrace();
        }
    }
}

ProducerRecord<String, String> record =
    new ProducerRecord<>("CustomerCountry", "Biomedical Materials", "USA");
// Returns immediately; DemoProducerCallback runs once the broker responds
producer.send(record, new DemoProducerCallback());


To use callbacks, you need a class that implements the org.apache.kafka.clients.producer.Callback interface, which has a single method, onCompletion().
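Because Callback has only that single method, on Java 8 and later it can also be passed as a lambda; a minimal sketch reusing the record from above:

// Equivalent to DemoProducerCallback, written inline as a lambda
producer.send(record, (metadata, e) -> {
    if (e != null) {
        e.printStackTrace();
    }
});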

Conclusion

Thank you for reading! If you liked this article, please show your appreciation by giving it a thumbs up. Share it, and feel free to give me suggestions on how I can improve my future posts to suit your needs. Follow me to get updates on different technologies. For any queries, feel free to contact me at jashan.goyal@knoldus.in.

Further Reading

Kafka Technical Overview


Published at DZone with permission of Jashan Goyal. See the original article here.

Opinions expressed by DZone contributors are their own.
