Efficient Data Reports in Apache Spark With Aggregator Batching

When building statistical reports such as histograms on large streaming datasets, a batching approach can make the necessary calculations much more efficient.

Seahorse provides users with reports on their data at every step in the workflow. A user can view reports after each operation to review the intermediate results. In our reports we provide users with distributions for columns in the form of a histogram for continuous data, and a pie chart for categorical data.


[Figure: categorical distribution]

[Figure: DataFrame report. Users are able to view distributions of their data after clicking on the chart icon.]

Data Reports in Seahorse

Reports are generated for each DataFrame after every Operation. This means that performance is crucial here.

In our first approach, we used the built-in histogram method that Apache Spark provides for RDD[Double]. We would first map our RDD[Row] to an RDD[Double] for each Double column and then call histogram on each one.

Unfortunately, this approach is very time-intensive. Calculating the histogram for each column independently meant performing one expensive, full pass over the data for every column.

An alternative would be to introduce a special multi-histogram operation that works on multiple columns at once. Apache Spark already does this for column statistics: Statistics.colStats in MLlib computes the statistics for every column in a single data pass and returns them as a MultivariateStatisticalSummary.

This approach was not good enough for us. We wanted a generic way to combine arbitrary operations, rather than writing a custom multi-operation for each one.

Abstracting Over Aggregator

An operation producing one value out of a whole dataset is called aggregation. In order to perform an aggregation, the user must provide three components:

  • mergeValue function – folds one input value into the partial result of a single partition
  • mergeCombiners function – merges the partial results from two partitions
  • initialElement – the initial aggregator value.

With these three components, Apache Spark can aggregate the data within each partition and then combine the partial results to produce the final value.
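The way the three components fit together can be sketched without Spark. In the illustrative example below, a nested Seq stands in for a partitioned RDD, and the `aggregate` helper is a hypothetical function written for this sketch; its shape mirrors Spark's `RDD.aggregate(zeroValue)(seqOp, combOp)`.

```scala
object AggregationSketch {
  // Hypothetical stand-in for an RDD: each inner Seq plays the role of one partition.
  type Partitions[T] = Seq[Seq[T]]

  // Illustrative helper, shaped like RDD.aggregate(zeroValue)(seqOp, combOp).
  def aggregate[T, U](partitions: Partitions[T])(initialElement: U)(
      mergeValue: (U, T) => U,
      mergeCombiners: (U, U) => U): U = {
    // Step 1: fold every partition independently, starting from the initial element.
    val partials = partitions.map(_.foldLeft(initialElement)(mergeValue))
    // Step 2: merge the per-partition results. This assumes the initial element
    // is neutral with respect to mergeCombiners.
    partials.foldLeft(initialElement)(mergeCombiners)
  }

  val data: Partitions[Double] = Seq(Seq(1.0, 2.0), Seq(3.0), Seq(4.0, 5.0, 6.0))

  // A (sum, count) accumulator, from which the mean can be derived.
  val (sum, count) = aggregate(data)((0.0, 0L))(
    (acc, x) => (acc._1 + x, acc._2 + 1),
    (a, b) => (a._1 + b._1, a._2 + b._2))

  val mean: Double = sum / count
}
```

Note that the accumulator type (a pair) differs from the input type (Double), which is what lets a single aggregation carry richer state such as histogram bucket counts.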

We introduced an abstract Aggregator that encapsulates the mergeValue, mergeCombiners, and initialElement properties. Thanks to this abstraction, we can create wrappers that add behavior to our aggregators.

MultiAggregator is one such wrapper. It wraps a sequence of aggregators and executes them in a batch, that is, in a single pass over the data. Its implementation involves:

  • mergeValue – aggregates results from a single partition for each aggregator
  • mergeCombiners – merges aggregated results from the partitions for each aggregator
  • initialElement – initial aggregator values for each aggregator

This special aggregator wraps a sequence of aggregators and then uses their mergeValue, mergeCombiners and initialElement functions to perform aggregation in one batch.
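A minimal sketch of this idea follows. It is not the code from our repository: the trait shape, the untyped `Seq[Any]` accumulator, and the `run` helper (which simulates partitions with nested Seqs instead of an RDD) are all assumptions made to keep the example short and runnable without Spark.

```scala
object MultiAggregatorSketch {
  // Assumed shape of the abstraction: U is the accumulator/result type, T the input type.
  trait Aggregator[U, T] {
    def initialElement: U
    def mergeValue(acc: U, elem: T): U
    def mergeCombiners(left: U, right: U): U
  }

  // Runs several aggregators over the same input in one pass.
  // The accumulator holds one untyped slot per wrapped aggregator.
  final case class MultiAggregator[T](aggregators: Seq[Aggregator[_, T]])
      extends Aggregator[Seq[Any], T] {

    // The result types are erased to Any here; slot i always belongs to aggregator i.
    private val erased = aggregators.map(_.asInstanceOf[Aggregator[Any, T]])

    def initialElement: Seq[Any] = erased.map(_.initialElement)

    def mergeValue(accs: Seq[Any], elem: T): Seq[Any] =
      erased.zip(accs).map { case (agg, acc) => agg.mergeValue(acc, elem) }

    def mergeCombiners(left: Seq[Any], right: Seq[Any]): Seq[Any] =
      erased.zip(left.zip(right)).map { case (agg, (l, r)) => agg.mergeCombiners(l, r) }
  }

  // Hypothetical executor: each inner Seq simulates one partition.
  def run[U, T](agg: Aggregator[U, T], partitions: Seq[Seq[T]]): U =
    partitions
      .map(_.foldLeft(agg.initialElement)(agg.mergeValue))
      .foldLeft(agg.initialElement)(agg.mergeCombiners)

  object SumAggregator extends Aggregator[Double, Double] {
    val initialElement: Double = 0.0
    def mergeValue(acc: Double, elem: Double): Double = acc + elem
    def mergeCombiners(l: Double, r: Double): Double = l + r
  }

  object CountAggregator extends Aggregator[Long, Double] {
    val initialElement: Long = 0L
    def mergeValue(acc: Long, elem: Double): Long = acc + 1
    def mergeCombiners(l: Long, r: Long): Long = l + r
  }

  val partitions: Seq[Seq[Double]] = Seq(Seq(1.0, 2.0), Seq(3.0), Seq(4.0, 5.0, 6.0))

  // Both aggregators see every element exactly once.
  val results: Seq[Any] =
    run(MultiAggregator[Double](Seq(SumAggregator, CountAggregator)), partitions)
}
```

Because the wrapper is itself an aggregator, whatever executes a single aggregator can execute the whole batch unchanged.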

Reading Results in a Type-safe Manner

Each aggregator can return an arbitrary value type. To maintain type safety and genericity, we added a special class that encapsulates the results of all input aggregators. To get the result of a specific aggregator, we pass the original aggregator object to the batched result. That way, the result type is correctly inferred from the aggregator's result type.
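One way such a lookup could work is sketched below. The names `executeInBatch` and `forAggregator` follow the experiment code in this article, but the bodies are an assumption, not the repository implementation: results are stored untyped in a map keyed by aggregator instance, and the cast is hidden behind a method whose return type comes from the aggregator's own type parameter. Nested Seqs again simulate partitions.

```scala
object TypedBatchSketch {
  // Assumed shape of the abstraction: U is the result type, T the input type.
  trait Aggregator[U, T] {
    def initialElement: U
    def mergeValue(acc: U, elem: T): U
    def mergeCombiners(left: U, right: U): U
  }

  // Results are kept untyped, keyed by the aggregator instance that produced them.
  final class BatchedResult[T](results: Map[Aggregator[_, T], Any]) {
    // U is taken from the aggregator passed in, so the caller gets a typed value back.
    def forAggregator[U](aggregator: Aggregator[U, T]): U =
      results(aggregator).asInstanceOf[U]
  }

  // Hypothetical single-pass executor over simulated partitions.
  def executeInBatch[T](
      partitions: Seq[Seq[T]],
      aggregators: Seq[Aggregator[_, T]]): BatchedResult[T] = {
    val erased = aggregators.map(_.asInstanceOf[Aggregator[Any, T]])
    val init: Seq[Any] = erased.map(_.initialElement)
    val partials = partitions.map { partition =>
      partition.foldLeft(init) { (accs, elem) =>
        erased.zip(accs).map { case (agg, acc) => agg.mergeValue(acc, elem) }
      }
    }
    val merged = partials.foldLeft(init) { (left, right) =>
      erased.zip(left.zip(right)).map { case (agg, (l, r)) => agg.mergeCombiners(l, r) }
    }
    val byAggregator: Map[Aggregator[_, T], Any] = aggregators.zip(merged).toMap
    new BatchedResult[T](byAggregator)
  }

  object CountAggregator extends Aggregator[Long, Double] {
    val initialElement: Long = 0L
    def mergeValue(acc: Long, elem: Double): Long = acc + 1
    def mergeCombiners(l: Long, r: Long): Long = l + r
  }

  object MaxAggregator extends Aggregator[Option[Double], Double] {
    val initialElement: Option[Double] = None
    def mergeValue(acc: Option[Double], elem: Double): Option[Double] = acc match {
      case Some(m) => Some(math.max(m, elem))
      case None    => Some(elem)
    }
    def mergeCombiners(l: Option[Double], r: Option[Double]): Option[Double] = (l, r) match {
      case (Some(a), Some(b)) => Some(math.max(a, b))
      case _                  => l.orElse(r)
    }
  }

  val partitions: Seq[Seq[Double]] = Seq(Seq(1.0, 7.5), Seq(3.0), Seq.empty[Double])
  val batched: BatchedResult[Double] =
    executeInBatch(partitions, Seq[Aggregator[_, Double]](CountAggregator, MaxAggregator))

  // No casts at the call site: the types follow from the aggregator arguments.
  val count: Long = batched.forAggregator(CountAggregator)
  val max: Option[Double] = batched.forAggregator(MaxAggregator)
}
```

The cast inside `forAggregator` is safe only as long as the map is built by `executeInBatch`, which pairs each aggregator with its own result.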


Let’s measure the time needed to calculate histograms for rows made up of 5 numeric columns.

In this experiment, we first execute an empty foreach statement to force Spark to cache the data in the cluster (job 0). Otherwise, the first job would carry the additional overhead of reading the data.

// job 0 - forces Spark to cache the data in the cluster. This removes data-loading
// overhead from the later jobs and makes their measurements reliable.
rows.foreach((_) => ())

Next, we calculate the histograms one column at a time using Spark's built-in histogram method (jobs 1-5).

private def sequentialProcessing(rows: RDD[Row], buckets: Array[Double]): Seq[Array[Long]] =
 for (i <- 0 until 5) yield {
   // Each iteration triggers its own full pass over the data.
   val column = rows.map(extractDoubleColumn(i))
   column.histogram(buckets, true)
 }

Finally, we use our batched aggregator (job 6).

private def batchedProcessing(rows: RDD[Row], buckets: Array[Double]): Seq[Array[Long]] = {
 // One histogram aggregator per column, each reading its own column from the row.
 val aggregators = for (i <- 0 until 5) yield
   HistogramAggregator(buckets, true).mapInput(extractDoubleColumn(i))
 // All aggregators run in a single pass over the data.
 val batchedResult = AggregatorBatch.executeInBatch(rows, aggregators)
 for (aggregator <- aggregators) yield batchedResult.forAggregator(aggregator)
}

The results:


We went from one histogram call per column to a single batched aggregate call. In this experiment, the batched method was about 5 times faster.


Using only Spark’s built-in histogram method, we would be stuck calling it once for each numeric column. With our approach, we get the same results from a single method call. Since every built-in call is a full pass over the data, we expect the speedup to be roughly N times, where N is the number of columns; in the experiment above, N = 5 gave a speedup of about 5 times.

We are also able to batch other arbitrary aggregators along with it. In our case they would be:

  • Calculating distributions of categorical columns
  • Counting missing values
  • Calculating min, max and mean values for numerical columns

In the future we could add more operations as long as they comply with the aggregator interface.
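As an illustration of what complying with the aggregator interface can look like, here is a sketch of a missing-value counter and a min/max/mean aggregator. Everything in it is assumed for the example: the trait shape, the `Stats` class, the representation of a row as `Vector[Option[Double]]`, and the `run` helper that simulates partitions. The `mapInput` method is modeled on its use in the experiment code. For brevity the two aggregators are run separately here; since they share the interface, they could equally be handed to a batching wrapper.

```scala
object ReportAggregatorsSketch {
  trait Aggregator[U, T] { self =>
    def initialElement: U
    def mergeValue(acc: U, elem: T): U
    def mergeCombiners(left: U, right: U): U

    // Adapts the aggregator to another input type, e.g. a whole row instead of one cell.
    def mapInput[I](f: I => T): Aggregator[U, I] = new Aggregator[U, I] {
      def initialElement: U = self.initialElement
      def mergeValue(acc: U, elem: I): U = self.mergeValue(acc, f(elem))
      def mergeCombiners(left: U, right: U): U = self.mergeCombiners(left, right)
    }
  }

  // Counts cells that hold no value.
  object MissingCount extends Aggregator[Long, Option[Double]] {
    val initialElement: Long = 0L
    def mergeValue(acc: Long, cell: Option[Double]): Long =
      if (cell.isEmpty) acc + 1 else acc
    def mergeCombiners(l: Long, r: Long): Long = l + r
  }

  final case class Stats(min: Double, max: Double, sum: Double, count: Long) {
    def mean: Double = if (count == 0) Double.NaN else sum / count
  }

  // Tracks min, max, sum and count of the present values; missing cells are skipped.
  object StatsAggregator extends Aggregator[Stats, Option[Double]] {
    val initialElement: Stats =
      Stats(Double.PositiveInfinity, Double.NegativeInfinity, 0.0, 0L)
    def mergeValue(acc: Stats, cell: Option[Double]): Stats = cell match {
      case Some(x) =>
        Stats(math.min(acc.min, x), math.max(acc.max, x), acc.sum + x, acc.count + 1)
      case None => acc
    }
    def mergeCombiners(l: Stats, r: Stats): Stats =
      Stats(math.min(l.min, r.min), math.max(l.max, r.max), l.sum + r.sum, l.count + r.count)
  }

  // Hypothetical executor: each inner Seq simulates one partition.
  def run[U, T](agg: Aggregator[U, T], partitions: Seq[Seq[T]]): U =
    partitions
      .map(_.foldLeft(agg.initialElement)(agg.mergeValue))
      .foldLeft(agg.initialElement)(agg.mergeCombiners)

  type Row = Vector[Option[Double]]

  val rows: Seq[Seq[Row]] = Seq(
    Seq(Vector(Some(1.0), None), Vector(Some(4.0), Some(2.0))),
    Seq(Vector(None, Some(6.0))))

  val missingInFirst: Long = run(MissingCount.mapInput((r: Row) => r(0)), rows)
  val statsOfSecond: Stats = run(StatsAggregator.mapInput((r: Row) => r(1)), rows)
}
```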

Here is our aggregator batching code along with the above experiment: GitHub repository


Published at DZone with permission of deepsense.io Blog, DZone MVB. See the original article here.
