{{announcement.body}}
{{announcement.title}}
refcard cover
Refcard #254

Apache Kafka Essentials

Dive into Apache Kafka: Readers will review its history and fundamental components — Pub/Sub, Kafka Connect, and Kafka Streams. Key concepts in these areas are supplemented with detailed code examples that demonstrate producing and consuming data, using connectors for easy data streaming and transformation, performing common operations in KStreams, and more.

Free PDF for Easy Reference

Brought to You By

Solace
refcard cover

Written By

author avatar Sudip Sengupta
Technical Writer, Javelynn
author avatar William McLane
Messaging Evangelist, TIBCO Software Inc.
Section 1

Introduction

With the changing tech landscape, several trends have emerged in recent years. First, with the increasing adoption of the cloud-native framework, microservice architectures are now a preferred choice for application development. These loosely coupled microservices enable enhanced scalability, fault isolation, and rapid deployment across hybrid cloud environments. Second, the diversity, volume, and velocity of the data that an enterprise wants to collect for decision-making continues to grow. Additionally, there is a growing need for an enterprise to make real-time decisions on collected data. Undoubtedly, the increasing application of high-performance data analytics to support modern businesses is considered central to their rapid decision-making, augmented predictive analyses, and factual strategic initiatives.

Apache Kafka is a streaming engine for collecting, caching, and processing high volumes of data in real time. The distributed event store typically serves as a central data hub that accepts all enterprise data for aggregation, transformation, enrichment, ingestion, and analysis. The data can then be used for continuous processing or fed into other systems and applications in real time.

Figure 1: Apache Kafka as a central real-time hub

Section 2

About Apache Kafka

Kafka was originally developed at LinkedIn in 2010 and later open sourced via the Apache Software Foundation in 2012. The platform comprises three main components: Pub/Sub, Kafka Connect, and Kafka Streams. The role of each component is summarized in the table below:

Pub/Sub

Storing and delivering data efficiently and reliably at scale

Kafka Connect

Integrating Kafka with external data sources and data sinks

Kafka Streams

Processing data in Kafka in real time

The main benefits of Kafka are:

  • High throughput – Each server is capable of handling hundreds of MB per second of data.
  • High availability – Data can be stored redundantly in multiple servers and can survive individual server failure.
  • High scalability – New servers can be added over time to scale out the system.
  • Easy integration with external data sources or data sinks.
  • Built-in real-time processing layer.
  • Fault isolation – Kafka clusters are highly distributed, durable, and fault-tolerant data stores.
  • Enterprise support – It offers various enterprise versions to support mission-critical workloads.
  • Client library availability – It allows for event stream processing in multiple coding languages.
Section 3

Apache Kafka Fundamentals

Apache Kafka runs in distributed clusters, with each cluster node being referred to as a Broker. Kafka Connect integrates Kafka instances on Brokers with producers and consumers — clients that produce and consume event data, respectively. All these components rely on the publish-subscribe durable messaging ecosystem to enable instant exchange of event data between servers, processes, and applications.

Pub/Sub in Apache Kafka

The first component in Kafka deals with the production and consumption of the data. The following table describes a few key concepts in Kafka:

Topic

Defines a logical name for producing and consuming records

Partition

Defines a non-overlapping subset of records within a topic

Offset

A unique sequential number assigned to each record within a topic partition

Record

Contains a key, value, timestamp, and list of headers

Broker

Server where records are stored; multiple brokers can be used to form a cluster


Figure 2 depicts a topic with two partitions. Partition 0 has 5 records, with offsets from 0 to 4, and partition 1 has 4 records, with offsets from 0 to 3.

Figure 2: Partitions in a topic

The following code snippet shows how to produce records to the topic, "test", using the Java API:

Java
 
Properties props = new Properties();
   props.put(“bootstrap.servers”,
      “localhost:9092”);
props.put(“key.serializer”,

   “org.apache.kafka.common.serialization.StringSerializer”);
props.put(“value.serializer”,

   “org.apache.kafka.common.serialization.StringSerializer”);
Producer<String, String> producer = new
    KafkaProducer<>(props);
producer.send(
    new ProducerRecord<String, String>(“test”, “key”, “value”));


In the example above, both the key and value are strings, so we are using a StringSerializer. It’s possible to customize the serializer when types become more complex. The following code snippet shows how to consume records with key and value strings in Java:

