
Spark Tutorial: Validating Data in a Spark DataFrame Part Two


Validate data in a Spark DataFrame in a few simple steps.



In the previous article in this series, I covered a few techniques that can be used for validating data in a Spark DataFrame. In this article, I will cover a few more techniques, this time focusing on the use of User Defined Functions (UDFs).

Method 1: Simple UDF

In this technique, we first define a helper function that will allow us to perform the validation operation. In this case, we are checking if the column value is null. So, the function is as below:

def isNullFunction(value: String): Boolean = {
    value == null
}


We then use this function in a UDF (User Defined Function), as below. Note that the udf function must be imported from org.apache.spark.sql.functions:

val isNullUDF = udf(isNullFunction _)


Then, we use the UDF to check the value, as below:

df.withColumn("IsNullUDF", isNullUDF(col("name")))


By using a UDF, we can include somewhat more complex validation logic that would have been difficult to incorporate in the 'withColumn' syntax shown in Part One.
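
To put Method 1 together, here is a minimal, self-contained sketch; the sample data, the local SparkSession, and the object name are illustrative assumptions rather than part of the original example:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, udf}

object NullCheckExample {
    // Helper used by the UDF: returns true when the column value is null
    def isNullFunction(value: String): Boolean = value == null

    def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().appName("null-check").master("local[*]").getOrCreate()
        import spark.implicits._

        // Hypothetical sample data with one null name
        val df = Seq("Alice", null, "Bob").toDF("name")

        // Register the helper as a UDF and add a flag column
        val isNullUDF = udf(isNullFunction _)
        df.withColumn("IsNullUDF", isNullUDF(col("name"))).show()

        spark.stop()
    }
}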

Method 2: Alternate UDF

In this technique, the function to check null remains the same, but the syntax of the UDF is different, as below:

val isNullUDF = udf[Boolean, String](isNullFunction)


Even though the syntax of the UDF is different, the way it is invoked remains the same. The advantage of this variant of the UDF is that the return type and the data type of the input column are clearly indicated: in this case, the return value is a Boolean (as we wish to store a 'true' or a 'false' value in the new column), while the data type of the column is String.
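
For completeness, the invocation is identical to the one shown in Method 1:

df.withColumn("IsNullUDF", isNullUDF(col("name")))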

Method 3: Complicate Matters

Let us now complicate matters a little bit. Suppose, instead of a simple null check, we want to check if the value in a column lies within a range. How can we implement this functionality using a UDF? The task is quite straightforward. We need three values: two integers that represent the limits and a third that represents the actual data. The limits are passed to an outer function, which returns a UDF that checks the data value against them. We can define it as below:

def isInRangeUDF(ll: Int, ul: Int) = udf[Boolean, Int]((data: Int) => {
    // true when the data lies strictly between the lower and upper limits
    data > ll && data < ul
})


Then, we invoke the UDF as below: 

df.withColumn("rangeCheck", isInRangeUDF(lowerLimit, upperLimit)(df("marks")))


Method 4: The Next Logical Step

The usage of UDFs makes the task of data validation quite simple, but they need to be used with care. Spark treats UDFs as black boxes and does not perform any optimization on the code inside them, so for simple checks a built-in Column expression, as shown below, is usually preferable. With that caveat in mind, let's go to the next logical step.
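
As an illustration of that trade-off, the null check from Method 1 can also be written with the built-in isNull function, which the optimizer can reason about; the UDF form is only needed once the logic outgrows the built-in functions:

import org.apache.spark.sql.functions.col

// Built-in equivalent of the null-check UDF; Catalyst can optimize this expression
df.withColumn("IsNull", col("name").isNull)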

To avoid peppering our code base with too many function definitions, we can encapsulate the definition of a UDF inside a class and then use the class. Defining a class allows us to hide the complexity of the UDF behind a simple interface. Let us suppose that we convert the range UDF into a class that performs the range check validation. The UDF, when converted to a class, looks as below:

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.udf

class RangeCheck(val colName: String, val lLimit: Int, val uLimit: Int)
{
    val columnName = colName
    val lowerLimit = lLimit
    val upperLimit = uLimit

    // UDF that returns true when the value lies strictly between the limits
    def isInRangeUDF = udf[Boolean, Int]((data: Int) => {
        data > lowerLimit && data < upperLimit
    })

    // Adds a "range" column holding the result of the check
    def execute(df: DataFrame): DataFrame = {
        df.withColumn("range", isInRangeUDF(df(columnName)))
    }
}


The class can then be invoked as below:

var df = . . .
val r1 = new RangeCheck("marks", 0, 100)
df = r1.execute(df)


Method 5: Complex Logic Behind a Simple Interface

If we wish to do so, we can increase the complexity of the UDF but still hide it behind a simple interface. For example, the class below allows the user to use Boolean flags to decide whether each limit is inclusive or exclusive in the check.

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.udf

class RangeCheck(val colName: String, val ll: Int, val ul: Int, val lInc: Boolean = false, val uInc: Boolean = false)
{
    val columnName = colName
    val lLimit: Int = ll
    val uLimit: Int = ul
    val llInc: Boolean = lInc
    val ulInc: Boolean = uInc

    def isInRangeUDF = udf[Boolean, Int]((data: Int) => {
        // Honour the inclusive/exclusive flag for each limit
        val lf = if (llInc) data >= lLimit else data > lLimit
        val uf = if (ulInc) data <= uLimit else data < uLimit

        // The value is in range only when both limit checks pass
        lf && uf
    })

    // Adds a "range" column holding the result of the check
    def execute(df: DataFrame): DataFrame = {
        df.withColumn("range", isInRangeUDF(df(columnName)))
    }
}


The class can then be invoked as below:

var df = . . .
val r1 = new RangeCheck("marks", 0, 100, true) // 0 is included in the range check
df = r1.execute(df)


OR

val r1 = new RangeCheck("marks", 0, 100, ulInc=true) // 100 is included in the range check
df = r1.execute(df)


OR

val r1 = new RangeCheck("marks", 0, 100, true, true) // both 0 and 100 are included in the range check
df = r1.execute(df)


Conclusion

In this article, we have covered a few techniques that can be used to implement data validation on Spark DataFrames. By encapsulating the validation in a helper class, we get code that is easier to understand, maintain, and extend. Additionally, if designed properly, we can create the validations from metadata and apply them one after the other on a DataFrame, as sketched below.
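
As a rough illustration of that idea, a list of checks built from metadata can be folded over a DataFrame. The column names and ranges below are hypothetical; also note that the RangeCheck class above always writes its result to a column named "range", so a production version would likely derive the output column name from colName:

// Hypothetical metadata describing the validations to apply
val checks = Seq(
    new RangeCheck("marks", 0, 100, lInc = true, uInc = true),
    new RangeCheck("age", 0, 120)
)

// Apply each validation in turn to the DataFrame
val validated = checks.foldLeft(df)((current, check) => check.execute(current))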

