Over a million developers have joined DZone.
{{announcement.body}}
{{announcement.title}}

Scatter-Gather in Mule ESB

DZone's Guide to

Scatter-Gather in Mule ESB

Before Scatter-Gather, there was the All message processor. Unlike All, Scatter-Gather executes routes concurrently instead of sequentially.

· Integration Zone
Free Resource

The Integration Zone is brought to you in partnership with Cloud Elements. What's below the surface of an API integration? Download The Definitive Guide to API Integrations to start building an API strategy.

Scatter-Gather is a routing message processor in Mule ESB runtime that sends a request message to multiple targets concurrently. It then collects the responses from all routes and aggregates them back into a single response.

Before Scatter-Gather, there was the All message processor, which was deprecated in Mule 3.5.0. Unlike All, Scatter-Gather executes routes concurrently instead of sequentially. Parallel execution of routes greatly increases the efficiency of your application and provides more information than sequential processing. 

Here's a sample Scatter-Gather flow in Mule Anypoint studio:Image title

How It Works

Image title

Once a message is received by Scatter-Gather, it sends a message for concurrent processing to all configured routes. The main thread executing the flow that owns the router waits until all routes complete or time out.

If there are no failures, Mule aggregates the results from each of the routes into a message collection (MessageCollection class). Failure in one route does not stop Scatter-Gather from sending messages to its other configured routes, so it is possible that many or all routes may fail concurrently.

If and when some of the route fails,Scatter-Gather performs the below operations:

  1. Sets the exception payload accordingly for each route.
  2. Throws a CompositeRoutingException, which maps each exception to its corresponding route using a sequential route ID.

Aggregation Strategies

We can define our own aggregation strategies for Scatter-Gather to override its default aggregation strategy.

  • Discard message responses.
  • Merge message properties that originated in different routes.
  • Discard failed messages without throwing an exception.
  • Select only one from multiple responses.
package com.example.poc;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;

import org.apache.commons.lang.StringUtils;
import org.mule.DefaultMuleEvent;
import org.mule.api.MuleEvent;
import org.mule.api.MuleException;
import org.mule.api.routing.AggregationContext;
import org.mule.routing.AggregationStrategy;

public class CustomSGAggregationStrategy implements AggregationStrategy {

   @Override
     public MuleEvent aggregate(AggregationContext context) throws MuleException {
        StringBuilder responseBuilder = new StringBuilder();
        MuleEvent result = null;
        ArrayList < MarketRate > marketRates = new ArrayList < > ();
        for (MuleEvent event: context.collectEventsWithoutExceptions()) {
           String response = (String) event.getMessage().getPayload();
           String[] spiltResponse = StringUtils.split(response, ",");
           MarketRate marketRate = new MarketRate();
           marketRate.setEvent(event);
           marketRate.setMarketName(spiltResponse[0]);
           marketRate.setMarketRate(new Integer(spiltResponse[1]));
           marketRates.add(marketRate);
           System.out.println(marketRate);
        }

        Collections.sort(marketRates);

        result = DefaultMuleEvent.copy(marketRates.get(0).getEvent());
        result.getMessage().setPayload(marketRates.get(0).toString());

        if (result != null) {
           return result;
        }

        throw new RuntimeException("no response obtained");
     }
}

Configuration

An important thing to remember is that since Scatter-Gather routes messages to multiple routes, if you configure only one route, then the application will throw an exception and fail to start. There should be a minimum of two routes in Scatter-Gather.

General Tab

Image title

You can configure your custom aggregation strategy in the General tab of Scatter-Gather.

Advanced Tab

Image title

In the Advanced tab, you can configure:

  • Timeout: Sets the timeout for responses from messages in milliseconds. A value of 0 or lower than 0 means no timeout. The default is 0.

  • Threading profile: Optionally, you can use this to configure the threading profile. maxThreadsActive for Scatter-Gather is the number of routes in a scatter-Gather *maxThreadsActive for the flow.

Sample flow:

