Degrees of separation with Spark GraphX

Learn how you can unleash the power of Spark for data analytics by using GraphX, a component of Spark used for graphs and graph-parallel computation.

With the emergence of Apache Spark, analyzing big data became easier. Spark implements many useful algorithms for data mining, data analysis, machine learning, and graph processing. It takes on the challenge of implementing sophisticated algorithms with tricky optimizations and lets you run your code on distributed clusters. It effectively handles problems like fault tolerance and provides a simple API for parallel computation.

In this article, I would like to tell you about a significant part of Spark: GraphX. It's a component for graphs and graph-parallel computation. GraphX reuses the Spark RDD concept, simplifies graph analytics tasks, and provides operations on a directed multigraph with properties attached to each vertex and edge. There are many algorithms in graph theory, and they are widely used in data analytics and computer science. Graphs are the perfect data structure for describing social networks, which is why companies like Facebook emphasize developing software around them. GraphX provides an API for fast and robust development with graphs.
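To make the property-graph idea concrete, here is a plain-Scala sketch (no Spark required; the data and the Vert/Link names are hypothetical, invented for illustration) of a directed multigraph whose vertices and edges both carry properties, mirroring the model GraphX exposes:

```scala
// Toy model of a property graph: vertices carry a name,
// edges carry an integer attribute (GraphX's Edge[Int] is analogous).
case class Vert(id: Long, name: String)
case class Link(srcId: Long, dstId: Long, attr: Int)

val verts = Seq(Vert(1L, "alice"), Vert(2L, "bob"), Vert(3L, "carol"))
val links = Seq(Link(1L, 2L, 0), Link(2L, 3L, 0), Link(1L, 3L, 0))

// Out-degree per vertex, analogous to graph.outDegrees in GraphX
val outDegrees = links.groupBy(_.srcId).map { case (id, es) => id -> es.size }
```

In real GraphX code, the vertices and edges would be RDDs (`RDD[(VertexId, String)]` and `RDD[Edge[Int]]`) passed to the `Graph(...)` constructor, as we do later in this article.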

Let’s start with GraphX and unleash the power of Spark for data analysis. Consider an example of how to use Spark GraphX for analyzing a graph of social network users. First, we need the users' data. In this example, we will use two TSV (tab-separated values) files. The first describes a user's metadata as a simple tuple of the form user_id -> user_login (note: I took fake usernames for educational purposes). The second file describes users' connections: the first number in a row is the user's ID, and the remainder are the user's connections. See the snapshot below or the original links on GitHub:

#users sketch

#graph sketch
5988 748 1722 3752 4655 5743 1872 3413 5527 6368 6085 4319 4728 1636 
5989 4080 4264 4446 3779 2430 2297 6169 3530 3272 4282 6432 2548 
5982 217 595 1194 3308 2940 1815 794 1503 5197 859 5096 6039 2664 
651 2244 528 284 1449 1097 1172 1092 108 3405 5204 387 4607 4545 
3705 4930 1805 4712 4404 247 4754 4427 1845 536 5795 5978 533 3984

We need to parse the file with names using the method parseNames, and build edges between users' connections from the second file:

  import org.apache.spark.graphx.{Edge, VertexId}
  import scala.collection.mutable.ListBuffer

  def parseNames(line: String): Option[(VertexId, String)] = {
    val fields = line.split('\t')
    if (fields.length > 1)
      Some((fields(0).trim().toLong, fields(1)))
    else
      None
  }

  def makeEdges(line: String): List[Edge[Int]] = {
    val edges = new ListBuffer[Edge[Int]]()
    val fields = line.split(" ")
    val origin = fields(0)
    (1 until fields.length)
      .foreach { p => edges += Edge(origin.toLong, fields(p).toLong, 0) }
    edges.toList
  }

Next, we construct the graph:

class GraphX(sc: SparkContext) {
  private def verts = sc.textFile(USER_NAMES).flatMap(InputDataFlow.parseNames)
  private def edges = sc.textFile(USER_GRAPH).flatMap(InputDataFlow.makeEdges)

  /**
    * Build the social graph from verts and edges
    * stored in TSV files
    * @return the built graph
    */
  private def graph = Graph(verts, edges).cache()

Let’s find the most connected users in our social network. We need to join the graph's degrees with verts and sort by the number of connections.

  /**
    * Find the most connected users via graph.degrees
    * @param amount threshold for returning the first n users
    * @return most connected users in the social graph
    */
  def getMostConnectedUsers(amount: Int): Array[(VertexId, (PartitionID, String))] = {
    graph.degrees.join(verts)
      .sortBy({ case (_, (connection, _)) => connection }, ascending = false)
      .take(amount)
  }

Next, let's search for the degree of separation in our social network: between a single user and all other users, and between two specific users.

  /**
    * Represent a breadth-first search over the social graph
    * via delegation to the Pregel algorithm, starting from the given root
    * @param root The point of departure of the BFS
    * @return graph whose vertex attributes are BFS distances from the root
    */
  private def getBFS(root: VertexId) = {
    val initialGraph = graph.mapVertices((id, _) =>
      if (id == root) 0.0 else Double.PositiveInfinity)

    initialGraph.pregel(Double.PositiveInfinity, 10)(
      // vertex program: keep the smallest distance seen so far
      (_, attr, msg) => math.min(attr, msg),
      // send message: propagate distance + 1 along edges from reached vertices
      triplet => {
        if (triplet.srcAttr != Double.PositiveInfinity) {
          Iterator((triplet.dstId, triplet.srcAttr + 1))
        } else {
          Iterator.empty
        }
      },
      // merge messages: take the minimum distance
      (a, b) => math.min(a, b)).cache()
  }

  /**
    * Degree of separation for a single user,
    * as an adapter to getBFS
    * @param root The point of departure of the BFS
    * @return Degrees of separation for the user
    */
  def degreeOfSeparationSingleUser(root: VertexId): Array[(VertexId, (Double, String))] = {
    getBFS(root).vertices.join(verts).take(100)
  }

  /**
    * Degree of separation between two users
    * @param firstUser VertexId of the first user
    * @param secondUser VertexId of the second user
    * @return Degree of separation between the users
    */
  def degreeOfSeparationTwoUser(firstUser: VertexId, secondUser: VertexId) = {
    getBFS(firstUser)
      .vertices
      .filter { case (vertexId, _) => vertexId == secondUser }
      .collect.map { case (_, degree) => degree }
  }
}

To search for connections between users, we use the breadth-first search (BFS) algorithm with Pregel. BFS is an algorithm for traversing or searching a graph. It starts at the root (in our case, a user's ID, defined as a VertexId) and explores all neighbor nodes before moving on to the next-level neighbors. Pregel is a data-flow paradigm and system created by Google for large-scale graph processing and for solving problems that are hard or expensive to solve using only the MapReduce framework. Pregel is essentially a message-passing interface constrained to the edges of a graph. The idea is to "think like a vertex": algorithms in the Pregel framework compute the state of a given node based only on the states of its neighbors. See here to learn more about Pregel.
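To see the idea without a cluster, here is a minimal single-machine BFS sketch over a plain adjacency map (the graph data and the degreesFrom name are hypothetical, chosen for illustration); the Pregel-based code computes the same distances, just by message passing across a distributed graph:

```scala
import scala.collection.mutable

// BFS from a root vertex: returns each reachable vertex's
// degree of separation (number of hops) from the root.
def degreesFrom(root: Long, adj: Map[Long, List[Long]]): Map[Long, Int] = {
  val dist  = mutable.Map(root -> 0)
  val queue = mutable.Queue(root)
  while (queue.nonEmpty) {
    val v = queue.dequeue()
    for (n <- adj.getOrElse(v, Nil) if !dist.contains(n)) {
      dist(n) = dist(v) + 1 // neighbor is one hop further than v
      queue.enqueue(n)
    }
  }
  dist.toMap
}

val adj = Map(1L -> List(2L, 3L), 2L -> List(4L), 3L -> List(4L), 4L -> Nil)
degreesFrom(1L, adj) // Map(1 -> 0, 2 -> 1, 3 -> 1, 4 -> 2)
```

The Pregel formulation replaces the explicit queue with rounds of messages: a vertex that has been reached sends distance + 1 to its neighbors, and each vertex keeps the minimum distance it has received.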

Code demo:

    val sc = new SparkContext("local[*]", "GraphX")

    val graph = new GraphX(sc)

    println("Top 10 most-connected users:")
    graph.getMostConnectedUsers(10) foreach println

    println("Computing degrees of separation for user Arch")
    graph.degreeOfSeparationSingleUser(5306) foreach println

    println("Computing degrees of separation for user Arch and Fred")
    graph.degreeOfSeparationTwoUser(5306, 14) foreach println

You can find a full listing of the source code on my GitHub repository.


