Transforming and Aggregating Kafka Messages With Kafka Streams

Kafka Streams lets us have one less component in our streaming ETL pipeline. We can transform a single message and perform aggregation calculations across messages.

Kafka Streams is a very exciting new feature in the Kafka 0.10 release. It is a stream processing framework that comes bundled with Apache Kafka. In that sense, it can be viewed as an alternative to Spark, Storm, Flink, or Samza for some use cases.

Kafka Streams allows us to have one less component in our streaming ETL pipeline. That is, using just the Kafka cluster itself, we can transform individual messages and perform aggregation calculations across messages. We don't need to set up a separate Spark or Storm cluster.
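
For example, an aggregation across messages can be expressed directly in the Streams DSL. The following is a minimal sketch, separate from the pipeline shown later in this article: it counts messages per key and publishes the running totals to another topic. The application ID, topic names, and state store name are placeholders, and the groupByKey/count calls assume Kafka 0.10.1 or later.

package com.mycompany.kafkastreams

import java.util.Properties

import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.streams.kstream.{KStream, KStreamBuilder, KTable}
import org.apache.kafka.streams.{KafkaStreams, StreamsConfig}

// Hypothetical example (not part of the pipeline described below): count
// messages per key and publish the running totals to another topic.
object MessageCountStream {

  def main(args: Array[String]): Unit = {
    val settings = new Properties()
    settings.put(StreamsConfig.APPLICATION_ID_CONFIG, "message-count-app") // assumed app id
    settings.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")

    val builder = new KStreamBuilder()

    // Assumed source topic with string keys and string values.
    val events: KStream[String, String] =
      builder.stream(Serdes.String(), Serdes.String(), "events")

    // Count messages per key; requires the Kafka 0.10.1+ groupByKey/count API.
    val countsPerKey: KTable[String, java.lang.Long] =
      events
        .groupByKey(Serdes.String(), Serdes.String())
        .count("counts-per-key-store") // local state store name (assumed)

    // Publish the running counts to an assumed output topic.
    countsPerKey.to(Serdes.String(), Serdes.Long(), "event-counts")

    new KafkaStreams(builder, settings).start()
  }
}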

Additionally, since it runs on the Kafka infrastructure we already have, we can keep using the familiar tools that Kafka comes bundled with, such as the console commands packaged in the bin directory: kafka-topics, kafka-consumer-groups, or kafka-run-class (with, for example, GetOffsetShell).

Below, I have included sample Scala code snippets for a simple Kafka Streams transformation (i.e., one with no aggregation across messages). It reads messages from a Kafka topic with a single partition and redistributes them, via a custom partitioner, to a second Kafka topic with multiple partitions.

package com.mycompany.kafkastreams

import java.util.Properties

import org.apache.kafka.common.serialization._
import org.apache.kafka.streams._
import org.apache.kafka.streams.kstream.{KStream, KStreamBuilder}


object KStreamPartitioningStream {

  def main(args: Array[String]): Unit = {

    if (args.length != 3) {
      System.err.println("Usage: "
        + this.getClass.getName
        + "sourceTopic destintationTopic applicationIdConfig")
      System.exit(1)
    }

    val Array(sourceTopic, destinationTopic, applicationIdConfig) = args

    val builder: KStreamBuilder = new KStreamBuilder

    val streamingConfig = {
      val settings = new Properties()
      settings.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationIdConfig)
      settings.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
      settings.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, "localhost:2181")
      settings.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.ByteArray.getClass.getName)
      settings.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String.getClass.getName)
      settings
    }

    // Read from the single-partition source topic using the default serdes
    // configured above (byte-array keys, string values).
    val textLines: KStream[Array[Byte], String] = builder.stream(sourceTopic)

    // Route each record to a partition of the destination topic
    // via the custom partitioner defined below.
    val customPartitioner = new CustomStreamPartitioner()

    textLines.to(customPartitioner, destinationTopic)

    val stream: KafkaStreams = new KafkaStreams(builder, streamingConfig)
    stream.start()

  }

}
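
If we also wanted to transform each message on the way through, a single-message transformation such as mapValues could be added to the same topology before the to() call. A hypothetical step is shown below; it assumes the textLines stream from the snippet above and an additional import of org.apache.kafka.streams.kstream.ValueMapper:

    // Hypothetical single-message transformation: uppercase each value
    // before it is written to the destination topic.
    val upperCased: KStream[Array[Byte], String] =
      textLines.mapValues(new ValueMapper[String, String] {
        override def apply(value: String): String = value.toUpperCase
      })

The custom partitioner would then be applied to upperCased instead of textLines.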

The custom partitioner used above lives in a separate file. It parses each message as JSON and uses the first character of its tracking_id field to choose the destination partition:

package com.mycompany.kafkastreams

import java.util.Random

import com.fasterxml.jackson.databind.{DeserializationFeature, ObjectMapper}
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import org.apache.kafka.streams.processor.StreamPartitioner

class CustomStreamPartitioner extends StreamPartitioner[Array[Byte], String] {

  // ObjectMapper is thread-safe once configured, so build it once
  // instead of once per message.
  private val objectMapper: ObjectMapper = new ObjectMapper()
  objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
  objectMapper.registerModule(DefaultScalaModule)

  override def partition(key: Array[Byte], value: String, numPartitions: Int): Integer = {
    if (value == null) {
      getDefaultPartitionId(numPartitions)
    } else {

      val jsonObject: CustomJsonRecord = objectMapper
        .readValue(value, classOf[CustomJsonRecord])

      val trackingId = jsonObject.tracking_id

      if (trackingId == null) {
        getDefaultPartitionId(numPartitions)
      } else {

        // Derive the partition from the first character of the tracking id.
        val partitionId = trackingId.charAt(0).getNumericValue

        if (partitionId >= numPartitions) {
          throw new IllegalStateException("partitionId is >= number of partitions for this topic")
        }
        partitionId

      }
    }
  }

  // Fall back to a random, but valid, partition when one cannot be derived.
  def getDefaultPartitionId(numPartitions: Int): Integer = {
    new Random().nextInt(numPartitions)
  }

}
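
The snippets above reference a CustomJsonRecord class that is not shown. A minimal sketch, assuming the incoming JSON carries a tracking_id field, would be:

package com.mycompany.kafkastreams

// Minimal sketch of the record the partitioner deserializes into; the single
// tracking_id field is an assumption based on how the partitioner uses it.
case class CustomJsonRecord(tracking_id: String)

To compile the snippets, the project also needs the Kafka Streams and jackson-module-scala dependencies on the classpath. For example, in build.sbt (versions are only suggestions for the 0.10.x line):

libraryDependencies ++= Seq(
  "org.apache.kafka" % "kafka-streams" % "0.10.1.0",
  "com.fasterxml.jackson.module" %% "jackson-module-scala" % "2.8.4"
)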

Topics: kafka, scala, streaming, stream processing, integration
