Transforming and Aggregating Kafka Messages With Kafka Streams

Kafka Streams lets us have one less component in our streaming ETL pipeline: we can transform single messages and perform aggregation calculations across messages.

By Basanth Roy · Jan. 28, 2017 · Opinion


Kafka Streams is an exciting new feature in the Kafka 0.10 release. It is a stream processing framework that comes bundled with Apache Kafka, and in that sense it can be viewed as an alternative to Spark, Storm, Flink, or Samza for some use cases.

Kafka Streams allows us to have one less component in our streaming ETL pipeline. That is, using just the Kafka cluster, we can transform individual messages and perform aggregation calculations across messages; we don't need to set up a separate Spark or Storm cluster.
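To illustrate the aggregation side (the sample later in this article only transforms individual messages), here is a minimal, hypothetical sketch against the same Kafka 0.10 DSL. The topic names, application ID, and state store name are placeholders, and countByKey was later replaced by groupByKey().count() in Kafka 0.10.1+.

package com.mycompany.kafkastreams

import java.util.Properties

import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.streams.{KafkaStreams, StreamsConfig}
import org.apache.kafka.streams.kstream.{KStream, KStreamBuilder, KTable}

object KStreamCountByKeyExample {

  def main(args: Array[String]): Unit = {

    val settings = new Properties()
    settings.put(StreamsConfig.APPLICATION_ID_CONFIG, "count-by-key-example")
    settings.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
    settings.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, "localhost:2181")
    settings.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String.getClass.getName)
    settings.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String.getClass.getName)

    val builder = new KStreamBuilder

    // "events" is a placeholder source topic keyed by, for example, a tracking ID.
    val events: KStream[String, String] = builder.stream("events")

    // countByKey maintains a local state store and emits a running count per key:
    // an aggregation across messages rather than a per-message transformation.
    val countsByKey: KTable[String, java.lang.Long] =
      events.countByKey(Serdes.String(), "event-counts")

    // Publish the continuously updated counts to a placeholder output topic.
    countsByKey.to(Serdes.String(), Serdes.Long(), "event-counts-by-key")

    val stream = new KafkaStreams(builder, settings)
    stream.start()

  }

}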

Additionally, since it builds on the Kafka framework, we can keep using the familiar tools that ship with Kafka, such as the console commands packaged in the bin directory: kafka-topics, kafka-consumer-groups, and kafka-run-class (with, for example, GetOffsetShell).

Below, I have included sample Scala code for a simple, per-message transformation Kafka Streams application (i.e., no aggregation across messages). It reads messages from a Kafka topic with a single partition and distributes them to a second Kafka topic with multiple partitions, using a custom partitioner.

package com.mycompany.kafkastreams

import java.util.Properties

import org.apache.kafka.common.serialization._
import org.apache.kafka.streams._
import org.apache.kafka.streams.kstream.{KStream, KStreamBuilder}


object KStreamPartitioningStream {

  def main(args: Array[String]): Unit = {

    if (args.length != 3) {
      System.err.println("Usage: "
        + this.getClass.getName
        + " sourceTopic destinationTopic applicationIdConfig")
      System.exit(1)
    }

    val Array(sourceTopic, destinationTopic, applicationIdConfig) = args

    val builder: KStreamBuilder = new KStreamBuilder

    val streamingConfig = {
      val settings = new Properties()
      settings.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationIdConfig)
      settings.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
      settings.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, "localhost:2181")
      settings.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.ByteArray.getClass.getName)
      settings.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String.getClass.getName)
      settings
    }

    val textLines: KStream[Array[Byte], String] = builder.stream(sourceTopic)

    val customPartitioner = new CustomStreamPartitioner()

    // Route each message to a partition of the destination topic via the custom partitioner.
    textLines.to(customPartitioner, destinationTopic)

    val stream: KafkaStreams = new KafkaStreams(builder, streamingConfig)
    stream.start()

  }

}
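The custom partitioner referenced above routes each message according to the first character of the tracking_id field in its JSON payload. (CustomJsonRecord, the class the payload is deserialized into, is sketched at the end of the snippet as a minimal assumed shape.)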
package com.mycompany.kafkastreams

import java.util.Random

import com.fasterxml.jackson.databind.{DeserializationFeature, ObjectMapper}
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import org.apache.kafka.streams.processor.StreamPartitioner

class CustomStreamPartitioner extends StreamPartitioner[Array[Byte], String] {

  // Reuse a single ObjectMapper: it is thread-safe once configured, and building
  // a new one for every message would be needlessly expensive.
  private val objectMapper: ObjectMapper = new ObjectMapper()
  objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
  objectMapper.registerModule(DefaultScalaModule)

  override def partition(key: Array[Byte], value: String, numPartitions: Int): Integer = {
    if (value == null) {
      getDefaultPartitionId(numPartitions)
    } else {
      val jsonObject: CustomJsonRecord = objectMapper
        .readValue(value, classOf[CustomJsonRecord])

      val trackingId = jsonObject.tracking_id

      if (trackingId == null) {
        getDefaultPartitionId(numPartitions)
      } else {
        // Use the numeric value of the tracking ID's first character (expected to
        // be a digit) to choose the destination partition.
        val partitionId = trackingId.charAt(0).getNumericValue

        if (partitionId < 0 || partitionId >= numPartitions) {
          throw new IllegalStateException(
            "partitionId " + partitionId + " is outside the partition range of this topic")
        }
        partitionId
      }
    }
  }

  def getDefaultPartitionId(numPartitions: Int): Integer = {
    // Pick a random partition in [0, numPartitions) for messages without a usable tracking ID.
    new Random().nextInt(numPartitions)
  }

}

// Minimal, assumed shape of the incoming JSON record; the real CustomJsonRecord
// may carry additional fields.
case class CustomJsonRecord(tracking_id: String)
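For completeness, here is a minimal sketch of the build dependencies these snippets assume (sbt syntax; the versions are illustrative and should be aligned with your Kafka and Jackson setup):

// build.sbt -- versions are illustrative
scalaVersion := "2.11.8"

libraryDependencies ++= Seq(
  "org.apache.kafka" % "kafka-streams" % "0.10.0.1",
  "com.fasterxml.jackson.module" %% "jackson-module-scala" % "2.7.2"
)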
Tags: Kafka, Stream Processing

Opinions expressed by DZone contributors are their own.

