In Seahorse, we want to provide our users with accurate distributions of their categorical data. Categorical data describes observations that can take one of *K* possible outcomes. Some examples: nationality, marital status, gender, type of education.

In the described scenario we have no prior knowledge of whether a given feature is categorical or not. We would like to treat a feature as categorical as long as its value set is small enough. Conversely, once a feature has too many distinct values, we are no longer interested in its discrete distribution.
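For instance, under a hypothetical limit of 100 distinct values, the rule can be sketched like this (the column values below are made up for illustration):

```scala
object CategoricalCheck {
  def main(args: Array[String]): Unit = {
    val limit = 100 // hypothetical threshold

    // A nationality column has few distinct values -> categorical.
    val nationalities = Seq("PL", "DE", "US", "PL", "FR")
    // A user-id column is (nearly) unique per record -> not categorical.
    val userIds = (1 to 1000).map(i => s"user-$i")

    println(nationalities.distinct.size < limit) // true
    println(userIds.distinct.size < limit)       // false
  }
}
```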

A naive and easy approach to this problem would be to simply use **reduceByKey** or **groupByKey**. Unfortunately, those methods perform poorly because they trigger Spark's partition reshuffling.

```
// limit - max. number of distinct categories to treat a feature as categorical
if (rdd.distinct().count() < limit) {
  // causes partition reshuffling
  val discreteDistribution = rdd.map(v => v -> 1).reduceByKey(_ + _).collect()
  Some(discreteDistribution)
} else {
  None
}
```

Counting occurrences of specific values of a feature can easily be done without reshuffling. We can think of calculating a category distribution as counting occurrences of each category.

Let the code speak for itself:

```
import org.apache.spark.rdd.RDD
import scalaz._
import scalaz.Scalaz._
(...) // other imports

object Demo {

  // limit - the maximum number of categories;
  // if there are more distinct values than `limit`, no categorical distribution will be returned
  case class CategoryDistributionCalculator[T](limit: Int) {

    // `zero` - the initial value
    // `mergeValue` (seq) - accumulates values within a single partition
    // `mergeCombiners` (comb) - combines partial results from multiple partitions
    def calculate(rdd: RDD[T]): Option[Map[T, Long]] =
      rdd.aggregate(zero)(mergeValue _, mergeCombiners _)

    private val zero = Option(Map.empty[T, Long])

    private def mergeValue(acc: Option[Map[T, Long]], next: T): Option[Map[T, Long]] =
      mergeCombiners(acc, Some(Map(next -> 1L)))

    private def mergeCombiners(
        leftOpt: Option[Map[T, Long]],
        rightOpt: Option[Map[T, Long]]): Option[Map[T, Long]] = {
      for (left <- leftOpt; right <- rightOpt) yield {
        val sumMap = left |+| right
        (sumMap.keySet.size <= limit).option(sumMap)
      }
    }.flatten
  }

  def main(args: Array[String]): Unit = {
    (...)
    val valueSlow = rdd.map(v => v -> 1).reduceByKey(_ + _).collect()
    val valueFast = CategoryDistributionCalculator(30).calculate(rdd).get
    (...)
  }
}
```

The code above is clean, functional and takes advantage of Scalaz. Unfortunately, it allocates a new object for each record in the dataset. With Apache Spark, it is sometimes better to favor mutable objects over immutable ones for performance's sake. The functions passed to `aggregate` may mutate their left argument and return it for better performance. In our next iteration we will take advantage of that. The code below is not as pretty, but it performs much better.

```
(...) // imports

object Demo {

  // limit - the maximum size of the categorical type;
  // if there are more distinct values than `limit`, no categorical distribution will be returned
  case class CategoryDistributionCalculator[T](limit: Int) {

    // None is returned if there are too many distinct values (> limit)
    // to calculate a categorical distribution.
    def calculate(rdd: RDD[T]): Option[Map[T, Long]] = {
      // `aggregate` calculates one result from the full dataset:
      // first, it calculates a partial result for each partition with the `seq` function (mergeValue);
      // then, it combines those partial results into a single result using the `comb` function (mergeCombiners).
      // `zero` is the initial value.
      val mutableMapOpt = rdd.aggregate(zero)(mergeValue _, mergeCombiners _)
      val immutableMapOpt = mutableMapOpt.map(_.toMap)
      immutableMapOpt
    }

    // Both functions passed to `aggregate` are allowed to mutate their left argument
    // and return it. This reduces object allocations, which matters most in `mergeValue`,
    // as it is called for every record in the dataset.
    private val zero = Option(mutable.Map.empty[T, Long])

    private def mergeValue(accOpt: Option[mutable.Map[T, Long]],
        next: T): Option[mutable.Map[T, Long]] = {
      accOpt.foreach { acc =>
        addOccurrencesToMap(acc, next, 1)
      }
      replacedWithNoneIfLimitExceeded(accOpt)
    }

    private def mergeCombiners(leftOpt: Option[mutable.Map[T, Long]],
        rightOpt: Option[mutable.Map[T, Long]]): Option[mutable.Map[T, Long]] = {
      for (left <- leftOpt; rightMap <- rightOpt) {
        rightMap.foreach { case (element, count) =>
          addOccurrencesToMap(left, element, count)
        }
      }
      replacedWithNoneIfLimitExceeded(leftOpt)
    }

    private def addOccurrencesToMap(
        occurrences: mutable.Map[T, Long],
        element: T,
        count: Long): Unit = {
      occurrences(element) = occurrences.getOrElse(element, 0L) + count
    }

    private def replacedWithNoneIfLimitExceeded(
        mapOpt: Option[mutable.Map[T, Long]]): Option[mutable.Map[T, Long]] = {
      mapOpt.flatMap { map =>
        if (map.size <= limit) mapOpt else None
      }
    }
  }

  def main(args: Array[String]): Unit = {
    (...)
    val valueSlow = rdd.map(v => v -> 1).reduceByKey(_ + _).collect()
    val valueFast = CategoryDistributionCalculator(30).calculate(rdd).get
    (...)
  }
}
```
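Neither version can be exercised as written without a Spark cluster, but the zero / mergeValue / mergeCombiners contract itself can. The dependency-free sketch below (the object name and the `limit` value are our own) drives simplified copies of the mutable merge functions with `foldLeft` over plain sequences standing in for partitions, and also checks that the accumulator really is mutated in place rather than reallocated:

```scala
import scala.collection.mutable

object MergeContractSketch {
  val limit = 3 // small limit, chosen for the demonstration

  // Simplified copies of the calculator's helpers.
  def addOccurrences[T](occ: mutable.Map[T, Long], element: T, count: Long): Unit =
    occ(element) = occ.getOrElse(element, 0L) + count

  def mergeValue[T](accOpt: Option[mutable.Map[T, Long]],
      next: T): Option[mutable.Map[T, Long]] = {
    accOpt.foreach(addOccurrences(_, next, 1L))
    accOpt.filter(_.size <= limit) // None once the limit is exceeded
  }

  def mergeCombiners[T](leftOpt: Option[mutable.Map[T, Long]],
      rightOpt: Option[mutable.Map[T, Long]]): Option[mutable.Map[T, Long]] = {
    for (left <- leftOpt; right <- rightOpt) {
      right.foreach { case (element, count) => addOccurrences(left, element, count) }
    }
    leftOpt.filter(_.size <= limit)
  }

  def main(args: Array[String]): Unit = {
    // Two sequences standing in for two RDD partitions.
    val partitions = Seq(Seq("a", "b", "a"), Seq("b", "c"))
    val partials = partitions.map(
      _.foldLeft(Option(mutable.Map.empty[String, Long]))(mergeValue[String]))
    val result = partials.reduce(mergeCombiners[String])
    println(result.map(_.toMap) == Some(Map("a" -> 2L, "b" -> 2L, "c" -> 1L))) // true

    // The accumulator is mutated in place: no new map is allocated per record.
    val acc = Option(mutable.Map.empty[String, Long])
    val merged = Seq("x", "y", "x").foldLeft(acc)(mergeValue[String])
    println(merged.exists(_ eq acc.get)) // true

    // With 4 distinct values and limit = 3, the result degrades to None.
    val wide = Seq("a", "b", "c", "d")
      .foldLeft(Option(mutable.Map.empty[String, Long]))(mergeValue[String])
    println(wide) // None
  }
}
```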

## Measurements

Our procedure should be at least twice as fast. The naive `reduceByKey` approach works in two stages with partition reshuffling between them, while our solution runs in a single stage.

A simple experiment with Apache Spark seems to confirm that. For our experiment we used:

- A local Spark cluster,
- A dataset of 10,000,000 records,
- 33 category labels, uniformly distributed.
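The post does not specify how the dataset was generated, but an equivalent one can be produced locally along these lines (the seed and generation scheme are our own assumptions; a real run would distribute the records, e.g. via `sc.parallelize`):

```scala
import scala.util.Random

object ExperimentData {
  def main(args: Array[String]): Unit = {
    val numLabels = 33       // labels in the category
    val size = 10000000      // dataset size
    val rnd = new Random(42) // fixed seed - our assumption, for reproducibility

    // Uniformly distributed labels; in the real experiment these records
    // would live in an RDD rather than being counted on the driver.
    val counts = new Array[Long](numLabels)
    var i = 0
    while (i < size) {
      counts(rnd.nextInt(numLabels)) += 1
      i += 1
    }

    // Sanity check: every record was assigned exactly one of the 33 labels.
    println(counts.sum == size) // true
  }
}
```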

Jobs 0 and 1 are part of the simple, non-optimized solution. Job 2 is our optimized procedure.

```
// job 0 - count call - checks if the distinct value set is small enough to interpret the data as categorical
if (rdd.distinct().count() < limit) {
  // job 1 - collect call - calculates the discrete distribution
  val discreteDistribution = rdd.map(v => v -> 1).reduceByKey(_ + _).collect()
  Some(discreteDistribution)
}

// job 2 - calculates the discrete distribution and dynamically checks the distinct value set size
CategoryDistributionCalculator(limit).calculate(rdd)
```

Job 1 (with `reduceByKey`) has 400 tasks and uses memory to perform the shuffle.

The optimized Job 2 has only 200 tasks, runs in one stage and doesn't use additional memory for shuffling. The distinct value set size check (Job 0 in the naive solution) is performed dynamically here.

### Summary

Categorical features are very common in data science. With our approach we can calculate accurate categorical distributions quickly and without prior knowledge of whether a feature is categorical. It is guaranteed to execute in a single Apache Spark job, without any partition reshuffling.

The post Fast and accurate categorical distribution without reshuffling in Apache Spark appeared first on deepsense.io.
