Over a million developers have joined DZone.
{{announcement.body}}
{{announcement.title}}

Apache Ignite 1.7: Welcome Non-Collocated Distributed Joins!

DZone's Guide to

Apache Ignite 1.7: Welcome Non-Collocated Distributed Joins!

Check out a new tool in Ignite 1.7.0 — the ability to perform non-collocated distributed joins. At the cost of some performance, you are no longer tied to collocation.

· Database Zone
Free Resource

Read why times series is the fastest growing database category.

Apache Ignite 1.7.0 has been recently rolled out, and among the new changes, you can find a killer one that was awaited by many Apache Ignite users and customers for a long time — Non-Collocated Distributed Join support for SQL queries. So this post will be fully dedicated to this feature, and I'll try to shed some light on how the non-collocated distributed joins work and how they are different from the traditional (affinity collocation-based) joins available in Apache Ignite.

Affinity Collocation-Based Joins

Historically, Apache Ignite allowed executing SQL queries with joins across different tables, but it required collocating the data of the caches that are being joined in a query. In fact, in Ignite, collocation can be enabled easily by using the affinity key concept where the data of one business entity is stored on the same node where the other business entity resides.

For example, let's say you have two business entities — Organization and Person, and an Organization ID is used as an affinity key for Persons from that Organization. Then, Ignite will make sure to place all the Person data on the same node where their Organization data resides. This simple concept allows executing a whole range of imaginable SQL queries that are ANSI-99 compliant, including joins between multiple caches.

Basically, the execution flow of a SQL query that uses a join is absolutely the same as that of a query without the latter.

Let's have a look at the flow of one of the basic queries using Organizations and Persons business entities defined in the following way:

  • Organization (id, address) entity: where id is literally an Organization ID and its value will be used as a cache key at the time an Organization is put into the cache. The key that is used as a cache key is treated as a primary key at the Ignite SQL engine's layer. Keep this in mind until you get to the end of the blog post! 
  • Person(name, salary) entity: will be located in Persons cache, and as a cache key, we will use AffinityKey(id, orgId) where AffinityKey is a special kind of object in Ignite that allows us to define a Person's unique ID (the first parameter) as well as his affinity key (the second parameter). Here, Organization ID (orgId) has been chosen as Person's affinity key. This means that Persons will be located on the same node where their Organizations reside. 

After defining these business entities and preloading caches with data, we are free to execute an SQL query like the one below. Because the Persons are affinity collocated with their Organization, we're guaranteed to receive a complete result set.



SELECT * FROM Organization as org JOIN Person as p ON org.id = p.orgId


The execution flow of this query, depicted on Picture 1 below, will be the following:

  1. The query initiating node (mapper and reducer) sends the query to all the nodes where cached data resides (Phase Q).
  2. All the nodes that receive the query from the reducer will execute it locally, performing the join using local data only (Phase E(Q)).
  3. The nodes respond to the reducer with their portion of the result set (Phases R1, R2, and R3).
  4. The reducer will eventually reduce the result sets received from all the remote nodes and provide a final aggregated result to your code (Phase R)
Picture1. Collocated SQL QueryPicture 1. Collocated SQL Query


Non-collocated Distributed Joins

If the same query was executed on a non-affinity-collocated data, then you would get an incomplete and inconsistent result. The reason for that is that Apache Ignite’s versions earlier than 1.7.0 perform the query only on the local data (as described in step two of the flow above).

However, this is no longer true in Apache Ignite 1.7.0 and later versions that provide support for the non-collocated distributed joins. These joins no longer force you to collocate your data. 

Now we can use a Person's actual id as is, instead of the AffinityKey(id, orgId) as a cache key, and add orgId field to the content of the Person object itself in order to be able to perform joins between these two caches. Even after these modifications, we will still get a complete result regardless of the fact that Persons are no longer collocated with their Organization. This is because in the latest version of Ignite the execution flow of the same query (mentioned above) and now depicted on Picture 2 below will be the following : 

  1. The query initiating node (mapper & reducer) sends the query to all the nodes where cached data resides (Phase Q).
  2. All the nodes that receive the query from the reducer will execute it locally (Phase E(Q))  performing the join using both the local data and the potential data requested from the remote nodes (Phase D(Q)).
  3. The nodes respond to the reducer with their portion of the result set (Phases R1, R2 and R3).
  4. The reducer will eventually reduce the result sets received from all the remote nodes and provide a final aggregated result to your code (Phases R)
Picture 2. Non-collocated SQL QueryPicture 2. Non-collocated SQL Query


One important thing to note here is that due to the specificity of the query, a node will send broadcast requests into the cluster asking for the missing data in step two. However, even now, there is a way to help the optimizer and the SQL engine to switch from broadcast to unicast requests for certain join types, and for the exemplar query, the following modification may enable the unicast mode: ?


SELECT * FROM Organization as org JOIN Person as p ON org._key = p.orgId


With this query, if the SQL engine decides to execute the query against the Persons cache first joining with Organizations on the go, then the engine will send unicast requests to nodes that store Organizations with org._key(s), where _key is a special keyword that is used in Ignite SQL queries and it refers to object's cache key/primary key. Basically, it works since the engine can easily find out a node that stores an entry knowing its cache key/primary key. The same is true for affinity keys that are used to join some caches. 

Last word

Undoubtedly, Ignite's non-collocated distributed joins functionality makes it possible for applications to execute very complex analytics and operational queries in cases where it's not feasible to collocate all the data. However, I would advise not to overuse this approach in practice because the performance of these joins is worse than the performance of the affinity collocation-based joins due to the fact that there will be many more network round-trips and data movement between the nodes to fulfill a query. 

In reality, there is a very small chance that you will be able to collocate all your business entities in such a way that you can execute 100% of your SQL over the data cached locally. Usually, it’s possible to collocate data satisfying 95% of queries that will be executed in the fastest and most efficient manner, and use the non-collocated distributed joins for the residual 5% that may be not as efficient but this will eventually let you execute 100% of all your queries in Apache Ignite.

Learn how to get 20x more performance than Elastic by moving to a Time Series database.

Topics:
distributed caching ,sql join ,distributed computing ,sql ,distributed systems

Opinions expressed by DZone contributors are their own.

{{ parent.title || parent.header.title}}

{{ parent.tldr }}

{{ parent.urlSource.name }}