Automating Data Pipelines With Snowflake: Leveraging DBT and Airflow Orchestration Frameworks for ETL/ELT Processes

This article explores the automation of data pipelines using Snowflake, dbt, and Airflow, detailing best practices for efficient data processing and orchestration.

By Harshavardhan Yedla · Sep. 25, 24 · Tutorial

As organizations digitize and data volumes grow, automating data pipelines is crucial for the efficiency, consistency, and scalability of a lakehouse. Snowflake is a leading cloud data platform that integrates with a range of tools to automate ETL (Extract, Transform, Load) and ELT (Extract, Load, Transform) processes.

This article covers automating data pipelines on Snowflake with dbt (data build tool) and an orchestration framework, along with best practices for streamlining data workflows and keeping data processing reliable.

What Is dbt?

dbt stands for "data build tool." It is a command-line tool used in data engineering to build and manage data transformation workflows.

It turns raw data into a more structured, organized form by defining, running, and documenting SQL-based transformations.

dbt helps you:

  1. Write SQL: Create models (SQL files) that define how raw data should be transformed.
  2. Test: Implement tests to ensure data quality and integrity.
  3. Document: Document your data models and transformations, which makes it easier for teams to collaborate.
  4. Schedule: Automate and schedule data transformations using schedulers like Airflow.

It's often used with data warehouses like Snowflake, BigQuery, or Redshift. Using dbt, you can create a more organized and maintainable data pipeline.

What Is Airflow?

Apache Airflow is an open-source platform used to programmatically author, schedule, and monitor workflows. It is designed to manage complex data pipelines and workflows, which makes it easier to orchestrate and automate tasks.

Key Components of Airflow

  1. Directed Acyclic Graphs (DAGs): Workflows in Airflow are defined as Directed Acyclic Graphs (DAGs), which are a series of tasks with dependencies. Each task represents a unit of work, and the DAG defines the sequence in which tasks should be executed.
  2. Task Scheduling: Airflow allows you to schedule tasks to run at specific time intervals, enabling you to automate repetitive processes.
  3. Task Monitoring: It provides a web-based interface where you can monitor the progress of your workflows, check logs, and view the status of each task.
  4. Extensibility: Airflow supports custom operators and hooks, allowing you to integrate various systems and services. It has a rich ecosystem of plugins and extensions that cover different use cases.
  5. Scalability: It’s designed to scale with your needs. You can run Airflow on a single machine or deploy it across a cluster to handle larger workloads.
  6. Dynamic Pipeline Generation: Pipelines in Airflow can be generated dynamically, which is useful for creating complex workflows that can change based on input parameters or conditions.
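
To make the DAG idea concrete, here is a minimal sketch in plain Python (no Airflow required) that uses the standard library's graphlib module. The task names are illustrative assumptions, and Airflow's scheduler is considerably more involved; the sketch only shows how declared dependencies constrain the order in which tasks can run.

```python
from graphlib import TopologicalSorter

# Map each task to the set of tasks it depends on (its upstream tasks).
# The task names are illustrative assumptions.
dependencies = {
    "extract": set(),
    "load": {"extract"},
    "transform": {"load"},
}


def execution_order(deps):
    """Return one valid run order; graphlib raises CycleError if deps contain a cycle."""
    return list(TopologicalSorter(deps).static_order())


print(execution_order(dependencies))  # ['extract', 'load', 'transform']
```

Because the graph must be acyclic, a dependency loop (for example, two tasks that each wait on the other) is rejected up front rather than deadlocking at run time.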

Airflow is widely used in data engineering to manage ETL (Extract, Transform, Load) processes, and its flexibility allows it to handle a wide range of workflow automation tasks beyond data processing.

Flow Diagram:

Figure 1: Data pipeline on Snowflake DB with dbt and Airflow Orchestrator

Data Ingestion

dbt does not extract data, so it must be paired with other tools that pull data from the sources.

Below is a Python script that extracts a CSV file from an S3 bucket and loads it into Snowflake.

Python Script for Data Extraction:

Python
 
import boto3
import pandas as pd
from snowflake.connector import connect


# Extract data from an external source (e.g., a CSV file on S3)
def extract_data():
    s3 = boto3.client('s3')
    bucket_name = 'your_bucket'
    file_key = 'data/transactions.csv'
    response = s3.get_object(Bucket=bucket_name, Key=file_key)
    # The response body is a file-like stream that pandas can read directly
    df = pd.read_csv(response['Body'])
    return df


# Load data into Snowflake
def load_data_into_snowflake(df):
    # Placeholder credentials: in practice, read these from a secrets store
    conn = connect(
        user='your_username',
        password='your_password',
        account='your_snowflake_account',
        warehouse='your_warehouse',
        database='your_database',
        schema='your_schema'
    )
    try:
        cursor = conn.cursor()

        # Create or replace a stage to load the data
        cursor.execute("CREATE OR REPLACE STAGE my_stage")

        # Write DataFrame to a temporary CSV file (the file includes a header row)
        df.to_csv('/tmp/transactions.csv', index=False)

        # Upload the file to the Snowflake stage
        cursor.execute("PUT file:///tmp/transactions.csv @my_stage")

        # Copy data into the raw_sales table used by the dbt models,
        # skipping the CSV header row
        cursor.execute("""
        COPY INTO raw_sales
        FROM @my_stage/transactions.csv
        FILE_FORMAT = (TYPE = 'CSV', FIELD_OPTIONALLY_ENCLOSED_BY = '"', SKIP_HEADER = 1)
        """)
    finally:
        # Close the connection even if one of the statements fails
        conn.close()


df = extract_data()
load_data_into_snowflake(df)
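
The script above uses hard-coded placeholder credentials. One common alternative is to read the connection parameters from environment variables. The sketch below is a minimal illustration of that approach; the environment variable names are hypothetical and not part of Snowflake or the original script, so adjust them to your deployment.

```python
import os

# Hypothetical environment variable names, one per connect() keyword argument.
REQUIRED_VARS = {
    "user": "SNOWFLAKE_USER",
    "password": "SNOWFLAKE_PASSWORD",
    "account": "SNOWFLAKE_ACCOUNT",
    "warehouse": "SNOWFLAKE_WAREHOUSE",
    "database": "SNOWFLAKE_DATABASE",
    "schema": "SNOWFLAKE_SCHEMA",
}


def snowflake_params_from_env(env=None):
    """Build keyword arguments for a Snowflake connection from environment variables."""
    env = os.environ if env is None else env
    missing = [name for name in REQUIRED_VARS.values() if not env.get(name)]
    if missing:
        # Fail early with a clear message instead of a confusing login error later
        raise KeyError(f"Missing environment variables: {', '.join(sorted(missing))}")
    return {param: env[name] for param, name in REQUIRED_VARS.items()}
```

With this helper, the connection call becomes connect(**snowflake_params_from_env()), and no secret needs to be stored in the source file.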


Transform With dbt

Once the data is loaded into Snowflake, the dbt engine can be triggered to perform transformations. 

Aggregating Sales Data

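Create a dbt model to aggregate sales data by product category. The model reads raw_sales through ref(), which assumes raw_sales is defined in the dbt project as a model or seed; a table loaded outside dbt is normally declared as a source and read with source() instead.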

File: models/aggregate_sales.sql

SQL
 
WITH sales_data AS (
    SELECT
        product_category,
        SUM(sales_amount) AS total_sales,
        COUNT(*) AS total_orders
    FROM {{ ref('raw_sales') }}
    GROUP BY product_category
)

SELECT
    product_category,
    total_sales,
    total_orders,
    CASE
        WHEN total_sales > 100000 THEN 'High'
        WHEN total_sales BETWEEN 50000 AND 100000 THEN 'Medium'
        ELSE 'Low'
    END AS sales_category
FROM sales_data
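
The boundaries of the CASE expression are easy to misread, so here is a small Python sketch that mirrors its logic. It is only an illustration of how the branches are evaluated, not part of the dbt project.

```python
def sales_category(total_sales):
    """Mirror the CASE expression: branches are checked in order, first match wins."""
    if total_sales is None:
        # In SQL, comparisons with NULL are never true, so NULL falls through to ELSE
        return "Low"
    if total_sales > 100000:
        return "High"
    if 50000 <= total_sales <= 100000:  # SQL BETWEEN is inclusive on both ends
        return "Medium"
    return "Low"


for amount in (100000.01, 100000, 50000, 49999.99):
    print(amount, sales_category(amount))
```

A total of exactly 100000 is classified as Medium and exactly 50000 is also Medium, because BETWEEN includes both endpoints.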


Data Quality Testing

dbt includes a testing framework for checking the quality of the data being processed, and it can also check the freshness of source data.

File: tests/test_null_values.sql

SQL
 
SELECT *
FROM {{ ref('raw_sales') }}
WHERE sales_amount IS NULL
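
A test written this way selects the rows that violate the rule, and dbt treats the test as passing when the query returns no rows. The sketch below imitates that convention with Python's built-in sqlite3 module so it can run without a warehouse. The table contents and the order_id column are made up for illustration, and the ref() call is replaced by a plain table name.

```python
import sqlite3


def failing_rows(conn, test_sql):
    """Run a test query; every row it returns counts as a failure."""
    return conn.execute(test_sql).fetchall()


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE raw_sales (order_id INTEGER, sales_amount REAL)")
conn.executemany(
    "INSERT INTO raw_sales VALUES (?, ?)",
    [(1, 19.99), (2, None), (3, 5.00)],  # the second row has a NULL amount
)

# Same predicate as the dbt test above
test_sql = "SELECT * FROM raw_sales WHERE sales_amount IS NULL"
failures = failing_rows(conn, test_sql)
print("PASS" if not failures else f"FAIL: {len(failures)} row(s)")  # FAIL: 1 row(s)
```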


Calculating Metrics

You could model your dataset and calculate your monthly revenue trends using dbt.

File: models/monthly_revenue.sql

SQL
 
WITH revenue_data AS (
    SELECT
        EXTRACT(YEAR FROM order_date) AS year,
        EXTRACT(MONTH FROM order_date) AS month,
        SUM(sales_amount) AS total_revenue
    FROM {{ ref('raw_sales') }}
    GROUP BY year, month
)

SELECT
    year,
    month,
    total_revenue,
    -- Order across years so that January is compared with the preceding December
    LAG(total_revenue) OVER (ORDER BY year, month) AS previous_month_revenue
FROM revenue_data
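
To illustrate what LAG produces, the following pure-Python sketch attaches the previous month's revenue to each row. It orders the whole series by year and month rather than restarting in each year, so January is paired with the preceding December; restarting per year would leave every January without a previous value. The revenue figures are made up.

```python
def with_previous_month(rows):
    """Attach the previous row's revenue to each (year, month, total_revenue) tuple.

    Rows are ordered by (year, month). The first row has no predecessor, so its
    previous value is None, matching the NULL that LAG returns in SQL.
    """
    ordered = sorted(rows, key=lambda r: (r[0], r[1]))
    result = []
    previous = None
    for year, month, revenue in ordered:
        result.append((year, month, revenue, previous))
        previous = revenue
    return result


sample = [(2024, 1, 120.0), (2023, 12, 100.0), (2023, 11, 90.0)]  # made-up figures
for row in with_previous_month(sample):
    print(row)
```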


Load Data to Snowflake

dbt does not load raw data into Snowflake; it transforms and models data once that data is in the warehouse. Raw loads are therefore handled by an ingestion script or an orchestrator, and dbt then materializes the transformed results as tables or views.

Loading Transformed Data

File: models/production_sales_data.sql

SQL
 
-- Materialize the transformed data as a production table.
-- dbt generates the CREATE TABLE statement, so the model contains only a SELECT.
{{ config(materialized='table') }}

SELECT *
FROM {{ ref('monthly_revenue') }}


Orchestrate Pipeline Using Airflow

To orchestrate the ETL pipeline with Airflow, define a DAG that extracts the data, loads it into Snowflake, and then runs the dbt models as the transformation step:


Python
 
from datetime import datetime

from airflow import DAG
# Airflow 2.x import path for PythonOperator
from airflow.operators.python import PythonOperator

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2024, 1, 1),
    'retries': 1,
}

dag = DAG(
    'example_etl_with_dbt',
    default_args=default_args,
    description='ETL pipeline with dbt transformations',
    schedule_interval='@daily',
    catchup=False,  # do not backfill a run for every day since start_date
)


def extract_data():
    import boto3
    import pandas as pd

    s3 = boto3.client('s3')
    bucket_name = 'your_bucket'
    file_key = 'data/transactions.csv'
    response = s3.get_object(Bucket=bucket_name, Key=file_key)
    df = pd.read_csv(response['Body'])
    # Hand-off through /tmp assumes the extract and load tasks run on the same worker
    df.to_csv('/tmp/transactions.csv', index=False)


def load_data_into_snowflake():
    from snowflake.connector import connect

    # Placeholder credentials: in practice, use an Airflow connection or secrets backend
    conn = connect(
        user='your_username',
        password='your_password',
        account='your_snowflake_account',
        warehouse='your_warehouse',
        database='your_database',
        schema='your_schema'
    )
    try:
        cursor = conn.cursor()
        cursor.execute("CREATE OR REPLACE STAGE my_stage")
        cursor.execute("PUT file:///tmp/transactions.csv @my_stage")
        # Skip the header row that to_csv wrote into the file
        cursor.execute("""
        COPY INTO raw_sales
        FROM @my_stage/transactions.csv
        FILE_FORMAT = (TYPE = 'CSV', FIELD_OPTIONALLY_ENCLOSED_BY = '"', SKIP_HEADER = 1)
        """)
    finally:
        conn.close()


def run_dbt_models():
    import subprocess

    # Runs in the worker's current directory, which must contain the dbt project
    subprocess.run(["dbt", "run"], check=True)


extract_task = PythonOperator(
    task_id='extract',
    python_callable=extract_data,
    dag=dag,
)

load_task = PythonOperator(
    task_id='load',
    python_callable=load_data_into_snowflake,
    dag=dag,
)

transform_task = PythonOperator(
    task_id='transform',
    python_callable=run_dbt_models,
    dag=dag,
)

# Extract first, then load, then transform
extract_task >> load_task >> transform_task
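
The run_dbt_models function above calls dbt run without arguments, so it depends on the worker's current directory being the dbt project. A slightly more explicit variant passes the project location on the command line. The sketch below is one way to assemble such a call; the helper names and the example path are hypothetical, while --project-dir, --profiles-dir, and --select are standard dbt CLI options.

```python
import subprocess


def build_dbt_command(subcommand, project_dir, profiles_dir=None, select=None):
    """Assemble a dbt CLI invocation as an argument list (no shell involved)."""
    command = ["dbt", subcommand, "--project-dir", project_dir]
    if profiles_dir:
        command += ["--profiles-dir", profiles_dir]
    if select:
        command += ["--select", select]
    return command


def run_dbt(subcommand, project_dir, **kwargs):
    """Run dbt and raise CalledProcessError if it exits with a non-zero status."""
    subprocess.run(build_dbt_command(subcommand, project_dir, **kwargs), check=True)


# Hypothetical path; point this at wherever your dbt project is deployed.
print(build_dbt_command("run", "/opt/airflow/dbt/my_project", select="monthly_revenue"))
```

Keeping the command builder separate from the subprocess call makes it possible to check the arguments in a unit test without having dbt installed.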


Deploy and Test the Data Pipeline

Deploying and testing a data pipeline built on Snowflake (data warehouse), dbt (transformation), and Airflow (orchestration) requires the following steps:

Set Up Your Environment

  • Snowflake Account: Create the account along with the databases, schemas, stages, tables, and other objects the pipeline needs, and grant the required permissions.
  • dbt Installed: Install dbt and configure it to connect to Snowflake.
  • Orchestration Tool: Install and configure Apache Airflow.

Deploy the Data Pipeline

Step 1: Prepare Your dbt Project

Initialize the dbt project.

Bash:

 
dbt init my_project
cd my_project


Update the profiles.yml file with Snowflake connection details.

YAML
 
my_project:
  target: dev
  outputs:
    dev:
      type: snowflake
      account: your_snowflake_account
      user: your_username
      password: your_password
      role: your_role
      warehouse: your_warehouse
      database: your_database
      schema: your_schema
      threads: 4


Define your models in the models directory, e.g., models/aggregate_sales.sql, models/monthly_revenue.sql, etc.

Then, run the dbt models locally to confirm they work as expected before deploying:

Bash:

 
dbt run
dbt test


Step 2: Configure Apache Airflow

Install Apache Airflow and initialize the metadata database (on Airflow 2.7 and later, airflow db migrate replaces the deprecated airflow db init):

Bash:

 
pip install apache-airflow
airflow db init


Define an Airflow DAG to orchestrate the ETL pipeline by saving the DAG code shown earlier as dags/Snowflake_Transformation_etl_dag.py.

Step 3: Start Airflow Services

Start the Airflow web server and scheduler:

Bash:

 
airflow webserver --port 8080
airflow scheduler


Test the Pipeline End to End

Testing is key to ensure your pipeline works correctly from extraction to loading.

Unit Tests: Test your extraction scripts independently. Verify that data is correctly extracted from the source and loaded into Snowflake.

Python
 
# Test extraction function
def test_extract_data():
    df = extract_data()
    assert df is not None
    assert len(df) > 0
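
The test above calls extract_data(), which reaches out to S3 and so needs credentials and network access. One way to keep a unit test self-contained is to pass the S3 client in as a parameter and substitute a fake in tests. The sketch below shows that pattern with the standard library's csv module in place of pandas; extract_rows and FakeS3Client are hypothetical names, not part of the pipeline code above.

```python
import csv
import io


def extract_rows(s3_client, bucket_name, file_key):
    """Fetch a CSV object and parse it into a list of dicts keyed by column name."""
    response = s3_client.get_object(Bucket=bucket_name, Key=file_key)
    body = response["Body"].read().decode("utf-8")
    return list(csv.DictReader(io.StringIO(body)))


class FakeS3Client:
    """Test double that exposes only the get_object call used above."""

    def __init__(self, payload):
        self._payload = payload

    def get_object(self, Bucket, Key):
        # Mimic the shape of the real response: a dict with a readable Body
        return {"Body": io.BytesIO(self._payload)}


def test_extract_rows():
    fake = FakeS3Client(b"order_id,sales_amount\n1,19.99\n2,5.00\n")
    rows = extract_rows(fake, "any-bucket", "data/transactions.csv")
    assert len(rows) == 2
    assert rows[0] == {"order_id": "1", "sales_amount": "19.99"}


test_extract_rows()
```

In production code, the same function would receive boto3.client('s3') as its first argument.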


Integration Tests: Run the complete pipeline from extraction through to the loading phase in a test environment to validate the entire workflow.

Testing Transformation

dbt Tests: Use dbt’s built-in testing features to ensure data quality and consistency.

Bash:

 
dbt test


Validate Models: Query the resulting tables in Snowflake to ensure transformations are applied correctly.

SQL
 
SELECT * FROM production_sales_data;


Testing Loading

  1. Verify Data Load: Check the target tables in Snowflake to ensure data is loaded as expected.
  2. Data Quality Checks: Perform checks on the loaded data to validate that it matches the expected results.

SQL
 
SELECT COUNT(*) FROM raw_sales;
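
A simple check on the load is to compare the count returned by this query with the number of records in the source file. The sketch below shows the comparison in plain Python; the function names and sample data are illustrative, and the loaded count is passed in as a number rather than fetched from Snowflake.

```python
import csv
import io


def count_csv_records(csv_text):
    """Count data records in CSV text, excluding the header row."""
    reader = csv.reader(io.StringIO(csv_text))
    next(reader, None)  # skip the header row if there is one
    return sum(1 for _ in reader)


def assert_counts_match(expected, loaded):
    """Raise ValueError when the warehouse row count differs from the source."""
    if expected != loaded:
        raise ValueError(f"Row count mismatch: expected {expected}, loaded {loaded}")


source_text = "order_id,sales_amount\n1,19.99\n2,5.00\n"  # made-up sample
assert_counts_match(count_csv_records(source_text), loaded=2)
```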


Conclusion

By combining dbt’s powerful transformation capabilities with Snowflake’s scalable data platform and orchestration tools like Airflow, organizations can build robust and automated data pipelines. 
