How to Install and Configure Spark 2.0 to Connect With Cassandra 3.X
Learn how to install Scala 2.11, Spark 2.0 as a service, and the DataStax spark-cassandra-connector library on the client program.
In this guide, we will be installing Scala 2.11, Spark 2.0 as a service, and the DataStax spark-cassandra-connector
library on the client program. If you have any of these software packages installed and configured already, you can skip that step. This guide assumes you have a Cassandra 3.x cluster that is already up and running. For more information on installing and using Cassandra, visit this site.
Note: The following steps should be performed on all the nodes in the cluster unless otherwise noted.
Install Scala 2.11
Ensure that you have Java installed.
$ java -version
java version "1.8.0_91"
Java(TM) SE Runtime Environment (build 1.8.0_91-b14)
Java HotSpot(TM) 64-Bit Server VM (build 25.91-b14, mixed mode)
If you don't have Java installed, follow this tutorial to get Java 8 installed.
Install Scala 2.11.8
$ wget www.scala-lang.org/files/archive/scala-2.11.8.deb
$ sudo dpkg -i scala-2.11.8.deb
$ scala -version
Install SBT 0.13
$ echo "deb https://dl.bintray.com/sbt/debian /" | sudo tee -a /etc/apt/sources.list.d/sbt.list
$ sudo apt-key adv --keyserver hkp://keyserver.ubuntu.com:80 --recv 2EE0EA64E40A89B84B2DF73499E82A75642AC823
$ sudo apt-get update
$ sudo apt-get install sbt
Install Spark 2.0
Download Spark 2.0 from this link and unpack the TAR file:
$ wget http://d3kbcqa49mib13.cloudfront.net/spark-2.0.2-bin-hadoop2.7.tgz
$ tar zxf spark-2.0.2-bin-hadoop2.7.tgz
$ sudo mv spark-2.0.2-bin-hadoop2.7/ /usr/local/spark/
Update System Variables
$ sudo nano /etc/environment
Add an environment variable called SPARK_HOME:
export SPARK_HOME=/usr/local/spark
At the end of the PATH variable, add $SPARK_HOME/bin:
PATH="<previous_entries>:/usr/local/spark/bin"
Refresh the environment.
$ source /etc/environment
Create a Spark user and make it the owner of the SPARK_HOME directory:
$ sudo adduser spark --system --home /usr/local/spark/ --disabled-password
$ sudo chown -R spark:root /usr/local/spark
Create LOG and PID directories:
$ sudo mkdir /var/log/spark
$ sudo chown spark:root /var/log/spark
$ sudo -u spark mkdir $SPARK_HOME/run
Create the Spark Configuration Files
Create the Spark configuration files by copying the templates:
$ sudo cp /usr/local/spark/conf/spark-env.sh.template /usr/local/spark/conf/spark-env.sh
$ sudo cp /usr/local/spark/conf/spark-defaults.conf.template /usr/local/spark/conf/spark-defaults.conf
$ sudo chown spark:root /usr/local/spark/conf/spark-*
Edit the Spark environment file spark-env.sh and set the log and PID directories:
export SPARK_LOG_DIR=/var/log/spark
export SPARK_PID_DIR=${SPARK_HOME}/run
Configure Spark Nodes to Join the Cluster
If you will not be managing Spark with the Mesos or YARN cluster managers, you'll be running Spark in what is called standalone mode.
In standalone mode, Spark has a master node (which acts as the cluster manager) and worker nodes. Select one of the nodes in your cluster to be the master. Then, on every worker node, edit /usr/local/spark/conf/spark-env.sh to point to the host where the Spark master runs.
# Options for the daemons used in the standalone deploy mode
export SPARK_MASTER_HOST=<spark_master_ip_or_hostname_here>
You can also change other elements of the default configuration by editing /usr/local/spark/conf/spark-env.sh. Some other settings to consider are:
SPARK_MASTER_PORT and SPARK_MASTER_WEBUI_PORT to use non-default ports.
SPARK_WORKER_CORES to set the number of cores to use on this machine.
SPARK_WORKER_MEMORY to set how much memory to use (for example, 1000MB, 2GB).
SPARK_WORKER_PORT and SPARK_WORKER_WEBUI_PORT to use non-default ports.
SPARK_WORKER_INSTANCES to set the number of worker processes per node.
SPARK_WORKER_DIR to set the working directory of worker processes.
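As an illustration, a worker's spark-env.sh might end up looking like the sketch below. The master hostname and the resource sizes are placeholders, not values prescribed by this guide:

```shell
# /usr/local/spark/conf/spark-env.sh (example worker configuration)
export SPARK_MASTER_HOST=spark-master.example.com  # placeholder: your master's hostname or IP
export SPARK_WORKER_CORES=4                        # cores this worker may use
export SPARK_WORKER_MEMORY=4g                      # memory this worker may use
export SPARK_LOG_DIR=/var/log/spark
export SPARK_PID_DIR=${SPARK_HOME}/run
```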
Install Spark as a Service
Run the following commands to create services for the spark-master and spark-worker:
$ sudo cp /etc/init.d/skeleton /etc/init.d/spark-master
$ sudo chmod 0755 /etc/init.d/spark-master
$ sudo cp /etc/init.d/skeleton /etc/init.d/spark-worker
$ sudo chmod 0755 /etc/init.d/spark-worker
$ sudo update-rc.d spark-master defaults 99
$ sudo update-rc.d spark-worker defaults 99
Edit the /etc/init.d/spark-worker file. If a variable or function already exists, replace it with the text below.
DESC="Spark Worker"
NAME=spark-worker
SPARK_HOME=/usr/local/spark
PIDFILE=$SPARK_HOME/run/spark--org.apache.spark.deploy.worker.Worker-1.pid
export SPARK_HOME
# Exit if the package is not installed
#[ -x "$DAEMON" ] || exit 0
if [ -f $SPARK_HOME/conf/spark-env.sh ]; then
. $SPARK_HOME/conf/spark-env.sh
else
echo "$SPARK_HOME/conf/spark-env.sh not found. Cannot start service."
fi
#
# Function that returns 0 if process is running, or nonzero if not.
#
# The nonzero value is 3 if the process is simply not running, and 1 if the
# process is not running but the pidfile exists (to match the exit codes for
# the "status" command; see LSB core spec 3.1, section 20.2)
#
is_running()
{
CMD_PATT="org.apache.spark.deploy.worker.Worker"
if [ -f $PIDFILE ]; then
pid=`cat $PIDFILE`
grep -Eq "$CMD_PATT" "/proc/$pid/cmdline" 2>/dev/null && return 0
return 1
fi
return 3
}
#
# Function that starts the daemon/service
#
do_start()
{
# Return
# 0 if daemon has been started
# 1 if daemon was already running
# 2 if daemon could not be started
[ -e `dirname "$PIDFILE"` ] || \
install -d -o spark -g root -m755 `dirname $PIDFILE`
start-stop-daemon --start --quiet --chuid spark --pidfile $PIDFILE \
--exec $SPARK_HOME/sbin/start-slave.sh \
--test > /dev/null \
|| return 1
start-stop-daemon --start --quiet --chuid spark --pidfile $PIDFILE \
--exec $SPARK_HOME/sbin/start-slave.sh -- spark://$SPARK_MASTER_HOST:${SPARK_MASTER_PORT:-7077} \
|| return 2
}
#
# Function that stops the daemon/service
#
do_stop()
{
start-stop-daemon --stop --quiet --retry=TERM/30/KILL/5 --pidfile $PIDFILE
RETVAL="$?"
rm -f $PIDFILE
return "$RETVAL"
}
#
# Function that sends a SIGHUP to the daemon/service
#
do_reload() {
#
# If the daemon can reload its configuration without
# restarting (for example, when it is sent a SIGHUP),
# then implement that here.
#
start-stop-daemon --stop --signal 1 --quiet --pidfile $PIDFILE
return 0
}
...
status)
is_running
stat=$?
case "$stat" in
0) log_success_msg "$DESC is running" ;;
1) log_failure_msg "could not access pidfile for $DESC" ;;
*) log_success_msg "$DESC is not running" ;;
esac
exit "$stat"
;;
...
Edit the /etc/init.d/spark-master file. If a variable or function already exists, replace it with the text below.
DESC="Spark Master"
NAME=spark-master
SPARK_HOME=/usr/local/spark
PIDFILE=$SPARK_HOME/run/spark--org.apache.spark.deploy.master.Master-1.pid
export SPARK_HOME
# Exit if the package is not installed
#[ -x "$DAEMON" ] || exit 0
if [ -f $SPARK_HOME/conf/spark-env.sh ]; then
. $SPARK_HOME/conf/spark-env.sh
else
echo "$SPARK_HOME/conf/spark-env.sh not found. Cannot start service."
fi
#
# Function that returns 0 if process is running, or nonzero if not.
#
# The nonzero value is 3 if the process is simply not running, and 1 if the
# process is not running but the pidfile exists (to match the exit codes for
# the "status" command; see LSB core spec 3.1, section 20.2)
#
is_running()
{
CMD_PATT="org.apache.spark.deploy.master.Master"
if [ -f $PIDFILE ]; then
pid=`cat $PIDFILE`
grep -Eq "$CMD_PATT" "/proc/$pid/cmdline" 2>/dev/null && return 0
return 1
fi
return 3
}
#
# Function that starts the daemon/service
#
do_start()
{
# Return
# 0 if daemon has been started
# 1 if daemon was already running
# 2 if daemon could not be started
[ -e `dirname "$PIDFILE"` ] || \
install -d -o spark -g root -m755 `dirname $PIDFILE`
start-stop-daemon --start --quiet --chuid spark --pidfile $PIDFILE \
--exec $SPARK_HOME/sbin/start-master.sh \
--test > /dev/null \
|| return 1
start-stop-daemon --start --quiet --chuid spark --pidfile $PIDFILE \
--exec $SPARK_HOME/sbin/start-master.sh \
|| return 2
}
#
# Function that stops the daemon/service
#
do_stop()
{
start-stop-daemon --stop --quiet --retry=TERM/30/KILL/5 --pidfile $PIDFILE
RETVAL="$?"
rm -f $PIDFILE
return "$RETVAL"
}
#
# Function that sends a SIGHUP to the daemon/service
#
do_reload() {
#
# If the daemon can reload its configuration without
# restarting (for example, when it is sent a SIGHUP),
# then implement that here.
#
start-stop-daemon --stop --signal 1 --quiet --pidfile $PIDFILE
return 0
}
...
status)
is_running
stat=$?
case "$stat" in
0) log_success_msg "$DESC is running" ;;
1) log_failure_msg "could not access pidfile for $DESC" ;;
*) log_success_msg "$DESC is not running" ;;
esac
exit "$stat"
;;
...
Run Spark as a Service
Start the Spark master node first. On whichever node you've selected to be master, run:
$ sudo service spark-master start
On all the other nodes, start the workers:
$ sudo service spark-worker start
To stop Spark, run the following commands on the appropriate nodes:
$ sudo service spark-worker stop
$ sudo service spark-master stop
Service logs will be stored in /var/log/spark.
Testing the Spark Service
To test the Spark service, start spark-shell on one of the nodes:
$ spark-shell --master spark://<IP>:<Port>
When the prompt comes up, execute the following line of code, which should return 500500.0 (the sum of the integers 1 through 1000):
scala> sc.parallelize(1 to 1000).sum()
Get the Spark-Cassandra-Connector on the Client
The spark-cassandra-connector is a Scala library that exposes Cassandra tables as Spark RDDs, lets you write Spark RDDs to Cassandra tables, and allows you to execute arbitrary computations and CQL queries that are distributed to the Cassandra nodes holding the data, which makes them fast. Your code, the spark-cassandra-connector, and all dependencies are packaged up and sent to the Spark nodes.
If you are writing ad-hoc queries and computations from the spark-shell, start up the shell by running the command:
$ spark-shell --conf spark.cassandra.connection.host=<master-ip-address> --packages datastax:spark-cassandra-connector:2.0.0-M2-s_2.11
The --packages option downloads the connector and all of its dependencies from the Spark Packages site and places them in the path of the Spark driver and all Spark executors.
If you are writing a Scala application, configure a new Scala project. Your build.sbt file should look something like this:
name := "MySparkProject"

version := "1.0"

scalaVersion := "2.11.8"

val sparkVersion = "2.0.2"

resolvers += "Spark Packages Repo" at "https://dl.bintray.com/spark-packages/maven"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % sparkVersion,
  "org.apache.spark" %% "spark-sql" % sparkVersion,
  "datastax" % "spark-cassandra-connector" % "2.0.0-M2-s_2.11"
)
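Once the project builds, you would typically package it and submit it to the cluster with spark-submit. The following is a sketch, not a prescribed command: the master and Cassandra addresses are placeholders, and the jar path assumes the default name sbt produces for the project above.

```shell
# Package the application from the project root
sbt package

# Submit to the standalone master; --packages pulls the connector for the executors
spark-submit \
  --master spark://<master-ip>:7077 \
  --conf spark.cassandra.connection.host=<cassandra-ip> \
  --packages datastax:spark-cassandra-connector:2.0.0-M2-s_2.11 \
  target/scala-2.11/mysparkproject_2.11-1.0.jar
```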
Testing the Connector
To start out, create a simple keyspace and table in Cassandra. Run the following statements in cqlsh:
CREATE KEYSPACE test WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1};
CREATE TABLE test.kv (key text PRIMARY KEY, value int);
Then insert some example data:
INSERT INTO test.kv (key, value) VALUES ('key1', 1);
INSERT INTO test.kv (key, value) VALUES ('key2', 2);
For this test, we'll use the spark-shell:
$ spark-shell --conf spark.cassandra.connection.host=<master-ip-address> --packages datastax:spark-cassandra-connector:2.0.0-M2-s_2.11
// import the spark connector namespace
import com.datastax.spark.connector._
// read Cassandra data into an RDD
val rdd = sc.cassandraTable("test", "kv")
println(rdd.count)
println(rdd.first)
println(rdd.map(_.getInt("value")).sum)
// Add two more rows to the table
val collection = sc.parallelize(Seq(("key3", 3), ("key4", 4)))
collection.saveToCassandra("test", "kv", SomeColumns("key", "value"))
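The connector can also push projections and filters down to Cassandra. As a sketch against the same test.kv table, assuming the spark-shell session above (connector on the classpath, sc available):

```scala
import com.datastax.spark.connector._

// select and where are translated into a CQL projection and WHERE clause,
// so only the matching row is read from the Cassandra nodes. Filtering on
// the partition key keeps the query valid CQL.
val row = sc.cassandraTable("test", "kv")
  .select("key", "value")
  .where("key = ?", "key3")

row.collect.foreach(r => println(r.getInt("value")))
```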
Published at DZone with permission of Tim Ojo, DZone MVB. See the original article here.