Kafka Consumer Delivery Semantics

Everything you need to know about consumer delivery semantics.

This article is a continuation of Part 1, Kafka Technical Overview; Part 2, Kafka Producer Overview; Part 3, Kafka Producer Delivery Semantics; and Part 4, Kafka Consumer Overview. Let's look at the different consumer configurations and at consumer delivery semantics.

Subscribe

To read records from a Kafka topic, create an instance of KafkaConsumer and subscribe to one or more Kafka topics. You can also subscribe to every topic whose name matches a regular expression, for example, myTopic.*.

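import java.util.Properties;
import java.util.regex.Pattern;
import org.apache.kafka.clients.consumer.KafkaConsumer;

Properties props = new Properties();
props.put("bootstrap.servers", "broker1:9092,broker2:9092");
props.put("group.id", "my-consumer-group"); // hypothetical group name; subscribe() requires a group.id
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

// subscribe() accepts a collection of topic names or a java.util.regex.Pattern, not a plain String
consumer.subscribe(Pattern.compile("myTopic.*"));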
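Pattern subscription compares each topic name against a Java regular expression. The sketch below uses only java.util.regex, with hypothetical topic names, to show which topics myTopic.* would select. It assumes the whole topic name must match (as with Matcher.matches()) and does not contact a broker.

```java
import java.util.List;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

class PatternSubscriptionSketch {
    // Returns the topics whose full name matches the subscription pattern.
    static List<String> matchingTopics(Pattern pattern, List<String> topics) {
        return topics.stream()
                .filter(t -> pattern.matcher(t).matches())
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // Hypothetical topic names, for illustration only.
        List<String> topics = List.of("myTopic", "myTopic-orders", "otherTopic", "xmyTopic");
        System.out.println(matchingTopics(Pattern.compile("myTopic.*"), topics)); // [myTopic, myTopic-orders]
    }
}
```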


Poll Method

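Consumers read data from Kafka by polling for new data. The poll method takes care of coordination such as joining the consumer group, partition rebalancing, and fetching data. (Since Kafka 0.10.1, heartbeats are sent from a background thread, but poll must still be called at least every max.poll.interval.ms or the consumer is removed from the group.) When enable.auto.commit is true, poll does more than read data: before fetching the next batch of records, it also commits the offsets of the records returned by previous polls, once auto.commit.interval.ms has elapsed.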
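To make the commit-then-fetch order concrete, here is a toy, in-memory model of a single partition. It is not the KafkaConsumer API: the class and its methods are hypothetical, and it assumes auto.commit.interval.ms has always elapsed, so every poll commits.

```java
import java.util.ArrayList;
import java.util.List;

// A toy model of one partition; NOT the KafkaConsumer API.
class AutoCommitPollSketch {
    private final List<String> log;      // records in the partition, indexed by offset
    private final int maxRecordsPerPoll;
    private final boolean autoCommit;
    private long position = 0;           // next offset to fetch
    private long committed = 0;          // last committed offset

    AutoCommitPollSketch(List<String> log, int maxRecordsPerPoll, boolean autoCommit) {
        this.log = log;
        this.maxRecordsPerPoll = maxRecordsPerPoll;
        this.autoCommit = autoCommit;
    }

    List<String> poll() {
        // With auto-commit on, first commit the position reached by the previous
        // poll (assumes auto.commit.interval.ms has elapsed), then fetch more.
        if (autoCommit) {
            committed = position;
        }
        int from = (int) position;
        int to = Math.min(log.size(), from + maxRecordsPerPoll);
        List<String> batch = new ArrayList<>(log.subList(from, to));
        position = to;
        return batch;
    }

    long committed() { return committed; }
    long position() { return position; }

    public static void main(String[] args) {
        AutoCommitPollSketch c = new AutoCommitPollSketch(List.of("a", "b", "c", "d", "e"), 2, true);
        System.out.println(c.poll() + " committed=" + c.committed()); // [a, b] committed=0
        System.out.println(c.poll() + " committed=" + c.committed()); // [c, d] committed=2
    }
}
```

Each poll first commits the position reached by the previous poll and only then returns new records, so the batch just handed to the application is not yet committed.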

Kafka consumer poll method

Consumer Configurations

Kafka consumer behavior is configured through the following properties, which are passed as key-value pairs when the consumer instance is created.

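enable.auto.commit

This property defines whether offsets are committed to Kafka automatically. By default, enable.auto.commit is true. When it is true, you may also want to set how frequently offsets are committed using auto.commit.interval.ms.

By default, auto.commit.interval.ms is 5,000 ms (5 seconds). When enable.auto.commit is true, commits are asynchronous, and this article treats the consumer's delivery semantics as "at most once," because offsets can be committed for records that have not been fully processed yet. (The Kafka documentation notes that auto-commit can still give at-least-once delivery if every record returned by poll is processed before the next poll call.) Key points: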

  • enable.auto.commit = true (default).
  • auto.commit.interval.ms = 5,000 ms (default).
  • At-most-once delivery semantics.
  • Commits are asynchronous when enable.auto.commit is true.
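As a sketch, the two commit modes differ only in a couple of entries in the Properties passed to the KafkaConsumer constructor. The group name is hypothetical, and the code uses only java.util.Properties, so it runs without a broker.

```java
import java.util.Properties;

class AutoCommitConfigSketch {
    // Automatic offset commits: the defaults, made explicit.
    static Properties autoCommitProps() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "broker1:9092,broker2:9092");
        props.put("group.id", "my-consumer-group");  // hypothetical group name
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "5000");
        return props;
    }

    // Manual commits: the application commits after processing.
    static Properties manualCommitProps() {
        Properties props = autoCommitProps();
        props.put("enable.auto.commit", "false");
        props.remove("auto.commit.interval.ms");     // only used when auto-commit is on
        return props;
    }

    public static void main(String[] args) {
        System.out.println(manualCommitProps().getProperty("enable.auto.commit")); // false
    }
}
```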

partition.assignment.strategy

In the previous article, Kafka Consumer Overview, we learned that consumers in a consumer group are assigned different partitions. Partitions are assigned to consumers according to the partition.assignment.strategy property. PartitionAssignor (ConsumerPartitionAssignor in Kafka 2.4 and later) is the interface that an assignment strategy implements. Kafka ships with RangeAssignor and RoundRobinAssignor, which implement the range and round-robin strategies respectively.

Range strategy: In the range strategy, each topic's partitions are divided into contiguous ranges, one range per consumer. For example, if two topics each have seven partitions and are consumed by two consumers, the range strategy assigns the first four partitions (0-3) of both topics to the first consumer and the last three partitions (4-6) of both topics to the second consumer. The assignment is uneven: the first consumer processes 8 partitions while the second processes only 6. By default, partition.assignment.strategy is set to RangeAssignor.
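The range strategy can be approximated in a few lines of plain Java. This is a simplified sketch, not Kafka's RangeAssignor: it assumes the consumer list is already sorted (the real assignor orders group members itself) and uses hypothetical topic names.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class RangeAssignmentSketch {
    // For each topic, split its partitions into contiguous ranges; the first
    // (numPartitions % numConsumers) consumers each get one extra partition.
    static Map<String, List<String>> assign(Map<String, Integer> partitionsPerTopic,
                                            List<String> consumers) {
        Map<String, List<String>> assignment = new LinkedHashMap<>();
        for (String c : consumers) {
            assignment.put(c, new ArrayList<>());
        }
        for (Map.Entry<String, Integer> topic : partitionsPerTopic.entrySet()) {
            int n = topic.getValue();
            int perConsumer = n / consumers.size();
            int extra = n % consumers.size();
            int next = 0;
            for (int i = 0; i < consumers.size(); i++) {
                int count = perConsumer + (i < extra ? 1 : 0);
                for (int p = next; p < next + count; p++) {
                    assignment.get(consumers.get(i)).add(topic.getKey() + "-" + p);
                }
                next += count;
            }
        }
        return assignment;
    }

    public static void main(String[] args) {
        Map<String, Integer> topics = new LinkedHashMap<>();
        topics.put("topicA", 7);  // hypothetical topic names
        topics.put("topicB", 7);
        assign(topics, List.of("consumer1", "consumer2"))
                .forEach((c, parts) -> System.out.println(c + " (" + parts.size() + "): " + parts));
    }
}
```

Running main reproduces the example: consumer1 receives partitions 0-3 of both topics (8 in total) and consumer2 receives partitions 4-6 of both topics (6 in total).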

Round-robin strategy: In the round-robin strategy, partitions are assigned to consumers one at a time, in turn, which distributes them evenly. With the same example of two topics with seven partitions each and two consumers, the round-robin strategy assigns partitions 0, 2, 4, and 6 of the first topic and partitions 1, 3, and 5 of the second topic to the first consumer, and partitions 1, 3, and 5 of the first topic and partitions 0, 2, 4, and 6 of the second topic to the second consumer, so each consumer gets 7 partitions. Key points:

  • partition.assignment.strategy decides how partitions are assigned to consumers.
  • The range strategy (RangeAssignor) is the default.
  • The range strategy may result in an uneven assignment.
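A matching sketch of round-robin assignment lists every topic-partition in order and deals them out to the consumers in turn. It is a simplification of Kafka's RoundRobinAssignor: consumers and topics are assumed to be already sorted, and the topic names are hypothetical.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class RoundRobinAssignmentSketch {
    // List every topic-partition in order, then hand them to consumers one at a time.
    static Map<String, List<String>> assign(Map<String, Integer> partitionsPerTopic,
                                            List<String> consumers) {
        Map<String, List<String>> assignment = new LinkedHashMap<>();
        for (String c : consumers) {
            assignment.put(c, new ArrayList<>());
        }
        int turn = 0;
        for (Map.Entry<String, Integer> topic : partitionsPerTopic.entrySet()) {
            for (int p = 0; p < topic.getValue(); p++) {
                String consumer = consumers.get(turn % consumers.size());
                assignment.get(consumer).add(topic.getKey() + "-" + p);
                turn++;
            }
        }
        return assignment;
    }

    public static void main(String[] args) {
        Map<String, Integer> topics = new LinkedHashMap<>();
        topics.put("topicA", 7);  // hypothetical topic names
        topics.put("topicB", 7);
        assign(topics, List.of("consumer1", "consumer2"))
                .forEach((c, parts) -> System.out.println(c + " (" + parts.size() + "): " + parts));
    }
}
```

With two seven-partition topics and two consumers, each consumer ends up with 7 partitions, split as in the round-robin example. To select this strategy on a real consumer, set partition.assignment.strategy to org.apache.kafka.clients.consumer.RoundRobinAssignor.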

fetch.min.bytes

Defines the minimum amount of data, in bytes, that the broker should return to the consumer for a fetch request. When the consumer polls for data and less than this amount is available, the broker waits until enough data has accumulated before sending it.

The default value is 1 byte, so the broker responds as soon as any data is available. Increasing fetch.min.bytes reduces the number of fetch round trips and therefore the load on both the consumer and the broker, which raises throughput at the cost of higher latency. When there are many small messages and CPU consumption is high, it is better to increase fetch.min.bytes.
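For example, a throughput-oriented consumer might raise fetch.min.bytes as below. The 64 KB value and the group name are hypothetical starting points, not recommendations; the right value depends on message size and latency requirements.

```java
import java.util.Properties;

class FetchMinBytesConfigSketch {
    // Ask the broker to hold fetch responses until at least 64 KB (a hypothetical
    // value) is available, instead of the 1-byte default.
    static Properties throughputOrientedProps() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "broker1:9092,broker2:9092");
        props.put("group.id", "my-consumer-group");  // hypothetical group name
        props.put("fetch.min.bytes", String.valueOf(64 * 1024));
        return props;
    }

    public static void main(String[] args) {
        System.out.println(throughputOrientedProps().getProperty("fetch.min.bytes")); // 65536
    }
}
```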

fetch.max.wait.ms

Defines the maximum time the broker waits before sending data to the consumer. Because fetch.min.bytes sets a minimum amount of data, that minimum may not be reached for a long time; fetch.max.wait.ms bounds how long Kafka waits before responding anyway. The default value of fetch.max.wait.ms is 500 ms (0.5 seconds). Increasing this value increases both latency and throughput, so choose fetch.min.bytes and fetch.max.wait.ms together based on your SLA.
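The interplay of the two settings can be summarized as a simple predicate: the broker answers a fetch once enough data is available or once the maximum wait has elapsed, whichever happens first. This is a hypothetical simplification of the broker's behavior, not its actual implementation.

```java
class FetchWaitSketch {
    // Respond when enough data is available OR the maximum wait has elapsed.
    static boolean shouldRespond(long availableBytes, long waitedMs,
                                 long fetchMinBytes, long fetchMaxWaitMs) {
        return availableBytes >= fetchMinBytes || waitedMs >= fetchMaxWaitMs;
    }

    public static void main(String[] args) {
        long minBytes = 64 * 1024;  // hypothetical tuned fetch.min.bytes
        long maxWaitMs = 500;       // default fetch.max.wait.ms
        System.out.println(shouldRespond(10_000, 100, minBytes, maxWaitMs)); // false: keep waiting
        System.out.println(shouldRespond(70_000, 100, minBytes, maxWaitMs)); // true: enough data
        System.out.println(shouldRespond(10_000, 500, minBytes, maxWaitMs)); // true: waited long enough
    }
}
```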

session.timeout.ms

Defines how long a consumer can be out of contact with the broker (specifically, the group coordinator) before it is considered dead. The heartbeat.interval.ms property defines how often the consumer sends heartbeats, while session.timeout.ms defines how long the consumer can go without a heartbeat reaching the broker. When the session times out, the consumer is considered lost and a rebalance is triggered.

To avoid this happening too often, set session.timeout.ms to at least three times heartbeat.interval.ms (the Kafka documentation recommends keeping heartbeat.interval.ms no higher than one third of session.timeout.ms). A higher session timeout helps avoid unwanted rebalancing and the overhead associated with it, at the cost of taking longer to detect a consumer that has actually failed.
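A small helper like the hypothetical one below can catch the most common misconfiguration, a heartbeat interval that is too large relative to the session timeout. It encodes only the one-third rule of thumb from the Kafka documentation; the example values are illustrative.

```java
class SessionTimeoutCheckSketch {
    // heartbeat.interval.ms should be positive and no more than one third of
    // session.timeout.ms, so several heartbeats fit into one session timeout.
    static boolean heartbeatSettingsLookSafe(long heartbeatIntervalMs, long sessionTimeoutMs) {
        return heartbeatIntervalMs > 0 && heartbeatIntervalMs * 3 <= sessionTimeoutMs;
    }

    public static void main(String[] args) {
        System.out.println(heartbeatSettingsLookSafe(3_000, 10_000)); // true
        System.out.println(heartbeatSettingsLookSafe(5_000, 10_000)); // false: under 3 heartbeats per session
    }
}
```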

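max.partition.fetch.bytes

Defines the maximum number of bytes per partition that the broker returns to the consumer in a fetch. The default value is 1 MB. Together with the maximum message size the broker accepts (message.max.bytes), max.partition.fetch.bytes determines how much memory each consumer needs to receive messages: roughly max.partition.fetch.bytes for each assigned partition.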
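The memory estimate is simple arithmetic. The sketch below multiplies the number of assigned partitions by max.partition.fetch.bytes; under that assumption it is a rough upper bound for a single fetch, and the partition count is hypothetical.

```java
class FetchMemorySketch {
    static final long DEFAULT_MAX_PARTITION_FETCH_BYTES = 1024 * 1024; // 1 MB default

    // Rough upper bound: one full-size chunk per assigned partition. Real usage
    // also depends on fetch.max.bytes, decompression, and processing speed.
    static long roughFetchMemoryBytes(int assignedPartitions, long maxPartitionFetchBytes) {
        return assignedPartitions * maxPartitionFetchBytes;
    }

    public static void main(String[] args) {
        // A consumer assigned 20 partitions with the default setting.
        System.out.println(roughFetchMemoryBytes(20, DEFAULT_MAX_PARTITION_FETCH_BYTES)); // 20971520
    }
}
```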

max.poll.records

Defines the maximum number of records returned by a single call to poll() (500 by default). It helps control how many records are processed per poll call.
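The effect of max.poll.records can be illustrated by splitting a backlog of already-fetched records into the batches that successive poll() calls would hand back. This is a simplified model (a real poll may return fewer records if less data has been fetched), and the record count is hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

class MaxPollRecordsSketch {
    // Sizes of the batches successive poll() calls would return, each capped
    // at maxPollRecords.
    static List<Integer> batchSizes(int fetchedRecords, int maxPollRecords) {
        List<Integer> sizes = new ArrayList<>();
        int remaining = fetchedRecords;
        while (remaining > 0) {
            int size = Math.min(remaining, maxPollRecords);
            sizes.add(size);
            remaining -= size;
        }
        return sizes;
    }

    public static void main(String[] args) {
        System.out.println(batchSizes(1_200, 500)); // [500, 500, 200]
    }
}
```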

auto.offset.reset

When a consumer group reads a partition for the first time, Kafka may not have a committed offset for it, so this property defines where to start reading. You can set it to "earliest" or "latest": "earliest" reads all messages from the beginning of the partition, while "latest" reads only messages produced after the consumer has subscribed to the topic. The default value of auto.offset.reset is "latest".
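The decision can be sketched as a function of the committed offset and the reset policy. The function and its parameters are hypothetical; throwing an exception for any other policy value stands in for Kafka's "none" setting, which makes the consumer fail instead of choosing a position.

```java
import java.util.OptionalLong;

class OffsetResetSketch {
    // A committed offset always wins; auto.offset.reset is consulted only when
    // there is none. (Simplified: Kafka also applies the policy when the
    // committed offset is out of range.)
    static long startingOffset(OptionalLong committed, String autoOffsetReset,
                               long logStartOffset, long logEndOffset) {
        if (committed.isPresent()) {
            return committed.getAsLong();
        }
        switch (autoOffsetReset) {
            case "earliest": return logStartOffset;  // replay everything still retained
            case "latest":   return logEndOffset;    // only records produced from now on
            default: throw new IllegalStateException("No committed offset, policy: " + autoOffsetReset);
        }
    }

    public static void main(String[] args) {
        System.out.println(startingOffset(OptionalLong.empty(), "earliest", 0, 42)); // 0
        System.out.println(startingOffset(OptionalLong.empty(), "latest", 0, 42));   // 42
        System.out.println(startingOffset(OptionalLong.of(17), "latest", 0, 42));    // 17
    }
}
```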

Delivery Semantics

As stated in the earlier article, Kafka Producer Delivery Semantics, there are three delivery semantics: at most once, at least once, and exactly once. When data is consumed from Kafka by a consumer or consumer group, only at-least-once and at-most-once semantics are supported. You can still achieve an outcome similar to exactly once by writing to a data store that writes by a unique key, such as any key-value store, an RDBMS (primary key), Elasticsearch, or any other store that supports idempotent writes.
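The idea behind idempotent writes can be shown with an in-memory map standing in for a key-value store or a table with a primary key. The class, keys, and values are hypothetical; the point is that a redelivered message overwrites the same entry instead of adding a new one.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class IdempotentSinkSketch {
    // Stand-in for a key-value store: writing the same key again overwrites
    // the entry, so redelivered messages leave the stored result unchanged.
    private final Map<String, String> store = new HashMap<>();

    void upsert(String key, String value) {
        store.put(key, value);
    }

    int size() { return store.size(); }

    public static void main(String[] args) {
        IdempotentSinkSketch sink = new IdempotentSinkSketch();
        // "order-2" is delivered twice, as can happen under at-least-once delivery.
        List<String[]> delivered = List.of(
                new String[] {"order-1", "created"},
                new String[] {"order-2", "created"},
                new String[] {"order-2", "created"});
        for (String[] msg : delivered) {
            sink.upsert(msg[0], msg[1]);
        }
        System.out.println(sink.size()); // 2
    }
}
```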

At Most Once

In at-most-once delivery semantics, a message is delivered at most once. Losing a message is acceptable, but delivering the same message twice is not. Applications adopting at-most-once semantics can easily achieve high throughput and low latency. By default, Kafka consumers use at-most-once semantics because enable.auto.commit is true.

At most once delivery semantics

If the consumer fails after messages have been committed as read but before they are processed, the unprocessed messages are lost and will not be read again: after partition rebalancing, another consumer resumes reading from the last committed offset. As shown in the diagram below, messages are read in batches, and some or all of the messages in a batch might be unprocessed yet still committed as processed.
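The failure case can be simulated without Kafka. This hypothetical sketch processes one partition in batches and commits each batch before processing it, which is the ordering the failure case describes; a crash part-way through a batch loses the rest of that batch.

```java
import java.util.ArrayList;
import java.util.List;

class AtMostOnceSketch {
    // Commit-BEFORE-process. After a crash, a replacement consumer resumes at
    // the committed offset, so the rest of the crashed batch is never processed.
    static List<Integer> run(int totalMessages, int batchSize, int crashAfterProcessing) {
        List<Integer> processed = new ArrayList<>();
        long committed = 0;
        boolean crashed = false;
        while (committed < totalMessages) {
            long start = committed;
            long end = Math.min(totalMessages, start + batchSize);
            committed = end;                       // 1. commit the whole batch first
            for (long offset = start; offset < end; offset++) {
                if (!crashed && processed.size() == crashAfterProcessing) {
                    crashed = true;                // 2. crash: rest of batch is dropped
                    break;
                }
                processed.add((int) offset);       // 3. process the message
            }
        }
        return processed;
    }

    public static void main(String[] args) {
        // 6 messages, batches of 3, crash after processing 1 message.
        System.out.println(run(6, 3, 1)); // [0, 3, 4, 5]
    }
}
```

With 6 messages, batches of 3, and a crash after the first message, offsets 1 and 2 are never processed.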

At most once delivery semantic - failure case

At Least Once

In at-least-once delivery semantics, it is acceptable to deliver a message more than once, but no message may be lost. The consumer ensures that every message is read and processed, even though this may result in duplicates. This is the most commonly preferred of the three semantics. Applications adopting at-least-once semantics may have moderate throughput and moderate latency. By setting enable.auto.commit to false, you can commit offsets manually after the messages have been processed.

At least once delivery

If the consumer fails before it has processed the messages and committed their offsets, the unprocessed messages are not lost, because their offsets were never committed as read. After partition rebalancing, another consumer reads the same messages again from the last committed offset, resulting in duplicate messages. As shown in the diagram below, messages are read in batches, and some or all of the messages in a batch might be processed again, but no message is lost.
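A hypothetical simulation with process-then-commit ordering shows the at-least-once failure case: a crash before the commit makes the replacement consumer re-read the batch from the last committed offset.

```java
import java.util.ArrayList;
import java.util.List;

class AtLeastOnceSketch {
    // Process-BEFORE-commit (enable.auto.commit=false plus a manual commit per
    // batch). A crash mid-batch skips the commit, so the batch is re-read.
    static List<Integer> run(int totalMessages, int batchSize, int crashAfterProcessing) {
        List<Integer> processed = new ArrayList<>();
        long committed = 0;
        boolean crashed = false;
        while (committed < totalMessages) {
            long start = committed;
            long end = Math.min(totalMessages, start + batchSize);
            boolean batchCompleted = true;
            for (long offset = start; offset < end; offset++) {
                if (!crashed && processed.size() == crashAfterProcessing) {
                    crashed = true;                // crash before the commit below
                    batchCompleted = false;
                    break;
                }
                processed.add((int) offset);       // 1. process the message
            }
            if (batchCompleted) {
                committed = end;                   // 2. commit only after the whole batch
            }
        }
        return processed;
    }

    public static void main(String[] args) {
        System.out.println(run(6, 3, 1)); // [0, 0, 1, 2, 3, 4, 5]
    }
}
```

Offset 0 is processed twice, and nothing is lost. On a real KafkaConsumer with enable.auto.commit set to false, the commit step is typically a call to consumer.commitSync() after the records returned by poll have been processed.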

At least once delivery semantic - failure case

Exactly Once

In exactly-once delivery semantics, a message must be delivered only once and no message may be lost. This is the most difficult delivery semantic to achieve. Applications adopting exactly-once semantics may have lower throughput and higher latency than with the other two semantics. As stated earlier, you can still achieve an outcome similar to exactly once by writing to a data store that supports idempotent writes keyed by a unique key.

Summary

Configure the Kafka consumer to achieve the desired performance and delivery semantics using the following properties:

  • enable.auto.commit
  • partition.assignment.strategy
  • fetch.max.wait.ms
  • fetch.min.bytes
  • session.timeout.ms
  • max.partition.fetch.bytes
  • max.poll.records
  • auto.offset.reset

The Kafka consumer supports only at-most-once and at-least-once delivery semantics.


Opinions expressed by DZone contributors are their own.