Java
 
Properties props = new Properties();  props.put(“bootstrap.servers”, “localhost:9092”);
props.put(“key.deserializer”, 

   “org.apache.kafka.common.serialization.StringDeserializer”);
props.put(“value.deserializer”,

   “org.apache.kafka.common.serialization.StringDeserializer”);
KafkaConsumer<String, String> consumer =
   new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList(“test”));
while (true) {
    ConsumerRecords<String, String> records =
      consumer.poll(100);
   for (ConsumerRecord<String, String> record : records)
      System.out.printf(“offset=%d, key=%s, value=%s”,
         record.offset(), record.key(), record.value());
   consumer.commitSync();
}


Records within a partition are always delivered to the consumer in offset order. By saving the offset of the last consumed record from each partition, the consumer can resume from where it left off after a restart. In the example above, we use the commitSync() API to save the offsets explicitly after consuming a batch of records. Users can also save the offsets automatically by setting the property, enable.auto.commit, to true.

A record in Kafka is not removed from the broker immediately after it is consumed. Instead, it is retained according to a configured retention policy. The following are two common retention policies:

  • log.retention.hours – number of hours to keep a record on the broker
  • log.retention.bytes – maximum size of records retained in a partition

Kafka Connect

The second component is Kafka Connect, which is a framework that makes it easy to stream data between Kafka and other systems. Users can deploy a Connect cluster and run various connectors to import data from different sources into Kafka (through Source Connectors) and export data from Kafka further (through Sink Connectors) to storage platforms such as HDFS, S3, or Elasticsearch.

The benefits of using Kafka Connect are:

  • Parallelism and fault tolerance
  • Avoidance of ad hoc code by reusing existing connectors
  • Built-in offset and configuration management

Quick Start for Kafka Connect

The following steps show how to run the existing file connector in standalone mode to copy the content from a source file to a destination file via Kafka:

  1. Prepare some data in a source file:
Shell
 
> echo -e \”hello\nworld\” > test.txt


  1. Start a file source and a file sink connector:
Shell
 
> bin/connect-standalone.sh
config/connect-file-source.properties
config/connect-file-sink.properties


  1. Verify the data in the destination file:
Shell
 
> more test.sink.txt
hello


  1. Verify the data in Kafka:
Shell
 
> bin/kafka-console-consumer.sh
      --bootstrap-server localhost:9092
      --topic connect-test
      --from-beginning
   {“schema”:{“type”:”string”,
         “optional”:false},
      “payload”:”hello”}
   {“schema”:{“type”:”string”,
         “optional”:false},
      “payload”:”world”}


In the example above, the data in the source file, test.txt, is first streamed into a Kafka topic, connect-test, through a file source connector. The records in connect-test are then streamed into the destination file, test.sink.txt. If a new line is added to test.txt, it will show up immediately in test.sink.txt. Note that we achieve this by running two connectors without writing any custom code.

Connectors are powerful tools that allow for integration of Apache Kafka into many other systems. There are many open-source and commercially supported options for integrating Apache Kafka — both at the connector layer as well as through an integration services layer — that can provide much more flexibility in message transformation.

Transformations in Connect

Connect is primarily designed to stream data between systems as-is, whereas Kafka Streams is designed to perform complex transformations once the data is in Kafka. That said, Kafka Connect provides a mechanism used to perform simple transformations per record. The following example shows how to enable a couple of transformations in the file source connector:

  1. Add the following lines to connect-file-source.properties:
Shell
 
transforms=MakeMap, InsertSource
transforms.MakeMap.type=org.apache.kafka
    .connect.transforms.HoistField$Value
transforms.MakeMap.field=line
transforms.InsertSource.type=org.apache
   .kafka.connect.transforms
   .InsertField$Value
transforms.InsertSource.static.field=
   data_source
transforms.InsertSource.static.value=
   test-file-source


  1. Start a file source connector:
Shell
 
> bin/connect-standalone.sh
config/connect-file-source.properties


  1. Verify the data in Kafka:
Shell
 
> bin/kafka-console-consumer.sh
      --bootstrap-server localhost:9092
   --topic connect-test
{“line”:”hello”,”data_source”:”test
   -file-source”}
{“line”:”world”,”data_source”:”test
   -file-source”}


