[This article was written by Neha Narkhede.]
I am very excited to tell you about the forthcoming 0.8.2 release of Apache Kafka. Kafka is a fault-tolerant, low-latency, high-throughput distributed messaging system used in data pipelines at several companies. Kafka was originally created at LinkedIn, where it forms a critical part of the infrastructure and transmits data between systems and applications, and it became a top-level Apache project in 2012. The project is currently under active development by a diverse group of contributors.
Since there are many new features in 0.8.2, we first released 0.8.2-beta; the final release will follow once 0.8.2 is stable.
Here is a quick overview of the notable work in this release.
New producer
The JVM clients that ship with Kafka haven’t changed much since Kafka was originally built. Over time, we have come to understand the limitations and problems arising from both the design of these clients and their APIs. We are in the process of rewriting both the producer and consumer Java clients. The new producer is complete and will be included in 0.8.2.
At a high level, the primary difference in this producer is that it removes the distinction between the “sync” and “async” producer. All requests are sent asynchronously, and every send returns a future response object that, once the request completes, yields the offset of the message as well as any error that occurred.
The batching that today happens only in the async producer is now done whenever possible. This means that the sync producer, under load, can achieve performance as good as the async producer. This works similarly to group commit in databases, but with respect to the actual network transmission: any messages that arrive while a send is in progress are batched together. It is also possible to encourage batching even under low load, to save server resources, by introducing a small delay on the send that allows more messages to accumulate; this is done using the linger.ms config (similar to Nagle’s algorithm in TCP). The new producer does all network communication asynchronously and in parallel to all servers, so the performance penalty for acks=-1 and waiting on replication should be much reduced.
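To illustrate the group-commit style batching described above, here is a toy Java sketch (the class and names are hypothetical, not Kafka’s actual internals): a batch is flushed either when it is full or when a linger delay, analogous to linger.ms, has elapsed since the first record arrived.

```java
import java.util.ArrayList;
import java.util.List;

// Toy sketch of linger-based batching (not Kafka code): records accumulate
// until either the batch is full or lingerMs has elapsed since the first
// record arrived, mimicking how linger.ms trades a little latency for
// larger, more efficient network sends.
public class LingerBatcher {
    private final int maxBatchSize;
    private final long lingerMs;
    private final List<String> batch = new ArrayList<>();
    private long firstRecordAtMs = -1;

    public LingerBatcher(int maxBatchSize, long lingerMs) {
        this.maxBatchSize = maxBatchSize;
        this.lingerMs = lingerMs;
    }

    // Returns the flushed batch if this record triggered a flush, else null.
    public List<String> append(String record, long nowMs) {
        if (batch.isEmpty()) firstRecordAtMs = nowMs;
        batch.add(record);
        if (batch.size() >= maxBatchSize || nowMs - firstRecordAtMs >= lingerMs) {
            List<String> out = new ArrayList<>(batch);
            batch.clear();
            return out;
        }
        return null;
    }
}
```

Under high load the size limit triggers the flush (throughput path); under low load the linger timeout does (bounded latency), which is the trade-off the config exposes.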
The new Java clients will be protocol compatible with any 0.8.x Kafka server. The API docs for the new producer can be found here.
Delete topic support
This is our third try at releasing a working delete topic feature. You would think deleting the data would be easy and not deleting it would be the hard part, but each of our previous attempts had flaws that appeared with usage. This time significant testing has been done, and we are hopeful that your data will really be gone.
Kafka-based offset management
One of the expensive operations a messaging system has to perform is keeping track of which messages are consumed. Kafka does this using “offsets”, a marker it stores for the consumer’s position in the log of messages. Previously this marker was stored by the JVM client in ZooKeeper. The problem with this approach is that it becomes a bottleneck for consumers that want to update their position frequently: ZooKeeper writes are expensive and can’t be scaled horizontally. In 0.8.2, we added native offset storage functionality and a new API for making use of it. This makes offset storage horizontally scalable.
Internally, the offset storage is implemented as a compacted Kafka topic (__consumer_offsets) keyed on the consumer’s group, topic, and partition. The offset commit request writes the offset to this compacted topic using the highest level of durability guarantee that Kafka provides (acks=-1), so that offsets are never lost in the presence of uncorrelated failures. Kafka maintains an in-memory view of the latest offset per <consumer group, topic, partition> triplet, so offset fetch requests can be served quickly without requiring a full scan of the compacted offsets topic. With this feature, consumers can checkpoint offsets very often, possibly per message.
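The keep-latest-per-key behavior can be sketched with a toy Java class (hypothetical names, not Kafka’s actual implementation): every commit overwrites the previous offset for its <group, topic, partition> key, just as log compaction retains only the latest record per key, and fetches become constant-time lookups instead of scans.

```java
import java.util.HashMap;
import java.util.Map;

// Toy sketch of the in-memory offset view: like log compaction, only the
// latest value per <group, topic, partition> key is retained, so an offset
// fetch is an O(1) lookup rather than a scan of the offsets topic.
public class OffsetCache {
    private final Map<String, Long> latest = new HashMap<>();

    private static String key(String group, String topic, int partition) {
        return group + "/" + topic + "/" + partition;
    }

    // Called for every offset commit record appended to the offsets topic.
    public void commit(String group, String topic, int partition, long offset) {
        latest.put(key(group, topic, partition), offset);
    }

    // Serves an offset fetch without scanning the topic; -1 if none committed.
    public long fetch(String group, String topic, int partition) {
        return latest.getOrDefault(key(group, topic, partition), -1L);
    }
}
```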
Automated leader rebalancing
In Kafka, the leader replica for a partition does the reading and writing of messages. For an efficient load distribution, it is important to ensure that the leader replicas are evenly distributed amongst the brokers in a cluster.
When a topic is created, Kafka’s replica placement strategy spreads the preferred replica (the first replica in the list of replicas) for every partition evenly across the cluster. This ensures that, as long as the leader lives on the preferred replica for a majority of partitions, leaders will be evenly distributed across the cluster. When brokers are bounced or failures occur, leadership automatically fails over to a different live replica. Over time, this may lead to an uneven distribution of leaders, with some brokers serving more data than others.
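A simplified round-robin sketch shows why preferred replicas end up evenly spread (the real Kafka placement also uses a random starting broker and shifted follower lists; this toy version keeps only the rotation idea):

```java
import java.util.ArrayList;
import java.util.List;

// Toy sketch of preferred-replica placement: the first (preferred) replica
// of each partition is assigned round-robin across brokers, so when every
// leader sits on its preferred replica, leadership is evenly distributed.
public class ReplicaPlacement {
    // Returns, for each partition, its replica list; replicas.get(p).get(0)
    // is the preferred replica of partition p.
    public static List<List<Integer>> assign(int numBrokers, int numPartitions,
                                             int replicationFactor) {
        List<List<Integer>> assignment = new ArrayList<>();
        for (int p = 0; p < numPartitions; p++) {
            List<Integer> replicas = new ArrayList<>();
            for (int r = 0; r < replicationFactor; r++) {
                replicas.add((p + r) % numBrokers); // rotate across brokers
            }
            assignment.add(replicas);
        }
        return assignment;
    }
}
```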
In Kafka 0.8.1, we added an experimental feature that automatically detects such leader imbalance and periodically triggers leader re-election, moving leaders back to the preferred replica if it is alive, to maintain an even distribution. Since then, the feature has been tested and is ready for broader usage. Previously, this was purely an admin operation that could be triggered via the kafka-preferred-replica-election.sh command that ships with Kafka. This feature greatly improves the operability of Kafka.
Controlled shutdown improvements
In Kafka 0.8.0, we added a configurable controlled shutdown feature (controlled.shutdown.enable) that reduces partition unavailability when brokers are bounced for upgrades or routine maintenance. When controlled shutdown is enabled, a broker that receives a shutdown request first moves the leaders it hosts to other live brokers in the cluster before shutting down. If controlled shutdown is disabled, the broker shuts down immediately, and every partition whose leader lived on the dead broker remains unavailable until the controller elects a new leader for it. Depending on the size of the cluster and the number of topic partitions, this time could be significant. For planned broker restarts, it is desirable to move the leaders proactively so partitions are always writable even when individual brokers are temporarily unavailable. Previously, this feature didn’t work well with automatic leader rebalancing or with single-replica partitions, but those problems are fixed in 0.8.2, so controlled shutdown should be enabled at all times on a production Kafka cluster.
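The leadership handoff at the heart of controlled shutdown can be sketched in a few lines of toy Java (hypothetical names; the real broker also coordinates with the controller and checks replica liveness): each partition led by the stopping broker is handed to another replica before the broker exits.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy sketch of controlled shutdown: before a broker stops, leadership of
// each partition it leads is handed to another replica, so partitions stay
// writable instead of waiting for failure detection and re-election.
public class ControlledShutdown {
    // leaders: partition -> current leader broker.
    // replicas: partition -> replica brokers (assumed alive in this sketch).
    // Returns the leader map after broker `shuttingDown` gives up leadership.
    public static Map<String, Integer> moveLeaders(Map<String, Integer> leaders,
                                                   Map<String, List<Integer>> replicas,
                                                   int shuttingDown) {
        Map<String, Integer> updated = new HashMap<>(leaders);
        for (Map.Entry<String, Integer> e : leaders.entrySet()) {
            if (e.getValue() == shuttingDown) {
                for (int candidate : replicas.get(e.getKey())) {
                    if (candidate != shuttingDown) { // pick another replica
                        updated.put(e.getKey(), candidate);
                        break;
                    }
                }
            }
        }
        return updated;
    }
}
```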
Stronger durability guarantees
We added a few features that improve the durability guarantees provided by Kafka.
- The first is the ability to turn off unclean leader election, which means moving the leader to a replica that is not in the in-sync replica set. Replicas that are caught up to the leader (in most cases, all of them) form the in-sync replica set and hence are safe contenders for becoming the next leader if the current leader dies. Prior to this feature, if a leader failed and there was no replica in the in-sync replica set, Kafka moved the leader to any available replica. Such a replica may not have caught up to the previous leader, so when it becomes the leader, its log might be missing messages that were previously committed. By setting unclean.leader.election.enable to false, you are essentially picking durability over availability, since Kafka will avoid electing a leader and instead make the partition unavailable if no in-sync replica is available to safely become the next leader.
- Another feature, contributed by Gwen Shapira, that increases the durability of writes is what we call min.isr support. In addition to acks=-1, which makes the producer wait for acknowledgement until the messages are committed by all the in-sync replicas, you can now also specify the minimum number of in-sync replicas that must be available for a write to be accepted. In other words, if 2 out of 3 replicas fail and the in-sync replica set shrinks to 1, then with min.isr set to 2 the Kafka broker rejects the producer’s writes, since it can’t meet the producer’s requested durability level. This setting only takes effect if the producer uses acks=-1, and it guarantees that a message will be acknowledged by at least this many in-sync replicas. It offers a trade-off between consistency and availability: a higher minimum ISR size gives better consistency, since the message is guaranteed to be written to more replicas, which reduces the probability that it will be lost; however, it reduces availability, since the partition will be unavailable for writes if the number of in-sync replicas drops below the minimum threshold.
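The min.isr acceptance rule above boils down to a single predicate, sketched here as toy Java (a simplification of the broker’s actual check, which operates on live partition state): an acks=-1 write is accepted only while the in-sync replica set is large enough.

```java
// Toy sketch of the min.isr check: a broker accepts an acks=-1 write only
// if the current in-sync replica set is at least minInsyncReplicas strong;
// otherwise it rejects the write rather than silently lowering durability.
public class MinIsrCheck {
    public static boolean acceptWrite(int currentIsrSize, int minInsyncReplicas,
                                      boolean acksAll) {
        if (!acksAll) return true; // rule applies only to acks=-1 writes
        return currentIsrSize >= minInsyncReplicas;
    }
}
```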
Connection quotas
Before 0.8.2, a single application could create an unbounded number of connections to a Kafka broker, causing the broker to run out of open file handles and effectively rendering it unusable until it is restarted. In 0.8.2, we added a server-side config (max.connections.per.ip) to control the number of socket connections per client IP.
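A per-IP quota of this kind is essentially a bounded counter per client address; this toy Java sketch (hypothetical names, not the broker’s implementation) shows the accept/reject decision:

```java
import java.util.HashMap;
import java.util.Map;

// Toy sketch of a per-IP connection quota: a new socket is accepted only if
// the client IP is below its limit, preventing one misbehaving client from
// exhausting the broker's open file handles.
public class ConnectionQuota {
    private final int maxPerIp;
    private final Map<String, Integer> counts = new HashMap<>();

    public ConnectionQuota(int maxPerIp) {
        this.maxPerIp = maxPerIp;
    }

    // Returns true if the connection is accepted, false if over quota.
    public boolean tryConnect(String ip) {
        int current = counts.getOrDefault(ip, 0);
        if (current >= maxPerIp) return false; // over quota: reject
        counts.put(ip, current + 1);
        return true;
    }

    // Frees a slot when a connection closes.
    public void disconnect(String ip) {
        counts.merge(ip, -1, Integer::sum);
    }
}
```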
We plan to release 0.8.2 towards the end of this year. Meanwhile, you can find 0.8.2-beta here.
Even after all this work, there’s still a lot to be done. In our forthcoming releases, we plan to focus on usability and operability.
If you enjoy working on Kafka and would like to do so full time, we are hiring at Confluent!