How to Order Streamed DataFrames

Many of the solutions that you experiment with to help you order streamed DataFrames will bring you to disappointment. Luckily, there's a light at the end of the tunnel!

By Mahesh Chand Kandpal · Aug. 15, 17 · Tutorial

A few days ago, I had to perform an aggregation on a streaming DataFrame, and the moment I applied groupBy, the data got shuffled. That raised a new problem: how to maintain order.

Yes, I can use orderBy on a streaming DataFrame with Spark Structured Streaming, but only in complete output mode. There is no way to order streaming data in append or update mode.
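As a minimal sketch (topic name, broker address, and the counting query are assumptions for illustration), this is roughly the only supported case: ordering after an aggregation, in complete output mode.

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("ordering-demo").getOrCreate()
import spark.implicits._

// Read a stream of records from Kafka as strings.
val events = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "input-topic")
  .load()
  .selectExpr("CAST(value AS STRING) AS value")

// orderBy on a streaming DataFrame is only accepted after an aggregation
// and only with complete output mode; append or update mode fails analysis.
val counts = events.groupBy($"value").count().orderBy($"count".desc)

counts.writeStream
  .outputMode("complete")
  .format("console")
  .start()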

I tried different things to solve this issue. For example, if I stayed with Spark Structured Streaming, I could sort the streamed data within each batch, but not across batches.

I started looking at other technologies like Apache Flink and Apache Storm, but what I found in the end was disappointment.

A Light at the End of the Tunnel

Luckily, there is Apache Kafka Streams, which provides access to its StateStore and exposes a low-level Processor API.

The low-level Processor API gives a client access to the stream data so we can apply our business logic to the incoming records and forward the result downstream. This is done by extending the abstract class AbstractProcessor and overriding the init, punctuate, close, and process methods, which contain our logic. The process method is called once for every key-value pair.

Where the high-level DSL provides ready-to-use methods in a functional style, the low-level Processor API gives you the flexibility to implement processing logic exactly as you need; the trade-off is simply the extra lines of code you have to write for your specific scenario. For more information, refer to the references at the bottom of this article.
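For contrast, here is a minimal sketch of the high-level DSL (the trim transformation is just a placeholder; topic names match the topology below). It is convenient for simple map/filter/aggregate pipelines, but it does not give us the buffering and custom sorting control we need here.

import org.apache.kafka.streams.kstream.{KStreamBuilder, ValueMapper}

// Build a trivial read -> transform -> write pipeline with the DSL.
val dslBuilder = new KStreamBuilder
dslBuilder
  .stream[String, String]("input-topic")
  .mapValues(new ValueMapper[String, String] {
    override def apply(value: String): String = value.trim // placeholder logic
  })
  .to("output-topic")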

So, the abstract idea is this: after aggregating the DataFrame, write it to Kafka, read it back as a KStream, apply the business logic with the low-level Processor API to sort the data, and write the result back to Kafka.
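The Spark side of that hand-off might look like the following sketch (aggregatedDF, the topic, broker address, and checkpoint path are assumed names for illustration): the aggregated rows are serialized as JSON and pushed to the topic that the Kafka Streams topology below reads from.

import org.apache.spark.sql.functions.{col, struct, to_json}

// Serialize every row of the aggregated streaming DataFrame as a JSON string
// and write it to the Kafka topic that the Processor API topology consumes.
aggregatedDF
  .select(to_json(struct(aggregatedDF.columns.map(col): _*)).as("value"))
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("topic", "input-topic")
  .option("checkpointLocation", "/tmp/ordering-checkpoint")
  .outputMode("update")
  .start()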

import org.apache.kafka.streams.kstream.KStreamBuilder
import org.apache.kafka.streams.processor.{Processor, ProcessorSupplier}
import org.apache.kafka.streams.state.Stores

val builder = new KStreamBuilder

builder
  // add the source processor node that takes Kafka topic "input-topic" as input
  .addSource("source", "input-topic")
  // add the MyProcessor node, which takes the source processor as its upstream processor
  .addProcessor("p", new ProcessorSupplier[String, String] {
    override def get(): Processor[String, String] = new MyProcessor
  }, "source")
  // register a persistent state store named "state" and attach it to the MyProcessor node
  .addStateStore(Stores.create("state")
    .withStringKeys()
    .withStringValues()
    .persistent()
    .build(), "p")
  // add the sink processor node that takes Kafka topic "output-topic" as output
  .addSink("sink", "output-topic", "p")
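To actually run the topology, the builder is wrapped in a KafkaStreams instance; the application id and broker address below are assumptions.

import java.util.Properties
import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.streams.{KafkaStreams, StreamsConfig}

val props = new Properties()
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "stream-ordering-app")
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
// string serdes for keys and values, matching the state store and processor above
props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass.getName)
props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass.getName)

val streams = new KafkaStreams(builder, props)
streams.start()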

Here, the main idea is to keep adding records to a ListBuffer until it reaches a certain size, say 20. Once the buffer holds 20 records, we iterate over it and parse every record to extract the specific column that we want to sort on, building a ListBuffer of Tuple2 in which the first element is the extracted column value and the second is the raw record consumed from Kafka.

After that, we sort the ListBuffer of Tuple2 by the extracted column and forward only the second element of each tuple to Kafka. Then we clear the ListBuffer and the process starts over; it runs continuously. We can also handle late data and system shutdowns by saving the ListBuffer in the KeyValueStore, according to requirements.

import org.apache.kafka.streams.processor.{AbstractProcessor, ProcessorContext}
import org.apache.kafka.streams.state.KeyValueStore
import org.joda.time.LocalDateTime
import org.joda.time.format.DateTimeFormat
import org.json4s._
import org.json4s.jackson.JsonMethods

import scala.collection.mutable.ListBuffer

class MyProcessor extends AbstractProcessor[String, String] {
  // json4s needs implicit Formats in scope for extract[String]
  implicit val formats: Formats = DefaultFormats

  var processorContext: ProcessorContext = _
  var keyValueStore: KeyValueStore[String, String] = _

  val listBuffer = new ListBuffer[String]
  val localStateStoreName = "state"
  val localStateStoreKey = "key"
  val pattern = "yyyy-MM-dd HH:mm:ss.SSS"
  val dateTimeFormatter = DateTimeFormat.forPattern(pattern)

  override def init(context: ProcessorContext): Unit = {
    processorContext = context
    // look up the local state store registered as "state" in the topology
    keyValueStore = processorContext
      .getStateStore(localStateStoreName)
      .asInstanceOf[KeyValueStore[String, String]]
  }

  override def process(key: String, value: String): Unit = {
    if (listBuffer.size < 20) {
      listBuffer += value
    } else {
      // parse each buffered record and extract the "time" column to sort on
      val tempList = listBuffer.map { str =>
        val jValue = JsonMethods.parse(str)
        val dateLongStr = (jValue \ "time").extract[String]
        val dateLong = LocalDateTime.parse(dateLongStr, dateTimeFormatter).toDateTime.getMillis
        (dateLong, str)
      }
      // sort by the extracted timestamp and forward only the original record
      val sortedList = tempList.sortBy(_._1)
      sortedList.foreach { case (_, record) =>
        processorContext.forward(localStateStoreKey, record) // sending sorted data to output-topic
        processorContext.commit()
      }
      listBuffer.clear()
      listBuffer += value
    }
  }

  override def punctuate(timestamp: Long): Unit = {}

  override def close(): Unit = {
    keyValueStore.close()
  }
}

Here, I have implemented the idea in MyProcessor. In my case, each record has three columns (i.e., time, col1, and col2). I extract the time column so that I can sort the records on the basis of time. After sorting, each record is sent to a Kafka topic, and I can consume it as a DataFrame again.
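Reading the sorted topic back into Spark might look like this sketch (it reuses the spark session from the earlier snippet; the schema simply mirrors the three columns mentioned above):

import org.apache.spark.sql.functions.{col, from_json}
import org.apache.spark.sql.types.{StringType, StructType}

// Schema matching the time, col1, and col2 columns of the sorted records.
val schema = new StructType()
  .add("time", StringType)
  .add("col1", StringType)
  .add("col2", StringType)

// Consume the sorted records from Kafka and unpack the JSON back into columns.
val sortedDF = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "output-topic")
  .load()
  .selectExpr("CAST(value AS STRING) AS json")
  .select(from_json(col("json"), schema).as("data"))
  .select("data.*")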

Conclusion

Ordering streaming data is always a hard problem, but with Kafka Streams, we can sort the streamed data using its low-level Processor API. The main goal of this blog was not to explain the Processor API in depth, but to get you familiar with the idea of how to sort streamed data.

Hopefully, this blog has helped you!

References

  1. Confluent documentation on Processor API
  2. Apache Spark Structured Streaming Programming Guide

Published at DZone with permission of Mahesh Chand Kandpal, DZone MVB. See the original article here.

Opinions expressed by DZone contributors are their own.
