Declarative Pipelines in Apache Spark 4.0
Apache Spark's declarative pipelines let you define your entire data pipeline's desired outcome, and Spark handles the execution details.
The landscape of big data processing is constantly evolving, with data engineers and data scientists continually seeking more efficient and intuitive ways to manage complex data workflows. While Apache Spark has long been the cornerstone for large-scale data processing, the construction and maintenance of intricate data pipelines can still present significant operational overhead. Databricks, a key contributor to Apache Spark 4.0, recently addressed this challenge head-on by open-sourcing its core declarative ETL framework. This new framework extends the benefits of declarative programming from individual queries to entire data pipelines, offering a compelling approach for building robust and maintainable data solutions.
The Shift From Imperative to Declarative: A Paradigm for Simplification
For years, data professionals have leveraged Spark's powerful APIs (Scala, Python, SQL) to imperatively define data transformations. In an imperative model, you explicitly dictate how each step of your data processing should occur.
For example, a typical ETL (Extract, Transform, Load) pipeline might involve a series of detailed instructions:
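# Imperative Spark Example (Conceptual; the S3 paths are placeholders)
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
spark = SparkSession.builder.appName("imperative-etl").getOrCreate()
# Extract: read both sources (header=True makes "product_id" a named CSV column)
df_sales = spark.read.format("csv").option("header", True).option("inferSchema", True).load("s3://raw-data/sales.csv")
df_products = spark.read.format("json").load("s3://raw-data/products.json")
# Transform: join sales and products
df_joined = df_sales.join(df_products, "product_id")
# Aggregate sales by product category (F.sum, not Python's built-in sum)
df_aggregated = df_joined.groupBy("product_category").agg(F.sum("amount").alias("total_sales"))
# Load: write to a Delta table (requires the Delta Lake package to be configured)
df_aggregated.write.format("delta").mode("overwrite").save("s3://curated-data/product_sales_summary")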
Declarative programming, conversely, focuses on what you want to achieve and lets the underlying system determine the most efficient way to achieve it. SQL is a prime example of declarative programming for data. When you write `SELECT customer_name, SUM(order_total) FROM orders GROUP BY customer_name`, you express the result you want without specifying how to scan the table, which aggregation strategy to use, or the physical execution plan.
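You can see this division of labor without a Spark cluster. The sketch below is a stand-in under stated assumptions: it uses Python's built-in sqlite3 module rather than Spark, and a small made-up orders table. It runs the declarative query above, then asks the engine which plan it picked with EXPLAIN QUERY PLAN; in Spark, DataFrame.explain() serves the same purpose.

```python
import sqlite3

# In-memory SQLite database (a stand-in for Spark) with made-up sample rows
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE orders (customer_name TEXT, order_total REAL)")
conn.executemany(
    "INSERT INTO orders VALUES (?, ?)",
    [("alice", 10.0), ("bob", 5.0), ("alice", 2.5)],
)

# The declarative query states the result, not how to compute it.
# ORDER BY is added only to make the printed output deterministic.
query = """
    SELECT customer_name, SUM(order_total) AS total
    FROM orders
    GROUP BY customer_name
    ORDER BY customer_name
"""
rows = conn.execute(query).fetchall()
print(rows)  # [('alice', 12.5), ('bob', 5.0)]

# Ask the engine which plan it chose; the wording varies by SQLite version.
for step in conn.execute("EXPLAIN QUERY PLAN " + query):
    print(step)
```

The query text never mentions a plan: the engine chooses one, and a different engine (or version) is free to choose differently while returning the same rows.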
Apache Spark Declarative Pipelines extends this powerful paradigm to the entire data pipeline. Instead of writing verbose imperative code for each step, users can define high-level specifications for their data flows. Spark then intelligently interprets these specifications, constructing an optimized execution graph and handling the intricate details of data movement, transformation, and persistence.
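At the heart of constructing an execution graph is dependency ordering: each dataset must be computed after the datasets it reads. The stdlib sketch below (Python 3.9+, graphlib) is not Spark's planner; the dataset names are hypothetical and mirror the imperative sales example above. It derives a valid run order from the declared inputs and shows why a dependency cycle makes a pipeline impossible to schedule.

```python
from graphlib import CycleError, TopologicalSorter

# Hypothetical declarations: each dataset maps to the datasets it reads from.
pipeline = {
    "sales_raw": set(),
    "products_raw": set(),
    "sales_with_products": {"sales_raw", "products_raw"},
    "product_sales_summary": {"sales_with_products"},
}

def execution_order(definitions):
    """Return an order in which every dataset comes after all of its inputs."""
    return list(TopologicalSorter(definitions).static_order())

order = execution_order(pipeline)
print(order)

# A cycle cannot be ordered, which is why a pipeline must be a DAG.
try:
    execution_order({"a": {"b"}, "b": {"a"}})
except CycleError as err:
    print("cycle detected:", err.args[1])
```

Independent datasets such as sales_raw and products_raw have no ordering constraint between them, which is what gives an engine room to schedule them as it sees fit.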
Core Concepts of Apache Spark Declarative Pipelines
Apache Spark Declarative Pipelines introduces several key abstractions that simplify pipeline development, primarily defined through SQL or Python APIs:
- Pipelines: A pipeline is the fundamental deployment and execution unit. It encapsulates a collection of tables (Streaming Tables, Materialized Views) and the transformations (Flows) that define how data moves and transforms from source to final target datasets. A pipeline defines the entire directed acyclic graph (DAG) of data dependencies.
- Flows (Underlying the Declarations): While end users do not always declare a flow explicitly with a separate FLOW keyword, a flow represents the batch or streaming processing logic that populates a STREAMING TABLE or MATERIALIZED VIEW. The system categorizes each flow as batch or streaming based on its upstream dependencies.
  - Batch Flows (implicit in Materialized Views and tables built from batch sources): These are used when your target is a MATERIALIZED VIEW or a standard table populated from batch data sources (e.g., historical files, static datasets). The pipeline orchestrator determines how to refresh these tables, for example by processing a complete snapshot or by identifying changes in upstream batch dependencies.
  - Streaming Flows (implicit in Streaming Tables that read from streaming sources): These are used when you declare a streaming target that depends on a streaming source, such as a message queue or a continuously arriving stream of files. The pipeline orchestrator processes new records incrementally as they arrive, keeping latency low for real-time analytics.
- Streaming Tables: A Streaming Table is a table specifically designed for continuous, incremental data processing. It is ideal for ingesting data from append-only sources with low latency. When a streaming table is updated, it processes only the data that has arrived since the last update, which makes it efficient for real-time ingestion and transformations.
- Materialized Views: Similar to traditional materialized views, these are precomputed tables that store the results of a query. In Declarative Pipelines, the pipeline refreshes materialized views to reflect changes in their source data, so queries over complex aggregations or joins read precomputed results instead of recomputing the entire dataset every time.
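The difference between the two dataset types can be illustrated with a toy model. The classes below are hypothetical and are not Spark's implementation: ToyStreamingTable remembers an offset into an append-only list and processes only the rows past it, while ToyMaterializedView stores a precomputed aggregate and, for simplicity, recomputes it in full on every refresh.

```python
from collections import defaultdict

class ToyStreamingTable:
    """Appends only the source rows it has not processed yet (toy model)."""

    def __init__(self, source):
        self.source = source  # append-only list standing in for a stream
        self.offset = 0       # position of the next unprocessed row
        self.rows = []

    def update(self):
        new_rows = self.source[self.offset:]
        self.rows.extend(new_rows)
        self.offset = len(self.source)
        return len(new_rows)  # number of rows processed in this update

class ToyMaterializedView:
    """Stores a precomputed per-category total; refresh() recomputes it."""

    def __init__(self, upstream):
        self.upstream = upstream
        self.result = {}

    def refresh(self):
        totals = defaultdict(float)
        for category, amount in self.upstream.rows:
            totals[category] += amount
        self.result = dict(totals)

events = [("books", 12.0), ("games", 30.0)]
st = ToyStreamingTable(events)
mv = ToyMaterializedView(st)

print(st.update())  # 2: both rows are new
mv.refresh()
events.append(("books", 8.0))
print(st.update())  # 1: only the newly arrived row is processed
mv.refresh()
print(mv.result)    # {'books': 20.0, 'games': 30.0}
```

Readers of the view only ever see mv.result, the stored answer; the cost of computing it is paid at refresh time rather than at query time.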
Example
In this section, we will create a very simple pipeline using Apache Spark's declarative framework. To begin, download and install Apache Spark 4.0 (or build it from source) and confirm that the newly added spark-pipelines script is present in the $SPARK_HOME/bin directory; we will use it throughout this example.
Step 1: Create a Sample Pipeline Project
Use the spark-pipelines command to initialize a sample project named "helloworld_app":
$ spark-pipelines init --name helloworld_app
The console will display the following result:
Pipeline project 'helloworld_app' created successfully. To run your pipeline:
cd 'helloworld_app'
spark-pipelines run
Step 2: Examine the Project Files
Here are the contents of the sample project generated in the previous step:
$ tree
.
├── pipeline.yml
└── transformations
    ├── example_python_materialized_view.py
    └── example_sql_materialized_view.sql
Let's try to understand each file.
pipeline.yml: This is the main project file, which tells the pipeline where to find its definitions. In this project, the pipeline's datasets are defined in the transformations directory, in both SQL and Python files.
The pipeline.yml file should look like the following:
$ cat pipeline.yml
definitions:
  - glob:
      include: transformations/**/*.py
  - glob:
      include: transformations/**/*.sql
A pipeline spec can contain the following fields:
- catalog: The default catalog to use for the pipeline.
- database: The default database to use for the pipeline.
- configuration: A dictionary of Spark configuration properties to set for the pipeline.
- definitions: A list of glob patterns for finding pipeline definition files.
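When the pipeline runs, its log reports each definitions pattern with a line such as "Found 1 files matching glob 'transformations/**/*.py'". The stdlib sketch below approximates that matching step with pathlib against a temporary copy of the sample project layout. find_definition_files is a hypothetical helper, and the real runner's matching rules may differ in details.

```python
import tempfile
from pathlib import Path

# Patterns copied from the sample pipeline.yml
patterns = ["transformations/**/*.py", "transformations/**/*.sql"]

def find_definition_files(root, patterns):
    """Map each glob pattern to the sorted files it matches under root."""
    root = Path(root)
    return {
        p: sorted({f.relative_to(root).as_posix() for f in root.glob(p)})
        for p in patterns
    }

with tempfile.TemporaryDirectory() as tmp:
    # Recreate the layout produced by `spark-pipelines init`
    tdir = Path(tmp, "transformations")
    tdir.mkdir()
    (tdir / "example_python_materialized_view.py").touch()
    (tdir / "example_sql_materialized_view.sql").touch()
    matches = find_definition_files(tmp, patterns)

for pattern, files in matches.items():
    print(f"Found {len(files)} files matching glob '{pattern}': {files}")
```

In pathlib, `**` also matches zero directories, so files placed directly inside transformations are picked up as well as files in nested subdirectories.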
example_python_materialized_view.py: This Python script defines a materialized view using the pipelines package and the Python decorator @sdp.materialized_view. The view definition calls Spark's range function, spark.range(10), which creates a dataset with a single bigint column "id" containing the values 0 through 9 (from 0 up to 10, exclusive, with a step of 1).
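from pyspark import pipelines as sdp
from pyspark.sql import DataFrame, SparkSession
# The spark-pipelines runner creates the Spark session; reuse the active one
spark = SparkSession.active()
# Registers a materialized view named after the function
@sdp.materialized_view
def example_python_materialized_view() -> DataFrame:
    return spark.range(10)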
example_sql_materialized_view.sql: This part of the pipeline reads the Python-defined view `example_python_materialized_view` as its input, filters out the odd numbers, and saves only the even numbers in a new materialized view. Here is the view definition in SQL:
CREATE MATERIALIZED VIEW example_sql_materialized_view AS
SELECT id FROM example_python_materialized_view
WHERE id % 2 = 0
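Before running the pipeline, you can sanity-check the SQL logic locally. This sketch substitutes Python's built-in sqlite3 for Spark (an assumption of the example): a plain table holding the ids 0 through 9 stands in for the Python view, and CREATE TABLE ... AS stands in for CREATE MATERIALIZED VIEW, which SQLite does not support. The SELECT itself is the same as in the definition above.

```python
import sqlite3

conn = sqlite3.connect(":memory:")

# Stand-in for the Python view: spark.range(10) yields ids 0..9 in column "id"
conn.execute("CREATE TABLE example_python_materialized_view (id INTEGER)")
conn.executemany(
    "INSERT INTO example_python_materialized_view VALUES (?)",
    [(i,) for i in range(10)],
)

# Same SELECT as the SQL definition; CREATE TABLE ... AS stands in for
# CREATE MATERIALIZED VIEW, which SQLite lacks.
conn.execute("""
    CREATE TABLE example_sql_materialized_view AS
    SELECT id FROM example_python_materialized_view
    WHERE id % 2 = 0
""")

even_ids = [row[0] for row in conn.execute(
    "SELECT id FROM example_sql_materialized_view ORDER BY id")]
print(even_ids)  # [0, 2, 4, 6, 8]
```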
Step 3: Run the Pipeline
Run the pipeline with the command spark-pipelines run. It loads the pipeline.yml spec from your current working directory, registers the datasets defined in the files that the spec points to, builds the dataflow graph, and then executes it.
$ cd helloworld_app
# Make sure the spark-pipelines command is on your PATH
$ spark-pipelines run
Here is the output:
Loading pipeline spec from /Users/spark.user/apachespark4/spark/bin/helloworld_app/pipeline.yml...
Creating Spark session...
Creating dataflow graph...
Registering graph elements...
Loading definitions. Root directory: '/Users/spark.user/apachespark4/spark/bin/helloworld_app'.
Found 1 files matching glob 'transformations/**/*.py'
Importing /Users/spark.user/apachespark4/spark/bin/helloworld_app/transformations/example_python_materialized_view.py...
Found 1 files matching glob 'transformations/**/*.sql'
Registering SQL file /Users/spark.user/apachespark4/spark/bin/helloworld_app/transformations/example_sql_materialized_view.sql...
Starting run...
Flow spark_catalog.default.example_python_materialized_view is QUEUED.
Flow spark_catalog.default.example_sql_materialized_view is QUEUED.
Flow spark_catalog.default.example_python_materialized_view is PLANNING.
Flow spark_catalog.default.example_python_materialized_view is STARTING.
Flow spark_catalog.default.example_python_materialized_view is RUNNING.
Flow spark_catalog.default.example_python_materialized_view has COMPLETED.
Flow spark_catalog.default.example_sql_materialized_view is PLANNING.
Flow spark_catalog.default.example_sql_materialized_view is STARTING.
Flow spark_catalog.default.example_sql_materialized_view is RUNNING.
Flow spark_catalog.default.example_sql_materialized_view has COMPLETED.
Run is COMPLETED.
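The log also shows the dependency at work: example_sql_materialized_view starts PLANNING only after example_python_materialized_view has COMPLETED. The sketch below parses the flow lines copied from this run to check that ordering. The regular expression assumes the exact "Flow <name> is|has <STATE>." wording seen here, which may change between releases.

```python
import re

# Flow lines copied from the run output above
log = """\
Flow spark_catalog.default.example_python_materialized_view is QUEUED.
Flow spark_catalog.default.example_sql_materialized_view is QUEUED.
Flow spark_catalog.default.example_python_materialized_view is PLANNING.
Flow spark_catalog.default.example_python_materialized_view is STARTING.
Flow spark_catalog.default.example_python_materialized_view is RUNNING.
Flow spark_catalog.default.example_python_materialized_view has COMPLETED.
Flow spark_catalog.default.example_sql_materialized_view is PLANNING.
Flow spark_catalog.default.example_sql_materialized_view is STARTING.
Flow spark_catalog.default.example_sql_materialized_view is RUNNING.
Flow spark_catalog.default.example_sql_materialized_view has COMPLETED.
"""

FLOW_EVENT = re.compile(r"^Flow (\S+) (?:is|has) (\w+)\.$")

def flow_events(text):
    """Yield (flow_name, state) pairs from 'Flow <name> is|has <STATE>.' lines."""
    for line in text.splitlines():
        match = FLOW_EVENT.match(line.strip())
        if match:
            yield match.group(1), match.group(2)

events = list(flow_events(log))
index = {event: i for i, event in enumerate(events)}
py_done = index[("spark_catalog.default.example_python_materialized_view", "COMPLETED")]
sql_planning = index[("spark_catalog.default.example_sql_materialized_view", "PLANNING")]
print(py_done < sql_planning)  # True: the downstream view waits for its input
```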
Step 4: Examine the Output
As described earlier, the example pipeline first creates the materialized view example_python_materialized_view, with a column "id" containing the values 0 through 9, and then creates the materialized view example_sql_materialized_view, which keeps only the even values from the first view.
After the run finishes, the spark-warehouse directory contains one directory of Parquet files for each view:
$ tree
.
├── example_python_materialized_view
│ ├── _SUCCESS
│ └── part-00000-78588af7-1dbd-4e4f-8e14-39fc6a464125-c000.snappy.parquet
└── example_sql_materialized_view
├── _SUCCESS
└── part-00000-1a53fa19-1809-4137-afe8-8f64441a72a8-c000.snappy.parquet
Here are the contents of the view "example_python_materialized_view":
scala> spark.sql("""select * from parquet.`/Users/spark.user/apachespark4/spark/bin/helloworld_app/spark-warehouse/example_python_materialized_view/part-00000-78588af7-1dbd-4e4f-8e14-39fc6a464125-c000.snappy.parquet`""").show()
+---+
| id|
+---+
| 0|
| 1|
| 2|
| 3|
| 4|
| 5|
| 6|
| 7|
| 8|
| 9|
+---+
Here are the contents of the view "example_sql_materialized_view":
scala> spark.sql("""select * from parquet.`/Users/spark.user/apachespark4/spark/bin/helloworld_app/spark-warehouse/example_sql_materialized_view/part-00000-1a53fa19-1809-4137-afe8-8f64441a72a8-c000.snappy.parquet`""").show()
+---+
| id|
+---+
| 0|
| 2|
| 4|
| 6|
| 8|
+---+
Step 5: Final Observation
This framework lets you build a pipeline by describing what your data should look like at each step, in SQL, Python, or a mix of both. You don't write any orchestration code to run it: the framework works out the dependencies between datasets and builds the execution graph for you. That means you spend your time defining your data and transformations instead of writing code to sequence and run each step reliably.
What Declarative Pipelines Bring to the Table
This open-source initiative is more than just a new API; it represents a foundational shift in how data pipelines can be conceived and managed within the Spark ecosystem. Here are some of the key advantages it offers data engineers and data scientists:
- Simplified Pipeline Authoring: By expressing pipelines as a series of desired outcomes, the amount of boilerplate code can be significantly reduced. This allows teams to focus more on business logic and less on the mechanics of execution.
- Automated Optimization and Execution: One of the most compelling aspects is Spark's ability to optimize the entire pipeline end-to-end. Traditionally, Spark optimizes individual queries. With declarative pipelines, the framework gains a global view, enabling sophisticated optimizations like intelligent materialization, efficient dependency resolution, and dynamic resource allocation across the entire DAG.
- Improved Maintainability and Collaboration: Pipelines defined declaratively are often more readable and easier to understand, even for those new to the project. This fosters better collaboration across teams and reduces the cost of maintaining complex data infrastructure over time. Debugging also becomes more intuitive, as the intent of the pipeline is clearly articulated.
The source code is currently available within the Apache Spark repository at https://github.com/apache/spark/tree/master/sql/pipelines.
Conclusion: A Foundation for More Intelligent Data Engineering
Apache Spark Declarative Pipelines represents a notable advancement in simplifying the complexities of modern data engineering. By shifting the focus from imperative execution instructions to desired data outcomes through constructs like STREAMING TABLE and MATERIALIZED VIEW, it empowers data engineers and data scientists to build more robust, maintainable, and optimized data pipelines with greater ease. The implicit handling of underlying Batch Flows and Streaming Flows by the framework is key to its power. As this open-source initiative matures within the Apache Spark project, it is poised to redefine best practices for data processing, enabling organizations to unlock new levels of efficiency and derive greater value from their data assets.
Opinions expressed by DZone contributors are their own.