
Problems With Kafka Streams: The Saga Continues

Problems with the old version of Kafka Streams haven't been solved completely. Some shortcomings have been fixed, while new problems have been introduced. The cycle continues.

By Aleksandar Pejakovic · Updated Aug. 10, 22 · Opinion

This post continues where the previous article about Kafka Streams left off, so be sure to read it before proceeding with this one.

After many long years, the first stable Kafka version has been released: 1.0.0. In the previous version of Kafka Streams (0.11.0.0), we saw some suboptimal behavior when using the Processor API. The new version brings updates that are supposed to take some of that pressure off developers using Kafka Streams.

Kafka Streams Overview

Before diving directly into the problem, let's see how a Kafka Streams application is implemented. Warning: there are traces of some actual code!

Firstly, we need a main class that will contain the topology, so let's implement a dummy one:

import org.apache.kafka.streams.KafkaStreams
import org.apache.kafka.streams.processor.TopologyBuilder

object DummyMainClass {

  def main(args: Array[String]): Unit = {
    val properties = CreateKafkaStreamProperties()
    val topology: TopologyBuilder = new TopologyBuilder

    // Read messages from the input topic...
    topology.addSource("Input", properties.inputTopic)

    // ...run them through the DummyProcessor defined below...
    topology.addProcessor("Input_processor", new DummyProcessor(properties.punctuateTime), "Input")

    // ...and write the results to the output topic.
    topology.addSink("Output", properties.outputTopic, "Input_processor")

    val streams: KafkaStreams = new KafkaStreams(topology, properties)
    streams.start()
  }

  // StreamProperties is assumed to extend java.util.Properties and to also
  // expose the topic names and punctuate interval used above.
  def CreateKafkaStreamProperties(): StreamProperties = {
    //some code
  }
}


In several lines of code, we have created a topology for the Kafka Streams application that reads messages from the input topic, processes them with DummyProcessor, and outputs them to the output topic. For more details, consult the documentation. 
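
The article leaves the body of CreateKafkaStreamProperties out, so here is a minimal sketch of what it might look like inside DummyMainClass. The StreamProperties holder, the application id, the broker address, and the topic names are our assumptions, not the author's code:

import java.util.Properties

import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.streams.StreamsConfig

// Hypothetical holder matching the fields the topology reads; the topic
// names and the punctuate period (10 seconds in the tests) are assumptions.
class StreamProperties extends Properties {
  val inputTopic: String = "input-topic"
  val outputTopic: String = "output-topic"
  val punctuateTime: Long = 10000L
}

def CreateKafkaStreamProperties(): StreamProperties = {
  val properties = new StreamProperties
  properties.put(StreamsConfig.APPLICATION_ID_CONFIG, "dummy-streams-app") // assumption
  properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092") // assumption
  // Plain string keys and values, matching Processor[String, String]
  properties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass.getName)
  properties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass.getName)
  properties
}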

Next in line is DummyProcessor:

import org.apache.kafka.streams.processor.{Processor, ProcessorContext, ProcessorSupplier}

// Logging is the author's utility trait; it provides the `logger` used below.
class DummyProcessor(punctuateTime: Long) extends ProcessorSupplier[String, String] with Logging {

  override def get(): Processor[String, String] = new Processor[String, String] {
    var context: ProcessorContext = _
    var arrived: Int = 0 // messages seen since the last punctuate

    override def init(context: ProcessorContext): Unit = {
      this.context = context
      // Old (0.11.0.0) API: punctuate every punctuateTime milliseconds
      this.context.schedule(punctuateTime)
    }

    override def process(key: String, value: String): Unit = {
      this.arrived += 1
      logger.debug(s"Arrived: $value")
    }

    override def punctuate(timestamp: Long): Unit = {
      logger.debug("Punctuate call")
      // Emit the per-period count and reset it
      this.context.forward(null, s"Processed: ${this.arrived}")
      this.arrived = 0

      this.context.commit()
    }

    override def close(): Unit = {
      // does nothing
    }
  }
}


This DummyProcessor is only used for demonstration purposes. Its job is to output the number of messages that arrived during each punctuate period to the output topic; for debugging, we have added console logging. Now we have the entire pipeline needed for testing how Kafka Streams operates.

The input topic has three partitions and a replication factor of 1. The output topic has only one partition and a replication factor of 1.

Testing Kafka Streams

We are going to repeat the testing from the previous article. Input messages for all tests are the same: message_<offset>. The punctuate period is set to ten seconds. There are two scenarios to check on both Kafka versions (a sketch of a producer driving the first scenario follows this list):

  • For the first test, we have a constant rate of messages on the input topic: one message per second.

  • For the second test, we have nine messages (one message per second), followed by a pause that lasts 21 seconds, followed by one more message.
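
A minimal driver for the first scenario might look like the following; the broker address and topic name are assumptions, since the article never names them:

import java.util.Properties

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

object ConstantRateDriver {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put("bootstrap.servers", "localhost:9092") // assumption
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")

    val producer = new KafkaProducer[String, String](props)
    // Constant rate: one message per second, in the article's message_<offset> format
    for (offset <- 0 until 60) {
      producer.send(new ProducerRecord[String, String]("input-topic", s"message_$offset"))
      Thread.sleep(1000)
    }
    producer.close()
  }
}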

Older Kafka Streams Version

In the first test scenario, we see ten messages on the console in the format "Arrived: message_<offset>", accompanied by one "Punctuate call", followed by one message in the output topic: "Processed: 10". This behavior repeats for as long as there are messages on the input topic.

In the second test scenario, we see nine messages on the console in the format "Arrived: message_<offset>". After the 21-second pause, three "Punctuate call" messages appear on the console, and three messages land on the output topic: "Processed: 10", "Processed: 0", and "Processed: 0". The late message ended up in the same time window as the first nine, and we got two unwanted punctuate calls.

New Kafka Streams Version

Switching to the new version requires some modifications in the code:

  • TopologyBuilder in the main class has to be replaced with the Topology class.

  • The schedule method in the DummyProcessor class is now deprecated.

One would expect the behavior to stay the same after changing versions. Well, it doesn't. What has changed?

A punctuate call now follows each process call, and punctuate also fires whenever the scheduled punctuateInterval elapses. This means the following:

  • In the first test scenario, each "Arrived: message_<offset>" message on the console is accompanied by a "Punctuate call". Unsurprisingly, we get one "Processed: 1" message in the output topic for each. After ten messages, we get another "Punctuate call" and "Processed: 0" pair.

  • In the second scenario, we get nine "Arrived: message_<offset>" and "Punctuate call" pairs on the console, followed by nine "Processed: 1" messages in the output topic. After the pause and the tenth message, we get one "Arrived: message_<offset>" and three "Punctuate call"s. In the output topic, we see "Processed: 1", "Processed: 0", and "Processed: 0".

Now, if we want the same behavior as with the previous version, we need to check manually whether the punctuation period has passed, and only then output the message to the output topic and reset the counter. From one problem (multiple punctuate calls for late messages), we now have two (multiple punctuate calls for late messages, plus a punctuate call after each processed message), as the sketch below illustrates.
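
Here is a rough sketch of that manual guard; the GuardedPunctuator class and its names are ours, not the article's, and `emit` stands in for the forward-and-commit logic from DummyProcessor:

import org.apache.kafka.streams.processor.Punctuator

// Sketch: ignore punctuate calls that arrive before a full period has
// elapsed, restoring the old once-per-period output.
class GuardedPunctuator(punctuateTime: Long, emit: () => Unit) extends Punctuator {
  private var lastEmit: Long = 0L

  override def punctuate(timestamp: Long): Unit = {
    if (lastEmit == 0L) lastEmit = timestamp // initialize on the first call
    if (timestamp - lastEmit >= punctuateTime) {
      emit()               // forward the count and commit, as DummyProcessor does
      lastEmit = timestamp // start a new period
    }
  }
}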

Putting that aside, we have noticed that the schedule method is deprecated. Along with the new version came a new API for scheduling. The old format has been preserved for backward compatibility — but it works differently.

The new schedule method has the following format:

context.schedule(punctuateInterval, PunctuationType, Punctuator)


The first parameter remains the same: the interval at which scheduled punctuation should occur. The second parameter is new: PunctuationType. It's an enum with two values, STREAM_TIME and WALL_CLOCK_TIME (more on them later). The third and last parameter is a Punctuator: an interface with only one method, punctuate(Long). With these changes, our DummyProcessor has a new look:

import org.apache.kafka.streams.processor.{Processor, ProcessorContext, ProcessorSupplier, PunctuationType, Punctuator}

class DummyProcessor(punctuateTime: Long) extends ProcessorSupplier[String, String] with Logging {

  override def get(): Processor[String, String] = new Processor[String, String] {
    var context: ProcessorContext = _
    var arrived: Int = 0

    override def init(context: ProcessorContext): Unit = {
      this.context = context
      // New (1.0.0) API: the punctuation type and callback are explicit.
      // Inside the Punctuator, `context`, `arrived`, and `logger` resolve to
      // the enclosing Processor (a `this.` prefix here would not compile).
      this.context.schedule(punctuateTime, PunctuationType.STREAM_TIME, new Punctuator {
        override def punctuate(timestamp: Long): Unit = {
          logger.debug("Inner punctuate call")
          context.forward(null, s"Inner processed: $arrived")
          arrived = 0

          context.commit()
        }
      })
    }

    override def process(key: String, value: String): Unit = {
      this.arrived += 1
      logger.debug(s"Arrived: $value")
    }

    // Still required by the Processor interface, but never called once a
    // Punctuator is registered through schedule()
    override def punctuate(timestamp: Long): Unit = {
      logger.debug("Punctuate call")
      this.context.forward(null, s"Processed: ${this.arrived}")
      this.arrived = 0

      this.context.commit()
    }

    override def close(): Unit = {
      // does nothing
    }
  }
}


This is not a typo: we have two punctuate methods. The only difference is that the inner one prefixes its messages with "Inner". Running both test scenarios yields almost identical results to before, the only difference being the output messages: on the console, "Inner punctuate call" replaces the "Punctuate call" messages, and in the output topic, "Inner processed: *" replaces "Processed: *". In other words, we get behavior identical to the schedule(punctuateInterval) method, but the outer punctuate method is never called, even though we are obligated to implement it.

Using WALL_CLOCK_TIME and running the test scenarios, we get the following results:

  • In the first scenario, after starting the application, three punctuate calls occur (one for each partition of the input topic). After each processed message, one inner punctuate call follows. After the tenth message, we see three punctuate calls.

  • In the second scenario, the init method is again followed by three punctuate calls. Each of the first nine messages is followed by an inner punctuate call. Then, after one second, we see three scheduled inner punctuate calls. As no messages arrive for another ten seconds, we again see three scheduled inner punctuate calls. Ten more seconds pass, we see three scheduled inner punctuate calls, and then the tenth message is processed, followed by one punctuate call (for the partition the message arrived on).

WALL_CLOCK_TIME solves the problem of multiple consecutive punctuate calls but introduces punctuate calls after each processed message. We are just shifting the problem from one place to another.

A few notes:

  • After starting Kafka Streams with WALL_CLOCK_TIME, the inner punctuate for each partition is called before any messages are consumed.

  • The duration of the process method should be shorter than the punctuate interval. If a process call takes longer than the punctuate interval, multiple consecutive punctuate calls will occur.

  • Inside the process method, we can call context.partition() to get the id of the partition the message arrived on. Even though Kafka Streams creates one thread (worker) per partition, calling context.partition() inside a punctuate method (both inner and outer) returns -1. This means that inside punctuate, we can't know which partition the code refers to (a workaround is sketched after these notes).

  • Punctuate calls show up right after process calls here because we have multiple partitions and the rate of input messages is slow relative to the punctuate period. If we increased the message rate, punctuate would not be called after every process call.
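
As a workaround for the -1 partition id noted above, here is a sketch (our own, not from the article) that records partition ids during process, where they are still available, so punctuate can report per-partition counts:

import scala.collection.mutable

import org.apache.kafka.streams.processor.{Processor, ProcessorContext}

// Sketch: remember partition ids while they are still available.
class PartitionCountingProcessor extends Processor[String, String] {
  private var context: ProcessorContext = _
  private val arrivedPerPartition = mutable.Map.empty[Int, Int]

  override def init(context: ProcessorContext): Unit =
    this.context = context

  override def process(key: String, value: String): Unit = {
    val partition = context.partition() // valid here, -1 inside punctuate
    arrivedPerPartition(partition) = arrivedPerPartition.getOrElse(partition, 0) + 1
  }

  override def punctuate(timestamp: Long): Unit = {
    // Report the counts gathered in process, keyed by partition
    arrivedPerPartition.foreach { case (partition, count) =>
      context.forward(null, s"Partition $partition: $count")
    }
    arrivedPerPartition.clear()
    context.commit()
  }

  override def close(): Unit = ()
}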

Conclusion

Upgrading to the first stable Kafka version requires testing and re-checking existing applications. Some classes have been replaced or renamed, and users will notice those changes because their code stops compiling. The real problem lies in the changes that don't directly require intervention: things have changed under the hood, and a Kafka Streams application that compiles against the newer version can produce different results.

The problems mentioned in the previous article haven't been solved completely. Some shortcomings have been fixed while some new problems have been introduced — the cycle continues.


Opinions expressed by DZone contributors are their own.
