Over a million developers have joined DZone.
{{announcement.body}}
{{announcement.title}}

Scatter-Gather in Mule ESB

DZone's Guide to

Scatter-Gather in Mule ESB

Before Scatter-Gather, there was the All message processor. Unlike All, Scatter-Gather executes routes concurrently instead of sequentially.

· Integration Zone
Free Resource

Today’s data climate is fast-paced and it’s not slowing down. Here’s why your current integration solution is not enough. Brought to you in partnership with Liaison Technologies.

Scatter-Gather is a routing message processor in Mule ESB runtime that sends a request message to multiple targets concurrently. It then collects the responses from all routes and aggregates them back into a single response.

Before Scatter-Gather, there was the All message processor, which was deprecated in Mule 3.5.0. Unlike All, Scatter-Gather executes routes concurrently instead of sequentially. Parallel execution of routes greatly increases the efficiency of your application and provides more information than sequential processing. 

Here's a sample Scatter-Gather flow in Mule Anypoint studio:Image title

How It Works

Image title

Once a message is received by Scatter-Gather, it sends a message for concurrent processing to all configured routes. The main thread executing the flow that owns the router waits until all routes complete or time out.

If there are no failures, Mule aggregates the results from each of the routes into a message collection (MessageCollection class). Failure in one route does not stop Scatter-Gather from sending messages to its other configured routes, so it is possible that many or all routes may fail concurrently.

If and when some of the route fails,Scatter-Gather performs the below operations:

  1. Sets the exception payload accordingly for each route.
  2. Throws a CompositeRoutingException, which maps each exception to its corresponding route using a sequential route ID.

Aggregation Strategies

We can define our own aggregation strategies for Scatter-Gather to override its default aggregation strategy.

  • Discard message responses.
  • Merge message properties that originated in different routes.
  • Discard failed messages without throwing an exception.
  • Select only one from multiple responses.
package com.example.poc;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;

import org.apache.commons.lang.StringUtils;
import org.mule.DefaultMuleEvent;
import org.mule.api.MuleEvent;
import org.mule.api.MuleException;
import org.mule.api.routing.AggregationContext;
import org.mule.routing.AggregationStrategy;

public class CustomSGAggregationStrategy implements AggregationStrategy {

   @Override
     public MuleEvent aggregate(AggregationContext context) throws MuleException {
        StringBuilder responseBuilder = new StringBuilder();
        MuleEvent result = null;
        ArrayList < MarketRate > marketRates = new ArrayList < > ();
        for (MuleEvent event: context.collectEventsWithoutExceptions()) {
           String response = (String) event.getMessage().getPayload();
           String[] spiltResponse = StringUtils.split(response, ",");
           MarketRate marketRate = new MarketRate();
           marketRate.setEvent(event);
           marketRate.setMarketName(spiltResponse[0]);
           marketRate.setMarketRate(new Integer(spiltResponse[1]));
           marketRates.add(marketRate);
           System.out.println(marketRate);
        }

        Collections.sort(marketRates);

        result = DefaultMuleEvent.copy(marketRates.get(0).getEvent());
        result.getMessage().setPayload(marketRates.get(0).toString());

        if (result != null) {
           return result;
        }

        throw new RuntimeException("no response obtained");
     }
}

Configuration

An important thing to remember is that since Scatter-Gather routes messages to multiple routes, if you configure only one route, then the application will throw an exception and fail to start. There should be a minimum of two routes in Scatter-Gather.

General Tab

Image title

You can configure your custom aggregation strategy in the General tab of Scatter-Gather.

Advanced Tab

Image title

In the Advanced tab, you can configure:

  • Timeout: Sets the timeout for responses from messages in milliseconds. A value of 0 or lower than 0 means no timeout. The default is 0.

  • Threading profile: Optionally, you can use this to configure the threading profile. maxThreadsActive for Scatter-Gather is the number of routes in a scatter-Gather *maxThreadsActive for the flow.

Sample flow:

