Guide to Partitions Calculation for Processing Data Files in Apache Spark

Spark chooses the number of partitions implicitly while reading a set of data files into an RDD or a Dataset. This article describes in detail how that number is chosen.

The majority of Spark applications source the input data for their execution pipeline from a set of data files (in various formats). To facilitate reading data from files, Spark provides dedicated APIs for both raw RDDs and Datasets. These APIs abstract the process of reading data files into an input RDD or Dataset with a definite number of partitions. Users can then perform various transformations and actions on these input RDDs/Datasets.

Each partition of an input raw RDD or Dataset is mapped to one or more data files; the mapping covers either part of a file or an entire file. During the execution of a Spark job with an input RDD/Dataset in its pipeline, each partition of the input RDD/Dataset is computed by reading the data according to the mapping of that partition to the data file(s). The computed partition data is then fed to the dependent RDDs/Datasets further along the execution pipeline.

The number of partitions in an input RDD/Dataset (mapped to the data file(s)) is decided based on multiple parameters to achieve optimum parallelism. These parameters carry default values and can also be tweaked by the user. The number of partitions decided for the input RDD/Dataset can affect the efficiency of the entire execution pipeline of the job. Therefore, it is important to know how the number of partitions is decided, based on these parameters, for an input RDD or Dataset.

Number of partitions when Dataset APIs are used for reading data files:

Multiple APIs are provided for reading data files into a Dataset. Each of these APIs is called on an instance of SparkSession, which has been the uniform entry point of a Spark application since version 2.0. Some of these APIs are shown below:

    // File-format-specific APIs ('spark' is a SparkSession; 'paths' is one or more path strings)
    Dataset<Row> csvData     = spark.read().csv(paths);
    Dataset<Row> jsonData    = spark.read().json(paths);
    Dataset<Row> textData    = spark.read().text(paths);
    Dataset<Row> parquetData = spark.read().parquet(paths);
    Dataset<Row> orcData     = spark.read().orc(paths);

    // Generic API
    Dataset<Row> data = spark.read().format(fileFormat).load(paths);

A path in the above APIs is either an actual file path or a directory path. It can also contain a wildcard, such as '*'.



There are more variants of these APIs, which allow various options specific to reading a file format to be specified. The full list can be found in the Spark API documentation for DataFrameReader.

Having looked at the APIs for reading data files, here is the list of config parameters that affect the number of partitions in the Dataset representing the data in the data files:

    (a) spark.default.parallelism (default: total number of CPU cores)
    (b) spark.sql.files.maxPartitionBytes (default: 128 MB)
    (c) spark.sql.files.openCostInBytes (default: 4 MB)



Using the values of these config parameters, a maximum split guideline called maxSplitBytes is calculated as follows:

    maxSplitBytes = Minimum(maxPartitionBytes, Maximum(openCostInBytes, bytesPerCore))



where bytesPerCore is calculated as:

    bytesPerCore = (Sum of sizes of all data files + No. of files * openCostInBytes) / default.parallelism
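
The two formulas above can be sketched in plain Java as follows. This is only an illustration, not Spark's API: the class and method names are hypothetical, and sizes are in bytes. The floor at openCostInBytes (so that a split is never smaller than the cost of opening a file) is taken from Spark's source; it does not change the result for the examples discussed in this article.

```java
import java.util.Arrays;

// Illustrative sketch only; class and method names are hypothetical, not part of Spark.
final class MaxSplitBytesSketch {
    static final long MB = 1024L * 1024L;

    /**
     * @param fileSizes          sizes of all data files to be read, in bytes
     * @param maxPartitionBytes  value of spark.sql.files.maxPartitionBytes
     * @param openCostInBytes    value of spark.sql.files.openCostInBytes
     * @param defaultParallelism value of spark.default.parallelism
     */
    static long maxSplitBytes(long[] fileSizes, long maxPartitionBytes,
                              long openCostInBytes, int defaultParallelism) {
        if (defaultParallelism <= 0) {
            throw new IllegalArgumentException("defaultParallelism must be positive");
        }
        long totalBytes = 0;
        for (long size : fileSizes) {
            // Every file contributes its size plus the open-cost overhead.
            totalBytes += size + openCostInBytes;
        }
        long bytesPerCore = totalBytes / defaultParallelism;
        // Floor at openCostInBytes, then cap at maxPartitionBytes.
        return Math.min(maxPartitionBytes, Math.max(openCostInBytes, bytesPerCore));
    }

    public static void main(String[] args) {
        long[] files = new long[54];
        Arrays.fill(files, 40 * MB); // 54 files of 40 MB each
        System.out.println(maxSplitBytes(files, 128 * MB, 4 * MB, 10));
        System.out.println(maxSplitBytes(files, 128 * MB, 4 * MB, 400));
    }
}
```

With a parallelism of 10 the guideline is capped by maxPartitionBytes, while a parallelism of 400 pulls it far below that cap.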



Now, using ‘maxSplitBytes’, each of the data files to be read is split if it is splittable. If a file is splittable and larger than ‘maxSplitBytes’, it is split into multiple chunks of ‘maxSplitBytes’, with the last chunk being less than or equal to ‘maxSplitBytes’. If the file is not splittable, or its size is less than ‘maxSplitBytes’, there is only one file chunk, of size equal to the file size.
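
As a rough sketch of this splitting rule (the class and method names are hypothetical, and sizes are in bytes), the chunk sizes for a single file could be derived like this:

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative sketch only; not Spark's implementation.
final class FileChunkSketch {
    static final long MB = 1024L * 1024L;

    /** Returns the sizes, in bytes, of the chunks one file is cut into. */
    static List<Long> split(long fileSize, long maxSplitBytes, boolean splittable) {
        if (maxSplitBytes <= 0) {
            throw new IllegalArgumentException("maxSplitBytes must be positive");
        }
        List<Long> chunks = new ArrayList<>();
        if (!splittable || fileSize <= maxSplitBytes) {
            // Unsplittable or small file: a single chunk covering the whole file.
            chunks.add(fileSize);
            return chunks;
        }
        long offset = 0;
        while (offset < fileSize) {
            // Full-size chunks, with a final chunk holding the remainder.
            long size = Math.min(maxSplitBytes, fileSize - offset);
            chunks.add(size);
            offset += size;
        }
        return chunks;
    }

    public static void main(String[] args) {
        System.out.println(split(330 * MB, 128 * MB, true));
        System.out.println(split(330 * MB, 128 * MB, false));
    }
}
```

For a splittable 330 MB file and a 128 MB guideline this gives two full chunks and one smaller trailing chunk; the same file marked unsplittable stays in one piece.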

After file chunks are calculated for all the data files, one or more file chunks are packed into each partition. The packing process starts with no partition being packed and then iterates over the file chunks. For each file chunk:

  • If there is no current partition being packed, initialize a new partition and assign the file chunk to it. The partition size becomes the sum of the chunk size and the additional overhead of ‘openCostInBytes’.
  • If adding the chunk size to the size of the current partition (being packed) does not exceed ‘maxSplitBytes’, the file chunk becomes part of the current partition. The partition size is incremented by the sum of the chunk size and the additional overhead of ‘openCostInBytes’.
  • If adding the chunk size to the size of the current partition exceeds ‘maxSplitBytes’, the current partition is declared complete and a new partition is started. The file chunk becomes part of the new partition, whose size becomes the sum of the chunk size and the additional overhead of ‘openCostInBytes’.

After the packing process is over, the number of partitions of the Dataset, for reading the corresponding data files, is obtained.
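
The packing steps described above can be sketched as a single loop. This is a minimal illustration under the stated rules, with hypothetical names and all sizes in bytes; it takes the chunk sizes in the order given and is not Spark's own code.

```java
import java.util.Collections;
import java.util.List;

// Illustrative sketch only; follows the packing rules described in the text.
final class PartitionPackingSketch {
    static final long MB = 1024L * 1024L;

    /** Returns the number of partitions produced by packing the given chunks. */
    static int countPartitions(List<Long> chunkSizes, long maxSplitBytes, long openCostInBytes) {
        int partitions = 0;
        long currentSize = 0;             // size of the partition being packed
        boolean currentHasChunks = false; // false while no partition is being packed
        for (long chunk : chunkSizes) {
            // Eligibility check uses the chunk size only, without its open cost.
            if (currentHasChunks && currentSize + chunk > maxSplitBytes) {
                partitions++;             // current partition is complete
                currentSize = 0;
                currentHasChunks = false;
            }
            // The open cost is added only when the chunk is accepted.
            currentSize += chunk + openCostInBytes;
            currentHasChunks = true;
        }
        if (currentHasChunks) {
            partitions++;                 // close the last partition
        }
        return partitions;
    }

    public static void main(String[] args) {
        List<Long> chunks = Collections.nCopies(54, 40 * MB); // 54 chunks of 40 MB
        System.out.println(countPartitions(chunks, 128 * MB, 4 * MB));
        System.out.println(countPartitions(chunks, 88 * MB, 4 * MB));
    }
}
```

Note that the open cost enters only the running partition size, not the check for the chunk currently being considered, which is why three 40 MB chunks fit under a 128 MB guideline.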

 

Illustration of the process of deriving the partitions for a set of data files: first the data files are split based on the computed value of maxSplitBytes, then the splits are packed into one or more partitions based on maxSplitBytes and openCostInBytes.


Although the process of arriving at the number of partitions seems a bit complicated, the basic idea is to first split each file at maxSplitBytes boundaries if the file is splittable. The resulting chunks, together with unsplittable files, are then packed into partitions: when adding a chunk would push the size of the partition being packed beyond maxSplitBytes, that partition is considered complete and a new partition is started. The packing process thus yields the final number of partitions.

For illustration, here are some examples of arriving at the number of partitions in the case of Dataset APIs:

(a) 54 parquet files, 65 MB each, all 3 config parameters at default, number of cores equal to 10: The number of partitions comes out to be 54. Each file has only one chunk here. Two files cannot be packed into one partition in this example, because after the first file the partition size is 69 MB (65 MB plus 4 MB of ‘openCostInBytes’), and adding the second file would exceed ‘maxSplitBytes’ (128 MB).


(b) 54 parquet files, 63 MB each, all 3 config parameters at default, number of cores equal to 10: The number of partitions again comes out to be 54. It seems that two files could be packed here, but because of the ‘openCostInBytes’ overhead (4 MB) added after packing the first file, adding the second file takes the size to 130 MB, which crosses the 128 MB limit. Hence, two files cannot be packed into one partition in this example.

(c) 54 parquet files, 40 MB each, all 3 config parameters at default, number of cores equal to 10: The number of partitions comes out to be 18 this time. According to the packing process explained above, after adding two files of 40 MB with an overhead of 4 MB each, the partition size is 88 MB. The third file of 40 MB can therefore also be packed, since the size then comes out to be exactly 128 MB. Hence, with three files per partition, the number of partitions comes out to be 18.


It should be noted that the openCost overhead of the chunk being evaluated is not considered when checking whether it can be packed; the overhead is added only when incrementing the partition size after the chunk has been accepted into the partition.

(d) 54 parquet files, 40 MB each, maxPartitionBytes set to 88 MB, other two configs at default values, number of cores equal to 10: The number of partitions comes out to be 27 in this case, instead of 18 as in (c). This is due to the change in the value of ‘maxPartitionBytes’. The 27 partitions can be easily reasoned from the file split and packing process explained above: after two files the partition size is 88 MB, so a third file of 40 MB no longer fits and only two files are packed per partition.

(e) 54 parquet files, 40 MB each, spark.default.parallelism set to 400, the other two configs at default values, number of cores equal to 10: The number of partitions comes out to be 378 in this case. Here bytesPerCore, and hence ‘maxSplitBytes’, drops to about 5.94 MB (54 × 44 MB / 400), so each 40 MB file is split into 7 chunks, and each of the resulting 378 chunks ends up in its own partition.



Number of partitions when RDD APIs are used for reading data files:

The following APIs are provided for reading data files into an RDD. Each of these APIs is called on the SparkContext of a SparkSession instance:

    SparkContext.newAPIHadoopFile(String path, Class<F> fClass, Class<K> kClass, Class<V> vClass, org.apache.hadoop.conf.Configuration conf)
    SparkContext.textFile(String path, int minPartitions)
    SparkContext.sequenceFile(String path, Class<K> keyClass, Class<V> valueClass)
    SparkContext.sequenceFile(String path, Class<K> keyClass, Class<V> valueClass, int minPartitions)
    SparkContext.objectFile(String path, int minPartitions, scala.reflect.ClassTag<T> evidence$4)



Some of these APIs take a ‘minPartitions’ parameter, while others do not. When it is not supplied, it defaults to 2, or to 1 when default.parallelism is 1. This ‘minPartitions’ is one of the factors in deciding the number of partitions in the RDD returned by these APIs. The other factors are the values of the following Hadoop config parameters:

    minSize   (mapred.min.split.size, effective default: 1 byte)
    blockSize (dfs.blocksize, default: 128 MB)



Based on the values of the three parameters, a split guideline, called split size, is calculated as:

    splitSize = Math.max(minSize, Math.min(goalSize, blockSize));

    where:
    goalSize = (sum of the lengths of all files to be read) / minPartitions
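
A minimal Java sketch of this calculation is given below. The class and method names are hypothetical, sizes are in bytes, and the guard against a zero ‘minPartitions’ is an assumption added for safety; the minimum split size used in main is simply a negligibly small value chosen for illustration.

```java
// Illustrative sketch only; names are hypothetical, not part of Spark or Hadoop.
final class SplitSizeSketch {
    static final long MB = 1024L * 1024L;

    /**
     * @param totalBytes    sum of the lengths of all files to be read
     * @param minPartitions the minPartitions hint passed to (or defaulted by) the RDD API
     * @param minSize       minimum split size (mapred.min.split.size)
     * @param blockSize     block size (dfs.blocksize)
     */
    static long splitSize(long totalBytes, int minPartitions, long minSize, long blockSize) {
        // Assumption: treat a non-positive minPartitions as 1 to avoid dividing by zero.
        long goalSize = totalBytes / Math.max(minPartitions, 1);
        return Math.max(minSize, Math.min(goalSize, blockSize));
    }

    public static void main(String[] args) {
        long totalBytes = 31 * 330 * MB; // 31 files of 330 MB each
        long minSize = 1L;               // assumption: a negligibly small minimum split size
        System.out.println(splitSize(totalBytes, 2, minSize, 128 * MB));
        System.out.println(splitSize(totalBytes, 1000, minSize, 128 * MB));
        System.out.println(splitSize(totalBytes, 2, 256 * MB, 128 * MB));
    }
}
```

The three calls show the three regimes: the block size wins when nothing is tuned, a large ‘minPartitions’ pulls the split size below the block size, and a large minimum split size pushes it above.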



Now, using ‘splitSize’, each of the data files to be read is split if it is splittable. If a file is splittable and larger than ‘splitSize’, it is split into multiple chunks of ‘splitSize’, with the last chunk being less than or equal to ‘splitSize’. If the file is not splittable, or its size is less than ‘splitSize’, there is only one file chunk, of size equal to the file length.

Each of the file chunks (having size greater than zero) is mapped to a single partition. Therefore, the number of partitions in the RDD, returned by RDD APIs on data files, is equal to the number of non-zero file chunks derived from slicing the data files using ‘splitSize’.
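
Since every non-empty chunk becomes one partition, the partition count can be sketched as a sum of per-file chunk counts. The sketch below uses hypothetical names and sizes in bytes, follows the description in the text, and does not model any further tolerance that Hadoop may apply to the last chunk of a file.

```java
import java.util.Arrays;

// Illustrative sketch only; follows the slicing rule described in the text.
final class RddPartitionCountSketch {
    static final long MB = 1024L * 1024L;

    /** Returns the number of RDD partitions for the given files and split size. */
    static long countPartitions(long[] fileSizes, long splitSize, boolean splittable) {
        if (splitSize <= 0) {
            throw new IllegalArgumentException("splitSize must be positive");
        }
        long partitions = 0;
        for (long size : fileSizes) {
            if (size <= 0) {
                continue; // zero-length chunks do not produce a partition
            }
            if (!splittable || size <= splitSize) {
                partitions += 1; // whole file is a single chunk
            } else {
                partitions += (size + splitSize - 1) / splitSize; // ceiling division
            }
        }
        return partitions;
    }

    public static void main(String[] args) {
        long[] files = new long[31];
        Arrays.fill(files, 330 * MB); // 31 files of 330 MB each
        System.out.println(countPartitions(files, 128 * MB, true));
        System.out.println(countPartitions(files, 256 * MB, true));
    }
}
```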

Illustration of the process of deriving the partitions for a set of data files: first the data files are split based on the computed value of splitSize, then each of the non-zero splits is assigned to a single partition.

For illustration, here are some examples of arriving at the number of partitions in the case of RDD APIs:

(a) 31 parquet files, 330 MB each, block size at default 128 MB, minPartitions not specified, ‘mapred.min.split.size’ at default, number of cores equal to 10: The number of partitions comes out to be 93. The splitSize comes out to be 128 MB, so the number of partitions equals the number of blocks occupied by the 31 files. Each file occupies 3 blocks, so the total number of blocks, and hence partitions, is 93.


(b) 54 parquet files, 40 MB each, block size at default 128 MB, minPartitions not specified, ‘mapred.min.split.size’ at default, number of cores equal to 10: The number of partitions comes out to be 54. The splitSize comes out to be 128 MB, so the number of partitions equals the number of blocks occupied by the 54 files. Each file occupies 1 block, so the total number of blocks, and hence partitions, is 54.


(c) 31 parquet files, 330 MB each, block size at default 128 MB, minPartitions specified as 1000, ‘mapred.min.split.size’ at default, number of cores equal to 10: The number of partitions comes out to be 1023. The splitSize comes out to be 10.23 MB (31 × 330 MB / 1000), so the number of file splits per file is 33, the total number of file splits is 1023, and therefore the total number of partitions is also 1023.


(d) 31 parquet files, 330 MB each, block size at default 128 MB, minPartitions not specified, ‘mapred.min.split.size’ set at 256 MB, number of cores equal to 10: The number of partitions comes out to be 62. The splitSize comes out to be 256 MB, so the number of file splits per file is 2, the total number of file splits is 62, and therefore the total number of partitions is also 62.


As is evident from the ‘splitSize’ calculation, if partition sizes greater than the block size are desired, ‘mapred.min.split.size’ needs to be set to a value greater than the block size. Conversely, if partition sizes smaller than the block size are desired, ‘minPartitions’ should be set to a relatively high value, such that the goalSize (sum of file sizes / ‘minPartitions’) comes out to be less than the block size.
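
For the second case, a suitable ‘minPartitions’ can be estimated from the desired partition size. The helper below is a hypothetical sketch (sizes in bytes); it assumes the target size is smaller than the block size and larger than the minimum split size, so that goalSize is what determines ‘splitSize’.

```java
// Illustrative sketch only; the helper name is hypothetical.
final class MinPartitionsSketch {
    static final long MB = 1024L * 1024L;

    /**
     * Smallest minPartitions for which goalSize (totalBytes / minPartitions)
     * does not exceed the target partition size.
     */
    static long minPartitionsFor(long totalBytes, long targetPartitionBytes) {
        if (targetPartitionBytes <= 0) {
            throw new IllegalArgumentException("targetPartitionBytes must be positive");
        }
        return (totalBytes + targetPartitionBytes - 1) / targetPartitionBytes; // ceiling division
    }

    public static void main(String[] args) {
        long totalBytes = 31 * 330 * MB; // 31 files of 330 MB each
        System.out.println(minPartitionsFor(totalBytes, 64 * MB));
        System.out.println(minPartitionsFor(totalBytes, 10 * MB));
    }
}
```

The returned value would then be passed as the ‘minPartitions’ argument of an API such as textFile, after narrowing it to an int.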

Summary

Until recently, the process of picking a certain number of partitions for a set of data files always looked mysterious to me. However, during a recent optimization exercise, I wanted to change the default number of partitions picked by Spark for processing a set of data files, and that is when I started to decode this process comprehensively, along with proofs. Hopefully, this description will also help readers understand Spark a bit more deeply and enable them to design efficient and optimized Spark routines.

Please remember, the optimum number of partitions is the key to an efficient and reliable Spark application. In case of feedback or queries on this post, do write in the comments section. I hope you find it useful. 

Published at DZone with permission of Ajay Gupta . See the original article here.
