Implementing Java 8 CompletionStage (Part II)
Join the DZone community and get the full member experience.
Join For FreeIn the first part we have discussed the reasons for implementing Java 8 CompletionStage and why we can not use pure functional callbacks for the implementation. Today I'd like to finally dive into the code.
How it works?
CompletionStage interface has more than 30 methods, but we can simply illustrate the inner working on just few of them
@Override public boolean complete(T result) { return callbackRegistry.success(result); } @Override public boolean completeExceptionally(Throwable ex) { return callbackRegistry.failure(ex); } @Override public <U> CompletionStage<U> thenApplyAsync( Function<? super T, ? extends U> fn, Executor executor) { SimpleCompletionStage<U> nextStage = new SimpleCompletionStage<>(defaultExecutor); callbackRegistry.addCallbacks( // when the result is ready, transform it and pass // it to the next completion stage result -> { try { nextStage.complete(fn.apply(result)); } catch (Throwable e) { // transformation fails, next stage has to // complete exceptionally nextStage.completeExceptionally(wrapException(e)); } }, // exception from previous stage is passed to the next stage e -> nextStage.completeExceptionally(wrapException(e)), executor ); return nextStage; }
CalbackRegistry keeps track of callbacks, if someone calls method “complete” the value is propagated to all previously registered callbacks. If a new callback is added after the method “success” is called, the value is propagated to the callback at once. CalbackRegistry is the only state-full class in the whole machinery, the rest is state-less. This class has been heavily inspired by similar class in Spring.
Let's move on to thenApplyAsync method. To quote JavaDoc “Returns a new CompletionStage that, when this stage completes normally, is executed with this stage's result as the argument to the supplied function.” We are supposed to return a new completion stage, so let's create one.
Then we register two callbacks, one for normal and one for exceptional completion. When someone calls "complete", the first callback is used. It takes the result, applies the function and passes the transformed value to the next stage. Please note, that the code in the callbacks is not executed directly, it will be executed only after “complete” method on this stage is called.
If the transformation function throws an exception or if "completeExceptionally" method is called, next stage completes exceptionally as well.
Let's simplify it
Even though the previous code clearly describes the intent, it may be simplified even further. After some refactoring I got to this
@Override public <U> CompletionStage<U> thenApplyAsync( Function<? super T, ? extends U> fn, Executor executor ) { SimpleCompletionStage<U> nextStage = newSimpleCompletionStage(); addCallbacks( result -> nextStage.acceptResult(() -> fn.apply(result)), nextStage::handleFailure, executor ); return nextStage; } private void acceptResult(Supplier<? extends T> supplier) { try { complete(supplier.get()); } catch (Throwable e) { handleFailure(e); } }
It's the same code as before, exception handling is just wrapped in reusable methods. Method “acceptResult” takes a Supplier that provides value to be sent to the next stage. If the Supplier throws exception, the next stage completes exceptionally. Having this support, implementation of other methods is quite easy.
Take the method "handleAsync" for example. The JavaDoc says “Returns a new CompletionStage that, when this stage completes either normally or exceptionally, is executed with this stage's result and exception as arguments to the supplied function. The given function is invoked with the result (or null if none) and the exception (or null if none) of this stage when complete as arguments.”
@Override public <U> CompletionStage<U> handleAsync( BiFunction<? super T, Throwable, ? extends U> fn, Executor executor) { SimpleCompletionStage<U> nextStage = newSimpleCompletionStage(); addCallbacks( result -> nextStage.acceptResult(() -> fn.apply(result, null)), // exceptions are treated as success e -> nextStage.acceptResult(() -> fn.apply(null, e)), executor ); return nextStage; }
The code does exactly what it says in the JavaDoc. If this stage completes normally, we apply the function to the value and pass the result to next stage. In case of an exception we call the same function with different arguments and pass the result to the next stage as well. Simple, isn't it?
Transforming functions is easy
Even though the CompletionStage interface looks scary, at the end, I found out that most of the methods can be implemented by reusing existing methods after applying a simple transformation. For example "thenRunAsync" is same as "thenApplyAsync" but instead of a function accepts a runnable. It's easy to convert a runnable to a function and reuse "thenApplyAsync".
public CompletionStage<Void> thenRunAsync(Runnable action, Executor executor) { return thenApplyAsync(convertRunnableToFunction(action), executor); } private Function<T, Void> convertRunnableToFunction(Runnable action) { return result -> { action.run(); return null; }; }
Runnable is a function that ignores its parameter and does not return anything. You can see that it's natural to express the idea using lambdas.
Apply to either
Not all methods are so simple, the trickiest was "applyToEither" which takes two completion stages and picks only one result while ignoring the other one. Sounds complicated but even this method can be easilly implemented.
@Override public <U> CompletionStage<U> applyToEitherAsync( CompletionStage<? extends T> other, Function<? super T, U> fn, Executor executor) { SimpleCompletionStage<T> nextStage = newSimpleCompletionStage(); // only the first result is accepted by completion stage, // the other one is ignored this.thenAccept(nextStage::complete).exceptionally(nextStage::handleFailure); other.thenAccept(nextStage::complete).exceptionally(nextStage::handleFailure); return nextStage.thenApplyAsync(fn, executor); }
We reuse existing methods "thenAccept" and "exceptionally" to call "complete" or "handleFailure" on the next stage. The nextStage completion stage implementation ensures that it accepts only the first result and ignores further calls to "complete*" methods. It means that only the first result is applied. In order to use the function provided in parameter we call "thenApplyAsync" (last line) on the next stage and we are done. Note, that we are creating extra CompletionStages so the code is not as effective as it could be, but I tend to prefer clarity above negligable gains in performance.
Tests are obligatory
There are so many edge cases and combinations that the work would have been impossible without good tests. I have created a common test suite that can be executed on top of CompletableFuture and my implementation of CompletionStage. It's incredibly useful, it allows to keep both implementations in sync and to troubleshoot the tests. Moreover, I was able to find a strange behavior (possible bug) in CompletableFuture.
The code has near 100% test coverage but unfortunatelly the interface is so complex that it's not possible to be sure about the correctness.
It's useful to write about the code
It's interesting how writing about the problem switches the mind to a different mode. While writing this post, I have uncovered several bugs and figured-out several simplifications. I had fun writing the code but without writing this post the code would have been much more complicated and buggy.
So feel free to check the code and more importantly, try similar exercise for yourself, it's really fun.
Opinions expressed by DZone contributors are their own.
Comments