An Algorithm for a Concurrent Queue

Check out the source code, reasoning, and steps utilized to create an algorithm designed to handle concurrent queues.

by Thomas Krieger · Nov. 06, 17 · Tutorial

In this post, I want to show you an algorithm for a concurrent queue that supports one reading thread and multiple writing threads. To publish an event to the queue, a writing thread only needs a read from a thread-local field and a write to a volatile field. Unlike the standard JDK concurrent queues, writing does not need "compare and swap" operations, which leads to a simpler and potentially faster algorithm. A usage example is a background thread that writes log events asynchronously to a file.

Writing

The main idea is to use not a single queue but many: one queue per writing thread, stored in a thread-local field. Each queue is a simple linked list whose elements use a volatile field for the next element and a final field for the value:

public class LinkedList<T> implements Consumer<T> {
    volatile ListElementPointer<T> lastRead;     // set once by the writer, then used by the reader
    private LinkedListElement<T> lastWritten;    // only touched by the writing thread
    // Constructor omitted
    @Override
    public void accept(T event) {
        // Queue stopped logic omitted
        LinkedListElement<T> linkedListElement = new LinkedListElement<T>(event);
        if (lastWritten == null)
        {
            lastWritten = linkedListElement;
            lastRead = new ListElementPointer<T>(lastWritten);
        }
        else
        {
            lastWritten.next = linkedListElement;
            lastWritten = lastWritten.next;
        }
    }
}
class LinkedListElement<T> {
    volatile LinkedListElement<T> next;
    final T event;  
    // Constructor omitted 
}


Writing an element to the queue is implemented in the accept method, line 6. When the first element is written, lastWritten is set to the new LinkedListElement and lastRead is set to a new ListElementPointer that wraps it, lines 10-13. Otherwise, the new LinkedListElement is appended to the list, and lastWritten is moved to the end of the list, lines 16 and 17.
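
The write path can also be tried out in isolation. The following is a minimal sketch, not the VMLens source: the names Node, SingleWriterList, snapshot, and SingleWriterListDemo are hypothetical, and the ListElementPointer is replaced by a plain volatile head field to keep the example short. It assumes that exactly one thread ever calls accept on a given list.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical names throughout; a sketch of the single-writer list idea.
final class Node<T> {
    final T event;            // immutable payload
    volatile Node<T> next;    // the writer publishes later nodes through this field
    Node(T event) { this.event = event; }
}

final class SingleWriterList<T> implements Consumer<T> {
    volatile Node<T> head;    // written once by the writer, read by the reader
    private Node<T> tail;     // writer-private, so it needs no synchronization

    @Override
    public void accept(T event) {
        Node<T> node = new Node<>(event);
        if (tail == null) {
            tail = node;
            head = node;      // volatile write publishes the first node
        } else {
            tail.next = node; // volatile write publishes every later node
            tail = node;
        }
    }

    // Reader-side helper for the demo: collects everything currently visible.
    List<T> snapshot() {
        List<T> out = new ArrayList<>();
        for (Node<T> n = head; n != null; n = n.next) {
            out.add(n.event);
        }
        return out;
    }
}

final class SingleWriterListDemo {
    public static void main(String[] args) throws InterruptedException {
        SingleWriterList<String> list = new SingleWriterList<>();
        Thread writer = new Thread(() -> {
            list.accept("a");
            list.accept("b");
            list.accept("c");
        });
        writer.start();
        writer.join();
        System.out.println(list.snapshot()); // prints [a, b, c]
    }
}
```

Because only one thread writes to a given list, tail can stay a plain field. The volatile writes to head and next are what make each new node, including its final event field, visible to a reading thread.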

And here is the class that stores each queue, typed as a Consumer, in a thread-local field:

public class ThreadLocalConsumer<T> implements Consumer<T> {
    private final EventBus<T> theBus;
    private final ThreadLocal<Consumer<T>> threadLocal = new ThreadLocal<Consumer<T>>();
    // Constructor omitted
    @Override
    public void accept(T event) {
        Consumer<T> consumer = threadLocal.get();
        if (consumer == null)
        {
            // First event from this thread: ask the bus for a new queue and remember it
            consumer = theBus.newConsumerForThreadLocalStorage(Thread.currentThread());
            threadLocal.set(consumer);
        }
        consumer.accept(event);
    }
}
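
The EventBus itself is not shown in this post, so the following is only a sketch of how per-thread registration might look. Everything in it is an assumption for illustration: the names CountingQueue, PerThreadRouter, registry, and PerThreadRouterDemo are hypothetical, the queue merely counts events instead of storing them, and a CopyOnWriteArrayList stands in for whatever structure the real bus uses to track its queues.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

// Hypothetical stand-in for a per-thread queue: it only counts accepted events.
final class CountingQueue<T> implements Consumer<T> {
    final String ownerName;
    volatile int accepted;    // written only by the owning thread, so a plain increment is enough
    CountingQueue(String ownerName) { this.ownerName = ownerName; }

    @Override
    public void accept(T event) {
        accepted = accepted + 1;
    }
}

// Routes every event to the queue that belongs to the calling thread.
final class PerThreadRouter<T> implements Consumer<T> {
    // The single reader would iterate this list; the list type is an assumption of this sketch.
    final List<CountingQueue<T>> registry = new CopyOnWriteArrayList<>();

    // Runs once per thread, on that thread's first call to get().
    private final ThreadLocal<CountingQueue<T>> local = ThreadLocal.withInitial(() -> {
        CountingQueue<T> queue = new CountingQueue<>(Thread.currentThread().getName());
        registry.add(queue);
        return queue;
    });

    @Override
    public void accept(T event) {
        local.get().accept(event);
    }
}

final class PerThreadRouterDemo {
    public static void main(String[] args) throws InterruptedException {
        PerThreadRouter<String> router = new PerThreadRouter<>();
        Runnable work = () -> {
            for (int i = 0; i < 3; i++) {
                router.accept("event-" + i);
            }
        };
        Thread t1 = new Thread(work, "writer-1");
        Thread t2 = new Thread(work, "writer-2");
        t1.start();
        t2.start();
        t1.join();
        t2.join();
        // One queue per writing thread; the order of the two lines may vary.
        for (CountingQueue<String> queue : router.registry) {
            System.out.println(queue.ownerName + " accepted " + queue.accepted);
        }
    }
}
```

In this sketch, registering a queue costs one locked list update, but that happens only once per thread. Every later call is a thread-local lookup followed by the write into the thread's own queue.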


Reading

When reading the elements, we must remember the last element we read. We do this in the lastRead field of the LinkedList. The following method reads the elements of one queue:

public void prozessWithoutReadCount(EventSink<T> eventSink) {
    if (list.lastRead != null)
    {
        // The first element is delivered only once, guarded by isRead
        if (!list.lastRead.isRead)
        {
            eventSink.execute(list.lastRead.element.event);
            list.lastRead.isRead = true;
        }
        // current is the last element that was already delivered
        LinkedListElement<T> current = list.lastRead.element;
        while (current.next != null)
        {
            // Advance first, so that only unread elements are delivered
            current = current.next;
            eventSink.execute(current.event);
        }
        list.lastRead.element = current;
    }
}


And here is the class ListElementPointer used to store the last read element:

public class ListElementPointer<T> {
    LinkedListElement<T> element;
    boolean isRead;
    // Constructor omitted 
}


The field lastRead is initialized by the writing thread and, afterward, only modified by the reading thread.

I've skipped the logic for creating new LinkedLists and reading multiple LinkedLists in the reading thread. You can see the complete source code here.
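
To give a rough idea of how the skipped parts might fit together, here is a self-contained sketch under stated assumptions. It is not the code from the repository: Cell, WriterQueue, MultiQueueBus, publish, drainTo, drainAll, and MultiQueueBusDemo are hypothetical names, the queues are tracked in a CopyOnWriteArrayList, and the reader simply polls in a loop.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

final class Cell<T> {
    final T event;
    volatile Cell<T> next;    // written by the owning writer, read by the reader
    Cell(T event) { this.event = event; }
}

final class WriterQueue<T> {
    volatile Cell<T> first;   // set once by the writer
    private Cell<T> last;     // writer-private
    private Cell<T> lastRead; // reader-private: last cell already delivered

    void write(T event) {
        Cell<T> cell = new Cell<>(event);
        if (last == null) { last = cell; first = cell; }
        else { last.next = cell; last = cell; }
    }

    // Must be called by the single reading thread only.
    int drainTo(List<T> sink) {
        int delivered = 0;
        Cell<T> current = lastRead;
        if (current == null) {
            current = first;
            if (current == null) return 0;   // nothing written yet
            sink.add(current.event);
            delivered++;
        }
        Cell<T> next;
        while ((next = current.next) != null) {
            current = next;                  // advance, then deliver the unread cell
            sink.add(current.event);
            delivered++;
        }
        lastRead = current;
        return delivered;
    }
}

final class MultiQueueBus<T> {
    private final List<WriterQueue<T>> queues = new CopyOnWriteArrayList<>();
    private final ThreadLocal<WriterQueue<T>> local = ThreadLocal.withInitial(() -> {
        WriterQueue<T> queue = new WriterQueue<>();
        queues.add(queue);                   // happens once per writing thread
        return queue;
    });

    void publish(T event) { local.get().write(event); }

    // One pass of the single reader over every registered queue.
    int drainAll(List<T> sink) {
        int delivered = 0;
        for (WriterQueue<T> queue : queues) delivered += queue.drainTo(sink);
        return delivered;
    }
}

final class MultiQueueBusDemo {
    public static void main(String[] args) {
        MultiQueueBus<String> bus = new MultiQueueBus<>();
        List<Thread> writers = new ArrayList<>();
        for (int w = 0; w < 3; w++) {
            final int id = w;
            Thread t = new Thread(() -> {
                for (int i = 0; i < 1000; i++) bus.publish("w" + id + "-" + i);
            });
            writers.add(t);
            t.start();
        }
        // The main thread acts as the single reader and polls while writers run.
        List<String> received = new ArrayList<>();
        boolean running = true;
        while (running) {
            running = false;
            for (Thread t : writers) running |= t.isAlive();
            bus.drainAll(received);          // the last pass runs after all writers finished
            Thread.yield();
        }
        System.out.println(received.size()); // prints 3000
    }
}
```

Events from one writer arrive in the order they were published, but this sketch gives no ordering guarantee between different writers. A production reader would also back off or sleep between passes instead of spinning.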

Usage

The queue is open source and the source code is available on GitHub here. We use this queue in VMLens, a tool that detects race conditions and deadlocks in Java applications. VMLens traces the application and writes the trace events asynchronously using this queue to a file for later analysis.

Published at DZone with permission of Thomas Krieger, DZone MVB. See the original article here.
