Databricks Delta Lake Using Java

By Kiran Kumar · May 22, 2020 · Tutorial

Delta Lake is an open-source project from Databricks that provides a transactional storage layer on top of data lakes. In practice, the underlying data lake can be Amazon S3, Azure Data Lake Store/Azure Blob Storage, Google Cloud Storage, or the Hadoop Distributed File System (HDFS).

Delta Lake acts as a storage layer that sits on top of the data lake and brings additional features that a plain data lake cannot provide.

The key features of Delta Lake are as follows:

ACID transactions: In real-world data engineering applications, many concurrent pipelines for different business data domains read and update the data lake at the same time. Without transactional guarantees, these concurrent operations can compromise data integrity. Delta Lake brings transactions with ACID properties, which keeps the data consistent.

Data Versioning: Delta Lake maintains multiple versions of the data for all transactional operations, which enables developers to work with a specific version whenever required.

Schema Enforcement: This feature is also called schema validation. Delta Lake validates the schema before writing data, ensuring that data types are correct and required columns are present, which prevents invalid data from reaching the table and maintains metadata quality.

Schema Evolution: Schema evolution allows users to easily change a table's current schema to accommodate data that changes over time. Most commonly, it is used when performing an append or overwrite operation to automatically adapt the schema to include one or more new columns. Data engineers and scientists can use this option to add new columns to their existing machine learning production tables without breaking existing models that rely on the old columns.

Efficient data format: All the data in a Delta table is stored in Apache Parquet format, enabling Delta Lake to leverage the compression and encoding of the native Parquet format.

Scalable metadata handling: In big data applications, the metadata itself can be big data. Delta Lake treats metadata just like data, leveraging Spark's distributed processing power to handle all of it.

Unified layer for batch and stream processing: A Delta Lake table can act as a source and a sink for both batch processing and stream processing (a minimal streaming sketch follows this list). This is an improvement over the traditional Lambda architecture, which needs separate pipelines for batch and stream processing.

Fully compatible with the Spark framework: Delta Lake is fully compatible with and easily integrated into Spark, a highly distributed and scalable big data framework.
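
To illustrate the unified batch and stream processing point above, the same Delta path can be read as a streaming source and written to as a streaming sink with Spark Structured Streaming. The following is only a minimal sketch and not part of the original example; the checkpoint directory and the second table path are placeholder values.

Java

// read the Delta table as a streaming source (paths are placeholders)
Dataset<Row> streamDF = spark.readStream().format("delta").load("/tmp/delta-table20");

// write the stream out to another Delta table acting as a sink;
// requires: import org.apache.spark.sql.streaming.StreamingQuery;
StreamingQuery query = streamDF.writeStream()
        .format("delta")
        .outputMode("append")
        .option("checkpointLocation", "/tmp/checkpoint")
        .start("/tmp/delta-table-stream");

query.awaitTermination();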

The logical view is represented below:

Application architecture


Spark applications are generally developed in Scala. However, since there is a vast number of developers in the Java community and Java still dominates the enterprise world, Spark applications can also be developed in Java: Spark provides full compatibility for Java programs and runs on the JVM.

Let's see how we can leverage Delta Lake features from Java using Spark.

Java

// imports used by the snippets in this article
import java.util.ArrayList;
import java.util.List;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoder;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

SparkSession spark = SparkSession.builder().appName("Spark Excel file conversion")
        .config("spark.master", "local").config("spark.sql.warehouse.dir", "file://./sparkapp").getOrCreate();

// create Employee POJOs and add the objects to a list
List<Employee> empList = new ArrayList<Employee>();
Employee emp = new Employee();
emp.setEmpId("1234");
emp.setEmpName("kiran");
emp.setDeptName("Design dept");
empList.add(emp);

emp = new Employee();
emp.setEmpId("3567");
emp.setEmpName("raju");
emp.setDeptName("IT");
empList.add(emp);

// Encoders convert a JVM object of type T to and from the internal SQL representation
Encoder<Employee> employeeEncoder = Encoders.bean(Employee.class);
Dataset<Row> empDF = spark.createDataset(empList, employeeEncoder).toDF();

// the format should be "delta" to leverage Delta Lake features while saving the data to the table
empDF.write().format("delta").save("/tmp/delta-table20");

// the format should be "delta" to leverage Delta Lake features while reading the data from the table
empDF = spark.read().format("delta").load("/tmp/delta-table20");

// show the data from the DataFrame
empDF.show();



The above Java program uses the Spark framework to read employee data and save it to Delta Lake.

To leverage Delta Lake features, the Spark read and write formats have to be changed from "parquet" to "delta", as shown in the above program.

In this example, we create an Employee POJO and initialize it with some test data. In a real application, this data would be populated from files in the data lake or from external data sources, as sketched below.
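
As a rough sketch of that real-world case, the hard-coded list could be replaced by reading from an external file. The CSV path and options below are hypothetical and only for illustration.

Java

// read employee records from an external CSV source (hypothetical path)
Dataset<Row> sourceDF = spark.read()
        .format("csv")
        .option("header", "true")
        .load("/tmp/source-data/employees.csv");

// write the source data to the Delta table
sourceDF.write().format("delta").save("/tmp/delta-table20");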

The output of the original program, with the hard-coded test data, is shown below:

Initial output

To run this as a Maven project, add the following dependencies to the pom.xml:

XML

<dependency>
    <groupId>io.delta</groupId>
    <artifactId>delta-core_2.11</artifactId>
    <version>0.4.0</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.11</artifactId>
    <version>2.4.3</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.11</artifactId>
    <version>2.4.3</version>
</dependency>



Next, append new data to the existing Delta table using the "append" save mode:

Java

SparkSession spark = SparkSession.builder().appName("Spark Excel file conversion")
        .config("spark.master", "local").config("spark.sql.warehouse.dir", "file://./sparkapp").getOrCreate();

// create Employee POJOs and add the objects to a list
List<Employee> empList = new ArrayList<Employee>();
Employee emp = new Employee();
emp.setEmpId("1234");
emp.setEmpName("kiran");
emp.setDeptName("IT");
empList.add(emp);

emp = new Employee();
emp.setEmpId("4862");
emp.setEmpName("david");
emp.setDeptName("Engineering");
empList.add(emp);

// Encoders convert a JVM object of type T to and from the internal SQL representation
Encoder<Employee> employeeEncoder = Encoders.bean(Employee.class);
Dataset<Row> empDF = spark.createDataset(empList, employeeEncoder).toDF();

// the format should be "delta" to leverage Delta Lake features while saving the data to the table;
// the "append" mode appends the new data to the existing table
empDF.write().format("delta").mode("append").save("/tmp/delta-table20");

// the format should be "delta" to leverage Delta Lake features while reading the data from the table
empDF = spark.read().format("delta").load("/tmp/delta-table20");

// show the data from the DataFrame
empDF.show();



The output of the above program is shown below:

The new data is appended to the table. There are now two versions of the data: version 0 is the original data, and version 1 is the data after the append.
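
As a side note, the commit history of the table (one entry per version) can be inspected through the DeltaTable API that ships with delta-core; the snippet below is a minimal sketch, assuming the io.delta.tables.DeltaTable class available in delta-core 0.4.0.

Java

// inspect the commit history of the Delta table (one row per version);
// requires: import io.delta.tables.DeltaTable;
DeltaTable deltaTable = DeltaTable.forPath(spark, "/tmp/delta-table20");
deltaTable.history().show();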

The program below illustrates how to retrieve the data for the first version. The "versionAsOf" option loads the corresponding version of the data from the Delta Lake table:

Java

SparkSession spark = SparkSession.builder().appName("Spark Excel file conversion")
        .config("spark.master", "local").config("spark.sql.warehouse.dir", "file://./sparkapp").getOrCreate();

// the format should be "delta" to leverage Delta Lake features while reading the data from the table.
// The "versionAsOf" option loads the corresponding version of the data; version 0 is the first version.
Dataset<Row> empDF = spark.read().format("delta").option("versionAsOf", 0).load("/tmp/delta-table20");

// show the data from the DataFrame
empDF.show();



The output corresponds to the first version of data:

Initial program output

We can also overwrite the table using the "overwrite" save mode with the delta format. This replaces the old data with new data in the table, but the earlier versions are still maintained and the old data can be fetched by version number.

Java

empDF.write().format("delta").mode("overwrite").save("/tmp/delta-table20");



The Employee POJO used in the above examples is given below:

Java

public class Employee {
    private String empName;
    private String deptName;
    private String empId;

    public String getEmpName() {
        return empName;
    }

    public void setEmpName(String empName) {
        this.empName = empName;
    }

    public String getDeptName() {
        return deptName;
    }

    public void setDeptName(String deptName) {
        this.deptName = deptName;
    }

    public String getEmpId() {
        return empId;
    }

    public void setEmpId(String empId) {
        this.empId = empId;
    }
}



Now, we change one of the field names in the Employee POJO: the "deptName" field is renamed to "deptId".

The new POJO is as follows:

Java

public class Employee {
    private String empName;
    private String deptId;
    private String empId;

    public String getDeptId() {
        return deptId;
    }

    public void setDeptId(String deptId) {
        this.deptId = deptId;
    }

    public String getEmpName() {
        return empName;
    }

    public void setEmpName(String empName) {
        this.empName = empName;
    }

    public String getEmpId() {
        return empId;
    }

    public void setEmpId(String empId) {
        this.empId = empId;
    }
}



Now, we try to append data to the existing Delta table with the modified schema:

Java

SparkSession spark = SparkSession.builder().appName("Spark Excel file conversion")
        .config("spark.master", "local").config("spark.sql.warehouse.dir", "file://./sparkapp").getOrCreate();

// create an Employee POJO (with the renamed deptId field) and add it to a list
List<Employee> empList = new ArrayList<Employee>();
Employee emp = new Employee();
emp.setEmpId("6798");
emp.setEmpName("kumar");
emp.setDeptId("IT");
empList.add(emp);

// Encoders convert a JVM object of type T to and from the internal SQL representation
Encoder<Employee> employeeEncoder = Encoders.bean(Employee.class);
Dataset<Row> empDF = spark.createDataset(empList, employeeEncoder).toDF();

// the format should be "delta" to leverage Delta Lake features while saving the data to the table;
// append the data to the existing table
empDF.write().format("delta").mode("append").save("/tmp/delta-table20");

// the format should be "delta" to leverage Delta Lake features while reading the data from the table
empDF = spark.read().format("delta").load("/tmp/delta-table20");

// show the data from the DataFrame
empDF.show();



When we run the above program, the schema enforcement feature prevents the data from being written to the Delta table. Delta Lake validates the schema before appending new data to an existing Delta table; if there is any mismatch between the schemas, it throws a schema mismatch error.

The following error occurs when we run the above program:

Error output

Now, we will add a new field, "sectionName", to the original Employee schema and try to append data to the existing Delta Lake table.

Delta Lake's schema evolution feature accommodates the new or modified schema of the data being saved dynamically, without any explicit DDL operations. This is achieved by using the "mergeSchema" option while saving the data to the Delta table, as shown in the program below.

Java

SparkSession spark = SparkSession.builder().appName("Spark Excel file conversion")
        .config("spark.master", "local").config("spark.sql.warehouse.dir", "file://./sparkapp").getOrCreate();

// create an Employee POJO and add it to a list
// (here the Employee POJO has the original fields plus the new sectionName field)
List<Employee> empList = new ArrayList<Employee>();
Employee emp = new Employee();
emp.setEmpId("6798");
emp.setEmpName("kumar");
emp.setDeptName("IT");
emp.setSectionName("Big Data");
empList.add(emp);

// Encoders convert a JVM object of type T to and from the internal SQL representation
Encoder<Employee> employeeEncoder = Encoders.bean(Employee.class);
Dataset<Row> empDF = spark.createDataset(empList, employeeEncoder).toDF();

// the format should be "delta" to leverage Delta Lake features while saving the data to the table;
// the "mergeSchema" option merges the new column into the existing table schema while appending
empDF.write().format("delta").mode("append").option("mergeSchema", "true").save("/tmp/delta-table20");

// the format should be "delta" to leverage Delta Lake features while reading the data from the table
empDF = spark.read().format("delta").load("/tmp/delta-table20");

// show the data from the DataFrame
empDF.show();



The output of the above program is shown below:

Program output

As shown in the output above, the "sectionName" field is merged into the existing schema of the Delta Lake table, and the new column is populated with null values for the records that already existed in the table.

In this way, we can leverage Delta Lake features in real-world applications, bringing transactional guarantees, historical data maintenance, and schema management capabilities to existing data lakes.
