
Spark Structured Streaming Using Java

DZone's Guide to Spark Structured Streaming Using Java

In this article, we take a look at Spark Structured Streaming using Java.


Spark provides a streaming library to process data that flows continuously from real-time systems.

Concept 

Spark Streaming was originally implemented with the DStream API, which runs on Spark RDDs: the data from the streaming source is divided into chunks, processed, and then sent to the destination.

Spark 2 introduced a new model, Structured Streaming, which is built on top of the Spark SQL engine and works on DataFrames and Datasets.

Structured Streaming treats a continuous data stream as an unbounded table that is updated continuously as events are processed from the stream.

A Structured Streaming application can therefore be implemented using SQL queries that perform various computations on this unbounded data.

Structured Streaming handles several challenges, such as exactly-once stream processing and incremental updates.
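To make the unbounded-table model and incremental updates concrete, here is a small plain-Java sketch. It does not use Spark, and the class and method names are hypothetical: each micro-batch appends rows to the input table, and instead of re-running a GROUP BY count over the whole table, running state is updated only for the keys that appear in the new rows. Both approaches should always give the same answer.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java sketch of the "unbounded table" model (hypothetical names, not Spark's API).
final class UnboundedTableSketch {
    // Every row ever received: conceptually the unbounded input table.
    // Kept here only so the incremental result can be checked against a full
    // recomputation; a real engine does not keep all input rows around.
    private final List<String> inputTable = new ArrayList<>();
    // Running state for "SELECT key, COUNT(*) ... GROUP BY key", updated incrementally.
    private final Map<String, Long> runningCounts = new HashMap<>();

    // Appends one micro-batch and updates the state only for the keys it contains.
    void appendBatch(List<String> batch) {
        inputTable.addAll(batch);
        for (String key : batch) {
            runningCounts.merge(key, 1L, Long::sum);
        }
    }

    // Re-runs the query over the whole input table (what incremental execution avoids).
    Map<String, Long> recomputeFromScratch() {
        Map<String, Long> counts = new HashMap<>();
        for (String key : inputTable) {
            counts.merge(key, 1L, Long::sum);
        }
        return counts;
    }

    // Current result table produced by the incremental state.
    Map<String, Long> currentResult() {
        return new HashMap<>(runningCounts);
    }
}
```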

Structured Streaming polls the source for new data based on a trigger interval. An output mode is required when writing the result data set to a sink. The supported modes are append mode (only new rows added to the result table are written to the sink), update mode (only rows that were updated in the result table are written to the sink), and complete mode (the entire result table is written to the sink).
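The difference between the three output modes can be modelled with a simplified plain-Java sketch (hypothetical names, not Spark's API) that compares the result table before and after one trigger and returns the rows each mode would write. One caveat the sketch ignores: in Spark itself, append mode is only allowed on a streaming aggregation when a watermark is defined, and a row is then emitted once, after it can no longer change. Here append is simply treated as "rows whose key did not exist before".

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

// Simplified model of what each output mode writes after one trigger,
// given the result table before and after the batch (hypothetical names).
final class OutputModeSketch {
    enum Mode { APPEND, UPDATE, COMPLETE }

    static Map<String, Long> rowsToWrite(Mode mode, Map<String, Long> before, Map<String, Long> after) {
        Map<String, Long> out = new HashMap<>();
        for (Map.Entry<String, Long> row : after.entrySet()) {
            boolean isNew = !before.containsKey(row.getKey());
            // a new key also counts as changed (its previous value is null)
            boolean changed = !Objects.equals(before.get(row.getKey()), row.getValue());
            switch (mode) {
                case APPEND:   // only rows added to the result table
                    if (isNew) out.put(row.getKey(), row.getValue());
                    break;
                case UPDATE:   // only rows added or changed in this trigger
                    if (changed) out.put(row.getKey(), row.getValue());
                    break;
                case COMPLETE: // the whole result table, every time
                    out.put(row.getKey(), row.getValue());
                    break;
            }
        }
        return out;
    }
}
```

For example, if the department counts go from `{HR=1, IT=2}` to `{HR=1, IT=3, Sales=1}`, append writes only `Sales`, update writes `IT` and `Sales`, and complete writes all three rows.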


Built-in Sources and Sinks

Structured Streaming supports the following built-in data sources:

File Source: Reads files placed in a given directory. The supported formats are text, CSV, Parquet, and JSON.

Kafka Source: The streaming library provides a Kafka consumer to read data from Kafka brokers. This source is widely used in production.

Socket Source: Reads UTF-8 text data from a socket connection (intended for testing, since it does not provide end-to-end fault tolerance).

The built-in sinks are as follows:

File sink: Stores the output in a directory

Kafka sink: Stores the output in one or more Kafka topics

Console sink: Prints the output to the console; used for debugging purposes

Memory sink: Stores the output in memory as an in-memory table; used for debugging purposes

Foreach sink: Runs arbitrary computation on the records in the output

Handling Fault Tolerance

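Structured Streaming provides fault tolerance through checkpointing: the progress of the query (the source offsets being processed) and its intermediate state are saved to a checkpoint location, set with the checkpointLocation option of the stream writer, so that a failed query can be restarted from where it left off. Checkpointing is also available in Spark Streaming with DStreams.

In addition, Structured Streaming's end-to-end exactly-once guarantee depends on the following conditions:

  1. The source must be replayable, so that data which has not yet been committed can be read again after a failure
  2. The streaming sinks must be idempotent, so that reprocessing the same data does not produce duplicate output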
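The two conditions above can be illustrated with a plain-Java sketch (hypothetical names, not Spark's API). Spark writes the offset range of each micro-batch to the checkpoint before running it, so after a failure the same batch ID can be re-run on exactly the same data from a replayable source such as Kafka; if the sink ignores a batch ID it has already committed, the replay causes no duplicates.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Sketch of exactly-once output from a replayable source plus an idempotent sink.
// All names are hypothetical; Spark keeps the equivalent bookkeeping in its checkpoint.
final class ExactlyOnceSketch {
    // Replayable source: any offset range [fromOffset, toOffset) can be read again.
    static List<String> read(List<String> log, int fromOffset, int toOffset) {
        return new ArrayList<>(log.subList(fromOffset, toOffset));
    }

    // Idempotent sink: writing the same batch ID twice has no extra effect.
    static final class IdempotentSink {
        final Map<Long, List<String>> committedBatches = new LinkedHashMap<>();

        void write(long batchId, List<String> rows) {
            // a replayed batch carries the same ID and the same rows, so it is ignored
            committedBatches.putIfAbsent(batchId, rows);
        }

        // All rows in the order the batches were first committed.
        List<String> allRows() {
            List<String> rows = new ArrayList<>();
            committedBatches.values().forEach(rows::addAll);
            return rows;
        }
    }
}
```

For example, writing batch 0, replaying batch 0 after a simulated crash, and then writing batch 1 leaves each source record in the sink exactly once.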

Demonstration

File Source

In this demo, we use Spark Structured Streaming with the file source to run a simple computation and write the output to the console sink.

This use case uses a sample CSV data set of employees with the fields "empId, empName, department". The CSV files are placed in a particular folder.

The built-in file source checks that folder for newly added files on each trigger and passes their data to the computation layer for the required analytics. In this example, the output is streamed to the console.
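The following plain-Java sketch illustrates the polling idea: on each trigger the directory is listed and only files that have not been seen before are handed to the query. The `NewFileScanner` class is hypothetical; Spark's file source keeps the list of processed files in its checkpoint metadata and offers options such as `maxFilesPerTrigger` to limit how many new files a batch takes. Because files are picked up as soon as they appear, they should be moved into the directory atomically rather than written there in place.

```java
import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Sketch of how a polling file source discovers input (hypothetical class, not Spark's API):
// on each trigger it lists the directory and returns only files it has not processed before.
final class NewFileScanner {
    private final Path directory;
    private final Set<Path> seen = new HashSet<>();

    NewFileScanner(Path directory) {
        this.directory = directory;
    }

    // Returns the CSV files added since the previous call, sorted by path.
    List<Path> pollNewFiles() throws IOException {
        List<Path> fresh = new ArrayList<>();
        try (DirectoryStream<Path> stream = Files.newDirectoryStream(directory, "*.csv")) {
            for (Path file : stream) {
                if (seen.add(file)) { // add() returns true only the first time a file is seen
                    fresh.add(file);
                }
            }
        }
        fresh.sort(null); // natural ordering of Path
        return fresh;
    }
}
```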

The following Spark Structured Streaming program counts the employees in each department based on the streamed file data:

```java
package com.sparkstreaming;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.OutputMode;
import org.apache.spark.sql.streaming.StreamingQuery;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;

public class FileStream {
    public static void main(String[] args) throws Exception {
        // build the Spark session
        SparkSession spark = SparkSession.builder().appName("spark streaming").config("spark.master", "local")
                .config("spark.sql.warehouse.dir", "file:///app/").getOrCreate();
        // set the log level to log only errors
        spark.sparkContext().setLogLevel("ERROR");

        // define the schema of the file data source (streaming file sources need an explicit schema)
        StructType schema = new StructType().add("empId", DataTypes.StringType).add("empName", DataTypes.StringType)
                .add("department", DataTypes.StringType);

        // build the streaming data reader for the file source, specifying the CSV file format
        Dataset<Row> rawData = spark.readStream().option("header", true).format("csv").schema(schema)
                .csv("D:/streamingfiles/*.csv");
        rawData.createOrReplaceTempView("empData");
        // count employees grouped by department
        Dataset<Row> result = spark.sql("select count(*), department from empData group by department");

        // write the stream to the console in update mode, since the data is being aggregated
        StreamingQuery query = result.writeStream().outputMode(OutputMode.Update()).format("console").start();
        query.awaitTermination();
    }
}
```



The output is streamed in batches: the first batch relates to the first file, the second batch to the second file, and so on.

The first CSV file contains the following data:

The second CSV file contains the following data:

The output of the streaming query is shown below, with the count grouped by department name. The data of the second batch is aggregated with that of the first batch based on the key values, and the updated counts are output in the second batch.

Kafka Source

In this demo, we implement a Spark Structured Streaming program that reads from a Kafka broker and computes aggregations over a 2-minute window.

In this use case, a web application continuously publishes the logged-in session duration of its users to a Kafka broker.

Using a Spark streaming program, we compute, for every 2-minute window, the sum of the session durations of each user logged into the website.
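Conceptually, `window(timestamp, '2 minutes')` assigns each event to the 2-minute tumbling window that contains its timestamp (windows are aligned to the Unix epoch when no start offset is given), and the aggregation then sums the durations per window and user. The plain-Java sketch below reproduces that bookkeeping without Spark; the class name is hypothetical, and the `sessionDuration,userName` message format is the one this use case assumes.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.Map;

// Sketch of tumbling-window assignment and per-(window, user) sums, in plain Java
// (hypothetical class, not Spark's API).
final class TumblingWindowSketch {
    static final Duration WINDOW = Duration.ofMinutes(2);

    // Start of the 2-minute window [start, start + 2 min) that contains the event time.
    static Instant windowStart(Instant eventTime) {
        long size = WINDOW.toMillis();
        long ms = eventTime.toEpochMilli();
        return Instant.ofEpochMilli(Math.floorDiv(ms, size) * size);
    }

    // Adds one "sessionDuration,userName" message to the running sums,
    // keyed by "<window start>|<user>".
    static void add(Map<String, Long> sums, Instant eventTime, String message) {
        String[] parts = message.split(",");
        long duration = Long.parseLong(parts[0].trim());
        String user = parts[1].trim();
        String key = windowStart(eventTime) + "|" + user;
        sums.merge(key, duration, Long::sum);
    }
}
```

For example, messages at 10:00:30 and 10:01:59 fall into the 10:00 to 10:02 window and are summed together, while a message at exactly 10:02:00 starts the next window.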

Below is the Spark Structured Streaming program in Java that computes the window-based aggregations:

```java
package com.sparkstreaming;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.api.java.UDF1;
import org.apache.spark.sql.streaming.OutputMode;
import org.apache.spark.sql.streaming.StreamingQuery;
import org.apache.spark.sql.streaming.StreamingQueryException;
import org.apache.spark.sql.types.DataTypes;

public class DataStream {
    public static void main(String[] args) throws StreamingQueryException {
        // on Windows, Hadoop's local file system support needs winutils.exe under <hadoop.home.dir>/bin
        System.setProperty("hadoop.home.dir", "d:/winutils");
        SparkSession session = SparkSession.builder()
                .master("local[*]")
                .appName("structuredViewingReport")
                .getOrCreate();
        session.sparkContext().setLogLevel("ERROR");

        // UDF returning the session duration: the first value of the comma-delimited Kafka message value
        session.udf().register("sessionDurationFn", new UDF1<String, Long>() {
            @Override
            public Long call(String messageValue) throws Exception {
                String[] strArr = messageValue.split(",");
                return Long.parseLong(strArr[0]);
            }
        }, DataTypes.LongType);
        // UDF returning the user name: the second value of the comma-delimited Kafka message value
        session.udf().register("userNameFn", new UDF1<String, String>() {
            @Override
            public String call(String messageValue) throws Exception {
                String[] strArr = messageValue.split(",");
                return strArr[1];
            }
        }, DataTypes.StringType);

        // define the Kafka streaming reader
        Dataset<Row> df = session.readStream()
                .format("kafka")
                .option("kafka.bootstrap.servers", "localhost:9092")
                .option("subscribe", "sessiondata")
                .load();

        // start DataFrame operations
        df.createOrReplaceTempView("session_data_init");
        // key, value and timestamp are the core attributes of a Kafka record;
        // value holds a comma-delimited string in "sessionDuration,userName" format
        Dataset<Row> preresults = session.sql("select sessionDurationFn(cast(value as string)) as session_duration, "
                + "userNameFn(cast(value as string)) as userName, timestamp from session_data_init");
        preresults.createOrReplaceTempView("session_data");
        // compute the sum of session durations grouped by 2-minute window and userName
        Dataset<Row> results = session.sql("select window, sum(session_duration) as session_duration, userName "
                + "from session_data group by window(timestamp, '2 minutes'), userName");

        // log the results to the console
        StreamingQuery query = results
                .writeStream()
                .format("console")
                .outputMode(OutputMode.Update())
                .option("truncate", false)
                .option("numRows", 50)
                .start();
        query.awaitTermination();
    }
}
```


In this example, the Kafka broker delivers messages as key-value pairs, where the value is a comma-delimited string containing the session duration and the user name.

The output of the above program is a sequence of batches with the windowed computations.
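The UDFs in the program above throw an exception when a message value does not match the `sessionDuration,userName` format, and an exception inside a UDF can fail the micro-batch and stop the streaming query. A more defensive parse could look like the following plain-Java sketch. `SessionMessage` is a hypothetical helper, and returning `null` for malformed values is a design choice that the original program does not make; wrapped in a UDF, it would turn bad records into null columns instead of failed batches.

```java
// Hypothetical defensive parser for the "sessionDuration,userName" value format.
// Malformed input yields null instead of an exception.
final class SessionMessage {
    final long sessionDuration;
    final String userName;

    private SessionMessage(long sessionDuration, String userName) {
        this.sessionDuration = sessionDuration;
        this.userName = userName;
    }

    static SessionMessage parse(String value) {
        if (value == null) {
            return null;
        }
        String[] parts = value.split(",", -1); // -1 keeps empty trailing fields
        if (parts.length != 2 || parts[1].trim().isEmpty()) {
            return null; // wrong number of fields or missing user name
        }
        try {
            return new SessionMessage(Long.parseLong(parts[0].trim()), parts[1].trim());
        } catch (NumberFormatException e) {
            return null; // duration is not a number
        }
    }
}
```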

These use cases are built as a Maven project. The following dependencies are added to pom.xml:

```xml
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.11</artifactId>
    <version>2.4.0</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.11</artifactId>
    <version>2.4.0</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming_2.11</artifactId>
    <version>2.4.0</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
    <version>2.4.0</version>
</dependency>
<!-- provides the "kafka" source and sink used by Structured Streaming -->
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql-kafka-0-10_2.11</artifactId>
    <version>2.4.0</version>
</dependency>
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-hdfs</artifactId>
    <version>2.2.0</version>
</dependency>
```



Performance Tuning Tips

Here are a few performance tips to consider for Spark streaming applications:

1. In the above Spark streaming output for the Kafka source, some data arrives late. For example, as shown below, the first version of user19's data was expected to arrive in batch 1, but it arrived in batch 2 for computation.

There are scenarios where data arrives late and must be included in the computation of an earlier window. In this case, the result of the earlier window is kept in memory (as state) and then aggregated with the late-arriving data. Frameworks like Spark Streaming take care of this process. However, memory consumption can grow, because historical state is kept in memory until the late data arrives. For these scenarios, Spark Structured Streaming provides watermarking, which discards late data once it falls behind a threshold so that the state of old windows can be dropped. In some cases, business results might then be inaccurate because these late values were discarded. To avoid this type of issue, instead of applying watermarking, custom functionality has to be implemented that checks the timestamp of the data and stores late records in HDFS or a cloud-native object storage system, so that batch computations can be performed on them. This implementation adds complexity.
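The watermarking mechanism described above can be sketched in plain Java (hypothetical class, not Spark's API). The watermark is the latest event time seen minus an allowed delay; once a window's end falls at or before the watermark, its state can be finalized and released, and later events for that window become eligible to be discarded. In Spark this is declared on the Dataset with `withWatermark(eventTimeColumn, delayThreshold)`, for example `preresults.withWatermark("timestamp", "10 minutes")` in the Kafka program above; the 10-minute threshold is only an illustrative value. Spark also advances the watermark between micro-batches rather than per event, which this sketch simplifies.

```java
import java.time.Duration;
import java.time.Instant;

// Sketch of event-time watermark bookkeeping (hypothetical class, not Spark's API).
final class WatermarkSketch {
    private final Duration allowedDelay;
    private Instant maxEventTime; // null until the first event has been seen

    WatermarkSketch(Duration allowedDelay) {
        this.allowedDelay = allowedDelay;
    }

    // Records an event time; the maximum, and therefore the watermark, only moves forward.
    void observe(Instant eventTime) {
        if (maxEventTime == null || eventTime.isAfter(maxEventTime)) {
            maxEventTime = eventTime;
        }
    }

    // watermark = latest event time seen - allowed delay (null before any event).
    Instant watermark() {
        return maxEventTime == null ? null : maxEventTime.minus(allowedDelay);
    }

    // A window ending at or before the watermark is treated as closed: its state can be
    // emitted and evicted, and late events for it are dropped.
    boolean isWindowClosed(Instant windowEnd) {
        Instant wm = watermark();
        return wm != null && !windowEnd.isAfter(wm);
    }
}
```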

2. In the above Spark streaming output for the Kafka source, there is a performance lag and the computation is slow. By opening the Spark web UI and viewing the job and task information, we can find the root cause of this issue.

The job takes more than 1.5 minutes.

When we open the task information of the completed job, we can see that many partitions were created whose tasks handled no shuffle data.

By default, 200 partitions are created, as shown above. Spark shuffles data when it performs a grouping operation: during the shuffle, the data is split into partitions and each partition is assigned a task. The number of shuffle partitions comes from a configuration setting rather than from the data, so here far more partitions are created for the shuffle read than there are grouping keys in the output, and many partitions have no data to work on. These dummy tasks still take time to start and stop, which increases the total processing time and leads to latency. The recommendation is to set the following property in the Spark code. It determines the number of partitions used when shuffling data for joins or aggregations:

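```java
// set before the streaming query is started; a query restarted from an existing
// checkpoint keeps the number of shuffle partitions recorded in that checkpoint
session.conf().set("spark.sql.shuffle.partitions", "10");
```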



With this setting, the job performance improves and a more suitable number of partitions is created, as shown below in the Spark web UI.
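To see why a handful of grouping keys leaves most of the 200 default partitions empty, here is a simplified plain-Java model of hash partitioning (hypothetical class). Each key is sent to partition `hash mod numPartitions`; Spark itself uses a Murmur3-based hash rather than `String.hashCode()`, so the exact placement differs, but in either case the number of non-empty partitions can never exceed the number of distinct keys.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Simplified model of hash partitioning during a shuffle (hypothetical class).
// Uses String.hashCode() instead of Spark's Murmur3 hash, so only the counts matter.
final class ShufflePartitionSketch {
    // Number of partitions that receive at least one key.
    static int nonEmptyPartitions(List<String> keys, int numPartitions) {
        Set<Integer> used = new HashSet<>();
        for (String key : keys) {
            used.add(Math.floorMod(key.hashCode(), numPartitions)); // floorMod avoids negative indexes
        }
        return used.size();
    }

    // Partitions that still get a task but have no data to process.
    static int emptyPartitions(List<String> keys, int numPartitions) {
        return numPartitions - nonEmptyPartitions(keys, numPartitions);
    }
}
```

With four distinct user names, at least 196 of the 200 partitions receive no rows; with 10 partitions, the number of idle tasks per shuffle drops to fewer than 10.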

In this way, we can leverage Spark Structured Streaming in real-time applications and benefit from optimized, Spark SQL based computation on streaming data.


Opinions expressed by DZone contributors are their own.
