Over a million developers have joined DZone.

JDK concurrent package

DZone 's Guide to

JDK concurrent package

· Performance Zone ·
Free Resource

JDK concurrent package

The currently Java Memory Model guarantees the expected execution order of the multi-threaded code, in case there is no races in this code. In order to protect your code from racing, come up with different ways to synchronize and exchange data between them. Package java.util.concurrent, part of the HotSpot JDK, provides the following tools for writing multi-threaded code:

  • Atomic
  • Locks
  • Collections
  • Synchronization points
  • Executors
  • Accumulators jdk 1.8


In the child package java.util.concurrent.atomic is a set of classes for working with primitive atomic types. Contract of those classes ensures the operation compare-and-set runs for 'one unit of CPU time". When your set new value of the variable you also transfer its old value (the approach of optimistic locking). If after calling the variable value differs from the expected parameter -compare-and-set will return false result.

For example, take two arrays of long variables: [1,2,3,4,5] and [-1,-2,-3,-4,-5]. Few threads will iterated over the array, and summarize the elements into a single variable. Code (groovy) with pessimistic locking is looks like:

class Sum {
    static monitor = new Object()
    static volatile long sum = 0
class Summer implements Callable {
    long[] data
    Object call() throws Exception {
        data.each {
            synchronized (Sum.monitor) {
                println("${Thread.currentThread().name}: add ${it} to ${Sum.sum}")
                Sum.sum += it

        new Summer(data: [1,2,3,4,5]),
        new Summer(data: [-1,-2,-3,-4,-5])

print("Sum: ${Sum.sum}")
Expected result:
pool-1-thread-1: add 1 to 0
pool-1-thread-2: add -1 to 1
pool-1-thread-1: add 2 to 0
pool-1-thread-2: add -2 to 2
pool-1-thread-1: add 3 to 0
pool-1-thread-2: add -3 to 3
pool-1-thread-1: add 4 to 0
pool-1-thread-1: add 5 to 4
pool-1-thread-2: add -4 to 9
pool-1-thread-2: add -5 to 5
Sum: 0

However, this approach has significant disadvantages in performance. In this case, we've waste much more time staying in lock state than executing real code:

  • attempt to block the monitor
  • blocking thread
  • unlock monitor
  • unlock thread

Consider the use of AtomicLong to implement optimistic locking in the calculation of the same amount:

class Sum {
    static volatile AtomicLong sum = new AtomicLong(0)
class Summer implements Callable {
    long[] data
    Object call() throws Exception {
        data.each {
                while(true) {
                    long localSum = Sum.sum.get()
                    if (Sum.sum.compareAndSet(localSum, localSum + it)) {
                        println("${Thread.currentThread().name}: add ${it} to ${Sum.sum}")
                    } else {
                        println("[MISS!] ${Thread.currentThread().name}: add ${it} to ${Sum.sum}")

        new Summer(data: [1,2,3,4,5]),
        new Summer(data: [-1,-2,-3,-4,-5])

print("Sum: ${Sum.sum}")
As we can see from the results the errors attempts have been not so much:
[MISS!] pool-1-thread-1: add 1 to -1
pool-1-thread-2: add -1 to -1
pool-1-thread-2: add -2 to -3
[MISS!] pool-1-thread-1: add 1 to -3
pool-1-thread-2: add -3 to -6
pool-1-thread-1: add 1 to -5
[MISS!] pool-1-thread-2: add -4 to -5
pool-1-thread-1: add 2 to -7
pool-1-thread-2: add -4 to -7
pool-1-thread-1: add 3 to -9
pool-1-thread-2: add -5 to -9
pool-1-thread-1: add 4 to -5
pool-1-thread-1: add 5 to 0
Sum: 0

When deciding to use optimistic locking is important to act with modifiable variable does not take long time. The longer this action - the more often will be a case of mistaken compare-and-set, and more often have to perform this action again.

Using compare-and-set method we can also implement nonblocking read lock. In this case, we store current version of object into atomic variable. After all processing over object has done - compare current version of object with saved one. If they are not equal - it's to read object properties again using usual read-write blocking.

class Transaction {
    long debit

class Account {
    AtomicLong version = new AtomicLong()
    ReadWriteLock readWriteLock = new ReentrantReadWriteLock()
    List<Transaction> transactions = new ArrayList<Transaction>()

long  balance(Account account) {
    ReentrantReadWriteLock.ReadLock locked
    while(true) {
        long balance = 0
        long version = account.version.get()
        account.transactions.each {balance += it.debit}
        //volatile write for JMM
        if (account.version.compareAndSet(version, version)) {
            if (locked) {locked.unlock()}
            return balance
        } else {
            locked = account.readWriteLock.readLock()

void modifyTransaction(Account account, int position, long newDebit) {
    def writeLock = account.readWriteLock.writeLock()
    account.transactions[position].debit = newDebit



Unlike syncronized block, ReentrantLock allows more flexibility to choose the time of lock and release because it uses ordinary Java calls.

ReentrantLock also provides information on the current state of lock and allows "expect" lock within a certain time. Maintains proper lock and release of recursive locks for a single thread. If you need a fair lock (complying with the order of the capture monitor) -ReentrantLock is also provide this mechanism.

Although that the syncronized and ReentrantLock lock is very similar on interface level the implementation of them is quite different. Without going into the details of JMM: use ReentrantLock instead provided JVM syncronized block is only if you have battle for the monitor very often. In the case where only one thread (usually) try to get into syncronized method - performance with simple syncronized better than ReentrantLock .


ReentrantReadWriteLock Complementing ReentrantLock with ability to get a lot of locks read locks and single write lock. Lock on entry can be "lowered" to the read block, if necessary.

StampedLock jdk 1.8

Implements the optimistic and pessimistic locking read-write with the possibility of further increase or decrease locking state. Optimistic locking is implemented through the current lock "stamp" (javadoc):

double distanceFromOriginV1() { // A read-only method
 long stamp;
 if ((stamp = sl.tryOptimisticRead()) != 0L) { // optimistic
   double currentX = x;
   double currentY = y;
   if (sl.validate(stamp))
     return Math.sqrt(currentX * currentX + currentY * currentY);
 stamp = sl.readLock(); // fall back to read lock
 try {
   double currentX = x;
   double currentY = y;
     return Math.sqrt(currentX * currentX + currentY * currentY);
 } finally {


Class Description


Fair queue to transmit messages from one thread to another. Support block (put() take()), and non-blocking (offer() pool()) methods. Prohibits null values​​. Capacity of queue must be present in construction time.


Key-value structure, based on the hash function. There are no locks on the reading. Write blocks only part of the map (segment). Number of segments is limited to the nearest concurrencyLevelpower of 2.


Balanced multi-threaded key-value structure (O(log n)). The search is based on skip-list. The map should be able to compare keys.


ConcurrentSkipListMap without values.


Blocking for the record, no blocking on reading list. Any modification creates a new instance of the array in memory.


CopyOnWriteArrayList without values.


PriorityBlockingQueue allows to get item only after a certain delay (the delay should be set through Delayed interface). DelayQueue may be of use for scheduler implementation. Capacity's not fixed.


Bidirectional BlockingQueue, based on connectivity (cache-miss & cache coherence overhead). Capacity's not fixed.


Onedirectional BlockingQueue, based on connectivity (cache-miss & cache coherence overhead). Capacity's not fixed.


Onedirectional BlockingQueue, based on connectivity (cache-miss & cache coherence overhead). Capacity's not fixed. This queue has methods to wait consumer in producer thread.


Onedirectional BlockingQueue, with message prioritization (through a comparison of elements).


Onedirectional BlockingQueue, implements transfer() logic (from transfer queue) for put()methods.

Synchronization points

Class Description


Barrier (await()), waiting the specific (or more) of number of calls countDown(). The state of the barrier can not be reset.


Barrier (await()), waiting a specific count of calls await() other threads. When the number of threads reaches the specified one - an optional callback will be called and lock comes off. Barrier reset its initial condition when number of threads reache specidied number and may be reused.


Barrier (exchange()) for the synchronization two threads. At the time synchronization is possible to volatile transfer objects between threads.


Expanding CyclicBarrier, which allows to register and remove members at each cycle of the barrier.


Barrier, allowing only a specified number of threads capture the monitor. In fact extends the functionality ofLock with opportunity to be multiple threads in sync block.


ExecutorService replaces the new Thread(runnable) to simplify work with threads. ExecutorService helps to re-use free threads, organize the queue of tasks to the thread pool, allow subscription (or await) tasks result. Instead of interface Runnablepool uses an interface Callable (allows return the result and throwing errors).

ExecutorService pool = Executors.newFixedThreadPool(4)
Future future = pool.submit(new Callable() {
    Object call() throws Exception {
        println("In thread")
        return "From thread"
println("From main")

try {
    pool.submit(new Callable() {
        Object call() throws Exception {
            throw new IllegalStateException()
} catch (ExecutionException e) {println("Got it: ${e.cause}")}


InvokeAll method returns control to the calling thread only after completion of all tasks. InvokeAny method returns the result of the first successfully completed tasks, canceling all further.

Class Description


Thread pool with the ability to specify working and maximum number of threads in the pool, queue for tasks.


Extends functionality of ThreadPoolExecutor with ability to perform tasks on a regular basis or deferred.


Lightweight thread pool for a "self-replicating" tasks. Poole expects call fork() andjoin() methods with child tasks in a parent one.

class LNode {
    List<LNode> childs = []
    def object

class Finder extends RecursiveTask<LNode> {
    LNode  node
    Object expect

    protected LNode compute() {
        if (node?.object?.equals(expect)) {
            return node
        node?.childs?.collect {
            new Finder(node: it, expect: expect).fork()
        }?.collect {
        }?.find {
            it != null

ForkJoinPool es = new ForkJoinPool()
def invoke = es.invoke(new Finder(
        node: new LNode(
                childs: [
                        new LNode(object: "ivalid"),
                        new LNode(
                                object: "ivalid",
                                childs: [new LNode(object: "test")]
        expect: "test"


Accumulators jdk 1.8

Accumulators allow for primitive operations (like sum or finding the maximum value) with the numerical elements in a multithreaded environment without CAS using.


Published at DZone with permission of

Opinions expressed by DZone contributors are their own.

{{ parent.title || parent.header.title}}

{{ parent.tldr }}

{{ parent.urlSource.name }}