Refcard #272

Introduction to Hazelcast IMDG

Hazelcast IMDG is a clustered, in-memory data-grid that uses sharding for data distribution and supports monitoring. In this Refcard, learn about what an in-memory data grid can be used for, how to do simple query operations with Hazelcast IMDG, what sharding means with Hazelcast, and more.

Brought to you by

Hazelcast
Written by

Tom OConnell SSA, Hazelcast
Table of Contents

What’s an IMDG For?

Before You Start

A Simple Spring Boot Cluster Member

Simple Query Operations

Conclusion

Section 1

What’s an IMDG For?

It’s for almost any programming task — broadly speaking, the three major areas are caching, distributed processing, and distributed messaging. The primary benefits to applications are big and fast data. Big data is good; big, fast data is awesome. Start small and grow enormously.

Section 2

Before You Start

There are only a few things you need to get going:

  • Java 8 is probably the most widely used JDK/JRE right now and is the preferred one to start with.
  • IDEs: Eclipse, IntelliJ, and NetBeans all work well with Hazelcast/Maven. The sample code in the GitHub repo was tested in Eclipse, as well as from the command line, so it’s an easy import.
  • Maven is one of the most popular build tools used with Hazelcast. There is also the option of using Gradle.

Maven Dependencies

All the dependencies for Hazelcast (any edition) are available on public Maven repositories, in addition to the Hazelcast download page.

  • Server: The server will be in one of two forms — hazelcast or hazelcast-all (the latter also includes the client dependencies).
  • Client: The Hazelcast client is generally included from hazelcast-client, and that’s the only addition to your client app build.
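As a sketch, the corresponding pom.xml entries look something like this (the version shown is an assumption; use whichever release you've downloaded):

```xml
<!-- server build: either hazelcast or hazelcast-all -->
<dependency>
    <groupId>com.hazelcast</groupId>
    <artifactId>hazelcast</artifactId>
    <version>3.9</version> <!-- assumed version -->
</dependency>

<!-- client applications add only this -->
<dependency>
    <groupId>com.hazelcast</groupId>
    <artifactId>hazelcast-client</artifactId>
    <version>3.9</version> <!-- assumed version -->
</dependency>
```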

Programming Models

Hazelcast is a toolkit. There are common patterns you can employ, but really, it’s just Java. You can design your own infrastructure to meet your needs in any way you see fit. Here are some common deployment models:

Embedded Member

Embedded members are really the easiest way to get started, and for some things, they may be all that you need. An embedded member is a Java program in which you create a HazelcastInstance. That call loads and launches the framework, which then forms a cluster with any compatibly configured members it finds on the network (depending on your network and discovery configuration).
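As a minimal sketch (assuming the hazelcast JAR is on the classpath), an embedded member is just a few lines:

```java
import com.hazelcast.core.Hazelcast;
import com.hazelcast.core.HazelcastInstance;

public class EmbeddedMember
    {
        public static void main(String[] args)
            {
                // loads hazelcast.xml if found (otherwise defaults), launches the
                // framework, and joins any compatibly configured members it discovers
                HazelcastInstance instance = Hazelcast.newHazelcastInstance();
                System.out.println("cluster size: "
                        + instance.getCluster().getMembers().size());
            }
    }
```

Run the same program twice on one machine and, with default discovery, the two processes should find each other and form a two-member cluster.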

Dedicated Member

A dedicated member is a Hazelcast process dedicated to storage and a few other things. It won’t run your code, except for a few server-side-specific instances — entry processors, Executor Tasks (Callables and Runnables), event code (Listeners and Interceptors), and persistence code (MapLoader and MapStore).

The advantage of this approach over the embedded model appears when scalability becomes more important than simplicity. With dedicated members, you can scale your storage fleet separately from your client fleet. If your storage demands soar but the processing doesn’t, you just add members. If you introduce new processing demands for the same, or similar, data loads, you just add clients.

Lite Member

Lite members are interesting — they join the cluster, unlike clients that just make a client-specific TCP connection. They do not, however, host data. They are for a small number of advanced things — they may be used for class-loading or they may be used as high-performance compute clients. You could, for example, direct runnable and callable tasks to Lite members if they require data that’s spread across the cluster for some kind of computation or processing.

Clients

Clients are Java programs that include the client JAR (i.e., hazelcast-client) in their build and read or create configuration that helps them find a cluster; they typically perform the widest scope of client requests. These will be in your web clients, your command-line tools, or anywhere you need to interface your systems with Hazelcast. Don’t think, though, that because they’re clients, you’re going to be doing all your processing there. Well-written clients will use server-side constructs — particularly entry processors, aggregations, and executor tasks — to delegate processing requests from single-threaded clients onto a massively scalable clustered storage and processing environment.

Not everything will be delegated to the back-end, of course. Many, many clients simply require extremely low-latency access to fast, big data that isn’t changed too often and isn’t changed (ideally) by separate clients (i.e., sticky sessions are good). For these, near-caches are extremely effective. Each client can host (within its process space) potentially large subsets of data that are being actively managed by the cluster.

We’re talking mostly about the open-source version here, but it’s worth noting that IMDG Enterprise HD allows off-heap near-caches, giving you low-latency access to potentially many gigabytes of near-cache data in each client. This has a broad range of applications across industries; real-time inventory for e-commerce and fraud detection for credit card processors are two. Note that in neither of these is the data static — that’s not a requirement. But the data is read much more often than it’s changed, making both of these ideal cases for near-caching.

Section 3

A Simple Spring Boot Cluster Member

So, finally a little more code. All of this is on GitHub, so you can look at the POM (pom.xml) file there. It’s basic — you just need the spring-boot parent entry and the Hazelcast dependency (from above). Make your main class a Spring Boot application with the @SpringBootApplication annotation. I’m adding the @Configuration annotation so I can have one class that serves up the beans and executes them.

Configuring Hazelcast

So, you have already run code — why talk about configuration now? Because the simple examples use all the defaults. While they’re interesting to run, you wouldn’t really go much past “hello world” with that.

The main things you’ll want to change are the network configuration — particularly the join configuration — and the definition of Maps and caches.

The join configuration dictates how members find each other. It starts with multicast, which probably won’t fly in most production environments, as multicast traffic is generally frowned upon by network administrators. TCP discovery is great, if you know the addresses and they don’t change. A data center with dedicated hardware works well. In the cloud, however, this falls apart — you don’t know the IPs up front and they’re pretty much guaranteed to change. There are cloud-based cluster discovery plugins that will make this work well and easily, but that’s too much for this intro.
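As a sketch, assuming the XML configuration format, switching the join from multicast to a fixed TCP/IP member list looks like this (the addresses are placeholders):

```xml
<network>
    <join>
        <!-- multicast is the default; disable it explicitly -->
        <multicast enabled="false"/>
        <!-- works well when member addresses are known and stable -->
        <tcp-ip enabled="true">
            <member>10.0.0.1:5701</member>
            <member>10.0.0.2:5701</member>
        </tcp-ip>
    </join>
</network>
```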

Maps (IMap) are the workhorse distributed storage data structures, and caches (ICache) are basically the same thing but for the JCache (JSR-107) API. By default, you can create a Map using the default configuration and you get a container with no limitations on it. The data is not bounded, it doesn’t expire, and it isn’t evicted — the problems with that should be obvious. The system will store and retain data until it runs out of memory, and that’s ugly. For any IMDG, you want to be careful about managing memory, and Hazelcast has lots of options, e.g., limits on the number of objects in the map, an absolute size of memory that can be used, or thresholds on heap utilization that will trigger eviction. In addition to evicting on space, you can set an expiration interval on your data — you decide up front.
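A sketch of a bounded map definition; the name and numbers here are illustrative, not recommendations:

```xml
<map name="bounded-map">
    <!-- entries expire 10 minutes after the last write -->
    <time-to-live-seconds>600</time-to-live-seconds>
    <!-- when the size limit is hit, evict least-recently-used entries -->
    <eviction-policy>LRU</eviction-policy>
    <!-- cap the entry count held by each member for this map -->
    <max-size policy="PER_NODE">10000</max-size>
</map>
```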

A Slightly More Robust Server

We can do better on the server code.

package com.hazelcast.tao.gettingstarted;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class ServerV1
    {
        private final static Logger L = LoggerFactory.getLogger(ServerV1.class);

        public static void main(String[] args)
            {
                L.info("Calling the Spring Application 'run' method");
                SpringApplication.run(ServerV1.class, args);
            }
    }

Here’s a concise server. Everything that it’s going to do is going to be injected by Spring Boot. It relies on some configuration code:

package com.hazelcast.tao.gettingstarted;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import com.hazelcast.config.Config;
import com.hazelcast.core.Hazelcast;
import com.hazelcast.core.HazelcastInstance;

@Configuration
public class ServerConfig
    {
        @Bean
        public Config config()
            {
                Config config = new Config();

                config.getGroupConfig()
                      .setName("dev");
                config.getGroupConfig()
                      .setPassword("dev-pass");
                return config;
            }

        @Bean
        public HazelcastInstance hazelcastInstance(Config config)
            {
                return Hazelcast.newHazelcastInstance(config);
            }
    }

Spring Boot will create a default instance of Hazelcast — which may not give you what you want. Having a bean for the config and one for an instance can be useful. Here’s the commandLineRunner that makes the Spring app work:

package com.hazelcast.tao.gettingstarted;

import java.util.Date;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.CommandLineRunner;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.stereotype.Component;

import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.core.IMap;

@Component("commandLineRunner")
public class ServerRunner implements CommandLineRunner, ApplicationContextAware
    {
        private final static Logger L = LoggerFactory.getLogger(ServerRunner.class);

        private ApplicationContext applicationContext;

        @Override
        public void run(String... args)
            {
                Object bean = applicationContext.getBean("hazelcastInstance");
                HazelcastInstance member = (HazelcastInstance) bean;
                System.out.println("this was all that was needed to start a member");
                IMap<String, String> map = member.getMap("foo");
                for (int i = 0; i < 10; i++)
                    {
                        map.put("key:" + i, "value: " + i + ":: " + new Date().toString());
                    }

                L.debug("at startup, map size: {}", map.size());
            }

        public ApplicationContext getApplicationContext()
            {
                return applicationContext;
            }

        @Override
        public void setApplicationContext(ApplicationContext applicationContext)
            {
                this.applicationContext = applicationContext;
            }
    }

Simple Map Access

This part is easy — a Hazelcast IMap is a java.util.Map, so you can take existing code written against the Java Collections API and just repurpose it. Here’s a little code showing how easy that can be:

package com.hazelcast.tao.gettingstarted.port;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.core.HazelcastInstanceAware;

public class MapClientDemo implements HazelcastInstanceAware
        {

            private HazelcastInstance hazelcastInstance;

            private final static Logger    L    =
                    LoggerFactory.getLogger(MapClientDemo.class);

            public void oldMethod()
                    {
                        Map<String, String> myMap = new ConcurrentHashMap<>();
                        String key = "SomeKey";
                        String value = "Just a random string";
                        myMap.put(key, value);

                        L.info("getting key {} yields {}", key, myMap.get(key));
                    }

            public void hzMethod()
                    {
                        //wait-shouldn't this be an 'IMap'?
                        Map<String, String> myMap = 
                                hazelcastInstance.getMap("myMap");
                        String key = "SomeKey";
                        String value = "Just a random string";
                        myMap.put(key, value);

                        L.info("getting key {} yields {}", key, myMap.get(key));
                    }

            // getters, setters, random noise follow.

        }

In that bit, there’s a method that creates a Map and uses it. In the second method (hzMethod), the only change was to use the injected Hazelcast instance (injected via annotation) to get a reference to a distributed map in the IMDG. There’s no magic — Hazelcast is designed so that you can swap it in that easily, using the familiar Collections API.

But back to that comment for a second: shouldn’t the declaration have been IMap, not Map? It depends. It could be, but it doesn’t need to be. Hazelcast Maps implement the java.util.Map interface, so the declaration is valid, but maybe not useful. In a minute, we’re going to use some Hazelcast-specific methods on the Map, and to make those visible, you want to change the declaration. If you’re just doing put, get, size, remove, and all of those, then no.

One interesting note on that: It’s easy to forget that “put” returns the old mapping as it inserts the new one. Think about that in a network environment: When you do a “put”, Hazelcast (conforming to the contract) returns the old mapping over the network, incurring serialization cost for no reason, because nobody ever looks at it. Hazelcast has added a “set” method that works like “put”, save that it doesn’t return the value. This may seem like small stuff, but think about a heavily utilized production environment getting a surge of requests; you’re busy, and half of that flavor of network traffic is stuff you’re never going to look at. Change two letters in your code and the network traffic drops — possibly by a lot.

Keep in mind, however, that there are differences. It’s a distributed Map — aside from security configuration, other clients/threads can use the same Map. If you test the size of a new in-process Map that you create in your thread, the size will be “0”. When you get a reference to a distributed collection from the IMDG, it will create it (if required) or return a reference to the existing collection if it’s already been created. This can be a very powerful feature — you can pre-populate a collection from a persistent store or any other data source, and your client code will be smaller and simpler because you can make assumptions about it. If you’re using a Map as a scratchpad cache, however, keep in mind that you may want to create unique map instances or manage data so that your thread doesn’t collide with other clients.

Near Cache Access

Hazelcast supports second-level, or edge, caching in client processes and refers to it as near caching. Near caches are almost transparent to your code — there are things you need to be aware of, though. Each mapping in the near cache is fundamentally managed by the IMDG member that owns the “master” copy of the data. It may be cached on multiple clients in multiple apps, and each caching client app may define its own policy for managing updates. The data in your near cache may be stale — you probably shouldn’t cache things for overly long times (you set the expiry interval in config). You should be careful about using near caches for things that are updated frequently and very careful about using them for things that are updated from multiple points. For a web application with sticky sessions, you should be able to count on certain objects being in only one client process — that’s a good scenario.
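As an illustrative sketch, a client-side near-cache definition for a map might look like this (the values are placeholders, not recommendations):

```xml
<near-cache name="employees">
    <!-- keep deserialized objects for the fastest reads -->
    <in-memory-format>OBJECT</in-memory-format>
    <!-- don't let near-cached entries live too long -->
    <time-to-live-seconds>300</time-to-live-seconds>
    <!-- drop local copies when the owning member sees a change -->
    <invalidate-on-change>true</invalidate-on-change>
</near-cache>
```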

Section 4

Simple Query Operations

SQL Queries

Hazelcast is not an SQL database or an SQL query tool, but it provides a workable, robust subset of SQL query functionality that’s accessible to developers. If you have an SQL background, this is nothing new; if you don’t, it’s still pretty intuitive. The SqlPredicate class encapsulates the where clause of a query. Since you’re dealing with purely in-memory data, this is going to be very fast.

public void sqlQueryDemo()
        {
            IMap<Integer, Employee> employees =
                hazelcastInstance.getMap("employees");
            Employee emp = new Employee();
            emp.setId(Integer.valueOf(1));
            emp.setFirstName("John");
            emp.setLastName("Doe");
            emp.setAge(new Random(System.currentTimeMillis()).nextInt(99));
            emp.setDeptId(Integer.valueOf(13));

            // put the dummy employee in the map using employee-id as the map key
            employees.set(emp.getId(), emp);

            Predicate sqlPredicate =
                new SqlPredicate(String.format("lastName = '%s'",
                        emp.getLastName()));
            Collection<Employee> matching = employees.values(sqlPredicate);

            // wildcards are supported, too - look for last names starting
            // with 'D' using the same 'values' call.
            sqlPredicate = new SqlPredicate("lastName like 'D%'");
            matching = employees.values(sqlPredicate);

            // compound predicates work, as well
            sqlPredicate =
                new SqlPredicate("lastName like 'D%' and "
                    + "age between 21 and 30");
            matching = employees.values(sqlPredicate);

            // this could go on, but it's a pretty robust subset of sql
            // functionality.
            sqlPredicate = new SqlPredicate("deptId not in (13, 23, 33)");
            matching = employees.values(sqlPredicate);

        }

This shows how easy it is to bridge an SQL background with the IMDG’s SQL-like queries. The caveat here is that, out of the box, IMDG is not a good tool for joins because of the nature of the data. We split it up because it’s big data, and because it’s big data, joining it back together is more complex. I mentioned Jet earlier; it would be a useful tool for that.

Predicate Queries: Criteria API

For Java developers who really never liked SQL, there’s also a pure Java approach to querying the IMDG: the Criteria API.

public void criteriaAPIDemo()
        {
            IMap<Integer, Employee> employees = 
                hazelcastInstance.getMap("employees");

            <snip> same employee as before

            // create a couple of predicates and, then, the 'and' of them
            Predicate<Integer, Employee> lastName = equal("lastName", "Doe");
            Predicate<Integer, Employee> age = greaterThan("age",
                    Integer.valueOf(99));
            Predicate<Integer, Employee> dept = equal("deptId",
                    Integer.valueOf(13));
            // and is a variadic method, so you can just keep
            // adding predicates and get the logical 'and' of all of them
            Predicate<Integer, Employee> ageEtAl = and(age,
                lastName, dept);

            Collection<Employee> matching = employees.values(ageEtAl);
        }

One thing that really needs to be mentioned here, if only briefly, is memory in the query client. Hazelcast query operations can quickly retrieve very large volumes of data. If you had, say, 20 storage members and queried the cluster bringing back 1GB or so of data from each, you’d be looking at 20GB of data from one query. You may or may not have that much memory available in your client. The fix for that is paging predicates. These predicates wrap (logically subsume the functions of) other predicates so that you get the same logical comparisons — the same filtering — wrapped in a container that lets you bring results back in batch sizes that you specify. It’s as though you’re reading a printed book and seeing one page at a time.

Predicate<Integer, Employee> lastName = equal("lastName", "Doe");
Predicate<Integer, Employee> age = greaterThan("age", Integer.valueOf(99));
Predicate<Integer, Employee> dept = equal("deptId", Integer.valueOf(13));

Predicate<Integer, Employee> ageEtAl = and(age, lastName, dept);

// construct a paging predicate to get a max of 10 objects per page,
// based on the previously constructed predicate.
PagingPredicate<Integer, Employee> pagedResults =
      new PagingPredicate<>(ageEtAl, 10);
while (true)
       {
             Collection<Employee> oldPeople =
                    employees.values(pagedResults);
             if (oldPeople.isEmpty())
                    {
                          break;
                    }
             // process the page here, then advance to the next one
             pagedResults.nextPage();
       }

This just takes the previously constructed predicate and returns the results in paged sets, with the page size set to 10 in this example.

Aggregations

Aggregations — or, as they’re now called, “fast aggregations” — allow data query and transformation to be dispatched to the cluster. They can be extremely effective. Keeping with the Employee class and the “employees” Map from the other examples, let’s do a quick-and-dirty aggregation. You could just run the aggregation across the entire entry set of the Map, but using a predicate to filter the objects before the aggregator does the reduction on them will prove more effective. Department 13 is used to represent group-W. You know those people; they’re everywhere.

This is going to create an anonymous class, which is quick and easy and highly effective for this.

public void simpleAverageAgeAggregation(IMap<Integer, Employee> employees)
       {

            Predicate<Integer, Employee> deptPredicate =
                  equal("deptId", Integer.valueOf(13));
            Aggregator<Map.Entry<Integer, Employee>, Double> ageAggregator
                  = new Aggregator<Map.Entry<Integer, Employee>, Double>()

                  {
                        <snip> -     serialVersionUID: don't forget this is
                                     going over the wire in serialized 
                                     format.

                        protected long                       sum = 0L;
                        protected long                       count = 0L;

                        @Override
                        public void accumulate(Map.Entry<Integer,
                                                  Employee> entry)
                               {
                                     count++;
                                     sum += entry.getValue()
                                                  .getAge();
                               }

                        @Override
                        public void combine(Aggregator aggregator)
                               {
                                     this.sum += this.getClass()
                                                  .cast(aggregator).sum;
                                     this.count += this.getClass()
                                                  .cast(aggregator).count;
                               }

                        @Override
                        public Double aggregate()
                               {
                                       if (count == 0)
                                             {
                                                    return null;
                                             }
                                       double dsum = (double) sum;
                                       return Double.valueOf(dsum / count);
                               }
                        };
                  // find the average age of employees in department 13
                  Double avgAge =
                         employees.aggregate(ageAggregator, deptPredicate);

                  L.info("average age: {}", avgAge);
       }

The code is pretty self-explanatory. The predicate ensures that only matching elements from the distributed map are included in the calculation. The aggregator will “accumulate” data — examining the matching subset and adding each age into the sum — but where does that happen? The accumulate method is called on each storage member (i.e., not the clients and not Lite members); it is passed each entry matching the deptPredicate filter, and it accumulates the raw values. Note that these run in parallel on each member involved. Because the data is partitioned across members and only a filtered subset is processed, it’s going to be very fast.

In the second phase, each of these aggregator instances is returned to the caller for processing — the instance of the anonymous aggregator class examines each returned aggregator (instances of the same anonymous class) and combines all the raw results. In that part of the code, because this isn’t a concrete class, it’s necessary to call the class.cast() method to get access to the count and the sum. In the final step, also on the calling member, the aggregate method is invoked and performs the simple calculation of the average age. This is really not much code for that kind of power.

There is also a very complete set of built-in aggregators for things like min, max, count, and avg of different numeric types, plus distinct for any comparable type. They can be used with very little setup, like this:

// get a list of distinct last names
Set<String> lastNames = employees
      .aggregate(Aggregators.<Map.Entry<Integer, Employee>, String> distinct("lastName"));

Here, distinct comes from the built-in Aggregators factory class.

Entry Processors

Entry processors are pretty cool. You can do highly efficient in-place processing with a minimum of locking. Consider what people often end up doing to work with remote objects: lock a key, fetch the value, mutate the value, put it back (in a finally block), and unlock the key. That’s four network calls to start with — three if you’re only looking at the data and not updating the central source. Your objects may be large and incur significant cost in terms of CPU and network for serialization and transport.

Entry processors allow you to dispatch a key-based “task” object across the LAN — directly to the member owning a key, where it is executed in a lock-free, thread-safe fashion. Hazelcast has an interesting threading model that allows this to happen.

Here’s a brain-dead simple entry processor example — but it’s still a really useful approach:

@Component
@Profile("client")
public class EntryProcessorRunner implements CommandLineRunner, ApplicationContextAware
  {
    private ApplicationContext applicationContext;

    @Override
    public void run(String... args) throws Exception
      {
        HazelcastInstance instance =
            (HazelcastInstance) applicationContext.getBean("hazelcastInstance");
        IMap<String, String> demo = instance.getMap("demo");
        String key = "someKey";
        demo.set(key, "Just a String value...");
        demo.executeOnKey(key, new DemoEntryProcessor());
        EntryProcessor<String, String> asyncProcessor = new DemoOffloadableEntryProcessor();
        demo.submitToKey(key, asyncProcessor);
        ExecutionCallback<String> callback = new AsynchCallbackDemo();
        demo.submitToKey(key, asyncProcessor, callback);
      }
<snip> }

Here’s the entry processor that is called in this:

public class DemoEntryProcessor extends AbstractEntryProcessor<String, String>
  {
    <snip>
    @Override
    public Object process(Entry<String, String> entry)
      {
        String key = entry.getKey();
        String value = entry.getValue();
        L.info("in-place processing called for {}::{}", key, value);
        SimpleDateFormat sdf = new SimpleDateFormat(dtFormat);
        entry.setValue(String.format("This value was modified at %s -- %s",
            sdf.format(new Date()), value));
        return null;
      }
    <snip>
  }

Here, we describe the simplest case, but as simple as it is, this is a very powerful tool to have. From your client, you can dispatch computation to your servers — no locking, no contention, efficient use of the LAN. But you’re waiting for the result to be returned, so it may not seem like a really big deal. Hazelcast also has a number of options to run synchronously, asynchronously, and on multiple entries.

executeOnKey(Object, EntryProcessor);                       //synch
submitToKey(Object, EntryProcessor);                        //asynch
submitToKey(Object, EntryProcessor, ExecutionCallback);     //asynch

executeOnEntries(EntryProcessor);                           //synch
executeOnEntries(EntryProcessor, Predicate);                //synch
executeOnKeys(Set, EntryProcessor);                         //synch

This is a really useful set of calls — the first one, executeOnKey, does exactly that; it makes one direct call to the key owner and executes synchronously on that entry. The next two execute asynchronously — your client code doesn’t need to wait for long-running operations. A word of warning, though: long-running entry processors can be truly evil. Implement the Offloadable interface in your entry processor to tell Hazelcast to move the operation off of the default partition-threading structure.

Here’s an async call, similar in function to the first.

public class DemoOffloadableEntryProcessor extends AbstractEntryProcessor<String, String>
    implements Offloadable
  {
    <snip>
    public final static String  OFFLOADABLE_EXECUTOR  = "default";

    @Override
    public Object process(Entry<String, String> entry)
      {
        String key = entry.getKey();
        String value = entry.getValue();
        L.info("in-place processing called for {}::{}", key, value);
        SimpleDateFormat sdf = new SimpleDateFormat(dtFormat);
        String newValue = String.format("This value was modified at %s -- %s",
            sdf.format(new Date()), value);
        entry.setValue(newValue);
        return newValue;
      }

    @Override
    public String getExecutorName()
      {
        return OFFLOADABLE_EXECUTOR;
      }
  }

We recommend submitting the processing, tagging it with a callback, and going on your way. This could be used if you have a stream of data requests that need to be initiated without the caller needing to see a result. If you’re going to do this, be sure to read ahead to the section on back pressure.

The execution callback executes in the caller’s process space, so it’s your notification that the invocation is complete. This one just logs, like this:

package com.hazelcast.tao.gettingstarted.executors;

<snip> - imports

public class SimpleExecutionCallback<V> implements ExecutionCallback<V> {

    @Override
    public void onFailure(Throwable throwable) {
        L.error("execution failed - {}", throwable.getMessage());
    }

    @Override
    public void onResponse(V value) {
        L.info("processing complete - {}", value);
    }
}
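Wiring a callback to a submission is a one-liner against IExecutorService (submit the task together with the callback), but the shape of the pattern can be sketched with the plain JDK’s CompletableFuture, no cluster required. The class name and the canned result below are purely illustrative:

```java
import java.util.concurrent.CompletableFuture;

public class CallbackSketch {
    public static void main(String[] args) {
        // Submit work asynchronously; the callback fires in this process
        // when the result arrives, just like ExecutionCallback.onResponse.
        CompletableFuture
                .supplyAsync(() -> "partition count: " + 271)
                .whenComplete((value, throwable) -> {
                    if (throwable != null) {
                        System.err.println("execution failed - " + throwable.getMessage());
                    } else {
                        System.out.println("processing complete - " + value);
                    }
                })
                .join(); // only for the demo; real fire-and-forget code wouldn't block
    }
}
```

On a real cluster, onResponse fires in the caller’s process when the member’s result arrives, exactly as whenComplete does here.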

Because nothing is free, there’s a bit of server-side configuration to go with this. Back pressure, both in the client and in server members, is important when using asynchrony. Configuring it can be done with system properties like this:

# the default is false; enable it when you use async operations
hazelcast.backpressure.enabled=true

# 100 is the default; look at your hardware resources before increasing it
hazelcast.backpressure.max.concurrent.invocations.per.partition=100

# this one relates to async backups rather than async data operations,
# but it's included here for completeness. If the system cannot keep up with
# async backup requests, they will periodically turn into sync backups based
# on this window (millis, of course)
hazelcast.backpressure.syncwindow=1000

# configure how long Hazelcast will wait for invocation resources
hazelcast.backpressure.backoff.timeout.millis=60000

Back pressure is a topic worth some consideration. Hazelcast is using its threading model to execute these, so there’s a limit to how many invocations can be in flight at any one point in time. The absolute number doesn’t matter, as that would depend upon the size of your cluster and the number of CPUs/cores/physical threads. What’s likely to be interesting is how many can be queued up for one partition — by default, mutating entry processors operate on a partition thread. In configuring your cluster, you know how many physical threads you have, so you can configure the partition thread count to be a somewhat sensible number. Too few and you’ll have contention; too many (one-to-one sounds ideal, though it rarely is) and you won’t get good resource utilization.
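To make that concrete: in the Hazelcast 3.x line, the partition-operation thread pool is sized with the hazelcast.operation.thread.count system property (it defaults to the core count), and generic, non-partition-specific operations have their own pool. The values below are illustrative, not recommendations:

```shell
# partition operations (entry processors, gets/puts) run on these threads;
# the default is the number of cores on the member
-Dhazelcast.operation.thread.count=8

# generic operations have a separate pool
-Dhazelcast.operation.generic.thread.count=4
```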

Runnable Tasks

These are simply Java runnable objects that are dispatched to one or more members. Keep in mind that the salient part of the signature is public void run() — i.e. nothing is returned. The way that they’re dispatched to the members is very flexible; it can be one member, all members, a member that owns a key, or members selected by an attribute you have set on them.

Here’s an example of dispatching a runnable to every member:

@Component("runnableDemo")
@Profile("client")
public class RunnableDemoCaller implements CommandLineRunner, ApplicationContextAware {

    private ApplicationContext applicationContext;

    @Override
    public void run(String... args) throws Exception {
        HazelcastInstance client =
                applicationContext.getBean(HazelcastInstance.class);
        IExecutorService executorService = client.getExecutorService("default");
        executorService.executeOnAllMembers(new LoggingRunnable());
    }

    <snip> - ApplicationContextAware setter
}

The Runnable object is pretty ordinary, with the caveat that it needs to be serializable. This runnable is also HazelcastInstanceAware: when it’s set up on the target node, the Hazelcast framework injects the correct instance so that it can communicate with the cluster.

@Component("loggingRunnable")
public class LoggingRunnable implements Runnable, HazelcastInstanceAware, Serializable {

    <snip> - logger and other local data

    private HazelcastInstance hazelcastInstance;

    @Override
    public void run() {
        l.info("into run, cluster size: {}",
                getHazelcastInstance().getCluster().getMembers().size());
    }

    <snip> - getters, setters
}

This code isn’t particularly profound, but there’s one cool aspect to it: you can direct processing to the member that owns a key (or to other members) and process that key and/or other keys in multiple maps. So, complex manipulation may be performed outside your client, eliminating multiple network round-trips. The data need not all come from one member, either; there are no restrictions on that. It can be a significant performance boost to design your data so that related items all live within one node: then this kind of task will tend not to make network calls, though it isn’t restricted from doing so.

Callable Tasks

As with runnable tasks, callable tasks are dispatched to one or more members, but offer more options for things like bringing back data. Here’s a really simple callable that will be dispatched to a member, log some noise to show it ran, and return the partition count. There are better ways to monitor or manage partitions, but this should just show how you get a value — easily — from a member.

package com.hazelcast.tao.gettingstarted.port;

<snip> - imports. They should be just what you'd expect

public class PartitionReporter implements Callable<Integer>, HazelcastInstanceAware {

    <snip> - local data, ctor and such

    @Override
    public Integer call() {
        Member member = hazelcastInstance.getCluster().getLocalMember();
        String whoAmI = String.format("member listening on %s:%d",
                member.getSocketAddress().getHostName(),
                member.getSocketAddress().getPort());

        PartitionService service = hazelcastInstance.getPartitionService();
        String memberSafety = service.isLocalMemberSafe() ? "" : "not ";
        String clusterSafety = service.isClusterSafe() ? "" : "not ";

        int partitionCount = service.getPartitions().size();
        L.debug("executing in member @ {} - cluster is {}safe, "
                        + "member is {}safe, hosting {} partitions",
                whoAmI, clusterSafety, memberSafety, partitionCount);

        return Integer.valueOf(partitionCount);
    }

    <snip> - getters, setters
}

That’s just looking at the member, which you might want to do. Importantly, if you wanted to get/set/remove data, run a query, or perform any other Hazelcast operation, you can do that from that code. The call is easy. As with the collections, it draws heavily from the Java Executors API:

public void callableTaskDemoMembers(Set<Member> members)
        throws InterruptedException, ExecutionException {
    Callable<Integer> partitionReporter = new PartitionReporter();

    IExecutorService executorService =
            hazelcastInstance.getExecutorService("default");

    Map<Member, Future<Integer>> futures =
            executorService.submitToMembers(partitionReporter, members);
    // process these
}
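Processing the returned futures is ordinary java.util.concurrent work; each Future.get() blocks until that member replies. Here’s a local sketch of the harvesting loop, using a JDK executor in place of the cluster and String keys standing in for Member (both are stand-ins, not the Hazelcast types):

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class FutureHarvest {
    public static void main(String[] args) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(2);

        // Stand-ins for the per-member futures returned by submitToMembers;
        // each "member" reports the default partition count of 271.
        Map<String, Future<Integer>> futures = new HashMap<>();
        futures.put("member-1", pool.submit(() -> 271));
        futures.put("member-2", pool.submit(() -> 271));

        int total = 0;
        for (Map.Entry<String, Future<Integer>> e : futures.entrySet()) {
            // get() blocks until that member's callable has returned
            total += e.getValue().get();
        }
        System.out.println("partitions reported: " + total);
        pool.shutdown();
    }
}
```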

Events

There are lots of events, but for right now, let’s stick to the data events: listeners and interceptors. This is still a fairly big topic, so let’s talk about a workable subset of it. Within data events, there are what are called map events and entry events. Map events fire for map-level changes, specifically clear() or evictAll(). Entry events fire after changes to map entries, and there’s an interesting set of those changes: there are events for entries being added, removed, updated, and evicted. There is no listener for get, however; entry events cover only changes. In that context, it makes sense that there’s an interceptor for “get”. Listeners may be added in a number of ways: in configuration or programmatically, and from either a member or a client. Within each member, you get local events, which are triggered by (after) data-mutating operations within that JVM. Here’s a simple example of listening for entries being added:

public static class MyEntryAddedListener<K, V> implements EntryAddedListener<K, V> {

    @Override
    public void entryAdded(EntryEvent<K, V> event) {
        String whoami = event.getMember().getSocketAddress().getHostName()
                + ":" + event.getMember().getSocketAddress().getPort();
        L.trace("member: {} - added - key: {}, value: {}", whoami,
                event.getKey(), event.getValue());
    }
}

Adding the entry-added listener could be done in config, but here’s how to use the Java API to do it:

public void addEntryAddedListener(IMap<String, String> myMap) {
    myMap.addEntryListener(new MyEntryAddedListener<>(), true);
}

This code will add the entry listener — listening only for entries being added. The boolean parameter tells Hazelcast that the value should be available (i.e. getValue()) in the entry event that’s going to be delivered to the listener.

Clients may add these listeners, too, along with listeners for client lifecycle events, cluster membership events, and distributed-object creation/destruction. So, a client may be notified of its own lifecycle (starting, started, shutting down, and shutdown); of membership changes as storage members join and leave; and of distributed objects (maps, caches, queues, and all) being created or destroyed. A word of caution, though: high-volume activity will create high volumes of events. Look carefully at your resources, like client CPU/RAM and especially the network. Think from a distributed perspective and put the listener where it needs to be, not simply where it seems convenient.

Section 5

Conclusion

This is just a little of what you can do with Hazelcast. Hazelcast has been doing distributed systems for some time now; it is deliberately designed to deliver performance and simplicity. You can be up and running in minutes, rolling out production-quality code that looks an awful lot like your Java collections code. It’s a fun environment for programmers. A little Java can be all you need on the server side; then you can cut loose with Java, .NET, C++, Node.js, Python, Go, or Scala, and that list is going to grow as new languages emerge.
