Getting Started With Spark Streaming
An introduction to Spark Streaming and how to use it with an example data set.
This post will help you get started using Apache Spark Streaming with HBase. Spark Streaming is an extension of the core Spark API that enables continuous data stream processing.
What is Spark Streaming?
First of all, what is streaming? A data stream is an unbounded sequence of data arriving continuously. Streaming divides continuously flowing input data into discrete units for processing. Stream processing is the low-latency processing and analysis of streaming data. Spark Streaming is an extension of the core Spark API that enables scalable, high-throughput, fault-tolerant stream processing of live data. Spark Streaming is for use cases that require significant amounts of data to be processed quickly as soon as they arrive. Example real-time use cases are:
- Website monitoring, network monitoring
- Fraud detection
- Web clicks
- Advertising
- Internet of Things sensors
Spark Streaming supports data sources such as HDFS directories, TCP sockets, Kafka, Flume, Twitter, etc. Data streams can be processed with Spark's core APIs, DataFrames and SQL, or machine learning APIs, and can be persisted to a file system, HDFS, databases, or any data source offering a Hadoop OutputFormat.
How Spark Streaming Works
Spark Streaming divides a data stream into batches of X seconds called DStreams, each of which is internally a sequence of RDDs. Your Spark application processes the RDDs using Spark APIs, and the processed results of the RDD operations are returned in batches.
Architecture of the example Streaming Application
The Spark Streaming example code does the following:
- Reads streaming data.
- Processes the streaming data.
- Writes the processed data to an HBase Table.
Other Spark example code does the following:
- Reads HBase Table data written by the streaming code.
- Calculates daily summary statistics.
- Writes summary statistics to the HBase table Column Family stats.
Example Data Set
The oil pump sensor data comes in as comma separated value (csv) files dropped in a directory. Spark Streaming will monitor the directory and process any files created in that directory. (As stated before, Spark Streaming supports different streaming data sources; for simplicity, this example will use files.) Below is an example of the csv file with some sample data:
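(The sample file itself is not reproduced here. The hypothetical rows below only illustrate the expected layout: a pump/resource id, date, and time, followed by numeric sensor measurements, one of which is psi, used later for alerting.)
COHUTTA,3/10/14,1:01,10.27,1.73,881,1.56,85,1.94
COHUTTA,3/10/14,1:02,9.67,1.73,882,0.52,4.3,1.79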
We use a Scala case class to define the sensor schema corresponding to the sensor data csv files, and a parseSensor function to parse the comma separated values into the sensor case class.
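The exact field names and ordering are assumptions (apart from the pump id, date, time, and psi, which the rest of the example relies on); a minimal sketch of the case class and parser might look like this:
// illustrative schema; adjust the field names to match your csv files
case class Sensor(resid: String, date: String, time: String, hz: Double, disp: Double,
  flo: Double, sedPPM: Double, psi: Double, chlPPM: Double)

object Sensor {
  // split a comma separated line and convert the fields into a Sensor object
  def parseSensor(str: String): Sensor = {
    val p = str.split(",")
    Sensor(p(0), p(1), p(2), p(3).toDouble, p(4).toDouble, p(5).toDouble,
      p(6).toDouble, p(7).toDouble, p(8).toDouble)
  }
}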
HBase Table Schema
The HBase Table Schema for the streaming data is as follows:
- Composite row key of the pump name, date, and time stamp
- Column Family data with columns corresponding to the input data fields
- Column Family alerts with columns corresponding to any filters for alarming values
Note that the data and alerts column families could be set to expire values after a certain amount of time.
The Schema for the daily statistics summary rollups is as follows:
- Composite row key of the pump name and date
- Column Family stats
- Columns for min, max, and avg.
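The example assumes this table and its column families already exist. If you need to create them, one option is a sketch like the following using the HBase admin API (the table name sensor is an assumption; the HBase shell works just as well):
import org.apache.hadoop.hbase.{HBaseConfiguration, HColumnDescriptor, HTableDescriptor, TableName}
import org.apache.hadoop.hbase.client.HBaseAdmin

val hbaseConf = HBaseConfiguration.create()
val admin = new HBaseAdmin(hbaseConf)
val tableDesc = new HTableDescriptor(TableName.valueOf("sensor")) // assumed table name
// column families for raw data, alerts, and daily statistics
tableDesc.addFamily(new HColumnDescriptor("data"))
tableDesc.addFamily(new HColumnDescriptor("alerts"))
tableDesc.addFamily(new HColumnDescriptor("stats"))
// a TTL could be set on data/alerts to expire old values, e.g. setTimeToLive(86400) for one day
if (!admin.tableExists(TableName.valueOf("sensor"))) admin.createTable(tableDesc)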
The function below converts a Sensor object into an HBase Put object, which is used to insert a row into HBase.
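The conversion is not shown here in full, so below is a minimal sketch, assumed to live in the Sensor companion object (the streaming code refers to it as Sensor.convertToPut). It builds the composite row key as the pump name and date joined by an underscore, followed by a space and the time, so the daily rollup can later drop the time by splitting on the space. Only a few of the data columns are shown, and Put.addColumn assumes HBase 1.0+ (older releases use Put.add).
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.util.Bytes

val cfData = Bytes.toBytes("data")

// convert a Sensor object into a (key, Put) pair for the data column family
def convertToPut(sensor: Sensor): (ImmutableBytesWritable, Put) = {
  // composite row key: pumpName_date time
  val rowkey = sensor.resid + "_" + sensor.date + " " + sensor.time
  val put = new Put(Bytes.toBytes(rowkey))
  put.addColumn(cfData, Bytes.toBytes("hz"), Bytes.toBytes(sensor.hz))
  put.addColumn(cfData, Bytes.toBytes("flo"), Bytes.toBytes(sensor.flo))
  put.addColumn(cfData, Bytes.toBytes("psi"), Bytes.toBytes(sensor.psi))
  (new ImmutableBytesWritable(Bytes.toBytes(rowkey)), put)
}
// convertToPutAlert follows the same pattern, writing the low psi value to the alerts column family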
Configuration for Writing to an HBase Table
You can use the TableOutputFormat class with Spark to write to an HBase table, similar to how you would write to an HBase table from MapReduce. Below we set up the configuration for writing to HBase using the TableOutputFormat class.
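A minimal sketch of that setup follows. It assumes the older mapred TableOutputFormat (the variant saveAsHadoopDataset expects) and a tableName value matching the HBase table used above.
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.mapred.TableOutputFormat
import org.apache.hadoop.mapred.JobConf

val tableName = "sensor" // assumed table name
val hbaseConf = HBaseConfiguration.create()
// wrap the HBase configuration in a JobConf and direct output to TableOutputFormat
val jobConfig: JobConf = new JobConf(hbaseConf, this.getClass)
jobConfig.setOutputFormat(classOf[TableOutputFormat])
jobConfig.set(TableOutputFormat.OUTPUT_TABLE, tableName)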
The Spark Streaming Example Code
These are the basic steps for Spark Streaming code:
- Initialize a Spark StreamingContext object.
- Apply transformations and output operations to DStreams.
- Start receiving data and processing it using streamingContext.start().
- Wait for the processing to be stopped using streamingContext.awaitTermination().
We will go through each of these steps with the example application code.
Initializing the StreamingContext
First, we create a StreamingContext, the main entry point for streaming functionality, with a 2-second batch interval.
val sparkConf = new SparkConf().setAppName("HBaseStream")
// create a StreamingContext, the main entry point for all streaming functionality
val ssc = new StreamingContext(sparkConf, Seconds(2))
Next, we use the StreamingContext textFileStream(directory) method to create an input stream that monitors a Hadoop-compatible file system for new files and processes any files created in that directory.
// create a DStream that represents streaming data from a directory source
val linesDStream = ssc.textFileStream("/user/user01/stream")
The linesDStream represents the stream of data; each record is a line of text. Internally, a DStream is a sequence of RDDs, one RDD per batch interval.
Apply Transformations and Output Operations to DStreams
Next, we parse the lines of data into Sensor objects using the map operation on the linesDStream.
// parse each line of data in linesDStream into sensor objects
val sensorDStream = linesDStream.map(Sensor.parseSensor)
The map operation applies the Sensor.parseSensor function on the RDDs in the linesDStream, resulting in RDDs of Sensor objects.
Next, we use the DStream foreachRDD method to apply processing to each RDD in this DStream. We filter the sensor objects for low psi to create alerts, then write the sensor and alert data to HBase by converting them to Put objects and using the PairRDDFunctions saveAsHadoopDataset method, which outputs the RDD to any Hadoop-supported storage system using a Hadoop Configuration object for that storage system (see the HBase write configuration above).
// apply the processing function to each RDD in the sensorDStream
sensorDStream.foreachRDD { rdd =>
  // filter sensor data for low psi
  val alertRDD = rdd.filter(sensor => sensor.psi < 5.0)
  // convert sensor data to Put objects and write to the HBase table column family data
  rdd.map(Sensor.convertToPut).saveAsHadoopDataset(jobConfig)
  // convert alerts to Put objects and write to the HBase table column family alerts
  alertRDD.map(Sensor.convertToPutAlert).saveAsHadoopDataset(jobConfig)
}
The sensor data and alerts are converted to Put objects and then written to HBase.
Start Receiving Data
To start receiving data, we must explicitly call start() on the StreamingContext, then call awaitTermination to wait for the streaming computation to finish.
// Start the computation
ssc.start()
// Wait for the computation to terminate
ssc.awaitTermination()
Spark Reading From and Writing to HBase
Now we want to read the HBase sensor table data, calculate daily summary statistics, and write these statistics to the stats column family.
The code below reads the psi column from the sensor table's data column family, calculates statistics on this data using StatCounter, then writes the statistics to the sensor table's stats column family.
// configure HBase for reading
val conf = HBaseConfiguration.create()
conf.set(TableInputFormat.INPUT_TABLE, HBaseSensorStream.tableName)
// scan data column family psi column
conf.set(TableInputFormat.SCAN_COLUMNS, "data:psi")
// Load an RDD of (row key, row Result) tuples from the table
val hBaseRDD = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat],
classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
classOf[org.apache.hadoop.hbase.client.Result])
// transform (row key, row Result) tuples into an RDD of Results
val resultRDD = hBaseRDD.map(tuple => tuple._2)
// transform into an RDD of (RowKey, ColumnValue), with the time removed from the row key
val keyValueRDD = resultRDD.
map(result => (Bytes.toString(result.getRow()).
split(" ")(0), Bytes.toDouble(result.value)))
// group by row key, get statistics for the column value
val keyStatsRDD = keyValueRDD.
groupByKey().
mapValues(list => StatCounter(list))
// convert (row key, stats) pairs to Put objects and write to the HBase table stats column family
keyStatsRDD.map { case (k, v) => convertToPut(k, v) }.saveAsHadoopDataset(jobConfig)
The output from newAPIHadoopRDD is an RDD of (row key, Result) pairs. The PairRDDFunctions saveAsHadoopDataset method saves the Put objects to HBase.
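The convertToPut used here takes a row key and a StatCounter rather than a Sensor object, and is not shown earlier; a minimal sketch, assuming min, max, and avg columns in the stats column family, is:
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.util.StatCounter

// convert a (row key, statistics) pair into a Put on the stats column family
def convertToPut(key: String, stats: StatCounter): (ImmutableBytesWritable, Put) = {
  val cfStats = Bytes.toBytes("stats")
  val put = new Put(Bytes.toBytes(key))
  put.addColumn(cfStats, Bytes.toBytes("min"), Bytes.toBytes(stats.min))
  put.addColumn(cfStats, Bytes.toBytes("max"), Bytes.toBytes(stats.max))
  put.addColumn(cfStats, Bytes.toBytes("avg"), Bytes.toBytes(stats.mean))
  (new ImmutableBytesWritable(Bytes.toBytes(key)), put)
}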
Software
- This tutorial will run on the MapR Sandbox, which includes Spark.
- You can download the code and data to run these examples from here:
Running the Application
You can run the code as a standalone application as described in the tutorial on Getting Started with Spark on MapR Sandbox.
Here are the steps summarized:
- Log into the MapR Sandbox, as explained in Getting Started with Spark on MapR Sandbox, using userid user01, password mapr.
- Build the application using maven.
- Copy the jar file and data file to your sandbox home directory /user/user01 using scp.
- Run the streaming app:
/opt/mapr/spark/spark-<version>/bin/spark-submit --driver-class-path `hbase classpath` --class examples.HBaseSensorStream sparkstreamhbaseapp-1.0.jar
- Copy the streaming data file to the stream directory:
cp sensordata.csv /user/user01/stream/
- Read data and calculate stats for one column:
/opt/mapr/spark/spark-<version>/bin/spark-submit --driver-class-path `hbase classpath` --class examples.HBaseReadWrite sparkstreamhbaseapp-1.0.jar
- Calculate stats for the whole row:
/opt/mapr/spark/spark-<version>/bin/spark-submit --driver-class-path `hbase classpath` --class examples.HBaseReadRowWriteStats sparkstreamhbaseapp-1.0.jar
Summary
This concludes the tutorial on Spark Streaming with HBase. You can find more information in the references section.
References and More Information: