Over a million developers have joined DZone.

Spark and ZooKeeper: Fault-Tolerant Job Manager out of the Box

DZone 's Guide to

Spark and ZooKeeper: Fault-Tolerant Job Manager out of the Box

· Java Zone ·
Free Resource

Problem definition

Imagine that you’ve been using some RDBMS for last ten years, and there’s millions of records inside, moreover the information keeps flowing every minute. And one day you needed to organize some complex information retrieval process on this data with a full-fledge enterprise search engine like Apache Solr. In order to do this you need to develop some ETL process that will be able to convert your data to internal Solr documents. You obviously want to distribute it and make fault-tolerant (you don’t want to lose even a row of information). And the last requirement, that makes everything more interesting - you want zero support burden. Everything should come out of the box! Too difficult? Absolutely not! Let’s see how to nail it with Solr, Spark and Zookeeper binded together.

Why Spark? It is distributed, it doesn’t force you to use MapReduce programming model (like Hadoop does), and finally, it has a failover mechanism backed by Zookeeper.

So, we introduced our toolbox, now it’s time to make it work. Let’s start with Solr and Zookeeper. Historically, Solr was just an universal search engine built on top of Apache Lucene, but starting from 4.x version it comes with distributed search support aka SolrCloud, which provides additional failure resilience, delegating cluster management to Zookeeper. Spark can also use Zookeeper for failure recovery in cluster mode.


First, let’s prepare a simple application that will emulate our long running data import task. We’ll submit this application to Spark and try to simulate node/process failure, while it sends simple documents to Solr. Our goal is to ensure that there’s no document loss due to any failures.

