
Functional and Reactive Spring with Reactor and Netflix OSS

Reactive programming keeps your flow going, whereas functional programming is easy to reason about. See them in action with Spring and Netflix OSS.

Reactive programming is a programming paradigm that deals with a stream of events in a non-blocking and event-driven fashion. Exceptions are considered first-class citizens and are treated as events to avoid breaking the flow of events. 

Functional programming, on the other hand, enables you to write declarative code that is composable and easy to reason about. 
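To make the two ideas above concrete, here is a minimal sketch that uses only the JDK's CompletableFuture rather than Reactor. The class and method names (ErrorsAsEventsSketch, parseAmount, describe) are hypothetical and do not come from the Loan Broker project. It shows a declarative chain in which a failure travels down the pipeline as just another signal instead of being thrown at the caller.

```java
import java.util.concurrent.CompletableFuture;

class ErrorsAsEventsSketch {

    // Hypothetical asynchronous step: fails for non-numeric input.
    static CompletableFuture<Double> parseAmount(String raw) {
        return CompletableFuture.supplyAsync(() -> Double.valueOf(raw));
    }

    // Declarative pipeline: each stage describes what happens to a value or to an error.
    static String describe(String raw) {
        return parseAmount(raw)
                .thenApply(amount -> "amount=" + amount)   // runs only on success
                .exceptionally(error -> "invalid amount")  // the error arrives as an event
                .join();                                   // blocking here is for the demo only
    }
}
```

Calling describe with a numeric string takes the success path, while a non-numeric string takes the error path, and neither case needs a try/catch block at the call site.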

Project Reactor is an implementation of the Reactive Streams specification. It helps you write an event stream execution model by combining features from both the reactive and functional programming paradigms. You build the execution model to specify how events will be processed, in the form of a data flow pipeline that uses operators such as transformation, merge, filter, and window. Because Reactor implements Reactive Streams, backpressure handling is at its heart, so that neither the source nor the destination is overwhelmed. The table below shows the two major reactive types provided by Project Reactor and how they compare with other frameworks.
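Backpressure can be illustrated without Reactor by using java.util.concurrent.Flow, the JDK's copy of the Reactive Streams interfaces. The sketch below is an assumption-laden illustration, not Reactor code: BackpressureSketch and OneAtATimeSubscriber are hypothetical names, and the "transformation" is simply doubling each number. The subscriber signals demand for one item at a time, so the publisher cannot push faster than the subscriber consumes.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

class BackpressureSketch {

    /** Hypothetical subscriber that pulls one item at a time. */
    static final class OneAtATimeSubscriber implements Flow.Subscriber<Integer> {
        final List<Integer> received = new ArrayList<>();
        final CountDownLatch done = new CountDownLatch(1);
        private Flow.Subscription subscription;

        @Override public void onSubscribe(Flow.Subscription subscription) {
            this.subscription = subscription;
            subscription.request(1);      // demand exactly one item to start
        }
        @Override public void onNext(Integer item) {
            received.add(item * 2);       // transformation step
            subscription.request(1);      // ask for the next item only when ready
        }
        @Override public void onError(Throwable error) { done.countDown(); }
        @Override public void onComplete() { done.countDown(); }
    }

    static List<Integer> run() {
        OneAtATimeSubscriber subscriber = new OneAtATimeSubscriber();
        try (SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(subscriber);
            for (int i = 1; i <= 5; i++) {
                publisher.submit(i);      // blocks if the subscriber's buffer is saturated
            }
        }                                 // close() signals onComplete after delivery
        try {
            if (!subscriber.done.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("stream did not complete in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return subscriber.received;
    }
}
```

Reactor's Flux and Mono hide this request/onNext handshake behind operators, but the same demand signalling happens underneath.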

[Image: table comparing Project Reactor's reactive types with other frameworks]


This article explores the possibility of developing a functional and reactive application using Spring Web Reactive, Project Reactor, and Spring Cloud Netflix. The key features used from Spring Cloud Netflix are Service Discovery (Eureka), Circuit Breaker (Hystrix), and Client Side Load Balancing (Ribbon).

Designing an application on a reactive stack enables you to get the most out of the CPU. Instead of waiting on blocking operations, application threads keep serving new requests and react only when a response is ready. That means a traditional blocking technology stack can no longer deliver that kind of performance. The blocking operations in question can be I/O operations or even JSON parsing.

Reactive Loan Broker System

The example application developed for this exploration is a revamp of the Loan Broker that I developed a long while back using Spring Integration. As a quick overview, the Loan Broker System takes a loan request from an application and, in response, returns the best available quotation after gathering quotations from various banks. Below is the high-level microservice architecture diagram of the loan broker system.

[Image: System Architecture]


The Loan Broker System has the following main components:

[Image: main components of the Loan Broker System]

The LoanBrokerHandler receives a client request and dispatches it to ReactiveLoanBrokerAgent to find the best quotation. ReactiveLoanBrokerAgent then collects the endpoints of all the banks from ReactiveServiceLocator, which is backed by Eureka, and forwards the request to each bank. That means the system automatically adapts when a bank is added or removed, without any modification. Moreover, if there is more than one instance of the same bank, requests to it are load balanced by Ribbon. The code snippet below shows the execution model from LoanBrokerHandler:

private ReactiveLoanBrokerAgent loanBrokerAgent;

public LoanBrokerHandler(ReactiveLoanBrokerAgent reactorLoanBrokerAgent) {
    this.loanBrokerAgent = reactorLoanBrokerAgent;
}

public Mono<ServerResponse> bestQuotation(ServerRequest request) {

    return request.queryParam("loanAmount").map((loanAmountParam) -> Mono.just(loanAmountParam)
            .map(Double::valueOf)
            .then((loanAmount) ->
                    Mono.from( loanBrokerAgent.findBestQuotation(loanAmount))
                            .then((quotation -> isValid(quotation,loanAmount))))
            .flatMap(bestQuotationResponse ->
                    ServerResponse.ok()
                            .body(Mono.just(bestQuotationResponse), BestQuotationResponse.class))
            .switchIfEmpty(Errors.noBankServiceAvailable())
            .onErrorResumeWith(IllegalStateException.class, Errors::mapException)
            .onErrorResumeWith(Errors::unknownException)
            .next()).orElseGet(()->Errors.loanAmountRequired());
}

private Mono<BestQuotationResponse> isValid(BestQuotationResponse quotation,Double loanAmount){
    return quotation.getRequestedLoanAmount().equals(loanAmount) ?
            Mono.just(quotation) : Mono.error(new IllegalStateException("Returned amount does not match with the best offer"));
}


At line 12 (the call to loanBrokerAgent.findBestQuotation), the handler dispatches the request to ReactiveLoanBrokerAgent. If a proper response is found, it is sent back to the client; otherwise, the pipeline falls back to one of several error paths. Note how neatly errors and exceptions are handled, without any try/catch clutter. Now let's see how ReactorLoanBrokerAgent (an implementation of ReactiveLoanBrokerAgent) defines the execution model using Project Reactor.

public ReactorLoanBrokerAgent(ReactiveBankServiceLocator bankServiceLocator,
                              WebClient webClient){
  super(bankServiceLocator);
  hystrixWebClient = HystrixWebClient.create(webClient);
}

public Mono<BestQuotationResponse> findBestQuotation(Publisher<String> banksURL,Double loanAmount){

  return Flux.from(banksURL)
    .flatMap(bankURL -> requestForQuotation(bankURL, loanAmount)
             .otherwiseReturn(ReactiveLoanRequestService.OFFER_IN_CASE_OF_ERROR)) // Scatter
    .filter(offer -> !offer.equals(ReactiveLoanRequestService.OFFER_IN_CASE_OF_ERROR))
    .collect(()->new BestQuotationResponse(loanAmount), BestQuotationResponse::offer) // Gather
    .doOnSuccess(BestQuotationResponse::finish)
    .flatMap(bqr -> Mono.justOrEmpty(selectBestQuotation(bqr.getOffers()))
             .map(bestQuotation -> { bqr.bestOffer(bestQuotation); return bqr;}))
    .timeout(Duration.ofMillis(3000))
    .singleOrEmpty();
}

@Override
protected Mono<Quotation> requestForQuotation(String bankURL, Double loanAmount) {
  ClientRequest<Void> request = ClientRequest.GET(bankURL.concat(REQUEST_QUOTATION_PATH), loanAmount)
    .accept(MediaType.APPLICATION_JSON)
    .build();

  return hystrixWebClient.exchangeCommandBuilder()
    .withCommandName(bankURL)
    .withErrorThresholdPercentage(50)
    .withRequestVolumeThreshold(5)
    .withSleepWindowInMilliseconds(5000)
    .withExecutionTimeoutInMilliseconds(1000)
    .exchange(request)
    .then(response -> response.bodyToMono(Quotation.class));
}

  • At line 10 (the flatMap over requestForQuotation), the execution model requests a quotation from each bank, using the URLs provided by the ReactiveServiceLocator. This is the scatter phase.

  • If any bank returns an error, a default offer is substituted for it. The filter then removes every default offer.

  • Next, the aggregation phase collects all the remaining offers and saves them into one BestQuotationResponse object.

  • At line 15, the execution model selects the best quotation and saves it back into the BestQuotationResponse object.

  • Line 17 sets the overall timeout for request completion to 3 seconds, and at line 18, the execution model converts the Flux to a Mono so that a single response is sent back.
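The steps above can also be sketched with nothing but the JDK's CompletableFuture. This is not the article's Reactor code: ScatterGatherSketch, its simplified Quotation class, and the rule that "best" means the lowest rate are all assumptions made for illustration. The structure mirrors the bullets: scatter with a default offer on error, filter, gather, select, and an overall timeout.

```java
import java.time.Duration;
import java.util.Comparator;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.stream.Collectors;

class ScatterGatherSketch {

    /** Hypothetical, simplified stand-in for the article's Quotation type. */
    static final class Quotation {
        final String bank;
        final double rate;
        Quotation(String bank, double rate) { this.bank = bank; this.rate = rate; }
    }

    /** Sentinel that plays the role of OFFER_IN_CASE_OF_ERROR. */
    static final Quotation ERROR_OFFER = new Quotation("error", Double.NaN);

    static CompletableFuture<Optional<Quotation>> findBest(
            List<String> bankUrls,
            Function<String, CompletableFuture<Quotation>> requestForQuotation,
            Duration overallTimeout) {

        // Scatter: one asynchronous request per bank; a failure becomes the sentinel offer.
        List<CompletableFuture<Quotation>> pending = bankUrls.stream()
                .map(url -> requestForQuotation.apply(url).exceptionally(error -> ERROR_OFFER))
                .collect(Collectors.toList());

        // Gather: once every request has settled, drop sentinels and select the best offer.
        return CompletableFuture.allOf(pending.toArray(new CompletableFuture<?>[0]))
                .thenApply(ignored -> pending.stream()
                        .map(CompletableFuture::join)           // already completed, does not block
                        .filter(offer -> offer != ERROR_OFFER)
                        // Assumption: the lowest rate is the best offer.
                        .min(Comparator.comparingDouble((Quotation q) -> q.rate)))
                // Overall deadline for the whole scatter-gather round.
                .orTimeout(overallTimeout.toMillis(), TimeUnit.MILLISECONDS);
    }
}
```

An empty Optional here corresponds to the case in which no bank produced a usable offer, which the handler shown earlier turns into an error response.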

Note how the execution model defines the scatter, aggregation, timeout, and error-handling strategies declaratively. The result is concise, readable, and powerful at the same time.

Next, let's discuss the requestForQuotation function. Here, the execution model is defined using the Spring Web Reactive WebClient, which allows you to make non-blocking network calls. That means the Loan Broker System is reactive all the way down to the network call, so there is no need to explicitly parallelize the calls to the banks at line 10.

HystrixWebClient is an experiment of mine that makes it possible to use the Hystrix circuit breaker in a declarative style. The advantage of a circuit breaker is that it prevents unnecessary calls to a faulty service until that service is healthy again. Below is one of the execution cycles with three banks:

[Image: one execution cycle with three banks]


The Hystrix dashboard shows the circuit breaker open for bank-2 because that bank always returns an error. By contrast, the circuit breaker remains closed for the other banks.

[Image: Hystrix dashboard]
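The circuit breaker behavior described above can be sketched as a small state machine. The class below is a hypothetical, much-simplified illustration and not Hystrix itself: it uses cumulative counters where Hystrix uses a rolling statistical window, and the injectable clock exists only to make the behavior easy to test. The constructor parameters are named after the settings used in requestForQuotation (error threshold percentage, request volume threshold, sleep window).

```java
import java.util.function.LongSupplier;
import java.util.function.Supplier;

class CircuitBreakerSketch {
    private final int errorThresholdPercentage;
    private final int requestVolumeThreshold;
    private final long sleepWindowMillis;
    private final LongSupplier clock;   // injectable time source in milliseconds

    private int requests;
    private int failures;
    private long openedAt = -1;         // -1 means the circuit is closed

    CircuitBreakerSketch(int errorThresholdPercentage, int requestVolumeThreshold,
                         long sleepWindowMillis, LongSupplier clock) {
        this.errorThresholdPercentage = errorThresholdPercentage;
        this.requestVolumeThreshold = requestVolumeThreshold;
        this.sleepWindowMillis = sleepWindowMillis;
        this.clock = clock;
    }

    synchronized <T> T call(Supplier<T> remoteCall, T fallback) {
        if (openedAt >= 0 && clock.getAsLong() - openedAt < sleepWindowMillis) {
            return fallback;            // open: skip the faulty service entirely
        }
        // Either closed, or the sleep window has elapsed and this is a trial call.
        try {
            T result = remoteCall.get();
            recordSuccess();
            return result;
        } catch (RuntimeException error) {
            recordFailure();
            return fallback;
        }
    }

    synchronized boolean isOpen() {
        return openedAt >= 0;
    }

    private void recordSuccess() {
        if (openedAt >= 0) {            // trial call succeeded: close and start fresh
            openedAt = -1;
            requests = 0;
            failures = 0;
        } else {
            requests++;
        }
    }

    private void recordFailure() {
        if (openedAt >= 0) {            // trial call failed: stay open, restart the sleep window
            openedAt = clock.getAsLong();
            return;
        }
        requests++;
        failures++;
        boolean enoughVolume = requests >= requestVolumeThreshold;
        boolean tooManyErrors = failures * 100 >= errorThresholdPercentage * requests;
        if (enoughVolume && tooManyErrors) {
            openedAt = clock.getAsLong();   // trip the circuit
        }
    }
}
```

With a 50 percent error threshold, a volume threshold of 5, and a 5000 ms sleep window, a bank that always fails trips the breaker on its fifth call, and further calls return the fallback without touching the bank until the sleep window has passed.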

Final Thoughts

The reactive application development initiative in Spring Framework 5 seems promising. Once you get used to it, it is difficult to go back to writing code in an imperative style. The declarative style is attractive, but on the flip side, it has a steep learning curve.

The Spring Web Reactive module is still experimental, and its integration with other modules, like Spring Security and Spring Data, is also underway.

If you are interested in playing with this project, you can get started with the test cases that I have written to verify the LoanBrokerHandler behavior when bank services are slow, faulty, or unavailable, along with the happy-path scenario.
