Non-Blocking REST Services With Spring

This article demonstrates how to develop non-blocking, end-to-end integration with external systems in Spring-based applications using a loan broker example from the Enterprise Integration Patterns book.

By Muhammad Noor · Dec. 22, 15 · Tutorial
The thread-per-request model is an old beast and should be avoided wherever possible. It not only limits an application's scalability but can also lead to cascading failures. Event-driven architecture is an excellent software architecture pattern for using resources effectively. In the context of non-blocking services, it means that a thread is allocated only when a response is ready for processing.

This article demonstrates how to develop non-blocking, end-to-end integration with external systems in Spring-based applications. To illustrate a moderately complex integration problem, I have chosen the well-known loan broker example from the Enterprise Integration Patterns book. For those who are not familiar with it, the loan broker is a system that gathers quotations from different banks for a loan request, which helps applicants select the best available quotation.

To keep the illustration short and to the point, I have intentionally left out a few integration points and constructs from the original example. Also, the loan broker system below returns only the best quote, expressed as the interest the applicant has to pay by the end of the year along with the principal amount. The example relies mainly on the Spring MVC, Spring Integration, and Spring Boot projects from the Spring stack. Integration with the bank stubs is done via JMS queues, for which I have used ActiveMQ. The diagram below depicts the internals of the loan broker system.


[Figure: Non-Blocking Loan Broker System]

As shown in the diagram above, the loan request arrives at LoanBrokerController, is dispatched to multiple banks, and an aggregated response is then sent back to the user. Each timeline in the diagram is labeled with its respective thread name. Throughout this request/response process, the container keeps accepting requests and does not block on any external interaction. The dotted lines show asynchronous JMS communication with the banks. This way, we parallelize the quotation requests to the banks and aggregate the responses before sending the result back to the controller. The captions below the swimlanes name the integration flows that orchestrate the whole system.

Below is the basic gateway interface, along with the controller that invokes it:

public interface LoanBrokerGateway {
	public ListenableFuture<Double> bestQuotation(Double loanAmount);
}

@RestController
public class LoanBrokerController {
	
	@Autowired
	private LoanBrokerGateway loanBrokerGateway;	
	private static final double BEST_QUOTE_VALUE = 0.04;
	
    @RequestMapping("/quotation")
    public DeferredResult<ResponseEntity<?>> quotation(final @RequestParam(value="loanAmount", required=true) Double loanAmount) throws Exception {
    	
    	final DeferredResult<ResponseEntity<?>> deferredResult = new DeferredResult<ResponseEntity<?>>(5000L);
    	deferredResult.onTimeout(new Runnable() {
			
			@Override
			public void run() { // Report the timeout to the client (no retry is attempted)
				deferredResult.setErrorResult(ResponseEntity.status(HttpStatus.REQUEST_TIMEOUT).body("Request timeout occurred."));
			}
		});
    	
    	ListenableFuture<Double> future = loanBrokerGateway.bestQuotation(loanAmount);
		future.addCallback(new ListenableFutureCallback<Double>() {
			@Override
			public void onSuccess(Double result) {
				// Double check response matches with request
				if(result.equals(loanAmount * BEST_QUOTE_VALUE))
					deferredResult.setResult(ResponseEntity.ok(result));
				else
					deferredResult.setErrorResult(ResponseEntity.status(HttpStatus.CONFLICT).body("Invalid quotation "+result+" for loan amount "+loanAmount));
			}
			@Override
			public void onFailure(Throwable t) {
				deferredResult.setErrorResult(ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(t));
			}
		});				
		return deferredResult;
    }
}

As you can see in the code snippet above, LoanBrokerGateway returns a ListenableFuture, which tells the caller that the response will become available at some point in the future. Hence, we can use DeferredResult to release the requesting thread and send the response from a different thread. A Servlet 3.0 or later container keeps the connection open until the response arrives or the request times out. The timeout value is passed to the DeferredResult object when it is constructed. The validation check at line 29 only ensures that the response matches the best available quotation; otherwise, a conflict error is sent. I hope it has been simple to follow so far. Now let's dive into the Spring Integration flows one by one, in the order shown in the diagram above from left to right, beginning with the "loan-broker-scatter" integration flow.
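The hand-off between the requesting thread and the replying thread can be tried outside a servlet container. The following is a minimal sketch, not the article's code: it uses the JDK's CompletableFuture as a stand-in for both ListenableFuture and DeferredResult, and plain strings in place of ResponseEntity. The class name DeferredHandOffSketch and the method quotation are hypothetical, and completeOnTimeout assumes Java 9 or later.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

// Stand-in sketch: CompletableFuture plays the role of ListenableFuture and DeferredResult.
class DeferredHandOffSketch {

    static final double BEST_QUOTE_VALUE = 0.04;

    // Returns immediately. The returned future is completed later, either by the
    // thread that completes gatewayFuture or by the JDK's timeout mechanism.
    static CompletableFuture<String> quotation(double loanAmount,
                                               CompletableFuture<Double> gatewayFuture,
                                               long timeoutMillis) {
        return gatewayFuture
                // Same validation as the controller's onSuccess callback.
                .thenApply(result -> result.equals(loanAmount * BEST_QUOTE_VALUE)
                        ? "200 " + result
                        : "409 Invalid quotation " + result + " for loan amount " + loanAmount)
                // Counterpart of DeferredResult.onTimeout.
                .completeOnTimeout("408 Request timeout occurred.", timeoutMillis, TimeUnit.MILLISECONDS)
                // Counterpart of the controller's onFailure callback.
                .exceptionally(t -> "500 " + t);
    }

    public static void main(String[] args) {
        CompletableFuture<Double> gatewayFuture = new CompletableFuture<>();
        CompletableFuture<String> response = quotation(1020.0, gatewayFuture, 5000L);
        System.out.println("done before reply? " + response.isDone());

        // Simulate the bank reply arriving later on another thread.
        new Thread(() -> gatewayFuture.complete(1020.0 * BEST_QUOTE_VALUE), "reply-thread").start();
        System.out.println(response.join());
    }
}
```

The calling thread only wires up callbacks and returns; nothing in quotation waits for the reply, which is the property DeferredResult gives the controller.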

<beans ...>
	<int:gateway service-interface="org.springframework.integration.loanbroker.LoanBrokerGateway" 
				 async-executor="" default-request-channel="best_quote_channel"/>
	
	<int:channel id="best_quote_channel"/>
	
	<int:chain input-channel="best_quote_channel" output-channel="best_quote_pubsub_channel">
	
		<int:header-enricher>
			<int:header name="quote" expression="payload"/>
			<int:header name="future" 
			            expression="new org.springframework.util.concurrent.SettableListenableFuture()"/>
			<int:header name="loan_broker_selector" 
						expression="T(org.springframework.integration.loanbroker.NodeIdentificationUtil).nodeIdentifier()"/>			
		</int:header-enricher>
		
		<int:transformer expression="headers.future"/>
	
		<int:claim-check-in message-store="loanBrokerRequestStore"/>
	
		<int:header-enricher>
			<int:header name="loan_request_claim" expression="payload.toString()"/>
			<int:header name="correlation_Id" expression="payload.toString()"/>
			<int:header name="sequenceSize" expression="2"/>			
		</int:header-enricher>

		<int:transformer expression="headers.quote"/>
	
	</int:chain>

	<int:publish-subscribe-channel id="best_quote_pubsub_channel"/>

	<jms:outbound-channel-adapter 
			        channel="best_quote_pubsub_channel"
					destination="standardBankInQueue"
					session-transacted="true"
					connection-factory="connectionFactory" 
					auto-startup="true"/>				
	
	<jms:outbound-channel-adapter 
			        channel="best_quote_pubsub_channel"
					destination="primeBankInQueue"
					session-transacted="true"
					connection-factory="connectionFactory" 
					auto-startup="true"/>

	<int:transformer input-channel="best_quote_pubsub_channel" expression="headers.future"/>	
	
	<bean id="loanBrokerRequestTimeOutExpiryReaper" 
		  class="org.springframework.integration.store.MessageGroupStoreReaper">
		<property name="messageGroupStore" ref="loanBrokerRequestStore" />
		<property name="timeout" value="10000" />
	</bean>

	<task:scheduled-tasks scheduler="loanBrokerRequestTimeOutExpiryScheduler">
		<task:scheduled ref="loanBrokerRequestTimeOutExpiryReaper" method="run" fixed-rate="3000" />
	</task:scheduled-tasks>	
	
	<task:scheduler id="loanBrokerRequestTimeOutExpiryScheduler"/>
</beans>

With reference to the line numbers of the configuration above, the activities performed in the loan-broker-scatter integration flow are as follows:

  • At line (2), while configuring the gateway, I have left async-executor empty to disable the gateway's default async handling. I want to handle the gateway's async behavior explicitly instead of letting the gateway create another thread pool and return a listenable future.

  • At line (11), I am explicitly creating an instance of SettableListenableFuture. Later, I return it to the gateway so that it can safely release the calling thread.

  • At line (13), I am creating a custom message header named "loan_broker_selector" using SpEL. It calls the static "nodeIdentifier" function of the NodeIdentificationUtil class, which generates a unique identifier for a node. This header is used later as a selector when collecting messages from the banks' output queues, which ensures that responses to messages sent from a node return to that same node.

  • At line (19), I am storing the ListenableFuture in the message store, so that later we can send the response back to the controller.

  • In the header enricher at line (21), I am saving the claim-check identifier into a header after converting it to a string. I am also creating two custom headers, "correlation_Id" and "sequenceSize". These headers are required to aggregate the results in the loan-broker-gather integration flow. I have set "sequenceSize" to '2' because I already know that the request goes to two banks.

  • As I want to send a copy of the same loan request to multiple banks, I have used a pub-sub channel at line (31). I would have used the apply-sequence option of the pub-sub channel to set the sequenceSize, correlationId, and sequenceNumber headers implicitly, but the correlationId header did not survive the JMS outbound adapters intact. I have not set the "sequenceNumber" header, because the order of responses from different banks does not matter.

  • At line (55), I am scheduling a MessageGroupStoreReaper to erase timed-out messages from the message store. This helps avoid out-of-memory errors.
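The claim check used at lines (19) and (21) can be pictured with a small in-memory sketch. This is an illustration under assumptions, not Spring Integration's implementation: ClaimCheckSketch and its methods are hypothetical names, CompletableFuture stands in for SettableListenableFuture, and the expiry that the reaper provides is not modeled.

```java
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical in-memory stand-in for the claim check in the scatter flow.
class ClaimCheckSketch {

    private final Map<UUID, CompletableFuture<Double>> store = new ConcurrentHashMap<>();

    // Check in: park the future and return the claim that travels in the headers.
    UUID checkIn(CompletableFuture<Double> future) {
        UUID claim = UUID.randomUUID();
        store.put(claim, future);
        return claim;
    }

    // Check out: remove the entry, so each claim can be redeemed only once.
    // Returns null if the claim is unknown or was already redeemed.
    CompletableFuture<Double> checkOut(String claimHeader) {
        return store.remove(UUID.fromString(claimHeader));
    }

    int pending() {
        return store.size();
    }

    public static void main(String[] args) {
        ClaimCheckSketch claims = new ClaimCheckSketch();
        CompletableFuture<Double> future = new CompletableFuture<>();

        // Scatter side: only the claim string leaves the JVM, never the future itself.
        String claimHeader = claims.checkIn(future).toString();

        // Gather side: redeem the claim and set the result on the parked future.
        claims.checkOut(claimHeader).complete(40.8);
        System.out.println(future.join() + ", pending=" + claims.pending());
    }
}
```

Keeping the future in a local store and sending only its identifier is what allows the request to cross JMS queues, since the future itself cannot be serialized into a message in any useful way.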

I am skipping the flows of both banks, as they are only stubs; they are included in the example available on GitHub. Let's now move on to the loan-broker-gather integration flow.

<beans ...>
	<jms:message-driven-channel-adapter 
		destination="standardBankOutQueue" 
		channel="best_qoute_selection_channel" 
		auto-startup="true"
		selector="loan_broker_selector = '#{T(org.springframework.integration.loanbroker.NodeIdentificationUtil).nodeIdentifier()}'"
		connection-factory="connectionFactory"
		concurrent-consumers="10"
		max-concurrent-consumers="10"/>

	<jms:message-driven-channel-adapter 
		destination="primeBankOutQueue" 
		channel="best_qoute_selection_channel" 
		auto-startup="true"
		selector="loan_broker_selector = '#{T(org.springframework.integration.loanbroker.NodeIdentificationUtil).nodeIdentifier()}'"
		connection-factory="connectionFactory"
		concurrent-consumers="10"
		max-concurrent-consumers="10"/>
	
	<int:channel id="best_qoute_selection_channel"/>

	<int:chain input-channel="best_qoute_selection_channel" output-channel="result_channel">

		<int:aggregator release-strategy-expression="size()==2"
						correlation-strategy-expression="headers['correlation_Id']"
						send-partial-result-on-expiry="false"
						expire-groups-upon-completion="true" 
						expire-groups-upon-timeout="true"
						message-store="loanBrokerResponseStore"/>
						
		<int:transformer >
			<int-scripting:script lang="javascript">
			<![CDATA[
			     (function(){
			     	 var indexRate = 10000; 
			     	 for(var i=0; i<payload.size(); i++){
			     	   if(indexRate > payload.get(i))
			     	 	  indexRate = payload.get(i);
			     	 }
			     	 return indexRate;
			     })();
	   		]]>			
			</int-scripting:script>
		</int:transformer>
		
		<int:header-enricher>
			<int:header name="result" expression="payload"/>
		</int:header-enricher>
		
		<int:transformer expression="T(java.util.UUID).fromString(headers.loan_request_claim)"/>
		
		<int:claim-check-out message-store="loanBrokerRequestStore" remove-message="true"/>
		
		<int:transformer expression="headers.result"/>
	
	</int:chain>

	<int:channel id="result_channel"/>
		
	<int:logging-channel-adapter channel="result_channel" expression="headers.future.set(payload)" />
	
	<bean id="loanBrokerResponseTimeOutExpiryReaper" 
		  class="org.springframework.integration.store.MessageGroupStoreReaper">
		<property name="messageGroupStore" ref="loanBrokerResponseStore" />
		<property name="timeout" value="10000" />
	</bean>

	<task:scheduled-tasks scheduler="loanBrokerResponseTimeOutExpiryScheduler">
		<task:scheduled ref="loanBrokerResponseTimeOutExpiryReaper" method="run" fixed-rate="3000" />
	</task:scheduled-tasks>	
	
	<task:scheduler id="loanBrokerResponseTimeOutExpiryScheduler"/>	
</beans>

The loan-broker-gather flow is the inverse of the scatter flow. With reference to the line numbers of the configuration above, the activities performed in this integration flow are as follows:

  • At lines (2) and (11), it picks up the responses from the banks' output queues. To make sure that a response comes back to the node that sent the request, I have specified the same message selector that was set as a header when scattering the message.

  • At line (24), it aggregates the responses of both banks into a list. As the release strategy, we state that the size of the response list must be equal to '2'. I have also specified a custom correlation identifier at line (25) to correlate the responses with each other.

  • At line (31), it transforms the list produced by the aggregator by selecting the minimum value from the list.

  • At line (47), I am saving the best quotation into a header, because downstream I have to check out the listenable future from the message store in order to set the result on it.

  • At line (60), I am explicitly setting the result on the listenable future that we checked out from the message store. This invokes the listenable future's callback functions in the controller, where we hand the result over to the container by setting it on the DeferredResult.

  • At line (68), I am scheduling a MessageGroupStoreReaper to erase timed-out messages from the message store.
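The aggregation and selection steps can be sketched in plain Java. This is a simplified stand-in for the aggregator and transformer, under stated assumptions: GatherSketch, expect, and onBankReply are hypothetical names, CompletableFuture replaces the listenable future, the group size is fixed at two as in the release strategy above, and group expiry is not modeled.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for the aggregator plus the "pick the minimum" transformer.
class GatherSketch {

    private static final int SEQUENCE_SIZE = 2; // number of banks, as in the flow above

    private final Map<String, List<Double>> groups = new ConcurrentHashMap<>();
    private final Map<String, CompletableFuture<Double>> pending = new ConcurrentHashMap<>();

    // Register interest in a correlation id before the requests are scattered.
    CompletableFuture<Double> expect(String correlationId) {
        CompletableFuture<Double> future = new CompletableFuture<>();
        pending.put(correlationId, future);
        return future;
    }

    // Called once per bank reply, possibly from different listener threads.
    void onBankReply(String correlationId, double quote) {
        List<Double> released = new ArrayList<>(); // filled only when the group completes
        groups.compute(correlationId, (key, group) -> {
            List<Double> updated = (group == null) ? new ArrayList<>() : group;
            updated.add(quote);
            if (updated.size() < SEQUENCE_SIZE) {
                return updated;        // keep waiting for the other bank
            }
            released.addAll(updated);
            return null;               // release: drop the group from the store
        });
        if (!released.isEmpty()) {
            CompletableFuture<Double> future = pending.remove(correlationId);
            if (future != null) {
                future.complete(Collections.min(released)); // best (lowest) quotation
            }
        }
    }

    public static void main(String[] args) {
        GatherSketch gather = new GatherSketch();
        CompletableFuture<Double> best = gather.expect("claim-1");
        gather.onBankReply("claim-1", 51.0);
        gather.onBankReply("claim-1", 40.8);
        System.out.println(best.join());
    }
}
```

The sketch uses Collections.min instead of starting from a fixed upper bound, so it does not depend on quotations staying below a particular value.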

Outcome:

Below is the log captured from one execution of a loan request. I have picked only the interesting events from the logs. Observe how the processing of the request moves from one thread to another at each step. Each log event below follows this format:

"[thread] SimpleClassName - logMessage"

[http-nio-8080-exec-1] DispatcherServlet - DispatcherServlet with name 'dispatcherServlet' processing GET request for [/quotation]
....
[http-nio-8080-exec-1] DynamicJmsTemplate - Sending created message: ActiveMQObjectMessage {commandId = 0, responseRequired = false,properties = {quote=1020.0, sequenceSize=2, loan_broker_selector=07e93936-7bcc-45b6-be5a-0126a965eed5, loan_request_claim=5d59c870-60ab-3892-444f-37bd61461731, correlation_Id=5d59c870-60ab-3892-444f-37bd61461731}
....
[http-nio-8080-exec-1] DynamicJmsTemplate - Sending created message: ActiveMQObjectMessage {commandId = 0, responseRequired = false,properties = {quote=1020.0, sequenceSize=2, loan_broker_selector=07e93936-7bcc-45b6-be5a-0126a965eed5, loan_request_claim=5d59c870-60ab-3892-444f-37bd61461731, correlation_Id=5d59c870-60ab-3892-444f-37bd61461731}
....
[http-nio-8080-exec-1] WebAsyncManager - Concurrent handling starting for GET [/quotation]
[http-nio-8080-exec-1] DispatcherServlet - Leaving response open for concurrent processing
....
[org.springframework.jms.listener.DefaultMessageListenerContainer#3-2] BankStub -  >> Callculating best quotation for price 1020.0 is 51.0. Where approx. calculation time is 1010 ms.
....
[org.springframework.jms.listener.DefaultMessageListenerContainer#0-5] AggregatingMessageHandler - org.springframework.integration.handler.MessageHandlerChain#1$child#0.handler received message: GenericMessage [payload=51.0, headers={timestamp=1450496257021, id=9f1d7387-b06a-8086-e356-0737d7ac6f6c, jms_timestamp=1450496257014, jms_redelivered=false, priority=4, quote=1020.0, sequenceSize=2, loan_broker_selector=07e93936-7bcc-45b6-be5a-0126a965eed5, loan_request_claim=5d59c870-60ab-3892-444f-37bd61461731, correlation_Id=5d59c870-60ab-3892-444f-37bd61461731, jms_messageId=ID:Husains-MacBook-Pro.local-53176-1450496240701-3:1:39:1:1}]
....
[org.springframework.jms.listener.DefaultMessageListenerContainer#2-3] BankStub -  >> Callculating best quotation for price 1020.0 is 40.800000000000004. Where approx. calculation time is 1077 ms.
....
[org.springframework.jms.listener.DefaultMessageListenerContainer#1-1] AggregatingMessageHandler - org.springframework.integration.handler.MessageHandlerChain#1$child#0.handler received message: GenericMessage [payload=40.800000000000004, headers={timestamp=1450496257088, id=729752b7-f8f0-b952-bcda-36379de76fa3, jms_timestamp=1450496257083, jms_redelivered=false, priority=4, quote=1020.0, sequenceSize=2, loan_broker_selector=07e93936-7bcc-45b6-be5a-0126a965eed5, loan_request_claim=5d59c870-60ab-3892-444f-37bd61461731, correlation_Id=5d59c870-60ab-3892-444f-37bd61461731, jms_messageId=ID:Husains-MacBook-Pro.local-53176-1450496240701-3:1:28:1:1}]
....
[org.springframework.jms.listener.DefaultMessageListenerContainer#1-1] AbstractCorrelatingMessageHandler - Completing group with correlationKey [5d59c870-60ab-3892-444f-37bd61461731]
....
[org.springframework.jms.listener.DefaultMessageListenerContainer#1-1] LoggingHandler - org.springframework.integration.handler.LoggingHandler#0 received message: GenericMessage [payload=40.800000000000004, headers={errorChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@2f92b51, result=40.800000000000004, jms_timestamp=1450496257083, 
....
[org.springframework.jms.listener.DefaultMessageListenerContainer#1-1] WebAsyncManager - Concurrent result value [<200 OK,40.800000000000004,{}>] - dispatching request to resume processing
....
[http-nio-8080-exec-2] RequestMappingHandlerAdapter - Found concurrent result value [<200 OK,40.800000000000004,{}>]
....
[http-nio-8080-exec-2] DispatcherServlet - Successfully completed request

  • Line (1) shows the request arriving at the Spring MVC dispatcher servlet.

  • Lines (3) and (5) show JMS messages being sent to both banks' inbound queues on the same HTTP request processing thread.

  • Line (7) shows Spring MVC registering an AsyncContext with the container, and line (8) shows it keeping the connection open for the response. This happens because we return a DeferredResult from LoanBrokerController.

  • Lines (10) and (14) show the loan request being processed by the different bank stubs at different points in time. I have put a random delay of 1000 to 1500 ms in each stub, so the order of processing varies from one execution to the next.

  • Lines (12) and (16) show the bank responses reaching the aggregator component of Spring Integration via the message-driven-channel-adapter, which internally uses DefaultMessageListenerContainer.

  • Line (18) shows the aggregation group completing once the quotation responses from both banks have reached the aggregator. From here onward, processing continues on the thread on which the aggregation completed. So whichever response completes the aggregation, its thread is used to set the result on the ListenableFuture, which in turn sets the result on the DeferredResult. That is why every activity up to line (22) occurs on the same thread.

  • Lines (24) and (26) show Spring MVC sending the response back to the client on a different thread from the one that received the request.
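The observation that the completing thread also runs the callback can be reproduced with the JDK alone. The sketch below is an analogy rather than the article's code: it uses CompletableFuture instead of SettableListenableFuture, and CallbackThreadSketch and completeOn are hypothetical names. In this setup, a callback registered before completion runs on the thread that calls complete.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;

// Sketch: which thread runs a callback that was registered before completion?
class CallbackThreadSketch {

    // Completes a future on a thread with the given name and returns the name
    // of the thread that executed the callback.
    static String completeOn(String threadName) {
        CompletableFuture<Double> future = new CompletableFuture<>();
        AtomicReference<String> callbackThread = new AtomicReference<>();

        // Registered by the caller, before any result exists (like the controller).
        future.thenAccept(result -> callbackThread.set(Thread.currentThread().getName()));

        // Completed elsewhere (like the JMS listener thread that finishes the aggregation).
        Thread completer = new Thread(() -> future.complete(40.8), threadName);
        completer.start();
        try {
            completer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for completion", e);
        }
        return callbackThread.get();
    }

    public static void main(String[] args) {
        System.out.println("callback ran on: " + completeOn("listener-thread"));
    }
}
```

No thread is parked between registering the callback and the completion; the callback is simply carried along by whichever thread delivers the result.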

Final Thoughts:

The way I have developed the non-blocking loan broker system may not be the only way, but it comes very close to an event-driven architecture. As shown in the outcome section, a thread kicks in only when there is an event to process. I have not used any blocking constructs, such as the JMS gateway from Spring Integration. JMS response messages are processed only when they are ready; until then, no thread on the server is tied up waiting. I also did not rely on the proxy gateway's default async behavior to keep the controller's calling threads from blocking, which someone might be tempted to do in order to use the JMS gateway and keep the integration flows simple. That approach would still be prone to thread-per-request issues. I understand that the solution is a bit complex, but it pays off when scalability demands grow. Moreover, you may even be able to avoid the circuit breaker pattern, because with this style of integration no thread waits for a response.

I hope you enjoyed the article. The complete source code is available on GitHub. I have also included a JMeter test with the source code, in case you want to load test the loan broker system yourself. Your comments are more than welcome.
