Kafka Consumer Pooling
In this article, take a look at Kafka consumer pooling and a use case where it was needed.
Consumer pooling in Kafka? Do we ever need it? That is probably the first question that comes to mind. After all, the standard Kafka consumption pattern is to have consumers join a known consumer group, subscribe to one or more topics, and let the broker balance the topic partitions across the group.
Thus, consumer sessions are inherently long-running and sticky (using both terms loosely). By pooling, on the other hand, we generally mean repetitive use of a resource for short-lived executions (JDBC connection pooling, for example).
So let’s discuss a typical use case where consumer pooling was in fact required, from a data solution platform PoC I was working on.
We wanted to design a bi-directional, conversational communication channel leveraging a request-reply pattern on a Kafka topic: a two-phase commit (2PC) protocol where a manager component orchestrates a consensus amongst worker components. While there are other options (ZooKeeper, etc.) for such distributed coordination, we chose Kafka as it was already part of the solution stack.
A simple approach to a request-reply pattern would be: publish to a request topic with a correlationId as the key (or in a header), listen on a reply topic, map response messages by correlationId, and return responses from this in-memory map in a wait-notify manner. A standard Kafka consumer listening on the reply topic would suffice here. However, two things need attention:
- It must be ensured by design that the reply message is appended to a partition assigned to this consumer
- The memory required by the map must be managed, and thread-safe access to it provided
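To make the second point concrete, the in-memory map in this simple approach can be sketched as below. This is a hypothetical illustration, not the article's actual code: `ReplyRegistry`, `register`, `complete`, and `await` are made-up names, and `CompletableFuture` stands in for the wait-notify handshake.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch of the in-memory correlation map: the request
// publisher registers a future, and the reply-topic listener completes it.
public class ReplyRegistry {
    private final Map<String, CompletableFuture<String>> pending = new ConcurrentHashMap<>();

    // Called just before publishing a request with this correlationId.
    public CompletableFuture<String> register(String correlationId) {
        return pending.computeIfAbsent(correlationId, id -> new CompletableFuture<>());
    }

    // Called by the reply-topic consumer thread for each reply record.
    // Replies for unknown correlationIds are simply dropped.
    public void complete(String correlationId, String payload) {
        CompletableFuture<String> f = pending.get(correlationId);
        if (f != null) {
            f.complete(payload);
        }
    }

    // Wait-notify style read with a timeout, so a lost reply cannot block
    // the caller forever; the map entry is removed either way, which is
    // the memory-management concern noted above.
    public String await(String correlationId, long timeoutMs) {
        try {
            return register(correlationId).get(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (Exception e) {
            return null; // timed out or interrupted
        } finally {
            pending.remove(correlationId);
        }
    }
}
```

Even in this small sketch, entries must be cleaned up on both the success and timeout paths, which is exactly the bookkeeping the approach below avoids.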
The approach we followed avoids using a map for storing results:
- Use a single topic for both request and reply
- When a write is submitted (with the correlationId as key), capture the publish partition and offset
- Check out a consumer and assign it the write partition, seeking to the write offset
- Poll messages until one arrives with the same correlationId as key
Since the request and reply share the same correlationId key, we know the messages will land on the same partition. And since the reply is always published after the request, we can safely make the consumer seek to the write offset.
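The steps above can be simulated without a broker, using a plain list as a stand-in for the single topic partition. This is only an illustration of the logic (in real code, `publish` would be a producer `send()` returning `RecordMetadata`, and `awaitReply` a consumer doing `assign()`, `seek()`, and `poll()`); the class and method names here are invented.

```java
import java.util.ArrayList;
import java.util.List;

// In-memory stand-in for one topic partition: publish returns the write
// offset, and the reader seeks past it and scans forward for the next
// record carrying the same correlationId key (the reply).
public class SingleTopicRequestReply {
    static final class Record {
        final String key, value;
        Record(String key, String value) { this.key = key; this.value = value; }
    }

    private final List<Record> partitionLog = new ArrayList<>();

    // Submit a write keyed by correlationId and note the publish offset.
    public long publish(String correlationId, String value) {
        partitionLog.add(new Record(correlationId, value));
        return partitionLog.size() - 1;
    }

    // Seek just past the write offset and poll forward until a record
    // with the same correlationId key appears.
    public String awaitReply(String correlationId, long writeOffset) {
        for (long offset = writeOffset + 1; offset < partitionLog.size(); offset++) {
            Record r = partitionLog.get((int) offset);
            if (r.key.equals(correlationId)) {
                return r.value;
            }
        }
        return null; // a real consumer would keep polling
    }
}
```

Because the key is identical for request and reply, the partitioner places both on the same partition, so a forward scan from the write offset is guaranteed to find the reply.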
However, this takes us to a different problem: we now need a forward-seeking consumer for every request-reply cycle. Our 2PC protocol actually has more than one request-reply cycle per invocation, and there will be concurrent invocations.
That would require a consumer for every 2PC session, and of course we would not want to create a new Kafka consumer on each request. Hence: a reusable pool of Kafka consumers.
Apache Commons Pool2
The pooling is developed on top of the Apache Commons Pool framework, so some background on commons-pool is a prerequisite for the following sections. I am not getting into the details of how to use commons-pool, both to keep this article concise and because plenty of articles on it are already available. This discussion mostly focuses on the Kafka consumer aspects we need to keep in mind in order to create the pool.
We only allow assign() invocations and disallow subscribe() on the consumer, so that the pool retains control over the topic/partition/offset assigned at any time. This can be achieved by proxying the KafkaConsumer instance and intercepting the relevant methods; the PooledObjectFactory implementation should return this proxied instance.
A PoolableConsumer interface extends the Kafka Consumer interface by adding commons-pool callback methods. This may not be strictly necessary, however; decorator-based proxying can also be used.
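The proxying idea can be shown with a JDK dynamic proxy. To keep it self-contained, `MiniConsumer` below is a made-up stand-in interface; the actual pool would proxy Kafka's `org.apache.kafka.clients.consumer.Consumer` and intercept its real method set.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;

public class ConsumerProxyDemo {
    // Stand-in for the Kafka Consumer interface, just to show the idea.
    public interface MiniConsumer {
        void assign(String topicPartition);
        void subscribe(String topic);
    }

    // Wraps a consumer so that group-managed subscribe() is rejected,
    // leaving the pool in control of assignments.
    public static MiniConsumer pooled(MiniConsumer delegate) {
        InvocationHandler handler = (proxy, method, args) -> {
            if (method.getName().equals("subscribe")) {
                throw new UnsupportedOperationException(
                        "subscribe() is not allowed on a pooled consumer");
            }
            return method.invoke(delegate, args);
        };
        return (MiniConsumer) Proxy.newProxyInstance(
                MiniConsumer.class.getClassLoader(),
                new Class<?>[] { MiniConsumer.class },
                handler);
    }
}
```

The same handler is also a natural place to hook the commons-pool callbacks mentioned above, which is why proxying and the PoolableConsumer interface fit together.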
To keep the consumer connections alive, we pause() the consumer on pool check-in and start a scheduled background thread (needed at least prior to v0.10.1.0; refer to this KIP) that invokes poll() periodically. Since the consumer is paused, these keep-alive polls return no records.
On pool checkout, we resume() the consumer and then unsubscribe() the last assignment.
After checking out a consumer from the pool, we need to assign the new topic/partitions to it.
The consumer should be closed only when the instance is evicted from the pool. For staleness checking on checkout, we can invoke some metadata method on the consumer (listTopics(), for example).
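The check-in/keep-alive/check-out lifecycle described above can be sketched deterministically with a stub in place of the real KafkaConsumer. Everything here is invented for illustration: `StubConsumer` just records the pause()/resume()/poll()/unsubscribe() calls that the pool would make on the real consumer, and `keepAliveTick` stands in for the scheduled background thread.

```java
public class PooledConsumerLifecycle {
    // Stub tracking the lifecycle calls the pool makes on a KafkaConsumer.
    public static class StubConsumer {
        public boolean paused;
        public int keepAlivePolls;
        public String assignment;
        void pause() { paused = true; }
        void resume() { paused = false; }
        void poll() { if (paused) keepAlivePolls++; }
        void unsubscribe() { assignment = null; }
        void assign(String tp) { assignment = tp; }
    }

    // Check-in: pause, so the keep-alive poll() fetches no records.
    public static void checkin(StubConsumer c) {
        c.pause();
    }

    // One tick of the scheduled keep-alive task for idle consumers.
    public static void keepAliveTick(StubConsumer c) {
        c.poll();
    }

    // Check-out: resume, drop the previous assignment, assign fresh.
    public static void checkout(StubConsumer c, String newAssignment) {
        c.resume();
        c.unsubscribe();
        c.assign(newAssignment);
    }
}
```

In the real pool these transitions would live in the commons-pool factory callbacks, with close() reserved for eviction as noted above.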
That's all for now. The complete implementation can be found at the following GitHub link. Please feel free to share your candid comments. Thank you!
Opinions expressed by DZone contributors are their own.