DZone
Thanks for visiting DZone today,
Edit Profile
  • Manage Email Subscriptions
  • How to Post to DZone
  • Article Submission Guidelines
Sign Out View Profile
  • Post an Article
  • Manage My Drafts
Over 2 million developers have joined DZone.
Log In / Join
Please enter at least three characters to search
Refcards Trend Reports
Events Video Library
Refcards
Trend Reports

Events

View Events Video Library

Zones

Culture and Methodologies Agile Career Development Methodologies Team Management
Data Engineering AI/ML Big Data Data Databases IoT
Software Design and Architecture Cloud Architecture Containers Integration Microservices Performance Security
Coding Frameworks Java JavaScript Languages Tools
Testing, Deployment, and Maintenance Deployment DevOps and CI/CD Maintenance Monitoring and Observability Testing, Tools, and Frameworks
Culture and Methodologies
Agile Career Development Methodologies Team Management
Data Engineering
AI/ML Big Data Data Databases IoT
Software Design and Architecture
Cloud Architecture Containers Integration Microservices Performance Security
Coding
Frameworks Java JavaScript Languages Tools
Testing, Deployment, and Maintenance
Deployment DevOps and CI/CD Maintenance Monitoring and Observability Testing, Tools, and Frameworks

The software you build is only as secure as the code that powers it. Learn how malicious code creeps into your software supply chain.

Apache Cassandra combines the benefits of major NoSQL databases to support data management needs not covered by traditional RDBMS vendors.

Generative AI has transformed nearly every industry. How can you leverage GenAI to improve your productivity and efficiency?

Modernize your data layer. Learn how to design cloud-native database architectures to meet the evolving demands of AI and GenAI workloads.

Related

  • Integrate Spring With Open AI
  • Enterprise RIA With Spring 3, Flex 4 and GraniteDS
  • 5 DevOps Tools To Add to Your Stack in 2022
  • Secure Spring Boot Application With Keycloak

Trending

  • MCP Servers: The Technical Debt That Is Coming
  • The Human Side of Logs: What Unstructured Data Is Trying to Tell You
  • Secure by Design: Modernizing Authentication With Centralized Access and Adaptive Signals
  • Kullback–Leibler Divergence: Theory, Applications, and Implications
  1. DZone
  2. Testing, Deployment, and Maintenance
  3. Deployment
  4. Spring Integration: Splitter-Aggregator

Spring Integration: Splitter-Aggregator

Within Spring Integration, one form of EIP scatter-gather is provided by the splitter and aggregator constructs.

By 
Matt Vickery user avatar
Matt Vickery
·
May. 18, 12 · Tutorial
Likes (2)
Comment
Save
Tweet
Share
47.1K Views

Join the DZone community and get the full member experience.

Join For Free

Within Spring Integration, one form of EIP scatter-gather is provided by the splitter and aggregator constructs. Semantics for both of these are pretty straight forward to understand, the splitter receives an input message and returns a list of objects that are each turned into first-class messages. Those messages get routed to message handlers following which they are aggregated as a list of input messages by an aggregator. This pattern can be used successfully with fairly simple configuration.

The following example works with trivial message processing taking place. It uses simple one-fail-all-fail error handling semantics. This means that any errors occurring in one or both services will result in an exception being thrown back to the invoker.

Having worked on projects that require a far more robust solution following partial failure, I've written another post that includes details regarding a design strategy to support that - A Robust Splitter Aggregator Strategy <URL>.

