
What Are Spark Checkpoints on Data Frames?


Checkpoints freeze the content of a data frame before you do something else with it, giving you a stable starting point for further work.


Let’s look at what checkpoints can do for your Spark data frames and walk through a Java example of how to use them.

Checkpoint on Data Frames

In v2.1.0, Apache Spark introduced checkpoints on data frames and datasets. I will continue to use the term "data frame" for a Dataset<Row>. The Javadoc describes it as:

Returns a checkpointed version of this dataset. Checkpointing can be used to truncate the logical plan of this dataset, which is especially useful in iterative algorithms where the plan may grow exponentially. It will be saved to files inside the checkpoint directory set with SparkContext#setCheckpointDir.

However, I think it requires a little more explanation.

Why Would I Use a Checkpoint?

Basically, I use a checkpoint if I want to freeze the content of my data frame before I do something else. It can be in the scenario of iterative algorithms (as mentioned in the Javadoc) but also in recursive algorithms or simply branching out a data frame to run different kinds of analytics on both.
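As a sketch of the branching case: freeze a shared ancestor once, then run two independent analyses off the checkpointed data frame. This is an illustration under assumptions, not the article's code — `spark.range()` stands in for real input data, and the column name `value` is made up.

```java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.functions;

public class BranchingCheckpoint {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .appName("BranchingCheckpoint").master("local[*]").getOrCreate();
        // The checkpoint directory must be set before calling checkpoint().
        spark.sparkContext().setCheckpointDir("/tmp");

        // A small in-memory data frame standing in for real input data.
        Dataset<Row> base = spark.range(1, 7).toDF("value");

        // Freeze the shared ancestor once, then branch two analyses off it.
        Dataset<Row> frozen = base.checkpoint();

        long evens = frozen.filter("value % 2 = 0").count();
        double avg = frozen.agg(functions.avg("value")).first().getDouble(0);

        System.out.println("evens=" + evens + ", avg=" + avg);
        spark.stop();
    }
}
```

Both branches read from the checkpoint files rather than recomputing the ancestor's lineage twice.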

Spark has been offering checkpoints on streaming since earlier versions (at least v1.2.0), but checkpoints on data frames are a different beast.

Types of Checkpoints

You can create two kinds of checkpoints.

Eager Checkpoint

An eager checkpoint will cut the lineage from previous data frames and will allow you to start “fresh” from this point on. Concretely, Spark will dump your data frame to the directory specified via setCheckpointDir() and start a fresh new data frame from it. You will also need to wait for the operation to complete.

Non-Eager Checkpoint

On the other hand, a non-eager checkpoint will keep the lineage from previous operations in the data frame; the data is only written out to the checkpoint directory the first time the data frame is computed by an action.

Implementing the Code

Now that we understand what a checkpoint is and how it works, let’s see how we implement that in Java. The code is part of my Apache Spark Java Cookbook on GitHub.

import org.apache.spark.SparkConf;
import org.apache.spark.SparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class DataframeCheckpoint {
    public static void main(String[] args) {
        DataframeCheckpoint app = new DataframeCheckpoint();
        app.start();
    }

    private void start() {
        SparkConf conf = new SparkConf().setAppName("Checkpoint").setMaster("local[*]");
        SparkContext sparkContext = new SparkContext(conf);
        // We need to specify where Spark will save the checkpoint file. It can be an HDFS location.
        sparkContext.setCheckpointDir("/tmp");
        SparkSession spark = SparkSession.builder().appName("Checkpoint").master("local[*]").getOrCreate();

        String filename = "data/tuple-data-file.csv";
        Dataset<Row> df1 = spark.read().format("csv").option("inferSchema", "true").option("header", "false")
                .load(filename);
        System.out.println("DF #1 - step #1: simple dump of the dataframe");
        df1.show();

        System.out.println("DF #2 - step #1: same as DF #1 - step #1");
        Dataset<Row> df2 = df1.checkpoint(false);
        df2.show();

        df1 = df1.withColumn("x", df1.col("_c0"));
        System.out.println("DF #1 - step #2: new column x, which is the same as _c0");
        df1.show();

        System.out.println("DF #2 - step #2: no operation was done on df2");
        df2.show();
    }
}

The output is, without much surprise:

DF #1 - step #1: simple dump of the dataframe
+---+---+
|_c0|_c1|
+---+---+
|  1|  5|
|  2| 13|
|  3| 27|
|  4| 39|
|  5| 41|
|  6| 55|
+---+---+

DF #2 - step #1: same as DF #1 - step #1
+---+---+
|_c0|_c1|
+---+---+
|  1|  5|
|  2| 13|
|  3| 27|
|  4| 39|
|  5| 41|
|  6| 55|
+---+---+

DF #1 - step #2: new column x, which is the same as _c0
+---+---+---+
|_c0|_c1|  x|
+---+---+---+
|  1|  5|  1|
|  2| 13|  2|
|  3| 27|  3|
|  4| 39|  4|
|  5| 41|  5|
|  6| 55|  6|
+---+---+---+

DF #2 - step #2: no operation was done on df2
+---+---+
|_c0|_c1|
+---+---+
|  1|  5|
|  2| 13|
|  3| 27|
|  4| 39|
|  5| 41|
|  6| 55|
+---+---+

Although this example is really basic, it shows how to use a checkpoint on a data frame and how the checkpointed copy remains unaffected by transformations applied afterward to the original. Hopefully, this will be useful to you, too.

A comment is always appreciated! By the way, thanks to Burak Yavuz at Databricks for his additional explanations.



Opinions expressed by DZone contributors are their own.