Spark applications are just Java archives containing all project dependencies. Documentation recommends to use eithersbt orMaven project management tools with appropriate plugins:

  We’ll make our application with Maven. In order to do this we need to add a dependency to SolrJ (http://mvnrepository.com/artifact/org.apache.solr/solr-solrj) and put a dummy class with a simple code like this:

 package com.example;

import org.apache.solr.client.solrj.SolrServer;
import org.apache.solr.client.solrj.impl.HttpSolrServer;
import org.apache.solr.common.SolrInputDocument;

import java.util.ArrayList;
import java.util.List;

public class Executor {

 public static void main(String[] args) {
     String solrServerURL = "http://localhost:8983/solr";
     int offset = 0;
     int delay = 1;

     for (String arg : args) {
         if (arg.startsWith("offset=")) {
             // This offset is needed to avoid sending several documents with the
             // same id. In spite of the fact that Solr can handle such documents,
             // we don't want to have any intersections. If we run 4 tasks like this
             // one, then we need check if all 4000 documents are indexed.
             offset = Integer.parseInt(arg.split("=")[1]);
         } else if (arg.startsWith("delay")) {
             delay = Integer.parseInt(arg.split("=")[1]);
         } else if (arg.startsWith("solr_server")) {
             solrServerURL = arg.split("=")[1];

     SolrServer solrServer = new HttpSolrServer(solrServerURL);
     List<SolrInputDocument> docs = new ArrayList<SolrInputDocument>();

     for (int i = 1; i <= offset + 1000; i ++) {
         SolrInputDocument document = new SolrInputDocument();
         document.addField("docid", String.valueOf(i));

         // Add documents to Solr by batches with a size equal to 100
         try {
             if (i % 100 == 0) {
                 docs = new ArrayList<SolrInputDocument>();
         } catch (Exception e) {
             // TODO: Proper exception handling

         // Artificial delay to prolongate total time
         try {
         } catch (InterruptedException e) {
             // Nothing to do

     // Finally commit documents and free resources
     try {
     } catch (Exception e) {
         // TODO: Proper exception handling
     } finally {


This code generates Solr input documents with the only field “id”, and sends them to server using Java client called SolrJ. The “id” field must be unique in the context of Solr index, and nevertheless it is not exceptional situation for Solr to handle two documents with the same id, we need to count every document sent to server to ensure that no document is lost. Fine, let’s build it, and now we have our “uber” JAR with all dependencies.

SolrCloud deployment

Next step. Now we are going to download and unpack SolrCloud distributive. Downloads are located on the pagehttp://lucene.apache.org/solr/downloads.html. I took 4.10.2 version. We’ll be using embedded example in our how-to, let’s clone its folder to make four independent workspaces:

 my_device:~ Root$ cd ~/solr-4.10.2/
my_device:solr-4.10.2 Root$ cp -r example example2
my_device:solr-4.10.2 Root$ cp -r example exampleB
my_device:solr-4.10.2 Root$ cp -r example example2B

Ok, now deploy first instance:

 my_device:example Root$ export SOLR_INSTANCE=~/solr-4.10.2/example
my_device:example Root$ java -Djetty.port=8983 -Djetty.home=$SOLR_INSTANCE  -Dsolr.solr.home=$SOLR_INSTANCE/solr -Dbootstrap_confdir=$SOLR_INSTANCE/solr/collection1/conf -Dcollection.configName=myconf -DzkRun -DzkHost=localhost:9983,localhost:8574,localhost:9900,localhost:9500 -DnumShards=2 -jar $SOLR_INSTANCE/start.jar

And continue deploying, changing jetty.port (8983, 7574, 8900, 8500) and SOLR_INSTANCE (example, example2, exampleB, example2B) parameters accordingly. You can send all these commands to background using “&” appendix or run them separately in four consoles. Here:

    • jetty.port, jetty.home - Solr from example uses Jetty embedded server under the hood, so we just need to specify right port to bind on and working folder

    • solr.solr.home - home directory for all Solr files (properties, index files and so on)

    • bootstrap_confdir - initial boot configuration, that will be uploaded to ZooKeeper

    • collection.configName - sets a name for ZooKeeper configuration

    • zkRun - tells to run embedded ZooKeeper

    • zkHost - specifies all nodes eligible to host ZooKeeper

When everything is done, we’ll get two shards and two replicas SolrCloud. Also we told Solr to run embedded ZooKeeper and upload its configuration there. You can check Solr state, cloud details and ZooKeeper data by going to Solr admin page located by the link http://localhost:port/solr (pic. 1 and 2).

Pic. 1 – SolrCloud graph

Pic. 2 – Zookeeper tree

Spark deployment 

Now, it’s Spark turn. Spark can be downloaded fromhttps://spark.apache.org/downloads.html. There are several pre-built versions available for download for your convenience. When it’s done, unpack an archive and clone it - one instance will became a leader or master node, second one will became spare or standby node. Let’s name these folders accordingly:

 my_device:~ Root$ tar -xzvf spark-1.1.1-bin-hadoop2.4.tgz
my_device:~ Root$ mv spark-1.1.1-bin-hadoop2.4 spark-leader
my_device:~ Root$ cp spark-leader spark-standby

Alright, then we need to configure our Spark nodes. In order to do this, copy or rename existing template:

my_device:~ Root$ cd spark-leader/conf
my_device:~ Root$ cp spark-env.sh.template spark-env.sh

And reduce it’s content to:

export SPARK_MASTER_IP="localhost"
export SPARK_DAEMON_JAVA_OPTS="-Dspark.deploy.recoveryMode=ZOOKEEPER -Dspark.deploy.zookeeper.url=localhost:9983,localhost:8574,localhost:9900,localhost:9500"
export SPARK_PID_DIR="/tmp/spark-leader"
export SPARK_LOCAL_DIRS="./data"
export SPARK_WORKER_DIR="./work"

What is this all about? Well, every option is described in the original template, we’ll only pay an attention to the following variables:

    1. SPARK_DAEMON_JAVA_OPTS - we told Spark to use Zookeeper for recovery and pointed to Zookeeper cluster

    2. SPARK_PID_DIR - there’s a dir where Spark will keep process ids, we will need it in order to simulate node failure

  Do the same for the spare node:

export SPARK_MASTER_IP="localhost"
export SPARK_DAEMON_JAVA_OPTS="-Dspark.deploy.recoveryMode=ZOOKEEPER -Dspark.deploy.zookeeper.url=localhost:9983,localhost:8574,localhost:9900,localhost:9500"
export SPARK_PID_DIR="/tmp/spark-standby"
export SPARK_LOCAL_DIRS="./data"
export SPARK_WORKER_DIR="./work"
That’s all, now it’s time to deploy Spark cluster. I wrote a simple bash script to achieve this goal:

rm -rf /tmp/spark-leader
rm -rf /tmp/spark-standby

bash spark-leader/sbin/start-master.sh
bash spark-standby/sbin/start-master.sh

for ((i=0; i<$SPARK_WORKER_INSTANCES; i++)); do
 bash spark-leader/sbin/start-slave.sh $(( $i + 1 ))  spark://localhost:1101 --webui-port $(( $SPARK_WORKER_WEBUI_PORT + $i ))

This script first cleans tmp directories, then runs two master nodes and finally starts four worker nodes. What are these master and worker nodes? Worker nodes are executors, they can only run assigned tasks, and master nodes are Spark coordinators, they orchestrate Spark cluster and assign tasks to available workers. If everything is fine, in a minute after the script is run we’ll be able to see following page by addressing to http://locahost:6661:

Screen Shot 2015-02-10 at 13.35.16.png

Pic. 3 – Spark master web UI

In the picture 3 we can see Spark master with four connected worker nodes. If we go to another node (http://localhost:7661), we’ll see similar screen, but without any worker nodes connected (that's because we sent them to spark://localhost:1101).


So, we deployed Solr, Zookeeper and Spark clusters, and we have an assembled application, let’s submit it and experiment with failover!

 my_device:~ Root$ spark-leader/bin/spark-submit --class com.example.Executor --master spark://localhost:1101 --deploy-mode cluster --supervise --executor-memory 512m --total-executor-cores 4 path-to-application.jar offset=0 solr_server=http://localhost:8983/solr &

What do these command mean? First of all, we use spark-submit script to submit our tasks. Second, we point to an executable class within our jar. Further we tell Spark how to submit the tasks (or drivers in terminology of Spark):

    • --deploy-mode cluster – from documentation “Whether to deploy your driver on the worker nodes (cluster) or locally as an external client (client) (default: client)”

    • --supervise – specify this flag to make sure that the driver is automatically restarted if it fails with non-zero exit code. This’s exactly what we’re looking for – in case if Spark worker node is terminated, Spark cluster will just restart it on any free worker node.

  What about the rest of the arguments? Well, one of them is obviously just path to our jar. And the others two are arguments that are sent to our executable class. Solr server url is an endpoint where to send Solr documents. And the last one – offset – we need it, as it was told, to avoid sending several documents with the same id.

Run this command four times increasing offset by thousand. If everything is fine, we’ll get four completed tasks and four thousand documents in Solr (see pictures 4 and 5).

Screen Shot 2015-02-10 at 15.31.11.png

Pic. 4 – finished drivers in Spark master web UI

Screen Shot 2015-02-10 at 15.31.39.png

Pic. 5 – Match all query in Solr web UI

What next? Remember we introduced “delay” command line argument in our code? We need it now to manipulate total execution time causing some artificial processing delay. During the first run we minimized it to default 1 ms, because we needed this run only to collect our control data. During the second run we’ll extend this value up to 180 ms (total execution time will be at least 3 minutes) - this will give us enough time to simulate node failure. In the end we’ll compare control data with test data. But first we need to find out leader node PID:

my_device:~ Root$ cd /tmp/spark-leader
my_device:~ Root$ cat spark-Root-org.apache.spark.deploy.master.Master-1.pid
my_device:~ Root$

Ok, this number – 38073 – is our leader node PID. And let’s find out our worker nodes ids:

my_device:~ Root$ jps
38145 Master
38073 Master
38016 start.jar
38014 start.jar
38015 start.jar
38865 Jps
38013 start.jar
38385 Worker
38325 Worker
38265 Worker
38205 Worker
my_device:~ Root$

We’ll choose one of the workers as well as leader node and terminate them, but first we need to purge Solr and run our modified submit commands (again, four times):

my_device:~ Root$ curl http://localhost:8983/solr/update --data '<delete><query>*:*</query></delete>' -H 'Content-type:text/xml; charset=utf-8'
my_device:~ Root$ curl http://localhost:8983/solr/update --data '<commit/>' -H 'Content-type:text/xml; charset=utf-8'
my_device:~ Root$ spark-leader/bin/spark-submit --class com.example.Executor --master spark://localhost:1101 --deploy-mode cluster --supervise --executor-memory 512m --total-executor-cores 4 path-to-application.jar offset=0 delay=180 solr_server=http://localhost:8983/solr &

Ready? Lets simulate nodes failure:

my_device:~ Root$ kill -9 38385
my_device:~ Root$ kill -9 38073

If we’re fast enough, we can see how Spark failover mechanism works – all we need is to track changes on our standby node web UI (http://localhost:7661). Usually it takes about a half of a minute to see any updates. Expected behaviour is that no submitted task will lost if the worker is down (pic. 6), spare node will change its status from STANDBY to ALIVE (it becomes leader node) if current leader node is terminated, and no orphan worker will left – they all become connected to new leader (pic. 7):

Screen Shot 2015-02-10 at 18.49.47.png

Pic. 6 – Driver relaunching

Screen Shot 2015-02-10 at 18.50.02.png

Pic. 7 – Spark recovery process on spare master node

Finally, when all drivers become completed, we need to go back to Solr and check that all match query returns the same result as we seen (pic. 5) – 4000 documents.

Okay, seems our problem is solved - we simulated long-run distributed task, we randomly terminated executing applications and we saw entire recovery process which ensured that we haven’t lost any data. We didn’t have to write any special code to achieve this level of resiliency and job recovery. So, Spark can be used as a resilient job manager for long running data importing tasks.


Published at DZone with permission of

Opinions expressed by DZone contributors are their own.

{{ parent.title || parent.header.title}}

{{ parent.tldr }}

{{ parent.urlSource.name }}