Reading Spark Query Plans

This article is for the Spark programmer who has at least some fundamentals, but has not dived into how Spark works yet.

By Daniel Ciocirlan · Updated May. 04, 20 · Tutorial
This article is for the Spark programmer who has at least some fundamentals, e.g. how to create a DataFrame and how to do basic operations like selects and joins, but has not yet dived into how Spark works. Perhaps you're interested in getting more performance out of your Spark jobs.

You can also find this article in video form on YouTube!

Prerequisites

The code I'll be writing runs inside a Spark shell, version 3.0.0, which you can find here for download. If you've never installed Spark before, the default choices in the dropdowns will give you a pre-compiled Spark distribution: just download, unzip, navigate to the bin folder, then run the spark-shell executable.

How Spark Doesn't Run

You're probably aware that Spark has this lazy execution model, i.e. that you write transformations but they're not actually run until you call an action, like a show, or collect, or take, etc.

When you write transformations, Spark will automatically build up a dependency tree of your DataFrames, which will actually end up executing when you call an action. Let me give an example:

Scala

val simpleNumbers = spark.range(1, 1000000)
val times5 = simpleNumbers.selectExpr("id * 5 as id")


That times5 DataFrame will not actually get evaluated. If you hit those two lines in the Spark shell, you'll notice that they return instantly. However, when you do

Scala

times5.show()


Then this will take a little more than an instant — that's because Spark is only now performing the actual computations. If you want to see what operations Spark did, you can try explaining the DF:

Scala

times5.explain()


This will give you the prettiest thing:

Plain Text

== Physical Plan ==
*(1) Project [(id#0L * 5) AS id#2L]
+- *(1) Range (1, 1000000, step=1, splits=6)


This is a query plan. When you call the explain method, the final plan — the physical one that will actually be run on executors — will be shown here. You can inspect this plan for your massive computations before kicking off any job. This is particularly important because the ability to read and interpret query plans allows you to predict performance bottlenecks.
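
If you want more than the final physical plan, explain also accepts arguments. Below is a minimal sketch, assuming Spark 3.0 or later. The SparkSession builder line (local master, app name) is my own assumption so the snippet also runs outside the shell; inside spark-shell, getOrCreate() just reuses the existing spark session.

```scala
import org.apache.spark.sql.SparkSession

// Assumption: local master and app name, only needed outside spark-shell.
// Inside the shell, getOrCreate() returns the session that already exists.
val spark = SparkSession.builder().master("local[*]").appName("explain-modes").getOrCreate()

val simpleNumbers = spark.range(1, 1000000)
val times5 = simpleNumbers.selectExpr("id * 5 as id")

// Physical plan only, same as in the article.
times5.explain()

// Parsed, analyzed and optimized logical plans, followed by the physical plan.
times5.explain(true)

// Spark 3.0+ also accepts a mode name; "formatted" prints an operator outline
// followed by per-operator details.
times5.explain("formatted")

// The physical plan as a string, obtained without running a job.
val physicalPlan: String = times5.queryExecution.executedPlan.toString
```

None of these calls trigger the computation itself; they only ask Spark to plan the query.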

Reading Plans

Let's go over some examples of query plans and how to read them. Let's go back to the one we've just shown:

Plain Text

== Physical Plan ==
*(1) Project [(id#0L * 5) AS id#2L]
+- *(1) Range (1, 1000000, step=1, splits=6)


We read this plan backwards, bottom to top:

  1. First Spark builds a Range object from 1 to 1000000. The splits parameter defines the number of partitions.
  2. Then Spark does a "project", which is the mathematical (relational algebra) term for a database/DF select. So we're selecting the column id with the identifier 0 (the L suffix marks a long type), and multiplying its value by 5. The result will be a new column with the name id and the identifier 2.

Or we can read this plan top to bottom, like this:

  1. The end of the computation is a "project" (with the meaning above), which depends on
  2. A Range from 1 to 1 million in 6 partitions.
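
If your own plan shows a different splits value than 6, that's expected: when you don't pass a partition count, range picks a default that depends on your machine. Here's a small sketch of how you might check that; the builder line (local master, app name) is my assumption for running outside the shell.

```scala
import org.apache.spark.sql.SparkSession

// Assumption: local master and app name, only needed outside spark-shell.
val spark = SparkSession.builder().master("local[*]").appName("range-splits").getOrCreate()

// No partition count given: Spark picks a default for this environment.
val defaultSplits = spark.range(1, 1000000).rdd.getNumPartitions

// Explicit partition count as the 4th argument (start, end, step, numPartitions).
val sixPartitions = spark.range(1, 1000000, 1, 6)
val explicitSplits = sixPartitions.rdd.getNumPartitions

println(s"default splits: $defaultSplits, explicit splits: $explicitSplits")

// The Range node of this plan should now report splits=6 on any machine.
sixPartitions.selectExpr("id * 5 as id").explain()
```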

Not too hard. Let's do another:

Scala

val moreNumbers = spark.range(1, 10000000, 2)
val split7 = moreNumbers.repartition(7)
split7.explain()


 

Plain Text

== Physical Plan ==
Exchange RoundRobinPartitioning(7)
+- *(1) Range (1, 10000000, step=2, splits=6)



The first operation is the same kind of Range, but the next step is an Exchange, which is another name for a shuffle. As you're probably aware, a shuffle is an operation in which data is exchanged (hence the name) between the executors in the cluster. The bigger your data and your cluster are, the more expensive this shuffle will be, because sending data over the network takes time. For performance reasons, it's best to keep shuffles to a minimum.

So a performance tip: whenever you see Exchange in a query plan, treat it as a potential performance bottleneck.
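
To see the tip in action, you can look for the word Exchange in the plan text. The sketch below compares repartition with coalesce, which isn't covered in this article: coalesce only merges existing partitions, so its plan should contain no Exchange. The builder line and the substring check are my own assumptions, not an official way to detect shuffles.

```scala
import org.apache.spark.sql.SparkSession

// Assumption: local master and app name, only needed outside spark-shell.
val spark = SparkSession.builder().master("local[*]").appName("spot-exchange").getOrCreate()

// Explicit 6 partitions so the example behaves the same on any machine.
val numbers = spark.range(1, 10000000, 2, 6)

val shuffled = numbers.repartition(7) // full shuffle into 7 partitions
val merged = numbers.coalesce(3)      // merges partitions without a shuffle

val shuffledPlan = shuffled.queryExecution.executedPlan.toString
val mergedPlan = merged.queryExecution.executedPlan.toString

shuffled.explain()
merged.explain()

println(s"repartition shuffles: ${shuffledPlan.contains("Exchange")}")
println(s"coalesce shuffles:    ${mergedPlan.contains("Exchange")}")
```

Keep in mind that coalesce can only reduce the number of partitions, so it's not a drop-in replacement for repartition.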

Let's do one more, and this time make it complex:

Scala

val ds1 = spark.range(1, 10000000)
val ds2 = spark.range(1, 10000000, 2)
val ds3 = ds1.repartition(7)
val ds4 = ds2.repartition(9)
val ds5 = ds3.selectExpr("id * 5 as id")
val joined = ds5.join(ds4, "id")
val sum = joined.selectExpr("sum(id)")
sum.explain()



Plain Text

== Physical Plan ==
*(7) HashAggregate(keys=[], functions=[sum(id#36L)])
+- Exchange SinglePartition
   +- *(6) HashAggregate(keys=[], functions=[partial_sum(id#36L)])
      +- *(6) Project [id#36L]
         +- *(6) SortMergeJoin [id#36L], [id#32L], Inner
            :- *(3) Sort [id#36L ASC NULLS FIRST], false, 0
            :  +- Exchange hashpartitioning(id#36L, 200)
            :     +- *(2) Project [(id#30L * 5) AS id#36L]
            :        +- Exchange RoundRobinPartitioning(7)
            :           +- *(1) Range (1, 10000000, step=1, splits=6)
            +- *(5) Sort [id#32L ASC NULLS FIRST], false, 0
               +- Exchange hashpartitioning(id#32L, 200)
                  +- Exchange RoundRobinPartitioning(9)
                     +- *(4) Range (1, 10000000, step=2, splits=6)



Now that's a nasty one. Let's take a look.

Notice this plan is now branched. It has two sections:

Section 1, bottom:

  • A Range 1 to 10 million, in steps of 2, in 6 partitions,
  • Which is repartitioned (shuffled) into 9 partitions,
  • Then repartitioned again by a hash partitioner with the key being the column id (identifier 32) into 200 partitions,
  • Then sorted by the column id (identifier 32) ascending.

Section 2, above:

  • A Range 1 to 10 million, in steps of 1, in 6 partitions,
  • Repartitioned (shuffled) to 7 partitions,
  • Modified as id * 5, with the name id (identifier 36)
  • Then repartitioned again by a hash partitioner with the key being the column id (identifier 36) into 200 partitions
  • Then sorted by that

Then those two sections are used for the SortMergeJoin. The join operation requires that the DFs in question be partitioned with the same partitioning scheme and sorted. This is why Spark does all these operations.
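
The 200 in those hashpartitioning steps is not magic: it's the default value of the spark.sql.shuffle.partitions setting. Here's a minimal sketch of changing it and watching the plan follow. The value 50 and the builder line are arbitrary assumptions of mine, and I turn adaptive query execution off explicitly (it's already off by default in Spark 3.0) so the plan keeps the shape shown above on newer versions too.

```scala
import org.apache.spark.sql.SparkSession

// Assumption: local master and app name, only needed outside spark-shell.
val spark = SparkSession.builder().master("local[*]").appName("shuffle-partitions").getOrCreate()

// Off by default in Spark 3.0; set explicitly for newer versions.
spark.conf.set("spark.sql.adaptive.enabled", "false")
// Arbitrary illustrative value; the default is 200.
spark.conf.set("spark.sql.shuffle.partitions", "50")

val ds1 = spark.range(1, 10000000)
val ds2 = spark.range(1, 10000000, 2)
val ds3 = ds1.repartition(7)
val ds4 = ds2.repartition(9)
val ds5 = ds3.selectExpr("id * 5 as id")
val joined = ds5.join(ds4, "id")

// Planning only: no job is run here.
val joinPlan = joined.queryExecution.executedPlan.toString
joined.explain()
```

Both sides of the join should now show hashpartitioning with 50 partitions instead of 200 (your column identifiers will differ from the ones in this article).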

Then we have a project (select), in which we're selecting just one of the two columns, because otherwise we'd have duplicate columns.

Then we have an operation called HashAggregate, in which partial sums are being computed on every partition.

Then, all the partial sums are brought to a single partition by the last exchange.

Finally we have a single HashAggregate operation in which all the partial sums are now combined for a single result.
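
The two HashAggregate steps can be imitated by hand to make the idea concrete. This is only a rough emulation through the RDD API under my own assumptions (small range, 4 partitions, local session), not what Spark's HashAggregate does internally: first one partial sum per partition, then a final sum over the partial results.

```scala
import org.apache.spark.sql.SparkSession

// Assumption: local master and app name, only needed outside spark-shell.
val spark = SparkSession.builder().master("local[*]").appName("partial-sums").getOrCreate()

// Numbers 1 to 100 in 4 partitions (small on purpose, so it runs quickly).
val numbers = spark.range(1, 101, 1, 4)

// Step 1: one partial sum per partition, like partial_sum in the plan.
val partialSums: Array[Long] = numbers.rdd
  .mapPartitions(rows => Iterator(rows.map(_.longValue).sum))
  .collect() // brings the partial sums to one place, like Exchange SinglePartition

// Step 2: combine the partial sums into the final result, like the last HashAggregate.
val total: Long = partialSums.sum

// The same result computed by Spark SQL itself.
val sqlTotal: Long = numbers.selectExpr("sum(id)").first().getLong(0)

println(s"partial sums: ${partialSums.mkString(", ")}; total: $total; sql total: $sqlTotal")
```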

Just the Beginning

This seems tedious, but in practice, the skill of reading and interpreting query plans is invaluable for performance analysis. You might notice that in the last example, we're doing quite a few shuffles. With time, you will learn to quickly identify which transformations in your code are going to cause a lot of shuffling and thus performance issues.

I hope this was useful! You can find articles like these at the Rock the JVM blog or on our YouTube channel. In the meantime, follow us on Twitter and LinkedIn for frequent updates on upcoming material!

Published at DZone with permission of Daniel Ciocirlan. See the original article here.
