Google Pregel Graph Processing
Graph processing typically falls into three categories:
- Capture (e.g., when John is connected to Peter in a social network, a link is created between the two Person nodes)
- Query (e.g., find all of John's friends of friends who are under 30 and married)
- Mining (e.g., find the most influential person in Silicon Valley)
Distributed and Parallel Graph Processing
Although using a graph to represent a relationship network is not new, the size of such networks has increased dramatically in the past decade, to the point where storing the whole graph in one place is impossible. The graph therefore needs to be broken down into multiple partitions and stored in different places. Traditional graph algorithms that assume the whole graph can reside in memory become invalid, and we need to redesign them so that they work in a distributed environment. On the other hand, by breaking the graph into different partitions, we can manipulate the graph in parallel to speed up the processing.
Property Graph Model
The paper “Constructions from Dots and Lines” by Marko A. Rodriguez and Peter Neubauer illustrates the idea very well. Basically, a graph contains nodes and arcs.
A node has a "type" which defines a set of properties (name/value pairs) that the node can be associated with.
An arc defines a directed relationship between nodes, and hence contains the fromNode, the toNode, and a set of properties defined by the "type" of the arc.
General Parallel Graph Processing
Most graph processing algorithms can be expressed as a combination of "traversal" and "transformation".
Parallel Graph Traversal
In the case of "traversal", processing can be expressed as a path containing a sequence of segments. Each segment is a traversal from a node to an arc, followed by a traversal from an arc to a node. In Marko and Peter's model, a Node (Vertex) contains a collection of "inE" and another collection of "outE", while an Arc (Edge) contains one "inV" and one "outV". So to express a "friend-of-a-friend" relationship over a social network, we can use the following path:
./outE[@type='friend']/inV/outE[@type='friend']/inV
Loops can also be expressed in the path. To express all persons that are reachable from this person, we can use the following:
.(/outE[@type='friend']/inV)*[@cycle='infinite']
On the implementation side, a traversal can be processed in the following way (a sketch follows below):
- Start with a set of "context nodes", which can be defined by a list of node ids or by a search criterion (in which case the search result determines the starting context nodes)
- Repeat until all segments in the path are exhausted: perform a walk from all context nodes in parallel, evaluating all outward arcs (i.e., outE) against the conditions (e.g., @type='friend'). The nodes that these arcs point to (i.e., inV) become the context nodes of the next round
- Return the final context nodes
Such a traversal path can also be used to express inferred (or derived) relationships, which don't have a physical arc stored in the graph model.
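A rough sketch of this frontier-style walk, reusing the Node/Arc classes from the earlier sketch (the traverse() function and its segment representation are assumptions for illustration):

```python
# Sketch of the traversal loop described above. A "segment" is modeled
# simply as the arc type to follow; a real engine would evaluate richer
# predicates and run each round in parallel across context nodes.

def traverse(context_nodes, segments):
    for arc_type in segments:
        next_context = set()
        for node in context_nodes:               # conceptually in parallel
            for arc in node.out_edges:           # evaluate outward arcs (outE)
                if arc.type == arc_type:         # condition, e.g. @type='friend'
                    next_context.add(arc.in_v)   # inV becomes next round's context
        context_nodes = next_context
    return context_nodes                         # the final context nodes

# Friend-of-a-friend: ./outE[@type='friend']/inV/outE[@type='friend']/inV
foaf = traverse({john}, ["friend", "friend"])
```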
Parallel Graph Transformation
The main goal of graph transformation is to modify the graph. This includes modifying the properties of existing nodes and arcs, creating new arcs/nodes, and removing existing arcs/nodes. The modification logic is provided by a user-defined function, which is applied to all active nodes.
The graph transformation process can be implemented in the following steps (sketched in code below):
- Start with a set of "active nodes", which can be defined by a list of node ids or by a search criterion (in which case the search result determines the starting active nodes)
- Repeat until there are no more active nodes: execute the user-defined transformation, which modifies the properties of the active nodes and their outward arcs. It can also remove outward arcs or create new arcs that point to existing or new nodes (in other words, the graph connectivity can be modified). It can also send messages to other nodes (the messages will be picked up in the next round) as well as receive messages sent from other nodes in the previous round
- Return the transformed graph, or perform a traversal to return a subset of the transformed graph
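A minimal sketch of this loop, again with assumed names; the user-defined function is modeled here as returning the nodes it wants to keep active plus the messages it wants to send:

```python
# Sketch of the transformation loop described above. user_fn(node, msgs)
# may modify node properties and arcs in place, and returns
# (nodes_to_keep_active, [(target_node, message), ...]).

def transform(active_nodes, user_fn):
    inbox = {}                                   # node -> messages from last round
    while active_nodes:
        outbox = {}                              # messages produced this round
        next_active = set()
        for node in active_nodes:                # conceptually in parallel
            activated, messages = user_fn(node, inbox.get(node, []))
            next_active.update(activated)
            for target, msg in messages:
                outbox.setdefault(target, []).append(msg)
        inbox = outbox                           # delivered, visible next round
        active_nodes = next_active | set(outbox) # message receivers wake up too
    # the graph has been modified in place; a traversal can now select a subset
```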
Google's Pregel
Pregel can be thought of as a generalized parallel graph transformation framework. In this model, the most basic (atomic) unit is a "node" that contains its properties, its outward arcs (and their properties), and the node ids (just the ids) that the outward arcs point to. The node also has a logical inbox to receive all messages sent to it.
The whole graph is broken down into multiple "partitions", each containing a large number of nodes. A partition is the unit of execution and typically has an execution thread associated with it. A "worker" machine can host multiple "partitions".
The execution model is based on the BSP (Bulk Synchronous Parallel) model. In this model, multiple processing units proceed in parallel through a sequence of "supersteps". Within each superstep, each processing unit first receives all messages delivered to it from the preceding superstep, then manipulates its local data and may queue up messages that it intends to send to other processing units. This happens asynchronously and simultaneously among all processing units. The queued-up messages are delivered to the destination processing units but won't be seen until the next superstep. When all the processing units finish the message delivery (hence the synchronization point), the next superstep can be started, and the cycle repeats until the termination condition has been reached.
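The defining property of BSP, that a message queued in superstep S only becomes visible in superstep S+1, can be sketched with a double-buffered inbox (this is a conceptual, sequential simulation, not the distributed implementation):

```python
# Sketch of the BSP cycle: messages queued in superstep S are delivered at
# the barrier and only seen in superstep S+1. compute_fn(unit, step, msgs)
# does the local work and returns [(target_unit, message), ...].

def run_bsp(units, compute_fn, max_supersteps):
    inboxes = {u: [] for u in units}          # messages visible this superstep
    for superstep in range(max_supersteps):   # termination simplified to a cap
        outboxes = {u: [] for u in units}     # messages queued for next superstep
        for u in units:                       # conceptually in parallel
            for target, msg in compute_fn(u, superstep, inboxes[u]):
                outboxes[target].append(msg)
        # synchronization barrier: all deliveries complete before the next superstep
        inboxes = outboxes
```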
Notice that depending on the graph algorithm, the assignment of nodes to partitions may have an overall performance impact. Pregel provides a default assignment where partition = nodeId % N, but users can override this assignment algorithm if they want. In general, it is a good idea to put close-neighbor nodes into the same partition so that messages between these nodes don't need to flow over the network, which reduces communication overhead. Of course, this also means that traversing the neighboring nodes happens within the same machine, which hinders parallelism. This usually is not a problem when the context nodes are very diverse. In my experience with parallel graph processing, coarse-grained parallelism is preferred over fine-grained parallelism as it reduces communication overhead.
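The default assignment, and the kind of override a user might supply, could look like the following (the locality hint and its encoding are purely hypothetical):

```python
NUM_PARTITIONS = 16

# Default assignment from the article: partition = nodeId % N.
def default_partition(node_id):
    return node_id % NUM_PARTITIONS

# A user-supplied override could hash a locality hint instead, e.g. a
# community/region id assumed to be packed into the high bits of the node id
# (hypothetical encoding), so that close neighbors land in the same partition.
def locality_partition(node_id):
    community = node_id >> 32
    return community % NUM_PARTITIONS
```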
The complete picture of execution can be implemented as follows. The basic processing unit is a "thread" associated with each partition, running inside a worker. Each worker receives the messages of the previous superstep from its "inQ" and dispatches each message to the partition where the destination node resides. After that, a user-defined "compute()" function is invoked on each node of the partition. Notice that there is a single thread per partition, so nodes within a partition are executed sequentially and the order of execution is nondeterministic.
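Putting the worker's superstep together, reusing the Vertex/Partition sketch from above (method and parameter names are mine):

```python
# Sketch of one worker's superstep: drain the worker-level inQ, route each
# message to the partition owning the destination vertex, then invoke the
# user-defined compute() on every vertex of each partition sequentially
# (one thread per partition in the real system).

def worker_superstep(worker_inq, partitions, partition_of, compute):
    # 1. dispatch messages from the previous superstep to the right partition
    for dest_id, msg in worker_inq:
        partition = partitions[partition_of(dest_id)]
        partition.vertices[dest_id].inbox.append(msg)
    # 2. run compute() on each vertex; messages it sends go to the outQ (not shown)
    for partition in partitions.values():
        for vertex in partition.vertices.values():
            if vertex.active or vertex.inbox:
                compute(vertex, vertex.inbox)
                vertex.inbox = []
```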
The "master" is playing a central role to
coordinate the execute of supersteps in sequence. It signals the
beginning of a new superstep to all workers after knowing all of them
has completed the previous one. It also pings each worker to know their
processing status and periodically issue "checkpoint" command to all
workers who will then save its partition to a persistent graph store.
Pregel doesn't define or mandate the graph storage model so any
persistent mechanism should work well. There is a "load" phase at the
beginning where each partition starts empty and read a slice of the
graph storage. For each node read from the storage, a "partition()"
function will be invoked and load the node in the current partition if
the function returns the same node, otherwise the node is queue to
another partition who the node is assigned to.
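A sketch of that load phase, with partition_of() standing in for the user-supplied partition() function (names are assumptions):

```python
# Sketch of the load phase: a worker reads a slice of the graph store, keeps
# the nodes assigned to its own partitions, and queues the rest for delivery
# to the partitions that own them.

def load_slice(storage_slice, my_partition_ids, partition_of):
    local = {}                                    # partition_id -> loaded vertices
    forwarded = []                                # (owning_partition_id, vertex)
    for vertex in storage_slice:
        p = partition_of(vertex.id)               # the "partition()" function
        if p in my_partition_ids:
            local.setdefault(p, []).append(vertex)
        else:
            forwarded.append((p, vertex))         # queued to the owning partition
    return local, forwarded
```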
Fault resilience is achieved with a checkpoint mechanism whereby each worker is instructed to save its in-memory graph partition to the graph storage periodically (at the beginning of a superstep). If a worker is detected to be dead (not responding to the "ping" message from the master), the master instructs the surviving workers to take over the partitions of the failed worker. The whole processing is then rolled back to the previous checkpoint and proceeds again from there (even the healthy workers need to redo the previous processing). The Pregel paper mentions a potential optimization of re-executing only the processing of the failed partitions from the previous checkpoint by replaying the previously received messages; of course, this requires keeping a log of all messages received between nodes at every superstep since the previous checkpoint. This optimization, however, relies on the algorithm being deterministic (in other words, the same input executed at a later time will produce the same output).
Further optimization is available in Pregel to reduce network bandwidth usage. Messages destined for the same node can be combined using a user-defined "combine()" function, which is required to be associative and commutative. This is similar to the combine() method in the Google MapReduce model.
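For instance, summation is both associative and commutative, so a sum combiner is valid; a minimal sketch:

```python
# Sketch of a user-defined combine(): messages headed to the same node are
# merged before crossing the network. Summation satisfies the associativity
# and commutativity requirements.

def combine(messages):
    return [sum(messages)]            # many messages collapse into one

# e.g. three contributions destined for the same node
assert combine([1, 2, 3]) == [6]
```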
In addition, each node can emit an "aggregate value" at the end of "compute()". The worker invokes a user-defined "aggregate()" function that folds all the nodes' aggregate values into a partition-level aggregate value, and this continues all the way up to the master. The final aggregated value is made available to all nodes in the next superstep. Such aggregate values can be used to calculate summary statistics of the graph as well as to coordinate the progress of the processing units.
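A sketch of the node → partition → master aggregation chain, using "max" as the aggregation function (names and values are illustrative):

```python
# Sketch of the aggregation chain described above: nodes emit values at the
# end of compute(); the worker folds them into a partition-level value, and
# the master folds the partition values into a global value that is broadcast
# to all nodes in the next superstep.

def aggregate(values):
    return max(values) if values else None

node_values = [3, 7, 2, 9, 4]                      # emitted by nodes of one partition
partition_value = aggregate(node_values)           # worker-level aggregate -> 9
global_value = aggregate([partition_value, 5, 8])  # master over all partitions -> 9
```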
I think the Pregel model is general enough for a large portion of classical graph algorithms. I'll cover how to map these traditional algorithms to Pregel in subsequent postings.
Reference
http://www.slideshare.net/slidarko/graph-windycitydb2010