
A Sorted, Concurrent, Bounded Buffer implementation in Java


This type of data structure can be very handy in a distributed query aggregator implementation.


Java offers TreeSet as a sorted buffer and ConcurrentSkipListSet as its concurrent counterpart. Consider a scenario where we are collecting a 'top-K' list of items: we add numbers to a collection and, once all numbers have been added, take the top 5.

This can be realized using a NavigableSet like this:

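NavigableSet<Integer> aSet = new TreeSet<>();
aSet.add(5);
// ...add more elements
aSet.pollFirst(); // removes and returns the smallest element ('0'th)
aSet.pollFirst(); // removes and returns the next smallest ('1'th)

// descendingSet() is a reverse-order view of the same set
aSet = aSet.descendingSet();
aSet.pollFirst(); // removes and returns the largest element ('n'th)
aSet.pollFirst(); // removes and returns the next largest ('n-1'th)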


However, such a set is unbounded, so it retains every element added to it. That is unnecessary for our use case, since we are interested only in a subset of the items, and it becomes wasteful when dealing with, say, huge result sets from a database.

Wouldn't it be better if we could keep the data structure bounded (to a size of 'K') while keeping its elements sorted as before? That calls for some sort of BoundedNavigableSet. It would also need to be concurrently accessible, since multiple producers should be able to add elements to it.
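To make the bounded idea concrete before adding concurrency, here is a minimal single-threaded sketch. The class name BoundedTopK and its methods are hypothetical and are not part of my implementation; it simply wraps a TreeSet and evicts the smallest element whenever the size exceeds K.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableSet;
import java.util.TreeSet;

// Hypothetical illustration only: a NOT thread-safe bounded top-K on a TreeSet.
class BoundedTopK {
    private final int capacity;
    private final NavigableSet<Integer> items = new TreeSet<>();

    BoundedTopK(int capacity) {
        if (capacity <= 0) {
            throw new IllegalArgumentException("capacity must be positive");
        }
        this.capacity = capacity;
    }

    // Adds the value, then evicts the smallest element if the bound is exceeded.
    // Returns true if the value is still retained after the call.
    boolean add(int value) {
        if (!items.add(value)) {
            return false; // already present (set semantics)
        }
        if (items.size() > capacity) {
            Integer evicted = items.pollFirst();
            return evicted.intValue() != value;
        }
        return true;
    }

    // Returns the retained elements, largest first.
    List<Integer> descending() {
        return new ArrayList<>(items.descendingSet());
    }
}
```

With a capacity of 3, adding 5, 1, 9, 3 and 7 in that order leaves 9, 7 and 5 in the buffer. The memory footprint stays at K elements, but every access would need external locking once several producers are involved.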

I found this type of data structure very handy in a distributed query aggregator implementation, especially for a distributed key-value database like Cassandra, where queries can be run on a per-partition basis. Such a structure is useful for collating and merging the results.
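As a rough illustration of the merge step, the sketch below combines per-partition result lists into one global top-K. The class and method names are made up for this example, the partition results are plain integer lists rather than real query rows, and it uses a PriorityQueue as a min-heap instead of the bounded buffer described in this article.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

// Hypothetical illustration: merge partial results into a global top-K.
class PartitionMerge {
    static List<Integer> mergeTopK(List<List<Integer>> partitionResults, int k) {
        List<Integer> out = new ArrayList<>();
        if (k <= 0) {
            return out;
        }
        // Min-heap: the head is always the smallest of the retained items.
        PriorityQueue<Integer> heap = new PriorityQueue<>();
        for (List<Integer> partial : partitionResults) {
            for (Integer value : partial) {
                if (heap.size() < k) {
                    heap.add(value);
                } else if (value > heap.peek()) {
                    heap.poll(); // drop the current smallest
                    heap.add(value);
                }
            }
        }
        out.addAll(heap);
        out.sort(Comparator.reverseOrder()); // largest first
        return out;
    }
}
```

This version is sequential: one thread walks all partial results. The point of a concurrent bounded buffer is that each partition's producer could add its rows directly, without a separate merge pass.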

I am not sure whether this type of structure is already available in some library, so I made one for my own use. Internally, it uses a lock-free algorithm built on an AtomicReferenceArray.
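The building block of that lock-free approach is AtomicReferenceArray.compareAndSet, which writes to a slot only if the slot still holds the expected reference. A small sketch, with a hypothetical helper name, that claims the first empty slot:

```java
import java.util.concurrent.atomic.AtomicReferenceArray;

// Hypothetical illustration of the CAS primitive the buffer relies on.
class SlotClaimDemo {
    // Tries to place item into the first empty (null) slot.
    // Returns the index that was claimed, or -1 if every slot was occupied.
    static <T> int claimFirstEmpty(AtomicReferenceArray<T> buffer, T item) {
        for (int i = 0; i < buffer.length(); i++) {
            // Succeeds only if slot i is still null at the moment of the write,
            // so two threads can never both claim the same slot.
            if (buffer.compareAndSet(i, null, item)) {
                return i;
            }
        }
        return -1;
    }
}
```

Note that compareAndSet compares references with ==, not equals(), which is why the code in this article always passes back the exact object it previously read from the slot.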

A new element is added as follows:

/*
Scan the array from given offset to 'insert' the item
at a proper sort level
*/
boolean addItem(int fromOffset, T item)
{
    for (int i = fromOffset; i < size(); i++)
    {
      // if there is no element at 'i'th position
      // set item at 'i'
      if (!buffer.compareAndSet(i, null, item))
      {
        // compare and swap using Comparator provided
        // or, if element implements Comparable
        T swapped = compareAndSwap(i, item);
        if (swapped != null)
        {

          // The item now sits at position i, and the element that was
          // there has been displaced. Re-insert the displaced element,
          // unless i was the last slot, in which case it falls off the end.
          // Rescanning from 0 rather than i + 1 is deliberate: a
          // higher-ranked element may have been removed concurrently,
          // so the displaced element may need to move up.
          if (i + 1 < size()) {
            addItem(0, swapped);
          }

          return true;
        }
      }
      else {
        return true;
      }
    }

    return false;

  }
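To see the scan-and-displace idea of addItem in isolation, here is a single-threaded analogue on a plain array. It is only a sketch: the class name is hypothetical, it works on Integer values, and because nothing can change underneath it, the displaced element simply carries on from the next slot instead of rescanning from 0.

```java
// Hypothetical single-threaded analogue of the insertion scan.
// slots[0] holds the largest retained value; null marks an empty slot.
class SequentialTopK {
    // Returns true if item ends up in the array, false if it was too small.
    static boolean insert(Integer[] slots, int item) {
        int carry = item;       // the value currently looking for a slot
        boolean placed = false; // has the original item been stored yet?
        for (int i = 0; i < slots.length; i++) {
            if (slots[i] == null) {
                slots[i] = carry; // empty slot: whatever we carry lands here
                return true;
            }
            if (carry > slots[i]) {
                int displaced = slots[i];
                slots[i] = carry;  // carry takes this position
                carry = displaced; // the old occupant moves further down
                placed = true;
            }
        }
        // Whatever is still carried fell off the end of the bounded array.
        return placed;
    }
}
```

Inserting 5, 1, 9, 3 and 7 into an array of length 3 leaves it as [9, 7, 5]; inserting 0 afterwards returns false and changes nothing.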

The compare-and-swap step is itself an atomic operation, retried until it succeeds:

private T compareAndSwap(int i, T item)
  {
    boolean set = false, greater = false;
    T t = null;
    while (!set) {
      t = buffer.get(i);

      // the i-th slot may have been changed by another thread since it
      // was last read, so re-read it and retry until the CAS succeeds;
      // compare using the Comparator, or Comparable if none was given
      greater = compare(item, t) > 0;
      set = buffer.compareAndSet(i, t, greater ? item : t);

    }
    return greater ? t : null;

  }
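The compare(item, t) call above is not shown in this article. One plausible shape for it, assuming the buffer holds an optional Comparator and otherwise falls back to the elements' natural ordering (the same convention TreeSet follows), is sketched below with a hypothetical wrapper class:

```java
import java.util.Comparator;

// Hypothetical sketch of the compare helper; not the article's actual code.
class CompareHelper<T> {
    private final Comparator<? super T> comparator; // null means natural ordering

    CompareHelper(Comparator<? super T> comparator) {
        this.comparator = comparator;
    }

    @SuppressWarnings("unchecked")
    int compare(T a, T b) {
        if (comparator != null) {
            return comparator.compare(a, b);
        }
        // Throws ClassCastException if T does not implement Comparable.
        return ((Comparable<? super T>) a).compareTo(b);
    }
}
```

Neither branch guards against null arguments, so a caller has to make sure it never compares against an empty slot.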

The removal operation, if present, should be atomic as well:

public boolean remove(Object o) {

    T b;

    if(o == null)
      return false;

    for (int i = 0; i < size(); i++) {

      b = buffer.get(i);

      if (o.equals(b))
      {

        if(buffer.compareAndSet(i, b, null))
        {
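          // shift the remaining elements left
          for (int j = i + 1; j < size(); j++) {
            // clear the old slot first so re-insertion leaves no duplicate
            T moved = buffer.getAndSet(j, null);
            // rescan from the start: another higher item may have been
            // removed in the meantime
            if (moved != null) addItem(0, moved);
          }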
          return true;

        }

      }

    }

    return false;

  }

The full code can be found on GitHub.


