# Distributed Graphs Processing With Spark GraphX


### In this article, a developer explains the basic concepts of graph theory and relates them to a more well-known statistical phenomenon.


"Simplicity is prerequisite for reliability." (Edsger W. Dijkstra)

**Prologue**

A graph is an intuitive and easy-to-understand data structure. Ever since Leonhard Euler's era, graph theory has compelled the minds of humanity to reflect on different tasks, such as how a man can cross all seven bridges of Koenigsberg without crossing any of them twice, or how a traveling salesman can find the most profitable route.

Much has appeared since the days of Euler: transistors, programming languages, and distributed computing. The last of these dramatically simplified the storage and processing of graphs, and that is what this article discusses.

If you are not familiar with the basic concepts of Apache Spark, such as the RDD, the driver program, and worker nodes, I would recommend reading the documentation from Databricks before continuing with this article.

For me, the best way to get to grips with any technology is to try to write something with it. In this article, we will analyze a likeness of a social network using the basic concepts of graph theory.

**Practice**

To store our social network, I chose a straightforward and intuitive option: TSV files on disk. Naturally, files of any other format, like Parquet or Avro, would do. The storage location is not so important at this step; it could be HDFS or S3, and even if we needed to change it, Spark SQL would make the migration seamless. The network structure looks like this: the first file holds a user's Id and name, the second a user's Id and the list of its peers. Apache Spark offers APIs in Java, Scala, and Python; I chose Scala.
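For illustration, the two input files might look like this (the ids and names are made up; the layout, id, then a tab, then the payload, is the one assumed throughout this article):

```
# users.tsv: <userId> TAB <name>
1	Alice
2	Bob
3	Carol

# graph.tsv: <userId> TAB <space-separated peer ids>
1	2 3
2	1
3	1
```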

To answer the popular question of whether to use Spark GraphX to store graphs when you have many insert/update operations: the answer is no. Any change to an RDD forces the whole RDD to be rebuilt across the cluster, which is far from optimal; for that case, NoSQL solutions like Neo4j, Titan, or even Cassandra and HBase come to the rescue. Nothing prevents you from using Spark GraphX alongside those NoSQL stores for processing graphs: loading data from a database (or fetching it with a scheduler) and then handling it with Spark in an event-driven style is a good use case.

Well, let's start writing code. First, we need to load the graph into memory, take the source files and pull out the necessary vertices and edges:

```
def verts: RDD[(VertexId, String)] = sc.textFile(USER_NAMES)
  .flatMap(InputDataFlow.parseNames)

def edges: RDD[Edge[PartitionID]] = sc.textFile(USER_GRAPH)
  .flatMap(InputDataFlow.makeEdges)
```
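The article does not show `InputDataFlow`, so here is a minimal sketch of what its parsing helpers might look like. The tab-separated layout and the helper bodies are assumptions, and a local `Edge` case class stands in for `org.apache.spark.graphx.Edge` so the snippet runs without Spark:

```scala
// Stand-in for org.apache.spark.graphx.Edge, so the sketch runs without Spark.
final case class Edge[ED](srcId: Long, dstId: Long, attr: ED)

object InputDataFlow {

  // "1<TAB>Alice" => Seq((1L, "Alice")); malformed lines yield an empty Seq,
  // which is why the callers above use flatMap rather than map.
  def parseNames(line: String): Seq[(Long, String)] =
    line.split("\t") match {
      case Array(id, name) => Seq((id.trim.toLong, name.trim))
      case _               => Seq.empty
    }

  // "1<TAB>2 3" => one edge per listed peer, with a unit attribute.
  def makeEdges(line: String): Seq[Edge[Int]] =
    line.split("\t") match {
      case Array(id, peers) =>
        val srcId = id.trim.toLong
        peers.trim.split("\\s+").toSeq.map(p => Edge(srcId, p.toLong, 1))
      case _ => Seq.empty
    }
}
```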

**Pregel**

The primary mechanism for iterating over a graph in GraphX is the Pregel algorithm, developed at Google. The Pregel model passes messages between the vertices of the graph through a sequence of iterations called supersteps, which is the basic idea of the algorithm. It can also be summed up as "think like a vertex": the state of the current vertex depends only on the states of its neighbors.

Pregel becomes truly necessary when solving the problem with plain MapReduce turns into a complicated process. Interestingly, the name Pregel comes from the river spanned by the seven bridges of Koenigsberg.

The main primitive for traversing a graph is the triplet; it consists of the current vertex, the vertex we pass to, and the edge between them. You must also specify the default distance between vertices (usually `PositiveInfinity`), a UDF (user-defined function) applied at each vertex to process an incoming message and compute the vertex's next value, and a UDF to merge two incoming messages; the merge function must be commutative and associative. Since Scala is a functional language, the last two functions are represented as lambda expressions.

The first algorithm that we implement is Dijkstra's algorithm for finding the shortest paths from an arbitrary vertex to all others.

```
def dijkstraShortestPath[VT](graph: GenericGraph[VT], sourceId: VertexId) = {
  val initialGraph = graph.mapVertices((id, _) =>
    if (id == sourceId) 0.0 else Double.PositiveInfinity)
  val sssp = initialGraph.pregel(Double.PositiveInfinity)(
    (_, dist, newDist) => math.min(dist, newDist),
    triplet => {
      // Distance accumulator
      if (triplet.srcAttr + triplet.attr < triplet.dstAttr) {
        Iterator((triplet.dstId, triplet.srcAttr + triplet.attr))
      } else {
        Iterator.empty
      }
    },
    (a, b) => math.min(a, b)
  )
  sssp.vertices.sortByKey(ascending = true).collect.mkString("\n")
}
```

Everything is pretty clear: we start from a given vertex and use the minimum function to determine the minimum distance at each step. The first function passed to Pregel preserves the shortest distance between the incoming message and the current vertex's value. The second function spreads messages to neighbors, accumulating the distance along the way. The last function is the analog of the reduce stage: it selects the minimum value when several messages arrive. Finally, we form a convenient string representation of the result.
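To make the superstep mechanics tangible without a cluster, here is a minimal plain-Scala simulation of the same vertex-program/send/merge loop on a toy weighted graph (the graph and all names here are illustrative, not from the article's repository):

```scala
object MiniPregel {
  type V = Long

  // Toy directed graph as (src, dst, weight) triplets.
  val edges: Seq[(V, V, Double)] = Seq((1L, 2L, 1.0), (2L, 3L, 2.0), (1L, 3L, 5.0))

  def shortestPaths(source: V): Map[V, Double] = {
    val vertices = edges.flatMap { case (s, d, _) => Seq(s, d) }.distinct
    // Initialization, like mapVertices above: 0 at the source, infinity elsewhere.
    var state: Map[V, Double] =
      vertices.map(v => v -> (if (v == source) 0.0 else Double.PositiveInfinity)).toMap
    var changed = true
    while (changed) {
      // sendMsg: each triplet proposes srcAttr + weight to its destination,
      // but only when that would improve on the destination's current value.
      val messages = edges.flatMap { case (s, d, w) =>
        if (state(s) + w < state(d)) Some(d -> (state(s) + w)) else None
      }
      // mergeMsg: take the min of competing messages per destination.
      val merged = messages.groupBy(_._1).map { case (v, ms) => v -> ms.map(_._2).min }
      // vprog: min of the old value and the merged incoming message.
      state = state ++ merged.map { case (v, m) => v -> math.min(state(v), m) }
      changed = merged.nonEmpty  // no messages sent => the superstep loop halts
    }
    state
  }
}
```

Running `MiniPregel.shortestPaths(1L)` relaxes the path 1 -> 2 -> 3 (cost 3.0) below the direct edge 1 -> 3 (cost 5.0), exactly the behavior the three Pregel functions above encode.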

**Degree of Separation**

I am sure that many readers of this article have heard of the theory of six degrees of separation: an unproven theory that any person on the planet can be connected to any other person on Earth through a chain of acquaintances with no more than five intermediaries. In terms of graph theory, it sounds like this: the diameter of the acquaintance graph of everyone on Earth does not exceed six.

Let's start writing the code for this task. We need to traverse the graph breadth-first to find the contacts of a specified vertex. For this, we modify the code of the Dijkstra algorithm:

```
def getBFS(root: VertexId) = {
  val initialGraph = graph.mapVertices((id, _) =>
    if (id == root) 0.0 else Double.PositiveInfinity)
  val bfs = initialGraph.pregel(Double.PositiveInfinity, maxIterations = 10)(
    (_, attr, msg) => math.min(attr, msg),
    triplet => {
      if (triplet.srcAttr != Double.PositiveInfinity) {
        Iterator((triplet.dstId, triplet.srcAttr + 1))
      } else {
        Iterator.empty
      }
    },
    (a, b) => math.min(a, b)).cache()
  bfs
}
```

Everything is very similar to what we did above with the Dijkstra algorithm, but this time we specify the number of iterations: I arrived at the value 10 empirically for my graph; for your graph, it may be a different number. Next, we `join` with the usernames and take the first 100 values for an arbitrary user:

```
def degreeOfSeparation(root: VertexId): Array[(VertexId, DegreeOfSeparation)] = {
  getBFS(root).vertices.join(verts).take(100)
}
```

It's also feasible to find a degree of separation for two arbitrary vertices:

```
def degreeOfSeparationTwoUser(firstUser: VertexId, secondUser: VertexId) = {
  getBFS(firstUser)
    .vertices
    .filter { case (vertexId, _) => vertexId == secondUser }
    .collect.map { case (_, degree) => degree }
}
```

Out of the box, Spark GraphX makes it possible to extract a lot of information about the graph; for example, we can find the most connected users by joining the vertex degrees with the usernames:

```
def getMostConnectedUsers(amount: Int): Array[(VertexId, ConnectedUser)] = {
  graph.degrees.join(verts)
    .sortBy({ case (_, (degree, _)) => degree }, ascending = false)
    .take(amount)
}
```

Another metric available out of the box is the number of triangles in the graph (triangle count):

`def socialGraphTriangleCount = graph.triangleCount()`
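To make the metric concrete, here is a plain-Scala triangle count over a toy undirected graph (illustrative only; GraphX computes the same figure per vertex, in a distributed fashion):

```scala
object Triangles {
  // Toy undirected graph as a set of edges, smaller id first.
  val edges: Set[(Long, Long)] =
    Set((1L, 2L), (2L, 3L), (1L, 3L), (3L, 4L))

  def has(a: Long, b: Long): Boolean =
    edges.contains((math.min(a, b), math.max(a, b)))

  // Count unordered vertex triples whose three edges all exist.
  def triangleCount: Int = {
    val vs = edges.toSeq.flatMap { case (a, b) => Seq(a, b) }.distinct.sorted
    vs.combinations(3).count { case Seq(a, b, c) => has(a, b) && has(b, c) && has(a, c) }
  }
}
```

Here only the triple (1, 2, 3) forms a triangle; the edge (3, 4) dangles off it, so the count is 1.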

**PageRank**

PageRank assigns an importance to each vertex of the graph. For example, if a Twitter user has a large number of followers, then he has a high rank and, therefore, can easily be found in a search engine.

GraphX has a static and dynamic version of the PageRank implementation. The static version has a fixed number of iterations, while the dynamic version runs until the ranks converge to the specified value.

For our graph it looks like this:

```
def dynamicRanks(socialGraph: SocialGraph, tolerance: Double) =
  socialGraph.graph.pageRank(tol = tolerance).vertices

def staticRanks(socialGraph: SocialGraph, numIter: Int) =
  socialGraph.graph.staticPageRank(numIter).vertices
```
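For intuition about what the dynamic version converges to, here is a minimal plain-Scala power iteration on a toy "who follows whom" graph (the damping factor of 0.85 matches the usual PageRank default; the graph and names are illustrative):

```scala
object ToyPageRank {
  // Directed toy graph: 2 and 3 both point at 1, and 1 points back at 2.
  val links: Map[Long, Seq[Long]] = Map(1L -> Seq(2L), 2L -> Seq(1L), 3L -> Seq(1L))

  def ranks(iterations: Int, damping: Double = 0.85): Map[Long, Double] = {
    val vs = (links.keys ++ links.values.flatten).toSeq.distinct
    var r = vs.map(_ -> 1.0).toMap  // start every vertex at rank 1.0
    for (_ <- 1 to iterations) {
      // Each vertex splits its rank evenly among its outgoing links...
      val contribs = links.toSeq.flatMap { case (src, outs) =>
        outs.map(dst => dst -> r(src) / outs.size)
      }.groupBy(_._1).map { case (v, cs) => v -> cs.map(_._2).sum }
      // ...and each vertex's new rank mixes a base share with what it received.
      r = vs.map(v => v -> ((1 - damping) + damping * contribs.getOrElse(v, 0.0))).toMap
    }
    r
  }
}
```

After enough iterations, vertex 1 (two inbound links) outranks vertex 2 (one inbound link), which outranks vertex 3 (none); the dynamic GraphX version simply keeps iterating until successive ranks differ by less than the given tolerance.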

**Conclusion**

The attentive reader may have noticed that the topic of this article is the distributed processing of graphs, yet while writing the code we did nothing to make the processing distributed. Recall the Edsger Dijkstra quote at the very beginning: Spark dramatically simplifies our lives by taking on the burden of distributed computing. Writing code that will run on a distributed cluster is not as difficult a task as it might seem at first. There are even several options for cluster resource management with Spark: Hadoop YARN, Apache Mesos (personally, my favorite option), and, recently, Kubernetes. All the source code considered in this article can be found in this GitHub repository.