<?xml version="1.0" encoding="UTF-8"?>
<mule
	xmlns:tracking="http://www.mulesoft.org/schema/mule/ee/tracking"
	xmlns:dw="http://www.mulesoft.org/schema/mule/ee/dw"
	xmlns:ws="http://www.mulesoft.org/schema/mule/ws"
	xmlns:metadata="http://www.mulesoft.org/schema/mule/metadata"
	xmlns:http="http://www.mulesoft.org/schema/mule/http"
	xmlns="http://www.mulesoft.org/schema/mule/core"
	xmlns:doc="http://www.mulesoft.org/schema/mule/documentation"
	xmlns:spring="http://www.springframework.org/schema/beans"
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-current.xsd
		http://www.mulesoft.org/schema/mule/core http://www.mulesoft.org/schema/mule/core/current/mule.xsd
		http://www.mulesoft.org/schema/mule/ws http://www.mulesoft.org/schema/mule/ws/current/mule-ws.xsd
		http://www.mulesoft.org/schema/mule/http http://www.mulesoft.org/schema/mule/http/current/mule-http.xsd
		http://www.mulesoft.org/schema/mule/ee/dw http://www.mulesoft.org/schema/mule/ee/dw/current/dw.xsd
		http://www.mulesoft.org/schema/mule/ee/tracking http://www.mulesoft.org/schema/mule/ee/tracking/current/mule-tracking-ee.xsd">
	<flow name="scatter-gather-exampleFlow">
		<http:listener config-ref="HTTP_Listener_Configuration" path="sgpoc" doc:name="HTTP"/>
		<object-to-string-transformer doc:name="Object to String"/>
		<scatter-gather doc:name="Scatter-Gather">
			<custom-aggregation-strategy class="com.example.poc.CustomSGAggregationStrategy"/>
			<processor-chain>
				<logger message="#[payload]" level="INFO" doc:name="Logger"/>
				<ws:consumer config-ref="Web_Service_Consumer" operation="getTokyoMarketRate" doc:name="Tokyo Market Rate"/>
				<dw:transform-message metadata:id="4250084e-3fc9-448a-8bf4-f015d0916d1d" doc:name="Transform Message">
					<dw:set-payload>
						<![CDATA[%dw 1.0
						%output application/java
						%namespace ns0 http://ws.example.poc.com/
						---
						payload.ns0#getTokyoMarketRateResponse.return as :string]]>
					</dw:set-payload>
				</dw:transform-message>
				<set-payload value="#['Tokyo,' + payload]" doc:name="Set Payload"/>
				<logger message="#[payload]" level="INFO" doc:name="Logger"/>
			</processor-chain>
			<processor-chain>
				<logger message="#[payload]" level="INFO" doc:name="Logger"/>
				<ws:consumer config-ref="Web_Service_Consumer" operation="getNewYorkMarketRate" doc:name="New York Market Rate"/>
				<dw:transform-message metadata:id="a94c93e1-404e-4849-9d29-bd17fc05482b" doc:name="Transform Message">
					<dw:set-payload>
						<![CDATA[%dw 1.0
						%output application/java
						%namespace ns0 http://ws.example.poc.com/
						---
						payload.ns0#getNewYorkMarketRateResponse.return as :string]]>
					</dw:set-payload>
				</dw:transform-message>
				<set-payload value="#['NewYork,' + payload]" doc:name="Set Payload"/>
				<logger message="#[payload ]" level="INFO" doc:name="Logger"/>
			</processor-chain>
		</scatter-gather>
		<logger message="#[payload]" level="INFO" doc:name="Logger"/>
	</flow>
	<flow name="NewYorkRate">
		<http:listener config-ref="HTTP_Listener_Configuration" path="newyorkrate" doc:name="HTTP"/>
		<ws:consumer config-ref="Web_Service_Consumer" operation="getNewYorkMarketRate" doc:name="Web Service Consumer"/>
		<dw:transform-message metadata:id="2663a4b2-9427-4f79-b212-6cab941c8a9d" doc:name="Transform Message">
			<dw:set-payload>
				<![CDATA[%dw 1.0
				%output application/java
				%namespace ns0 http://ws.example.poc.com/
				---
				payload.ns0#getNewYorkMarketRateResponse.return as :string]]>
			</dw:set-payload>
		</dw:transform-message>
	</flow>
	<flow name="TokyoRate">
		<http:listener config-ref="HTTP_Listener_Configuration" path="tokyorate" doc:name="HTTP"/>
		<ws:consumer config-ref="Web_Service_Consumer" operation="getTokyoMarketRate" doc:name="Web Service Consumer"/>
		<dw:transform-message metadata:id="95e2d8f2-f447-433a-84a5-fb756aa2857e" doc:name="Transform Message">
			<dw:set-payload>
				<![CDATA[%dw 1.0
				%output application/java
				%namespace ns0 http://ws.example.poc.com/
				---
				payload.ns0#getTokyoMarketRateResponse.return as :string]]>
			</dw:set-payload>
		</dw:transform-message>
	</flow>
</mule>

Image title

Video Tutorial

You can check out the video tutorial here:

Your API is not enough. Learn why (and how) leading SaaS providers are turning their products into platforms with API integration in the ebook, Build Platforms, Not Products from Cloud Elements.

Topics:
message processing ,scatter gather ,mule esb ,integration ,tutorial

Opinions expressed by DZone contributors are their own.

{{ parent.title || parent.header.title}}

{{ parent.tldr }}

{{ parent.urlSource.name }}