
Tascalate Concurrent — Filling the Gaps in CompletableFuture API (Part 1)


Learn how you can fill the gaps of the CompletableFuture API.


The Tascalate Concurrent library provides an implementation of the CompletionStage interface and related classes. These are designed to support long-running blocking tasks (typically, I/O-bound). This functionality augments the sole Java 8 built-in implementation, CompletableFuture, which primarily supports computational tasks. The library also helps with numerous asynchronous programming challenges such as handling timeouts, retry/poll functionality, and orchestrating the results of multiple concurrent computations.

The library is shipped as a multi-release JAR and may be used both with Java 8 as a classpath library or with Java 9+ as a module.

Why Is CompletableFuture Not Enough?

There are several shortcomings associated with the CompletableFuture implementation that complicate its usage for real-life asynchronous programming, especially when you have to work with I/O-bound interruptible tasks:

  1. The CompletableFuture.cancel() method does not interrupt the underlying thread; it merely puts the future into an exceptionally completed state. So even if you use blocking calls inside functions passed to thenApplyAsync / thenAcceptAsync / etc., these functions will run to the end and will never be interrupted. Please see "CompletableFuture can't be interrupted" by Tomasz Nurkiewicz.
  2. By default, all *Async composition methods of CompletableFuture use ForkJoinPool.commonPool() unless an explicit Executor is specified. This thread pool is shared between all CompletableFuture-s and all parallel streams across all applications deployed on the same JVM. This hard-coded, unconfigurable thread pool is completely outside of application developers' control and is hard to monitor and scale. Therefore, in robust real-life applications, you should always specify your own Executor. With API enhancements in Java 9+, you can fix this drawback, but it will require some custom coding.
  3. Additionally, the built-in Java 8 concurrency classes provide a pretty inconvenient API to combine several CompletionStage-s. The CompletableFuture.allOf / CompletableFuture.anyOf methods accept only CompletableFuture as arguments; you have no mechanism to combine arbitrary CompletionStage-s without converting them to CompletableFuture first. Also, the return type of the aforementioned CompletableFuture.allOf is declared as CompletableFuture<Void>. Hence, you are unable to conveniently extract the individual results of each future supplied. CompletableFuture.anyOf is even worse in this regard; for more details, see the Shortcomings section of "CompletableFuture in Action" by Tomasz Nurkiewicz.
  4. Support for timeouts/delays was introduced to CompletableFuture only in Java 9, so widely-supported applications running on Java 8 are left without this important functionality. Plus, some design decisions, like using delayed executors instead of a 'delay' operator, are somewhat questionable.

There are numerous free, open-source libraries that address some of the aforementioned shortcomings. However, none of them provides an implementation of an interruptible CompletionStage, and none solves all of the issues coherently.
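
The first shortcoming can be observed with plain JDK classes. The following is a minimal sketch, not taken from the library; the class and method names are made up for illustration, and Thread.sleep stands in for a blocking I/O call. It cancels a running CompletableFuture and reports whether the worker thread ever saw an interrupt:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical demo class; not part of Tascalate Concurrent.
class CancelWithoutInterruptDemo {

    /** Returns true if the worker thread observed an interrupt after cancel(true). */
    static boolean workerWasInterrupted() {
        ExecutorService executor = Executors.newFixedThreadPool(1);
        CountDownLatch started = new CountDownLatch(1);
        CountDownLatch finished = new CountDownLatch(1);
        AtomicBoolean interrupted = new AtomicBoolean(false);
        try {
            CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
                started.countDown();
                try {
                    Thread.sleep(300); // stands in for a blocking I/O call
                } catch (InterruptedException e) {
                    interrupted.set(true);
                } finally {
                    finished.countDown();
                }
            }, executor);

            started.await();                     // the task is running now
            future.cancel(true);                 // only changes the state of the future
            finished.await(5, TimeUnit.SECONDS); // the task still runs to the end
            return interrupted.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            executor.shutdownNow();
        }
    }
}
```

The flag passed to cancel is ignored by CompletableFuture, so the sleeping task is never interrupted and the method returns false.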

How to Use

To use the library, you have to add a single Maven dependency, like so:

<dependency>
    <groupId>net.tascalate.concurrent</groupId>
    <artifactId>net.tascalate.concurrent.lib</artifactId>
    <version>0.7.1</version>
</dependency>


What Is Inside?

1. Promise Interface

This is the core interface of the Tascalate Concurrent library. It may be best described by the formula:

Promise == CompletionStage + Future


Basically, it combines both the blocking Future’s API, including the cancel(boolean mayInterruptIfRunning) method, and the composition capabilities of CompletionStage’s API. Importantly, all composition methods of the CompletionStage API (thenAccept, thenCombine, whenComplete, etc.) are re-declared to return Promise as well.

The decision to introduce an interface that merges CompletionStage and Future is aligned with the design of CompletableFuture API. In addition, several useful methods of CompletableFuture API are added as well:

T getNow(T valueIfAbsent) throws CancellationException, CompletionException;
T getNow(Supplier<? extends T> valueIfAbsent) throws CancellationException, CompletionException;
T join() throws CancellationException, CompletionException;


So, it should be pretty straightforward to use the Promise as a drop-in replacement for the CompletableFuture in many cases.

Besides this, there are numerous operators in the Promise API to work with timeouts and delays, to override the default asynchronous executor, and similar. All of them will be discussed later.

When discussing the Promise interface, it's necessary to mention the accompanying class Promises, which provides several useful methods to adapt a third-party CompletionStage (including the standard CompletableFuture) to the Promise API. First, there are two unit operations to create successfully/faulty settled Promise-s:

static <T> Promise<T> success(T value)
static <T> Promise<T> failure(Throwable exception)


Second, there is an adapter method from:

static <T> Promise<T> from(CompletionStage<T> stage)


It behaves like the following:

  1. If the supplied stage is already a Promise, then it is returned unchanged.
  2. If stage is a CompletableFuture, then a specially-tailored wrapper is returned.
  3. If stage additionally implements Future, then a specialized wrapper is returned that delegates all the blocking methods defined in the Future API.
  4. Otherwise, a generic wrapper is created with a good-enough implementation of the blocking Future API atop the asynchronous CompletionStage API.

To summarize, the returned wrapper delegates as much functionality as possible to the supplied stage and never resorts to CompletionStage.toCompletableFuture because, in the Java 8 API, it's an optional method. From the documentation: "A CompletionStage implementation that does not choose to interoperate with others may throw UnsupportedOperationException." (this text was dropped in Java 9+). In general, the Tascalate Concurrent library does not depend on this method and should be interoperable with any minimal (but valid) CompletionStage implementation.
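
To make the idea of a blocking API atop the asynchronous CompletionStage API more concrete, here is a minimal sketch that waits for an arbitrary CompletionStage using only whenComplete, without calling toCompletableFuture. It is an illustration under stated assumptions, not the library's actual wrapper; the class and method names are hypothetical.

```java
import java.util.concurrent.CompletionException;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical helper; not part of Tascalate Concurrent.
class BlockingStageAdapter {

    /** Blocks until the stage is settled; failures are rethrown as CompletionException. */
    static <T> T blockingJoin(CompletionStage<T> stage) {
        CountDownLatch done = new CountDownLatch(1);
        AtomicReference<T> value = new AtomicReference<>();
        AtomicReference<Throwable> error = new AtomicReference<>();

        // whenComplete is a mandatory CompletionStage method, unlike toCompletableFuture
        stage.whenComplete((v, e) -> {
            value.set(v);
            error.set(e);
            done.countDown();
        });

        try {
            done.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new CompletionException(e);
        }

        Throwable failure = error.get();
        if (failure != null) {
            // Unwrap one level so the caller sees the original cause
            Throwable cause = (failure instanceof CompletionException && failure.getCause() != null)
                ? failure.getCause()
                : failure;
            throw new CompletionException(cause);
        }
        return value.get();
    }
}
```

A production-grade wrapper would also need timed waits and cancellation support, which are left out here for brevity.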

It's important to emphasize that Promise-s returned from the Promises.success, Promises.failure, and Promises.from methods are cancellable in the same way as CompletableFuture but are not interruptible in general, since interruption depends on the concrete implementation. Next, we discuss the concrete implementation of an interruptible Promise provided by the Tascalate Concurrent library: the CompletableTask class.

2. CompletableTask

This is why this project was started. CompletableTask is the implementation of the Promise API for long-running blocking tasks. Typically, to create a CompletableTask, you submit a Supplier / Runnable to the Executor right away, in a similar way as with CompletableFuture:

Promise<SomeValue> p1 = CompletableTask.supplyAsync(() -> {
  return blockingCalculationOfSomeValue();
}, myExecutor);

Promise<Void> p2 = CompletableTask.runAsync(this::someIoBoundMethod, myExecutor);


blockingCalculationOfSomeValue and someIoBoundMethod, in the example above, can contain I/O code, work with blocking queues, do a blocking get on regular Java Future-s, and the like. If at a later time you decide to cancel either of the returned promises, then the corresponding blockingCalculationOfSomeValue or someIoBoundMethod will be interrupted (if not completed yet).

In the realm of I/O-related functionality, failures like connection timeouts and missing or locked files are pretty common, and the checked exceptions mechanism is used frequently to signal failures. Therefore, the library provides an entry point to the API that accepts a Callable instead of a Supplier:

// Notice the checked exception in the method signature
byte[] loadFile(File file) throws IOException {
    byte[] result = ... //load file content;
    return result;
}
...
ExecutorService executorService = Executors.newFixedThreadPool(6);
Promise<byte[]> contentPromise = CompletableTask.submit(
    () -> loadFile(new File("./myfile.dat")), 
    executorService
); 
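
For comparison, CompletableFuture.supplyAsync accepts only a Supplier, which cannot throw checked exceptions. The sketch below shows the kind of adapter one would have to write by hand with the plain JDK; it is an assumption-laden illustration (the class and method names are hypothetical), not the code behind CompletableTask.submit.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.Executor;
import java.util.function.Supplier;

// Hypothetical helper; not part of Tascalate Concurrent.
class CallableAdapter {

    /** Wraps a Callable into a Supplier, tunnelling checked exceptions through CompletionException. */
    static <T> Supplier<T> unchecked(Callable<T> task) {
        return () -> {
            try {
                return task.call();
            } catch (RuntimeException e) {
                throw e;                          // unchecked exceptions pass through as-is
            } catch (Exception e) {
                throw new CompletionException(e); // checked exceptions become the cause
            }
        };
    }

    /** JDK-only counterpart of an entry point that accepts a Callable. */
    static <T> CompletableFuture<T> submit(Callable<T> task, Executor executor) {
        return CompletableFuture.supplyAsync(unchecked(task), executor);
    }
}
```

With this wrapper, an IOException thrown by the task surfaces as the cause of the CompletionException thrown from join(). The returned future is still a regular CompletableFuture, so it remains non-interruptible.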


Additionally, there are two unit operations to create a CompletableTask:

a. CompletableTask.asyncOn(Executor executor) returns an already-completed, null-valued Promise that is "bound" to the specified executor. For example, any function passed to asynchronous composition methods of Promise (like thenApplyAsync / thenAcceptAsync / whenCompleteAsync, etc.) will be executed using this executor unless the executor is overridden via an explicit composition method parameter. Moreover, any nested composition calls will use the same executor if it’s not redefined via an explicit composition method parameter:

CompletableTask
  .asyncOn(myExecutor)
  .thenApplyAsync(myValueGenerator)
  .thenAcceptAsync(myConsumer)
  .thenRunAsync(myAction);


All of myValueGenerator, myConsumer, and myAction will be executed using myExecutor.

b. CompletableTask.complete(T value, Executor executor) works the same as above, but the starting point is a Promise completed with the specified value:

CompletableTask
   .complete("Hello!", myExecutor)
   .thenApplyAsync(myMapper)
   .thenApplyAsync(myTransformer)   
   .thenAcceptAsync(myConsumer)
   .thenRunAsync(myAction);


All of myMapper, myTransformer, myConsumer, and myAction will be executed using myExecutor.

Crucially, all composed promises support true cancellation (incl. interrupting thread) for the functions supplied as arguments:

Promise<?> p1 = CompletableTask.asyncOn(myExecutor);
Promise<?> p2 = p1.thenApplyAsync(myValueGenerator);
Promise<?> p3 = p2.thenRunAsync(myAction);

...
p2.cancel(true);


In the example above, myValueGenerator will be interrupted if already in progress. Both p2 and p3 will be settled with failure: p2 with a CancellationException and p3 with a CompletionException.

You may notice that the term "asynchronous composition methods" is used above, along with *Async calls in examples like thenApplyAsync and thenRunAsync. This is not accidental: non-asynchronous methods of the CompletionStage API are not interruptible. The reasoning behind this design decision is that invoking asynchronous methods involves the inevitable overhead of putting a command into the executor's queue, starting new threads implicitly, etc. For simple, non-blocking methods, like small calculations and trivial transformations, this overhead might outweigh the method's execution time itself. So the guideline is: use asynchronous composition methods for heavy I/O-bound blocking tasks and use non-asynchronous composition methods for (typically lightweight) calculations.

It is worth mentioning that CompletableTask-s, and Promise-s composed out of them, can be interruptible only if the Executor used is interruptible by nature. For example, ThreadPoolExecutor supports interruptible tasks, but ForkJoinPool does not!
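
What "interruptible by nature" means for ThreadPoolExecutor can be shown with JDK classes alone. In the following sketch (hypothetical class and method names, with Thread.sleep standing in for a blocking call), a task is submitted to a fixed thread pool and the resulting Future is cancelled with mayInterruptIfRunning = true:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical demo class; not part of Tascalate Concurrent.
class InterruptibleExecutorDemo {

    /** Returns true if the worker thread was interrupted by Future.cancel(true). */
    static boolean workerWasInterrupted() {
        // Executors.newFixedThreadPool returns a ThreadPoolExecutor
        ExecutorService executor = Executors.newFixedThreadPool(1);
        CountDownLatch started = new CountDownLatch(1);
        CountDownLatch finished = new CountDownLatch(1);
        AtomicBoolean interrupted = new AtomicBoolean(false);
        try {
            Future<?> future = executor.submit(() -> {
                started.countDown();
                try {
                    Thread.sleep(10_000); // stands in for a blocking I/O call
                } catch (InterruptedException e) {
                    interrupted.set(true);
                } finally {
                    finished.countDown();
                }
            });

            started.await();
            future.cancel(true);                 // interrupts the thread running the task
            finished.await(5, TimeUnit.SECONDS); // returns promptly because sleep was aborted
            return interrupted.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            executor.shutdownNow();
        }
    }
}
```

Here the method returns true: the Future handed out by the thread pool tracks the thread running the task and interrupts it on cancellation.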

3. Overriding the Default Asynchronous Executor

One of the pitfalls of the CompletableFuture implementation is how it works with the default asynchronous executor. Consider the following example:

CompletionStage<String> p1 = CompletableFuture.supplyAsync(this::produceValue, executorInitial);
CompletionStage<String> p2 = p1.thenApplyAsync(this::transformValueA);
CompletionStage<String> p3 = p2.thenApplyAsync(this::transformValueB, executorNext);
CompletionStage<String> p4 = p3.thenApplyAsync(this::transformValueC);


The call to produceValue will be executed on executorInitial, since it is passed explicitly. However, the call to transformValueA will be executed on ForkJoinPool.commonPool()! Hmmmm... this makes sense, but how do you force an alternative executor to be used by default? No way!

Perhaps it is possible with deeper calls? The answer is "no" again! The invocation of transformValueB runs on the explicitly supplied executorNext. But the next call, transformValueC, will be executed on... you guessed it: ForkJoinPool.commonPool()!

So, once you use CompletableFuture in a Java EE environment, you must pass an explicit instance of ManagedExecutorService to each and every method call. This is not very convenient! To be fair, with the Java 9+ API, you can redefine this behavior by sub-classing CompletableFuture and overriding two methods: defaultExecutor and newIncompleteFuture. Plus, you will have to define your own "entry points" instead of the standard CompletableFuture.runAsync and CompletableFuture.supplyAsync.
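
The Java 9+ workaround can be sketched roughly as follows. This is only an illustration of the sub-classing approach described above; the names BoundFuture, startOn, and threadNameOfNestedStage are hypothetical, and the code requires Java 9 or later.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.function.Supplier;

// Hypothetical subclass illustrating the Java 9+ extension points.
class BoundFuture<T> extends CompletableFuture<T> {
    private final Executor executor;

    BoundFuture(Executor executor) {
        this.executor = executor;
    }

    // Used by all *Async methods that are called without an explicit Executor
    @Override
    public Executor defaultExecutor() {
        return executor;
    }

    // Makes every derived stage a BoundFuture with the same default executor
    @Override
    public <U> CompletableFuture<U> newIncompleteFuture() {
        return new BoundFuture<>(executor);
    }

    // Custom "entry point" replacing CompletableFuture.supplyAsync
    static <U> BoundFuture<U> startOn(Supplier<U> supplier, Executor executor) {
        BoundFuture<U> result = new BoundFuture<>(executor);
        executor.execute(() -> {
            try {
                result.complete(supplier.get());
            } catch (Throwable e) {
                result.completeExceptionally(e);
            }
        });
        return result;
    }

    /** Chains two *Async stages without an explicit executor and reports the thread used by the last one. */
    static String threadNameOfNestedStage(Executor executor) {
        return startOn(() -> "value", executor)
            .thenApplyAsync(String::toUpperCase)
            .thenApplyAsync(v -> Thread.currentThread().getName())
            .join();
    }
}
```

With this subclass, nested thenApplyAsync calls without an Executor argument run on the executor given to startOn rather than on ForkJoinPool.commonPool(). Note that an explicitly passed Executor affects only the stage it is passed to, which matches the second option listed later in this section rather than the library's default behavior.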

With CompletableTask, the situation is just the opposite. Let us rewrite the example above:

CompletionStage<String> p1 = CompletableTask.supplyAsync(this::produceValue, executorInitial);
CompletionStage<String> p2 = p1.thenApplyAsync(this::transformValueA);
CompletionStage<String> p3 = p2.thenApplyAsync(this::transformValueB, executorNext);
CompletionStage<String> p4 = p3.thenApplyAsync(this::transformValueC);


The call to produceValue will be executed on executorInitial, obviously. But now, the call to transformValueA will also be executed on executorInitial! What about deeper calls? The invocation of transformValueB runs on the explicitly supplied executorNext. And the next call, transformValueC, will be executed on... check your intuition... executorNext. The logic behind this is the following: the latest explicitly specified Executor is what will be used for all nested asynchronous composition methods without an explicit Executor parameter.

Obviously, it's rarely the case when one size fits all. Therefore, two additional options exist to specify default asynchronous executor:

A. CompletableTask has an overloaded method:

public static Promise<Void> asyncOn(Executor executor, boolean enforceDefaultAsync)


When enforceDefaultAsync is true, all nested asynchronous composition methods without an explicit Executor parameter will use the provided executor, even if previous composition methods use an alternative Executor. This is somewhat similar to CompletableFuture but with the ability to explicitly set the default asynchronous executor initially.

B. The Promise interface has the following operation:

Promise<T> defaultAsyncOn(Executor executor)


The returned decorator will use the specified executor for all nested asynchronous composition methods without an explicit Executor parameter. So, at any point, you are able to switch to the desired default asynchronous executor and keep using it for all nested composition calls.

To summarize, with Tascalate Concurrent, you have the following options to control what is the default asynchronous executor:

  1. The latest explicit Executor passed to an *Async method is used for derived Promise-s (the default option).
  2. A single default Executor passed to the root CompletableTask.asyncOn(Executor executor, true) call is propagated through the whole chain. This is the only variant supported with CompletableFuture in Java 9+, though with custom coding.
  3. Redefine the Executor with defaultAsyncOn(Executor executor) for all derived Promise-s.

Having the best of three(!) worlds, the only responsibility of the library user is to use these options consistently!

The last thing that should be mentioned is a typical task: starting an interruptible blocking method after completion of a standard CompletableFuture. The following utility method is defined in CompletableTask:

public static <T> Promise<T> waitFor(CompletionStage<T> stage, Executor executor)


Roughly, this is a shortcut for the following:

CompletableTask.asyncOn(executor).thenCombine(stage, (u, v) -> v);


A typical usage of this method is:

TaskExecutorService executorService = TaskExecutors.newFixedThreadPool(3);
CompletableFuture<String> replyUrlPromise = sendRequestAsync();
Promise<byte[]> dataPromise = CompletableTask.waitFor(replyUrlPromise, executorService)
    .thenApplyAsync(url -> loadDataInterruptibly(url));


The dataPromise returned may be canceled later and loadDataInterruptibly will be interrupted if not completed by that time.

4. Timeouts

Any robust application must handle situations when things go wrong. An ability to cancel an operation that takes too long existed in the library from day one. But, the very definition of the "too long" was left to an application code initially. However, the practice shows that a lack of the proven, thoroughly tested timeout-related stuff in the library leads to a complex, repetitive, and, unfortunately, error-prone code in the application. Hence, Tascalate Concurrent was extended to address this omission.

The library offers the following operations to control execution time of the Promise (declared in Promise interface):

<T> Promise<T> orTimeout(long timeout, TimeUnit unit[, boolean cancelOnTimeout = true])
<T> Promise<T> orTimeout(Duration duration[, boolean cancelOnTimeout = true])


These methods create a new Promise that is either settled successfully/exceptionally when the original promise completes within the given timeout, or settled exceptionally with a TimeoutException when the time expires. In either case, the handling code is executed on the default asynchronous Executor of the original Promise.

Executor myExecutor = ...; // Get an executor
Promise<String> callPromise = CompletableTask
    .supplyAsync( () -> someLongRunningIoBoundMethod(), myExecutor )
    .orTimeout( Duration.ofSeconds(3) );

Promise<?> nextPromiseSync = callPromise.whenComplete((v, e) -> processResultSync(v, e));
Promise<?> nextPromiseAsync = callPromise.whenCompleteAsync((v,e) -> processResultAsync(v, e));


In the example above, callPromise will be settled within three seconds, either successfully/exceptionally as a result of the someLongRunningIoBoundMethod execution, or exceptionally with a TimeoutException.

It's worth mentioning that both processResultSync and processResultAsync will be executed on myExecutor if the timeout is triggered; this rule holds for all timeout-related methods.

The optional cancelOnTimeout parameter defines whether or not to cancel the original Promise when the time expires; it is implicitly true when omitted. So in the example above, someLongRunningIoBoundMethod will be interrupted if it takes more than three seconds to complete. Pay attention: any Promise is cancellable on timeout, even wrappers created via Promises.from(stage), but only CompletableTask is interruptible!
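
For readers on Java 8 who wonder what such an operator involves, here is a minimal JDK-only sketch of a timeout that returns a new future and leaves the original stage untouched. It is not the library's implementation: the names TimeoutSketch and withTimeout are hypothetical, the scheduler is supplied by the caller, and cancelling the original on timeout is deliberately left out.

```java
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical helper; not part of Tascalate Concurrent.
class TimeoutSketch {

    /** Returns a new future that mirrors the original or fails with TimeoutException, whichever comes first. */
    static <T> CompletableFuture<T> withTimeout(CompletionStage<T> original,
                                                Duration timeout,
                                                ScheduledExecutorService scheduler) {
        CompletableFuture<T> result = new CompletableFuture<>();

        // The timer starts immediately, at the moment of this call
        ScheduledFuture<?> timer = scheduler.schedule(() -> {
            result.completeExceptionally(new TimeoutException("Timed out after " + timeout));
        }, timeout.toMillis(), TimeUnit.MILLISECONDS);

        // Whichever side settles first wins; later completion attempts are no-ops
        original.whenComplete((value, error) -> {
            timer.cancel(false);
            if (error != null) {
                result.completeExceptionally(error);
            } else {
                result.complete(value);
            }
        });
        return result;
    }
}
```

Because the result is a separate future, the timeout does not settle the original stage. Adding the equivalent of cancelOnTimeout would additionally require a cancellable (and, for real effect, interruptible) original.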

Canceling the original promise on timeout is the desired behavior in most cases but not always. In reality, "Warn-first-Cancel-next" scenarios are not rare, where "warn" may be logging, sending notification emails, showing messages to the user on UI, etc. The library provides an option to set several non-canceling timeouts, like in the example below:

Executor myExecutor = ...; // Get an executor
Promise<String> resultPromise = CompletableTask
    .supplyAsync( () -> someLongRunningIoBoundMethod(), myExecutor );

// Show UI message to user to let him/her know that everything is under control
Promise<?> t1 = resultPromise
    .orTimeout( Duration.ofSeconds(2), false )
    .exceptionally( e -> {
      if (e instanceof TimeoutException) {
        UI.showMessage("The operation takes longer than expected, please wait...");
      }
      return null;
    }, false); 

// Show UI confirmation to user to let him/her cancel operation explicitly
Promise<?> t2 = resultPromise
    .orTimeout( Duration.ofSeconds(5), false )
    .exceptionally( e -> {
      if (e instanceof TimeoutException) {
        UI.clearMessages();
        UI.showConfirmation("Service does not respond. Do you want to cancel (Y/N)?");
      }
      return null;
    }, false); 

// Cancel in 10 seconds
resultPromise.orTimeout( Duration.ofSeconds(10), true );


Please note that the timeout is started from the call to the orTimeout method. Hence, if you have a chain of unresolved promises ending with the orTimeout call, then the whole chain should be completed within the time given:

Executor myExecutor = ...; // Get an executor
Promise<String> parallelPromise = CompletableTask
    .supplyAsync( () -> someLongRunningDbCall(), myExecutor );
Promise<List<String>> resultPromise = CompletableTask
    .supplyAsync( () -> someLongRunningIoBoundMethod(), myExecutor )
    .thenApplyAsync( v -> converterMethod(v) )
    .thenCombineAsync(parallelPromise, (u, v) -> Arrays.asList(u, v))
    .orTimeout( Duration.ofSeconds(5) );


In the last example, resultPromise will be resolved successfully if and only if all of someLongRunningIoBoundMethod, converterMethod, and even someLongRunningDbCall are completed within five seconds. If it's necessary to restrict the execution time of a single step, use the standard CompletionStage.thenCompose method. Say that in the previous example we have to restrict the execution time of converterMethod only. Then, the modified chain will look like:

Promise<List<String>> resultPromise = CompletableTask
    .supplyAsync( () -> someLongRunningIoBoundMethod(), myExecutor )
    // Restrict only the execution time of converterMethod
    // -- start of changes
    .thenCompose( v -> 
        CompletableTask.complete(v, myExecutor)
                       .thenApplyAsync(vv -> converterMethod(vv))
                       .orTimeout( Duration.ofSeconds(5) )
    )
    // -- end of changes
    .thenCombineAsync(parallelPromise, (u, v) -> Arrays.asList(u, v))
    ;


Moreover, in the original example, only the call to the thenCombineAsync will be canceled on timeout (the last in the chain). To cancel the whole chain, it's necessary to use the functionality of the DependentPromise interface (will be discussed in the next article).

Other useful timeout-related methods declared in the Promise interface are:

<T> Promise<T> onTimeout(T value, long timeout, TimeUnit unit[, boolean cancelOnTimeout = true])
<T> Promise<T> onTimeout(T value, Duration duration[, boolean cancelOnTimeout = true])
<T> Promise<T> onTimeout(Supplier<? extends T>, long timeout, TimeUnit unit[, boolean cancelOnTimeout = true])
<T> Promise<T> onTimeout(Supplier<? extends T>, Duration duration[, boolean cancelOnTimeout = true])


The onTimeout family of methods is similar in all regards to the orTimeout methods with the single obvious difference — instead of completing resulting Promise exceptionally with the TimeoutException when time is expired, they are settled successfully with the alternative value supplied (either directly or via Supplier):

Executor myExecutor = ...; // Get an executor
Promise<String> callPromise = CompletableTask
    .supplyAsync( () -> someLongRunningIoBoundMethod(), myExecutor )
    .onTimeout( "Timed-out!", Duration.ofSeconds(3) );


The example shows that callPromise will be settled within three seconds, either successfully/exceptionally as a result of the someLongRunningIoBoundMethod execution, or with the default value "Timed-out!" when the time is exceeded.

It's important to mention the crucial difference between Promise.orTimeout / onTimeout and CompletableFuture.orTimeout / completeOnTimeout in Java 9+. In Tascalate Concurrent, both operations return a new Promise that may be canceled individually without canceling the original Promise. Moreover, the original Promise will not be completed with a TimeoutException when the time expires but rather with a CancellationException (in the case of orTimeout([duration], true) or orTimeout([duration])). The behavior of CompletableFuture in Java 9+ is radically different: timeout-related operations are just "side effects," and the returned value is the original CompletableFuture itself. So, the call to completableFuture.orTimeout(100, TimeUnit.MILLISECONDS).cancel(true) will cancel the completableFuture itself, and there is no way to revert the timeout once it's set. Correspondingly, when the time expires, the original completableFuture will be completed exceptionally with a TimeoutException.
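
The JDK side of this comparison is easy to verify. The following small sketch (the class and method names are made up for illustration; Java 9+ is required) checks that CompletableFuture.orTimeout hands back the very same instance and that the original future is the one that gets cancelled or timed out:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical demo class using only JDK 9+ APIs.
class JdkOrTimeoutDemo {

    /** True if orTimeout returns the original instance and cancelling it cancels the original. */
    static boolean returnsSameInstance() {
        CompletableFuture<String> original = new CompletableFuture<>();
        CompletableFuture<String> timed = original.orTimeout(1, TimeUnit.HOURS);
        boolean same = (timed == original);
        timed.cancel(true); // there is no separate "timed" future to cancel
        return same && original.isCancelled();
    }

    /** True if the original future itself is completed with a TimeoutException. */
    static boolean originalTimesOut() {
        CompletableFuture<String> original = new CompletableFuture<>();
        original.orTimeout(50, TimeUnit.MILLISECONDS); // return value intentionally ignored
        try {
            original.join();
            return false;
        } catch (CompletionException e) {
            return e.getCause() instanceof TimeoutException;
        }
    }
}
```

Both methods return true, which is exactly the "side effect" behavior described above.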

Finally, the Promise interface provides an option to insert delays into the call chain:

<T> Promise<T> delay(long timeout, TimeUnit unit[, boolean delayOnError = true])
<T> Promise<T> delay(Duration duration[, boolean delayOnError = true])


The delay is started only after the original Promise is completed either successfully or exceptionally (unlike the orTimeout / onTimeout methods, where the timeout is started immediately). The resulting delay Promise is resolved after the specified timeout with the same result as the original Promise. The last argument, delayOnError, specifies whether or not to delay if the original Promise is resolved exceptionally; by default, this argument is true. If false, then the delay Promise is completed immediately after the failed original Promise.

Executor myExecutor = ...; // Get an executor
Promise<String> callPromise1 = CompletableTask
    .supplyAsync( () -> someLongRunningIoBoundMethod(), myExecutor )
    .delay( Duration.ofSeconds(1) ) // Give a second for CPU to calm down :)
    .thenApply(v -> convertValue(v));

Promise<String> callPromise2 = CompletableTask
    .supplyAsync( () -> alternativeLongRunningIoBoundMethod(), myExecutor )
    .delay( Duration.ofSeconds(1), false ) // Give a second for CPU to calm down ONLY on success :)
    .thenApply(v -> convertValue(v));


Like with other timeout-related methods, convertValue is invoked on the default asynchronous Executor of the original Promise.

You may notice that delay may be introduced only in the middle of the chain, but what to do if you'd like to back off the whole chain execution? Just start with a resolved promise!

// Option 1
// Interruptible tasks chain on the executor supplied
CompletableTask.asyncOn(executor)
    .delay( Duration.ofSeconds(5) )
    .thenApplyAsync(ignore -> produceValue());

// Option 2
// Computational tasks on ForkJoinPool.commonPool()
Promises.from(CompletableFuture.completedFuture(""))
    .delay( Duration.ofSeconds(5) )
    .thenApplyAsync(ignore -> produceValue());


Because back-off execution is not a very rare case, the library provides the following convenient shortcuts in the CompletableTask class:

static Promise<Duration> delay(long timeout, TimeUnit unit, Executor executor);
static Promise<Duration> delay(Duration duration, Executor executor);


It is worth mentioning that, in Java 9+, a different approach was chosen to implement delays: there is no corresponding operation defined for the CompletableFuture object, and you should use a delayed Executor instead. Please read the documentation on the CompletableFuture.delayedExecutor method for details.
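
As a rough illustration of the Java 9+ approach, the sketch below uses CompletableFuture.delayedExecutor both to postpone the start of a chain and to delay a result in the middle of a chain. The class and method names are hypothetical; the delayed executor here is based on the JDK default executor.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class using only JDK 9+ APIs.
class DelayedExecutorDemo {

    /** Starts a task through a delayed executor and returns how long the caller waited, in milliseconds. */
    static long elapsedMillisForDelayedStart(long delayMillis) {
        Executor delayed = CompletableFuture.delayedExecutor(delayMillis, TimeUnit.MILLISECONDS);
        long start = System.nanoTime();
        CompletableFuture.supplyAsync(() -> "value", delayed).join();
        return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
    }

    /** Re-publishes a successful result after the given delay; failures propagate without delay. */
    static <T> CompletableFuture<T> delayResult(CompletableFuture<T> stage, long delayMillis) {
        Executor delayed = CompletableFuture.delayedExecutor(delayMillis, TimeUnit.MILLISECONDS);
        return stage.thenCompose(v -> CompletableFuture.supplyAsync(() -> v, delayed));
    }
}
```

The delayResult helper behaves similarly to delay(duration, false) in that only successful results are postponed. Delaying failures as well would require handle or whenComplete instead of thenCompose.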

5. Combining Several CompletionStage-s

The utility class Promises provides a rich set of methods to combine several CompletionStage-s, leaving the limited functionality of CompletableFuture.allOf / anyOf far behind:

  1. The library works with any CompletionStage implementation without resorting to converting arguments to CompletableFuture first (CompletionStage.toCompletableFuture is an optional operation, or at least it is documented as such in Java 8).
  2. It's possible to pass either an array or a List of CompletionStage-s as arguments.
  3. The resulting Promise lets you access the individual results of the settled CompletionStage-s passed.
  4. There is an option to cancel non-settled CompletionStage-s passed once the result of the operation is known.
  5. Optionally, you can specify whether or not to tolerate individual failures as long as they don't affect the final result.
  6. The general "M completed successfully out of N passed promises" scenario is possible.

Let us review the relevant methods, from the simplest ones to the most advanced.

static <T> Promise<List<T>> all([boolean cancelRemaining=true,] 
                                 CompletionStage<? extends T>... promises)
static <T> Promise<List<T>> all([boolean cancelRemaining=true,] 
                                List<? extends CompletionStage<? extends T>> promises)


Returns a promise that is completed normally when all CompletionStage-s passed as parameters are completed normally; if any promise completed exceptionally, then resulting promise is completed exceptionally as well.
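
For comparison, getting a list of results out of the JDK's CompletableFuture.allOf takes a small helper like the one sketched below (hypothetical names, plain JDK only). Note that it accepts only CompletableFuture instances and does not cancel the remaining futures on failure, which are two of the gaps Promises.all is described as closing.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

// Hypothetical helper built on the plain JDK API.
class AllOfResults {

    /** Completes with the results of all futures, in the order they were passed. */
    static <T> CompletableFuture<List<T>> allAsList(List<CompletableFuture<T>> futures) {
        return CompletableFuture
            .allOf(futures.toArray(new CompletableFuture<?>[0]))
            // allOf yields CompletableFuture<Void>, so results are collected by hand;
            // join() does not block here because every future is already complete
            .thenApply(ignored -> futures.stream()
                .map(CompletableFuture::join)
                .collect(Collectors.toList()));
    }
}
```

If any of the futures fails, the combined future fails as well, which matches the description above.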

static <T> Promise<T> any([boolean cancelRemaining=true,] 
                          CompletionStage<? extends T>... promises)
static <T> Promise<T> any([boolean cancelRemaining=true,] 
                          List<? extends CompletionStage<? extends T>> promises)


Returns a promise that is completed normally when any CompletionStage passed as parameters is completed normally (race is possible); if all promises completed exceptionally, then resulting promise is completed exceptionally as well.
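
The "first success wins, failures are tolerated" semantics differs from CompletableFuture.anyOf, which is settled by whichever future completes first, even exceptionally. A minimal JDK-only sketch of the tolerant behavior might look like this; the names are hypothetical, and unlike the library method it neither cancels the remaining stages nor aggregates the individual errors:

```java
import java.util.List;
import java.util.NoSuchElementException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical helper; not part of Tascalate Concurrent.
class FirstSuccess {

    /** Completes with the first successful result; fails only when every stage has failed. */
    static <T> CompletableFuture<T> anySuccessful(List<? extends CompletionStage<? extends T>> stages) {
        CompletableFuture<T> result = new CompletableFuture<>();
        if (stages.isEmpty()) {
            result.completeExceptionally(new NoSuchElementException("No stages supplied"));
            return result;
        }
        // Number of stages that have not failed yet
        AtomicInteger notYetFailed = new AtomicInteger(stages.size());
        for (CompletionStage<? extends T> stage : stages) {
            stage.whenComplete((value, error) -> {
                if (error == null) {
                    result.complete(value);              // first success wins; later ones are ignored
                } else if (notYetFailed.decrementAndGet() == 0) {
                    result.completeExceptionally(error); // everything failed: report the last error
                }
            });
        }
        return result;
    }
}
```

Because it relies only on whenComplete, the sketch works with any CompletionStage implementation, not just CompletableFuture.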

static <T> Promise<T> anyStrict([boolean cancelRemaining=true,] 
                                CompletionStage<? extends T>... promises)
static <T> Promise<T> anyStrict([boolean cancelRemaining=true,] 
                                List<? extends CompletionStage<? extends T>> promises)


Returns a promise that is completed normally when any CompletionStage passed as parameters is completed normally (race is possible); if any promise completed exceptionally before the first result is available, then the resulting promise is completed exceptionally as well (unlike the non-Strict variant, where exceptions are ignored if any result is available at all).

static <T> Promise<List<T>> atLeast(int minResultsCount, [boolean cancelRemaining=true,] 
                                    CompletionStage<? extends T>... promises)
static <T> Promise<List<T>> atLeast(int minResultsCount, [boolean cancelRemaining=true,] 
                                    List<? extends CompletionStage<? extends T>> promises)


A generalization of the any method: returns a promise that is completed normally when at least minResultsCount of the CompletionStage-s passed as parameters are completed normally (race is possible); if fewer than minResultsCount promises completed normally, then the resulting promise is completed exceptionally.

static <T> Promise<List<T>> atLeastStrict(int minResultsCount, [boolean cancelRemaining=true,] 
                                          CompletionStage<? extends T>... promises)
static <T> Promise<List<T>> atLeastStrict(int minResultsCount, [boolean cancelRemaining=true,] 
                                          List<? extends CompletionStage<? extends T>> promises)


A generalization of the anyStrict method: returns a promise that is completed normally when at least minResultsCount of the CompletionStage-s passed as parameters are completed normally (race is possible); if any promise completed exceptionally before minResultsCount results are available, then the resulting promise is completed exceptionally as well (unlike the non-Strict variant, where exceptions are ignored if minResultsCount successful results are available).

All methods above have an optional parameter cancelRemaining. When omitted, it implicitly means cancelRemaining = true. The cancelRemaining parameter defines whether or not to eagerly cancel the remaining promises once the result of the operation is known, i.e. enough of the promises passed are settled successfully, or some CompletionStage completed exceptionally in the strict version.

Each operation to combine CompletionStage-s has overloaded versions that accept either a List of CompletionStage-s or a varargs array of CompletionStage-s.

Besides the any/anyStrict methods, which return a single-valued promise, all other combining methods return a list with one value per successfully completed promise, at the same indexed position. If the promise at the given position was not settled at all, or failed (in the non-strict version), then the corresponding item in the result list is null. If the necessary number of promises was not completed successfully, or any one completed exceptionally in the strict version, then the resulting Promise is settled with a failure of the type MultitargetException. The application developer may examine MultitargetException.getExceptions() to check the exact failure for each CompletionStage passed. The Promise returned has the following characteristics:

  • Canceling the resulting Promise will cancel all the CompletionStage-s passed as arguments.
  • The default asynchronous executor of the resulting Promise is undefined, i.e. it could be either ForkJoinPool.commonPool() or whatever Executor is used by any of the CompletionStage-s passed as arguments. To ensure that the necessary default Executor is used for subsequent asynchronous operations, please apply defaultAsyncOn(myExecutor) on the result.

The list of features provided by the Tascalate Concurrent library doesn't stop here. There is more interesting stuff like Retry/Poll functionality, controlling cancellation of a chain of Promises, extensions to ExecutorService, etc. But this article is already getting too long, so the rest is left for next time. In the meantime, you can check the home page of the Tascalate Concurrent library for the most up-to-date documentation.

Acknowledgments

Internal implementation details of the CompletableTask class are greatly inspired by the work done by Lukáš Křečan. A description of his library is available as a two-part article on DZone: Part I and Part II. It's worth reading for those who'd like to have a better understanding of the CompletableTask internals.

Topics:
concurrency, multithreading, completionstage, completablefuture, java, java concurrency, java 1.8, java 11, java 9, promise
