Spark Backend for Apache Gora


I’ve worked on a Spark backend for Apache Gora as my GSoC 2015 project and finished it. In this blog post, I’ll explain how it works and how to use it. First of all, I suggest you read my previous posts about my GSoC 2015 acceptance and Apache Gora if you haven’t read them.

Apache Gora provides an in-memory data model and persistence for big data. Gora supports persisting to column stores, key-value stores, document stores, and RDBMSs, as well as analyzing the data with extensive Apache Hadoop MapReduce support. Gora is a powerful project which can work like the Hibernate of the NoSQL world, and one can run MapReduce jobs over it. Even though Spark is more powerful than MapReduce, which Gora already supports, there was no Spark backend for Gora; my GSoC project aimed to add one.

I’ll follow a log analytics example to explain the Spark backend implementation throughout this post. It uses Apache server logs persisted in a data store. I suggest you download and compile the Gora source code and read the Gora tutorial to find and persist the example logs. You can use Gora’s built-in scripts to persist the example data.

Because we are using Apache Gora, we can use HBase, Solr, MongoDB, and others (for a full list: http://gora.apache.org/) as the data store. Gora will run your code independently of which data store you use. In this example, I’ll persist an example set of Apache server logs into HBase, read them from there, run Spark code on it, and write the results into Solr (version 4.10.3).
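Which concrete backend Gora uses is driven by configuration rather than code. Here is a minimal sketch of gora.properties for this setup: the gora.datastore.default key comes from the Gora tutorial, while the gora.solrstore.solr.url line is my assumption for pointing the Solr store at a local core.

# default store, used when no store class is passed to DataStoreFactory
gora.datastore.default=org.apache.gora.hbase.store.HBaseStore
# assumed key/URL for the local Solr core used as the output store
gora.solrstore.solr.url=http://localhost:8983/solr/Metrics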

First, start up a data store to read values from. I’ll start HBase as the persistence data store:

furkan@kamaci:~/apps/hbase-$ ./bin/start-hbase.sh
starting master, logging to /home/furkan/apps/hbase-

Persist the example logs into HBase (you should have extracted access.log.tar.gz before running this command):

furkan@kamaci:~/projects/gora$ ./bin/gora logmanager -parse gora-tutorial/src/main/resources/access.log

After running the parse command, start the HBase shell:

furkan@kamaci:~/apps/hbase-$ ./bin/hbase shell
2015-08-31 00:20:16,026 WARN [main] util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
HBase Shell; enter 'help<RETURN>' for list of supported commands.
Type "exit<RETURN>" to leave the HBase Shell
Version, re1dbf4df30d214fca14908df71d038081577ea46, Sun May 17 12:34:26 PDT 2015

and run the list command. You should see the output below if there were no other tables in your HBase before:

hbase(main):025:0> list
1 row(s) in 0.0150 seconds
=> ["AccessLog"]

Check whether any data exists in the table:

hbase(main):026:0> scan 'AccessLog', {LIMIT=>1}
\x00\x00\x00\x00\x00\x00\x00\x00 column=common:ip, timestamp=1440970360966, value=
\x00\x00\x00\x00\x00\x00\x00\x00 column=common:timestamp, timestamp=1440970360966, value=\x00\x00\x01\x1F\xF1\xAElP
\x00\x00\x00\x00\x00\x00\x00\x00 column=common:url, timestamp=1440970360966, value=/index.php?a=1__wwv40pdxdpo&k=218978
\x00\x00\x00\x00\x00\x00\x00\x00 column=http:httpMethod, timestamp=1440970360966, value=GET
\x00\x00\x00\x00\x00\x00\x00\x00 column=http:httpStatusCode, timestamp=1440970360966, value=\x00\x00\x00\xC8
\x00\x00\x00\x00\x00\x00\x00\x00 column=http:responseSize, timestamp=1440970360966, value=\x00\x00\x00+
\x00\x00\x00\x00\x00\x00\x00\x00 column=misc:referrer, timestamp=1440970360966, value=http://www.buldinle.com/index.php?a=1__WWV40pdxdpo&k=218978
\x00\x00\x00\x00\x00\x00\x00\x00 column=misc:userAgent, timestamp=1440970360966, value=Mozilla/4.0 (compatible; MSIE 6.0; Windows NT 5.1)
1 row(s) in 0.0810 seconds
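The common, http, and misc column families in the scan output come from Gora's HBase mapping of the Pageview bean. Below is an abridged sketch of the corresponding gora-hbase-mapping.xml entry; the field list is shortened here, and the full file ships with gora-tutorial.

<gora-orm>
  <table name="AccessLog">
    <family name="common"/>
    <family name="http"/>
    <family name="misc"/>
  </table>
  <class name="org.apache.gora.tutorial.log.generated.Pageview"
         keyClass="java.lang.Long" table="AccessLog">
    <field name="ip" family="common" qualifier="ip"/>
    <field name="httpMethod" family="http" qualifier="httpMethod"/>
    <field name="referrer" family="misc" qualifier="referrer"/>
    <!-- url, timestamp, httpStatusCode, responseSize, userAgent follow the same pattern -->
  </class>
</gora-orm>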

To write the results into Solr, create a schemaless core named Metrics. The easy way is to rename the default collection1 core (under the solr-4.10.3/example/example-schemaless/solr folder) to Metrics and edit /home/furkan/Desktop/solr-4.10.3/example/example-schemaless/solr/Metrics/core.properties as:


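At a minimum, the core name in core.properties has to match the renamed core; a minimal sketch of the file would be:

name=Metrics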
Then run the start command for Solr:

furkan@kamaci:~/Desktop/solr-4.10.3/example$ java -Dsolr.solr.home=example-schemaless/solr/ -jar start.jar

Let’s start the example. We will read data from HBase, generate some metrics, and write the results into Solr with Spark via Gora. Here is how to initialize the input and output data stores:

public int run(String[] args) throws Exception {
  DataStore<Long, Pageview> inStore;
  DataStore<String, MetricDatum> outStore;
  Configuration hadoopConf = new Configuration();

  // store classes can optionally be given as the first and second program arguments
  if (args.length > 0) {
    String dataStoreClass = args[0];
    inStore = DataStoreFactory.getDataStore(
        dataStoreClass, Long.class, Pageview.class, hadoopConf);
    if (args.length > 1) {
      dataStoreClass = args[1];
    }
    outStore = DataStoreFactory.getDataStore(
        dataStoreClass, String.class, MetricDatum.class, hadoopConf);
  } else {
    inStore = DataStoreFactory.getDataStore(Long.class, Pageview.class, hadoopConf);
    outStore = DataStoreFactory.getDataStore(String.class, MetricDatum.class, hadoopConf);
  }
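For completeness, here is a sketch of a hypothetical driver for this run method. The LogAnalyticsSpark class name and the main method are my assumptions (following the Gora tutorial's naming); the store class names are those shipped with the gora-hbase and gora-solr modules.

public static void main(String[] args) throws Exception {
  // Hypothetical driver: pass HBase as the input store and Solr as the output store.
  // With no arguments, the stores configured in gora.properties are used instead.
  int ret = new LogAnalyticsSpark().run(new String[] {
      "org.apache.gora.hbase.store.HBaseStore",  // input store class
      "org.apache.gora.solr.store.SolrStore"     // output store class
  });
  System.exit(ret);
}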

Pass the input data store’s key and value classes and instantiate a GoraSparkEngine:

GoraSparkEngine<Long, Pageview> goraSparkEngine = new GoraSparkEngine<>(Long.class,
    Pageview.class);

Construct a JavaSparkContext and register the input data store’s value class as a Kryo class:

SparkConf sparkConf = new SparkConf().setAppName(
    "Gora Spark Integration Application").setMaster("local");

Class[] c = new Class[1];
c[0] = inStore.getPersistentClass();
sparkConf.registerKryoClasses(c);

JavaSparkContext sc = new JavaSparkContext(sparkConf);

You can get a JavaPairRDD from the input data store:

JavaPairRDD<Long, Pageview> goraRDD = goraSparkEngine.initialize(sc, inStore);

Once you have it, you can work on it just as if you were writing code for Spark! For example:

long count = goraRDD.count();
System.out.println("Total Log Count: " + count);

Here are my functions for the map and reduce phases of this example:

/** The number of milliseconds in a day */
private static final long DAY_MILIS = 1000 * 60 * 60 * 24;

/** map function used in calculation */
private static Function<Pageview, Tuple2<Tuple2<String, Long>, Long>> mapFunc =
    new Function<Pageview, Tuple2<Tuple2<String, Long>, Long>>() {
  @Override
  public Tuple2<Tuple2<String, Long>, Long> call(Pageview pageview)
      throws Exception {
    String url = pageview.getUrl().toString();
    Long day = getDay(pageview.getTimestamp());
    Tuple2<String, Long> keyTuple = new Tuple2<>(url, day);
    return new Tuple2<>(keyTuple, 1L);
  }
};

/** reduce function used in calculation */
private static Function2<Long, Long, Long> redFunc = new Function2<Long, Long, Long>() {
  @Override
  public Long call(Long aLong, Long aLong2) throws Exception {
    return aLong + aLong2;
  }
};

/** metric function used after map phase */
private static PairFunction<Tuple2<Tuple2<String, Long>, Long>, String, MetricDatum> metricFunc =
    new PairFunction<Tuple2<Tuple2<String, Long>, Long>, String, MetricDatum>() {
  @Override
  public Tuple2<String, MetricDatum> call(
      Tuple2<Tuple2<String, Long>, Long> tuple2LongTuple2) throws Exception {
    String dimension = tuple2LongTuple2._1()._1();
    long timestamp = tuple2LongTuple2._1()._2();

    MetricDatum metricDatum = new MetricDatum();
    metricDatum.setMetricDimension(dimension);
    metricDatum.setTimestamp(timestamp);
    metricDatum.setMetric(tuple2LongTuple2._2());

    String key = metricDatum.getMetricDimension().toString();
    key += "_" + Long.toString(timestamp);
    return new Tuple2<>(key, metricDatum);
  }
};

/**
 * Rolls up the given timestamp to the day cardinality, so that data can be
 * aggregated daily
 */
private static long getDay(long timeStamp) {
  return (timeStamp / DAY_MILIS) * DAY_MILIS;
}
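As a quick check of the day roll-up arithmetic, the integer division floors a millisecond timestamp to midnight (UTC) of its day; the timestamp below is made up for illustration:

long ts = 1441016130000L;                  // 2015-08-31T10:15:30Z
long day = (ts / DAY_MILIS) * DAY_MILIS;   // 1440979200000L == 2015-08-31T00:00:00Z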

Here is how to run the map and reduce functions on the existing JavaPairRDD:

JavaRDD<Tuple2<Tuple2<String, Long>, Long>> mappedGoraRdd = goraRDD.values()
    .map(mapFunc);

JavaPairRDD<String, MetricDatum> reducedGoraRdd = JavaPairRDD
    .fromJavaRDD(mappedGoraRdd).reduceByKey(redFunc).mapToPair(metricFunc);

When you want to persist the results into the output data store (in our example, Solr), you do it as follows:

Configuration sparkHadoopConf = goraSparkEngine.generateOutputConf(outStore);
reducedGoraRdd.saveAsNewAPIHadoopDataset(sparkHadoopConf);

That’s all! Check Solr to see the results.


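A quick way to eyeball the results is the Solr select handler; the URL below assumes the default local Solr port and the Metrics core created above:

curl "http://localhost:8983/solr/Metrics/select?q=*:*&rows=5&wt=json&indent=true"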
You can see that one can read data from a data store (e.g. HBase), run Spark code on it (map/reduce), and write the results into the same or another data store (e.g. Solr). GoraSparkEngine gives Apache Gora a Spark backend capability, and I think it will make Gora much more powerful.



Published at DZone with permission of Furkan Kamaci, DZone MVB. See the original article here.
