Spring Integration - Message Gateway Adapters
This article presents a technique for handling response types from message sub-systems in a service-based Message Driven Architecture (MDA). It provides some theory, some experience of message sub-system development and some robust working code that's running in several financial institution production deployments.
As Spring Integration is underpinned by a well established set of Spring APIs familiar to most developers, I have chosen this as a basis for this article.
If developers only ever had to build integration flows to handle positive business responses, this article would not be necessary. However, real world development, particularly within service integration environments, results in developers having to deal not just with positive business responses but business exceptions, technical exceptions and services that become unresponsive.
Spring Integration Gateways
Typical Message Driven Architecture (MDA) Applications
As with Java design patterns for application development, similar patterns exist for integration applications. One such common integration pattern is service-chaining - a number of services are connected in series or parallel and together perform a business function. If you've built and deployed services using MuleSoft's Mule, SpringSource's Spring Integration, FuseSource's ServiceMix (Camel), Oracle's Service Bus or IBM's WebSphere ESB, the chances are that you've already built an application using chained services. This pattern will become ever more widespread as the software industry moves away from client-server topologies towards service-based architectures.
A recent engagement in which Incept5 supplied architectural consultancy will be used as an example implementation of the service-chain application pattern. The engagement required the team to build a solution that would transition financial messages (SWIFT FIN) from a raw (ISO15022) state through binding, validation and transformation (into XML) and ultimately add them to a data store for further processing or business exception management.
Using EIP graph notation, the first phase service composition could be represented as follows. A document message arrives into the application domain, where it is parsed and bound to a Java object, semantically validated as a SWIFT message, transformed to XML and then stored in a datastore. As a side note, the binding, semantic validation and transformation are all performed by C24's iO product. Furthermore, the full solution sample for this project, which can be found in GitHub, contains dispatcher configurations so that thread pools can be used for each message sub-system. For the sake of brevity and clarity, such details are omitted from this article.
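To make the shape of the chain concrete, here is a minimal plain-Java sketch of four stages connected in series. Everything in it is an assumption made for illustration: the class name, the toy "tag=value" input format and the placeholder stage methods are hypothetical, and the real flow delegates binding, validation and transformation to C24 iO rather than to code like this.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

/** Hypothetical service chain; the stages are toy placeholders, not SWIFT processing. */
final class ServiceChainSketch {

    private final List<String> dataStore = new ArrayList<>();

    /** Placeholder parse step: splits "tag=value" into at most two parts. */
    static String[] parse(String raw) {
        return raw.split("=", 2);
    }

    /** Placeholder validation step: rejects input that did not yield a tag and a value. */
    static String[] validate(String[] fields) {
        if (fields.length != 2 || fields[0].isEmpty()) {
            throw new IllegalArgumentException("Expected tag=value");
        }
        return fields;
    }

    /** Placeholder transformation step: renders the fields as a single XML element. */
    static String toXml(String[] fields) {
        return "<" + fields[0] + ">" + fields[1] + "</" + fields[0] + ">";
    }

    /** Runs the stages in series and stores the result only if every stage succeeds. */
    String process(String raw) {
        Function<String, String[]> parseStage = ServiceChainSketch::parse;
        Function<String, String> chain = parseStage
                .andThen(ServiceChainSketch::validate)
                .andThen(ServiceChainSketch::toXml);
        String xml = chain.apply(raw);
        dataStore.add(xml);
        return xml;
    }

    List<String> stored() {
        return dataStore;
    }
}
```

Note that an exception in any stage aborts the whole chain and nothing records which stage failed or what the payload was, which is the gap the rest of this article addresses.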

Unresponsive Service Invocation
<int:channel id="parsing-gw-request-channel" datatype="java.lang.String">
    <int:queue capacity="${gateway.parse.queue.capacity}"/>
</int:channel>

<int:gateway id="parseGateway"
    service-interface="com.c24.solution.swift.flatten.gateway.ParseGateway"
    default-request-channel="parsing-gw-request-channel"
    default-reply-channel="parsing-gw-reply-channel"
    default-reply-timeout="${gateway.parse.timeout}"/>

<int:channel id="parsing-gw-reply-channel" datatype="biz.c24.io.api.data.ComplexDataObject"/>

A small amount of additional configuration and some very simple Java interfaces mean that developers can now configure request/reply timeouts on the gateway and avoid being blocked by unresponsive code - assuming (as we always should) that code written locally or supplied by third parties is capable of misbehaving.
Additionally, the SI gateway allows specification of an error handling channel so that you have the opportunity to handle errors. The design presented here does not use error channels but handles errors in a different way; the reason should become clearer as the design evolves.
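The effect of a reply timeout can be approximated in plain Java. The sketch below is not Spring Integration code and makes no claim about how the framework implements its gateway; it only assumes the behaviour described in this article, namely that the caller waits a bounded time for a reply and receives null if none arrives. The class and method names are hypothetical.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Function;

/** Hypothetical stand-in for a gateway with a reply timeout; not Spring Integration code. */
final class TimedGatewaySketch {

    private TimedGatewaySketch() {
    }

    /**
     * Runs the service on a worker thread and waits at most timeoutMillis for a reply.
     * Returns null when no reply arrives in time.
     */
    static <I, O> O sendAndReceive(Function<I, O> service, I request, long timeoutMillis) {
        ExecutorService worker = Executors.newSingleThreadExecutor();
        try {
            Callable<O> task = () -> service.apply(request);
            Future<O> reply = worker.submit(task);
            return reply.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            return null; // the service did not respond within the window
        } catch (ExecutionException e) {
            // Surface the service's own failure to the caller.
            Throwable cause = e.getCause();
            throw cause instanceof RuntimeException
                    ? (RuntimeException) cause
                    : new IllegalStateException(cause);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for a reply", e);
        } finally {
            worker.shutdownNow(); // interrupt the worker if it is still running
        }
    }
}
```

The caller is released after the timeout whether or not the service ever completes, but it is left holding a null and must decide what that means.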
Specific semantics and configuration examples for gateways and gateway timeouts can be found in reference material provided by SpringSource and other blogs.
A significant benefit to the integration solution has been added with the use of Spring Integration gateways. However, further improvements need to be made for the following reasons:
A gateway timeout will result in a null value being returned to the calling, or outer, flow. This is a business exception condition: the payload that was undergoing processing needs to be pushed into a business exception management process so that it can be progressed through to a conclusion, normally one dictated by the business. If a transient technical issue caused the problem, resubmission may resolve the exception; otherwise further investigation will be required. Whatever the eventual outcome, context about the failure and its location needs to be recorded and made available to the business exception management operative. The context must contain information about the message sub-system that failed to process the message and the message itself.
If transactions were involved in this flow, for example a transactional JMS message consumption flow trigger, it would be possible to roll back the transaction so that the message can be re-queued to a DLQ. However, this design compensates for failures by pushing failing messages directly to a destination configured by the developer, which may of course be a JMS queue if required. This avoids transaction rollback and supplies the exception context directly, rather than leaving operations staff and developers to scour logs for exception details.
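As a rough illustration of the context described above, the following sketch records which sub-system failed, why, and the original payload, and pushes that record to a developer-chosen destination. All names here are hypothetical and are not the project's classes; the in-memory list merely stands in for whatever destination (a JMS queue, a table) a real deployment would use.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Hypothetical failure record; names are illustrative, not the project's classes. */
final class FailureContextSketch {

    /** The message sub-system in which processing failed. */
    enum SubSystem { PARSE, VALIDATE, TRANSFORM, PERSIST }

    /** Why the sub-system produced no usable reply. */
    enum Reason { NULL_GATEWAY_RESPONSE, EXCEPTION_GATEWAY_RESPONSE }

    final SubSystem subSystem;
    final Reason reason;
    final String originalPayload;

    FailureContextSketch(SubSystem subSystem, Reason reason, String originalPayload) {
        this.subSystem = subSystem;
        this.reason = reason;
        this.originalPayload = originalPayload;
    }

    /** A one-line summary an operator could read without searching the logs. */
    String describe() {
        return subSystem + " failed with " + reason + " for payload: " + originalPayload;
    }
}

/** Hypothetical destination; an in-memory list stands in for a queue or data store. */
final class ExceptionDestinationSketch {

    private final List<FailureContextSketch> received = new ArrayList<>();

    void dispatch(FailureContextSketch context) {
        received.add(context);
    }

    List<FailureContextSketch> received() {
        return Collections.unmodifiableList(received);
    }
}
```

Because the payload travels with the context, the failed message can be resubmitted from the destination without replaying the original transaction.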
The Gateway Adapter
In order to handle all exceptions taking place within a message sub-system, and also to handle null values, a Gateway Adapter class can be used. The SI gateway error channel is not used for this purpose because it would be complex to define and would have to be configured in several places. The gateway adapter allows all business exception conditions to be handled in one place and treated in the same way. The Gateway Adapter is a custom-written Spring bean that invokes the injected gateway directly and manages null and exception responses before allowing the invocation request to return to the caller (or outer flow).
The architectural design diagram, evolved from the previous design phase, includes a service activator backed by a Gateway Adapter bean. This bean calls the injected gateway (a dynamic proxy), which in turn calls the business service.
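Stripped of Spring and C24 types, the adapter idea can be sketched generically. This is a simplified illustration under my own assumptions, not the project's implementation: the gateway is modelled as a plain Function, the exception destination as a BiConsumer that receives the original request and a reason string, and all names are hypothetical.

```java
import java.util.function.BiConsumer;
import java.util.function.Function;

/** Hypothetical generic adapter; a sketch of the pattern, not the project's code. */
final class GatewayAdapterSketch<I, O> {

    private final Function<I, O> gateway;            // stands in for the injected gateway proxy
    private final BiConsumer<I, String> failureSink; // receives the original request and a reason

    GatewayAdapterSketch(Function<I, O> gateway, BiConsumer<I, String> failureSink) {
        this.gateway = gateway;
        this.failureSink = failureSink;
    }

    /** Invokes the gateway and treats both exceptions and null replies as failures. */
    O service(I request) {
        O response;
        try {
            response = gateway.apply(request);
        } catch (RuntimeException e) {
            // The sub-system threw: record the context, then let the outer flow see the error.
            failureSink.accept(request, "EXCEPTION_GATEWAY_RESPONSE");
            throw e;
        }
        if (response == null) {
            // A null reply is how a gateway timeout surfaces to the caller.
            failureSink.accept(request, "NULL_GATEWAY_RESPONSE");
            throw new IllegalStateException("Null gateway response");
        }
        return response;
    }
}
```

Both failure paths pass through the same sink, so every gateway in the application can report failures in the same way without per-gateway error channel configuration.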
Nuts and Bolts
The design diagrams are a useful guide for making the point, but the more interesting part is perhaps the configuration and code itself. As with the entire solution, the outer or calling flow can be seen in the project located in GitHub. The snippets shown here are the gateway adapter namespace configuration, the gateway, the gateway adapter and the gateway service configuration. As a number of gateways exist in this application, and they all follow the same pattern, the following configuration and code demonstrate just one of them.
Gateway Adapter Namespace Configuration
<int:channel id="message-parse-channel" datatype="java.lang.String"/>

<int:chain input-channel="message-parse-channel" output-channel="message-validate-channel">
    <int:service-activator ref="parseGatewayService" method="service"/>
</int:chain>
Gateway
package com.c24.solution.swift.flatten.gateway;

import org.springframework.integration.Message;

/**
 * @author Matt Vickery - matt.vickery@incept5.com
 * @since 17/05/2012
 */
public interface ParseGateway {
    public Message<?> send(Message<String> message);
}
GatewayAdapter
package com.c24.solution.swift.flatten.gateway.adapter;

import biz.c24.io.api.data.ComplexDataObject;
import com.c24.solution.swift.flatten.exception.ExceptionContext;
import com.c24.solution.swift.flatten.exception.ExceptionSubContext;
import com.c24.solution.swift.flatten.gateway.ExceptionGatewayService;
import com.c24.solution.swift.flatten.gateway.ParseGateway;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.integration.Message;
import org.springframework.util.Assert;

/**
 * @author Matt Vickery - matt.vickery@incept5.com
 * @since 17/05/2012
 */
public class ParseGatewayService extends AbstractGatewayService {

    private static final Logger LOG = LoggerFactory.getLogger(ParseGatewayService.class);
    private final ParseGateway parseGateway;
    private final ExceptionGatewayService exceptionGatewayService;

    public ParseGatewayService(
            final ParseGateway parseGateway,
            final ExceptionGatewayService exceptionGatewayService) {
        this.parseGateway = parseGateway;
        this.exceptionGatewayService = exceptionGatewayService;
    }

    public Message<ComplexDataObject> service(Message<String> message) {

        Message<?> response;
        try {
            LOG.debug("Entering parse gateway.");
            response = parseGateway.send(message);
        } catch (RuntimeException e) {
            // The sub-system threw: dispatch the exception context, then rethrow.
            LOG.error("Exception response .. {}, exception: {}", getClass(), e);
            LOG.error("Invoking ... process because: {}.", e.getCause());
            buildExceptionContextAndDispatch(
                    message,
                    ExceptionContext.PARSE_FAILURE,
                    ExceptionSubContext.EXCEPTION_GATEWAY_RESPONSE,
                    exceptionGatewayService);
            throw e;
        }

        if (response != null) {
            if (!(response.getPayload() instanceof ComplexDataObject))
                throw new IllegalStateException(INTERRUPTING_..._EXCEPTION);
        } else {
            // A null response indicates that the gateway timed out.
            LOG.info("Null response received ....", getClass());
            buildExceptionContextAndDispatch(
                    message,
                    ExceptionContext.PARSE_FAILURE,
                    ExceptionSubContext.NULL_GATEWAY_RESPONSE,
                    exceptionGatewayService);
            throw new GatewayAdapterException(NULL_GATEWAY_RESPONSE_CAUGHT);
        }

        Assert.state(response.getPayload() instanceof ComplexDataObject);
        return (Message<ComplexDataObject>) response;
    }
}
GatewayService
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:c24="http://schema.c24.biz/spring-integration"
       xmlns:int="http://www.springframework.org/schema/integration"
       xsi:schemaLocation="http://www.springframework.org/schema/integration
           http://www.springframework.org/schema/integration/spring-integration.xsd
           http://schema.c24.biz/spring-integration
           http://schema.c24.biz/spring-integration.xsd
           http://www.springframework.org/schema/beans
           http://www.springframework.org/schema/beans/spring-beans.xsd">

    <int:channel id="parsing-gw-request-channel" datatype="java.lang.String">
        <int:queue capacity="${gateway.parse.queue.capacity}"/>
    </int:channel>

    <int:gateway id="parseGateway"
        service-interface="com.c24.solution.swift.flatten.gateway.ParseGateway"
        default-request-channel="parsing-gw-request-channel"
        default-reply-channel="parsing-gw-reply-channel"
        default-reply-timeout="${gateway.parse.timeout}"/>

    <int:channel id="parsing-gw-reply-channel" datatype="biz.c24.io.api.data.ComplexDataObject"/>

    <int:chain input-channel="parsing-gw-request-channel" output-channel="parsing-gw-reply-channel">
        <int:poller fixed-delay="50" task-executor="binding-thread"/>
        <c24:unmarshalling-transformer id="c24Mt541UnmarshallingTransformer"
            source-factory-ref="textualSourceFactory"
            model-ref="mt541Model"/>
    </int:chain>

</beans>
Summary
The intent of the Gateway Adapter should be clear from the example configuration and code provided. The design can also be run and tested using the full source located on GitHub. Please leave any feedback or comments as you see fit.