Over a million developers have joined DZone.
{{announcement.body}}
{{announcement.title}}

Self-Learning Kafka Streams With Scala (Part 2)

DZone's Guide to

Self-Learning Kafka Streams With Scala (Part 2)

Learn how transformations are written in Kafka Streams (KStream) with Scala and learn how to create a new Stream from this.

· Big Data Zone
Free Resource

Learn best practices according to DataOps. Download the free O'Reilly eBook on building a modern Big Data platform.

In our previous blog — Self-Learning Kafka Streams With Scala (Part 1) — we saw how to create a simple KStream in Scala. In this blog, we will see how to transform a KStream and create a new Stream from it.

But before we get into the details of the KStream transformations, let’s take a look at the code:

package com.knoldus.kafka.examples

import java.util.Properties

import org.apache.kafka.common.serialization._
import org.apache.kafka.streams._
import org.apache.kafka.streams.kstream.{KStreamBuilder, KeyValueMapper}

/**
  * Copyright Knoldus Software LLP, 2017. All rights reserved.
  */
object MapExample {
  def main(args: Array[String]): Unit = {
    val config = {
      val properties = new Properties()
      properties.put(StreamsConfig.APPLICATION_ID_CONFIG, "stream-application")
      properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
      properties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass)
      properties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass)
      properties
    }

    val stringSerde = Serdes.String()
    val integerSerde = Serdes.Integer()

    val builder = new KStreamBuilder()
    val originalStream = builder.stream("SourceTopic")

    val mappedStream =
      originalStream.map[String, Integer] {
        new KeyValueMapper[String, String, KeyValue[String, Integer]] {
          override def apply(key: String, value: String): KeyValue[String, Integer] = {
            new KeyValue(key, new Integer(value.length))
          }
        }
      }
    mappedStream.to(stringSerde, integerSerde, "SinkTopic")

    val streams = new KafkaStreams(builder, config)
    streams.start()
  }
}

Now, there are two major points to be noted down here:

  1. Why are we using anonymous functions of Java in the Scala map function? The answer lies in Part 1, where we mentioned that Kafka streams do not provide a Scala API, which leaves us with no choice but to use Java 8 anonymous functions.
  2. Here, we are providing serializer/deserializer (SerDe) for SinkTopic explicitly. Why do we need that? In our previous blog’s example, we didn’t do that. The reason is that we have given the String SerDe in KafkaStreams properties. This leaves Kafka Streams with only one type of SerDe to work with (String), whereas we need an integer SerDe for SinkTopic.

At last, let’s start the Kafka server, run the example, and send some messages:

$ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic SourceTopic
hello world!

The result that we will receive is as follows:

$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --property value.deserializer=org.apache.kafka.common.serialization.IntegerDeserializer --topic SinkTopic
12

For the consumer, we have to specify the value.deserializer property. Otherwise, we will receive the result in binary format.

So, this is how transformations are written in Kafka Streams with Scala. I hope you liked this explanation and want to learn more about other operations in Kafka Streams like joins, aggregations, etc.

The complete code can be downloaded from GitHub.

Please feel free to make suggestions or leave a comment!

Find the perfect platform for a scalable self-service model to manage Big Data workloads in the Cloud. Download the free O'Reilly eBook to learn more.

Topics:
big data ,streaming ,kafka ,scala ,tutorial

Published at DZone with permission of Himanshu Gupta, DZone MVB. See the original article here.

Opinions expressed by DZone contributors are their own.

{{ parent.title || parent.header.title}}

{{ parent.tldr }}

{{ parent.urlSource.name }}