Setting Up DBT and Snowpark for Machine Learning Pipelines
This guide describes how to install and set up DBT and Snowpark for machine-learning model pipelines to streamline data processing and feature engineering tasks.
AI/ML workflows depend on structured, reliable data pipelines. DBT and Snowpark offer complementary capabilities for building them: DBT handles modular SQL transformations, and Snowpark handles programmatic, Python-driven feature engineering.
Here are some key benefits of using DBT, Snowpark, and Snowflake together:
- Simplifies SQL-based ETL with DBT’s modularity and tests.
- Handles complex computations with Snowpark’s Python UDFs.
- Leverages Snowflake’s high-performance engine for large-scale data processing.
Here’s a step-by-step guide to installing, configuring, and integrating DBT and Snowpark into your workflows.
Step 1: Install DBT
DBT is installed from a shell with Python’s pip. Assuming Python is already installed and on your PATH, follow these steps:
# Set up a Python virtual environment (recommended):
python3 -m venv dbt_env
source dbt_env/bin/activate
# Install DBT and the Snowflake adapter:
pip install dbt-snowflake
# Verify DBT installation
dbt --version
Step 2: Install Snowpark
# Install Snowpark for Python
pip install snowflake-snowpark-python
# Install additional libraries for data manipulation
pip install pandas numpy
# Verify Snowpark installation
python -c "from snowflake.snowpark import Session; print('successful Snowpark installation')"
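The version checks from Steps 1 and 2 can also be combined into one script, which may be convenient in CI. The following is a minimal sketch using only the standard library; the helper name installed_version is hypothetical, and the distribution names are the ones passed to pip above.

```python
from importlib import metadata


def installed_version(distribution):
    """Return the installed version of a pip distribution, or None if absent."""
    try:
        return metadata.version(distribution)
    except metadata.PackageNotFoundError:
        return None


if __name__ == "__main__":
    for name in ("dbt-snowflake", "snowflake-snowpark-python"):
        version = installed_version(name)
        print(f"{name}: {version or 'NOT INSTALLED'}")
```

Run it inside the activated virtual environment so it inspects the same packages that pip installed.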
Step 3: Configuring DBT for Snowflake
DBT requires a profiles.yml file to define connection settings for Snowflake.
Locate the DBT Profiles Directory
By default, DBT expects the profiles.yml file in the ~/.dbt/ directory. Create the directory if it doesn’t exist:
mkdir -p ~/.dbt
Create the profiles.yml File
Define your Snowflake credentials in the following format:
my_project:
  outputs:
    dev:
      type: snowflake
      account: your_account_identifier
      user: your_username
      password: your_password
      role: your_role
      database: your_database
      warehouse: your_warehouse
      schema: your_schema
  target: dev
Replace placeholders like your_account_identifier with your Snowflake account details. The top-level key (my_project here) must match the profile name set in your dbt_project.yml.
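To keep the password out of the file, DBT can read it from an environment variable through its env_var function. Below is a minimal Python sketch that renders the same profile with that substitution. The names render_profile, write_profile, and SNOWFLAKE_PASSWORD are illustrative assumptions, not DBT conventions.

```python
from pathlib import Path

# The doubled braces survive str.format as a literal Jinja expression.
PROFILE_TEMPLATE = """\
{profile}:
  outputs:
    dev:
      type: snowflake
      account: {account}
      user: {user}
      password: "{{{{ env_var('{password_var}') }}}}"
      role: {role}
      database: {database}
      warehouse: {warehouse}
      schema: {schema}
  target: dev
"""


def render_profile(profile, password_var="SNOWFLAKE_PASSWORD", **settings):
    """Return profiles.yml text with the password read from the environment."""
    return PROFILE_TEMPLATE.format(
        profile=profile, password_var=password_var, **settings
    )


def write_profile(text, directory=None):
    """Write the text to <directory>/profiles.yml (default: ~/.dbt)."""
    directory = Path.home() / ".dbt" if directory is None else Path(directory)
    directory.mkdir(parents=True, exist_ok=True)
    path = directory / "profiles.yml"
    path.write_text(text)
    return path


text = render_profile(
    "my_project",
    account="your_account_identifier",
    user="your_username",
    role="your_role",
    database="your_database",
    warehouse="your_warehouse",
    schema="your_schema",
)
print(text)
```

Calling write_profile(text) saves the result; note that it overwrites any existing profiles.yml in the target directory, so the sketch only prints by default.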
Test the Connection
Run the following command from your DBT project directory to validate your configuration:
dbt debug
If the setup is correct, the output confirms that the connection test passed.
Step 4: Setting Up Snowpark
Ensure Snowflake Permissions
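Before using Snowpark, ensure the role you connect with has the following privileges:
- USAGE on the warehouse, database, and schema.
- CREATE FUNCTION on the schema, to create and register UDFs (user-defined functions).
- READ and WRITE on a stage, if you register permanent UDFs, because their code is uploaded there.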
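These permissions can be written out as GRANT statements for an administrator to review and run. This is a sketch under assumptions: the helper name grant_statements is hypothetical, and the privilege list (USAGE plus CREATE FUNCTION) is a typical minimum rather than something this guide prescribes; your account may need more, such as stage access for permanent UDFs.

```python
def grant_statements(role, warehouse, database, schema):
    """Build the GRANT statements a role typically needs for Snowpark UDF work."""
    qualified_schema = f"{database}.{schema}"
    return [
        f"GRANT USAGE ON WAREHOUSE {warehouse} TO ROLE {role}",
        f"GRANT USAGE ON DATABASE {database} TO ROLE {role}",
        f"GRANT USAGE ON SCHEMA {qualified_schema} TO ROLE {role}",
        f"GRANT CREATE FUNCTION ON SCHEMA {qualified_schema} TO ROLE {role}",
    ]


statements = grant_statements(
    "your_role", "your_warehouse", "your_database", "your_schema"
)
for statement in statements:
    print(statement + ";")
```

The function only builds strings, so nothing is executed against Snowflake until someone with sufficient privileges runs the printed statements.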
Create a Snowpark Session
Set up a Snowpark session using the same credentials from profiles.yml:
from snowflake.snowpark import Session

def create_session():
    connection_params = {
        "account": "your_account_identifier",
        "user": "your_username",
        "password": "your_password",
        "role": "your_role",
        "database": "your_database",
        "warehouse": "your_warehouse",
        "schema": "your_schema",
    }
    return Session.builder.configs(connection_params).create()

session = create_session()
print("Snowpark session created successfully")
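Hard-coded credentials are fine for a first test but risky in shared scripts. One possible alternative, sketched below, collects the same connection parameters from environment variables. The function name connection_params_from_env and the SNOWFLAKE_ prefix are assumptions made for illustration.

```python
import os

REQUIRED_KEYS = (
    "account", "user", "password", "role", "database", "warehouse", "schema",
)


def connection_params_from_env(env=None, prefix="SNOWFLAKE_"):
    """Collect Snowpark connection parameters from environment variables."""
    env = os.environ if env is None else env
    missing = [key for key in REQUIRED_KEYS if not env.get(prefix + key.upper())]
    if missing:
        names = ", ".join(prefix + key.upper() for key in missing)
        raise KeyError(f"Missing environment variables: {names}")
    return {key: env[prefix + key.upper()] for key in REQUIRED_KEYS}


# Example with an in-memory mapping instead of the real environment.
example_env = {f"SNOWFLAKE_{key.upper()}": f"your_{key}" for key in REQUIRED_KEYS}
params = connection_params_from_env(example_env)
print(sorted(params))
```

The returned dictionary has the same keys as connection_params, so it can be passed to Session.builder.configs(...) in its place.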
Register a Sample UDF
Here’s an example of registering a simple Snowpark UDF for text processing:
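from snowflake.snowpark.types import StringType

def clean_text(input_text):
    return input_text.strip().lower()

session.udf.register(
    func=clean_text,
    name="clean_text_udf",
    input_types=[StringType()],
    return_type=StringType(),
    is_permanent=True,
    stage_location="@my_stage",  # placeholder: a permanent UDF needs a stage for its code
    replace=True,  # allows re-running this script
)
print("UDF registered successfully")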
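Because the UDF body is plain Python, it can be checked locally before registration. The sketch below also adds a None guard, on the assumption that SQL NULL values reach the function as None (the usual behavior for Snowpark Python UDFs that are not registered as strict); without the guard, .strip() would raise on NULL rows.

```python
def clean_text(input_text):
    """Trim surrounding whitespace and lowercase; pass SQL NULL (None) through."""
    if input_text is None:
        return None
    return input_text.strip().lower()


# Quick local checks before registering the function as a UDF.
samples = ["  Hello World  ", "ALREADY CLEAN", "", None]
results = [clean_text(value) for value in samples]
print(results)
```

If the checks pass, this version of clean_text can be registered exactly as shown in this step.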
Step 5: Integrating DBT With Snowpark
Assume you have a DBT model named raw_table that contains raw data.
raw_table DBT Model Definition
-- models/raw_table.sql
SELECT *
FROM my_database.my_schema.source_table
Use Snowpark UDFs in DBT Models
Once you’ve registered a UDF in Snowflake using Snowpark, you can call it directly from your DBT models.
-- models/processed_data.sql
WITH raw_data AS (
    SELECT id, text_column
    FROM {{ ref('raw_table') }}
),
cleaned_data AS (
    SELECT
        id,
        clean_text_udf(text_column) AS cleaned_text
    FROM raw_data
)
SELECT * FROM cleaned_data
Run DBT Models
Execute your DBT models to apply the transformation:
dbt run --select processed_data
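If you drive the pipeline from Python (for example, in the same script that registers the UDFs), the command above can be launched with subprocess. A minimal sketch follows; the helper names dbt_command and run_dbt are hypothetical, and the runner argument exists only so the logic can be exercised without DBT installed.

```python
import subprocess


def dbt_command(action, models):
    """Build a dbt CLI invocation such as: dbt run --select processed_data."""
    return ["dbt", action, "--select", *models]


def run_dbt(action, models, runner=subprocess.run):
    """Run dbt and raise CalledProcessError if it exits with a non-zero status."""
    return runner(dbt_command(action, models), check=True)


print(" ".join(dbt_command("run", ["processed_data"])))
```

Calling run_dbt("run", ["processed_data"]) from the DBT project directory executes the model; registering the UDF first keeps the model from failing on a missing function.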
Step 6: Advanced AI/ML Use Case
For AI/ML workflows, Snowpark can handle tasks like feature engineering directly in Snowflake. Here’s an example of calculating text embeddings:
Create an Embedding UDF
Using Python and a pre-trained model, you can generate text embeddings:
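from transformers import pipeline
from snowflake.snowpark.types import ArrayType, StringType

def generate_embeddings(text):
    # Snowflake UDFs have no network access by default, so the model files must
    # already be available to the UDF (e.g., staged files or external access).
    model = pipeline("feature-extraction", model="bert-base-uncased")
    # [0] selects the result for the single input: a list of per-token vectors.
    return model(text)[0]

session.udf.register(
    func=generate_embeddings,
    name="generate_embeddings_udf",
    input_types=[StringType()],
    return_type=ArrayType(),
    is_permanent=True,
    stage_location="@my_stage",  # placeholder: a permanent UDF needs a stage
    packages=["transformers", "pytorch"],  # assumed names; check what your account offers
)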
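Note that a feature-extraction pipeline typically returns one vector per token, so the length of the returned array varies with the input text. Many ML models expect a fixed-length feature vector instead. One common option (an assumption here, not something the UDF above does) is to average the token vectors. A minimal pure-Python sketch with a hypothetical helper named mean_pool:

```python
def mean_pool(token_vectors):
    """Average a list of equal-length token vectors into one vector."""
    if not token_vectors:
        return []
    count = len(token_vectors)
    return [sum(column) / count for column in zip(*token_vectors)]


# Three made-up 2-dimensional token vectors stand in for real model output.
tokens = [[1.0, 2.0], [3.0, 4.0], [5.0, 6.0]]
pooled = mean_pool(tokens)
print(pooled)
```

Applied inside the UDF, the return statement would become mean_pool(model(text)[0]), giving every row an array of the same length.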
Integrate UDF in DBT
Call the embedding UDF in a DBT model to create features for ML:
-- models/embedding_data.sql
WITH raw_text AS (
    SELECT id, text_column
    FROM {{ ref('raw_table') }}
),
embedded_text AS (
    SELECT
        id,
        generate_embeddings_udf(text_column) AS embeddings
    FROM raw_text
)
SELECT * FROM embedded_text
Best Practices
- Use DBT for reusable transformations: Break down complex SQL logic into reusable models.
- Optimize Snowpark UDFs: Write lightweight, efficient UDFs to minimize resource usage.
- Test your data: Leverage DBT’s testing framework for data quality.
- Version control everything: Track changes in DBT models and Snowpark scripts for traceability.
Conclusion
By combining DBT’s SQL-based data transformations with Snowpark’s advanced programming capabilities, you can build AI/ML pipelines that are both scalable and efficient. This setup allows teams to collaborate effectively while leveraging Snowflake’s computational power to process large datasets.
Whether you’re cleaning data, engineering features, or preparing datasets for ML models, the DBT-Snowpark integration provides a seamless workflow to unlock your data’s full potential.