Self-Learning Kafka Streams With Scala (Part 2)

Learn how to write transformations on a KStream in Kafka Streams with Scala, and how to create a new stream from an existing one.

In our previous blog — Self-Learning Kafka Streams With Scala (Part 1) — we saw how to create a simple KStream in Scala. In this blog, we will see how to transform a KStream and create a new Stream from it.

But before we get into the details of the KStream transformations, let’s take a look at the code:

package com.knoldus.kafka.examples

import java.util.Properties

import org.apache.kafka.common.serialization._
import org.apache.kafka.streams._
import org.apache.kafka.streams.kstream.{KStreamBuilder, KeyValueMapper}

/**
  * Copyright Knoldus Software LLP, 2017. All rights reserved.
  */
object MapExample {
  def main(args: Array[String]): Unit = {
    // Application settings; String is the default SerDe for keys and values.
    val config = {
      val properties = new Properties()
      properties.put(StreamsConfig.APPLICATION_ID_CONFIG, "stream-application")
      properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
      properties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass)
      properties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass)
      properties
    }

    val stringSerde = Serdes.String()
    val integerSerde = Serdes.Integer()

    val builder = new KStreamBuilder()
    // Read String keys and values from the source topic using the default SerDes.
    val originalStream = builder.stream[String, String]("SourceTopic")

    // Keep each key and replace the value with the length of the original value.
    val mappedStream =
      originalStream.map[String, Integer] {
        new KeyValueMapper[String, String, KeyValue[String, Integer]] {
          override def apply(key: String, value: String): KeyValue[String, Integer] = {
            new KeyValue(key, Integer.valueOf(value.length))
          }
        }
      }
    // The values are now Integers, so the sink needs explicit SerDes.
    mappedStream.to(stringSerde, integerSerde, "SinkTopic")

    val streams = new KafkaStreams(builder, config)
    streams.start()
  }
}

Two points are worth noting here:

  1. Why do we pass an anonymous implementation of the Java KeyValueMapper interface to map instead of a plain Scala function? The answer lies in Part 1, where we mentioned that Kafka Streams does not provide a Scala API, which leaves us with no choice but to implement its Java functional interfaces ourselves.
  2. Here, we pass the serializer/deserializer (SerDe) for SinkTopic explicitly, which we didn't do in our previous blog's example. Why do we need that now? The reason is that the properties configure String as the default SerDe for both keys and values, so Kafka Streams would use it for SinkTopic as well, whereas the mapped values are Integers and need an Integer SerDe.
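
The boilerplate from the first point can be hidden behind a small adapter. The following is only a sketch: so that it runs without Kafka on the classpath, it uses the JDK's java.util.function.BiFunction as a stand-in for Kafka's KeyValueMapper and a Scala tuple as a stand-in for KeyValue. The names MapperAdapter, asJava, and lengthMapper are hypothetical and do not come from the original example.

```scala
import java.util.function.BiFunction

object MapperAdapter {
  // Hypothetical helper: wraps a Scala function in an anonymous class that
  // implements a Java functional interface. BiFunction stands in for
  // Kafka's KeyValueMapper here (an assumption made to keep this runnable).
  def asJava[K, V, R](f: (K, V) => R): BiFunction[K, V, R] =
    new BiFunction[K, V, R] {
      override def apply(key: K, value: V): R = f(key, value)
    }

  // Same logic as the mapper in MapExample: keep the key and emit the
  // length of the value. A tuple stands in for Kafka's KeyValue.
  val lengthMapper: BiFunction[String, String, (String, Integer)] =
    asJava((key: String, value: String) => (key, Integer.valueOf(value.length)))

  def main(args: Array[String]): Unit =
    println(lengthMapper.apply("some-key", "hello world!"))
}
```

With Kafka Streams available, the same pattern would presumably target KeyValueMapper and return a KeyValue, so that each call to map only needs the Scala function itself.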

Finally, let's start the Kafka server, run the example, and send some messages:

$ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic SourceTopic
hello world!

Reading SinkTopic with the console consumer then shows the length of the message we sent:

$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --property value.deserializer=org.apache.kafka.common.serialization.IntegerDeserializer --topic SinkTopic
12

For the consumer, we have to specify the value.deserializer property. Otherwise, the console consumer prints the raw bytes of the integer, which show up as unreadable characters rather than a number.
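
To see why the deserializer matters, here is a minimal sketch that reproduces the four-byte big-endian layout Kafka's IntegerSerializer writes, using only java.nio.ByteBuffer. The object name IntegerWireFormat and its encode and decode helpers are hypothetical and only illustrate the byte layout; they are not Kafka's own classes.

```scala
import java.nio.ByteBuffer
import java.nio.charset.StandardCharsets

object IntegerWireFormat {
  // Four bytes, most significant byte first (ByteBuffer's default order).
  def encode(n: Int): Array[Byte] = ByteBuffer.allocate(4).putInt(n).array()

  // Read the four bytes back as an Int, as an integer deserializer would.
  def decode(bytes: Array[Byte]): Int = ByteBuffer.wrap(bytes).getInt

  def main(args: Array[String]): Unit = {
    val bytes = encode("hello world!".length)
    println(bytes.mkString("[", ", ", "]")) // [0, 0, 0, 12]
    println(decode(bytes))                  // 12

    // Interpreting the same bytes as text gives control characters, not "12".
    val asText = new String(bytes, StandardCharsets.UTF_8)
    println(asText == "12")                 // false
  }
}
```

The bytes on the topic are the same in both cases; only the interpretation chosen by the consumer differs.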

So, this is how transformations are written in Kafka Streams with Scala. I hope you liked this explanation and want to learn more about other Kafka Streams operations, such as joins and aggregations.

The complete code can be downloaded from GitHub.

Please feel free to make suggestions or leave a comment!

big data, streaming, kafka, scala, tutorial

Published at DZone with permission of
