Over a million developers have joined DZone.
{{announcement.body}}
{{announcement.title}}

Java 8: Definitive Guide to CompletableFuture

DZone's Guide to

Java 8: Definitive Guide to CompletableFuture

While Java 7 and Java 6 were rather minor releases, version 8 will be a big step forward.

· Java Zone
Free Resource

Download Microservices for Java Developers: A hands-on introduction to frameworks and containers. Brought to you in partnership with Red Hat.

Java 8 is coming so it's time to study new features. While Java 7 and Java 6 were rather minor releases, version 8 will be a big step forward. Maybe even too big? Today I will give you a thorough explanation of new abstraction in JDK 8 - CompletableFuture<T>. As you all know Java 8 will hopefully be released in less than a year, therefore this article is based on JDK 8 build 88 with lambda supportCompletableFuture<T> extendsFuture<T> by providing functional, monadic (!) operations and promoting asynchronous, event-driven programming model, as opposed to blocking in older Java. If you openedJavaDoc of CompletableFuture<T> you are surely overwhelmed. About fifty methods(!), some of them being extremely cryptic and exotic, e.g.:


public <U,V> CompletableFuture<V> thenCombineAsync(
CompletableFuture<? extends U> other,
BiFunction<? super T,? super U,? extends V> fn,
Executor executor)


Don't worry, but keep reading. 

CompletableFuture

 collects all the features ofListenableFuture in Guava with SettableFuture. Moreover built-in lambda support brings it closer to Scala/Akka futures. Sounds too good to be true, but keep reading.

CompletableFuture

 has two major areas superior to good ol' 

Future<T>

 - asynchronous callback/transformations support and the ability to set value of

CompletableFuture

 from any thread at any point in time. 


Extract/modify wrapped value

Typically futures represent piece of code running by other thread. But that's not always the case. Sometimes you want to create a 

Future

 representing some event that you know will occur, e.g. JMS message arrival. So you have 

Future<Message>

 but there is no asynchronous job underlying this future. You simply want to complete (resolve) that future when JMS message arrives, and this is driven by an event. In this case you can simply create 

CompletableFuture

, return it to your client and whenever you think your results are available, simply 

complete()

 the future and unlock all clients waiting on that future.


For starters you can simply create new 

CompletableFuture

 out of thin air and give it to your client:


public CompletableFuture<String> ask() {
final CompletableFuture<String> future = new CompletableFuture<>();
//...
return future;
}

Notice that this future is not associated wtih any 

Callable<String>

, no thread pool, no asynchronous job. If now the client code calls 

ask().get()

 it will block forever. If it registers some completion callbacks, they will never fire. So what's the point? Now you can say:



...and at this very moment all clients blocked on 

Future.get()

 will get the result string. Also completion callbacks will fire immediately. This comes quite handy when you want to represent a task in the future, but not necessarily computational task running on some thread of execution. 

CompletableFuture.complete()

 can only be called once, subsequent invocations are ignored. But there is a back-door called

CompletableFuture.obtrudeValue(...)

 which overrides previous value of the

Future

 with new one. Use with caution.


Sometimes you want to signal failure. As you know 

Future

 objects can handle either wrapped result or exception. If you want to pass some exception further, there is

CompletableFuture.completeExceptionally(ex)

 (and 

obtrudeException(ex)

evil brother that overrides the previous exception). 

completeExceptionally()

 also unlock all waiting clients, but this time throwing an exception from 

get()

. Speaking of

get()

, there is also 

CompletableFuture.join()

 method with some subtle changes in error handling. But in general they are the same. And finally there is also

CompletableFuture.getNow(valueIfAbsent)

 method that doesn't block but if the

Future

 is not completed yet, returns default value. Useful when building robust systems where we don't want to wait too much.


Last 

static

 utility method is 

completedFuture(value)

 that returns already completed

Future

 object. Might be useful for testing or when writing some adapter layer.


Creating and obtaining CompletableFuture

OK, so is creating 

CompletableFuture

 manually our only option? Not quite. Just as with normal 

Future

s we can wrap existing task with 

CompletableFuture

 using the following family of factory methods:


