How To Implement a Kafka Producer

We run through a quick tutorial on how to create and use Kafka producers for a big data project, and how to take advantage of Kafka's Java API.

By Mallikarjun M · Oct. 01, 18 · Tutorial

This article deals with the ways to implement a Kafka producer.

A Kafka producer is an application that can act as a source of data in a Kafka cluster. A producer can publish messages on one or more Kafka topics.

So, how many ways are there to implement a Kafka producer? Well, there are a lot! But in this article, we shall walk you through two ways.

  1. Kafka Command Line Tools
  2. Kafka Producer Java API

Create a Kafka Producer Using the Command Line Interface

In addition to the APIs provided by Kafka for different programming languages, Kafka also provides tools to work with the basic components like producers, consumers, topics, etc., via the Command Line Interface.

Before we can start a Kafka producer, we should start a Kafka cluster and create a Kafka topic.

Navigate to the root directory of the Kafka package and run the following commands, each in its own terminal, to start ZooKeeper and then the Kafka broker.

$ bin/zookeeper-server-start.sh config/zookeeper.properties

$ bin/kafka-server-start.sh config/server.properties


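To create a Kafka topic named "sampleTopic", run the following command. (On Kafka 2.2 and later, kafka-topics.sh also accepts --bootstrap-server localhost:9092 in place of --zookeeper localhost:2181; the --zookeeper option was removed in Kafka 3.0.)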

$ bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic sampleTopic


Finally, to create a Kafka producer, run the following. (Kafka 2.5 and later also accept --bootstrap-server here, and --broker-list is deprecated.)

$ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic sampleTopic


The producer connects to the Kafka broker listening on localhost:9092 and publishes its messages to the topic "sampleTopic".

When you run the above shell script, a console appears. 

[Image: Apache Kafka Console Producer]

We can now send messages to the Kafka cluster from the console. Each line typed is sent as one message and published to the Kafka topic "sampleTopic".

Kafka Producer Using Java

Kafka provides a Java API. Using this, we can write a Java program that acts as a Kafka producer.

To create a Kafka producer using Java, we need to write two Java files.

  • SampleProducer.java: A thread that acts as a producer and contains configuration information.
  • KafkaProducerDemo.java: Contains code to start the thread, SampleProducer.java.
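The configuration information mentioned above is an ordinary java.util.Properties object handed to the KafkaProducer constructor. Here is a minimal sketch of building it in isolation, using only the JDK so it runs without the Kafka client library. The class name ProducerConfigSketch and the helper buildProducerProperties are hypothetical, and the "acks" entry is an assumption added for illustration; it is a standard producer setting but is not part of the article's code.

```java
import java.util.Properties;

class ProducerConfigSketch {

    // Hypothetical helper: gathers the settings a producer needs to reach the broker
    // and to serialize Integer keys and String values.
    static Properties buildProducerProperties(String host, int port, String clientId) {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", host + ":" + port);
        properties.put("client.id", clientId);
        properties.put("key.serializer", "org.apache.kafka.common.serialization.IntegerSerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // Assumption, not in the article's code: wait for all in-sync replicas to acknowledge.
        properties.put("acks", "all");
        return properties;
    }

    public static void main(String[] args) {
        Properties properties = buildProducerProperties("localhost", 9092, "SampleProducer");
        System.out.println(properties.getProperty("bootstrap.servers")); // localhost:9092
        System.out.println(properties.getProperty("acks"));              // all
    }
}
```

Keeping the settings in one helper makes it easy to point the same producer at a different broker without touching the sending logic.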

SampleProducer.java

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

import java.util.Properties;
import java.util.concurrent.ExecutionException;

/**
* Producer Example in Apache Kafka
* @author www.tutorialkart.com
*/
public class SampleProducer extends Thread {
    private final KafkaProducer<Integer, String> producer;
    private final String topic;
    private final Boolean isAsync;

    public static final String KAFKA_SERVER_URL = "localhost";
    public static final int KAFKA_SERVER_PORT = 9092;
    public static final String CLIENT_ID = "SampleProducer";

    public SampleProducer(String topic, Boolean isAsync) {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", KAFKA_SERVER_URL + ":" + KAFKA_SERVER_PORT);
        properties.put("client.id", CLIENT_ID);
        properties.put("key.serializer", "org.apache.kafka.common.serialization.IntegerSerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        producer = new KafkaProducer<>(properties);
        this.topic = topic;
        this.isAsync = isAsync;
    }

    @Override
    public void run() {
        int messageNo = 1;
        while (true) {
            String messageStr = "Message_" + messageNo;
            long startTime = System.currentTimeMillis();
            if (isAsync) { // Send asynchronously
                producer.send(new ProducerRecord<>(topic,
                        messageNo,
                        messageStr), new DemoCallBack(startTime, messageNo, messageStr));
            } else { // Send synchronously
                try {
                    producer.send(new ProducerRecord<>(topic,
                            messageNo,
                            messageStr)).get();
                    System.out.println("Sent message: (" + messageNo + ", " + messageStr + ")");
                } catch (InterruptedException | ExecutionException e) {
                    e.printStackTrace();
                    // handle the exception
                }
            }
            ++messageNo;
        }
    }
}

class DemoCallBack implements Callback {

    private final long startTime;
    private final int key;
    private final String message;

    public DemoCallBack(long startTime, int key, String message) {
        this.startTime = startTime;
        this.key = key;
        this.message = message;
    }

    /**
     * onCompletion method will be called when the record sent to the Kafka Server has been acknowledged.
     *
     * @param metadata  The metadata contains the partition and offset of the record. After an error it is
     *                  null or, on newer clients, a placeholder, so test the exception instead.
     * @param exception The exception thrown during processing of this record. Null if no error occurred.
     */
    @Override
    public void onCompletion(RecordMetadata metadata, Exception exception) {
        long elapsedTime = System.currentTimeMillis() - startTime;
        // exception is null only when the send succeeded
        if (exception == null) {
            System.out.println(
                    "message(" + key + ", " + message + ") sent to partition(" + metadata.partition() +
                    "), " +
                    "offset(" + metadata.offset() + ") in " + elapsedTime + " ms");
        } else {
            exception.printStackTrace();
        }
    }
} 


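Messages can be sent either synchronously or asynchronously. In synchronous mode, the thread blocks on get() until the broker acknowledges each record. In asynchronous mode, send() returns immediately and the DemoCallBack class is used as a callback, which is invoked once the record has been acknowledged or the send has failed.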
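The difference between the two modes can be sketched without a broker. The example below is a JDK-only illustration, not Kafka code: the hypothetical fakeSend method returns a CompletableFuture as a stand-in for the Future that producer.send() returns, and the class and method names are made up for this sketch.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

class SendModesSketch {

    // Hypothetical stand-in for producer.send(): completes with a pretend offset.
    static CompletableFuture<Long> fakeSend(long offset) {
        return CompletableFuture.supplyAsync(() -> offset);
    }

    // Synchronous style: block on get() until the pretend acknowledgement arrives.
    static String sendSync(String message, long offset) {
        try {
            long ackOffset = fakeSend(offset).get();
            return message + " acknowledged at offset " + ackOffset;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // keep the interrupt visible to callers
            throw new IllegalStateException("interrupted while waiting for ack", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("send failed", e.getCause());
        }
    }

    // Asynchronous style: attach a callback and return without waiting.
    static CompletableFuture<String> sendAsync(String message, long offset) {
        return fakeSend(offset).handle((ackOffset, error) -> error == null
                ? message + " acknowledged at offset " + ackOffset
                : message + " failed: " + error);
    }

    public static void main(String[] args) {
        System.out.println(sendSync("Message_1", 0L));
        // join() is used here only so the demo prints before the JVM exits.
        System.out.println(sendAsync("Message_2", 1L).join());
    }
}
```

The synchronous path trades throughput for simplicity, since each message waits for the previous acknowledgement. The asynchronous path keeps sending while acknowledgements arrive in the callback.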

KafkaProducerDemo.java

public class KafkaProducerDemo {
    public static final String TOPIC = "testTopic";

    public static void main(String[] args) {
        boolean isAsync = false;
        SampleProducer producerThread = new SampleProducer(TOPIC, isAsync);
        // start the producer
        producerThread.start();

    }
} 


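In the above example, a synchronous Kafka producer (isAsync is false) is created and then started using Thread.start(). Note that it publishes to "testTopic" rather than "sampleTopic"; create that topic first, or rely on the broker's automatic topic creation, which is enabled by default.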
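Because run() in SampleProducer loops on while (true), the thread never finishes on its own. Here is one way to make such a loop stoppable, sketched with the JDK only. The class StoppableSenderSketch is hypothetical, and the Consumer<String> named sink is a stand-in for the call to producer.send().

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.function.Consumer;

class StoppableSenderSketch extends Thread {
    private final Consumer<String> sink; // hypothetical stand-in for producer.send(...)
    private final int maxMessages;

    StoppableSenderSketch(Consumer<String> sink, int maxMessages) {
        this.sink = sink;
        this.maxMessages = maxMessages;
    }

    @Override
    public void run() {
        int messageNo = 1;
        // Stop after maxMessages, or earlier if another thread calls interrupt().
        while (messageNo <= maxMessages && !Thread.currentThread().isInterrupted()) {
            sink.accept("Message_" + messageNo);
            ++messageNo;
        }
    }

    public static void main(String[] args) throws InterruptedException {
        List<String> sent = Collections.synchronizedList(new ArrayList<>());
        StoppableSenderSketch sender = new StoppableSenderSketch(sent::add, 3);
        sender.start();
        sender.join(); // wait for the loop to finish
        System.out.println(sent); // [Message_1, Message_2, Message_3]
    }
}
```

With a real KafkaProducer, the natural place to call producer.close() is right after this loop exits, so buffered records are flushed before the thread ends.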

Run KafkaProducerDemo.java.

Sent message: (1, Message_1)
Sent message: (2, Message_2)
Sent message: (3, Message_3)
Sent message: (4, Message_4)
Sent message: (5, Message_5)
Sent message: (6, Message_6)
Sent message: (7, Message_7)
Sent message: (8, Message_8)
Sent message: (9, Message_9)
Sent message: (10, Message_10)
Sent message: (11, Message_11)
Sent message: (12, Message_12) 


The messages are published synchronously to the Kafka topic.
