Refcard #378

Apache Kafka Patterns and Anti-Patterns

Apache Kafka offers the operational simplicity of data engineers' dreams. It is a message broker that allows clients to publish and read streams of data, and it has an ecosystem of open-source components that, when combined, help store, process, and integrate data streams with other parts of your system in a secure, reliable, and scalable manner. This Refcard dives into select patterns and anti-patterns spanning Kafka Client APIs, Kafka Connect, and Kafka Streams, covering topics such as reliable messaging, scalability, error handling, and more.


Written By

Abhishek Gupta
Principal Developer Advocate, AWS
Section 1

Overview of the Apache Kafka Ecosystem

A Kafka broker (also referred to as a node) is the fundamental building block that runs the Kafka JVM process. Although a single Kafka broker will suffice for development purposes, production systems typically have three or more brokers (usually an odd number such as 3, 5, or 7) for high availability and scalability. A group of Kafka brokers forms a cluster. For each topic partition, one broker acts as the leader, and follower brokers replicate data from that leader.

Client applications send messages to Kafka, and each message consists of a key (which can be null) and a value. These messages are stored and organized into topics. They are written in an append-only fashion (much like a commit log), i.e., a new message always ends up at the end of the partition it is written to. A topic has one or more partitions, and each message is placed in a specific partition based on the result of a hash function applied to the key (if the key is null, the message is directed to a partition in a round-robin manner). Data in each of these partitions is replicated across the Kafka cluster.
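The key-to-partition mapping described above can be illustrated with a minimal sketch. The class name PartitionSketch and the use of Arrays.hashCode are assumptions made purely for illustration; the Java client's built-in partitioner uses its own hash function (murmur2), so the partition numbers computed here will not match those of a real producer. The point is only that the same key always maps to the same partition.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Illustrative only: this is NOT Kafka's actual partitioner.
final class PartitionSketch {

    // Map a non-null key to a partition in the range [0, numPartitions).
    static int partitionFor(String key, int numPartitions) {
        byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8);
        // floorMod keeps the result non-negative even when the hash is negative.
        return Math.floorMod(Arrays.hashCode(keyBytes), numPartitions);
    }

    public static void main(String[] args) {
        int first = partitionFor("customer-42", 6);
        int second = partitionFor("customer-42", 6);
        // The same key always lands in the same partition.
        System.out.println(first == second); // prints "true"
    }
}
```

Because all messages with a given key end up in one partition, their relative order is preserved for consumers of that partition.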

Figure 1: Topic and Partitions in Kafka

Here is a summary of the key projects that are part of core Kafka:

  1. Kafka Client APIs: Producer and Consumer APIs that allow external systems to write data to and read data from Kafka topics, respectively. Kafka has client libraries in many programming languages, with the Java client being part of the core Kafka project.
  2. Kafka Connect: Provides a high-level framework to build connectors that help integrate Kafka with external systems. They allow us to move data from external systems to Kafka topics (Source connector) and from Kafka topics into external systems (Sink connector). Popular examples of connectors include the JDBC connector and Debezium.
  3. Kafka Streams: A standalone Java library that provides distributed stream processing primitives on top of data in Kafka topics. It provides high-level APIs (DSL and Processor) with which you can create topologies to execute stateless transformations (map, filter, etc.) as well as stateful computations (join, aggregations, etc.) on streaming data.
Section 2

Common Apache Kafka Patterns and Anti-Patterns

This section will cover some of the common patterns — along with their respective anti-patterns — for Kafka Producer and Consumer APIs, Kafka Connect, and Kafka Streams.

Kafka Client API – Producer 

The Kafka Producer API sends data to topics in a Kafka cluster. Here are a couple of patterns and anti-patterns to consider:

Reliable Producer 

Goal

While producing a message, you want to ensure that it has been sent to Kafka.

Pattern

Use the acks=all configuration for the producer.

Anti-Pattern

Using the default configuration (acks=1).

The acks property allows producer applications to specify the number of acknowledgments that the leader node should have received before considering a request complete. If you don’t provide one explicitly, acks=1 is used by default in versions prior to Apache Kafka 3.0. With acks=1, the client application receives an acknowledgment as soon as the leader node receives the message and writes it to its local log. If the leader node then fails before the message has been replicated to follower nodes, the message is lost.

If you set acks=all (or -1), your application only receives a successful confirmation when all the in-sync replicas in the cluster have acknowledged the message. There is a trade-off between latency and reliability/durability here: Waiting for acknowledgment from all the in-sync replicas will incur more time, but the message will not be lost as long as at least one of the in-sync replicas is available.

A related configuration is min.insync.replicas. Guidance on this topic will be covered later in this Refcard.
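As a minimal sketch of the pattern, the producer settings can be collected in a plain java.util.Properties object and later passed to a producer. The class name ReliableProducerConfig, the localhost address, and the choice of string serializers are placeholders assumed for illustration.

```java
import java.util.Properties;

// Sketch of a producer configuration that waits for all in-sync replicas.
final class ReliableProducerConfig {

    // bootstrapServers is a placeholder for your cluster address.
    static Properties build(String bootstrapServers) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // Wait for acknowledgment from all in-sync replicas, not just the leader.
        props.put("acks", "all");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(build("localhost:9092").getProperty("acks")); // prints "all"
    }
}
```

Setting the property explicitly keeps the behavior the same regardless of which client version's defaults are in effect.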

No More Duplicates 

Goal

The producer needs to be idempotent because your application cannot tolerate duplicate messages.

Pattern

Set enable.idempotence=true.

Anti-Pattern

Using a default configuration.

It is possible that the producer application may end up sending the same message to Kafka more than once. Imagine a scenario where the message is actually received by the leader (and replicated to in-sync replicas if acks=all is used), but the application does not receive the acknowledgment from the leader due to a request timeout, or maybe the leader node just crashed. The producer will try to resend the message, and if the retry succeeds, you will end up with duplicate messages in Kafka. Depending upon your downstream systems, this may not be acceptable.

The Producer API provides a simple way to avoid this by using the enable.idempotence property (which was set to false by default before Apache Kafka 3.0). When set to true, the producer attaches a sequence number to every message. This is validated by the broker so that a message with a duplicate sequence number will get rejected.

From Apache Kafka 3.0 onwards, acks=all and enable.idempotence=true are set by default, thereby providing strong delivery guarantees for the producer.
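The sequence-number check can be pictured with a deliberately simplified model. The class SequenceDedupSketch and its append method are hypothetical and are not part of Kafka; a real broker tracks sequence numbers per producer and partition and handles more cases (such as out-of-order sequences) than this sketch does.

```java
import java.util.HashMap;
import java.util.Map;

// Simplified, hypothetical model of broker-side duplicate detection.
final class SequenceDedupSketch {

    // Last sequence number accepted for each producer ID.
    private final Map<Long, Integer> lastSequence = new HashMap<>();

    // Returns true if the message is appended, false if it is a duplicate retry.
    boolean append(long producerId, int sequence) {
        Integer last = lastSequence.get(producerId);
        if (last != null && sequence <= last) {
            return false; // already seen: reject the retry
        }
        lastSequence.put(producerId, sequence);
        return true;
    }

    public static void main(String[] args) {
        SequenceDedupSketch log = new SequenceDedupSketch();
        System.out.println(log.append(1L, 0)); // prints "true"
        System.out.println(log.append(1L, 1)); // prints "true"
        System.out.println(log.append(1L, 1)); // prints "false" (retry of sequence 1)
    }
}
```

In practice you only set enable.idempotence=true on the producer; the bookkeeping shown here happens inside the client and broker.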

Kafka Client API – Consumer 

With the Kafka Consumer API, applications can read data from topics in a Kafka cluster.

Idle Consumer Instances

Goal

Scale out your data processing pipeline.

Pattern

Run multiple instances of your consumer application.

Anti-Pattern

Running more consumer instances than there are topic partitions.

A Kafka consumer group is a set of consumers that ingest data from one or more topics. The topic partitions are load-balanced among consumers in the group. This load distribution is managed on the fly when new consumer instances are added or removed from a consumer group. For example, if there are ten topic partitions and five consumers in a consumer group for that topic, Kafka will make sure that each consumer instance receives data from two topic partitions of the topic.

You can end up with a mismatch between the number of consumer instances and topic partitions. This could be due to incorrect topic configuration, wherein the number of partitions is set to one. Or, maybe your consumer applications are packaged using Docker and operated on top of an orchestration platform such as Kubernetes, which can, in turn, be configured to auto-scale them. 

Keep in mind that you might end up with more instances than partitions. Such surplus instances remain inactive and do not participate in processing data from Kafka. Thus, the degree of consumer parallelism within a group is capped by the number of topic partitions. In the best-case scenario, for a topic with N partitions, you can have N instances in a consumer group, each processing data from a single topic partition.

Figure 2: Inactive consumers
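The effect of surplus consumers can be sketched with a toy assignment function. AssignmentSketch is a hypothetical class that spreads partitions over consumers in round-robin order; it is a simplified stand-in for illustration, not one of Kafka's actual assignor implementations.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of partition assignment within one consumer group.
final class AssignmentSketch {

    // Spread partitions 0..numPartitions-1 over consumers in round-robin order.
    // Assumes numConsumers is at least 1.
    static List<List<Integer>> assign(int numPartitions, int numConsumers) {
        List<List<Integer>> assignment = new ArrayList<>();
        for (int c = 0; c < numConsumers; c++) {
            assignment.add(new ArrayList<>());
        }
        for (int p = 0; p < numPartitions; p++) {
            assignment.get(p % numConsumers).add(p);
        }
        return assignment;
    }

    // Count consumers that received no partition at all.
    static long idleConsumers(int numPartitions, int numConsumers) {
        return assign(numPartitions, numConsumers).stream()
                .filter(List::isEmpty)
                .count();
    }

    public static void main(String[] args) {
        System.out.println(idleConsumers(10, 5)); // prints "0": two partitions each
        System.out.println(idleConsumers(3, 5));  // prints "2": two consumers sit idle
    }
}
```

When configuring auto-scaling, capping the maximum number of consumer instances at the partition count avoids paying for instances that never receive data.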

Committing Offsets: Automatic or Manual? 

Goal

Avoid duplicates and/or data loss while processing data from Kafka.

Pattern

Set enable.auto.commit to false and use manual offset management.

Anti-Pattern

Using default configuration with automatic offset management.

Consumers acknowledge the receipt (and processing) of messages by committing the offset of the message they have read. By default, enable.auto.commit is set to true for consumer apps, which implies that the offsets are automatically committed asynchronously (for example, by a background thread in the Java consumer client) at regular intervals (defined by the auto.commit.interval.ms property, which defaults to 5 seconds). While this is convenient, it allows for data loss and/or duplicate message processing.

Duplicate messages: Consider a scenario where the consumer app has read and processed messages from offsets 198, 199, and 200 of a topic partition. The automatic commit process successfully committed offset 198, but the app then crashed or shut down before the remaining offsets were committed. This will trigger a rebalance to another consumer app instance (if available), and it will look for the last committed offset, which in this case was 198. Hence, the messages at offsets 199 and 200 will be redelivered to the consumer app.

Data loss: The consumer app has read the messages for offsets 198, 199, and 200. The auto-commit process commits these offsets before the application is able to actually process these messages (perhaps by transforming them and storing the result in a downstream system), and the consumer app crashes. In this situation, the new consumer app instance will see that the last committed offset is 200 and will continue reading new messages from there on. Messages from offsets 198, 199, and 200 were effectively lost.

To have greater control over the commit process, you need to explicitly set enable.auto.commit to false and handle the commit process manually. The manual commit API offers synchronous and asynchronous options, and as expected, each of these has its trade-offs.

The code block below shows how to explicitly commit offsets using the synchronous API, once per partition after all records fetched for that partition have been processed:

 
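import java.time.Duration;
import java.util.Collections;
import java.util.List;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

// "consumer" is a KafkaConsumer<String, String> and "running" a boolean flag, both defined elsewhere
try {
    while (running) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(Long.MAX_VALUE));
        for (TopicPartition partition : records.partitions()) {
            List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
            for (ConsumerRecord<String, String> record : partitionRecords) {
                // process the record (here, it is simply printed)
                System.out.println(record.offset() + ": " + record.value());
            }
            // commit the offset of the next record to read from this partition
            long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
            consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1)));
        }
    }
} finally {
    consumer.close();
}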
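The loop above assumes a consumer that was created with automatic commits turned off. A minimal sketch of such a configuration follows; the class name ManualCommitConsumerConfig, the group ID, and the string deserializers are placeholders chosen for illustration.

```java
import java.util.Properties;

// Sketch of a consumer configuration for manual offset management.
final class ManualCommitConsumerConfig {

    // bootstrapServers and groupId are placeholders for your own values.
    static Properties build(String bootstrapServers, String groupId) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("group.id", groupId);
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        // Disable background commits so the application decides when to commit.
        props.put("enable.auto.commit", "false");
        return props;
    }

    public static void main(String[] args) {
        Properties props = build("localhost:9092", "orders-processor");
        System.out.println(props.getProperty("enable.auto.commit")); // prints "false"
    }
}
```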

Kafka Connect

Thanks to the Kafka Connect API, there are a plethora of ready-to-use connectors. But you need to be careful about some of its caveats:

Handling JSON Messages 

Goal

Read/write JSON messages from/to Kafka using Kafka Connect.

Pattern

Use a Schema Registry and appropriate JSON schema converter implementation.

Anti-Pattern

Embedding a schema with every JSON message or not enforcing a schema at all.

Although JSON is a common message format, it does not have a strict schema associated with it. By design, Kafka producers and consumer apps are decoupled from each other. Imagine a scenario where your producer applications introduce additional fields to the JSON payload/events, and your downstream consumer applications are not equipped to handle that and hence fail — this can break your entire data processing pipeline.

For a production-grade Kafka Connect setup, it’s imperative that you use a Schema Registry to provide a contract between producers and consumers while still keeping them decoupled. For source connectors, if you want data to be fetched from an external system and stored in Kafka as JSON, you should configure the connector to point to a Schema Registry and also use an appropriate converter.

For example:

 
value.converter=<fully qualified class name of json schema converter implementation>
value.converter.schema.registry.url=<schema registry endpoint e.g. http://localhost:8081>

When reading data from a Kafka topic, the sink connector also needs the same configuration as above.

However, if you are not using a Schema Registry, the next best option is to use the JSON converter implementation that is native to Kafka. In this case, you would configure your source and sink connectors as follows:

 
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=true

Thanks to value.converter.schemas.enable=true, the source connector will add an embedded schema payload to each of your JSON messages, and the sink connector will respect that schema as well. An obvious drawback here is the fact that you have schema information in every message. This will increase the size of the message and can impact latency, performance, costs, etc. As always, this is a trade-off that you need to accept.

If the above is unacceptable, you will need to make a different trade-off with the following configuration:

 
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=false

Now, your JSON messages carry no schema information and are hence prone to the aforementioned risks to your data processing pipeline. Evolving the structure of your messages will involve scrutinizing and (re)developing your consumer apps to ensure they don’t break in response to changes, so you need to constantly keep them in sync (manually).

Also take care to use the same converter configuration for both source and sink connectors. Not doing so will cause issues. For example, if you produce messages without a schema and use value.converter.schemas.enable=true in your sink configuration, Kafka Connect will fail to process those messages.

Error Handling in Kafka Connect 

Goal

Handle errors in your Kafka Connect data pipeline.

Pattern

Use a dead-letter queue.

Anti-Pattern

Using the default configuration, thereby ignoring errors.

When you’re stitching together multiple systems using Kafka and building complex data processing pipelines, errors are inevitable. It’s important to plan on how you want to handle them, depending on your specific requirements. Apart from exceptional scenarios, you don’t want your data pipeline to terminate just because there was an error. But, by default, Kafka Connect is configured to do exactly that:

 
errors.tolerance=none

It does what it says and does not tolerate any errors — the Kafka Connect task shuts down as soon as it encounters an error. To avoid this, you can use:

 
errors.tolerance=all

But this is not useful in isolation. You should also configure your connector to use a dead-letter queue, a topic to which Kafka Connect can automatically route messages it failed to process. You just need to provide a name for that topic in the Kafka Connect config:

 
errors.tolerance=all
errors.deadletterqueue.topic.name=<name of the topic>

Since it’s a standard Kafka topic, you have flexibility in terms of how you want to introspect and potentially (re)process failed messages. Additionally, you would also want these to surface in your Kafka Connect logs. To enable this, add the following config:

 
errors.log.enable=true

An even better option would be to embed the failure reason in the headers of the message routed to the dead-letter queue. All you need is to add this configuration:

 
errors.deadletterqueue.context.headers.enable=true

This will provide additional context and details about the error so that you can use it in your re-processing logic.
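Putting the error-handling properties from this section together, a connector's configuration could be assembled as a simple map, for example before serializing it for submission to a Kafka Connect cluster. The class name DeadLetterQueueConfig and the topic name used in main are assumptions for illustration; only the property keys come from the text above.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Sketch: the error-handling part of a connector configuration.
final class DeadLetterQueueConfig {

    // dlqTopic is the name you choose for the dead-letter queue topic.
    static Map<String, String> errorHandling(String dlqTopic) {
        Map<String, String> config = new LinkedHashMap<>();
        config.put("errors.tolerance", "all");                                // keep the task running on errors
        config.put("errors.deadletterqueue.topic.name", dlqTopic);            // route failed messages here
        config.put("errors.deadletterqueue.context.headers.enable", "true");  // attach failure details as headers
        config.put("errors.log.enable", "true");                              // also surface errors in the Connect logs
        return config;
    }

    public static void main(String[] args) {
        // Print the settings in properties-file form.
        errorHandling("orders-dlq").forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```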

Kafka Streams

This section introduces some advanced options that help tune the Kafka Streams library for large-scale stream processing scenarios.

Rebalances and Their Impact on Interactive Queries 

Goal

Minimize recovery/migration time for large state stores during a rebalance.

Pattern

Use standby replicas.

Anti-Pattern

Using the default configuration.

Kafka Streams provides state stores to support stateful stream processing semantics — these can be combined with interactive queries to build powerful applications whose local state can be accessed externally (by an RPC layer such as HTTP or gRPC API).

These state stores are fault-tolerant since their data is replicated to changelog topics in Kafka, and updates to the state stores are tracked and kept up-to-date in Kafka. In case of failure or restart of a Kafka Streams app instance, new or existing instances fetch the state store data from Kafka. As a result, you can continue to query your application state using interactive queries. 

However, depending on the data volume, these state stores can get quite large (on the order of tens of GBs). A rebalance event in such a case will result in a large amount of data being replayed and/or restored from the changelog topics, which can take a lot of time. During this timeframe, the state of your Kafka Streams apps is not available via interactive queries. It’s similar to a “stop-the-world” pause during JVM garbage collection. Depending upon your use case, the non-availability of state stores might be unacceptable.

To minimize the downtime in such cases, you can enable standby replicas for your Kafka Streams application. By setting the num.standby.replicas config (defaults to 0), you can ask Kafka Streams to maintain additional replicas that simply keep a backup of the state stores of your active app instances (by reading from the changelog topics in Kafka). In case of a rebalance due to restart or failure, these standby replicas act as “warm” backups and are available for serving interactive queries, which reduces the failover time.
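A minimal sketch of enabling one standby replica follows. The class name StandbyReplicaConfig and the application ID are placeholders assumed for illustration, and a value of 1 is just an example; the right number depends on how many app instances you run and how much extra storage you can afford.

```java
import java.util.Properties;

// Sketch of a Kafka Streams configuration with standby replicas enabled.
final class StandbyReplicaConfig {

    // applicationId and bootstrapServers are placeholders for your own values.
    static Properties build(String applicationId, String bootstrapServers) {
        Properties props = new Properties();
        props.put("application.id", applicationId);
        props.put("bootstrap.servers", bootstrapServers);
        // Keep one warm copy of each state store on another app instance.
        props.put("num.standby.replicas", "1");
        return props;
    }

    public static void main(String[] args) {
        Properties props = build("orders-enricher", "localhost:9092");
        System.out.println(props.getProperty("num.standby.replicas")); // prints "1"
    }
}
```

Standby replicas only help if there is at least one other running instance of the application to host them.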

Stream-Table Join in Kafka Streams 

Goal

Enrich streaming data in your Kafka Streams application.

Pattern

Use stream-table join.

Anti-Pattern

Invoking external data store(s) for every event in the stream.

A common requirement for stream processing apps is to access an external SQL database to enrich streaming data with additional information. For example, an app might fetch customer details from an existing customers table to supplement and enrich a stream of order information.

The obvious solution is to query the database to get the information and add it to the existing stream record.

 
public Customer getCustomerInfo(String custID) {
    // query customers table in a database
}

…..

KStream<String, Order> orders = builder.stream("orders-topic"); // input KStream contains customer ID (String) and Order info (POJO)

// enrich order data with one remote database call per record
KStream<String, Order> enriched = orders.mapValues((custID, order) -> {
    Customer cust = getCustomerInfo(custID);
    order.setCustomerEmail(cust.getEmail());
    return order;
});

enriched.to("orders-enriched-topic"); // write to new topic

This is not a viable choice, especially for medium- to large-scale applications. The latency incurred for the database invocation for each and every record in your stream will most likely create pressure on downstream applications and affect the overall performance SLA of your system.

The preferred way of achieving this is via a stream-table join.

First, you will need to source the data (and subsequent changes to it) in the SQL database into Kafka. This can be done by writing a traditional client application to query and push data into Kafka using the Producer API, but a better solution is to use a Kafka Connect connector such as JDBC source, or even better, a CDC-based connector such as Debezium.

Once the data is in Kafka topics, you can use a KTable to read that data into the local state store. This also takes care of updating the local state store since we have a pipeline already created wherein database changes will be sent to Kafka. Now, our KStream can access this local state store to enrich the streaming data with additional content — this is much more efficient than remote database queries.

 
KStream<String, Order> orders = ...;
KTable<String, Customer> customers = ...;

KStream<String, Order> enriched = orders.join(customers,
    (order, cust) -> {
        order.setCustomerEmail(cust.getEmail());
        return order;
    }
);

enriched.to("orders-enriched-topic");

General 

The following patterns apply to Kafka in general and are not specific to Kafka Streams, Kafka Connect, etc.

Automatic Topic Creation – Boon or Bane? 

Goal

Create topics, keeping reliability and high availability in mind.

Pattern

Disable automatic topic creation and provide explicit configuration while creating topics.

Anti-Pattern

Relying on automatic topic creation.

Kafka topic configuration properties (such as replication factor, partition count, etc.) have a server default that you can optionally override on a per-topic basis. In the absence of explicit configuration, the server default is used. This is why you need to be mindful of the auto.create.topics.enable configuration of your Kafka broker. It is set to true by default, and it creates topics with default settings such as:

  1. The replication factor is set to 1, which is not good from a high-availability and reliability perspective. The recommended replication factor is 3 so that your system can tolerate the loss of two brokers without losing data.
  2. The partition count is set to 1, which severely limits the performance of your Kafka client apps. For example, you can only have one active instance of a consumer app (in a consumer group).

Keeping automatic topic creation enabled also means that you can end up with unwanted topics in your cluster. The reason is that topics (that don’t yet exist) referenced by a producer application and/or subscribed to by a consumer application will automatically get created.

Automatic topic creation creates topics with a cleanup policy set to delete. This means that if you wanted to create a log-compacted topic, you will be in for a surprise!

How Many In-Sync Replicas Do You Need? 

Goal

While producing a message, be sure that it has been sent to Kafka.

Pattern

Specify minimum in-sync replicas along with acks configuration.

Anti-Pattern

Relying only on the acks configuration.

When tuning your producer application for strong reliability, the min.insync.replicas configuration works hand in hand with the acks property (discussed earlier in this Refcard). It’s a broker-level configuration that can be overridden at the topic level and whose value is set to 1 by default.

As a rule of thumb, for a standard Kafka cluster with three brokers and a topic replication factor of 3, min.insync.replicas should be set to 2. This way, Kafka will wait for acknowledgment from two in-sync replica nodes (including the leader), which implies that you can withstand the loss of one broker before Kafka stops accepting writes (due to lack of minimum in-sync replicas).
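The rule of thumb above reduces to simple arithmetic: with acks=all, the number of brokers you can lose while writes are still accepted is the replication factor minus min.insync.replicas. The helper below is a hypothetical illustration of that calculation (the class and method names are not part of Kafka), and it assumes all replicas are in sync before any broker is lost.

```java
// Illustrative arithmetic for choosing min.insync.replicas.
final class InSyncReplicaMath {

    // Brokers that can be lost while acks=all writes are still accepted.
    static int writableBrokerLosses(int replicationFactor, int minInSyncReplicas) {
        if (minInSyncReplicas < 1 || minInSyncReplicas > replicationFactor) {
            throw new IllegalArgumentException(
                    "min.insync.replicas must be between 1 and the replication factor");
        }
        return replicationFactor - minInSyncReplicas;
    }

    public static void main(String[] args) {
        System.out.println(writableBrokerLosses(3, 2)); // prints "1"
        System.out.println(writableBrokerLosses(3, 3)); // prints "0": any broker loss blocks writes
    }
}
```

Setting min.insync.replicas equal to the replication factor maximizes durability but means a single broker outage stops writes, which is why 2 out of 3 is the usual compromise.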

Section 3

Conclusion

Apache Kafka has a rich ecosystem of projects and APIs. Each of these offers a lot of flexibility in the form of configuration options such that you can tune these components to best fit your use case and requirements. This Refcard covered a few of them. I encourage you to refer to the official Apache Kafka documentation for a deep dive into these areas.