static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier);
static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor);
static CompletableFuture<Void> runAsync(Runnable runnable);
static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor);

Methods that do not take an 

Executor

 as an argument but end with 

...Async

 will useForkJoinPool.commonPool() (global, general purpose pool introduces in JDK 8). This applies to most methods in 

CompletableFuture

 class. 

runAsync()

 is simple to understand, notice that it takes 

Runnable

, therefore it returns

CompletableFuture<Void>

 as 

Runnable

 doesn't return anything. If you need to process something asynchronously and return result, use Supplier<U>:


final CompletableFuture<String> future = CompletableFuture.supplyAsync(new Supplier<String>() {
@Override
public String get() {
//...long running...
return "42";
}
}, executor);

But hey, we have lambdas in Java 8!


final CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
//...long running...
return "42";
}, executor);

or even:


final CompletableFuture<String> future =
CompletableFuture.supplyAsync(() -> longRunningTask(params), executor);

This article is not about project Lambda, but I will be using lambdas quite extensively.


Transforming and acting on one CompletableFuture(thenApply)

So I said that 

CompletableFuture

 is superior to 

Future

 but you haven't yet seen why? Simply put, it's because 

CompletableFuture

 is a monad and a functor. Not helping I guess? Both Scala and JavaScript allow registering asynchronous callbacks when future is completed. We don't have to wait and block until it's ready. We can simply say: run this function on a result, when it arrives. Moreover, we can stack such functions, combine multiple futures together, etc. For example if we have a function from 

String

 to 

Integer

we can turn 

CompletableFuture<String>

 to 

CompletableFuture<Integer

 without unwrapping it. This is achieved with 

thenApply()

 family of methods:


<U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn);
<U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn);
<U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn, Executor executor);

As stated before 

...Async

 versions are provided for most operations on

CompletableFuture

 thus I will skip them in subsequent sections. Just remember that first method will apply function within the same thread in which the future completed while the remaining two will apply it asynchronously in different thread pool.


Let's see how 

thenApply()

 works:


CompletableFuture<String> f1 = //...
CompletableFuture<Integer> f2 = f1.thenApply(Integer::parseInt);
CompletableFuture<Double> f3 = f2.thenApply(r -> r * r * Math.PI);

Or in one statement:


CompletableFuture<Double> f3 =
f1.thenApply(Integer::parseInt).thenApply(r -> r * r * Math.PI);

You see a sequence of transformations here. From 

String

 to 

Integer

 and then to

Double

. But what's most important, these transformations are neither executed immediately nor blocking. They are simply remembered and when original 

f1

 completes they are executed for you. If some of the transformations are time-consuming, you can supply your own 

Executor

 to run them asynchronously. Notice that this operation is equivalent to monadic 

map

 in Scala.


Running code on completion (thenAccept/thenRun)

CompletableFuture<Void> thenAccept(Consumer<? super T> block);
CompletableFuture<Void> thenRun(Runnable action);

These two methods are typical "final" stages in future pipeline. They allow you to consume future value when it's ready. While 

thenAccept()

 provides the final value, 

thenRun

executes 

Runnable

 which doesn't even have access to computed value. Example:


future.thenAcceptAsync(dbl -> log.debug("Result: {}", dbl), executor);
log.debug("Continuing");
...Async

 variants are available as well for both methods, with implicit and explicit executor. I can't emphasize this enough: 

thenAccept()

/

thenRun()

 methods do not block (even without explicit 

executor

). Treat them like an event listener/handler that you attach to a future and that will execute some time in the future. 

"Continuing"

 message will appear immediately, even if 

future

 is not even close to completion.


Error handling of single CompletableFuture

So far we only talked about result of computation. But what about exceptions? Can we handle them asynchronously as well? Sure!


CompletableFuture<String> safe =
future.exceptionally(ex -> "We have a problem: " + ex.getMessage());
exceptionally()

 takes a function that will be invoked when original future throws an exception. We then have an opportunity to recover by transforming this exception into some value compatible with 

Future

's type. Further transformations of 

safe

 will no longer yield an exception but instead a 

String

 returned from supplied function.


