
An Algorithm for a Concurrent Queue

Check out the source code and reasoning behind a concurrent queue algorithm that supports one reading thread and multiple writing threads.

In this post, I want to show you an algorithm for a concurrent queue that supports one reading thread and multiple writing threads. To publish an event to the queue, a writing thread only needs a read from a thread-local field and a write to a volatile field. Unlike the standard JDK concurrent queues, writing does not need "compare and swap" operations, which leads to a simpler and potentially faster algorithm. A usage example is a background thread that writes log events asynchronously to a file.

Writing

The main idea is to use not one queue but many: one queue per writing thread, stored in a thread-local field. Because each queue then has exactly one writing thread, it can be a simple linked list that uses a volatile field for the next element and a final field for the value:

public class LinkedList<T> implements Consumer<T> {
    volatile ListElementPointer<T> lastRead;   // set once by the writer, then used by the reader
    private LinkedListElement<T> lastWritten;  // accessed only by the writing thread
    // Constructor omitted
    @Override
    public void accept(T event) {
        // Queue stopped logic omitted
        LinkedListElement<T> linkedListElement = new LinkedListElement<T>(event);
        if (lastWritten == null)
        {
            lastWritten = linkedListElement;
            lastRead = new ListElementPointer<T>(lastWritten); // volatile write publishes the first element
        }
        else
        {
            lastWritten.next = linkedListElement; // volatile write publishes the new element
            lastWritten = lastWritten.next;
        }
    }
}
class LinkedListElement<T> {
    volatile LinkedListElement<T> next;
    final T event;
    // Constructor omitted
}


Writing an element to the queue is implemented in the accept method (line 6). For the first element written, lastWritten is set to the new LinkedListElement and lastRead is set to a ListElementPointer that wraps it (lines 10-13). Otherwise, the list is extended by the new LinkedListElement, and lastWritten is moved to the end of the list (lines 16 and 17).

And here is the class that stores each queue, typed as a Consumer, in a thread-local field:

public class ThreadLocalConsumer<T> implements Consumer<T> {
    private final EventBus<T> theBus;
    private ThreadLocal<Consumer<T>> threadLocal = new  ThreadLocal<Consumer<T>>();
    // Constructor omitted 
    public void accept(T event) {   
        Consumer<T>  consumer = threadLocal.get();
        if( consumer == null )
        {
            consumer = theBus.newConsumerForThreadLocalStorage(Thread.currentThread());
            threadLocal.set( consumer );
        }
        consumer.accept(event);
    }
}
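
The EventBus behind newConsumerForThreadLocalStorage is not shown in this article. As a rough sketch of the idea, assuming the bus only has to keep a registry of per-thread queues that the reading thread can iterate, the lookup-and-register step could be folded into ThreadLocal.withInitial. The names PerThreadRegistry, buffers, registeredThreads, and totalEvents are hypothetical, and the ArrayList is only a placeholder for the LinkedList shown above, so it must not be read while writers are still running:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

// Hypothetical sketch: one buffer per writing thread, registered once on first use.
class PerThreadRegistry<T> implements Consumer<T> {
    // Registration happens once per thread, so a copy-on-write list is cheap enough here.
    private final List<List<T>> buffers = new CopyOnWriteArrayList<>();

    // withInitial runs the supplier the first time a given thread calls get().
    private final ThreadLocal<List<T>> local = ThreadLocal.withInitial(() -> {
        List<T> buffer = new ArrayList<>(); // placeholder for the article's LinkedList
        buffers.add(buffer);
        return buffer;
    });

    @Override
    public void accept(T event) {
        // After the first call this is a thread-local read followed by a plain append.
        local.get().add(event);
    }

    // Number of threads that have written at least one event.
    int registeredThreads() {
        return buffers.size();
    }

    // Only safe once the writers have finished, for example after Thread.join.
    int totalEvents() {
        int n = 0;
        for (List<T> buffer : buffers) {
            n += buffer.size();
        }
        return n;
    }
}
```

Registration is the only step that touches shared state, and it happens once per thread, so the per-event path stays free of locks and compare-and-swap operations.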


Reading

When reading the elements, we must remember the last element we read. We do this in the field lastRead of the LinkedList. The following shows the method used for reading elements from one queue:

public void prozessWithoutReadCount(EventSink<T> eventSink) {
    if (list.lastRead != null)
    {
        // Deliver the first element exactly once
        if (!list.lastRead.isRead)
        {
            eventSink.execute(list.lastRead.element.event);
            list.lastRead.isRead = true;
        }
        LinkedListElement<T> current = list.lastRead.element;
        LinkedListElement<T> next;
        // Advance first, then deliver, so each element after the last read one is delivered once
        while ((next = current.next) != null)
        {
            current = next;
            eventSink.execute(current.event);
        }
        list.lastRead.element = current;
    }
}


And here is the class ListElementPointer used to store the last read element:

public class ListElementPointer<T> {
    LinkedListElement<T> element;
    boolean isRead;
    // Constructor omitted 
}


The field lastRead is set once by the writing thread. Afterward, only the reading thread modifies the ListElementPointer it refers to.
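
To make the write and read steps concrete, here is a minimal, self-contained sketch of one per-thread list with the constructors filled in. It is not the original source: the names SingleWriterList, Node, first, and drainTo are hypothetical, and it replaces the isRead flag with a reader-private lastRead reference, which is a simplification of mine. The read loop advances to the next node before handing its event to the sink, so each event is delivered exactly once:

```java
import java.util.function.Consumer;

// Hypothetical single-writer, single-reader list modeled on the article's LinkedList.
class SingleWriterList<T> implements Consumer<T> {
    static final class Node<T> {
        volatile Node<T> next; // the volatile write publishes the new node to the reader
        final T event;
        Node(T event) { this.event = event; }
    }

    private volatile Node<T> first; // set once by the writer
    private Node<T> lastWritten;    // touched only by the writing thread
    private Node<T> lastRead;       // touched only by the reading thread

    @Override
    public void accept(T event) {
        Node<T> node = new Node<>(event);
        if (lastWritten == null) {
            first = node;            // publish the first element
        } else {
            lastWritten.next = node; // publish every later element
        }
        lastWritten = node;
    }

    // Called by the reading thread only; returns how many events were delivered.
    int drainTo(Consumer<? super T> sink) {
        int delivered = 0;
        Node<T> current = lastRead;
        if (current == null) {
            current = first;
            if (current == null) {
                return 0; // nothing written yet
            }
            sink.accept(current.event);
            delivered++;
        }
        // Read next once per step, because the writer may append at any time.
        for (Node<T> next = current.next; next != null; next = current.next) {
            current = next;
            sink.accept(current.event);
            delivered++;
        }
        lastRead = current;
        return delivered;
    }
}
```

Calling drainTo repeatedly from the reading thread picks up only the events appended since the previous call.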

I've skipped the logic for creating new LinkedLists and reading multiple LinkedLists in the reading thread. You can see the complete source code here.
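
As a rough idea of what that skipped logic might look like, the following sketch lets each writing thread register its own chain on first use and lets the reading thread make one pass over all registered chains. It is an assumption of mine, not the VMLens implementation: MultiWriterQueue, Chain, Node, and drainTo are hypothetical names, the registry is a CopyOnWriteArrayList, and a dummy head node stands in for the isRead flag:

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

// Hypothetical sketch: many writing threads, one reading thread, one chain per writer.
class MultiWriterQueue<T> implements Consumer<T> {
    private static final class Node<T> {
        volatile Node<T> next; // the volatile write publishes the node to the reader
        final T event;
        Node(T event) { this.event = event; }
    }

    // The dummy head means the reader never has to special-case the first element.
    private static final class Chain<T> {
        final Node<T> head = new Node<>(null);
        Node<T> lastWritten = head; // writing thread only
        Node<T> lastRead = head;    // reading thread only
    }

    private final List<Chain<T>> chains = new CopyOnWriteArrayList<>();

    // Each writing thread creates and registers its chain on first use.
    private final ThreadLocal<Chain<T>> local = ThreadLocal.withInitial(() -> {
        Chain<T> chain = new Chain<>();
        chains.add(chain);
        return chain;
    });

    @Override
    public void accept(T event) {
        Chain<T> chain = local.get();
        Node<T> node = new Node<>(event);
        chain.lastWritten.next = node;
        chain.lastWritten = node;
    }

    // Reading thread only: one pass over every chain, returns the number of events delivered.
    int drainTo(Consumer<? super T> sink) {
        int delivered = 0;
        for (Chain<T> chain : chains) {
            Node<T> current = chain.lastRead;
            for (Node<T> next = current.next; next != null; next = current.next) {
                current = next;
                sink.accept(current.event);
                delivered++;
            }
            chain.lastRead = current;
        }
        return delivered;
    }
}
```

Events from one writing thread arrive in the order they were written, but this sketch gives no ordering guarantee between different writing threads.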

Usage

The queue is open source and the source code is available on GitHub here. We use this queue in VMLens, a tool that detects race conditions and deadlocks in Java applications. VMLens traces the application and writes the trace events asynchronously using this queue to a file for later analysis.


Published at DZone with permission of Thomas Krieger, DZone MVB. See the original article here.
