In our previous blog post, we talked about monitoring Kafka as a broker service and looked at ways to think about disk utilization and replication problems. But the Kafka brokers sit in the middle of an ecosystem, with Kafka producers on one side writing data and Kafka consumers on the other side reading it. In this post, we dive into the consumer side of this ecosystem, which means looking closely at Kafka consumer group monitoring.
Kafka as a broker service has a very simple API and could, in principle, be used with many kinds of applications and application architectures that rely on the brokers for I/O and message queueing. In practice, one architecture pattern is common: a group of application nodes collaborates to consume messages, often scaling out as message volume goes up, and handles the case where nodes crash or drop out. This pattern keeps data and messages flowing reliably even as application nodes come and go.
There is also a reference implementation of this architecture, based on decades of hard-won experience with high-performance distributed systems, called the Kafka Consumer Group. This reference implementation ships with Apache Kafka as a JAR and is well documented, although it is possible to implement a Consumer Group application in any language.
A Consumer Group’s Relationship to Partitions
While the Consumer Group uses the broker APIs, it is more of an application pattern, or a set of behaviors embedded into your application. The Kafka brokers are an important part of the puzzle but do not provide the Consumer Group behavior directly. A Consumer Group based application may run on several nodes, and when those nodes start up they coordinate with each other to split up the work. The split is slightly imperfect because the unit of work is a partition, and the set of partitions is defined on the producing side rather than chosen by the consumers. Each partition is read by one Consumer node, and the partitions are divided among whatever nodes are available. If there are more Consumer Group nodes than partitions, the excess nodes remain idle, which may be desirable as spare capacity for failover. If there are more partitions than Consumer Group nodes, some nodes will read more than one partition.
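To make the arithmetic of the split concrete, here is a minimal sketch in Python. The function assign_partitions is a hypothetical helper written for this post, not part of any Kafka client, and the round-robin strategy is an assumption chosen for simplicity; the reference implementation has its own assignment logic.

```python
def assign_partitions(partitions, consumers):
    """Round-robin sketch: give each partition exactly one owner.

    Hypothetical helper for illustration only; real Kafka clients
    use their own assignment strategies.
    """
    ordered = sorted(consumers)
    # Every node appears in the result, even if it ends up idle.
    assignment = {node: [] for node in ordered}
    if not ordered:
        return assignment
    for index, partition in enumerate(sorted(partitions)):
        owner = ordered[index % len(ordered)]
        assignment[owner].append(partition)
    return assignment


# Four partitions, three nodes: one node reads two partitions.
busy = assign_partitions([0, 1, 2, 3], ["node-a", "node-b", "node-c"])

# Two partitions, three nodes: one node stays idle as a spare.
spare = assign_partitions([0, 1], ["node-a", "node-b", "node-c"])
```

In the first call node-a ends up with two partitions; in the second, node-c has an empty list, which is the idle failover case described above.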
Reading Multiple Partitions on One Node
There are a couple of tricky things to consider when designing a Consumer Group. If a consumer node takes multiple partitions, or ends up taking multiple partitions on failover, those partitions will appear intermingled when viewed as a single stream of messages. So a Consumer Group application could get row #100 from partition 3, then row #90 from partition 4, then go back to partition 3 for row #101. Kafka does not guarantee order across partitions; only messages within a partition are in order. So either order must not matter to the consumer application, or the application must restore per-partition order itself by splitting the stream by partition.
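Splitting the intermingled stream back into per-partition streams can be sketched as follows. This is an illustration under assumptions: records are modeled as plain (partition, offset, value) tuples and split_by_partition is a hypothetical helper, not a Kafka client API.

```python
from collections import defaultdict


def split_by_partition(records):
    """Group an interleaved stream into one ordered list per partition.

    Each record is a (partition, offset, value) tuple. Arrival order is
    preserved within each partition, which is the only order Kafka
    guarantees.
    """
    streams = defaultdict(list)
    for partition, offset, value in records:
        streams[partition].append((offset, value))
    return dict(streams)


# The interleaving from the example above: partition 3, then 4, then 3.
interleaved = [(3, 100, "a"), (4, 90, "b"), (3, 101, "c")]
per_partition = split_by_partition(interleaved)
```

After the split, partition 3 holds rows 100 and 101 in order, and partition 4 holds row 90, so each stream can be processed as if the node were reading only that partition.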
Multiple Topics Within a Consumer Group
The other tricky design consideration is that each member of a Consumer Group may subscribe to some, but not all, of the topics handled by the group. This makes reasoning about distribution a little complex. In the simple case of a Consumer Group handling exactly one topic, all nodes subscribe to that topic and the work is distributed uniformly. If there are two topics and only some nodes subscribe to Topic-1, then the Topic-1 partitions will be assigned only to the subscribing nodes, and if one of those nodes goes down, its partitions will be reassigned only to the remaining subscribing nodes, if there are any. Think of this Consumer Group design as a group of groups, where each subgroup is pooled and balanced independently.
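The "group of groups" idea can be sketched by balancing each topic only across the nodes that subscribe to it. As before, this is a simplified illustration: assign_by_subscription, the topic names, and the node names are all hypothetical, and round-robin is an assumed strategy.

```python
def assign_by_subscription(topic_partitions, subscriptions):
    """Balance each topic independently across its subscribers.

    topic_partitions: {topic: number_of_partitions}
    subscriptions:    {node: set_of_topics}
    Hypothetical helper for illustration only.
    """
    assignment = {node: [] for node in subscriptions}
    for topic in sorted(topic_partitions):
        # Only nodes subscribed to this topic are candidates for it.
        members = sorted(
            node for node, topics in subscriptions.items() if topic in topics
        )
        if not members:
            continue  # nobody subscribes, so nothing is assigned
        for partition in range(topic_partitions[topic]):
            owner = members[partition % len(members)]
            assignment[owner].append((topic, partition))
    return assignment


subscriptions = {
    "node-a": {"topic-1", "topic-2"},
    "node-b": {"topic-2"},
    "node-c": {"topic-2"},
}
result = assign_by_subscription({"topic-1": 2, "topic-2": 3}, subscriptions)
```

Here node-a is the only subscriber to topic-1, so it owns both topic-1 partitions in addition to its share of topic-2. If node-a went away, the topic-1 partitions would have no eligible owner at all.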
The Rebalancing Phase
As nodes in a Consumer Group come and go, the running nodes decide how to divide up the partitions. In the reference implementation, each partition is assigned one owner during a rebalancing phase. Rebalancing is triggered under different circumstances, but think of it as the phase that happens when an application scales up or down. When an application node crashes, all the well-behaved nodes stop work and unsubscribe from their partitions, which makes those partitions available for reassignment. The well-behaved nodes then wait for all the partitions to reach this state. The less-well-behaved nodes, such as the one that suddenly crashed, will of course not unsubscribe from their partitions.
In this failure case, where some nodes are waiting patiently and others are gone, wedged, or otherwise non-responsive, two timeouts start ticking. One is a timeout for the Kafka client, which might be something like zookeeper.session.timeout.ms. This is a heartbeat window used to detect that a node has not reported back in a timely manner. It is checked continuously and used to evict bad nodes. The other timeout is rebalance.backoff.ms * rebalance.max.retries.
This product is the largest window allowed for the rebalancing phase, during which clients are not reading anything from Kafka. If this window is smaller than the Kafka client session timeout, rebalancing could fail because of a crashed node, leaving you with a stopped Consumer Group. And if the Kafka client session timeout is too small, you could evict healthy application nodes by mistake and trigger unnecessary rebalancing. So thinking carefully about these two timeout windows is necessary to keep your application running well.
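A quick way to sanity-check the two windows is to compute the rebalance window and compare it with the session timeout. The sketch below is plain arithmetic; the function names are hypothetical and the numbers are illustrative values, not recommendations.

```python
def rebalance_window_ms(backoff_ms, max_retries):
    """Largest time allowed for rebalancing:
    rebalance.backoff.ms * rebalance.max.retries."""
    return backoff_ms * max_retries


def rebalance_outlasts_session(session_timeout_ms, backoff_ms, max_retries):
    """True when the rebalance window is at least as long as the
    session timeout, so a crashed node can be evicted before the
    remaining nodes give up on rebalancing."""
    return rebalance_window_ms(backoff_ms, max_retries) >= session_timeout_ms


# Illustrative settings: 2000 ms backoff, 4 retries -> 8000 ms window.
window = rebalance_window_ms(2000, 4)

# A 6000 ms session timeout fits inside that window.
safe = rebalance_outlasts_session(6000, 2000, 4)

# A 10000 ms session timeout does not: rebalancing could give up
# before the crashed node is evicted.
risky = rebalance_outlasts_session(10000, 2000, 4)
```

A check like this only covers the first failure mode described above. Whether the session timeout is too small for your nodes depends on how long they can legitimately pause, which has to be measured rather than computed.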
Keep an eye out for Part II, coming soon!