Transforming and Aggregating Kafka Messages With Kafka Streams
Kafka Streams lets us have one less component in our streaming ETL pipeline: we can transform single messages and perform aggregation calculations across messages.
Kafka Streams is an exciting new feature in the Kafka 0.10 release. It is a stream processing framework that comes bundled with Apache Kafka and, in that sense, can be viewed as an alternative to Spark, Storm, Flink, or Samza for some use cases.
Kafka Streams allows us to have one less component in our streaming ETL pipeline. Using just the Kafka cluster, we can transform individual messages and perform aggregation calculations across messages; we don't need to set up a separate Spark or Storm cluster.
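As a taste of the aggregation side, here is a minimal word-count sketch against the 0.10 API, re-keying a text stream by word and counting occurrences across messages. This is a sketch, not a full application: the topic names, application ID, and endpoints are placeholders.

package com.mycompany.kafkastreams

import java.util.{Arrays, Properties}

import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.streams.{KafkaStreams, KeyValue, StreamsConfig}
import org.apache.kafka.streams.kstream.{KStream, KStreamBuilder, KTable, KeyValueMapper, ValueMapper}

object WordCountSketch {

  def main(args: Array[String]): Unit = {
    val settings = new Properties()
    // Placeholder application ID and local endpoints; adjust for your cluster.
    settings.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-sketch")
    settings.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
    settings.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, "localhost:2181")
    settings.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String.getClass.getName)
    settings.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String.getClass.getName)

    val builder = new KStreamBuilder
    val textLines: KStream[String, String] = builder.stream("text-input")

    // Split each line into words, re-key the stream by word, and count
    // occurrences per word across all messages.
    val wordCounts: KTable[String, java.lang.Long] = textLines
      .flatMapValues(new ValueMapper[String, java.lang.Iterable[String]] {
        override def apply(value: String): java.lang.Iterable[String] =
          Arrays.asList(value.toLowerCase.split("\\W+"): _*)
      })
      .map(new KeyValueMapper[String, String, KeyValue[String, String]] {
        override def apply(key: String, word: String): KeyValue[String, String] =
          new KeyValue(word, word)
      })
      .countByKey(Serdes.String, "Counts")

    wordCounts.to(Serdes.String, Serdes.Long, "wordcount-output")

    new KafkaStreams(builder, settings).start()
  }
}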
Additionally, because Kafka Streams runs on Kafka itself, we can keep using the familiar tools that come bundled with it, such as the console commands packaged in the bin directory: kafka-topics, kafka-consumer-groups, or kafka-run-class (with, for example, GetOffsetShell).
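For example, against a local 0.10 broker on the default ports (the topic name is a placeholder):

bin/kafka-topics.sh --zookeeper localhost:2181 --describe --topic my-topic
bin/kafka-consumer-groups.sh --new-consumer --bootstrap-server localhost:9092 --list
bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic my-topic --time -1

The last command prints the latest offset of each partition of the topic (--time -1 requests the latest offsets; -2 would request the earliest).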
The main example I have included is a set of Scala snippets for a simple transformation-only Kafka Streams application (i.e., no aggregation across messages): it reads messages from a Kafka topic with a single partition and distributes them to a second Kafka topic with multiple partitions.
package com.mycompany.kafkastreams

import java.util.Properties

import org.apache.kafka.common.serialization._
import org.apache.kafka.streams._
import org.apache.kafka.streams.kstream.{KStream, KStreamBuilder}

object KStreamPartitioningStream {

  def main(args: Array[String]): Unit = {
    if (args.length != 3) {
      System.err.println("Usage: "
        + this.getClass.getName
        + " sourceTopic destinationTopic applicationIdConfig")
      System.exit(1)
    }
    val Array(sourceTopic, destinationTopic, applicationIdConfig) = args

    val builder: KStreamBuilder = new KStreamBuilder

    val streamingConfig = {
      val settings = new Properties()
      settings.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationIdConfig)
      settings.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
      settings.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, "localhost:2181")
      settings.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.ByteArray.getClass.getName)
      settings.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String.getClass.getName)
      settings
    }

    // Read from the single-partition source topic and write every message to
    // the multi-partition destination topic, delegating the choice of
    // destination partition to the custom partitioner below.
    val textLines: KStream[Array[Byte], String] = builder.stream(sourceTopic)
    val customPartitioner = new CustomStreamPartitioner()
    textLines.to(customPartitioner, destinationTopic)

    val stream: KafkaStreams = new KafkaStreams(builder, streamingConfig)
    stream.start()
  }
}
This stream delegates partition selection to CustomStreamPartitioner, which parses each JSON message and routes it by its tracking ID:

package com.mycompany.kafkastreams

import java.util.Random

import com.fasterxml.jackson.databind.{DeserializationFeature, ObjectMapper}
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import org.apache.kafka.streams.processor.StreamPartitioner

class CustomStreamPartitioner extends StreamPartitioner[Array[Byte], String] {

  // ObjectMapper is thread-safe once configured, so build it once instead of
  // once per message.
  private val objectMapper: ObjectMapper = {
    val mapper = new ObjectMapper()
    mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
    mapper.registerModule(DefaultScalaModule)
    mapper
  }

  override def partition(key: Array[Byte], value: String, numPartitions: Int): Integer = {
    if (value == null) {
      getDefaultPartitionId(numPartitions)
    } else {
      val jsonObject: CustomJsonRecord = objectMapper.readValue(value, classOf[CustomJsonRecord])
      val trackingId = jsonObject.tracking_id
      if (trackingId == null) {
        getDefaultPartitionId(numPartitions)
      } else {
        // Route on the first character of the tracking ID so that all
        // messages with the same leading character land on the same partition.
        val partitionId = trackingId.charAt(0).getNumericValue
        if (partitionId >= numPartitions) {
          throw new IllegalStateException("partitionId is >= number of partitions for this topic")
        }
        partitionId
      }
    }
  }

  // Fall back to a uniformly random partition when there is no tracking ID.
  // nextInt(numPartitions) keeps the result in [0, numPartitions), unlike
  // nextInt() % numPartitions, which can go negative.
  def getDefaultPartitionId(numPartitions: Int): Integer =
    new Random().nextInt(numPartitions)
}
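The partitioner binds each JSON message to a CustomJsonRecord. A minimal sketch of that class, assuming the payload carries a tracking_id field, would be:

package com.mycompany.kafkastreams

// Minimal sketch: tracking_id is the only field the partitioner reads.
// Extra fields in the payload are ignored because FAIL_ON_UNKNOWN_PROPERTIES
// is disabled; a message without tracking_id deserializes with the field
// null and falls back to the random default partition.
case class CustomJsonRecord(tracking_id: String)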