A more flexible approach is 

handle()

 that takes a function receiving either correct result or exception:


CompletableFuture<Integer> safe = future.handle((ok, ex) -> {
if (ok != null) {
return Integer.parseInt(ok);
} else {
log.warn("Problem", ex);
return -1;
}
});
handle()

 is called always, with either result or exception argument being not-

null

. This is a one-stop catch-all strategy.


Combining two CompletableFuture together

Asynchronous processing of one 

CompletableFuture

 is nice but it really shows its power when multiple such futures are combined together in various ways.


Combining (chaining) two futures (thenCompose())

Sometimes you want to run some function on future's value (when it's ready). But this function returns future as well. 

CompletableFuture

 should be smart enough to understand that the result of our function should now be used as top-level future, as opposed to 

CompletableFuture<CompletableFuture<T>>

. Method 

thenCompose()

is thus equivalent to 

flatMap

 in Scala:


<U> CompletableFuture<U> thenCompose(Function<? super T,CompletableFuture<U>> fn);
...Async

 variations are available as well. Example below, look carefully at the types and the difference between 

thenApply()

 (

map

) and 

thenCompose()

 (

flatMap

) when applying a 

calculateRelevance()

 function returning 

CompletableFuture<Double>

:


CompletableFuture<Document> docFuture = //...
CompletableFuture<CompletableFuture<Double>> f =
docFuture.thenApply(this::calculateRelevance);
CompletableFuture<Double> relevanceFuture =
docFuture.thenCompose(this::calculateRelevance);
//...
private CompletableFuture<Double> calculateRelevance(Document doc)  //...
thenCompose()

 is an essential method that allows building robust, asynchronous pipelines, without blocking or waiting for intermediate steps.


Transforming values of two futures (thenCombine())

While 

thenCompose()

 is used to chain one future dependent on the other,

thenCombine

 combines two independent futures when they are both done:


?
1
<U,V> CompletableFuture<V> thenCombine(CompletableFuture<? extendsU> other, BiFunction<? superT,? superU,? extendsV> fn)
...Async

 variations are available as well. Imagine you have two 

CompletableFuture

s, one that loads 

Customer

 and other that loads nearest 

Shop

. They are completely independent from each other, but when both of them are completed, you want to use their values to calculate 

Route

. Here is a stripped example:


CompletableFuture<Customer> customerFuture = loadCustomerDetails(123);
CompletableFuture<Shop> shopFuture = closestShop();
CompletableFuture<Route> routeFuture =
customerFuture.thenCombine(shopFuture, (cust, shop) -> findRoute(cust, shop));
//...
private Route findRoute(Customer customer, Shop shop) //...

Notice that in Java 8 you can replace 

(cust, shop) -> findRoute(cust, shop)

with simple 

this::findRoute

 method reference:


customerFuture.thenCombine(shopFuture, this::findRoute);

So you get the idea. We have 

customerFuture

 and 

shopFuture

. Then 

routeFuture

wraps them and "waits" for both to complete. When both of them are ready, it runs our supplied function that combines results (

findRoute()

). Thus 

routeFuture

 will complete when two underlying futures are resolved and

findRoute()

 is done.


Waiting for both CompletableFutures to complete

If instead of producing new 

CompletableFuture

 combining both results we simply want to be notified when they finish, we can use 

thenAcceptBoth()

/

runAfterBoth()

 family of methods (

...Async

 variations are available as well). They work similarly to

thenAccept()

 and 

thenRun()

 but wait for two futures instead of one:


extends U> other, BiConsumer<? super T,? super U> block)
CompletableFuture<Void> runAfterBoth(CompletableFuture<?> other, Runnable action)

Imagine that in the example above, instead of producing new

CompletableFuture<Route>

 you simply want send some event or refresh GUI immediately. This can be easily achieved with 

thenAcceptBoth()

:


customerFuture.thenAcceptBoth(shopFuture, (cust, shop) -> {
final Route route = findRoute(cust, shop);
//refresh GUI with route
});

I hope I'm wrong but maybe some of you are asking themselves a question: why can't I simply block on these two futures? Like here:


