cleanframes: A Data Cleansing Library for Apache Spark!

A developer discusses how to use an open source, Scala-based library that can help take some of the boilerplate code out of data cleansing.

In the last decade, computer engineering has exploded with solutions based on cloud computing, distributed systems, big data, streaming, and artificial intelligence. Today's systems are data-intensive: volumes keep growing while storing them keeps getting cheaper. In short, data is at the centre of our interests. One of the issues in this area is data quality. Why is it so important?

Nowadays, data is everywhere and drives companies and their operations. If decisions are based on unreliable or incomplete information, the impact on those operations can be huge and the consequences severe. The importance of correct data has given rise to a dedicated discipline, known as data cleansing, which focuses on removing or correcting corrupt or inaccurate records.

Introduction

cleanframes is a small library for Apache Spark to make data cleansing automated and enjoyable.

Let's walk through a problem, step by step, to examine what it takes to make data quality as awesome and reliable as possible! 

Let's introduce some sample data and, for simplicity's sake, keep it small:

col1,col2,col3
1,true,1.0
lmfao,true,2.0
3,false,3.0
4,true,yolo data
5,true,5.0

And a domain model to represent it, defined as:

case class Example(col1: Option[Int], col2: Option[Boolean], col3: Option[Float])

If you are wondering why every field of the case class is an Option, the answer is simple: Scala primitives are values and cannot be null. Wrapping them in Option sets the nullability of the corresponding dataframe schema columns to true.
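The effect of Option on nullability can be checked directly from the derived schemas. The sketch below is only an illustration: StrictExample and NullabilityCheck are hypothetical names that do not appear in the article, and the code assumes spark-sql is on the classpath (deriving a schema does not require a running SparkSession).

```scala
import org.apache.spark.sql.Encoders

// Same shape as the article's model: every field is optional.
case class Example(col1: Option[Int], col2: Option[Boolean], col3: Option[Float])

// Hypothetical variant with bare primitives, for comparison only.
case class StrictExample(col1: Int, col2: Boolean, col3: Float)

object NullabilityCheck {
  // Schemas derived from the case classes via their product encoders.
  val optionalSchema = Encoders.product[Example].schema
  val strictSchema = Encoders.product[StrictExample].schema

  def main(args: Array[String]): Unit = {
    // Option fields are reported as nullable, bare primitives are not.
    optionalSchema.printTreeString()
    strictSchema.printTreeString()
  }
}
```

With bare primitives the columns are marked as non-nullable, which leaves no way to represent a cell that failed to parse.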

Let's load the data and check what Spark will do about it:

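import org.apache.spark.sql.Encoders
import spark.implicits._ // provides the Encoder required by .as[Example]

val frame = spark
  .read
  .option("header", "true")
  .schema(Encoders.product[Example].schema) // schema derived from the case class
  .csv("""some/path/to/source.csv""")

val collect = frame
  .as[Example]
  .collect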

The collect variable contains the following result:

Example(Some(1),  Some(true),   Some(1.0f)),
Example(None,     None,         None),
Example(Some(3),  Some(false),  Some(3.0f)),
Example(None,     None,         None),
Example(Some(5),  Some(true),   Some(5.0f))

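As you can see, because of the malformed values, the second and fourth rows are rejected entirely: every cell becomes None, including the valid ones. (This is how the CSV reader behaves in Spark 2.4 and earlier; from Spark 3.0 on, permissive mode can keep the fields that were parsed successfully.) It would be much better to discard only the invalid cells and keep the correct ones.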

Our goal is to represent input data as follows:

Example(Some(1),  Some(true),   Some(1.0f)),
Example(None,     Some(true),   Some(2.0f)),
Example(Some(3),  Some(false),  Some(3.0f)),
Example(Some(4),  Some(true),   None),
Example(Some(5),  Some(true),   Some(5.0f))

How can we achieve this?

Pure Spark SQL API

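Apache Spark comes with a set of APIs for working with dataframes that can be used for graceful parsing. Assuming the frame is loaded without an explicit schema, so that every column arrives as a string, each column can be converted individually: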

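import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{lit, lower, not, trim, when}
import org.apache.spark.sql.types.{BooleanType, FloatType, IntegerType}

val result: DataFrame = frame
  // keep the value unless it is NaN, then cast it to an integer
  .withColumn("col1",
    when(not(frame.col("col1").isNaN), frame.col("col1")) cast IntegerType)
  // "true" (case-insensitive, trimmed) becomes true, everything else false
  .withColumn("col2",
    when(trim(lower(frame.col("col2"))) === "true", lit(true) cast BooleanType)
      .otherwise(false))
  // same approach as col1, with a float target type
  .withColumn("col3",
    when(not(frame.col("col3").isNaN), frame.col("col3")) cast FloatType)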

These lines do what we need, but that is a lot of code for such a simple task, and there are only three columns to take care of! Typically there are many more columns to deal with, and the boilerplate grows linearly with them.

Can We Do Better?

Well, of course we can! You could always write helper methods and re-use them in the project just like this:

val result: DataFrame = cleanIntegerCol(
    cleanBooleanCol(
      cleanFloatCol(frame, "col3"),
      "col2"),
    "col1")

  def cleanIntegerCol(frame: DataFrame, colName: String): DataFrame =
    numericHelper(frame, colName, IntegerType)

  def cleanFloatCol(frame: DataFrame, colName: String): DataFrame =
    numericHelper(frame, colName, FloatType)

  def numericHelper(frame: DataFrame, colName: String, dataType: NumericType): DataFrame = {
    frame.withColumn(colName,
      when(not(frame.col(colName).isNaN),
        frame.col(colName)
      ) cast dataType
    )
  }

  def cleanBooleanCol(frame: DataFrame, colName: String): DataFrame = {
    frame.withColumn(colName,
      when(
        trim(lower(frame.col(colName))) === "true",
        lit(true) cast BooleanType
      ).otherwise(false)
    )
  }

The question is, is this good enough? We have achieved some reusability, but did we give it our best shot? Frankly speaking, a chain of nested calls like this is awful and error-prone. It could be hidden behind an implicit class to imitate a DSL:

frame
    .cleanInteger("col1")
    .cleanBoolean("col2")
    .cleanFloat("col3")
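A minimal sketch of such an implicit class is shown here. The names CleaningSyntax and CleaningOps are hypothetical, the method bodies simply repeat the expressions from the helper methods above, and Spark's default (non-ANSI) SQL settings are assumed.

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{lit, lower, not, trim, when}
import org.apache.spark.sql.types.{BooleanType, FloatType, IntegerType, NumericType}

object CleaningSyntax {
  // Hypothetical wrapper that adds the clean* methods to every DataFrame in scope.
  implicit class CleaningOps(frame: DataFrame) {
    def cleanInteger(colName: String): DataFrame = numeric(colName, IntegerType)

    def cleanFloat(colName: String): DataFrame = numeric(colName, FloatType)

    def cleanBoolean(colName: String): DataFrame =
      frame.withColumn(colName,
        when(trim(lower(frame.col(colName))) === "true", lit(true) cast BooleanType)
          .otherwise(false))

    // Same expression as numericHelper in the article: keep non-NaN values, then cast.
    private def numeric(colName: String, dataType: NumericType): DataFrame =
      frame.withColumn(colName,
        when(not(frame.col(colName).isNaN), frame.col(colName)) cast dataType)
  }
}
```

After `import CleaningSyntax._`, the chained calls above compile against any DataFrame. Each call still names one column by hand, so nothing is derived from the case class yet.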

This is progress, but mostly of the cosmetic kind. We still have to do it manually and repetitively throughout our code base. There is room left for improvement.

cleanframes is a library that aims to automate data cleansing in Spark SQL with the help of generic programming. Just add two imports and call the clean method:

import cleanframes.syntax._
import cleanframes.instances.all._

frame.clean[Example]

This code is equivalent to the code listed in the "Pure Spark SQL API" section, so there is no performance difference between the two.

What the Heck Just Happened?

The clean method expands into code through implicit resolution based on the fields of the case class. For each field, the Scala compiler picks the implementation that matches the field's type. cleanframes comes with predefined implementations that are made available by a single library import (the second import line above). I recommend checking their source code on GitHub.

It's worth mentioning that these transformations are built from the Spark SQL functions package, so they are Catalyst-friendly.
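To make the idea of type-driven implicit resolution concrete, here is a small plain-Scala sketch. It is not the library's code: CellParsing, CellParser, parseCell, and parseRow are hypothetical names, and the instances parse strings directly instead of building Spark column expressions. It only shows how the compiler selects an implementation from the type of each field.

```scala
import scala.util.Try

object CellParsing {
  // Hypothetical type class: how to read one raw cell as an optional A.
  trait CellParser[A] {
    def parse(raw: String): Option[A]
  }

  // One implicit instance per supported type; the compiler picks by type.
  implicit val intParser: CellParser[Int] = new CellParser[Int] {
    def parse(raw: String): Option[Int] = Try(raw.trim.toInt).toOption
  }
  implicit val floatParser: CellParser[Float] = new CellParser[Float] {
    def parse(raw: String): Option[Float] = Try(raw.trim.toFloat).toOption
  }
  implicit val booleanParser: CellParser[Boolean] = new CellParser[Boolean] {
    def parse(raw: String): Option[Boolean] =
      Try(raw.trim.toLowerCase.toBoolean).toOption
  }

  // The implicit parameter is resolved at compile time from the type argument.
  def parseCell[A](raw: String)(implicit parser: CellParser[A]): Option[A] =
    parser.parse(raw)

  case class Example(col1: Option[Int], col2: Option[Boolean], col3: Option[Float])

  // Written by hand here; a generic derivation would produce these calls per field.
  def parseRow(c1: String, c2: String, c3: String): Example =
    Example(parseCell[Int](c1), parseCell[Boolean](c2), parseCell[Float](c3))
}
```

Here `CellParsing.parseRow("lmfao", "true", "2.0")` evaluates to `Example(None, Some(true), Some(2.0f))`: only the invalid cell is dropped. Asking for a type that has no instance in scope, such as `parseCell[java.util.Date]("x")`, is rejected by the compiler rather than failing at runtime.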

The full solution code is as follows:

val frame = spark
      .read
      .option("header", "true")
      .csv("""some/path/to/source.csv""")

import cleanframes.syntax._
import cleanframes.instances.all._

val cleaned = frame.clean[Example]

And we're ready to go with transformations! 

The project source code is here: https://github.com/funkyminds/cleanframes

Please refer to the link above to find the Maven or sbt coordinates.

What Are the Pros and Cons?

It's time to examine the advantages and disadvantages of the library.

Pros:

  • boilerplate is done for you by the compiler.
  • common project transformations can be defined in one place and reused.
  • no additional overhead compared to the equivalent hand-written code.
  • changes in case classes are automatically reflected and applied in transformations during compilation.

Cons:

  • implicits

Scala-based libraries have an advantage over their Java counterparts: they can rely on type classes instead of reflection APIs. This brings safety at compile time and reduces runtime overhead.

Not everyone likes implicits, but this is a straightforward case where all you do is parse simple types! There are no endo-mumbo-jumbo-functors in this scope.

Don't freak out, be reasonable!

Summary

This article gave only an introduction and a simple demo of cleanframes. The library is capable of doing much more.

In the following articles, I'll explain features such as:

  • overriding a custom transformation.
  • adding support for new types.
  • defining transformation using Spark UDFs.
  • transforming nested case classes.
  • handling case classes with the same types.

"cleanframes - data cleansing library for Apache Spark! (part 2)" presents all of them one by one with code examples and explanations.

I hope that cleanframes can help you with tedious but necessary work and cut out some boilerplate code. The project has recently been released under the Apache license, and I would appreciate contributions of all kinds: feature proposals, issues, pull requests, you name it!

And of course: give it a try!

Project source code:

https://github.com/funkyminds/cleanframes


Published at DZone with permission of
