DZone
Thanks for visiting DZone today,
Edit Profile
  • Manage Email Subscriptions
  • How to Post to DZone
  • Article Submission Guidelines
Sign Out View Profile
  • Post an Article
  • Manage My Drafts
Over 2 million developers have joined DZone.
Log In / Join
Please enter at least three characters to search
Refcards Trend Reports
Events Video Library
Refcards
Trend Reports

Events

View Events Video Library

Zones

Culture and Methodologies Agile Career Development Methodologies Team Management
Data Engineering AI/ML Big Data Data Databases IoT
Software Design and Architecture Cloud Architecture Containers Integration Microservices Performance Security
Coding Frameworks Java JavaScript Languages Tools
Testing, Deployment, and Maintenance Deployment DevOps and CI/CD Maintenance Monitoring and Observability Testing, Tools, and Frameworks
Culture and Methodologies
Agile Career Development Methodologies Team Management
Data Engineering
AI/ML Big Data Data Databases IoT
Software Design and Architecture
Cloud Architecture Containers Integration Microservices Performance Security
Coding
Frameworks Java JavaScript Languages Tools
Testing, Deployment, and Maintenance
Deployment DevOps and CI/CD Maintenance Monitoring and Observability Testing, Tools, and Frameworks

Because the DevOps movement has redefined engineering responsibilities, SREs now have to become stewards of observability strategy.

Apache Cassandra combines the benefits of major NoSQL databases to support data management needs not covered by traditional RDBMS vendors.

The software you build is only as secure as the code that powers it. Learn how malicious code creeps into your software supply chain.

Generative AI has transformed nearly every industry. How can you leverage GenAI to improve your productivity and efficiency?

Related

  • Mastering Concurrency: An In-Depth Guide to Java's ExecutorService
  • The Challenges and Pitfalls of Using Executors in Java
  • Optimizing Java Applications: Parallel Processing and Result Aggregation Techniques
  • Efficient Task Management: Building a Java-Based Task Executor Service for Your Admin Panel

Trending

  • How To Build Resilient Microservices Using Circuit Breakers and Retries: A Developer’s Guide To Surviving
  • How to Use AWS Aurora Database for a Retail Point of Sale (POS) Transaction System
  • Introduction to Retrieval Augmented Generation (RAG)
  • Exploring Intercooler.js: Simplify AJAX With HTML Attributes
  1. DZone
  2. Coding
  3. Java
  4. Java 8 CompletableFutures Part I

Java 8 CompletableFutures Part I

Java 8's debut included a neat concurrency too, the CompletableFuture class. Here's an awesome look at CompletableFutures, with a glance at combining and composing tasks.

By 
Bill Bejeck user avatar
Bill Bejeck
·
Feb. 02, 16 · Tutorial
Likes (8)
Comment
Save
Tweet
Share
29.8K Views

Join the DZone community and get the full member experience.

Join For Free

When Java 8 was released a while ago, a great concurrency tool was added, the CompletableFuture class. The CompletableFuture is a Future that can have its value explicitly set and more interestingly can be chained together to support dependent actions triggered by the CompletableFutures completion. CompletableFutures are analogous to the ListenableFuture class found in Guava. While the two offer similar functionality, there won’t be any comparisons done in this post. I have previously covered ListenableFutures. While the coverage of ListenableFutures is a little dated, most of the information should still apply. The documentation for the CompletableFuture class is comprehensive but lacks concrete examples of how to use them. My goal is to show how to use CompletableFutures through a series of simple examples in unit tests. Originally I was going to cover the CompleteableFuture in one post, but there is so much information, it seems better to break up coverage into 3 parts –

  1. Creating/combining tasks and adding listeners for follow-on work.
  2. Handling errors and error recovery
  3. Canceling and forcing completion.

CompletableFuture Primer

Before we dig into using CompleteableFutures, some background information is needed. The CompleteableFuture implements the CompletionStage interface. The javadoc concisely explains what the CompletionStage is:

