JDK concurrent package

The current Java Memory Model guarantees the expected execution order of multi-threaded code, provided the code contains no data races. To protect code from races, various ways of synchronizing threads and exchanging data between them have been devised. The java.util.concurrent package, part of the JDK, provides the following tools for writing multi-threaded code:

  • Atomic
  • Locks
  • Collections
  • Synchronization points
  • Executors
  • Accumulators (JDK 1.8)


Atomic

The child package java.util.concurrent.atomic contains a set of classes for working with atomic primitive types. The contract of these classes guarantees that the compare-and-set operation executes atomically, as a single indivisible step. When you set a new value for the variable you also pass the value you expect it to hold (the optimistic locking approach). If the variable's current value differs from the expected one, compare-and-set leaves it unchanged and returns false.
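As a minimal sketch of that contract in Java (the class name CasDemo and the helper trySwap are illustrative, not part of the JDK), a compare-and-set succeeds only while the expected value is still current:

```java
import java.util.concurrent.atomic.AtomicLong;

class CasDemo {
    /** Tries to replace 'expected' with 'update'; reports whether the swap happened. */
    static boolean trySwap(AtomicLong value, long expected, long update) {
        return value.compareAndSet(expected, update);
    }

    public static void main(String[] args) {
        AtomicLong counter = new AtomicLong(10);
        // The expected value matches the current one, so the swap succeeds.
        System.out.println(trySwap(counter, 10, 11)); // true
        // The value is now 11, so an expectation of 10 is stale and the swap fails.
        System.out.println(trySwap(counter, 10, 12)); // false
        System.out.println(counter.get());            // 11
    }
}
```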

For example, take two arrays of long values: [1,2,3,4,5] and [-1,-2,-3,-4,-5]. Two threads iterate over the arrays, one each, and add the elements to a single shared variable. The code (Groovy) with pessimistic locking looks like this:

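import java.util.concurrent.Callable
import java.util.concurrent.Executors

class Sum {
    static monitor = new Object()
    static volatile long sum = 0
}
class Summer implements Callable {
    long[] data
    Object call() throws Exception {
        data.each {
            // only one thread at a time may read and update the shared sum
            synchronized (Sum.monitor) {
                println("${Thread.currentThread().name}: add ${it} to ${Sum.sum}")
                Sum.sum += it
            }
        }
    }
}

def pool = Executors.newFixedThreadPool(2)
pool.invokeAll([
        new Summer(data: [1,2,3,4,5]),
        new Summer(data: [-1,-2,-3,-4,-5])
])
// without shutdown() the pool's worker threads keep the JVM alive
pool.shutdown()

print("Sum: ${Sum.sum}")

Possible output (the interleaving varies from run to run):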
pool-1-thread-1: add 1 to 0
pool-1-thread-2: add -1 to 1
pool-1-thread-1: add 2 to 0
pool-1-thread-2: add -2 to 2
pool-1-thread-1: add 3 to 0
pool-1-thread-2: add -3 to 3
pool-1-thread-1: add 4 to 0
pool-1-thread-1: add 5 to 4
pool-1-thread-2: add -4 to 9
pool-1-thread-2: add -5 to 5
Sum: 0

However, this approach has a significant performance cost. Here we spend much more time on locking than on executing the real code:

  • attempting to acquire the monitor
  • blocking the thread
  • releasing the monitor
  • unblocking the thread

Consider using AtomicLong to implement optimistic locking for the same sum:

import java.util.concurrent.Callable
import java.util.concurrent.Executors
import java.util.concurrent.atomic.AtomicLong

class Sum {
    static final AtomicLong sum = new AtomicLong(0)
}
class Summer implements Callable {
    long[] data
    Object call() throws Exception {
        data.each {
            // retry until no other thread has changed the sum in between
            while (true) {
                long localSum = Sum.sum.get()
                if (Sum.sum.compareAndSet(localSum, localSum + it)) {
                    println("${Thread.currentThread().name}: add ${it} to ${Sum.sum}")
                    break
                } else {
                    println("[MISS!] ${Thread.currentThread().name}: add ${it} to ${Sum.sum}")
                }
            }
        }
    }
}

def pool = Executors.newFixedThreadPool(2)
pool.invokeAll([
        new Summer(data: [1,2,3,4,5]),
        new Summer(data: [-1,-2,-3,-4,-5])
])
// without shutdown() the pool's worker threads keep the JVM alive
pool.shutdown()

print("Sum: ${Sum.sum}")

As the output shows, there were only a few failed attempts:
[MISS!] pool-1-thread-1: add 1 to -1
pool-1-thread-2: add -1 to -1
pool-1-thread-2: add -2 to -3
[MISS!] pool-1-thread-1: add 1 to -3
pool-1-thread-2: add -3 to -6
pool-1-thread-1: add 1 to -5
[MISS!] pool-1-thread-2: add -4 to -5
pool-1-thread-1: add 2 to -7
pool-1-thread-2: add -4 to -7
pool-1-thread-1: add 3 to -9
pool-1-thread-2: add -5 to -9
pool-1-thread-1: add 4 to -5
pool-1-thread-1: add 5 to 0
Sum: 0

When deciding to use optimistic locking, it is important that the action performed on the variable being modified does not take long. The longer the action takes, the more often compare-and-set will fail and the more often the action has to be repeated.
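For a plain addition like the one above, the update does not have to be written as an explicit retry loop: AtomicLong offers addAndGet, which performs the whole read-modify-write atomically. A hedged Java sketch of the same sum (the class name AtomicSum and the method sumInParallel are illustrative):

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

class AtomicSum {
    /** Adds every element of both arrays to one shared AtomicLong, one thread per array. */
    static long sumInParallel(long[] first, long[] second) throws InterruptedException {
        AtomicLong sum = new AtomicLong();
        ExecutorService pool = Executors.newFixedThreadPool(2);
        for (long[] data : new long[][] {first, second}) {
            pool.execute(() -> {
                for (long value : data) {
                    // one atomic call, so no explicit compareAndSet loop is needed
                    sum.addAndGet(value);
                }
            });
        }
        pool.shutdown();
        if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
            throw new IllegalStateException("workers did not finish in time");
        }
        return sum.get();
    }

    public static void main(String[] args) throws InterruptedException {
        long total = sumInParallel(new long[] {1, 2, 3, 4, 5},
                                   new long[] {-1, -2, -3, -4, -5});
        System.out.println("Sum: " + total); // Sum: 0
    }
}
```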

Using compare-and-set we can also implement a non-blocking read lock. In this case we store the current version of the object in an atomic variable. After all processing of the object is done, we compare the object's current version with the saved one. If they are not equal, we read the object's properties again, this time under an ordinary read-write lock.

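import java.util.concurrent.atomic.AtomicLong
import java.util.concurrent.locks.Lock
import java.util.concurrent.locks.ReadWriteLock
import java.util.concurrent.locks.ReentrantReadWriteLock

class Transaction {
    long debit
}

class Account {
    AtomicLong version = new AtomicLong()
    ReadWriteLock readWriteLock = new ReentrantReadWriteLock()
    List<Transaction> transactions = new ArrayList<Transaction>()
}

long balance(Account account) {
    Lock locked = null
    try {
        while (true) {
            long balance = 0
            long version = account.version.get()
            account.transactions.each { balance += it.debit }
            // volatile write for JMM; succeeds only if the version is unchanged
            if (account.version.compareAndSet(version, version)) {
                return balance
            }
            if (locked == null) {
                // the optimistic read failed: retry while holding the read lock
                locked = account.readWriteLock.readLock()
                locked.lock()
            }
        }
    } finally {
        if (locked != null) { locked.unlock() }
    }
}

void modifyTransaction(Account account, int position, long newDebit) {
    def writeLock = account.readWriteLock.writeLock()
    // the write lock must actually be acquired before it can be released
    writeLock.lock()
    try {
        account.version.incrementAndGet()
        account.transactions[position].debit = newDebit
    } finally {
        writeLock.unlock()
    }
}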


Locks

ReentrantLock

Unlike a synchronized block, ReentrantLock gives more flexibility in choosing when to acquire and release the lock, because it uses ordinary Java method calls.

ReentrantLock also provides information about the current state of the lock and allows waiting for the lock for a limited time. It correctly handles recursive (re-entrant) acquisition and release by a single thread. If you need a fair lock (one that honors the order in which threads requested it), ReentrantLock provides that mechanism as well.

Although synchronized and ReentrantLock look very similar at the interface level, their implementations are quite different. Without going into JMM details: prefer ReentrantLock over the JVM-provided synchronized block only if the monitor is heavily contended. When (usually) only one thread tries to enter the synchronized method, plain synchronized performs better than ReentrantLock.
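The features listed above (timed waiting, fairness, state queries, re-entrant acquisition) can be sketched in Java roughly as follows. The class ReentrantLockDemo and its methods are illustrative names; only the ReentrantLock calls themselves are JDK API:

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

class ReentrantLockDemo {
    // 'true' requests a fair lock: the longest-waiting thread is favored.
    private final ReentrantLock lock = new ReentrantLock(true);
    private long counter;

    /** Increments the counter if the lock is obtained within the timeout. */
    boolean tryIncrement(long timeoutMillis) throws InterruptedException {
        if (!lock.tryLock(timeoutMillis, TimeUnit.MILLISECONDS)) {
            return false; // gave up waiting for the lock
        }
        try {
            counter++;
            return true;
        } finally {
            lock.unlock();
        }
    }

    /** Acquires the lock twice in the same thread and reports the hold count. */
    int nestedHoldCount() {
        lock.lock();
        try {
            lock.lock(); // re-entrant acquisition by the owner does not block
            try {
                return lock.getHoldCount();
            } finally {
                lock.unlock();
            }
        } finally {
            lock.unlock();
        }
    }

    long counter() {
        lock.lock();
        try {
            return counter;
        } finally {
            lock.unlock();
        }
    }

    boolean isLocked() {
        return lock.isLocked(); // state query, intended for monitoring
    }

    public static void main(String[] args) throws InterruptedException {
        ReentrantLockDemo demo = new ReentrantLockDemo();
        System.out.println(demo.tryIncrement(100)); // true
        System.out.println(demo.nestedHoldCount()); // 2
        System.out.println(demo.isLocked());        // false
    }
}
```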

ReentrantReadWriteLock

ReentrantReadWriteLock complements ReentrantLock with the ability to hold many read locks and a single write lock. A write lock can be downgraded to a read lock if necessary.
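A minimal Java sketch of such a downgrade, modeled on the cache pattern in the ReentrantReadWriteLock documentation. The class CachedValue and the placeholder value "computed" are assumptions made for illustration:

```java
import java.util.concurrent.locks.ReentrantReadWriteLock;

class CachedValue {
    private final ReentrantReadWriteLock rw = new ReentrantReadWriteLock();
    private String value;

    /** Returns the cached value, computing it under the write lock on first use. */
    String get() {
        rw.readLock().lock();
        if (value == null) {
            // A read lock cannot be upgraded: release it before taking the write lock.
            rw.readLock().unlock();
            rw.writeLock().lock();
            try {
                if (value == null) { // re-check: another thread may have set it
                    value = "computed";
                }
                // Downgrade: take the read lock while still holding the write lock.
                rw.readLock().lock();
            } finally {
                rw.writeLock().unlock(); // the read lock is still held here
            }
        }
        try {
            return value;
        } finally {
            rw.readLock().unlock();
        }
    }

    int readHolds() {
        return rw.getReadHoldCount();
    }

    boolean writeLocked() {
        return rw.isWriteLocked();
    }

    public static void main(String[] args) {
        CachedValue cache = new CachedValue();
        System.out.println(cache.get()); // computed
    }
}
```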

StampedLock (JDK 1.8)

Implements optimistic and pessimistic read-write locking, with the ability to upgrade or downgrade the lock mode afterwards. Optimistic locking is implemented through the lock's current "stamp" (example from the javadoc):

double distanceFromOriginV1() { // A read-only method
 long stamp;
 if ((stamp = sl.tryOptimisticRead()) != 0L) { // optimistic
   double currentX = x;
   double currentY = y;
   if (sl.validate(stamp))
     return Math.sqrt(currentX * currentX + currentY * currentY);
 }
 stamp = sl.readLock(); // fall back to read lock
 try {
   double currentX = x;
   double currentY = y;
   return Math.sqrt(currentX * currentX + currentY * currentY);
 } finally {
   sl.unlockRead(stamp);
 }
}
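Upgrading the lock mode can be sketched in Java along the lines of the conditional-write pattern from the StampedLock documentation. The class name StampedPoint is an assumption; the fields x, y and the lock sl mirror the excerpt above:

```java
import java.util.concurrent.locks.StampedLock;

class StampedPoint {
    private final StampedLock sl = new StampedLock();
    private double x, y;

    /** Moves the point only if it is currently at the origin. */
    boolean moveIfAtOrigin(double newX, double newY) {
        long stamp = sl.readLock();
        try {
            while (x == 0.0 && y == 0.0) {
                // Try to turn the read lock into a write lock without releasing it.
                long ws = sl.tryConvertToWriteLock(stamp);
                if (ws != 0L) {
                    stamp = ws;
                    x = newX;
                    y = newY;
                    return true;
                }
                // Conversion failed: release the read lock and block for the write lock.
                sl.unlockRead(stamp);
                stamp = sl.writeLock();
            }
            return false;
        } finally {
            sl.unlock(stamp); // releases whichever mode the stamp represents
        }
    }

    double distanceFromOrigin() {
        long stamp = sl.readLock();
        try {
            return Math.sqrt(x * x + y * y);
        } finally {
            sl.unlockRead(stamp);
        }
    }

    public static void main(String[] args) {
        StampedPoint p = new StampedPoint();
        System.out.println(p.moveIfAtOrigin(3, 4)); // true
        System.out.println(p.moveIfAtOrigin(1, 1)); // false
        System.out.println(p.distanceFromOrigin()); // 5.0
    }
}
```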


Collections

Class Description

ArrayBlockingQueue

Bounded queue for passing messages from one thread to another; fairness is optional. Supports blocking (put(), take()) and non-blocking (offer(), poll()) methods. Prohibits null values. The capacity must be specified at construction time.

ConcurrentHashMap

Key-value structure based on a hash function. Reads take no locks. A write locks only part of the map (a segment). The number of segments is concurrencyLevel rounded up to the nearest power of 2. Since JDK 1.8 the implementation locks individual bins instead of segments.

ConcurrentSkipListMap

Sorted concurrent key-value structure with O(log n) expected cost per operation. The search is based on a skip list. The map must be able to compare keys (via Comparable or a Comparator).

ConcurrentSkipListSet

ConcurrentSkipListMap without values.

CopyOnWriteArrayList

Writes take a lock; reads do not. Any modification creates a new copy of the underlying array in memory.

CopyOnWriteArraySet

A Set backed by a CopyOnWriteArrayList.

DelayQueue

A priority blocking queue that releases an item only after a certain delay (the delay is set through the Delayed interface). DelayQueue can be used to implement a scheduler. The capacity is not fixed.

LinkedBlockingDeque

Bidirectional BlockingQueue based on linked nodes (cache-miss and cache-coherence overhead). The capacity is not fixed unless a bound is passed to the constructor.

LinkedBlockingQueue

Unidirectional BlockingQueue based on linked nodes (cache-miss and cache-coherence overhead). The capacity is not fixed unless a bound is passed to the constructor.

LinkedTransferQueue

Unidirectional BlockingQueue based on linked nodes (cache-miss and cache-coherence overhead). The capacity is not fixed. This queue has methods that let a producer thread wait for a consumer.

PriorityBlockingQueue

Unidirectional BlockingQueue with message prioritization (through comparison of elements).

SynchronousQueue

Unidirectional BlockingQueue in which put() behaves like transfer() in a transfer queue: it waits until a consumer takes the element.
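The blocking and non-blocking queue methods from the table can be illustrated with a small Java producer/consumer sketch. The class QueueDemo and the method transfer are illustrative names, and the capacity of 2 is an arbitrary choice:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

class QueueDemo {
    /** Sends 'count' numbers through a bounded queue and returns what the consumer saw. */
    static List<Integer> transfer(int count) throws InterruptedException {
        // The capacity is fixed at construction time.
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(2);
        List<Integer> received = new ArrayList<>();

        Thread producer = new Thread(() -> {
            try {
                for (int i = 0; i < count; i++) {
                    queue.put(i); // blocks while the queue is full
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();

        for (int i = 0; i < count; i++) {
            received.add(queue.take()); // blocks while the queue is empty
        }
        producer.join();
        return received;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(transfer(5)); // [0, 1, 2, 3, 4]

        BlockingQueue<String> single = new ArrayBlockingQueue<>(1);
        System.out.println(single.offer("a")); // true
        System.out.println(single.offer("b")); // false: full, offer() does not block
        System.out.println(single.poll());     // a
        System.out.println(single.poll());     // null: empty, poll() does not block
    }
}
```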


Synchronization points

Class Description

CountDownLatch

Barrier (await()) that waits for a specified number of countDown() calls; further calls have no effect. The state of the barrier cannot be reset.

CyclicBarrier

Barrier (await()) that waits until a specified number of threads have called await(). When the number of waiting threads reaches that count, an optional callback is run and the threads are released. The barrier then resets to its initial state and can be reused.

Exchanger

Barrier (exchange()) for synchronizing two threads. At the synchronization point the threads can hand objects to each other with volatile (happens-before) semantics.

Phaser

An extension of CyclicBarrier that allows participants to be registered and removed on each cycle of the barrier.

Semaphore

Barrier that lets only a specified number of threads hold a permit at the same time. In effect it extends the functionality of Lock by allowing several threads inside the guarded block at once.
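As a rough Java illustration of the first two entries (the class SyncPointsDemo and its method names are made up for this sketch), a latch waits once for a number of countDown() calls, while a cyclic barrier trips repeatedly and runs its callback on every trip:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.atomic.AtomicInteger;

class SyncPointsDemo {
    /** Starts 'workers' threads and waits on a latch until each has counted down. */
    static int runWithLatch(int workers) throws InterruptedException {
        CountDownLatch done = new CountDownLatch(workers);
        AtomicInteger finished = new AtomicInteger();
        for (int i = 0; i < workers; i++) {
            new Thread(() -> {
                finished.incrementAndGet();
                done.countDown();
            }).start();
        }
        done.await(); // returns once the count reaches zero
        return finished.get();
    }

    /** Runs 'rounds' rounds with 'parties' threads; returns how often the barrier tripped. */
    static int runWithBarrier(int parties, int rounds) throws InterruptedException {
        AtomicInteger trips = new AtomicInteger();
        // The barrier action runs once each time all parties have arrived.
        CyclicBarrier barrier = new CyclicBarrier(parties, trips::incrementAndGet);
        Thread[] threads = new Thread[parties];
        for (int i = 0; i < parties; i++) {
            threads[i] = new Thread(() -> {
                try {
                    for (int r = 0; r < rounds; r++) {
                        barrier.await(); // the barrier resets itself after each trip
                    }
                } catch (Exception e) { // InterruptedException or BrokenBarrierException
                    Thread.currentThread().interrupt();
                }
            });
            threads[i].start();
        }
        for (Thread t : threads) {
            t.join();
        }
        return trips.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(runWithLatch(3));      // 3
        System.out.println(runWithBarrier(3, 2)); // 2
    }
}
```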


Executors

ExecutorService replaces new Thread(runnable) to simplify working with threads. ExecutorService helps to reuse idle threads, queue tasks for the thread pool, and subscribe to (or wait for) task results. Besides the Runnable interface, the pool accepts the Callable interface, which can return a result and throw exceptions.

import java.util.concurrent.*

ExecutorService pool = Executors.newFixedThreadPool(4)
Future future = pool.submit(new Callable() {
    Object call() throws Exception {
        println("In thread")
        return "From thread"
    }
})
println("From main")
println(future.get())

try {
    pool.submit(new Callable() {
        Object call() throws Exception {
            throw new IllegalStateException()
        }
    }).get()
} catch (ExecutionException e) {println("Got it: ${e.cause}")}

pool.shutdown()

The invokeAll method returns control to the calling thread only after all tasks have completed. The invokeAny method returns the result of the first task that completes successfully and cancels the rest.
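A small Java sketch of both calls follows. The class InvokeDemo and its helpers tasks, sumAll and anyOf are hypothetical names introduced here for illustration:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class InvokeDemo {
    /** Three trivial tasks returning 1, 2 and 3. */
    static List<Callable<Integer>> tasks() {
        List<Callable<Integer>> tasks = new ArrayList<>();
        tasks.add(() -> 1);
        tasks.add(() -> 2);
        tasks.add(() -> 3);
        return tasks;
    }

    /** invokeAll returns only after every task has completed. */
    static int sumAll(ExecutorService pool, List<Callable<Integer>> tasks) throws Exception {
        int sum = 0;
        for (Future<Integer> future : pool.invokeAll(tasks)) {
            sum += future.get(); // already done, so get() does not wait
        }
        return sum;
    }

    /** invokeAny returns the result of one successfully completed task. */
    static int anyOf(ExecutorService pool, List<Callable<Integer>> tasks) throws Exception {
        return pool.invokeAny(tasks);
    }

    public static void main(String[] args) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            System.out.println(sumAll(pool, tasks())); // 6
            int any = anyOf(pool, tasks());
            System.out.println(any >= 1 && any <= 3);  // true
        } finally {
            pool.shutdown();
        }
    }
}
```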

Class Description

ThreadPoolExecutor

Thread pool that lets you specify the core and maximum number of threads in the pool and the queue for tasks.

ScheduledThreadPoolExecutor

Extends ThreadPoolExecutor with the ability to run tasks periodically or after a delay.

ForkJoinPool

Lightweight thread pool for "self-replicating" tasks. The pool expects a parent task to call the fork() and join() methods on its child tasks.

import java.util.concurrent.ForkJoinPool
import java.util.concurrent.RecursiveTask

class LNode {
    List<LNode> childs = []
    def object
}

class Finder extends RecursiveTask<LNode> {
    LNode  node
    Object expect

    protected LNode compute() {
        if (node?.object?.equals(expect)) {
            return node
        }
        node?.childs?.collect {
            new Finder(node: it, expect: expect).fork()
        }?.collect {
            it.join()
        }?.find {
            it != null
        }
    }
}

ForkJoinPool es = new ForkJoinPool()
def invoke = es.invoke(new Finder(
        node: new LNode(
                childs: [
                        new LNode(object: "invalid"),
                        new LNode(
                                object: "invalid",
                                childs: [new LNode(object: "test")]
                        )
                ]
        ),
        expect: "test"
))

print("${invoke?.object}")

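Accumulators (JDK 1.8)

Accumulators (LongAdder, LongAccumulator and their double counterparts) perform primitive operations (such as a sum or finding the maximum value) on numeric values in a multithreaded environment without the caller writing a CAS loop. Under contention they spread updates over several internal cells and combine them when the result is read.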
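A hedged Java sketch of both operations (the class AccumulatorDemo and the method sumAndMax are illustrative names, and the 10-second timeout is an arbitrary choice):

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.LongAccumulator;
import java.util.concurrent.atomic.LongAdder;

class AccumulatorDemo {
    /** Returns {sum, max} of the values 1..n, fed in from a pool of 'threads' workers. */
    static long[] sumAndMax(int n, int threads) throws InterruptedException {
        LongAdder sum = new LongAdder();
        // The accumulator combines values with Long::max, starting from the identity.
        LongAccumulator max = new LongAccumulator(Long::max, Long.MIN_VALUE);
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (int i = 1; i <= n; i++) {
            final long value = i;
            pool.execute(() -> {
                sum.add(value);
                max.accumulate(value);
            });
        }
        pool.shutdown();
        if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
            throw new IllegalStateException("workers did not finish in time");
        }
        return new long[] {sum.sum(), max.get()};
    }

    public static void main(String[] args) throws InterruptedException {
        long[] result = sumAndMax(100, 4);
        System.out.println("sum=" + result[0] + " max=" + result[1]); // sum=5050 max=100
    }
}
```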


Published at DZone with permission of Alexey Kutuzov. See the original article here.

Opinions expressed by DZone contributors are their own.