Let's explore how this context configuration is working and what happens in circumstances where and exception is thrown by one of the services.

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:int="http://www.springframework.org/schema/integration"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
       http://www.springframework.org/schema/beans/spring-beans.xsd
       http://www.springframework.org/schema/integration
       http://www.springframework.org/schema/integration/spring-integration-2.1.xsd">

    <!--##############################################-->
    <!-- Bean specifications.  -->
    <!--##############################################-->
    <import resource="bean-refs.xml"/>

    <!--##############################################-->
    <!-- Gateway specification. -->
    <!--##############################################-->
    <int:gateway service-interface="com.l8mdv.sample.gateway.BrokerRequestGateway"/>
    <int:channel id="broker-request-channel"
                 datatype="com.l8mdv.sa.BrokerRequestMessage"/>

    <!--##############################################-->
    <!-- Request message splitter. -->
    <!--##############################################-->
    <int:splitter input-channel="broker-request-channel"
                  output-channel="broker-router-channel"

                  ref="brokerQuoteRequestSplitter"/>

    <!--##############################################-->
    <!-- Request message routing. -->
    <!--##############################################-->
    <int:channel id="broker-router-channel"
                 datatype="com.l8mdv.sa.BrokerQuoteRequestMessage"/>
    <int:recipient-list-router input-channel="broker-router-channel">
        <int:recipient channel="openex-broker-channel"
                       selector-expression="payload.BrokerName.equals('openex')"/>
        <int:recipient channel="yahoo-broker-channel"
                       selector-expression="payload.BrokerName.equals('yahoo')"/>
    </int:recipient-list-router>

    <!--##############################################-->
    <!-- Request message routing to OpenEx. -->
    <!--##############################################-->
    <int:channel id="openex-broker-channel" datatype="com.l8mdv.sa.BrokerQuoteRequestMessage"/>
    <int:chain input-channel="openex-broker-channel"
               output-channel="aggregator-channel">
        <int:service-activator>
            <bean id="OpenExServiceFaker" class="com.l8mdv.sample.ServiceFaker">
                <constructor-arg name="response" ref="OpenExFakeResponseData"/>
            </bean>
        </int:service-activator>
    </int:chain>

    <!--##############################################-->
    <!-- Request message routing to Yahoo. -->
    <!--##############################################-->
    <int:channel id="yahoo-broker-channel"
                 datatype="com.l8mdv.sa.BrokerQuoteRequestMessage"/>
    <int:chain input-channel="yahoo-broker-channel"
               output-channel="aggregator-channel">
        <int:service-activator>
            <bean id="YahooServiceFaker" class="com.l8mdv.sample.ServiceFaker">
                <constructor-arg name="response" ref="YahooFakeResponseData"/>
            </bean>
        </int:service-activator>
    </int:chain>

    <!--##############################################-->
    <!-- Response message handling, return the best -->
    <!-- quote to the invoker. -->
    <!--##############################################-->
    <int:channel id="aggregator-channel" datatype="com.l8mdv.sa.BrokerQuoteResponseMessage"/>
    <int:chain input-channel="aggregator-channel">
        <int:aggregator/>
        <int:transformer ref="aggregationToBrokerQuoteResponseTransformer"/>
    </int:chain>

</beans>

Regarding the splitter and aggregator, the interesting configuration starts at line 25 with the splitter specification. The construct has been customised with input-channel, output-channel and ref attributes. The channel configuration for this construct is obvious, the other attribute allows association of a bean reference that is able to perform the split function. It's generally true that a splitter bean with appropriate arguments and return types will be invoked if it's unambiguous.

The next stop for messages is the router that's defined on line 35, the recipient list router. This message endpoint is able to forward route messages into appropriate channels given an expression to invoke on the payload of the message. This router will examine the payload and route each message to one of two services - these are located on lines 48 and 62. The service(s) that are invoked are entirely dependent on what is returned from the splitter. Either or both of these services may be invoked for a given splitter input message.

Finally, results of one or two service invocations are routed towards the aggregator, the configuration starts for this at the chain defined at line 74. Finally the result of the aggregation is input to a transformer where some further processing takes place. Notice that there is no output channel on this chain, the implicitly created default output channel is relied upon here - it goes back through the gateway to the gateway invoker.

There are some interesting aspects to this service:

  1. If the splitter at line 25 receives a message but does not generate a list of one or more response messages, then an empty list will result on the router not getting called. This can be overridden by using the requires-reply attribute in which case an empty list will result in a message handling exception being thrown.
  2. Strong typing has been used on the data channels in an attempt to enforce strict processing rules and make the configuration easier to follow and understand.
  3. The chain construct has been used in an attempt to keep configuration compact where useful. It should be noticed that chain definition and strong typing are often two sides of the same coin. By grouping the aggregator and transformer in a chain I have been unable to control and hence document message type input to the transformer endpoint.
  4. Spring beans referenced from within this context have been loaded from an external file. Whilst they could have been component-scanned or defined in the same file I have chosen to keep them distinct in order that they are not loaded if not necessary for operation - I'd usually create mock and spy objects around these message endpoints.
  5. Any exception in Service Activator invocation at lines 48 and 62 would result in aggregation not completing. In this example, I have not created an error handler on the gateway in line 18 and so any exceptions thrown by SAs would result in an exception being thrown to the invoker of the gateway. A more robust solution, in the face of exception handling, would require a different design approach.
  6. I have documented, albeit briefly, intent for each section of the configuration specifically in order to help readers understand intentions of my design.

In the case that they're useful, Spring bean definitions are as follows:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:int="http://www.springframework.org/schema/integration"
       xmlns:util="http://www.springframework.org/schema/util"
       xsi:schemaLocation="http://www.springframework.org/schema/beans 
       http://www.springframework.org/schema/beans/spring-beans.xsd">

    <bean id="brokerQuoteRequestSplitter"
          class="com.l8mdv.sample.service.BrokerQuoteRequestSplitter"/>

    <bean id="brokerRawResponseTransformer"
          class="com.l8mdv.sample.service.impl.BrokerRawResponseTransformer"/>

    <bean id="aggregationToBrokerQuoteResponseTransformer"
          class="com.l8mdv.sample.service.impl.AggregationToBrokerQuoteResponseTransformer"/>

    <bean id="YahooFakeResponseData" class="com.l8mdv.sa.BrokerResponseMessage">
        <property name="brokerName" value="Yahoo"/>
        <property name="value" value="2"/>
        <property name="brokerRequestType" value="#{T(com.l8mdv.sa.BrokerRequestType).QUOTE}"/>
    </bean>

    <bean id="OpenExFakeResponseData" class="com.l8mdv.sa.BrokerResponseMessage">
        <property name="brokerName" value="OpenEx"/>
        <property name="value" value="5"/>
        <property name="brokerRequestType" value="#{T(com.l8mdv.sa.BrokerRequestType).QUOTE}"/>
    </bean>

