Does Kafka Really Guarantee the Order of Messages?
Let's find out.
Messages sent by a producer to a particular topic partition will be appended in the order they are sent.
In this post, I would like to show you a situation in which this guarantee from the documentation no longer holds, even if you are using the default configuration. Let’s first review how Kafka stores data internally.
How It Works
As you can see in the picture above, which comes from the documentation, Kafka uses topics, which are simply names of “feeds” to which records are published. For each topic, Kafka maintains a partitioned log, which the official documentation describes as:
Each partition is an ordered, immutable sequence of records that is continually appended to — a structured commit log.
So, from our point of view, the most important thing to remember is that Kafka preserves the order of messages within a partition.
For some use cases, preserving the order of messages can be very important from a business point of view. For example, let’s say that you are working on some kind of banking project where services communicate through Kafka. Keeping the operations on a bank account in the correct order is very important.
Otherwise, if messages are consumed in the wrong order, a user of our application may see a transaction fail because the withdrawal message arrives before the deposit message and therefore cannot be processed.
KIP-91 Intuitive User Timeouts in The Producer
Some time ago, a change to the Kafka producer implementation was proposed under the name Intuitive User Timeouts in The Producer. Its main purpose was to wrap all the available send-timeout settings into one: delivery.timeout.ms.
But this proposal also introduced another small change, documented in the proposal as “Change the default value of retries to MAX_INT”, which didn’t look like something important to remember.
These changes were released in version 2.1.0 of the Kafka producer, so they have also affected the Alpakka Kafka Connector since release 1.0. If you read the Apache Kafka upgrade notes carefully, you may notice that this whole change was described as:
The default value for the producer’s retries config was changed to Integer.MAX_VALUE, as we introduced delivery.timeout.ms in KIP-91, which sets an upper bound on the total time between sending a record and receiving acknowledgement from the broker. By default, the delivery timeout is set to 2 minutes.
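To make the two bounds from the upgrade note concrete, here is a minimal sketch in plain Java. The property names and default values come from the quote above; the class RetryBounds and the helper mayRetry are hypothetical names of my own and are not part of the Kafka client, whose real retry logic also involves details not modelled here.

```java
import java.util.Properties;

final class RetryBounds {
    // Defaults as quoted in the upgrade note:
    // retries = Integer.MAX_VALUE, delivery timeout = 2 minutes.
    static Properties defaults() {
        Properties p = new Properties();
        p.setProperty("retries", String.valueOf(Integer.MAX_VALUE));
        p.setProperty("delivery.timeout.ms", "120000");
        return p;
    }

    // Hypothetical helper (not a Kafka API): a failed send may be retried
    // only while both the retry count and the delivery timeout allow it.
    static boolean mayRetry(Properties cfg, int retriesSoFar, long elapsedMs) {
        int maxRetries = Integer.parseInt(cfg.getProperty("retries"));
        long deliveryTimeoutMs = Long.parseLong(cfg.getProperty("delivery.timeout.ms"));
        return retriesSoFar < maxRetries && elapsedMs < deliveryTimeoutMs;
    }

    public static void main(String[] args) {
        Properties cfg = defaults();
        System.out.println(mayRetry(cfg, 10, 30_000));   // true: well inside both bounds
        System.out.println(mayRetry(cfg, 10, 120_000));  // false: delivery timeout reached
    }
}
```

With these defaults the retry count is practically unlimited, so in this sketch the delivery timeout is the bound that actually stops retries.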
Handling Failures — Retry Mechanism
Since Kafka 2.1.0, when a message is not acknowledged by the broker, the producer resends it, by default at most 2147483647 (MAX_INT) times or until delivery.timeout.ms expires (2 minutes by default). This is the new behaviour introduced in KIP-91, but it is not the important part here. The most important information is buried in the description of the retries parameter:
Allowing retries without setting max.in.flight.requests.per.connection to 1 will potentially change the ordering of records because if two batches are sent to a single partition, and the first fails and is retried but the second succeeds, then the records in the second batch may appear first.
The default value for max.in.flight.requests.per.connection is 5. So, as you can expect, when a record is not acknowledged by the broker, the producer may already have sent later records, and those can be stored ahead of the retried one. This is normal behavior of the Kafka producer, so by default Kafka doesn’t guarantee that messages sent by a producer to a particular topic partition will be appended in the order they are sent.
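The scenario from the retries description can be replayed with a toy model. This is a simplified sketch and not Kafka code: the class ReorderDemo and its method appendOrder are hypothetical, batches are handled in fixed windows of maxInFlight, and a failed batch is assumed to succeed on its first retry, after everything else in its window.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class ReorderDemo {
    // Toy model: up to maxInFlight batches are sent at once. The first attempt
    // of every batch listed in failFirstAttempt is lost, and its retry lands
    // only after the other batches of the same window were appended.
    static List<String> appendOrder(List<String> batches,
                                    List<String> failFirstAttempt,
                                    int maxInFlight) {
        List<String> log = new ArrayList<>();
        int start = 0;
        while (start < batches.size()) {
            int end = Math.min(start + maxInFlight, batches.size());
            List<String> retried = new ArrayList<>();
            for (String batch : batches.subList(start, end)) {
                if (failFirstAttempt.contains(batch)) {
                    retried.add(batch);   // first attempt failed, retry later
                } else {
                    log.add(batch);       // acknowledged on the first attempt
                }
            }
            log.addAll(retried);          // retries are appended last
            start = end;
        }
        return log;
    }

    public static void main(String[] args) {
        List<String> sent = Arrays.asList("batch-1", "batch-2");
        List<String> failing = Arrays.asList("batch-1");
        System.out.println(appendOrder(sent, failing, 5)); // [batch-2, batch-1]
        System.out.println(appendOrder(sent, failing, 1)); // [batch-1, batch-2]
    }
}
```

With five requests in flight, the retried batch ends up behind the one sent after it. With a single request in flight, nothing else can overtake it.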
This behavior can be fixed by changing one producer setting, max.in.flight.requests.per.connection, to 1, but be aware that changing this property can reduce producer throughput. You can also set enable.idempotence=true without decreasing max.in.flight.requests.per.connection, but you need to adjust other settings according to the documentation:
Note that enabling idempotence requires max.in.flight.requests.per.connection to be less than or equal to 5, retries to be greater than 0 and acks must be ‘all’. If these values are not explicitly set by the user, suitable values will be chosen. If incompatible values are set, a ConfigException will be thrown.
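Both options could be written down as producer properties roughly like this. It is a sketch under assumptions: the property names are the ones quoted in this post, while the class OrderingSafeConfigs and the check satisfiesIdempotenceRules are hypothetical and only mirror the quoted rules (the real producer performs its own validation). Settings that a real producer also needs, such as bootstrap.servers and the serializers, are left out.

```java
import java.util.Properties;

final class OrderingSafeConfigs {
    // Option 1: allow only one in-flight request per connection.
    static Properties singleInFlight() {
        Properties p = new Properties();
        p.setProperty("max.in.flight.requests.per.connection", "1");
        return p;
    }

    // Option 2: idempotent producer, with the quoted constraints spelled out.
    static Properties idempotent() {
        Properties p = new Properties();
        p.setProperty("enable.idempotence", "true");
        p.setProperty("max.in.flight.requests.per.connection", "5");
        p.setProperty("retries", String.valueOf(Integer.MAX_VALUE));
        p.setProperty("acks", "all");
        return p;
    }

    // Hypothetical check mirroring the quoted rules. Values that are not set
    // are treated as acceptable, since suitable values are then chosen.
    static boolean satisfiesIdempotenceRules(Properties p) {
        int inFlight = Integer.parseInt(
                p.getProperty("max.in.flight.requests.per.connection", "5"));
        int retries = Integer.parseInt(
                p.getProperty("retries", String.valueOf(Integer.MAX_VALUE)));
        String acks = p.getProperty("acks", "all");
        // "-1" is the numeric spelling of acks=all.
        return inFlight <= 5 && retries > 0
                && (acks.equals("all") || acks.equals("-1"));
    }

    public static void main(String[] args) {
        System.out.println(satisfiesIdempotenceRules(idempotent())); // true
        Properties bad = idempotent();
        bad.setProperty("acks", "1");
        System.out.println(satisfiesIdempotenceRules(bad));          // false
    }
}
```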
The idempotence option preserves ordering because of how idempotence is implemented by the producer. Without going into too much detail, the producer maintains a sequence number that is sent to Kafka with every message, and the message is stored only if this sequence number is exactly 1 more than the last one. A message that arrives ahead of its predecessor is therefore rejected instead of being appended out of order.
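The sequence-number rule can be illustrated with a small toy log. It is only a sketch of the idea described above, not the broker's implementation: SequenceCheckedLog and tryAppend are hypothetical names, and details of the real protocol, such as how duplicates are acknowledged, are not modelled.

```java
import java.util.ArrayList;
import java.util.List;

final class SequenceCheckedLog {
    private final List<String> records = new ArrayList<>();
    private int lastSequence = -1; // nothing appended yet

    // Toy broker-side check: accept a record only if its sequence number
    // is exactly 1 more than the last accepted one.
    boolean tryAppend(int sequence, String record) {
        if (sequence != lastSequence + 1) {
            return false; // out of order: the producer has to retry later
        }
        records.add(record);
        lastSequence = sequence;
        return true;
    }

    List<String> records() {
        return records;
    }

    public static void main(String[] args) {
        SequenceCheckedLog log = new SequenceCheckedLog();
        // The deposit (sequence 0) is lost on its first attempt, so the
        // withdrawal (sequence 1) reaches the log first and is rejected.
        System.out.println(log.tryAppend(1, "withdraw")); // false
        System.out.println(log.tryAppend(0, "deposit"));  // true
        System.out.println(log.tryAppend(1, "withdraw")); // true
        System.out.println(log.records());                // [deposit, withdraw]
    }
}
```

Even though the withdrawal arrives first, it cannot be stored before the deposit, so the log ends up in the order in which the producer sent the messages.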
You can read more about how this works in Adam Warski’s blog post: What does Kafka’s exactly-once processing really mean?. It is worth knowing that the default configuration can lead to messages being produced in the wrong order when a failure happens. If message order is important for your application, you can run into a lot of trouble by relying on guarantees that, as you can see, are not always true.
Published at DZone with permission of Kamil Charlampowicz. See the original article here.