How to Effectively Use ExecutorService in Kafka Consumers

Apache Kafka is one of today's most commonly used event streaming platforms. While using the Kafka platform, quite often, we run into a scenario where we have to process a large number of events/messages that are placed on a broker. Traditional approaches, where a consumer is listening to a topic and then processes these message within the consumer itself, can become a performance bottleneck if the number of messages being placed on the topic is high. In such cases, the rate at which a consumer can process messages will be very low, as there are a large number of messages getting placed on the topic. A potential solution that can be applied in such a scenario is to offload message processing to the worker threads in a thread pool.

In this section, we will take a look into how a Kafka consumer can offload its work to a thread pool. We will leverage Java’s ExecutorService framework to create a thread pool.

This approach primarily involves two steps. The first step is to create a KafkaConsumer that can read messages from a topic. Once the messages are read, they are delivered to a threadpool for further processing. The second step is to create worker threads that perform further processing of each message.

Step 1, Kafka Consumer Implementation: Here, we read the messages from a topic and dispatch the messages to a thread pool created using ThreadPoolExecutorService.

public class KafkaProcessor {

    private final KafkaConsumer<String, String> myConsumer;
    private ExecutorService executor;
    private static final Properties KAFKA_PROPERTIES = new Properties();
    static {
        KAFKA_PROPERTIES.put("bootstrap.servers", "localhost:9092");
        KAFKA_PROPERTIES.put("group.id", "test-consumer-group");
        KAFKA_PROPERTIES.put("enable.auto.commit", "true");
        KAFKA_PROPERTIES.put("auto.commit.interval.ms", "1000");
        KAFKA_PROPERTIES.put("session.timeout.ms", "30000");
        KAFKA_PROPERTIES.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        KAFKA_PROPERTIES.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

    public KafkaProcessor() {
        this.myConsumer = new KafkaConsumer<>(KAFKA_PROPERTIES);

public void init(int numberOfThreads) {
      //Create a threadpool
      executor = new ThreadPoolExecutor(numberOfThreads, numberOfThreads, 0L, TimeUnit.MILLISECONDS,
      new ArrayBlockingQueue<Runnable>(1000), new ThreadPoolExecutor.CallerRunsPolicy());

      while (true) {
        ConsumerRecords<String, String> records = myConsumer.poll(100);
        for (final ConsumerRecord<String, String> record : records) {
        executor.submit(new KafkaRecordHandler(record)); 


public void shutdown() {
        if (myConsumer != null) {
        if (executor != null) {
        try {
          if (executor != null && !executor.awaitTermination(60, TimeUnit.MILLISECONDS)) {
        }catch (InterruptedException e) {

Step 2, Worker Thread(Message/Record Handler) Implementation: Here, we perform further processing of the messages.

public class KafkaRecordHandler implements Runnable {
private ConsumerRecord<String, String> record;

    public KafkaRecordHandler(ConsumerRecord<String, String> record) {
    this.record = record;

    public void run() { // this is where further processing happens
        System.out.println("value = "+record.value());
        System.out.println("Thread id = "+ Thread.currentThread().getId());

The final step is to create a KafkaConsumer (KafkaProcessor) and specify the number of worker threads through the init() method.

public class ConsumerTest {

    public static void main(String[] args) {
      KafkaProcessor processor = new KafkaProcessor();
      try {
      }catch (Exception exp) {

This approach might not be needed/suitable for all scenarios. You have to carefully evaluate the best approach to be used with your Kafka consumer implementation.

