Implementing Java 8 CompletionStage (Part 1)

Java 8 brought standard support for fluent asynchronous processing. With CompletableFuture and lambdas, we can write code like this:

CompletableFuture<LaunchResult> missilesLaunched =
  getAuthorizationCode().thenApply(this::launchMissiles);

missilesLaunched
  .thenApply(this::generateDamageReport)
  .thenAccept(this::updateMainScreen)
  .exceptionally(this::playAlertSound);

...

missilesLaunched.thenAccept(LavaLamp::turnOn);

The example is self-documenting: I obtain a CompletableFuture and then chain actions one after another. All the ugly asynchronous code is hidden. It does not matter that the damage report takes hours to generate; once it's ready, the next stage is called.

Isn't it great? Now we just have to add support for CompletableFutures to all libraries and then we can write highly scalable asynchronous code that easily connects asynchronous servlets with asynchronous HTTP drivers, asynchronous JDBC drivers and asynchronous missile launcher drivers. It will be the final blow to Node.js, hurray.

Unfortunately, it will not be so easy. First of all, it will take time to migrate all libraries to use CompletableFuture. But that's not what I want to write about today. I want to write about deficiencies in the CompletableFuture implementation and how I tried to overcome them.

The first issue I have with CompletableFuture is that it's tightly coupled with the fork-join framework, and the fork-join framework is meant to be used mainly for CPU-intensive tasks. But if you think about it, the usual use case for asynchronous processing is the opposite. We want to use it mainly for blocking tasks: we do not want to block a thread while waiting for a network operation or for a missile to be launched. Of course, you do not have to use fork-join executors with CompletableFuture. You just have to be careful, since the common fork-join pool is the default choice for the async methods that do not take an explicit Executor.
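To make the executor point concrete, here is a minimal sketch of passing a dedicated pool to an async method. The class name, the pool size, the thread name and the blockingLookup method are assumptions made up for this illustration; only supplyAsync(Supplier, Executor) and join() come from the JDK.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class ExplicitExecutorSketch {

    // Hypothetical stand-in for a blocking call such as a network request.
    // It returns the name of the thread that executed it.
    static String blockingLookup() {
        try {
            Thread.sleep(50);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return Thread.currentThread().getName();
    }

    // Runs the blocking call on a dedicated pool instead of the default one.
    static String runOnDedicatedPool() {
        ExecutorService blockingPool = Executors.newFixedThreadPool(4, runnable -> {
            Thread thread = new Thread(runnable, "blocking-io");
            thread.setDaemon(true);
            return thread;
        });
        try {
            // Without the second argument, supplyAsync would normally run the
            // task on ForkJoinPool.commonPool().
            return CompletableFuture
                    .supplyAsync(ExplicitExecutorSketch::blockingLookup, blockingPool)
                    .join();
        } finally {
            blockingPool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("Blocking task ran on: " + runOnDedicatedPool());
    }
}
```

The same two-argument form exists for thenApplyAsync, thenAcceptAsync and the other async methods, so blocking stages can be kept off the common pool one call at a time.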

The other issue I have with CompletableFuture is the implementation. Just check the code and see for yourself. Do not get me wrong, I believe that it's well-tested and really well-optimized code. But honestly, would it pass a code review in your company? Are you able to read it and see how it works?

Luckily, there is the CompletionStage interface, so we can use an alternative implementation. Just google for “CompletionStage alternative implementation” and you get ... nothing.

Cool, it gives me the opportunity to implement it. I will help humanity and finally get famous. Joking aside, it turned out to be a really interesting and educational exercise. If you have a few days to spare, I really recommend it.

Callbacks FTW!

The first issue to solve is how to integrate the CompletionStage with the source of the data. My first attempt was to use only stateless functions registered as callbacks. I created the following interface:

public interface Listenable<T> {
  // Registers one callback for the successful result and one for the failure.
  void addCallbacks(Consumer<? super T> onSuccess, Consumer<Throwable> onFailure);
}

Basically, it allows anyone to register callbacks. That's all we need to implement CompletionStage: we just need to be notified when the previous phase has succeeded or failed. For example, to create a CompletionStage from a Spring ListenableFuture, you just have to do this:

ListenableFuture<String> springListenableFuture = createSpringListenableFuture();

CompletionStage<Object> completionStage = factory.createCompletionStage(
    (onSuccessCallback, onFailureCallback) -> {
        springListenableFuture.addCallback(new ListenableFutureCallback<String>() {
            @Override
            public void onSuccess(String result) {
                onSuccessCallback.accept(result);
            }

            @Override
            public void onFailure(Throwable t) {
                onFailureCallback.accept(t);
            }
        });
    });

The code uses a factory to create a completion stage. The lambda parameter is a Listenable, which basically makes the Spring ListenableFuture call onSuccessCallback when it succeeds and onFailureCallback when it fails. I admit it's hard to read, but I had a good reason for choosing this solution. It allowed me to register as many callbacks as I liked and thus delegate all the heavy lifting to the Spring ListenableFuture.
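The same idea can be sketched without Spring on the classpath. The example below is an assumption-laden illustration, not the library's code: it uses a plain CompletableFuture as the source, and the class name and the fromCompletableFuture helper are hypothetical. Each addCallbacks call simply registers one more whenComplete action, so the source does all the bookkeeping.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;

class ListenableAdapterSketch {

    // The callback interface from the article, repeated to keep the sketch self-contained.
    interface Listenable<T> {
        void addCallbacks(Consumer<? super T> onSuccess, Consumer<Throwable> onFailure);
    }

    // Hypothetical adapter: the Listenable keeps no state of its own and
    // forwards every registration to the source future.
    static <T> Listenable<T> fromCompletableFuture(CompletableFuture<T> source) {
        return (onSuccess, onFailure) -> source.whenComplete((result, error) -> {
            if (error != null) {
                onFailure.accept(error);
            } else {
                onSuccess.accept(result);
            }
        });
    }

    public static void main(String[] args) {
        CompletableFuture<String> source = new CompletableFuture<>();
        Listenable<String> listenable = fromCompletableFuture(source);

        // Two independent registrations, both managed by the source.
        listenable.addCallbacks(
                value -> System.out.println("first callback got " + value),
                error -> System.out.println("first callback failed"));
        listenable.addCallbacks(
                value -> System.out.println("second callback got " + value),
                error -> System.out.println("second callback failed"));

        source.complete("authorization-code");
    }
}
```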

You see, most asynchronous libraries already have all the code necessary for managing callbacks, and I wanted to reuse it. Each of the CompletionStage method calls would just add a new callback to the source. Take this code, for example:

CompletionStage<LaunchResult> missilesLaunched =
  getAuthorizationCode()
  .thenApply(this::launchMissiles);

missilesLaunched
  .thenApply(this::generateDamageReport)
  .thenAccept(this::updateMainScreen)
  .exceptionally(this::playAlertSound);

...
missilesLaunched.thenAccept(LavaLamp::turnOn);

The method call thenApply(this::launchMissiles) would add a new callback that would launch the missiles. The next thenApply would generate yet another callback and so on. The callbacks would have been simple stateless transformations, each of them building on the previous transformations. All state would have been managed by the source, in my case by Spring ListenableFuture.

Callbacks WTF?

But unfortunately, it's not possible. The trouble is that you do not want to execute intermediate steps multiple times. In our example, we want to generate a damage report and turn on the lava lamp based on the missile launch result. This would end up as two callbacks registered on the source and, consequently, two calls of the launchMissiles method. That's not what we want; obviously we want to call the method only once. Especially this one.
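The double execution is easy to reproduce. The following is a minimal sketch of the stateless approach, assuming a hypothetical static thenApply helper and a source that calls back immediately; it is not the code from my library, just the smallest thing that shows the problem.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.function.Function;

class StatelessCallbackSketch {

    // The callback interface from the article, repeated to keep the sketch self-contained.
    interface Listenable<T> {
        void addCallbacks(Consumer<? super T> onSuccess, Consumer<Throwable> onFailure);
    }

    // Hypothetical stateless thenApply: the returned Listenable stores nothing,
    // so every subscriber registers with the source again and re-runs fn.
    static <T, R> Listenable<R> thenApply(Listenable<T> source,
                                          Function<? super T, ? extends R> fn) {
        return (onSuccess, onFailure) -> source.addCallbacks(
                value -> onSuccess.accept(fn.apply(value)),
                onFailure);
    }

    // Returns how many times the "launch" transformation was executed.
    static int countLaunches() {
        AtomicInteger launches = new AtomicInteger();

        // A source that is already complete and calls back immediately.
        Listenable<String> authorizationCode =
                (onSuccess, onFailure) -> onSuccess.accept("code");

        Listenable<String> missilesLaunched = thenApply(authorizationCode, code -> {
            launches.incrementAndGet(); // the side effect we want exactly once
            return "launched";
        });

        // Two consumers of the same stage: damage report and lava lamp.
        missilesLaunched.addCallbacks(result -> { }, error -> { });
        missilesLaunched.addCallbacks(result -> { }, error -> { });

        return launches.get();
    }

    public static void main(String[] args) {
        System.out.println("launchMissiles ran " + countLaunches() + " times"); // prints 2
    }
}
```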

There is no way around it: we have to deal with state in each of the steps. Take a look at the last line of the example: missilesLaunched.thenAccept(LavaLamp::turnOn). If the result of the missile launch is already known, the missilesLaunched CompletionStage has to remember it, so it can pass it to the lava lamp. There is no other way to figure out the result without launching the missiles again. If the result is not known yet, missilesLaunched has to register a callback that will turn on the lava lamp once the result is available. We cannot delegate it any further, because previous stages do not have access to the result.
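The two cases described above can be sketched as a tiny stage that remembers its result. This is a simplified illustration under my own assumptions: the class and method names are hypothetical, failures are ignored entirely, and a real CompletionStage implementation has to handle far more.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

class RememberingStageSketch<T> {
    private final List<Consumer<? super T>> pending = new ArrayList<>();
    private boolean done;
    private T result;

    // Called by the producer; only the first call has any effect.
    void complete(T value) {
        List<Consumer<? super T>> toNotify;
        synchronized (this) {
            if (done) {
                return;
            }
            done = true;
            result = value;
            toNotify = new ArrayList<>(pending);
            pending.clear();
        }
        // Notify outside the lock so callbacks cannot block other registrations.
        toNotify.forEach(callback -> callback.accept(value));
    }

    void thenAccept(Consumer<? super T> callback) {
        synchronized (this) {
            if (!done) {
                pending.add(callback); // result unknown: wait for it
                return;
            }
        }
        callback.accept(result); // result known: pass the remembered value
    }

    // Registers one callback before completion and one after it.
    static List<String> demo() {
        List<String> events = new ArrayList<>();
        RememberingStageSketch<String> missilesLaunched = new RememberingStageSketch<>();
        missilesLaunched.thenAccept(r -> events.add("early:" + r));
        missilesLaunched.complete("launched");
        missilesLaunched.thenAccept(r -> events.add("late:" + r));
        return events;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [early:launched, late:launched]
    }
}
```

Both callbacks see the same stored value, and whatever produced it ran only once.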

If you think about it, the issue is caused by side effects. If all my methods were pure functions, I would not have this problem. I would not mind calling the methods multiple times. But a missile launch is not free of side effects, and in Java I cannot force all users of the library to use only pure functions.

So the concept of the Listenable callback did not work out. In the end, I removed it completely. Not only did it not work, it was also really hard to reason about. Look again at the example with the Spring ListenableFuture. It's quite confusing. It was even more confusing internally: I ended up creating callbacks in callbacks in callbacks.

What's more, the version without callbacks is much easier for clients to use:

CompletableCompletionStage<Object> completionStage = factory.createCompletionStage();
springListenableFuture.addCallback(new ListenableFutureCallback<String>() {
    @Override
    public void onSuccess(String result) {
        completionStage.complete(result);
    }
    @Override
    public void onFailure(Throwable t) {
        completionStage.completeExceptionally(t);
    }
});

Now you do not have to create a new Listenable; you just call the complete method. It's more natural and, coincidentally, CompletableFuture is used in the same way.
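For comparison, this is roughly how the same bridging looks with the JDK's own CompletableFuture. The fetchAsync method is a hypothetical callback-style API standing in for a library such as Spring's ListenableFuture; complete and completeExceptionally are the real CompletableFuture methods.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;

class CompletableFutureBridgeSketch {

    // Hypothetical callback-style API: it reports its result on another thread.
    static void fetchAsync(Consumer<String> onSuccess, Consumer<Throwable> onFailure) {
        new Thread(() -> {
            try {
                onSuccess.accept("authorization-code");
            } catch (RuntimeException e) {
                onFailure.accept(e);
            }
        }).start();
    }

    // Bridges the callback API to a CompletableFuture by completing it from the callbacks.
    static CompletableFuture<String> fetch() {
        CompletableFuture<String> future = new CompletableFuture<>();
        fetchAsync(future::complete, future::completeExceptionally);
        return future;
    }

    public static void main(String[] args) {
        fetch()
            .thenApply(code -> "launched with " + code)
            .thenAccept(System.out::println)
            .join(); // wait so the output appears before the JVM exits
    }
}
```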

There is more interesting stuff I have learned, but this article is already getting too long so I will leave it for another time. In the meantime, you can check the source code here.
