
Neo4j With Scala: Awesome Experience With Spark

Learn how to integrate Apache Spark with your Neo4j database using Scala to get things up and running.

In our last article, we discussed data migration from other databases to Neo4j. Now, we will discuss how to combine Neo4j with Spark.

Before starting, here is a recap of the series so far:

  1. Getting Started Neo4j with Scala: An Introduction
  2. Neo4j with Scala: Defining User-Defined Procedures and APOC
  3. Neo4j with Scala: Migrate Data From Other Database to Neo4j

Now we can start our journey. Apache Spark is a general-purpose framework for distributed data processing. It provides a functional API for manipulating data at large scale, with in-memory data caching and reuse across computations. We have to follow some basic steps before we start playing with Spark.

  1. Download Apache Spark 2.0.0 (download here). We can use only Spark 2.0.0, because the connector we are going to use is built on Spark 2.0.0.
  2. Set the SPARK_HOME path in the .bashrc file (for Linux users).
  3. Now we can use the Neo4j-Spark-Connector.

Configuration With Neo4j

  1. If Neo4j is running on the default host and port, we only have to configure the password in spark-defaults.conf:
     spark.neo4j.bolt.password=your_neo4j_password
  2. If Neo4j is not running on the default host and port, we also have to configure the Bolt URL in spark-defaults.conf:
     spark.neo4j.bolt.url=bolt://$host_name:$port_number
  3. We can also provide the username and password as part of the URL:
     bolt://neo4j:<password>@localhost
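The three options above can be sketched in plain Scala as a small helper that assembles the Bolt URL and renders the settings as key=value lines. This is only an illustration: the object and method names (Neo4jConfSketch, boltUrl, confLines) are hypothetical and not part of the connector, and 7687 is used in the usage note as Neo4j's default Bolt port.

```scala
// Hypothetical helper, not part of the Neo4j-Spark-Connector.
object Neo4jConfSketch {

  // Builds a Bolt URL; credentials are embedded only when they are supplied.
  def boltUrl(host: String, port: Int, credentials: Option[(String, String)] = None): String =
    credentials match {
      case Some((user, password)) => s"bolt://$user:$password@$host:$port"
      case None                   => s"bolt://$host:$port"
    }

  // Renders settings as key=value lines, in the style shown for spark-defaults.conf.
  def confLines(settings: Map[String, String]): Seq[String] =
    settings.toSeq.sortBy(_._1).map { case (key, value) => s"$key=$value" }
}
```

For example, Neo4jConfSketch.boltUrl("localhost", 7687, Some(("neo4j", "secret"))) yields bolt://neo4j:secret@localhost:7687, which could then be used as the value of spark.neo4j.bolt.url.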

Integrating Neo4j-Spark-Connector

  1. Download the Neo4j-Spark-Connector (download here) and build it. After building, we can pass the JAR path when we start spark-shell:
     $ $SPARK_HOME/bin/spark-shell --jars neo4j-spark-connector_2.11-full-2.0.0-M2.jar
  2. Alternatively, we can provide the Neo4j-Spark-Connector as a package when we start spark-shell:
     $ $SPARK_HOME/bin/spark-shell --packages neo4j-contrib:neo4j-spark-connector:2.0.0-M2
  3. We can add the Neo4j-Spark-Connector directly to an SBT project:
     resolvers += "Spark Packages Repo" at "http://dl.bintray.com/spark-packages/maven"
     libraryDependencies += "neo4j-contrib" % "neo4j-spark-connector" % "2.0.0-M2"
  4. We can add the Neo4j-Spark-Connector directly to a Maven POM (see here).
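For the SBT option, a fuller build.sbt might look like the sketch below. Only the resolver and the connector dependency come from the steps above. The project name, version, Scala version, and the Spark module dependencies are assumptions, chosen to match a Spark 2.0.0 installation and the _2.11 suffix of the connector JAR.

```scala
// build.sbt (sketch; values marked as assumptions are not from the article)
name := "neo4j-spark-example"   // hypothetical project name
version := "0.1.0"              // hypothetical version
scalaVersion := "2.11.8"        // assumption: a 2.11.x release, matching the _2.11 connector build

resolvers += "Spark Packages Repo" at "http://dl.bintray.com/spark-packages/maven"

libraryDependencies ++= Seq(
  // Assumption: Spark modules matching the local Spark 2.0.0 install,
  // marked "provided" because spark-submit supplies them at runtime.
  "org.apache.spark" %% "spark-core"   % "2.0.0" % "provided",
  "org.apache.spark" %% "spark-sql"    % "2.0.0" % "provided",
  "org.apache.spark" %% "spark-graphx" % "2.0.0" % "provided",
  // Connector coordinates as given in step 3.
  "neo4j-contrib" % "neo4j-spark-connector" % "2.0.0-M2"
)
```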

Now we are ready to use Neo4j with Spark. We will start the Spark shell from the command line:

$ $SPARK_HOME/bin/spark-shell --packages neo4j-contrib:neo4j-spark-connector:2.0.0-M2

Now we are ready to start a new journey with Spark and Neo4j.

I assume that you have already created some data in Neo4j for running basic commands. If you have not, here is the Cypher for creating 1,000 Person nodes in your Neo4j database, followed by a statement that links them with KNOWS relationships:

UNWIND range(1, 1000) AS row
   CREATE (:Person {id: row, name: 'name' + row, age: row % 100})

UNWIND range(1, 1000) AS row
   MATCH (n)
   WHERE id(n) = row
   MATCH (m)
   WHERE id(m) = toInt(rand() * 1000)
   CREATE (n)-[:KNOWS]->(m)
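To see what the later queries should return, the two statements can be mirrored in plain Scala without Neo4j or Spark. This is a sketch under one loudly stated assumption: a fresh database assigns internal node ids 0 to 999 to the 1,000 nodes, so the row value 1000 in the second statement matches no node. The object name SampleDataSketch is hypothetical.

```scala
// Plain-Scala mirror of the sample data; no Neo4j or Spark required.
object SampleDataSketch {

  // Mirrors: UNWIND range(1, 1000) AS row ... age: row % 100
  val rows: Seq[Int] = 1 to 1000
  val ages: Seq[Int] = rows.map(_ % 100)

  val sumOfAges: Int = ages.sum                          // 49500
  val meanAge: Double = sumOfAges.toDouble / ages.size   // 49.5

  // ASSUMPTION: internal node ids on a fresh database run from 0 to 999.
  val nodeIds: Set[Int] = (0 until 1000).toSet

  // Mirrors: MATCH (n) WHERE id(n) = row. Row 1000 finds no source node.
  val rowsWithSourceNode: Int = rows.count(id => nodeIds.contains(id)) // 999
}
```

Under that assumption, the sum of 49500, the mean of 49.5, and the 999 relationships line up with the results reported in the sections that follow.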


RDD Operations

We start with some RDD operations on the data:

import org.neo4j.spark._

val neo = Neo4j(sc)

val rowRDD = neo.cypher("MATCH (n:Person) RETURN n.name as name limit 10").loadRowRdd
rowRDD.map(t => "Name: " + t(0)).collect.foreach(println)


import org.neo4j.spark._

val neo = Neo4j(sc)

//calculate the mean of the age data

val meanAge = neo.cypher("MATCH (n:Person) RETURN n.age").loadRdd[Long].mean
//meanAge: Double = 49.5

// load relationships via pattern

neo.pattern("Person",Seq("KNOWS"),"Person").partitions(12).batch(100).loadNodeRdds.count
//res30: Long = 1000


DataFrame Operations

Now we will perform some operations on the DataFrame.

import org.neo4j.spark._

val neo = Neo4j(sc)
val df = neo.cypher("MATCH (n:Person) RETURN n.name as name, n.age as age limit 5").loadDataFrame

df.collect.foreach(println)


import org.neo4j.spark._

val neo = Neo4j(sc)

//calculate the sum of the age data
//explicit import so that sum also resolves in a compiled application

import org.apache.spark.sql.functions.sum

val df = neo.cypher("MATCH (n:Person) RETURN n.age as age").loadDataFrame
df.agg(sum(df.col("age"))).collect
//res10: Array[org.apache.spark.sql.Row] = Array([49500])

// load relationships via pattern

val relCount = neo.pattern("Person",Seq("KNOWS"),"Person").partitions(2).batch(100).loadDataFrame.count
//relCount: Long = 200
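The pattern load with partitions(12) and batch(100) reported 1000 rows, while the one with partitions(2) and batch(100) reported only 200. One possible reading, which is an assumption and not something the article states, is that a load reads at most partitions times batch rows when no explicit row count is given. The hypothetical helper below only encodes that reading; it is not connector code.

```scala
// Hypothetical model of the observed counts; not part of the connector.
object LoadWindowSketch {

  // ASSUMPTION: at most partitions * batch rows are read per load.
  def rowsLoaded(available: Long, partitions: Int, batch: Int): Long =
    math.min(available, partitions.toLong * batch)
}
```

With 1,000 rows available, rowsLoaded(1000L, 12, 100) gives 1000 and rowsLoaded(1000L, 2, 100) gives 200, which is consistent with both counts. If the assumption holds, choosing partitions and batch so that their product covers the data set would avoid silently truncated loads.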


GraphX Graph Operations

Now we will perform some operations on a GraphX graph:

import org.neo4j.spark._

val neo = Neo4j(sc)

import org.apache.spark.graphx._
import org.apache.spark.graphx.lib._

//load graph

val graphQuery = "MATCH (n:Person)-[r:KNOWS]->(m:Person) RETURN id(n) as source, id(m) as target, type(r) as value"
val graph: Graph[Long, String] = neo.rels(graphQuery).partitions(10).batch(100).loadGraph

graph.vertices.count
//res0: Long = 1000

graph.edges.count
//res1: Long = 999

//load graph with pattern

val graph = neo.pattern(("Person","id"),("KNOWS","since"),("Person","id")).partitions(10).batch(100).loadGraph[Long,Long]

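//count the vertices that have at least one incoming edge

graph.inDegrees.count
//res0: Long = 505

//run 5 iterations of PageRank

val graph2 = PageRank.run(graph, 5)

//sort ascending by rank and take the three lowest-ranked vertices

graph2.vertices.sortBy(_._2).take(3)

//res1: Array[(org.apache.spark.graphx.VertexId, Double)] = Array((602,0.15), (630,0.15), (476,0.15))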


And there you have it. This has been a basic example of using Spark with Neo4j. I hope it helps you start working with both of them.

Thanks.

Reference

  1. Neo4j Spark Connector

Published at DZone with permission of Anurag Srivastava, DZone MVB. See the original article here.
