
Transforming and Aggregating Kafka Messages With Kafka Streams

Kafka Streams lets us have one less component in our streaming ETL pipeline: we can transform a single message and perform aggregation calculations across messages.



Kafka Streams is a very exciting new feature in the Kafka 0.10 release: a stream processing framework that comes bundled with Apache Kafka. In that sense, it can be viewed as an alternative to Spark, Storm, Flink, or Samza for some use cases.

Kafka Streams allows us to have one less component in our streaming ETL pipeline. That is, using just the Kafka cluster itself, we can transform single messages and perform aggregation calculations across messages; we don't need to set up a separate Spark or Storm cluster.
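
For instance, here is a minimal sketch of an aggregation across messages: a running count per message key, published to an output topic. The topic names and application ID are illustrative, and the groupByKey/count DSL shown is the one available from Kafka 0.10.1 onward (in 0.10.0, the equivalent call is countByKey).

package com.mycompany.kafkastreams

import java.util.Properties

import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.streams.{KafkaStreams, StreamsConfig}
import org.apache.kafka.streams.kstream.{KStream, KStreamBuilder, KTable}

object EventCountStream {

  def main(args: Array[String]): Unit = {

    val settings = new Properties()
    settings.put(StreamsConfig.APPLICATION_ID_CONFIG, "event-count-example")
    settings.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
    settings.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String.getClass.getName)
    settings.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String.getClass.getName)

    val builder = new KStreamBuilder

    // Read keyed messages and maintain a running count per key.
    val events: KStream[String, String] = builder.stream[String, String]("events")
    val countsByKey: KTable[String, java.lang.Long] =
      events.groupByKey().count("counts-store")

    // Each incoming message updates the count for its key, and the
    // updated value is continuously published to the output topic.
    countsByKey.to(Serdes.String(), Serdes.Long(), "event-counts")

    new KafkaStreams(builder, settings).start()
  }

}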

Additionally, since it runs on the Kafka framework, we can use the familiar tools that Kafka comes bundled with, such as the console commands packaged in the bin directory: kafka-topics, kafka-consumer-groups, and kafka-run-class (with, for example, GetOffsetShell).
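
For example, assuming a broker and ZooKeeper on localhost (hosts and the topic name are illustrative):

# List the topics on the cluster.
bin/kafka-topics.sh --zookeeper localhost:2181 --list

# List consumer groups that use the new (Java) consumer.
bin/kafka-consumer-groups.sh --new-consumer --bootstrap-server localhost:9092 --list

# Print the latest offset of each partition of a topic.
bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic my-topic --time -1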

Below, I have included sample Scala code snippets of a simple Kafka Streams transformation (i.e., one with no aggregation across messages). The stream retrieves messages from a Kafka topic with a single partition and distributes them to a second Kafka topic with multiple partitions, using a custom partitioner.

package com.mycompany.kafkastreams

import java.util.Properties

import org.apache.kafka.common.serialization._
import org.apache.kafka.streams._
import org.apache.kafka.streams.kstream.{KStream, KStreamBuilder}


object KStreamPartitioningStream {

  def main(args: Array[String]): Unit = {

    if (args.length != 3) {
      System.err.println("Usage: "
        + this.getClass.getName
        + "sourceTopic destintationTopic applicationIdConfig")
      System.exit(1)
    }

    val Array(sourceTopic, destinationTopic, applicationIdConfig) = args

    val builder: KStreamBuilder = new KStreamBuilder

    val streamingConfig = {
      val settings = new Properties()
      settings.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationIdConfig)
      settings.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
      settings.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, "localhost:2181")
      settings.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.ByteArray.getClass.getName)
      settings.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String.getClass.getName)
      settings
    }

    val textLines: KStream[Array[Byte], String] = builder.stream(sourceTopic)

    val customPartitioner = new CustomStreamPartitioner()

    textLines.to(customPartitioner, destinationTopic)

    val stream: KafkaStreams = new KafkaStreams(builder, streamingConfig)
    stream.start()

  }

}
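
The to() call above hands the partitioning decision to the following custom StreamPartitioner, which routes each message by a tracking ID embedded in its JSON payload, falling back to a random partition when the value or tracking ID is missing:
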
package com.mycompany.kafkastreams

import java.util.Random

import com.fasterxml.jackson.databind.{DeserializationFeature, ObjectMapper}
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import org.apache.kafka.streams.processor.StreamPartitioner

class CustomStreamPartitioner extends StreamPartitioner[Array[Byte], String] {

  // Build the ObjectMapper once: it is thread-safe and expensive to construct,
  // so creating a new one per message would be wasteful.
  private val objectMapper: ObjectMapper = new ObjectMapper()
  objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
  objectMapper.registerModule(DefaultScalaModule)

  override def partition(key: Array[Byte], value: String, numPartitions: Int): Integer = {
    if (value == null) {
      getDefaultPartitionId(numPartitions)
    } else {

      val jsonObject: CustomJsonRecord = objectMapper
        .readValue(value, classOf[CustomJsonRecord])

      val trackingId = jsonObject.tracking_id

      if (trackingId == null || trackingId.isEmpty) {
        getDefaultPartitionId(numPartitions)
      } else {

        // Character.getNumericValue returns -1 for non-digit characters.
        val partitionId = Character.getNumericValue(trackingId.charAt(0))

        if (partitionId < 0 || partitionId >= numPartitions) {
          throw new IllegalStateException(
            s"partitionId $partitionId is out of range for a topic with $numPartitions partitions")
        }
        partitionId

      }
    }
  }

  def getDefaultPartitionId(numPartitions: Int): Integer = {
    // nextInt(bound) is uniform over [0, numPartitions) and never negative,
    // unlike nextInt() % numPartitions, which can return a negative value.
    new Random().nextInt(numPartitions)
  }

}
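
The CustomJsonRecord class referenced above is not shown in the original snippets. A minimal sketch, assuming a JSON payload that carries a tracking_id field, could look like this (with Jackson's Scala module registered, a plain case class binds cleanly):

package com.mycompany.kafkastreams

// Hypothetical sketch: only the field the partitioner reads is modeled here.
case class CustomJsonRecord(tracking_id: String)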

