
Introducing an Apache Spark Datasource for OrientDB

Head over to GitHub to find a connector that will help you leverage Apache Spark's data processing power for your OrientDB database.


Today, we will discuss how to connect Apache Spark to OrientDB. OrientDB, if you're not familiar with it, is a fast distributed multi-model database. Apache Spark, of course, is the widely popular engine for large-scale data processing.

We will discuss how to use this Apache Spark datasource for OrientDB to leverage Spark's capabilities while using OrientDB as the datastore.

Now, to get started, let's see how we can use the Spark datasource for OrientDB in Spark applications.

First, add the following repository to your pom.xml:

<repository>
   <id>bintray</id>
   <name>bintray</name>
   <url>https://dl.bintray.com/sbcd90/org.apache.spark/</url>
</repository>

Then add the following Maven dependency:

<dependency>
   <groupId>org.apache.spark</groupId>
   <artifactId>spark-orientdb-{spark.version}_2.10</artifactId>
   <version>1.3</version>
</dependency>

Now, let's create an OrientDB class, 'democlass', and write documents belonging to that class from the Spark datasource.

import org.apache.spark.sql.{Row, SQLContext, SaveMode}
import org.apache.spark.sql.types.{IntegerType, StructField, StructType}

val sqlContext = new SQLContext(sc)
sqlContext.createDataFrame(
      sc.parallelize(Seq(Row(1), Row(2), Row(3), Row(4), Row(5))),
      StructType(Seq(StructField("id", IntegerType))))
      .write
      .format("org.apache.spark.orientdb.documents")
      .option("dburl", ORIENTDB_CONNECTION_URL)
      .option("user", ORIENTDB_USER)
      .option("password", ORIENTDB_PASSWORD)
      .option("class", "democlass")
      .mode(SaveMode.Overwrite)
      .save()

Next, we'll read all the documents from 'democlass'.

import org.apache.spark.sql.SQLContext

val sqlContext = new SQLContext(sc)
val loadedDf = sqlContext.read
      .format("org.apache.spark.orientdb.documents")
      .option("dburl", ORIENTDB_CONNECTION_URL)
      .option("user", ORIENTDB_USER)
      .option("password", ORIENTDB_PASSWORD)
      .option("class", "democlass")
      .load()

We can also write OrientDB SQL to filter the documents fetched.

import org.apache.spark.sql.SQLContext

val sqlContext = new SQLContext(sc)
val loadedDf = sqlContext.read
      .format("org.apache.spark.orientdb.documents")
      .option("dburl", ORIENTDB_CONNECTION_URL)
      .option("user", ORIENTDB_USER)
      .option("password", ORIENTDB_PASSWORD)
      .option("class", "democlass")
      .option("query", "select * from democlass where id = 1")
      .load()

These APIs return a DataFrame, on top of which any Spark DataFrame operation can be performed.
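For example, assuming the loadedDf from the read above (holding the five documents with ids 1 through 5 written earlier), ordinary DataFrame transformations and Spark SQL apply directly. This is just a sketch of standard Spark usage, not part of the connector's API:

```scala
// A sketch of ordinary Spark DataFrame operations on the loaded documents.
// Assumes loadedDf from the previous read; "id" is the column written earlier.
val filtered = loadedDf.filter(loadedDf("id") > 2)
filtered.select("id").show()

// The DataFrame can also be queried with Spark SQL via a temporary table.
// (registerTempTable is the Spark 1.x API matching the SQLContext used here.)
loadedDf.registerTempTable("democlass_docs")
sqlContext.sql("select count(*) as cnt from democlass_docs").show()
```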

Now, let us see how the Spark datasource can create OrientDB Graphs.

Let's create the OrientDB graph vertices first, which belong to the vertex type 'demovertex'.

import org.apache.spark.sql.{Row, SQLContext, SaveMode}
import org.apache.spark.sql.types.{IntegerType, StructField, StructType}

val sqlContext = new SQLContext(sc)
sqlContext.createDataFrame(
      sc.parallelize(Seq(Row(1), Row(2), Row(3), Row(4), Row(5))),
      StructType(Seq(StructField("id", IntegerType))))
      .write
      .format("org.apache.spark.orientdb.graphs")
      .option("dburl", ORIENTDB_CONNECTION_URL)
      .option("user", ORIENTDB_USER)
      .option("password", ORIENTDB_PASSWORD)
      .option("vertextype", "demovertex")
      .mode(SaveMode.Overwrite)
      .save()

Now let's create the edges that belong to the edge type 'demoedge'.

import org.apache.spark.sql.{Row, SQLContext, SaveMode}
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

val sqlContext = new SQLContext(sc)
sqlContext.createDataFrame(
      sc.parallelize(Seq(
            Row(1, 2, "friends"),
            Row(2, 3, "enemy"),
            Row(3, 4, "friends"),
            Row(4, 1, "enemy")
      )),
      StructType(Seq(
            StructField("src", IntegerType),
            StructField("dst", IntegerType),
            StructField("relationship", StringType)
          )))
      .write
      .format("org.apache.spark.orientdb.graphs")
      .option("dburl", ORIENTDB_CONNECTION_URL)
      .option("user", ORIENTDB_USER)
      .option("password", ORIENTDB_PASSWORD)
      .option("vertextype", "demovertex")
      .option("edgetype", "demoedge")
      .mode(SaveMode.Overwrite)
      .save()

We can individually load the OrientDB vertices and edges into Spark dataframes.

import org.apache.spark.sql.SQLContext

val sqlContext = new SQLContext(sc)
val loadedDf = sqlContext.read
                    .format("org.apache.spark.orientdb.graphs")
                    .option("dburl", ORIENTDB_CONNECTION_URL)
                    .option("user", ORIENTDB_USER)
                    .option("password", ORIENTDB_PASSWORD)
                    .option("vertextype", "demovertex")
                    .load()

Similarly, for the edges:
import org.apache.spark.sql.SQLContext

val sqlContext = new SQLContext(sc)
val loadedDf = sqlContext.read
                   .format("org.apache.spark.orientdb.graphs")
                   .option("dburl", ORIENTDB_CONNECTION_URL)
                   .option("user", ORIENTDB_USER)
                   .option("password", ORIENTDB_PASSWORD)
                   .option("edgetype", "demoedge")
                   .load()

Here, too, we can supply OrientDB SQL queries to filter what gets loaded. Now that we have the OrientDB vertex and edge DataFrames, let's create a Spark GraphFrame out of them.

import org.apache.spark.sql.SQLContext
import org.graphframes.GraphFrame

val sqlContext = new SQLContext(sc)
val loadedVerticesDf = sqlContext.read
                 .format("org.apache.spark.orientdb.graphs")
                 .option("dburl", ORIENTDB_CONNECTION_URL)
                 .option("user", ORIENTDB_USER)
                 .option("password", ORIENTDB_PASSWORD)
                 .option("vertextype", "demovertex")
                 .option("query", "select * from demovertex where id = 1")
                 .load()

val loadedEdgesDf = sqlContext.read
                 .format("org.apache.spark.orientdb.graphs")
                 .option("dburl", ORIENTDB_CONNECTION_URL)
                 .option("user", ORIENTDB_USER)
                 .option("password", ORIENTDB_PASSWORD)
                 .option("edgetype", "demoedge")
                 .option("query", "select * from demoedge where relationship = 'friends'")
                 .load()

val g = GraphFrame(loadedVerticesDf, loadedEdgesDf)
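As a quick illustration, assuming the graphframes package is on the Spark classpath, the usual GraphFrame queries can now run against the graph loaded from OrientDB. This is a sketch under those assumptions, not connector-specific code:

```scala
// A sketch of typical GraphFrame queries on g (assumes the graphframes
// package is available on the Spark classpath).
// In-degree of each vertex, computed from the loaded edges:
g.inDegrees.show()

// Motif finding: pairs of vertices (a, b) connected by an edge e,
// restricted here to "friends" relationships.
val friendPairs = g.find("(a)-[e]->(b)").filter("e.relationship = 'friends'")
friendPairs.show()
```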

This allows us to leverage the features of Spark GraphFrames on top of OrientDB graphs. I hope you enjoy using the new Spark datasource for OrientDB!

Thanks!
