Four Ways to Filter a Large-Sized Spark Dataset Against a Data Collection
Filtering a Spark Dataset against a collection of data values is a commonly encountered use case in many data analytics scenarios. This article explains four different ways to achieve it.
Let us assume there is a large Dataset, ‘A’, with the following schema:
root
 |-- empId: Integer
 |-- sal: Integer
 |-- name: String
 |-- address: String
 |-- dept: Integer
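For the illustrative code sketches further below, a record of ‘A’ can be modeled as a simple Java bean. The ‘Employee’ class shown here is a hypothetical placeholder introduced purely for illustration; it is not part of the original article:

import java.io.Serializable;

// Hypothetical Java bean modeling one record of Dataset 'A'
// (field names follow the schema shown above).
public class Employee implements Serializable {
    private Integer empId;
    private Integer sal;
    private String name;
    private String address;
    private Integer dept;

    // Getters and setters are required for Spark's bean Encoder.
    public Integer getEmpId() { return empId; }
    public void setEmpId(Integer empId) { this.empId = empId; }
    public Integer getSal() { return sal; }
    public void setSal(Integer sal) { this.sal = sal; }
    public String getName() { return name; }
    public void setName(String name) { this.name = name; }
    public String getAddress() { return address; }
    public void setAddress(String address) { this.address = address; }
    public Integer getDept() { return dept; }
    public void setDept(Integer dept) { this.dept = dept; }
}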
The Dataset ‘A’ needs to be filtered against a set of employee IDs (empIds), ‘B’, which can be broadcast to the executors, to get a filtered Dataset ‘A`’. The filter operation can be represented as:
A` = A.filter(A.empId is contained in 'B')
To implement this common filtering scenario, you can use four different transformations in Spark, each with its own pros and cons. Below is a description of how each of the four transformations can be used for this scenario, along with notes on the reliability and efficiency aspects of each.
Filter: The Filter transformation filters the records of a Dataset on a boolean condition expression or a boolean-returning filter function. It can be used on a Dataset in the following ways:
1. Dataset<T> A` = A.filter(Column condition)
2. Dataset<T> A` = A.filter(FilterFunction<T> func)
3. Dataset<T> A` = A.filter(String conditionExpr)
For the filtering scenario described earlier, one can use the ‘Filter’ transformation on ‘A’ that takes a ‘FilterFunction’ as an input. The ‘FilterFunction’ is invoked on each record contained in the partitions of the Dataset and returns either ‘true’ or ‘false’. In our filtering scenario, the FilterFunction would be invoked on each record of Dataset ‘A’ and check whether the record’s ‘empId’ exists in the broadcast set of empIds, ‘B’ (‘B’ being backed by a corresponding hash table).
The use of the Filter transformation as described above is quite simple, robust, and efficient irrespective of the size of Dataset ‘A’. This is because the transformation is invoked record by record. Further, since the broadcast set of empIds is backed by a hash table on each executor, the lookup inside the filter function remains efficient for every record.
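To make this concrete, here is a minimal sketch of the ‘Filter’ approach in Java. It assumes a SparkSession ‘spark’, a Dataset<Employee> ‘datasetA’, and a java.util.HashSet<Integer> ‘empIdsSet’ holding the IDs of ‘B’; all of these names are illustrative, not taken from the original article:

import java.util.HashSet;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FilterFunction;
import org.apache.spark.broadcast.Broadcast;
import org.apache.spark.sql.Dataset;

// Broadcast the set of empIds ('B') so that every executor gets a local copy.
Broadcast<HashSet<Integer>> empIdsBroadcast =
        JavaSparkContext.fromSparkContext(spark.sparkContext()).broadcast(empIdsSet);

// The FilterFunction is invoked record by record; the broadcast HashSet
// gives a constant-time lookup on each executor.
Dataset<Employee> filteredA = datasetA.filter(
        (FilterFunction<Employee>) emp -> empIdsBroadcast.value().contains(emp.getEmpId()));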
Map: The Map transformation applies a function to each record of a Dataset and returns either a null or a record of the same or a different type. It is used on a Dataset in the following way:
Dataset<U> A` = A.map(MapFunction<T,U> func, Encoder<U> encoder)
For the filtering scenario described earlier, one can use the ‘Map’ transformation on ‘A’ that takes a ‘MapFunction’ as an input. In our filtering scenario, the ‘MapFunction’ would be invoked on each record of Dataset ‘A’ and check whether the record’s ‘empId’ exists in the broadcast set of empIds, ‘B’ (backed by a corresponding hash table). If the record exists in ‘B’, it would be returned from the MapFunction; if not, NULL would be returned. Also, the Encoder input for the MapFunction would be the same as for Dataset ‘A’.
Although the semantics of the ‘MapFunction’ is similar to that of the ‘FilterFunction’, the use of the ‘Map’ transformation for this filtering scenario is neither as simple nor as elegant as the direct ‘Filter’ approach. One has to explicitly provide an additional Encoder input to the transformation. Also, after the ‘Map’ transformation is invoked, the output needs to be filtered for NULL values, which makes the ‘Map’ approach less efficient than the ‘Filter’ approach. However, the reliability of the approach is similar to that of ‘Filter’, since it runs without problems irrespective of the size of ‘A’. This is because the ‘Map’ transformation is also invoked record by record.
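As an illustration, here is a sketch of the ‘Map’ approach, reusing the ‘empIdsBroadcast’ and ‘datasetA’ placeholders from the earlier sketch. Note that how a bean Encoder handles a NULL return from the MapFunction can vary across Spark versions, so the follow-up filter defensively checks the ‘empId’ field as well:

import org.apache.spark.api.java.function.FilterFunction;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;

// Return the record itself when its empId is in 'B', otherwise NULL;
// the extra Encoder argument has to be provided explicitly.
Dataset<Employee> mappedA = datasetA.map(
        (MapFunction<Employee, Employee>) emp ->
                empIdsBroadcast.value().contains(emp.getEmpId()) ? emp : null,
        Encoders.bean(Employee.class));

// Unlike the 'Filter' approach, the NULL results still have to be removed afterwards.
Dataset<Employee> filteredA = mappedA.filter(
        (FilterFunction<Employee>) emp -> emp != null && emp.getEmpId() != null);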
MapPartitions: The MapPartitions transformation applies a function to each partition of a Dataset (know more about Spark partitioning in my recently published book, “Guide to Spark Partitioning: Spark Partitioning Explained in Depth”) and returns either a null or an iterator to a new collection of records of the same or a different type. It is used on a Dataset in the following way:
Dataset<U> A` = A.mapPartitions(MapPartitionsFunction<T,U> func, Encoder<U> encoder)
For the filtering scenario described earlier, one can also use the ‘MapPartitions’ transformation on ‘A’ that takes a ‘MapPartitionsFunction’ as an input. In our filtering scenario, the ‘MapPartitionsFunction’ would be invoked on each partition of Dataset ‘A’, iterating over all records of the partition and checking, for each record, whether its ‘empId’ exists in the broadcast set of empIds, ‘B’ (backed by a corresponding hash table). If a record exists in ‘B’, it would be added to a returnable collection initialized in the ‘MapPartitionsFunction’. Finally, an iterator to that collection is returned from the ‘MapPartitionsFunction’.
Compared to the ‘Map’ and ‘Filter’ approaches, the ‘MapPartitions’ approach is generally more efficient because it operates partition-wise rather than record-wise. However, similar to ‘Map’, one has to explicitly provide an Encoder input to the transformation. Also, the ‘MapPartitions’ approach can become highly unreliable if the size of certain partitions of Dataset ‘A’ exceeds the memory provisioned for the corresponding partition-computing task. This is because a larger partition can lead to a potentially larger returnable collection, resulting in memory overruns.
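Here is a sketch of the ‘MapPartitions’ approach with the same illustrative placeholders. The matching records of each partition are buffered in a local collection, which is exactly where the memory risk described above comes from:

import java.util.ArrayList;
import java.util.List;
import org.apache.spark.api.java.function.MapPartitionsFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;

Dataset<Employee> filteredA = datasetA.mapPartitions(
        (MapPartitionsFunction<Employee, Employee>) records -> {
            // Collection holding the matching records of this partition; a very
            // large partition can make this collection big enough to overrun memory.
            List<Employee> matched = new ArrayList<>();
            while (records.hasNext()) {
                Employee emp = records.next();
                if (empIdsBroadcast.value().contains(emp.getEmpId())) {
                    matched.add(emp);
                }
            }
            return matched.iterator();
        },
        Encoders.bean(Employee.class));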
Inner Join: The Inner Join transformation applies to two input Datasets, ‘A’ and ‘B’, in the following way:
Dataset<Row> A` = A.join(Dataset<?> B, Column joinExprs)
For the filtering scenario described earlier, one can also use the ‘Inner Join’ transformation on ‘A’ that joins a Dataset representation of ‘B’ on the join condition (A.empId equals B.empId) and selects only the fields of ‘A’ from each joined record.
The ‘Inner Join’ approach returns a Dataset of generic ‘Row’ objects, so one needs to use an Encoder to convert it back into a Dataset of A’s record type to match the exact filter semantics. However, similar to the ‘Filter’ approach, the ‘Inner Join’ approach is efficient and reliable. The efficiency comes from the fact that, since ‘B’ is broadcastable, Spark would choose the most efficient ‘Broadcast Hash Join’ strategy to execute the join. The reliability comes from the fact that the ‘Inner Join’ approach works on large Datasets of ‘A’, as was the case with the ‘Filter’ approach.
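Finally, here is a sketch of the ‘Inner Join’ approach, again with illustrative names; ‘empIdList’ is assumed to be a java.util.List<Integer> holding the IDs of ‘B’. The explicit broadcast() hint encourages Spark to pick the ‘Broadcast Hash Join’ strategy:

import java.util.List;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import static org.apache.spark.sql.functions.broadcast;

// Build a single-column Dataset representation of 'B'.
Dataset<Row> datasetB = spark.createDataset(empIdList, Encoders.INT()).toDF("empId");

// Inner join on empId, keeping only the columns of 'A'.
Dataset<Row> joined = datasetA.join(broadcast(datasetB),
        datasetA.col("empId").equalTo(datasetB.col("empId")), "inner")
        .select(datasetA.col("*"));

// The join yields generic Row objects, so an Encoder is needed to get back
// to the record type of 'A'.
Dataset<Employee> filteredA = joined.as(Encoders.bean(Employee.class));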
Considering all the approaches, I would pick the ‘Filter’ approach as the safest bet from a reliability and efficiency perspective. Also, please note that the ‘Filter’ approach would also allow me to perform an anti-search (keeping only the records whose ‘empId’ is not in ‘B’) with similar efficiency and robustness, which the ‘Inner Join’ approach would not allow.
Also, you could refer to my original story here.