Spark: Use Cassandra as a Resilient Distributed DataSet Source

In this article, we will see how we can use Cassandra as a resilient distributed dataset (RDD) source for Spark, in order to perform RDD operations.

We will use the Stratio library to perform a group-by operation, which Cassandra does not support natively.

Specifically, we will take the World Cup match results, “group” them “by” winning team, and display how many matches each team has won during the World Cup so far.

Here are the steps to perform this:
- Retrieve World Cup match statistics from the worldcup.sfg.io JSON API, using Retrofit. Here is the JSON data that we will use: http://worldcup.sfg.io/matches
- Store the retrieved statistics in an embedded Cassandra server (using Cassandra Unit and the DataStax Java driver).
- Perform a “group by” winner country operation on all matches, using Stratio, which allows a Cassandra database to be used as a resilient distributed dataset (RDD) source for Spark.

All the code that follows is written in Java.

Prerequisites

To run this test, you need:
- JDK 7 or greater
- Git
- Maven

Retrieving World Cup statistics

Retrofit lets us perform HTTP requests and maps the JSON results to Java objects automatically. We just need to create an interface that describes the HTTP request:

public interface WorldCupService {
    @GET("/matches")
    List<Match> getMatchs();
}

The Match class contains fields whose names exactly match the returned JSON fields:

public class Match {
    private int match_number=-1;
    private String location=null;
    private String status=null;
    private String datetime=null;
    private String winner=null;
    private Team home_team = null;
    private Team away_team = null;
 ...
}

with the Team class being:

public class Team {
    private String country=null;
    private String code=null;
    private int goals=-1;
...
}

Finally, we create an adapter based on the previous interface:

RestAdapter restAdapter = new RestAdapter.Builder().setEndpoint(
                "http://worldcup.sfg.io").build();
WorldCupService service = restAdapter.create(WorldCupService.class);
return service.getMatchs();

Retrofit will automatically populate the Match and Team fields from the JSON fields (http://worldcup.sfg.io/matches) using reflection.
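To give an idea of what “populating fields using reflection” means, here is a minimal sketch in plain Java. It is not Retrofit's actual code: SimpleTeam and ReflectionMappingSketch are hypothetical names introduced for illustration, and a Map stands in for an already parsed JSON object.

```java
import java.lang.reflect.Constructor;
import java.lang.reflect.Field;
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the article's Team class: field names equal the JSON keys.
class SimpleTeam {
    private String country = null;
    private String code = null;
    private int goals = -1;

    String getCountry() { return country; }
    String getCode() { return code; }
    int getGoals() { return goals; }
}

// Illustration only: copies map entries into same-named private fields.
class ReflectionMappingSketch {

    static <T> T populate(Class<T> type, Map<String, Object> values) {
        try {
            Constructor<T> constructor = type.getDeclaredConstructor();
            constructor.setAccessible(true);
            T target = constructor.newInstance();
            for (Map.Entry<String, Object> entry : values.entrySet()) {
                Field field;
                try {
                    field = type.getDeclaredField(entry.getKey());
                } catch (NoSuchFieldException e) {
                    continue; // keys without a matching field are skipped
                }
                field.setAccessible(true);           // the fields are private
                field.set(target, entry.getValue()); // an Integer is unboxed for int fields
            }
            return target;
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Cannot populate " + type.getName(), e);
        }
    }

    public static void main(String[] args) {
        // Sample values are illustrative.
        Map<String, Object> json = new HashMap<String, Object>();
        json.put("country", "Brazil");
        json.put("code", "BRA");
        json.put("goals", 3);

        SimpleTeam team = populate(SimpleTeam.class, json);
        System.out.println(team.getCountry() + " " + team.getCode() + " " + team.getGoals());
    }
}
```

This is why the Java field names (match_number, home_team, and so on) have to mirror the JSON keys exactly: the field lookup is done by name.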

Persisting Data to Cassandra

The keyspace and table are created using CQL:

CREATE KEYSPACE WorldCup WITH replication={'class' : 'SimpleStrategy', 'replication_factor':1};
 
use WorldCup;
 
CREATE TABLE Match (
    number int PRIMARY KEY,
    status varchar,
    location varchar,
    datetime  varchar,
    winner varchar,
    home_team_code varchar,
    home_team_country varchar,
    home_team_goals int,
    away_team_code varchar,
    away_team_country varchar,
    away_team_goals int
);

Then we use the DataStax Java driver to store our matches in Cassandra, using a batch statement:

// Build an INSERT with one bind marker per column of the Match table.
Insert insertStatement = QueryBuilder.insertInto("Match");
insertStatement.value("number", QueryBuilder.bindMarker())
    .value("status", QueryBuilder.bindMarker())
    .value("location", QueryBuilder.bindMarker())
    .value("datetime", QueryBuilder.bindMarker())
    .value("winner", QueryBuilder.bindMarker())
    .value("home_team_code", QueryBuilder.bindMarker())
    .value("home_team_country", QueryBuilder.bindMarker())
    .value("home_team_goals", QueryBuilder.bindMarker())
    .value("away_team_code", QueryBuilder.bindMarker())
    .value("away_team_country", QueryBuilder.bindMarker())
    .value("away_team_goals", QueryBuilder.bindMarker());
PreparedStatement ps = session.prepare(insertStatement.toString());

// Bind each match in the same column order as the markers above, then add it to the batch.
BatchStatement batch = new BatchStatement();
for (Match match : matchs) {
    batch.add(ps.bind(match.getMatch_number(),
        match.getStatus(),
        match.getLocation(),
        match.getDatetime(),
        match.getWinner(),
        match.getHome_team().getCode(),
        match.getHome_team().getCountry(),
        match.getHome_team().getGoals(),
        match.getAway_team().getCode(),
        match.getAway_team().getCountry(),
        match.getAway_team().getGoals()));
}
session.execute(batch);
session.close();
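The bound values must follow the same order as the bind markers of the prepared statement. One way to make that contract explicit is to flatten each match into a row in a single place. The sketch below is only an illustration: TeamStub, MatchStub and MatchRowSketch are hypothetical, trimmed-down stand-ins (not the classes of the project), and the sample values are illustrative.

```java
import java.util.Arrays;

// Hypothetical, trimmed-down stand-ins for the article's Team and Match classes.
class TeamStub {
    final String code;
    final String country;
    final int goals;

    TeamStub(String code, String country, int goals) {
        this.code = code;
        this.country = country;
        this.goals = goals;
    }
}

class MatchStub {
    final int matchNumber;
    final String status;
    final String location;
    final String datetime;
    final String winner;
    final TeamStub homeTeam;
    final TeamStub awayTeam;

    MatchStub(int matchNumber, String status, String location, String datetime,
              String winner, TeamStub homeTeam, TeamStub awayTeam) {
        this.matchNumber = matchNumber;
        this.status = status;
        this.location = location;
        this.datetime = datetime;
        this.winner = winner;
        this.homeTeam = homeTeam;
        this.awayTeam = awayTeam;
    }
}

class MatchRowSketch {
    // Column order of the INSERT statement built above.
    static final String[] COLUMNS = {
        "number", "status", "location", "datetime", "winner",
        "home_team_code", "home_team_country", "home_team_goals",
        "away_team_code", "away_team_country", "away_team_goals"
    };

    // Flattens the nested match into one value per column, in COLUMNS order.
    static Object[] toBindValues(MatchStub m) {
        return new Object[] {
            m.matchNumber, m.status, m.location, m.datetime, m.winner,
            m.homeTeam.code, m.homeTeam.country, m.homeTeam.goals,
            m.awayTeam.code, m.awayTeam.country, m.awayTeam.goals
        };
    }

    public static void main(String[] args) {
        MatchStub match = new MatchStub(1, "completed", "Arena Corinthians", "2014-06-12",
                "Brazil", new TeamStub("BRA", "Brazil", 3), new TeamStub("CRO", "Croatia", 1));
        System.out.println(Arrays.toString(toBindValues(match)));
    }
}
```

With the real classes, such an array could be passed to the driver's varargs bind method, for example ps.bind(values).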

Spark Context

You can see an overview of what Spark does here. Basically, it provides a way to perform operations (group by, join, etc.) in a distributed way on a data set that should itself be retrievable in a distributed way. Spark therefore supports the Hadoop Distributed File System (HDFS), which is distributed by nature, and we will see here how we can use Cassandra as a resilient distributed dataset (RDD) source.

To do so, we need to use the Stratio API.

First, we need to initialize a DeepSparkContext, inheriting from JavaSparkContext:

String cluster = "local";
String job = "myJobName";
String sparkHome = "";
 
DeepSparkContext deepContext =
   new DeepSparkContext(cluster, job, sparkHome, new String[]{});

Here, Spark runs embedded in the Java Virtual Machine, so we set the cluster parameter to “local”.

We can set any job name.

The sparkHome parameter is not needed here, since we will not rely on an existing Spark installation.

The String[] parameter lists jar files, which are needed only when we use an Entity RDD to map a CQL table to a Java object (much like Hibernate does). In that case, the jar files must contain the classes of these objects. Here, we use the more generic Cell RDD, where columns are bound to generic cells that include metadata along with the values, so there is no need to specify jar files.

Job Config

We need a way to tell Stratio where and what data we need to retrieve from Cassandra. This can be done by initializing an ICassandraDeepJobConfig bean:

ICassandraDeepJobConfig<Cells> config = DeepJobConfigFactory
                .create().host("localhost").cqlPort(9142)
                .keyspace("worldCup").table("match")
                .inputColumns("winner")
                .initialize();

Here, we’re telling Stratio how to connect to Cassandra, and which keyspace and table contain the data to use as the RDD source. To optimize access further, we can also specify which columns to retrieve, and filter on secondary indexes with the filterByField(filterColumnName, filterValue) method (not used here).

Creating an RDD and planning operations

Once done, we can create our RDD abstraction and plan operations on this data set (“plan” because operations are lazy: nothing is computed until results are requested):

CassandraJavaRDD rdd = deepContext.cassandraJavaRDD(config);
         
JavaPairRDD<String, Iterable<Cells>> groups = rdd.groupBy(new Function<Cells, String>() {
    @Override
    public String call(Cells cells) throws Exception {
        Object cellValue = cells.getCellByName("winner").getCellValue();
        return cellValue!=null ? cellValue.toString() : null;
    }
});
JavaPairRDD<String,Integer> counts = groups.mapToPair(new PairFunction<Tuple2<String, Iterable<Cells>>, String, Integer>() {
    @Override
    public Tuple2<String, Integer> call(Tuple2<String, Iterable<Cells>> t) throws Exception {
        // Guava's Iterables.size counts the group without copying it into a list.
        return new Tuple2<String,Integer>(t._1(), Iterables.size(t._2()));
    }
});

Here, we declare that match results are “grouped by” winning team. Then, we map each team to the number of matches it won.
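For readers new to Spark, the two transformations above compute the same thing as the following in-memory sketch, written in plain Java without Spark or Stratio. GroupByWinnerSketch and its method names are hypothetical, and the sample winners are illustrative. Note that null is a valid group key here, just as in the Spark version.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class GroupByWinnerSketch {

    // Step 1 (groupBy): collect the values under their key; here the key is the winner itself.
    static Map<String, List<String>> groupByWinner(List<String> winners) {
        Map<String, List<String>> groups = new HashMap<String, List<String>>();
        for (String winner : winners) {
            List<String> group = groups.get(winner);
            if (group == null) {
                group = new ArrayList<String>();
                groups.put(winner, group); // HashMap accepts a null key
            }
            group.add(winner);
        }
        return groups;
    }

    // Step 2 (mapToPair): replace each group by its size.
    static Map<String, Integer> countGroups(Map<String, List<String>> groups) {
        Map<String, Integer> counts = new HashMap<String, Integer>();
        for (Map.Entry<String, List<String>> entry : groups.entrySet()) {
            counts.put(entry.getKey(), entry.getValue().size());
        }
        return counts;
    }

    static Map<String, Integer> countByWinner(List<String> winners) {
        return countGroups(groupByWinner(winners));
    }

    public static void main(String[] args) {
        List<String> winners = Arrays.asList("Brazil", "Draw", null, "Brazil", "Mexico", null);
        System.out.println(countByWinner(winners));
    }
}
```

The difference with Spark is that the RDD version only describes these steps; the work is distributed and runs when the results are collected.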

Collecting results

Finally, we can collect and display results:

List<Tuple2<String,Integer>> results = counts.collect();
LOGGER.info("GroupBy Results:");
for(Tuple2<String,Integer> tuple : results){
    LOGGER.info(tuple.toString());
}

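This code will output the following (as of 29 June 2014). The null key groups the matches whose winner field was still empty, presumably those not yet played at that date, and Draw groups the drawn matches: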

GroupBy Results:
(null,14)
(Portugal,1)
(Brazil,3)
(Netherlands,3)
(Draw,9)
(Uruguay,2)
(Colombia,4)
(Argentina,3)
(Chile,2)
(Croatia,1)
(Belgium,3)
(Mexico,2)
(Ecuador,1)
(Greece,1)
(France,2)
(Italy,1)
(Ivory Coast,1)
(Algeria,1)
(Switzerland,2)
(Spain,1)
(Germany,2)
(USA,1)
(Costa Rica,2)
(Bosnia and Herzegovina,1)
(Nigeria,1)
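The collected list comes back in no particular order. If a ranking is easier to read, the results can be sorted on the driver side once collected. The sketch below is a minimal illustration in plain Java: SortResultsSketch is a hypothetical name, Map.Entry stands in for Tuple2 so that the example runs without Spark, and the sample values are taken from the output above.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Map;

class SortResultsSketch {

    // Returns a sorted copy (highest count first); the input list is left untouched.
    static List<Map.Entry<String, Integer>> sortByCountDescending(
            List<Map.Entry<String, Integer>> results) {
        List<Map.Entry<String, Integer>> sorted =
                new ArrayList<Map.Entry<String, Integer>>(results);
        Collections.sort(sorted, new Comparator<Map.Entry<String, Integer>>() {
            @Override
            public int compare(Map.Entry<String, Integer> a, Map.Entry<String, Integer> b) {
                return b.getValue().compareTo(a.getValue());
            }
        });
        return sorted;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Integer>> results = new ArrayList<Map.Entry<String, Integer>>();
        results.add(new AbstractMap.SimpleEntry<String, Integer>(null, 14));
        results.add(new AbstractMap.SimpleEntry<String, Integer>("Brazil", 3));
        results.add(new AbstractMap.SimpleEntry<String, Integer>("Draw", 9));
        results.add(new AbstractMap.SimpleEntry<String, Integer>("Colombia", 4));

        for (Map.Entry<String, Integer> entry : sortByCountDescending(results)) {
            System.out.println("(" + entry.getKey() + "," + entry.getValue() + ")");
        }
    }
}
```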

Conclusion

Now you know how to perform distributed operations using Spark and Cassandra.
By the way, Databricks and DataStax, the companies backing Spark and Cassandra, recently announced a partnership centered on a supported open-source integration of their products.

Hope you enjoyed this post, and don’t forget to share!

Get the code

The code and details on how to run this test are available on GitHub.

About the author

Julien Sebrien, passionate developer, follow me here.

Links

Spark home page: https://spark.apache.org/
Cassandra home page: http://cassandra.apache.org/
Stratio home page: http://www.openstratio.org/
Retrofit home page: http://square.github.io/retrofit/
Cassandra Unit GitHub repository: https://github.com/jsevellec/cassandra-unit
DataStax Java driver: http://www.datastax.com/documentation/developer/java-driver/2.0
World Cup API home page: http://worldcup.sfg.io/




Published at DZone with permission of Julien Sebrien, DZone MVB. See the original article here.

Opinions expressed by DZone contributors are their own.
