SQL Queries
Hazelcast is not an SQL database or an SQL query tool, but it provides a workable, robust subset of SQL query functionality that is accessible to developers. If you have an SQL background, this will feel familiar; if you don't, it's still pretty intuitive. The SqlPredicate encapsulates the where clause of a query. Because queries run against purely in-memory data, they are typically very fast.
public void sqlQueryDemo()
{
    // Hazelcast 3.x types: IMap (com.hazelcast.core),
    // Predicate and SqlPredicate (com.hazelcast.query)
    IMap<Integer, Employee> employees =
            hazelcastInstance.getMap("employees");
    Employee emp = new Employee();
    emp.setId(Integer.valueOf(1));
    emp.setFirstName("John");
    emp.setLastName("Doe");
    emp.setAge(new Random(System.currentTimeMillis()).nextInt(99));
    emp.setDeptId(Integer.valueOf(13));
    // put the dummy employee in the map using employee-id as the map key
    employees.set(emp.getId(), emp);
    Predicate sqlPredicate =
            new SqlPredicate(String.format("lastName = '%s'",
                    emp.getLastName()));
    Collection<Employee> matching = employees.values(sqlPredicate);
    // wildcards are supported, too - look for last names starting
    // with 'D' using the same 'values' call.
    sqlPredicate = new SqlPredicate("lastName like 'D%'");
    matching = employees.values(sqlPredicate);
    // compound predicates work, as well
    sqlPredicate =
            new SqlPredicate("lastName like 'D%' and "
                    + "age between 21 and 30");
    matching = employees.values(sqlPredicate);
    // this could go on, but it's a pretty robust subset of sql
    // functionality.
    sqlPredicate = new SqlPredicate("deptId not in (13, 23, 33)");
    matching = employees.values(sqlPredicate);
}
This shows how easily an SQL background carries over to the IMDG's SQL-like queries. The caveat is that, out of the box, the IMDG is not a good tool for joins because of the nature of the data. We split the data up across members because it's big data, and because it's big and spread out, joining it back together is more complex. I mentioned Jet earlier; Jet would be a useful tool for that.
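If you're coming from SQL, it can help to see what those where-clause strings mean in plain Java. The following sketch is purely illustrative. The Employee stand-in and the predicate constants are hypothetical, and a plain list scan replaces the distributed evaluation Hazelcast performs on its members. It shows that like 'D%' is a prefix match and that between includes both end points.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.function.Predicate;

// A plain-Java reading of two SqlPredicate strings; illustration only, not Hazelcast code.
final class SqlPredicateSketch {

    // Minimal, hypothetical stand-in for the Employee class used in the examples.
    static final class Employee {
        final String lastName;
        final int age;
        final int deptId;

        Employee(String lastName, int age, int deptId) {
            this.lastName = lastName;
            this.age = age;
            this.deptId = deptId;
        }
    }

    // "lastName like 'D%' and age between 21 and 30"
    // like 'D%' is a prefix match; between includes both end points.
    static final Predicate<Employee> D_NAMES_AGED_21_TO_30 =
            e -> e.lastName.startsWith("D") && e.age >= 21 && e.age <= 30;

    // "deptId not in (13, 23, 33)"
    static final Predicate<Employee> NOT_IN_DEPTS =
            e -> !Set.of(13, 23, 33).contains(e.deptId);

    // Scans the list and returns the last names of matching employees, in order.
    static List<String> matchingLastNames(List<Employee> employees, Predicate<Employee> where) {
        List<String> names = new ArrayList<>();
        for (Employee e : employees) {
            if (where.test(e)) {
                names.add(e.lastName);
            }
        }
        return names;
    }
}
```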
Predicate Queries: Criteria API
For Java developers who really never liked SQL, there’s also a pure Java approach to querying the IMDG: the Criteria API.
public void criteriaAPIDemo()
{
    // equal, greaterThan and and are static imports
    // from com.hazelcast.query.Predicates
    IMap<Integer, Employee> employees =
            hazelcastInstance.getMap("employees");
    // <snip> same employee as before
    // create a few predicates, then combine them with and()
    Predicate<Integer, Employee> lastName = equal("lastName", "Doe");
    Predicate<Integer, Employee> age = greaterThan("age",
            Integer.valueOf(99));
    Predicate<Integer, Employee> dept = equal("deptId",
            Integer.valueOf(13));
    // and is a variadic method, so you can just keep
    // adding predicates and get the logical 'and' of all of them
    Predicate<Integer, Employee> ageEtAl = and(age,
            lastName, dept);
    Collection<Employee> matching = employees.values(ageEtAl);
}
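To make the combinators concrete, here is a minimal plain-Java model of equal, greaterThan, and a variadic and. These are hypothetical stand-ins, not the Hazelcast Predicates class. Attributes are getter functions rather than attribute-name strings, and the age threshold is 30 rather than 99 so that something can match.

```java
import java.util.List;
import java.util.Objects;
import java.util.function.Function;
import java.util.function.Predicate;

// Hypothetical plain-Java stand-ins for Predicates.equal, greaterThan and and.
// Attributes are getter functions here; Hazelcast uses attribute-name strings.
final class CriteriaSketch {

    static final class Employee {
        private final String lastName;
        private final int age;
        private final int deptId;

        Employee(String lastName, int age, int deptId) {
            this.lastName = lastName;
            this.age = age;
            this.deptId = deptId;
        }

        String getLastName() { return lastName; }
        int getAge() { return age; }
        int getDeptId() { return deptId; }
    }

    static <T, A> Predicate<T> equal(Function<T, A> attribute, A value) {
        return t -> Objects.equals(attribute.apply(t), value);
    }

    static <T, A extends Comparable<? super A>> Predicate<T> greaterThan(Function<T, A> attribute, A value) {
        return t -> attribute.apply(t).compareTo(value) > 0;
    }

    // Variadic like Predicates.and(...): matches only when every predicate matches.
    @SafeVarargs
    static <T> Predicate<T> and(Predicate<T>... predicates) {
        return t -> {
            for (Predicate<T> p : predicates) {
                if (!p.test(t)) {
                    return false;
                }
            }
            return true;
        };
    }

    // Counts employees with lastName = Doe, age > 30 and deptId = 13.
    static long countDoesOver30InDept13(List<Employee> employees) {
        Predicate<Employee> lastName = equal(Employee::getLastName, "Doe");
        Predicate<Employee> age = greaterThan(Employee::getAge, 30);
        Predicate<Employee> dept = equal(Employee::getDeptId, 13);
        Predicate<Employee> ageEtAl = and(age, lastName, dept);
        return employees.stream().filter(ageEtAl).count();
    }
}
```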
One thing that needs to be mentioned, if only briefly, is memory in the query client. Hazelcast query operations can retrieve very large volumes of data very quickly. If you had, say, 20 storage members and a query brought back 1GB or so of data from each, you'd be looking at 20GB of data from one query, and your client may or may not have that much memory available. The fix for that is paging predicates. A paging predicate wraps (logically subsumes the functions of) another predicate, so you get the same logical comparisons and the same filtering, but in a container that brings results back in batches of a size you specify. It's like reading a printed book one page at a time.
Predicate<Integer, Employee> lastName = equal("lastName", "Doe");
Predicate<Integer, Employee> age = greaterThan("age", Integer.valueOf(99));
Predicate<Integer, Employee> dept = equal("deptId", Integer.valueOf(13));
Predicate<Integer, Employee> ageEtAl = and(age, lastName, dept);
// construct a paging predicate to get a max of 10 objects per
// page, based on the previously constructed predicate.
PagingPredicate<Integer, Employee> pagedResults =
        new PagingPredicate<>(ageEtAl, 10);
Collection<Employee> oldPeople = employees.values(pagedResults);
while (!oldPeople.isEmpty())
{
    // process this page, then advance; an empty page means
    // there is nothing left to read
    pagedResults.nextPage();
    oldPeople = employees.values(pagedResults);
}
This simply wraps the previously used predicate and returns the results in pages, with the page size set to 10 in this example. Calling nextPage() moves on to the next batch.
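The paging loop boils down to four steps: fetch a page, process it, advance, and stop on an empty page. The sketch below models that loop in plain Java over an in-memory list. The PagingSketch class and its methods are hypothetical. They mimic only the page-at-a-time behavior, not PagingPredicate's actual implementation.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of page-at-a-time retrieval; not Hazelcast's PagingPredicate.
final class PagingSketch {
    private final List<String> results; // stands in for the full, ordered result set
    private final int pageSize;
    private int page = 0;

    PagingSketch(List<String> results, int pageSize) {
        this.results = results;
        this.pageSize = pageSize;
    }

    // Returns the current page; an empty list means there is nothing left.
    List<String> currentPage() {
        int from = Math.min(page * pageSize, results.size());
        int to = Math.min(from + pageSize, results.size());
        return results.subList(from, to);
    }

    void nextPage() {
        page++;
    }

    // Walks page after page, recording each page's size, and stops on an empty page.
    static List<Integer> pageSizes(int totalResults, int pageSize) {
        List<String> all = new ArrayList<>();
        for (int i = 0; i < totalResults; i++) {
            all.add("employee-" + i);
        }
        PagingSketch paging = new PagingSketch(all, pageSize);
        List<Integer> sizes = new ArrayList<>();
        List<String> current = paging.currentPage();
        while (!current.isEmpty()) {
            sizes.add(current.size()); // "process" the current page
            paging.nextPage();
            current = paging.currentPage();
        }
        return sizes;
    }
}
```

With 25 matching results and a page size of 10, the loop sees pages of 10, 10 and 5 entries, then stops.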
Aggregations
Aggregations, or as they're now called, "fast aggregations," allow data query and transformation to be dispatched to the cluster, and they can be extremely effective. Keeping with the Employee class and the "employees" map from the other examples, let's do a quick and dirty aggregation. You could run the aggregation across the entire entry set of the map, but using a predicate to filter the objects before the aggregator does the reduction on them is more efficient. Department 13 is used to represent group-W. You know those people; they're everywhere.
This example uses an anonymous class, which is quick, easy, and perfectly effective for a one-off aggregator.
public void simpleAverageAgeAggregation(IMap<Integer, Employee> employees)
{
    Predicate<Integer, Employee> deptPredicate =
            equal("deptId", Integer.valueOf(13));
    Aggregator<Map.Entry<Integer, Employee>, Double> ageAggregator
            = new Aggregator<Map.Entry<Integer, Employee>, Double>()
    {
        // <snip> - serialVersionUID: don't forget this is
        // going over the wire in serialized format.
        protected long sum = 0L;
        protected long count = 0L;

        @Override
        public void accumulate(Map.Entry<Integer, Employee> entry)
        {
            count++;
            sum += entry.getValue().getAge();
        }

        @Override
        public void combine(Aggregator aggregator)
        {
            this.sum += this.getClass().cast(aggregator).sum;
            this.count += this.getClass().cast(aggregator).count;
        }

        @Override
        public Double aggregate()
        {
            if (count == 0)
            {
                return null;
            }
            double dsum = (double) sum;
            return Double.valueOf(dsum / count);
        }
    };
    // find the average age of employees in department 13
    Double avgAge =
            employees.aggregate(ageAggregator, deptPredicate);
    L.info("average age: {}", avgAge);
}
The code is pretty self-explanatory. The predicate ensures that only matching entries from the distributed map are included in the calculation. The aggregator "accumulates" data by examining the matching subset and adding each age into the sum, but where does that happen? The accumulate method is called on each storage member (i.e. not on clients and not on Lite members). It is passed each matching entry (filtered by deptPredicate) and accumulates the raw values. These calls run in parallel on each member involved, and because the data is partitioned across members and only a filtered subset is processed, this is going to be very fast. In the second phase, each of these aggregator instances is returned to the caller. There, the caller's instance of the anonymous aggregator class examines each returned aggregator (instances of the same anonymous class) and combines all the raw results. Because the aggregator is an anonymous class, there is no type name to cast to, so the code calls this.getClass().cast() to reach count and sum. In the final step, also on the caller, the aggregate method is invoked and performs the simple calculation of average age. This is really not much code for that kind of power.
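The three phases can be simulated without a cluster. In this hypothetical plain-Java sketch, each inner list plays the role of one member's filtered entries. accumulate runs per member, and the caller combines raw sums and counts before aggregate divides. Combining raw totals rather than per-member averages is what keeps the answer right when members hold different numbers of entries. For ages [20, 30] on one member and [40] on another, the mean of the member averages would be 32.5, while the true average is 30.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical simulation of the three aggregation phases in plain Java.
// Each inner list plays the role of one member's filtered entries.
final class AverageAgeSketch {
    private long sum = 0L;   // both counters must start at zero
    private long count = 0L;

    // Phase 1, on each member: fold matching entries into raw totals.
    void accumulate(int age) {
        count++;
        sum += age;
    }

    // Phase 2, on the caller: merge another instance's raw totals.
    void combine(AverageAgeSketch other) {
        sum += other.sum;
        count += other.count;
    }

    // Phase 3, on the caller: turn the raw totals into the answer.
    Double aggregate() {
        if (count == 0) {
            return null;
        }
        return (double) sum / count;
    }

    static Double averageAcrossMembers(List<List<Integer>> agesPerMember) {
        List<AverageAgeSketch> partials = new ArrayList<>();
        for (List<Integer> memberAges : agesPerMember) {
            AverageAgeSketch partial = new AverageAgeSketch(); // one instance per member
            for (int age : memberAges) {
                partial.accumulate(age);
            }
            partials.add(partial);
        }
        AverageAgeSketch result = new AverageAgeSketch(); // the caller's own instance
        for (AverageAgeSketch partial : partials) {
            result.combine(partial);
        }
        return result.aggregate();
    }
}
```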
There is also a very complete set of built-in aggregators for things like min, max, count, and avg over the different numeric types, and distinct for any comparable type. They can be used with very little setup, like this:
// get the set of distinct last names
Set<String> lastNames = employees
        .aggregate(Aggregators.<Map.Entry<Integer, Employee>, String>distinct("lastName"));
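Aggregators is com.hazelcast.aggregation.Aggregators. The explicit type arguments tell the compiler the input and result types. With a static import of distinct, the call can be shortened to employees.aggregate(distinct("lastName")), letting the assignment target drive type inference.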
Entry Processors
Entry processors are pretty cool. You can do highly efficient in-place processing with a minimum of locking. Consider what people often end up doing to work with remote objects: lock a key, fetch the value, mutate the value, put it back, and unlock the key (in a finally block). That's four network calls to start with, or three if you're only looking at the data and not updating the central source. Your objects may also be large and incur significant CPU and network costs for serialization and transport.
Entry processors let you dispatch a key-based "task" object across the LAN directly to the member that owns the key, where it executes in a lock-free, thread-safe fashion. Hazelcast's threading model makes this possible. Each partition is served by a single partition thread, so operations on keys in that partition run one at a time without explicit locks.
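Here is a rough model of why no lock is needed. It is a hypothetical sketch, not Hazelcast internals. Each partition gets one single-threaded executor and its own plain HashMap, and executeOnKey routes a function to the thread that owns the key's partition. Many callers can increment the same key concurrently without losing updates, because all of them are serialized on that one thread.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.UnaryOperator;

// Hypothetical model of partition-thread routing; not Hazelcast internals.
final class PartitionThreadSketch {
    private final int partitionCount;
    private final ExecutorService[] partitionThreads;
    // One plain HashMap per partition, only ever touched by that partition's thread.
    private final List<Map<String, Integer>> partitionStores = new ArrayList<>();

    PartitionThreadSketch(int partitionCount) {
        this.partitionCount = partitionCount;
        this.partitionThreads = new ExecutorService[partitionCount];
        for (int p = 0; p < partitionCount; p++) {
            partitionThreads[p] = Executors.newSingleThreadExecutor();
            partitionStores.add(new HashMap<>());
        }
    }

    int partitionId(String key) {
        return Math.floorMod(key.hashCode(), partitionCount);
    }

    // Like executeOnKey: run the processor on the key's partition thread and wait.
    Integer executeOnKey(String key, UnaryOperator<Integer> processor) {
        int p = partitionId(key);
        Future<Integer> result = partitionThreads[p].submit(() -> {
            Map<String, Integer> store = partitionStores.get(p);
            Integer updated = processor.apply(store.get(key));
            store.put(key, updated);
            return updated;
        });
        try {
            return result.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        }
    }

    void shutdown() {
        for (ExecutorService thread : partitionThreads) {
            thread.shutdown();
        }
    }

    // Many callers increment one key at once; no locks, yet no updates are lost.
    static int concurrentIncrements(int callers, int incrementsPerCaller) {
        PartitionThreadSketch grid = new PartitionThreadSketch(4);
        ExecutorService clients = Executors.newFixedThreadPool(callers);
        try {
            List<Future<?>> running = new ArrayList<>();
            for (int c = 0; c < callers; c++) {
                running.add(clients.submit(() -> {
                    for (int i = 0; i < incrementsPerCaller; i++) {
                        grid.executeOnKey("counter", v -> v == null ? 1 : v + 1);
                    }
                }));
            }
            for (Future<?> f : running) {
                f.get();
            }
            return grid.executeOnKey("counter", v -> v);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            clients.shutdown();
            grid.shutdown();
        }
    }
}
```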
Here’s a brain-dead simple entry processor example — but it’s still a really useful approach:
@Component
@Profile("client")
public class EntryProcessorRunner implements CommandLineRunner, ApplicationContextAware
{
    private ApplicationContext applicationContext;

    @Override
    public void run(String... args) throws Exception
    {
        // assumption: the HazelcastInstance bean is looked up by type
        HazelcastInstance instance = applicationContext.getBean(HazelcastInstance.class);
        IMap<String, String> demo = instance.getMap("demo");
        String key = "someKey";
        demo.set(key, "Just a String value...");
        // synchronous: waits for the processor to run on the key owner
        demo.executeOnKey(key, new DemoEntryProcessor());
        // asynchronous: returns a future immediately
        EntryProcessor<String, String> asyncProcessor = new DemoOffloadableEntryProcessor();
        demo.submitToKey(key, asyncProcessor);
        // asynchronous, with a callback notified on completion
        ExecutionCallback<String> callback = new AsynchCallbackDemo();
        demo.submitToKey(key, asyncProcessor, callback);
    }
    // <snip>
}
Here's the entry processor it calls:
public class DemoEntryProcessor extends AbstractEntryProcessor<String, String>
{
    // <snip>
    @Override
    public Object process(Entry<String, String> entry)
    {
        String key = entry.getKey();
        String value = entry.getValue();
        l.info("in-place processing called for {}::{}", key, value);
        SimpleDateFormat sdf = new SimpleDateFormat(dtFormat);
        // assumption: the timestamp fills the first %s and the old value the second
        entry.setValue(String.format("This value was modified at %s -- %s",
                sdf.format(new Date()), value));
        return null;
    }
    // <snip>
}
This is the simplest case, but as simple as it is, it's a very powerful tool to have. From your client, you can dispatch computation to your servers with no locking, no contention, and efficient use of the LAN. But you're still waiting for the result to be returned, so it may not seem like a really big deal. Hazelcast also has a number of options to run synchronously, asynchronously, and on multiple entries:
executeOnKey(Object, EntryProcessor); //synch
submitToKey(Object, EntryProcessor); //asynch
submitToKey(Object, EntryProcessor, ExecutionCallback); //asynch
executeOnEntries(EntryProcessor); //synch
executeOnEntries(EntryProcessor, Predicate); //synch
executeOnKeys(Set, EntryProcessor); //synch
This is a really useful set of calls. The first one, executeOnKey, does exactly what it says: it makes one direct call to the key owner and executes synchronously on that entry. The next two execute asynchronously, so your client code doesn't need to wait for long-running operations. A word of warning, though: long-running entry processors can be truly evil, because by default they occupy the partition thread and hold up every other operation on that partition while they run. Implement the Offloadable interface in your entry processor to tell Hazelcast to move the processing off the partition thread and onto a named executor.
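You can see the cost of a long-running processor on the partition thread, and the benefit of offloading it, with plain executors. In this hypothetical sketch, a slow task either occupies the single partition thread or runs on a separate offload pool. The sketch then checks whether a quick task on the same partition thread can still finish. It models only the thread hand-off. My understanding is that Hazelcast also keeps the key locked while an Offloadable processor runs, so other operations on that same key still wait.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical model: one partition thread plus a separate offload pool. Illustration only.
final class OffloadSketch {

    // True if a quick task on the partition thread finished while a slow task was still running.
    static boolean quickTaskRunsDuringSlowTask(boolean offloadSlowTask) {
        ExecutorService partitionThread = Executors.newSingleThreadExecutor();
        ExecutorService offloadPool = Executors.newSingleThreadExecutor();
        CountDownLatch release = new CountDownLatch(1);
        Runnable slowTask = () -> {
            try {
                release.await(); // stands in for a long-running computation
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        try {
            Future<?> slow = offloadSlowTask
                    ? offloadPool.submit(slowTask)
                    : partitionThread.submit(slowTask);
            Future<?> quick = partitionThread.submit(() -> { });
            boolean quickFinished;
            try {
                quick.get(500, TimeUnit.MILLISECONDS);
                quickFinished = true;
            } catch (TimeoutException e) {
                quickFinished = false; // stuck in the queue behind the slow task
            }
            release.countDown();
            slow.get();
            return quickFinished;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            release.countDown();
            partitionThread.shutdown();
            offloadPool.shutdown();
        }
    }
}
```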
Here's an offloadable entry processor, similar in function to the first; it's the one the runner above submits asynchronously.
public class DemoOffloadableEntryProcessor extends AbstractEntryProcessor<String, String>
        implements Offloadable
{
    // <snip>
    public final static String OFFLOADABLE_EXECUTOR = "default";

    @Override
    public Object process(Entry<String, String> entry)
    {
        String key = entry.getKey();
        String value = entry.getValue();
        l.info("in-place processing called for {}::{}", key, value);
        SimpleDateFormat sdf = new SimpleDateFormat(dtFormat);
        // assumption: the timestamp fills the first %s and the old value the second
        String newValue = String.format("This value was modified at %s -- %s",
                sdf.format(new Date()), value);
        entry.setValue(newValue);
        return newValue;
    }

    // from Offloadable: names the executor that runs process()
    @Override
    public String getExecutorName()
    {
        return OFFLOADABLE_EXECUTOR;
    }
}
We recommend submitting the processing, tagging it with a callback, and going on your way. This works well when you have a stream of data requests that need to be initiated without the caller waiting on each result. If you're going to do this, be sure to read ahead to the section on back pressure.
The execution callback executes in the caller’s process space, so it’s your notification that the invocation is complete. This one just logs, like this:
package com.hazelcast.tao.gettingstarted.executors;
// <snip> - imports
public class SimpleExecutionCallback<V> implements ExecutionCallback<V>
{
    @Override
    public void onFailure(Throwable throwable)
    {
        L.error("execution failed - {}", throwable.getMessage());
    }

    @Override
    public void onResponse(V value)
    {
        L.info("processing complete - {}", value);
    }
}
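Submit-and-forget with a callback looks much the same outside Hazelcast. This is a hypothetical plain-Java sketch. The Callback interface only mimics the shape of ExecutionCallback, and CompletableFuture stands in for the Hazelcast future. The caller returns immediately, and the callback later receives either the result or the underlying failure.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical fire-and-forget submission with a callback, in plain java.util.concurrent.
final class AsyncSubmitSketch {

    // Shaped like ExecutionCallback, but declared here; not the Hazelcast interface.
    interface Callback<V> {
        void onResponse(V value);
        void onFailure(Throwable throwable);
    }

    // Starts the work and returns at once; the callback hears about the outcome later.
    static <V> void submit(Callable<V> work, Callback<V> callback, Executor executor) {
        CompletableFuture.supplyAsync(() -> {
            try {
                return work.call();
            } catch (Exception e) {
                throw new CompletionException(e);
            }
        }, executor).whenComplete((value, error) -> {
            if (error == null) {
                callback.onResponse(value);
            } else {
                Throwable cause = error instanceof CompletionException && error.getCause() != null
                        ? error.getCause() : error;
                callback.onFailure(cause);
            }
        });
    }

    // Runs one submission and reports what the callback saw.
    static String outcome(boolean fail) {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        AtomicReference<String> seen = new AtomicReference<>();
        CountDownLatch done = new CountDownLatch(1);
        Callback<Integer> callback = new Callback<Integer>() {
            @Override
            public void onResponse(Integer value) {
                seen.set("response:" + value);
                done.countDown();
            }

            @Override
            public void onFailure(Throwable throwable) {
                seen.set("failure:" + throwable.getMessage());
                done.countDown();
            }
        };
        try {
            submit(() -> {
                if (fail) {
                    throw new IllegalStateException("boom");
                }
                return 42;
            }, callback, pool);
            done.await(5, TimeUnit.SECONDS);
            return seen.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }
}
```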
Because nothing is free, there's a bit of configuration to go with this. Back pressure, both in the client and in server members, is important when using asynchrony. It can be configured with system properties like this:
# the default is false; you should enable it for async work
hazelcast.backpressure.enabled=true
# 100 is the default; look at your hardware resources before increasing it.
hazelcast.backpressure.max.concurrent.invocations.per.partition=100
# this one really just relates to async backups, not async data operations,
# but I thought I'd put it here for completeness. If the system cannot keep up with
# async backup requests, some of them are periodically turned into sync backups.
# The window is a number of async backups, not a time in millis.
hazelcast.backpressure.syncwindow=1000
# how long hz will wait for invocation resources (millis)
hazelcast.backpressure.backoff.timeout.millis=60000
Back pressure is a topic worth some consideration. Hazelcast uses its threading model to execute these invocations, so there's a limit to how many can be in flight at any one point in time. The absolute number doesn't matter much, as it depends on the size of your cluster and the number of CPUs, cores, and physical threads. What's likely to be interesting is how many can be queued up for one partition, because by default mutating entry processors run on a partition thread. When configuring your cluster, you know how many physical threads you have, so you can set the partition thread count to a sensible number. Too few and you'll have contention; too many (one-to-one sounds ideal, though it rarely is) and you won't get good resource utilization.
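You can picture the per-partition limit as a pool of permits. In this hypothetical sketch, each partition has a Semaphore sized like max.concurrent.invocations.per.partition. A caller waits up to a backoff timeout for a permit, loosely mirroring backoff.timeout.millis. When the wait expires, the sketch simply returns false, whereas Hazelcast would fail the invocation with an overload error.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

// Hypothetical per-partition invocation limiter; illustrates the idea behind
// max.concurrent.invocations.per.partition and backoff.timeout.millis. Not Hazelcast code.
final class BackpressureSketch {
    private final Semaphore[] permits;

    BackpressureSketch(int partitionCount, int maxConcurrentPerPartition) {
        permits = new Semaphore[partitionCount];
        for (int p = 0; p < partitionCount; p++) {
            permits[p] = new Semaphore(maxConcurrentPerPartition);
        }
    }

    // Tries to start an invocation; waits up to backoffMillis for a free slot.
    boolean tryStart(int partitionId, long backoffMillis) {
        try {
            return permits[partitionId].tryAcquire(backoffMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    // Called when an in-flight invocation completes, freeing its slot.
    void finish(int partitionId) {
        permits[partitionId].release();
    }
}
```

With a limit of 2, a third concurrent invocation on the same partition is refused once its backoff expires, while other partitions are unaffected.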
Runnable Tasks
These are simply Java Runnable objects that are dispatched to one or more members. Keep in mind that the salient part of the signature is public void run(), i.e. nothing is returned. The way they're dispatched to members is very flexible: it can be one member, all members, the member that owns a key, or members selected by an attribute you have set on them.
Here's an example of running something on all members (executeOnKeyOwner would target the member that owns a given key instead):
@Component("runnableDemo")
@Profile("client")
public class RunnableDemoCaller implements CommandLineRunner, ApplicationContextAware
{
    private ApplicationContext applicationContext;

    @Override
    public void run(String... args) throws Exception
    {
        // assumption: the HazelcastInstance bean is looked up by type
        HazelcastInstance client = applicationContext.getBean(HazelcastInstance.class);
        IExecutorService executorService = client.getExecutorService("default");
        executorService.executeOnAllMembers(new LoggingRunnable());
    }
    // <snip>
}
The Runnable object is pretty ordinary, with the caveat that it needs to be serializable. This runnable is also HazelcastInstanceAware, so when it's set up on the target node, the Hazelcast framework injects the correct instance and the runnable can communicate with the cluster.
@Component("loggingRunnable")
public class LoggingRunnable implements Runnable, Serializable, HazelcastInstanceAware
{
    // <snip>
    // transient: injected on the target member, never serialized
    private transient HazelcastInstance hazelcastInstance;

    @Override
    public void run()
    {
        l.info("into run, cluster size: {}",
                getHazelcastInstance().getCluster().getMembers().size());
    }
    // <snip>
}
This code isn't particularly profound, but there's one cool aspect to it. You can direct processing to the member that owns a key (or to other members) and process that key and/or other keys in multiple maps. So complex manipulation can be performed outside your client, eliminating multiple network round-trips. The data need not all come from one member, either; there are no restrictions on that. It can be a significant performance boost to design your data so that related items all live within one node. Then this kind of task will tend not to make network calls, though it is not restricted from doing so.
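Designing related items to live together usually means routing them by a shared partition key. The sketch below is hypothetical. OrderKey and its customerId field are invented for illustration, in the spirit of Hazelcast's PartitionAware interface. The sketch uses hashCode() where Hazelcast hashes the serialized partition key, and 271 is Hazelcast's default partition count. Every order for one customer lands in the same partition, so a task sent to that key's owner finds them locally.

```java
import java.util.Objects;

// Hypothetical key that routes by a separate partition key, in the spirit of
// Hazelcast's PartitionAware. Hazelcast hashes the serialized partition key;
// this sketch uses hashCode() only to show the routing idea.
final class ColocationSketch {
    static final int PARTITION_COUNT = 271; // Hazelcast's default partition count

    static final class OrderKey {
        final long orderId;
        final String customerId;

        OrderKey(long orderId, String customerId) {
            this.orderId = orderId;
            this.customerId = Objects.requireNonNull(customerId);
        }

        // All orders for one customer share this value, so they share a partition.
        Object getPartitionKey() {
            return customerId;
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (!(o instanceof OrderKey)) {
                return false;
            }
            OrderKey other = (OrderKey) o;
            return orderId == other.orderId && customerId.equals(other.customerId);
        }

        @Override
        public int hashCode() {
            return Objects.hash(orderId, customerId);
        }
    }

    static int partitionId(Object partitionKey) {
        return Math.floorMod(partitionKey.hashCode(), PARTITION_COUNT);
    }

    static int partitionOf(OrderKey key) {
        return partitionId(key.getPartitionKey());
    }
}
```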
Callable Tasks
As with runnable tasks, callable tasks are dispatched to one or more members, but they can also bring data back. Here's a really simple callable that will be dispatched to a member, log some noise to show it ran, and return the number of partitions it hosts. There are better ways to monitor or manage partitions, but this shows how easily you can get a value back from a member.
package com.hazelcast.tao.gettingstarted.port;
// <snip> - imports. They should be just what you'd expect
public class PartitionReporter implements Callable<Integer>, Serializable, HazelcastInstanceAware
{
    // <snip> - local data, ctor and such
    @Override
    public Integer call()
    {
        Member member = hazelcastInstance.getCluster()
                .getLocalMember();
        String fmt = "member listening on %s:%d";
        String whoAmI = String.format(fmt,
                member.getSocketAddress().getHostName(),
                member.getSocketAddress().getPort());
        PartitionService service =
                hazelcastInstance.getPartitionService();
        boolean memberIsSafe = service.isLocalMemberSafe();
        String memberSafety = memberIsSafe ? "" : "not ";
        boolean clusterIsSafe = service.isClusterSafe();
        String clusterSafety = clusterIsSafe ? "" : "not ";
        // getPartitions() returns every partition in the cluster,
        // so count only the ones this member owns
        int partitionCount = 0;
        for (Partition partition : service.getPartitions())
        {
            if (member.equals(partition.getOwner()))
            {
                partitionCount++;
            }
        }
        fmt = "executing in member @ {} - cluster is {}safe, " +
                "member is {}safe, hosting {} partitions";
        L.debug(fmt, whoAmI, clusterSafety, memberSafety,
                partitionCount);
        return Integer.valueOf(partitionCount);
    }
    // <snip> - getters, setters
}
That's just looking at the member, which you might want to do. More importantly, if you wanted to get/set/remove data, run a query, or perform any other Hazelcast operation, you could do that from this code too. The call is easy; as with the collections, it draws heavily on the Java Executors API:
public void callableTaskDemoMembers(Set<Member> members)
        throws InterruptedException, ExecutionException
{
    Callable<Integer> partitionReporter = new PartitionReporter();
    IExecutorService executorService =
            hazelcastInstance.getExecutorService("default");
    Map<Member, Future<Integer>> futures =
            executorService.submitToMembers(partitionReporter,
                    members);
    // process these
}
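The "process these" step is ordinary Future handling. This hypothetical sketch uses a plain ExecutorService in place of IExecutorService. Member names serve as map keys, and each callable simply returns a preset per-member partition count. The sketch waits on every future with a timeout and sums the results.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical stand-in for submitToMembers(): one Callable per "member",
// results collected from a Map<member, Future>. Plain java.util.concurrent.
final class CallableFanOutSketch {

    static int totalPartitions(Map<String, Integer> partitionsPerMember) {
        ExecutorService pool = Executors.newFixedThreadPool(Math.max(1, partitionsPerMember.size()));
        try {
            Map<String, Future<Integer>> futures = new LinkedHashMap<>();
            for (Map.Entry<String, Integer> member : partitionsPerMember.entrySet()) {
                // pretend each member reports how many partitions it hosts
                Callable<Integer> reporter = member::getValue;
                futures.put(member.getKey(), pool.submit(reporter));
            }
            // "process these": wait for every member, with a timeout, and combine the results
            int total = 0;
            for (Future<Integer> future : futures.values()) {
                total += future.get(10, TimeUnit.SECONDS);
            }
            return total;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException | TimeoutException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }
}
```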
Events
There are lots of events, but for now, let's stick to the data events: listeners and interceptors. This is still a fairly big topic, so let's talk about a workable subset of it. Within data events, there are map events and entry events. Map events are fired for map-level changes, specifically clear() or evictAll(). Entry events are fired after changes to map entries, and there's an interesting set of those changes: there are events for entries being added, removed, updated, and evicted. There is no listener for get, however; entry events are only for changes. In that context, it makes sense that there's an interceptor for "get". Listeners may be added in a number of ways: in configuration or programmatically, and from a member or a client. Within each member, you get local events, that is, events triggered by (after) data-mutating operations within that JVM. Here's a simple example of listening for added entries:
public static class MyEntryAddedListener<K, V> implements EntryAddedListener<K, V>
{
    @Override
    public void entryAdded(EntryEvent<K, V> event)
    {
        String whoami = event.getMember().getSocketAddress().getHostName()
                + ":" + event.getMember().getSocketAddress().getPort();
        L.trace("member: {} - added - key: {}, value: {}", whoami,
                event.getKey(), event.getValue());
    }
}
Adding the entry-added listener could be done in config, but here’s how to use the Java API to do it:
public void addEntryAddedListener(IMap<String, String> myMap)
{
    myMap.addEntryListener(new MyEntryAddedListener<>(), true);
}
This code adds the entry listener, listening only for entries being added. The boolean parameter tells Hazelcast that the value should be available (i.e. via getValue()) in the entry event that's delivered to the listener.
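A few lines of code can model the difference between added and updated entries and the effect of the includeValue flag. This is a hypothetical plain-Java sketch, not IMap. Listeners are called inline on the writing thread, whereas Hazelcast delivers events asynchronously. Only a put on a new key fires the added event, reads fire nothing, and a listener registered with includeValue set to false sees a null value.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;

// Hypothetical map that fires "entry added" callbacks, mirroring
// addEntryListener(listener, includeValue). Plain Java: listeners run inline
// on the writing thread, whereas Hazelcast delivers events asynchronously.
final class ListenerSketch<K, V> {

    interface EntryAddedListener<K, V> {
        void entryAdded(K key, V value);
    }

    private final Map<K, V> data = new HashMap<>();
    private final List<BiConsumer<K, V>> addedListeners = new ArrayList<>();

    void addEntryListener(EntryAddedListener<K, V> listener, boolean includeValue) {
        // with includeValue == false the listener sees the key but a null value
        addedListeners.add((key, value) -> listener.entryAdded(key, includeValue ? value : null));
    }

    void put(K key, V value) {
        boolean isNew = !data.containsKey(key);
        data.put(key, value);
        if (isNew) { // an update to an existing key is not an "added" event
            for (BiConsumer<K, V> listener : addedListeners) {
                listener.accept(key, value);
            }
        }
    }

    V get(K key) {
        return data.get(key); // reads never fire entry events
    }
}
```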
Clients may add these listeners, too, in addition to listeners for client lifecycle, cluster membership, and distributed object creation/deletion. So clients may be notified of their own lifecycle (starting, started, shutting down, and shutdown), of membership changes as storage members join and leave, and of distributed objects being created or destroyed: maps, caches, queues, and all. A word of caution, though: high-volume activity will create high volumes of events. Look carefully at your resources, like client CPU/RAM and especially network. Think from a distributed perspective and put the listener where it needs to be, not simply where it seems convenient.