
How to Implement a Kafka Producer


We run through a quick tutorial on how to create and use Kafka producers for a big data project, and how to take advantage of Kafka's Java API.


This article deals with the ways to implement a Kafka producer.

A Kafka producer is an application that can act as a source of data in a Kafka cluster. A producer can publish messages to one or more Kafka topics.

So, how many ways are there to implement a Kafka producer? Well, there are a lot! But in this article, we shall walk you through two ways.

  1. Kafka Command Line Tools

  2. Kafka Producer Java API

Create a Kafka Producer Using the Command Line Interface

In addition to the APIs it offers for different programming languages, Kafka also provides command-line tools for working with its basic components, such as producers, consumers, and topics.

Before we can start a Kafka producer, we should start a Kafka cluster and create a Kafka topic.

Navigate to the root directory of the Kafka package and run the following commands, each in its own terminal, to start ZooKeeper and then the Kafka broker.

$ bin/zookeeper-server-start.sh config/zookeeper.properties

$ bin/kafka-server-start.sh config/server.properties

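To create a Kafka topic named "sampleTopic", run the following command. (Kafka 3.0 and later no longer accept the --zookeeper option here; on those versions, use --bootstrap-server localhost:9092 instead.)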

$ bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic sampleTopic

Finally, to start a console producer, run the following. (Recent Kafka releases also accept --bootstrap-server in place of --broker-list.)

$ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic sampleTopic

The producer connects to the Kafka broker listening on localhost, port 9092, and publishes its messages to the topic "sampleTopic".
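For comparison, the same connection details can be expressed as client configuration properties. The following is a minimal sketch that uses only java.util.Properties to show how the broker address given on the command line could map onto the bootstrap.servers property read by the Java client. The class name ConsoleProducerSettings and the method forBroker are hypothetical names chosen for illustration, and the string serializers are an assumption about the message format.

```java
import java.util.Properties;

// Hypothetical helper: builds producer settings that match the console flags.
final class ConsoleProducerSettings {

    // Returns properties that point a producer at the broker on host:port.
    static Properties forBroker(String host, int port) {
        Properties props = new Properties();
        props.put("bootstrap.servers", host + ":" + port);
        // Assumption: keys and values are plain strings.
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        return props;
    }

    public static void main(String[] args) {
        Properties props = forBroker("localhost", 9092);
        System.out.println(props.getProperty("bootstrap.servers")); // prints localhost:9092
    }
}
```

A Properties object like this is what the KafkaProducer constructor receives in the Java example later in this article.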

When you run the above shell script, a console appears. 

Figure: Apache Kafka console producer.

We can now send messages to the Kafka cluster from the console. Each line typed at the prompt is published as a message to the Kafka topic "sampleTopic".

Kafka Producer Using Java

Kafka provides a Java API. Using this, we can write a Java program that acts as a Kafka producer.

To create a Kafka producer using Java, we need to write two Java files.

  • SampleProducer.java: A thread that acts as a producer and contains configuration information.

  • KafkaProducerDemo.java: Contains the main method that creates and starts the SampleProducer thread.

SampleProducer.java

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

import java.util.Properties;
import java.util.concurrent.ExecutionException;

/**
* Producer Example in Apache Kafka
* @author www.tutorialkart.com
*/
public class SampleProducer extends Thread {
    private final KafkaProducer<Integer, String> producer;
    private final String topic;
    private final Boolean isAsync;

    public static final String KAFKA_SERVER_URL = "localhost";
    public static final int KAFKA_SERVER_PORT = 9092;
    public static final String CLIENT_ID = "SampleProducer";

    public SampleProducer(String topic, Boolean isAsync) {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", KAFKA_SERVER_URL + ":" + KAFKA_SERVER_PORT);
        properties.put("client.id", CLIENT_ID);
        properties.put("key.serializer", "org.apache.kafka.common.serialization.IntegerSerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        producer = new KafkaProducer<>(properties);
        this.topic = topic;
        this.isAsync = isAsync;
    }

    @Override
    public void run() {
        int messageNo = 1;
        while (true) {
            String messageStr = "Message_" + messageNo;
            long startTime = System.currentTimeMillis();
            if (isAsync) { // Send asynchronously
                producer.send(new ProducerRecord<>(topic,
                        messageNo,
                        messageStr), new DemoCallBack(startTime, messageNo, messageStr));
            } else { // Send synchronously
                try {
                    producer.send(new ProducerRecord<>(topic,
                            messageNo,
                            messageStr)).get();
                    System.out.println("Sent message: (" + messageNo + ", " + messageStr + ")");
                } catch (InterruptedException | ExecutionException e) {
                    e.printStackTrace();
                    // handle the exception
                }
            }
            ++messageNo;
        }
    }
}

class DemoCallBack implements Callback {

    private final long startTime;
    private final int key;
    private final String message;

    public DemoCallBack(long startTime, int key, String message) {
        this.startTime = startTime;
        this.key = key;
        this.message = message;
    }

    /**
     * onCompletion method will be called when the record sent to the Kafka Server has been acknowledged.
     *
     * @param metadata  The metadata contains the partition and offset of the record. Null if an error occurred.
     * @param exception The exception thrown during processing of this record. Null if no error occurred.
     */
    @Override
    public void onCompletion(RecordMetadata metadata, Exception exception) {
        long elapsedTime = System.currentTimeMillis() - startTime;
        if (metadata != null) {
            System.out.println(
                    "message(" + key + ", " + message + ") sent to partition(" + metadata.partition() +
                    "), " +
                    "offset(" + metadata.offset() + ") in " + elapsedTime + " ms");
        } else {
            exception.printStackTrace();
        }
    }
} 

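Messages can be sent either synchronously or asynchronously. In the synchronous case, the call to get() blocks until the send has completed. In the asynchronous case, send() returns immediately, and the DemoCallBack class is used as a callback that is invoked once the send completes, whether the record was acknowledged or the send failed.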
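The difference between the two modes can be sketched without a running broker. In the sketch below, a CompletableFuture stands in for the Future returned by producer.send(). The class SendModesSketch and the method fakeSend are hypothetical stand-ins and not part of the Kafka API. Blocking on get() corresponds to the synchronous branch of SampleProducer, and registering a completion handler corresponds to passing a Callback.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.function.BiConsumer;

final class SendModesSketch {

    // Hypothetical stand-in for producer.send(): completes on another
    // thread with a fake acknowledgement string.
    static CompletableFuture<String> fakeSend(int key, String value) {
        return CompletableFuture.supplyAsync(() -> "ack(" + key + ", " + value + ")");
    }

    // Synchronous style: block until the acknowledgement arrives.
    static String sendSync(int key, String value) {
        try {
            return fakeSend(key, value).get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt status
            throw new IllegalStateException("interrupted while waiting", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("send failed", e.getCause());
        }
    }

    // Asynchronous style: return immediately. The callback runs on completion
    // and receives either a result or an error, much like onCompletion.
    static CompletableFuture<String> sendAsync(int key, String value,
                                               BiConsumer<String, Throwable> callback) {
        return fakeSend(key, value).whenComplete(callback);
    }

    public static void main(String[] args) {
        System.out.println("sync:  " + sendSync(1, "Message_1"));
        CompletableFuture<String> pending = sendAsync(2, "Message_2",
                (ack, error) -> System.out.println(
                        "async: " + (error == null ? ack : "failed: " + error)));
        pending.join(); // wait only so the demo does not exit before the callback runs
    }
}
```

The synchronous style is simpler to reason about but waits for every message in turn. The asynchronous style lets the sending loop continue while acknowledgements arrive in the background.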

KafkaProducerDemo.java

public class KafkaProducerDemo {
    // Use the topic created earlier with kafka-topics.sh
    public static final String TOPIC = "sampleTopic";

    public static void main(String[] args) {
        boolean isAsync = false;
        SampleProducer producerThread = new SampleProducer(TOPIC, isAsync);
        // start the producer
        producerThread.start();

    }
} 

In the above example, a synchronous Kafka producer is created and then started using Thread.start().

Run KafkaProducerDemo.java. The console output looks like this:

Sent message: (1, Message_1)
Sent message: (2, Message_2)
Sent message: (3, Message_3)
Sent message: (4, Message_4)
Sent message: (5, Message_5)
Sent message: (6, Message_6)
Sent message: (7, Message_7)
Sent message: (8, Message_8)
Sent message: (9, Message_9)
Sent message: (10, Message_10)
Sent message: (11, Message_11)
Sent message: (12, Message_12) 

The messages are published synchronously to the Kafka topic.
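The demo thread loops forever, so the program never exits on its own. As a minimal sketch of one alternative, the thread below sends a fixed number of messages and then finishes, so the caller can wait for it with join(). The BoundedSender class and its Consumer<String> sink are hypothetical stand-ins for SampleProducer and producer.send(), used so that the sketch runs without a broker.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

// Hypothetical stand-in for SampleProducer: sends a fixed number of messages, then stops.
final class BoundedSender extends Thread {
    private final int count;
    private final Consumer<String> sink; // stand-in for producer.send()

    BoundedSender(int count, Consumer<String> sink) {
        this.count = count;
        this.sink = sink;
    }

    @Override
    public void run() {
        for (int messageNo = 1; messageNo <= count; messageNo++) {
            sink.accept("Message_" + messageNo);
        }
    }

    // Starts a sender, waits for it to finish, and returns what it sent.
    static List<String> runAndCollect(int count) {
        List<String> sent = new CopyOnWriteArrayList<>();
        BoundedSender sender = new BoundedSender(count, sent::add);
        sender.start(); // runs run() on a new thread
        try {
            sender.join(); // wait for the loop to finish
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt status
        }
        return sent;
    }

    public static void main(String[] args) {
        System.out.println(runAndCollect(3)); // prints [Message_1, Message_2, Message_3]
    }
}
```

In a real producer, the point just after the loop is also a natural place to call producer.close(), which waits for previously sent records to complete before releasing the client's resources.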

