cleanframes: A Data Cleansing Library for Apache Spark!

A developer discusses how to use an open-source, Scala-based library that can help take some of the boilerplate code out of performing data cleansing.

By Dawid Rutowicz · Updated Jul. 02, 19 · Tutorial

Over the last decade, computer engineering has exploded with solutions based on cloud computing, distributed systems, big data, streaming, and artificial intelligence. Today's systems are data-intensive: data volumes keep growing while storing and processing data gets cheaper. In short, data is at the center of our interests. One of the issues in this area is data quality. Why is it so important?

Nowadays, data is everywhere and drives companies and their operations. If we make decisions based on unreliable or incomplete information, the consequences for those operations can be severe. Keeping data correct and trustworthy is the focus of a dedicated discipline, known as data cleansing, which is concerned with removing or correcting corrupt records.

Introduction

cleanframes is a small library for Apache Spark to make data cleansing automated and enjoyable.

Let's walk through a problem, step by step, to examine what it takes to make data quality as awesome and reliable as possible! 

Let's introduce some sample data and, for simplicity's sake, keep the set small:

col1,col2,col3
1,true,1.0
lmfao,true,2.0
3,false,3.0
4,true,yolo data
5,true,5.0

And a domain model that, for its representation, is defined as:

case class Example(col1: Option[Int], col2: Option[Boolean], col3: Option[Float])

If you are wondering why the fields of the case class are Options, the answer is simple: Scala primitives are value types and cannot be null. Using Option also sets the corresponding DataFrame schema columns' nullability to true.
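
For instance, the schema derived from Example marks every column as nullable. A quick check (the output is shown as comments; exact formatting may vary across Spark versions):

import org.apache.spark.sql.Encoders

Encoders.product[Example].schema.printTreeString()
// root
//  |-- col1: integer (nullable = true)
//  |-- col2: boolean (nullable = true)
//  |-- col3: float (nullable = true)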

Let's load the data and check what Spark will do about it:

val frame = spark
             .read
             .option("header", "true")
             .schema(Encoders.product[Example].schema)
             .csv("""some/path/to/source.csv""")

val collect = frame
              .as[Example]
              .collect

The collect variable contains the following result:

Example(Some(1),  Some(true),   Some(1.0f)),
Example(None,     None,         None),
Example(Some(3),  Some(false),  Some(3.0f)),
Example(None,     None,         None),
Example(Some(5),  Some(true),   Some(5.0f))

As you can see, because of the malformed values, the second and fourth rows lose their data entirely: Spark rejects the whole row. We would be much better off if we could discard only the invalid cells and keep the correct ones.

Our goal is to represent input data as follows:

Example(Some(1),  Some(true),   Some(1.0f)),
Example(None,     Some(true),   Some(2.0f)),
Example(Some(3),  Some(false),  Some(3.0f)),
Example(Some(4),  Some(true),   None),
Example(Some(5),  Some(true),   Some(5.0f))

How can we achieve this?

Pure Spark SQL API

Apache Spark comes with a set of DataFrame APIs that can be used for graceful parsing:

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.{BooleanType, FloatType, IntegerType}

val result: DataFrame = frame
  .withColumn("col1",
    when(not(frame.col("col1").isNaN), frame.col("col1")) cast IntegerType
  )
  .withColumn("col2",
    when(trim(lower(frame.col("col2"))) === "true", lit(true) cast BooleanType)
      .otherwise(false)
  )
  .withColumn("col3",
    when(not(frame.col("col3").isNaN), frame.col("col3")) cast FloatType
  )

These lines do just what we need; however, that is a lot of code for such a simple task (there are only three columns to take care of!). Typically, there are many more columns to deal with, which means the boilerplate grows linearly.

Can We Do Better?

Well, of course we can! You could always write helper methods and reuse them across the project, like this:

val result: DataFrame = cleanIntegerCol(
    cleanBooleanCol(
      cleanFloatCol(frame, "col3"),
      "col2"),
    "col1")

  def cleanIntegerCol(frame: DataFrame, colName: String): DataFrame =
    numericHelper(frame, colName, IntegerType)

  def cleanFloatCol(frame: DataFrame, colName: String): DataFrame =
    numericHelper(frame, colName, FloatType)

  def numericHelper(frame: DataFrame, colName: String, dataType: NumericType): DataFrame = {
    frame.withColumn(colName,
      when(not(frame.col(colName).isNaN),
        frame.col(colName)
      ) cast dataType
    )
  }

  def cleanBooleanCol(frame: DataFrame, colName: String): DataFrame = {
    frame.withColumn(colName,
      when(
        trim(lower(frame.col(colName))) === "true",
        lit(true) cast BooleanType
      ).otherwise(false)
    )
  }

The question is: is this good enough? We have achieved some reusability, but did we really give it our best shot? Frankly speaking, the chain of nested calls is awful and error-prone. It could be hidden behind an implicit class to imitate a DSL:

frame
    .cleanInteger("col1")
    .cleanBoolean("col2")
    .cleanFloat("col3")
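
Such a DSL could be backed by an implicit class wrapping DataFrame, for example (a minimal sketch that reuses the helpers defined above; the names are only illustrative):

import org.apache.spark.sql.DataFrame

object CleaningSyntax {
  // implicit classes must live inside an object or trait
  implicit class CleaningOps(frame: DataFrame) {
    // delegate to the helpers from the previous snippet
    def cleanInteger(colName: String): DataFrame = cleanIntegerCol(frame, colName)
    def cleanBoolean(colName: String): DataFrame = cleanBooleanCol(frame, colName)
    def cleanFloat(colName: String): DataFrame   = cleanFloatCol(frame, colName)
  }
}

With import CleaningSyntax._ in scope, the three calls above compile against these extension methods.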

This is some progress; nonetheless, it is mostly visual progress. We still have to apply the cleaning manually and repetitively throughout the code base. There is room left for improvement.

cleanframes is a library that aims to automate data cleansing in Spark SQL with the help of generic programming. Just add two imports and call the clean method:

import cleanframes.syntax._
import cleanframes.instances.all._

frame.clean[Example]

This code is equivalent to the code listed in the "Pure Spark SQL API" section. Due to this, there is no performance difference between the two.

What the Heck Just Happened?

The clean method expands into cleaning code through implicit resolution based on the case class's fields. The Scala compiler picks the appropriate transformation for each field's type. cleanframes comes with predefined instances that are available via a single library import (the second import line above). I recommend checking out their source code on GitHub.

It's worth mentioning that these transformations use functions from the Spark SQL functions package, so they are Catalyst-friendly.
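
Conceptually, you can think of the mechanism as a type class per supported field type, with the compiler selecting an instance for each column of the case class. A rough sketch of that idea (this is only an illustration, not cleanframes' actual internals) could look like this:

import org.apache.spark.sql.Column
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._

// one "cleaner" per field type; the compiler picks the matching instance
trait Cleaner[A] {
  def clean(col: Column): Column
}

object Cleaner {
  implicit val intCleaner: Cleaner[Option[Int]] = new Cleaner[Option[Int]] {
    def clean(col: Column): Column = when(not(col.isNaN), col) cast IntegerType
  }

  implicit val booleanCleaner: Cleaner[Option[Boolean]] = new Cleaner[Option[Boolean]] {
    def clean(col: Column): Column =
      when(trim(lower(col)) === "true", lit(true) cast BooleanType).otherwise(false)
  }

  implicit val floatCleaner: Cleaner[Option[Float]] = new Cleaner[Option[Float]] {
    def clean(col: Column): Column = when(not(col.isNaN), col) cast FloatType
  }
}

The library then combines such per-field transformations for the whole case class via generic programming, which is why changes to Example are picked up automatically at compile time.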

The full solution code is as follows:

val frame = spark
      .read
      .option("header", "true")
      .csv("""some/path/to/source.csv""")

import cleanframes.syntax._
import cleanframes.instances.all._

val cleaned = frame.clean[Example]
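
Collecting the cleaned frame as the case class should now give the desired rows shown earlier (assuming spark.implicits._ is in scope for the encoder):

val collected = cleaned.as[Example].collect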

And we're ready to go with transformations! 

The project source code is here: https://github.com/funkyminds/cleanframes

Please refer to the link above for the Maven or sbt coordinates.

What Are the Pros and Cons?

It's time to examine the advantages and disadvantages of the library.

Pros:

  • boilerplate is done for you by the compiler.
  • common project transformations can be defined in one place and reused.
  • no additional overhead compared to the same manual code.
  • changes in case classes are automatically reflected and applied in transformations during compilation.

Cons:

  • implicits

Scala-based libraries have an advantage over their Java counterparts: they are based on a type class approach rather than reflection APIs. This brings compile-time safety and reduces runtime overhead.

Not everyone likes implicits, but this is a straightforward case where all you do is parse simple types! There are no endo-mumbo-jumbo functors in this scope.

Don't freak out, be reasonable!

Summary

This article gave only an introduction and a simple demo of cleanframes. The library is capable of doing much more.

In the following articles, I'll explain features such as:

  • overriding a custom transformation.
  • adding support for new types.
  • defining transformation using Spark UDFs.
  • transforming nested case classes.
  • handling case classes with the same types.

"cleanframes - data cleansing library for Apache Spark! (part 2)" presents all of them one by one with code examples and explanations.

I hope that cleanframes can help you with tedious but necessary work and cut out some boilerplate code. The project has recently been released under the Apache license, and I would appreciate contributions of all kinds: feature proposals, issues, pull requests, you name it!

And of course: give it a try!

Project source code:

https://github.com/funkyminds/cleanframes

