PySpark Data Pipeline To Cleanse, Transform, Partition, and Load Data Into Redshift Database Table

In this article, we will discuss how to create an optimized data pipeline using PySpark and load the data into a Redshift database table.

By Amlan Patnaik · Mar. 16, 23 · Tutorial

Data is the driving force behind many of today's businesses. With ever-growing volumes of data, businesses need optimized pipelines that can process it reliably and efficiently. In this article, we will build such a pipeline with PySpark and load the result into an Amazon Redshift table, covering data cleansing, transformation, partitioning, and data quality validation along the way.

Before diving into the code, let's take a quick look at the tools we will be using:

  • PySpark: The Python API for Apache Spark, an open-source distributed computing engine. It lets you write Spark jobs in Python.
  • Redshift: Amazon Redshift is a fast, fully managed, petabyte-scale data warehouse that makes it simple and cost-effective to analyze your data using standard SQL and your existing Business Intelligence (BI) tools.

With these tools in mind, let's start by defining the problem we want to solve.

Problem Definition

Suppose we have a large dataset containing information about customers and their orders. We want to load this dataset into a Redshift table and perform the following tasks:

  1. Data Cleansing: Remove any records with missing or invalid data.
  2. Data Transformation: Transform the data into a format suitable for Redshift.
  3. Partitioning: Partition the data into smaller subsets to improve query performance.
  4. Data Quality Validation: Validate the data quality of the dataset before loading it into Redshift.
  5. Loading Data into Redshift: Load the cleaned and transformed dataset into a Redshift table.

Now that we have defined the problem, let's start building our optimized data pipeline. Here is the complete code: it cleanses, transforms, partitions, and validates the data with PySpark and then loads it into a Redshift table.

Python
 
import sys

from pyspark.sql import SparkSession
from pyspark.sql.functions import col

# create a spark session
spark = SparkSession.builder.appName("DataPipeline").getOrCreate() 

# load the data
data = spark.read.csv("customer_orders.csv", header=True, inferSchema=True) 

# drop records with missing or invalid data
data = data.dropna() 

# rename columns
data = data.withColumnRenamed("Customer ID", "customer_id") \
    .withColumnRenamed("Order ID", "order_id") \
    .withColumnRenamed("Order Date", "order_date") \
    .withColumnRenamed("Order Amount", "order_amount") 

# convert data types
data = data.withColumn("customer_id", col("customer_id").cast("int")) \
    .withColumn("order_id", col("order_id").cast("int")) \
    .withColumn("order_amount", col("order_amount").cast("double")) 

# repartition by customer_id so each customer's rows land in the same partition
data = data.repartition("customer_id")

# data quality validation: make sure there are rows left to load
if data.count() == 0:
    print("Error: No data to process.")
    spark.stop()
    sys.exit(1)

# load data into Redshift using the spark-redshift connector: the DataFrame is
# staged in S3 (tempdir) and copied into the table via the IAM role
# (assumes the connector and the Redshift JDBC driver are on the classpath)
data.write \
    .format("io.github.spark_redshift_community.spark.redshift") \
    .option("url", "jdbc:redshift://redshift-cluster-1.cvboublcdews.us-west-2.redshift.amazonaws.com:5439/dev") \
    .option("dbtable", "customer_orders") \
    .option("user", "username") \
    .option("password", "password") \
    .option("aws_iam_role", "arn:aws:iam::0123456789012:role/myRedshiftRole") \
    .option("tempdir", "s3a://my-s3-bucket/temp/") \
    .mode("overwrite") \
    .save()

# stop the spark session
spark.stop()


Now, let's explain each step of the code.

Step 1: Set Up PySpark and Redshift

We start by importing the necessary libraries and setting up PySpark.

We also import the col function from the pyspark.sql.functions library; it is used later in the data transformation step to cast columns to the types the Redshift table expects.
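The pipeline above creates a bare SparkSession. If you run it somewhere that does not already ship the Redshift connector and JDBC driver, you can point the session at those JARs yourself. A minimal sketch, assuming the paths below are placeholders for wherever you keep the JARs:

Python
 
from pyspark.sql import SparkSession

# Sketch: make the spark-redshift connector and the Redshift JDBC driver
# available to the session (the JAR paths are placeholders, not real locations)
spark = (
    SparkSession.builder
    .appName("DataPipeline")
    .config("spark.jars", "/path/to/spark-redshift-connector.jar,/path/to/redshift-jdbc42.jar")
    .getOrCreate()
)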

Step 2: Load the Data

The next step is to load the data into PySpark.

We load the data from a CSV file using the read.csv() method. We also specify that the file has a header row and infer the schema from the data.
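inferSchema is convenient, but it makes Spark scan the file an extra time and can guess types you do not want. If the layout of customer_orders.csv is known up front, you can pass an explicit schema instead; the column names and types below are assumptions based on the transformations later in the pipeline:

Python
 
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DoubleType

# Assumed schema for customer_orders.csv; adjust to match the real file
schema = StructType([
    StructField("Customer ID", IntegerType(), True),
    StructField("Order ID", IntegerType(), True),
    StructField("Order Date", StringType(), True),
    StructField("Order Amount", DoubleType(), True),
])

data = spark.read.csv("customer_orders.csv", header=True, schema=schema)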

Step 3: Data Cleansing

The next step is to cleanse the data by removing any records with missing or invalid data.

We use the dropna() method to drop records that contain null values. Any additional validity rules, such as range checks, can be enforced with filter(), as sketched below.
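dropna() only catches nulls. If "invalid" also means out-of-range values, a filter can enforce those rules; the conditions below are hypothetical examples, not requirements from the original dataset:

Python
 
from pyspark.sql.functions import col

# Hypothetical validity rules: positive IDs and a non-negative order amount
data = data.filter(
    (col("Customer ID") > 0)
    & (col("Order ID") > 0)
    & (col("Order Amount") >= 0)
)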

Step 4: Data Transformation and Data Quality Validation

The next step is to transform the data into a format suitable for Redshift. We rename the columns to conform to Redshift naming conventions and cast the data types to match the Redshift table schema. The data is then validated with data.count() to ensure the DataFrame is not empty before the write to Redshift is initiated.
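The emptiness check is a minimal gate. A slightly richer validation, sketched below, counts the nulls left in each column after cleansing and fails fast if any remain:

Python
 
from pyspark.sql.functions import col, count, when

# Count remaining nulls per column; any non-zero value signals a cleansing gap
null_counts = data.select(
    [count(when(col(c).isNull(), c)).alias(c) for c in data.columns]
).first().asDict()

if any(v > 0 for v in null_counts.values()):
    raise ValueError(f"Null values remain after cleansing: {null_counts}")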

Step 5: Write to Redshift Database

Finally, we use data.write to write the PySpark DataFrame to Redshift. We specify the connection properties such as the URL, user, password, IAM role, and the temporary S3 directory where the data is staged before being copied into Redshift, along with the target table name. Overwrite mode replaces the table's existing contents if the table already exists. Lastly, we call spark.stop() to stop the SparkSession.
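As a quick sanity check, you can read the table back through the same connector and compare row counts before stopping the session. This is an optional sketch that reuses the connection options from the pipeline above:

Python
 
# Optional verification: read the freshly loaded table back and compare counts
written = spark.read \
    .format("io.github.spark_redshift_community.spark.redshift") \
    .option("url", "jdbc:redshift://redshift-cluster-1.cvboublcdews.us-west-2.redshift.amazonaws.com:5439/dev") \
    .option("dbtable", "customer_orders") \
    .option("user", "username") \
    .option("password", "password") \
    .option("aws_iam_role", "arn:aws:iam::0123456789012:role/myRedshiftRole") \
    .option("tempdir", "s3a://my-s3-bucket/temp/") \
    .load()

assert written.count() == data.count(), "Row count mismatch after load"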
