
A Sorted, Concurrent, Bounded Buffer implementation in Java


This type of data structure can be very handy in a distributed query aggregator implementation.


Java has TreeSet to use as a sorted buffer, and a ConcurrentSkipListSet to use as a concurrent version of the same. Let's consider a scenario where we are collecting a 'top-K' list of items. Say we are adding numbers to a collection, and after all numbers are added, we will take the top 5 numbers.

This can be realized using a NavigableSet like this:

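import java.util.NavigableSet;
import java.util.TreeSet;

NavigableSet<Integer> aSet = new TreeSet<>();
aSet.add(5);
// ..add more elements
aSet.pollFirst(); // removes and returns the smallest element (the '0'th)
aSet.pollFirst(); // the next smallest (the '1'th)

aSet = aSet.descendingSet(); // reversed view of the same set
aSet.pollFirst(); // removes and returns the largest element (the 'n'th)
aSet.pollFirst(); // the next largest (the 'n-1'th)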


However, a set is unbounded, so it will retain all elements. This may not be necessary for our use case, since we are interested in only a subset of the items, and it becomes extremely wasteful when dealing with, say, huge result sets from a database.

Wouldn't it be better if we could keep the data structure bounded (to the size of 'K') while keeping its elements sorted as before? It would have to be some sort of BoundedNavigableSet, and it would need to be concurrently accessible, since multiple producers should be able to add elements to it.
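As a point of comparison, the bounding idea can be sketched with a plain TreeSet guarded by a lock. This is only a baseline under stated assumptions, not the lock-free structure described later: the class name BoundedTopK and its methods are hypothetical, and "top" is taken to mean the largest values.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

// Hypothetical lock-based baseline (not the article's lock-free buffer):
// retains only the K largest distinct values offered to it.
final class BoundedTopK {
    private final int capacity;
    private final TreeSet<Integer> items = new TreeSet<>();

    BoundedTopK(int capacity) {
        if (capacity <= 0) {
            throw new IllegalArgumentException("capacity must be positive");
        }
        this.capacity = capacity;
    }

    // Adds a value, evicting the smallest if the bound is exceeded.
    // Returns true if the value was newly added and is still retained.
    synchronized boolean add(int value) {
        if (!items.add(value)) {
            return false; // duplicate: set semantics, as with TreeSet
        }
        if (items.size() > capacity) {
            int evicted = items.pollFirst(); // drop the current smallest
            return evicted != value;
        }
        return true;
    }

    // Snapshot of the retained values, largest first.
    synchronized List<Integer> descending() {
        return new ArrayList<>(items.descendingSet());
    }
}
```

Every producer contends on the same lock here, which is the cost a lock-free design tries to avoid.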

I found this type of data structure to be very handy in a distributed query aggregator implementation, especially for a distributed key-value database like Cassandra, where queries can be made to run on a per-partition basis. Such a structure is useful for collating and merging the results.

I am not sure if this type of structure is already available in some library, so I made one for my own use. Internally, it uses a lock-free algorithm built on an AtomicReferenceArray.
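The basic primitive behind such an algorithm is AtomicReferenceArray.compareAndSet, which writes a slot only if it still holds the expected reference. A minimal sketch of claiming the first empty slot might look like this; SlotClaimDemo and claimFirstFree are hypothetical names used for illustration and are not taken from the original code.

```java
import java.util.concurrent.atomic.AtomicReferenceArray;

// Hypothetical demo class; only AtomicReferenceArray itself comes from the article.
final class SlotClaimDemo {
    // Tries to place the item in the first empty (null) slot.
    // Returns the index that was claimed, or -1 if every slot is occupied.
    static <T> int claimFirstFree(AtomicReferenceArray<T> buffer, T item) {
        for (int i = 0; i < buffer.length(); i++) {
            // Succeeds only if slot i is still null at the moment of the call,
            // so two threads can never claim the same slot.
            if (buffer.compareAndSet(i, null, item)) {
                return i;
            }
        }
        return -1;
    }
}
```

No lock is held at any point: a thread that loses the race on one slot simply moves on to the next.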

The addition of a new element takes the following form:

/*
 * Scan the array from the given offset and 'insert' the item
 * at its proper sort position.
 */
boolean addItem(int fromOffset, T item)
{
    for (int i = fromOffset; i < size(); i++)
    {
      // if the 'i'th position is empty, claim it for the item
      if (buffer.compareAndSet(i, null, item))
      {
        return true;
      }

      // otherwise compare and swap, using the Comparator provided
      // or the element's own Comparable implementation
      T swapped = compareAndSwap(i, item);
      if (swapped != null)
      {
        // the item has been placed at 'i', displacing the element that
        // was there. Find a new position for the displaced element, if
        // there is room below. We could scan from the 'i+1'th position,
        // but we rescan from 0 to be safe: some other element higher up
        // may have been removed in the meantime, in which case the
        // displaced element needs to move up.
        if (i + 1 < size()) {
          addItem(0, swapped);
        }
        return true;
      }
    }

    return false;
}

The compare and swap needs to be an atomic operation:

private T compareAndSwap(int i, T item)
{
    boolean set = false, greater = false;
    T t = null;
    // retry until the CAS succeeds, i.e. until no other thread
    // has changed the i-th element between our read and our write
    while (!set) {
      t = buffer.get(i);

      // compare using Comparator/Comparable
      greater = compare(item, t) > 0;
      set = buffer.compareAndSet(i, t, greater ? item : t);
    }
    // the displaced element if the item was placed, otherwise null
    return greater ? t : null;
}
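To try the retry loop in isolation, here is a self-contained variant. It is a sketch under assumptions rather than the code above: SlotSwapDemo and swapIfGreater are hypothetical names, the Comparator is passed in explicitly, the slot is assumed to be non-null, and the write is skipped altogether when the item is not larger.

```java
import java.util.Comparator;
import java.util.concurrent.atomic.AtomicReferenceArray;

// Hypothetical demo class illustrating the CAS retry loop on a single slot.
final class SlotSwapDemo {
    // Keeps the larger of (current element, item) in slot i.
    // Returns the displaced element if the item was placed, or null if the
    // item was not larger and the slot was left unchanged.
    // Assumption: slot i is non-null when this is called.
    static <T> T swapIfGreater(AtomicReferenceArray<T> buffer, int i, T item,
                               Comparator<? super T> order) {
        while (true) {
            T current = buffer.get(i);
            if (order.compare(item, current) <= 0) {
                return null; // item is not larger; nothing to write
            }
            // Write only if slot i still holds the reference we just read.
            if (buffer.compareAndSet(i, current, item)) {
                return current;
            }
            // Another thread changed slot i in between; re-read and retry.
        }
    }
}
```

Note that compareAndSet compares references, not equals(), which is why the loop passes back the exact object it read from the slot.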

The removal operation, if present, should be atomic as well:

public boolean remove(Object o) {

    T b;

    if(o == null)
      return false;

    for (int i = 0; i < size(); i++) {

      b = buffer.get(i);

      if (o.equals(b))
      {

        if(buffer.compareAndSet(i, b, null))
        {
          // shift the following elements left
          for (int j = i + 1; j < size(); j++) {
            // check the position from the start: another higher item may have been removed in the meantime
            addItem(0, buffer.get(j));

          }
          return true;

        }

      }

    }

    return false;

  }

Full code can be found on GitHub.


Topics:
java, concurrent, sorting array

Opinions expressed by DZone contributors are their own.
