Spark Tutorial: Validating Data in a Spark DataFrame Part Two
Validate data in a Spark DataFrame in a few simple steps.
In the previous article in this series (Part One), I covered a few techniques that can be used for validating data in a Spark DataFrame. In this article, I will cover a few more techniques, this time focusing on the use of User Defined Functions (UDFs).
Method 1: Simple UDF
In this technique, we first define a helper function that will allow us to perform the validation operation. In this case, we are checking if the column value is null. So, the function is as below:
def isNullFunction(value: String): Boolean = {
  if (value == null) {
    return true
  }
  return false
}
We then use this function in a UDF (User Defined Function), as below:
val isNullUDF = udf(isNullFunction _)
Then, we use the UDF to check the value, as below:
df.withColumn("IsNullUDF", isNullUDF(col("name")))
By using a UDF, we can include slightly more complex validation logic that would have been difficult to express directly in the 'withColumn' syntax shown in Part One.
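For reference, here is a minimal end-to-end sketch of this method. It assumes a SparkSession named spark is available (for example, the one provided by spark-shell); the sample data is hypothetical and not part of the original article.

import org.apache.spark.sql.functions.{col, udf}
import spark.implicits._

// Hypothetical sample data: one null value to exercise the check
val df = Seq("Alice", null, "Bob").toDF("name")

// Same check as the helper above, written as a one-liner
def isNullFunction(value: String): Boolean = value == null

val isNullUDF = udf(isNullFunction _)

// Appends a Boolean column that is true wherever 'name' is null
df.withColumn("IsNullUDF", isNullUDF(col("name"))).show()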
Method 2: Alternate UDF
In this technique, the function to check null remains the same, but the syntax of the UDF is different, as below:
val isNullUDF = udf[Boolean, String](isNullFunction)
Even though the syntax of the UDF is different, the way it is invoked remains the same. The advantage of this variant is that the return type and the input column type are stated explicitly: in this case, the return value is a Boolean (as we wish to store a 'true' or 'false' value in the new column), while the data type of the input column is String.
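A minimal sketch of the typed variant, reusing the helper function and the hypothetical DataFrame from the previous sketch (the name isNullTypedUDF is mine, chosen to avoid confusion with the earlier definition):

import org.apache.spark.sql.functions.{col, udf}

// Typed variant: Boolean return type, String input column type
val isNullTypedUDF = udf[Boolean, String](isNullFunction)

df.withColumn("IsNullUDF", isNullTypedUDF(col("name"))).show()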
Method 3: Complicate Matters
Let us now complicate matters a little bit. Suppose, instead of a simple null check, we want to check if the value in a column lies within a range. How can we implement this functionality using a UDF? The task is quite straightforward. We need a UDF that works with three values (integers, in this example): two values represent the limits and the third represents the actual data. One way to achieve this is to define a function that accepts the limits and returns a UDF that checks the data value, as below:
def isInRangeUDF(ll: Int, ul: Int) = udf[Boolean, Int]((data: Int) => {
  if (data > ll && data < ul) {
    true
  } else {
    false
  }
})
Then, we invoke the UDF as below:
df.withColumn("rangeCheck", isInRangeUDF(lowerLimit, upperLimit)(df("marks")))
Method 4: The Next Logical Step
The usage of UDFs makes the task of data validation quite simple, but they need to be used with care. Spark treats UDFs as black boxes and thus does not perform any optimization on the code inside them. Let's go to the next logical step.
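As a side note (not from the original article), the two simple checks shown so far can also be expressed with built-in Column functions, which the Catalyst optimizer can reason about, unlike a UDF. The DataFrames here are the hypothetical ones from the sketches above:

import org.apache.spark.sql.functions.col

// Null check without a UDF
df.withColumn("IsNull", col("name").isNull)

// Range check without a UDF; note that 'between' includes both bounds
df.withColumn("rangeCheck", col("marks").between(0, 100))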
To avoid peppering our code base with too many function definitions, we can always encapsulate the definition of a UDF inside a class and then use the class. Defining a class allows us to hide the complexity of the UDF behind a simple interface. Let us suppose that we convert the range UDF into a class that helps us perform the range check validation. The UDF, when converted to a class, is as below:
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.udf

// Serializable is needed because the UDF closure captures the class fields
class RangeCheck(val colName: String, val lLimit: Int, val uLimit: Int) extends Serializable {
  val columnName = colName
  val lowerLimit = lLimit
  val upperLimit = uLimit

  def isInRangeUDF = udf[Boolean, Int]((data: Int) => {
    if (data > lowerLimit && data < upperLimit) {
      true
    } else {
      false
    }
  })

  def execute(df: DataFrame): DataFrame = {
    df.withColumn("range", isInRangeUDF(df(columnName)))
  }
}
The class can then be invoked as below:
var df = . . .
val r1 = new RangeCheck("marks", 0, 100)
df = r1.execute(df)
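Once the check has run, the appended 'range' column can be used to inspect or filter the offending rows. A brief sketch (the variable names are illustrative):

import org.apache.spark.sql.functions.col

// Rows that fail the range check
val invalidRows = df.filter(col("range") === false)
invalidRows.show()

// Or count them as a quick data-quality metric
val invalidCount = invalidRows.count()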
Method 5: Complex Logic Behind a Simple Interface
If we wish to do so, we can increase the complexity of the UDF but still hide it behind a simple interface. For example, the class below allows the user to use Boolean flags to decide whether each limit is inclusive or exclusive in the check.
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.udf

class RangeCheck(val colName: String, val ll: Int, val ul: Int, val lInc: Boolean = false, val uInc: Boolean = false) extends Serializable {
  val columnName = colName
  val lLimit: Int = ll
  val uLimit: Int = ul
  val llInc: Boolean = lInc
  val ulInc: Boolean = uInc

  def isInRangeUDF = udf[Boolean, Int]((data: Int) => {
    var lf = false
    var uf = false
    // Choose the inclusive or exclusive comparison for each limit
    if (llInc) {
      lf = data >= lLimit
    } else {
      lf = data > lLimit
    }
    if (ulInc) {
      uf = data <= uLimit
    } else {
      uf = data < uLimit
    }
    // Both conditions must hold for the value to be in range
    lf && uf
  })

  def execute(df: DataFrame): DataFrame = {
    df.withColumn("range", isInRangeUDF(df(columnName)))
  }
}
The class can then be invoked as below:
var df = . . .
val r1 = new RangeCheck("marks", 0, 100, true) // 0 is included in the range check
df = r1.execute(df)
OR
val r1 = new RangeCheck("marks", 0, 100, ulInc=true) // 100 is included in the range check
df = r1.execute(df)
OR
val r1 = new RangeCheck("marks", 0, 100, true, true) // both 0 and 100 are included in the range check
df = r1.execute(df)
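To see the effect of the flags, here is a small illustrative sketch using a boundary value; it assumes the SparkSession from the earlier sketches and a hypothetical row whose marks equal the upper limit:

import spark.implicits._

// Hypothetical row with marks exactly at the upper limit
val df2 = Seq(("Carol", 100)).toDF("name", "marks")

// With the default exclusive limits, 100 fails the check
new RangeCheck("marks", 0, 100).execute(df2).show()

// With uInc = true, 100 passes the check
new RangeCheck("marks", 0, 100, uInc = true).execute(df2).show()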
Conclusion
In this article, we have covered a few more techniques that can be used to implement data validation on Spark DataFrames. By encapsulating the validation in a helper class, we get code that is easier to understand, maintain, and extend. Additionally, if designed properly, we can create the validations based on metadata and apply them one after the other on a DataFrame.
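As an illustration of that last idea, here is a minimal sketch (not from the article itself) of chaining checks that share the execute interface; the 'age' column, the sample DataFrame, and the metadata-driven list are assumptions:

import org.apache.spark.sql.DataFrame

// Hypothetical list of checks, e.g. built from metadata
val checks = Seq(
  new RangeCheck("marks", 0, 100, lInc = true, uInc = true),
  new RangeCheck("age", 18, 65)
)

// Thread the DataFrame through each validation in turn.
// Note: as written, every check writes to the same 'range' column,
// so in practice each check would need a distinct output column name.
val validated = checks.foldLeft(df)((current, check) => check.execute(current))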