Over a million developers have joined DZone.
{{announcement.body}}
{{announcement.title}}

Transforming and Aggregating Kafka Messages With Kafka Streams

DZone's Guide to

Transforming and Aggregating Kafka Messages With Kafka Streams

Kafka Streams let us to have one less component in our streaming ETL pipeline. We can transform a single message and perform aggregation calculations across messages.

· Integration Zone
Free Resource

Share, secure, distribute, control, and monetize your APIs with the platform built with performance, time-to-value, and growth in mind. Free 90-day trial of 3Scale by Red Hat

Kafka Streams are a very exciting new feature in the Kafka 0.10 release. It is a stream processing framework that comes bundled with Apache Kafka. In that sense, it can be viewed as an alternative to Spark, Storm, Flink, or Samza for some use cases.

Kafka Streams allow us to have one less component in our streaming ETL pipeline. That is, just using the Kafka server, we can perform the transformation of a single message and perform aggregation calculation across messages. We don't need to set up a separate Spark or Storm cluster.

Additionally, since it uses the Kafka framework, we can use the familiar tools that Kafka comes bundled with. Some of these are the console commands that are packaged in the bin directory like kafka-topics, kafka-consumer-groups, or kafka-run-class (with, for example, GetOffsetShell).

I have included sample Scala code snippets of a simple transformation (i.e., no aggregation across messages) Kafka Stream. This retrieves a message from a Kafka topic with a single partition and then distributes that to a second Kafka topic with multiple partitions.

package com.mycompany.kafkastreams

import java.util.Properties

import org.apache.kafka.common.serialization._
import org.apache.kafka.streams._
import org.apache.kafka.streams.kstream.{KStream, KStreamBuilder}


object KStreamPartitioningStream {

  def main(args: Array[String]): Unit = {

    if (args.length != 3) {
      System.err.println("Usage: "
        + this.getClass.getName
        + "sourceTopic destintationTopic applicationIdConfig")
      System.exit(1)
    }

    val Array(sourceTopic, destintationTopic, applicationIdConfig) = args

    val builder: KStreamBuilder = new KStreamBuilder

    val streamingConfig = {
      val settings = new Properties()
      settings.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationIdConfig)
      settings.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
      settings.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, "localhost:2181")
      settings.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.ByteArray.getClass.getName)
      settings.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String.getClass.getName)
      settings
    }

    val stringSerde: Serde[String] = Serdes.String()

    val textLines: KStream[Array[Byte], String] = builder.stream(sourceTopic)

    val customPartitioner = new CustomStreamPartitioner()

    textLines.to(customPartitioner, destintationTopic)

    val stream: KafkaStreams = new KafkaStreams(builder, streamingConfig)
    stream.start()

  }

}
package com.mycompany.kafkastreams

import java.util.Random

import com.fasterxml.jackson.databind.{DeserializationFeature, ObjectMapper}
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import org.apache.kafka.streams.processor.StreamPartitioner

class CustomStreamPartitioner extends StreamPartitioner[Array[Byte], String] {

  override def partition(key: Array[Byte], value: String, numPartitions: Int): Integer = {
    if (value == null) {
      getDefaultPartitionId(numPartitions)
    } else {

      val objectMapper: ObjectMapper = new ObjectMapper()
      objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
      objectMapper.registerModule(DefaultScalaModule)

      val jsonObject:customJsonRecord = objectMapper
        .readValue(value, classOf[CustomJsonRecord])

      val trackingId = jsonObject.tracking_id

      if (trackingId == null) {
        return getDefaultPartitionId(numPartitions)
      } else {

      val partitionId = trackingId.asInstanceOf[String].charAt(0).getNumericValue

      if (partitionId > numPartitions) {
        throw new IllegalStateException("partitionId is > number of partitions for this topic")
      }
      partitionId

      }
    }
  }

  def getDefaultPartitionId(numPartitions: Int): Integer = {
    val partitionId = new Random().nextInt() % numPartitions
    partitionId
  }

}

Explore the core elements of owning an API strategy and best practices for effective API programs. Download the API Owner's Manual, brought to you by 3Scale by Red Hat

Topics:
kafka ,scala ,streaming ,stream processing ,integration

Opinions expressed by DZone contributors are their own.

THE DZONE NEWSLETTER

Dev Resources & Solutions Straight to Your Inbox

Thanks for subscribing!

Awesome! Check your inbox to verify your email so you can start receiving the latest in tech news and resources.

X

{{ parent.title || parent.header.title}}

{{ parent.tldr }}

{{ parent.urlSource.name }}