<?xml version="1.0" encoding="UTF-8"?>
<mule
	xmlns:tracking="http://www.mulesoft.org/schema/mule/ee/tracking"
	xmlns:dw="http://www.mulesoft.org/schema/mule/ee/dw"
	xmlns:ws="http://www.mulesoft.org/schema/mule/ws"
	xmlns:metadata="http://www.mulesoft.org/schema/mule/metadata"
	xmlns:http="http://www.mulesoft.org/schema/mule/http"
	xmlns="http://www.mulesoft.org/schema/mule/core"
	xmlns:doc="http://www.mulesoft.org/schema/mule/documentation"
	xmlns:spring="http://www.springframework.org/schema/beans"
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-current.xsd
		http://www.mulesoft.org/schema/mule/core http://www.mulesoft.org/schema/mule/core/current/mule.xsd
		http://www.mulesoft.org/schema/mule/ws http://www.mulesoft.org/schema/mule/ws/current/mule-ws.xsd
		http://www.mulesoft.org/schema/mule/http http://www.mulesoft.org/schema/mule/http/current/mule-http.xsd
		http://www.mulesoft.org/schema/mule/ee/dw http://www.mulesoft.org/schema/mule/ee/dw/current/dw.xsd
		http://www.mulesoft.org/schema/mule/ee/tracking http://www.mulesoft.org/schema/mule/ee/tracking/current/mule-tracking-ee.xsd">
	<flow name="scatter-gather-exampleFlow">
		<http:listener config-ref="HTTP_Listener_Configuration" path="sgpoc" doc:name="HTTP"/>
		<object-to-string-transformer doc:name="Object to String"/>
		<scatter-gather doc:name="Scatter-Gather">
			<custom-aggregation-strategy class="com.example.poc.CustomSGAggregationStrategy"/>
			<processor-chain>
				<logger message="#[payload]" level="INFO" doc:name="Logger"/>
				<ws:consumer config-ref="Web_Service_Consumer" operation="getTokyoMarketRate" doc:name="Tokyo Market Rate"/>
				<dw:transform-message metadata:id="4250084e-3fc9-448a-8bf4-f015d0916d1d" doc:name="Transform Message">
					<dw:set-payload>
						<![CDATA[%dw 1.0
						%output application/java
						%namespace ns0 http://ws.example.poc.com/
						---
						payload.ns0#getTokyoMarketRateResponse.return as :string]]>
					</dw:set-payload>
				</dw:transform-message>
				<set-payload value="#['Tokyo,' + payload]" doc:name="Set Payload"/>
				<logger message="#[payload]" level="INFO" doc:name="Logger"/>
			</processor-chain>
			<processor-chain>
				<logger message="#[payload]" level="INFO" doc:name="Logger"/>
				<ws:consumer config-ref="Web_Service_Consumer" operation="getNewYorkMarketRate" doc:name="New York Market Rate"/>
				<dw:transform-message metadata:id="a94c93e1-404e-4849-9d29-bd17fc05482b" doc:name="Transform Message">
					<dw:set-payload>
						<![CDATA[%dw 1.0
						%output application/java
						%namespace ns0 http://ws.example.poc.com/
						---
						payload.ns0#getNewYorkMarketRateResponse.return as :string]]>
					</dw:set-payload>
				</dw:transform-message>
				<set-payload value="#['NewYork,' + payload]" doc:name="Set Payload"/>
				<logger message="#[payload ]" level="INFO" doc:name="Logger"/>
			</processor-chain>
		</scatter-gather>
		<logger message="#[payload]" level="INFO" doc:name="Logger"/>
	</flow>
	<flow name="NewYorkRate">
		<http:listener config-ref="HTTP_Listener_Configuration" path="newyorkrate" doc:name="HTTP"/>
		<ws:consumer config-ref="Web_Service_Consumer" operation="getNewYorkMarketRate" doc:name="Web Service Consumer"/>
		<dw:transform-message metadata:id="2663a4b2-9427-4f79-b212-6cab941c8a9d" doc:name="Transform Message">
			<dw:set-payload>
				<![CDATA[%dw 1.0
				%output application/java
				%namespace ns0 http://ws.example.poc.com/
				---
				payload.ns0#getNewYorkMarketRateResponse.return as :string]]>
			</dw:set-payload>
		</dw:transform-message>
	</flow>
	<flow name="TokyoRate">
		<http:listener config-ref="HTTP_Listener_Configuration" path="tokyorate" doc:name="HTTP"/>
		<ws:consumer config-ref="Web_Service_Consumer" operation="getTokyoMarketRate" doc:name="Web Service Consumer"/>
		<dw:transform-message metadata:id="95e2d8f2-f447-433a-84a5-fb756aa2857e" doc:name="Transform Message">
			<dw:set-payload>
				<![CDATA[%dw 1.0
				%output application/java
				%namespace ns0 http://ws.example.poc.com/
				---
				payload.ns0#getTokyoMarketRateResponse.return as :string]]>
			</dw:set-payload>
		</dw:transform-message>
	</flow>
</mule>

Image title

Video Tutorial

You can check out the video tutorial here:

Is iPaaS solving the right problems? Not knowing the fundamental difference between iPaaS and iPaaS+ could cost you down the road. Brought to you in partnership with Liaison Technologies.

Topics:
message processing ,scatter gather ,mule esb ,integration ,tutorial

Opinions expressed by DZone contributors are their own.

THE DZONE NEWSLETTER

Dev Resources & Solutions Straight to Your Inbox

Thanks for subscribing!

Awesome! Check your inbox to verify your email so you can start receiving the latest in tech news and resources.

X

{{ parent.title || parent.header.title}}

{{ parent.tldr }}

{{ parent.urlSource.name }}