
What Are Spark Checkpoints on Data Frames?


Checkpoints freeze the content of your data frames before you do something else, and they keep the logical plan of long chains of transformations from growing out of control.


Let’s look at what checkpoints can do for your Spark data frames and go through a Java example of how to use them.

Checkpoint on Data Frames

In v2.1.0, Apache Spark introduced checkpoints on data frames and datasets. I will continue to use the term "data frame" for a Dataset<Row>. The Javadoc describes it as:

Returns a checkpointed version of this dataset. Checkpointing can be used to truncate the logical plan of this dataset, which is especially useful in iterative algorithms where the plan may grow exponentially. It will be saved to files inside the checkpoint directory set with SparkContext#setCheckpointDir.
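To get a feel for how a plan can "grow exponentially", here is a toy model in plain Java. It is not Spark's API. A hypothetical `Plan` tree is unioned with itself on every iteration, so its node count roughly doubles each time. A hypothetical `checkpoint()` then replaces the whole tree with a single leaf that stands for the saved files.

```java
import java.util.List;

// Toy model of a logical plan; NOT Spark's API. All names are hypothetical.
public class PlanGrowth {

    // A node in the plan tree: a label and its input plans.
    record Plan(String label, List<Plan> children) {
        static Plan leaf(String label) {
            return new Plan(label, List.of());
        }

        // Count every node reachable from this one, counting shared subtrees each time
        // they appear (as a naive walk over the tree would).
        long size() {
            long n = 1;
            for (Plan child : children) {
                n += child.size();
            }
            return n;
        }

        // Union with another plan: a new node sitting on top of both inputs.
        Plan union(Plan other) {
            return new Plan("Union", List.of(this, other));
        }

        // Checkpoint: forget the lineage and restart from one leaf standing for the saved files.
        Plan checkpoint() {
            return leaf("CheckpointScan");
        }
    }

    // Run `iterations` self-unions, optionally checkpointing after each one; return the final plan size.
    static long run(int iterations, boolean checkpointEachStep) {
        Plan plan = Plan.leaf("CsvScan");
        for (int i = 0; i < iterations; i++) {
            plan = plan.union(plan);
            if (checkpointEachStep) {
                plan = plan.checkpoint();
            }
        }
        return plan.size();
    }

    public static void main(String[] args) {
        System.out.println("without checkpoint: " + run(10, false)); // 2047 nodes
        System.out.println("with checkpoint:    " + run(10, true));  // 1 node
    }
}
```

Without checkpoints the size follows 1, 3, 7, 15, ... (2^(n+1) - 1), while checkpointing after each step keeps it constant.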

However, I think it requires a little more explanation.

Why Would I Use a Checkpoint?

Basically, I use a checkpoint when I want to freeze the content of my data frame before I do something else. This is useful in iterative algorithms (as mentioned in the Javadoc), but also in recursive algorithms, or when you simply branch out a data frame to run different kinds of analytics on each branch.
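The branching case can be illustrated with a plain-Java toy model (again, not Spark's API). Without a frozen copy, each action on each branch re-runs the whole upstream computation. The sketch counts how often a hypothetical `expensiveUpstream()` step runs when two branches each trigger one action, with and without freezing the data first.

```java
import java.util.List;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

// Toy model of branching; NOT Spark's API. All names are hypothetical.
public class BranchingDemo {

    // How many times the expensive upstream step has been evaluated.
    static int upstreamRuns = 0;

    // Hypothetical expensive upstream computation (e.g., reading and cleaning a file).
    static List<Integer> expensiveUpstream() {
        upstreamRuns++;
        return IntStream.rangeClosed(1, 6).boxed().collect(Collectors.toList());
    }

    // Run two "analytics" branches and return how many times upstream was evaluated.
    static int runBranches(boolean freezeFirst) {
        upstreamRuns = 0;
        // Without freezing, the source is a recipe (lineage) that re-runs upstream on every action.
        Supplier<List<Integer>> source = BranchingDemo::expensiveUpstream;
        if (freezeFirst) {
            // Eager "checkpoint": evaluate once now and keep only the result.
            List<Integer> frozen = expensiveUpstream();
            source = () -> frozen;
        }
        int sum = source.get().stream().mapToInt(Integer::intValue).sum();   // branch 1
        long evens = source.get().stream().filter(x -> x % 2 == 0).count();  // branch 2
        System.out.println("sum=" + sum + ", evens=" + evens);
        return upstreamRuns;
    }

    public static void main(String[] args) {
        System.out.println("upstream runs without freeze: " + runBranches(false)); // 2
        System.out.println("upstream runs with freeze:    " + runBranches(true));  // 1
    }
}
```

Both branches compute the same results either way. Only the amount of repeated upstream work changes.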

Spark has been offering checkpoints on streaming since earlier versions (at least v1.2.0), but checkpoints on data frames are a different beast.

Types of Checkpoints

You can create two kinds of checkpoints.

Eager Checkpoint

An eager checkpoint will cut the lineage from previous data frames and will allow you to start “fresh” from this point on. In plain terms, Spark immediately dumps your data frame into files in the directory set with setCheckpointDir() and starts a fresh new data frame from them. The call blocks until the data has been written, so you will need to wait for the operation to complete. This is what checkpoint() does when called without an argument.
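As a rough illustration of "dump to files, then start fresh", here is a plain-Java sketch. It is not Spark's implementation. A hypothetical `eagerCheckpoint` writes the rows to a file in a checkpoint directory right away, so the caller waits for the write. It then returns rows read back from that file, so the result no longer depends on how the original rows were computed. The file name `part-00000.csv` is an assumption. Spark chooses its own file layout under the directory given to `setCheckpointDir()`.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;

// Toy model of an eager checkpoint; NOT Spark's implementation. All names are hypothetical.
public class EagerCheckpointSketch {

    // Write rows to a file in checkpointDir right away, then return a fresh copy read from disk.
    static List<String> eagerCheckpoint(List<String> rows, Path checkpointDir) {
        try {
            Path file = checkpointDir.resolve("part-00000.csv"); // hypothetical file name
            Files.write(file, rows);                              // returns only once written
            return List.copyOf(Files.readAllLines(file));         // "fresh" data, no lineage
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("checkpoint");
        // Pretend these rows came out of a long chain of transformations.
        List<String> computed = List.of("1,5", "2,13", "3,27");
        List<String> fresh = eagerCheckpoint(computed, dir);
        System.out.println(fresh);
    }
}
```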

Non-Eager Checkpoint

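On the other hand, a non-eager checkpoint does not write anything when you call it. The data is saved to the checkpoint directory only when the first action runs on the checkpointed data frame. Until then, the lineage from previous operations is kept.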
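The lazy behavior can be mimicked with a plain-Java toy, not Spark's API. A hypothetical `LazyCheckpoint` keeps the upstream recipe until the first `collect()`. It then materializes the data once and drops the recipe, and later actions reuse the saved copy.

```java
import java.util.List;
import java.util.function.Supplier;

// Toy model of a non-eager (lazy) checkpoint; NOT Spark's API. All names are hypothetical.
public class LazyCheckpoint<T> {

    private Supplier<List<T>> lineage; // upstream recipe, kept until the first action
    private List<T> saved;             // materialized data, null until the first action
    int computations = 0;              // how many times the lineage was evaluated

    LazyCheckpoint(Supplier<List<T>> lineage) {
        this.lineage = lineage;        // nothing is computed or written here
    }

    boolean isMaterialized() {
        return saved != null;
    }

    // An "action": the first call evaluates and saves the data; later calls reuse the saved copy.
    List<T> collect() {
        if (saved == null) {
            saved = List.copyOf(lineage.get());
            computations++;
            lineage = null;            // the lineage is no longer needed once the data is saved
        }
        return saved;
    }

    public static void main(String[] args) {
        LazyCheckpoint<Integer> cp = new LazyCheckpoint<>(() -> List.of(1, 2, 3));
        System.out.println("materialized before action: " + cp.isMaterialized()); // false
        cp.collect();
        cp.collect();
        System.out.println("materialized after action:  " + cp.isMaterialized()); // true
        System.out.println("lineage evaluations:        " + cp.computations);     // 1
    }
}
```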

Implementing the Code

Now that we understand what a checkpoint is and how it works, let’s see how we implement that in Java. The code is part of my Apache Spark Java Cookbook on GitHub.

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class DataframeCheckpoint {
    public static void main(String[] args) {
        DataframeCheckpoint app = new DataframeCheckpoint();
        app.start();
    }

    private void start() {
        SparkSession spark = SparkSession.builder()
                .appName("Checkpoint")
                .master("local[*]")
                .getOrCreate();
        // We need to specify where Spark will save the checkpoint files. It can be an HDFS location.
        spark.sparkContext().setCheckpointDir("/tmp");

        String filename = "data/tuple-data-file.csv";
        Dataset<Row> df1 = spark.read().format("csv")
                .option("inferSchema", "true")
                .option("header", "false")
                .load(filename);
        System.out.println("DF #1 - step #1: simple dump of the dataframe");
        df1.show();

        System.out.println("DF #2 - step #1: same as DF #1 - step #1");
        // Non-eager checkpoint: the data is written when the first action (show) runs on df2.
        Dataset<Row> df2 = df1.checkpoint(false);
        df2.show();

        // Adds column x as a copy of _c0; df1 now points to a new data frame.
        df1 = df1.withColumn("x", df1.col("_c0"));
        System.out.println("DF #1 - step #2: new column x, which is the same as _c0");
        df1.show();

        System.out.println("DF #2 - step #2: no operation was done on df2");
        df2.show();

        spark.stop();
    }
}

Running it produces, without much surprise:

DF #1 - step #1: simple dump of the dataframe
+---+---+
|_c0|_c1|
+---+---+
|  1|  5|
|  2| 13|
|  3| 27|
|  4| 39|
|  5| 41|
|  6| 55|
+---+---+

DF #2 - step #1: same as DF #1 - step #1
+---+---+
|_c0|_c1|
+---+---+
|  1|  5|
|  2| 13|
|  3| 27|
|  4| 39|
|  5| 41|
|  6| 55|
+---+---+

DF #1 - step #2: new column x, which is the same as _c0
+---+---+---+
|_c0|_c1|  x|
+---+---+---+
|  1|  5|  1|
|  2| 13|  2|
|  3| 27|  3|
|  4| 39|  4|
|  5| 41|  5|
|  6| 55|  6|
+---+---+---+

DF #2 - step #2: no operation was done on df2
+---+---+
|_c0|_c1|
+---+---+
|  1|  5|
|  2| 13|
|  3| 27|
|  4| 39|
|  5| 41|
|  6| 55|
+---+---+

Although this example is really basic, it shows how to checkpoint a data frame and how the checkpointed data frame (df2) stays the same while the original one (df1) evolves. Hopefully, this will be useful to you, too.

A comment is always appreciated! By the way, thanks to Burak Yavuz at Databricks for his additional explanations.


Opinions expressed by DZone contributors are their own.
