DZone

Spark Tutorial: Validating Data in a Spark DataFrame Part Two

Validate data with Spark's DataFrame in a few simple steps.

By Bipin Patwardhan · Sep. 25, 19 · Tutorial


In the previous article (Part One), I covered a few techniques that can be used for validating data in a Spark DataFrame. In this article, I cover a few more techniques, this time focusing on User Defined Functions (UDFs) and on converting them into reusable classes.

Method 1: Simple UDF

In this technique, we first define a helper function that performs the validation. In this case, we check whether the column value is null. The function is as below:

// Returns true when the column value is null.
def isNullFunction(value: String): Boolean = {
    value == null
}


We then wrap this function in a UDF, as below:

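import org.apache.spark.sql.functions.{col, udf}

// Wrap the plain Scala function so it can be applied to a DataFrame column.
val isNullUDF = udf(isNullFunction _)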


Then, we apply the UDF to a column to check its values, as below:

df.withColumn("IsNullUDF", isNullUDF(col("name")))


By using a UDF, we can include validation logic that is more complex than what could easily be expressed with the 'withColumn' syntax shown in Part One.
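To make Method 1 concrete, here is a minimal end-to-end sketch, assuming Spark is on the classpath and the code runs in spark-shell or as a Scala script. The local SparkSession, the app name, and the sample `name` values are hypothetical. For comparison, it also adds a column using Spark's built-in `Column.isNull`, which gives the same answer without a UDF.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, udf}

// Local session for experimentation (hypothetical app name).
val spark = SparkSession.builder()
  .master("local[*]")
  .appName("udf-null-check-sketch")
  .getOrCreate()
import spark.implicits._

// Hypothetical sample data: the second row has a null name.
val people = Seq("Asha", null, "Ravi").toDF("name")

def isNullFunction(value: String): Boolean = value == null
val isNullUDF = udf(isNullFunction _)

val checked = people
  .withColumn("IsNullUDF", isNullUDF(col("name")))  // UDF-based check
  .withColumn("IsNullBuiltIn", col("name").isNull)  // built-in equivalent

checked.show()

// Collect the UDF results (row order follows the input Seq).
val udfResults = checked.select("IsNullUDF").as[Boolean].collect().toSeq
```

The two columns should agree row by row; the UDF version becomes worthwhile once the check is more involved than a single built-in expression.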

Method 2: Alternate UDF

In this technique, the function that checks for null remains the same, but the UDF is declared with a different syntax, as below:

val isNullUDF = udf[Boolean, String](isNullFunction)


Even though the syntax of the UDF is different, it is invoked in exactly the same way. The advantage of this variant is that the return type and the input column type are stated explicitly: here, the return type is Boolean (as we wish to store a 'true' or a 'false' value in the new column), while the data type of the input column is String.
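One visible effect of the explicit type parameters is that the declared return type becomes the Spark SQL type of the new column. A minimal sketch, assuming a local SparkSession and the same hypothetical `name` data as before:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, udf}
import org.apache.spark.sql.types.BooleanType

val spark = SparkSession.builder()
  .master("local[*]")
  .appName("typed-udf-sketch")
  .getOrCreate()
import spark.implicits._

def isNullFunction(value: String): Boolean = value == null

// Explicit type parameters: return type Boolean, input column type String.
val isNullUDF = udf[Boolean, String](isNullFunction)

// Hypothetical sample data.
val people = Seq("Asha", null, "Ravi").toDF("name")
val checked = people.withColumn("IsNullUDF", isNullUDF(col("name")))

// The declared return type shows up as the column's Spark SQL type.
val resultType = checked.schema("IsNullUDF").dataType
checked.printSchema()
```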

Method 3: Complicate Matters

Let us now complicate matters a little. Suppose that, instead of a simple null check, we want to check whether the value in a column lies within a range. How can we implement this using a UDF? The task is quite straightforward. We need three values (integers, in this example): two represent the limits and the third represents the actual data. Because the UDF is applied to the column, only the data value is passed to it at run time; the two limits are passed to an ordinary Scala function that builds and returns the UDF. We can define it as below:

def isInRangeUDF(ll: Int, ul: Int) = udf[Boolean, Int]((data: Int) => {
    // Both limits are exclusive: the value must lie strictly between ll and ul.
    data > ll && data < ul
})


Then, we invoke the UDF as below: 

df.withColumn("rangeCheck", isInRangeUDF(lowerLimit, upperLimit)(df("marks")))
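As a quick check of the exclusive bounds, here is a minimal sketch assuming a local SparkSession and a hypothetical `marks` column holding 0, 50, and 100. Values equal to either limit are reported as out of range.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.udf

val spark = SparkSession.builder()
  .master("local[*]")
  .appName("range-udf-sketch")
  .getOrCreate()
import spark.implicits._

// Same shape as Method 3: the limits are fixed when the UDF is built,
// and only the column value is passed at run time. Both bounds are exclusive.
def isInRangeUDF(ll: Int, ul: Int) =
  udf[Boolean, Int]((data: Int) => data > ll && data < ul)

// Hypothetical sample marks, including values equal to the limits.
val df = Seq(0, 50, 100).toDF("marks")
val lowerLimit = 0
val upperLimit = 100

val checked = df.withColumn("rangeCheck", isInRangeUDF(lowerLimit, upperLimit)(df("marks")))
checked.show()

val flags = checked.orderBy("marks").select("rangeCheck").as[Boolean].collect().toSeq
```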


Method 4: The Next Logical Step

UDFs make the task of data validation quite simple, but they need to be used with care. Spark treats a UDF as a black box: the Catalyst optimizer cannot look inside it, so the logic in a UDF is not optimized the way built-in column expressions are.
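Where a check can be written with built-in column functions, Catalyst can see and optimize it. A minimal sketch, assuming a local SparkSession and the same hypothetical `marks` data, of the Method 3 range check written without a UDF. Note that `Column.between` includes both bounds, so the exclusive version uses explicit comparisons.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

val spark = SparkSession.builder()
  .master("local[*]")
  .appName("builtin-range-sketch")
  .getOrCreate()
import spark.implicits._

val df = Seq(0, 50, 100).toDF("marks")

val checked = df
  // Exclusive bounds, matching the UDF in Method 3.
  .withColumn("rangeExclusive", col("marks") > 0 && col("marks") < 100)
  // between() is inclusive at both ends.
  .withColumn("rangeInclusive", col("marks").between(0, 100))

// Prints the plan; the comparisons appear as regular expressions, not a UDF call.
checked.explain()
checked.show()

val rows = checked.orderBy("marks")
  .select("rangeExclusive", "rangeInclusive")
  .as[(Boolean, Boolean)]
  .collect()
  .toSeq
```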

To avoid peppering our code base with too many function definitions, we can encapsulate the definition of a UDF inside a class and then use the class. A class lets us hide the complexity of the UDF behind a simple interface. Suppose we convert the range UDF into a class that performs the range-check validation. The converted class is as below:

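import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.udf

class RangeCheck(val colName: String, val lLimit: Int, val uLimit: Int)
{
    // Builds the range-check UDF (both limits exclusive). The limits are copied
    // into local vals so the closure does not capture `this`; RangeCheck is not
    // Serializable, and capturing it would cause a "Task not serializable" error.
    def isInRangeUDF = {
        val lowerLimit = lLimit
        val upperLimit = uLimit
        udf[Boolean, Int]((data: Int) => data > lowerLimit && data < upperLimit)
    }

    // Adds a Boolean "range" column holding the result of the check.
    def execute(df: DataFrame): DataFrame = {
        df.withColumn("range", isInRangeUDF(df(colName)))
    }
}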


The class can then be used as below:

var df = . . .
val r1 = new RangeCheck("marks", 0, 100)
df = r1.execute(df)


Method 5: Complex Logic Behind a Simple Interface

If we wish, we can make the UDF more complex while still hiding it behind a simple interface. For example, the class below lets the user pass Boolean flags that decide whether each limit is inclusive or exclusive. Both flags default to false, so by default both limits are exclusive.

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.udf

class RangeCheck(val colName: String, val ll: Int, val ul: Int, val lInc: Boolean = false, val uInc: Boolean = false)
{
    // Builds the range-check UDF. The limits and flags are copied into local vals
    // so the closure does not capture `this` (RangeCheck is not Serializable).
    def isInRangeUDF = {
        val lLimit = ll
        val uLimit = ul
        val llInc = lInc
        val ulInc = uInc

        udf[Boolean, Int]((data: Int) => {
            // Each bound is inclusive or exclusive depending on its flag.
            val lf = if ( llInc ) data >= lLimit else data > lLimit
            val uf = if ( ulInc ) data <= uLimit else data < uLimit

            // The value is in range only if both bounds are satisfied.
            lf && uf
        })
    }

    // Adds a Boolean "range" column holding the result of the check.
    def execute(df: DataFrame): DataFrame = {
        df.withColumn("range", isInRangeUDF(df(colName)))
    }
}


The class can then be used as below:

var df = . . .
val r1 = new RangeCheck("marks", 0, 100, true) // 0 is included in the range check
df = r1.execute(df)


OR

val r1 = new RangeCheck("marks", 0, 100, uInc = true) // 100 is included in the range check
df = r1.execute(df)


OR

val r1 = new RangeCheck("marks", 0, 100, true, true) // both 0 and 100 are included in the range check
df = r1.execute(df)
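The flag handling is where it is easiest to slip up, so it can help to check the predicate on its own. The sketch below is plain Scala with no Spark dependency; `inRange` is a hypothetical helper that mirrors the logic inside Method 5's UDF, evaluated at 0, 50, and 100 (limits 0 and 100) for every combination of flags.

```scala
// Hypothetical helper mirroring the logic inside Method 5's UDF:
// each bound is inclusive or exclusive depending on its flag.
def inRange(data: Int, lower: Int, upper: Int,
            lowerInclusive: Boolean = false, upperInclusive: Boolean = false): Boolean = {
  val lowerOk = if (lowerInclusive) data >= lower else data > lower
  val upperOk = if (upperInclusive) data <= upper else data < upper
  lowerOk && upperOk
}

// Evaluate 0, 50 and 100 against the limits 0 and 100 for each flag combination.
val combos = Seq((false, false), (true, false), (false, true), (true, true))
val table = combos.map { case (lInc, uInc) =>
  (lInc, uInc) -> Seq(0, 50, 100).map(v => inRange(v, 0, 100, lInc, uInc))
}.toMap

table.foreach { case ((lInc, uInc), results) =>
  println(s"lInc=$lInc uInc=$uInc -> 0, 50, 100: ${results.mkString(", ")}")
}
```

The `(true, false)` entry corresponds to including only the lower limit, and `(false, true)` to including only the upper limit.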


Conclusion

In this article, we have covered a few more techniques that can be used to implement data validation on Spark DataFrames. By encapsulating each validation in a helper class, we get code that is easier to understand, maintain, and extend. Additionally, with a suitable design, the validations can be created from metadata (for example, a list of column names and limits) and applied one after the other to a DataFrame.
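A minimal sketch of that metadata-driven idea, assuming a local SparkSession. The `Validation` trait, the `RangeRule` metadata record, the `RangeValidation` class, and the sample columns are all hypothetical. Each rule writes to its own output column so checks can be chained, and this sketch uses built-in comparisons (exclusive bounds) inside each rule rather than a UDF; a UDF-based class could implement the same trait.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.col

// Hypothetical common interface for validations.
trait Validation {
  def execute(df: DataFrame): DataFrame
}

// Hypothetical metadata record describing one range check.
case class RangeRule(column: String, lower: Int, upper: Int, output: String)

// A validation built from metadata; both bounds are exclusive.
class RangeValidation(rule: RangeRule) extends Validation {
  def execute(df: DataFrame): DataFrame =
    df.withColumn(rule.output, col(rule.column) > rule.lower && col(rule.column) < rule.upper)
}

val spark = SparkSession.builder()
  .master("local[*]")
  .appName("metadata-validation-sketch")
  .getOrCreate()
import spark.implicits._

// Hypothetical sample data.
val df = Seq((0, 30), (50, 80), (100, 10)).toDF("marks", "age")

// The metadata could come from a config file or table; here it is inline.
val rules = Seq(
  RangeRule("marks", 0, 100, "marksInRange"),
  RangeRule("age", 18, 65, "ageInRange")
)

// Apply the validations one after the other.
val validated = rules.map(new RangeValidation(_)).foldLeft(df)((acc, v) => v.execute(acc))
validated.show()

val results = validated.orderBy("marks")
  .select("marksInRange", "ageInRange")
  .as[(Boolean, Boolean)]
  .collect()
  .toSeq
```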