A stage of a possibly asynchronous computation, that performs an action or computes a value when another CompletionStage completes. A stage completes upon termination of its computation, but this may in turn trigger other dependent stages.

The full documentation for the CompletionStage is too long to include here, so we’ll briefly summarize the key points:

  1. Computations can be represented by a Future, Consumer, or a Runnable with the respective method names of applying, accept, or run
  2. Execution of computations can be one of the following
    1. Default execution (possibly the calling thread)
    2. Async execution using the default async execution provider of the CompletionStage. These methods are denoted by the form of someActionAsync
    3. Async execution by using a provided Executor. These methods also follow the form ofsomeActionAsync but take an Executor instance as an additional parameter.

For the rest of this post, I will be referring to CompletableFuture and CompletionStage interchangeably.

Creating a CompleteableFuture

Creating a CompletableFuture is simple, but not always clear. The simplest way is the CompleteableFuture.completedFuture method which returns an a new, finished CompleteableFuture:

@Test
public void test_completed_future() throws Exception {
  String expectedValue = "the expected value";
  CompletableFuture<String> alreadyCompleted = CompletableFuture.completedFuture(expectedValue);
  assertThat(alreadyCompleted.get(), is(expectedValue));
}

As unexciting as this may seem, the ability to create an already completed CompleteableFuture can come in handy as we’ll see a little later.

Now let’s take a look at how to create a CompletableFuture that represents an asynchronous task:

private static ExecutorService service = Executors.newCachedThreadPool();

    @Test
    public void test_run_async() throws Exception {
        CompletableFuture<Void> runAsync = CompletableFuture.runAsync(() -> System.out.println("running async task"), service);
        //utility testing method
        pauseSeconds(1);
        assertThat(runAsync.isDone(), is(true));
    }

    @Test
    public void test_supply_async() throws Exception {
        CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(simulatedTask(1, "Final Result"), service);
        assertThat(completableFuture.get(), is("Final Result"));
    }

In the first code sample, we see an example of a runAsync task and the second sample is an example of supplyAsync. This may be stating the obvious, but the decision to use supplyAsync vs runAsync is determined by whether the task is expected to return a value or not. In both examples here we are supplying a custom Executor which is the asynchronous execution provider. When it comes to the supplyAsync method I personally think it would have been more natural to use a Callable and not a Supplier. While both are functional interfaces, the Callable is associated more with asynchronous tasks and can throw checked exceptions while the Supplier does not (although with a small amount of code we can have Suppliers that throw checked exceptions).

Adding Listeners

Now that we can create CompletableFuture objects to run asynchronous tasks, let’s learn how to ‘listen’ when a task completes to perform follow-up action(s). It’s important to mention here that when adding follow on CompletionStage objects, the previous task needs to complete successfully in order for the follow-on task/stage to run. There are methods to deal with failed tasks, but handling errors in the CompletableFuture chain are covered in a follow-up post.

@Test
    public void test_then_run_async() throws Exception {
        Map<String,String> cache = new HashMap<>();
        cache.put("key","value");
        CompletableFuture<String> taskUsingCache = CompletableFuture.supplyAsync(simulatedTask(1,cache.get("key")),service);
        CompletableFuture<Void> cleanUp = taskUsingCache.thenRunAsync(cache::clear,service);
        cleanUp.get();
        String theValue = taskUsingCache.get();
        assertThat(cache.isEmpty(),is(true));
        assertThat(theValue,is("value"));
    }

Here in this example, we are running a task that “cleans up” after the first CompletableFuture finishes successfully. While the previous example used a Runnable task to execute after the original task completed successfully, there really is no connection between the two. We can also specify a follow-on task that takes the result of the previous successful task directly:

@Test
public void test_accept_result() throws Exception {
        CompletableFuture<String> task = CompletableFuture.supplyAsync(simulatedTask(1, "add when done"), service);
        CompletableFuture<Void> acceptingTask = task.thenAccept(results::add);
        pauseSeconds(2);
        assertThat(acceptingTask.isDone(), is(true));
        assertThat(results.size(), is(1));
        assertThat(results.contains("add when done"), is(true));

    }

