DZone
Thanks for visiting DZone today,
Edit Profile
  • Manage Email Subscriptions
  • How to Post to DZone
  • Article Submission Guidelines
Sign Out View Profile
  • Post an Article
  • Manage My Drafts
Over 2 million developers have joined DZone.
Log In / Join
Refcards Trend Reports
Events Video Library
Over 2 million developers have joined DZone. Join Today! Thanks for visiting DZone today,
Edit Profile Manage Email Subscriptions Moderation Admin Console How to Post to DZone Article Submission Guidelines
View Profile
Sign Out
Refcards
Trend Reports
Events
View Events Video Library
Zones
Culture and Methodologies Agile Career Development Methodologies Team Management
Data Engineering AI/ML Big Data Data Databases IoT
Software Design and Architecture Cloud Architecture Containers Integration Microservices Performance Security
Coding Frameworks Java JavaScript Languages Tools
Testing, Deployment, and Maintenance Deployment DevOps and CI/CD Maintenance Monitoring and Observability Testing, Tools, and Frameworks
Culture and Methodologies
Agile Career Development Methodologies Team Management
Data Engineering
AI/ML Big Data Data Databases IoT
Software Design and Architecture
Cloud Architecture Containers Integration Microservices Performance Security
Coding
Frameworks Java JavaScript Languages Tools
Testing, Deployment, and Maintenance
Deployment DevOps and CI/CD Maintenance Monitoring and Observability Testing, Tools, and Frameworks

Integrating PostgreSQL Databases with ANF: Join this workshop to learn how to create a PostgreSQL server using Instaclustr’s managed service

Mobile Database Essentials: Assess data needs, storage requirements, and more when leveraging databases for cloud and edge applications.

Monitoring and Observability for LLMs: Datadog and Google Cloud discuss how to achieve optimal AI model performance.

Automated Testing: The latest on architecture, TDD, and the benefits of AI and low-code tools.

Related

  • Unraveling Lombok's Code Design Pitfalls: Exploring Encapsulation Issues
  • Legacy Code Refactoring: Tips, Steps, and Best Practices
  • Two Cool Java Frameworks You Probably Don’t Need
  • How To Build a Multi-Zone Java App in Days With Vaadin, YugabyteDB, and Heroku

Trending

  • Architecting a Completely Private VPC Network and Automating the Deployment
  • The Promise of Personal Data for Better Living
  • Build a Serverless App Fast with Zipper: Write TypeScript, Offload Everything Else
  • Application Integration for IoT Devices: A Detailed Guide To Unifying Your Ecosystem
  1. DZone
  2. Coding
  3. Java
  4. Implementing Java 8 CompletionStage (Part II)

Implementing Java 8 CompletionStage (Part II)

Lukas Krecan user avatar by
Lukas Krecan
·
Jan. 09, 15 · Interview
Like (1)
Save
Tweet
Share
8.95K Views

Join the DZone community and get the full member experience.

Join For Free

In the first part we have discussed the reasons for implementing Java 8 CompletionStage and why we can not use pure functional callbacks for the implementation. Today I'd like to finally dive into the code.

How it works?

CompletionStage interface has more than 30 methods, but we can simply illustrate the inner working on just few of them

@Override
public boolean complete(T result) {
    return callbackRegistry.success(result);
}
@Override
public boolean completeExceptionally(Throwable ex) {
    return callbackRegistry.failure(ex);
}
@Override
public <U> CompletionStage<U> thenApplyAsync(
        Function<? super T, ? extends U> fn,
        Executor executor)
{
    SimpleCompletionStage<U> nextStage
            = new SimpleCompletionStage<>(defaultExecutor);
    callbackRegistry.addCallbacks(
            // when the result is ready, transform it and pass 
            // it to the next completion stage
            result -> {
                try {
                    nextStage.complete(fn.apply(result));
                } catch (Throwable e) {
                    // transformation fails, next stage has to 
                    // complete exceptionally
                    nextStage.completeExceptionally(wrapException(e));
                }
            },
            // exception from previous stage is passed to the next stage
            e -> nextStage.completeExceptionally(wrapException(e)),
            executor
    );
    return nextStage;
}

CalbackRegistry keeps track of callbacks, if someone calls method “complete” the value is propagated to all previously registered callbacks. If a new callback is added after the method “success” is called, the value is propagated to the callback at once. CalbackRegistry is the only state-full class in the whole machinery, the rest is state-less. This class has been heavily inspired by similar class in Spring.

Let's move on to thenApplyAsync method. To quote JavaDoc “Returns a new CompletionStage that, when this stage completes normally, is executed with this stage's result as the argument to the supplied function.” We are supposed to return a new completion stage, so let's create one.

Then we register two callbacks, one for normal and one for exceptional completion. When someone calls "complete", the first callback is used. It takes the result, applies the function and passes the transformed value to the next stage. Please note, that the code in the callbacks is not executed directly, it will be executed only after “complete” method on this stage is called.

If the transformation function throws an exception or if "completeExceptionally" method is called, next stage completes exceptionally as well.

Let's simplify it

Even though the previous code clearly describes the intent, it may be simplified even further. After some refactoring I got to this

