Over a million developers have joined DZone.
{{announcement.body}}
{{announcement.title}}

Apache Spark: Setting Default Values

DZone's Guide to

Apache Spark: Setting Default Values

Quick tip: Prefer using DataFrameNaFunctions to prevent code duplication when the same default values are set in several queries that use the same DataFrame.

· Big Data Zone
Free Resource

Need to build an application around your data? Learn more about dataflow programming for rapid development and greater creativity. 

Using a default value instead of 'null' is a common practice, and as a Spark's struct field can be nullable, it applies to DataFrames too. Let's say that we have a DataFrame of music tracks playing with the following schema:

root
 |-- id: string (nullable = false)
 |-- trackId: string (nullable = false)
 |-- trackLength: long (nullable = true)
 |-- albumId: string (nullable = true)
 |-- completionPercentage: double (nullable = true)
 |-- totalPlayedMillis: long (nullable = false)


And we want to find:

  • Plays that are less than 30 seconds

  • Plays that are more than 1 minute with a completion percentage greater or equal to 80%

For both of the queries, we need to set default values:

  • "no album" if  'album' is 'null'

  • 0 if  'completionPercentage' is 'null'.

These queries could be written as: 

Dataset<Row> dataFrame = session.createDataFrame(source, structType);
Dataset<Row> playsLessThan30seconds = getPlaysLessThan30seconds(dataFrame);
Dataset<Row> playGte1minuteWithCompletionGte80 = getPlayGte1minuteWithCompletionPercentageGte80(dataFrame);
...

private static Dataset<Row> getPlaysLessThan30seconds(Dataset<Row> dataFrame) {
  return dataFrame
      .select(
          col("id"),
          col("trackId"),
          coalesce(col("albumId"), lit("no album"))
              .as("albumId"),
          col("trackLength"),
          coalesce(col("completionPercentage"), lit(0.0D))
              .as("completionPercentage")
      )
      .where(col("totalPlayedMillis").lt(30000L));
}

private static Dataset<Row> getPlayGte1minuteWithCompletionPercentageGte80(Dataset<Row> dataFrame) {
  return dataFrame
      .select(
          col("id"),
          col("trackId"),
          coalesce(col("albumId"), lit("no album"))
              .as("albumId"),
          col("trackLength"),
          coalesce(col("completionPercentage"), lit(0.0D))
              .as("completionPercentage")
      )
      .where(col("totalPlayedMillis").gt(60000L)
             .and(col("completionPercentage").geq(0.8D)));
}


The result for a test dataset is:

+---------------------------------------------------------------------------------------------+
|                                      Plays less than 30 seconds                             |             |
+----------+----------------+--------------------------------+-----------+--------------------+
|id        |trackId         |albumId                         |trackLength|completionPercentage|
+----------+----------------+--------------------------------+-----------+--------------------+
|oZzaToTxqy|I Keep Coming   |0                               |360000     |0.06388888888888888 |
|bejFaTssDw|Una palabra     |no album                        |180000     |0.10555555555555556 |
|jtydOYuSkJ|A 1000 Times    |I Had A Dream That You Were Mine|240000     |0.0                 |
+----------+----------------+--------------------------------+-----------+--------------------+

+--------------------------------------------------------------------------+  
|          Plays gte 1 minute with completion percentage gte 80%           |       
+----------+---------------------+--------+-----------+--------------------+
|id        |trackId              |albumId |trackLength|completionPercentage|
+----------+---------------------+--------+-----------+--------------------+
|nJXKxjobwo|Wrench and Numbers   |Fargo   |300000     |0.8                 |
|svyULOUcKC|Dutch National Anthem|no album|300000     |0.8666666666666667  |
+----------+---------------------+--------+-----------+--------------------+ 


Unfortunately, C&P comes in to play, therefore, if at some point in time a default value for 'trackLength' is also required, you may end up changing both of these methods. Another disadvantage is that if another similar method, which requires the same default values, is added, code duplication is unavoidable.

A possible solution, which helps to reduce boilerplate, is DataFrameNaFunctions, which is intended to be used for handling missing data: replacing specific values, dropping 'null' and 'NaN', and setting default values:

Dataset<Row> dataFrame = session.createDataFrame(source, structType)
    .na()
    .fill(ImmutableMap.of(
        "albumId", "no album",
        "completionPercentage", 0.0D
    ));
Dataset<Row> playsLessThan30seconds = getPlaysLessThan30seconds(dataFrame);
Dataset<Row> playGte1minuteWithCompletionGte80 = getPlayGte1minuteWithCompletionPercentageGte80(dataFrame);
....

private Dataset<Row> getPlaysLessThan30seconds(Dataset<Row> dataFrame) {
  return dataFrame
    .select("id", "trackId", "albumId", "trackLength", "completionPercentage")
    .where(col("totalPlayedMillis").lt(30000L));
}

private static Dataset<Row> getPlayGte1minuteWithCompletionPercentageGte80(Dataset<Row> dataFrame) {
  return dataFrame
      .select("id","trackId", "albumId", "trackLength", "completionPercentage")
      .where(col("totalPlayedMillis").gt(60000L)
             .and(col("completionPercentage").geq(0.8D)));
}


The result is still the same:

+---------------------------------------------------------------------------------------------+
|                                      Plays less than 30 seconds                             |             |
+----------+----------------+--------------------------------+-----------+--------------------+
|id        |trackId         |albumId                         |trackLength|completionPercentage|
+----------+----------------+--------------------------------+-----------+--------------------+
|oZzaToTxqy|I Keep Coming   |0                               |360000     |0.06388888888888888 |
|bejFaTssDw|Una palabra     |no album                        |180000     |0.10555555555555556 |
|jtydOYuSkJ|A 1000 Times    |I Had A Dream That You Were Mine|240000     |0.0                 |
+----------+----------------+--------------------------------+-----------+--------------------+

+--------------------------------------------------------------------------+  
|          Plays gte 1 minute with completion percentage gte 80%           |       
+----------+---------------------+--------+-----------+--------------------+
|id        |trackId              |albumId |trackLength|completionPercentage|
+----------+---------------------+--------+-----------+--------------------+
|nJXKxjobwo|Wrench and Numbers   |Fargo   |300000     |0.8                 |
|svyULOUcKC|Dutch National Anthem|no album|300000     |0.8666666666666667  |
+----------+---------------------+--------+-----------+--------------------+  

Conclusion

Prefer using DataFrameNaFunctions in order to prevent code duplication when the same default values are set in several queries that use the same DataFrame.

Check out the Exaptive data application Studio. Technology agnostic. No glue code. Use what you know and rely on the community for what you don't. Try the community version.

Topics:
apache spark ,dataframes ,default parameters ,big data ,tutorial

Opinions expressed by DZone contributors are their own.

{{ parent.title || parent.header.title}}

{{ parent.tldr }}

{{ parent.urlSource.name }}