Tweet Analysis Using Lambda Architecture
Lambda architecture can help you process massive quantities of data — even on websites with tons of it, such as Twitter.
Join the DZone community and get the full member experience.
Join For FreeIn this blog, I will explain how to analyze tweets with Lambda architecture. First, we need to understand what Lambda architecture is about — more specifically, its component and usage.
According to Wikipedia:
Lambda architecture is a data processing architecture designed to handle massive quantities of data by taking advantage of both batch and stream processing methods.
Now, let's see Lambda architecture components in detail.
This architecture is classified into three layers:
The batch layer precomputes the master dataset (the core components of Lambda architecture that contains all data) into batch views so that queries can be resolved with low latency.
In speed layer, we basically do two things: store the real-time views and process the incoming data stream so as to update those views. It fills the gap that is left by the batch layer. That means that combining speed layer view and batch view gives us the capability to fire any ad hoc query over all data — that is, query=function(overall data).
The serving layer provides low-latency access to the results of calculations performed on the master dataset. It combines batch view and real-time view to give results in real-time for any ad hoc query over all data.
So, in short, we can say that Lambda architecture is query=function(over all data).
Now, I am going to describe Twitter’s tweet analysis with the help of Lambda architecture. This project uses the Twitter4J streaming API and Apache Kafka to get and store Twitter’s real-time data. I have used Apache Cassandra for storing master datasets, batch view, and realtime view.
Batch Layer
To process data in batch, we use the Apache Spark (a fast and general engine for large-scale data processing) engine. To store batch view, we use Cassandra. To do this, we have created BatchProcessingUnit
to create all batch views on a master dataset.
class BatchProcessingUnit {
val sparkConf = new SparkConf()
.setAppName("Lambda_Batch_Processor").setMaster("local[2]")
.set("spark.cassandra.connection.host", "127.0.0.1")
.set("spark.cassandra.auth.username", "cassandra")
val sc = new SparkContext(sparkConf)
def start: Unit ={
val rdd = sc.cassandraTable("master_dataset", "tweets")
val result = rdd.select("userid","createdat","friendscount").where("friendsCount > ?", 500)
result.saveToCassandra("batch_view","friendcountview",SomeColumns("userid","createdat","friendscount"))
result.foreach(println)
}
}
We have used the Akka scheduler to schedule batch process in specified intervals.
Speed Layer
In the speed layer, we have used Spark Streaming to process real-time Tweets and store them in Cassandra.
To do this, we have created SparkStreamingKafkaConsumer
, which reads data from the Kafka queue, tweets a topic, and sends it to the view handler of the speed layer to generate all views.
object SparkStreamingKafkaConsumer extends App {
val brokers = "localhost:9092"
val sparkConf = new SparkConf().setAppName("KafkaDirectStreaming").setMaster("local[2]")
.set("spark.cassandra.connection.host", "127.0.0.1")
.set("spark.cassandra.auth.username", "cassandra")
val ssc = new StreamingContext(sparkConf, Seconds(10))
ssc.checkpoint("checkpointDir")
val topicsSet = Set("tweets")
val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers, "group.id" -> "spark_streaming")
val messages: InputDStream[(String, String)] = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicsSet)
val tweets: DStream[String] = messages.map { case (key, message) => message }
ViewHandler.createAllView(ssc.sparkContext, tweets)
ssc.start()
ssc.awaitTermination()
}
Serving Layer
In the serving layer, we have combined batch view data and real-time view data to satisfy the ad hoc query requirement. Here is an example in which have to try to analyze all Twitter users who match the specify hashtag and have follower counts greater than 500.
def findTwitterUsers(minute: Long, second: Long, tableName: String = "tweets"): Response = {
val batchInterval = System.currentTimeMillis() - minute * 60 * 1000
val realTimeInterval = System.currentTimeMillis() - second * 1000
val batchViewResult = cassandraConn.execute(s"select * from batch_view.friendcountview where createdat >= ${batchInterval} allow filtering;").all().toList
val realTimeViewResult = cassandraConn.execute(s"select * from realtime_view.friendcountview where createdat >= ${realTimeInterval} allow filtering;").all().toList
val twitterUsers: ListBuffer[TwitterUser] = ListBuffer()
batchViewResult.map { row =>
twitterUsers += TwitterUser(row.getLong("userid"), new Date(row.getLong("createdat")), row.getLong("friendscount"))
}
realTimeViewResult.map { row =>
twitterUsers += TwitterUser(row.getLong("userid"), new Date(row.getLong("createdat")), row.getLong("friendscount"))
}
Response(twitterUsers.length, twitterUsers.toList)
}
Finally, this project uses Akka HTTP for building REST API to fire ad hoc queries.
I hope this will be helpful for you in building Big Data applications using Lambda architecture.
You can get source code here.
References
Published at DZone with permission of Narayan Kumar. See the original article here.
Opinions expressed by DZone contributors are their own.
Comments