Refcard #254

Apache Kafka

Download this new Refcard to get started with Apache Kafka. Learn more about its background, steps to run the quickstart script, components, and how to connect REST APIs.

Written by Jun Rao, Co-founder, Confluent

Brought to you by Confluent

Table of Contents

Why Apache Kafka

About Apache Kafka

Quickstart for Apache Kafka

Pub/Sub in Apache Kafka

Kafka Connect

Quickstart for Kafka Connect

Transformations in Connect

Connect REST API

Kafka Streams

KStream vs. KTable

KStreams DSL

Querying the States in KStreams

Exactly-Once Processing in KStreams

KSQL

Additional resources

Section 1

Why Apache Kafka

Two trends have emerged in the information technology space. First, the diversity and velocity of the data that an enterprise wants to collect for decision-making continue to grow. Such data include not only transactional records, but also business metrics, IoT data, operational metrics, application logs, etc.

Second, there is a growing need for an enterprise to make decisions in real time based on that collected data. Financial institutions want not only to detect fraud immediately, but also to offer a better banking experience through features like real-time alerting, real-time recommendations, more effective customer service, and so on. Similarly, it's critical for retailers to make changes in catalog, inventory, and pricing available as quickly as possible. It is truly a real-time world.

Before Apache Kafka, there wasn’t a system that perfectly met both of the above business needs. Traditional messaging systems are real-time, but weren’t designed to handle data at scale. Newer systems such as Hadoop are much more scalable, but were mostly designed for batch processing.

Apache Kafka is a streaming platform for collecting, storing, and processing high volumes of data in real-time. As illustrated in Figure 1, Kafka typically serves as a central data hub in which all data within an enterprise are collected. The data can then be used for continuous processing or fed into other systems and applications in real time. Kafka is in use by more than 30% of Fortune 500 companies across all industries.

Figure 1. Kafka as a central real-time hub

Section 2

About Apache Kafka

Kafka was originally developed at LinkedIn in 2010 and became a top-level Apache project in 2012. It has three main components: Pub/Sub, Kafka Connect, and Kafka Streams. The role of each component is summarized in the table below.

Component Role
Pub/Sub Storing and delivering data efficiently and reliably at scale.
Kafka Connect Integrating Kafka with external data sources and data sinks.
Kafka Streams Processing data in Kafka in real time.

The main benefits of Kafka are:

  1. High throughput: Each server is capable of handling hundreds of megabytes of data per second.
  2. High availability: Data can be stored redundantly in multiple servers and can survive individual server failure.
  3. High scalability: New servers can be added over time to scale out the system.
  4. Easy integration with external data sources or data sinks.
  5. Built-in real-time processing layer.
Section 3

Quickstart for Apache Kafka

It’s easy to get started on Kafka. The following are the steps to run the quickstart script.

  1. Download the Kafka binary distribution from http://kafka.apache.org/downloads and untar it:

         > tar -xzf kafka_2.11-1.0.0.tgz
         > cd kafka_2.11-1.0.0

  2. Start the ZooKeeper server:

         > bin/zookeeper-server-start.sh config/zookeeper.properties

  3. Start the Kafka broker:

         > bin/kafka-server-start.sh config/server.properties

  4. Create a topic:

         > bin/kafka-topics.sh --create --zookeeper localhost:2181
               --topic test --partitions 1 --replication-factor 1

  5. Produce data:

         > bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
         hello
         world

  6. Consume data:

         > bin/kafka-console-consumer.sh --bootstrap-server localhost:9092
               --topic test --from-beginning
         hello
         world

After step 5, you can type some text into the producer console. In step 6, the consumer displays the same text.

Section 4

Pub/Sub in Apache Kafka

The first component of Kafka deals with the production and consumption of data. The following table describes a few key concepts in Kafka:

Concept Meaning
topic Defines a logical name for producing and consuming records.
partition Defines a non-overlapping subset of records within a topic.
offset A unique sequential number assigned to each record within a topic partition.
record A record contains a key, a value, a timestamp, and a list of headers.
broker A server where records are stored. Multiple brokers can be used to form a cluster.

Figure 2 depicts a topic with two partitions. Partition 0 has 5 records, with offsets from 0 to 4, and partition 1 has 4 records, with offsets from 0 to 3.

Figure 2: Partitions in a Topic

The following code snippet shows how to produce records to a topic "test" using the Java API:

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer",
    "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer",
    "org.apache.kafka.common.serialization.StringSerializer");

Producer<String, String> producer = new KafkaProducer<>(props);
producer.send(
    new ProducerRecord<String, String>("test", "key", "value"));
producer.close();

In the above example, both the key and value are strings, so we are using a StringSerializer. It’s possible to customize the serializer when types become more complex. For example, the KafkaAvroSerializer from https://docs.confluent.io/current/schema-registry/docs/serializer-formatter.html allows the user to produce Avro records.
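As a concrete illustration, the snippet below configures the producer with the Avro serializer. It is only a sketch: it assumes a Schema Registry running at localhost:8081, and the topic name and one-field schema are made up for the example.

import java.util.Properties;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer",
    "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer",
    "io.confluent.kafka.serializers.KafkaAvroSerializer");
// Assumed address of the Schema Registry used by the Avro serializer.
props.put("schema.registry.url", "http://localhost:8081");

// Illustrative Avro schema with a single field.
Schema schema = new Schema.Parser().parse(
    "{\"type\":\"record\",\"name\":\"Payment\","
    + "\"fields\":[{\"name\":\"amount\",\"type\":\"double\"}]}");
GenericRecord payment = new GenericData.Record(schema);
payment.put("amount", 42.0);

Producer<String, Object> producer = new KafkaProducer<>(props);
producer.send(new ProducerRecord<>("payments", "key", payment));
producer.close();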

The following code snippet shows how to consume records with string key and value in Java.

import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test-group");  // consumer group used for offset tracking (any name)
props.put("key.deserializer",
    "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer",
    "org.apache.kafka.common.serialization.StringDeserializer");

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("test"));

while (true) {
  ConsumerRecords<String, String> records = consumer.poll(100);
  for (ConsumerRecord<String, String> record : records)
    System.out.printf("offset=%d, key=%s, value=%s%n",
        record.offset(), record.key(), record.value());
  consumer.commitSync();
}


Records within a partition are always delivered to the consumer in offset order. By saving the offset of the last consumed record from each partition, the consumer can resume from where it left off after a restart. In the example above, we use the commitSync() API to save the offsets explicitly after consuming a batch of records. One can also save the offsets automatically by setting the property enable.auto.commit to true.
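For comparison, a minimal sketch of the automatic variant is shown below; only the last two properties differ from the consumer example above, and the group name and commit interval are illustrative values.

import java.util.Properties;

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test-group");           // illustrative consumer group
// Commit offsets automatically in the background instead of calling commitSync().
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");  // illustrative commit interval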

Unlike other messaging systems, a record in Kafka is not removed from the broker immediately after it is consumed. Instead, it is retained according to a configured retention policy. The following table summarizes the two common policies:

Retention policy Meaning
log.retention.hours The number of hours to keep a record on the broker.
log.retention.bytes The maximum size of records retained in a partition.
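Retention can also be overridden per topic. The sketch below uses the AdminClient shipped with Kafka to create a topic with roughly one day of retention; the topic name and value are illustrative, and note that the topic-level properties (retention.ms, retention.bytes) are named differently from the broker-level defaults above.

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;

Properties props = new Properties();
props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

// Run inside a method that declares throws Exception.
try (AdminClient admin = AdminClient.create(props)) {
  NewTopic topic = new NewTopic("test-retained", 1, (short) 1)
      .configs(Collections.singletonMap("retention.ms", "86400000")); // ~1 day
  admin.createTopics(Collections.singletonList(topic)).all().get();
}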
Section 5

Kafka Connect

The second component of Kafka is Kafka Connect, a framework that makes it easy to stream data between Kafka and other systems. As shown in Figure 3, one can deploy a Connect cluster and run various connectors to import data from sources like MySQL, MQ, or Splunk into Kafka and to export data from Kafka to sinks such as HDFS, S3, and Elasticsearch. A connector is either a source or a sink.

Figure 3: Usage of Kafka Connect

The benefits of using Kafka Connect are:

  • Parallelism and fault tolerance
  • Avoiding ad-hoc code by reusing existing connectors
  • Built-in offset and configuration management
Section 6

Quickstart for Kafka Connect

The following steps show how to run the existing file connector in standalone mode to copy the content from a source file to a destination file via Kafka:

  1. Prepare some data in a source file:

         > echo -e "hello\nworld" > test.txt

  2. Start a file source and a file sink connector:

         > bin/connect-standalone.sh config/connect-standalone.properties
               config/connect-file-source.properties
               config/connect-file-sink.properties

  3. Verify the data in the destination file:

         > more test.sink.txt
         hello
         world

  4. Verify the data in Kafka:

         > bin/kafka-console-consumer.sh --bootstrap-server localhost:9092
               --topic connect-test --from-beginning
         {"schema":{"type":"string","optional":false},"payload":"hello"}
         {"schema":{"type":"string","optional":false},"payload":"world"}

In the above example, the data in the source file test.txt is first streamed into the Kafka topic connect-test through a file source connector. The records in connect-test are then streamed into the destination file test.sink.txt. If a new line is added to test.txt, it will show up in test.sink.txt immediately. Note that we achieve this by running two connectors without writing any custom code.

The following is a partial list of existing connectors. A more complete list can be found at https://www.confluent.io/product/connectors/.

Connector Type Connector Type
Elasticsearch sink HDFS sink
Amazon S3 sink Cassandra sink
Oracle source MongoDB source
MQTT source JMS sink
Couchbase sink, source JDBC sink, source
IBM MQ sink, source DynamoDB sink, source

Section 7

Transformations in Connect

Connect is primarily designed to stream data between systems as-is, whereas Streams is designed to perform complex transformations once the data is in Kafka. That said, Connect does provide a mechanism to perform some simple transformations per record. The following example shows how to enable a couple of transformations in the file source connector.

  1. Add the following lines to connect-file-source.properties:

         transforms=MakeMap,InsertSource
         transforms.MakeMap.type=org.apache.kafka.connect.transforms.HoistField$Value
         transforms.MakeMap.field=line
         transforms.InsertSource.type=org.apache.kafka.connect.transforms.InsertField$Value
         transforms.InsertSource.static.field=data_source
         transforms.InsertSource.static.value=test-file-source

  2. Start a file source connector:

         > bin/connect-standalone.sh config/connect-standalone.properties
               config/connect-file-source.properties

  3. Verify the data in Kafka:

         > bin/kafka-console-consumer.sh --bootstrap-server localhost:9092
               --topic connect-test
         {"line":"hello","data_source":"test-file-source"}
         {"line":"world","data_source":"test-file-source"}

In step 1 above, we add two transformations, MakeMap and InsertSource, which are implemented by the classes HoistField$Value and InsertField$Value, respectively. The first one wraps each input string into a map with a single field named "line". The second one adds a field "data_source" that indicates the name of the source file. After applying the transformation logic, the data in the input file is transformed into the output shown in step 3.
Section 8

Connect REST API

In production, Connect typically runs in distributed mode and can be managed through REST APIs. The following table shows the common APIs.

Connect REST API Meaning
GET /connectors Return a list of active connectors.
POST /connectors Create a new connector.
GET /connectors/{name} Get information about a specific connector.
GET /connectors/{name}/config Get configuration parameters for a specific connector.
PUT /connectors/{name}/config Update configuration parameters for a specific connector.
GET /connectors/{name}/status Get the current status of the connector.
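Any HTTP client can call these endpoints. Below is a minimal Java sketch that lists the active connectors; it assumes a Connect worker running locally on the default REST port 8083.

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.URL;

public class ListConnectors {
  public static void main(String[] args) throws Exception {
    // GET /connectors returns a JSON array of active connector names.
    URL url = new URL("http://localhost:8083/connectors");
    HttpURLConnection conn = (HttpURLConnection) url.openConnection();
    conn.setRequestMethod("GET");
    try (BufferedReader in = new BufferedReader(
        new InputStreamReader(conn.getInputStream()))) {
      String line;
      while ((line = in.readLine()) != null) {
        System.out.println(line);
      }
    }
  }
}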
Section 9

Kafka Streams

Kafka Streams is a client library for building real-time applications and microservices, where the input and/or output data is stored in Kafka. The benefits of using Kafka Streams are:

  • Less code in the application
  • Built-in state management
  • Lightweight
  • Parallelism and fault tolerance

The most common way of using Kafka Streams is through the Streams DSL, which includes operations such as filtering, joining, grouping, and aggregation. The following code snippet shows the main logic of a Streams example called WordCountDemo:

final Serde<String> stringSerde = Serdes.String();
final Serde<Long> longSerde = Serdes.Long();

StreamsBuilder builder = new StreamsBuilder();

// build a stream from an input topic
KStream<String, String> source = builder.stream(
    "streams-plaintext-input",
    Consumed.with(stringSerde, stringSerde));

// split each line into words, group by word, and count occurrences
KTable<String, Long> counts = source
    .flatMapValues(value -> Arrays.asList(value.toLowerCase().split(" ")))
    .groupBy((key, value) -> value)
    .count();

// write the results to an output topic
counts.toStream().to("streams-wordcount-output",
    Produced.with(stringSerde, longSerde));

The above code first creates a stream from the input topic streams-plaintext-input. It then applies a transformation to split each input line into words. After that, it groups by the words and counts the number of occurrences of each word. Finally, the results are written to the output topic streams-wordcount-output.

The following are the steps to run the example code.

  1. Create the input topic:

         > bin/kafka-topics.sh --create --zookeeper localhost:2181
               --replication-factor 1 --partitions 1
               --topic streams-plaintext-input

  2. Run the stream application:

         > bin/kafka-run-class.sh
               org.apache.kafka.streams.examples.wordcount.WordCountDemo

  3. Produce some data in the input topic:

         > bin/kafka-console-producer.sh --broker-list localhost:9092
               --topic streams-plaintext-input
         hello world

  4. Verify the data in the output topic:

         > bin/kafka-console-consumer.sh --bootstrap-server localhost:9092
               --topic streams-wordcount-output --from-beginning
               --formatter kafka.tools.DefaultMessageFormatter
               --property print.key=true
               --property print.value=true
               --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
               --property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer
         hello 1
         world 1
Section 10

KStream vs. KTable

There are two key concepts in Kafka Streams, KStream and KTable. A topic can be viewed as either of the two. Their differences are summarized in the table below.

Concept: A KStream treats each record as an append to the stream; a KTable treats each record as an update to an existing key.
Usage: Use a KStream to model append-only data such as click streams; use a KTable to model updatable reference data such as user profiles.

The following example illustrates the difference between the two. Suppose a topic contains two records, ("k1", 2) followed by ("k1", 5):

Sum of values as a KStream: 7
Sum of values as a KTable: 5

When a topic is viewed as a KStream, there are two independent records and thus the sum of the values is 7. On the other hand, if the topic is viewed as a KTable, the second record is treated as an update to the first record since they have the same key "k1". Therefore, only the second record is retained in the stream and the sum is 5 instead.
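In the Streams DSL, the choice of view is made when a topic is read into the topology. The following is a minimal sketch with illustrative topic names:

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.Consumed;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;

StreamsBuilder builder = new StreamsBuilder();

// Append-only data, such as click streams, is naturally modeled as a KStream.
KStream<String, String> clicks = builder.stream(
    "clicks", Consumed.with(Serdes.String(), Serdes.String()));

// Updatable reference data, such as user profiles, is naturally modeled as a KTable:
// a new record for an existing key replaces the previously stored value.
KTable<String, String> profiles = builder.table(
    "user-profiles", Consumed.with(Serdes.String(), Serdes.String()));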

Section 11

KStreams DSL

The following tables show a list of common operations available in Kafka Streams:

Commonly used operations in KStream

Operation Example
filter(Predicate)

Create a new KStream that consists of all records of this stream that satisfy the given predicate.

ks_out = ks_in.filter(
 (key, value) -> value > 5
);
 ks_in:        ks_out:
("k1", 2)      ("k2", 7)
("k2", 7)
map(KeyValueMapper)

Transform each record of the input stream into a new record in the output stream (both key and value type can be altered arbitrarily).

ks_out = ks_in.map(
 (key, value) ->
  new KeyValue<>(key, key)
);

 ks_in:        ks_out:
("k1", 2)      ("k1", "k1")
("k2", 7)      ("k2", "k2")
groupByKey()

Group the records by their current key into a KGroupedStream while preserving the original values.

kgs = ks_in.groupByKey();

ks_in:        kgs:
("k1", 1)     ("k1", (("k1", 1),
("k2", 2)             ("k1", 3)))
("k1", 3)     ("k2", (("k2", 2)))
join(KTable, ValueJoiner)

Join records of the input stream with records from the KTable if the keys from the records match. Return a stream of the key and the combined value using ValueJoiner.

ks_out = ks_in.join(
 kt,
  (value1, value2) ->
    value1 + value2
);

ks_in:        kt:
("k1", 1)     ("k1", 11)
("k2", 2)     ("k2", 12)
("k3", 3)     ("k4", 13)

ks_out:
("k1", 12)
("k2", 14)
join(KStream, ValueJoiner, JoinWindows)

Join records of the two streams if the keys match and the timestamp from the records satisfy the time constraint specified by JoinWindows. Return a stream of the key and the combined value using ValueJoiner.

ks_out = ks1.join(
 ks2,
  (value1, value2) ->
    value1 + value2, JoinWindows.of(100)
);

ks1:               ks2:
("k1", 1, 100t)    ("k1", 11, 150t)
("k2", 2, 200t)    ("k2", 12, 350t)
("k3", 3, 300t)    ("k4", 13, 380t)
* t indicates a timestamp.

ks_out:
("k1", 12)

Commonly Used Operations in KGroupedStream

Operation Example
count()

Count the number of records in this stream by the grouped key and return it as a KTable.

kt = kgs.count();

kgs:
("k1", (("k1", 1),
        ("k1", 3)))
("k2", (("k2", 2)))

kt:
("k1", 2)
("k2", 1)

reduce(Reducer)

Combine the values of records in this stream by the grouped key and return it as a KTable.

kt = kgs.reduce(
  (aggValue, newValue) -> 
    aggValue + newValue
);

 kgs:
("k1", (("k1", 1),
        ("k1", 3)))
("k2", (("k2", 2)))


 kt: 
("k1", 4) 
 ("k2", 2)
windowedBy(Windows)

Further group the records by the timestamp and return it as a TimeWindowedKStream.

twks = kgs.windowedBy(
  TimeWindows.of(100)
 );

 kgs:
 ("k1", (("k1", 1, 100t),
         ("k1", 3, 150t)))
 ("k2", (("k2", 2, 100t),
         ("k2", 4, 250t)))
 * t indicates a timestamp.

 twks:
 ("k1", 100t – 200t,
        (("k1", 1, 100t),
         ("k1", 3, 150t)))
("k2", 100t – 200t,
              (("k2", 2, 100t)))
("k2", 200t – 300t,
              (("k2", 4, 250t)))

A similar set of operations is available on KTable and KGroupedTable. See details at http://kafka.apache.org/documentation/streams/.
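To show how these operations look with concrete types, here is a small sketch that wires a few of them together; the topic name "numbers" is illustrative.

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.Consumed;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KGroupedStream;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;

StreamsBuilder builder = new StreamsBuilder();

KStream<String, Integer> ksIn = builder.stream(
    "numbers", Consumed.with(Serdes.String(), Serdes.Integer()));

// filter: keep only records whose value is greater than 5
KStream<String, Integer> filtered = ksIn.filter((key, value) -> value > 5);

// map: replace each value with its key (key and value types may change)
KStream<String, String> mapped =
    ksIn.map((key, value) -> new KeyValue<>(key, key));

// groupByKey + count: number of records per key, returned as a KTable
KGroupedStream<String, Integer> grouped = ksIn.groupByKey();
KTable<String, Long> counts = grouped.count();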

Section 12

Querying the States in KStreams

While processing data in real time, a KStreams application locally maintains state, such as the word counts in the previous example. That state can be queried interactively through the API described at https://docs.confluent.io/current/streams/developer-guide/index.html#interactive-queries. This avoids the need for an external data store to export and serve the state.
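A minimal sketch of that pattern, assuming the WordCount topology from the previous sections and an illustrative store name:

import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.Consumed;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;

Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-interactive"); // illustrative id
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> source = builder.stream(
    "streams-plaintext-input", Consumed.with(Serdes.String(), Serdes.String()));

// Name the state store when counting so that it can be queried later.
source.flatMapValues(value -> Arrays.asList(value.toLowerCase().split(" ")))
    .groupBy((key, word) -> word)
    .count(Materialized.as("word-counts-store"));

KafkaStreams streams = new KafkaStreams(builder.build(), props);
streams.start();

// Once the application is up (in practice, retry until the store is available),
// read the local store directly instead of exporting it to an external database.
ReadOnlyKeyValueStore<String, Long> store =
    streams.store("word-counts-store", QueryableStoreTypes.keyValueStore());
Long helloCount = store.get("hello");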

Section 13

Exactly-Once Processing in KStreams

Failures in the brokers or the clients may introduce duplicates during the processing of records. KStreams provides the capability to process records exactly once, even under failures. This is achieved by simply setting the property processing.guarantee to exactly_once in KStreams. More details on exactly-once processing can be found at https://www.confluent.io/blog/exactly-once-semantics-are-possible-heres-how-apache-kafka-does-it/.
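In code, that is a single configuration entry. A minimal sketch of the relevant Streams settings (the application id and server address are illustrative):

import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-eos");     // illustrative app id
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
// Switch from the default at_least_once guarantee to exactly-once processing.
props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);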

Section 14

KSQL

KSQL is an open source streaming SQL engine that implements continuous, interactive queries against Apache Kafka. It’s built using the Kafka Streams API and further simplifies the job of a developer. Currently, KSQL is not part of the Apache Kafka project, but is available under the Apache 2.0 license.

To see how KSQL works, let’s first download it and prepare some data sets.

  1. Clone the KSQL repository and compile the code:

         > git clone git@github.com:confluentinc/ksql.git
         > cd ksql
         > mvn clean compile install -DskipTests

  2. Produce some data in two topics:

         > java -jar ksql-examples/target/ksql-examples-0.1-SNAPSHOT-standalone.jar
               quickstart=pageviews format=delimited topic=pageviews maxInterval=10000

         > java -jar ksql-examples/target/ksql-examples-0.1-SNAPSHOT-standalone.jar
               quickstart=users format=json topic=users maxInterval=10000

    Next, let’s define the schema of the input topics. Similar to Kafka Streams, one can define a schema as either a stream or a table.

  1. Start the KSQL CLI:

         > ./bin/ksql-cli local

  2. Create a KStream from a topic:

         ksql> CREATE STREAM pageviews_stream
               (viewtime bigint, userid varchar, pageid varchar)
               WITH (kafka_topic='pageviews', value_format='DELIMITED');

         ksql> DESCRIBE pageviews_stream;

               Field    | Type
               ----------------------------
               ROWTIME  | BIGINT
               ROWKEY   | VARCHAR(STRING)
               VIEWTIME | BIGINT
               USERID   | VARCHAR(STRING)
               PAGEID   | VARCHAR(STRING)

  3. Create a KTable from a topic:

         ksql> CREATE TABLE users_table
               (registertime bigint, gender varchar, regionid varchar, userid varchar)
               WITH (kafka_topic='users', value_format='JSON');

         ksql> DESCRIBE users_table;

               Field        | Type
               -----------------------------
               ROWTIME      | BIGINT
               ROWKEY       | VARCHAR(STRING)
               REGISTERTIME | BIGINT
               GENDER       | VARCHAR(STRING)
               REGIONID     | VARCHAR(STRING)
               USERID       | VARCHAR(STRING)

    Note that in the above, each schema always contains two built-in fields, ROWTIME and ROWKEY. They correspond to the timestamp and the key of the record, respectively. Finally, let’s run some KSQL queries using the data and the schema that we prepared.

  1. Select a field from a stream:

         ksql> SELECT pageid FROM pageviews_stream LIMIT 3;

         Page_24
         Page_73
         Page_78

  2. Join a stream and a table:

         ksql> CREATE STREAM pageviews_female AS
               SELECT users_table.userid AS userid, pageid, regionid, gender
               FROM pageviews_stream
               LEFT JOIN users_table ON pageviews_stream.userid = users_table.userid
               WHERE gender = 'FEMALE';

         ksql> SELECT userid, pageid, regionid, gender FROM pageviews_female;

         User_2 | Page_55 | Region_9 | FEMALE
         User_5 | Page_14 | Region_2 | FEMALE
         User_3 | Page_60 | Region_3 | FEMALE

Note that in step 2 above, the query pageviews_female runs continuously in the background once it’s issued. The results from this query are written to a Kafka topic named pageviews_female.

More examples of KSQL can be found at https://github.com/confluentinc/ksql.

Section 15

Additional resources
