
Scalable Graph Analytics with Apache Spark: Part I

Get started with scalable graph analysis via simple examples that utilize GraphFrames and Spark SQL on HDFS.

By Mirko Kämpf · Oct. 05, 16 · Tutorial

Graphs—also known as “networks”—are ubiquitous across web applications. As a refresher, a graph consists of nodes and edges. A node can be any object, such as a person or an airport, and an edge is a relation between two nodes, such as a friendship or an airline connection between two cities. Social networks and content networks (which comprise interlinked documents, such as web pages or citation networks) are other very common examples of a graph. Finally, the Internet of Things is basically a huge “graph of graphs”.

Why should one care about graph theory? Because the relations between things, or rather the links in a network, contain information that complements what we know about the individual components. One common and relevant use case is topology analysis, from which one can derive information about a graph’s properties. If you can track changes in topology, you can track changes across the entire system. Thus, topological analysis is a handy tool in multiple fields, including financial risk management and predictive maintenance.

In the remainder of this post, you will get some introductory hands-on exposure to graph analysis using the GraphFrames package in Apache Spark. As you’ll see, there are some powerful advantages to using GraphFrames and GraphX on HDFS in combination with Spark SQL. (Important note: Neither GraphX nor GraphFrames is considered production-ready, nor are they currently supported by Cloudera. The information presented here is for self-educational purposes with respect to graph analytics only.)

Prerequisites

For both examples, you’ll need to install Cloudera QuickStart, which is available in VM and Docker versions. (CDH 5.5/Spark 1.5/GraphFrames 0.1.0 are used for the examples here, but the current versions—CDH 5.8/Spark 1.6/GraphFrames 0.2.0—should work just as well.) The GraphFrame package is available in the spark-packages repository. In addition to understanding the basic concepts of graph analysis, you’ll also need to know how to use RDDs and DataFrames in Spark.
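As a quick refresher (not part of the exercise itself), here is a minimal sketch of turning a small RDD of tuples into a DataFrame in the Spark 1.5 shell, where sc and sqlContext are already defined; the column names src and dst are only illustrative:

// A tiny RDD of (src, dst) pairs, turned into a DataFrame with named columns.
val pairs = sc.parallelize(Seq((0, 1), (0, 2), (1, 2)))
val smallDf = sqlContext.createDataFrame(pairs).toDF("src", "dst")
smallDf.show()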

Some code is skipped here for brevity, but the complete exercise scripts, sample data, and the full tutorial (as a PDF file) are available in this repo.

Exercise 1: Calculating PageRank Using Scalable Graph Analysis

Google’s PageRank—perhaps the most famous graph-processing algorithm in Internet history—is calculated per node (or vertex). A web page’s PageRank reflects the importance of that node in the network and results from the topology (represented by links). In this example, we will use GraphFrames to calculate the PageRank for each node. Furthermore, calculating the clustering coefficient of a graph, which is based on the number of connected triplets and the number of fully connected triangles (the global clustering coefficient is three times the number of triangles divided by the number of connected triplets), is possible using GraphX.

First, download this data file from the SNAP website. This file can be downloaded, decompressed, and loaded into HDFS using this bootstrap script; or, you can do all that manually:

$ wget https://snap.stanford.edu/data/facebook_combined.txt.gz
$ gunzip *.gz
$ hdfs dfs -put facebook_combined.txt

Start a spark-shell with the optional packages to work with GraphFrame and the CSV processing libraries:

$ spark-shell --packages graphframes:graphframes:0.1.0-spark1.5,\
com.databricks:spark-csv_2.10:1.4.0

Note that the \ symbol indicates that the command continues on the next line. The --packages option allows you to specify a comma-separated list of Apache Maven coordinates for the third-party packages required in your spark-shell session; they are loaded automatically from a central repository by the Spark cluster. Here we need the GraphFrames package and a CSV parsing library from Databricks.

Building the Graph

Next, we’ll define a DataFrame by loading data from a CSV file, which is stored in HDFS.

Our data file facebook_combined.txt contains two columns that represent links between network nodes. The first column is the source (src) of a link, and the second is its destination (dst). (Some other systems, such as Gephi, use “source” and “target” instead.)

First, we define a custom schema, and then we load the DataFrame, using SQLContext.

scala> import org.apache.spark.sql.types.{StructType, StructField, IntegerType}
scala> val customSchema = StructType(Array(StructField("src", IntegerType, true), StructField("dst", IntegerType, true)))
scala> val df = sqlContext.read.format("com.databricks.spark.csv").option("header", "false").option("delimiter", " ").schema(customSchema).load("facebook_combined.txt")

Here we use a space (" ") as the delimiter. More options can be set as shown in the documentation for the CSV parsing library.

In theory, this edge list contains all the information needed to build a graph, but a GraphFrame needs the node data and the edge data in separate DataFrames. Let’s transform the edge list to also get a node list, as sketched below.
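The complete transformation is part of the exercise scripts in the repo; a minimal sketch of deriving the node list from the edge DataFrame df could look like the following (the extra name column simply mirrors id so that the vertex schema matches the output shown later; the repo scripts are authoritative):

// Every id that appears as src or dst becomes a node.
val nodes = df.select(df("src").as("id"))
  .unionAll(df.select(df("dst").as("id")))
  .distinct()
  .selectExpr("id as name", "id")
nodes.count()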

Enter GraphFrames

There are multiple options for doing scalable graph analysis on top of HDFS. Apache Spark currently ships with GraphX; in the past, the now-deprecated Bagel project was used.

Bagel was a low-level implementation of Google’s Pregel graph-processing framework. According to the docs, “Bagel operates on a graph represented as a distributed dataset of (K, V) pairs, where keys are vertex IDs and values are vertices plus their associated state.” Its core idea was that “In each superstep, Bagel runs a user-specified compute function on each vertex that takes as input the current vertex state and a list of messages sent to that vertex during the previous superstep, and returns the new vertex state and a list of outgoing messages.” (Apache Giraph implements the same concept.) Although this is a low-level abstraction, it was a useful starting point for implementation and development of new graph-analysis algorithms.

In contrast, the GraphX framework uses a property graph model, which is a popular high-level abstraction for representing a graph. In GraphX, the data is managed by two RDDs: one for nodes and one for edges. Graph operations need not be implemented as a particular compute function, as in Bagel; instead, the GraphX library offers the Graph class, which provides access to nodes and edges as well as functionality for analyzing the graph (see the sketch below). The Pregel API is also available at this level in GraphX.
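This exercise does not use GraphX directly, but to illustrate the Graph class, here is a small, self-contained sketch of a toy graph built from two RDDs (the names vRDD, eRDD, and toyGraph are only illustrative):

import org.apache.spark.graphx.{Edge, Graph}
// Vertices are (id, attribute) pairs; edges are Edge(srcId, dstId, attribute).
val vRDD = sc.parallelize(Seq((1L, "a"), (2L, "b"), (3L, "c")))
val eRDD = sc.parallelize(Seq(Edge(1L, 2L, 1), Edge(2L, 3L, 1), Edge(3L, 1L, 1)))
val toyGraph = Graph(vRDD, eRDD)
toyGraph.degrees.collect()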

So, what is the relationship between GraphFrames and GraphX? I personally like to answer that question with an analogy: what a DataFrame is to an RDD, a GraphFrame is to the Graph class in GraphX. With GraphFrames, you can simply define a graph from two of your DataFrames: one holds the node list and the other the edge list. No matter how the data is stored in HDFS, DataFrames handle all the preparation, such as filtering, grouping, and projections, before the graph algorithms implemented in the GraphFrame class are applied. Results of such computations can become new columns in the node list. And because this is a DataFrame, we can easily join and finally export the results of multiple operations (as demonstrated later).
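Putting the two DataFrames together gives us the graph g1 used in the next section. A sketch, assuming the GraphFrames 0.1.0 API and the nodes DataFrame derived above:

import org.graphframes.GraphFrame
// nodes needs an "id" column; the edge DataFrame df needs "src" and "dst" columns.
val g1 = GraphFrame(nodes, df)
g1.vertices.count()
g1.edges.count()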

Graph Analysis Made Easy

Next, let’s do some graph analysis on the Facebook ego-net graph.

We sort the nodes by degree and calculate the PageRank. Finally, we count the triangles in which a node is embedded.

import org.apache.spark.sql.functions.desc
// Nodes sorted by degree (number of incident edges):
val k = g1.degrees.sort(desc("degree"))
k.show()
// PageRank with a reset probability of 0.15 and 10 iterations:
val pr2 = g1.pageRank.resetProbability(0.15).maxIter(10).run()
pr2.vertices.show()

Which nodes have the highest PageRank?

val pr3 = pr2.vertices.sort(desc("pagerank"))
pr3.show()
+----+----+------------------+
|name| id| pagerank|
+----+----+------------------+
|3434|3434|18.184854992469376|
|1911|1911|18.123485211404812|
|2655|2655|17.525235604357597|
|1902|1902| 17.35563914066005|
|1888|1888| 13.34316669756043|
+----+----+------------------+
only showing top 5 rows

In how many triangles is each of our nodes embedded?

val tcr = g1.triangleCount.run()

Because we plan to use this data again later, we can use the persist() function; this only makes sense, though, if the dataset is not too big.

tcr.persist()
tcr.sort(desc("count")).limit(4).show()
+-----+----+----+
|count|name| id|
+-----+----+----+
|30025|1912|1912|
|15502|2543|2543|
|15471|2233|2233|
|15213|2464|2464|
+-----+----+----+
only showing top 4 rows

Finally, we use these results to evaluate whether the number of triangles is related to the PageRank. We join both results and export the table as a CSV file, which allows us to draw a scatter plot in Gnuplot or R.

tcr is the triangle-count result; because we persisted it above, it does not have to be recalculated when we do the join. The PageRank was calculated as a per-vertex value in the pr2.vertices DataFrame.

scala> tcr.printSchema
root
 |-- count: long (nullable = true)
 |-- name: integer (nullable = true)
 |-- id: integer (nullable = true)
scala> pr2.vertices.printSchema
root
 |-- name: integer (nullable = true)
 |-- id: integer (nullable = true)
 |-- pagerank: double (nullable = true)

Since both datasets are DataFrames, we can simply use the DataFrame functionality to join both on the id column:

scala> val scatter = pr2.vertices.join(tcr, pr2.vertices.col("id").equalTo(tcr("id")))
scala> scatter.limit(10).show
16/05/10 03:24:53 INFO DAGScheduler: Job 18 finished: show at <console>:51, took 653,309080 s
+----+----+-------------------+-----+----+----+
|name| id| pagerank|count|name| id|
+----+----+-------------------+-----+----+----+
| 31| 31|0.16137995810046757| 110| 31| 31|
| 231| 231|0.35635621183428823| 95| 231| 231|
| 431| 431| 0.2495015120580725| 1248| 431| 431|
| 631| 631|0.23890006729816274| 41| 631| 631|
| 831| 831| 0.3651072380090138| 64| 831| 831|
|1031|1031|0.15013201313406596| 5|1031|1031|
|1231|1231| 0.2566120648129746| 1452|1231|1231|
|1431|1431|0.44670540616673726|10441|1431|1431|
|1631|1631| 0.285116150576581| 35|1631|1631|
|1831|1831| 2.1506290231917156| 226|1831|1831|
+----+----+-------------------+-----+----+----+
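Note that the joined DataFrame contains the name and id columns twice, once from each side of the join. If you prefer a cleaner export, an optional projection (just a sketch; the rest of this post keeps using the full scatter DataFrame) reduces it to one id plus the two measures:

// Keep a single id column together with the PageRank and the triangle count.
val scatterClean = scatter.select(pr2.vertices("id"), pr2.vertices("pagerank"), tcr("count"))
scatterClean.limit(5).show()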

We export this result as a CSV file, but to get only one result file, we use the DataFrame’s coalesce function:

scatter.coalesce(1).write.format("com.databricks.spark.csv").option("header","true").save("scatter.csv")

Alternatively, we could also have used HDFS’s getmerge command (hdfs dfs -getmerge <dir> <local file>) to combine all the part files into one single file; in that case, it is important to write with the option header=false, which turns the header lines off (otherwise every part file would contribute its own header row to the merged file).

Now, let’s plot our result data set in Gnuplot. It was already exported into a folder named scatter_1.csv, but that folder lives in HDFS, so we first have to copy the result file to a local directory (for example, with hdfs dfs -get). If Gnuplot is not available, simply install it (for example, by typing sudo yum install gnuplot on CentOS). We then use the following Gnuplot commands, stored in a script file named plot.cmd:

Filename: plot.cmd
set datafile separator ","
set term png
set output "g1.png"
#
# Add titles and labels.
#
set xlabel "PageRank"
set ylabel "Triangles"
set title "PR vs. T"
unset key
#
# Add grid lines.
#
set grid
set size ratio -1
#
# Timestamp the plot.
#
set timestamp
plot 'scatter_1.csv/part-00000' using 3:4

We can easily plot the figure as a scatter plot (more about that below) assuming that the data set and the plot commands are in the same directory:

$ gnuplot plot.cmd

In a previous post, we used the scalaplot library to create charts. You can simply add the Maven coordinates during startup:

$ spark-shell --packages graphframes:graphframes:0.1.0-spark1.5,\
com.databricks:spark-csv_2.10:1.4.0,org.sameersingh.scalaplot:scalaplot:0.0.4

And now, this import:

import org.sameersingh.scalaplot.Implicits._

allows you to plot charts via Gnuplot or the JFreeChart library from within the Spark shell.

Plotting the Results

This was a fun ride: we started with some public data, loaded the edges, created the node list, and defined a GraphFrame, then analyzed the graph and merged the partial results for the final plot. Now we can visualize the results.

This scatter plot (see below) is not yet the ideal way to understand the data, but it’s a good starting point. A 2D histogram (example) would encode the number of (PageRank,Triangle) pairs, as well.

[Figure: scatter plot of PageRank vs. triangle count]

A second approach to inspecting the data, a log-log plot (see below), would reveal a power-law distribution if the points followed a straight line, or a stretched power law in the case of a bent curve. The relation between the PageRank of a node and the number of triangles of which that node is part is not a power law.

[Figure: log-log plot of PageRank vs. triangle count]

From the scatter plot, we can already conclude that a high number of triangles does not guarantee a high PageRank, and furthermore, that a high PageRank can be found both for nodes embedded in only a few triangles and for nodes embedded in many.

In Part II, we will do the same exercise with a different data set.


Published at DZone with permission of Mirko Kämpf. See the original article here.
