Parallel and Asynchronous Programming in Java 8
Parallel code, which is code that runs on more than one thread, was once the nightmare of many an experienced developer, but Java 8 brought a lot of changes that should make this performance-boosting trick a lot more manageable.
Before Java 8 there was a big difference between parallel (or concurrent) code and sequential code. It was also very hard to debug non-sequential code. Simply setting a breakpoint and stepping through the flow as you normally would removes the parallel aspect, which is a problem if that is what is causing the bug.
Luckily, Java 8 gave us streams, the greatest thing for Java developers since the bean. If you don't know what they are, the Stream API makes it possible to handle sequences of elements in a functional manner. (Check our comparison between streams and .NET's LINQ here.) One of the advantages of streams is that the structure of the code stays the same: whether it's sequential or concurrent, it stays just as readable.
To make your code run in parallel, you simply use .parallelStream() instead of .stream() (or stream.parallel(), if you are not the creator of the stream).
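As a minimal sketch of that switch, the snippet below sums the squares of a list once sequentially and once in parallel. The class and method names (ParallelStreamSketch, sumOfSquares) are made up for illustration; the only difference between the two pipelines is which stream method is called.

```java
import java.util.Arrays;
import java.util.List;

class ParallelStreamSketch {

    // Sums the squares of the numbers; 'parallel' picks the stream flavour.
    static long sumOfSquares(List<Integer> numbers, boolean parallel) {
        return (parallel ? numbers.parallelStream() : numbers.stream())
                .mapToLong(Integer::longValue)
                .map(n -> n * n)
                .sum();
    }

    public static void main(String[] args) {
        List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5);
        System.out.println(sumOfSquares(numbers, false)); // sequential
        System.out.println(sumOfSquares(numbers, true));  // parallel, same result
    }
}
```

The rest of the pipeline is untouched, which is the readability argument made above.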
But just because it’s easy, doesn't mean that parallel code is always the best choice. You should always consider whether it makes any sense to use concurrency for your piece of code. The most important factor in that decision will be the speed: only use concurrency if it makes your code faster than its sequential counterpart.
The Speed Question
Parallel code gets its speed benefit from using multiple threads instead of the single one that sequential code uses. Deciding how many threads to create can be a tricky question because more threads don't always result in faster code: if you use too many threads the performance of your code might actually go down.
There are a couple of rules that will tell you what number of threads to choose. This depends mostly on the kind of operation that you want to perform and the number of available cores.
Computation-intensive operations should use a number of threads lower than or equal to the number of cores, while I/O-intensive operations like copying files spend most of their time waiting rather than using the CPU and can therefore use a higher number of threads. The code doesn’t know which case applies unless you tell it. Otherwise, it falls back to the common ForkJoinPool, whose size is derived from the number of available cores.
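The rule of thumb above could be sketched as follows. The pool size used in main is an illustrative assumption rather than a tuned value, and fetchLength is a made-up stand-in for a blocking I/O call.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class ThreadCountSketch {

    // Hypothetical stand-in for a blocking I/O call such as reading a file.
    static int fetchLength(String name) {
        try {
            Thread.sleep(50); // the thread waits here without using the CPU
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return name.length();
    }

    // Runs the blocking calls on a pool that may be larger than the core count.
    static int totalLength(List<String> names, int ioThreads) {
        ExecutorService ioPool = Executors.newFixedThreadPool(ioThreads);
        try {
            List<CompletableFuture<Integer>> futures = new ArrayList<>();
            for (String name : names) {
                futures.add(CompletableFuture.supplyAsync(() -> fetchLength(name), ioPool));
            }
            int total = 0;
            for (CompletableFuture<Integer> future : futures) {
                total += future.join(); // wait for each result
            }
            return total;
        } finally {
            ioPool.shutdown();
        }
    }

    public static void main(String[] args) {
        int cores = Runtime.getRuntime().availableProcessors();
        System.out.println("cores: " + cores);
        // Assumed sizing: a few threads per core for I/O-bound work.
        System.out.println(totalLength(Arrays.asList("a.txt", "bb.txt"), cores * 4));
    }
}
```

For CPU-bound work the same method could be called with a pool size at or below the core count instead.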
There are two main cases when it can be useful to run your code parallel instead of sequential: time-consuming tasks and tasks run on big collections. Java 8 brought a new way of handling those big collections, namely with streams. Streams have built-in efficiency by laziness: they use lazy evaluation which saves resources by not doing more than necessary. This is not the same as parallelism, which doesn’t care about the resources as long as it goes faster. So for big collections, you probably don’t need classic parallelism.
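To make the laziness concrete, here is a small sketch that counts how many elements a sequential pipeline actually touches before findFirst is satisfied. The class name and the counter are illustrative only.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Stream;

class LazyStreamSketch {

    // Returns how many elements the pipeline inspected before it stopped.
    static int elementsVisited() {
        AtomicInteger visited = new AtomicInteger();
        Stream.of(1, 2, 3, 4, 5)
                .peek(n -> visited.incrementAndGet()) // counts each element pulled
                .filter(n -> n > 2)
                .findFirst();                          // stops at the first match
        return visited.get();
    }

    public static void main(String[] args) {
        // Only 1, 2 and 3 are pulled through; 4 and 5 are never looked at.
        System.out.println(elementsVisited());
    }
}
```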
func .then(f1) .catch(e1) .then(f2) .catch(e2);
So when the original function has a successful result, f1 is called, but if an error was thrown e1 will be called. This might bring it back to the successful track (f2), or result in another error (e2). You can go from data track to error track and back.
CompletableFuture implements both the Future and the CompletionStage interface. Future already existed pre-Java 8, but it wasn’t very developer-friendly by itself. You could only get the result of the asynchronous computation by using the .get() method, which blocked the calling thread (making the async part pretty pointless most of the time), and you needed to implement each possible scenario manually. Adding the CompletionStage interface was the breakthrough that made asynchronous programming in Java workable.
CompletionStage is a promise, namely the promise that the computation will eventually be done. It contains a bunch of methods that let you attach callbacks that will be executed on that completion. Now we can handle the result without blocking.
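The contrast between the two interfaces might look like this in practice. This is a sketch under assumptions: the class and method names are invented, and the computation (21 * 2) is a placeholder for real work.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class BlockingVersusCallbackSketch {

    // Pre-Java 8 style: the caller has to block on get() to see the result.
    static int blockingStyle() {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            Future<Integer> future = pool.submit(() -> 21 * 2);
            return future.get(); // blocks the calling thread until the task is done
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    // CompletionStage style: register what should happen once the value arrives.
    static CompletableFuture<String> callbackStyle() {
        return CompletableFuture.supplyAsync(() -> 21 * 2)
                .thenApply(n -> "answer: " + n);
    }

    public static void main(String[] args) {
        System.out.println(blockingStyle());
        // join() is used here only to keep the demo alive until the callback ran.
        callbackStyle().thenAccept(System.out::println).join();
    }
}
```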
There are two main methods that let you start the asynchronous part of your code: supplyAsync if you want to do something with the result of the method, and runAsync if you don’t.
CompletableFuture.runAsync(() -> System.out.println("Run async in completable future " + Thread.currentThread()));
CompletableFuture.supplyAsync(() -> 5);
Now you can add those callbacks to handle the result of your supplyAsync call:
CompletableFuture.supplyAsync(() -> 5) .thenApply(i -> i * 3) .thenAccept(i -> System.out.println("The result is " + i)) .thenRun(() -> System.out.println("Finished."));
.thenApply is similar to the .map function for streams: it performs a transformation. In the example above it takes the result (5) and multiplies it by 3. It will then pass that result (15) further down the pipe.
.thenAccept consumes the result without transforming it. It will also not return a result. Here it will print “The result is 15” to the console. It can be compared to the .forEach method for streams.
.thenRun doesn’t use the result of the async operation and also doesn’t return anything; it just waits to call its Runnable until the previous step is completed.
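A self-contained version of the chain described above could look like the sketch below. Instead of printing, each callback appends to a list so the order of the steps is visible; the class name and the log list are illustrative additions.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;

class CallbackChainSketch {

    // Runs the chain and records what each callback saw.
    static List<String> runChain() {
        List<String> log = new CopyOnWriteArrayList<>();
        CompletableFuture<Void> done = CompletableFuture.supplyAsync(() -> 5)
                .thenApply(i -> i * 3)                          // transforms 5 into 15
                .thenAccept(i -> log.add("The result is " + i)) // consumes 15, returns nothing
                .thenRun(() -> log.add("Finished."));           // ignores the result entirely
        done.join(); // wait here only so the demo can return the log
        return log;
    }

    public static void main(String[] args) {
        runChain().forEach(System.out::println);
    }
}
```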
Asyncing Your Async
All of the above callback methods also come in an async version: thenApplyAsync, thenAcceptAsync, thenRunAsync, and so on. These versions can run on their own thread, and they give you extra control because you can pass in the Executor (for example a ForkJoinPool) that should run them.
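If you don’t use the async version, the callback typically runs on the thread that completed the previous stage (or on the calling thread if that stage had already finished), so a chain of plain callbacks usually stays on one thread.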
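The following sketch shows one way to steer a callback onto a specific executor. The single-thread pool and the thread name "callback-thread" are assumptions chosen to make the effect easy to observe.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class AsyncVariantSketch {

    // Returns the name of the thread that ran the thenApplyAsync callback.
    static String callbackThreadName() {
        // Assumed setup: one dedicated thread with a recognisable name.
        ExecutorService callbackPool = Executors.newSingleThreadExecutor(
                r -> new Thread(r, "callback-thread"));
        try {
            return CompletableFuture.supplyAsync(() -> 5)
                    .thenApplyAsync(i -> Thread.currentThread().getName(), callbackPool)
                    .join();
        } finally {
            callbackPool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(callbackThreadName()); // prints "callback-thread"
    }
}
```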
When Things Go Wrong
When something goes wrong, the exceptionally method is used to handle the exception. You can give it a method that returns a value to get back on the data track, or throw a (new) exception.
… .exceptionally(ex -> new Foo()) .thenAccept(this::bar);
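Since Foo and bar are not defined in the snippet above, here is a runnable sketch of the same idea with simpler types. The method name parseOrDefault and the fallback value -1 are illustrative choices.

```java
import java.util.concurrent.CompletableFuture;

class ExceptionallySketch {

    // Parses the text asynchronously; falls back to -1 when parsing fails.
    static int parseOrDefault(String text) {
        return CompletableFuture.supplyAsync(() -> Integer.parseInt(text))
                .thenApply(n -> n * 2)   // skipped when parsing threw
                .exceptionally(ex -> -1) // back on the data track
                .join();
    }

    public static void main(String[] args) {
        System.out.println(parseOrDefault("21"));   // prints 42
        System.out.println(parseOrDefault("oops")); // prints -1
    }
}
```

Note that the thenApply step is bypassed on the error track, so the fallback value is returned as-is rather than doubled.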
Combine and Compose
You can chain multiple CompletableFutures by using the thenCompose method. Without it, the result would be nested CompletableFutures. This makes thenCompose and thenApply comparable to flatMap and map for streams.
CompletableFuture.supplyAsync(() -> "Hello") .thenCompose(s -> CompletableFuture .supplyAsync(() -> s + "World"));
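The difference in return types is easiest to see side by side. In this sketch, appendWorld is a hypothetical second asynchronous step; the two methods differ only in whether they use thenApply or thenCompose.

```java
import java.util.concurrent.CompletableFuture;

class ComposeSketch {

    // Hypothetical second async step that depends on the first result.
    static CompletableFuture<String> appendWorld(String s) {
        return CompletableFuture.supplyAsync(() -> s + " World");
    }

    // thenApply wraps the inner future, so the result is nested.
    static CompletableFuture<CompletableFuture<String>> nested() {
        return CompletableFuture.supplyAsync(() -> "Hello")
                .thenApply(ComposeSketch::appendWorld);
    }

    // thenCompose flattens the two stages into one future.
    static CompletableFuture<String> flat() {
        return CompletableFuture.supplyAsync(() -> "Hello")
                .thenCompose(ComposeSketch::appendWorld);
    }

    public static void main(String[] args) {
        System.out.println(flat().join());          // prints "Hello World"
        System.out.println(nested().join().join()); // needs two joins for the same value
    }
}
```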
If you want to combine the result of two CompletableFutures, you will need a method conveniently called thenCombine:
future.thenCombine(future2, Integer::sum) .thenAccept(value -> System.out.println(value));
As you can see in the example above, the result of the callback in thenCombine can be handled like a normal CompletableFuture with all your favourite CompletionStage methods.
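The snippet above leaves future and future2 undefined, so here is a self-contained sketch. The values 5 and 7 and the extra thenApply step are assumptions added to show that the combined result can be chained further.

```java
import java.util.concurrent.CompletableFuture;

class CombineSketch {

    // Combines two independent async results, then keeps chaining.
    static int combinedSum() {
        CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> 5);
        CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> 7);
        return future.thenCombine(future2, Integer::sum) // 5 + 7
                .thenApply(sum -> sum * 2)               // chained like any other stage
                .join();
    }

    public static void main(String[] args) {
        System.out.println(combinedSum()); // prints 24
    }
}
```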
Parallel programming no longer needs to be an insurmountable obstacle in the hunt for faster code. Java 8 makes the process as straightforward as can be, so that any piece of code that could possibly benefit from it, can be pulled, kicking and screaming on all threads, into the multi-core future that is, in fact, just the present day. By which I mean: it’s easy to do, so give it a try and see its advantages for yourself.
Published at DZone with permission of Lisa Steendam . See the original article here.