cleanframes: A Data Cleansing Library for Apache Spark!
A developer discusses how to use an open source, Scala-based library that can help take some of the boilerplate code out of performing data cleansing.
In the last decade, computer engineering has exploded with solutions based on cloud computing, distributed systems, big data, streaming and artificial intelligence. Today's systems are data-intensive: data volumes keep growing while storing them keeps getting cheaper. In short, data is at the centre of our interests. One of the issues in this area is data quality. Why is it so important?
Nowadays, data is everywhere and drives companies and their operations. Decisions made on unreliable or incomplete information can have a huge impact on those operations and lead to severe consequences. Ensuring data correctness is the subject of a dedicated discipline, known as data cleansing, which focuses on removing or correcting inaccurate or corrupt records.
Introduction
cleanframes is a small library for Apache Spark to make data cleansing automated and enjoyable.
Let's walk through a problem, step by step, to examine what it takes to make data quality as awesome and reliable as possible!
Let's introduce some sample data and, for simplicity's sake, keep it small:
col1,col2,col3
1,true,1.0
lmfao,true,2.0
3,false,3.0
4,true,yolo data
5,true,5.0
The domain model that represents it is defined as:
case class Example(col1: Option[Int], col2: Option[Boolean], col3: Option[Float])
If you are wondering why every field of the case class is an Option, the answer is simple: Scala primitives are values and cannot be null. Options liberate us by setting the nullability of the dataframe schema's columns to true.
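A quick way to see this is to compare the schema Spark derives for Example with the one for a variant that uses bare primitives. The sketch below assumes Spark SQL is on the classpath; StrictExample and nullability are hypothetical names introduced only for this comparison.

```scala
import org.apache.spark.sql.Encoders
import scala.reflect.runtime.universe.TypeTag

object NullabilityCheck {
  // The article's model: every field is wrapped in Option.
  case class Example(col1: Option[Int], col2: Option[Boolean], col3: Option[Float])

  // Hypothetical variant with bare primitives, used only for comparison.
  case class StrictExample(col1: Int, col2: Boolean, col3: Float)

  // Encoders.product derives the schema from the case class; no SparkSession is needed.
  def nullability[T <: Product : TypeTag]: Seq[(String, Boolean)] =
    Encoders.product[T].schema.fields.toSeq.map(f => f.name -> f.nullable)

  def main(args: Array[String]): Unit = {
    println(nullability[Example])       // all three columns are nullable
    println(nullability[StrictExample]) // none of the columns is nullable
  }
}
```

Primitive fields produce non-nullable columns, while Option fields produce nullable ones, which is what lets a cell hold "no value".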
Let's load the data and check what Spark will do about it:
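import org.apache.spark.sql.Encoders
import spark.implicits._ // provides the encoder needed by .as[Example]
val frame = spark
.read
.option("header", "true")
.schema(Encoders.product[Example].schema)
.csv("""some/path/to/source.csv""")
val collect = frame
.as[Example]
.collect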
The collect variable contains the following result:
Example(Some(1), Some(true), Some(1.0f)),
Example(None, None, None),
Example(Some(3), Some(false), Some(3.0f)),
Example(None, None, None),
Example(Some(5), Some(true), Some(5.0f))
As you can see, because of the malformed values, Spark rejects the second and fourth rows entirely, discarding their valid cells as well. We would be much better off if we could drop only the invalid cells and keep the valid ones.
Our goal is to represent input data as follows:
Example(Some(1), Some(true), Some(1.0f)),
Example(None, Some(true), Some(2.0f)),
Example(Some(3), Some(false), Some(3.0f)),
Example(Some(4), Some(true), None),
Example(Some(5), Some(true), Some(5.0f))
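To make the target result concrete outside Spark, here is a minimal plain-Scala sketch of cell-level parsing: each cell is parsed on its own, so a malformed value turns only that field into None. The helper names are hypothetical, and Scala's standard string parsing stands in for Spark's casts, so edge cases may differ from what Spark does.

```scala
import scala.util.Try

object CellLevelParsing {
  case class Example(col1: Option[Int], col2: Option[Boolean], col3: Option[Float])

  // Each helper parses a single cell; a parse failure yields None for that cell only.
  def parseInt(s: String): Option[Int] = Try(s.trim.toInt).toOption
  def parseBoolean(s: String): Option[Boolean] = Try(s.trim.toBoolean).toOption
  def parseFloat(s: String): Option[Float] = Try(s.trim.toFloat).toOption

  // A row is assembled from independently parsed cells, so one bad cell never rejects the row.
  def parseRow(line: String): Example = {
    val cells = line.split(",", -1)
    Example(parseInt(cells(0)), parseBoolean(cells(1)), parseFloat(cells(2)))
  }

  def main(args: Array[String]): Unit = {
    val csv = Seq("1,true,1.0", "lmfao,true,2.0", "3,false,3.0", "4,true,yolo data", "5,true,5.0")
    csv.map(parseRow).foreach(println)
  }
}
```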
How can we achieve this?
Pure Spark SQL API
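Apache Spark comes with a set of APIs for working with dataframes that can be used for graceful parsing. Here, frame is assumed to be loaded without the explicit schema, so every column is still a string: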
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
val result: DataFrame = frame
.withColumn("col1",
when(not(frame.col("col1").isNaN),
frame.col("col1")
) cast IntegerType
).withColumn("col2",
when(trim(lower(frame.col("col2"))) === "true",
lit(true) cast BooleanType
).otherwise(false)
).withColumn("col3",
when(not(frame.col("col3").isNaN),
frame.col("col3")
) cast FloatType)
These lines do just what we need; however, that is a lot of code for a simple task, and there are only three columns to take care of! Typically, there are many more columns to deal with, so the boilerplate grows linearly with them.
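For experimenting, the same expressions can be run end to end on an in-memory copy of the sample data. This is a sketch under two assumptions: the input columns are all strings (as when the CSV is read without a schema), and ANSI mode is off, so an unparsable string is cast to null instead of raising an error (Spark 4.0 enables ANSI mode by default). The object and method names are hypothetical.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._

object PureSparkCleansing {
  case class Example(col1: Option[Int], col2: Option[Boolean], col3: Option[Float])

  // The column expressions from above, wrapped in a function over an all-string frame.
  def clean(frame: DataFrame): DataFrame = frame
    .withColumn("col1", when(not(frame.col("col1").isNaN), frame.col("col1")) cast IntegerType)
    .withColumn("col2", when(trim(lower(frame.col("col2"))) === "true", lit(true)).otherwise(false))
    .withColumn("col3", when(not(frame.col("col3").isNaN), frame.col("col3")) cast FloatType)

  def cleanedRows(): Seq[Example] = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("pure-spark-cleansing")
      // Assumption: non-ANSI casts, where an unparsable string becomes null.
      .config("spark.sql.ansi.enabled", "false")
      .getOrCreate()
    import spark.implicits._
    try {
      // In-memory stand-in for source.csv read without a schema: every column is a string.
      val raw = Seq(
        ("1", "true", "1.0"),
        ("lmfao", "true", "2.0"),
        ("3", "false", "3.0"),
        ("4", "true", "yolo data"),
        ("5", "true", "5.0")
      ).toDF("col1", "col2", "col3")
      clean(raw).as[Example].collect().toSeq
    } finally spark.stop()
  }

  def main(args: Array[String]): Unit = cleanedRows().foreach(println)
}
```

Only the malformed cells become None, which matches the goal listed earlier.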
Can We Do Better?
Well, of course we can! You could always write helper methods and reuse them across the project, like this:
val result: DataFrame = cleanIntegerCol(
cleanBooleanCol(
cleanFloatCol(frame, "col3"),
"col2"),
"col1")
def cleanIntegerCol(frame: DataFrame, colName: String): DataFrame =
numericHelper(frame, colName, IntegerType)
def cleanFloatCol(frame: DataFrame, colName: String): DataFrame =
numericHelper(frame, colName, FloatType)
def numericHelper(frame: DataFrame, colName: String, dataType: NumericType): DataFrame = {
frame.withColumn(colName,
when(not(frame.col(colName).isNaN),
frame.col(colName)
) cast dataType
)
}
def cleanBooleanCol(frame: DataFrame, colName: String): DataFrame = {
frame.withColumn(colName,
when(
trim(lower(frame.col(colName))) === "true",
lit(true) cast BooleanType
).otherwise(false)
)
}
The question is: is this good enough? We have achieved some reusability, but did we really give it our best shot? Frankly speaking, the chain of these calls is awkward and error-prone. It could be hidden behind an implicit class to imitate a DSL:
frame
.cleanInteger("col1")
.cleanBoolean("col2")
.cleanFloat("col3")
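One way to get that syntax is an implicit class wrapping DataFrame. The sketch below is hypothetical: CleaningSyntax, CleaningOps and the clean* method names simply mirror the snippet above and are not part of any library. Scala does not allow an implicit class to be a case class, so a plain implicit class is used. As before, non-ANSI casts are assumed.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._

object CleaningSyntax {
  // Importing CleaningSyntax._ lets any DataFrame call the methods below.
  implicit class CleaningOps(frame: DataFrame) {
    def cleanInteger(colName: String): DataFrame = cleanNumeric(colName, IntegerType)

    def cleanFloat(colName: String): DataFrame = cleanNumeric(colName, FloatType)

    // Same logic as cleanBooleanCol: trimmed, case-insensitive "true" becomes true, anything else false.
    def cleanBoolean(colName: String): DataFrame =
      frame.withColumn(colName,
        when(trim(lower(frame.col(colName))) === "true", lit(true)).otherwise(false))

    // Same logic as numericHelper.
    private def cleanNumeric(colName: String, dataType: NumericType): DataFrame =
      frame.withColumn(colName,
        when(not(frame.col(colName).isNaN), frame.col(colName)) cast dataType)
  }
}

object CleaningSyntaxDemo {
  import CleaningSyntax._

  case class Example(col1: Option[Int], col2: Option[Boolean], col3: Option[Float])

  def cleanedRows(): Seq[Example] = {
    val spark = SparkSession.builder().master("local[*]").appName("cleaning-syntax")
      .config("spark.sql.ansi.enabled", "false").getOrCreate()
    import spark.implicits._
    try {
      val frame = Seq(("lmfao", "true", "2.0"), ("4", "TRUE ", "yolo data")).toDF("col1", "col2", "col3")
      // Each call returns a DataFrame, so the implicit conversion applies again and the calls chain.
      frame
        .cleanInteger("col1")
        .cleanBoolean("col2")
        .cleanFloat("col3")
        .as[Example]
        .collect()
        .toSeq
    } finally spark.stop()
  }
}
```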
This is some progress, but it is mostly visual: we still have to apply it manually and repetitively throughout our code base. There is still room for improvement.
cleanframes is a library that aims to automate data cleansing in Spark SQL with the help of generic programming. Just add two imports and call the clean method:
import cleanframes.syntax._
import cleanframes.instances.all._
frame.clean[Example]
This code is equivalent to the code listed in the "Pure Spark SQL API" section, so there is no performance difference between the two.
What the Heck Just Happened?
The clean method builds the transformation through implicit resolution based on the case class's fields: the Scala compiler applies a specific transformation according to each field's type. cleanframes comes with predefined implementations that are available via a simple library import (the second import, cleanframes.instances.all._). I recommend checking their source code on GitHub.
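The idea of picking a transformation by field type can be illustrated in plain Scala, without Spark or cleanframes. In this simplified, hypothetical sketch, Cleaner, cleanField and cleanExample are made-up names, cells are raw strings rather than Spark columns, and the per-field calls for Example are written by hand, whereas cleanframes uses generic programming to derive them from the case class.

```scala
import scala.util.Try

object TypeClassSketch {
  // Hypothetical type class: how to clean a single raw cell into a value of type A.
  trait Cleaner[A] {
    def clean(raw: String): A
  }

  object Cleaner {
    private def instance[A](f: String => A): Cleaner[A] = new Cleaner[A] {
      def clean(raw: String): A = f(raw)
    }

    // Instances live in the companion object, so implicit search finds them without extra imports.
    implicit val intCleaner: Cleaner[Option[Int]] = instance(raw => Try(raw.trim.toInt).toOption)
    implicit val booleanCleaner: Cleaner[Option[Boolean]] = instance(raw => Try(raw.trim.toBoolean).toOption)
    implicit val floatCleaner: Cleaner[Option[Float]] = instance(raw => Try(raw.trim.toFloat).toOption)
  }

  // The compiler selects the Cleaner instance from the requested type A.
  def cleanField[A](raw: String)(implicit cleaner: Cleaner[A]): A = cleaner.clean(raw)

  case class Example(col1: Option[Int], col2: Option[Boolean], col3: Option[Float])

  // Written by hand here, one call per field; generic derivation would produce this from Example's fields.
  def cleanExample(cells: Seq[String]): Example =
    Example(
      cleanField[Option[Int]](cells(0)),
      cleanField[Option[Boolean]](cells(1)),
      cleanField[Option[Float]](cells(2)))
}
```

If a field had a type with no Cleaner instance, the call would fail to compile ("could not find implicit value") instead of failing at runtime.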
It's worth mentioning that these transformations are built from the Spark SQL functions package, so they are Catalyst-friendly: unlike opaque UDFs, they remain visible to the optimizer.
The full solution code is as follows:
val frame = spark
.read
.option("header", "true")
.csv("""some/path/to/source.csv""")
import cleanframes.syntax._
import cleanframes.instances.all._
val cleaned = frame.clean[Example]
And we're ready to go with transformations!
The project source code is here: https://github.com/funkyminds/cleanframes
Please refer to the link above for the Maven or sbt coordinates.
What Are the Pros and Cons?
It's time to examine the advantages and disadvantages of the library.
Pros:
- boilerplate is done for you by the compiler.
- common project transformations can be defined in one place and reused.
- no additional overhead compared to same manual code.
- changes in case classes are automatically reflected and applied in transformations during compilation.
Cons:
- implicits
Scala libraries such as cleanframes have an advantage over their Java counterparts: they rely on type classes rather than reflection APIs. This brings safety at compile time and reduces runtime overhead.
Not everyone likes implicits, but this is a straightforward case where all you do is parse simple types! There are no endo-mumbo-jumbo-functors in this scope.
Don't freak out, be reasonable!
Summary
This article gave only an introduction and a simple demo of cleanframes. The library is capable of doing much more.
In the following articles, I'll explain features such as:
- overriding a custom transformation.
- adding support for new types.
- defining transformation using Spark UDFs.
- transforming nested case classes.
- handling case classes with the same types.
"cleanframes - data cleansing library for Apache Spark! (part 2)" presents all of them one by one with code examples and explanations.
I hope that cleanframes can help you with tedious but necessary work and cut out some boilerplate code. The project was recently released under the Apache license, and I would appreciate contributions of all kinds: feature proposals, issues, pull requests, you name it!
And of course: give it a try!
Published at DZone with permission of Dawid Rutowicz. See the original article here.