Mastering Advanced Aggregations in Spark SQL
OLAP (Online Analytical Processing) aggregation techniques in Spark SQL are used for complex, multi-level data analysis over large datasets.
In data analytics, efficiently aggregating large datasets is a fundamental need. For example, when working with retail inventory data and tracking products shipped to stores each month, the standard GROUP BY clause in SQL can handle basic aggregations.
However, it falls short when you need multiple levels of aggregation in a single query. This is where Spark SQL’s advanced GROUP BY extensions, GROUPING SETS, ROLLUP, and CUBE, come into the picture to compute multiple groupings efficiently.
In this article, we’ll walk through these aggregation techniques using a retail inventory scenario. We’ll explore how to calculate shipments by store and product type, as well as totals across various dimensions, all in one query, with examples in both Scala and PySpark, complete with code snippets and outputs.
Let’s get going.
The Problem: Aggregating Retail Inventory Data
Suppose you’re a data engineer managing a retail chain’s inventory system. Every month, products like clothing, footwear, and electronics are shipped to various stores. Your dataset includes details such as store ID, product type, units shipped, and shipment month. You need to answer questions like:
- How many units of each product type were shipped to each store?
- What’s the total number of units shipped to each store, regardless of product type?
- What’s the grand total of all shipments across all stores and types?
Running separate GROUP BY queries for each question and combining the results with UNION would work, but it’s inefficient, especially with large datasets processed in Spark. Instead, GROUPING SETS, ROLLUP, and CUBE allow you to compute all these aggregations in a single task, optimizing performance and simplifying your code.
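For reference, the naive approach looks roughly like the sketch below (it assumes the shipped_products view we create in the setup section that follows): three separate aggregations stitched together with UNION ALL, each one scanning the data on its own.
spark.sql("""
SELECT store_id, product_type, SUM(shipped_units) AS total_count
FROM shipped_products
GROUP BY store_id, product_type
UNION ALL
SELECT store_id, NULL AS product_type, SUM(shipped_units) AS total_count
FROM shipped_products
GROUP BY store_id
UNION ALL
SELECT NULL AS store_id, NULL AS product_type, SUM(shipped_units) AS total_count
FROM shipped_products
""").show()
The extensions below produce the same rows from a single statement instead of three.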
Input Data
We’ll use a sample dataset provided as a CSV string, representing shipments to two stores (101 and 102) over two months.
Here’s what the data looks like visually:
| store_id | product_type | shipped_units | month |
|----------|--------------|---------------|-------|
| 101      | Clothing     | 2000          | 1     |
| 101      | Footwear     | 1200          | 1     |
| 101      | Electronics  | 2400          | 1     |
| 102      | Electronics  | 3000          | 1     |
| 102      | Clothing     | 1000          | 1     |
| 101      | Clothing     | 1500          | 2     |
| 101      | Electronics  | 800           | 2     |
| 102      | Clothing     | 3000          | 2     |
Setting Up the Spark Environment
Before we start the aggregations, let’s set up our Spark environment in Scala and PySpark. We’ll load the CSV string into a DataFrame and create a temporary view for SQL queries.
package com.examples

import org.apache.spark.sql.SparkSession

object AdvancedAggregations {

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("Advanced Aggregations in Spark SQL")
      .master("local[*]")
      .getOrCreate()

    import spark.implicits._

    // Hold the raw CSV lines as a Dataset[String] so the CSV reader can parse them.
    val csvData = spark.sparkContext.parallelize(
      """store_id,product_type,shipped_units,month
        |101,Clothing,2000,1
        |101,Footwear,1200,1
        |101,Electronics,2400,1
        |102,Electronics,3000,1
        |102,Clothing,1000,1
        |101,Clothing,1500,2
        |101,Electronics,800,2
        |102,Clothing,3000,2""".stripMargin.split("\n").toList).toDS()

    // Parse the lines, treating the first line as the header and inferring column types.
    val df = spark.read
      .option("header", true)
      .option("inferSchema", true)
      .csv(csvData)

    df.show()
    df.createOrReplaceTempView("shipped_products")
  }
}
PySpark setup:
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("Advanced Aggregations in Spark SQL") \
    .master("local[*]") \
    .getOrCreate()

csv_data = """
store_id,product_type,shipped_units,month
101,Clothing,2000,1
101,Footwear,1200,1
101,Electronics,2400,1
102,Electronics,3000,1
102,Clothing,1000,1
101,Clothing,1500,2
101,Electronics,800,2
102,Clothing,3000,2
"""

# Split the CSV string into rows, zip each row with the header to build
# dictionaries, and cast the numeric columns (every parsed value starts as a string).
lines = csv_data.strip().split("\n")
header = lines[0].split(",")
data = [dict(zip(header, line.split(","))) for line in lines[1:]]

df = spark.createDataFrame(data).selectExpr(
    "cast(store_id as int) as store_id",
    "product_type",
    "cast(shipped_units as int) as shipped_units",
    "cast(month as int) as month")

df.show()
df.createOrReplaceTempView("shipped_products")
Output (from Scala and PySpark setup):
+--------+------------+-------------+-----+
|store_id|product_type|shipped_units|month|
+--------+------------+-------------+-----+
| 101| Clothing| 2000| 1|
| 101| Footwear| 1200| 1|
| 101| Electronics| 2400| 1|
| 102| Electronics| 3000| 1|
| 102| Clothing| 1000| 1|
| 101| Clothing| 1500| 2|
| 101| Electronics| 800| 2|
| 102| Clothing| 3000| 2|
+--------+------------+-------------+-----+
With our data loaded, let’s explore the aggregation techniques.
GROUPING SETS: Multi-Level Aggregations
GROUPING SETS is an extension of GROUP BY that lets you define multiple grouping combinations in a single query. Think of it as a way to perform several GROUP BY operations and union their results in a single pass.
GROUP BY GROUPING SETS is equivalent to the UNION of two or more GROUP BY operations in the same result set:
- GROUP BY GROUPING SETS((a)) is equivalent to the single grouping set operation GROUP BY a.
- GROUP BY GROUPING SETS((a),(b)) is equivalent to GROUP BY a UNION ALL GROUP BY b.
Example: Shipments by Store and Product Type, Plus Store Totals
Let’s calculate the total units shipped to each store for each product type, plus the total units per store across all types.
Scala/PySpark code:
spark.sql("""
SELECT store_id, product_type, SUM(shipped_units) as total_count
FROM shipped_products
GROUP BY store_id, product_type
GROUPING SETS ((store_id, product_type), (store_id))
""").show()
Output:
+--------+------------+-----------+
|store_id|product_type|total_count|
+--------+------------+-----------+
| 102| null| 7000|
| 101| Electronics| 3200|
| 102| Electronics| 3000|
| 101| null| 7900|
| 101| Clothing| 3500|
| 102| Clothing| 4000|
| 101| Footwear| 1200|
+--------+------------+-----------+
Explanation:
- (store_id, product_type) gives totals for each store-product combination (e.g., 3200 Electronics units for store 101).
- (store_id) gives totals per store across all product types (e.g., 7900 for store 101), with product_type shown as null.
Adding the Grand Total
To include the overall total of all shipments, add an empty set ():
spark.sql("""
SELECT store_id, product_type, SUM(shipped_units) as total_count
FROM shipped_products
GROUP BY store_id, product_type
GROUPING SETS ((store_id, product_type), (store_id), ())
""").show()
Output:
+--------+------------+-----------+
|store_id|product_type|total_count|
+--------+------------+-----------+
| null| null| 14900|
| 102| null| 7000|
| 101| Electronics| 3200|
| 102| Electronics| 3000|
| 101| null| 7900|
| 101| Clothing| 3500|
| 102| Clothing| 4000|
| 101| Footwear| 1200|
+--------+------------+-----------+
Explanation:
- () adds the grand total (14900), with both store_id and product_type shown as null.
Using GROUPING_ID for Clarity
The GROUPING__ID column (also available through the grouping_id() function) is a bit vector that identifies which columns were aggregated away in each result row, which makes the output easier to interpret.
spark.sql("""
SELECT store_id, product_type, SUM(shipped_units) as total_count, GROUPING__ID
FROM shipped_products
GROUP BY store_id, product_type
GROUPING SETS ((store_id, product_type), (store_id), ())
""").show()
Output:
+--------+------------+-----------+------------+
|store_id|product_type|total_count|grouping__id|
+--------+------------+-----------+------------+
| null| null| 14900| 3|
| 102| null| 7000| 1|
| 101| Electronics| 3200| 0|
| 102| Electronics| 3000| 0|
| 101| null| 7900| 1|
| 101| Clothing| 3500| 0|
| 102| Clothing| 4000| 0|
| 101| Footwear| 1200| 0|
+--------+------------+-----------+------------+
Explanation (each bit is 0 when the column is part of the grouping and 1 when it has been aggregated away, with product_type as the least-significant bit here):
- 0: Both columns grouped.
- 1: Only store_id grouped.
- 3: No columns grouped (grand total).
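A related point: the null values that mark subtotal rows are ambiguous if the underlying data can itself contain nulls. Here is a minimal sketch (not part of the original example) that uses the grouping() function to label those rows explicitly:
spark.sql("""
SELECT
CASE WHEN grouping(store_id) = 1 THEN 'ALL STORES' ELSE CAST(store_id AS STRING) END AS store,
CASE WHEN grouping(product_type) = 1 THEN 'ALL TYPES' ELSE product_type END AS product,
SUM(shipped_units) AS total_count
FROM shipped_products
GROUP BY store_id, product_type
GROUPING SETS ((store_id, product_type), (store_id), ())
""").show()
grouping(col) returns 1 when the column has been aggregated away in that row and 0 otherwise, so the labels 'ALL STORES' and 'ALL TYPES' replace the nulls in the subtotal and grand-total rows.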
ROLLUP: Hierarchical Aggregations Made Simple
ROLLUP builds on GROUPING SETS to create hierarchical aggregations, ideal for subtotals along a dimension. For store_id and product_type, it generates:
- (store_id, product_type)
- (store_id)
- ()
Example: Rollup
spark.sql("""
SELECT store_id, product_type, SUM(shipped_units) as total_count
FROM shipped_products
GROUP BY store_id, product_type WITH ROLLUP
""").show()
Output: (The N elements of a ROLLUP specification correspond to N+1 GROUPING SETS.)
+--------+------------+-----------+
|store_id|product_type|total_count|
+--------+------------+-----------+
| null| null| 14900|
| 102| null| 7000|
| 101| Electronics| 3200|
| 102| Electronics| 3000|
| 101| null| 7900|
| 101| Clothing| 3500|
| 102| Clothing| 4000|
| 101| Footwear| 1200|
+--------+------------+-----------+
Explanation: This matches the GROUPING SETS ((store_id, product_type), (store_id), ()) output, providing a hierarchical view from the most specific combinations up to the grand total.
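If you prefer the DataFrame API over SQL strings, the same hierarchy is available through rollup(); here is a PySpark sketch (the Scala version differs only in syntax):
from pyspark.sql import functions as F

# Hierarchical aggregation without SQL: store/product, per-store, and grand total rows.
df.rollup("store_id", "product_type") \
    .agg(F.sum("shipped_units").alias("total_count")) \
    .orderBy("store_id", "product_type") \
    .show()
rollup() produces the same N+1 grouping sets as the WITH ROLLUP form above.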
CUBE: Multi-Dimensional Aggregations
CUBE takes aggregation to the next level by computing all possible combinations of the specified columns. For two columns, it produces 2² = 4 sets:
- (store_id, product_type)
- (store_id)
- (product_type)
- ()
Example: Cube
spark.sql("""
SELECT store_id, product_type, SUM(shipped_units) as total_count
FROM shipped_products
GROUP BY store_id, product_type WITH CUBE
""").show()
Output: (The N elements of a CUBE specification correspond to 2^N GROUPING SETS.)
+--------+------------+-----------+
|store_id|product_type|total_count|
+--------+------------+-----------+
| null| Electronics| 6200|
| null| null| 14900|
| null| Footwear| 1200|
| 102| null| 7000|
| 101| Electronics| 3200|
| null| Clothing| 7500|
| 102| Electronics| 3000|
| 101| null| 7900|
| 101| Clothing| 3500|
| 102| Clothing| 4000|
| 101| Footwear| 1200|
+--------+------------+-----------+
Explanation:
- (product_type) adds totals per product type across all stores (e.g., 7500 for Clothing).
- This is equivalent to GROUPING SETS ((store_id, product_type), (store_id), (product_type), ()).
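As with ROLLUP, the DataFrame API exposes this directly via cube(); a PySpark sketch:
from pyspark.sql import functions as F

# All four grouping combinations of store_id and product_type in one pass.
df.cube("store_id", "product_type") \
    .agg(F.sum("shipped_units").alias("total_count")) \
    .show()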
Choosing the Right Tool
- GROUPING SETS: Perfect for custom, specific groupings when you don’t need a full hierarchy or all combinations (see the sketch after this list).
- ROLLUP: Best for hierarchical aggregations, like subtotals by store, then grand totals.
- CUBE: Ideal for multi-dimensional analysis, giving you every possible aggregation.
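To illustrate the first point, here is a sketch of a custom grouping that neither ROLLUP nor CUBE produces on its own: monthly subtotals per store and monthly subtotals per product type, and nothing else.
spark.sql("""
SELECT store_id, product_type, month, SUM(shipped_units) AS total_count
FROM shipped_products
GROUP BY store_id, product_type, month
GROUPING SETS ((store_id, month), (product_type, month))
""").show()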
Conclusion
In data analytics, especially within fields like retail and financial technology, swiftly consolidating vast datasets is essential for uncovering valuable insights.
Spark SQL’s powerful GROUP BY extensions, such as GROUPING SETS, ROLLUP, and CUBE, simplify data aggregation by enabling complex, multi-tier summaries within a single query, surpassing the constraints of basic GROUP BY operations. Through a retail inventory case study, this article showcased how these extensions efficiently calculate metrics like store-product totals, store-wide summaries, and overall aggregates.