Jump over to Part I if you missed it!
Looking a little deeper into rebalancing, one might wonder how these assignments between clients and partitions happen. This turns out to be an interesting area for Kafka’s roadmap, as you can imagine different strategies for assigning work can be quite useful to different kinds of applications. One might, for example, have specialist nodes that are better for some kinds of work within the group, and it might be nice to try to push the right data to them. Today that can be done at the Topic level, as mentioned above, essentially dividing a Consumer Group into a bunch of subgroups.
Or one might want some assignment that results in uniform workloads, based on the number of messages in each partition. But until we have pluggable assignment functions, the reference implementation has a straightforward assignment strategy called Range Assignment. There is also a newer Round Robin assignor which is useful for applications like MirrorMaker, but most applications just use the default assignment algorithm.
The Range Assignor tries to land on a uniform distribution of partitions, at least within each topic, while at the same time avoiding the need to coordinate and bargain between nodes. This last goal, independent assignment, is done by each node executing a fixed algorithm: sort the partitions, sort the consumers, then for each topic take same-sized ranges of partitions for each consumer. Where the sizes cannot be the same, the consumers at the beginning of the sorted list will end up with one extra partition. With this algorithm, each application node can see the entire layout by itself, and from there take up the right assignments.
Let’s look at an example from comments in the source code:
 * For example, suppose there are two consumers C0 and C1, two topics t0 and t1, and each topic has 3 partitions,
 * resulting in partitions t0p0, t0p1, t0p2, t1p0, t1p1, and t1p2.
 * The assignment will be:
 * C0: [t0p0, t0p1, t1p0, t1p1]
 * C1: [t0p2, t1p2]
Notice that each topic is broken up into ranges regardless of the other topics, so the first application node, in this case, ends up with one extra partition from t0 and one extra partition from t1. This could be twice the work for our unbalanced node, as it has four partitions while the second node has only two. But if a third node were added, everything would become perfectly balanced, as each node would have one partition from each topic. And if a fourth node were added, you’d have one idle node doing nothing, because no topic has four partitions.
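To make the range algorithm concrete, here is a minimal Python sketch of the steps described above. It is not the Kafka source code. The function name `range_assign` and its input shape (a list of consumer ids plus a map from topic to partition count) are assumptions made for illustration, and the sketch assumes every consumer subscribes to every topic.

```python
from collections import defaultdict


def range_assign(consumers, partitions_per_topic):
    """Illustrative range assignment (hypothetical helper, not the Kafka API).

    consumers: list of consumer ids, e.g. ["C0", "C1"]
    partitions_per_topic: dict of topic name -> partition count
    Assumes every consumer subscribes to every topic.
    """
    assignment = defaultdict(list)
    members = sorted(consumers)  # every node sorts the same way

    for topic in sorted(partitions_per_topic):
        num_partitions = partitions_per_topic[topic]
        # Each consumer gets `base` partitions; the first `extra`
        # consumers in sorted order get one more.
        base, extra = divmod(num_partitions, len(members))
        start = 0
        for i, member in enumerate(members):
            length = base + (1 if i < extra else 0)
            for p in range(start, start + length):
                assignment[member].append(f"{topic}p{p}")
            start += length

    # Include idle consumers with an empty list.
    return {m: assignment[m] for m in members}


if __name__ == "__main__":
    print(range_assign(["C0", "C1"], {"t0": 3, "t1": 3}))
```

Because the function depends only on its sorted inputs, every node that runs it with the same membership and topic metadata computes the same layout, which is what lets the nodes avoid bargaining with each other. Calling it with three consumers gives each one partition per topic, and with four consumers the last one ends up with an empty list.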
You might be wondering at this point where all of these assignment decisions are stored. In earlier versions of the Consumer Group reference implementation, Zookeeper was used to store all of this kind of metadata. Since then, newer versions of Kafka have a set of APIs to support storing Consumer Group metadata in the brokers themselves. Each Consumer Group can sync up with one of the brokers that will take on the role of Coordinator for that group.
While all the decision making is still down in the application nodes, the Coordinator can fulfill a JoinGroup request and supply metadata about the Consumer Group, like assignments and offsets. This Coordinator node is also responsible for the heartbeat timer mentioned above, so if the Consumer Group application node that is leading group decisions disappears, the Coordinator could kick everyone out and essentially require the Consumer Group to be reformed by the remaining nodes. An important part of Consumer Group behavior, then, is electing leader nodes and working with the Coordinator to read and write metadata about assignments and partitions.
This is a lot of complex behavior that you “get for free” when you use a Consumer Group, so it is important to understand not just how to configure and set up your application, but also how to get operational insight into the various systems. To cover the application ecosystem end to end, you must monitor at least Zookeeper, Brokers / Coordinators, Producers, and Consumers. Zookeeper is at least used to bootstrap everything else, but often is also used to store Consumer Group assignments and offset updates. Brokers / Coordinators must be fully functional of course as every message must pass through them. It is possible to see Brokers in a degraded state while the Producers and Consumers are working correctly, but it typically cannot last this way for a long time without eventually starting to impact throughput or error rates at least on the Producers.
Monitoring Producers is like monitoring a simpler Kafka application, which just wants to write to a partition. And we can see Producer behavior holistically from the Consumer’s point of view, as it is possible to tell from Broker metadata how much data is being added to each of the partitions under a Consumer Group. So even though the Producers are not necessarily coordinated or aware of a Consumer Group, the Consumer Group can naturally tell if Producers have sudden spikes or drops in traffic.
Lag as a KPI
Just by looking at the metadata of a Consumer Group, we can determine a few key metrics: how many messages are being written to the partitions within the group, how many messages are being read from those partitions, and what is the difference? The difference is called Lag, and represents how far the Consumer Group application is behind the producers. Producer offsets are kept in the Kafka Broker in charge of that partition, which can tell you the last offset in the partition. Consumer offsets are kept either in Zookeeper or the Kafka Coordinator, and tell you the most recently read offset in each partition.
Note that these offsets are eventually consistent, and synchronized on different heartbeats by different application clusters, so they may not make perfect sense at all times. For example, you could counterintuitively have a Consumer offset that is greater than a Producer offset, but if you waited another heartbeat cycle or two and then updated the Producer offset, it should normally be ahead of the previous Consumer offset. In aggregate, total application lag is the sum of all the partition lags. For a normal Consumer Group, lag should be close to zero or at least somewhat flat and stable, which would mean the application is keeping up with the producers. Total lag is the number of messages behind real time. For an application that wants to be near real time it is important to monitor lag as a key performance indicator, and to drive lag down.
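As a rough sketch of the arithmetic, lag per partition is the producer (log end) offset minus the consumer offset, and total lag is the sum over partitions. The function names and the dictionary inputs below are hypothetical; how the offsets are fetched is left out. Clamping negative values to zero is one assumed way to handle the stale readings described above, and treating a partition with no consumer offset as offset 0 is likewise an assumption.

```python
def partition_lag(producer_offset, consumer_offset):
    """Lag for one partition.

    A negative difference can appear when the two offsets were sampled
    at different times, so this sketch clamps it to zero (an assumption).
    """
    return max(0, producer_offset - consumer_offset)


def total_lag(producer_offsets, consumer_offsets):
    """Sum of partition lags for a Consumer Group.

    Both arguments map a partition name (e.g. "t0p0") to an offset.
    A partition with no recorded consumer offset is treated as offset 0
    (an assumption made for this sketch).
    """
    return sum(
        partition_lag(end, consumer_offsets.get(partition, 0))
        for partition, end in producer_offsets.items()
    )


if __name__ == "__main__":
    producer_offsets = {"t0p0": 120, "t0p1": 95, "t0p2": 300}
    consumer_offsets = {"t0p0": 100, "t0p1": 97, "t0p2": 300}
    # t0p1 shows the consumer apparently ahead of the producer.
    print(total_lag(producer_offsets, consumer_offsets))
```

Tracked over time, a total that stays flat or near zero suggests the group is keeping up, while a steadily growing total suggests it is falling behind the producers.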
Monitoring Consumer Lag With OpsClarity
As you can see, the mechanics of consumer lag and monitoring can be complex and difficult. Most monitoring solutions offer Kafka Broker monitoring and leave it to the user to collect application metrics around Consumer Groups. In the next blog, we’ll look at Consumer Group monitoring with open source solutions like Burrow, and compare to how we monitor Kafka at OpsClarity. OpsClarity has automated monitoring of the entire Kafka ecosystem, from Producers to Brokers to Consumer Groups, integrated with surrounding systems critical to your application.