
How to Install and Configure Spark 2.0 to Connect With Cassandra 3.X


Learn how to install Scala 2.11, Spark 2.0 as a service, and the DataStax spark-cassandra-connector library on the client program.


In this guide, we will be installing Scala 2.11, Spark 2.0 as a service, and the DataStax spark-cassandra-connector library on the client program. If you have any of these software packages installed and configured already, you can skip that step. This guide assumes you have a Cassandra 3.x cluster that is already up and running. For more information on installing and using Cassandra, visit this site.

Note: The following steps should be performed on all the nodes in the cluster unless otherwise noted.

Install Scala 2.11

Ensure that you have Java installed.

$ java -version
java version "1.8.0_91"
Java(TM) SE Runtime Environment (build 1.8.0_91-b14)
Java HotSpot(TM) 64-Bit Server VM (build 25.91-b14, mixed mode)

If you don't have Java installed, follow this tutorial to get Java 8 installed.

Install Scala 2.11.8

$ wget https://www.scala-lang.org/files/archive/scala-2.11.8.deb
$ sudo dpkg -i scala-2.11.8.deb
$ scala -version

Install SBT 0.13

$ echo "deb https://dl.bintray.com/sbt/debian /" | sudo tee -a /etc/apt/sources.list.d/sbt.list
$ sudo apt-key adv --keyserver hkp://keyserver.ubuntu.com:80 --recv 2EE0EA64E40A89B84B2DF73499E82A75642AC823
$ sudo apt-get update
$ sudo apt-get install sbt

Install Spark 2.0

Download Spark 2.0 from this link and unpack the TAR file:

$ wget http://d3kbcqa49mib13.cloudfront.net/spark-2.0.2-bin-hadoop2.7.tgz
$ tar zxf spark-2.0.2-bin-hadoop2.7.tgz
$ sudo mv spark-2.0.2-bin-hadoop2.7/ /usr/local/spark/

Update System Variables

$ sudo nano /etc/environment

Add an environment variable called SPARK_HOME:

export SPARK_HOME=/usr/local/spark

At the end of the PATH variable, add $SPARK_HOME/bin:

PATH="<previous_entries>:/usr/local/spark/bin"

Refresh the environment.

$ source /etc/environment

Create a Spark user and make it the owner of the SPARK_HOME directory:

$ sudo adduser spark --system --home /usr/local/spark/ --disabled-password
$ sudo chown -R spark:root /usr/local/spark

Create LOG and PID directories:

$ sudo mkdir /var/log/spark
$ sudo chown spark:root /var/log/spark
$ sudo -u spark mkdir $SPARK_HOME/run

Create the Spark Configuration Files

Create the Spark configuration files by copying the templates:

$ sudo cp /usr/local/spark/conf/spark-env.sh.template /usr/local/spark/conf/spark-env.sh
$ sudo cp /usr/local/spark/conf/spark-defaults.conf.template /usr/local/spark/conf/spark-defaults.conf
$ sudo chown spark:root /usr/local/spark/conf/spark-*

Edit the Spark environment file, spark-env.sh, and add the following lines:

export SPARK_LOG_DIR=/var/log/spark
export SPARK_PID_DIR=${SPARK_HOME}/run

Configure Spark Nodes to Join the Cluster

If you will not be managing Spark using the Mesos or YARN cluster managers, you'll be running Spark in what is called standalone mode.

In standalone mode, Spark has a master node (which acts as the cluster manager) and worker nodes. Select one of the nodes in your cluster to be the master. Then, on every worker node, edit /usr/local/spark/conf/spark-env.sh to point to the host where the Spark master runs.

# Options for the daemons used in the standalone deploy mode
export SPARK_MASTER_HOST=<spark_master_ip_or_hostname_here>

You can also change other elements of the default configuration by editing /usr/local/spark/conf/spark-env.sh. Other settings to consider include:

  • SPARK_MASTER_PORT and SPARK_MASTER_WEBUI_PORT to use non-default master ports.
  • SPARK_WORKER_CORES to set the number of cores to use on this machine.
  • SPARK_WORKER_MEMORY to set how much memory to use (for example, 1000M or 2G).
  • SPARK_WORKER_PORT and SPARK_WORKER_WEBUI_PORT to use non-default worker ports.
  • SPARK_WORKER_INSTANCES to set the number of worker processes per node.
  • SPARK_WORKER_DIR to set the working directory of worker processes.

Install Spark as a Service

Run the following commands to create a service for the spark-master and spark-worker.

$ sudo cp /etc/init.d/skeleton /etc/init.d/spark-master
$ sudo chmod 0755 /etc/init.d/spark-master
$ sudo cp /etc/init.d/skeleton /etc/init.d/spark-worker
$ sudo chmod 0755 /etc/init.d/spark-worker
$ sudo update-rc.d spark-master defaults 99
$ sudo update-rc.d spark-worker defaults 99
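update-rc.d registers the init scripts for the default runlevels. You can confirm the symlinks were created (the runlevel directory and link names may vary slightly by distribution):

```shell
# Expect links such as S99spark-master / S99spark-worker
ls /etc/rc2.d/ | grep spark
```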

Edit the /etc/init.d/spark-worker file. If a variable or function already exists, replace it with the text below.

DESC="Spark Worker"
NAME=spark-worker
SPARK_HOME=/usr/local/spark
PIDFILE=$SPARK_HOME/run/spark--org.apache.spark.deploy.worker.Worker-1.pid
export SPARK_HOME


# Exit if the package is not installed
#[ -x "$DAEMON" ] || exit 0


if [ -f $SPARK_HOME/conf/spark-env.sh ]; then
        . $SPARK_HOME/conf/spark-env.sh
else
        echo "$SPARK_HOME/conf/spark-env.sh not found. Cannot start service."
        exit 1
fi


#
# Function that returns 0 if process is running, or nonzero if not.
#
# The nonzero value is 3 if the process is simply not running, and 1 if the
# process is not running but the pidfile exists (to match the exit codes for
# the "status" command; see LSB core spec 3.1, section 20.2)
#
is_running()
{
    CMD_PATT="org.apache.spark.deploy.worker.Worker"
    if [ -f $PIDFILE ]; then
        pid=`cat $PIDFILE`
        grep -Eq "$CMD_PATT" "/proc/$pid/cmdline" 2>/dev/null && return 0
        return 1
    fi
    return 3
}


#
# Function that starts the daemon/service
#
do_start()
{
        # Return
        #   0 if daemon has been started
        #   1 if daemon was already running
        #   2 if daemon could not be started


        [ -e `dirname "$PIDFILE"` ] || \
                install -d -ospark -groot -m755 `dirname $PIDFILE`


        start-stop-daemon --start --quiet --chuid spark --pidfile $PIDFILE  \
                --exec $SPARK_HOME/sbin/start-slave.sh  \
                --test > /dev/null \
                || return 1
        start-stop-daemon --start --quiet --chuid spark --pidfile $PIDFILE \
                --exec $SPARK_HOME/sbin/start-slave.sh -- spark://$SPARK_MASTER_HOST:${SPARK_MASTER_PORT:-7077} \
                || return 2


}


#
# Function that stops the daemon/service
#
do_stop()
{
        start-stop-daemon --stop --quiet --retry=TERM/30/KILL/5 --pidfile $PIDFILE
        RETVAL="$?"
        rm -f $PIDFILE
        return "$RETVAL"
}


#
# Function that sends a SIGHUP to the daemon/service
#
do_reload() {
        #
        # If the daemon can reload its configuration without
        # restarting (for example, when it is sent a SIGHUP),
        # then implement that here.
        #
        start-stop-daemon --stop --signal 1 --quiet --pidfile $PIDFILE
        return 0
}


...
status)
      is_running
      stat=$?
      case "$stat" in
              0) log_success_msg "$DESC is running" ;;
              1) log_failure_msg "could not access pidfile for $DESC" ;;
              *) log_success_msg "$DESC is not running" ;;
      esac
      exit "$stat"
      ;;
...

Edit the /etc/init.d/spark-master file. If a variable or function already exists, replace it with the text below.

DESC="Spark Master"
NAME=spark-master
SPARK_HOME=/usr/local/spark
PIDFILE=$SPARK_HOME/run/spark--org.apache.spark.deploy.master.Master-1.pid
export SPARK_HOME


# Exit if the package is not installed
#[ -x "$DAEMON" ] || exit 0


if [ -f $SPARK_HOME/conf/spark-env.sh ]; then
        . $SPARK_HOME/conf/spark-env.sh
else
        echo "$SPARK_HOME/conf/spark-env.sh not found. Cannot start service."
        exit 1
fi


#
# Function that returns 0 if process is running, or nonzero if not.
#
# The nonzero value is 3 if the process is simply not running, and 1 if the
# process is not running but the pidfile exists (to match the exit codes for
# the "status" command; see LSB core spec 3.1, section 20.2)
#
is_running()
{
    CMD_PATT="org.apache.spark.deploy.master.Master"
    if [ -f $PIDFILE ]; then
        pid=`cat $PIDFILE`
        grep -Eq "$CMD_PATT" "/proc/$pid/cmdline" 2>/dev/null && return 0
        return 1
    fi
    return 3
}


#
# Function that starts the daemon/service
#
do_start()
{
        # Return
        #   0 if daemon has been started
        #   1 if daemon was already running
        #   2 if daemon could not be started


        [ -e `dirname "$PIDFILE"` ] || \
                install -d -ospark -groot -m755 `dirname $PIDFILE`


        start-stop-daemon --start --quiet --chuid spark --pidfile $PIDFILE \
                --exec $SPARK_HOME/sbin/start-master.sh \
                --test > /dev/null \
                || return 1
        start-stop-daemon --start --quiet --chuid spark --pidfile $PIDFILE --exec $SPARK_HOME/sbin/start-master.sh  \
                || return 2
}


#
# Function that stops the daemon/service
#
do_stop()
{
        start-stop-daemon --stop --quiet --retry=TERM/30/KILL/5 --pidfile $PIDFILE


        RETVAL="$?"
        rm -f $PIDFILE
        return "$RETVAL"
}


#
# Function that sends a SIGHUP to the daemon/service
#
do_reload() {
        #
        # If the daemon can reload its configuration without
        # restarting (for example, when it is sent a SIGHUP),
        # then implement that here.
        #
        start-stop-daemon --stop --signal 1 --quiet --pidfile $PIDFILE
        return 0
}


...
status)
      is_running
      stat=$?
      case "$stat" in
              0) log_success_msg "$DESC is running" ;;
              1) log_failure_msg "could not access pidfile for $DESC" ;;
              *) log_success_msg "$DESC is not running" ;;
      esac
      exit "$stat"
      ;;
...

Run Spark as a Service

Start the Spark master node first. On whichever node you've selected to be master, run:

$ sudo service spark-master start

On all the other nodes, start the workers:

$ sudo service spark-worker start

To stop Spark, run the following commands on the appropriate nodes:

$ sudo service spark-worker stop
$ sudo service spark-master stop

Service logs will be stored in /var/log/spark.
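The status) case in each init script makes it easy to check the daemons, and the log directory can be tailed for startup errors. (Spark's log file names embed the host name, so they differ per machine; the wildcard below is an assumption about that layout.)

```shell
$ sudo service spark-master status
$ sudo service spark-worker status
$ tail -n 50 /var/log/spark/*.out
```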

Testing the Spark Service

To test the Spark service, start spark-shell on one of the nodes.

$ spark-shell --master spark://<IP>:<Port>

When the prompt comes up, execute the following line of code:

scala> sc.parallelize( 1 to 1000 ).sum()
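If the cluster is healthy, the job runs across the workers and returns the sum of 1 through 1000, i.e. 500500.0. You can sanity-check that expected value locally with standard tools before suspecting Spark:

```shell
# Sum 1..1000 with awk; should match what spark-shell printed
seq 1 1000 | awk '{ s += $1 } END { print s }'   # prints 500500
```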

Get Spark-Cassandra-Connector on the Client

The spark-cassandra-connector is a Scala library that exposes Cassandra tables as Spark RDDs, lets you write Spark RDDs back to Cassandra tables, and executes arbitrary computations and CQL queries distributed to the Cassandra nodes holding the data, which keeps them fast. Your code, the spark-cassandra-connector, and all dependencies are packaged up and sent to the Spark nodes.

If you are writing ad-hoc queries and computations from the spark-shell, start up the shell by running the command:

$ spark-shell --conf spark.cassandra.connection.host=<cassandra-node-ip> --packages datastax:spark-cassandra-connector:2.0.0-M2-s_2.11

The --packages option downloads the connector and all of its dependencies from the Spark Packages site and places them on the classpath of the Spark driver and all Spark executors.

If you are writing a Scala application, configure a new Scala project. Your build.sbt file should look something like this:

name := "MySparkProject"

version := "1.0"

scalaVersion := "2.11.8"

val sparkVersion = "2.0.2"

resolvers += "Spark Packages Repo" at "https://dl.bintray.com/spark-packages/maven"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % sparkVersion,
  "org.apache.spark" %% "spark-sql" % sparkVersion,
  "datastax" % "spark-cassandra-connector" % "2.0.0-M2-s_2.11"
)
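With the build file in place, a typical workflow looks like the following. This assumes sbt's default src/main/scala layout; the JAR name is derived from the name and version settings above, and the addresses are placeholders you must fill in.

```shell
# Package the application (produces target/scala-2.11/mysparkproject_2.11-1.0.jar)
$ sbt package

# Submit it to the cluster; --packages pulls the connector at submit time
$ spark-submit \
    --master spark://<master-ip>:7077 \
    --conf spark.cassandra.connection.host=<cassandra-node-ip> \
    --packages datastax:spark-cassandra-connector:2.0.0-M2-s_2.11 \
    target/scala-2.11/mysparkproject_2.11-1.0.jar
```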

Testing the Connector

To start out, create a simple keyspace and table in Cassandra. Run the following statements in cqlsh:

CREATE KEYSPACE test WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1};
CREATE TABLE test.kv (key text PRIMARY KEY, value int);

Then insert some example data:

INSERT INTO test.kv (key, value) VALUES ('key1', 1);
INSERT INTO test.kv (key, value) VALUES ('key2', 2);

For this test, we'll use the spark-shell.

$ spark-shell --conf spark.cassandra.connection.host=<cassandra-node-ip> --packages datastax:spark-cassandra-connector:2.0.0-M2-s_2.11

// import the spark connector namespace
import com.datastax.spark.connector._


// read Cassandra data into an RDD
val rdd = sc.cassandraTable("test", "kv")
println(rdd.count)
println(rdd.first)
println(rdd.map(_.getInt("value")).sum)  


// Add two more rows to the table
val collection = sc.parallelize(Seq(("key3", 3), ("key4", 4)))
collection.saveToCassandra("test", "kv", SomeColumns("key", "value"))
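You can confirm the write landed by reading the table again (rdd.count from a fresh cassandraTable read should now be 4) or by querying it directly in cqlsh:

```sql
SELECT * FROM test.kv;
-- expect four rows: key1|1, key2|2, key3|3, key4|4 (row order may differ)
```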



Topics:
spark ,spark 2.0.0 ,cassandra ,nosql ,integration ,tutorial

Published at DZone with permission of Tim Ojo, DZone MVB. See the original article here.
