Kafka Consumer Delivery Semantics
Everything you need to know about consumer delivery semantics.
This article is a continuation of part 1, Kafka technical overview; part 2, Kafka producer overview; part 3, Kafka producer delivery semantics; and part 4, Kafka consumer overview. Let's look at the different consumer configurations and consumer delivery semantics.
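To read records from a Kafka topic, create an instance of the Kafka consumer and subscribe to one or more Kafka topics. You can subscribe to an explicit list of topics or to every topic whose name matches a regular expression, for example:
Properties props = new Properties();
props.put("bootstrap.servers", "broker1:9092,broker2:9092");
props.put("group.id", "myGroup"); // placeholder name; subscribe() requires a consumer group
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
// Regex subscriptions take a Pattern, not a String
consumer.subscribe(java.util.regex.Pattern.compile("myTopic.*"));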
Consumers read data from Kafka by polling for new data. The poll method takes care of all coordination, such as partition rebalancing, heartbeats, and data fetching. When auto-commit is set to true, the poll method not only reads data but also commits the offsets before reading the next batch of records.
Kafka consumer behavior is configurable through the following properties. These properties are passed as key-value pairs when the consumer instance is created.
enable.auto.commit defines how offsets are committed to Kafka. By default, enable.auto.commit is set to true. When this property is set to true, you may also want to set how frequently offsets are committed using auto.commit.interval.ms. By default, auto.commit.interval.ms is set to 5,000 ms (5 seconds). When enable.auto.commit is set to true, the consumer delivery semantics are "At most once," and commits are async. Key points:
- enable.auto.commit = true (default).
- auto.commit.interval.ms = 5,000 ms (default).
- At most once delivery semantics.
- Commits are async when enable.auto.commit is true.
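As a minimal sketch, the auto-commit settings above can be spelled out explicitly when building the consumer's properties. The class name, broker addresses, and group name are placeholders for illustration; only the two auto-commit keys and their default values come from the text.

```java
import java.util.Properties;

final class AutoCommitConfigSketch {
    // Builds consumer properties with the auto-commit settings made explicit.
    static Properties autoCommitProps() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "broker1:9092,broker2:9092"); // placeholder brokers
        props.put("group.id", "example-group");                      // hypothetical group name
        props.put("enable.auto.commit", "true");       // the default, shown for clarity
        props.put("auto.commit.interval.ms", "5000");  // the default: commit every 5 seconds
        return props;
    }

    public static void main(String[] args) {
        Properties props = autoCommitProps();
        System.out.println("enable.auto.commit=" + props.getProperty("enable.auto.commit"));
        System.out.println("auto.commit.interval.ms=" + props.getProperty("auto.commit.interval.ms"));
    }
}
```

Setting enable.auto.commit to "false" in the same place is what switches the consumer to manual commits.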
In the previous article, Kafka consumer overview, we learned that consumers in a consumer group are assigned different partitions. The partitions are assigned to consumers based on partition.assignment.strategy.
PartitionAssignor is the interface that an assignment strategy must implement. Kafka ships with RangeAssignor and RoundRobinAssignor, supporting the Range and Round Robin strategies respectively.
Range strategy: In the range strategy, partitions are assigned to consumers in contiguous ranges, topic by topic. For example, if two topics with seven partitions each are consumed by two consumers, the range strategy assigns the first four partitions (0 to 3) of both topics to the first consumer and the remaining three partitions (4 to 6) of both topics to the second consumer. The partitions are unevenly assigned, with the first consumer processing 8 partitions and the second consumer processing only 6. By default, partition.assignment.strategy is set to RangeAssignor.
Round-robin strategy: In the round-robin strategy, partitions are assigned to consumers one at a time in rotation, resulting in an even distribution of partitions across consumers. For example, if two topics with seven partitions each are consumed by two consumers, the round-robin strategy assigns four partitions (0, 2, 4, 6) of the first topic and three partitions (1, 3, 5) of the second topic to the first consumer, and three partitions (1, 3, 5) of the first topic and four partitions (0, 2, 4, 6) of the second topic to the second consumer. Key points:
- partition.assignment.strategy decides how partitions are assigned to consumers.
- Range strategy (RangeAssignor) is the default.
- Range strategy may result in an uneven assignment.
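The two worked examples above can be reproduced with a small simulation. This is a simplified model written for illustration, not the code of Kafka's own assignors; the class and method names are hypothetical, and partitions are labeled "t<topic>-p<partition>".

```java
import java.util.ArrayList;
import java.util.List;

final class AssignmentSketch {
    // Range: each topic is split into contiguous ranges, one range per consumer.
    static List<List<String>> range(int topics, int partitions, int consumers) {
        List<List<String>> out = emptyAssignments(consumers);
        int base = partitions / consumers;
        int extra = partitions % consumers; // the first `extra` consumers get one more
        for (int t = 0; t < topics; t++) {
            int next = 0;
            for (int c = 0; c < consumers; c++) {
                int count = base + (c < extra ? 1 : 0);
                for (int i = 0; i < count; i++) {
                    out.get(c).add("t" + t + "-p" + next++);
                }
            }
        }
        return out;
    }

    // Round robin: all partitions of all topics are dealt out one at a time.
    static List<List<String>> roundRobin(int topics, int partitions, int consumers) {
        List<List<String>> out = emptyAssignments(consumers);
        int slot = 0;
        for (int t = 0; t < topics; t++) {
            for (int p = 0; p < partitions; p++) {
                out.get(slot++ % consumers).add("t" + t + "-p" + p);
            }
        }
        return out;
    }

    private static List<List<String>> emptyAssignments(int consumers) {
        List<List<String>> out = new ArrayList<>();
        for (int c = 0; c < consumers; c++) {
            out.add(new ArrayList<>());
        }
        return out;
    }

    public static void main(String[] args) {
        // Two topics, seven partitions each, two consumers.
        System.out.println("range:       " + range(2, 7, 2));
        System.out.println("round robin: " + roundRobin(2, 7, 2));
    }
}
```

With two topics of seven partitions and two consumers, the range model gives the consumers 8 and 6 partitions, while the round-robin model gives each consumer 7.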
fetch.min.bytes defines the minimum number of bytes required to send data from Kafka to the consumer. When the consumer polls for data and the minimum number of bytes has not been reached, Kafka waits until the configured size is reached (or until fetch.max.wait.ms expires) and then sends the data.
The default value is 1 byte. Increasing fetch.min.bytes reduces the load on both the consumer and the broker, which improves throughput at the cost of higher latency. When there are many small messages, resulting in higher CPU consumption, it’s better to increase fetch.min.bytes.
fetch.max.wait.ms defines the maximum time to wait before sending data from Kafka to the consumer. While fetch.min.bytes controls the minimum bytes required, sometimes the minimum may not be reached for a long time, so fetch.max.wait.ms caps how long Kafka waits before sending whatever data it has. The default value of fetch.max.wait.ms is 500 ms (0.5 seconds). Increasing this value increases both the latency and the throughput of the application, so define both fetch.min.bytes and fetch.max.wait.ms based on your SLA.
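A minimal sketch of setting the two fetch properties together follows. The 64 KB threshold is an illustrative value, not a recommendation from the article, and the class name is hypothetical.

```java
import java.util.Properties;

final class FetchTuningSketch {
    // Builds the fetch-related part of a consumer configuration.
    static Properties fetchProps() {
        Properties props = new Properties();
        props.put("fetch.min.bytes", "65536");  // illustrative: wait for roughly 64 KB per fetch...
        props.put("fetch.max.wait.ms", "500");  // ...but no longer than 500 ms (the default)
        return props;
    }

    public static void main(String[] args) {
        System.out.println(fetchProps());
    }
}
```

The two values work as a pair: the broker answers a fetch as soon as either the byte threshold or the wait limit is reached.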
session.timeout.ms defines how long a consumer can be out of contact with the broker. While heartbeat.interval.ms defines how often a heartbeat should be sent, session.timeout.ms defines how long the consumer can go without a heartbeat reaching the broker. When the session times out, the consumer is considered lost and a rebalance is triggered.
To avoid this happening often, it’s better to set session.timeout.ms at least three times higher than heartbeat.interval.ms. By setting a higher value you can avoid unwanted rebalancing and the overhead associated with it.
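The usual rule of thumb is that the heartbeat interval should be no more than one third of the session timeout, so that several heartbeats can be missed before the session expires. A small check of that ratio might look like the sketch below; the helper and its name are hypothetical and not part of the Kafka client.

```java
final class HeartbeatCheckSketch {
    // True when the heartbeat interval is at most one third of the session timeout.
    static boolean isSafeRatio(int heartbeatIntervalMs, int sessionTimeoutMs) {
        return heartbeatIntervalMs * 3L <= sessionTimeoutMs;
    }

    public static void main(String[] args) {
        // Example values chosen for illustration.
        System.out.println(isSafeRatio(3000, 10000)); // prints true
        System.out.println(isSafeRatio(5000, 10000)); // prints false
    }
}
```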
max.partition.fetch.bytes defines the maximum number of bytes per partition sent from the broker to the consumer. The default value is 1 MB.
max.partition.fetch.bytes determines the memory a consumer needs to receive messages.
max.poll.records defines the maximum number of records returned by a single poll() call. It helps control the number of records to be processed per poll() call.
auto.offset.reset defines where to start reading when a consumer reads from the broker for the first time and Kafka has no committed offset for it. You can set it to “earliest” or “latest”: “earliest” reads all messages from the beginning, while “latest” reads only new messages that arrive after the consumer has subscribed to the topic. The default value of auto.offset.reset is “latest.”
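The decision can be pictured as a small function. This is a simplified model for illustration only: the names are hypothetical, a negative value stands in for "no committed offset", and only the two settings discussed above are handled.

```java
final class OffsetResetSketch {
    // Picks the offset a consumer starts reading from.
    // committed < 0 means "no committed offset" in this sketch.
    static long startOffset(long committed, long logStart, long logEnd, String reset) {
        if (committed >= 0) {
            return committed; // a committed offset always wins over auto.offset.reset
        }
        return reset.equals("earliest") ? logStart : logEnd;
    }

    public static void main(String[] args) {
        // A partition holding offsets 0..99, so the next new message lands at 100.
        System.out.println(startOffset(-1, 0, 100, "earliest")); // prints 0
        System.out.println(startOffset(-1, 0, 100, "latest"));   // prints 100
        System.out.println(startOffset(42, 0, 100, "latest"));   // prints 42
    }
}
```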
As stated in the earlier article, Kafka producer delivery semantics, there are three delivery semantics: at most once, at least once, and exactly once. When data is consumed from Kafka by a consumer or consumer group, only "At least once" and "At most once" semantics are supported. You can still achieve output similar to exactly once by choosing a suitable data store that writes by a unique key, for example any key-value store, an RDBMS (primary key), Elasticsearch, or any other store that supports idempotent writes.
At Most Once
In at most once delivery semantics, a message is delivered no more than once. In this semantic it is acceptable to lose a message rather than deliver it twice. Applications adopting at most once semantics can easily achieve higher throughput and lower latency. By default, Kafka consumers use “At most once” delivery semantics, as “enable.auto.commit” is true.
If a consumer fails after messages are committed as read but before processing them, the unprocessed messages are lost and will not be read again. Partition rebalancing will result in another consumer reading messages from the last committed offset. As shown in the diagram below, messages are read in batches, and some or all of the messages in a batch might be unprocessed but still committed as processed.
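The failure scenario can be illustrated with a small in-memory simulation. It does not use the Kafka client; the class, the method, and the integer "offsets" are stand-ins invented for this sketch. The first consumer commits a batch before processing it and then crashes partway through; a replacement consumer resumes from the committed offset.

```java
import java.util.ArrayList;
import java.util.List;

final class AtMostOnceSketch {
    // Returns the offsets that were actually processed across both consumers.
    static List<Integer> run(int totalRecords, int batchSize, int crashAfter) {
        List<Integer> processed = new ArrayList<>();
        int committed = 0;

        // First consumer: read a batch and commit it BEFORE processing.
        int batchStart = committed;
        int batchEnd = Math.min(batchStart + batchSize, totalRecords);
        committed = batchEnd;
        for (int offset = batchStart; offset < batchEnd; offset++) {
            if (offset - batchStart == crashAfter) {
                break; // simulated crash in the middle of the batch
            }
            processed.add(offset);
        }

        // Replacement consumer: resumes from the last committed offset.
        for (int offset = committed; offset < totalRecords; offset++) {
            processed.add(offset);
        }
        return processed;
    }

    public static void main(String[] args) {
        // 10 records, batches of 5, crash after 2 records of the first batch.
        System.out.println(run(10, 5, 2)); // prints [0, 1, 5, 6, 7, 8, 9]
    }
}
```

Offsets 2, 3 and 4 were committed but never processed, which is exactly the loss that at most once semantics accept.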
At Least Once
In at least once delivery semantics, it is acceptable to deliver a message more than once, but no message should be lost. The consumer ensures that all messages are read and processed, even though this may result in message duplication. This is the most commonly preferred of the semantics. Applications adopting at least once semantics may have moderate throughput and moderate latency. By setting “enable.auto.commit” to “false”, you can commit manually after the messages are processed.
If a consumer fails before it finishes processing messages, the unprocessed messages are not lost, as their offsets have not been committed as read. Partition rebalancing will result in another consumer reading the same messages again from the last committed offset, resulting in duplicate messages. As shown in the diagram below, messages are read in batches, and some or all of the messages in a batch might be processed again, but no messages will be lost.
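The same kind of in-memory simulation shows the at least once case. Again, this is a sketch with invented names rather than Kafka client code. Here the first consumer commits only after the whole batch has been processed, so a crash partway through leaves the committed offset unchanged.

```java
import java.util.ArrayList;
import java.util.List;

final class AtLeastOnceSketch {
    // Returns the offsets that were processed across both consumers, in order.
    static List<Integer> run(int totalRecords, int batchSize, int crashAfter) {
        List<Integer> processed = new ArrayList<>();
        int committed = 0;

        // First consumer: process the batch, commit only once it is finished.
        int batchEnd = Math.min(committed + batchSize, totalRecords);
        boolean crashed = false;
        for (int offset = committed; offset < batchEnd; offset++) {
            if (offset - committed == crashAfter) {
                crashed = true; // simulated crash before the commit
                break;
            }
            processed.add(offset);
        }
        if (!crashed) {
            committed = batchEnd; // manual commit after processing
        }

        // Replacement consumer: resumes from the last committed offset.
        for (int offset = committed; offset < totalRecords; offset++) {
            processed.add(offset);
        }
        return processed;
    }

    public static void main(String[] args) {
        // 10 records, batches of 5, crash after 2 records of the first batch.
        System.out.println(run(10, 5, 2)); // prints [0, 1, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
    }
}
```

Offsets 0 and 1 are processed twice, but nothing is lost. In a real consumer the manual commit step would correspond to calling commitSync() after the batch has been processed.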
Exactly Once
In exactly-once delivery semantics, a message must be delivered only once and no message should be lost. This is the most difficult delivery semantic of all. Applications adopting exactly once semantics may have lower throughput and higher latency compared to the other two semantics. As stated earlier, you can still achieve output similar to exactly once by choosing a suitable data store that writes by a unique key, for example any key-value store, an RDBMS (primary key), Elasticsearch, or any other store that supports idempotent writes.
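To see why a keyed store hides duplicates, consider the sketch below, where a plain map stands in for the data store. The class name and record layout (a two-element array of unique key and value) are assumptions made for this example.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class IdempotentSinkSketch {
    // Writes each record under its unique key; a re-delivered record
    // simply overwrites the identical entry that is already there.
    static Map<String, String> writeAll(List<String[]> records) {
        Map<String, String> store = new LinkedHashMap<>();
        for (String[] record : records) {
            store.put(record[0], record[1]);
        }
        return store;
    }

    public static void main(String[] args) {
        // "order-1" is delivered twice, as can happen with at least once delivery.
        List<String[]> delivered = Arrays.asList(
                new String[] {"order-1", "created"},
                new String[] {"order-2", "created"},
                new String[] {"order-1", "created"});
        System.out.println(writeAll(delivered)); // prints {order-1=created, order-2=created}
    }
}
```

Combined with at least once delivery, this kind of write leaves the store in the same state as if each message had been delivered exactly once.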
Configure the Kafka consumer to achieve the desired performance and delivery semantics based on the properties covered above.
The Kafka consumer supports only at most once and at least once delivery semantics.