
Finding the Impact of a Tweet using Spark GraphX

Looking for a great way to do social network analysis? Take a look at this annotated Scala example of using Spark with GraphX to process Twitter data.

Social Network Analysis (SNA), the process of investigating social structures using networks and graphs, has become a very hot topic. With it, we can answer many questions, such as:

  • How many connections does an individual have?
  • What is the ability of an individual to influence a network?
  • and so on…

The answers to these and similar questions help with conducting marketing research studies, running ad campaigns, and spotting the latest trends. So it is crucial to measure the impact of an individual, or a group of individuals, in a social network, so that we can identify its key individuals, or Alpha Users (a term used in SNA).

In this post, we are going to see how to find the impact of an individual in a social network like Twitter, i.e., how many Twitter users an individual can influence via his/her Tweets up to N levels, i.e., followers of followers of followers… and so on. For this post, we have downloaded data from Stanford University’s SNAP portal. Also, for anonymity, we have assigned random names to the users in the Twitter data.

Now, let's get on to the task of finding the Alpha User. First, we have to extract the data from the text file like this:

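// Assumes an existing SparkContext named `sparkContext`
import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD

// Each line has the form ((followeeName,followeeId),(followerName,followerId))
val data = sparkContext.textFile("twitter-graph-data.txt")

// (followeeId, followeeName) pairs from the first half of each line
val followees: RDD[(VertexId, String)] = data.map(_.split(",")).map { arr =>
  val user = arr(0).replace("((", "")
  val id = arr(1).replace(")", "")
  (id.toLong, user)
}

// (followerId, followerName) pairs from the second half of each line
val followers: RDD[(VertexId, String)] = data.map(_.split(",")).map { arr =>
  val user = arr(2).replace("(", "")
  val id = arr(3).replace("))", "")
  (id.toLong, user)
}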
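To check the parsing logic without a cluster, here is a plain-Scala sketch (no Spark) of what each `map` above does to a single line. It assumes every line has the shape `((followeeName,followeeId),(followerName,followerId))`, which is what the `replace()` calls imply; `TwitterLineParser` and `parse` are hypothetical names used only for illustration.

```scala
// Plain-Scala sketch (no Spark) of the per-line parsing used above.
// Assumption: every line looks like "((followeeName,followeeId),(followerName,followerId))",
// the shape implied by the replace() calls in the Spark code.
object TwitterLineParser {
  type VertexId = Long // GraphX defines VertexId as an alias for Long

  // Hypothetical helper: returns ((followeeId, followeeName), (followerId, followerName)).
  def parse(line: String): ((VertexId, String), (VertexId, String)) = {
    val arr = line.split(",")
    val followeeName = arr(0).replace("((", "")
    val followeeId = arr(1).replace(")", "").trim.toLong
    val followerName = arr(2).replace("(", "")
    val followerId = arr(3).replace("))", "").trim.toLong
    ((followeeId, followeeName), (followerId, followerName))
  }
}
```

For example, `TwitterLineParser.parse("((Alice,1),(Bob,2))")` returns `((1,Alice),(2,Bob))`. Parsing each line once like this also suggests a possible refinement of the Spark code (our suggestion, not part of the original example): map `data` through a single parser and `cache()` the result instead of splitting every line three times.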

Next, we create the graph from the extracted data using the Spark GraphX API:

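// The union can contain the same vertex id several times; GraphX keeps one attribute per id
val vertices = followees.union(followers)

// One edge per line, pointing from the followee to the follower
val edges = data.map(_.split(",")).map { arr =>
  val followeeId = arr(1).replace(")", "").toLong
  val followerId = arr(3).replace("))", "").toLong
  Edge(followeeId, followerId, "follow")
}

// Attribute for vertices that appear in an edge but not in `vertices`
val defaultUser = ""
val graph = Graph(vertices, edges, defaultUser)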

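In the code above, we create the vertices from the union of the followee and follower RDDs, and one "follow" edge per line, from followee to follower. We also have to provide a default vertex attribute, defaultUser, which GraphX assigns to any vertex that appears in an edge but not in the vertices RDD.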
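The sketch below mimics, in plain Scala with made-up sample data (all ids and names are hypothetical), what `Graph(vertices, edges, defaultUser)` does with its inputs. The union of followees and followers may list the same user several times; GraphX keeps one attribute per vertex id (choosing among duplicates arbitrarily) and gives `defaultUser` to ids that only occur in an edge. Here an ordinary `Map` stands in for that, with the last duplicate winning.

```scala
// Plain-Scala sketch of the inputs to Graph(vertices, edges, defaultUser); no Spark involved.
object GraphSketch {
  type VertexId = Long

  // Hypothetical sample: (followeeId, followerId) pairs, i.e. edges followee -> follower.
  val edges: Seq[(VertexId, VertexId)] = Seq((1L, 2L), (1L, 3L), (2L, 4L), (3L, 4L), (4L, 5L))

  // What followees.union(followers) might contain: duplicates are expected.
  // User 5 is deliberately missing so that the default attribute is used.
  val vertexList: Seq[(VertexId, String)] = Seq(
    (1L, "Alice"), (1L, "Alice"), (2L, "Bob"), (3L, "Carol"), (2L, "Bob"), (4L, "Dave"), (4L, "Dave"))

  val defaultUser = ""

  // One attribute per id (toMap keeps the last duplicate), plus defaultUser for ids
  // that only appear in an edge.
  val vertices: Map[VertexId, String] = {
    val known = vertexList.toMap
    val edgeIds = edges.flatMap { case (src, dst) => Seq(src, dst) }.toSet
    known ++ edgeIds.filterNot(known.contains).map(id => id -> defaultUser)
  }
}
```

With this sample, `GraphSketch.vertices` has five entries and user 5 gets the empty name.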

Now comes the main task: finding the most influential (alpha) user up to 2 levels, i.e., the user with the largest total number of distinct followers and followers of followers. For this, we are going to use the Pregel API of Spark GraphX and a Breadth First Traversal.
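As a reference point for small samples, the breadth-first idea can be written directly in plain Scala. This is a single-machine sketch, not the Pregel approach used in this post, and it assumes a hypothetical `followers` map from each user id to the ids of that user's direct followers. `reach` collects the distinct users within `levels` hops; `alpha` picks the user with the largest reach.

```scala
import scala.collection.mutable

// Plain-Scala breadth-first sketch (no Spark) of "followers of followers up to N levels".
// Assumption: the graph is given as a map from user id to the ids of its direct followers.
object InfluenceBfs {
  type VertexId = Long

  // Distinct users reachable from `root` within `levels` hops, excluding the root itself.
  def reach(followers: Map[VertexId, Seq[VertexId]], root: VertexId, levels: Int): Set[VertexId] = {
    val seen = mutable.Set(root)
    var frontier = Seq(root)
    var level = 0
    while (level < levels && frontier.nonEmpty) {
      // Expand every user on the current frontier by one hop, skipping users already seen.
      val next = frontier
        .flatMap(id => followers.getOrElse(id, Seq.empty))
        .filterNot(seen.contains)
        .distinct
      seen ++= next
      frontier = next
      level += 1
    }
    seen.toSet - root
  }

  // The user with the largest reach and its size (ties are broken arbitrarily).
  def alpha(followers: Map[VertexId, Seq[VertexId]], levels: Int): (VertexId, Int) = {
    val users = (followers.keys ++ followers.values.flatten).toSet
    users.toSeq.map(u => u -> reach(followers, u, levels).size).maxBy(_._2)
  }
}
```

Raising `levels` from 2 to N plays the same role as raising the number of Pregel iterations.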

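// Propagate follower names from followers (edge dst) back to followees (edge src) for 2 supersteps
val subGraph = graph.pregel("", 2, EdgeDirection.In)(
  // vprog: append the incoming message to the vertex's current attribute
  (id, attr, msg) => attr + "," + msg,
  // sendMsg: send the follower's accumulated attribute to the followee
  triplet => Iterator((triplet.srcId, triplet.dstAttr)),
  // mergeMsg: combine messages arriving at the same vertex in one superstep
  (a, b) => a + "," + b)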

The code above is hard to digest at first, so let's go through its arguments one by one.

  • The first argument is the initial message (here an empty string), which every vertex in the graph receives in the first superstep.
  • The second argument, maxIterations, is the maximum number of supersteps, i.e., how many times the Pregel API exchanges messages among connected vertices. Each superstep moves information one more level along the follower edges.
  • The third argument, activeDirection, controls which edges run sendMsg after the first superstep. With EdgeDirection.In, only edges whose destination vertex (the follower) received a message in the previous round send again. We chose the inward direction because information has to travel from the leaf nodes (followers) back to the root node (the followee), so that every user in the graph collects their followers of followers.
  • Then comes the vprog argument, the vertex program, which computes a vertex's new attribute from its current attribute and the incoming message. Here we append the message to the attribute, so that each user accumulates the names of all users following them on Twitter.
  • After that comes the sendMsg argument, which computes the message to send along each edge in the next iteration. Here each edge sends the follower's accumulated attribute (triplet.dstAttr) to the followee (triplet.srcId).
  • Lastly comes the mergeMsg argument, which reduces/merges all messages sent to the same vertex during one iteration into a single message; here it joins them with a comma.
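To make the three functions concrete, here is a plain-Scala simulation of the pregel call above on a tiny in-memory graph. It follows the superstep structure described in the GraphX documentation (vprog on every vertex with the initial message, then rounds of sendMsg and mergeMsg, restricted by EdgeDirection.In to edges whose destination just received a message) in a simplified, single-machine form; it is not GraphX's implementation, and `PregelSimulation.run` is a hypothetical helper.

```scala
// Plain-Scala simulation (no Spark) of graph.pregel("", maxIterations, EdgeDirection.In)(...)
// with the article's vprog, sendMsg and mergeMsg. Edges are (srcId, dstId) = (followee, follower).
object PregelSimulation {
  type VertexId = Long

  // Assumption: every edge endpoint has an entry in `attrs` (GraphX would use defaultUser).
  def run(
      attrs: Map[VertexId, String],
      edges: Seq[(VertexId, VertexId)],
      maxIterations: Int): Map[VertexId, String] = {
    val vprog = (attr: String, msg: String) => attr + "," + msg
    val mergeMsg = (a: String, b: String) => a + "," + b

    // Superstep 0: every vertex runs vprog with the initial message "".
    var state: Map[VertexId, String] = attrs.map { case (id, attr) => id -> vprog(attr, "") }

    // sendMsg + mergeMsg: each edge sends the follower's attribute to the followee, and messages
    // for the same followee are merged. With `active` set, only edges whose destination received
    // a message in the previous round are used (EdgeDirection.In); None means all edges.
    def messages(active: Option[Set[VertexId]]): Map[VertexId, String] =
      edges
        .filter { case (_, dst) => active.forall(_.contains(dst)) }
        .map { case (src, dst) => src -> state(dst) }
        .groupBy(_._1)
        .map { case (id, group) => id -> group.map(_._2).reduce(mergeMsg) }

    var msgs = messages(None)
    var i = 0
    while (msgs.nonEmpty && i < maxIterations) {
      // Vertices that received a message run vprog; all others keep their attribute.
      state = state.map { case (id, attr) => id -> msgs.get(id).fold(attr)(m => vprog(attr, m)) }
      msgs = messages(Some(msgs.keySet))
      i += 1
    }
    state
  }
}
```

On a graph where Bob and Carol follow Alice and Dave follows Bob, two iterations leave Alice's attribute containing Alice, Bob, Carol and Dave (plus empty tokens and repeats), while one iteration stops at her direct followers.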

Now we just have to find the user with the maximum number of followers of followers. For that, we can use the following code:

// Score each vertex by its distinct accumulated names; "- 2" discounts the user's own
// name and the empty token left by the initial "" message
val alphaUser = subGraph.vertices
  .map { case (id, attr) => (id, attr.split(",").distinct.length - 2) }
  .max()(Ordering.by[(VertexId, Int), Int](_._2))
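The `- 2` in the scoring expression is easy to miss. The plain-Scala sketch below (no Spark; the attribute strings are hypothetical examples of what pregel builds) isolates scoring and selection. For an attribute such as `"Alice,,Bob,,Carol,"`, the distinct tokens are `Alice`, the empty string and the followers, so subtracting 2 leaves the follower count. Because `String.split` drops trailing empty strings, a user with no followers (`"Carol,"`) scores -1 rather than 0, which does not affect the maximum.

```scala
// Plain-Scala sketch of the scoring and selection step (no Spark).
object AlphaSelection {
  type VertexId = Long

  // Distinct tokens minus 2: one for the user's own name and one for the empty token
  // introduced by the initial "" message.
  def score(attr: String): Int = attr.split(",").distinct.length - 2

  // Local equivalent of rdd.max()(Ordering.by(_._2)): the (id, score) pair with the top score.
  def alpha(attrs: Map[VertexId, String]): (VertexId, Int) =
    attrs.toSeq.map { case (id, attr) => id -> score(attr) }.maxBy(_._2)
}
```

For instance, `AlphaSelection.score("Alice,,Bob,,Carol,")` is 2: the tokens are Alice, "", Bob and Carol.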

val alphaUserName = graph.vertices.filter(_._1 == alphaUser._1).map(_._2).collect().head

alphaUser._1 is the VertexId of the Alpha User and alphaUser._2 is their score, while alphaUserName looks up that user's (randomly assigned) name in the original graph. In the same way, if we want to find the most influential user up to N levels, we just set the number of iterations in the pregel call to N instead of 2. It is as easy as it gets ;)

I hope you liked this post. If you want hands-on experience with this example, just download or clone the code from here: https://github.com/knoldus/spark-graphx-twitter and run it.

Published at DZone with permission of Himanshu Gupta, DZone MVB. See the original article here.
