Spark/Cassandra Stack: Perform RDD Operations Using Datastax Connector
We will use the brand-new Datastax Cassandra/Spark connector to load data from a Cassandra table and run Spark RDD operations on this data.
From a specific set of roadtrips, we will compute the average roadtrip distance, grouped by origin city.
Here are the steps to perform this:
- Insert roadtrip information (from roadtrips.zip in /src/main/resources) into an embedded Cassandra server (using Cassandra Unit and the Datastax Java driver)
- Then, count the number of trips, grouped by origin city, using the Datastax Cassandra/Spark connector and Spark.
- Finally, compute the average roadtrip distance, still grouped by origin city.
All of the code that follows is written in Java.
Prerequisites
To run this test, you need:
- JDK 7 or greater
- Git
- Maven
Storing roadtrip information
We will import the roadtrip information from a CSV file and store it in a Cassandra table:
CREATE TABLE RoadTrip (
    id int PRIMARY KEY,
    origin_city_name varchar,
    origin_state_abr varchar,
    destination_city_name varchar,
    destination_state_abr varchar,
    elapsed_time int,
    distance int
);
The RoadTrip class is as follows:
public class RoadTrip {
    private Integer id = null;
    private String originCityName = null;
    private String originStateAbr = null;
    private String destinationCityName = null;
    private String destinationStateAbr = null;
    private Integer elapsedTime = null;
    private Integer distance = null;
    ...
}
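As listed in the steps above, the data goes into an embedded Cassandra server started with Cassandra Unit. Here is a minimal sketch of that setup and of opening the driver session used below, assuming CassandraUnit's default ports (9142 for the native protocol, 9171 for Thrift, which match the Spark configuration later in this article); the openSession helper is ours, not from the original repository:
import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.Session;
import org.cassandraunit.utils.EmbeddedCassandraServerHelper;

// Hypothetical helper, not part of the original code.
public static Session openSession() throws Exception {
    // Start an embedded Cassandra server; CassandraUnit's default
    // configuration listens on native port 9142 and Thrift port 9171.
    EmbeddedCassandraServerHelper.startEmbeddedCassandra();

    // Connect with the Datastax Java driver on the embedded native port.
    Cluster cluster = Cluster.builder()
            .addContactPoint("localhost")
            .withPort(9142)
            .build();
    Session session = cluster.connect();

    // Create the keyspace used by the Spark code below and switch to it.
    session.execute("CREATE KEYSPACE IF NOT EXISTS roadtrips "
            + "WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1}");
    session.execute("USE roadtrips");
    return session;
}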
We will store Roadtrip beans with the Datastax Java driver, using a Batch statement:
Insert insertStatement = QueryBuilder.insertInto("RoadTrip");
insertStatement.value("id", QueryBuilder.bindMarker())
               .value("origin_city_name", QueryBuilder.bindMarker())
               .value("origin_state_abr", QueryBuilder.bindMarker())
               .value("destination_city_name", QueryBuilder.bindMarker())
               .value("destination_state_abr", QueryBuilder.bindMarker())
               .value("elapsed_time", QueryBuilder.bindMarker())
               .value("distance", QueryBuilder.bindMarker());
PreparedStatement ps = session.prepare(insertStatement.toString());
...
BatchStatement batch = new BatchStatement();
for (RoadTrip roadtrip : roadtrips) {
    batch.add(ps.bind(roadtrip.getId(),
                      roadtrip.getOriginCityName(),
                      roadtrip.getOriginStateAbr(),
                      roadtrip.getDestinationCityName(),
                      roadtrip.getDestinationStateAbr(),
                      roadtrip.getElapsedTime(),
                      roadtrip.getDistance()));
}
session.execute(batch);
...
Spark Context
You can see an overview of what Spark does on its home page (linked below). Basically, it provides a way to perform operations (group by, join, etc.) on partitioned data sets, and we will see here how we can use Cassandra as a resilient distributed dataset (RDD) source.
To do so, we need to use the Datastax Cassandra/Spark connector.
First, we need to initialize a SparkContext and create an RDD abstraction, using a Cassandra table as the dataset source:
SparkConf conf = new SparkConf(true)
        .setMaster("local")
        .setAppName("DatastaxTests")
        .set("spark.executor.memory", "1g")
        .set("spark.cassandra.connection.host", "localhost")
        .set("spark.cassandra.connection.native.port", "9142")
        .set("spark.cassandra.connection.rpc.port", "9171");
SparkContext ctx = new SparkContext(conf);
SparkContextJavaFunctions functions = CassandraJavaUtil.javaFunctions(ctx);
JavaRDD<CassandraRow> rdd = functions.cassandraTable("roadtrips", "roadtrip").toJavaRDD();
Here, the Spark cluster will be embedded in the Java virtual machine, so we set the master parameter to “local”.
We need to provide some basic information: the application name, the amount of memory allocated to the executors, and the Cassandra host and ports.
Finally, we tell Spark which table will be the data source of the RDD.
Creating an RDD and planning operations
Once done, we can create our RDD abstraction and plan operations on this data set (plan, because operations are lazy).
First, we need to cache this RDD. This tells Spark not to recompute the dataset, but to persist it in memory (the default storage level). It also allows us to re-use this RDD for several operations, which we will do later in this article.
JavaRDD<CassandraRow> rdd = functions.cassandraTable("roadtrips", "roadtrip").toJavaRDD();
rdd.cache();
Now, we will compute the number of roadtrips, grouped by origin city:
JavaPairRDD<String, Integer> sizes = rdd.groupBy(new Function<CassandraRow, String>() {
    @Override
    public String call(CassandraRow row) throws Exception {
        return row.getString("origin_city_name");
    }
}).mapToPair(new PairFunction<Tuple2<String, Iterable<CassandraRow>>, String, Integer>() {
    @Override
    public Tuple2<String, Integer> call(Tuple2<String, Iterable<CassandraRow>> t) throws Exception {
        return new Tuple2<String, Integer>(t._1(), Lists.newArrayList(t._2()).size());
    }
});
sizes.cache();
That’s all! Basically, using the groupBy operation, we create Tuple2<String, Iterable<CassandraRow>> tuples, where the String is an origin city and the Iterable<CassandraRow> is the list of rows having that origin city.
Then, in the chained mapToPair operation, we receive each of these tuples and create a new abstraction of the form Tuple2<String, Integer>, where the String is an origin city and the Integer is the number of roadtrips starting from that city.
Here, you will notice that we also cache the sizes RDD abstraction, since we will re-use this intermediate result to compute the average distance, still by origin city.
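As an aside, groupBy materializes the full list of rows for each city only to take its size. If all you need is the count, mapping each row to 1 and summing with reduceByKey avoids building those intermediate lists. A sketch of the equivalent count (our variation, not from the original article):
JavaPairRDD<String, Integer> sizesAlt = rdd.mapToPair(new PairFunction<CassandraRow, String, Integer>() {
    @Override
    public Tuple2<String, Integer> call(CassandraRow row) throws Exception {
        // emit one occurrence per row, keyed by origin city
        return new Tuple2<String, Integer>(row.getString("origin_city_name"), 1);
    }
}).reduceByKey(new Function2<Integer, Integer, Integer>() {
    @Override
    public Integer call(Integer a, Integer b) throws Exception {
        return a + b; // sum the per-row counts
    }
});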
Here is the output at this stage:
Nb RoadTrips by origin
Albuquerque : 61
Raleigh/Durham : 62
Memphis : 24
Seattle : 31
Orlando : 154
Salt Lake City : 31
Newark : 61
Hartford : 31
Miami : 773
San Antonio : 176
New York : 978
Omaha : 57
Portland : 9
San Jose : 57
Austin : 194
Charlotte : 31
Kansas City : 93
Chicago : 1108
Fort Lauderdale : 31
Dayton : 31
San Francisco : 362
Tulsa : 62
Los Angeles : 957
Atlanta : 31
Indianapolis : 1
Fayetteville : 31
Wichita : 62
Columbus : 31
Washington : 358
St. Louis : 204
Kahului : 93
El Paso : 31
Oklahoma City : 31
Ontario : 36
Phoenix : 124
Santa Ana : 33
Baltimore : 27
Burbank : 8
Kona : 31
Las Vegas : 93
Norfolk : 50
Philadelphia : 8
Minneapolis : 30
Houston : 58
Lihue : 42
Palm Springs : 31
Honolulu : 164
San Juan : 62
Louisville : 1
Tampa : 124
Fort Myers : 31
Colorado Springs : 31
San Diego : 159
Boston : 212
Mission/McAllen/Edinburg : 30
West Palm Beach/Palm Beach : 62
Dallas/Fort Worth : 2275
Charlotte Amalie : 31
Now, we need to compute the average roadtrip distance, still grouped by origin city:
JavaPairRDD<String, Integer> sums = rdd.mapToPair(new PairFunction<CassandraRow, String, Integer>() {
    @Override
    public Tuple2<String, Integer> call(CassandraRow row) throws Exception {
        return new Tuple2<String, Integer>(row.getString("origin_city_name"), row.getInt("distance"));
    }
}).reduceByKey(new Function2<Integer, Integer, Integer>() {
    @Override
    public Integer call(Integer d1, Integer d2) throws Exception {
        return d1 + d2;
    }
});
Here, for each CassandraRow, we create a Tuple2<String, Integer> where the String is an origin city and the Integer is the distance of the corresponding roadtrip (i.e. CassandraRow).
Then we use the reduceByKey operation to compute the sum of roadtrip distances for each Tuple2 key, which here is an origin city. This operation goes through all the distances found for a key and, for each pair (d1, d2), returns their sum; the computed sum is fed into the next iteration until the whole distance collection is processed (for distances d1, d2, d3, it computes (d1 + d2), then ((d1 + d2) + d3)).
Finally, to compute the average distance per origin city, we need a way to join the number of roadtrips and the sum of distances we previously computed. This can be done with the join operation:
List<Tuple2<String, Double>> averageResults = sums.join(sizes)
        .mapValues(new Function<Tuple2<Integer, Integer>, Double>() {
            @Override
            public Double call(Tuple2<Integer, Integer> tuple) throws Exception {
                return (double) tuple._1() / tuple._2();
            }
        }).collect();
sums.join(sizes) creates a new RDD abstraction of the form JavaPairRDD<String, Tuple2<Integer, Integer>>, where the String is an origin city and the Tuple2<Integer, Integer> contains, respectively, the sum of distances and the number of roadtrips.
Note that we re-use the previously computed sizes RDD abstraction here.
Finally, the mapValues operation transforms this JavaPairRDD<String, Tuple2<Integer, Integer>> into a JavaPairRDD<String, Double>, where the Double represents the average distance.
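collect() brings the results back to the driver as a plain List. To display the averages, we can simply iterate over it (a minimal sketch; the printing code is not shown in the original snippets):
System.out.println("Average distance by origin");
for (Tuple2<String, Double> tuple : averageResults) {
    System.out.println(tuple._1() + " : " + tuple._2());
}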
This produces the following output:
Average distance by origin
Albuquerque : 569.0
Raleigh/Durham : 880.5
Memphis : 432.0
Seattle : 2428.8387096774195
Orlando : 1313.7662337662337
Salt Lake City : 989.0
Newark : 1904.1311475409836
Hartford : 1471.0
Miami : 1404.1875808538164
San Antonio : 247.0
New York : 1639.402862985685
Omaha : 583.0
Portland : 1616.0
San Jose : 1643.7894736842106
Austin : 520.7835051546392
Charlotte : 936.0
Kansas City : 441.0
Chicago : 906.5361010830325
Fort Lauderdale : 1182.0
Dayton : 861.0
San Francisco : 2099.5552486187844
Tulsa : 448.61290322580646
Los Angeles : 2424.0010449320794
Atlanta : 731.0
Indianapolis : 761.0
Fayetteville : 280.0
Wichita : 328.0
Columbus : 926.0
Washington : 1322.2067039106146
St. Louis : 752.1764705882352
Kahului : 2881.043010752688
El Paso : 551.0
Oklahoma City : 175.0
Ontario : 1188.0
Phoenix : 1154.0
Santa Ana : 1315.5151515151515
Baltimore : 1217.0
Burbank : 1231.0
Kona : 2504.0
Las Vegas : 1605.6666666666667
Norfolk : 1212.0
Philadelphia : 1303.0
Minneapolis : 852.0
Houston : 619.5172413793103
Lihue : 2615.0
Palm Springs : 1126.0
Honolulu : 3112.8231707317073
San Juan : 1045.0
Louisville : 733.0
Tampa : 789.25
Fort Myers : 1120.0
Colorado Springs : 592.0
San Diego : 1558.4528301886792
Boston : 1871.1462264150944
Mission/McAllen/Edinburg : 469.0
West Palm Beach/Palm Beach : 1123.0
Dallas/Fort Worth : 1040.072087912088
Charlotte Amalie : 1623.0
Conclusion
You now know how to perform distributed operations on Cassandra data using Spark and the Datastax connector.
Hope you enjoyed this post, and don’t forget to share!
Get the code
The code and details on how to run this test are available on GitHub.
About the author
Julien Sebrien, passionate developer.
Links
- Spark home page : https://spark.apache.org/
- Cassandra home page : http://cassandra.apache.org/
- Cassandra/Spark Datastax Connector Repository : https://github.com/datastax/spark-cassandra-connector/
- Cassandra Unit Github Repository : https://github.com/jsevellec/cassandra-unit
- Datastax Java driver : http://www.datastax.com/documentation/developer/java-driver/2.0