This is an example of Accept methods that take the result of the CompletableFuture and pass it to a Consumer object. In Java 8 Consumer instances have no return value and are expected to work by side-effects, in this case adding the result to a list.

Combining and Composing Tasks

In addition to adding listeners to run follow-up tasks or accept the results of a successful CompletableFuture, we can combine and/or compose tasks.

Composing Tasks

Composing means taking the results of one successful CompletableFuture as input to another CompletableFuture via a Function. Here’s an example of CompletableFuture.thenComposeAsync

@Test
public void test_then_compose() throws Exception {

 Function<Integer,Supplier<List<Integer>>> getFirstTenMultiples = num ->
                ()->Stream.iterate(num, i -> i + num).limit(10).collect(Collectors.toList());

 Supplier<List<Integer>> multiplesSupplier = getFirstTenMultiples.apply(13);

 //Original CompletionStage
 CompletableFuture<List<Integer>> getMultiples = CompletableFuture.supplyAsync(multiplesSupplier, service);

 //Function that takes input from orignal CompletionStage
 Function<List<Integer>, CompletableFuture<Integer>> sumNumbers = multiples ->
            CompletableFuture.supplyAsync(() -> multiples.stream().mapToInt(Integer::intValue).sum());

  //The final CompletableFuture composed of previous two.
  CompletableFuture<Integer> summedMultiples = getMultiples.thenComposeAsync(sumNumbers, service);

   assertThat(summedMultiples.get(), is(715));
  }

In this example, the first CompletionStage is providing a list of 10 multiples of a number, 13 in this case. The supplied Function takes those results and creates another CompletionStage which then sums the list of numbers.

Combining Tasks

Combining is accomplished by taking 2 successful CompletionStages and having the results from both used as parameters to a BiFunction to produce another result (Review a full list of Java functional interfaces). Here’s a very simple example to demonstrate taking results from combined CompletionStages.

@Test
public void test_then_combine_async() throws Exception {
 CompletableFuture<String> firstTask = CompletableFuture.supplyAsync(simulatedTask(3, "combine all"), service);

 CompletableFuture<String> secondTask = CompletableFuture.supplyAsync(simulatedTask(2, "task results"), service);

 CompletableFuture<String> combined = firstTask.thenCombineAsync(secondTask, (f, s) -> f + " " + s, service);

 assertThat(combined.get(), is("combine all task results"));
}

While the previous example showed combining two CompletionStages that could be asynchronous tasks, we could also combine an asynchronous task with an already completed CompletableFuture. It is good way to combine a known value with a value that needs to be computed:

@Test
public void test_then_combine_with_one_supplied_value() throws Exception {
 CompletableFuture<String> asyncComputedValue = CompletableFuture.supplyAsync(simulatedTask(2, "calculated value"), service);
 CompletableFuture<String> knowValueToCombine = CompletableFuture.completedFuture("known value");

 BinaryOperator<String> calcResults = (f, s) -> "taking a " + f + " then adding a " + s;
 CompletableFuture<String> combined = asyncComputedValue.thenCombine(knowValueToCombine, calcResults);

 assertThat(combined.get(), is("taking a calculated value then adding a known value"));
}

Finally here’s an example of using the CompletableFuture.runAfterbothAsync

@Test
public void test_run_after_both() throws Exception {

CompletableFuture<Void> run1 = CompletableFuture.runAsync(() -> {
        pauseSeconds(2);
        results.add("first task");
    }, service);

CompletableFuture<Void> run2 = CompletableFuture.runAsync(() -> {
        pauseSeconds(3);
        results.add("second task");
    }, service);

CompletableFuture<Void> finisher = run1.runAfterBothAsync(run2,() -> results. add(results.get(0)+ "&"+results.get(1)),service);
 pauseSeconds(4);
 assertThat(finisher.isDone(),is(true));
 assertThat(results.get(2),is("first task&second task"));
}

