Graphs, also known as “networks”, are ubiquitous across web applications. As a refresher, a graph consists of nodes and edges. A node can be any object, such as a person or an airport, and an edge is a relation between two nodes, such as a friendship or an airline connection between two cities. Social networks and content networks (which comprise interlinked documents, such as web pages or citation networks) are other very common examples of graphs. Finally, the Internet of Things is basically a huge “graph of graphs”.
Why should one care about graph theory? Because the relations between things, that is, the links in a network, carry information that complements what is known about the individual components. One common use case is topology analysis, from which one can derive information about graph properties. If you can track changes in topology, you can track changes across the entire system. Thus, topological analysis is a handy tool in multiple fields, including financial risk management and predictive maintenance.
In the remainder of this post, you will get some introductory hands-on exposure to graph analysis using the GraphFrames package in Apache Spark. As you’ll see, there are some powerful advantages to using GraphFrames and GraphX on HDFS in combination with Spark SQL. (Important note: Neither GraphX nor GraphFrames is considered production-ready, nor is either currently supported by Cloudera. The information presented here is for self-educational purposes with respect to graph analytics only.)
For both examples, you’ll need to install Cloudera QuickStart, which is available in VM and Docker versions. (CDH 5.5/Spark 1.5/GraphFrames 0.1.0 are used for the examples here, but the current versions, CDH 5.8/Spark 1.6/GraphFrames 0.2.0, should work just as well.) The GraphFrames package is available in the spark-packages repository. In addition to understanding the basic concepts of graph analysis, you’ll also need to know how to use RDDs and DataFrames in Spark.
Some code is skipped here for brevity, but complete exercise scripts, sample data, and the full tutorial as a PDF file are available in this repo.
Exercise 1: Calculating PageRank using Scalable Graph Analysis
Google’s PageRank, perhaps the most famous graph-processing algorithm in Internet history, is calculated per node (or vertex). A web page’s PageRank reflects the importance of the corresponding node in the network and results from the topology (represented by links). In this example, we will use GraphFrames to calculate the PageRank for each node. Furthermore, the clustering coefficient of a graph, which is based on the number of triplets and the number of fully connected triangles, can be calculated using GraphX.
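To make the idea of a per-node rank concrete, here is a minimal sketch of the PageRank iteration in plain Scala, without Spark. It assumes the non-normalized form in which every node starts at 1.0 and each update is the reset probability plus the damped sum of incoming shares. The object and function names are made up for this illustration, and the GraphX/GraphFrames implementation may differ in details such as start values and the handling of nodes without outgoing links.

```scala
object PageRankSketch {
  /** Iterates PR(v) = reset + (1 - reset) * sum over in-neighbours u of PR(u) / outDegree(u). */
  def pageRank(edges: Seq[(Int, Int)],
               resetProbability: Double = 0.15,
               maxIter: Int = 10): Map[Int, Double] = {
    // Every id that appears as a source or a destination is a node.
    val nodes = edges.flatMap { case (src, dst) => Seq(src, dst) }.distinct
    val outDegree: Map[Int, Int] =
      edges.groupBy(_._1).map { case (src, es) => src -> es.size }

    // Assumption: all nodes start with rank 1.0.
    var ranks: Map[Int, Double] = nodes.map(n => n -> 1.0).toMap

    for (_ <- 1 to maxIter) {
      // Each edge carries an equal share of its source's rank to its destination.
      val received: Map[Int, Double] = edges
        .map { case (src, dst) => dst -> ranks(src) / outDegree(src) }
        .groupBy(_._1)
        .map { case (dst, shares) => dst -> shares.map(_._2).sum }

      ranks = nodes.map { n =>
        n -> (resetProbability + (1 - resetProbability) * received.getOrElse(n, 0.0))
      }.toMap
    }
    ranks
  }
}
```

For example, `PageRankSketch.pageRank(Seq((1, 0), (2, 0), (3, 0)))` ranks node 0 above the three nodes that link to it, because it is the only node that receives any rank from others.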
First, download this data file from the SNAP website. This file can be downloaded, decompressed, and loaded into HDFS using this bootstrap script; or, you can do all that manually:
$ wget https://snap.stanford.edu/data/facebook_combined.txt.gz
$ gunzip *.gz
$ hdfs dfs -put facebook_combined.txt
Start a spark-shell with the optional packages to work with GraphFrame and the CSV processing libraries:
$ spark-shell --packages graphframes:graphframes:0.1.0-spark1.5,\
com.databricks:spark-csv_2.10:1.4.0
Note that the \ symbol indicates that input continues on the next line. The
--packages option allows you to specify a comma-separated list of Apache Maven coordinates for required third-party packages used in your spark-shell session. Those will automatically be loaded by the Spark cluster from a central repository. Here we need the GraphFrames package and a library for CSV parsing from Databricks.
Building the Graph
Next, we’ll define a DataFrame by loading data from a CSV file, which is stored in HDFS.
facebook_combined.txt contains two columns to represent links between network nodes. The first column is called source (
src), and the second is the destination (
dst) of the link. (Some other systems, such as Gephi, use “source” and “target” instead.)
First, we define a custom schema, and then we load the DataFrame, using
scala> import org.apache.spark.sql.types.{IntegerType, StructField, StructType}
scala> val customSchema = StructType(Array(StructField("src", IntegerType, true), StructField("dst", IntegerType, true)))
scala> val df = sqlContext.read.format("com.databricks.spark.csv").option("header", "false").option("delimiter", " ").schema(customSchema).load("facebook_combined.txt")
Here we use a space (" ") as the delimiter. More options can be defined as shown in the documentation for the CSV parsing library.
In theory, this edge list contains all the information needed to build a graph, but a GraphFrame needs nodes and edges in separate DataFrames. Let’s transform the edge list to also get a node list.
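One way this transformation might look in the spark-shell session started above is sketched here. It is not necessarily the code from the exercise scripts: the variable names ids and nodes are made up, and the name column is assumed to simply repeat the id, since the data set carries no labels. The snippet relies on the df edge list defined earlier.

```scala
import org.apache.spark.sql.functions.col
import org.graphframes.GraphFrame

// Assumption: `df` is the edge DataFrame loaded above, with columns src and dst.
// Collect every id that occurs as a source or as a destination, without duplicates.
val ids = df.select(col("src").as("id")).unionAll(df.select(col("dst").as("id"))).distinct()

// Assumption: the "name" column just repeats the id.
val nodes = ids.withColumn("name", col("id"))

// A GraphFrame expects an "id" column in the vertices and "src"/"dst" columns in the edges.
val g1 = GraphFrame(nodes, df)
```

The union of both columns is needed because a node may appear only as a destination, or only as a source, in the edge list.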
Bagel was a low-level implementation of Google’s Pregel graph-processing framework. According to the docs, “Bagel operates on a graph represented as a distributed dataset of (K, V) pairs, where keys are vertex IDs and values are vertices plus their associated state.” Its core idea was that “In each superstep, Bagel runs a user-specified compute function on each vertex that takes as input the current vertex state and a list of messages sent to that vertex during the previous superstep, and returns the new vertex state and a list of outgoing messages.” (Apache Giraph implements the same concept.) Although this is a low-level abstraction, it was a useful starting point for implementation and development of new graph-analysis algorithms.
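The superstep idea quoted above can be imitated in a few lines of plain Scala. The sketch below is only an illustration of the pattern, not Bagel's or Giraph's API: the compute function receives a vertex state and its incoming messages and returns the new state plus an optional outgoing message. As a toy problem, every vertex learns the largest value found anywhere in its connected component. All names are hypothetical.

```scala
object SuperstepSketch {
  type VertexId = Int

  /** (current state, incoming messages) => (new state, optional message to all neighbours). */
  def compute(state: Int, messages: Seq[Int]): (Int, Option[Int]) = {
    val best = (state +: messages).max
    // A vertex only sends a message when its state has changed.
    if (best > state) (best, Some(best)) else (state, None)
  }

  /** Routes each outgoing message to all neighbours of its sender. */
  def deliver(outgoing: Map[VertexId, Int],
              neighbours: Map[VertexId, Seq[VertexId]]): Map[VertexId, Seq[Int]] =
    outgoing.toSeq
      .flatMap { case (v, m) => neighbours.getOrElse(v, Seq.empty).map(n => n -> m) }
      .groupBy(_._1)
      .map { case (n, ms) => n -> ms.map(_._2) }

  def run(initial: Map[VertexId, Int],
          neighbours: Map[VertexId, Seq[VertexId]]): Map[VertexId, Int] = {
    var state = initial
    // Superstep 0: every vertex announces its own value.
    var inbox = deliver(initial, neighbours)
    // Further supersteps run until no vertex sends a message any more.
    while (inbox.nonEmpty) {
      val results = state.map { case (v, s) => v -> compute(s, inbox.getOrElse(v, Seq.empty)) }
      state = results.map { case (v, (s, _)) => v -> s }
      val outgoing = results.collect { case (v, (_, Some(m))) => v -> m }
      inbox = deliver(outgoing, neighbours)
    }
    state
  }
}
```

On a path 1, 2, 3 with start values 3, 6, and 2, all three vertices end up with the value 6 after two supersteps in the loop.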
In contrast, the GraphX framework uses a property graph model, which is a popular high-level abstraction for representing a graph. In GraphX, the data is managed by RDDs: one for nodes and one for edges. The graph operations needn't be implemented by a particular compute function, as in Bagel; instead, the GraphX library offers the
Graph class, which provides access to nodes, edges, and even functionality to analyze the graph. The Pregel API is also available at this level in GraphX.
So, what is the relationship between GraphFrames and GraphX? I personally like to answer that question with an analogy: What a DataFrame is to an RDD, a GraphFrame is to the
Graph class in GraphX. With GraphFrames, you can simply define a graph from your DataFrames. One holds the node list and the other the edge list. No matter how the data is stored in HDFS, DataFrames do all the preparation (such as filtering, grouping, and projections) before the specific graph algorithms, implemented in the GraphFrame class, are applied. Results of such computations can be new columns in the node list. And because this is a DataFrame, we can easily join and finally export results from multiple operations (as demonstrated later).
Graph Analysis Made Easy
Next, let’s do some graph analysis on the Facebook ego-net graph.
We sort the nodes by degree and calculate the PageRank. Finally, we count the triangles in which a node is embedded.
import org.apache.spark.sql.functions.desc
val k = g1.degrees.sort(desc("degree"))
k.show()
val pr2 = g1.pageRank.resetProbability(0.15).maxIter(10).run()
pr2.vertices.show()
Which are the nodes with the highest PageRank?
val pr3 = pr2.vertices.sort(desc("pagerank"))
pr3.show()
+----+----+------------------+
|name|  id|          pagerank|
+----+----+------------------+
|3434|3434|18.184854992469376|
|1911|1911|18.123485211404812|
|2655|2655|17.525235604357597|
|1902|1902| 17.35563914066005|
|1888|1888| 13.34316669756043|
+----+----+------------------+
only showing top 5 rows
Into how many triangles are our nodes embedded?
val tcr = g1.triangleCount.run()
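What “embedded in a triangle” means can be shown on a tiny edge list in plain Scala. The sketch below is not how GraphFrames computes the result; it is a simple neighbour-intersection approach, with hypothetical names, that treats edges as undirected and ignores duplicates and self-loops.

```scala
object TriangleSketch {
  /** Number of triangles each node takes part in, treating edges as undirected. */
  def triangleCount(edges: Seq[(Int, Int)]): Map[Int, Int] = {
    // Store each edge once as (smaller id, larger id); drop self-loops and duplicates.
    val undirected = edges
      .collect { case (a, b) if a != b => (math.min(a, b), math.max(a, b)) }
      .distinct

    val neighbours: Map[Int, Set[Int]] = undirected
      .flatMap { case (a, b) => Seq(a -> b, b -> a) }
      .groupBy(_._1)
      .map { case (n, ns) => n -> ns.map(_._2).toSet }

    // A triangle a < b < c is found exactly once: via edge (a, b) and a common neighbour c > b.
    val triangles = for {
      (a, b) <- undirected
      c <- neighbours(a).intersect(neighbours(b))
      if c > b
    } yield (a, b, c)

    val counts = triangles
      .flatMap { case (a, b, c) => Seq(a, b, c) }
      .groupBy(identity)
      .map { case (n, occurrences) => n -> occurrences.size }

    // Nodes that are in no triangle are reported with a count of 0.
    neighbours.keys.map(n => n -> counts.getOrElse(n, 0)).toMap
  }
}
```

For the edges (1,2), (2,3), (1,3), and (3,4), nodes 1, 2, and 3 each belong to one triangle, while node 4 belongs to none.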
Because we plan to use this data again later, we can use the
persist() function, but this only makes sense if the dataset is not too big.
tcr.persist()
tcr.sort(desc("count")).limit(4).show()
+-----+----+----+
|count|name|  id|
+-----+----+----+
|30025|1912|1912|
|15502|2543|2543|
|15471|2233|2233|
|15213|2464|2464|
+-----+----+----+
only showing top 4 rows
Finally, we use these results to evaluate if the number of triangles is related to the PageRank. We join both results and export the table as a CSV file. This allows us to draw the scatter plot in Gnuplot or R.
tcr is the triangle-count result; because we persisted it, it does not have to be calculated again when we do the join. The PageRank was calculated as a per-vertex value in the
scala> tcr.printSchema
root
 |-- count: long (nullable = true)
 |-- name: integer (nullable = true)
 |-- id: integer (nullable = true)
scala> pr2.vertices.printSchema
root
 |-- name: integer (nullable = true)
 |-- id: integer (nullable = true)
 |-- pagerank: double (nullable = true)
Since both datasets are DataFrames, we can simply use the DataFrame functionality to join both on the id column:
scala> val scatter = pr2.vertices.join(tcr, pr2.vertices.col("id").equalTo(tcr("id")))
scala> scatter.limit(10).show
16/05/10 03:24:53 INFO DAGScheduler: Job 18 finished: show at <console>:51, took 653,309080 s
+----+----+-------------------+-----+----+----+
|name|  id|           pagerank|count|name|  id|
+----+----+-------------------+-----+----+----+
|  31|  31|0.16137995810046757|  110|  31|  31|
| 231| 231|0.35635621183428823|   95| 231| 231|
| 431| 431| 0.2495015120580725| 1248| 431| 431|
| 631| 631|0.23890006729816274|   41| 631| 631|
| 831| 831| 0.3651072380090138|   64| 831| 831|
|1031|1031|0.15013201313406596|    5|1031|1031|
|1231|1231| 0.2566120648129746| 1452|1231|1231|
|1431|1431|0.44670540616673726|10441|1431|1431|
|1631|1631|  0.285116150576581|   35|1631|1631|
|1831|1831| 2.1506290231917156|  226|1831|1831|
+----+----+-------------------+-----+----+----+
We export this result as a CSV file; to get only one result file, we use the
coalesce function of the DataFrame:
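A sketch of what this export might look like in the same spark-shell session, assuming the spark-csv writer loaded at startup and the output folder name scatter_1.csv that the plotting step uses; the exact call in the exercise scripts may differ:

```scala
// Assumption: `scatter` is the joined DataFrame from the previous step.
// coalesce(1) reduces the result to a single partition, so only one part file is written.
scatter.coalesce(1).
  write.
  format("com.databricks.spark.csv").
  option("header", "false").
  save("scatter_1.csv")
```

Collapsing everything into one partition funnels all rows through a single task, which is acceptable for a small result like this one but would not scale to large outputs.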
Alternatively, we could also have used the
getmerge command from HDFS to combine all the part files into one single file, but in this case, it is important to use the option
header=false, which turns the header lines off.
Now, let’s plot our previous result data set in Gnuplot. It was already exported into a folder named
scatter_1.csv, but that folder is in HDFS. If Gnuplot is not available, simply install it (for example, by typing
sudo yum install gnuplot on CentOS). We have to copy the result file to a local folder first; then we use the following Gnuplot commands, stored in a script file named
Filename: plot.cmd
set datafile separator ","
set term png
set output "g1.png"
#
# Add titles and labels.
#
set xlabel "PageRank"
set ylabel "Triangles"
set title "PR vs. T"
unset key
#
# Add grid lines.
#
set grid
set size ratio -1
#
# Timestamp the plot.
#
set timestamp
plot 'scatter_1.csv/part-00000' using 3:4
We can easily plot the figure as a scatter plot (more about that below) assuming that the data set and the
plot commands are in the same directory:
$ gnuplot plot.cmd
$ spark-shell --packages graphframes:graphframes:0.1.0-spark1.5,\
com.databricks:spark-csv_2.10:1.4.0,org.sameersingh.scalaplot:scalaplot:0.0.4
And now, those imports:
allow you to plot charts in gnuplot or based on the JFreeChart library within the Spark shell.
Plotting the Results
This was a fun ride, starting with some public data, loading the edges, creating the node list, and defining a GraphFrame followed by analyzing the graph and merging partial results for the final plot. Finally, we can visualize the final results.
This scatter plot (see below) is not yet the ideal way to understand the data, but it’s a good starting point. A 2D histogram (example) would encode the number of (PageRank,Triangle) pairs, as well.
A second approach to inspecting the data, a log-log plot (see below), would reveal a power-law distribution if it showed a straight line, or a stretched power law in the case of a bent curve. The relation between the PageRank of a node and the number of triangles of which the node is part is not a power law.
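The reasoning behind the straight line is that a power law y = c * x^a becomes log(y) = log(c) + a * log(x), so the exponent a shows up as the slope in log-log space. A minimal sketch of a least-squares fit in plain Scala follows; the names are made up for this illustration, and pairs with a zero or negative value are skipped because their logarithm is undefined. A good linear fit is only a rough indication of a power law, not a proof.

```scala
object LogLogSketch {
  /** Least-squares (slope, intercept) of log10(y) against log10(x), using strictly positive pairs only. */
  def fit(points: Seq[(Double, Double)]): (Double, Double) = {
    val logs = points.collect {
      case (x, y) if x > 0 && y > 0 => (math.log10(x), math.log10(y))
    }
    require(logs.size >= 2, "need at least two strictly positive points")

    val n = logs.size.toDouble
    val meanX = logs.map(_._1).sum / n
    val meanY = logs.map(_._2).sum / n

    // Covariance and variance terms of the ordinary least-squares solution.
    val sxy = logs.map { case (lx, ly) => (lx - meanX) * (ly - meanY) }.sum
    val sxx = logs.map { case (lx, _) => (lx - meanX) * (lx - meanX) }.sum
    require(sxx > 0, "x values must not all be equal")

    val slope = sxy / sxx
    (slope, meanY - slope * meanX)
  }
}
```

Applied to the (PageRank, triangle count) pairs from the exported file, large deviations of the points from the fitted line would support the observation that the relation is not a power law.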
From the scatter plot, we can already conclude that a high number of triangles does not guarantee a high PageRank, and furthermore, that a high PageRank can be found both for nodes that are embedded in a small number of triangles and for nodes that are embedded in many.
In Part II, we will do the same exercise with a different data set.