
Analyzing Twitter Data With Spark and Algebird


Learn about an open-source tool that can be used with Apache Spark to analyze Twitter data as well as simplify building aggregation systems.


Algebird is an open-source library that provides abstract algebra for Scala and implements several probabilistic data structures. Algebird was designed at Twitter with the goal of simplifying the building of aggregation systems on top of Apache Spark, Scalding, Apache Storm, etc.
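To give a flavor of that monoid-based style, here is a minimal sketch (Monoid.plus and the implicit Map monoid are part of Algebird’s public API; the values are arbitrary):

  import com.twitter.algebird._

  // Algebird supplies a Monoid for Map[K, V] whenever V has a Semigroup:
  // values under the same key are combined, here with integer addition
  val merged = Monoid.plus(Map("a" -> 1, "b" -> 2), Map("a" -> 3))
  // merged == Map("a" -> 4, "b" -> 2)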

Probabilistic data structures are among the most impressive tools for effective data analysis with small memory usage. In this article, we will consider how to unleash the power of probabilistic data structures (from Algebird) to increase the efficiency of streaming data processing with Spark. The most prominent probabilistic data structures are:

  • Bloom filters test whether an element is a member of a set; false positive matches are possible, but false negatives are not. Apache Cassandra, HBase, and Google BigTable use Bloom filters to reduce I/O operations. Medium uses them to recommend articles. Google Chrome uses one to check URLs against a database of dangerous sites.

  • A skip list is an extension of a sorted linked list with additional links added at random levels following a geometric/negative binomial distribution. ConcurrentSkipListMap in Java is a thread-safe, lock-free implementation of skip lists.

  • MinHash/LSH estimates how similar two sets are with a specified probability.

  • HyperLogLog approximates the number of distinct elements in a set with a specified probability; for example, the number of unique users in a stream of data.

  • Count-min sketches build a frequency table of events in a stream of data with a certain probability. Google uses them in its MapReduce parallel computation framework. They are also widely used for traffic analysis tasks with tight memory limits.

Many algorithms have a probabilistic nature. Even TCP and UDP packets carry a checksum that detects errors only with a (rather high) probability. In the same vein, you can look at distributed algorithms (e.g. Gossip, Raft, and so on).
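As a quick taste of how Algebird exposes these structures, here is a minimal Bloom filter sketch (BloomFilter, create, and contains are Algebird’s public API; the sizes are arbitrary):

  import com.twitter.algebird._

  // a Bloom filter sized for ~1000 entries with a 1% false-positive rate
  val bfMonoid = BloomFilter(1000, 0.01)

  // create builds a filter containing the given items
  val bf = bfMonoid.create("spark", "algebird")

  bf.contains("spark").isTrue     // true
  bf.contains("hadoop").isTrue    // false with high probability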

Twitter is a perfect source of data if you study new technologies and don’t know what data source to choose. Let’s analyze a tweet using Algebird. First, we need to create a developer account on Twitter. This just involves fill forms, and after successful registration, you can obtain the necessary info for authentication — consumerKey, consumerSecret, accessToken, accessTokenSecret  — and put them in a twitter.conf file (instead of asterisks in my mock).
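Given how setupTwitter parses the file below — one space-separated key-value pair per line, registered as twitter4j.oauth.* system properties — twitter.conf looks like this (asterisks stand in for your real credentials):

  consumerKey ****************
  consumerSecret ****************
  accessToken ****************
  accessTokenSecret ****************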

The initialization of Twitter streaming looks like this:

  // imports used across the snippets in this article
  import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
  import org.apache.spark.streaming.twitter.TwitterUtils
  import org.apache.spark.streaming.{Seconds, StreamingContext}
  import twitter4j.Status

  private def setupLogging(): Unit = {
    import org.apache.log4j.{Level, Logger}
    val rootLogger = Logger.getRootLogger
    rootLogger.setLevel(Level.ERROR)
  }

  private def setupTwitter(): Unit = {
    import scala.io.Source

    for (line <- Source.fromFile("twitter.conf").getLines) {
      val fields = line.split(" ")
      if (fields.length == 2) {
        System.setProperty("twitter4j.oauth." + fields(0), fields(1))
      }
    }
  }

  /**
    * Path to checkpoint directory
    */
  private val CHECKPOINT_PATH = "checkpoint"

  /**
    * Launch app and set up checkpoint
    * @param ssc StreamingContext
    */
  def startApp(ssc: StreamingContext): Unit = {
    ssc.checkpoint(CHECKPOINT_PATH)
    ssc.start()
    ssc.awaitTermination()
  }

  /**
    * Get StreamingContext and ReceiverInputDStream for streaming tweets
    * @param appName application name
    * @return the tuple of StreamingContext ReceiverInputDStream
    */
  def initStreamContext(appName: String, seconds: Int): (StreamingContext, ReceiverInputDStream[Status]) = {
    setupTwitter()
    val ssc = new StreamingContext("local[*]", appName, Seconds(seconds))
    setupLogging()
    val tweets = TwitterUtils.createStream(ssc, None)
    (ssc, tweets)
  }
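Wiring these helpers together might look like this (a minimal sketch; the object name and the one-second batch interval are my assumptions, and popularTweets is defined just below):

  object TwitterStreamApp extends App {
    val (ssc, tweets) = initStreamContext("TwitterStreamApp", seconds = 1)
    popularTweets(tweets)  // print the most popular hashtags, shown next
    startApp(ssc)          // enables checkpointing and blocks until termination
  }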

Let’s make sure that Twitter streaming works properly by simply printing the most popular hashtags in a five-minute window.

  private def popularTweets(tweets: ReceiverInputDStream[Status]): Unit = {
    val statuses = tweets.map(status => status.getText)
    val tweetWords = statuses.flatMap(tweetText => tweetText.split(" "))
    val hashtags = tweetWords.filter(word => word.startsWith("#"))

    val hashtagKeyValues = hashtags.map(hashtag => (hashtag, 1))

    // 5-minute window sliding every second; the inverse function (_ - _)
    // requires the checkpointing that startApp sets up
    val hashtagCounts = hashtagKeyValues.reduceByKeyAndWindow(_ + _, _ - _, Seconds(300), Seconds(1))

    val sortedResults = hashtagCounts.transform(rdd => rdd.sortBy({ case (_, count) => count }, ascending = false))
    sortedResults.print()
  }

In the output, you can see the ten most popular hashtags in a five-minute window, refreshed every second (DStream’s print() shows ten elements by default)!

Let’s count distinct users in a single batch and overall with two different methods — via the HyperLogLog algorithm and the usual exact method that keeps a set of unique user IDs:

  /**
    * Precision (number of bits) for the HLL algorithm; the standard error is 1.04/sqrt(2^{bits})
    */
  private val HLL_PRECISION = 18

  /**
    * Global HLL instance
    */
  private val HYPER_LOG_LOG_MONOID = new HyperLogLogMonoid(HLL_PRECISION)

  /**
    * Get information about distinct users in a single batch and overall by
    * leveraging the HLL algorithm from Algebird, and compare it to the usual method via Map-Reduce
    * @param tweets Stream of tweets
    */
  def distinctUsers(tweets: ReceiverInputDStream[Status]): Unit = {
    val users = tweets.map(status => status.getUser.getId)

    val approxUsers: DStream[HLL] = users.mapPartitions(ids => {
      val hll = new HyperLogLogMonoid(HLL_PRECISION)
      ids.map(id => hll(id))
    }).reduce(_ + _)

    val exactUsers: DStream[Set[Long]] = users.map(id => Set(id)).reduce(_ ++ _)

    approximateUserCountHLL(approxUsers)
    exactUserCount(exactUsers)
  }

  private def exactUserCount(exactUsers: DStream[Set[Long]]) = {
    var userSet: Set[Long] = Set()
    exactUsers.foreachRDD(rdd => {
      if (rdd.count() != 0) {
        val partial = rdd.first()
        userSet ++= partial
        println("Exact --> Amount of users this batch: %d".format(partial.size))
        println("Exact --> Amount of users overall: %d%n".format(userSet.size))
      }
    })
  }

  private def approximateUserCountHLL(approxUsers: DStream[HLL]) = {
    var h = HYPER_LOG_LOG_MONOID.zero
    approxUsers.foreachRDD(rdd => {
      if (rdd.count() != 0) {
        val partial = rdd.first()
        h += partial
        println("HLL --> Amount of users in this batch: %d".format(partial.estimatedSize.toInt))
        println("HLL --> Amount of users overall: %d".format(HYPER_LOG_LOG_MONOID.estimateSize(h).toInt))
      }
    })
  }

Accuracy is specified via the HLL_PRECISION constant (the number of bits), meaning that the standard error is 1.04/sqrt(2¹⁸) according to the algorithm. Algebird provides the HLL instance in Monoid style. To count users with HLL in-batch and overall, you specify the reduce function and apply it to the DStream (the approxUsers value in the snippet above), then do the simple aggregation logic in the method approximateUserCountHLL.
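Plugging the numbers in (a quick sanity check, not from the original article):

  // standard error for 18 bits: 1.04 / sqrt(2^18) = 1.04 / 512
  val stdError = 1.04 / math.sqrt(math.pow(2, 18))  // ≈ 0.002, i.e. about 0.2%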

In the output, you can see the difference between HLL and the usual exact counting algorithm. HLL is pretty accurate.

Let’s consider the next algorithm from Algebird’s collection: the count-min sketch. We can use it to count user occurrences and find the top (heavy-hitter) users, both in a single batch and overall. As in the previous example, I compare the Algebird algorithm with the usual exact one:

  /**
    * CMS instance initializer
    */
  object CountMinSketchMonoidInitializer {
    private val DELTA = 1E-10  // bound on the probability that an estimate misses its error bound
    private val EPS = 0.01     // relative error of frequency estimates
    private val SEED = 1       // seed for the hash functions
    private val PERC = 0.001   // heavy-hitter threshold as a fraction of the total count

    def initCMS = new CountMinSketchMonoid(EPS, DELTA, SEED, PERC)
  }

  private val COUNT_MIN_SKETCH_MONOID = CountMinSketchMonoidInitializer.initCMS
  private var globalCMS = COUNT_MIN_SKETCH_MONOID.zero

  /**
    * Get information about the number of occurrences of each user in a single batch and overall by
    * leveraging the count-min sketch algorithm from Algebird, and compare it to the usual method
    * @param tweets Stream of tweets
    */
  def userCounter(tweets: ReceiverInputDStream[Status]): Unit = {
    val USERS_COUNT = 5
    val users: DStream[Long] = tweets.map(status => status.getUser.getId)

    val approxTopUsers: DStream[CMS] = users.mapPartitions(ids => {
      ids.map(id => COUNT_MIN_SKETCH_MONOID.create(id))
    }).reduce(_ ++ _)

    val exactTopUsers: DStream[(Long, Int)] = users.map(id => (id, 1))
      .reduceByKey(_ + _)

    topTwitterUsersWithCountMinSketch(approxTopUsers, USERS_COUNT)
    exactTopUserInTwitter(exactTopUsers, USERS_COUNT)
  }

  private def topTwitterUsersWithCountMinSketch(approxTopUsers: DStream[CMS], USERS_COUNT: Int) = {
    approxTopUsers.foreachRDD(rdd => {
      if (rdd.count() != 0) {
        val partial = rdd.first()
        val partialTopK = partial.heavyHitters.map(id =>
          (id, partial.frequency(id).estimate)).toSeq.sortBy { case (_, amount) => amount }.reverse.slice(0, USERS_COUNT)
        globalCMS ++= partial
        val globalTopK = globalCMS.heavyHitters.map(id =>
          (id, globalCMS.frequency(id).estimate)).toSeq.sortBy { case (_, amount) => amount }.reverse.slice(0, USERS_COUNT)
        println("CMS --> Top most valuable users this batch: %s".format(partialTopK.mkString("[", ",", "]")))
        println("CMS --> Top most valuable users overall: %s".format(globalTopK.mkString("[", ",", "]")))
      }
    })
  }

  private def exactTopUserInTwitter(exactTopUsers: DStream[(Long, Int)], amountToDisplay:Int) = {
    var globalExact = Map[Long, Int]()
    val mm = new MapMonoid[Long, Int]()
    exactTopUsers.foreachRDD(rdd => {
      if (rdd.count() != 0) {
        val partialMap = rdd.collect().toMap
        val partialTopK = rdd.sortBy({ case (_, count) => count }, ascending = false).take(amountToDisplay)
        globalExact = mm.plus(globalExact, partialMap)
        val globalTopK = globalExact.toSeq.sortBy { case (_, amount) => amount }.reverse.slice(0, amountToDisplay)
        println("Exact --> Top most valuable users in this batch: %s".format(partialTopK.mkString("[", ",", "]")))
        println("Exact --> Top most valuable users overall: %s".format(globalTopK.mkString("[", ",", "]")))
      }
    })
  }

Code with the count-min sketch is very similar to the code with HyperLogLog except for the configuration of the Monoid instance — CountMinSketchMonoid takes more parameters than HyperLogLogMonoid — and the aggregated state: for the count-min sketch, it’s a frequency map instead of a simple counter.
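For reference, here is the same monoid used standalone, outside of Spark (a minimal sketch against the older Algebird API used in this article; newer Algebird versions deprecate CountMinSketchMonoid in favor of CMS.monoid):

  import com.twitter.algebird._

  // same parameter order as EPS, DELTA, SEED, PERC above
  val cmsMonoid = new CountMinSketchMonoid(0.01, 1E-10, 1, 0.001)

  // build a sketch from a small sequence of user IDs
  val cms = cmsMonoid.create(Seq(1L, 1L, 2L))

  cms.totalCount              // 3
  cms.frequency(1L).estimate  // 2 (estimates are upper-biased, never too low)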

You can find a full listing of the source code on my GitHub repository.


Topics: big data, data analytics, twitter, apache spark, tutorial, data aggregation

