
Managing Schema Validation in a Data Lake Using Data Version Control

Open-source data version control tools can help you manage schema evolution, data transformations, and compatibility checks across multiple formats.

By Iddo Avneri · Jun. 27, 23 · Tutorial

It’s not uncommon for a data team to depend on many “third parties” that send in data. These third parties often change the schema of the data without communicating anything, or they let the data team know only when it’s already too late.

Whenever that happens, data pipelines break, and the data team needs to fix the data lake. This is a manual process filled with heavy-lifting tasks. Typically, teams engage in a blame game, trying to prove that the schema has changed.

But as teams matured, they realized it was smarter to simply prevent breaking schema changes from reaching production altogether, in an automated CI/CD fashion.

Schema changes and schema validation bring a lot of pain to teams, but there are a few solutions on the market that help with that — and, luckily, some are open-source. 

This is a step-by-step tutorial on approaching the schema validation problem with the open-source data version control tool lakeFS.

What Is Schema Validation?

Schema validation allows you to create validation rules for your data lake, such as allowed data types and value ranges. It guarantees that the data saved in the lake adheres to an established schema, which describes the data's structure, format, and limitations.

Since your data lake can be filled with data coming from various sources with different schema definitions, enforcing a uniform schema across all the data in the lake is a challenge.

And one that definitely needs solving — if you don’t act fast, you’re going to see inconsistencies and mistakes all over your data processing.
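
To make this concrete, here is a minimal, illustrative sketch of what a schema check can look like in PySpark: compare an incoming DataFrame against the schema you expect before writing it to the lake. This is not part of lakeFS, and the column names and helper function are made up for this example.

Python
 
# Illustrative sketch only: validate an incoming DataFrame against the schema
# we expect to store in the lake. Names here are hypothetical.
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType, StringType

spark = SparkSession.builder.getOrCreate()

expected_schema = StructType([
    StructField("User_ID", IntegerType(), False),
    StructField("Country", StringType(), False),
])

def validate_schema(df, expected):
    # Compare column names and types; fail before the data lands in the lake.
    actual = [(f.name, f.dataType) for f in df.schema.fields]
    wanted = [(f.name, f.dataType) for f in expected.fields]
    if actual != wanted:
        raise ValueError(f"Schema mismatch: expected {wanted}, got {actual}")

incoming = spark.createDataFrame([(1, "US")], schema=expected_schema)
validate_schema(incoming, expected_schema)  # passes; a renamed column would raise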

Why Do We Need To Deal With Schema Validation?

Taking your time to manage schemas properly is worth it for these four reasons:

  • Consistency — Data lakes often contain massive amounts of data from several sources. Without schema validation, you might end up with data in inconsistent or incorrect forms stored in the lake, resulting in issues during processing.
  • Quality — Schema validation contributes to the good quality of the data kept in the lake by imposing data restrictions and standards. It helps you identify and flag data quality concerns, such as missing or inaccurate information, before they cause problems downstream.
  • Efficiency — Schema validation expedites data processing and analysis by ensuring a uniform schema across all data in the lake. This, in turn, reduces the time and effort required to clean, convert, and analyze the data — and increases the overall efficiency of your data pipeline.
  • Compliance — Many companies have to meet strict regulatory and compliance requirements. Schema validation helps to make sure that the data stored in the lake matches these standards, providing a clear audit trail of data lineage and quality.

Dealing With Schemas in Data Lakes Is Not All Roses

In a data warehouse, you’re dealing with a rigid data model and a rigid schema. Data lakes are the opposite of that. Most of the time, they end up containing a wide range of data sources. 

Why does it matter? Because in data lakes, schema definitions can change between data sources, and your schema may evolve over time when new data is added. This makes enforcing a uniform schema across all data in the lake a massive challenge. If you fail to solve this issue, you’ll be dealing with data processing issues down the line.

But that’s not everything.

You can’t have one consistent schema due to the ever-increasing complexity of data pipelines built on top of data lakes. Data pipelines can include multiple processes and transformations, each requiring a unique schema definition. 

The schema may vary as data is processed and modified, making it hard to ensure schema validation across the entire pipeline.
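
As a small, hypothetical illustration of that drift (not part of the tutorial's code), imagine two sources delivering the same entity with different column names. The sketch below assumes Spark 3.1+ for unionByName with allowMissingColumns:

Python
 
# Illustrative only: two "sources" deliver the same entity with different schemas.
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

source_a = spark.createDataFrame([(1, "US")], ["User_ID", "Country"])
source_b = spark.createDataFrame([(2, "DE")], ["User_ID", "Country_Name"])  # renamed upstream

# A positional union() would silently pair Country with Country_Name; a name-based
# union surfaces the drift instead of hiding it.
merged = source_a.unionByName(source_b, allowMissingColumns=True)
merged.printSchema()  # ends up with both Country and Country_Name, each half-null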

This is where a version control system can come in handy, right?

Implementing Data Version Control for Schema Validation in a Data Lake

lakeFS is an open-source tool that transforms your data lake into a Git-like repository, letting you manage it just like software engineers manage their code. This is what data version control is all about.

Like other source control systems, lakeFS has a feature called hooks, which are bespoke scripts or programs that the lakeFS platform can run in response to specified events or actions. 

These events can include committing changes, merging branches, creating new branches, adding or removing tags, and so on. For example, when a merge happens, a pre-merge hook runs on the source branch before the merge is finished.

How does it all apply to schema validation? 

You can create a pre-merge hook that verifies that the schema for Parquet files is not different from the current schema. 

What You’ll Need To Have in Place

  • A lakeFS server (you can install it or spin one up in the cloud for free).
  • Optionally: You can use this sample-repo to launch a notebook that can be configured to connect to the lakeFS Server. 

In this scenario, we'll create a delta table in an ingest branch and merge it into production. Next, we'll change the table's schema and try to merge it again, simulating the process of promoting data to production.


1. Setup

To begin, we'll set a number of global variables and install packages that will be used in this example, running in a Python notebook.  

After setting up the lakeFS credentials, we can start creating some global variables containing the repository and branch names:

Python
 
repo = "schema-validation-example-repo"
mainBranch = "main"
ingestionBranch = "ingestion_branch"


Each lakeFS repository needs to have its own storage namespace, so we need to create one too:

Python
 
storageNamespace = 's3://' # e.g. "s3://username-lakefs-cloud/"


In this example, we’re using AWS S3 storage. For everything to work out, your storage needs to be configured to operate with lakeFS, which works with AWS, Azure, Google Cloud, or on-premise object storage like MinIO. 

If you’re running lakeFS in the cloud, you can link it to your storage by copying the storage namespace of the sample repository and attaching a string to it. So, if lakeFS Cloud provided this sample-repo for you:

sample repo

You can configure it in the following way: 

Python
 
storageNamespace = 's3://lakefs-sample-us-east-1-production/AROA5OU4KHZHHFCX4PTOM:2ae87b7718e5bb16573c021e542dd0ec429b7ccc1a4f9d0e3f17d6ee99253655/my_random_string'


In our notebook, we will use Python code, so we must import the lakeFS Python client packages too:

Python
 
import lakefs_client
from lakefs_client import models
from lakefs_client.client import LakeFSClient

import os
from pyspark.sql.types import ByteType, IntegerType, LongType, StringType, StructType, StructField


Next, we configure our client:

Python
 
%xmode Minimal
if 'client' not in locals():
    # lakeFS credentials and endpoint
    configuration = lakefs_client.Configuration()
    configuration.username = lakefsAccessKey
    configuration.password = lakefsSecretKey
    configuration.host = lakefsEndPoint

    client = LakeFSClient(configuration)
    print("Created lakeFS client.")


We will create delta tables in this example, so we need to include the following packages:

Python
 
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages io.delta:delta-core_2.12:2.0.0 --conf "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension" --conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog" pyspark-shell'


lakeFS exposes an S3 gateway, which allows applications to interface with lakeFS in the same way that they would communicate with S3. To configure the gateway, follow these steps:

Python
 
from pyspark.context import SparkContext
from pyspark.sql.session import SparkSession

sc = SparkContext.getOrCreate()
spark = SparkSession(sc)

sc._jsc.hadoopConfiguration().set("fs.s3a.access.key", lakefsAccessKey)
sc._jsc.hadoopConfiguration().set("fs.s3a.secret.key", lakefsSecretKey)
sc._jsc.hadoopConfiguration().set("fs.s3a.endpoint", lakefsEndPoint)
sc._jsc.hadoopConfiguration().set("fs.s3a.path.style.access", "true")


We’re ready to start using lakeFS version control at scale in our notebook.

2. Creating the Repository and Hooks

We will create our repository using the Python client:

Python
 
client.repositories.create_repository(
    repository_creation=models.RepositoryCreation(
        name=repo,
        storage_namespace=storageNamespace,
        default_branch=mainBranch))


In this case, we'll use a pre-merge hook to ensure that the schema hasn't changed. Action files should be submitted to the lakeFS repository with the prefix _lakefs_actions/. Failure to parse an action file will result in a failed Run.

We will submit the following hook configuration action file, pre-merge-schema-validation.yaml:

YAML
 
#Parquet schema Validator
#Args:
# - locations (list of strings): locations to look for parquet files under
# - sample (boolean): whether reading one new/changed file per directory is enough, or go through all of them
#Example hook declaration: (_lakefs_actions/pre-merge-schema-validation.yaml):
name: pre merge checks on main branch
on:
  pre-merge:
    branches:
      - main
hooks:
  - id: check_schema_changes
    type: lua
    properties:
      script_path: scripts/parquet_schema_change.lua # location of this script in the repository
      args:
        sample: false
        locations:
          - tables/customers/


This file (pre-merge-schema-validation.yaml) is stored in the subfolder LuaHooks in our example repo. We must submit the file to the lakeFS repository under the folder _lakefs_actions:

Python
 
hooks_config_yaml = "pre-merge-schema-validation.yaml"
hooks_prefix = "_lakefs_actions"

with open(f'./LuaHooks/{hooks_config_yaml}', 'rb') as f:
    client.objects.upload_object(repository=repo,
                                 branch=mainBranch,
                                 path=f'{hooks_prefix}/{hooks_config_yaml}',
                                 content=f
                                 )


We just set up an action script to run scripts/parquet_schema_change.lua before merging into main.

The script itself (parquet_schema_change.lua) will then be created and uploaded to the scripts directory. As you can see, we’re employing an embedded Lua VM to run hooks without relying on other components. 

This file is also located in the LuaHooks subfolder in the sample-repo:

Lua
 
--[[
Parquet schema validator

Args:
- locations (list of strings): locations to look for parquet files under
- sample (boolean): whether reading one new/changed file per directory is enough, or go through all of them
]]

lakefs = require("lakefs")
strings = require("strings")
parquet = require("encoding/parquet")
regexp = require("regexp")
path = require("path")

visited_directories = {}

for _, location in ipairs(args.locations) do
    after = ""
    has_more = true
    need_more = true
    print("checking location: " .. location)
    while has_more do
        print("running diff, location = " .. location .. " after = " .. after)
        local code, resp = lakefs.diff_refs(action.repository_id, action.branch_id, action.source_ref, after, location)
        if code ~= 200 then
            error("could not diff: " .. resp.message)
        end

        for _, result in pairs(resp.results) do
            p = path.parse(result.path)
            print("checking: '" .. result.path .. "'")
            if not args.sample or (p.parent and not visited_directories[p.parent]) then
                if result.path_type == "object" and result.type ~= "removed" then
                    if strings.has_suffix(p.base_name, ".parquet") then
                        -- check it!
                        code, content = lakefs.get_object(action.repository_id, action.source_ref, result.path)
                        if code ~= 200 then
                            error("could not fetch data file: HTTP " .. tostring(code) .. "body:\n" .. content)
                        end
                        schema = parquet.get_schema(content)
                        for _, column in ipairs(schema) do
                            for _, pattern in ipairs(args.column_block_list) do
                                if regexp.match(pattern, column.name) then
                                    error("Column is not allowed: '" .. column.name .. "': type: " .. column.type .. " in path: " .. result.path)
                                end
                            end
                        end
                        print("\t all columns are valid")
                        visited_directories[p.parent] = true
                    end
                end
            else
                print("\t skipping path, directory already sampled")
            end
        end

        -- pagination
        has_more = resp.pagination.has_more
        after = resp.pagination.next_offset
    end
end


We will upload the file (this time parquet_schema_change.lua) from the LuaHooks directory to our lakeFS repository at the location specified in the action configuration file (i.e., inside the scripts folder):

Python
 
# Upload the Lua script to the path referenced by script_path in the action file
lua_script_file = "parquet_schema_change.lua"
scripts_prefix = "scripts"

with open(f'./LuaHooks/{lua_script_file}', 'rb') as f:
    client.objects.upload_object(repository=repo,
                                 branch=mainBranch,
                                 path=f'{scripts_prefix}/{lua_script_file}',
                                 content=f
                                 )


We must commit the changes after submitting the action file for them to take effect:

Python
 
client.commits.commit(
    repository=repo,
    branch=mainBranch,
    commit_creation=models.CommitCreation(
        message='Added hook config file and schema validation scripts'))


If we switch to the lakeFS UI, we should see the following directory structure and files beneath the main directory:

The lakeFS UI’s directory structure

Pre-merge schema validation as shown in the lakeFS UI

Schema validation scripts as seen in the lakeFS UI
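
If you prefer to verify the upload from the notebook rather than the UI, the Python client can list the objects under each prefix. This is a minimal sketch; it assumes the lakefs_client version in use supports the prefix argument on list_objects:

Python
 
# Sketch: confirm the action file and the Lua script landed where the hook expects them.
for prefix in ["_lakefs_actions/", "scripts/"]:
    listing = client.objects.list_objects(repository=repo, ref=mainBranch, prefix=prefix)
    for obj in listing.results:
        print(obj.path)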

3. Running the First ETL Using the Original Schema

Ingestion and transformation can be performed on a distinct branch from the production (main) branch in lakeFS. 

We will establish an ingestion branch:

Python
 
client.branches.create_branch(
    repository=repo,
    branch_creation=models.BranchCreation(
        name=ingestionBranch, source=mainBranch))


Following that, we will use the Orion Star – Sports and outdoors RDBMS dataset from Kaggle. Let’s use Customer.csv, which is included in the sample repo under data/samples/OrionStar/.

First, the table schema needs to be defined:

Python
 
customersSchema = StructType([
  StructField("User_ID", IntegerType(), False),
  StructField("Country", StringType(), False),
  StructField("Gender", StringType(), False),
  StructField("Personal_ID", IntegerType(), True),
  StructField("Customer_Name", StringType(), False),
  StructField("Customer_FirstName", StringType(), False),
  StructField("Customer_LastName", StringType(), False),
  StructField("Birth_Date", StringType(), False),
  StructField("Customer_Address", StringType(), False),
  StructField("Street_ID", LongType(), False),
  StructField("Street_Number", IntegerType(), False),
  StructField("Customer_Type_ID", IntegerType(), False)
])


Then, from the CSV file, we will create a delta table and submit it to our repository:

Python
 
customersTablePath = f"s3a://{repo}/{ingestionBranch}/tables/customers"
df = spark.read.csv('./data/samples/OrionStar/CUSTOMER.csv', header=True, schema=customersSchema)
df.write.format("delta").mode("overwrite").save(customersTablePath)


We need to commit changes:

Python
 
client.commits.commit(
    repository=repo,
    branch=ingestionBranch,
    commit_creation=models.CommitCreation(
        message='Added customers Delta table',
        metadata={'using': 'python_api'}))


And then, using a merge, send the data to production:

Python
 
client.refs.merge_into_branch(
    repository=repo,
    source_ref=ingestionBranch,
    destination_branch=mainBranch)


Since the schema is unchanged, the pre-merge hook passes and the merge completes. In the lakeFS UI, you can see the sequence of schema validation that has been completed:
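
You can also confirm from the notebook that the hook ran by listing the repository's recent action runs. This is a sketch; it assumes your lakefs_client version exposes list_repository_runs on the actions API:

Python
 
# Sketch: list the most recent action runs and print their status.
runs = client.actions.list_repository_runs(repository=repo, amount=5)
for run in runs.results:
    print(run.run_id, run.event_type, run.branch, run.status)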

4. Modify the Schema and Attempt To Move the Table to Production

To make things easier, we'll rename one of the columns. Let’s replace Country with Country_Name:

Python
 
customersSchema = StructType([
  StructField("User_ID", IntegerType(), False),
  StructField("Country_Name", StringType(), False), # Column name changed from Country to Country_Name
  StructField("Gender", StringType(), False),
  StructField("Personal_ID", IntegerType(), True),
  StructField("Customer_Name", StringType(), False),
  StructField("Customer_FirstName", StringType(), False),
  StructField("Customer_LastName", StringType(), False),
  StructField("Birth_Date", StringType(), False),
  StructField("Customer_Address", StringType(), False),
  StructField("Street_ID", LongType(), False),
  StructField("Street_Number", IntegerType(), False),
  StructField("Customer_Type_ID", IntegerType(), False)
])


In the ingest branch, let’s recreate the delta table:

Python
 
customersTablePath = f"s3a://{repo}/{ingestionBranch}/tables/customers"
df = spark.read.csv('./data/samples/OrionStar/CUSTOMER.csv', header=True, schema=customersSchema)
df.write.format("delta").mode("overwrite").option("overwriteSchema", "true").save(customersTablePath)


Changes need to be committed:

Python
 
client.commits.commit(
    repository=repo,
    branch=ingestionBranch,
    commit_creation=models.CommitCreation(
        message='Added customers table with schema changes',
        metadata={'using': 'python_api'}))


And then, we can try to get the data into production:

Python
 
client.refs.merge_into_branch(
    repository=repo,
    source_ref=ingestionBranch,
    destination_branch=mainBranch)


We got a Precondition Failed error as a result of the schema modifications. The pre-merge hook blocked the promotion, so this data won’t be used in production:

API Exception

From the lakeFS UI, we can navigate to the repository and select the "Actions" option. Next, we click on the failed action's Run ID, pick "pre merge checks on main branch," expand check_schema_changes, and view the error message. 
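
If you'd rather handle the failure in code instead of the UI, you can wrap the merge in a try/except. This is a sketch; it assumes the generated client raises lakefs_client.exceptions.ApiException with the HTTP status and body attached:

Python
 
# Sketch: catch the blocked merge programmatically instead of checking the UI.
from lakefs_client.exceptions import ApiException

try:
    client.refs.merge_into_branch(
        repository=repo,
        source_ref=ingestionBranch,
        destination_branch=mainBranch)
except ApiException as e:
    # 412 Precondition Failed: the pre-merge hook rejected the schema change.
    print(f"Merge blocked by hook (HTTP {e.status}): {e.body}")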

Wrap Up

Due to the heterogeneous and raw nature of the stored data, schema validation on a data lake is critical but difficult. Managing schema evolution, data transformations, and compatibility checks across multiple formats means that every data practitioner needs some pretty powerful methodologies and tools.

The decentralized nature of data lakes, where numerous users and systems can edit data, complicates schema validation even further. Validation of schemas is critical for data governance, integration, and reliable analytics.

A solution like the pre-merge hook shown above verifies schema changes before data is merged into the production branch. It comes in handy for guaranteeing data integrity and preventing incompatible schema changes from reaching the main branch. And it adds an additional layer of quality control, keeping data more consistent.
