This is an archival repost of a blog post that was originally published on Hortonworks’ blog.

The Hortonworks Data Platform (HDP) conveniently integrates numerous big data tools in the Hadoop ecosystem. As such, it provides cluster-oriented storage, processing, monitoring, and data integration services. HDP simplifies the deployment and management of a production Hadoop-based system.
In Hadoop, data is represented as key/value pairs. In HBase, data is represented as a collection of wide rows. These atomic structures make global data processing (via MapReduce) and row-specific reading/writing (via HBase) simple. However, writing queries is nontrivial if the data has a complex, interconnected structure that needs to be analyzed (see Hadoop joins and HBase joins). Without an appropriate abstraction layer, processing highly structured data is cumbersome. Indeed, choosing the right data representation and associated tools opens up otherwise unimaginable possibilities. One such data representation that naturally captures complex relationships is a graph (or network). This post presents Aurelius’ big graph data technology suite in concert with Hortonworks Data Platform. For a real-world grounding, a GitHub clone is described in this context to help the reader understand how to use these technologies for building scalable, distributed, graph-based systems.
Aurelius Graph Cluster and Hortonworks Data Platform Integration
The Aurelius Graph Cluster can be used in concert with Hortonworks Data Platform to provide users a distributed graph storage and processing system with the management and integration benefits provided by HDP. Aurelius’ graph technologies include Titan, a highly scalable graph database optimized for serving real-time results to thousands of concurrent users, and Faunus, a distributed graph analytics engine optimized for batch processing graphs represented across a multi-machine cluster.
In an online social system, for example, there typically exists a user base that is creating things and various relationships among these things (e.g. likes, authored, references, stream). Moreover, the users are creating relationships amongst themselves (e.g. friend, group member). To capture and process this structure, a graph database is useful. When the graph is large and under heavy transactional load, a distributed graph database such as Titan/HBase can be used to provide real-time services such as searches, recommendations, rankings, scorings, etc. Next, periodic offline global graph statistics can be leveraged. Examples include identifying the most connected users or tracking the relative importance of particular trends. Faunus/Hadoop serves this requirement. Graph queries/traversals in Titan and Faunus are simple, one-line commands that are optimized both semantically and computationally for graph processing. They are expressed using the Gremlin graph traversal language. The roles that Titan, Faunus, and Gremlin play within HDP are diagrammed below.
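For a flavor of such one-line traversals, the hypothetical Gremlin query below ranks the other users of an online social system whose likes overlap most with a given user’s; the name property and the likes edge label are illustrative assumptions here, not part of the GitHub schema developed later in this post.

gremlin> g.V('name','alice').out('likes').in('likes').hasNot('name','alice').name.groupCount.cap.next().sort{-it.value}[0..4]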
A Graph Representation of GitHub
GitHub is an online source code service where over 2 million people collaborate on over 4 million projects. However, GitHub provides more than just revision control. In the last 4 years, GitHub has become a massive online community for software collaboration. Some of the biggest software projects in the world use GitHub (e.g. the Linux kernel).
GitHub is growing rapidly: 10,000 to 30,000 events occur each hour (e.g. a user contributing code to a repository). Hortonworks Data Platform is suited to storing, analyzing, and monitoring the state of GitHub. However, it lacks specific tools for processing this data from a relationship-centric perspective. Representing GitHub as a graph is natural because GitHub connects people, source code, contributions, projects, and organizations in diverse ways. Thinking purely in terms of key/value pairs and wide rows obfuscates the underlying relational structure, which can be leveraged for more complex real-time and batch analytic algorithms.
GitHub provides 18 event types, which range from new commits and fork events to opening new tickets, commenting, and adding members to a project. The activity is aggregated in hourly archives, [each of which] contains a stream of JSON-encoded GitHub events. (via githubarchive.org)
The aforementioned events can be represented according to the popular property graph data model. A graph schema describing the types of “things” and the relationships between them is diagrammed below. A parse of the raw data according to this schema yields a graph instance.
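To make the schema concrete, the following Gremlin sketch hand-builds the tiny graph instance that a single push event would yield, using an in-memory TinkerGraph; the property keys mirror those used in the traversals later in this post, while the values (and the exact keys emitted by the parser) are illustrative placeholders.

gremlin> g = new TinkerGraph()
gremlin> user = g.addVertex(null); user.setProperty('type','user'); user.setProperty('name','okram')
gremlin> commit = g.addVertex(null); commit.setProperty('type','commit'); commit.setProperty('sha','d8a3b2c')
gremlin> repo = g.addVertex(null); repo.setProperty('type','repository'); repo.setProperty('github_name','gremlin')
gremlin> g.addEdge(null, user, commit, 'pushed')
gremlin> g.addEdge(null, commit, repo, 'to')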
Deploying a Graph-Based GitHub
To integrate the Aurelius Graph Cluster with HDP, Whirr is used to launch a 4 m1.xlarge machine cluster on Amazon EC2. Detailed instructions for this process are provided on the Aurelius blog, with the exception that a modified Whirr properties file must be used for HDP. A complete HDP Whirr solution is currently in development. To add Aurelius technologies to an existing HDP cluster, simply download Titan and Faunus, which interface with installed components such as Hadoop and HBase without further configuration.
The 5,830 hourly GitHub archive files between mid-March 2012 and mid-November 2012 contain 31 million GitHub events. The archive files are parsed to generate a graph. For example, when a GitHub push event is parsed, vertices with the types user, commit, and repository are generated. An edge labeled pushed links the user to the commit, and an edge labeled to links the commit to the repository. The user vertex has properties such as user name and email address, the commit vertex has properties such as its timestamp and the unique SHA sum identifying the commit, and the repository vertex has properties such as its URL and the programming language used. In this way, the 31 million events give rise to 27 million vertices and 79 million edges (a relatively small, though growing, graph). Complete instructions for parsing the data are in the githubarchive-parser documentation. Once the configuration options are reviewed, launching the automated parallel parser is simple.
$ export LC_ALL="C"
$ export JAVA_OPTIONS="-Xmx1g"
$ python automatedparallelparser.py batch
The generated vertex and edge data is imported into the Titan/HBase cluster using the BatchGraph wrapper of the Blueprints graph API (a simple, single-threaded insertion tool).
$ export JAVA_OPTIONS="-Xmx12g"
$ gremlin -e importgithubarchive.groovy vertices.txt edges.txt
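The importgithubarchive.groovy script is not reproduced here, but a minimal sketch of the BatchGraph approach conveys the idea; it assumes hypothetical tab-separated files of (id, type, name) vertex triples and (out-id, label, in-id) edge triples, which is not necessarily the format the parser actually emits.

import com.tinkerpop.blueprints.util.wrappers.batch.BatchGraph

// a minimal sketch, not the actual import script
g  = TitanFactory.open('bin/hbase.local')
bg = BatchGraph.wrap(g, 10000)                    // commit every 10,000 mutations
new File('vertices.txt').eachLine { line ->
  def (id, type, name) = line.tokenize('\t')
  def v = bg.addVertex(id)                        // the id is cached so edges can reference it below
  v.setProperty('type', type)
  v.setProperty('name', name)
}
new File('edges.txt').eachLine { line ->
  def (outId, label, inId) = line.tokenize('\t')
  bg.addEdge(null, bg.getVertex(outId), bg.getVertex(inId), label)
}
bg.shutdown()                                     // flushes the final transaction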
Titan: Distributed Graph Database
Titan is a distributed graph database that leverages existing storage systems for its persistence. Currently, Titan provides out-of-the-box support for Apache HBase and Cassandra (see the documentation). Graph storage and processing in a clustered environment is made possible by numerous techniques for both efficiently representing a graph within a BigTable-style data system and efficiently processing that graph using linked-list walking and vertex-centric indices. Moreover, for the developer, Titan provides native support for the Gremlin graph traversal language. This section demonstrates various Gremlin traversals over the parsed GitHub data.
The following Gremlin snippet determines which repositories Marko Rodriguez (okram) has committed to the most. The query first locates the vertex with the name okram and then takes outgoing pushed-edges to his commits. For each of those commits, the outgoing to-edges are traversed to the repository the commit was pushed to. Next, the name of each repository is retrieved, and those names are grouped and counted. The side-effect count map is output, sorted in decreasing order, and displayed. A graphical example demonstrating Gremlins walking the graph is diagrammed below.
gremlin> g = TitanFactory.open('bin/hbase.local')
==>titangraph[hbase:127.0.0.1]
gremlin> g.V('name','okram').out('pushed').out('to').github_name.groupCount.cap.next().sort{-it.value}
The above query can be taken two steps further to determine Marko’s collaborators. If two people have pushed commits to the same repository, then they are collaborators. Given that the number of people committing to a repository can be large and a collaborator has typically pushed numerous commits, a maximum of 2,500 such collaborator paths are searched. One of the most important aspects of graph traversing is understanding the combinatorial path explosions that can occur when traversing multiple hops through a graph (see Loopy Lattices).
gremlin> g.V('name','okram').out('pushed').out('to').in('to').in('pushed').hasNot('name','okram')[0..2500].name.groupCount.cap.next().sort{-it.value}[0..4]
Complex traversals are easy to formulate with the data in this representation. For example, Titan can be used to generate followship recommendations. There are numerous ways to express a recommendation (with varying semantics). A simple one is: “Recommend me people to follow based on people who watch the same repositories as me. The more repositories I watch in common with someone, the higher they should be ranked.” The traversal below starts at Marko, traverses to all the repositories that Marko watches, then to the other people (excluding Marko) who watch those repositories, and finally counts those people and returns the top 5 names of the sorted result set. In fact, Marko and Stephen (spmallette) are long-time collaborators and thus have similar tastes in software.
gremlin> g.V('name','okram').out('watched').in('watched').hasNot('name','okram').name.groupCount.cap.next().sort{-it.value}[0..4]
gremlin> g.V('name','okram').out('created').has('type','comment').count()
gremlin> g.V('name','okram').out('created').has('type','issue').count()
gremlin> g.V('name','okram').out('edited').count()
A few self-describing traversals rooted at okram are presented above. Finally, note that Titan is optimized for local/ego-centric traversals: that is, from a particular source vertex (or small set of vertices), some path description is used to yield a computation based on the explicit paths walked. For global graph analyses (where the source vertex set is the entire graph), a batch processing framework such as Faunus is used.
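As a rough illustration of the contrast, the pair of queries below asks a similar question locally and then globally: the first runs against the Titan graph and counts the commits pushed by a single user, while the second is the Faunus degree-distribution query developed in the next section, which performs the analogous count for every user vertex in the graph.

gremlin> g.V('name','okram').out('pushed').count()
gremlin> g.V.sideEffect('{it.degree = it.outE("pushed").count()}').degree.groupCount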
Faunus: Graph Analytics Engine
Every Titan traversal begins at a small set of vertices (or edges). Titan is not designed for global analyses, which involve processing the entire graph structure. The Hadoop component of Hortonworks Data Platform provides a reliable backend for such global queries via Faunus. Gremlin traversals in Faunus are compiled down to MapReduce jobs, where the first job’s InputFormat is Titan/HBase. In order not to interfere with the production Titan/HBase instance, a snapshot of the live graph is typically generated and stored in Hadoop’s distributed file system HDFS as a SequenceFile available for repeated analysis. The most general SequenceFile (with all vertices, edges, and properties) is created below (i.e. a full graph dump).
faunus$ cat bin/titan-seq.properties
faunus.graph.input.format=com.thinkaurelius.faunus.formats.titan.hbase.TitanHBaseInputFormat
hbase.zookeeper.quorum=10.68.65.161
hbase.mapreduce.inputtable=titan
hbase.mapreduce.scan.cachedrows=75
faunus.graph.output.format=org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat
faunus.sideeffect.output.format=org.apache.hadoop.mapreduce.lib.output.TextOutputFormat
faunus.output.location=full-seq
faunus.output.location.overwrite=true

faunus$ bin/gremlin.sh
-----oOOo-(_)-oOOo-----
gremlin> g = FaunusFactory.open('bin/titan-seq.properties')
==>faunusgraph[titanhbaseinputformat]
gremlin> g._().toString()

12/12/13 09:19:53 INFO mapreduce.FaunusCompiler: Compiled to 1 MapReduce job(s)
12/12/13 09:19:55 INFO mapred.JobClient:  map 0% reduce 0%
12/12/13 09:21:26 INFO mapred.JobClient:  map 1% reduce 0%
12/12/13 09:21:36 INFO mapred.JobClient:  map 2% reduce 0%
12/12/13 09:21:43 INFO mapred.JobClient:  map 3% reduce 0%

gremlin> hdfs.ls()
==>rwx------ ubuntu supergroup 0 (D) .staging
==>rwxr-xr-x ubuntu supergroup 0 (D) full-seq
gremlin> hdfs.ls('full-seq/job-0')
==>rw-r--r-- ubuntu supergroup 0 _SUCCESS
==>rwxr-xr-x ubuntu supergroup 0 (D) _logs
==>rw-r--r-- ubuntu supergroup 243768636 part-m-00000
==>rw-r--r-- ubuntu supergroup 125250887 part-m-00001
==>rw-r--r-- ubuntu supergroup 331912876 part-m-00002
==>rw-r--r-- ubuntu supergroup 431617929 part-m-00003
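Subsequent analyses can then read the dump in HDFS rather than the live HBase table. A properties file along the following lines points Faunus at the SequenceFile; note that the file name is hypothetical and the faunus.input.location key is an assumption patterned on the configuration above, so both should be checked against the Faunus documentation.

faunus$ cat bin/titan-seq-input.properties
faunus.graph.input.format=org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat
faunus.input.location=full-seq/job-0
faunus.graph.output.format=org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat
faunus.sideeffect.output.format=org.apache.hadoop.mapreduce.lib.output.TextOutputFormat
faunus.output.location=output
faunus.output.location.overwrite=true

gremlin> g = FaunusFactory.open('bin/titan-seq-input.properties')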
Given the generated SequenceFile, the vertices and edges are counted by type and label, which is by definition a global operation.
gremlin> g.V.type.groupCount
gremlin> g.E.label.groupCount
==>pullrequested 930796
Since GitHub is collaborative in a way similar to Wikipedia, there are a few users who contribute a lot and many users who contribute little or nothing at all. To determine the distribution of contributions, Faunus can be used to compute the out-degree distribution of pushed-edges, which correspond to users pushing commits to repositories. This is equivalent to Gremlin visiting each user vertex, counting all of the outgoing pushed-edges, and returning the distribution of counts.
gremlin> g.V.sideEffect('{it.degree = it.outE("pushed").count()}').degree.groupCount
When the degree distribution is plotted using log-scaled axes, the results are similar to the Wikipedia contribution distribution, as expected. This is a common theme in most natural graphs: real-world graphs are not random structures, but are composed of a few “hubs” and numerous “satellites.”
More sophisticated queries can be performed by first extracting a slice of the original graph that contains only the relevant information, so that computational resources are not wasted loading needless aspects of the graph. These slices can be saved to HDFS for subsequent traversals. For example, to calculate the most central co-watched project on GitHub, the primary graph is stripped down to only watched-edges between users and repositories. The final traversal below walks the “co-watched” graph 2 times and counts the number of paths that have gone through each repository. The repositories are sorted by their path counts in order to express which repositories are most central/important/respected according to the watched subgraph.
gremlin> g.E.has('label','watched').keep.V.has('type','repository','user').keep

12/12/13 11:08:13 INFO mapred.JobClient:   com.thinkaurelius.faunus.mapreduce.sideeffect.CommitVerticesMapReduce$Counters
12/12/13 11:08:13 INFO mapred.JobClient:     VERTICES_DROPPED=19377850
12/12/13 11:08:13 INFO mapred.JobClient:     VERTICES_KEPT=2074099
12/12/13 11:08:13 INFO mapred.JobClient:   com.thinkaurelius.faunus.mapreduce.sideeffect.CommitEdgesMap$Counters
12/12/13 11:08:13 INFO mapred.JobClient:     OUT_EDGES_DROPPED=55971128
12/12/13 11:08:13 INFO mapred.JobClient:     OUT_EDGES_KEPT=1934706

gremlin> g = g.getNextGraph()
gremlin> g.V.in('watched').out('watched').in('watched').out('watched').property('_count',Long.class).order(F.decr,'github_name')
==>backbone 4173578345
==>html5-boilerplate 4146508400
==>normalize.css 3255207281
==>three.js 3078851951
==>modernizr 2971383230
==>phantomjs 2589138977
==>homebrew 2528483507
Conclusion
This post discussed the use of Hortonworks Data Platform in concert with the Aurelius Graph Cluster to store and process the graph data generated by the online social coding system GitHub. The example dataset used throughout was provided by GitHub Archive, an ongoing record of events in GitHub. While the dataset currently afforded by GitHub Archive is relatively small, it continues to grow each day. The Aurelius Graph Cluster has been demonstrated in practice to support graphs with hundreds of billions of edges. As more organizations realize the graph structure within their big data, the Aurelius Graph Cluster is there to provide both real-time and batch graph analytics.
Acknowledgments
The authors wish to thank Steve Loughran for his help with Whirr and HDP. Moreover, Russell Jurney requested this post and, in a steadfast manner, ensured it was delivered.
Related Material
Hawkins, P., Aiken, A., Fisher, K., Rinard, M., Sagiv, M., “Data Representation Synthesis,” PLDI ’11, June 2011.
Pham, R., Singer, L., Liskin, O., Filho, F. F., Schneider, K., “Creating a Shared Understanding of Testing Culture on a Social Coding Site,” Leibniz Universität Hannover, Software Engineering Group: Technical Report, September 2012.
Adler, B. T., de Alfaro, L., Pye, I., Raman, V., “Measuring Author Contributions to the Wikipedia,” WikiSym ’08: Proceedings of the 4th International Symposium on Wikis, Article No. 15, September 2008.
Rodriguez, M.A., Mallette, S.P., Gintautas, V., Broecheler, M., “Faunus Provides Big Graph Data Analytics,” Aurelius Blog, November 2012.
Rodriguez, M.A., LaRocque, D., “Deploying the Aurelius Graph Cluster,” Aurelius Blog, October 2012.
Ho, R., “Graph Processing in Map Reduce,” Pragmatic Programming Techniques Blog, July 2010.