@Override
public <U> CompletionStage<U> thenApplyAsync(
        Function<? super T, ? extends U> fn,
        Executor executor
) {
    SimpleCompletionStage<U> nextStage = newSimpleCompletionStage();
    addCallbacks(
            result -> nextStage.acceptResult(() -> fn.apply(result)),
            nextStage::handleFailure,
            executor
    );
    return nextStage;
}

private void acceptResult(Supplier<? extends T> supplier) {
    try {
        complete(supplier.get());
    } catch (Throwable e) {
        handleFailure(e);
    }
}

It's the same code as before, exception handling is just wrapped in reusable methods. Method “acceptResult” takes a Supplier that provides value to be sent to the next stage. If the Supplier throws exception, the next stage completes exceptionally. Having this support, implementation of other methods is quite easy.

Take the method "handleAsync" for example. The JavaDoc says “Returns a new CompletionStage that, when this stage completes either normally or exceptionally, is executed with this stage's result and exception as arguments to the supplied function. The given function is invoked with the result (or null if none) and the exception (or null if none) of this stage when complete as arguments.”

@Override
public <U> CompletionStage<U> handleAsync(
        BiFunction<? super T, Throwable, ? extends U> fn,
        Executor executor) {
    SimpleCompletionStage<U> nextStage = newSimpleCompletionStage();
    addCallbacks(
            result -> nextStage.acceptResult(() -> fn.apply(result, null)),
            // exceptions are treated as success
            e -> nextStage.acceptResult(() -> fn.apply(null, e)),
            executor
    );
    return nextStage;
}

The code does exactly what it says in the JavaDoc. If this stage completes normally, we apply the function to the value and pass the result to next stage. In case of an exception we call the same function with different arguments and pass the result to the next stage as well. Simple, isn't it?

Transforming functions is easy

Even though the CompletionStage interface looks scary, at the end, I found out that most of the methods can be implemented by reusing existing methods after applying a simple transformation. For example "thenRunAsync" is same as "thenApplyAsync" but instead of a function accepts a runnable. It's easy to convert a runnable to a function and reuse "thenApplyAsync".

public CompletionStage<Void> thenRunAsync(Runnable action, Executor executor) {
    return thenApplyAsync(convertRunnableToFunction(action), executor);
}

private Function<T, Void> convertRunnableToFunction(Runnable action) {
    return result -> {
        action.run();
        return null;
    };
}

Runnable is a function that ignores its parameter and does not return anything. You can see that it's natural to express the idea using lambdas.

Apply to either

Not all methods are so simple, the trickiest was "applyToEither" which takes two completion stages and picks only one result while ignoring the other one. Sounds complicated but even this method can be easilly implemented.

@Override
public <U> CompletionStage<U> applyToEitherAsync(
        CompletionStage<? extends T> other,
        Function<? super T, U> fn,
        Executor executor) {
    SimpleCompletionStage<T> nextStage = newSimpleCompletionStage();
    // only the first result is accepted by completion stage,
    // the other one is ignored
    this.thenAccept(nextStage::complete).exceptionally(nextStage::handleFailure);
    other.thenAccept(nextStage::complete).exceptionally(nextStage::handleFailure);
    return nextStage.thenApplyAsync(fn, executor);
}

We reuse existing methods "thenAccept" and "exceptionally" to call "complete" or "handleFailure" on the next stage. The nextStage completion stage implementation ensures that it accepts only the first result and ignores further calls to "complete*" methods. It means that only the first result is applied. In order to use the function provided in parameter we call "thenApplyAsync" (last line) on the next stage and we are done. Note, that we are creating extra CompletionStages so the code is not as effective as it could be, but I tend to prefer clarity above negligable gains in performance.

Tests are obligatory

There are so many edge cases and combinations that the work would have been impossible without good tests. I have created a common test suite that can be executed on top of CompletableFuture and my implementation of CompletionStage. It's incredibly useful, it allows to keep both implementations in sync and to troubleshoot the tests. Moreover, I was able to find a strange behavior (possible bug) in CompletableFuture.
The code has near 100% test coverage but unfortunatelly the interface is so complex that it's not possible to be sure about the correctness.

It's useful to write about the code

It's interesting how writing about the problem switches the mind to a different mode. While writing this post, I have uncovered several bugs and figured-out several simplifications. I had fun writing the code but without writing this post the code would have been much more complicated and buggy.
So feel free to check the code and more importantly, try similar exercise for yourself, it's really fun.

Java (programming language) IT

Opinions expressed by DZone contributors are their own.

Related

  • Unraveling Lombok's Code Design Pitfalls: Exploring Encapsulation Issues
  • Legacy Code Refactoring: Tips, Steps, and Best Practices
  • Two Cool Java Frameworks You Probably Don’t Need
  • How To Build a Multi-Zone Java App in Days With Vaadin, YugabyteDB, and Heroku

Comments

Partner Resources

X

ABOUT US

  • About DZone
  • Send feedback
  • Careers
  • Sitemap

ADVERTISE

  • Advertise with DZone

CONTRIBUTE ON DZONE

  • Article Submission Guidelines
  • Become a Contributor
  • Visit the Writers' Zone

LEGAL

  • Terms of Service
  • Privacy Policy

CONTACT US

  • 3343 Perimeter Hill Drive
  • Suite 100
  • Nashville, TN 37211
  • support@dzone.com

Let's be friends: