Self-Learning Kafka Streams With Scala (Part 1)

This is the first step for me in learning Kafka Streams with Scala. Come follow along with me!

A few days ago, I needed to perform a stateful operation on streaming data, so I started looking for possible solutions. I came across several, built on technologies such as Spark Structured Streaming, Apache Flink, and Kafka Streams.

All of them solved my problem, but I selected Kafka Streams because it met most of my requirements. I then started reading its documentation and running its examples. But as soon as I started, I hit a major roadblock: Kafka Streams does not provide a Scala API! I was surprised to learn that.

I expected Kafka Streams to have a Scala API because I was building my application in Scala, and a Scala API would have made it easy to include Kafka Streams in it. But that didn’t turn out to be the case. On top of that, when I searched for Scala examples, I could find only a handful.

So I decided to learn it on my own. My first step was to build a “Hello World!” program using Kafka Streams and Scala, like this:

package com.knoldus.kafka.examples

import java.util.Properties

import org.apache.kafka.common.serialization._
import org.apache.kafka.streams._
import org.apache.kafka.streams.kstream.KStreamBuilder

/**
  * Copyright Knoldus Software LLP, 2017. All rights reserved.
  */
object StreamApplication {
  def main(args: Array[String]): Unit = {
    // Basic Streams configuration: application id, broker address, and default String serdes.
    val config = {
      val properties = new Properties()
      properties.put(StreamsConfig.APPLICATION_ID_CONFIG, "stream-application")
      properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
      properties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass)
      properties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass)
      properties
    }

    // Read every record from SourceTopic and forward it unchanged to SinkTopic.
    val builder = new KStreamBuilder()
    val sourceStream = builder.stream[String, String]("SourceTopic")
    sourceStream.to("SinkTopic")

    // Start the stream processing threads.
    val streams = new KafkaStreams(builder, config)
    streams.start()
  }
}

Before running this example, we need to start the Kafka server; the Kafka quick start guide explains how. After that, send some messages to the Kafka topic SourceTopic and start a Kafka consumer for the topic SinkTopic.

$ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic SourceTopic
hello world!

Now, run the example and you will see that the Kafka consumer for SinkTopic receives the message.

$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic SinkTopic
hello world!

This means that now we are able to send messages from one Kafka topic to another via Kafka Streams.
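For readers on Kafka 1.0 or later, where `KStreamBuilder` is deprecated in favor of `StreamsBuilder`, the same topic-to-topic pipe might look roughly like the sketch below. It is only a sketch under a few assumptions: the object name `PipeSketch` and the helper `buildTopology` are made up for illustration, the shutdown hook is an addition that is not in the example above, and the kafka-streams library is assumed to be on the classpath.

```scala
import java.util.Properties

import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.streams.{KafkaStreams, StreamsBuilder, StreamsConfig, Topology}

// Hypothetical object name, used only for this sketch.
object PipeSketch {

  // Builds a topology that copies every record from SourceTopic to SinkTopic unchanged.
  def buildTopology(): Topology = {
    val builder = new StreamsBuilder()
    builder.stream[String, String]("SourceTopic").to("SinkTopic")
    builder.build()
  }

  def main(args: Array[String]): Unit = {
    // Same settings as the example above.
    val config = new Properties()
    config.put(StreamsConfig.APPLICATION_ID_CONFIG, "stream-application")
    config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
    config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass)
    config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass)

    val streams = new KafkaStreams(buildTopology(), config)

    // Assumption: we want the streams instance closed when the JVM exits (for example on Ctrl+C).
    sys.addShutdownHook(streams.close())

    streams.start()
  }
}
```

Keeping the topology in its own function means it can be inspected without a running broker, for example by printing `PipeSketch.buildTopology().describe()`.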

This was my first step in learning Kafka Streams with Scala. I know it's not much, and there is still a lot left to explore in Kafka Streams, such as transformations, joins, and aggregations, which I will write about in future posts. Stay tuned!

The complete code can be downloaded from GitHub.

Published at DZone with permission of Himanshu Gupta, DZone MVB. See the original article here.
