Over a million developers have joined DZone.
{{announcement.body}}
{{announcement.title}}

How to Order Streamed DataFrames

DZone's Guide to

How to Order Streamed DataFrames

Many of the solutions that you experiment with to help you order streamed DataFrames will bring you to disappointment. Luckily, there's a light at the end of the table!

Free Resource

Read why times series is the fastest growing database category.

A few days ago, I had to perform aggregation on a streaming DataFrame. And the moment I applied groupBy for aggregation, the data got shuffled. Now, a new situation arises regarding how to maintain order.

Yes, I can use orderBy with a streaming DataFrame using Spark structured streaming, but only in complete mode. There is no way of doing the ordering of streaming data in append mode nor in update  mode.

I have tried different things to solve this issue. For example, if I decided to go with Spark structured streaming, I might sort the streamed data in batches but not across batches.

I started looking for solutions with different technologies like Apache Flink, Apache Storm, etc. But what I faced at the end was disappointment.

A Light at the End of the Tunnel

Luckily, there is Apache Kafka Stream, which provides the facility of accessing its StateStore. Kafka Stream provides a Processor API.

The low-level Processor API provides a client to access stream data and to perform our business logic on the incoming data stream. It sends the result as downstream data. It is done via extending the abstract class AbstractProcessor and overriding the init,punctuateclose, and process methods, which contain our logic. This process method is called once for every key-value pair.

Where the High-Level DSL provides ready-to-use methods with functional style, the low-level Processor API provides you the flexibility to implement processing logic according to your needs. The trade-off is just the lines of code you need to write for specific scenarios. For more information, refer to the references at the bottom of this article.

So, the abstract idea is, after aggregating the DataFrame, to write it to Kafka. Read it as a KStream and apply the business logic using the low-level Processor API to sort the data and write it back to Kafka.

val builder = new KStreamBuilder

//add the source processor node that takes Kafka topic "input-topic" as input
builder
.addSource("source", "input-topic")
// add the MyProcessor node which takes the source processor as its upstream processor
.addProcessor("p", new ProcessorSupplier[String, String] {
override def get(): Processor[String, String] = new MyProcessor
}, "source")
.addStateStore(Stores.create("state")
.withStringKeys()
.withStringValues()
.persistent().build(), "p")
// add the sink processor node that takes Kafka topic "output-topic" as output
.addSink("sink", "output-topic", "p")

Here, the main idea is to keep adding records in listbuffer until it reaches a certain size — let’s say 20. As the buffer size approaches 20, we move to another part where we will iterate the listbuffer and parse every record to extract the specific column that will sort the record. We are going to make a listbuffer of tuple2. One element of tuple2 is that it's a specific column and element2 is a consumed value from Kafka.

After that, we will sort the listbuffer of tuple2 on the basis of the extracted column and send only the second element to Kafka. After that, we will drop the all element of listbuffer. This process will run continuously. We can also handle late data and system shutdown by saving listbuffer in KeyValueStore according to requirements.

class MyProcessor extends AbstractProcessor[String, String] {
  var processorContext: ProcessorContext = _
  var keyValueStore: KeyValueStore[String, String] = _
  val listBuffer = new scala.collection.mutable.ListBuffer[String]
  val localStateStoreName = "state"
  val localStateStoreKey = "key"
  val pattern = "yyyy-MM-dd HH:mm:ss.SSS"
  val dateTimeFormatter = DateTimeFormat.forPattern(pattern)
 override def init(context: ProcessorContext): Unit = {
    processorContext = context
    keyValueStore = processorContext.getStateStore(localStateStoreName).asInstanceOf[KeyValueStore[String, String]]

 }
  override def process(key: String, value: String): Unit = {
           if(listBuffer.size < 20)
               listBuffer += value
           else{
             val tempList = listBuffer.map { str =>
             val jValue = JsonMethods.parse(str)
             val dateLongstr = (jValue \ "time").extract[String]
             val dateLong = LocalDateTime.parse(dateLongstr, dateTimeFormatter).toDateTime.getMillis
           (dateLong, str)
         }
          val sortedList = tempList.sortBy(_.1)
          sortedList.foreach { case (_, record) =>
          processorContext.forward(localStateStoreKey, record) // Sending sorted data to output-topic
          processorContext.commit()
         }
       listBuffer = listBuffer.drop(listBuffer.size)
       listBuffer += value
     }
 }
  override def punctuate(timestamp: Long): Unit = {
  }
  override def close(): Unit = {
    keyValueStore.close()
  }
}

Here, I have implemented the idea in MyProcessor. In my case, I have three columns of values (i.e. time, col1, col2). I have extracted time column so that I can sort the record on the basis of time. After sorting, each record is sent to a Kafka topic. Now, I can consume it as a DataFrame again.

Conclusion

Ordering streaming data is always a hard problem. But with Kafka Streams, we can sort the streamed data using its lower-level Processor APIs. The main goal of this blog was not to explain how to use low-level Processor APIs but to get you familiar with the idea of how to sort streamed data.

Hopefully, this blog has helped you!

References

  1. Confluent documentation on Processor API
  2. Apache Spark Structured Streaming Programming Guide

Learn how to get 20x more performance than Elastic by moving to a Time Series database.

Topics:
database ,dataframe ,aggregation ,kafka streams ,tutorial ,processor api

Published at DZone with permission of Mahesh Chand Kandpal, DZone MVB. See the original article here.

Opinions expressed by DZone contributors are their own.

{{ parent.title || parent.header.title}}

{{ parent.tldr }}

{{ parent.urlSource.name }}