Cutting Big Data Costs: Effective Data Processing With Apache Spark
In this article, we will delve into strategies to ensure that your data pipeline is resource-efficient, cost-effective, and time-efficient.
In today's data-driven world, efficient data processing plays a pivotal role in the success of any project. Apache Spark, a robust open-source data processing framework, has emerged as a game-changer in this domain. In this article, we will delve into strategies to ensure that your data pipeline is resource-efficient, cost-effective, and time-efficient. This guide is valuable for both data beginners and professionals, serving as a checklist for optimization opportunities. While most of the advice here applies universally to Spark usage, we'll point out specific differences that may affect your application's behavior when using the Scala or Python API or writing Spark SQL.
Optimizing Data Input
Make Use of Data Format
In most cases, the data being processed is stored in a columnar format. While this format may not be ideal when you only need to retrieve a few rows from a large partition, it truly excels in analytical use cases. This is particularly beneficial when you have to examine the majority of input rows but are solely interested in a specific subset of columns, which can often number in the hundreds or even thousands within a table. Spark takes full advantage of this storage property by exclusively reading the columns that are involved in subsequent computations.
To ensure efficiency, review your execution plan and verify that the earliest Project nodes select only the columns you will need downstream. This keeps Spark from reading hundreds of columns only to discard them later in the process.
Don’t Forget About Partitions
Now that we've optimized our column selection, let's delve into the efficient retrieval of the necessary rows. As previously mentioned, columnar storage isn't the most efficient choice when you only need to access a few rows out of a massive dataset, and this remains true. However, there are strategies to enhance this situation by cleverly organizing your data.
Imagine you're working with sales data collected in hourly intervals from multiple city supermarkets. In your data pipeline, you avoid mixing data from different shops and hours, which is a good practice. Early in the pipeline, you likely perform filtering, but you can take this a step further to prevent Spark from reading superfluous data entirely. By partitioning your data based on both the hour and supermarket_id, Spark can skip irrelevant data with a simple check against the Metastore.
However, exercise caution regarding the cardinality of your partitions. A column whose cardinality stays in the single or double digits is an excellent candidate for partitioning. If the cardinality could reach tens of thousands, consider partitioning by a coarser, aggregated attribute instead. Returning to the earlier example, envision a supermarket chain like Walmart, where using supermarket_id for partitioning might result in over 10,000 partitions. In such cases, a different partitioning approach, such as using the state, may be more efficient. For instance, in the United States, you can reasonably assume that the cardinality won't exceed 50.
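The cardinality check described here can be sketched in a few lines of plain Python, run against a sample of rows before committing to a partitioning scheme. This is only an illustration: the helper names, the sample data, and the limit of 99 (a stand-in for "single or double digits") are assumptions, not part of Spark.

```python
def column_cardinalities(rows, columns):
    """Count distinct values per column in an iterable of dict-like rows."""
    distinct = {column: set() for column in columns}
    for row in rows:
        for column in columns:
            distinct[column].add(row[column])
    return {column: len(values) for column, values in distinct.items()}


def partition_candidates(cardinalities, max_cardinality=99):
    """Keep only the columns whose cardinality stays within the chosen limit."""
    return [c for c, n in cardinalities.items() if n <= max_cardinality]


# Hypothetical sample: 12,000 stores spread evenly over 50 states.
sample = [
    {"supermarket_id": store, "state": f"state_{store % 50}"}
    for store in range(12_000)
]
cardinalities = column_cardinalities(sample, ["supermarket_id", "state"])
candidates = partition_candidates(cardinalities)
```

With this sample, supermarket_id is rejected and state remains as the only candidate, which matches the reasoning above.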
To ensure that filter pushdown operates as expected, review your execution plan. In the initial lines where the dataset is loaded, you'll find information like PushedFilters: [*EqualTo(state, CA)]. Filters on partition columns usually appear in the same scan node under PartitionFilters. Either entry confirms that your filtering optimizations are being implemented as intended.
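If you want to automate this check, for example in a test, one option is to scan the plan text for the PushedFilters list. The sketch below works on plain text that you have copied or captured from the output of explain(). The function name is made up for illustration, and the parsing assumes the list is printed in the bracketed form shown above.

```python
MARKER = "PushedFilters: ["


def pushed_filters(plan_text):
    """Return the entries of every 'PushedFilters: [...]' list in the plan text."""
    filters = []
    start = plan_text.find(MARKER)
    while start != -1:
        pos = start + len(MARKER)
        depth, current = 0, []
        while pos < len(plan_text):
            ch = plan_text[pos]
            if ch in "([":
                depth += 1
            elif ch in ")]":
                if depth == 0:  # bracket that closes the PushedFilters list
                    break
                depth -= 1
            if ch == "," and depth == 0:
                # Top-level comma: one filter ends here.
                filters.append("".join(current).strip())
                current = []
            else:
                current.append(ch)
            pos += 1
        last = "".join(current).strip()
        if last:
            filters.append(last)
        start = plan_text.find(MARKER, pos)
    return filters


found = pushed_filters("PushedFilters: [*EqualTo(state, CA)]")
```

An empty result for a query that filters early is a signal to look at the plan by hand.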
Efficient Data Processing
Shuffle operations are expensive, so aim to minimize their impact. Reduce the data size before shuffling by filtering early, and prefer "byKey" methods such as aggregateByKey, which combine values within each partition first and therefore move far less data across the network than grouping everything and aggregating afterward.
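To see why combining before the shuffle matters, here is a toy model in plain Python (no Spark involved). Partitions are lists of (key, value) pairs, and each function returns the totals together with the number of records that would have to cross the shuffle boundary. The function names are illustrative only.

```python
from collections import defaultdict


def shuffle_all_then_sum(partitions):
    """Group-then-aggregate style: every record crosses the shuffle boundary."""
    shuffled = [record for part in partitions for record in part]
    totals = defaultdict(int)
    for key, value in shuffled:
        totals[key] += value
    return dict(totals), len(shuffled)


def combine_then_shuffle(partitions):
    """aggregateByKey style: sum inside each partition, then shuffle the partial sums."""
    shuffled = []
    for part in partitions:
        local = defaultdict(int)
        for key, value in part:
            local[key] += value
        shuffled.extend(local.items())  # at most one record per key per partition
    totals = defaultdict(int)
    for key, value in shuffled:
        totals[key] += value
    return dict(totals), len(shuffled)


partitions = [
    [("a", 1), ("a", 2), ("b", 3)],
    [("a", 4), ("b", 5), ("b", 6)],
]
totals_naive, moved_naive = shuffle_all_then_sum(partitions)
totals_combined, moved_combined = combine_then_shuffle(partitions)
```

Both approaches produce the same totals, but the second moves four records instead of six. The gap widens as the number of records per key grows.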
You probably won't be able to get rid of all shuffles, so consider partitioning the data by the keys that later steps rely on. A well-chosen layout can save resources in subsequent operations.
Continuing our discussion on shuffling, it's essential to consider the option of broadcasting one of your datasets if it's sufficiently small. Broadcasting it to all workers can eliminate the need for a costly shuffle when joining it with your larger "big data" portion. However, it's crucial to navigate the nuances between Spark SQL and the Spark Dataset API in this context.
In the Dataset API, using the broadcast method will attempt to broadcast your variable, but it will throw an exception if it doesn't fit in memory. On the other hand, in Spark SQL, the /*+ BROADCAST(dataset) */ notation functions as a hint. If the dataset does not fit in memory, Spark will revert to a regular join. In situations where your supposedly small broadcast dataset unexpectedly grows in size, the outcomes differ between the two approaches.
With the Dataset API, you'll notice job failures in Spark, which, while disruptive, clearly highlight the issue. In contrast, in Spark SQL, jobs may become slower, offering greater stability but potentially obscuring the problem for some time. Therefore, understanding these distinctions is crucial when deciding on the broadcasting strategy for your specific use case.
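The reason a broadcast removes the shuffle can be shown with a small plain-Python model (not Spark code). Every "worker" receives a full copy of the small dataset as a lookup table and joins its own partition locally, so the large dataset never has to be regrouped by key. All names and data below are made up for illustration.

```python
def broadcast_join(large_partitions, small_rows):
    """Inner-join each partition of the large side against a copied lookup table."""
    lookup = dict(small_rows)  # the copy that every worker would hold
    joined = []
    for partition in large_partitions:
        # Each partition is processed where it already lives: no shuffle.
        joined.append(
            [(key, value, lookup[key]) for key, value in partition if key in lookup]
        )
    return joined


sales_partitions = [
    [("s1", 10.0), ("s2", 5.0)],
    [("s3", 7.5), ("s1", 2.5)],
]
store_states = [("s1", "CA"), ("s2", "NY")]
joined = broadcast_join(sales_partitions, store_states)
```

The output keeps the partitioning of the large input. The price is that the lookup table must fit in memory on every worker, which is exactly where the failure modes described above come from.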
Now, let's delve into the critical aspect of dataset sizes and address the issue of data skews. The most obvious way to identify them is by looking at task statistics for a stage. If you see huge differences between the median and the maximum task time or input size, or if 998 out of 1,000 tasks finish in minutes while the remaining 2 take more than an hour, that's a good indicator of a skew.
In one skewed example, the longest-running task takes 7 seconds and reads 116 MB of input, whereas the median task finishes in just 50 milliseconds and handles a mere 4 KB of data.
In the "fixed" version of the same pipeline, the maximum is just 2 seconds and 10 MB: still much bigger than the median, but nowhere near the scale it previously was on.
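The median-versus-max comparison is easy to turn into a quick check. The sketch below is plain Python and takes a list of per-task durations or input sizes, for example copied from the Spark UI. The threshold of 10 is an arbitrary assumption, and the task lists are constructed to match the 7 s / 50 ms and 2 s / 50 ms figures quoted above.

```python
from statistics import median


def skew_ratio(task_values):
    """Ratio of the largest task metric to the median one."""
    typical = median(task_values)
    if typical <= 0:
        raise ValueError("median must be positive to compute a ratio")
    return max(task_values) / typical


def looks_skewed(task_values, threshold=10.0):
    """Flag a stage whose slowest task is far above the median (threshold is a guess)."""
    return skew_ratio(task_values) >= threshold


# Durations in milliseconds: 99 typical tasks plus one straggler.
before_ms = [50] * 99 + [7000]
after_ms = [50] * 99 + [2000]
ratio_before = skew_ratio(before_ms)
ratio_after = skew_ratio(after_ms)
```

The ratio drops from 140 to 40 after the fix. That is still above the sample threshold, which is consistent with the observation that the maximum remains much bigger than the median.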
Data skews occur when one group being shuffled is disproportionately larger than the others. This can lead to issues such as Out-of-Memory (OOM) errors on a worker node if it cannot accommodate the entire chunk in memory. More commonly, it results in data spillover to disk, causing extended serialization times and excessive time spent in garbage collection (GC).
To address data skews, consider techniques like key salting. While we won't delve into the details here, key salting involves introducing a random component to problematic keys and redistributing the data based on the modified keys. This can effectively mitigate skew-related performance bottlenecks.
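As a rough illustration of the idea, here is a two-stage salted aggregation in plain Python (a model of the technique, not Spark code). Hot keys get a random salt so their records spread over several groups. The first stage aggregates per (key, salt), and the second stage drops the salt and merges the partial results. The function names, the number of salts, and the sample data are assumptions made for this sketch.

```python
import random
from collections import Counter, defaultdict


def add_salt(records, hot_keys, num_salts, seed=0):
    """Replace each key with (key, salt); only hot keys get a random salt."""
    rng = random.Random(seed)
    salted = []
    for key, value in records:
        salt = rng.randrange(num_salts) if key in hot_keys else 0
        salted.append(((key, salt), value))
    return salted


def sum_by_key(records):
    """Sum values per key for an iterable of (key, value) pairs."""
    totals = defaultdict(int)
    for key, value in records:
        totals[key] += value
    return dict(totals)


def largest_group(records):
    """Size of the biggest group, i.e. the work landing on one task."""
    return max(Counter(key for key, _ in records).values())


records = [("hot", 1)] * 1000 + [("cold", 1)] * 10
salted = add_salt(records, hot_keys={"hot"}, num_salts=8)

partial = sum_by_key(salted)  # stage 1: aggregate per (key, salt)
final = sum_by_key(           # stage 2: drop the salt and merge
    (key, value) for (key, _salt), value in partial.items()
)
```

The final totals are the same as without salting, but the largest group handled in the first stage is a fraction of the original 1,000 records. For joins, the other side has to be replicated once per salt value so that every salted key still finds its match.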
Note: Adaptive Query Execution (AQE) is enabled by default since Spark 3.2.0 and should help deal with skewness, but if the task statistics still look uneven, try manual salting.
It's important to understand that when you interact with the Spark API, creating new datasets and saving them to variables doesn't actually save any state; instead, you're storing a Directed Acyclic Graph (DAG) of computations required to obtain specific results. Consequently, every action that reuses the same variable makes Spark recompute the same data from scratch.
To address this redundancy, it's advisable to cache your dataset when you observe this recomputation pattern. Spark provides various caching options, allowing you to store the dataset either in memory or serialize it to disk. While in-memory caching offers the fastest access, keep in mind that Spark monitors memory consumption and may evict datasets from memory if necessary, as they can be recomputed when needed later.
Therefore, it's important to strike a balance and avoid attempting to cache everything all the time, as doing so may not yield the expected performance improvements. Instead, judiciously apply caching to specific datasets that are reused frequently to optimize your Spark application's efficiency.
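The recomputation pattern can be mimicked with a toy class in plain Python. This is a model of the behavior, not Spark's API: the "dataset" only stores a recipe, each collect() reruns it, and cache() keeps the first result for later calls. All names are invented for the example.

```python
class LazyDataset:
    """Stores a recipe (a function) instead of data, like a lazily evaluated plan."""

    def __init__(self, compute):
        self._compute = compute
        self._cache_enabled = False
        self._has_result = False
        self._result = None

    def cache(self):
        # Only marks the dataset; the result is stored on the first collect().
        self._cache_enabled = True
        return self

    def collect(self):
        if self._cache_enabled and self._has_result:
            return self._result
        result = self._compute()
        if self._cache_enabled:
            self._result, self._has_result = result, True
        return result


calls = {"count": 0}


def expensive_step():
    calls["count"] += 1
    return [x * x for x in range(5)]


uncached = LazyDataset(expensive_step)
uncached.collect()
uncached.collect()
runs_without_cache = calls["count"]

calls["count"] = 0
cached = LazyDataset(expensive_step).cache()
cached.collect()
cached.collect()
runs_with_cache = calls["count"]
```

Without caching, the expensive step runs once per collect(); with caching, it runs once in total. The real trade-off is the memory the stored result occupies, which is why caching everything rarely pays off.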
Data Schema and Processing
Once More About Formats
While it might initially seem enticing to adopt a "flexible" schema, such as storing complex objects as JSON strings, it's essential to consider the long-term implications, especially as your data matures and stabilizes. At this point, it becomes advisable to select the most suitable data format.
Opting for native arrays or maps consistently delivers the best performance. This choice offers several advantages, including substantial resource savings by eliminating the need for repetitive deserialization. Additionally, it avoids the overhead of instantiating costly Serialization/Deserialization (SerDe) libraries within User-Defined Functions (UDFs).
Furthermore, embracing native arrays or maps can provide added benefits in the future. As Spark continues to evolve and integrate with diverse data formats, you may unlock further optimizations. For instance, this could involve features like the pushdown of reading specific keys within a map, enhancing your application's efficiency and reducing processing overhead. By selecting the optimal data format, you not only improve your current performance but also position yourself for potential performance enhancements in the evolving Spark ecosystem.
I strongly recommend minimizing the use of UDFs whenever possible, particularly if the same functionality can be achieved using native Spark SQL functions. It's crucial to understand that native Spark functions, while somewhat limited in their capabilities, should be your primary choice, and the utilization of UDFs should be a last resort.
There are two key reasons for this recommendation: representation and potential optimizations. Spark manages in-memory data in its own compact internal binary format. When you call a UDF, each row must be converted from that format into regular JVM objects (or, for Python UDFs, serialized and shipped to a separate Python worker process) before the function runs, and afterward the result is serialized back into Spark's internal format. As you can imagine, this process incurs significant computational costs and resource overhead.
Moreover, UDFs present a challenge for Spark's optimizers. These optimizers lack visibility into the inner workings of UDFs, rendering them opaque. Consequently, Spark's built-in optimization techniques cannot be applied to UDFs, limiting the potential for performance improvements. By prioritizing native Spark SQL functions and minimizing the reliance on UDFs, you not only reduce computational expenses but also leverage Spark's optimization capabilities to enhance the efficiency of your data processing workflows.
Choosing Execution Mode
Batch or Streaming?
As a final point, I'd like to recommend considering the most suitable execution mode for your needs: batch or streaming. This might initially seem like a significant shift, and indeed, it can be, but the Spark API is designed to abstract away many of the complexities associated with this choice. Spark is inherently well-tuned for batch datasets (to the extent that even streaming is essentially micro-batching), so batch is typically the default choice.
However, it's worth exploring streaming if your data processing primarily involves small aggregations, where most of the processing can be accommodated within the memory space of just one row. Additionally, if the "side" datasets you need to join with are relatively small, streaming could be a viable option. The decision between batch and streaming should take into account user expectations regarding data output and the significance of data arrival time as a metric.
In essence, this choice revolves around aligning your data processing approach with the specific demands and preferences of your users and the importance of real-time data arrival in your application.
In this article, I’ve tried briefly summarizing different ways to optimize your Spark application, reducing costs and processing times. While this isn’t an exhaustive list of all possible issues and mitigations, it gives you a good starting point for where to look. The last word of advice before you start your optimization journey would be to not rely on gut feeling, but benchmark your application and look into stats and metrics to know where the actual issues lie.