Coordinating Threads
Java 5 introduced many new concurrency primitives and collections, and this post is going to look at two classes that can be used to coordinate threads: CountDownLatch and CyclicBarrier.
A CountDownLatch is initialized with a counter. Threads can then either count down on the latch or wait for it to reach 0. When the latch reaches 0, all waiting threads are released.
A common idiom is to use a latch to trigger a coordinated start or end between threads:
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class CountDownDemo {
    public static void main(String[] args) throws Exception {
        int threads = 3;
        // One latch gates the coordinated start, the other the coordinated end.
        final CountDownLatch startLatch = new CountDownLatch(threads);
        final CountDownLatch endLatch = new CountDownLatch(threads);

        ExecutorService svc = Executors.newFixedThreadPool(threads);
        for (int i = 0; i < threads; i++) {
            svc.execute(new Runnable() {
                public void run() {
                    try {
                        log("At run()");
                        // Announce arrival, then wait until every thread has arrived.
                        startLatch.countDown();
                        startLatch.await();

                        log("Do work");
                        Thread.sleep((int) (Math.random() * 1000));

                        log("Wait for end");
                        // Announce completion, then wait until every thread is done.
                        endLatch.countDown();
                        endLatch.await();

                        log("Done");
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            });
            // Stagger the submissions so the threads reach run() at different times.
            Thread.sleep(100);
        }
        // Let the submitted tasks finish, then release the pool so the JVM can exit.
        svc.shutdown();
    }

    private static void log(String msg) {
        System.out.println(System.currentTimeMillis() + ": "
                + Thread.currentThread().getId() + " " + msg);
    }
}
In this code, two latches are initialized, each with a count equal to the number of threads. As each thread starts up, it counts down on the start latch and then waits for that latch to reach 0, which happens once all threads have started. Similarly, each thread counts down on the end latch when its work is finished and waits until every thread has finished, so they all complete together.
Running this program yields:
1194812267416: 7 At run()
1194812267517: 8 At run()
1194812267618: 9 At run()
1194812267618: 9 Do work
1194812267618: 7 Do work
1194812267619: 8 Do work
1194812267673: 7 Wait for end
1194812267688: 8 Wait for end
1194812268023: 9 Wait for end
1194812268023: 9 Done
1194812268023: 7 Done
1194812268023: 8 Done
You can see that each thread hits run() at a different time but proceeds past the start latch at the same time. Each thread then does a random amount of work and waits on the end latch, and they all proceed past it together.
In the example above, each thread waits forever for the latch to trigger. You can also wait for a specified time period before giving up by calling await(long, TimeUnit), which returns false if the time elapses before the latch opens. And you can call getCount() to check the latch's current count, that is, how many more countDown() calls are needed before it opens; it does not report how many threads are waiting. Each CountDownLatch instance can only be used once: after the count reaches 0 it cannot be reset.
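To make the timed wait and the count check concrete, here is a minimal sketch. The class name LatchTimeoutSketch and the helper arriveAndAwait are made up for illustration; only CountDownLatch, its await(long, TimeUnit) overload, countDown(), and getCount() come from java.util.concurrent.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class; not part of the original examples.
public class LatchTimeoutSketch {

    // Counts down `arrivals` times, then waits at most `timeoutMs` for the
    // latch to reach 0. Returns true if the latch opened within the timeout.
    public static boolean arriveAndAwait(CountDownLatch latch, int arrivals, long timeoutMs) {
        for (int i = 0; i < arrivals; i++) {
            latch.countDown();
        }
        try {
            return latch.await(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for callers
            return false;
        }
    }

    public static void main(String[] args) {
        CountDownLatch latch = new CountDownLatch(3);

        // Only two of the three expected arrivals happen, so the wait gives up.
        boolean opened = arriveAndAwait(latch, 2, 200);
        System.out.println("opened=" + opened + ", remaining=" + latch.getCount());
        // prints: opened=false, remaining=1

        // The final arrival opens the latch, and await returns immediately.
        opened = arriveAndAwait(latch, 1, 200);
        System.out.println("opened=" + opened + ", remaining=" + latch.getCount());
        // prints: opened=true, remaining=0
    }
}
```

Note that getCount() is mostly useful for logging and debugging; by the time you act on the value, another thread may already have counted down.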
If you want a set of threads to repeatedly meet at a common point, you are better served by using a CyclicBarrier. A common use for this is in multi-threaded testing where it is typical to start a bunch of threads, meet, do some stuff, meet, validate some assertions, repeatedly.
The prior program can be simplified by replacing the two latches with a single barrier:
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class CyclicBarrierDemo {
    public static void main(String[] args) throws Exception {
        int threads = 3;
        // A single reusable barrier replaces both latches.
        final CyclicBarrier barrier = new CyclicBarrier(threads);

        ExecutorService svc = Executors.newFixedThreadPool(threads);
        for (int i = 0; i < threads; i++) {
            svc.execute(new Runnable() {
                public void run() {
                    try {
                        log("At run()");
                        barrier.await(); // first meeting point: coordinated start

                        log("Do work");
                        Thread.sleep((int) (Math.random() * 1000));

                        log("Wait for end");
                        barrier.await(); // second meeting point: coordinated end

                        log("Done");
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            });
            // Stagger the submissions so the threads reach run() at different times.
            Thread.sleep(100);
        }
        // Let the submitted tasks finish, then release the pool so the JVM can exit.
        svc.shutdown();
    }

    private static void log(String msg) {
        System.out.println(System.currentTimeMillis() + ": "
                + Thread.currentThread().getId() + " " + msg);
    }
}
We can see here that the threads can repeatedly wait at the barrier, which implicitly counts down until all threads have arrived, then releases all threads.
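The same barrier can be reused for any number of rounds, not just two. The following is a rough sketch of that, assuming Java 8 or later for the lambda syntax; the class BarrierRoundsSketch and the method runRounds are hypothetical names, and the barrier action is used here only to count how often the barrier trips.

```java
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class; not part of the original examples.
public class BarrierRoundsSketch {

    // Sends `threads` workers through `rounds` meetings at one shared barrier
    // and returns how many times the barrier tripped.
    public static int runRounds(int threads, int rounds) {
        AtomicInteger trips = new AtomicInteger();
        // The barrier action runs once each time all parties have arrived.
        CyclicBarrier barrier = new CyclicBarrier(threads, trips::incrementAndGet);

        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                try {
                    for (int r = 0; r < rounds; r++) {
                        barrier.await(); // meet the other workers, then go again
                    }
                } catch (InterruptedException | BrokenBarrierException e) {
                    Thread.currentThread().interrupt();
                }
            });
            workers[i].start();
        }

        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return trips.get();
    }

    public static void main(String[] args) {
        System.out.println("barrier tripped " + runRounds(3, 5) + " times");
        // prints: barrier tripped 5 times
    }
}
```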
Another nice trick with CyclicBarrier is that a Runnable action can be associated with the barrier to be run by the last thread reaching the barrier. You can very simply build a start/end timer for testing with this functionality:
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class TimerBarrierDemo {
    public static void main(String[] args) throws Exception {
        int threads = 3;
        // The BarrierTimer action runs each time all threads reach the barrier.
        final CyclicBarrier barrier = new CyclicBarrier(threads, new BarrierTimer());

        ExecutorService svc = Executors.newFixedThreadPool(threads);
        for (int i = 0; i < threads; i++) {
            svc.execute(new Runnable() {
                public void run() {
                    try {
                        barrier.await(); // start: the timer records a timestamp
                        long sleepTime = (int) (Math.random() * 1000);
                        System.out.println(Thread.currentThread().getId()
                                + " working for " + sleepTime);
                        Thread.sleep(sleepTime);
                        barrier.await(); // end: the timer prints the elapsed time
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            });
        }
        // Let the submitted tasks finish, then release the pool so the JVM can exit.
        svc.shutdown();
    }

    private static class BarrierTimer implements Runnable {
        private long start;

        public void run() {
            if (start == 0) {
                // First trip of the barrier: all threads are ready to start.
                start = System.currentTimeMillis();
            } else {
                // Second trip: all threads have finished their work.
                long end = System.currentTimeMillis();
                long elapsed = (end - start);
                System.out.println("Completed in " + elapsed + " ms");
            }
        }
    }
}
Here we rely on knowing that the barrier will be reached exactly twice - once at start and once at end. The first time it's reached, we record a timestamp and the second time it's reached we print out the timing. When we construct our barrier, we give it an instance of this timer class. Each thread then waits to start on the barrier, works for a random amount of time, and waits for the end barrier.
A run looks like this:
9 working for 35
7 working for 341
8 working for 371
Completed in 372 ms
Generally, you should expect the recorded elapsed time to be roughly the longest working time of any of the threads, plus a little scheduling overhead.
CyclicBarrier has a few additional tricks as well: threads can wait for a time period instead of forever, check whether a barrier has been broken (by an interruption or timeout of a waiting thread, or forcibly with the reset() method), and determine the number of parties and the number currently waiting.
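As a small illustration of these extras, the sketch below exercises the timed await, isBroken(), reset(), getParties(), and getNumberWaiting(). The class BarrierStateSketch and the helper awaitOrGiveUp are invented for this example, and the strings it returns are arbitrary labels rather than anything defined by the library.

```java
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical demo class; not part of the original examples.
public class BarrierStateSketch {

    // Waits at the barrier for at most `timeoutMs` and reports what happened.
    public static String awaitOrGiveUp(CyclicBarrier barrier, long timeoutMs) {
        try {
            barrier.await(timeoutMs, TimeUnit.MILLISECONDS);
            return "tripped";
        } catch (TimeoutException e) {
            return "timed out"; // this thread gave up, which also breaks the barrier
        } catch (BrokenBarrierException e) {
            return "broken"; // the barrier was already broken, or broke while waiting
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted";
        }
    }

    public static void main(String[] args) {
        CyclicBarrier barrier = new CyclicBarrier(2);
        System.out.println("parties=" + barrier.getParties()
                + ", waiting=" + barrier.getNumberWaiting());
        // prints: parties=2, waiting=0

        // Only one of the two parties ever arrives, so the timed wait gives up.
        System.out.println(awaitOrGiveUp(barrier, 100)); // prints: timed out
        System.out.println("broken=" + barrier.isBroken()); // prints: broken=true

        // Later arrivals fail immediately until the barrier is reset.
        System.out.println(awaitOrGiveUp(barrier, 100)); // prints: broken
        barrier.reset();
        System.out.println("broken after reset=" + barrier.isBroken());
        // prints: broken after reset=false
    }
}
```

Keep in mind that reset() also breaks the barrier for any threads currently waiting on it, so it is best called when you know no thread is still parked there.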