In step one above, we add two transformations, MakeMap and InsertSource, which are implemented by the classes, HoistField$Value and InsertField$Value, respectively. The first one adds a field name, line, to each input string. The second one adds an additional field, data_source, that indicates the name of the source file. After applying the transformation logic, the data in the input file is now transformed to the output in step three. Because the last transformation step is more complex, we implement it with the Streams API (covered in more detail below):

Java
 
final Serde<String> stringSerde = Serdes.String();
final Serde<Long> longSerde = Serdes.Long();
StreamsBuilder builder = new StreamsBuilder();
// build a stream from an input topic
KStream<String, String> source = builder.stream(
   “streams-plaintext-input”,
    Consumed.with(stringSerde, stringSerde));
KTable<String, Long> counts = source
   .flatMapValues(value -> Arrays.asList(value.toLowerCase().split(“ “)))
   .groupBy((key, value) -> value) 
   .count();
// convert the output to another topic
counts.toStream().to(“streams-wordcount-output”,
    Produced.with(stringSerde, longSerde));


Connect REST API

In production, Kafka Connect usually runs in distributed mode and can be managed through REST APIs. The following table lists the common APIs. See the Apache Kafka documentation for more information.

Connect REST API Action

GET /connectors

Return a list of active connectors

POST /connectors 

Create a new connector

GET /connectors/{name} 

Get information for the connector

GET /connectors/{name} /config 

Get configuration parameters for the connector

PUT /connectors/{name} /config 

Update configuration parameters for the connector

GET /connectors/{name} /status 

Get the current status of the connector

Kafka Streams

Kafka Streams is a client library for building real-time applications and microservices where the input and/or output data is stored in Kafka. The benefits of using Kafka Streams are:

  • Less code in the application
  • Built-in state management
  • Lightweight
  • Parallelism and fault tolerance

The most common way of using Kafka Streams is through the Streams DSL, which includes operations such as filtering, joining, grouping, and aggregation. The following code snippet shows the main logic of a Streams example called WordCountDemo:

Java
 
final Serde stringSerde = Serdes.String();
final Serde longSerde = Serdes.Long();
StreamsBuilder builder = new StreamsBuilder();
// build a stream from an input topic
   KStream source = builder.stream(
   “streams-plaintext-input”,
    Consumed.with(stringSerde, stringSerde));
KTable counts = source
   .flatMapValues(value -\> Arrays.asList(value.
toLowerCase().split(“ “)))
   .groupBy((key, value) -\> value) .count();
// convert the output to another topic
counts.toStream().to(“streams-wordcount-output”,
    Produced.with(stringSerde, longSerde));


The code above first creates a stream from an input topic, streams-plaintext-input. It then applies a transformation to split each input line into words. Next, it counts the number of occurrences of each unique word. Finally, the results are written to an output topic, streams-wordcount-output.

The following are steps to run the example code:

  1. Create the input topic:
Shell
 
bin/kafka-topics.sh --create
       --zookeeper localhost:2181
       --replication-factor 1
       --partitions 1
       --topic streams-plaintext-input


  1. Run the stream application:
Shell
 
bin/kafka-run-class.sh org.apache.
kafka.streams.examples.wordcount.
WordCountDemo


  1. Produce some data in the input topic: 
Shell
 
bin/kafka-console-producer.sh
        --broker-list localhost:9092
        --topic streams-plaintext-input
    hello world


  1. Verify the data in the output topic:
Shell
 
bin/kafka-console-consumer.sh
        --bootstrap-server localhost:9092
        --topic streams-wordcount-output
        --from-beginning
        --formatter kafka.tools.
            DefaultMessageFormatter
        --property print.key=true
        --property print.value=true
        --property key.deserializer=
            org.apache.kafka.common.
            serialization.StringDeserializer
        --property value.deserializer=
            org.apache.kafka.common.
            serialization.LongDeserializer
   hello 1
   world 1


KStream vs. KTable

There are two key concepts in Kafka Streams: KStream and KTable. A topic can be viewed as either of the two. Their differences are summarized in the table below:


KStream KTable

Concept

Each record is treated as an append to the stream.

Each record is treated as an update to an existing key.

Usage

Model append-only data (e.g., click streams).

Model updatable reference data (e.g., user profiles).

 

The following example illustrates their differences:

(Key, Value) Records

Sum of Values as KStream

