Converting between Completablefuture and Observable

By Tomasz Nurkiewicz · Nov. 27, 14

CompletableFuture<T> from Java 8 is an advanced abstraction over a promise that a value of type T will be available in the future. Observable<T> is quite similar, but it promises an arbitrary number of items in the future, from 0 to infinity. These two representations of asynchronous results are similar enough that an Observable with just one item can be used instead of a CompletableFuture and vice versa. On the other hand, CompletableFuture is more specialized and, because it is now part of the JDK, should become prevalent quite soon. Let's celebrate the RxJava 1.0 release with a short article showing how to convert between the two without losing their asynchronous, event-driven nature.

From CompletableFuture<T> to Observable<T>

CompletableFuture represents one value in the future, so turning it into an Observable is rather simple. When the Future completes with some value, the Observable emits that value immediately and closes the stream:

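import java.util.concurrent.CompletableFuture

import rx.Observable
import rx.functions.Func1
import spock.lang.Specification

class FuturesTest extends Specification {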
 
    public static final String MSG = "Don't panic"
 
    def 'should convert completed Future to completed Observable'() {
        given:
            CompletableFuture<String> future = CompletableFuture.completedFuture("Abc")
 
        when:
            Observable<String> observable = Futures.toObservable(future)
 
        then:
            observable.toBlocking().toIterable().toList() == ["Abc"]
    }
 
    def 'should convert failed Future into Observable with failure'() {
        given:
            CompletableFuture<String> future = failedFuture(new IllegalStateException(MSG))
 
        when:
            Observable<String> observable = Futures.toObservable(future)
 
        then:
            observable
                    .onErrorReturn({ th -> th.message } as Func1)
                    .toBlocking()
                    .toIterable()
                    .toList() == [MSG]
    }  
 
    CompletableFuture failedFuture(Exception error) {
        CompletableFuture future = new CompletableFuture()
        future.completeExceptionally(error)
        return future
    }
 
}

The first test of the not-yet-implemented Futures.toObservable() converts a Future into an Observable and makes sure the value is propagated correctly. The second test creates a failed Future, replaces the failure with the exception's message, and makes sure the exception was propagated. The implementation is much shorter:

public static <T> Observable<T> toObservable(CompletableFuture<T> future) {
    return Observable.create(subscriber ->
            future.whenComplete((result, error) -> {
                if (error != null) {
                    subscriber.onError(error);
                } else {
                    subscriber.onNext(result);
                    subscriber.onCompleted();
                }
            }));
}

NB: RxJava already provides a factory for plain Futures (Observable.from(Future) in 1.x), however we want to take full advantage of CompletableFuture's asynchronous operators.
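To make the callback-based nature of the toObservable() implementation above easier to see in isolation, here is a minimal sketch that uses only the JDK. SketchSubscriber, RecordingSubscriber and FutureBridgeSketch are hypothetical names introduced for illustration; they stand in for RxJava's Subscriber and are not part of RxJava or of the article's Futures class.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Hypothetical stand-in for the three callbacks of an RxJava 1.x Subscriber.
interface SketchSubscriber<T> {
    void onNext(T item);
    void onError(Throwable error);
    void onCompleted();
}

// Records each signal as a string so the order of events is easy to inspect.
// Not thread-safe; meant for single-threaded illustration only.
class RecordingSubscriber<T> implements SketchSubscriber<T> {
    final List<String> events = new ArrayList<>();

    @Override public void onNext(T item) { events.add("next:" + item); }
    @Override public void onError(Throwable error) { events.add("error:" + error.getMessage()); }
    @Override public void onCompleted() { events.add("completed"); }
}

class FutureBridgeSketch {
    // Registers a callback and returns immediately; get() is never called.
    static <T> void bridge(CompletableFuture<T> future, SketchSubscriber<? super T> subscriber) {
        future.whenComplete((result, error) -> {
            if (error != null) {
                subscriber.onError(error);
            } else {
                subscriber.onNext(result);
                subscriber.onCompleted();
            }
        });
    }

    public static void main(String[] args) {
        CompletableFuture<String> pending = new CompletableFuture<>();
        RecordingSubscriber<String> subscriber = new RecordingSubscriber<>();
        bridge(pending, subscriber);
        System.out.println(subscriber.events); // [] because nothing has completed yet
        pending.complete("Abc");
        System.out.println(subscriber.events); // [next:Abc, completed]
    }
}
```

The subscriber sees nothing until the future is completed, and no thread is parked waiting for the result in the meantime.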

From Observable<T> to CompletableFuture<List<T>>

There are actually two ways to convert an Observable to a Future: creating a CompletableFuture<List<T>> or a CompletableFuture<T> (if we assume the Observable has just one item). Let's start with the former case, described by the following test cases:

def 'should convert Observable with many items to Future of list'() {
    given:
        Observable<Integer> observable = Observable.just(1, 2, 3)
 
    when:
        CompletableFuture<List<Integer>> future = Futures.fromObservable(observable)
 
    then:
        future.get() == [1, 2, 3]
}
 
def 'should return failed Future when after few items exception was emitted'() {
    given:
        Observable<Integer> observable = Observable.just(1, 2, 3)
                .concatWith(Observable.error(new IllegalStateException(MSG)))
 
    when:
        Futures.fromObservable(observable)
 
    then:
        def e = thrown(Exception)
        e.message == MSG
}

Obviously the Future doesn't complete until the source Observable signals the end of the stream. Thus Observable.never() would never complete the wrapping Future, rather than completing it with an empty list. The implementation is much shorter and sweeter:

public static <T> CompletableFuture<List<T>> fromObservable(Observable<T> observable) {
    final CompletableFuture<List<T>> future = new CompletableFuture<>();
    observable
            .doOnError(future::completeExceptionally)
            .toList()
            .forEach(future::complete);
    return future;
}

The key is Observable.toList(), which conveniently converts an Observable<T> into an Observable<List<T>>. The latter emits a single item of type List<T> when the source Observable<T> finishes.
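The collect-then-complete behaviour described above can be sketched without RxJava. ListCollectorSketch below is a hypothetical class, not an RxJava type: it buffers items the way toList() does and routes the terminal signals into a CompletableFuture. Unlike the forEach()-based helper, it delivers errors through the future instead of rethrowing them. In RxJava 1.x a comparable wiring would presumably be observable.toList().subscribe(future::complete, future::completeExceptionally), which is an assumption on top of the article's code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Hypothetical collector: buffers items like toList() and feeds a CompletableFuture.
// Not thread-safe; assumes signals arrive sequentially, as the Rx contract requires.
class ListCollectorSketch<T> {
    private final List<T> buffer = new ArrayList<>();
    final CompletableFuture<List<T>> future = new CompletableFuture<>();

    void onNext(T item) { buffer.add(item); }

    // An error fails the future; items buffered so far are discarded.
    void onError(Throwable error) { future.completeExceptionally(error); }

    // Only the end-of-stream signal completes the future with the collected list.
    void onCompleted() { future.complete(new ArrayList<>(buffer)); }

    public static void main(String[] args) {
        ListCollectorSketch<Integer> collector = new ListCollectorSketch<>();
        collector.onNext(1);
        collector.onNext(2);
        collector.onNext(3);
        System.out.println(collector.future.isDone()); // false, stream still open
        collector.onCompleted();
        System.out.println(collector.future.join());   // [1, 2, 3]
    }
}
```

A source that never calls onCompleted(), like Observable.never(), leaves the future pending forever, which matches the behaviour described earlier.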

From Observable<T> to CompletableFuture<T>

A special case of the previous transformation arises when we know that the Observable<T> will emit exactly one item. In that case we can convert it directly to a CompletableFuture<T>, rather than a CompletableFuture<List<T>> holding just one item. Tests first:

def 'should convert Observable with single item to Future'() {
    given:
        Observable<Integer> observable = Observable.just(1)
 
    when:
        CompletableFuture<Integer> future = Futures.fromSingleObservable(observable)
 
    then:
        future.get() == 1
}
 
def 'should create failed Future when Observable fails'() {
    given:
        Observable<String> observable = Observable.<String> error(new IllegalStateException(MSG))
 
    when:
        Futures.fromSingleObservable(observable)
 
    then:
        def e = thrown(Exception)
        e.message == MSG
}
 
def 'should fail when single Observable produces too many items'() {
    given:
        Observable<Integer> observable = Observable.just(1, 2)
 
    when:
        Futures.fromSingleObservable(observable)
 
    then:
        def e = thrown(Exception)
        e.message.contains("too many elements")
}

Again the implementation is quite straightforward and almost identical:

public static <T> CompletableFuture<T> fromSingleObservable(Observable<T> observable) {
    final CompletableFuture<T> future = new CompletableFuture<>();
    observable
            .doOnError(future::completeExceptionally)
            // single() signals an error unless exactly one item is emitted
            .single()
            .forEach(future::complete);
    return future;
}
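The single-item rules exercised by the tests above (exactly one item succeeds, too many items fail) can be sketched with the JDK alone. SingleCollectorSketch is a hypothetical class introduced for illustration. Its error messages are assumptions chosen to resemble the "too many elements" text the test checks for, and its handling of an empty stream is likewise an assumption, since the article does not test that case.

```java
import java.util.NoSuchElementException;
import java.util.concurrent.CompletableFuture;

// Hypothetical collector that accepts exactly one item before end of stream.
// Not thread-safe; assumes signals arrive sequentially.
class SingleCollectorSketch<T> {
    private T value;
    private boolean hasValue;
    final CompletableFuture<T> future = new CompletableFuture<>();

    void onNext(T item) {
        if (hasValue) {
            // A second item violates the single-item assumption.
            future.completeExceptionally(
                    new IllegalArgumentException("Sequence contains too many elements"));
        } else {
            value = item;
            hasValue = true;
        }
    }

    void onError(Throwable error) { future.completeExceptionally(error); }

    void onCompleted() {
        if (hasValue) {
            // No effect if the future was already failed by a second item.
            future.complete(value);
        } else {
            future.completeExceptionally(
                    new NoSuchElementException("Sequence contains no elements"));
        }
    }

    public static void main(String[] args) {
        SingleCollectorSketch<Integer> one = new SingleCollectorSketch<>();
        one.onNext(1);
        one.onCompleted();
        System.out.println(one.future.join()); // 1

        SingleCollectorSketch<Integer> two = new SingleCollectorSketch<>();
        two.onNext(1);
        two.onNext(2);
        System.out.println(two.future.isCompletedExceptionally()); // true
    }
}
```

Here every failure ends up inside the returned future, so a caller can handle it with exceptionally() or handle() instead of a try/catch around the conversion call.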

The helper methods above aren't fully robust yet, but if you ever need to convert between the JDK 8 and RxJava styles of asynchronous computing, this article should be enough to get you started.


Published at DZone with permission of Tomasz Nurkiewicz, DZone MVB. See the original article here.