</beans>

The splitter code:

package com.l8mdv.sample.service;

import com.l8mdv.sa.BrokerQuoteRequestMessage;
import com.l8mdv.sa.BrokerRequest;
import com.l8mdv.sa.BrokerRequestMessage;
import com.l8mdv.sa.QuoteRequestSortPolicy;
import org.springframework.util.Assert;

import java.util.ArrayList;
import java.util.List;

public class BrokerQuoteRequestSplitter {

    public List<BrokerQuoteRequestMessage> split(BrokerRequestMessage message) {

        Assert.notNull(message, "Mandatory argument missing.");
        List requests = new ArrayList();

        for (BrokerRequest brokerRequest:
                message.getBrokerRequest()) {
            BrokerQuoteRequestMessage brokerQuoteRequestMessage
                    = new BrokerQuoteRequestMessage();
            brokerQuoteRequestMessage
                    .setBrokerName(brokerRequest.getBrokerName());
            brokerQuoteRequestMessage
                    .setQuoteRequestSortPolicy(QuoteRequestSortPolicy.BUY_LOWEST);
            requests.add(brokerQuoteRequestMessage);
        }

        return requests;
    }
}

The transformer:

package com.l8mdv.sample.service.impl;

import com.l8mdv.sa.BrokerQuoteResponseMessage;
import org.springframework.util.Assert;

import java.util.List;

public class AggregationToBrokerQuoteResponseTransformer {

    public BrokerQuoteResponseMessage
    transform(List<BrokerQuoteResponseMessage> serviceResponses) {

        Assert.notNull(serviceResponses, "Mandatory argument missing.");

        BrokerQuoteResponseMessage bestQuote = null;
        for (BrokerQuoteResponseMessage 
                brokerQuoteResponseMessage: serviceResponses) {
            if (bestQuote == null)
                bestQuote = brokerQuoteResponseMessage;
            else {
                if (brokerQuoteResponseMessage.getSellPrice()
                        .compareTo(bestQuote.getSellPrice()) > 0)
                    bestQuote = brokerQuoteResponseMessage;
            }
        }

        return bestQuote;
    }
}

and the integration test:

package com.l8mdv.sample;

import com.l8mdv.sa.BrokerQuoteResponseMessage;
import com.l8mdv.sa.BrokerRequest;
import com.l8mdv.sa.BrokerRequestMessage;
import com.l8mdv.sa.BrokerRequestType;
import com.l8mdv.sample.gateway.BrokerRequestGateway;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;

@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(
    locations = {"classpath:META-INF/spring/simple-splitter-aggregator.xml"}
)
public class SimpleSplitterAggregatorIntegrationTest {

    @Autowired
    public BrokerRequestGateway brokerRequestGateway;

    @Test
    public void run() throws Exception {
        BrokerRequestMessage requestMessage = new BrokerRequestMessage();

        BrokerRequest yahooRequest = new BrokerRequest();
        yahooRequest.setBrokerName("yahoo");
        yahooRequest.setBrokerRequestType(BrokerRequestType.QUOTE);
        requestMessage.getBrokerRequest().add(yahooRequest);

        BrokerRequest oxRequest = new BrokerRequest();
        oxRequest.setBrokerName("openex");
        oxRequest.setBrokerRequestType(BrokerRequestType.QUOTE);
        requestMessage.getBrokerRequest().add(oxRequest);

        BrokerQuoteResponseMessage response = 
           brokerRequestGateway.send(requestMessage);
    }

}
Spring Framework Integration

Published at DZone with permission of Matt Vickery, DZone MVB. See the original article here.

Opinions expressed by DZone contributors are their own.

Related

  • Integrate Spring With Open AI
  • Enterprise RIA With Spring 3, Flex 4 and GraniteDS
  • 5 DevOps Tools To Add to Your Stack in 2022
  • Secure Spring Boot Application With Keycloak

Partner Resources

×

Comments
Oops! Something Went Wrong

The likes didn't load as expected. Please refresh the page and try again.

ABOUT US

  • About DZone
  • Support and feedback
  • Community research
  • Sitemap

ADVERTISE

  • Advertise with DZone

CONTRIBUTE ON DZONE

  • Article Submission Guidelines
  • Become a Contributor
  • Core Program
  • Visit the Writers' Zone

LEGAL

  • Terms of Service
  • Privacy Policy

CONTACT US

  • 3343 Perimeter Hill Drive
  • Suite 100
  • Nashville, TN 37211
  • support@dzone.com

Let's be friends:

Likes
There are no likes...yet! 👀
Be the first to like this post!
It looks like you're not logged in.
Sign in to see who liked this post!