Spark Trigger Options
Objective
In this article, we'll look at the different trigger options available in Spark Structured Streaming. The trigger you set for a stream determines how quickly it reads the next set of records, so the trigger option lets you control the latency and throughput of the job.
Default Behavior
When you don’t set a trigger on a write stream, Spark tries to process the next set of records as soon as it’s done with the current micro batch. Micro batch is the default processing mode for write streams in Spark Structured Streaming: incoming records are grouped into small batches, and the batches are processed one after another.
Bottom line: Spark will try running the next micro batch as soon as possible (ASAP).
The diagram below explains the sequence of a micro batch.
- The driver updates the Write Ahead Log (WAL) with the unprocessed offsets.
- A micro batch job is created to process the records.
- Input is read and processed.
- The output of the micro batch is persisted in the target.
Refer to the code below to create a write stream without any trigger. Refer to the complete sample job here.
// No trigger is set, so the next micro batch starts as soon as the previous one completes.
val defaultStream = rateRawData.writeStream
  .format("console")
  .queryName("Default")
  .option("checkpointLocation", "sparkCheckPoint\\Rate2ConsoleDefaultTrigger\\cp1")
  .start()
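The snippet above assumes that a streaming DataFrame named rateRawData already exists. A minimal sketch of how such an input might be built is shown below. It assumes Spark's built-in rate source (the checkpoint path names hint at it, but the original sample job may differ); the object name, the application name, and the local master are illustrative assumptions, not part of the original job.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

object RateInput {
  // Hypothetical helper: builds a streaming DataFrame from Spark's built-in
  // "rate" source, which emits (timestamp, value) rows at a fixed rate.
  def rateRawData(spark: SparkSession, rowsPerSecond: Long): DataFrame =
    spark.readStream
      .format("rate")
      .option("rowsPerSecond", rowsPerSecond)
      .load()

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("DefaultTriggerSketch") // assumed name
      .master("local[*]")              // assumed: local run for experimentation
      .getOrCreate()

    // No .trigger(...) call, so Spark falls back to the default behavior.
    val defaultStream = rateRawData(spark, 1000L)
      .writeStream
      .format("console")
      .queryName("Default")
      .start()

    // Block until the query is stopped or fails.
    defaultStream.awaitTermination()
  }
}
```

The checkpoint location is left out here for brevity; for the console sink Spark then uses a temporary checkpoint directory, which is fine for experiments but not for a job that must recover after a restart.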
Below is a screenshot of the Spark UI processing 1,000 records per second without any trigger option set. As shown below, Spark starts each micro batch as soon as the previous one finishes, which works out to roughly one micro batch per second.
Below is a screenshot of the Spark UI processing 90,000 records per second without any trigger option set. As you can see, micro batches are triggered at uneven intervals.
Once
When you set the trigger option to “Once”, the stream is created once, all the pending records are processed, and then the stream is terminated. The behavior of this option is very similar to a batch job.
Refer to the code below to create a write stream with a “Once” trigger. Refer to the complete sample job here.
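import org.apache.spark.sql.streaming.Trigger

// Process all pending records once, then terminate the stream.
val onceStream = rateRawData.writeStream
  .format("console")
  .queryName("Once")
  .trigger(Trigger.Once())
  .option("checkpointLocation", "sparkCheckPoint\\Rate2ConsoleOnceTrigger\\cp1")
  .start()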
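One way to observe the batch-like behavior is to wait for the query and then check whether it is still active. The sketch below is only an illustration under assumptions: it uses the built-in rate source as input, and the object and method names are hypothetical.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.Trigger

object OnceTriggerSketch {
  // Runs a "Once" query against the rate source and reports whether the
  // query is still active after awaitTermination() returns.
  def runOnce(spark: SparkSession): Boolean = {
    val query = spark.readStream
      .format("rate")
      .option("rowsPerSecond", 10L)
      .load()
      .writeStream
      .format("console")
      .queryName("OnceSketch")
      .trigger(Trigger.Once())
      .start()

    // With Trigger.Once() the query stops on its own, so this call returns
    // without anyone calling query.stop().
    query.awaitTermination()
    query.isActive
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("OnceTriggerSketch") // assumed name
      .master("local[*]")           // assumed: local run for experimentation
      .getOrCreate()
    println(s"Still active after the single run: ${runOnce(spark)}")
    spark.stop()
  }
}
```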
Processing Time
This is the most widely used and recommended option in Spark Structured Streaming. The processing time trigger gives you better control over how often micro batch jobs are triggered. For example, if you set a trigger interval of “20 seconds”, a micro batch processes a batch of records every 20 seconds. You can fine-tune the trigger interval, and with it the throughput of the job, based on how quickly you want the job to process records.
If a micro batch takes longer than the defined interval, the next micro batch starts ASAP, that is, as soon as the current one finishes. For example, if the defined interval is “20 seconds” but processing takes “30 seconds”, the next batch starts right after those 30 seconds instead of waiting for another interval to pass.
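The scheduling rule can be illustrated with a small, Spark-free simulation. This is a sketch of the rule as described here, not Spark's actual scheduler code; in particular, it assumes that trigger boundaries fall on multiples of the interval, and the object and method names are hypothetical.

```scala
object ProcessingTimeSchedule {
  // Given how long each micro batch takes and the trigger interval (both in
  // the same time unit), returns the time at which each batch starts,
  // counting from 0.
  def batchStartTimes(durations: Seq[Long], interval: Long): Seq[Long] = {
    require(interval > 0, "interval must be positive")
    durations.scanLeft(0L) { (start, duration) =>
      val finish = start + duration
      // Assumption: the next trigger boundary is the next multiple of the interval.
      val nextBoundary = (start / interval + 1) * interval
      // Wait for the boundary if the batch finished early; otherwise start ASAP.
      math.max(finish, nextBoundary)
    }.init // drop the start time computed for the batch after the last one
  }
}
```

For a 20-second interval and batches that take 5, 30, and 5 seconds, `ProcessingTimeSchedule.batchStartTimes(Seq(5L, 30L, 5L), 20L)` returns the start times 0, 20, and 50: the second batch overruns its interval, so the third one starts the moment it finishes rather than at a later boundary.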
Refer to the code below to create a write stream with a “Processing time” trigger. Refer to the complete sample job here.
import org.apache.spark.sql.streaming.Trigger

// Trigger a micro batch every 20 seconds.
val processingTimeStream = rateRawData.writeStream
  .format("console")
  .queryName("Micro Batch")
  .trigger(Trigger.ProcessingTime("20 seconds"))
  .option("checkpointLocation", "sparkCheckPoint\\Rate2ConsoleProgressTrigger\\cp1")
  .start()
Below is a screenshot of Spark UI processing 90K records per second with a 20-second trigger. As you can see, Spark tries to process micro batches every 20 seconds.
Continuous
Continuous stream processing was introduced in Spark 2.3 as an experimental feature, and it is still experimental as of Spark 2.4.0. It lets you process records in milliseconds where a micro batch would take seconds. With this trigger option, records are not processed in micro batches; instead, a long-running task is created per write stream and processes records as quickly as possible.
Continuous processing supports only “at least once” semantics; “exactly once” semantics are not supported. Offsets of the processed records are committed periodically and asynchronously, based on epoch markers added during processing. The interval between two consecutive epoch markers (that is, between two offset commits) is called an epoch.
The diagram below represents a continuous stream process in sequence.
- The driver creates a long-running task.
- Input records are processed.
- Processed records are stored in the target.
- A task persists offsets asynchronously.
- Offsets are committed to a Write Ahead Log (WAL).
Bottom line: A long-running task, rather than a series of micro batches, processes the data to achieve low latency. Epoch markers trigger offset commits asynchronously.
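To see why periodic offset commits lead to “at least once” rather than “exactly once” delivery, consider the simplified, Spark-free model below. It is a sketch under stated assumptions, not Spark's implementation: an epoch is modeled as a fixed number of records, the commit is treated as complete at each epoch boundary, and the task crashes exactly once. All names are hypothetical.

```scala
object AtLeastOnceSketch {
  // Simulates one long-running task that processes `records` in order,
  // commits its offset every `epochSize` records, and crashes once right
  // after processing the record at index `failAfter`.
  // Returns every record written to the sink, including replayed ones.
  def run(records: Seq[Int], epochSize: Int, failAfter: Int): Seq[Int] = {
    require(epochSize > 0, "epochSize must be positive")
    require(failAfter >= 0 && failAfter < records.length, "failAfter out of range")

    val processedBeforeCrash = failAfter + 1
    // Offset of the last epoch boundary that was committed before the crash.
    val committedOffset = (processedBeforeCrash / epochSize) * epochSize

    val beforeCrash = records.take(processedBeforeCrash)
    // After the restart, processing resumes from the last committed offset,
    // so records between that offset and the crash are written again.
    val afterRestart = records.drop(committedOffset)
    beforeCrash ++ afterRestart
  }
}
```

With records 0 to 9, an epoch of 3 records, and a crash after record 7, `AtLeastOnceSketch.run(0 to 9, 3, 7)` writes records 6 and 7 twice: the last committed offset is 6, so everything from there on is replayed. No record is lost, but some are duplicated.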
For the sample code to create a write stream with a “Continuous” trigger, refer to the complete sample job here.
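As a rough idea of what such a job might look like, here is a minimal sketch. It assumes the built-in rate source and the console sink; the object name, the single partition, and the “1 second” checkpoint interval are illustrative assumptions rather than values taken from the original sample job.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.{StreamingQuery, Trigger}

object ContinuousTriggerSketch {
  // Starts a continuous query that reads one record per second from the
  // rate source and prints it to the console.
  def start(spark: SparkSession): StreamingQuery =
    spark.readStream
      .format("rate")
      .option("rowsPerSecond", 1L)
      .option("numPartitions", 1L) // assumed: a single partition
      .load()
      .writeStream
      .format("console")
      .queryName("Continuous")
      // The argument is the checkpoint interval, i.e. how often offsets are
      // committed, not a micro batch interval.
      .trigger(Trigger.Continuous("1 second"))
      .start()

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("ContinuousTriggerSketch") // assumed name
      .master("local[*]")                 // assumed: local run for experimentation
      .getOrCreate()
    start(spark).awaitTermination()
  }
}
```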
Below is a screenshot of Spark UI processing one record per second using a continuous trigger. As you can see, there is one long-running task instead of micro batches.
Below is a screenshot of Spark UI processing multiple streams, each handling one record per second, using a continuous trigger. As you can see, there are two long-running tasks, one for each stream, and again no micro batches.
Comparison
The table below summarizes the differences between all the trigger options:
Summary
Choose the trigger option that matches your latency requirements. Use continuous streaming if you need millisecond-level latency; otherwise, go with a custom processing time rather than the default trigger option. Refer to all the trigger-related code here.