
Processing Hierarchical Data Using Spark GraphX Pregel API

Learn about using the GraphX Pregel API, a very powerful tool that can be used to solve iterative problems and pretty much any graph computation.

By Suraj Bang · Sep. 30, 2017 · Tutorial

Today, distributed compute engines are the backbone of many analytic, batch, and streaming applications. Spark provides many advanced features (pivot, analytic window functions, etc.) out of the box to transform data. Sometimes there is a need to process hierarchical data or perform hierarchical calculations. Many database vendors provide features like recursive CTEs (common table expressions) or the CONNECT BY SQL clause to query and transform hierarchical data. CTEs are also known as recursive queries or parent-child queries.
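For readers who have not met them, a recursive CTE seeds a result set with the root rows and then repeatedly joins the table back to that result set. The snippet below is a hypothetical illustration only (the employees table and its columns are invented), and Spark SQL at the time of writing does not accept this syntax:

// What a recursive CTE looks like in databases that support it.
// Hypothetical SQL, shown here as a Scala string for illustration;
// not runnable on Spark SQL of this era.
val recursiveCte = """
  WITH RECURSIVE org AS (
    SELECT emp_id, mgr_id, 1 AS level
    FROM employees WHERE mgr_id IS NULL               -- anchor: the root rows
    UNION ALL
    SELECT e.emp_id, e.mgr_id, org.level + 1
    FROM employees e JOIN org ON e.mgr_id = org.emp_id -- recursive step
  )
  SELECT * FROM org
"""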

In this post, we will look at how we can address this with Spark.

Hierarchical Data Overview

Hierarchical relationships exist where one item of data is the parent of another item. Hierarchical data can be represented using a graph property object model, where every row is a vertex (node), a connection is the edge (relationship) that connects the vertices, and columns are the properties of the vertex.
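As a small illustrative sketch of that mapping (the Employee case class and the hash-based ids here are hypothetical, not part of the article's code), a row becomes a vertex carrying the columns as properties, and a parent-child link becomes an edge:

// Hypothetical sketch: a row is a vertex, a parent link is an edge.
import org.apache.spark.graphx.Edge

case class Employee(empId: String, name: String, mgrId: Option[String])

val row = Employee("EMP002", "Jim Lake", Some("EMP001"))
val vertex = (row.empId.hashCode.toLong, row)   // vertex: (id, properties)
val edge = row.mgrId.map { m =>                 // edge: parent -> child
  Edge(m.hashCode.toLong, row.empId.hashCode.toLong, "topdown")
}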

Some Use Cases

  • Financial calculations: subaccounts rolling up into parent accounts, all the way to the topmost account.
  • Creating organizational hierarchies: manager-employee relationships with a path.
  • Generating a graph of links between web pages with a path.
  • Any kind of iterative computation involving linked data.

Challenges

Querying hierarchical data in a distributed system has some challenges:

  • The data is connected, but it is distributed across partitions and nodes. The implementation should be optimized for performing iterations and moving data (shuffles) only when needed.
  • The depth of a graph can vary over time. The solution should handle varying depths and should not force users to define the depth before processing.

Solution

One of the ways of implementing CTE functionality in Spark is the GraphX Pregel API.

What Is the GraphX Pregel API?

GraphX is a Spark API for graphs and graph-parallel computation. Graph algorithms are iterative in nature, and the properties of a vertex depend upon the properties of the vertices connected to it directly or indirectly (via other vertices). Pregel is a vertex-centric graph processing model developed by Google, and Spark GraphX provides an optimized variant of the Pregel API.
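For orientation, this is roughly the signature of the pregel operator that GraphX exposes on a Graph (via GraphOps, Spark 2.x): vprog mutates a vertex's value when it receives messages, sendMsg emits messages along edges, and mergeMsg combines messages bound for the same vertex.

// Signature of the GraphX pregel operator (approximate, from GraphOps)
def pregel[A: ClassTag](
    initialMsg: A,                           // delivered to every vertex in superstep 0
    maxIterations: Int = Int.MaxValue,       // upper bound on the number of supersteps
    activeDirection: EdgeDirection = EdgeDirection.Either)( // which edges may send each step
    vprog: (VertexId, VD, A) => VD,          // mutate a vertex when a message arrives
    sendMsg: EdgeTriplet[VD, ED] => Iterator[(VertexId, A)], // emit messages along an edge
    mergeMsg: (A, A) => A)                   // combine messages aimed at one vertex
  : Graph[VD, ED]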

How Does the Pregel API Work?

Pregel API processing consists of executing supersteps:

Superstep 0:

  1. Pass the initial message to all the vertices.
  2. Send the value as a message to the directly connected vertices.

Superstep 1:

  1. Receive messages from the previous step.
  2. Mutate the value.
  3. Send the value as a message to the directly connected vertices.

Repeat superstep 1 as long as there are messages to pass, and stop when no more messages are being passed.
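As a minimal illustration of these supersteps (a hypothetical toy graph, not the article's data), the following spark-shell snippet propagates a depth from a root vertex down a three-node chain:

// Toy Pregel example: compute each vertex's depth from the root.
import org.apache.spark.graphx._

// A chain 1 -> 2 -> 3; the vertex attribute is its depth,
// 0 for the root and -1 (unknown) for everything else.
val vertices = sc.parallelize(Seq((1L, 0), (2L, -1), (3L, -1)))
val edges = sc.parallelize(Seq(Edge(1L, 2L, ()), Edge(2L, 3L, ())))
val graph = Graph(vertices, edges)

val depths = graph.pregel(-1, Int.MaxValue, EdgeDirection.Out)(
  // vprog: keep the larger of the current depth and the incoming message
  (id, depth, msg) => math.max(depth, msg),
  // sendMsg: once a vertex knows its depth, offer depth + 1 to its children
  triplet =>
    if (triplet.srcAttr >= 0 && triplet.dstAttr < triplet.srcAttr + 1)
      Iterator((triplet.dstId, triplet.srcAttr + 1))
    else Iterator.empty,
  // mergeMsg: several parents may message one vertex; keep the max
  (a, b) => math.max(a, b)
)

depths.vertices.collect.sorted.foreach(println) // (1,0), (2,1), (3,2)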

Hierarchical Data for the Use Case

The table below shows the sample employee data that we will be using for generating top-down hierarchies. Here, an employee's manager is represented by the mgr_id field, which holds emp_id values.

emp_id | first_name | last_name | title  | mgr_id
EMP001 | Bob        | Baker     | CEO    | null
EMP002 | Jim        | Lake      | CIO    | EMP001
EMP003 | Tim        | Gorab     | Mgr    | EMP002
EMP004 | Rick       | Summer    | Mgr    | EMP002
EMP005 | Sam        | Cap       | Lead   | EMP004
EMP006 | Ron        | Hubb      | Sr.Dev | EMP005
EMP007 | Cathy      | Watson    | Dev    | EMP006
EMP008 | Samantha   | Lion      | Dev    | EMP007
EMP009 | Jimmy      | Copper    | Dev    | EMP007
EMP010 | Shon       | Taylor    | Intern | EMP009

We will add the following columns as part of the processing:

  • level (depth): the level at which the vertex sits in the hierarchy.
  • path: the path from the topmost vertex to the current vertex in the hierarchy.
  • root: the topmost vertex in the hierarchy; useful when multiple hierarchies exist in the dataset.
  • isCyclic: if there is bad data and a cyclic relationship exists, flag it.
  • isLeaf: if the vertex has no children, flag it.

Code:

// The code below demonstrates use of the GraphX Pregel API (Scala 2.11+).
// Run it in spark-shell, where sc and the toDF implicits are in scope;
// paste the whole listing with :paste so the forward references resolve.

import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.DataFrame
import scala.util.hashing.MurmurHash3

// Functions to build the top-down hierarchy

// Set up and call the Pregel API
def calcTopLevelHierarchy(vertexDF: DataFrame, edgeDF: DataFrame): RDD[(Any, (Int, Any, String, Int, Int))] = {

  // Create the vertex RDD: (primary key, root, path)
  val verticesRDD = vertexDF
    .rdd
    .map { x => (x.get(0), x.get(1), x.get(2)) }
    .map { x => (MurmurHash3.stringHash(x._1.toString).toLong, (x._1.asInstanceOf[Any], x._2.asInstanceOf[Any], x._3.asInstanceOf[String])) }

  // Create the edge RDD: top-down relationship (manager -> employee)
  val edgesRDD = edgeDF.rdd.map { x => (x.get(0), x.get(1)) }
    .map { x => Edge(MurmurHash3.stringHash(x._1.toString).toLong, MurmurHash3.stringHash(x._2.toString).toLong, "topdown") }

  // Create the graph
  val graph = Graph(verticesRDD, edgesRDD).cache()

  val pathSeparator = """/"""

  // Initial message: id, level, root, path, isCyclic, isLeaf
  val initialMsg = (0L, 0, 0.asInstanceOf[Any], List("dummy"), 0, 1)

  // Add more attributes to the vertices: id, level, root, path, isCyclic,
  // existing value of the current vertex (used to build the path), isLeaf, pk
  val initialGraph = graph.mapVertices((id, v) => (id, 0, v._2, List(v._3), 0, v._3, 1, v._1))

  val hrchyRDD = initialGraph.pregel(initialMsg,
    Int.MaxValue,
    EdgeDirection.Out)(
    setMsg,
    sendMsg,
    mergeMsg)

  // Build the path string from the accumulated list
  val hrchyOutRDD = hrchyRDD.vertices.map { case (id, v) =>
    (v._8, (v._2, v._3, pathSeparator + v._4.reverse.mkString(pathSeparator), v._5, v._7))
  }

  hrchyOutRDD
}




// vprog: mutate the value of a vertex when it receives a message
def setMsg(vertexId: VertexId,
           value: (Long, Int, Any, List[String], Int, String, Int, Any),
           message: (Long, Int, Any, List[String], Int, Int)): (Long, Int, Any, List[String], Int, String, Int, Any) = {
  if (message._2 < 1) { // superstep 0 - initialize
    (value._1, value._2 + 1, value._3, value._4, value._5, value._6, value._7, value._8)
  } else if (message._5 == 1) { // set isCyclic
    (value._1, value._2, value._3, value._4, message._5, value._6, value._7, value._8)
  } else if (message._6 == 0) { // set isLeaf
    (value._1, value._2, value._3, value._4, value._5, value._6, message._6, value._8)
  } else { // set new values: bump the level, take the sender's root, prepend own name to the sender's path
    (message._1, value._2 + 1, message._3, value._6 :: message._4, value._5, value._6, value._7, value._8)
  }
}



// send the value to vertices
def sendmsg(triplet: edgetriplet[(long,int,any,list[string],int,string,int,any), _]): iterator[(vertexid, (long,int,any,list[string],int,int))] = {
  val sourcevertex = triplet.srcattr
  val destinationvertex = triplet.dstattr
// check for icyclic
 if (sourcevertex._1 == triplet.dstid || sourcevertex._1 == destinationvertex._1)
    if (destinationvertex._5==0) { //set iscyclic
      iterator((triplet.dstid, (sourcevertex._1, sourcevertex._2, sourcevertex._3,sourcevertex._4, 1,sourcevertex._7)))
    } else {
     iterator.empty
    }
  else {
    if (sourcevertex._7==1) //is not leaf
    {
      iterator((triplet.srcid, (sourcevertex._1,sourcevertex._2,sourcevertex._3, sourcevertex._4 ,0, 0 )))
    }
    else { // set new values
      iterator((triplet.dstid, (sourcevertex._1, sourcevertex._2, sourcevertex._3, sourcevertex._4, 0, 1)))
    }
  }
}



// mergeMsg: combine messages arriving at the same vertex
def mergeMsg(msg1: (Long, Int, Any, List[String], Int, Int),
             msg2: (Long, Int, Any, List[String], Int, Int)): (Long, Int, Any, List[String], Int, Int) = {
  // Dummy logic; with this sample data every vertex has a single parent,
  // so keeping either message works
  msg2
}


// Test with some sample data

val empData = Array(
  ("EMP001", "Bob", "Baker", "CEO", null.asInstanceOf[String])
  , ("EMP002", "Jim", "Lake", "CIO", "EMP001")
  , ("EMP003", "Tim", "Gorab", "Mgr", "EMP002")
  , ("EMP004", "Rick", "Summer", "Mgr", "EMP002")
  , ("EMP005", "Sam", "Cap", "Lead", "EMP004")
  , ("EMP006", "Ron", "Hubb", "Sr.Dev", "EMP005")
  , ("EMP007", "Cathy", "Watson", "Dev", "EMP006")
  , ("EMP008", "Samantha", "Lion", "Dev", "EMP007")
  , ("EMP009", "Jimmy", "Copper", "Dev", "EMP007")
  , ("EMP010", "Shon", "Taylor", "Intern", "EMP009")
)

// Create a DataFrame with some partitions
val empDF = sc.parallelize(empData, 3).toDF("emp_id", "first_name", "last_name", "title", "mgr_id").cache()

// Primary key, root, path: DataFrame to GraphX for vertices
val empVertexDF = empDF.selectExpr("emp_id", "concat(first_name,' ',last_name)", "concat(last_name,' ',first_name)")

// Parent to child: DataFrame to GraphX for edges
val empEdgeDF = empDF.selectExpr("mgr_id", "emp_id").filter("mgr_id is not null")

// Call the function
val empHierarchyExtDF = calcTopLevelHierarchy(empVertexDF, empEdgeDF)
  .map { case (pk, (level, root, path, isCyclic, isLeaf)) => (pk.asInstanceOf[String], level, root.asInstanceOf[String], path, isCyclic, isLeaf) }
  .toDF("emp_id_pk", "level", "root", "path", "iscyclic", "isleaf").cache()

// Extend the original table with the new columns
val empHierarchyDF = empHierarchyExtDF.join(empDF, empDF.col("emp_id") === empHierarchyExtDF.col("emp_id_pk"))
  .selectExpr("emp_id", "first_name", "last_name", "title", "mgr_id", "level", "root", "path", "iscyclic", "isleaf")

// Print
empHierarchyDF.show()

Output (the derived columns; empHierarchyDF also carries the original first_name, last_name, title, and mgr_id columns):

emp_id | level | root      | path                                                                                   | iscyclic | isleaf
EMP001 | 1     | Bob Baker | /Baker Bob                                                                             | 0        | 0
EMP002 | 2     | Bob Baker | /Baker Bob/Lake Jim                                                                    | 0        | 0
EMP003 | 3     | Bob Baker | /Baker Bob/Lake Jim/Gorab Tim                                                          | 0        | 1
EMP004 | 3     | Bob Baker | /Baker Bob/Lake Jim/Summer Rick                                                        | 0        | 0
EMP005 | 4     | Bob Baker | /Baker Bob/Lake Jim/Summer Rick/Cap Sam                                                | 0        | 0
EMP006 | 5     | Bob Baker | /Baker Bob/Lake Jim/Summer Rick/Cap Sam/Hubb Ron                                       | 0        | 0
EMP007 | 6     | Bob Baker | /Baker Bob/Lake Jim/Summer Rick/Cap Sam/Hubb Ron/Watson Cathy                          | 0        | 0
EMP008 | 7     | Bob Baker | /Baker Bob/Lake Jim/Summer Rick/Cap Sam/Hubb Ron/Watson Cathy/Lion Samantha            | 0        | 1
EMP009 | 7     | Bob Baker | /Baker Bob/Lake Jim/Summer Rick/Cap Sam/Hubb Ron/Watson Cathy/Copper Jimmy             | 0        | 0
EMP010 | 8     | Bob Baker | /Baker Bob/Lake Jim/Summer Rick/Cap Sam/Hubb Ron/Watson Cathy/Copper Jimmy/Taylor Shon | 0        | 1

Under the Hood

A Spark application breaks up into jobs, stages, and tasks. Because of its iterative nature, the Pregel API internally generates multiple jobs: a job is generated every time messages are passed to the vertices. Each job may end up with multiple shuffles, as data can live on different nodes.

Things to watch out for are long RDD lineages that are created when working with huge datasets.
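One mitigation, as a rough sketch (it assumes spark-shell, a reliable checkpoint location, and the Graph.checkpoint method available in Spark 1.3+; the directory path below is hypothetical), is to checkpoint the graph so the lineage is truncated before it grows too deep:

// Rough sketch: truncate long lineages by checkpointing.
sc.setCheckpointDir("hdfs:///tmp/graphx-checkpoints") // hypothetical path

val graphToCheckpoint = Graph(verticesRDD, edgesRDD)  // the graph built earlier
graphToCheckpoint.checkpoint()      // mark the underlying vertex and edge RDDs
graphToCheckpoint.vertices.count()  // an action materializes the checkpoint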

Summary

The GraphX Pregel API is very powerful and can be used to solve iterative problems and pretty much any graph computation.

References

  • Experience and Lessons Learned for Large-Scale Graph Analysis Using GraphX

  • How to Get Started Using Apache Spark GraphX With Scala


Published at DZone with permission of Suraj Bang, DZone MVB. See the original article here.
