
Spark: Use Cassandra as a Resilient Distributed DataSet Source


In this article, we will see how we can use Cassandra as a resilient distributed dataset (RDD) source for Spark, in order to perform RDD operations.

We will use the Stratio library to perform a group-by operation, which Cassandra does not support natively.

Specifically, we will take the World Cup match results, “group” them “by” winning team, and display how many matches each team has won in the World Cup so far.
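To make the goal concrete, here is a minimal plain-Java sketch of the same “group by winner, then count” logic on an in-memory list. It involves neither Spark nor Cassandra; the class name WinnerCount and the team names are made up for illustration.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java illustration of "group by winner, then count"; no Spark or Cassandra involved.
final class WinnerCount {

    // Returns, for each distinct winner, the number of matches it won.
    static Map<String, Integer> countByWinner(List<String> winners) {
        Map<String, Integer> counts = new TreeMap<String, Integer>();
        for (String winner : winners) {
            Integer current = counts.get(winner);
            counts.put(winner, current == null ? 1 : current + 1);
        }
        return counts;
    }

    public static void main(String[] args) {
        // Hypothetical winners, one entry per match (not real results).
        List<String> winners = Arrays.asList("Team A", "Team B", "Team A", "Team C", "Team A");
        System.out.println(countByWinner(winners)); // prints {Team A=3, Team B=1, Team C=1}
    }
}
```

The rest of the article performs this same computation, except that the rows live in Cassandra and the grouping is executed by Spark.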

Here are the steps to perform this:
- Retrieve World Cup match statistics from the worldcup.sfg.io JSON API, using Retrofit. The JSON data we will use is available at http://worldcup.sfg.io/matches.
- Store the retrieved statistics in an embedded Cassandra server (using Cassandra Unit and the Datastax Java driver).
- Perform a “group by” winning country operation on all matches, using Stratio, which lets us use a Cassandra database as a resilient distributed dataset (RDD) source for Spark.

All of the code below is written in Java.


To run this test, you need:
- JDK 7 or greater
- Git
- Maven

Retrieving World Cup statistics

Retrofit lets us perform HTTP requests and map the JSON results automatically. We just need to create an interface describing the HTTP request:

public interface WorldCupService {
    @GET("/matches")
    List<Match> getMatchs();
}

The Match class contains fields whose names exactly match the returned JSON fields:

public class Match {
    private int match_number=-1;
    private String location=null;
    private String status=null;
    private String datetime=null;
    private String winner=null;
    private Team home_team = null;
    private Team away_team = null;
}

with the Team class being:

public class Team {
    private String country=null;
    private String code=null;
    private int goals=-1;
}

Finally, we create an adapter based on the previous interface:

RestAdapter restAdapter = new RestAdapter.Builder()
    .setEndpoint("http://worldcup.sfg.io") // base URL assumed from the API address given above
    .build();
WorldCupService service = restAdapter.create(WorldCupService.class);
return service.getMatchs();

Retrofit will automatically populate the Match and Team fields from the JSON fields (http://worldcup.sfg.io/matches) using reflection.
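To illustrate what “using reflection” means here, the following sketch copies values from a map (standing in for a parsed JSON object) into private fields of the same name. It is not Retrofit's actual implementation, which delegates to a JSON converter; the class ReflectionMapping, its populate and describe helpers, and the sample values are assumptions made for this example.

```java
import java.lang.reflect.Field;
import java.util.HashMap;
import java.util.Map;

// Illustrative only: shows the general idea of name-based field population.
final class ReflectionMapping {

    // Stand-in for the article's Team class (same field names).
    static class Team {
        private String country = null;
        private String code = null;
        private int goals = -1;
    }

    // Creates an instance of 'type' and copies each map entry into the field of the same name.
    static <T> T populate(Class<T> type, Map<String, Object> values) {
        try {
            T target = type.getDeclaredConstructor().newInstance();
            for (Map.Entry<String, Object> entry : values.entrySet()) {
                Field field = type.getDeclaredField(entry.getKey());
                field.setAccessible(true); // the fields are private
                field.set(target, entry.getValue());
            }
            return target;
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Cannot map values onto " + type.getName(), e);
        }
    }

    // Small helper so the populated fields can be inspected.
    static String describe(Team team) {
        return team.country + " (" + team.code + "): " + team.goals;
    }

    public static void main(String[] args) {
        // Sample values for illustration, not taken from the API.
        Map<String, Object> json = new HashMap<String, Object>();
        json.put("country", "Brazil");
        json.put("code", "BRA");
        json.put("goals", 3);
        System.out.println(describe(populate(Team.class, json))); // prints Brazil (BRA): 3
    }
}
```

This is why the Java field names (match_number, home_team, and so on) have to match the JSON keys exactly.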

Persisting Data to Cassandra

The keyspace and table are created using CQL:

CREATE KEYSPACE WorldCup WITH replication={'class' : 'SimpleStrategy', 'replication_factor':1};
use WorldCup;
CREATE TABLE Match (
    number int PRIMARY KEY,
    status varchar,
    location varchar,
    datetime  varchar,
    winner varchar,
    home_team_code varchar,
    home_team_country varchar,
    home_team_goals int,
    away_team_code varchar,
    away_team_country varchar,
    away_team_goals int
);

Then we use the Datastax Java driver to store our matches in Cassandra, using a batch statement:

Insert insertStatement = QueryBuilder.insertInto("Match");
insertStatement.value("number", QueryBuilder.bindMarker())
 .value("status", QueryBuilder.bindMarker())
 .value("location", QueryBuilder.bindMarker())
 .value("datetime", QueryBuilder.bindMarker())
 .value("winner", QueryBuilder.bindMarker())
 .value("home_team_code", QueryBuilder.bindMarker())
 .value("home_team_country", QueryBuilder.bindMarker())
 .value("home_team_goals", QueryBuilder.bindMarker())
 .value("away_team_code", QueryBuilder.bindMarker())
 .value("away_team_country", QueryBuilder.bindMarker())
 .value("away_team_goals", QueryBuilder.bindMarker());
PreparedStatement ps = session.prepare(insertStatement.toString());
BatchStatement batch = new BatchStatement();
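for(Match match : matchs){
    // Accessor names below are assumed (the Match and Team getters are not shown above);
    // values are bound in the same order as the bind markers.
    Team home = match.getHome_team();
    Team away = match.getAway_team();
    batch.add(ps.bind(match.getMatch_number(), match.getStatus(), match.getLocation(),
        match.getDatetime(), match.getWinner(),
        home.getCode(), home.getCountry(), home.getGoals(),
        away.getCode(), away.getCountry(), away.getGoals()));
}
session.execute(batch);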
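Because the bind markers are positional, each nested Match has to be flattened into eleven values in exactly the column order of the insert statement. The sketch below shows that flattening in isolation, without the driver; the MatchRow class, its toRow helper and the sample values are hypothetical, and the Match and Team classes are simplified stand-ins for the ones above.

```java
import java.util.Arrays;

// Stand-ins for the article's Match and Team classes, reduced to what the sketch needs.
final class MatchRow {

    static final class Team {
        final String country;
        final String code;
        final int goals;

        Team(String country, String code, int goals) {
            this.country = country;
            this.code = code;
            this.goals = goals;
        }
    }

    static final class Match {
        final int match_number;
        final String location;
        final String status;
        final String datetime;
        final String winner;
        final Team home_team;
        final Team away_team;

        Match(int match_number, String location, String status, String datetime,
              String winner, Team home_team, Team away_team) {
            this.match_number = match_number;
            this.location = location;
            this.status = status;
            this.datetime = datetime;
            this.winner = winner;
            this.home_team = home_team;
            this.away_team = away_team;
        }
    }

    // Flattens a match into values ordered like the columns of the insert statement:
    // number, status, location, datetime, winner, then code/country/goals for each team.
    static Object[] toRow(Match m) {
        return new Object[] {
            m.match_number, m.status, m.location, m.datetime, m.winner,
            m.home_team.code, m.home_team.country, m.home_team.goals,
            m.away_team.code, m.away_team.country, m.away_team.goals
        };
    }

    public static void main(String[] args) {
        // Made-up sample values, not a real result.
        Match sample = new Match(1, "Some Stadium", "completed", "2014-06-12T17:00:00Z", "Team A",
                new Team("Team A", "TMA", 2), new Team("Team B", "TMB", 1));
        System.out.println(Arrays.toString(toRow(sample)));
    }
}
```

An array built this way could be passed to a prepared statement's bind call, since the order of its values matches the order of the markers.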

Spark Context

An overview of what Spark does can be found on the Spark home page listed at the end of this article. Basically, it provides a way to perform operations (group by, join, etc.) in a distributed way on a data set, which should itself be retrievable in a distributed way. For this reason Spark supports the Hadoop filesystem (HDFS), which is distributed by nature; here we will see how to use Cassandra as a resilient distributed dataset (RDD) source instead.
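As a rough mental model of a distributed group-by, the sketch below counts winners separately in each partition and then merges the partial counts. It is a single-process, plain-Java illustration and not how Spark is implemented; the PartitionedCount class and the sample data are made up.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Conceptual sketch only: each inner list stands for the rows held by one node.
final class PartitionedCount {

    // Step 1: count winners inside a single partition (each node could do this independently).
    static Map<String, Integer> countPartition(List<String> winners) {
        Map<String, Integer> counts = new TreeMap<String, Integer>();
        for (String winner : winners) {
            Integer current = counts.get(winner);
            counts.put(winner, current == null ? 1 : current + 1);
        }
        return counts;
    }

    // Step 2: merge the partial counts of all partitions into one result.
    static Map<String, Integer> countAll(List<List<String>> partitions) {
        Map<String, Integer> total = new TreeMap<String, Integer>();
        for (List<String> partition : partitions) {
            for (Map.Entry<String, Integer> partial : countPartition(partition).entrySet()) {
                Integer current = total.get(partial.getKey());
                total.put(partial.getKey(),
                        current == null ? partial.getValue() : current + partial.getValue());
            }
        }
        return total;
    }

    public static void main(String[] args) {
        // Hypothetical data split across two partitions.
        List<List<String>> partitions = Arrays.asList(
                Arrays.asList("Team A", "Team B"),
                Arrays.asList("Team A", "Team C", "Team A"));
        System.out.println(countAll(partitions)); // prints {Team A=3, Team B=1, Team C=1}
    }
}
```

The per-partition step only works well when the data source can hand out its rows partition by partition, which is what a distributed store such as HDFS or Cassandra makes possible.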

To do so, we need to use the Stratio API.

First, we need to initialize a DeepSparkContext, inheriting from JavaSparkContext:

String cluster = "local";
String job = "myJobName";
String sparkHome = "";
DeepSparkContext deepContext =
   new DeepSparkContext(cluster, job, sparkHome, new String[]{});

Here, the Spark cluster is embedded in the Java Virtual Machine, so we set the cluster parameter to “local”.

We can set any job name.

The sparkHome parameter is not needed here, since we will not rely on an existing Spark installation.

The String[] parameter lists jar files, which are needed only if we use an Entity RDD to map a CQL table to a Java object (as Hibernate does); in that case, the jar files must contain the classes of those objects. Here, we will use the more generic Cell RDD, where columns are bound to generic cells that carry metadata along with their values, so there is no need to specify any jar files.

Job Config

We need a way to tell Stratio where and what data we need to retrieve from Cassandra. This can be done by initializing an ICassandraDeepJobConfig bean:

ICassandraDeepJobConfig<Cells> config = DeepJobConfigFactory

Here, we’re telling Stratio how to connect to Cassandra, and which keyspace and table contain the data that will serve as the source of the RDD. To optimize access further, we can also specify which columns to retrieve, and filter on secondary indexes with the filterByField(filterColumnName, filterValue) method (not used here).

Creating an RDD and planning operations

Once done, we can create our RDD abstraction and plan operations on this data set (“plan” because operations are lazy):

CassandraJavaRDD rdd = deepContext.cassandraJavaRDD(config);
// Group the rows by the value of their "winner" column
JavaPairRDD<String, Iterable<Cells>> groups = rdd.groupBy(new Function<Cells, String>() {
    public String call(Cells cells) throws Exception {
        Object cellValue = cells.getCellByName("winner").getCellValue();
        return cellValue!=null ? cellValue.toString() : null;
    }
});
// Map each winner to the number of rows in its group
JavaPairRDD<String,Integer> counts = groups.mapToPair(new PairFunction<Tuple2<String, Iterable<Cells>>, String, Integer>() {
    public Tuple2<String, Integer> call(Tuple2<String, Iterable<Cells>> t) throws Exception {
        return new Tuple2<String,Integer>(t._1(), Lists.newArrayList(t._2()).size());
    }
});

Here, we declare that match results will be “grouped” “by” winning team, and that each team will then be mapped to the number of matches it won.
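The idea that these operations are only planned can be shown with a toy example. In the sketch below, map merely wraps the upstream source in a new description, and nothing is loaded until collect is called. The LazyDataset class and its nested interfaces are invented for this illustration and are unrelated to Spark's real classes.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Toy model of lazy evaluation: transformations are recorded, actions run them.
final class LazyDataset<T> {

    interface Source<E> { List<E> load(); }
    interface Mapper<A, B> { B apply(A input); }

    private final Source<T> source;

    LazyDataset(Source<T> source) {
        this.source = source;
    }

    // Planning step: returns a new dataset description and loads nothing.
    <R> LazyDataset<R> map(final Mapper<T, R> mapper) {
        final Source<T> upstream = source;
        return new LazyDataset<R>(new Source<R>() {
            public List<R> load() {
                List<R> out = new ArrayList<R>();
                for (T item : upstream.load()) {
                    out.add(mapper.apply(item));
                }
                return out;
            }
        });
    }

    // Action: triggers the whole chain of planned operations.
    List<T> collect() {
        return source.load();
    }

    public static void main(String[] args) {
        LazyDataset<String> winners = new LazyDataset<String>(new Source<String>() {
            public List<String> load() {
                System.out.println("loading data...");
                return Arrays.asList("Team A", "Team B"); // hypothetical rows
            }
        });
        LazyDataset<Integer> lengths = winners.map(new Mapper<String, Integer>() {
            public Integer apply(String name) {
                return name.length();
            }
        });
        System.out.println("plan built, nothing loaded yet");
        System.out.println(lengths.collect());
    }
}
```

Running main prints “plan built, nothing loaded yet” before “loading data...”, which mirrors the way the groupBy and mapToPair calls above do no work until results are requested.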

Collecting results

Finally, we can collect and display results:

List<Tuple2<String,Integer>> results = counts.collect();
LOGGER.info("GroupBy Results:");
for(Tuple2<String,Integer> tuple : results){
    LOGGER.info(tuple.toString());
}

This code will output the following (as of 29 June 2014):

GroupBy Results:
(Ivory Coast,1)
(Costa Rica,2)
(Bosnia and Herzegovina,1)


You now know how to perform distributed operations using Spark and Cassandra.
By the way, Databricks and Datastax, the companies supporting Spark and Cassandra, recently announced a partnership centered on a supported open-source integration of their products.

Hope you enjoyed this post, and don’t forget to share!

Get the code

The code and details on how to run this test are available on GitHub.

About the author

Julien Sebrien, passionate developer, follow me here.


Spark home page: https://spark.apache.org/
Cassandra home page: http://cassandra.apache.org/
Stratio home page: http://www.openstratio.org/
Retrofit home page: http://square.github.io/retrofit/
Cassandra Unit GitHub repository: https://github.com/jsevellec/cassandra-unit
Datastax Java driver: http://www.datastax.com/documentation/developer/java-driver/2.0
WorldCup API home page: http://worldcup.sfg.io/



Published at DZone with permission of
