
Problems With Kafka Streams: The Trilogy


The Trilogy is part 3 of our investigation of current Kafka Streams. Before diving straight into the problem, it's advised to check part 1 and part 2 first.

In previous tests, we experienced some unwanted behaviour with both Kafka versions, 1.0.0 and 0.11.0.0. In this blog, we will go over some possible solutions while further investigating the behaviour in corner cases.

Behaviour upgrades

In the following text, we will focus on the following main problems:

  • correct time windows - messages that arrive after a long pause (longer than the punctuate period) are processed and presented in the same time window as the messages before the pause

  • unwanted punctuate calls - multiple consecutive punctuate calls for late messages and "early" punctuate calls

  • code duplication - the punctuate method is defined in two locations

  • inconsistent API

Manual solutions

The obvious first solution is to introduce a lastPunctuate variable and manually check whether punctuate should actually run. The code would look something like this:


class DummyProcessor(punctuateTime: Long) extends ProcessorSupplier[String, String] with Logging {
  override def get(): Processor[String, String] = new Processor[String, String] {
    var context: ProcessorContext = _
    var arrived: Int = 0
    var lastPunctuate: Long = 0L

    override def init(context: ProcessorContext): Unit = {
      this.context = context
      this.context.schedule(punctuateTime)
      this.lastPunctuate = System.currentTimeMillis()
    }
    override def process(key: String, value: String): Unit = {
      this.arrived += 1
      this.logger.debug(s"Arrived: $value")
    }
    override def punctuate(timestamp: Long): Unit = {
      // Ignore calls that land within the current punctuate interval.
      if (timestamp - this.lastPunctuate <= punctuateTime) return

      // some code

      this.lastPunctuate = timestamp
    }
    override def close(): Unit = {
      // does nothing
    }
  }
}

With this approach, it's certain that no more than one punctuate call in succession will occur per partition. Even if the punctuate method is invoked multiple times, the if statement lets the body run at most once per punctuateTime.
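The guard's at-most-once behaviour can be seen in a minimal, self-contained model (hypothetical class name; a simplification of the processor above):

```scala
// Minimal model of the lastPunctuate guard (hypothetical, single-threaded).
class PunctuateGuard(punctuateTime: Long) {
  var lastPunctuate: Long = 0L
  var executed: Int = 0

  def punctuate(timestamp: Long): Unit = {
    // Ignore calls that land within the current punctuate interval.
    if (timestamp - lastPunctuate <= punctuateTime) return
    executed += 1
    lastPunctuate = timestamp
  }
}
```

With punctuateTime = 10, five back-to-back calls at timestamps 11 through 15 execute the body only once: the first call passes the guard and records lastPunctuate = 11, so the remaining four are ignored.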

Why have we emphasised at most once?

To understand that, we need to look at how Kafka Streams calls punctuate and why this approach has flaws.

Kafka Streams calls the punctuate method at regular intervals that align with the system clock. For example, in the current tests punctuateTime is set to 10 seconds - punctuate calls are scheduled to occur at the 0th, 10th, 20th, ... second of the current minute, regardless of the starting time.
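This alignment is plain modular arithmetic; a sketch with hypothetical helper names (not Kafka Streams API) makes it concrete:

```scala
// Hypothetical helpers illustrating how punctuate boundaries align with the clock.
object PunctuateIntervals {
  // Start of the punctuate interval a given timestamp falls into.
  def intervalStart(timestampMs: Long, punctuateTimeMs: Long): Long =
    timestampMs - timestampMs % punctuateTimeMs

  // First scheduled punctuate boundary after the given timestamp.
  def nextBoundary(timestampMs: Long, punctuateTimeMs: Long): Long =
    intervalStart(timestampMs, punctuateTimeMs) + punctuateTimeMs
}
```

For instance, with a 10-second punctuateTime, an application started at second 3 still gets its first punctuate at second 10, not at second 13.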

[Diagram: messages on a single-partition timeline with punctuate intervals]

In the diagram, messages are represented as green rectangles whose length represents the duration of their process methods; for simplicity, the Kafka topic has one partition. Let's walk through the problems in the proposed code using this example.

After A arrives and is processed, the first punctuate call should occur - A arrived before the 0th second, and after its processing we entered the next punctuate interval. If we agree that every punctuate call lasts one second, the first punctuate finishes around the 2nd second. Moving on to message B: after its processing, punctuate is called again. The difference between the current time and the saved lastPunctuate is larger than the punctuate interval, so the if expression in the punctuate method won't exit early. So far so good.

What happens after message C is processed?

The processing of message C finishes in the next punctuate interval, but the difference between the current time and lastPunctuate is less than the punctuate interval. The reason: lastPunctuate stores the starting time of the punctuate method instead of the starting time of the punctuate interval. The fix is straightforward, and one way to implement it is:

this.lastPunctuate = timestamp - timestamp % punctuateTime

After message C, a pause occurred and message D arrived late. After the process method finished for D, punctuate is called - the first punctuate call passes, and the following punctuate calls (4 of them) are ignored because they fall in the same punctuate interval. Here comes the next problem: punctuate should have been called before D was processed. That can be solved by manually invoking punctuate from the process method, something like:

override def process(key: String, value: String): Unit = {
  val time = System.currentTimeMillis()
  // Note the argument: time + punctuateTime, not time.
  if (time - this.lastPunctuate >= punctuateTime) punctuate(time + punctuateTime)

  // rest of the code
}

Focus on the argument of the punctuate call! If punctuate were called with time as the argument, punctuate would run before completing the process method, as it should - but it would also run again after the process method finishes.

Why?

To answer that, take a look at how lastPunctuate is calculated. With time as the argument, lastPunctuate would save 60 as its value, and after the process method completed, the difference between the framework's timestamp and lastPunctuate would be greater than the punctuate interval - a duplicate call. Passing time + punctuateTime instead rounds lastPunctuate up to the start of the interval in which the process method finishes, so the framework's own call is skipped.
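The arithmetic can be checked with a small sketch. The numbers below are illustrative (the exact diagram values aren't stated): punctuateTime = 10 seconds, D's process starts at second 61, and the framework's own punctuate fires at second 73 once processing ends.

```scala
// Worked example of the punctuate argument choice (illustrative numbers, in seconds).
object EarlyPunctuateDemo {
  val punctuateTime = 10L

  // What the punctuate method stores, given its timestamp argument.
  def storedLastPunctuate(arg: Long): Long = arg - arg % punctuateTime

  // Would the framework's own punctuate call pass the guard afterwards?
  def wouldFireAgain(frameworkTs: Long, lastPunctuate: Long): Boolean =
    frameworkTs - lastPunctuate > punctuateTime
}
```

Passing time = 61 stores 60, and the framework call at 73 fires again (13 > 10). Passing time + punctuateTime = 71 stores 70, and the duplicate is suppressed (3 ≤ 10).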

API problems

In the init method, we have seen two approaches for calling schedule. What would happen if we mixed them up? Would one override the other? The simple answer is no. Suppose we had something like this:

override def init(context: ProcessorContext): Unit = {
  this.context = context
  this.context.schedule(punctuateTime)
  this.context.schedule(punctuateTime, PunctuationType.WALL_CLOCK_TIME, new Punctuator {
    override def punctuate(timestamp: Long): Unit = {
      // some code
    }
  })
}

With this, we would end up with the behaviour of both PunctuationTypes. The outer punctuate method would be called according to stream time (which the no-argument schedule uses), and the inner punctuate would be called according to wall-clock time - behaviour that is nearly impossible to track. This cannot be solved with basic manual interventions - wait for Kafka Streams to fix it, and for now watch how you schedule.

All that remains is the code duplication, and for that Scala has a neat solution:


class DummyProcessor(punctuateTime: Long) extends ProcessorSupplier[String, String] with Logging {
  override def get(): Processor[String, String] = new Processor[String, String] { self => 
    var context: ProcessorContext = _

    override def init(context: ProcessorContext): Unit = {
      this.context = context
      this.context.schedule(punctuateTime, PunctuationType.STREAM_TIME, new Punctuator {
        override def punctuate(timestamp: Long): Unit = self.punctuate(timestamp)
      })
    }
    override def process(key: String, value: String): Unit = {
      // some code
    }
    override def punctuate(timestamp: Long): Unit = {
      // some code
    }
    override def close(): Unit = {
      // does nothing
    }
  }
}

Defining a self-alias for the anonymous Processor instance (self =>) lets us call the outer punctuate method from the Punctuator defined inside the schedule call - without the alias, a bare punctuate(timestamp) would resolve to the Punctuator's own method and recurse.
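The self-alias trick is plain Scala and can be seen in isolation; the names below (Outer, a stand-in Punctuator trait) are illustrative, not Kafka Streams types:

```scala
// Stand-in for the Kafka Streams Punctuator interface.
trait Punctuator { def punctuate(ts: Long): Unit }

class Outer { self =>
  var calls: List[Long] = Nil

  def punctuate(ts: Long): Unit = calls = ts :: calls

  // Inside the anonymous class, a bare punctuate(ts) would resolve to the
  // Punctuator's own override and recurse forever; self.punctuate reaches Outer.
  val punctuator: Punctuator = new Punctuator {
    override def punctuate(ts: Long): Unit = self.punctuate(ts)
  }
}
```

Calling the inner punctuator forwards to the enclosing instance, so the single outer method holds all the logic and the duplication disappears.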

Conclusion

The notorious problems mentioned in the first and second blogs can be fixed to some degree by manually controlling the flow of the application. Some inconsistencies, like the API itself, still remain in the ecosystem.

When working with Kafka Streams, users should pay special attention to corner cases. Correctness requires a lot of tweaking and testing, yet most use cases will not require a completely fault-proof solution. Remember that every extra check and calculation hinders performance - there is always a trade-off.
