
Fast and Accurate Categorical Distribution Without Reshuffling in Apache Spark

Using Seahorse to provide users with accurate distributions for categorical data in Apache Spark.



In Seahorse, we want to provide our users with accurate distributions for their categorical data. Categorical data can be thought of as the result of an observation that can take one of K possible outcomes. Some examples: nationality, marital status, gender, type of education.


In the described scenario we have no prior knowledge of whether a given feature is categorical or not. We would like to treat a feature as categorical as long as its value set is small enough. Moreover, if there are too many distinct values, we are no longer interested in the feature's discrete distribution.

A naive and easy approach to this problem would be to simply use reduceByKey or groupByKey. Unfortunately, using those methods means poor performance due to Spark's partition reshuffling.

// limit - max. number of distinct categories to treat a feature as categorical
// (`distinct` itself already triggers a shuffle and a separate job)
if (rdd.distinct().count() < limit) {
  // causes partition reshuffling
  val discreteDistribution = rdd.map(v => v -> 1).reduceByKey(_ + _).collect()
  Some(discreteDistribution)
} else {
  None
}

We can think of calculating a category distribution as counting the occurrences of each category, and counting occurrences can easily be done without reshuffling. The sketch below shows the underlying aggregate mechanism in isolation, before the full solution.
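
To make the mechanism concrete, here is a minimal, self-contained sketch (the local setup, the object name, and the sample values are our own assumptions, not code from the original post) showing how RDD.aggregate builds per-partition occurrence maps and then merges them, with no reduceByKey and no shuffle:

import org.apache.spark.{SparkConf, SparkContext}

object AggregateSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[*]").setAppName("aggregate-sketch"))
    val rdd = sc.parallelize(Seq("PL", "DE", "PL", "FR", "PL", "DE"))

    // seqOp folds each record into a per-partition map of counts;
    // combOp merges the per-partition maps into one result on the driver.
    val counts = rdd.aggregate(Map.empty[String, Long])(
      (acc, value) => acc + (value -> (acc.getOrElse(value, 0L) + 1L)),
      (left, right) => right.foldLeft(left) { case (acc, (value, count)) =>
        acc + (value -> (acc.getOrElse(value, 0L) + count))
      }
    )
    println(counts) // e.g. Map(PL -> 3, DE -> 2, FR -> 1)
    sc.stop()
  }
}

Because every record is folded into a map locally, the only data that leaves a partition is one small map of counts.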

Now let the full solution speak for itself:

import scalaz._
import scalaz.Scalaz._
import org.apache.spark.rdd.RDD
(...) // other imports

object Demo {

  // limit - the maximum number of categories.
  // If there are more distinct values than `limit`, no categorical distribution will be returned.
  case class CategoryDistributionCalculator[T](limit: Int) {

    // `zero` - the initial value.
    // `mergeValue` (seqOp) - accumulates values within one partition.
    // `mergeCombiners` (combOp) - merges the partial results of the partitions.
    def calculate(rdd: RDD[T]): Option[Map[T, Long]] =
      rdd.aggregate(zero)(mergeValue _, mergeCombiners _)

    private val zero = Option(Map.empty[T, Long])

    def mergeValue(acc: Option[Map[T, Long]], next: T): Option[Map[T, Long]] =
      mergeCombiners(acc, Some(Map(next -> 1L)))

    def mergeCombiners(
        leftOpt: Option[Map[T, Long]],
        rightOpt: Option[Map[T, Long]]): Option[Map[T, Long]] = {
      for (left <- leftOpt; right <- rightOpt) yield {
        val sumMap = left |+| right
        (sumMap.keySet.size <= limit).option(sumMap)
      }
    }.flatten
  }

  def main(args: Array[String]): Unit = {
    (...)
    val valueSlow = rdd.map(v => v -> 1).reduceByKey(_ + _).collect()
    val valueFast = CategoryDistributionCalculator(30).calculate(rdd).get
    (...)
  }

}

The code above is clean, functional, and takes advantage of Scalaz. Unfortunately, it allocates a new map per record in the dataset. With Apache Spark it is sometimes better to give up immutable objects in favor of mutable ones for performance's sake. The functions passed to aggregate are allowed to mutate their left argument and return it, and in our next iteration we take advantage of that. The code below is not as pretty, but it performs much better.

import scala.collection.mutable
import org.apache.spark.rdd.RDD
(...) // other imports

object Demo {

  // limit - the maximum size of a categorical type.
  // If there are more distinct values than `limit`, no categorical distribution will be returned.
  case class CategoryDistributionCalculator[T](limit: Int) {

    // None is returned if there are too many distinct values (> limit)
    // to calculate a categorical distribution.
    def calculate(rdd: RDD[T]): Option[Map[T, Long]] = {
      // `aggregate` computes a single result from the full dataset:
      // first it builds a partial result for each partition with the `seqOp` function,
      // then it combines those partial results into one with the `combOp` function.

      // `zero` - the initial value.
      // `seqOp` - mergeValue accumulates values within one partition.
      // `combOp` - mergeCombiners merges the partial results of the partitions.
      val mutableMapOpt = rdd.aggregate(zero)(mergeValue _, mergeCombiners _)
      val immutableMapOpt = mutableMapOpt.map(_.toMap)
      immutableMapOpt
    }

    // Both functions passed to aggregate are allowed to mutate their left argument and return it.
    // This reduces object allocations, which matters most in `mergeValue`,
    // because it is called for every record in the dataset.

    private val zero = Option(mutable.Map.empty[T, Long])

    private def mergeValue(accOpt: Option[mutable.Map[T, Long]],
        next: T): Option[mutable.Map[T, Long]] = {
      accOpt.foreach { acc =>
        addOccurrencesToMap(acc, next, 1L)
      }
      replacedWithNoneIfLimitExceeded(accOpt)
    }

    private def mergeCombiners(leftOpt: Option[mutable.Map[T, Long]],
        rightOpt: Option[mutable.Map[T, Long]]): Option[mutable.Map[T, Long]] = {
      // If either side has already exceeded the limit (is None), the merged result must be None too.
      val mergedOpt = for (left <- leftOpt; right <- rightOpt) yield {
        right.foreach { case (element, count) =>
          addOccurrencesToMap(left, element, count)
        }
        left
      }
      replacedWithNoneIfLimitExceeded(mergedOpt)
    }

    private def addOccurrencesToMap(
        occurrences: mutable.Map[T, Long],
        element: T,
        count: Long): Unit = {
      occurrences(element) = occurrences.getOrElse(element, 0L) + count
    }

    private def replacedWithNoneIfLimitExceeded(
        mapOpt: Option[mutable.Map[T, Long]]): Option[mutable.Map[T, Long]] = {
      mapOpt.filter(_.size <= limit)
    }
  }

  def main(args: Array[String]): Unit = {
    (...)
    val valueSlow = rdd.map(v => v -> 1).reduceByKey(_ + _).collect()
    val valueFast = CategoryDistributionCalculator(30).calculate(rdd).get
    (...)
  }

}
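
As a usage sketch, the baseline and the optimized calculator can be run side by side and compared. The local SparkContext setup and the sample data below are our assumptions, standing in for the (...) placeholders in main above, and the sketch assumes the Demo object from the previous listing is on the classpath:

import org.apache.spark.{SparkConf, SparkContext}

object DemoApp {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[*]").setAppName("category-distribution-demo"))
    val rdd = sc.parallelize(Seq("single", "married", "single", "divorced", "married", "single"))

    // Baseline: two jobs and a shuffle.
    val valueSlow = rdd.map(v => v -> 1L).reduceByKey(_ + _).collect().toMap

    // One job, no shuffle; None would mean more than `limit` distinct values.
    val valueFast = Demo.CategoryDistributionCalculator[String](limit = 30).calculate(rdd).get

    assert(valueSlow == valueFast)
    sc.stop()
  }
}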

Measurements

Our procedure should be at least twice as fast: the reduceByKey-based approach runs in two stages with a partition reshuffle between them, while our solution runs in a single stage.

A simple experiment with Apache Spark seems to confirm that. For the experiment we used (a sketch of generating a comparable dataset follows the list):

  • a local Spark cluster,
  • a dataset of 10,000,000 records,
  • 33 category labels, uniformly distributed.
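
The exact data-generation code is not part of the original post, so the following is only a sketch under our own assumptions (200 partitions to match the task counts reported below, and a limit above 33 so a distribution is actually returned):

import scala.util.Random
import org.apache.spark.{SparkConf, SparkContext}

object TestDataSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[*]").setAppName("category-test-data"))

    // 10,000,000 records drawn uniformly from 33 labels, spread over 200 partitions.
    val rdd = sc.range(0L, 10000000L, numSlices = 200)
      .map(_ => s"label-${Random.nextInt(33)}")

    // limit = 50 (> 33), so a distribution is returned rather than None.
    val distribution = Demo.CategoryDistributionCalculator[String](limit = 50).calculate(rdd)
    println(distribution.map(_.size)) // Some(33)
    sc.stop()
  }
}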

Jobs 0 and 1 are parts of the simple, non-optimized solution. Job 2 is our optimized procedure.

// Job 0 - the `count` call: checks whether the distinct value set is small enough
// to interpret the data as categorical.
if (rdd.distinct().count() < limit) {
  // Job 1 - the `collect` call: calculates the discrete distribution.
  val discreteDistribution = rdd.map(v => v -> 1).reduceByKey(_ + _).collect()
  Some(discreteDistribution)
} else {
  None
}

// Job 2 - calculates the discrete distribution and checks the distinct value set size on the fly.
CategoryDistributionCalculator(limit).calculate(rdd)


Job 1 (with reduceByKey) has 400 tasks and uses additional memory to perform the shuffle.

The optimized Job 2 has only 200 tasks, runs in a single stage, and uses no additional memory for shuffling. The distinct value set size check (Job 0 in the naive solution) is performed here on the fly.

Summary

Categorical features are very common in data science. With our approach we can calculate accurate categorical distributions quickly and without prior knowledge of whether a feature is categorical. The calculation is guaranteed to execute in a single Apache Spark job, without any partition reshuffling.



Topics:
categorization, scala, apache spark

Published at DZone with permission of deepsense.io Blog, DZone MVB. See the original article here.

Opinions expressed by DZone contributors are their own.
