Scalable Graph Analytics with Apache Spark: Part II


In Part II, we will continue with scalable graph analysis with a second exercise.


If you haven't already, start with Part I, which covers the setup and Exercise 1.

Exercise 2: Topology Analysis of Web Crawl Results

To prepare the data set for this example, I used the Apache Nutch web crawler engine (which was designed by Doug Cutting and Mike Cafarella before they built Hadoop). Although data ingestion from SQL databases and huge document repositories dominates in commercial use cases, a data scientist should also be aware of the advantages of Nutch for scraping web data, in addition to those of Apache Flume and Apache Sqoop. For example, instead of using APIs or writing clients to load data from specific web pages, portals, or social media applications, you can use Nutch to get the data all at once and then apply specific parsing, such as HTML parsing with JSoup or more advanced triple extraction using Apache Any23.
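As a rough illustration of that last step, HTML parsing with JSoup takes only a few lines. The snippet below is a minimal sketch, not part of the crawl pipeline itself: the `html` string and URLs are made up for the example, and it assumes the `org.jsoup` dependency is on the classpath.

```scala
import org.jsoup.Jsoup
import scala.collection.JavaConverters._

// Illustrative HTML standing in for a crawled page.
val html = """<html><body>
  <a href="/about">About</a>
  <a href="https://creativecommons.org/">Creative Commons</a>
</body></html>"""

// Parse with a base URL so relative links can be resolved to absolute ones.
val doc = Jsoup.parse(html, "https://example.org/")

// Extract (absolute URL, anchor text) pairs -- the raw material for a link list.
val outlinks = doc.select("a[href]").asScala
  .map(a => (a.attr("abs:href"), a.text))
  .toList
```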

Our crawl data is loaded into a staging table (in this case, in Apache HBase). Besides the full HTML content, it contains inlinks and outlinks. Because HBase is managing the crawl data (and the state of the crawl dataset), we can add more links, especially inlinks to a particular page, while we crawl more and more data. To distinguish both types of links in visualization, we use link type 1 for inlinks and link type 2 for outlinks. Finally, to analyze the overall network, we need both lists in one homogeneous collection, where each link is described by source, target, and type.
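As a sketch, that homogeneous link record could be modeled in plain Scala like this (the case class and the sample URLs are our own illustration, not part of the Nutch schema):

```scala
// A minimal model of the homogeneous link list described above:
// every link is described by source, target, and type.
case class Link(source: String, target: String, linkType: Int)

// Inlinks carry type 1, outlinks type 2.
val inlink  = Link("http://wiki.creativecommons.org/", "https://creativecommons.org/", 1)
val outlink = Link("https://creativecommons.org/", "http://wiki.creativecommons.org/", 2)
```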

Creation of Node- and Link-Lists

Using Spark SQL and DataFrames allows for inspection and rearrangement of the data in just a few steps. We start with a query that transforms the map of string pairs into individual rows. During the crawl procedure, Nutch aggregates all links in an adjacency list. We have to “explode” the data structure with all links per page to create a link list from the previously optimized representation.

scala> val r1 = sqlContext.sql("FROM yarn_ds1_run_3_webpage_parquet SELECT explode(inlinks), baseurl as target, '1' as type")
| _c0| _c1| target|type|
|https://creativec...| Marking guide|http://wiki.creat...| 1|
|https://creativec...| More info|http://wiki.creat...| 1|
|https://creativec...| Learn more|http://wiki.creat...| 1|
|https://creativec...| More info|http://wiki.creat...| 1|

As revealed above, an exploded map still lacks any useful column name. Let’s change that by defining the names for the two columns, represented by the string pair.

val r1 = sqlContext.sql("FROM yarn_ds1_run_3_webpage_parquet SELECT explode(inlinks) as (source,page), baseurl as target, '1' as type")

For our network, we need one column named source, which contains the URL and the page name. This is why we first explode the inlinks map into “virtual columns” called mt and mp. We can now concatenate both of them and create a new source column:

val r1 = sqlContext.sql("FROM yarn_ds1_run_3_webpage_parquet SELECT concat(mt,mp) as source, baseurl as target, '1' as type, explode(inlinks) as (mt,mp)")
| source| target|type| mt| mp|
|https://creativec...|http://wiki.creat...| 1|https://creativec...| Marking guide|
|https://creativec...|http://wiki.creat...| 1|https://creativec...| More info|
|https://creativec...|http://wiki.creat...| 1|https://creativec...| Learn more|
|https://creativec...|http://wiki.creat...| 1|https://creativec...| More info|
|https://creativec...|http://wiki.creat...| 1|https://creativec...| More info|
|https://creativec...|http://wiki.creat...| 1|https://creativec...| More info|
|https://creativec...|http://creativeco...| 1|https://creativec...| Creative Commons|
|https://creativec...|https://creativec...| 1|https://creativec...|https://creativec...|
|https://creativec...|https://creativec...| 1|https://creativec...| العربية|
|https://creativec...|https://creativec...| 1|https://creativec...| Беларуская|

Now we apply this procedure to the second set of links: our outlinks.

val r2 = sqlContext.sql("FROM yarn_ds1_run_3_webpage_parquet SELECT baseurl as source, concat(mt,mp) as target, '2' as type, explode(outlinks) as (mt,mp)")

To merge both link lists into one, we can ignore the temporary columns. We simply select the required columns, use the unionAll operator, and store our page network as an Apache Parquet file in HDFS.

val r1c = r1.select("source","target","type")
val r2c = r2.select("source","target","type")
val rAll = r1c.unionAll( r2c )
rAll.write.parquet("full_link_list")

With this intermediate result, we can revisit GraphFrames, with the variable g1 representing the graph built from the Nutch crawl data. As homework, you can next apply the same analysis steps as in Exercise 1.
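A minimal sketch of loading that graph might look as follows. This assumes the graphframes package is available in the shell; the vertex and edge column names follow the GraphFrame convention of id, src, and dst.

```scala
import org.graphframes.GraphFrame
import org.apache.spark.sql.functions.desc

// Read the link list back from HDFS.
val links = sqlContext.read.parquet("full_link_list")

// GraphFrame expects vertices with an "id" column and edges with "src"/"dst".
val vertices = links.select(links("source").as("id"))
  .unionAll(links.select(links("target").as("id")))
  .distinct()
val edges = links.select(links("source").as("src"), links("target").as("dst"), links("type"))

val g1 = GraphFrame(vertices, edges)

// For example, show the ten most linked-to pages:
g1.inDegrees.orderBy(desc("inDegree")).show(10)
```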


As you can see, Spark SQL provided access to many different data sources, whether we used the Apache Hive table in Parquet format or the HBase table via Hive. And for the many use cases in which the existing table layout cannot be used, Spark SQL made all the filtering, grouping, and projection really easy.

Using GraphFrames, it is possible to turn data tables into graphs with just a few lines of code. The full power of the Pregel API, implemented in GraphX, is available in combination with Spark SQL. As a result, raw data stored in Hadoop in different flavors can easily be combined into huge multi-layer graphs, with graph analysis all done in place via Spark.


Published at DZone with permission of Mirko Kämpf. See the original article here.

