Over a million developers have joined DZone.
{{announcement.body}}
{{announcement.title}}

Speed Up Services With Reactive API in Java EE 8

DZone's Guide to

Speed Up Services With Reactive API in Java EE 8

Learn how to optimize services for speed with asynchronous processing using the Reactive API in Java EE 8.

· Microservices Zone ·
Free Resource

Learn how modern cloud architectures use of microservices has many advantages and enables developers to deliver business software in a CI/CD way.

Services can often be optimized with asynchronous processing even without changing their behavior towards the outside world. The reason why some services aren't efficient is that they need to wait for other services to provide a result to continue further. Let's look how to call external REST services without waiting for them and also do multiple parallel calls independently and combine their results later with a reactive pipeline in Java EE 8.

This article is one in a series of articles about writing microservices with Java EE 8. All these topics and much more will be described in more detail in my upcoming book Java EE 8 Microservices, which I co-author with Mert Çaliskan and Pavel Pscheidl.

If our service calls multiple microservices and waits for each call to finish and return results before doing another call, it's a good candidate to refactor using reactive API. In order to make the service more efficient, it could do all the calls to external services in parallel if they don't depend on each other. This would decrease the time spent waiting and thus speed up the microservice.

In order to call REST services in parallel, we'll use the new reactive client API in JAX-RS. We'll combine it with the RxJava library to combine their results when available. This combination will allow us to write clean and efficient code. And with an additional benefit that the current thread can be released for further processing while wating for results from remote calls.

We'll build a pipeline which processes the results as they arrive and finally merges them into a single response. The first part of the pipeline will call each remote service. Instead of waiting for the results, we'll specify what to do with each received result and continue with calling other services. Using the rx() method on the JAX-RS client request builder allows us to call a version of the get() method, which immediately returns instead of waiting for the result. In order to process results when they arrive, we can chain method handlers onto a CompletionStage returned from the rx version of the get() method:

CompletionStage<Forecast> stage = temperatureServiceTarget
  .request()
  .rx()
  .get(Temperature.class)
  .thenApply(temperature -> new Forecast(temperature));

The above code will call a temperature service and then register a lambda expression to process the resulting temperature when it arrives. This maps the temperature to a forecast object, which can be accessed with the stage variable later.

However, we want to use another variant of the get() method together with an RxJava Flowable Invoker from the Jersey project to get a flowable from RxJava instead of a CompletionStage. The Flowable interface makes it easier to combine multiple asynchronous results with much simpler code than CompletionStage and also more efficiently.

With the following code, we will call an external service and return a Flowable:

Flowable<Forecast> flowable = temperatureServiceTarget
  .register(RxFlowableInvokerProvider.class)
  .request()
  .rx(RxFlowableInvoker.class)
  .get(Temperature.class)
  .map(temperature -> new Forecast(temperature);

We register additional  RxFlowableInvokerProvider, which allows to request RxFlowableInvoker later. This invoker then gives us the Flowable return type from RxJava. These classes are not in the JAX-RS API and we must add them with the Jersey RxJava2 library:

<dependency>
  <groupId>org.glassfish.jersey.ext.rx</groupId>
  <artifactId>jersey-rx-client-rxjava2</artifactId>
  <version>2.26</version>
</dependency>

At first sight, it seems we made the code more complicated while doing the same thing. But a Flowable instance allows us to combine multiple calls easily:

Flowable.concat(flowable1, flowable2)
  .doOnNext(forecast -> {
    forecasts.add(forecast);
  })
  .doOnComplete(() -> {
    asyncResponse.resume(Response.ok(forecasts).build());
  })
  .doOnError(asyncResponse::resume)
  .subscribe();
}

For each forecast received from any flowable, we add it to a list of forecasts. Finally, we send the list of forecasts as a response or send an error response. The final call to subscribe() is necessary to register the listeners; otherwise, they would be ignored.

You may have also noticed the asyncResponse variable used to send the final response or signal an error. This is a JAX-RS asynchronous response instance, which is used to complete a REST response at later time, when the data is available, without blocking the initial processing thread. Using the asynchronous response helps us save thread resources while waiting for results from external REST services. In order to turn on asynchronous processing in our REST endpoint, we will inject javax.ws.rs.container.AsyncResponse as the REST method argument, together with the @Suspended annotation. We will also change the return type to void because we'll be building the response using the AsyncResponse instance:

@GET
@Produces(MediaType.APPLICATION_JSON)
public void getForecasts(@Suspended AsyncResponse asyncResponse) {
  ...here come some asynchronous calls to REST services...
  asyncResponse.resume(...)
}


Final Code Example

The following code will:

  • Turn on asynchronous processing of REST requests in the getForecasts method
  • Set a 5-minute timeout on the asynchronous response
  • Execute the temperature service twice, for London and Beijing, without waiting for results
  • Combine the results into a sequence of forecasts
  • Add every forecast in the sequence into a list
  • Send the complete list when all results processed
  • Send an error result in case of an exception
  • Register the handlers with the method
private Flowable<Forecast> getTemperature(String location) {
  return temperatureTarget
    .register(RxFlowableInvokerProvider.class)
    .resolveTemplate("city", location)
    .request()
    .rx(RxFlowableInvoker.class)
    .get(Temperature.class)
    .map(temperature -> new Forecast(location, temperature));
}

@GET
@Produces(MediaType.APPLICATION_JSON)
public void getForecasts(@Suspended AsyncResponse asyncResponse) {
  List<Forecast> forecasts = new ArrayList<>();
  asyncResponse.setTimeout(5, TimeUnit.MINUTES);
  Flowable.concat(getTemperature("London"), getTemperature("Beijing"))
    .doOnNext(forecast -> {
      forecasts.add(forecast);
    })
  .doOnComplete(() -> {
    asyncResponse.resume(Response.ok(forecasts).build());
  })
  .doOnError(asyncResponse::resume)
  .subscribe();
}


Discover how to deploy pre-built sample microservices OR create simple microservices from scratch.

Topics:
microservices ,performance ,reactive api ,api ,java ee 8 ,tutorial

Published at DZone with permission of

Opinions expressed by DZone contributors are their own.

{{ parent.title || parent.header.title}}

{{ parent.tldr }}

{{ parent.urlSource.name }}