Scatter-Gather in Mule ESB
Before Scatter-Gather, there was the All message processor. Unlike All, Scatter-Gather executes routes concurrently instead of sequentially.
Scatter-Gather is a routing message processor in the Mule ESB runtime that sends a request message to multiple targets concurrently. It then collects the responses from all routes and aggregates them into a single response.
Before Scatter-Gather, there was the All message processor, which was deprecated in Mule 3.5.0. Unlike All, Scatter-Gather executes routes concurrently instead of sequentially. Parallel execution of routes can greatly increase the efficiency of your application, since the flow waits roughly as long as the slowest route rather than the sum of all routes, and it provides more information than sequential processing.
Here's a sample Scatter-Gather flow in Mule Anypoint Studio:
How It Works
Once a message is received by Scatter-Gather, it sends a message for concurrent processing to all configured routes. The main thread executing the flow that owns the router waits until all routes complete or time out.
If there are no failures, Mule aggregates the results from each of the routes into a message collection (the MessageCollection class). A failure in one route does not stop Scatter-Gather from sending messages to its other configured routes, so it is possible that many or all routes fail concurrently.
If one or more routes fail, Scatter-Gather performs the following operations:
- Sets the exception payload accordingly for each route.
- Throws a CompositeRoutingException, which maps each exception to its corresponding route using a sequential route ID.
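The behavior described above can be sketched in plain Java, outside of Mule. This is only an illustration of the pattern, not Mule's implementation: ScatterGatherSketch, scatterGather, and RouteFailuresException are hypothetical names, with RouteFailuresException standing in for CompositeRoutingException. The 0-based route index is a convention of this sketch.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Plain-Java illustration only; none of these names are Mule APIs.
public class ScatterGatherSketch {

    /** Hypothetical stand-in for CompositeRoutingException: failures keyed by route index. */
    public static class RouteFailuresException extends Exception {
        private final Map<Integer, Throwable> failures;

        public RouteFailuresException(Map<Integer, Throwable> failures) {
            super(failures.size() + " route(s) failed");
            this.failures = failures;
        }

        public Map<Integer, Throwable> getFailures() {
            return failures;
        }
    }

    /** Runs every route concurrently and returns the results in route order. */
    public static List<String> scatterGather(List<Callable<String>> routes)
            throws RouteFailuresException, InterruptedException {
        // One thread per route; assumes at least one route is supplied.
        ExecutorService pool = Executors.newFixedThreadPool(routes.size());
        try {
            // invokeAll blocks the calling thread until every route has completed.
            List<Future<String>> futures = pool.invokeAll(routes);
            List<String> results = new ArrayList<>();
            Map<Integer, Throwable> failures = new LinkedHashMap<>();
            for (int i = 0; i < futures.size(); i++) {
                try {
                    results.add(futures.get(i).get());
                } catch (ExecutionException e) {
                    // A failed route does not stop the others; record it by index.
                    failures.put(i, e.getCause());
                }
            }
            if (!failures.isEmpty()) {
                throw new RouteFailuresException(failures);
            }
            return results;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        List<Callable<String>> routes = new ArrayList<>();
        routes.add(() -> "Tokyo,105");
        routes.add(() -> { throw new IllegalStateException("route down"); });
        routes.add(() -> "NewYork,98");
        try {
            scatterGather(routes);
        } catch (RouteFailuresException e) {
            System.out.println("Failed routes: " + e.getFailures().keySet());
        }
    }
}
```

Collecting every failure before throwing, rather than failing on the first one, is what lets the caller see which routes failed and why.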
Aggregation Strategies
We can define a custom aggregation strategy for Scatter-Gather to override its default aggregation strategy. A custom strategy lets you, for example:
- Discard message responses.
- Merge message properties that originated in different routes.
- Discard failed messages without throwing an exception.
- Select only one from multiple responses.
The following custom strategy selects only one of the responses. Each route returns a payload of the form marketName,rate; the strategy sorts the responses and returns the first one. It relies on a MarketRate class (not shown) that must implement Comparable.

package com.example.poc;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

import org.apache.commons.lang.StringUtils;
import org.mule.DefaultMuleEvent;
import org.mule.api.MuleEvent;
import org.mule.api.MuleException;
import org.mule.api.routing.AggregationContext;
import org.mule.routing.AggregationStrategy;

public class CustomSGAggregationStrategy implements AggregationStrategy {

    @Override
    public MuleEvent aggregate(AggregationContext context) throws MuleException {
        List<MarketRate> marketRates = new ArrayList<>();

        // Only consider routes that completed without an exception.
        for (MuleEvent event : context.collectEventsWithoutExceptions()) {
            // Each route is expected to return a "marketName,rate" string.
            String response = (String) event.getMessage().getPayload();
            String[] splitResponse = StringUtils.split(response, ",");

            MarketRate marketRate = new MarketRate();
            marketRate.setEvent(event);
            marketRate.setMarketName(splitResponse[0]);
            marketRate.setMarketRate(Integer.valueOf(splitResponse[1]));
            marketRates.add(marketRate);
            System.out.println(marketRate);
        }

        // Check for an empty list before reading the first element.
        if (marketRates.isEmpty()) {
            throw new RuntimeException("no response obtained");
        }

        // Sorting uses MarketRate's natural ordering; the first element wins.
        Collections.sort(marketRates);
        MarketRate best = marketRates.get(0);
        MuleEvent result = DefaultMuleEvent.copy(best.getEvent());
        result.getMessage().setPayload(best.toString());
        return result;
    }
}
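The strategy above depends on a MarketRate class that the article does not show. As a rough sketch of the selection logic only, here is a Mule-free version. The fields, the parse helper, the toString format, and the ascending-by-rate ordering are all assumptions, and the MuleEvent reference held by the original class is left out so the example runs on its own.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical sketch: the real MarketRate is not shown in the article,
// so the fields and the ascending-by-rate ordering here are assumptions.
public class MarketRateSketch {

    public static class MarketRate implements Comparable<MarketRate> {
        private final String marketName;
        private final int rate;

        public MarketRate(String marketName, int rate) {
            this.marketName = marketName;
            this.rate = rate;
        }

        /** Parses a "name,rate" payload such as "Tokyo,105". */
        public static MarketRate parse(String payload) {
            String[] parts = payload.split(",");
            if (parts.length != 2) {
                throw new IllegalArgumentException("expected name,rate but got: " + payload);
            }
            return new MarketRate(parts[0].trim(), Integer.parseInt(parts[1].trim()));
        }

        @Override
        public int compareTo(MarketRate other) {
            // Assumed ordering: lowest rate sorts first.
            return Integer.compare(rate, other.rate);
        }

        @Override
        public String toString() {
            return marketName + "," + rate;
        }
    }

    /** Returns the payload that sorts first under the assumed ordering. */
    public static String selectBest(List<String> payloads) {
        if (payloads.isEmpty()) {
            throw new IllegalStateException("no response obtained");
        }
        List<MarketRate> rates = new ArrayList<>();
        for (String payload : payloads) {
            rates.add(MarketRate.parse(payload));
        }
        Collections.sort(rates);
        return rates.get(0).toString();
    }
}
```

With this ordering, selectBest on the payloads "Tokyo,105" and "NewYork,98" returns "NewYork,98". Reversing compareTo would select the highest rate instead.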
Configuration
Because Scatter-Gather sends messages to multiple routes, it requires at least two routes. If you configure only one route, the application throws an exception and fails to start.
General Tab
You can configure your custom aggregation strategy in the General tab of Scatter-Gather.
Advanced Tab
In the Advanced tab, you can configure:
Timeout: Sets the timeout, in milliseconds, for responses from the routes. A value of 0 or less means no timeout. The default is 0.
Threading profile: Optionally, you can use this to configure the threading profile. maxThreadsActive for Scatter-Gather is the number of routes in the Scatter-Gather multiplied by maxThreadsActive for the flow.
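To make the timeout semantics concrete, here is a plain-Java sketch that treats a value of 0 or less as "wait indefinitely" and otherwise gives up on routes that are still running when the timeout expires. It does not use Mule's API: TimeoutSketch and timedOutRoutes are hypothetical names, and the 0-based route index is a convention of this sketch.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

// Plain-Java illustration only; none of these names are Mule APIs.
public class TimeoutSketch {

    /** Returns the indexes of the routes that did not finish within timeoutMillis. */
    public static List<Integer> timedOutRoutes(List<Callable<String>> routes, long timeoutMillis)
            throws InterruptedException {
        // One thread per route; assumes at least one route is supplied.
        ExecutorService pool = Executors.newFixedThreadPool(routes.size());
        try {
            List<Future<String>> futures;
            if (timeoutMillis <= 0) {
                // 0 or negative: no timeout, wait until every route completes.
                futures = pool.invokeAll(routes);
            } else {
                futures = pool.invokeAll(routes, timeoutMillis, TimeUnit.MILLISECONDS);
            }
            List<Integer> timedOut = new ArrayList<>();
            for (int i = 0; i < futures.size(); i++) {
                // invokeAll cancels any task still running when the timeout expires.
                if (futures.get(i).isCancelled()) {
                    timedOut.add(i);
                }
            }
            return timedOut;
        } finally {
            pool.shutdownNow();
        }
    }
}
```

A route that sleeps for several seconds would be reported as timed out when called with a timeout of a few hundred milliseconds, while the same call with a timeout of 0 would block until that route finishes.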
Sample flow:
<?xml version="1.0" encoding="UTF-8"?>
<mule
    xmlns:tracking="http://www.mulesoft.org/schema/mule/ee/tracking"
    xmlns:dw="http://www.mulesoft.org/schema/mule/ee/dw"
    xmlns:ws="http://www.mulesoft.org/schema/mule/ws"
    xmlns:metadata="http://www.mulesoft.org/schema/mule/metadata"
    xmlns:http="http://www.mulesoft.org/schema/mule/http"
    xmlns="http://www.mulesoft.org/schema/mule/core"
    xmlns:doc="http://www.mulesoft.org/schema/mule/documentation"
    xmlns:spring="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-current.xsd
        http://www.mulesoft.org/schema/mule/core http://www.mulesoft.org/schema/mule/core/current/mule.xsd
        http://www.mulesoft.org/schema/mule/ws http://www.mulesoft.org/schema/mule/ws/current/mule-ws.xsd
        http://www.mulesoft.org/schema/mule/http http://www.mulesoft.org/schema/mule/http/current/mule-http.xsd
        http://www.mulesoft.org/schema/mule/ee/dw http://www.mulesoft.org/schema/mule/ee/dw/current/dw.xsd
        http://www.mulesoft.org/schema/mule/ee/tracking http://www.mulesoft.org/schema/mule/ee/tracking/current/mule-tracking-ee.xsd">

    <!-- HTTP_Listener_Configuration and Web_Service_Consumer are global elements that are not shown in this listing. -->

    <flow name="scatter-gather-exampleFlow">
        <http:listener config-ref="HTTP_Listener_Configuration" path="sgpoc" doc:name="HTTP"/>
        <object-to-string-transformer doc:name="Object to String"/>
        <scatter-gather doc:name="Scatter-Gather">
            <custom-aggregation-strategy class="com.example.poc.CustomSGAggregationStrategy"/>
            <processor-chain>
                <logger message="#[payload]" level="INFO" doc:name="Logger"/>
                <ws:consumer config-ref="Web_Service_Consumer" operation="getTokyoMarketRate" doc:name="Tokyo Market Rate"/>
                <dw:transform-message metadata:id="4250084e-3fc9-448a-8bf4-f015d0916d1d" doc:name="Transform Message">
                    <dw:set-payload>
<![CDATA[%dw 1.0
%output application/java
%namespace ns0 http://ws.example.poc.com/
---
payload.ns0#getTokyoMarketRateResponse.return as :string]]>
                    </dw:set-payload>
                </dw:transform-message>
                <set-payload value="#['Tokyo,' + payload]" doc:name="Set Payload"/>
                <logger message="#[payload]" level="INFO" doc:name="Logger"/>
            </processor-chain>
            <processor-chain>
                <logger message="#[payload]" level="INFO" doc:name="Logger"/>
                <ws:consumer config-ref="Web_Service_Consumer" operation="getNewYorkMarketRate" doc:name="New York Market Rate"/>
                <dw:transform-message metadata:id="a94c93e1-404e-4849-9d29-bd17fc05482b" doc:name="Transform Message">
                    <dw:set-payload>
<![CDATA[%dw 1.0
%output application/java
%namespace ns0 http://ws.example.poc.com/
---
payload.ns0#getNewYorkMarketRateResponse.return as :string]]>
                    </dw:set-payload>
                </dw:transform-message>
                <set-payload value="#['NewYork,' + payload]" doc:name="Set Payload"/>
                <logger message="#[payload]" level="INFO" doc:name="Logger"/>
            </processor-chain>
        </scatter-gather>
        <logger message="#[payload]" level="INFO" doc:name="Logger"/>
    </flow>

    <flow name="NewYorkRate">
        <http:listener config-ref="HTTP_Listener_Configuration" path="newyorkrate" doc:name="HTTP"/>
        <ws:consumer config-ref="Web_Service_Consumer" operation="getNewYorkMarketRate" doc:name="Web Service Consumer"/>
        <dw:transform-message metadata:id="2663a4b2-9427-4f79-b212-6cab941c8a9d" doc:name="Transform Message">
            <dw:set-payload>
<![CDATA[%dw 1.0
%output application/java
%namespace ns0 http://ws.example.poc.com/
---
payload.ns0#getNewYorkMarketRateResponse.return as :string]]>
            </dw:set-payload>
        </dw:transform-message>
    </flow>

    <flow name="TokyoRate">
        <http:listener config-ref="HTTP_Listener_Configuration" path="tokyorate" doc:name="HTTP"/>
        <ws:consumer config-ref="Web_Service_Consumer" operation="getTokyoMarketRate" doc:name="Web Service Consumer"/>
        <dw:transform-message metadata:id="95e2d8f2-f447-433a-84a5-fb756aa2857e" doc:name="Transform Message">
            <dw:set-payload>
<![CDATA[%dw 1.0
%output application/java
%namespace ns0 http://ws.example.poc.com/
---
payload.ns0#getTokyoMarketRateResponse.return as :string]]>
            </dw:set-payload>
        </dw:transform-message>
    </flow>
</mule>