Sum of Values as KTable

(“k1”, 2) (“k1”, 5)

7

5


When a topic is viewed as a KStream, there are two independent records and, thus, the sum of the values is 7. On the other hand, if the topic is viewed as a KTable, the second record is treated as an update to the first record since they have the same key: “k1”. Therefore, only the second record is retained in the stream, and the sum is 5 instead.

KStreams DSL

The following section outlines a list of common operations available in Kafka Streams.

KStream operators help abstract each record stream into independent key-value pairs. Such operations include:

  1. filter(Predicate) – Create a new KStream that consists of all records of this stream that satisfy the given predicate.

Example:

Shell
 
ks_out = ks_in.filter( (key, value) -> value > 5 );
ks_in: ks_out: (“k1”, 2) (“k2”, 7) (“k2”, 7)


  1. map(KeyValueMapper) – Transform each record of the input stream into a new record in the output stream (both key and value type can be altered arbitrarily).

Example:

Shell
 
ks_out = ks_in..map( (key, value) -> new KeyValue<>(key, key) )
ks_in: ks_out: (“k1”, 2) (“k1”, “k1”) (“k2”, 7) (“k2”, “k2”)


  1. groupBy() – Group the records by their current key into a KGroupedStream while preserving the original values.

Example:

Shell
 
ks_out = ks.groupBy()
ks_in: ks_out: (“k1”, 1) (“k1”, ((“k1”, 1), (“k2”, 2) (“k1”, 3))) (“k1”, 3) (“k2”, ((“k2”, 2)))


  1. join(KTable, ValueJoiner) – Join records of the input stream with records from the KTable if the keys from the records match. Return a stream of the key and the combined value using ValueJoiner.

Example:

Shell
 
ks_out = ks_in.join( kt, (value1, value2) -> value1 + value2 );
ks_in: kt: (“k1”, 1) (“k1”, 11) (“k2”, 2) (“k2”, 12) (“k3”, 3) (“k4”, 13)
ks_out: (“k1”, 12) (“k2”, 14)


  1. join(KStream, ValueJoiner, JoinWindows) – Join records of the two streams if the keys match and the timestamp from the records satisfy the time constraint specified by JoinWindows. Return a stream of the key and the combined value using ValueJoiner.

Example:

Shell
 
ks_out = ks1.join( ks2, (value1, value2) -> value1 + value2, JoinWindows. of(100) );
ks1: ks2: (“k1”, 1, 100t) (“k1”, 11, 150t) (“k2”, 2, 200t) (“k2”, 12, 350t) (“k3”, 3, 300t) (“k4”, 13, 380t) * t indicates a timestamp.
ks_out: (“k1”, 12)


KGroupedStream operators, unlike KStream operators, help abstract grouped streams of key-value pairs. Such operations include:

  1. count() – Count the number of records in this stream by the grouped key and return it as a KTable.

Example:

Shell
 
kt = kgs.count(); kgs: (“k1”, ((“k1”, 1), (“k1”, 3))) (“k2”, ((“k2”, 2)))
kt: (“k1”, 2) (“k2”, 1)


  1. reduce(Reducer) – Combine the values of records in this stream by the grouped key and return it as a KTable.

Example:

Shell
 
kt = kgs.reduce( (aggValue, newValue) -> aggValue + newValue );
kgs: (“k1”, (“k1”, 1), (“k1”, 3))) (“k2”, ((“k2”, 2)))
kt: (“k1”, 4) (“k2”, 2)


  1. windowedBy(Windows) – Further group the records by the timestamp and return it as a TimeWindowedKStream.

Example:

Shell
 
twks = kgs.windowedBy( TimeWindows.
of(100) );
kgs: (“k1”, ((“k1”, 1, 100t), (“k1”, 3, 150t))) (“k2”, ((“k2”, 2, 100t), (“k2”, 4, 250t))) * t indicates a timestamp.
twks: (“k1”, 100t -- 200t, ((“k1”, 1, 100t), (“k1”, 3, 150t))) (“k2”, 100t -- 200t, ((“k2”, 2, 100t))) (“k2”, 200t -- 300t, ((“k2”, 4, 250t)))


Additional details on the respective set of operations on KTable and KGroupedTable can be found in the Kafka Documentation.

Querying States in KStreams

