Exploring Spark Structured Streaming
Streaming is very difficult, and it's only going to grow more so. Learn about what Structured Streaming in Spark is and what its benefits are.
Streaming applications are growing more complex, and building them is getting difficult with the current state of distributed streaming engines.
Why streaming is hard:
- Streaming computations don’t run in isolation.
- Data arriving out of time order is a problem for batch processing.
- Writing stream processing operations from scratch is not easy.
Problems with DStreams:
- Processing with event-time; dealing with late data.
- Interoperating streaming with batch and interactive queries.
- Reasoning about end-to-end guarantees.
Apache Spark 2.0 adds the first version of a new higher-level API, Structured Streaming, for building continuous applications. The main goal is to make it easier to build end-to-end streaming applications, which integrate with storage, serving systems, and batch jobs in a consistent and fault-tolerant way.
A key benefit of Structured Streaming is that the API is very easy to use: it is simply Spark’s DataFrame and Dataset API. Users just describe the query they want to run, the input and output locations, and optionally a few more details. The system then runs the query incrementally, maintaining enough state to recover from failure, keep the results consistent in external storage, and so on.
Conceptually, Structured Streaming treats all the data arriving as an unbounded input table. Each new item in the stream is like a row appended to the input table. We won’t actually retain all the input, but our results will be equivalent to having all of it and running a batch job.
A query on the input will generate the Result Table. Every trigger interval (say, every one second), new rows get appended to the Input Table, which eventually updates the Result Table. Whenever the result table gets updated, we want to write the changed result rows to an external sink.
The last part of the model is output modes. Each time the result table is updated, the developer wants to write the changes to an external system, such as S3, HDFS, or a database. We usually want to write output incrementally. For this purpose, Structured Streaming provides three output modes:
- Append: Only the new rows appended to the result table since the last trigger will be written to the external storage. This is applicable only on queries where existing rows in the result table cannot change (e.g. a map on an input stream).
- Complete: The entire updated result table will be written to external storage.
- Update: Only the rows that were updated in the result table since the last trigger will be changed in the external storage. This mode works for output sinks that can be updated in place, such as a MySQL table.
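The three modes can be illustrated with a small, hypothetical sketch in plain Scala (no Spark involved). Here `prev` and `curr` stand for the result table before and after a trigger; the function names are ours, not Spark's:

```scala
// Sketch only: which rows each output mode would write to the sink.
def appendRows(prev: Map[String, Long], curr: Map[String, Long]): Map[String, Long] =
  curr.filter { case (k, _) => !prev.contains(k) }      // only brand-new rows

def completeRows(prev: Map[String, Long], curr: Map[String, Long]): Map[String, Long] =
  curr                                                  // the whole result table

def updateRows(prev: Map[String, Long], curr: Map[String, Long]): Map[String, Long] =
  curr.filter { case (k, v) => prev.get(k) != Some(v) } // new or changed rows

val prev = Map("apache" -> 1L, "spark" -> 1L)
val curr = Map("apache" -> 2L, "spark" -> 1L, "hadoop" -> 1L)

appendRows(prev, curr)   // Map(hadoop -> 1): only the new row
completeRows(prev, curr) // all three rows
updateRows(prev, curr)   // Map(apache -> 2, hadoop -> 1): changed and new rows
```

Note that in real append-mode queries existing rows are guaranteed not to change, which is why Spark restricts that mode to queries without in-place updates.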
Let's work through an example step by step: maintaining a running word count of text data received from a data server listening on a TCP socket. First, we have to import the necessary classes and create a local SparkSession, the starting point of all functionality related to Spark.
import org.apache.spark.sql.functions._
import org.apache.spark.sql.SparkSession

val spark = SparkSession
  .builder
  .appName("StructuredNetworkWordCount")
  .getOrCreate()

import spark.implicits._
Next, we will create a streaming DataFrame representing the text data received from a server listening on localhost:9999 and transform the DataFrame to calculate word counts.
// Create DataFrame representing the stream of input lines from connection to localhost:9999
val lines = spark.readStream
  .format("socket")
  .option("host", "localhost")
  .option("port", 9999)
  .load()

// Split the lines into words
val words = lines.as[String].flatMap(_.split(" "))

// Generate running word count
val wordCounts = words.groupBy("value").count()
The lines DataFrame represents an unbounded table containing the streaming text data. This table contains one column of strings named value, and each line in the streaming text data becomes a row in the table. Note that this is not currently receiving any data, as we are just setting up the transformation and have not yet started it. Next, we convert the DataFrame to a Dataset of String using .as[String] so that we can apply the flatMap operation to split each line into multiple words. The resultant words Dataset contains all the words. Finally, we define the wordCounts DataFrame by grouping by the unique values in the Dataset and counting them. Note that this is a streaming DataFrame which represents the running word counts of the stream.
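The same flatMap, groupBy, and count logic can be seen on a static collection in plain Scala (no Spark), which shows what the streaming query computes over the lines it has received:

```scala
// Equivalent batch logic on a static collection of input lines.
val inputLines = Seq("apache spark", "apache hadoop")

val splitWords = inputLines.flatMap(_.split(" "))   // split each line into words
val counts = splitWords.groupBy(identity).map { case (w, ws) => (w, ws.size) }

// counts: Map(apache -> 2, spark -> 1, hadoop -> 1)
```

The streaming version produces results equivalent to running this batch computation over all input seen so far.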
We have now set up the query on the streaming data. All that is left is to actually start receiving data and computing the counts. To do this, we set it up to print the complete set of counts (specified by outputMode("complete")) to the console every time they are updated, and then start the streaming computation:
// Start running the query that prints the running counts to the console
val query = wordCounts.writeStream
  .outputMode("complete")
  .format("console")
  .start()

query.awaitTermination()
After this code is executed, the streaming computation will have started in the background. The query object is a handle to that active streaming query, and we call query.awaitTermination() to prevent the process from exiting while the query is active.
To actually execute this example code, you can either compile the code in your own Spark application or simply run the example once you have downloaded Spark. You will first need to run Netcat (a small utility found in most Unix-like systems) as a data server by using:
$ nc -lk 9999
Then, in a different terminal, you can start the example by using:
$ ./bin/run-example org.apache.spark.examples.sql.streaming.StructuredNetworkWordCount localhost 9999
Then, any lines typed in the terminal running the Netcat server will be counted and printed on the screen every second. For example, type the following in the Netcat terminal:
# TERMINAL 1: Running Netcat
$ nc -lk 9999
apache spark
apache hadoop
The query will display the output as below.
# TERMINAL 2: Running StructuredNetworkWordCount
$ ./bin/run-example org.apache.spark.examples.sql.streaming.StructuredNetworkWordCount localhost 9999

-------------------------------------------
Batch: 0
-------------------------------------------
+------+-----+
| value|count|
+------+-----+
|apache|    1|
| spark|    1|
+------+-----+

-------------------------------------------
Batch: 1
-------------------------------------------
+------+-----+
| value|count|
+------+-----+
|apache|    2|
| spark|    1|
|hadoop|    1|
+------+-----+
Let's understand the Structured Streaming model with the above example. The lines DataFrame is the input table, and the final wordCounts DataFrame is the result table. Note that the query on the streaming lines DataFrame to generate wordCounts is exactly the same as it would be on a static DataFrame. However, when this query is started, Spark will continuously check for new data from the socket connection. If there is new data, Spark will run an "incremental" query that combines the previous running counts with the new data to compute updated counts.
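The incremental model can be sketched in plain Scala (the function and variable names here are ours, purely for illustration): the engine keeps the running counts as state and merges each new micro-batch into it rather than recomputing from scratch.

```scala
// Sketch only: merge one micro-batch of lines into the running word-count state.
def updateCounts(state: Map[String, Long], batch: Seq[String]): Map[String, Long] = {
  val batchCounts = batch.flatMap(_.split(" "))
    .groupBy(identity)
    .map { case (w, ws) => (w, ws.size.toLong) }
  // Add the batch counts to the running state.
  batchCounts.foldLeft(state) { case (acc, (w, n)) =>
    acc.updated(w, acc.getOrElse(w, 0L) + n)
  }
}

val afterBatch0 = updateCounts(Map.empty, Seq("apache spark"))
// Map(apache -> 1, spark -> 1), matching Batch: 0 above
val afterBatch1 = updateCounts(afterBatch0, Seq("apache hadoop"))
// Map(apache -> 2, spark -> 1, hadoop -> 1), matching Batch: 1 above
```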
This model is very different from other stream processing engines. Many streaming systems require the user to maintain running aggregation state themselves. In this model, Spark is responsible for updating the Result Table when there is new data, relieving users from reasoning about state management.
Handling Event-Time and Late Data
Event-time is the time embedded in the data itself. For many applications, you may want to operate on this event-time. This model naturally handles data that arrives later than expected based on its event-time. Since Spark is updating the Result Table, it has full control over updating old aggregates when there is late data, as well as cleaning up old aggregates to limit the size of intermediate state. Since Spark 2.1, watermarking is supported, which allows the user to specify a threshold for late data so that the engine can clean up old state accordingly.
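The watermarking idea can be sketched in plain Scala (in real Spark code you would call withWatermark on a streaming DataFrame; the Event class and watermark function below are hypothetical, for illustration only):

```scala
// Sketch only: a watermark trails the maximum observed event-time by a threshold.
final case class Event(word: String, eventTime: Long) // event-time in seconds

def watermark(events: Seq[Event], thresholdSecs: Long): Long =
  events.map(_.eventTime).max - thresholdSecs

val events = Seq(Event("apache", 100L), Event("spark", 112L), Event("hadoop", 95L))
val wm = watermark(events, 10L)          // 112 - 10 = 102

// Events below the watermark are considered too late: their aggregates are
// finalized and the engine is free to drop the corresponding state.
val accepted = events.filter(_.eventTime >= wm)
val dropped  = events.filter(_.eventTime < wm)
```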
Fault Tolerance Semantics
Delivering end-to-end exactly-once semantics was one of the key goals behind the design of Structured Streaming. To achieve that, every streaming source is assumed to have offsets (similar to Kafka offsets or Kinesis sequence numbers) to track the read position in the stream. The engine uses checkpointing and write-ahead logs to record the offset range of the data being processed in each trigger.
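The offset-tracking idea can be sketched in plain Scala (OffsetRange, runTrigger, and the in-memory log below are hypothetical stand-ins for the engine's checkpoint and write-ahead log):

```scala
// Sketch only: before processing each trigger, log the offset range to be read;
// on restart, the engine can replay the log and reprocess the same ranges
// deterministically, which is what makes end-to-end exactly-once possible.
final case class OffsetRange(start: Long, end: Long)

var writeAheadLog = Vector.empty[OffsetRange]   // stands in for the checkpoint/WAL

def runTrigger(committed: Long, available: Long): Long = {
  val range = OffsetRange(committed, available)
  writeAheadLog = writeAheadLog :+ range        // record the range before processing
  range.end                                     // new committed position
}

val afterT0 = runTrigger(0L, 5L)    // trigger 0 processes offsets [0, 5)
val afterT1 = runTrigger(afterT0, 9L) // trigger 1 processes offsets [5, 9)
```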
Comparison With Other Streaming Engines
What is unique about Structured Streaming, compared with several other systems, is its combination of end-to-end exactly-once guarantees, incremental execution, and a single API for both batch and streaming computation.
Structured Streaming is a scalable and fault-tolerant stream processing engine built on the Spark SQL engine. You can express your streaming computation the same way you would express a batch computation on static data. Note that Structured Streaming is still ALPHA in Spark 2.0 and the APIs are still experimental.
Published at DZone with permission of Mahesh Chand Kandpal, DZone MVB. See the original article here.