Which Thread Executes CompletableFuture's Tasks and Callbacks?

If not told which thread to use, CompletableFuture works hard to find one, potentially including the caller's thread. This can ruin the whole idea of scheduling a task asynchronously. Read on to learn which thread your background tasks and callbacks will use.

By Tomasz Nurkiewicz · Dec. 07, 15 · Analysis

CompletableFuture is still a relatively fresh concept, despite being introduced almost two years ago (!) in March 2014 with Java 8. But maybe it's good that this class is not so well known, since it can easily be abused, especially with regard to the threads and thread pools involved along the way. This article aims to describe how threads are used with CompletableFuture.

Running Tasks

This is the fundamental part of the API. There is a convenient supplyAsync() method that is similar to ExecutorService.submit(), but returns a CompletableFuture:

CompletableFuture<String> future =
        CompletableFuture.supplyAsync(() -> {
            try (InputStream is = new URL("http://www.nurkiewicz.com").openStream()) {
                log.info("Downloading");
                return IOUtils.toString(is, StandardCharsets.UTF_8);
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
        });

The problem is, supplyAsync() by default uses ForkJoinPool.commonPool(), a thread pool shared by all CompletableFutures, all parallel streams and all applications deployed on the same JVM (if you are unfortunate enough to still use an application server with many deployed artifacts). This hard-coded pool can only be tuned JVM-wide through system properties such as java.util.concurrent.ForkJoinPool.common.parallelism, so it is largely outside of our control and hard to monitor and scale. Therefore you should always specify your own Executor, like here (and have a look at my few tips on how to create one):

ExecutorService pool = Executors.newFixedThreadPool(10);

final CompletableFuture<String> future =
        CompletableFuture.supplyAsync(() -> {
            //...
        }, pool);
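One practical benefit of a dedicated pool is that its threads can carry recognizable names, which makes thread dumps and logs much easier to read. The following is a minimal sketch, assuming Java 8 or later; the namedThreads() helper and the "downloader" prefix are hypothetical choices for illustration, not something prescribed by the original article. It runs a supplyAsync() task on such a pool and reports which thread executed it:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

public class NamedPoolDemo {

    // Hypothetical helper: gives every thread of the pool a recognizable name prefix.
    static ThreadFactory namedThreads(String prefix) {
        AtomicInteger counter = new AtomicInteger(1);
        return runnable -> new Thread(runnable, prefix + "-" + counter.getAndIncrement());
    }

    // Runs a task on a dedicated pool and reports which thread executed it.
    static String threadUsedBySupplyAsync() {
        ExecutorService pool = Executors.newFixedThreadPool(10, namedThreads("downloader"));
        try {
            CompletableFuture<String> future =
                    CompletableFuture.supplyAsync(() -> Thread.currentThread().getName(), pool);
            return future.join();
        } finally {
            pool.shutdown(); // let the pool's non-daemon threads exit
        }
    }

    public static void main(String[] args) {
        System.out.println("Task ran on " + threadUsedBySupplyAsync());
    }
}
```

Because the pool's threads are created by our own ThreadFactory, any log line emitted from the task immediately shows which pool it came from.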

But that is just the beginning...

Callbacks and Transformations

Suppose you want to transform a given CompletableFuture, e.g. extract the length of the String:

CompletableFuture<Integer> intFuture =
    future.thenApply(s -> s.length());

Who, exactly, invokes the s.length() code? Frankly, my dear developers, we don't give a damn [1]. As long as the lambda expression inside operators like thenApply is cheap, we don't really care who calls it. But what if this expression takes a noticeable amount of CPU time to complete or makes a blocking network call?

First of all, what happens by default? Think about it: we have a background task of type String and we want to apply some specific transformation asynchronously when that value completes. The easiest way to implement that is by wrapping the original task (returning String) and intercepting it when it completes. When the inner task finishes, our callback kicks in, applies the transformation and returns the modified value. It's like an aspect that sits between our code and the original computation result. That being said, it should be fairly obvious that the s.length() transformation will be executed in the same thread as the original task, right? Not quite!

CompletableFuture<String> future =
        CompletableFuture.supplyAsync(() -> {
            sleepSeconds(2);
            return "ABC";
        }, pool);

future.thenApply(s -> {
    log.info("First transformation");
    return s.length();
});

future.get();
pool.shutdownNow();
pool.awaitTermination(1, TimeUnit.MINUTES);

future.thenApply(s -> {
    log.info("Second transformation");
    return s.length();
});

The first transformation in thenApply() is registered while the task is still running. Thus it will be executed immediately after task completion, in the same thread as the task. However, before registering the second transformation we wait until the task actually completes. Even worse, we shut down the thread pool entirely, to make sure no other code can ever be executed there. So which thread will run the second transformation? We know it must happen immediately, since the future we register the callback on has already completed. It turns out that by default the client thread (!) is used! The output is as follows:

pool-1-thread-1 | First transformation
main            | Second transformation

When the second transformation is registered, it realizes that the CompletableFuture has already finished, so it executes the transformation immediately. There is no pool thread around anymore, so thenApply() runs the callback in the current thread, main. In fact, a non-async callback registered on an already completed future runs in the registering thread even while the pool is still alive; shutting the pool down merely makes this obvious. This behavior becomes error-prone when the actual transformation is costly. Imagine the lambda expression inside thenApply() doing some heavy computation or a blocking network call. Suddenly our asynchronous CompletableFuture blocks the calling thread!
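The experiment above relies on timing: the task sleeps for two seconds so that the first callback is registered before it finishes. A minimal, self-contained sketch of the same experiment, assuming Java 8 or later, can make that ordering deterministic by having the task wait on a CountDownLatch that is released only after the first callback is registered. The thread name task-pool-thread and the two helper methods are illustrative choices, not part of the original code:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class CallbackThreadDemo {

    // Returns {thread that ran the first transformation, thread that ran the second one}.
    static String[] transformationThreads() {
        ExecutorService pool = Executors.newSingleThreadExecutor(r -> new Thread(r, "task-pool-thread"));
        // The task waits on this latch, so it cannot complete before the first callback is registered.
        CountDownLatch firstRegistered = new CountDownLatch(1);

        CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
            awaitQuietly(firstRegistered);
            return "ABC";
        }, pool);

        // Registered while the task is still running: the completing pool thread fires it.
        CompletableFuture<String> first = future.thenApply(s -> Thread.currentThread().getName());
        firstRegistered.countDown();
        String firstThread = first.join();

        // Shut the pool down so that no pool thread is left to run anything.
        pool.shutdownNow();
        awaitTerminationQuietly(pool);

        // Registered on an already completed future: runs synchronously in the calling thread.
        String secondThread = future.thenApply(s -> Thread.currentThread().getName()).join();
        return new String[] {firstThread, secondThread};
    }

    private static void awaitQuietly(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    private static void awaitTerminationQuietly(ExecutorService pool) {
        try {
            pool.awaitTermination(1, TimeUnit.MINUTES);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        String[] threads = transformationThreads();
        System.out.println(threads[0] + " | First transformation");
        System.out.println(threads[1] + " | Second transformation");
    }
}
```

Run through main(), this should report task-pool-thread for the first transformation and main for the second, mirroring the output shown above. Waiting on the derived future first (rather than on future itself) keeps the calling thread from helping to run the first callback.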

Controlling Callback's Thread Pool

There are two techniques to control which thread executes our callbacks and transformations. Notice that these solutions are only needed if your transformations are costly; otherwise the difference is negligible. First of all, we can choose the *Async versions of operators, e.g.:

future.thenApplyAsync(s -> {
    log.info("Second transformation");
    return s.length();
});

This time the second transformation was automatically off-loaded to our friend, ForkJoinPool.commonPool():

pool-1-thread-1                  | First transformation
ForkJoinPool.commonPool-worker-1 | Second transformation
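The commonPool default has one caveat worth knowing: the CompletableFuture Javadoc states that async methods without an explicit Executor use ForkJoinPool.commonPool() unless it does not support a parallelism level of at least two, in which case a new Thread is created to run each task. A small sketch, assuming Java 8 or later, that reports where a thenApplyAsync() callback lands even though the future it is registered on has already completed:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ForkJoinPool;

public class DefaultAsyncPoolDemo {

    // Name of the thread that runs a thenApplyAsync() callback registered without an Executor.
    static String defaultAsyncThread() {
        CompletableFuture<String> done = CompletableFuture.completedFuture("ABC");
        return done.thenApplyAsync(s -> Thread.currentThread().getName()).join();
    }

    public static void main(String[] args) {
        // Below a parallelism of 2, CompletableFuture creates a new Thread per async task
        // instead of using the common pool, so the name printed below depends on the machine.
        System.out.println("common pool parallelism: " + ForkJoinPool.getCommonPoolParallelism());
        System.out.println("callback ran on: " + defaultAsyncThread());
    }
}
```

Unlike plain thenApply(), the *Async variant never runs the callback in the caller's thread, even for a completed future.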

But we don't like commonPool, so we supply our own:

future.thenApplyAsync(s -> {
    log.info("Second transformation");
    return s.length();
}, pool2);

Notice that a different thread pool was used (pool-1 vs. pool-2):

pool-1-thread-1 | First transformation
pool-2-thread-1 | Second transformation
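To see that the explicit executor wins regardless of timing, here is a minimal sketch, assuming Java 8 or later, in which the callback is registered only after the task has completed, the very situation that sent plain thenApply() to the main thread. The thread names task-thread and callback-thread and the namedPool() helper are hypothetical, chosen only to make the output readable:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class AsyncCallbackPoolDemo {

    // Single-thread pool whose only thread carries the given name.
    static ExecutorService namedPool(String threadName) {
        return Executors.newSingleThreadExecutor(r -> new Thread(r, threadName));
    }

    // Returns {thread that ran the task, thread that ran the callback}.
    static String[] taskAndCallbackThreads() {
        ExecutorService pool = namedPool("task-thread");
        ExecutorService pool2 = namedPool("callback-thread");
        try {
            CompletableFuture<String> future =
                    CompletableFuture.supplyAsync(() -> Thread.currentThread().getName(), pool);
            String taskThread = future.join(); // the future is complete from here on

            // Even on a completed future, thenApplyAsync(fn, executor) hands fn to pool2
            // instead of running it in the calling thread.
            String callbackThread =
                    future.thenApplyAsync(s -> Thread.currentThread().getName(), pool2).join();
            return new String[] {taskThread, callbackThread};
        } finally {
            pool.shutdown();
            pool2.shutdown();
        }
    }

    public static void main(String[] args) {
        String[] threads = taskAndCallbackThreads();
        System.out.println(threads[0] + " | task");
        System.out.println(threads[1] + " | callback");
    }
}
```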

Treating Callback Like Another Computation Step

But I believe that if you are having trouble with long-running callbacks and transformations (remember that this article applies to almost all other methods on CompletableFuture), you should simply use another explicit CompletableFuture, like here:

//Imagine this is slow and costly
CompletableFuture<Integer> strLen(String s) {
    return CompletableFuture.supplyAsync(
            () -> s.length(),
            pool2);
}

//...

CompletableFuture<Integer> intFuture = 
        future.thenCompose(s -> strLen(s));

This approach is more explicit. Knowing that our transformation has a significant cost, we don't risk running it on some arbitrary or uncontrolled thread. Instead, we explicitly model it as an asynchronous operation from String to CompletableFuture<Integer>. However, we must replace thenApply() with thenCompose(); otherwise we'll end up with CompletableFuture<CompletableFuture<Integer>>.
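The difference between the two operators is easiest to see in the types. A minimal sketch, assuming Java 8 or later; unlike the snippet above, this hypothetical strLen() takes its executor as a parameter so the class is self-contained:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ComposeDemo {

    // Stand-in for a costly transformation modeled as its own asynchronous step.
    static CompletableFuture<Integer> strLen(String s, Executor executor) {
        return CompletableFuture.supplyAsync(() -> s.length(), executor);
    }

    // Returns {length via thenApply plus a double join, length via thenCompose}.
    static int[] lengths(String input) {
        ExecutorService pool2 = Executors.newFixedThreadPool(2);
        try {
            CompletableFuture<String> future = CompletableFuture.completedFuture(input);

            // thenApply() keeps the future returned by strLen() as its value: two layers.
            CompletableFuture<CompletableFuture<Integer>> nested =
                    future.thenApply(s -> strLen(s, pool2));

            // thenCompose() waits for the inner future and exposes its value: one layer.
            CompletableFuture<Integer> flat =
                    future.thenCompose(s -> strLen(s, pool2));

            return new int[] {nested.join().join(), flat.join()};
        } finally {
            pool2.shutdown();
        }
    }

    public static void main(String[] args) {
        int[] result = lengths("ABC");
        System.out.println(result[0] + " " + result[1]);
    }
}
```

Both paths yield the same length, but only the thenCompose() version can be passed on to further callbacks as a plain CompletableFuture<Integer>.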

But what if the operator we need has no variant that plays well with a nested CompletableFuture, e.g. applyToEither(), which waits for the first of two futures to complete and applies a transformation to its result?

CompletableFuture<CompletableFuture<Integer>> poor = 
        future1.applyToEither(future2, s -> strLen(s));

There is a handy trick for "unwrapping" such an obscure data structure, called flatten, easily implemented using flatMap(identity) (or flatMap(x -> x)). In our case, flatMap() is called thenCompose() (duh!):

CompletableFuture<Integer> good = 
        poor.thenCompose(x -> x);
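The trick can be tried end to end in a small sketch, assuming Java 8 or later. Here future2 is deliberately never completed (an illustrative choice) so that applyToEither() picks future1, and strLen() takes its executor as a parameter instead of referring to a pool2 field:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class FlattenDemo {

    // Same idea as strLen() above, with the executor passed in explicitly.
    static CompletableFuture<Integer> strLen(String s, Executor executor) {
        return CompletableFuture.supplyAsync(() -> s.length(), executor);
    }

    static int firstLength() {
        ExecutorService pool2 = Executors.newSingleThreadExecutor();
        try {
            CompletableFuture<String> future1 = CompletableFuture.completedFuture("ABCD");
            CompletableFuture<String> future2 = new CompletableFuture<>(); // never completed here

            // applyToEither() has no "compose" variant, so the result is nested.
            CompletableFuture<CompletableFuture<Integer>> poor =
                    future1.applyToEither(future2, s -> strLen(s, pool2));

            // Flatten: thenCompose() with the identity function unwraps the inner future.
            CompletableFuture<Integer> good = poor.thenCompose(x -> x);
            return good.join();
        } finally {
            pool2.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(firstLength());
    }
}
```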

I leave it up to you to work out how and why it works. I hope this article made it clearer how threads are involved in CompletableFuture.

Published at DZone with permission of Tomasz Nurkiewicz. See the original article here.
