Using rx-java Observable in a Spring MVC Flow

By Biju Kunjummen · Mar. 23, 15

Spring MVC has supported an asynchronous request processing flow for some time now. Internally, this support builds on the Servlet 3 async support of containers such as Tomcat and Jetty.

Spring Web Async support

Consider a service call that takes a little while to process, simulated with a delay:

public CompletableFuture<Message> getAMessageFuture() {
    return CompletableFuture.supplyAsync(() -> {
        logger.info("Start: Executing slow task in Service 1");
        Util.delay(1000);
        logger.info("End: Executing slow task in Service 1");
        return new Message("data 1");
    }, futureExecutor);
}
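For readers who want to run the service call outside a Spring application, here is a minimal JDK-only sketch. The class name SlowServiceSketch, the nested Message class, the delay helper, and the delayMillis parameter are assumptions made for illustration; they stand in for the article's Message and Util.delay, whose actual implementations are not shown here.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class SlowServiceSketch {

    // Assumed minimal stand-in for the article's Message type.
    static final class Message {
        private final String text;
        Message(String text) { this.text = text; }
        String getText() { return text; }
    }

    // Assumed equivalent of Util.delay: sleep, restoring the interrupt flag if interrupted.
    static void delay(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    private final ExecutorService futureExecutor;

    SlowServiceSketch(ExecutorService futureExecutor) {
        this.futureExecutor = futureExecutor;
    }

    // The slow task runs on futureExecutor; the caller gets the future back immediately.
    CompletableFuture<Message> getAMessageFuture(long delayMillis) {
        return CompletableFuture.supplyAsync(() -> {
            delay(delayMillis);
            return new Message("data 1");
        }, futureExecutor);
    }

    public static void main(String[] args) {
        ExecutorService executor = Executors.newFixedThreadPool(2);
        try {
            SlowServiceSketch service = new SlowServiceSketch(executor);
            CompletableFuture<Message> future = service.getAMessageFuture(1000);
            System.out.println("Done right after the call? " + future.isDone());
            System.out.println("Result: " + future.join().getText());
        } finally {
            executor.shutdown();
        }
    }
}
```

The point to notice is that getAMessageFuture returns before the slow task has finished; whether the caller then blocks on the future is a separate decision.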

If I were to call this service in a user request flow, the traditional blocking controller flow would look like this:

@RequestMapping("/getAMessageFutureBlocking")
public Message getAMessageFutureBlocking() throws Exception {
    return service1.getAMessageFuture().get();
}
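To illustrate the cost of the blocking style, the following sketch simulates a container with a small thread pool. Everything in it is an assumption for illustration: the class BlockingCostSketch, the pool sizes, and the delay are hypothetical, and a plain ExecutorService stands in for the servlet container's request threads.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class BlockingCostSketch {

    // Stand-in for the slow service: completes after delayMillis on its own pool.
    static CompletableFuture<String> slowCall(ExecutorService servicePool, long delayMillis) {
        return CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(delayMillis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
            return "data 1";
        }, servicePool);
    }

    // Runs `requests` blocking handlers on `containerThreads` threads; returns elapsed millis.
    static long elapsedMillisForBlockingRequests(int containerThreads, int requests, long delayMillis) {
        ExecutorService containerPool = Executors.newFixedThreadPool(containerThreads);
        ExecutorService servicePool = Executors.newFixedThreadPool(requests);
        try {
            long start = System.nanoTime();
            List<CompletableFuture<String>> responses = new ArrayList<>();
            for (int i = 0; i < requests; i++) {
                // Each simulated request occupies a container thread until join() returns.
                responses.add(CompletableFuture.supplyAsync(
                        () -> slowCall(servicePool, delayMillis).join(), containerPool));
            }
            responses.forEach(CompletableFuture::join);
            return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
        } finally {
            containerPool.shutdown();
            servicePool.shutdown();
        }
    }

    public static void main(String[] args) {
        long elapsed = elapsedMillisForBlockingRequests(2, 4, 200);
        System.out.println("4 blocking requests on 2 container threads took " + elapsed + " ms");
    }
}
```

With two container threads and four requests, the requests are served in two rounds even though the service pool could run all four tasks at once: the waiting itself is what ties up the container threads.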

A better approach is to use Spring's asynchronous support to return the result to the user once the CompletableFuture makes it available, so that the container's thread is not held up in the meantime:

@RequestMapping("/getAMessageFutureAsync")
public DeferredResult<Message> getAMessageFutureAsync() {
    // Timeout in milliseconds; the long literal matches both the long and the Long constructor.
    DeferredResult<Message> deferred = new DeferredResult<>(90000L);
    CompletableFuture<Message> f = this.service1.getAMessageFuture();
    // Hand the outcome to Spring once the future completes, successfully or not.
    f.whenComplete((res, ex) -> {
        if (ex != null) {
            deferred.setErrorResult(ex);
        } else {
            deferred.setResult(res);
        }
    });
    return deferred;
}
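One detail worth knowing about the error branch: when the task passed to CompletableFuture.supplyAsync throws, the exception handed to whenComplete is a CompletionException wrapping the original one. The sketch below demonstrates this with the JDK alone; the class UnwrapSketch and its unwrap helper are hypothetical names introduced here, not part of the article's code.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.atomic.AtomicReference;

class UnwrapSketch {

    // Returns the underlying cause when the failure was wrapped by CompletableFuture.
    static Throwable unwrap(Throwable ex) {
        if (ex instanceof CompletionException && ex.getCause() != null) {
            return ex.getCause();
        }
        return ex;
    }

    // Runs a failing task and returns the throwable that the whenComplete callback received.
    static Throwable failureSeenByCallback() {
        AtomicReference<Throwable> seen = new AtomicReference<>();
        CompletableFuture<String> f = CompletableFuture.supplyAsync(() -> {
            throw new IllegalStateException("service failed");
        });
        CompletableFuture<String> observed = f.whenComplete((res, ex) -> seen.set(ex));
        try {
            observed.join(); // wait until the callback has run
        } catch (CompletionException expected) {
            // join() rethrows the failure; only the callback's view matters here
        }
        return seen.get();
    }

    public static void main(String[] args) {
        Throwable raw = failureSeenByCallback();
        System.out.println("Callback saw: " + raw.getClass().getSimpleName());
        System.out.println("Unwrapped:    " + unwrap(raw).getClass().getSimpleName());
    }
}
```

In the controller, passing unwrap(ex) rather than ex to setErrorResult would let exception handlers match on the original exception type; whether that is desirable depends on how errors are handled in the application.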

Using Observable in an Async Flow


Now to the topic of this article. I have been using RxJava's excellent Observable type as the return type of my services lately, and I wanted to ensure that the web layer also stays asynchronous when processing an Observable returned from a service call.

Consider the service described above, now modified to return an Observable:

public Observable<Message> getAMessageObs() {
    return Observable.<Message>create(s -> {
        logger.info("Start: Executing slow task in Service 1");
        Util.delay(1000);
        s.onNext(new Message("data 1"));
        logger.info("End: Executing slow task in Service 1");
        s.onCompleted();
    }).subscribeOn(Schedulers.from(customObservableExecutor));
}

I can nullify all the benefits of returning an Observable by making a blocking call at the web layer. A naive call looks like this:

@RequestMapping("/getAMessageObsBlocking")
public Message getAMessageObsBlocking() {
    return service1.getAMessageObs().toBlocking().first();
}

To keep this flow asynchronous through the web layer, a better approach is to transform the Observable into Spring's DeferredResult type:

@RequestMapping("/getAMessageObsAsync")
public DeferredResult<Message> getAMessageAsync() {
    Observable<Message> o = this.service1.getAMessageObs();
    // Timeout in milliseconds; the long literal matches both the long and the Long constructor.
    DeferredResult<Message> deferred = new DeferredResult<>(90000L);
    // Complete the DeferredResult from the Observable's value or error.
    o.subscribe(m -> deferred.setResult(m), e -> deferred.setErrorResult(e));
    return deferred;
}

This ensures that the thread handling the user request returns as soon as the service call has handed back the Observable, rather than waiting for the slow task. The response is then written reactively once the Observable emits a value.
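If several controllers need this conversion, it could be pulled into a small helper. The following is a sketch under stated assumptions: it targets RxJava 1.x (the rx package, as used in this article) and needs Spring Web MVC on the classpath; the class ObservableAdapterSketch, the method toDeferredResult, and the timeoutMillis parameter are hypothetical names. It also unsubscribes once the request completes, which per Spring's documentation includes the timeout case.

```java
import org.springframework.web.context.request.async.DeferredResult;
import rx.Observable;
import rx.Subscription;

class ObservableAdapterSketch {

    // Hypothetical helper: bridges an Observable to a DeferredResult.
    // Only the first emitted value is used; later setResult calls are ignored by Spring.
    static <T> DeferredResult<T> toDeferredResult(Observable<T> source, long timeoutMillis) {
        DeferredResult<T> deferred = new DeferredResult<>(timeoutMillis);
        Subscription subscription =
                source.subscribe(deferred::setResult, deferred::setErrorResult);
        // Release the subscription when the async request finishes for any reason.
        deferred.onCompletion(subscription::unsubscribe);
        return deferred;
    }
}
```

A controller method would then reduce to something like return ObservableAdapterSketch.toDeferredResult(service1.getAMessageObs(), 90000L).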


If you are interested in exploring this further, here is a GitHub repo with working samples: https://github.com/bijukunjummen/spring-web-observable.

References:

Spring's reference guide on async flows in the web tier: http://docs.spring.io/spring/docs/current/spring-framework-reference/html/mvc.html#mvc-ann-async

More details on Spring DeferredResult by the inimitable Tomasz Nurkiewicz at the NoBlogDefFound blog: http://www.nurkiewicz.com/2013/03/deferredresult-asynchronous-processing.html

Published at DZone with permission of Biju Kunjummen. See the original article here.