Future<Customer> customerFuture = loadCustomerDetails(123);
Future<Shop> shopFuture = closestShop();
findRoute(customerFuture.get(), shopFuture.get());

Well, of course you can. But the whole point of 

CompletableFuture

 is to allow asynchronous, event driven programming model instead of blocking and eagerly waiting for result. So functionally two code snippets above are equivalent, but the latter unnecessarily occupies one thread of execution.


Waiting for first CompletableFuture to complete

Another interesting part of the 

CompletableFuture

 API is the ability to wait for first (as opposed to all) completed future. This can come handy when you have two tasks yielding result of the same type and you only care about response time, not which task resulted first. API methods (

...Async

 variations are available as well):


CompletableFuture<Void> acceptEither(CompletableFuture<? extends T> other, Consumer<? super T> block)
CompletableFuture<Void> runAfterEither(CompletableFuture<?> other, Runnable action)

As an example say you have two systems you integrate with. One has smaller average response times but high standard deviation. Other one is slower in general, but more predictable. In order to take best of both worlds (performance and predictability) you call both systems at the same time and wait for the first one to complete. Normally it will be the first one, but in case it became slow, second one finishes in an acceptable time:


CompletableFuture<String> fast = fetchFast();
CompletableFuture<String> predictable = fetchPredictably();
fast.acceptEither(predictable, s -> {
System.out.println("Result: " + s);
});
s

 represents 

String

 reply either from 

fetchFast()

 or from 

fetchPredictably()

. We neither know nor care.


Transforming first completed

applyToEither()

 is an older brother of 

acceptEither()

. While the latter simply calls some piece of code when faster of two futures complete, 

applyToEither()

 will return a new future. This future will complete when first of the two underlying futures complete. API is a bit similar (

...Async

 variations are available as well):


<U> CompletableFuture<U> applyToEither(CompletableFuture<? extends T> other, Function<? super T,U> fn)

The extra 

fn

 function is invoked on the result of first future that completed. I am not really sure what's the purpose of such a specialized method, after all one could simply use:

fast.applyToEither(predictable).thenApply(fn)

. Since we are stuck with this API but we don't really need extra function application, I will simply useFunction.identity() placeholder:


CompletableFuture<String> fast = fetchFast();
CompletableFuture<String> predictable = fetchPredictably();
CompletableFuture<String> firstDone =
fast.applyToEither(predictable, Function.<String>identity());
firstDone

 future can then be passed around. Notice that from the client perspective the fact that two futures are actually behind 

firstDone

 is hidden. Client simply waits for future to complete and 

applyToEither()

 takes care of notifying the client when any of the two finish first.


Combining multiple CompletableFuture together

So we now know how to wait for two futures to complete (using 

thenCombine()

) and for the first one to complete (

applyToEither()

). But can it scale to arbitrary number of futures? Sure, using 

static

 helper methods:


static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs)
static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs)
allOf()

 takes an array of futures and returns a future that completes when all of the underlying futures are completed (barrier waiting for all). 

anyOf()

 on the other hand will wait only for the fastest of the underlying futures. Please look at the generic type of returned futures. Not quite what you would expect? We will take care of this issue in the next article.


Summary

We explored pretty much whole CompletableFuture API. I'm sure this was quite overwhelming so in the next article shortly we will develop yet another implementation of simple web crawling program, taking advantage of 

CompletableFuture

 functionalities and Java 8 lambdas. We will also look at disadvantages and shortcomings of

CompletableFuture

.


Download Building Reactive Microservices in Java: Asynchronous and Event-Based Application Design. Brought to you in partnership with Red Hat

Topics:
java ,java 8 ,completablefuture

Published at DZone with permission of Tomasz Nurkiewicz, DZone MVB. See the original article here.

Opinions expressed by DZone contributors are their own.

THE DZONE NEWSLETTER

Dev Resources & Solutions Straight to Your Inbox

Thanks for subscribing!

Awesome! Check your inbox to verify your email so you can start receiving the latest in tech news and resources.

X

{{ parent.title || parent.header.title}}

{{ parent.tldr }}

{{ parent.urlSource.name }}