An Algorithm for a Concurrent Queue
Check out the source code, reasoning, and steps behind an algorithm for a concurrent queue with one reading thread and many writing threads.
In the following post, I want to show you an algorithm for a concurrent queue that supports one reading thread and multiple writing threads. To publish an event to the queue, a writing thread only needs a read from a thread local field and a write to a volatile field. Writing does not need "compare and swap" operations like the standard JDK concurrent queues do, leading to a simpler and potentially faster algorithm. A usage example is a background thread writing log events asynchronously to a file.
Writing
The main idea is to use not one single queue, but many. We use one queue per writing thread, stored in a thread local field. Each queue is then a simple linked list with a volatile field for the next element and a final field for the value:
public class LinkedList<T> implements Consumer<T> {
    volatile ListElementPointer<T> lastRead;
    private LinkedListElement<T> lastWritten;
    // Constructor omitted
    @Override
    public void accept(T event) {
        // Queue stopped logic omitted
        LinkedListElement<T> linkedListElement = new LinkedListElement<T>(event);
        if( lastWritten == null )
        {
            lastWritten = linkedListElement;
            lastRead = new ListElementPointer<T>(lastWritten);
        }
        else
        {
            lastWritten.next = linkedListElement;
            lastWritten = lastWritten.next;
        }
    }
}
class LinkedListElement<T> {
    volatile LinkedListElement<T> next;
    final T event;
    // Constructor omitted
}
Writing an element to the queue is implemented in the accept method. When it is the first element written, lastWritten is set to the new LinkedListElement and lastRead is set to a new ListElementPointer referring to it. Otherwise, the new LinkedListElement is appended through the volatile next field of the current last element, and lastWritten is moved to the end of the list.
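To make the visibility argument concrete, here is a minimal sketch of the same idea reduced to one writer and one reader. It is not the VMLens code: the names SingleWriterQueue, Node and drain are hypothetical, and the reader simply remembers the last node it delivered instead of using a ListElementPointer. The point it illustrates is that the writer publishes each node with a single volatile write and never needs compare and swap, because no other thread writes to the same list.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical sketch: exactly one thread calls accept, exactly one thread calls drain.
final class SingleWriterQueue<T> implements Consumer<T> {

    private static final class Node<T> {
        final T event;          // final: visible to the reader once the node is published
        volatile Node<T> next;  // volatile: the write that publishes the successor
        Node(T event) { this.event = event; }
    }

    private volatile Node<T> first; // written once by the writer, read by the reader
    private Node<T> lastWritten;    // touched only by the writer thread
    private Node<T> lastRead;       // touched only by the reader thread

    @Override
    public void accept(T event) {   // writer thread only
        Node<T> node = new Node<>(event);
        if (lastWritten == null) {
            first = node;            // volatile write publishes the first node
        } else {
            lastWritten.next = node; // volatile write publishes every later node
        }
        lastWritten = node;
    }

    // Reader thread only: returns every event published since the previous call.
    List<T> drain() {
        List<T> out = new ArrayList<>();
        Node<T> next = (lastRead == null) ? first : lastRead.next;
        while (next != null) {
            out.add(next.event);
            lastRead = next;
            next = next.next;
        }
        return out;
    }

    public static void main(String[] args) throws InterruptedException {
        SingleWriterQueue<Integer> queue = new SingleWriterQueue<>();
        Thread writer = new Thread(() -> {
            for (int i = 0; i < 1000; i++) {
                queue.accept(i);
            }
        });
        writer.start();
        List<Integer> seen = new ArrayList<>();
        while (seen.size() < 1000) {   // the main thread acts as the single reader
            seen.addAll(queue.drain());
        }
        writer.join();
        System.out.println(seen.size() + " events, last = " + seen.get(seen.size() - 1));
    }
}
```

The reader keeps a reference to the last node it delivered rather than to the next unread one, because the next node may not exist yet; it only becomes reachable through the next field of the last node.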
And here is the class storing each queue, called Consumer, in a thread local field:
public class ThreadLocalConsumer<T> implements Consumer<T> {
    private final EventBus<T> theBus;
    private ThreadLocal<Consumer<T>> threadLocal = new ThreadLocal<Consumer<T>>();
    // Constructor omitted
    public void accept(T event) {
        Consumer<T> consumer = threadLocal.get();
        if( consumer == null )
        {
            consumer = theBus.newConsumerForThreadLocalStorage(Thread.currentThread());
            threadLocal.set( consumer );
        }
        consumer.accept(event);
    }
}
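The EventBus behind newConsumerForThreadLocalStorage is not shown in this post, so the following is only a sketch of how such a per-thread registration could look. The class PerThreadQueues and its method queues are hypothetical, and a ConcurrentLinkedQueue is used purely as a stand-in for the per-thread linked list to keep the example short. It shows the lookup pattern: the first call from a thread creates and registers a queue, and every later call from that thread reuses it.

```java
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

// Hypothetical sketch: gives every writing thread its own queue and
// remembers all queues so that a reader can find them later.
final class PerThreadQueues<T> implements Consumer<T> {

    // Registry of all per-thread queues; modified once per new writing thread.
    private final List<Queue<T>> allQueues = new CopyOnWriteArrayList<>();

    // withInitial runs the supplier the first time a thread calls get().
    private final ThreadLocal<Queue<T>> local = ThreadLocal.withInitial(() -> {
        // Stand-in for the article's per-thread linked list (an assumption).
        Queue<T> queue = new ConcurrentLinkedQueue<>();
        allQueues.add(queue);
        return queue;
    });

    @Override
    public void accept(T event) {
        local.get().add(event); // only the calling thread ever writes to this queue
    }

    List<Queue<T>> queues() {
        return allQueues;
    }

    public static void main(String[] args) throws InterruptedException {
        PerThreadQueues<String> writer = new PerThreadQueues<>();
        Thread a = new Thread(() -> writer.accept("from a"));
        Thread b = new Thread(() -> writer.accept("from b"));
        a.start();
        b.start();
        a.join();
        b.join();
        System.out.println(writer.queues().size() + " queues registered");
    }
}
```

Registering a queue is the only step that touches shared mutable state, and it happens once per thread rather than once per event.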
Reading
When reading the elements, we must remember the last element we read. We do this in the field lastRead in the LinkedList. The following shows the method used for reading elements from one queue:
public void prozessWithoutReadCount(EventSink<T> eventSink) {
    if( list.lastRead != null )
    {
        // The first element is delivered exactly once, guarded by isRead
        if( ! list.lastRead.isRead )
        {
            eventSink.execute( list.lastRead.element.event );
            list.lastRead.isRead = true;
        }
        // lastRead.element has already been delivered, so deliver its successors
        LinkedListElement<T> current = list.lastRead.element;
        LinkedListElement<T> next;
        while( ( next = current.next ) != null )
        {
            eventSink.execute( next.event );
            current = next;
        }
        list.lastRead.element = current;
    }
}
And here is the class ListElementPointer used to store the last read element:
public class ListElementPointer<T> {
    LinkedListElement<T> element;
    boolean isRead;
    // Constructor omitted
}
The field lastRead is initialized by the writing thread and, afterward, only modified by the reading thread.
I've skipped the logic for creating new LinkedLists and reading multiple LinkedLists in the reading thread. You can see the complete source code here.
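For orientation, here is a rough sketch of what a reading thread that serves several per-thread queues might look like. It is an assumption about the general shape, not the implementation from the linked source: MultiQueueReader, newQueueForThread, drainOnce, run and stop are hypothetical names, and ConcurrentLinkedQueue again stands in for the per-thread linked list.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

// Hypothetical sketch of the single reading thread's loop over many queues.
final class MultiQueueReader<T> {

    private final List<Queue<T>> queues = new CopyOnWriteArrayList<>();
    private volatile boolean stopped;

    // Called once by each writing thread to obtain its private queue.
    Queue<T> newQueueForThread() {
        Queue<T> queue = new ConcurrentLinkedQueue<>(); // stand-in (an assumption)
        queues.add(queue);
        return queue;
    }

    // One pass over every registered queue; returns the number of events handled.
    int drainOnce(Consumer<T> sink) {
        int handled = 0;
        for (Queue<T> queue : queues) {
            for (T event = queue.poll(); event != null; event = queue.poll()) {
                sink.accept(event);
                handled++;
            }
        }
        return handled;
    }

    // Reader loop: keep draining until stop() is called, then drain one last time.
    void run(Consumer<T> sink) {
        while (!stopped) {
            if (drainOnce(sink) == 0) {
                Thread.yield(); // nothing to do; a real reader might sleep or park
            }
        }
        drainOnce(sink); // pick up events written shortly before stop()
    }

    void stop() {
        stopped = true;
    }

    public static void main(String[] args) throws InterruptedException {
        MultiQueueReader<String> reader = new MultiQueueReader<>();
        List<String> file = new ArrayList<>(); // stands in for the log file
        Thread readerThread = new Thread(() -> reader.run(file::add));
        readerThread.start();

        Runnable work = () -> {
            Queue<String> own = reader.newQueueForThread();
            own.add(Thread.currentThread().getName() + " event 1");
            own.add(Thread.currentThread().getName() + " event 2");
        };
        Thread w1 = new Thread(work);
        Thread w2 = new Thread(work);
        w1.start();
        w2.start();
        w1.join();
        w2.join();

        reader.stop();
        readerThread.join();
        System.out.println(file.size() + " events written");
    }
}
```

Events from one writing thread keep their order, but no order is defined between events of different threads, since each queue is drained independently.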
Usage
The queue is open source and the source code is available on GitHub here. We use this queue in VMLens, a tool that detects race conditions and deadlocks in Java applications. VMLens traces the application and writes the trace events asynchronously using this queue to a file for later analysis.
Published at DZone with permission of Thomas Krieger, DZone MVB. See the original article here.