Non-Blocking REST Services With Spring
This article demonstrates how to develop non-blocking, end-to-end integration with external systems in Spring-based applications using a loan broker example from the Enterprise Integration Patterns book.
Join the DZone community and get the full member experience.
Join For FreeRequest per thread is an old beast and should be avoided as much as possible. It not only affects an application's scalability but can also lead to cascading failures. Event driven architecture is an excellent software architecture pattern for effective utilization of resources. In the context of non-blocking services, it helps in allocating threads only when the response is ready for processing.
This article demonstrates how to develop non-blocking, end-to-end integration with external systems in Spring-based applications. To illustrate a moderately complex integration problem, I have chosen a well-known loan broker system example from the enterprise integration patterns book. For those who are not aware of this example, it is a system that gathers quotations from different banks for a loan request. This helps applicants in selecting the best available quotation.
To keep the illustration short and to the point, I have intentionally avoided a few integration points/constructs from the original example. Also, the loan broker system below only returns the best quote in terms of interest the applicant has to pay by the end of the year, along with the principal amount. The example mainly relies on Spring MVC, Spring integration, and Spring Boot projects from the Spring stack. Integration with bank stubs is done via JMS queues and for that I have used ActiveMQ. The diagram below depicts the internals of loan broker system.
As shown in above diagram, the loan request arrives at LoanBrokerController then it is dispatched to multiple banks, and after that an aggregated response is sent back to the user. Each timeline in the diagram is labeled with its respective thread name. In this whole process of request/response the container keeps accepting the requests and does not block for any external interaction. The dotted lines shows JMS asynchronous communication with banks. This way we parallelize the requests to banks for quotation and aggregate the response before sending it back to the controller. The captions below different swimlanes highlight integration flows that are employed for orchestrating the whole system.
Below is the basic gateway interface along with controller from which it is invoked:
public interface LoanBrokerGateway {
public ListenableFuture<Double> bestQuotation(Double loanAmount);
}
@RestController
public class LoanBrokerController {
@Autowired
private LoanBrokerGateway loanBrokerGateway;
private static final double BEST_QUOTE_VALUE = 0.04;
@RequestMapping("/quotation")
public DeferredResult<ResponseEntity<?>> quotation(final @RequestParam(value="loanAmount", required=true) Double loanAmount) throws Exception {
final DeferredResult<ResponseEntity<?>> deferredResult = new DeferredResult<ResponseEntity<?>>(5000l);
deferredResult.onTimeout(new Runnable() {
@Override
public void run() { // Retry on timeout
deferredResult.setErrorResult(ResponseEntity.status(HttpStatus.REQUEST_TIMEOUT).body("Request timeout occurred."));
}
});
ListenableFuture<Double> future = loanBrokerGateway.bestQuotation(loanAmount);
future.addCallback(new ListenableFutureCallback<Double>() {
@Override
public void onSuccess(Double result) {
// Double check response matches with request
if(result.equals(loanAmount * BEST_QUOTE_VALUE))
deferredResult.setResult(ResponseEntity.ok(result));
else
deferredResult.setErrorResult(ResponseEntity.status(HttpStatus.CONFLICT).body("Invalid quotation "+result+" for loan amount "+loanAmount));
}
@Override
public void onFailure(Throwable t) {
deferredResult.setErrorResult(ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(t));
}
});
return deferredResult;
}
}
As you can see in the above code snippet LoanBrokerGateway returns ListenableFuture, which tells the caller that the response will be available at some point in time in future. Hence, we can use DeferredResult to release the requesting thread and send the response from a different thread. The connection will remain open by the Servlet >= 3.0 container, till the response arrives or times out. Timeout value is provided in DeferredResult object during initialization. The validation check at line 29 is added just to ensure the response should match the best available quotation, otherwise, send a conflict error. I hope it is quite simple to follow so far, now let's dive into spring integration flows one by one in the same order shown in the above diagram from left to right, beginning with "loan-broker-scatter " integration flow.
<beans ...>
<int:gateway service-interface="org.springframework.integration.loanbroker.LoanBrokerGateway"
async-executor="" default-request-channel="best_quote_channel"/>
<int:channel id="best_quote_channel"/>
<int:chain input-channel="best_quote_channel" output-channel="best_quote_pubsub_channel">
<int:header-enricher>
<int:header name="quote" expression="payload"/>
<int:header name="future"
expression="new org.springframework.util.concurrent.SettableListenableFuture()"/>
<int:header name="loan_broker_selector"
expression="T(org.springframework.integration.loanbroker.NodeIdentificationUtil).nodeIdentifier()"/>
</int:header-enricher>
<int:transformer expression="headers.future"/>
<int:claim-check-in message-store="loanBrokerRequestStore"/>
<int:header-enricher>
<int:header name="loan_request_claim" expression="payload.toString()"/>
<int:header name="correlation_Id" expression="payload.toString()"/>
<int:header name="sequenceSize" expression="2"/>
</int:header-enricher>
<int:transformer expression="headers.quote"/>
</int:chain>
<int:publish-subscribe-channel id="best_quote_pubsub_channel"/>
<jms:outbound-channel-adapter
channel="best_quote_pubsub_channel"
destination="standardBankInQueue"
session-transacted="true"
connection-factory="connectionFactory"
auto-startup="true"/>
<jms:outbound-channel-adapter
channel="best_quote_pubsub_channel"
destination="primeBankInQueue"
session-transacted="true"
connection-factory="connectionFactory"
auto-startup="true"/>
<int:transformer input-channel="best_quote_pubsub_channel" expression="headers.future"/>
<bean id="loanBrokerRequestTimeOutExpiryReaper"
class="org.springframework.integration.store.MessageGroupStoreReaper">
<property name="messageGroupStore" ref="loanBrokerRequestStore" />
<property name="timeout" value="10000" />
</bean>
<task:scheduled-tasks scheduler="loanBrokerRequestTimeOutExpiryScheduler">
<task:scheduled ref="loanBrokerRequestTimeOutExpiryReaper" method="run" fixed-rate="3000" />
</task:scheduled-tasks>
<task:scheduler id="loanBrokerRequestTimeOutExpiryScheduler"/>
</beans>
In reference to the line numbers, below is the description of activities performed in loan-broker-scatter integration flow:
At line (2), while configuring the gateway, I have kept async-executor empty to disable gateway's default async handling. The reason I did that was I want to explicitly handle the async behavior of gateway instead of letting gateway create another thread pool and return listenable future.
At line (11), I am explicitly creating an instance of SettableListenableFuture. Later, I will return it to gateway so that it can safely release the calling thread.
At line (13), I am creating a custom message header with the name "loan_broker_slector" using SPEL. It internally calls a static "nodeIdentifier" function of NodeIdentificationUtil class. This class generates a unique identifier for a node. This message header will then be used as selector later while collecting messages from banks output queues. This will ensure messages generated from a node should return to itself.
At line (19), I am storing the ListenableFuture in the message store, so that later we can send the response back to the controller.
In header enricher at line (21), I am saving the claim-check identifier into header after converting to string. After that, I am creating two custom headers "correlation_id" and "sequenceSize". These headers are required to aggregate the result in loan-broker-gather integration flow. I have kept "sequenceSize" to '2', because I already know that number of banks to whom I have to send the request are also two.
As I want to send the same copy of the loan request to multiple banks, I have used a pub-sub channel at line (31). I would have used apply-sequence option in pub-sub to implicitly set sequenceSize, correlationId, and sequenceNumber headers, but correlationId was getting screwed up by JMS outbound adapters. I have not set "sequenceNumber" header, because the order of responses from different banks does not matter.
At line (55), I am scheduling a MessageGroupStoreReaper for erasing timed out messages from the message store. This helps in avoiding out of memory errors.
I am skipping both banks flow, as those are only stubs, but are part of an example available on GitHub. Let's now move to loan-broker-gather integration flow.
<beans ...>
<jms:message-driven-channel-adapter
destination="standardBankOutQueue"
channel="best_qoute_selection_channel"
auto-startup="true"
selector="loan_broker_selector = '#{T(org.springframework.integration.loanbroker.NodeIdentificationUtil).nodeIdentifier()}'"
connection-factory="connectionFactory"
concurrent-consumers="10"
max-concurrent-consumers="10"/>
<jms:message-driven-channel-adapter
destination="primeBankOutQueue"
channel="best_qoute_selection_channel"
auto-startup="true"
selector="loan_broker_selector = '#{T(org.springframework.integration.loanbroker.NodeIdentificationUtil).nodeIdentifier()}'"
connection-factory="connectionFactory"
concurrent-consumers="10"
max-concurrent-consumers="10"/>
<int:channel id="best_qoute_selection_channel"/>
<int:chain input-channel="best_qoute_selection_channel" output-channel="result_channel">
<int:aggregator release-strategy-expression="size()==2"
correlation-strategy-expression="headers['correlation_Id']"
send-partial-result-on-expiry="false"
expire-groups-upon-completion="true"
expire-groups-upon-timeout="true"
message-store="loanBrokerResponseStore"/>
<int:transformer >
<int-scripting:script lang="javascript">
<![CDATA[
(function(){
var indexRate = 10000;
for(i=0; i<payload.size(); i++){
if(indexRate > payload.get(i))
indexRate = payload.get(i);
}
return indexRate;
})();
]]>
</int-scripting:script>
</int:transformer>
<int:header-enricher>
<int:header name="result" expression="payload"/>
</int:header-enricher>
<int:transformer expression="T(java.util.UUID).fromString(headers.loan_request_claim)"/>
<int:claim-check-out message-store="loanBrokerRequestStore" remove-message="true"/>
<int:transformer expression="headers.result"/>
</int:chain>
<int:channel id="result_channel"/>
<int:logging-channel-adapter channel="result_channel" expression="headers.future.set(payload)" />
<bean id="loanBrokerResponseTimeOutExpiryReaper"
class="org.springframework.integration.store.MessageGroupStoreReaper">
<property name="messageGroupStore" ref="loanBrokerResponseStore" />
<property name="timeout" value="10000" />
</bean>
<task:scheduled-tasks scheduler="loanBrokerResponseTimeOutExpiryScheduler">
<task:scheduled ref="loanBrokerResponseTimeOutExpiryReaper" method="run" fixed-rate="3000" />
</task:scheduled-tasks>
<task:scheduler id="loanBrokerResponseTimeOutExpiryScheduler"/>
</beans>
Loan broker gather flow is the inverse of a scatter flow. In reference to the line numbers, below is the description of activities performed in this integration flow:
At line (2) and (11) it is picking the response from output queues of banks. To make sure the response should come to the same node, I have mentioned same message selector used at the time of scattering the message.
At line (24), it aggregates the response of both banks into a list. As a release strategy, we have mentioned that the size of the response list should be equal to '2'. Also, I have specified custom correlation identified at line (25) to correlate the responses with each other.
At line (31), it transforms the list outcome of the aggregate by selecting minimum interest rate from the list.
At line (47), I am saving the best quotation into the header, because I have to check out the listenable future from the message store in downstream to set the result.
At line (60), I am explicitly setting the result on listenable future we checked out from the message store. This will then invoke the listenable future callback functions in the controller, wherein which we hand over the result to container by setting result on DeferredResult.
At line (68), I am scheduling a MessageGroupStoreReaper for erasing timed out messages from the message store.
Outcome:
Below is the log captured from one of the executions of a loan request. I have selectively picked only interesting events from the logs. Just observe how each step to process request transitions from one thread to another. Each log event captured below can be represented using the below format:
"[thread] SimpleClassName - logMessage"
[http-nio-8080-exec-1] DispatcherServlet - DispatcherServlet with name 'dispatcherServlet' processing GET request for [/quotation]
....
[http-nio-8080-exec-1] DynamicJmsTemplate - Sending created message: ActiveMQObjectMessage {commandId = 0, responseRequired = false,properties = {quote=1020.0, sequenceSize=2, loan_broker_selector=07e93936-7bcc-45b6-be5a-0126a965eed5, loan_request_claim=5d59c870-60ab-3892-444f-37bd61461731, correlation_Id=5d59c870-60ab-3892-444f-37bd61461731}
....
[http-nio-8080-exec-1] DynamicJmsTemplate - Sending created message: ActiveMQObjectMessage {commandId = 0, responseRequired = false,properties = {quote=1020.0, sequenceSize=2, loan_broker_selector=07e93936-7bcc-45b6-be5a-0126a965eed5, loan_request_claim=5d59c870-60ab-3892-444f-37bd61461731, correlation_Id=5d59c870-60ab-3892-444f-37bd61461731}
....
[http-nio-8080-exec-1] WebAsyncManager - Concurrent handling starting for GET [/quotation]
[http-nio-8080-exec-1] DispatcherServlet - Leaving response open for concurrent processing
....
[org.springframework.jms.listener.DefaultMessageListenerContainer#3-2] BankStub - >> Callculating best quotation for price 1020.0 is 51.0. Where approx. calculation time is 1010 ms.
....
[org.springframework.jms.listener.DefaultMessageListenerContainer#0-5] AggregatingMessageHandler - org.springframework.integration.handler.MessageHandlerChain#1$child#0.handler received message: GenericMessage [payload=51.0, headers={timestamp=1450496257021, id=9f1d7387-b06a-8086-e356-0737d7ac6f6c, jms_timestamp=1450496257014, jms_redelivered=false, priority=4, quote=1020.0, sequenceSize=2, loan_broker_selector=07e93936-7bcc-45b6-be5a-0126a965eed5, loan_request_claim=5d59c870-60ab-3892-444f-37bd61461731, correlation_Id=5d59c870-60ab-3892-444f-37bd61461731, jms_messageId=ID:Husains-MacBook-Pro.local-53176-1450496240701-3:1:39:1:1}]
....
[org.springframework.jms.listener.DefaultMessageListenerContainer#2-3] BankStub - >> Callculating best quotation for price 1020.0 is 40.800000000000004. Where approx. calculation time is 1077 ms.
....
[org.springframework.jms.listener.DefaultMessageListenerContainer#1-1] AggregatingMessageHandler - org.springframework.integration.handler.MessageHandlerChain#1$child#0.handler received message: GenericMessage [payload=40.800000000000004, headers={timestamp=1450496257088, id=729752b7-f8f0-b952-bcda-36379de76fa3, jms_timestamp=1450496257083, jms_redelivered=false, priority=4, quote=1020.0, sequenceSize=2, loan_broker_selector=07e93936-7bcc-45b6-be5a-0126a965eed5, loan_request_claim=5d59c870-60ab-3892-444f-37bd61461731, correlation_Id=5d59c870-60ab-3892-444f-37bd61461731, jms_messageId=ID:Husains-MacBook-Pro.local-53176-1450496240701-3:1:28:1:1}]
....
[org.springframework.jms.listener.DefaultMessageListenerContainer#1-1] AbstractCorrelatingMessageHandler - Completing group with correlationKey [5d59c870-60ab-3892-444f-37bd61461731]
....
[org.springframework.jms.listener.DefaultMessageListenerContainer#1-1] LoggingHandler - org.springframework.integration.handler.LoggingHandler#0 received message: GenericMessage [payload=40.800000000000004, headers={errorChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@2f92b51, result=40.800000000000004, jms_timestamp=1450496257083,
....
[org.springframework.jms.listener.DefaultMessageListenerContainer#1-1] WebAsyncManager - Concurrent result value [<200 OK,40.800000000000004,{}>] - dispatching request to resume processing
....
[http-nio-8080-exec-2] RequestMappingHandlerAdapter - Found concurrent result value [<200 OK,40.800000000000004,{}>]
....
[http-nio-8080-exec-2] DispatcherServlet - Successfully completed request
Line (1) shows the event at which the request arrives at spring MVC dispatcher servlet.
Line (3) & (5) shows JMS messages being sent to both banks' inbound queues on same HTTP request processing thread.
Line (7) shows spring MVC registering AsyncContext with the container and keeping the connection open for response at line (8). This happened because we are returning DeferredResult from LoanBrokerController.
Line (10) & (14) shows loan requests processing by different bank stubs at different points in time. I have put a random delay in each stub between 1000 to 1500 ms. Hence, the order of processing will vary on each execution.
Line (12) & (16) shows the request reaching to an aggregate component of spring integration via message-driven-channel-adapter, which internally use DefaultMessageListenerContainer.
Line (18) shows aggregate group completion when quotation response from banks reaches to aggregate. Here onward it uses the same thread on which aggregation completes. So whichever response completes the aggregation, that response thread will be used to set the result on ListenableFuture, which eventually sets result on DeferredResult. That is why until line (22) each activity occurred on the same thread.
Lines (24) and (26) show spring MVC sending the response back to the client on a different thread compared to the request.
Final Thoughts:
The way I have developed the non-blocking loan broker system may not be the only way, but it is very close to event driven architecture. As shown in the outcome section, each thread kicks in whenever there is an event to process. I have not used any of the blocking constructs like JMS gateway from spring integration. JMS response messages are only processed when they are ready. Until then there is no overhead of threads on the server. Also, I did not utilized proxy gateway default async behavior to avoid blocking the calling threads from controller, in case someone wants to use JMS gateway to keep integration flows simple. That way we would still be prone to request per thread issues. I understand it is bit complex, but it pays off at the end, when scalability is more demanding. Moreover, you can even avoid circuit breaker pattern, because with this way of integration there is no thread waiting for response.
I hope you enjoyed the article. The complete source code is available on GitHub. I have also kept the JMeter test along with source code, just in case you want to load test loan broker system by yourself. Your comments are more than welcome.
Opinions expressed by DZone contributors are their own.
Comments