DZone
Thanks for visiting DZone today,
Edit Profile
  • Manage Email Subscriptions
  • How to Post to DZone
  • Article Submission Guidelines
Sign Out View Profile
  • Post an Article
  • Manage My Drafts
Over 2 million developers have joined DZone.
Log In / Join
Refcards Trend Reports
Events Video Library
Refcards
Trend Reports

Events

View Events Video Library

Related

  • How AI Is Rewriting Full-Stack Java Systems: Practical Patterns with Spring Boot, Kafka and WebSockets
  • From APIs to Event-Driven Systems: Modern Java Backend Design
  • Java Consumer and Producer Messages Between Kafka Server [Video Tutorials]
  • How To Install CMAK, Apache Kafka, Java 18, and Java 19 [Video Tutorials]

Trending

  • Dear Micromanager: Your Distrust Has a Job; It’s Just Not the One You’re Doing
  • Bridging Gaps in SOC Maturity Using Detection Engineering and Automation
  • The Cost of Knowing: When Observability Becomes the Outage
  • Querying Without a Query Language
  1. DZone
  2. Data Engineering
  3. Big Data
  4. How To Implement a Kafka Producer

How To Implement a Kafka Producer

We run through a quick tutorial on how create and use Kafka producers for a big data project, and how to take advantage of Kafka's Java API.

By 
Mallikarjun M user avatar
Mallikarjun M
·
Oct. 01, 18 · Tutorial
Likes (5)
Comment
Save
Tweet
Share
71.5K Views

Join the DZone community and get the full member experience.

Join For Free

This article deals with the ways to implement a Kafka producer.

A Kafka producer is an application that can act as a source of data in a Kafka cluster. A producer can publish messages on one or more Kafka topics.

So, how many ways are there to implement a Kafka producer? Well, there are a lot! But in this article, we shall walk you through two ways.

  1. Kafka Command Line Tools
  2. Kafka Producer Java API

Create a Kafka Producer Using the Command Line Interface

In addition to the APIs provided by Kafka for different programming languages, Kafka also provides tools to work with the basic components like producers, consumers, topics, etc., via the Command Line Interface.

Before we can start a Kafka producer, we should start a Kafka cluster and create a Kafka topic.

Navigate to the root directory of the Kafka package and run the following commands.

$ bin/zookeeper-server-start.sh config/zookeeper.properties

$ bin/kafka-server-start.sh config/server.properties


To create a Kafka topic named "sampleTopic", run the following command.

$ bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic sampleTopic


Finally, to create a Kafka Producer, run the following:

$ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic sampleTopic


The Kafka producer created connects to the cluster which is running on localhost and listening on port 9092. The producer posts the messages to the topic, "sampleTopic".

When you run the above shell script, a console appears. 

Apache Kafka Console Producer

We can start sending the messages to the Kafka cluster from the console. The messages will be published to the Kafka Topic, "sampleTopic".

Kafka Producer Using Java

Kafka provides a Java API. Using this, we can write a Java program that acts as a Kafka producer.

To create a Kafka producer using Java, we need to write two Java files.

  • SampleProducer.java: A thread that acts as a producer and contains configuration information.
  • KafkaProducerDemo.java: Contains code to start the thread, SampleProducer.java.

SampleProducer.java

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

import java.util.Properties;
import java.util.concurrent.ExecutionException;

/**
* Producer Example in Apache Kafka
* @author www.tutorialkart.com
*/
public class SampleProducer extends Thread {
    private final KafkaProducer<Integer, String> producer;
    private final String topic;
    private final Boolean isAsync;

    public static final String KAFKA_SERVER_URL = "localhost";
    public static final int KAFKA_SERVER_PORT = 9092;
    public static final String CLIENT_ID = "SampleProducer";

