DZone

Related

  • What Developers Need to Know About Table Geo-Partitioning
  • What Developers Need to Know About Table Partition Maintenance
  • How To Implement and Design Twitter Search Backend Systems using Java Microservices?
  • Why Round-Robin Won't Save You: Load Balancing Challenges in Data Streaming Services With Heterogeneous Traffic

Four Ways to Filter a Large-Sized Spark Dataset Against a Data Collection

Filtering a Spark Dataset against a collection of data values is a common requirement in many data analytics scenarios. This article explains four different ways to do it.

By Ajay Gupta · Nov. 06, 20 · Opinion

Assume there is a large Dataset, ‘A’, with the following schema:

```
root
 |-- empId: Integer
 |-- sal: Integer
 |-- name: String
 |-- address: String
 |-- dept: Integer
```



The Dataset ‘A’ needs to be filtered against a set of employee IDs (empIds), ‘B’, which is small enough to be broadcast to the executors, to obtain a filtered Dataset ‘A`’. The filter operation can be represented as:

```
A` = A.filter(A.empId in B)
```



To implement this common filtering scenario, you can use four types of transformations in Spark, each with its own pros and cons. Below is a description of how each of these four transformations can execute this filtering scenario, along with detailed notes on the reliability and efficiency of each.

Filter: The Filter transformation on a Dataset (which keeps the records that satisfy a boolean condition expression or a boolean-returning filter function) can be used in the following ways:

```java
// 1. Filter on a Column condition
Dataset<T> A` = A.filter(Column condition)
// 2. Filter with a FilterFunction
Dataset<T> A` = A.filter(FilterFunction<T> func)
// 3. Filter on a SQL condition expression string
Dataset<T> A` = A.filter(String conditionExpr)
```



For the filtering scenario described earlier, one can use the ‘Filter’ transformation on ‘A’ with a ‘FilterFunction’ as input. The ‘FilterFunction’ is invoked on each record in the partitions of the Dataset and returns either ‘true’ or ‘false’. In our scenario, the FilterFunction would be invoked on each record of Dataset ‘A’ and check whether the record's ‘empId’ exists in the broadcast set of empIds, ‘B’ (‘B’ being backed by a hash table).

This use of the Filter transformation is simple, robust, and efficient irrespective of the size of Dataset ‘A’, because the transformation is invoked record by record. Further, since the broadcast set of empIds is backed by a hash table on each executor, the lookup in the filter function remains efficient for every record.
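The ‘Filter’ approach can be sketched as follows. This is a minimal, illustrative program, not the author's code: it assumes the Spark 3.x Java API running in local mode, and it replaces the full schema of ‘A’ with a hypothetical trimmed-down `Employee` bean (only `empId` and `name`) filled with made-up sample rows. The class name `FilterApproach` and the helper `matchedIds()` are likewise hypothetical. ‘B’ is held in a `HashSet` and broadcast with `JavaSparkContext.broadcast`.

```java
import java.io.Serializable;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FilterFunction;
import org.apache.spark.broadcast.Broadcast;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SparkSession;

public class FilterApproach {

    // Hypothetical trimmed-down bean for Dataset 'A' (only empId and name, for brevity).
    public static class Employee implements Serializable {
        private int empId;
        private String name;

        public Employee() {}
        public Employee(int empId, String name) { this.empId = empId; this.name = name; }

        public int getEmpId() { return empId; }
        public void setEmpId(int empId) { this.empId = empId; }
        public String getName() { return name; }
        public void setName(String name) { this.name = name; }
    }

    // Runs the filter locally and returns the surviving empIds, sorted.
    public static List<Integer> matchedIds() {
        SparkSession spark = SparkSession.builder()
                .master("local[*]").appName("filter-approach").getOrCreate();
        try {
            // Made-up sample rows standing in for the large Dataset 'A'.
            Dataset<Employee> a = spark.createDataset(Arrays.asList(
                    new Employee(1, "Asha"), new Employee(2, "Ben"), new Employee(3, "Chen")),
                    Encoders.bean(Employee.class));

            // 'B': the empIds to keep, held in a HashSet and broadcast to the executors.
            Set<Integer> ids = new HashSet<>(Arrays.asList(1, 3));
            Broadcast<Set<Integer>> b =
                    JavaSparkContext.fromSparkContext(spark.sparkContext()).broadcast(ids);

            // Invoked record by record; each call is a constant-time hash lookup.
            Dataset<Employee> aFiltered =
                    a.filter((FilterFunction<Employee>) e -> b.value().contains(e.getEmpId()));

            return aFiltered.collectAsList().stream()
                    .map(Employee::getEmpId).sorted().collect(Collectors.toList());
        } finally {
            spark.stop();
        }
    }

    public static void main(String[] args) {
        System.out.println(matchedIds()); // [1, 3]
    }
}
```

The explicit cast to `FilterFunction<Employee>` is there because `Dataset.filter` also has an overload taking a Scala function, and a bare Java lambda is likely to be ambiguous between the two.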

Map: The Map transformation on a Dataset (which applies a function to each record and returns a record of the same or a different type, or null) is used in the following way:

```java
Dataset<U> A` = A.map(MapFunction<T,U> func, Encoder<U> encoder)
```



For the filtering scenario described earlier, one can also use the ‘Map’ transformation on ‘A’ with a ‘MapFunction’ as input. In our scenario, the ‘MapFunction’ would be invoked on each record of Dataset ‘A’ and check whether the record's ‘empId’ exists in the broadcast set of empIds, ‘B’ (backed by a hash table). If it does, the record itself is returned from the MapFunction; if it does not, NULL is returned. The Encoder passed to the Map transformation would be the same as the one for Dataset ‘A’. (Note that Spark's bean and Row encoders generally reject a null top-level record at runtime, so in practice a marker record or a nullable wrapper may be needed in place of a bare NULL.)

Although the semantics of the ‘MapFunction’ are similar to those of the ‘FilterFunction’, the ‘Map’ approach is not as simple or elegant as the direct ‘Filter’ approach. One has to explicitly provide an additional Encoder to the transformation. Also, after the ‘Map’ transformation runs, its output still has to be filtered for NULL values, which makes the ‘Map’ approach less efficient than the ‘Filter’ approach. However, its reliability is similar to that of the ‘Filter’ approach: it runs without problems irrespective of the size of ‘A’, because the ‘Map’ transformation is also invoked record by record.
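The ‘Map’ approach, including the extra clean-up step it needs, could look roughly like the sketch below. It makes the same assumptions as any minimal local example: the Spark 3.x Java API, a hypothetical trimmed-down `Employee` bean with made-up rows, and hypothetical names (`MapApproach`, `matchedIds()`). Because a bean encoder is generally expected to refuse a null top-level object, this sketch swaps the bare NULL for a hypothetical sentinel record whose `empId` is `NO_MATCH = -1`, which assumes real empIds are never negative.

```java
import java.io.Serializable;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FilterFunction;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.broadcast.Broadcast;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SparkSession;

public class MapApproach {

    // Hypothetical "no match" marker used instead of null; assumes empIds are never negative.
    private static final int NO_MATCH = -1;

    // Hypothetical trimmed-down bean for Dataset 'A' (only empId and name, for brevity).
    public static class Employee implements Serializable {
        private int empId;
        private String name;

        public Employee() {}
        public Employee(int empId, String name) { this.empId = empId; this.name = name; }

        public int getEmpId() { return empId; }
        public void setEmpId(int empId) { this.empId = empId; }
        public String getName() { return name; }
        public void setName(String name) { this.name = name; }
    }

    public static List<Integer> matchedIds() {
        SparkSession spark = SparkSession.builder()
                .master("local[*]").appName("map-approach").getOrCreate();
        try {
            Dataset<Employee> a = spark.createDataset(Arrays.asList(
                    new Employee(1, "Asha"), new Employee(2, "Ben"), new Employee(3, "Chen")),
                    Encoders.bean(Employee.class));

            Set<Integer> ids = new HashSet<>(Arrays.asList(1, 3));
            Broadcast<Set<Integer>> b =
                    JavaSparkContext.fromSparkContext(spark.sparkContext()).broadcast(ids);

            // Step 1: map every record, replacing non-matches with the marker record.
            // map() requires an explicit Encoder, here the same bean encoder as 'A'.
            Dataset<Employee> mapped = a.map(
                    (MapFunction<Employee, Employee>) e ->
                            b.value().contains(e.getEmpId()) ? e : new Employee(NO_MATCH, null),
                    Encoders.bean(Employee.class));

            // Step 2: drop the markers, an extra operation the 'Filter' approach avoids.
            Dataset<Employee> aFiltered =
                    mapped.filter((FilterFunction<Employee>) e -> e.getEmpId() != NO_MATCH);

            return aFiltered.collectAsList().stream()
                    .map(Employee::getEmpId).sorted().collect(Collectors.toList());
        } finally {
            spark.stop();
        }
    }

    public static void main(String[] args) {
        System.out.println(matchedIds()); // [1, 3]
    }
}
```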

MapPartitions: The MapPartitions transformation on a Dataset applies a function to each partition of the Dataset (to know more about Spark partitioning, see my recently published book, “Guide to Spark Partitioning: Spark Partitioning Explained in Depth”), returning an iterator over a new collection of records of the same or a different type. It is used in the following way:

```java
Dataset<U> A` = A.mapPartitions(MapPartitionsFunction<T,U> func, Encoder<U> encoder)
```



For the filtering scenario described earlier, one can also use the ‘MapPartitions’ transformation on ‘A’ with a ‘MapPartitionsFunction’ as input. In our scenario, the ‘MapPartitionsFunction’ would be invoked on each partition of Dataset ‘A’, iterating over all the records of the partition and checking, for each record, whether its ‘empId’ exists in the broadcast set of empIds, ‘B’ (backed by a hash table). If it does, the record is added to a returnable collection initialized in the ‘MapPartitionsFunction’. Finally, an iterator over the returnable collection is returned from the ‘MapPartitionsFunction’.

Compared with the ‘Map’ and ‘Filter’ approaches, the ‘MapPartitions’ approach is generally more efficient because it operates per partition rather than per record. However, as with ‘Map’, one has to explicitly provide an Encoder to the transformation. Also, the ‘MapPartitions’ approach can become highly unreliable if the size of some partitions of Dataset ‘A’ exceeds the memory provisioned for each partition's computing task, because a larger partition can produce a larger returnable collection and, in turn, out-of-memory errors.
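A minimal sketch of the ‘MapPartitions’ approach as described above, again assuming the Spark 3.x Java API in local mode, a hypothetical trimmed-down `Employee` bean with made-up rows, and hypothetical names (`MapPartitionsApproach`, `matchedIds()`). The `ArrayList` that collects the matches is the returnable collection whose size grows with the partition, which is the source of the memory risk noted above.

```java
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.MapPartitionsFunction;
import org.apache.spark.broadcast.Broadcast;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SparkSession;

public class MapPartitionsApproach {

    // Hypothetical trimmed-down bean for Dataset 'A' (only empId and name, for brevity).
    public static class Employee implements Serializable {
        private int empId;
        private String name;

        public Employee() {}
        public Employee(int empId, String name) { this.empId = empId; this.name = name; }

        public int getEmpId() { return empId; }
        public void setEmpId(int empId) { this.empId = empId; }
        public String getName() { return name; }
        public void setName(String name) { this.name = name; }
    }

    public static List<Integer> matchedIds() {
        SparkSession spark = SparkSession.builder()
                .master("local[*]").appName("map-partitions-approach").getOrCreate();
        try {
            Dataset<Employee> a = spark.createDataset(Arrays.asList(
                    new Employee(1, "Asha"), new Employee(2, "Ben"), new Employee(3, "Chen")),
                    Encoders.bean(Employee.class));

            Set<Integer> ids = new HashSet<>(Arrays.asList(1, 3));
            Broadcast<Set<Integer>> b =
                    JavaSparkContext.fromSparkContext(spark.sparkContext()).broadcast(ids);

            // Invoked once per partition: matches are buffered in an in-memory list
            // before an iterator over that list is handed back to Spark.
            Dataset<Employee> aFiltered = a.mapPartitions(
                    (MapPartitionsFunction<Employee, Employee>) it -> {
                        Set<Integer> keep = b.value();
                        List<Employee> out = new ArrayList<>();
                        while (it.hasNext()) {
                            Employee e = it.next();
                            if (keep.contains(e.getEmpId())) {
                                out.add(e);
                            }
                        }
                        return out.iterator();
                    },
                    Encoders.bean(Employee.class));

            return aFiltered.collectAsList().stream()
                    .map(Employee::getEmpId).sorted().collect(Collectors.toList());
        } finally {
            spark.stop();
        }
    }

    public static void main(String[] args) {
        System.out.println(matchedIds()); // [1, 3]
    }
}
```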

Inner Join: The Inner Join transformation applies to two input Datasets, A and B, in the following way:

```java
Dataset<Row> A` = A.join(Dataset<?> B, Column joinExprs)
```



For the filtering scenario described earlier, one can also use an ‘Inner Join’ of ‘A’ with a Dataset representation of ‘B’ on the join condition (A.empId equals B.empId), selecting only the fields of ‘A’ from each joined record.

The ‘Inner Join’ approach returns a Dataset of generic ‘Row’ objects, so one needs an Encoder to convert it back into a Dataset of A's record type to match the exact filter semantics. Like the ‘Filter’ approach, however, the ‘Inner Join’ approach is efficient and reliable. The efficiency comes from the fact that, since ‘B’ is small enough to broadcast, Spark can execute the join with the efficient ‘Broadcast Hash Join’ strategy (it picks this strategy automatically when B's estimated size is below spark.sql.autoBroadcastJoinThreshold, or when B is marked with a broadcast hint). The reliability comes from the fact that, as with the ‘Filter’ approach, the ‘Inner Join’ approach works for large Datasets ‘A’.
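A possible sketch of the ‘Inner Join’ approach, under the same assumptions as a minimal local example: the Spark 3.x Java API, a hypothetical trimmed-down `Employee` bean with made-up rows, and hypothetical names (`JoinApproach`, `matchedIds()`). Here ‘B’ is turned into a one-column Dataset, `functions.broadcast` is used as an explicit broadcast hint rather than relying on the size threshold, and `as(Encoders.bean(...))` converts the joined `Row`s back to A's record type.

```java
import static org.apache.spark.sql.functions.broadcast;

import java.io.Serializable;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class JoinApproach {

    // Hypothetical trimmed-down bean for Dataset 'A' (only empId and name, for brevity).
    public static class Employee implements Serializable {
        private int empId;
        private String name;

        public Employee() {}
        public Employee(int empId, String name) { this.empId = empId; this.name = name; }

        public int getEmpId() { return empId; }
        public void setEmpId(int empId) { this.empId = empId; }
        public String getName() { return name; }
        public void setName(String name) { this.name = name; }
    }

    public static List<Integer> matchedIds() {
        SparkSession spark = SparkSession.builder()
                .master("local[*]").appName("join-approach").getOrCreate();
        try {
            Dataset<Employee> a = spark.createDataset(Arrays.asList(
                    new Employee(1, "Asha"), new Employee(2, "Ben"), new Employee(3, "Chen")),
                    Encoders.bean(Employee.class));

            // Dataset representation of 'B': a single empId column.
            Dataset<Row> b = spark.createDataset(Arrays.asList(1, 3), Encoders.INT()).toDF("empId");

            // broadcast() hints a broadcast hash join; select() keeps only A's fields,
            // and as() converts the generic Rows back to A's record type.
            Dataset<Employee> aFiltered = a
                    .join(broadcast(b), a.col("empId").equalTo(b.col("empId")))
                    .select(a.col("empId"), a.col("name"))
                    .as(Encoders.bean(Employee.class));

            return aFiltered.collectAsList().stream()
                    .map(Employee::getEmpId).sorted().collect(Collectors.toList());
        } finally {
            spark.stop();
        }
    }

    public static void main(String[] args) {
        System.out.println(matchedIds()); // [1, 3]
    }
}
```

Because ‘B’ is a set, each empId of ‘A’ matches at most one row of ‘B’, so the inner join does not duplicate any record of ‘A’.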

Considering all the approaches, I would pick the ‘Filter’ approach as the safest bet from both a reliability and an efficiency perspective. Note also that the ‘Filter’ approach allows an anti search (keeping only the records whose empId is not in ‘B’) with similar efficiency and robustness, which the ‘Inner Join’ approach does not.
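The anti search with the ‘Filter’ approach amounts to negating the hash-set lookup. The sketch below assumes the Spark 3.x Java API in local mode, a hypothetical trimmed-down `Employee` bean with made-up rows, and hypothetical names (`FilterAntiSearch`, `unmatchedIds()`); it keeps only the records whose `empId` is absent from the broadcast set.

```java
import java.io.Serializable;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FilterFunction;
import org.apache.spark.broadcast.Broadcast;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SparkSession;

public class FilterAntiSearch {

    // Hypothetical trimmed-down bean for Dataset 'A' (only empId and name, for brevity).
    public static class Employee implements Serializable {
        private int empId;
        private String name;

        public Employee() {}
        public Employee(int empId, String name) { this.empId = empId; this.name = name; }

        public int getEmpId() { return empId; }
        public void setEmpId(int empId) { this.empId = empId; }
        public String getName() { return name; }
        public void setName(String name) { this.name = name; }
    }

    public static List<Integer> unmatchedIds() {
        SparkSession spark = SparkSession.builder()
                .master("local[*]").appName("filter-anti-search").getOrCreate();
        try {
            Dataset<Employee> a = spark.createDataset(Arrays.asList(
                    new Employee(1, "Asha"), new Employee(2, "Ben"), new Employee(3, "Chen")),
                    Encoders.bean(Employee.class));

            Set<Integer> ids = new HashSet<>(Arrays.asList(1, 3));
            Broadcast<Set<Integer>> b =
                    JavaSparkContext.fromSparkContext(spark.sparkContext()).broadcast(ids);

            // Same record-by-record hash lookup as before, with the result negated.
            Dataset<Employee> aAnti =
                    a.filter((FilterFunction<Employee>) e -> !b.value().contains(e.getEmpId()));

            return aAnti.collectAsList().stream()
                    .map(Employee::getEmpId).sorted().collect(Collectors.toList());
        } finally {
            spark.stop();
        }
    }

    public static void main(String[] args) {
        System.out.println(unmatchedIds()); // [2]
    }
}
```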

Also, you could refer to my original story here.

Published at DZone with permission of Ajay Gupta. See the original article here.

Opinions expressed by DZone contributors are their own.
