Speed Up Services With Reactive API in Java EE 8

Learn how to optimize services for speed with asynchronous processing using the Reactive API in Java EE 8.

by Ondro Mihalyi · Jul. 03, 18 · Tutorial

Services can often be optimized with asynchronous processing even without changing their behavior towards the outside world. Some services are inefficient because they have to wait for other services to return a result before they can continue. Let's look at how to call external REST services without waiting for them, make multiple parallel calls independently, and combine their results later with a reactive pipeline in Java EE 8.

This article is one in a series of articles about writing microservices with Java EE 8. All these topics and much more will be described in more detail in my upcoming book Java EE 8 Microservices, which I co-author with Mert Çaliskan and Pavel Pscheidl.

If our service calls multiple microservices and waits for each call to finish and return results before making the next call, it's a good candidate for refactoring with a reactive API. To make the service more efficient, it could make all the calls to external services in parallel, provided they don't depend on each other. This would decrease the time spent waiting and thus speed up the microservice.
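The effect of starting independent calls together can be sketched with the standard library alone. The following is a minimal illustration, not the article's JAX-RS code: slowCall is a hypothetical stand-in for a remote call, and the 200 ms delay and the two-thread pool are assumptions chosen for the demo.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class ParallelCallsSketch {

    // Hypothetical stand-in for a remote call that takes about 200 ms.
    static int slowCall(int value) {
        try {
            Thread.sleep(200);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return value;
    }

    // Starts both calls before waiting for either, then adds the results.
    static int sumInParallel(int a, int b) {
        // Two threads, so both simulated calls can run at the same time.
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            CompletableFuture<Integer> first =
                CompletableFuture.supplyAsync(() -> slowCall(a), pool);
            CompletableFuture<Integer> second =
                CompletableFuture.supplyAsync(() -> slowCall(b), pool);
            // join() blocks only here, after both calls are already running.
            return first.thenCombine(second, Integer::sum).join();
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        long start = System.nanoTime();
        int sum = sumInParallel(20, 22);
        long elapsedMs = (System.nanoTime() - start) / 1_000_000;
        System.out.println("sum=" + sum + ", elapsed about " + elapsedMs + " ms");
    }
}
```

Because both calls are submitted before either result is awaited, the total wait should be close to the duration of the slower call rather than the sum of both.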

To call REST services in parallel, we'll use the new reactive client API in JAX-RS. We'll pair it with the RxJava library to combine the results when they become available. This combination allows us to write clean and efficient code, with the additional benefit that the current thread can be released for further processing while waiting for results from remote calls.

We'll build a pipeline which processes the results as they arrive and finally merges them into a single response. The first part of the pipeline will call each remote service. Instead of waiting for the results, we'll specify what to do with each received result and continue with calling other services. Using the rx() method on the JAX-RS client request builder allows us to call a version of the get() method that returns immediately instead of waiting for the result. To process results when they arrive, we can chain handlers onto the CompletionStage returned from the rx version of the get() method:

CompletionStage<Forecast> stage = temperatureServiceTarget
  .request()
  .rx()
  .get(Temperature.class)
  .thenApply(temperature -> new Forecast(temperature));

The above code will call a temperature service and then register a lambda expression to process the resulting temperature when it arrives. This maps the temperature to a forecast object, which can be accessed with the stage variable later.
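The same thenApply chaining can be tried without a running REST service. This is a minimal sketch under stated assumptions: the Temperature and Forecast classes and the fetchTemperature method are hypothetical stand-ins (the article does not show the real types), and CompletableFuture.supplyAsync replaces the JAX-RS rx().get() call.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

final class StageMappingSketch {

    // Hypothetical value types; the article does not show the real ones.
    static final class Temperature {
        final double celsius;
        Temperature(double celsius) { this.celsius = celsius; }
    }

    static final class Forecast {
        final Temperature temperature;
        Forecast(Temperature temperature) { this.temperature = temperature; }
    }

    // Assumed stand-in for the JAX-RS call that returns a CompletionStage.
    static CompletionStage<Temperature> fetchTemperature() {
        return CompletableFuture.supplyAsync(() -> new Temperature(21.5));
    }

    static CompletionStage<Forecast> fetchForecast() {
        // thenApply registers the mapping; it runs once the temperature arrives.
        return fetchTemperature().thenApply(Forecast::new);
    }

    public static void main(String[] args) {
        // Blocking with join() is only for the demo; a service would keep chaining.
        Forecast forecast = fetchForecast().toCompletableFuture().join();
        System.out.println("Forecast temperature: " + forecast.temperature.celsius);
    }
}
```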

However, we want to use another variant of the get() method together with an RxJava Flowable Invoker from the Jersey project to get a flowable from RxJava instead of a CompletionStage. The Flowable interface makes it easier to combine multiple asynchronous results with much simpler code than CompletionStage and also more efficiently.

With the following code, we will call an external service and return a Flowable:

Flowable<Forecast> flowable = temperatureServiceTarget
  .register(RxFlowableInvokerProvider.class)
  .request()
  .rx(RxFlowableInvoker.class)
  .get(Temperature.class)
  .map(temperature -> new Forecast(temperature));

We register the additional RxFlowableInvokerProvider, which allows us to request RxFlowableInvoker later. This invoker then gives us the Flowable return type from RxJava. These classes are not part of the JAX-RS API, so we must add them with the Jersey RxJava2 library:

<dependency>
  <groupId>org.glassfish.jersey.ext.rx</groupId>
  <artifactId>jersey-rx-client-rxjava2</artifactId>
  <version>2.26</version>
</dependency>

At first sight, it seems we made the code more complicated while doing the same thing. But a Flowable instance allows us to combine multiple calls easily:

Flowable.concat(flowable1, flowable2)
  .doOnNext(forecast -> {
    forecasts.add(forecast);
  })
  .doOnComplete(() -> {
    asyncResponse.resume(Response.ok(forecasts).build());
  })
  .doOnError(asyncResponse::resume)
  .subscribe();

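For each forecast received from any flowable, we add it to a list of forecasts. Finally, we send the list of forecasts as a response or send an error response. The final call to subscribe() is necessary to register the listeners; otherwise, they would be ignored. Note that Flowable.concat subscribes to its sources one at a time, in order; when the sources should be subscribed to all at once, Flowable.merge is the operator that does so.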
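For comparison, here is roughly what combining two results looks like with plain CompletionStage. It is a sketch only: forecastFor is a hypothetical stand-in for a remote call, and forecasts are simplified to strings. thenCombine handles two stages; each additional stage needs another level of combining, which is where the Flowable version stays shorter.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

final class StageCombineSketch {

    // Hypothetical stand-in for one remote forecast call.
    static CompletionStage<String> forecastFor(String city) {
        return CompletableFuture.supplyAsync(() -> city + ": sunny");
    }

    // Combines two independent stages into one list, keeping argument order.
    static CompletionStage<List<String>> combine(CompletionStage<String> first,
                                                 CompletionStage<String> second) {
        return first.thenCombine(second, (a, b) -> {
            List<String> forecasts = new ArrayList<>();
            forecasts.add(a);
            forecasts.add(b);
            return forecasts;
        });
    }

    public static void main(String[] args) {
        combine(forecastFor("London"), forecastFor("Beijing"))
            .whenComplete((forecasts, error) -> {
                // error is non-null if either call failed.
                if (error != null) {
                    System.out.println("failed: " + error);
                } else {
                    System.out.println(forecasts);
                }
            })
            .toCompletableFuture()
            .join(); // block only so the demo does not exit early
    }
}
```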

You may have also noticed the asyncResponse variable used to send the final response or signal an error. This is a JAX-RS asynchronous response instance, which is used to complete a REST response at a later time, when the data is available, without blocking the initial processing thread. Using the asynchronous response helps us save thread resources while waiting for results from external REST services. To turn on asynchronous processing in our REST endpoint, we inject javax.ws.rs.container.AsyncResponse as an argument of the REST method and annotate it with @Suspended. We also change the return type to void because we'll be building the response using the AsyncResponse instance:

@GET
@Produces(MediaType.APPLICATION_JSON)
public void getForecasts(@Suspended AsyncResponse asyncResponse) {
  // ...here come some asynchronous calls to REST services...
  asyncResponse.resume(...);
}


Final Code Example

The following code will:

  • Turn on asynchronous processing of REST requests in the getForecasts method
  • Set a 5-minute timeout on the asynchronous response
  • Execute the temperature service twice, for London and Beijing, without waiting for results
  • Combine the results into a sequence of forecasts
  • Add every forecast in the sequence into a list
  • Send the complete list when all results are processed
  • Send an error result in case of an exception
  • Register the handlers with the subscribe() method

private Flowable<Forecast> getTemperature(String location) {
  return temperatureTarget
    .register(RxFlowableInvokerProvider.class)
    .resolveTemplate("city", location)
    .request()
    .rx(RxFlowableInvoker.class)
    .get(Temperature.class)
    .map(temperature -> new Forecast(location, temperature));
}

@GET
@Produces(MediaType.APPLICATION_JSON)
public void getForecasts(@Suspended AsyncResponse asyncResponse) {
  List<Forecast> forecasts = new ArrayList<>();
  asyncResponse.setTimeout(5, TimeUnit.MINUTES);
  Flowable.concat(getTemperature("London"), getTemperature("Beijing"))
    .doOnNext(forecast -> {
      forecasts.add(forecast);
    })
    .doOnComplete(() -> {
      asyncResponse.resume(Response.ok(forecasts).build());
    })
    .doOnError(asyncResponse::resume)
    .subscribe();
}


Published at DZone with permission of Ondro Mihalyi, DZone MVB. See the original article here.
