PySpark Data Pipeline To Cleanse, Transform, Partition, and Load Data Into Redshift Database Table

In this article, we will discuss how to create an optimized data pipeline using PySpark and load the data into a Redshift database table.

By Amlan Patnaik · Mar. 16, 23 · Tutorial

Data is the driving force behind many of today's businesses. With the ever-growing amounts of data available, businesses need to create optimized data pipelines that can handle large volumes of data in a reliable and efficient manner. In this article, we will discuss how to create an optimized data pipeline using PySpark and load the data into a Redshift database table. We will also cover data cleansing, transformation, partitioning, and data quality validation.

Before diving into the code, let's take a quick look at the tools we will be using:

  • PySpark: PySpark is a Python API for Apache Spark, an open-source distributed computing system. PySpark provides an interface for programming Spark with Python.
  • Redshift: Amazon Redshift is a fast, fully-managed, petabyte-scale data warehouse that makes it simple and cost-effective to analyze all your data using standard SQL and your existing Business Intelligence (BI) tools.

With these tools in mind, let's start by defining the problem we want to solve.

Problem Definition

Suppose we have a large dataset containing information about customers and their orders. We want to load this dataset into a Redshift table and perform the following tasks:

  1. Data Cleansing: Remove any records with missing or invalid data.
  2. Data Transformation: Transform the data into a format suitable for Redshift.
  3. Partitioning: Partition the data into smaller subsets to improve query performance.
  4. Data Quality Validation: Validate the data quality of the dataset before loading it into Redshift.
  5. Loading Data into Redshift: Load the cleaned and transformed dataset into a Redshift table.

Now that we have defined the problem, let's build the pipeline. Here is the complete code, covering data cleansing, transformation, partitioning, and data quality validation in PySpark, followed by the load into a Redshift database table:

Python
 
import sys

from pyspark.sql import SparkSession
from pyspark.sql.functions import col

# create a Spark session
spark = SparkSession.builder.appName("DataPipeline").getOrCreate()

# load the data; inferSchema makes Spark scan the file to guess column types
data = spark.read.csv("customer_orders.csv", header=True, inferSchema=True)

# drop rows that contain a null in any column
data = data.dropna()

# rename columns to lowercase names without spaces
data = data.withColumnRenamed("Customer ID", "customer_id") \
    .withColumnRenamed("Order ID", "order_id") \
    .withColumnRenamed("Order Date", "order_date") \
    .withColumnRenamed("Order Amount", "order_amount")

# convert data types; a value that cannot be cast becomes null
data = data.withColumn("customer_id", col("customer_id").cast("int")) \
    .withColumn("order_id", col("order_id").cast("int")) \
    .withColumn("order_amount", col("order_amount").cast("double"))

# repartition so that rows for the same customer share a Spark partition
data = data.repartition("customer_id")

# data quality validation: stop before the write if there is nothing to load
if data.count() == 0:
    spark.stop()
    sys.exit("Error: No data to process.")

# load data into Redshift through the Spark-Redshift connector, which stages
# the rows under tempdir in S3 and then loads them into the table.
# Assumes the community connector and the Redshift JDBC driver are on the
# classpath; the URL, credentials, role ARN, and bucket are placeholders.
data.write \
    .format("io.github.spark_redshift_community.spark.redshift") \
    .option("url", "jdbc:redshift://redshift-cluster-1.cvboublcdews.us-west-2.redshift.amazonaws.com:5439/dev") \
    .option("dbtable", "customer_orders") \
    .option("user", "username") \
    .option("password", "password") \
    .option("aws_iam_role", "arn:aws:iam::123456789012:role/myRedshiftRole") \
    .option("tempdir", "s3a://my-s3-bucket/temp/") \
    .mode("overwrite") \
    .save()

# stop the Spark session
spark.stop()


Now, let's explain each step of the code.

Step 1: Set Up PySpark and Redshift

We start by importing the necessary libraries and setting up PySpark.

We also import the col function from the pyspark.sql.functions module. It is used later, in the data transformation step, to reference columns when casting their types.

Step 2: Load the Data

The next step is to load the data into PySpark.

We load the data from a CSV file using the read.csv() method. Setting header=True tells Spark to treat the first row as column names, and inferSchema=True makes Spark scan the data to guess the type of each column.
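Schema inference costs an extra pass over the file and may guess types that differ from what the rest of the pipeline expects. The following is a minimal sketch of the alternative, passing an explicit schema as a DDL string. The column types are assumptions based on the casts used later in the pipeline (the date type in particular is a guess about the file), and load_orders is a hypothetical helper name.

```python
# Assumed column types for the CSV; adjust them to match the real file.
# Backticks are needed because the original column names contain spaces.
ORDERS_SCHEMA = (
    "`Customer ID` INT, "
    "`Order ID` INT, "
    "`Order Date` DATE, "
    "`Order Amount` DOUBLE"
)


def load_orders(spark, path):
    """Read the CSV with an explicit schema instead of inferSchema=True.

    `spark` is an existing SparkSession; `path` points at the CSV file.
    """
    return spark.read.csv(path, header=True, schema=ORDERS_SCHEMA)
```

With this in place, the load step would become data = load_orders(spark, "customer_orders.csv"), and the later casts would mostly serve as a safeguard.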

Step 3: Data Cleansing

The next step is to cleanse the data. We call dropna() with no arguments, which drops every row that contains a null in any column. This handles missing values only: a value that is present but malformed is not removed by this call.
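Calling dropna() on every column can discard more rows than intended when some columns are optional, and it does nothing about values that are present but wrong. Here is a sketch of a narrower cleansing step. It assumes the DataFrame still has the original CSV column names at this point; clean_orders is a hypothetical helper, and the rule that order amounts must be non-negative is an assumption made for illustration.

```python
# Columns that must be populated for a row to be worth loading (assumed).
REQUIRED_COLUMNS = ["Customer ID", "Order ID", "Order Date", "Order Amount"]


def clean_orders(df, required=REQUIRED_COLUMNS):
    """Drop rows that are unusable for loading.

    `df` is a PySpark DataFrame with the original CSV column names.
    """
    # Remove rows with a null in any required column; other columns may be null.
    df = df.dropna(how="any", subset=list(required))
    # Remove rows whose amount is present but invalid (assumed business rule).
    return df.filter("`Order Amount` >= 0")
```

The filter condition is written as a SQL expression string, so the helper needs no imports of its own.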

Step 4: Data Transformation and Data Quality Validation

The next step is to transform the data into a format suitable for Redshift. We rename the columns to lowercase names without spaces and cast them to the types of the target Redshift table. The DataFrame is then repartitioned by customer_id, which controls how rows are distributed across Spark tasks. Finally, data.count() serves as a basic quality check: if the DataFrame is empty, the pipeline stops before initiating the write to the Redshift database.
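A row count only catches the empty case. The sketch below shows one way a slightly broader check could look, using the renamed columns from the pipeline. Both function names are hypothetical, and the rules (unique order IDs, no missing customers, no negative amounts) are assumptions rather than requirements stated by the pipeline.

```python
def collect_metrics(df):
    """Compute a few quality metrics in a single aggregation pass.

    `df` is a PySpark DataFrame with the renamed columns.
    """
    row = df.selectExpr(
        "count(*) AS row_count",
        "count(DISTINCT order_id) AS distinct_orders",
        "sum(CASE WHEN customer_id IS NULL THEN 1 ELSE 0 END) AS null_customers",
        "sum(CASE WHEN order_amount < 0 THEN 1 ELSE 0 END) AS negative_amounts",
    ).first()
    return row.asDict()


def find_problems(metrics):
    """Return a list of human-readable problems; an empty list means OK."""
    if metrics["row_count"] == 0:
        return ["no rows to load"]
    problems = []
    # count(DISTINCT ...) ignores nulls, so this also flags missing order IDs.
    if metrics["distinct_orders"] != metrics["row_count"]:
        problems.append("order_id values are missing or duplicated")
    if metrics["null_customers"]:
        problems.append("rows without a customer_id")
    if metrics["negative_amounts"]:
        problems.append("rows with a negative order_amount")
    return problems
```

In the pipeline, the check could run as problems = find_problems(collect_metrics(data)), with the job stopping before the write if the list is not empty.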

Step 5: Write to Redshift Database

Finally, we use data.write to write the data from the PySpark DataFrame to Redshift. We specify the Redshift connection properties: the URL, user, password, IAM role, and the temporary S3 directory. The IAM role and temporary directory are options of the Spark-Redshift connector, which stages the data in S3 before loading it into Redshift. We also specify the target table name and use overwrite mode, which replaces the existing contents of the table if it already exists. Lastly, we use spark.stop() to stop the SparkSession.
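The listing hard-codes the connection properties, including the password, which is rarely desirable outside a demo. As a sketch of one alternative, the options can be assembled from environment variables. The variable names and the redshift_options helper are hypothetical; only the option keys come from the pipeline above.

```python
import os

# Hypothetical environment variable names mapped to the writer option keys.
REQUIRED_ENV = {
    "REDSHIFT_URL": "url",
    "REDSHIFT_USER": "user",
    "REDSHIFT_PASSWORD": "password",
    "REDSHIFT_IAM_ROLE": "aws_iam_role",
    "REDSHIFT_TEMPDIR": "tempdir",
}


def redshift_options(env=os.environ):
    """Build the writer options from environment variables."""
    missing = [name for name in REQUIRED_ENV if not env.get(name)]
    if missing:
        raise ValueError("missing environment variables: " + ", ".join(missing))
    options = {key: env[name] for name, key in REQUIRED_ENV.items()}
    # The table name is optional and defaults to the one used in the article.
    options["dbtable"] = env.get("REDSHIFT_TABLE", "customer_orders")
    return options
```

The resulting dictionary can be passed to the writer with .options(**redshift_options()) in place of the individual .option(...) calls.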
