
Kafka Consumer Pooling

In this article, take a look at Kafka consumer pooling and see a use case.

By Sutanu Dalui · Oct. 01, 20 · Tutorial

Consumer pooling in Kafka? Do we ever need it? That is probably the first question that comes to mind. After all, the standard Kafka consumption pattern is to have consumers join a known consumer group, subscribe to one or more topics, and let the broker balance the topic partitions across the group.

Consumer sessions are thus inherently long-running and sticky (I am using the italicized terms loosely), whereas by pooling we generally mean repetitive use of a resource for short-lived executions (JDBC connection pooling, for example).

Use Case

So let's discuss a typical use case where consumer pooling was in fact required, from a data solution platform PoC I was working on.

We wanted to design a bi-directional, conversational communication channel leveraging a request-reply pattern on a Kafka topic: a two-phase commit (2PC) protocol in which a manager component orchestrates a consensus among worker components. While there are other options (ZooKeeper, etc.) for such distributed coordination, we chose Kafka since it was already part of the solution stack.

Request-Reply

A simple approach to a request-reply pattern would be: publish to a request topic with a correlationId as the key (or a header), listen on a reply topic while mapping response messages by correlationId, and return responses from this in-memory map in a wait-notify manner (a minimal sketch of this approach follows the list below). A standard Kafka consumer listening on the reply topic would suffice here. However, two things need to be taken care of:

  1. It must be ensured by design that the reply message is appended to a partition that is assigned to this consumer.
  2. The memory required by the map object must be managed, and thread-safe access to it provided.
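For illustration, here is a minimal sketch of that map-based approach (the names pending and requestReply, the topic name, and the timeout are hypothetical, not from the article):

Java

// Hypothetical sketch of the map-based request-reply approach: the map holds
// one future per correlationId, completed by the reply-topic listener thread.
private final ConcurrentHashMap<String, CompletableFuture<String>> pending = new ConcurrentHashMap<>();

public String requestReply(KafkaProducer<String, String> producer, String correlationId, String payload) throws Exception {
   CompletableFuture<String> future = new CompletableFuture<>();
   pending.put(correlationId, future);
   producer.send(new ProducerRecord<>("request-topic", correlationId, payload));
   try {
      return future.get(30, TimeUnit.SECONDS); // the wait-notify part
   } finally {
      pending.remove(correlationId); // keep the map's memory bounded
   }
}

// On the reply-topic consumer thread:
// for (ConsumerRecord<String, String> rec : replyConsumer.poll(Duration.ofSeconds(1))) {
//    CompletableFuture<String> f = pending.get(rec.key());
//    if (f != null) f.complete(rec.value());
// }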

The approach we followed avoids using a map for storing results altogether (see the sketch below):

  • Use a single topic for both request and reply.
  • When a write is submitted (with correlationId as the key), capture the publish partition and offset.
  • Acquire a consumer, assign it that partition, and seek it to the write offset.
  • Poll for messages with the same correlationId as the key.

Since the request and reply carry the same correlationId as the key, the messages are guaranteed to land on the same partition. And since the reply is always published after the request, we can safely make the consumer seek forward from the write offset.
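As a minimal sketch of that flow (the topic name, the timeout, and the +1 skip past the request record are illustrative assumptions, not from the article):

Java

// Publish the request and capture where it landed.
ProducerRecord<String, String> request = new ProducerRecord<>("conversation-topic", correlationId, payload);
RecordMetadata meta = producer.send(request).get();

// Assign the consumer the write partition and seek past the request record.
TopicPartition tp = new TopicPartition(meta.topic(), meta.partition());
consumer.assign(Collections.singletonList(tp));
consumer.seek(tp, meta.offset() + 1); // the reply can only appear after the request

// Poll until a record with the same correlationId key shows up.
String reply = null;
long deadline = System.currentTimeMillis() + 30_000;
while (reply == null && System.currentTimeMillis() < deadline) {
   for (ConsumerRecord<String, String> rec : consumer.poll(Duration.ofMillis(500))) {
      if (correlationId.equals(rec.key())) {
         reply = rec.value();
         break;
      }
   }
}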


However, this leads to a different problem: we now need a forward-seeking consumer for every request-reply cycle. And our 2PC protocol actually has more than one request-reply cycle per invocation, with concurrent invocations on top of that.

That would require a consumer per 2PC session, and, of course, we would not want to create a new Kafka consumer on each request. Hence: a reusable pool of Kafka consumers.

Apache Commons Pool2

The pooling is developed on top of the Apache Commons Pool framework, so some background on commons-pool is a prerequisite for the following sections.

To keep this article concise, I am not going into the details of how to use commons-pool; plenty of articles already cover that. The discussion here focuses mostly on the Kafka consumer aspects we need to keep in mind in order to build the pool.
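For readers new to commons-pool, the core borrow/return cycle looks roughly like this (a generic sketch with a hypothetical MyResource type, not the article's code):

Java

// Generic commons-pool2 usage pattern (illustrative only).
GenericObjectPool<MyResource> pool = new GenericObjectPool<>(new BasePooledObjectFactory<MyResource>() {
   @Override
   public MyResource create() { return new MyResource(); }
   @Override
   public PooledObject<MyResource> wrap(MyResource r) { return new DefaultPooledObject<>(r); }
});

MyResource resource = pool.borrowObject();   // activateObject() fires here
try {
   // use the resource
} finally {
   pool.returnObject(resource);              // passivateObject() fires here
}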

Implementation Approach

We would only allow assign() invocations and disallow subscribe() on the consumer, so that the pool retains control over the arbitrary topic/partition/offset assignments.

This can be achieved by proxying the KafkaConsumer instance and intercepting the relevant methods:

Java

class ConsumerProxy<K, V> implements InvocationHandler {

   private final KafkaConsumer<K, V> instance;

   public ConsumerProxy(Map<String, Object> configs) {
      instance = new KafkaConsumer<>(configs);
      //.. more
   }

   @Override
   public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
      if (method.getName().equals("close") || method.getName().equals("unsubscribe")) {
         // skip these methods; the pool decides when the consumer is really closed
         return Void.TYPE;
      }
      // .. more
      return method.invoke(instance, args);
   }
}



The PooledObjectFactory implementation should return this proxied instance.

Java

class ConsumerPoolFactory<K, V> extends BasePooledObjectFactory<PoolableConsumer<K, V>> {

   private static final AtomicInteger n = new AtomicInteger();

   private final Map<String, Object> consumerProperties = new HashMap<>();
   // assumed default; the original snippet references groupPrefix without showing its declaration
   private final String groupPrefix = "pooled-consumer";

   @SuppressWarnings("unchecked")
   @Override
   public PoolableConsumer<K, V> create() throws Exception {
      Map<String, Object> props = new HashMap<>(consumerProperties);
      String groupId = groupPrefix;
      if (props.containsKey(ConsumerConfig.GROUP_ID_CONFIG))
         groupId = props.get(ConsumerConfig.GROUP_ID_CONFIG).toString();

      // each pooled consumer gets a unique group id, since partitions are assigned manually
      props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId + "__" + n.getAndIncrement());
      return (PoolableConsumer<K, V>) Proxy.newProxyInstance(
            Thread.currentThread().getContextClassLoader(),
            new Class[] { PoolableConsumer.class },
            new ConsumerProxy<K, V>(new HashMap<>(props)));
   }
}



The PoolableConsumer interface extends the Kafka Consumer interface with the commons-pool callback methods. This is not strictly necessary, however; a decorator-based proxying approach could be used instead.

Java

interface PoolableConsumer<K, V> extends Consumer<K, V> {

   void destroyProxy();
   boolean validateProxy();
   void activateProxy();
   void passivateProxy();
}
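How these callbacks plug into the pool lifecycle is up to the factory. A minimal sketch of the wiring (assumed here; the article does not show it) would override the commons-pool lifecycle hooks and route them to the proxy methods:

Java

// Assumed wiring inside ConsumerPoolFactory: bridge commons-pool lifecycle
// callbacks to the proxied consumer's callback methods.
@Override
public PooledObject<PoolableConsumer<K, V>> wrap(PoolableConsumer<K, V> consumer) {
   return new DefaultPooledObject<>(consumer);
}

@Override
public void activateObject(PooledObject<PoolableConsumer<K, V>> p) {
   p.getObject().activateProxy();   // on checkout
}

@Override
public void passivateObject(PooledObject<PoolableConsumer<K, V>> p) {
   p.getObject().passivateProxy();  // on check-in
}

@Override
public boolean validateObject(PooledObject<PoolableConsumer<K, V>> p) {
   return p.getObject().validateProxy();
}

@Override
public void destroyObject(PooledObject<PoolableConsumer<K, V>> p) {
   p.getObject().destroyProxy();    // on eviction
}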



To keep the pooled consumer connections alive, we pause() the consumer on pool check-in and start a scheduled background thread that invokes poll() periodically (necessary at least prior to v0.10.1.0; see KIP-62, which moved heartbeating to a background thread).

On pool checkout, we resume() the consumer and then unsubscribe() the last assignment.

Java

// in the invoke() method of the ConsumerProxy
if (method.getName().equals("activateProxy")) {
   if (wasPassivated) {
      future.cancel(true);                      // stop the keep-alive poller
      instance.resume(instance.paused());
      instance.unsubscribe();                   // drop the previous assignment
      wasPassivated = false;
   }
   return Void.TYPE;
}
if (method.getName().equals("passivateProxy")) {
   instance.pause(instance.assignment());
   if (heartBeatThreadIsEnabled) {
      // keep the connection alive while parked in the pool
      future = timerThread.scheduleWithFixedDelay(
            () -> instance.poll(Duration.ofMillis(100)), 1000, 1000, TimeUnit.MILLISECONDS);
   }
   wasPassivated = true;
   return Void.TYPE;
}



After checking out a consumer from the pool, we need to assign it the new topic/partitions. This is part of the ObjectPool implementation:

Java

public class KafkaConsumerPool<K, V> extends GenericObjectPool<PoolableConsumer<K, V>> {

   /**
    * Acquire a consumer starting from the given offsets.
    * @param maxwait maximum time to wait for a pooled consumer
    * @param unit time unit of maxwait
    * @param topicPartitionOffset partitions to assign, mapped to their start offsets
    * @return an assigned consumer positioned at the given offsets
    * @throws Exception if no consumer could be borrowed in time
    */
   public Consumer<K, V> acquire(long maxwait, TimeUnit unit, Map<TopicPartition, Long> topicPartitionOffset) throws Exception {
      PoolableConsumer<K, V> consumer = borrowObject(unit.toMillis(maxwait));
      consumer.assign(topicPartitionOffset.keySet());
      topicPartitionOffset.forEach(consumer::seek);
      return consumer;
   }

   /**
    * Acquire a consumer positioned at the latest (end) offset of the given partitions.
    * @param maxwait maximum time to wait for a pooled consumer
    * @param unit time unit of maxwait
    * @param topicPartitions partitions to assign
    * @return an assigned consumer positioned at the end offsets
    * @throws Exception if no consumer could be borrowed in time
    */
   public Consumer<K, V> acquireLatest(long maxwait, TimeUnit unit, TopicPartition... topicPartitions) throws Exception {
      PoolableConsumer<K, V> consumer = borrowObject(unit.toMillis(maxwait));
      List<TopicPartition> partitionList = Arrays.asList(topicPartitions);
      consumer.assign(partitionList);
      consumer.seekToEnd(partitionList);
      return consumer;
   }
}
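Putting it together, a checkout/check-in cycle might look like this (a usage sketch; the pool construction is assumed, and meta is the RecordMetadata of the published request from the earlier sketch):

Java

// Assumed usage sketch: borrow a consumer positioned at the request's offset,
// poll for the reply, and always return the consumer to the pool.
Consumer<String, String> consumer = pool.acquire(
      5, TimeUnit.SECONDS,
      Collections.singletonMap(new TopicPartition(meta.topic(), meta.partition()), meta.offset()));
try {
   // poll for the record whose key matches the correlationId, as sketched earlier
} finally {
   pool.returnObject((PoolableConsumer<String, String>) consumer);
}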



The consumer should be closed only when the instance is evicted from the pool. For staleness checking on checkout, we can invoke a cheap metadata method on the consumer:

Java

// in the invoke() method of the proxy
if (method.getName().equals("destroyProxy")) {
   instance.close();
   return Void.TYPE;
}
if (method.getName().equals("validateProxy")) {
   try {
      // a lightweight metadata call; this throws if the broker is unreachable
      instance.listTopics(Duration.ofMillis(1000));
      return true;
   } catch (Exception e) {
      e.printStackTrace();
   }
   return false;
}



That's all for now. The complete implementation can be found on GitHub. Please feel free to share your candid comments. Thank you!


Opinions expressed by DZone contributors are their own.
