
Problems With Kafka Streams: The Saga Continues

The problems with the old version of Kafka haven't been solved completely. Some shortcomings have been fixed while some problems have been introduced. The cycle continues.


This blog post continues where the previous one left off, so be sure to read it before proceeding with this one.

After many long years, the first stable Kafka version has been released: 1.0.0. In the previous version of Kafka Streams (0.11.0.0), we saw some suboptimal behaviour when using the Processor API. The new version came with updates that are supposed to take some pressure off the developer when using Kafka Streams.

Kafka Streams Overview

Before diving directly into the problem, let's see how a Kafka Streams application is implemented. Warning: there are traces of actual code!

Firstly, we need a main class that will contain the topology, so let's implement a dummy one:

import java.util.Properties

import org.apache.kafka.streams.KafkaStreams
import org.apache.kafka.streams.processor.TopologyBuilder

object DummyMainClass {

  def main(args: Array[String]): Unit = {
    val properties = CreateKafkaStreamProperties()
    val topology: TopologyBuilder = new TopologyBuilder

    // Topic names and the punctuate interval are read from the Properties
    // instance; the property keys here are illustrative.
    topology.addSource("Input", properties.getProperty("input.topic"))

    topology.addProcessor("Input_processor", new DummyProcessor(properties.getProperty("punctuate.time").toLong), "Input")

    topology.addSink("Output", properties.getProperty("output.topic"), "Input_processor")

    val streams: KafkaStreams = new KafkaStreams(topology, properties)
    streams.start()
  }

  def CreateKafkaStreamProperties(): Properties = {
    //some code
  }
}
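The body of CreateKafkaStreamProperties is elided above. A minimal sketch of what it might contain is given below, assuming a local broker; the application ID, broker address, and the input.topic, output.topic, and punctuate.time keys are illustrative choices of this sketch, matching the lookups in the main method:

import java.util.Properties

import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.streams.StreamsConfig

def CreateKafkaStreamProperties(): Properties = {
  val properties = new Properties()
  // Mandatory Kafka Streams settings.
  properties.put(StreamsConfig.APPLICATION_ID_CONFIG, "dummy-streams-app")
  properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
  // Keys and values are plain strings in this example.
  properties.put(StreamsConfig.DEFAULT_KEY_SERDE_CONFIG, Serdes.String().getClass.getName)
  properties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CONFIG, Serdes.String().getClass.getName)
  // Application-specific settings used when wiring the topology.
  properties.put("input.topic", "input")
  properties.put("output.topic", "output")
  properties.put("punctuate.time", "10000") // punctuate every ten seconds
  properties
}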

In several lines of code, we have created a topology for the Kafka Streams application that reads messages from the input topic, processes them with DummyProcessor, and outputs them to the output topic. For more details, consult the documentation. Next in line is DummyProcessor:

import org.apache.kafka.streams.processor.{Processor, ProcessorContext, ProcessorSupplier}

class DummyProcessor(punctuateTime: Long) extends ProcessorSupplier[String, String] with Logging {

  override def get(): Processor[String, String] = new Processor[String, String] {
    var context: ProcessorContext = _
    var arrived: Int = 0

    override def init(context: ProcessorContext): Unit = {
      this.context = context
      // Schedule punctuation every punctuateTime milliseconds (pre-1.0 API).
      this.context.schedule(punctuateTime)
    }

    override def process(key: String, value: String): Unit = {
      this.arrived += 1
      logger.debug(s"Arrived: $value")
    }

    override def punctuate(timestamp: Long): Unit = {
      logger.debug("Punctuate call")
      // Forward the number of messages seen in this period, then reset.
      this.context.forward(null, s"Processed: ${this.arrived}")
      this.arrived = 0

      this.context.commit()
    }

    override def close(): Unit = {
      // does nothing
    }
  }
}

This DummyProcessor is only used for demonstration purposes. Its job is to output the number of messages that arrived per punctuate period to the output topic; for debugging, we have added console logging. Now we have the entire pipeline needed to test how Kafka Streams operates.

The input topic has three partitions and a replication factor of 1. The output topic has only one partition and a replication factor of 1.
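For reproducibility, the topics can also be created programmatically. Below is a minimal sketch using Kafka's AdminClient; the broker address and the topic names input and output are assumptions of this sketch:

import java.util.{Arrays, Properties}

import org.apache.kafka.clients.admin.{AdminClient, AdminClientConfig, NewTopic}

val config = new Properties()
config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")

val admin = AdminClient.create(config)
try {
  // NewTopic(name, numPartitions, replicationFactor)
  admin.createTopics(Arrays.asList(
    new NewTopic("input", 3, 1.toShort),
    new NewTopic("output", 1, 1.toShort)
  )).all().get() // block until both topics exist
} finally {
  admin.close()
}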

Testing Kafka Streams

We are going to repeat the tests from the previous article. Input messages for all tests are going to be the same: message_<offset>. The punctuate period is set to ten seconds. There are two scenarios that we need to check for both Kafka versions (a sketch of a matching test producer follows the list):

  • For the first test, we have a constant rate of messages on input topic — one message per second.

  • For the second test, we have nine messages (one message per second), followed by a pause that lasts 21 seconds, followed by one more message.
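A test producer for the first scenario could look like the sketch below; the broker address and topic name are assumptions of this sketch, and a simple counter stands in for the <offset> part of the message:

import java.util.Properties

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.apache.kafka.common.serialization.StringSerializer

val config = new Properties()
config.put("bootstrap.servers", "localhost:9092")
config.put("key.serializer", classOf[StringSerializer].getName)
config.put("value.serializer", classOf[StringSerializer].getName)

val producer = new KafkaProducer[String, String](config)
var counter = 0
while (true) {
  producer.send(new ProducerRecord[String, String]("input", s"message_$counter"))
  counter += 1
  Thread.sleep(1000) // constant rate: one message per second
}

The second scenario differs only in the pacing: nine messages at the same rate, a 21-second sleep, then one more message.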

Older Kafka Streams Version

In the first test scenario, we see ten messages on the console in the format "Arrived: message_<offset>" accompanied by one "Punctuate call", followed by one message in the output topic. Messages in the output topic are "Processed: 10". This behaviour repeats as long as we have messages on the input stream.

In the second test scenario, we see nine messages on the console in the format "Arrived: message_<offset>". After the 21-second pause has passed, three "Punctuate call" messages appear on the console. On the output topic, we can see three messages: "Processed: 10", "Processed: 0", and "Processed: 0". The late message ended up in the same time window as the first nine messages, and we got two unwanted punctuate calls.

New Kafka Streams Version

After switching to the new version, we have to make a few modifications to the code:

  • TopologyBuilder in the main class has to be changed to the Topology class.

  • The schedule method in the DummyProcessor class is now deprecated.

One would expect that by changing the version, the previous behaviour would remain the same. Well, it hasn't. What has changed?

After each process call, the punctuate method is called; punctuate is also called whenever the scheduled punctuate interval elapses. This means the following:

  • In the first test scenario, each "Arrived: message_<offset>" message on the console is accompanied by a "Punctuate call", so we get one "Processed: 1" message in the output topic per input message. After every ten messages, we get an additional "Punctuate call" and "Processed: 0" pair.

  • In the second scenario, we see nine "Arrived: message_<offset>" and "Punctuate call" pairs on the console, followed by nine "Processed: 1" messages in the output topic. After the pause, the tenth message produces one "Arrived: message_<offset>" and three "Punctuate call" messages. In the output topic, we see "Processed: 1", "Processed: 0", and "Processed: 0".

Now, if we want the same behaviour as with the previous version, we need to manually check if the punctuation period has passed and then output the message to output topic and reset the counter. From one problem (multiple punctuate calls for the late messages), we now have two problems (multiple punctuate calls for the late messages and punctuate call after each processed message).
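A minimal sketch of such a guard is shown below; it is meant to replace the punctuate method of DummyProcessor, and the lastPunctuate field is an addition of this sketch:

// Extra field in the anonymous Processor.
var lastPunctuate: Long = 0L

override def punctuate(timestamp: Long): Unit = {
  // Ignore calls that fire before a full punctuate period has elapsed,
  // e.g. the ones triggered right after each process call.
  if (timestamp - lastPunctuate >= punctuateTime) {
    lastPunctuate = timestamp
    logger.debug("Punctuate call")
    this.context.forward(null, s"Processed: ${this.arrived}")
    this.arrived = 0
    this.context.commit()
  }
}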

Putting that aside, we have noticed that the schedule method is deprecated. Along with the new version came a new API for scheduling. The old format has been preserved for backward compatibility — but it works differently.

The new schedule method has the following format:

context.schedule(punctuateInterval, PunctuationType, Punctuator)

The first parameter remains the same: the interval at which scheduled punctuation should occur. The second parameter is something new: PunctuationType. It's an enum with two values, STREAM_TIME and WALL_CLOCK_TIME (more on them later). The third and last parameter is Punctuator, an interface with a single method, punctuate(Long). With these changes, our DummyProcessor has a new look:

import org.apache.kafka.streams.processor.{Processor, ProcessorContext, ProcessorSupplier, PunctuationType, Punctuator}

class DummyProcessor(punctuateTime: Long) extends ProcessorSupplier[String, String] with Logging {

  override def get(): Processor[String, String] = new Processor[String, String] {
    var context: ProcessorContext = _
    var arrived: Int = 0

    override def init(context: ProcessorContext): Unit = {
      this.context = context
      // New 1.0 API: pass an explicit PunctuationType and a Punctuator.
      this.context.schedule(punctuateTime, PunctuationType.STREAM_TIME, new Punctuator {
        override def punctuate(timestamp: Long): Unit = {
          // Plain `logger`, `context`, and `arrived` resolve to the enclosing
          // scope; `this` inside the Punctuator has none of these members.
          logger.debug("Inner punctuate call")
          context.forward(null, s"Inner processed: $arrived")
          arrived = 0

          context.commit()
        }
      })
    }

    override def process(key: String, value: String): Unit = {
      this.arrived += 1
      logger.debug(s"Arrived: $value")
    }

    // The deprecated punctuate method still has to be implemented.
    override def punctuate(timestamp: Long): Unit = {
      logger.debug("Punctuate call")
      this.context.forward(null, s"Processed: ${this.arrived}")
      this.arrived = 0

      this.context.commit()
    }

    override def close(): Unit = {
      // does nothing
    }
  }
}

This is not a typo: we now have two punctuate methods. The only difference is that the inner punctuation method prefixes its messages with "Inner". Running both test scenarios yields almost identical results, the only difference being the output messages: on the console, "Inner punctuate call" replaces the "Punctuate call" messages, and in the output topic, "Inner processed: *" replaces the "Processed: *" messages. The behaviour is identical to using the schedule(punctuateInterval) method, but the outer punctuate method is never called, even though we are obligated to implement it.
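Switching to wall-clock punctuation only requires changing the PunctuationType argument of the schedule call; the Punctuator body stays the same:

this.context.schedule(punctuateTime, PunctuationType.WALL_CLOCK_TIME, new Punctuator {
  override def punctuate(timestamp: Long): Unit = {
    logger.debug("Inner punctuate call")
    context.forward(null, s"Inner processed: $arrived")
    arrived = 0
    context.commit()
  }
})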

Using WALL_CLOCK_TIME and running the test scenarios, we get the following results:

  • In the first scenario, after starting the application, three punctuate calls occur (one for each partition of the input topic). After each processed message, one inner punctuate call follows. After the tenth message, we see three more punctuate calls.

  • In the second scenario, the init method is followed by three punctuate calls. Each of the first nine messages is followed by an inner punctuate call. Then, after one second, we see three scheduled inner punctuate calls. As no messages arrive for another ten seconds, we again see three scheduled inner punctuate calls. Ten more seconds pass, and we see three scheduled inner punctuate calls, followed by the processing of the tenth message and one punctuate call (for the partition on which the message arrived).

WALL_CLOCK_TIME solves the problem of multiple consecutive punctuate calls but introduces the problem of the punctuate call after each processed message. We are just shifting the problem from one place to another.

A few notes:

  • After starting Kafka Streams with WALL_CLOCK_TIME, the inner punctuate is called for each partition before any messages are consumed.

  • The duration of the process method should be shorter than the punctuate interval. If processing takes longer than the punctuate interval, multiple consecutive punctuate calls will occur.

  • Inside the process method, we can call context.partition() to get the ID of the partition from which the message arrived. Even though Kafka Streams creates one task (worker) per partition, calling context.partition() inside a punctuate method (both inner and outer) returns -1. This means that in punctuate, we can't know which partition the code refers to (a workaround is sketched after this list).

  • Punctuate methods are called after the process method because we have multiple partitions and the rate of input messages is slow relative to the punctuate period. If we increased the message rate, punctuate would not be called after every process call.
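Since context.partition() is only meaningful inside process, one workaround is to do the bookkeeping ourselves: record the partition during process in a per-partition counter map and report all counts in punctuate. The countsPerPartition map below is an addition of this sketch, and both methods are meant to replace their counterparts in the anonymous Processor:

import scala.collection.mutable

// Extra field in the anonymous Processor.
val countsPerPartition: mutable.Map[Int, Int] = mutable.Map.empty

override def process(key: String, value: String): Unit = {
  val partition = context.partition() // valid here, -1 inside punctuate
  countsPerPartition(partition) = countsPerPartition.getOrElse(partition, 0) + 1
  logger.debug(s"Arrived: $value on partition $partition")
}

override def punctuate(timestamp: Long): Unit = {
  // We cannot tell which partition triggered this call, so report all of them.
  countsPerPartition.foreach { case (partition, count) =>
    context.forward(null, s"Processed: $count (partition $partition)")
  }
  countsPerPartition.clear()
  context.commit()
}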

Conclusion

Upgrading to the first stable Kafka version requires testing and re-checking existing applications. Some classes have been replaced or renamed, so users are forced to notice those changes just to get the code compiling. The problem lies in the changes that don't directly require intervention: things have changed under the hood, and Kafka Streams applications that compile against the newer version can produce different results.

The problems mentioned in the previous article haven't been solved completely. Some shortcomings have been fixed while some new problems have been introduced — the cycle continues.

