
Introduction to SparkSession


We go over how to use this new feature of Apache Spark 2.0, covering all the Scala and SQL code you'll need to get started.


Spark 2.0 is the next major release of Apache Spark, and it brings major changes to the level of abstraction in the Spark API and libraries. In this blog post, I'll be discussing SparkSession.

Intro to SparkSession

Before getting into SparkSession, let's understand the idea of an entry point. An entry point is where control is transferred from the operating system to the provided program. Before 2.0, the entry point to Spark Core was sparkContext. Apache Spark is a powerful cluster computing engine designed for fast computation on big data.

sparkContext in Apache Spark


An important step for any Spark driver application is to create a sparkContext. It allows your Spark application to access the Spark cluster with the help of a resource manager, which can be one of the following three:

  • Spark Standalone

  • YARN

  • Apache Mesos

Functions of sparkContext in Apache Spark

  • Get the current status of your Spark application.

  • Set configurations.

  • Access various services.

  • Cancel a job.

  • Cancel a stage.

  • Closure cleaning.

  • Register SparkListener.

  • Programmable dynamic allocation.

  • Access persistent RDDs.
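A minimal sketch of a few of these calls, assuming a sparkContext named sc like the one created in the example below:

sc.setLogLevel("WARN")            // set a configuration: the logging level
val tracker = sc.statusTracker    // get the current status of jobs and stages
val cached = sc.getPersistentRDDs // access the persistent (cached) RDDs
sc.cancelAllJobs()                // cancel every running job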

Prior to Spark 2.0, sparkContext was used as the channel to access all Spark functionality. The Spark driver program uses sparkContext to connect to the cluster through the resource manager.

A SparkConf is required to create a sparkContext object; it stores configuration parameters like appName (to identify your Spark driver), the number of cores, and the memory size of the executors running on worker nodes.

In order to use the SQL API, Hive, and streaming, separate contexts need to be created, as sketched after the example below.

Example:

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setMaster("local")
  .setAppName("Spark Practice2")

val sc = new SparkContext(conf)
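For comparison, here is roughly how those separate contexts were created on top of sc before 2.0 (a sketch; it assumes the pre-2.0 spark-sql, spark-hive, and spark-streaming artifacts are on the classpath):

import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.streaming.{Seconds, StreamingContext}

val sqlContext = new SQLContext(sc)             // SQL API (pre-2.0)
val hiveContext = new HiveContext(sc)           // Hive support (pre-2.0)
val ssc = new StreamingContext(sc, Seconds(10)) // DStream-based streaming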

SparkSession – The New Entry Point for Spark


As we know, in previous versions, sparkContext was the entry point for Spark. As RDD was the main API, it was created and manipulated using sparkContext's APIs. For every other API, we needed to use a different context.

For streaming, we needed streamingContext; for SQL, sqlContext; and for Hive, hiveContext. But as the DataSet and DataFrame APIs are becoming the new standard, Spark needed an entry point built for them. So in Spark 2.0, we have a new entry point for the DataSet and DataFrame APIs called SparkSession.


It's a combination of SQLContext, HiveContext, and streamingContext. All the APIs available on those contexts are available on SparkSession; SparkSession also has a sparkContext for actual computation.


 Now we can look at how to create a SparkSession and interact with it.

Creating a SparkSession

The following code comes in handy when you want to create a SparkSession:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .master("local")
  .appName("example of SparkSession")
  .config("spark.some.config.option", "some-value")
  .getOrCreate()

SparkSession.builder()

This method returns a builder used to construct a SparkSession.

master("local")

Sets the Spark master URL to connect to:

  • "local" to run locally

  • "local[4]" to run locally with 4 cores

  • "spark://master:7077" to run on a Spark standalone cluster

appName()

Sets a name for the application, which will be shown in the Spark web UI. If no application name is set, a randomly generated name will be used.

config()

Sets a config option. Options set using this method are automatically propagated to both the SparkConf and the SparkSession's own configuration. Its arguments consist of key-value pairs.

getOrCreate()

Gets an existing SparkSession or, if there is none, creates a new one based on the options set in the builder. It first checks whether there is a valid thread-local SparkSession and, if so, returns that one. It then checks whether there is a valid global default SparkSession and, if so, returns that one. If no valid global default exists, the method creates a new SparkSession and assigns it as the global default.

In case an existing SparkSession is returned, the config options specified in this builder will be applied to that existing SparkSession.
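A quick way to see this behavior (a minimal sketch; the app names are arbitrary):

import org.apache.spark.sql.SparkSession

val spark1 = SparkSession.builder().master("local").appName("first").getOrCreate()
val spark2 = SparkSession.builder().appName("second").getOrCreate()

println(spark1 eq spark2) // true: the existing session is returned rather than a new one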

The above is similar to creating a SparkContext with master set to local and then creating an SQLContext wrapping it. If you need Hive support (the old HiveContext), you can use the code below to create a SparkSession with Hive support enabled:

val spark = SparkSession.builder()
  .master("local")
  .appName("example of SparkSession")
  .config("spark.some.config.option", "some-value")
  .enableHiveSupport()
  .getOrCreate()

Calling enableHiveSupport() on the builder enables Hive support, similar to the old HiveContext. Once the SparkSession is created, we can use it to read data.
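Once the Hive-enabled session exists, Hive tables are read through the same spark.table and spark.sql calls; a minimal sketch (my_hive_table is a hypothetical table assumed to exist in the metastore):

val hiveDF = spark.table("my_hive_table") // my_hive_table is hypothetical, for illustration only
hiveDF.show(5)

spark.sql("SHOW TABLES").show() // tables known to the Hive metastore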

Read Data Using SparkSession

SparkSession is the entry point for reading data, similar to the old SQLContext.read.

From Spark 2.0 onward, it is better to use SparkSession, as it provides access to all the functionality that sparkContext provides, along with APIs for working with DataFrames and DataSets. The code below reads data from a CSV file using SparkSession.
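The read call below refers to a customSchema value that is not defined in the snippet. Here is a minimal sketch of such a schema, matching the columns that appear in the query output later in this post:

import org.apache.spark.sql.types.{DoubleType, StringType, StructField, StructType}

// hypothetical schema for the CSV file; adjust the types to your data
val customSchema = StructType(Array(
  StructField("TimeStamp", StringType, nullable = true),
  StructField("Temperature", DoubleType, nullable = true),
  StructField("date", StringType, nullable = true),
  StructField("Time", StringType, nullable = true)
))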

val df = spark.read.format("com.databricks.spark.csv")
  .schema(customSchema)
  .load("data.csv")

Running SQL Queries

SparkSession can be used to execute SQL queries over data, getting the result back as a DataFrame (i.e., a Dataset[Row]).
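To query a DataFrame by name, it first has to be registered as a view. A minimal sketch using the df read above (the view name TimeStamp matches the query below):

df.createOrReplaceTempView("TimeStamp") // register the DataFrame as a temporary view

val result = spark.sql("Select * from TimeStamp")
result.show()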

display(spark.sql("Select * from TimeStamp"))

+--------------------+-----------+----------+-----+
|           TimeStamp|Temperature|      date| Time|
+--------------------+-----------+----------+-----+
|2010-02-25T05:42:...|      79.48|2010-02-25|05:42|
|2010-02-25T05:42:...|      59.27|2010-02-25|05:42|
|2010-02-25T05:42:...|      97.98|2010-02-25|05:42|
|2010-02-25T05:42:...|      91.41|2010-02-25|05:42|
|2010-02-25T05:42:...|      60.67|2010-02-25|05:42|
|2010-02-25T05:42:...|      61.41|2010-02-25|05:42|
|2010-02-25T05:42:...|       93.6|2010-02-25|05:42|
|2010-02-25T05:42:...|      50.32|2010-02-25|05:42|
|2010-02-25T05:42:...|      64.69|2010-02-25|05:42|
|2010-02-25T05:42:...|      78.57|2010-02-25|05:42|
|2010-02-25T05:42:...|      66.89|2010-02-25|05:42|
|2010-02-25T05:42:...|      62.87|2010-02-25|05:42|
|2010-02-25T05:42:...|      74.32|2010-02-25|05:42|
|2010-02-25T05:42:...|      96.55|2010-02-25|05:42|
|2010-02-25T05:42:...|      71.93|2010-02-25|05:42|
|2010-02-25T05:42:...|      79.17|2010-02-25|05:42|
|2010-02-25T05:42:...|      73.89|2010-02-25|05:42|
|2010-02-25T05:42:...|      80.97|2010-02-25|05:42|
|2010-02-25T05:42:...|      81.04|2010-02-25|05:42|
|2010-02-25T05:42:...|      53.05|2010-02-25|05:42|
+--------------------+-----------+----------+-----+

Note: Only showing top 20 rows.

Working With Config Options

SparkSession can also be used to set runtime configuration options, which can toggle optimizer behavior or I/O (i.e., Hadoop) behavior.

spark.conf.set("spark.some.config", "abcd")
spark.conf.get("spark.some.config")

Config options that have been set can also be used in SQL through variable substitution.

%sql select "${spark.some.config}"
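As a concrete example, the same mechanism works for a real optimizer setting such as spark.sql.shuffle.partitions (a sketch):

spark.conf.set("spark.sql.shuffle.partitions", "6") // fewer shuffle partitions for a small, local dataset
spark.conf.get("spark.sql.shuffle.partitions")      // res: String = 6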

Working With Metadata Directly

SparkSession also exposes a catalog that contains methods to work with the metastore (i.e., the data catalog). These methods return Datasets, so you can use the same Dataset API to explore them.

To get a list of tables in the current database, use the following code:

val tables = spark.catalog.listTables()
display(tables)



+----+--------+-----------+---------+-----------+
|name|database|description|tableType|isTemporary|
+----+--------+-----------+---------+-----------+
|Stu |default |null       |Managed  |false      |
+----+--------+-----------+---------+-----------+

Use the Dataset API to filter on names:



display(tables.filter(_.name contains "son"))



+----+--------+-----------+---------+-----------+
|name|database|description|tableType|isTemporary|
+----+--------+-----------+---------+-----------+
|Stu |default |null       |Managed  |false      |
+----+--------+-----------+---------+-----------+

Get the list of columns for a table:



display(spark.catalog.listColumns("smart"))



+-----+-----------+--------+--------+-------------+--------+
|name |description|dataType|nullable|isPartitioned|isBucket|
+-----+-----------+--------+--------+-------------+--------+
|email|null       |string  |true    |false        |false   |
|iq   |null       |bigInt  |true    |false        |false   |
+-----+-----------+--------+--------+-------------+--------+
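A few more catalog calls follow the same pattern (a sketch; Stu is the table from the listing above):

spark.catalog.currentDatabase          // name of the session's current database
display(spark.catalog.listDatabases()) // databases visible to this session
spark.catalog.cacheTable("Stu")        // cache the table in memory
spark.catalog.isCached("Stu")          // true once the table is cached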

Access the Underlying sparkContext

SparkSession.sparkContext returns the underlying sparkContext, used for creating RDDs as well as managing cluster resources.

spark.sparkContext

res17: org.apache.spark.SparkContext = org.apache.spark.SparkContext@2debe9ac
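For example, the underlying context can still be used to build an RDD directly (a minimal sketch):

val numbers = spark.sparkContext.parallelize(1 to 5) // create an RDD through the session's sparkContext
numbers.map(_ * 2).collect()                         // Array(2, 4, 6, 8, 10)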