While processing data in real time, a KStreams application locally maintains states such as the word counts in the previous example. Those states can be queried interactively through an API described in the Interactive Queries section of the Kafka documentation. This avoids the need of an external data store for exporting and serving those states.

Exactly-Once Processing in KStreams

Failures in the brokers or the clients may introduce duplicates during the processing of records. KStreams provides the capability of processing records exactly once, even under failures. This can be achieved by simply setting the property, processing.guarantee, to exactly_once in KStreams.

Documenting and Visualizing Data Streams

Processing stream data involves correlating data between streams, analyzing patterns, and understanding application/user behavior to make predictions. In a typical event stream, events are continuously flowing through Apache Kafka clusters, with a rich ecosystem of clients and data sinks integrating with the library. Making real-time decisions from data streams, therefore, requires the enforcement of comprehensive observability, event ownership, and governance.

Governing data in motion is often a shared responsibility between developers, DataOps, and business development teams. This often starts with business managers defining the business and technical requirements for event management, while data engineers are managing client permissions for creating internal topics. Subsequently, developers/system admins configure security settings within the clients and then administer each client’s corresponding settings within Kafka Streams.

Although Kafka lacks an innate event portal to catalog event streams and visualize the topology of the data pipelines, the Kafka streaming client library supports a data lineage visibility to identify the origin and path taken by an event. It is possible to extend Kafka by integrating it with an event management platform that can enforce the discovery, visualization, and cataloging of event streams. Such platforms identify newer data schemas, connectors, client groups, and events as they occur, which streamlines the recording of event changes. These tools help uncover deeper analytical insights and help solve complex data relationships while also fostering efficient collaboration among distributed teams.

Section 4

Extending Apache Kafka

While Apache Kafka supports a wide range of event streaming use cases, it may require customization to support certain mission-critical modern data-driven applications. Some approaches to extend Apache Kafka include:

  • Low-code stream processing – To reduce manual effort redundancy, Kafka data streams can be accessed using low-code approaches, such as templated SQL queries, that require minimal to no coding for redundant tasks. There are also tools such as Apache Spark that offer SQL access to Kafka streams.
  • Linking on-prem and cloud-based clusters – An event mesh is used to connect the Kafka deployment with other network assets. The mesh also enforces real-time connectivity between back-end Kafka clusters and edge networks.
  • Filtering data streams – Kafka offers innate data streaming capabilities using the org.apache.kafka.streams.kstream.KStream filter. As this filtering ability is not available for consumers, client-side filtering options can be achieved through open-source platforms such as Apache Storm or other paid offerings.
  • Sharing data between multiple Kafka flavors – A Kafka mesh can be deployed to replicate event data across different distributions of Apache Kafka to support hybrid/multi-cluster setups.

Figure 3: Extending Kafka with the Kafka Data Mesh

For more information on low-code stream processing options, including SQL access to Apache Kafka, please see the Additional Resources section below.

Section 5

Scaling Data Streams

With the increasing popularity of real-time stream processing and the rise of event-driven architectures, a number of alternatives have started to gain traction for real-time data distribution. Apache Kafka is the flavor of choice for distributed high-volume data streaming; however, many implementations commonly struggle with building solutions at scale when the application’s requirements go beyond a single data center or single location.

While Apache Kafka is purpose-built for real-time data distribution and stream processing, it may not fit the requirements of every enterprise application due to its constraints with architecture, throughput, and amount of data ingested. Some alternatives to Apache Kafka include:

  • Apache Pulsar – a lower-latency, higher-throughput event management platform that relies on geo-replication for enhanced performance.
  • Amazon Kinesis – an AWS streaming service that supports Java, Android, .NET, and Go SDKs.
  • Apache Spark – a unified, open-source analytics engine that is mainly built for streaming data in ML and AI applications.

For more information on comparisons between Apache Kafka and other data distribution solutions, please see the Additional Resources section below.

Section 6

Conclusion

Apache Kafka has become the de facto standard for high performance, distributed data streaming. It has a large and growing community of developers, corporations, and applications that are supporting, maintaining, and leveraging it. If you are building an event-driven architecture or looking for a way to stream data in real time, Apache Kafka is a clear leader in providing a proven, robust platform for enabling stream processing and enterprise communication.

Additional Resources

{{ parent.title || parent.header.title}}

{{ parent.tldr }}

{{ parent.urlSource.name }}