How to Order Streamed DataFrames
Many of the solutions you experiment with to order streamed DataFrames will end in disappointment. Luckily, there's a light at the end of the tunnel!
A few days ago, I had to perform aggregation on a streaming DataFrame. The moment I applied groupBy for the aggregation, the data got shuffled, and a new question arose: how do I maintain order? Yes, I can use orderBy with a streaming DataFrame in Spark Structured Streaming, but only in complete output mode. There is no way to order streaming data in append or update mode.
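To make the limitation concrete, here is a minimal sketch, assuming a local socket source and a simple word-count aggregation (neither of which comes from my original pipeline): sorting a streaming aggregation is only accepted in complete output mode.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.count

val spark = SparkSession.builder().appName("ordering-demo").master("local[*]").getOrCreate()
import spark.implicits._

// read lines from a socket and count words
val words = spark.readStream.format("socket")
  .option("host", "localhost").option("port", 9999).load()
  .as[String].flatMap(_.split(" "))
val counts = words.groupBy("value").agg(count("*").as("cnt"))

// orderBy on a streaming aggregation is only supported with outputMode("complete");
// requesting "append" or "update" here fails with an AnalysisException
counts.orderBy($"cnt".desc)
  .writeStream
  .outputMode("complete")
  .format("console")
  .start()
  .awaitTermination()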
I tried different things to solve this issue. For example, if I stayed with Spark Structured Streaming, I could sort the streamed data within each batch, but not across batches.
I then started looking at other technologies like Apache Flink and Apache Storm, but what I found in the end was disappointment.
A Light at the End of the Tunnel
Luckily, there is Apache Kafka Streams, which gives us access to its StateStore. Kafka Streams provides a low-level Processor API alongside its high-level DSL.
The Processor API lets a client access the stream data, apply our business logic to each incoming record, and send the result downstream. This is done by extending the abstract class AbstractProcessor and overriding the init, punctuate, close, and process methods, which contain our logic. The process method is called once for every key-value pair.
Where the high-level DSL provides ready-to-use methods in a functional style, the low-level Processor API gives you the flexibility to implement processing logic according to your needs. The trade-off is the extra lines of code you have to write for your specific scenario. For more information, refer to the references at the bottom of this article.
So, the abstract idea is: after aggregating the DataFrame, write it to Kafka, read it back as a KStream, apply the business logic with the low-level Processor API to sort the data, and write the result back to Kafka.
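As a rough sketch of the first step, assuming the aggregated streaming DataFrame is called aggregatedDF and that the spark-sql-kafka connector is available (both are assumptions, not details from my original job), it can be published to the input topic like this:

import org.apache.spark.sql.functions.{col, lit, struct, to_json}

// serialize every aggregated row as JSON and write it to the topic the
// Kafka Streams topology below reads from; the checkpoint path is a placeholder
aggregatedDF
  .select(lit("key").as("key"), to_json(struct(aggregatedDF.columns.map(col): _*)).as("value"))
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("topic", "input-topic")
  .option("checkpointLocation", "/tmp/agg-checkpoint")
  .outputMode("update")
  .start()

With the aggregated records flowing into input-topic, the processing topology can be wired up: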
import org.apache.kafka.streams.kstream.KStreamBuilder
import org.apache.kafka.streams.processor.{Processor, ProcessorSupplier}
import org.apache.kafka.streams.state.Stores

val builder = new KStreamBuilder
// add the source processor node that takes the Kafka topic "input-topic" as input
builder
  .addSource("source", "input-topic")
  // add the MyProcessor node, which takes the source processor as its upstream processor
  .addProcessor("p", new ProcessorSupplier[String, String] {
    override def get(): Processor[String, String] = new MyProcessor
  }, "source")
  // register the persistent state store "state" and connect it to the processor node
  .addStateStore(Stores.create("state")
    .withStringKeys()
    .withStringValues()
    .persistent()
    .build(), "p")
  // add the sink processor node that takes the Kafka topic "output-topic" as output
  .addSink("sink", "output-topic", "p")
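To actually run this topology, it has to be handed to a KafkaStreams instance with the usual configuration. A minimal sketch, where the application id and broker address are assumptions:

import java.util.Properties
import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.streams.{KafkaStreams, StreamsConfig}

val props = new Properties()
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "stream-ordering-app")
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass.getName)
props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass.getName)

// start the topology built above and close it cleanly on shutdown
val streams = new KafkaStreams(builder, props)
streams.start()
sys.addShutdownHook(streams.close())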
Here, the main idea is to keep adding records to a ListBuffer until it reaches a certain size, let's say 20. Once the buffer reaches 20, we iterate over the ListBuffer and parse every record to extract the specific column on which the records will be sorted. From this, we build a ListBuffer of Tuple2, where the first element is the extracted column and the second element is the value consumed from Kafka.
We then sort this ListBuffer of Tuple2 by the extracted column and send only the second element of each tuple to Kafka. After that, we drop all elements of the ListBuffer. This process runs continuously. We can also handle late data and system shutdown by saving the ListBuffer in a KeyValueStore, according to requirements.
import org.apache.kafka.streams.processor.{AbstractProcessor, ProcessorContext}
import org.apache.kafka.streams.state.KeyValueStore
import org.joda.time.LocalDateTime
import org.joda.time.format.DateTimeFormat
import org.json4s._
import org.json4s.jackson.JsonMethods

class MyProcessor extends AbstractProcessor[String, String] {
  // needed by json4s' extract[...]
  implicit val formats: Formats = DefaultFormats

  var processorContext: ProcessorContext = _
  var keyValueStore: KeyValueStore[String, String] = _
  val listBuffer = new scala.collection.mutable.ListBuffer[String]
  val localStateStoreName = "state"
  val localStateStoreKey = "key"
  val pattern = "yyyy-MM-dd HH:mm:ss.SSS"
  val dateTimeFormatter = DateTimeFormat.forPattern(pattern)

  override def init(context: ProcessorContext): Unit = {
    processorContext = context
    keyValueStore = processorContext.getStateStore(localStateStoreName).asInstanceOf[KeyValueStore[String, String]]
  }

  override def process(key: String, value: String): Unit = {
    if (listBuffer.size < 20)
      listBuffer += value
    else {
      // parse every buffered record and pair it with the value of its "time" column
      val tempList = listBuffer.map { str =>
        val jValue = JsonMethods.parse(str)
        val dateLongStr = (jValue \ "time").extract[String]
        val dateLong = LocalDateTime.parse(dateLongStr, dateTimeFormatter).toDateTime.getMillis
        (dateLong, str)
      }
      // sort by the extracted timestamp and forward only the original record
      val sortedList = tempList.sortBy(_._1)
      sortedList.foreach { case (_, record) =>
        processorContext.forward(localStateStoreKey, record) // sending sorted data to output-topic
        processorContext.commit()
      }
      listBuffer.clear() // drop all buffered elements and start the next batch
      listBuffer += value
    }
  }

  override def punctuate(timestamp: Long): Unit = {
  }

  override def close(): Unit = {
    keyValueStore.close()
  }
}
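Note that keyValueStore above is only opened and closed. To actually survive a shutdown, one option, sketched here as hypothetical additions to MyProcessor under the assumption that the buffered JSON records can be joined with a delimiter that never appears in the data, is to mirror the buffer into the store whenever it changes and reload it when the processor is initialized:

// hypothetical additions to MyProcessor; the "\u0001" delimiter is an assumption

// at the end of init(): restore a previously saved buffer, if any
Option(keyValueStore.get(localStateStoreKey)).foreach { saved =>
  listBuffer ++= saved.split("\u0001")
}

// after every change to listBuffer in process(): save the current buffer
keyValueStore.put(localStateStoreKey, listBuffer.mkString("\u0001"))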
Here, I have implemented the idea in MyProcessor. In my case, each value has three columns (i.e. time, col1, and col2). I extract the time column so that I can sort the records on the basis of time. After sorting, each record is sent to a Kafka topic. Now, I can consume it as a DataFrame again.
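As a rough sketch of that last step, reusing the spark session from the earlier sketch and assuming we just want the raw JSON payload back (the broker address is also an assumption), the sorted topic can be consumed as a streaming DataFrame like this:

// read the sorted records forwarded by MyProcessor back into a streaming DataFrame
val sortedDF = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "output-topic")
  .load()
  .selectExpr("CAST(value AS STRING) AS json") // the JSON payload written by MyProcessor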
Conclusion
Ordering streaming data is always a hard problem. But with Kafka Streams, we can sort the streamed data using its low-level Processor API. The main goal of this blog was not to explain how to use the low-level Processor API but to get you familiar with the idea of how to sort streamed data.
Hopefully, this blog has helped you!
References