Joins with MapReduce

By Buddhika Chamith · Mar. 12, 12 · Interview

I have been reading up on the join implementations available for Hadoop for the past few days. In this post I recap some of the techniques I learnt during the process. Joins can be done on either the map side or the reduce side, depending on the nature of the data sets to be joined.

Reduce side join

Consider two tables, employees and department, whose records share a dept_id column.

Let's see how the join query below can be achieved using a reduce side join.

select employees.name, employees.age, department.name  from employees inner join department on employees.dept_id=department.dept_id

The map side is responsible for emitting the join key (here dept_id) along with the corresponding record from each table, so that records with the same department id from both tables end up at the same reducer, which then joins them. Each record must also be tagged with the table it originated from, so that the reducer joins only records that come from different tables.

[Diagram: the reduce side join process]

Here is the pseudo code for the map function in this scenario.

map (k table, v rec) {
   // the join key
   dept_id = rec.dept_id
   // tag the record with the table it came from
   tagged_rec.tag = table
   tagged_rec.rec = rec
   emit(dept_id, tagged_rec)
}

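On the reduce side, the join happens between records having different tags.

reduce (k dept_id, list<tagged_rec> tagged_recs) {
   // pair each employees record with each department record for this dept_id
   for (tagged_rec : tagged_recs) {
      for (tagged_rec1 : tagged_recs) {
          // fixing the tag order avoids emitting every pair twice
          if (tagged_rec.tag == "employees" && tagged_rec1.tag == "department") {
              joined_rec = join(tagged_rec.rec, tagged_rec1.rec)
              emit(dept_id, joined_rec)
          }
      }
   }
}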
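To make the flow above concrete, here is a minimal single-process Python sketch of the reduce side join. It is not Hadoop code: the shuffle is imitated with an in-memory dictionary, and the sample rows, `map_record`, `reduce_records` and `reduce_side_join` are all names and data assumed for illustration only.

```python
from collections import defaultdict

# Hypothetical sample rows, invented for this sketch.
EMPLOYEES = [
    {"name": "alice", "age": 30, "dept_id": 1},
    {"name": "bob", "age": 25, "dept_id": 2},
    {"name": "carol", "age": 41, "dept_id": 1},
    {"name": "dave", "age": 35, "dept_id": 9},  # no matching department
]
DEPARTMENT = [
    {"dept_id": 1, "name": "eng"},
    {"dept_id": 2, "name": "sales"},
]


def map_record(table, rec):
    """Emit (join key, tagged record) so the reducer knows the source table."""
    yield rec["dept_id"], (table, rec)


def reduce_records(dept_id, tagged_recs):
    """Pair every employee with every department record sharing dept_id."""
    employees = [rec for tag, rec in tagged_recs if tag == "employees"]
    departments = [rec for tag, rec in tagged_recs if tag == "department"]
    for emp in employees:
        for dept in departments:
            yield dept_id, (emp["name"], emp["age"], dept["name"])


def reduce_side_join(employees, department):
    # Stand-in for the shuffle phase: group all map output by key.
    groups = defaultdict(list)
    for table, rows in (("employees", employees), ("department", department)):
        for rec in rows:
            for key, tagged in map_record(table, rec):
                groups[key].append(tagged)
    joined = []
    for key in sorted(groups):
        joined.extend(value for _, value in reduce_records(key, groups[key]))
    return joined


print(reduce_side_join(EMPLOYEES, DEPARTMENT))
```

Splitting the group by tag first, rather than comparing every pair of records, keeps each joined pair from being produced twice.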

Map side join (replicated join)

Using distributed cache on smaller table

For this implementation to work, one relation has to fit into memory. The smaller table is replicated to each node and loaded into memory. The join happens on the map side without any reducer involvement, which significantly speeds up the process: it avoids shuffling all the data across the network, much of which would be non-matching records that are dropped later anyway. The smaller table can be loaded into a hash table so that look-up by dept_id is possible. The pseudo code is outlined below.

map (k table, v rec) {
   // get smaller table records having this dept_id
   list recs = lookup(rec.dept_id)
   for (small_table_rec : recs) {
      joined_rec = join(small_table_rec, rec)
      // emit once per match; records without a match emit nothing
      emit(rec.dept_id, joined_rec)
   }
}
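The same idea can be sketched in plain Python. This is only an illustration under assumptions: the sample rows are invented, `build_lookup` stands in for loading the replicated table from the distributed cache, and `map_record` plays the role of the mapper.

```python
from collections import defaultdict

# Hypothetical sample rows, invented for this sketch.
EMPLOYEES = [
    {"name": "alice", "age": 30, "dept_id": 1},
    {"name": "bob", "age": 25, "dept_id": 2},
    {"name": "carol", "age": 41, "dept_id": 1},
    {"name": "dave", "age": 35, "dept_id": 9},  # no matching department
]
DEPARTMENT = [
    {"dept_id": 1, "name": "eng"},
    {"dept_id": 2, "name": "sales"},
]


def build_lookup(small_table):
    """Load the replicated smaller table into an in-memory hash table."""
    lookup = defaultdict(list)
    for rec in small_table:
        lookup[rec["dept_id"]].append(rec)
    return lookup


def map_record(rec, lookup):
    """Join one record of the larger table against the cached smaller table."""
    for small_rec in lookup.get(rec["dept_id"], []):
        yield rec["dept_id"], (rec["name"], rec["age"], small_rec["name"])


def replicated_join(employees, department):
    lookup = build_lookup(department)  # would be done once per mapper
    return [value for emp in employees for _, value in map_record(emp, lookup)]


print(replicated_join(EMPLOYEES, DEPARTMENT))
```

No grouping step is needed here, which mirrors the absence of a shuffle and reduce phase in the replicated join.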

Using distributed cache on filtered table

If the smaller table doesn't fit in memory, it may be possible to prune its contents if a filtering expression has been specified in the query. Consider the following query.

select employees.name, employees.age, department.name  from employees inner join department on employees.dept_id=department.dept_id where department.name="eng"

Here a smaller data set can be derived from the department table by filtering out records with department names other than “eng”. It may then be possible to do a replicated map side join with this smaller data set.
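A minimal Python sketch of this pruning step follows. The sample rows and the helper names `filter_small_table` and `map_record` are assumptions made for illustration, and the dictionary lookup assumes each dept_id appears only once in the department table.

```python
# Hypothetical sample rows, invented for this sketch.
EMPLOYEES = [
    {"name": "alice", "age": 30, "dept_id": 1},
    {"name": "bob", "age": 25, "dept_id": 2},
    {"name": "carol", "age": 41, "dept_id": 1},
]
DEPARTMENT = [
    {"dept_id": 1, "name": "eng"},
    {"dept_id": 2, "name": "sales"},
    {"dept_id": 3, "name": "hr"},
]


def filter_small_table(department, wanted_name):
    """Apply the WHERE predicate before replication to shrink the cached table."""
    return [dept for dept in department if dept["name"] == wanted_name]


def map_record(emp, dept_by_id):
    """Map side join against the filtered, replicated department records."""
    dept = dept_by_id.get(emp["dept_id"])
    if dept is not None:
        yield emp["dept_id"], (emp["name"], emp["age"], dept["name"])


filtered = filter_small_table(DEPARTMENT, "eng")
# Assumption: dept_id is unique within the department table.
dept_by_id = {dept["dept_id"]: dept for dept in filtered}
result = [value for emp in EMPLOYEES for _, value in map_record(emp, dept_by_id)]

print(len(filtered), "of", len(DEPARTMENT), "department records replicated")
print(result)
```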

Replicated semi-join

Reduce side join with map side filtering

Even if the filtered data of the small table doesn't fit into memory, it may be possible to include just the dept_ids of the filtered records in the replicated data set. On the map side, this cache can then be used to drop records whose dept_id is not in it, so that only records with a cached dept_id are sent to the reduce side. This reduces the amount of data moved between the mappers and the reducers.

The map side logic would look as follows.

map (k table, v rec) {
   // check if this record needs to be sent to reducer
   boolean sendtoreducer = check_cache(rec.dept_id)
   if (sendtoreducer) {
      dept_id = rec.dept_id
      tagged_rec.tag = table
      tagged_rec.rec = rec
      emit(dept_id, tagged_rec)
   }
}

The reducer side logic would be the same as in the reduce side join case.
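As a rough single-process illustration, the Python sketch below applies the map side filter and counts how many records would be shuffled. The sample rows, the `key_cache` set standing in for the replicated dept_id cache, and all function names are assumptions, not part of any Hadoop API.

```python
from collections import defaultdict

# Hypothetical sample rows, invented for this sketch.
EMPLOYEES = [
    {"name": "alice", "age": 30, "dept_id": 1},
    {"name": "bob", "age": 25, "dept_id": 2},
    {"name": "carol", "age": 41, "dept_id": 1},
    {"name": "dave", "age": 35, "dept_id": 9},
]
DEPARTMENT = [
    {"dept_id": 1, "name": "eng"},
    {"dept_id": 2, "name": "sales"},
]


def build_key_cache(department, wanted_name):
    """Keep only the dept_ids of the filtered records; this set is replicated."""
    return {dept["dept_id"] for dept in department if dept["name"] == wanted_name}


def map_record(table, rec, key_cache):
    """Emit a tagged record only if its dept_id is in the replicated cache."""
    if rec["dept_id"] in key_cache:
        yield rec["dept_id"], (table, rec)


def reduce_records(dept_id, tagged_recs):
    """Same reducer as the plain reduce side join."""
    employees = [rec for tag, rec in tagged_recs if tag == "employees"]
    departments = [rec for tag, rec in tagged_recs if tag == "department"]
    for emp in employees:
        for dept in departments:
            yield dept_id, (emp["name"], emp["age"], dept["name"])


key_cache = build_key_cache(DEPARTMENT, "eng")
shuffled = defaultdict(list)  # stand-in for the shuffle phase
for table, rows in (("employees", EMPLOYEES), ("department", DEPARTMENT)):
    for rec in rows:
        for key, tagged in map_record(table, rec, key_cache):
            shuffled[key].append(tagged)

sent = sum(len(recs) for recs in shuffled.values())
total = len(EMPLOYEES) + len(DEPARTMENT)
result = [v for key in sorted(shuffled) for _, v in reduce_records(key, shuffled[key])]

print(f"{sent} of {total} records shuffled")
print(result)
```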

Using a Bloom filter

A Bloom filter is a compact probabilistic data structure used to test whether a given element is a member of a set. A smaller representation of the filtered dept_ids can be obtained by adding the dept_id values to a Bloom filter, which can then be replicated to each node. On the map side, for each record read from the larger table, the Bloom filter is used to check whether the record's dept_id may be present, and only then is the record emitted to the reduce side. A Bloom filter can return false positives but is guaranteed never to return false negatives, so no matching record is dropped. A false positive merely sends an unneeded record to the reduce side, where it finds no join partner among the filtered department records and is discarded, so the result is still accurate.
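For illustration, here is a small hand-rolled Bloom filter in Python together with the map side check. It is a sketch under assumptions: the bit-array size, the number of hashes, the salted SHA-256 hashing scheme, the sample rows and the sample dept_ids are all chosen arbitrarily, and a real job would more likely use an existing Bloom filter implementation.

```python
import hashlib


class BloomFilter:
    """Tiny Bloom filter: false positives are possible, false negatives are not."""

    def __init__(self, num_bits=1024, num_hashes=3):
        self.num_bits = num_bits
        self.num_hashes = num_hashes
        self.bits = bytearray((num_bits + 7) // 8)

    def _positions(self, item):
        # Derive num_hashes bit positions by salting a SHA-256 digest.
        for seed in range(self.num_hashes):
            digest = hashlib.sha256(f"{seed}:{item}".encode()).digest()
            yield int.from_bytes(digest[:8], "big") % self.num_bits

    def add(self, item):
        for pos in self._positions(item):
            self.bits[pos // 8] |= 1 << (pos % 8)

    def might_contain(self, item):
        # True if every bit for this item is set (may be a false positive).
        return all(
            self.bits[pos // 8] & (1 << (pos % 8)) for pos in self._positions(item)
        )


def map_record(table, rec, bloom):
    """Emit a tagged record only if the Bloom filter may contain its dept_id."""
    if bloom.might_contain(rec["dept_id"]):
        yield rec["dept_id"], (table, rec)


# Hypothetical dept_ids that survived the filtering expression.
bloom = BloomFilter()
for dept_id in [1, 7, 42]:
    bloom.add(dept_id)

# Hypothetical sample rows, invented for this sketch.
EMPLOYEES = [
    {"name": "alice", "age": 30, "dept_id": 1},
    {"name": "bob", "age": 25, "dept_id": 2},
    {"name": "carol", "age": 41, "dept_id": 1},
]
kept = [rec["name"] for rec in EMPLOYEES for _ in map_record("employees", rec, bloom)]
print(kept)
```

Records whose dept_id was added to the filter are always kept. Whether a record such as bob's slips through depends on the filter's false positive rate, which falls as the bit array grows relative to the number of keys.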




Published at DZone with permission of Buddhika Chamith, DZone MVB. See the original article here.
