Kafka Consumer Pooling

In this article, take a look at Kafka consumer pooling and see a use case.

By Sutanu Dalui · Oct. 01, 20 · Tutorial

Consumer pooling in Kafka? Do we ever need it? That would probably be the first question that comes to mind. After all, the standard Kafka consumption pattern is to have consumers join a known consumer group, subscribe to one or more topics, and let the broker balance the topic partitions across the group.

Thus, consumer sessions are inherently long-running and sticky (I am using the italicized terms loosely). Pooling, on the other hand, generally implies repetitive use of a resource for short-lived executions (JDBC connection pooling, for example).

Use Case

So let's discuss a typical use case where consumer pooling was in fact required: a data solution platform PoC I was working on.

We wanted to design a bi-directional, conversational communication channel leveraging a request-reply pattern on a Kafka topic: a two-phase commit (2PC) protocol in which a manager component orchestrates a consensus amongst worker components. While there could be other solutions (ZooKeeper, etc.) for such distributed coordination, we chose Kafka as it was already part of the solution stack.

Request-Reply

A simple approach to a request-reply pattern would be: publish to a request topic with a correlationId as the key (or a header), listen on a reply topic, map response messages by correlationId, and return the response from this in-memory map in a wait-notify manner. A standard Kafka consumer listening on the reply topic would suffice here. However, two things need to be noted (a sketch of this map-based approach follows the list):

  1. It needs to be ensured by design that the reply message is appended to a partition that is assigned to this consumer
  2. The memory required by the map object has to be managed, and thread-safe access to it provided
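
For context, here is a minimal sketch of that map-based correlation, using a CompletableFuture in place of raw wait-notify. The class and method names (ReplyCorrelator, register, onReply) are illustrative, not from the original implementation:

Java

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.apache.kafka.clients.consumer.ConsumerRecord;

// Hypothetical sketch only: correlate replies to requests via an in-memory map.
class ReplyCorrelator {

   private final ConcurrentMap<String, CompletableFuture<String>> pending = new ConcurrentHashMap<>();

   // Register interest *before* publishing the request, then await the returned future
   CompletableFuture<String> register(String correlationId) {
      CompletableFuture<String> future = new CompletableFuture<>();
      pending.put(correlationId, future);
      return future;
   }

   // Invoked by the thread that polls the reply topic
   void onReply(ConsumerRecord<String, String> record) {
      CompletableFuture<String> future = pending.remove(record.key());
      if (future != null) {
         future.complete(record.value());
      }
   }
}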

The approach we followed was to avoid using a map for storing results:

  • Use a single topic for both request and reply
  • When a write is submitted (with the correlationId as the key), capture the publish partition and offset
  • Acquire a consumer and assign it to that write partition and offset
  • Poll for messages with the same correlationId as the key

Since the request and the reply carry the same correlationId, the messages are guaranteed to land on the same partition. And since the reply is always published after the request, we can safely make the consumer seek to the write offset.
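
A minimal sketch of one such cycle, assuming a shared "rpc-topic" and String serde (both illustrative); a production version would bound the polling loop with a deadline:

Java

import java.time.Duration;
import java.util.Collections;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.TopicPartition;

// Hypothetical sketch of a single request-reply cycle on one shared topic.
static String requestReply(Producer<String, String> producer, Consumer<String, String> consumer,
      String correlationId, String request) throws Exception {
   // 1. Publish the request and capture its write position
   RecordMetadata meta = producer.send(new ProducerRecord<>("rpc-topic", correlationId, request)).get();

   // 2. Assign the consumer to the write partition, just past the request record
   TopicPartition tp = new TopicPartition(meta.topic(), meta.partition());
   consumer.assign(Collections.singleton(tp));
   consumer.seek(tp, meta.offset() + 1);

   // 3. Poll forward until the correlated reply appears
   while (true) {
      for (ConsumerRecord<String, String> rec : consumer.poll(Duration.ofMillis(200))) {
         if (correlationId.equals(rec.key())) {
            return rec.value();
         }
      }
   }
}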


However, this takes us to a different problem: now we need a forward-seeking consumer for every request-reply cycle. And our 2PC protocol actually involves more than one request-reply cycle per invocation, with concurrent invocations on top of that.

That would require a consumer for every 2PC session, and of course we would not want to create a new Kafka consumer on each request. Hence: a reusable pool of Kafka consumers.

Apache Commons Pool2

The pooling is developed on top of the apache-commons-pool framework, so some background on commons-pool is a prerequisite for the following sections.

I will not get into the details of how to use commons-pool, both to keep this article concise and because plenty of articles are already available on that.

This implementation discussion mostly focuses on the Kafka consumer aspects that we need to keep in mind in order to create a pool.
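
For readers new to it, the gist of commons-pool2 is a GenericObjectPool backed by a factory that creates, wraps, and passivates pooled instances. A throwaway sketch with a trivial StringBuilder pool (purely illustrative, unrelated to Kafka):

Java

import org.apache.commons.pool2.BasePooledObjectFactory;
import org.apache.commons.pool2.PooledObject;
import org.apache.commons.pool2.impl.DefaultPooledObject;
import org.apache.commons.pool2.impl.GenericObjectPool;

// Minimal commons-pool2 example: pooled StringBuilders, reset on check-in.
class BuilderFactory extends BasePooledObjectFactory<StringBuilder> {

   @Override
   public StringBuilder create() {
      return new StringBuilder();
   }

   @Override
   public PooledObject<StringBuilder> wrap(StringBuilder sb) {
      return new DefaultPooledObject<>(sb);
   }

   @Override
   public void passivateObject(PooledObject<StringBuilder> p) {
      p.getObject().setLength(0); // reset state when returned to the pool
   }

   public static void main(String[] args) throws Exception {
      GenericObjectPool<StringBuilder> pool = new GenericObjectPool<>(new BuilderFactory());
      StringBuilder sb = pool.borrowObject();   // checkout
      try {
         sb.append("work");
      } finally {
         pool.returnObject(sb);                 // check-in (triggers passivateObject)
      }
      pool.close();
   }
}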

Implementation Approach

We only allow assign() invocations and disallow subscribe() on the consumer, so that the pool has control over the arbitrary topic/partition/offset assignments.

This can be achieved by proxying the KafkaConsumer instance and intercepting the necessary methods:

Java

class ConsumerProxy<K, V> implements InvocationHandler {

   private final KafkaConsumer<K, V> instance;

   public ConsumerProxy(Map<String, Object> configs) {
      instance = new KafkaConsumer<>(configs);
      //.. more
   }

   @Override
   public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
      if (method.getName().equals("close") || method.getName().equals("unsubscribe")) {
         // skip methods
         return Void.TYPE;
      }
      // .. more
      return method.invoke(instance, args);
   }
}



The PooledObjectFactory implementation should return this proxied instance.

Java

class ConsumerPoolFactory<K, V> extends BasePooledObjectFactory<PoolableConsumer<K, V>> {

   private static AtomicInteger n = new AtomicInteger();

   private final Map<String, Object> consumerProperties = new HashMap<>();
   private final String groupPrefix = "pool"; // assumed default; each instance gets a unique group.id

   @SuppressWarnings("unchecked")
   @Override
   public PoolableConsumer<K, V> create() throws Exception {
      Map<String, Object> props = new HashMap<>(consumerProperties);
      String groupId = groupPrefix;
      if (props.containsKey(ConsumerConfig.GROUP_ID_CONFIG))
         groupId = props.get(ConsumerConfig.GROUP_ID_CONFIG).toString();

      props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId + "__" + n.getAndIncrement());
      return (PoolableConsumer<K, V>) Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(),
            new Class[] { PoolableConsumer.class }, new ConsumerProxy<K, V>(new HashMap<>(props)));
   }
}



The PoolableConsumer interface extends the Kafka Consumer interface by adding the commons-pool callback methods. This is not strictly necessary, however; a decorator-based proxying would also work.

Java

interface PoolableConsumer<K, V> extends Consumer<K, V> {

   void destroyProxy();     // pool eviction: close the underlying consumer
   boolean validateProxy(); // staleness check on checkout
   void activateProxy();    // checkout: resume and clear the last assignment
   void passivateProxy();   // check-in: pause and keep the session alive
}



To keep the consumer connections alive, we pause() the consumer on pool check-in and start a scheduled background task (necessary at least prior to v0.10.1.0; refer to the relevant KIP) that invokes the poll() method periodically.

On pool checkout, we resume() the consumer and then unsubscribe() the last assignment.

Java

// in the invoke() method of the ConsumerProxy
if (method.getName().equals("activateProxy")) {
   if (wasPassivated) {
      if (future != null)
         future.cancel(true); // stop the keep-alive polling task
      instance.resume(instance.paused());
      instance.unsubscribe();
      wasPassivated = false;
   }
   return Void.TYPE;
}
if (method.getName().equals("passivateProxy")) {
   instance.pause(instance.assignment());
   if (heartBeatThreadIsEnabled) {
      future = timerThread.scheduleWithFixedDelay(() -> instance.poll(Duration.ofMillis(100)),
            1000, 1000, TimeUnit.MILLISECONDS);
   }
   wasPassivated = true;
   return Void.TYPE;
}



After checking out a consumer from the pool, we need to assign the new topic/partitions to it. This is done in the ObjectPool implementation.

Java

public class KafkaConsumerPool<K, V> extends GenericObjectPool<PoolableConsumer<K, V>> {

   /**
    * Acquire a consumer starting from the given offsets.
    * @param maxwait maximum time to block for a pooled instance
    * @param unit time unit of maxwait
    * @param topicPartitionOffset partitions to assign, mapped to their start offsets
    * @return the checked-out consumer
    * @throws Exception
    */
   public Consumer<K, V> acquire(long maxwait, TimeUnit unit, Map<TopicPartition, Long> topicPartitionOffset) throws Exception {
      PoolableConsumer<K, V> consumer = borrowObject(unit.toMillis(maxwait));
      consumer.assign(topicPartitionOffset.keySet());
      topicPartitionOffset.keySet().forEach(tp -> {
         consumer.seek(tp, topicPartitionOffset.get(tp));
      });
      return consumer;
   }

   /**
    * Acquire a consumer starting from the latest offset.
    * @param maxwait maximum time to block for a pooled instance
    * @param unit time unit of maxwait
    * @param topicPartitions partitions to assign
    * @return the checked-out consumer
    * @throws Exception
    */
   public Consumer<K, V> acquireLatest(long maxwait, TimeUnit unit, TopicPartition... topicPartitions) throws Exception {
      PoolableConsumer<K, V> consumer = borrowObject(unit.toMillis(maxwait));
      List<TopicPartition> partitionList = Arrays.asList(topicPartitions);
      consumer.assign(partitionList);
      consumer.seekToEnd(partitionList);
      return consumer;
   }
}



The consumer should be closed only when the instance is evicted from the pool. For stale-checking on checkout, we can invoke some metadata method on the consumer.

Java

// in the invoke() method of the proxy
if (method.getName().equals("destroyProxy")) {
   instance.close();
   return Void.TYPE;
}
if (method.getName().equals("validateProxy")) {
   try {
      instance.listTopics(Duration.ofMillis(1000));
      return true;
   } catch (Exception e) {
      e.printStackTrace();
   }
   return false;
}
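
Putting it together, here is a hedged sketch of how a caller might drive one request-reply cycle through the pool. The pool construction, the startAt map, and the variable names (including the RecordMetadata 'meta' captured when the request was published) are assumptions on my part; consult the repository for the actual API:

Java

// Assumed usage, for illustration; the actual pool construction may differ in the repository.
KafkaConsumerPool<String, String> pool =
      new KafkaConsumerPool<>(new ConsumerPoolFactory<String, String>());

// 'meta' is the RecordMetadata captured when the request was published
Map<TopicPartition, Long> startAt = Collections.singletonMap(
      new TopicPartition(meta.topic(), meta.partition()), meta.offset() + 1);

Consumer<String, String> consumer = pool.acquire(5, TimeUnit.SECONDS, startAt);
try {
   ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(200));
   // ... correlate the reply by its correlationId key ...
} finally {
   // check-in triggers passivateProxy(): pause + periodic keep-alive poll
   pool.returnObject((PoolableConsumer<String, String>) consumer);
}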



That's all for now. The complete implementation can be found at the following GitHub link. Please feel free to share your candid comments. Thank you!

kafka

Opinions expressed by DZone contributors are their own.
