How to Perform Distributed Spark Streaming With PySpark
In this post, we look at how to use PySpark to quickly analyze incoming data streams and provide real-time metrics. Read on to get started!
I am excited to share my experience with Spark Streaming, a tool I have been playing with on my own. Before we get started, let's have a sneak peek at the code that lets you watch some data stream through a sample application.
from operator import add, sub
from time import sleep
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
# Set up the Spark context and the streaming context
sc = SparkContext(appName="PysparkNotebook")
ssc = StreamingContext(sc, 1)
# Input data
rddQueue = []
for i in range(5):
    rddQueue += [ssc.sparkContext.parallelize([i, i+1])]
inputStream = ssc.queueStream(rddQueue)
inputStream.map(lambda x: "Input: " + str(x)).pprint()
inputStream.reduce(add)\
    .map(lambda x: "Output: " + str(x))\
    .pprint()
ssc.start()
sleep(5)
ssc.stop(stopSparkContext=True, stopGraceFully=True)
Spark Streaming has a different view of data than Spark. In non-streaming Spark, all data is put into a Resilient Distributed Dataset, or RDD. That isn't good enough for streaming. In Spark Streaming, the main noun is the DStream, or Discretized Stream: essentially a sequence of RDDs. The verbs are much the same; just as we have actions and transformations on RDDs, we have actions and transformations on DStreams.
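To make that parallel concrete, here is a minimal, self-contained sketch (an illustration of the idea, not part of the sample above; the app name is arbitrary) showing the same verb applied first to an RDD and then, batch by batch, to a DStream:
from time import sleep
from pyspark import SparkContext
from pyspark.streaming import StreamingContext

sc = SparkContext(appName="DStreamVerbs")
ssc = StreamingContext(sc, 1)  # one-second batches

# The familiar RDD verbs...
rdd = sc.parallelize([1, 2, 3, 4])
print(rdd.map(lambda x: x * 10).collect())   # [10, 20, 30, 40]

# ...applied to a DStream: the map runs on every RDD (batch) in the stream.
stream = ssc.queueStream([sc.parallelize([1, 2, 3, 4])])
stream.map(lambda x: x * 10).pprint()

ssc.start()
sleep(2)
ssc.stop(stopSparkContext=True, stopGraceFully=True)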
What Is a DStream?
An RDD is a container that holds all your data, a bit like a spreadsheet. A DStream is a spreadsheet, then another one, then another one as time moves along. We take a stream of data and discretize it: freeze it at one moment, then another, then another. This means we do not have to wait for all the data to pile up into one giant RDD; we take a small portion, process it quickly, take a bit more data, and keep processing small portions as they arrive. We can give an answer based on the data we have so far instead of waiting for everything. That is a big win: if your client is watching a dashboard, you cannot ask them to wait a week for answers; in general, they want to know something immediately. In a nutshell, "Quick Results are Valuable."
Any useful computation involves Input and Output.
Input
In all my examples, I am going to use the slightly cheesy QueueStream input; it's basically a canned, debug-friendly input stream that I feed into the application. In the real world, you would use Kafka, file input, or a socket input, but for the examples here we will stick to streaming with QueueStream.
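For a taste of the non-cheesy options, here is a hedged sketch using two of PySpark's built-in sources, socketTextStream and textFileStream; the host, port, and directory are placeholders, and a Kafka source would typically need the separate Spark-Kafka integration package on top of this:
from pyspark import SparkContext
from pyspark.streaming import StreamingContext

sc = SparkContext(appName="RealWorldInputs")
ssc = StreamingContext(sc, 1)

# Lines of text arriving over a TCP socket (for local testing: `nc -lk 9999`).
socketLines = ssc.socketTextStream("localhost", 9999)

# Any new file dropped into this directory is read as lines of text.
fileLines = ssc.textFileStream("/tmp/streaming-input")

socketLines.pprint()
fileLines.pprint()

ssc.start()
ssc.awaitTerminationOrTimeout(30)
ssc.stop(stopSparkContext=True, stopGraceFully=True)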
Output
Once we have the DStream and we've done some computations, we need to get the data back out somehow. Spark offers a number of output operations. The most common is saveAsTextFiles, which dumps the output as text files. Spark also provides foreachRDD, which lets you process the output any way you need to: you could push it into a Kafka queue, save it to a database, or POST it to a web service. For the examples here, we will use the slightly cheesy pprint, which prints back to the command line.
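Here is a short, hedged sketch of those three output operations side by side; it assumes an sc and ssc set up as in the first example, and sendToMySink is a hypothetical stand-in for a database write, Kafka producer, or web-service call:
def sendToMySink(records):
    # Hypothetical sink: replace with a database insert, a Kafka producer, an HTTP POST, ...
    for record in records:
        print("sending", record)

stream = ssc.queueStream([sc.parallelize([1, 2, 3])])

# 1. Dump each batch as text files, one output directory per batch,
#    named from the prefix and the batch time.
stream.saveAsTextFiles("/tmp/stream-output/batch")

# 2. Do whatever you need with each batch's RDD.
#    (collect() is fine for a tiny demo batch like this one.)
stream.foreachRDD(lambda rdd: sendToMySink(rdd.collect()))

# 3. Print the first elements of each batch to the command line.
stream.pprint()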
Let's look at our second example:
from operator import add
from time import sleep
from pyspark import SparkContext
from pyspark.streaming import StreamingContext

sc = SparkContext(appName="PysparkNotebook")
ssc = StreamingContext(sc, 1)

# One list per one-second batch
inputData = [
    [1, 2, 3],
    [0],
    [4, 4, 4],
    [0, 0, 0, 25],
    [1, -1, 10],
]

# Each list becomes one RDD in the queue
rddQueue = []
for datum in inputData:
    rddQueue += [ssc.sparkContext.parallelize(datum)]

inputStream = ssc.queueStream(rddQueue)
inputStream.reduce(add).pprint()

ssc.start()
sleep(5)
ssc.stop(stopSparkContext=True, stopGraceFully=True)
First and foremost, we create a Spark context and wrap it inside a streaming context. The second argument to StreamingContext is 1, meaning that we want a new batch every second.
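To make that batch interval concrete, here is a purely illustrative variant (not part of the example) that would instead collect five seconds of data into each batch:
ssc = StreamingContext(sc, 5)  # hypothetical: five-second batches instead of one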
Here we have created inputData, a list of small lists of integers. With the Spark context's parallelize, each list becomes an RDD, and we wrap the resulting queue of RDDs in the streaming context as a queueStream. Now we have an input stream that pops out a new batch every second, which we will see as:
[1,2,3] --> second 1
[0] --> second 2
[4,4,4] --> second 3
[0,0,0,25] --> second 4, and finally
[1,-1,10] --> second 5
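Since the reduce(add) in the code sums each batch before printing, the values we expect pprint to show, batch by batch, are:
[1,2,3] --> 6
[0] --> 0
[4,4,4] --> 12
[0,0,0,25] --> 25
[1,-1,10] --> 10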
Once we receive the input, the reduce operation shrinks each batch down to a single value, using addition as the combining function. The key difference between a Spark batch job and a Spark Streaming job is that a streaming job is not something we start, let run, and watch finish. We do have an action here, pprint, which is not lazy and therefore forces the computation, but what we are really setting up is a streaming topology. Once started, it runs forever, because the topology never knows, 'hey, I might not get more data in.' We must explicitly stop it.
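If sleeping for a hard-coded five seconds feels arbitrary, a slightly tidier alternative (a minimal sketch, not part of the original example) is to let the streaming context do the waiting with awaitTerminationOrTimeout:
ssc.start()

# Block until the streaming computation is stopped or the timeout (in seconds)
# elapses, whichever comes first.
ssc.awaitTerminationOrTimeout(5)

# Explicitly stop the topology; a graceful stop lets in-flight batches finish.
ssc.stop(stopSparkContext=True, stopGraceFully=True)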
In my next article, I'll dig further into Spark Streaming, specifically windowing and reduceByKeyAndWindow. Stay tuned!