Listening For The First Finished Task

In all of the previous examples, final results required waiting for all CompletionStages to finish, but this doesn’t always need to be the case. We can get results from whichever task completes first. Here’s an example where the first completed result is accepted using a Consumer:

@Test
public void test_accept_either_async_nested_finishes_first() throws Exception {

 CompletableFuture<String> callingCompletable = CompletableFuture.supplyAsync(simulatedTask(2, "calling"), service);
 CompletableFuture<String> nestedCompletable = CompletableFuture.supplyAsync(simulatedTask(1, "nested"), service);

 CompletableFuture<Void> collector = callingCompletable.acceptEither(nestedCompletable, results::add);

 pauseSeconds(2);
 assertThat(collector.isDone(), is(true));
 assertThat(results.size(), is(1));
 assertThat(results.contains("nested"), is(true));
}

And the analogous CompletableFuture.runAfterEither

@Test
public void test_accept_either_async_nested_finishes_first() throws Exception {

 CompletableFuture<String> callingCompletable = CompletableFuture.supplyAsync(simulatedTask(2, "calling"), service);
 CompletableFuture<String> nestedCompletable = CompletableFuture.supplyAsync(simulatedTask(1, "nested"), service);

 CompletableFuture<Void> collector = callingCompletable.acceptEither(nestedCompletable, results::add);

 pauseSeconds(2);
 assertThat(collector.isDone(), is(true));
 assertThat(results.size(), is(1));
 assertThat(results.contains("nested"), is(true));
}

Multiple Combinations

Up to this point, all combining/composing examples have been two CompletableFuture objects only. This was done intentionally in an effort to make the examples clearer. But we can nest an arbitrary number of CompletionStages together. Please note that the following example is for illustration purposes only!

@Test
  public void test_run_after_either() throws Exception {

 CompletableFuture<Void> run1 = CompletableFuture.runAsync(() -> {
            pauseSeconds(2);
            results.add("should be first");
    }, service);

 CompletableFuture<Void> run2 = CompletableFuture.runAsync(() -> {
            pauseSeconds(3);
            results.add("should be second");
    }, service);

  CompletableFuture<Void> finisher = run1.runAfterEitherAsync(run2,() -> results.add(results.get(0).toUpperCase()),service);

 pauseSeconds(4);
 assertThat(finisher.isDone(),is(true));
 assertThat(results.get(1),is("SHOULD BE FIRST"));

It’s important to note that ordering is not guaranteed when combining CompletionStages. In these unit tests, times were provided to the simulated tasks to ensure completion order.

Conclusion

This wraps up the first part of using the CompletableFuture class. In upcoming posts, we’ll cover error handling/recovery and forcing completion/cancelation.

Resources

  • Source Code for this post.
Task (computing) Java (programming language)

Published at DZone with permission of Bill Bejeck, DZone MVB. See the original article here.

Opinions expressed by DZone contributors are their own.

Related

  • Mastering Concurrency: An In-Depth Guide to Java's ExecutorService
  • The Challenges and Pitfalls of Using Executors in Java
  • Optimizing Java Applications: Parallel Processing and Result Aggregation Techniques
  • Efficient Task Management: Building a Java-Based Task Executor Service for Your Admin Panel

Partner Resources

×

Comments
Oops! Something Went Wrong

The likes didn't load as expected. Please refresh the page and try again.

ABOUT US

  • About DZone
  • Support and feedback
  • Community research
  • Sitemap

ADVERTISE

  • Advertise with DZone

CONTRIBUTE ON DZONE

  • Article Submission Guidelines
  • Become a Contributor
  • Core Program
  • Visit the Writers' Zone

LEGAL

  • Terms of Service
  • Privacy Policy

CONTACT US

  • 3343 Perimeter Hill Drive
  • Suite 100
  • Nashville, TN 37211
  • support@dzone.com

Let's be friends:

Likes
There are no likes...yet! 👀
Be the first to like this post!
It looks like you're not logged in.
Sign in to see who liked this post!