
Ringbuffer: A New Data-Structure in Hazelcast


Ringbuffer is a new data-structure in Hazelcast 3.5. See how it compares to queues and how it can be used to parallelize workloads.


Learn what it’s like to use Hazelcast as the backbone of your Microservices architecture, brought to you in partnership with Hazelcast.

Hazelcast Ringbuffer is a new data-structure added to Hazelcast 3.5 that in some cases can be a more practical alternative to queues. Think of Ringbuffer as a circular array with fixed capacity. Just as with an array, each item in a Ringbuffer is uniquely identified with a sequence id (a long).

Ringbuffer is an append only data-structure; so it’s not possible to remove an item. The tail is where items get appended and the head is where the oldest items in the Ringbuffer are found. Creating a Ringbuffer and adding items is very simple:

Ringbuffer<String> rb = hazelcastInstance.getRingbuffer("rb");
long sequence = rb.add("someitem");

The cool thing is that the sequence being returned can be used to read out the item as well:

String item = rb.readOne(sequence);

Since each item is uniquely identified by its sequence-id, the returned sequence-id is unique and can double as a cheap id generator if you are using a Ringbuffer anyway.
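To make the relationship between sequence-ids, capacity, and the head/tail concrete, here is a toy sketch in plain Java (this is not the Hazelcast implementation or API, just an illustration of the circular-array idea described above):

```java
import java.util.NoSuchElementException;

// Toy illustration (not Hazelcast API): a fixed-capacity ring where every
// appended item gets a monotonically increasing sequence-id; the tail is the
// newest item and the head is the oldest item still retained.
class ToyRingbuffer<E> {
    private final Object[] items;
    private long tail = -1; // sequence-id of the newest item

    ToyRingbuffer(int capacity) {
        items = new Object[capacity];
    }

    long add(E item) {
        tail++;
        items[(int) (tail % items.length)] = item; // overwrites the oldest slot
        return tail;
    }

    long headSequence() {
        return Math.max(0, tail - items.length + 1);
    }

    @SuppressWarnings("unchecked")
    E readOne(long sequence) {
        if (sequence < headSequence() || sequence > tail) {
            throw new NoSuchElementException("sequence " + sequence + " not available");
        }
        return (E) items[(int) (sequence % items.length)];
    }
}
```

With capacity 3, appending "a", "b", "c", "d" assigns sequences 0..3 and overwrites "a"; reading sequence 0 afterwards fails because it fell behind the head, which is the same situation that Hazelcast signals with a StaleSequenceException (discussed below).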

Ringbuffer Compared to Queue

The nice thing about a Ringbuffer compared to a queue is that a read is not destructive. With a queue, a take is a destructive operation: only one thread can take a particular item, and once it's taken, it's gone. This can be problematic for two reasons:

  1. What happens when the system crashes after the item has been taken, but before it has been fully processed?
  2. What happens if you want multiple readers to read the same item? One approach is to create a queue per reader and do a put on each queue. The problem is that this makes puts very expensive, because with N readers you need to do N puts.

Because a read on a Ringbuffer is not a destructive operation, and the reader controls which items it wants to read, it's easy for the reader to realize delivery guarantees by storing the sequence-id.

  • At Least Once: store the sequence-id after the item has been fully processed. If the system crashes before the item has been fully processed, the same item will be read again since stored sequence-id still contains the old value.
  • At Most Once: store the sequence-id before the item starts to be processed. If the system crashes before the item has been fully processed, the stored sequence-id is loaded on restart and the system continues from the next item; the item we potentially failed to process is never retried.
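The only difference between the two guarantees is where the checkpoint is written relative to the processing. A toy, self-contained sketch of both loops (not Hazelcast API; the list stands in for the Ringbuffer and the local variable for durable checkpoint storage):

```java
import java.util.Arrays;
import java.util.List;

// Toy sketch (not Hazelcast API): `items` plays the role of the Ringbuffer,
// `storedSeq` plays the role of durable checkpoint storage.
class DeliveryGuarantees {
    static final List<String> items = Arrays.asList("a", "b", "c");

    // At-least-once: checkpoint AFTER processing. A crash between process()
    // and the checkpoint means the same item is read and processed again.
    static long atLeastOnce(long storedSeq) {
        while (storedSeq + 1 < items.size()) {
            long seq = storedSeq + 1;
            process(items.get((int) seq));
            storedSeq = seq; // crash before this line => seq is replayed
        }
        return storedSeq;
    }

    // At-most-once: checkpoint BEFORE processing. A crash between the
    // checkpoint and process() means the item is skipped on restart.
    static long atMostOnce(long storedSeq) {
        while (storedSeq + 1 < items.size()) {
            long seq = storedSeq + 1;
            storedSeq = seq; // crash after this line => seq is never retried
            process(items.get((int) seq));
        }
        return storedSeq;
    }

    static void process(String item) {
        System.out.println("processing " + item);
    }
}
```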

Another big advantage of the read operation not being destructive is that it's very fast: unlike a queue take, a read doesn't change state and therefore doesn't need to be replicated.

Capacity

Each Ringbuffer is created with a certain capacity; by default 10k items. A Ringbuffer can't grow beyond this capacity, so eventually the oldest items get overwritten (more about that below). The Ringbuffer can be configured using XML or using our programmatic API. If we want to set the capacity:

RingbufferConfig rbConfig = new RingbufferConfig("rb")
    .setCapacity(50 * 1000);
Config config = new Config();
config.addRingbufferConfig(rbConfig);
HazelcastInstance hz = Hazelcast.newHazelcastInstance(config);
Ringbuffer<String> rb = hz.getRingbuffer("rb");

Time to Live

By default, the items in the Ringbuffer stay in the Ringbuffer until they get overwritten. Note that they will never expire. This is exactly the same behavior as if you were using a regular array; once an item is written to an array it will never be automatically removed.

In practice you often want to control how long items remain available (e.g. 30 seconds). With the Ringbuffer this can be done by setting the time to live on the RingbufferConfig:

RingbufferConfig rbConfig = new RingbufferConfig("rb")
    .setCapacity(50 * 1000)
    .setTimeToLiveSeconds(30);

With a time to live of 30 seconds, a consumer has a 30-second window to process the item. If an item is written and 31 seconds pass before a read is done, the item won't be available anymore.

A time to live can help prevent excessive memory usage and can prevent stale data; but its real value is when it’s combined with the OverflowPolicy. The OverflowPolicy determines what to do when a Ringbuffer is full and there are no items to expire. Currently there are two options:

  • OVERWRITE: The oldest item in the Ringbuffer is overwritten, even if it isn’t old enough to expire. In this case, you’ll be favoring the producer instead of the consumer, since the consumer can run into a StaleSequenceException if the data it wants to read doesn’t exist anymore.
  • FAIL: Nothing is overwritten and the caller gets a signal that the write failed. It is then up to the caller to decide what to do.

The following code shows how to set up an exponential backoff in combination with OverflowPolicy.FAIL:

long sleepMs = 100;
for (;;) {
    long result = ringbuffer.addAsync(item, OverflowPolicy.FAIL).get();
    if (result != -1) {
        // the item was written; result is its sequence-id
        break;
    }

    // the ringbuffer is full; back off and retry
    TimeUnit.MILLISECONDS.sleep(sleepMs);
    sleepMs = Math.min(5000, sleepMs * 2);
}

Batching

The code examples shown so far inserted and read a single item at a time. The problem with this approach is that there is a huge amount of overhead due to operation scheduling, network communication, etc. It is much more efficient to batch reads and writes to amortize the overhead.

Adding a batch of items is very simple:

List<String> items = Arrays.asList("1","2","3");
ICompletableFuture<Long> f = rb.addAllAsync(items, OverflowPolicy.OVERWRITE);
f.get();

Apart from providing batch functionality, you can also decide if you want to make a sync call by calling get, or make it an async call by using the andThen method and providing a callback.
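The async variant looks roughly like this; a sketch using Hazelcast's ExecutionCallback interface (check the exact signatures against the version you're running):

```java
ICompletableFuture<Long> f = rb.addAllAsync(items, OverflowPolicy.OVERWRITE);
f.andThen(new ExecutionCallback<Long>() {
    @Override
    public void onResponse(Long sequence) {
        // invoked on completion; no thread blocks waiting on get()
        System.out.println("batch stored, last sequence: " + sequence);
    }

    @Override
    public void onFailure(Throwable t) {
        t.printStackTrace();
    }
});
```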

Reading a batch of items is a bit more complicated:

long sequence = rb.headSequence();
for (;;) {
    ICompletableFuture<ReadResultSet<String>> f = rb.readManyAsync(sequence, 1, 100, null);
    ReadResultSet<String> rs = f.get();
    for (String s : rs) {
        System.out.println(s);
    }
    sequence += rs.readCount();
}

In this example, we want to read at least 1 item and at most 100 items. This can be very efficient if there are 1000 items available, as only 10 operations need to be executed.

You might be wondering about the null argument at the end. This is where a filter can be provided. Imagine there is a single Ringbuffer with employee objects and you want to retrieve only the engineers; you can provide a filter that selects engineers.

public class EngineerFilter implements IFunction<Employee, Boolean> {
    public Boolean apply(Employee e) {
        return e instanceof Engineer;
    }
}
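A filter instance is then passed as the last argument of readManyAsync (a sketch, assuming the Ringbuffer holds Employee objects):

```java
Ringbuffer<Employee> rb = hz.getRingbuffer("rb");
long sequence = rb.headSequence();
ICompletableFuture<ReadResultSet<Employee>> f =
        rb.readManyAsync(sequence, 1, 100, new EngineerFilter());
```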

The nice thing about a filter is that it is applied at the source, so items which are not relevant are not sent to the caller.

One of the things that can be done with filters is parallelizing the workload (e.g. one reader deals with all engineers by using an engineer filter, and another reader deals with all sales people by using a sales filter).

Check out the Ringbuffer Documentation »
Ready to try it yourself? Download Hazelcast and get started today!


Topics:
hazelcast, in-memory data grid

Published at DZone with permission of Peter Veentjer, DZone MVB. See the original article here.

Opinions expressed by DZone contributors are their own.
