AtomicReference: A (Sometimes Easier) Alternative to Synchronized Blocks
Let's break out a few tools Java 8 brought and see where AtomicReferences shine over synchronized blocks: concurrent state machines and updating values.
Brian Goetz lists AtomicReference in the section on advanced topics of his book Java Concurrency in Practice. Yet we will see that, for specific use cases, an AtomicReference is easier to use than a synchronized block. And the getAndUpdate and updateAndGet methods, added in JDK 8, make AtomicReference even easier to use.
Let us start with the first topic, a use case that can be more easily implemented by an AtomicReference than by a synchronized block: a concurrent state machine.
Use compareAndSet: A Concurrent State Machine
The class CommandReader from the Maven surefire plugin uses the compareAndSet method to implement a concurrent state machine:
public final class CommandReader {
    private static final CommandReader READER = new CommandReader();
    private final Thread commandThread =
        newDaemonThread(new CommandRunnable(), "surefire-forkedjvm-command-thread");
    private final AtomicReference<Thread.State> state =
        new AtomicReference<Thread.State>(NEW);
    public static CommandReader getReader() {
        final CommandReader reader = READER;
        if (reader.state.compareAndSet(NEW, RUNNABLE)) {
            reader.commandThread.start();
        }
        return reader;
    }
}
The class AtomicReference wraps a reference to an object of another class and adds atomic update operations to it. In line 5, the AtomicReference represents an atomic variable of the enum type Thread.State. The AtomicReference is initialized in line 6 to the value NEW.
The method getReader must start the commandThread when the current state is NEW and update the state to RUNNABLE. Since the method can be called by multiple threads in parallel, checking and setting the state must happen as one atomic step. This is done by the method compareAndSet, line 9. The compareAndSet method only updates the variable to the new value when the current value is the same as the expected value. In the example, it only updates the variable to RUNNABLE when the current value is NEW. If the update succeeds, the method returns true and the thread gets started. Otherwise, it returns false and nothing happens.
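To make the "only one caller wins" behavior concrete, here is a minimal sketch. The class OneShotStart, its State enum, and the race helper are hypothetical names invented for this illustration and are not part of the surefire plugin; a counter stands in for the call to commandThread.start().

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical demo class, not taken from the surefire plugin.
class OneShotStart {
    enum State { NEW, RUNNABLE }

    private final AtomicReference<State> state = new AtomicReference<>(State.NEW);
    private final AtomicInteger starts = new AtomicInteger();

    // Returns true only for the single caller that wins the NEW -> RUNNABLE transition.
    boolean startOnce() {
        if (state.compareAndSet(State.NEW, State.RUNNABLE)) {
            starts.incrementAndGet(); // stands in for commandThread.start()
            return true;
        }
        return false;
    }

    // Calls startOnce() from several threads at once and returns how many starts happened.
    static int race(int threadCount) {
        OneShotStart machine = new OneShotStart();
        CountDownLatch go = new CountDownLatch(1);
        Thread[] threads = new Thread[threadCount];
        for (int i = 0; i < threadCount; i++) {
            threads[i] = new Thread(() -> {
                try {
                    go.await(); // hold all threads until the latch opens
                } catch (InterruptedException e) {
                    throw new IllegalStateException(e);
                }
                machine.startOnce();
            });
            threads[i].start();
        }
        go.countDown();
        try {
            for (Thread t : threads) {
                t.join();
            }
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
        return machine.starts.get();
    }

    public static void main(String[] args) {
        System.out.println("starts = " + race(8)); // prints "starts = 1"
    }
}
```

No matter how the eight threads interleave, exactly one compareAndSet call sees NEW and succeeds. All others see RUNNABLE and return false.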
For comparison, here is the same functionality implemented with a synchronized block:
public final class CommandReader {
    private static final CommandReader READER = new CommandReader();
    private final Thread commandThread =
        newDaemonThread(new CommandRunnable(), "surefire-forkedjvm-command-thread");
    private Thread.State state = NEW; // not final: it is reassigned below
    private final Object LOCK = new Object();
    public static CommandReader getReader() {
        final CommandReader reader = READER;
        synchronized (reader.LOCK) {
            if (reader.state == NEW) {
                reader.commandThread.start();
                reader.state = RUNNABLE;
            }
        }
        return reader;
    }
}
We use a synchronized block around the check and update of the variable state, line 10. This example shows why the check and update must be atomic. Without synchronization, two threads might both read the state as NEW and both call the start method of the commandThread. The second call to start would fail with an IllegalThreadStateException.
As we can see, we can replace the synchronized block, the if statement, and the write to the state by one method call to compareAndSet. In the next example, we see how to use the compareAndSet method to update values.
Updating Values: Retry Until Success
The idea behind using compareAndSet for updates is to retry until the update succeeds. The class AsyncProcessor from RxJava uses this technique to update an array of subscribers in the method add:
final AtomicReference<AsyncSubscription<T>[]> subscribers;
boolean add(AsyncSubscription<T> ps) {
    for (;;) {
        AsyncSubscription<T>[] a = subscribers.get();
        if (a == TERMINATED) {
            return false;
        }
        int n = a.length;
        @SuppressWarnings("unchecked")
        AsyncSubscription<T>[] b = new AsyncSubscription[n + 1];
        System.arraycopy(a, 0, b, 0, n);
        b[n] = ps;
        if (subscribers.compareAndSet(a, b)) {
            return true;
        }
    }
}
The update is retried using a for loop, line 3. The loop terminates only when the subscriber array is in the terminated state, line 6, or when the compareAndSet operation succeeds, line 14. In all other cases, another thread has replaced the array in the meantime, so the update is repeated on a fresh copy of the current array.
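The same retry pattern can be tried out in isolation. The following is a simplified sketch, not RxJava code: the class CopyOnWriteNames and its methods are hypothetical, and a String array replaces the subscriber array. Note that compareAndSet compares references, not array contents, which is why each attempt builds a new array and why the sentinel is checked with ==.

```java
import java.util.Arrays;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical demo class illustrating the retry-until-success pattern.
class CopyOnWriteNames {
    // Sentinel array, recognized by identity (==), never by content.
    private static final String[] TERMINATED = new String[0];

    private final AtomicReference<String[]> names =
        new AtomicReference<>(new String[0]);

    // Appends a name; returns false once terminate() has been called.
    boolean add(String name) {
        for (;;) {
            String[] current = names.get();
            if (current == TERMINATED) {
                return false;
            }
            // Never modify the published array: copy it, then extend the copy.
            String[] next = Arrays.copyOf(current, current.length + 1);
            next[current.length] = name;
            // Succeeds only if no other thread replaced the array since get().
            if (names.compareAndSet(current, next)) {
                return true;
            }
        }
    }

    void terminate() {
        names.set(TERMINATED);
    }

    String[] snapshot() {
        return names.get();
    }

    public static void main(String[] args) {
        CopyOnWriteNames list = new CopyOnWriteNames();
        list.add("first");
        list.add("second");
        System.out.println(Arrays.toString(list.snapshot())); // prints "[first, second]"
        list.terminate();
        System.out.println(list.add("third")); // prints "false"
    }
}
```

Because the published array is never modified in place, readers calling snapshot always see a consistent array without any locking.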
Starting with JDK 8, the class AtomicReference provides this functionality in the two utility methods getAndUpdate and updateAndGet. The following shows the implementation of the getAndUpdate method in JDK 8:
public final V getAndUpdate(UnaryOperator<V> updateFunction) {
    V prev, next;
    do {
        prev = get();
        next = updateFunction.apply(prev);
    } while (!compareAndSet(prev, next));
    return prev;
}
The method uses the same technique as the add method from the class AsyncProcessor. It retries the compareAndSet method in a loop, line 6. Whenever compareAndSet fails because another thread changed the value in the meantime, the updateFunction is called again with the new current value. So this function must be free of side effects.
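A small experiment can make the side-effect rule visible. In this sketch, the class PureUpdateDemo and its invocation counter are hypothetical and exist only for illustration. The counter is deliberately a side effect inside the update function: under contention it can end up higher than the number of successful updates, while the value held by the AtomicReference stays exact.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical demo class; the counter is a deliberate side effect for illustration.
class PureUpdateDemo {
    // Counts how often the update function ran, including retried attempts.
    static final AtomicInteger invocations = new AtomicInteger();

    // Increments a shared value from several threads and returns the final value.
    static int incrementConcurrently(int threadCount, int perThread) {
        invocations.set(0);
        AtomicReference<Integer> total = new AtomicReference<>(0);
        Thread[] threads = new Thread[threadCount];
        for (int i = 0; i < threadCount; i++) {
            threads[i] = new Thread(() -> {
                for (int j = 0; j < perThread; j++) {
                    total.updateAndGet(n -> {
                        invocations.incrementAndGet(); // side effect: repeated on every retry
                        return n + 1;                  // pure part: depends only on n
                    });
                }
            });
            threads[i].start();
        }
        try {
            for (Thread t : threads) {
                t.join();
            }
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
        return total.get();
    }

    public static void main(String[] args) {
        int total = incrementConcurrently(4, 1000);
        System.out.println("total = " + total); // prints "total = 4000"
        // The invocation count is at least 4000 and may be higher under contention.
        System.out.println("invocations >= total: " + (invocations.get() >= total));
    }
}
```

The total is always exactly threadCount times perThread, because every successful compareAndSet applies one increment. How many extra invocations occur depends on thread scheduling and varies from run to run.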
And here is the add method from above implemented with the new updateAndGet method:
boolean add(AsyncSubscription<T> ps) {
    AsyncSubscription<T>[] result = subscribers.updateAndGet((a) -> {
        if (a != TERMINATED) {
            int n = a.length;
            @SuppressWarnings("unchecked")
            AsyncSubscription<T>[] b = new AsyncSubscription[n + 1];
            System.arraycopy(a, 0, b, 0, n);
            b[n] = ps;
            return b;
        } else {
            return a;
        }
    });
    return result != TERMINATED;
}
As we can see, the retry loop is now hidden inside the updateAndGet method. We only need to implement a function that calculates a new value from an old one.
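The two JDK 8 methods differ only in what they return: getAndUpdate returns the value before the update, updateAndGet the value after it. The add method above relies on the latter to check for TERMINATED. A minimal sketch, with a hypothetical class name and a String value chosen purely for illustration:

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical demo class contrasting the two return values.
class UpdateReturnValues {
    static String[] demo() {
        AtomicReference<String> ref = new AtomicReference<>("a");
        String previous = ref.getAndUpdate(s -> s + "b"); // returns the old value
        String updated = ref.updateAndGet(s -> s + "c");  // returns the new value
        return new String[] { previous, updated, ref.get() };
    }

    public static void main(String[] args) {
        System.out.println(String.join(",", demo())); // prints "a,abc,abc"
    }
}
```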
Conclusion and Next Steps
We have seen two examples of compareAndSet. If you are interested in more examples, take a look at the book The Art of Multiprocessor Programming. It shows you how to implement typical concurrent data structures using compareAndSet. And this article shows how to test atomic updates.
I would be glad to hear about how you use AtomicReference in your application.
Published at DZone with permission of Thomas Krieger, DZone MVB. See the original article here.