    public SampleProducer(String topic, Boolean isAsync) {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", KAFKA_SERVER_URL + ":" + KAFKA_SERVER_PORT);
        properties.put("client.id", CLIENT_ID);
        properties.put("key.serializer", "org.apache.kafka.common.serialization.IntegerSerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        producer = new KafkaProducer<>(properties);
        this.topic = topic;
        this.isAsync = isAsync;
    }

    public void run() {
        int messageNo = 1;
        while (true) {
            String messageStr = "Message_" + messageNo;
            long startTime = System.currentTimeMillis();
            if (isAsync) { // Send asynchronously
                producer.send(new ProducerRecord<>(topic,
                        messageNo,
                        messageStr), new DemoCallBack(startTime, messageNo, messageStr));
            } else { // Send synchronously
                try {
                    producer.send(new ProducerRecord<>(topic,
                            messageNo,
                            messageStr)).get();
                    System.out.println("Sent message: (" + messageNo + ", " + messageStr + ")");
                } catch (InterruptedException | ExecutionException e) {
                    e.printStackTrace();
                    // handle the exception
                }
            }
            ++messageNo;
        }
    }
}

class DemoCallBack implements Callback {

    private final long startTime;
    private final int key;
    private final String message;

    public DemoCallBack(long startTime, int key, String message) {
        this.startTime = startTime;
        this.key = key;
        this.message = message;
    }

    /**
     * onCompletion method will be called when the record sent to the Kafka Server has been acknowledged.
     *
     * @param metadata  The metadata contains the partition and offset of the record. Null if an error occurred.
     * @param exception The exception thrown during processing of this record. Null if no error occurred.
     */
    public void onCompletion(RecordMetadata metadata, Exception exception) {
        long elapsedTime = System.currentTimeMillis() - startTime;
        if (metadata != null) {
            System.out.println(
                    "message(" + key + ", " + message + ") sent to partition(" + metadata.partition() +
                    "), " +
                    "offset(" + metadata.offset() + ") in " + elapsedTime + " ms");
        } else {
            exception.printStackTrace();
        }
    }
} 


Also, messages can be sent synchronously and asynchronously. For asynchronously sending messages, theDemoCallback class is used as a callback after the message is successfully posted to the Kafka topic.

KafkaProducerDemo.java

public class KafkaProducerDemo {
    public static final String TOPIC = "testTopic";

    public static void main(String[] args) {
        boolean isAsync = false;
        SampleProducer producerThread = new SampleProducer(TOPIC, isAsync);
        // start the producer
        producerThread.start();

    }
} 


In the above example, a synchronous Kafka producer is created and then started using Thread.start().

Run KafkaProducerDemo.java.

Sent message: (1, Message_1)
Sent message: (2, Message_2)
Sent message: (3, Message_3)
Sent message: (4, Message_4)
Sent message: (5, Message_5)
Sent message: (6, Message_6)
Sent message: (7, Message_7)
Sent message: (8, Message_8)
Sent message: (9, Message_9)
Sent message: (10, Message_10)
Sent message: (11, Message_11)
Sent message: (12, Message_12) 


The messages are published synchronously to the Kafka topic.

References and Further Reading

  • Kafka Documentation - Producer API
  • Java Tutorials
kafka Java (programming language)

Opinions expressed by DZone contributors are their own.

Related

  • How AI Is Rewriting Full-Stack Java Systems: Practical Patterns with Spring Boot, Kafka and WebSockets
  • From APIs to Event-Driven Systems: Modern Java Backend Design
  • Java Consumer and Producer Messages Between Kafka Server [Video Tutorials]
  • How To Install CMAK, Apache Kafka, Java 18, and Java 19 [Video Tutorials]

Partner Resources

×

Comments

The likes didn't load as expected. Please refresh the page and try again.

  • RSS
  • X
  • Facebook

ABOUT US

  • About DZone
  • Support and feedback
  • Community research

ADVERTISE

  • Advertise with DZone

CONTRIBUTE ON DZONE

  • Article Submission Guidelines
  • Become a Contributor
  • Core Program
  • Visit the Writers' Zone

LEGAL

  • Terms of Service
  • Privacy Policy

CONTACT US

  • 3343 Perimeter Hill Drive
  • Suite 215
  • Nashville, TN 37211
  • [email protected]

Let's be friends:

  • RSS
  • X
  • Facebook