Projections, Joins, and Partition Awareness
Using PartitionAware on a key overrides Hazelcast's data placement rules. This can result in a great performance boost or can go badly wrong.
This example shows some of the newer features of querying (a way that joins can be achieved) and shows the pros and cons of partition-aware routing.
Although mainly a Hazelcast IMDG example, Hazelcast Jet has a guest appearance to implement the join.
The most commonly used data structure in Hazelcast is the IMap. An IMap can mostly be used interchangeably with a java.util.Map.
Behind the scenes, there are differences. The most important one for this example is that the IMap is partitioned. What we mean by this is the IMap is divided into sections called partitions, and these partitions are spread across the available server JVMs.
In this diagram, there is an IMap named person for storing details about people, divided into 26 parts, and spread across four JVMs.
This partitioning gives the benefit of scaling.
No one JVM holds all the IMap content, so more data can be injected into the person map than any one JVM can cope with, a significant advantage over a standard java.util.Map confined to a single JVM.
Furthermore, if servers are added to the cluster, Hazelcast will automatically re-arrange the partitions across the existing and added servers to rebalance the load. So, capacity can be increased or reduced while the cluster runs.
Spreading the data in such a way isn't problematic for CRUD operations. Typically, the caller maintains a connection to every server, so if a data record is needed from the first server, it's a single hop across the network. And if the next data record is needed from the second server, then there's a connection open to that, too, and again, it's a single network hop to get it.
Spreading the data makes querying faster. If we need to search through the data records and there are five JVMs, the search can run in parallel across all five servers at once and complete in one-fifth of the time.
Often, processing needs access to several data records at once, and this is hampered if the data records are on different JVMs. Although the network transfer may be fast, it still contributes to run time. Using PartitionAware can help.
The default algorithm for determining which partition stores a particular data record is based on the whole key of the data record. Simplistically, the entry with key 1 might go to partition 1, key 2 to partition 2, key 3 to partition 3, etc.
What this means in practice is that processing that runs on a server JVM and needs access to entries 1 and 3 will get inconsistent performance. If the partitions containing those entries are hosted by the server JVM on which processing runs, there will be no network transfer of data. If the partitions then get re-arranged to different server JVMs because more server JVMs are added, that same processing will have to retrieve the data across the network, which will give different performance.
The solution here is to use com.hazelcast.core.PartitionAware to override the default algorithm for data placement.
A key that implements PartitionAware has to provide a method getPartitionKey() that provides the routing to Hazelcast. For this example, we might return odd for keys 1 and 3, and even for key 2.
This would ensure that keys 1 and 3 are placed in the same partition. No matter where that partition is hosted, any processing accessing keys 1 and 3 will always find them in the same JVM and so get consistent performance without networking interference.
This doesn't affect equality. Keys 1 and 3 are different, separate map entries. They just have the same basis for routing — odd instead of the whole key — so have a common destination.
Note also that you aren't specifying which partition they go in, nor where that partition is hosted. It's just an affinity mechanism.
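As a concrete illustration, the odd/even routing could look like the plain-Java sketch below. The class name is invented; a real key class would implement com.hazelcast.core.PartitionAware&lt;String&gt; so that Hazelcast calls getPartitionKey() itself.

```java
// Sketch of a PartitionAware-style key, assuming integer keys.
// In real Hazelcast code, this class would implement
// com.hazelcast.core.PartitionAware<String>; this standalone
// version only shows the routing logic.
public class OddEvenKey {
    private final int id;

    public OddEvenKey(int id) {
        this.id = id;
    }

    // Hazelcast uses this return value, not the whole key,
    // to choose the partition.
    public String getPartitionKey() {
        return (id % 2 == 0) ? "even" : "odd";
    }

    public static void main(String[] args) {
        System.out.println(new OddEvenKey(1).getPartitionKey()); // odd
        System.out.println(new OddEvenKey(2).getPartitionKey()); // even
        System.out.println(new OddEvenKey(3).getPartitionKey()); // odd
    }
}
```

Keys 1 and 3 both route on "odd", so they land in the same partition even though they remain distinct map entries.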
The default algorithm for calculating the partition where an entry will reside is to take the hash of the key in binary form, and take the modulus of that by the number of partitions configured.
If that sounds like too much hard work, just ask Hazelcast for the answer:
System.out.println(this.hazelcastInstance.getPartitionService().getPartition("1"));
System.out.println(this.hazelcastInstance.getPartitionService().getPartition("2"));
System.out.println(this.hazelcastInstance.getPartitionService().getPartition("3"));
Each statement prints the Partition holding that key, when using the default setup of 271 partitions.
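For illustration, the placement rule can be sketched in plain Java. Note the assumption: Hazelcast hashes the serialized form of the key, not String.hashCode(), so the partition numbers below won't match what a real cluster prints; only the shape of the calculation is the same.

```java
// Rough sketch of the default placement rule: hash the key,
// then take the modulus by the partition count (271 by default).
// Hazelcast really hashes the *serialized* key bytes, so
// String.hashCode() here is only a stand-in.
public class PartitionSketch {
    static final int PARTITION_COUNT = 271;

    static int partitionFor(String key) {
        // floorMod keeps the result non-negative even for
        // negative hash codes.
        return Math.floorMod(key.hashCode(), PARTITION_COUNT);
    }

    public static void main(String[] args) {
        System.out.println(partitionFor("1")); // 49
        System.out.println(partitionFor("2")); // 50
        System.out.println(partitionFor("3")); // 51
    }
}
```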
The use of odd and even is a deliberately bad example to illustrate the potential pitfalls of overriding the partitioning routing with your own scheme.
If all keys result in odd or even, then only two partitions will ever contain any data. This results in a significant imbalance in the data.
Adding more servers to increase capacity will be ineffectual. Adding more partitions will be ineffectual.
Normal Hazelcast queries return whole objects: keys, values, or both (entries).
As an optimization, projections are available. These allow you to select the fields you want from the objects being queried for the result set.
Because the result set is smaller when it doesn't contain unwanted fields, it is quicker to transmit.
Querying on Keys
IMap is a key-value store. Queries are normally expressed as predicates to match against the value.
Alternatively, the predicate can be coded to match against the key, as well.
Equality checking would be coded as __key = 1. This is pointless: if you have the full key, then IMap.get(key) will always be faster, as it doesn't have to search.
Key search is useful in comparison searches (__key > 1) and wildcard searches (__key LIKE "John%").
Joins are more problematic.
Remember that the main use case for IMap is scaling: the data is too voluminous to fit in one JVM, so it is split into parts to spread the load of hosting.
It is almost axiomatic that if the data has been split up because it is big, it cannot be joined because it is big.
This is true in general, so joins are not supported by Hazelcast IMDG. But it is possible in certain specific cases, as we'll see in the example using Hazelcast Jet.
The example itself uses two main IMaps and derives a third.
The first IMap is called person and contains information about people. The key is a compound of first name and last name. The value is the person's date of birth.
The second IMap is called deaths and records the date of death for some of the people. The date of death is held in a separate record from the date of birth, partly to make the example more challenging and also because not everyone will have died. So, this is a de-normalized data model.
The third IMap is called life. It holds the age of people at their death, and is derived from the previous two maps, much like a materialized view in a relational database.
Two parts of the example are worth a closer look.
The PersonKey class defines the key for the person map. It is a compound of first name and last name.
This is defined as PartitionAware and defines a routing function getPartitionKey(), which returns the first letter of the last name.
So, for the two people John Doe and Jane Doe, the partition choice will be based on "D" and both will go to the same partition.
This is (deliberately) a bad algorithm to choose.
To start, there are only 26 letters in the Western alphabet that will be used for last names. So, with the default 271 partitions, 26 could be used and 245 will always be empty.
Furthermore, not all names occur with the same frequency. So, the partition holding the people whose last name begins with "M" will likely have a different count than the partition holding those whose last name begins with "P".
Routing on a hash of the whole last name, for example, would be a considerable improvement for balancing the data.
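The difference between the two schemes can be sketched in plain Java. Both routing functions are hypothetical stand-ins; Hazelcast would hash whatever getPartitionKey() returns to pick one of the 271 partitions.

```java
// Sketch of two routing choices for a PersonKey-style class,
// assuming plain ASCII last names. Only the routing function
// differs; Hazelcast maps its return value onto a partition.
public class RoutingSketch {
    // The example's (deliberately bad) routing: first letter only,
    // so at most 26 partitions ever hold data.
    static String firstLetterRouting(String lastName) {
        return lastName.substring(0, 1).toUpperCase();
    }

    // A better-balanced alternative: route on the whole last name,
    // so different names spread across all partitions.
    static String wholeNameRouting(String lastName) {
        return lastName.toUpperCase();
    }

    public static void main(String[] args) {
        // "Doe" and "Dickens" collide under first-letter routing...
        System.out.println(firstLetterRouting("Doe"));     // D
        System.out.println(firstLetterRouting("Dickens")); // D
        // ...but separate when the whole name is used.
        System.out.println(wholeNameRouting("Doe"));       // DOE
        System.out.println(wholeNameRouting("Dickens"));   // DICKENS
    }
}
```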
In the life map, the data entry is LifeValue, which holds the date of birth and the date of death.
What LifeAgeValueExtractor does is make a virtual third field (age) at runtime by subtracting birth from death.
This new field age is then usable in queries just like any other.
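What the extractor computes can be sketched in plain Java. The field names and the use of java.time.LocalDate are assumptions about how the real LifeValue class stores its dates.

```java
import java.time.LocalDate;
import java.time.Period;

// Sketch of the "virtual field" idea: age is not stored, it is
// derived on demand from the two stored dates. Field names and
// LocalDate are assumptions about the real LifeValue class.
public class LifeValueSketch {
    private final LocalDate dateOfBirth;
    private final LocalDate dateOfDeath;

    public LifeValueSketch(LocalDate dateOfBirth, LocalDate dateOfDeath) {
        this.dateOfBirth = dateOfBirth;
        this.dateOfDeath = dateOfDeath;
    }

    // What a LifeAgeValueExtractor-style class would compute
    // at query time: death minus birth, in whole years.
    public int age() {
        return Period.between(dateOfBirth, dateOfDeath).getYears();
    }

    public static void main(String[] args) {
        LifeValueSketch v = new LifeValueSketch(
                LocalDate.of(1900, 1, 1), LocalDate.of(1975, 6, 30));
        System.out.println(v.age()); // 75
    }
}
```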
Running the Example
The example uses Spring Boot, so you need to run mvn install to create an executable JAR file.
Then, run this command to start a Hazelcast instance:
java -jar target/project-key-0.1-SNAPSHOT.jar
Ideally, run this command twice or more, so that several instances are running at once. The later steps of the example show data distribution across members, so one won't be enough.
Step 1: Load Test Data
From any of the running instances, enter load as a command on the command line.
TestDataLoader.load() will load six records into the person map and four into the deaths map. Use the list command to display this data.
Step 2: Key Query #1
Run the command howard to query for all people who have the last name "Howard".
The code is in MyCommands.howard(), and the important line is this:
new SqlPredicate("__key.lastName = 'Howard'")
This returns all the values in the person map that match the above predicate.
As the values only contain the date of birth, this isn't wonderfully helpful. The output shows the date of birth but not the name.
Step 3: Key Query #2
Now, run the command
This is a refinement of the previous query, adding a projection.
So, the query is looking for matches based on last name but returns the first name and the date of birth.
This is more efficient for network transfer than returning the full entry: less data crosses the network than if whole entries were returned to the caller only to have some fields not displayed (e.g. the last name is not printed).
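The predicate-plus-projection combination can be sketched without Hazelcast, using an ordinary map and a stream. The key layout ("first last" strings) and the sample names here are purely illustrative, not the example's real test data.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Sketch of a predicate plus projection, without Hazelcast:
// match on part of the key, then keep only the fields wanted.
// Keys are "first last" strings and values are dates of birth,
// purely for illustration.
public class ProjectionSketch {
    public static List<String> firstNameAndDob(Map<String, String> personMap,
                                               String lastName) {
        return personMap.entrySet().stream()
                // Predicate part: match against the key, like
                // "__key.lastName = 'Howard'".
                .filter(e -> e.getKey().endsWith(" " + lastName))
                // Projection part: select only first name + date of
                // birth, dropping the rest of the entry.
                .map(e -> e.getKey().split(" ")[0] + " " + e.getValue())
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        Map<String, String> personMap = new LinkedHashMap<>();
        personMap.put("Leslie Howard", "1893-04-03");
        personMap.put("Trevor Howard", "1913-09-29");
        personMap.put("John Doe", "1970-01-01");
        System.out.println(firstNameAndDob(personMap, "Howard"));
        // [Leslie 1893-04-03, Trevor 1913-09-29]
    }
}
```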
Step 4: Data Location
Run the location command. For each record in the person map, the partition and hosting JVM are printed.
Assuming you are running two or more JVMs at this point, you should see some records are on different JVMs.
Because of the PartitionAware coding, you should see that all the records for last name "Howard" are in the same partition.
If you can, add or remove some server JVMs from the cluster and rerun this command after the automatic rebalance. Partitions may have moved, and if the "Howard" records have moved, they are still all kept together.
Step 5: Join
The join command runs a Jet pipeline to pull data from two IMaps and output the result to a third.
In Hazelcast IMDG, joins are not supported. Queries run on a single map at a time, and if data is so big that it needs to be distributed onto multiple nodes, then, in general, it is so big that combining two or more maps won't fit in memory.
This is where Jet lends a hand. An eight-step Jet pipeline is defined.
- Steps 1 and 2 read from the person map and reduce it down to just the fields needed, pairs of ("firstName", "dateOfBirth").
- Steps 3 and 4 do the same for the deaths map, resulting in pairs of ("firstName", "dateOfDeath").
- Step 5 has two input streams: ("firstName", "dateOfBirth") and ("firstName", "dateOfDeath"). Both are keyed on first name, so the join is easy and produces (("firstName", "dateOfBirth"), "dateOfDeath").
- Step 6 filters out results where the dateOfDeath is null. Step 5 tries to enrich the Step 2 stream with the Step 4 stream, but there may not always be a match.
- Steps 7 and 8 convert the output from Step 6 into a map entry and store it in the life map.
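The logic of Steps 5 to 8 can be sketched in plain Java as a simple hash join followed by a null filter. The real Jet pipeline processes the data as distributed streams; this eager version, with invented sample data, only shows the join semantics.

```java
import java.util.HashMap;
import java.util.Map;

// Plain-Java sketch of the pipeline's logic: join person and
// deaths on first name, drop non-matches, and build the result.
// The real Jet pipeline streams this; here it is done eagerly.
public class JoinSketch {
    public static Map<String, String> join(Map<String, String> birthByName,
                                           Map<String, String> deathByName) {
        Map<String, String> life = new HashMap<>();
        for (Map.Entry<String, String> person : birthByName.entrySet()) {
            // Step 5: enrich the person stream with the deaths stream.
            String dateOfDeath = deathByName.get(person.getKey());
            // Step 6: filter out people with no recorded death.
            if (dateOfDeath != null) {
                // Steps 7 and 8: build the output entry.
                life.put(person.getKey(),
                         person.getValue() + " -> " + dateOfDeath);
            }
        }
        return life;
    }

    public static void main(String[] args) {
        Map<String, String> births = Map.of("jane", "1900-01-01",
                                            "john", "1910-02-02");
        Map<String, String> deaths = Map.of("jane", "1980-03-03");
        System.out.println(join(births, deaths));
        // {jane=1900-01-01 -> 1980-03-03}
    }
}
```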
There are six rows of test data in the person map and four in deaths.
So, this gives the potential for 24 combinations, which isn't going to overflow memory. Actually, there are only four matches, so it's even less.
However, the number of matches fits in memory by luck, not design. If more rows of test data are added, at some point, the number of results could be too great.
This join happens to work, but it is not guaranteed that it won't result in an OutOfMemoryError in the future if more data is added.
Step 6: List
As the join is coded to send output to an IMap, the result isn't immediately visible.
Run the command list to output the content of the three maps. Now that the join has run, the life map has been populated.
Step 7: Oldest
The last step is to find out which of the four entries in the life map lived the longest. Run the command oldest.
The code first does:
int max = lifeMap.aggregate(Aggregators.integerMax("age"));
to find the oldest age. Remember that the age field is virtual; it's not actually a field in the LifeValue object but is derived. Then,
Set<String> keySet = lifeMap.keySet(new SqlPredicate("age = " + max));
is run to find all the people who lived for that long.
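Both steps can be sketched without Hazelcast: the aggregation becomes a stream max and the predicate becomes a filter. The names and ages below are invented sample data.

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Sketch of the two server-side steps without Hazelcast: an
// aggregation to find the maximum age, then a predicate to find
// everyone with that age. Names and ages are illustrative.
public class OldestSketch {
    // Equivalent of lifeMap.aggregate(Aggregators.integerMax("age")).
    static int maxAge(Map<String, Integer> ageByName) {
        return ageByName.values().stream()
                .mapToInt(Integer::intValue).max().getAsInt();
    }

    // Equivalent of lifeMap.keySet(new SqlPredicate("age = " + max)).
    static List<String> oldest(Map<String, Integer> ageByName) {
        int max = maxAge(ageByName);
        return ageByName.entrySet().stream()
                .filter(e -> e.getValue() == max)
                .map(Map.Entry::getKey)
                .sorted()
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        Map<String, Integer> ages = Map.of(
                "jane", 80, "john", 75, "jean", 80, "joan", 62);
        System.out.println(maxAge(ages)); // 80
        System.out.println(oldest(ages)); // [jane, jean]
    }
}
```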
The __key prefix allows you to use the key, or part of the key, in a search predicate.
Using PartitionAware on a key overrides Hazelcast's data placement rules. This can result in a great performance boost by utilizing domain-specific knowledge, or it can go badly wrong.
Projections reduce the size of the result set to the parts you need.
Aggregations do server-side calculations, further reducing the result set that gets sent to the caller.
Joins are not supported, but can be built for specific situations using Hazelcast Jet.
Published at DZone with permission of Neil Stevenson , DZone MVB. See the original article here.