
Spark SQL Against Cassandra Example


Spark SQL is awesome.  It allows you to query any Resilient Distributed Dataset (RDD) using SQL (including data stored in Cassandra!).

The first thing to do is create a SQLContext from your SparkContext. I'm using Java, so...
(sorry, I'm still not hip enough for Scala)

// conf is your SparkConf, with the Cassandra connection settings applied
JavaSparkContext context = new JavaSparkContext(conf);
JavaSQLContext sqlContext = new JavaSQLContext(context);
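The conf above is an ordinary SparkConf. As a point of reference, here's a minimal sketch of what it might look like when pointed at a local Cassandra node (the app name and master URL are placeholders; spark.cassandra.connection.host is the connector property that tells Spark where Cassandra lives):

SparkConf conf = new SparkConf()
  .setAppName("spark-sql-cassandra-example")   // placeholder app name
  .setMaster("local[2]")                       // or your cluster's master URL
  .set("spark.cassandra.connection.host", "127.0.0.1");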

Now you have a SQLContext, but you have no data.  Go ahead and create an RDD, just like you would in regular Spark:

// javaFunctions is a static import from the spark-cassandra-connector's CassandraJavaUtil;
// productReader (defined in the quickstart project) maps Cassandra rows to Product objects.
JavaPairRDD<Integer, Product> productsRDD =
  javaFunctions(context).cassandraTable("test_keyspace", "products",
    productReader).keyBy(new Function<Product, Integer>() {
      @Override
      public Integer call(Product product) throws Exception {
        return product.getId();
      }
    });

(The example above comes from the spark-on-cassandra-quickstart project, as described in my previous post.)
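Since applySchema (coming up next) infers the table's columns from the bean's getters, Product needs to be a JavaBean, and it should implement Serializable so Spark can ship instances around the cluster. The real definition lives in the quickstart project; here's a hypothetical minimal sketch consistent with the queries in this post:

// Hypothetical sketch of the Product bean the examples assume.
public class Product implements Serializable {
  private Integer id;
  private String name;
  private Double price;

  public Integer getId() { return id; }
  public void setId(Integer id) { this.id = id; }
  public String getName() { return name; }
  public void setName(String name) { this.name = name; }
  public Double getPrice() { return price; }
  public void setPrice(Double price) { this.price = price; }
}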


Now that we have a plain vanilla RDD, we need to spice it up with a schema and let the sqlContext know about it. We can do that with the following lines:


JavaSchemaRDD schemaRDD = sqlContext.applySchema(productsRDD.values(), Product.class);
sqlContext.registerRDDAsTable(schemaRDD, "products");

Shazam. Now your sqlContext is ready for querying. Notice that it inferred the schema from the Java bean (Product.class). (In the next blog post, I'll show how to do this dynamically.)


You can prime the pump with a count:

System.out.println("Total Records = [" + productsRDD.count() + "]");

The count operation forces Spark to load the data into memory, which makes queries like the following lightning fast:

JavaSchemaRDD result = sqlContext.sql("SELECT id FROM products WHERE price < 0.50");
for (Row row : result.collect()) {
  System.out.println(row);
}
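Printing a Row just gives you its toString(). If you want the typed values, the Row exposes positional getters. A small sketch, assuming id is the integer column selected above:

for (Row row : result.collect()) {
  int id = row.getInt(0);  // column 0 is "id" from the SELECT list
  System.out.println("Cheap product id = " + id);
}

And if you plan to fire several queries at the same data, calling cache() on the schema RDD keeps the rows in memory so each query doesn't go back to Cassandra.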

That's it.  You're off to the SQL races.

P.S.  If you try querying the sqlContext without applying a schema and/or without registering the RDD as a table, you may see something similar to this:

Exception in thread "main" org.apache.spark.sql.catalyst.errors.package$TreeNodeException: Unresolved attributes: 'id, tree:
'Project ['id]
 'Filter ('price < 0.5)
  NoRelation$


Topics:
sql, nosql, spark, cassandra, database

Published at DZone with permission of Brian O'Neill, DZone MVB. See the original article here.

Opinions expressed by DZone contributors are their own.
