Scatter-Gather in Mule ESB

Before Scatter-Gather, there was the All message processor. Unlike All, Scatter-Gather executes routes concurrently instead of sequentially.

By Harish Kumar · Mar. 17, 17 · Tutorial

Scatter-Gather is a routing message processor in Mule ESB runtime that sends a request message to multiple targets concurrently. It then collects the responses from all routes and aggregates them back into a single response.

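Before Scatter-Gather, there was the All message processor, which was deprecated in Mule 3.5.0. Unlike All, Scatter-Gather executes routes concurrently instead of sequentially. Running routes in parallel can greatly reduce total response time, since the flow waits roughly as long as the slowest route rather than the sum of all routes. It also yields more information than sequential processing, because a failure in one route does not prevent the other routes from running.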

Here's a sample Scatter-Gather flow in Anypoint Studio:

[Image: sample Scatter-Gather flow]

How It Works

Once a message is received by Scatter-Gather, it sends a message for concurrent processing to all configured routes. The main thread executing the flow that owns the router waits until all routes complete or time out.
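
The behavior described above can be sketched in plain Java. This is only an illustration of the pattern, not Mule's implementation: the class ScatterGatherSketch, the method scatterGather, and the use of Function objects as routes are all assumptions made for the example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

// Plain-Java illustration of the scatter-gather pattern; not Mule's implementation.
final class ScatterGatherSketch {

    // Sends the same request to every route concurrently, then blocks the
    // calling thread until all routes finish or the timeout elapses.
    static List<String> scatterGather(String request,
                                      List<Function<String, String>> routes,
                                      long timeoutMillis) {
        ExecutorService pool = Executors.newFixedThreadPool(routes.size());
        try {
            List<Callable<String>> tasks = new ArrayList<>();
            for (Function<String, String> route : routes) {
                tasks.add(() -> route.apply(request));
            }
            // invokeAll returns futures in the same order as the tasks;
            // tasks still running when the timeout expires are cancelled.
            List<Future<String>> futures =
                    pool.invokeAll(tasks, timeoutMillis, TimeUnit.MILLISECONDS);
            List<String> responses = new ArrayList<>();
            for (Future<String> future : futures) {
                responses.add(future.get());
            }
            return responses;
        } catch (Exception e) {
            // Covers a timed-out (cancelled) route, a failed route, or interruption.
            throw new IllegalStateException("scatter-gather failed", e);
        } finally {
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) {
        List<Function<String, String>> routes = new ArrayList<>();
        routes.add(req -> "Tokyo," + req);
        routes.add(req -> "NewYork," + req);
        System.out.println(scatterGather("rate", routes, 1000));
    }
}
```

The responses come back in route order, not completion order, which is what lets a later aggregation step associate each response with the route that produced it.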

If there are no failures, Mule aggregates the results from each route into a message collection (MuleMessageCollection). A failure in one route does not stop Scatter-Gather from sending messages to its other configured routes, so several routes, or even all of them, may fail in the same execution.

If one or more routes fail, Scatter-Gather does the following:

  1. Sets the exception payload accordingly for each route.
  2. Throws a CompositeRoutingException, which maps each exception to its corresponding route using a sequential route ID.
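
The idea of one composite exception keyed by route index can be sketched in plain Java as follows. RouteFailuresException and RouteFailureSketch are hypothetical stand-ins invented for this example; they are not Mule classes, and the routes run sequentially here only to keep the sketch short.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.Supplier;

// Hypothetical stand-in for illustration; Mule's own class is CompositeRoutingException.
final class RouteFailuresException extends RuntimeException {
    private final Map<Integer, Throwable> failuresByRoute;

    RouteFailuresException(Map<Integer, Throwable> failuresByRoute) {
        super(failuresByRoute.size() + " route(s) failed: " + failuresByRoute.keySet());
        this.failuresByRoute = failuresByRoute;
    }

    // Maps each sequential route index to the exception that route threw.
    Map<Integer, Throwable> getFailuresByRoute() {
        return failuresByRoute;
    }
}

final class RouteFailureSketch {

    // Runs every route even if an earlier one fails, records each failure
    // under its route index, and throws a single composite exception at the
    // end if anything failed.
    static List<String> runAll(List<Supplier<String>> routes) {
        List<String> results = new ArrayList<>();
        Map<Integer, Throwable> failures = new TreeMap<>();
        for (int i = 0; i < routes.size(); i++) {
            try {
                results.add(routes.get(i).get());
            } catch (RuntimeException e) {
                failures.put(i, e);
            }
        }
        if (!failures.isEmpty()) {
            throw new RouteFailuresException(failures);
        }
        return results;
    }
}
```

A caller can then catch the composite exception and inspect the map to see which routes failed and why, while knowing that the remaining routes still ran.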

Aggregation Strategies

You can override Scatter-Gather's default aggregation strategy with a custom one. A custom strategy is useful when you want to:

  • Discard message responses.
  • Merge message properties that originated in different routes.
  • Discard failed messages without throwing an exception.
  • Select only one from multiple responses.

The strategy below selects a single response. It parses each successful response of the form name,rate, sorts the results by the natural ordering of MarketRate, and returns the first one. MarketRate is a helper class that is not shown here; it must implement Comparable<MarketRate>.

package com.example.poc;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

import org.apache.commons.lang.StringUtils;
import org.mule.DefaultMuleEvent;
import org.mule.api.MuleEvent;
import org.mule.api.MuleException;
import org.mule.api.routing.AggregationContext;
import org.mule.routing.AggregationStrategy;

public class CustomSGAggregationStrategy implements AggregationStrategy {

    @Override
    public MuleEvent aggregate(AggregationContext context) throws MuleException {
        List<MarketRate> marketRates = new ArrayList<>();

        // Only consider routes that completed without an exception.
        for (MuleEvent event : context.collectEventsWithoutExceptions()) {
            // Each route returns a payload of the form "<market name>,<rate>".
            String response = (String) event.getMessage().getPayload();
            String[] splitResponse = StringUtils.split(response, ",");
            MarketRate marketRate = new MarketRate();
            marketRate.setEvent(event);
            marketRate.setMarketName(splitResponse[0]);
            marketRate.setMarketRate(Integer.valueOf(splitResponse[1]));
            marketRates.add(marketRate);
            System.out.println(marketRate); // debug output
        }

        // Fail before touching the list if every route failed.
        if (marketRates.isEmpty()) {
            throw new RuntimeException("no response obtained");
        }

        // The first element after sorting is the selected rate.
        Collections.sort(marketRates);
        MarketRate best = marketRates.get(0);

        MuleEvent result = DefaultMuleEvent.copy(best.getEvent());
        result.getMessage().setPayload(best.toString());
        return result;
    }
}
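
The MarketRate class used by the strategy above is not shown. A minimal standalone sketch of it, together with the parse, sort, and select logic, might look like the following. The field names, the lowest-rate-first ordering, and the BestRateSelector helper are assumptions made for this example; the real class also stores the MuleEvent, which is left out here so the code runs without Mule on the classpath.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical MarketRate; the version used with Mule would also hold the MuleEvent.
final class MarketRate implements Comparable<MarketRate> {
    private final String marketName;
    private final int rate;

    MarketRate(String marketName, int rate) {
        this.marketName = marketName;
        this.rate = rate;
    }

    // Parses a payload of the form "<market name>,<rate>".
    static MarketRate parse(String payload) {
        String[] parts = payload.split(",");
        if (parts.length != 2) {
            throw new IllegalArgumentException("expected \"name,rate\" but got: " + payload);
        }
        return new MarketRate(parts[0].trim(), Integer.parseInt(parts[1].trim()));
    }

    // Assumption: the lowest rate sorts first, so it is the one selected.
    @Override
    public int compareTo(MarketRate other) {
        return Integer.compare(rate, other.rate);
    }

    @Override
    public String toString() {
        return marketName + "," + rate;
    }
}

final class BestRateSelector {

    // Mirrors the aggregation logic: parse every payload, sort, take the first.
    static MarketRate selectBest(List<String> payloads) {
        if (payloads.isEmpty()) {
            throw new IllegalStateException("no response obtained");
        }
        List<MarketRate> rates = new ArrayList<>();
        for (String payload : payloads) {
            rates.add(MarketRate.parse(payload));
        }
        Collections.sort(rates);
        return rates.get(0);
    }
}
```

If the highest rate should win instead, only compareTo needs to change; the selection code stays the same.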

Configuration

Scatter-Gather requires at least two routes. Because its purpose is to send a message to multiple routes, an application that configures only one route throws an exception and fails to start.

General Tab

You can configure your custom aggregation strategy in the General tab of Scatter-Gather.

Advanced Tab

In the Advanced tab, you can configure:

  • Timeout: Sets the timeout, in milliseconds, for responses from the routes. A value of 0 or less means no timeout. The default is 0.

  • Threading profile: Optionally, you can use this to configure the threading profile. maxThreadsActive for Scatter-Gather is the number of routes in the Scatter-Gather multiplied by maxThreadsActive for the flow.
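
The two settings above can be illustrated with a small plain-Java sketch. RouteTimeoutSketch, await, and maxScatterGatherThreads are hypothetical names chosen for this example and are not part of Mule; the code only mirrors the rules stated in the list.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

final class RouteTimeoutSketch {

    // Waits for a route's result. A timeout of 0 or less means wait indefinitely.
    static <T> T await(Future<T> route, long timeoutMillis) {
        try {
            if (timeoutMillis <= 0) {
                return route.get();
            }
            return route.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            route.cancel(true);
            throw new IllegalStateException("route timed out after " + timeoutMillis + " ms", e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for route", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("route failed", e.getCause());
        }
    }

    // Number of routes multiplied by the flow's maxThreadsActive.
    static int maxScatterGatherThreads(int routeCount, int flowMaxThreadsActive) {
        return routeCount * flowMaxThreadsActive;
    }
}
```

For example, assuming a flow whose maxThreadsActive is 16, a Scatter-Gather with two routes would get 2 × 16 = 32.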

Sample flow:

<?xml version="1.0" encoding="UTF-8"?>
<mule
	xmlns:tracking="http://www.mulesoft.org/schema/mule/ee/tracking"
	xmlns:dw="http://www.mulesoft.org/schema/mule/ee/dw"
	xmlns:ws="http://www.mulesoft.org/schema/mule/ws"
	xmlns:metadata="http://www.mulesoft.org/schema/mule/metadata"
	xmlns:http="http://www.mulesoft.org/schema/mule/http"
	xmlns="http://www.mulesoft.org/schema/mule/core"
	xmlns:doc="http://www.mulesoft.org/schema/mule/documentation"
	xmlns:spring="http://www.springframework.org/schema/beans"
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-current.xsd
		http://www.mulesoft.org/schema/mule/core http://www.mulesoft.org/schema/mule/core/current/mule.xsd
		http://www.mulesoft.org/schema/mule/ws http://www.mulesoft.org/schema/mule/ws/current/mule-ws.xsd
		http://www.mulesoft.org/schema/mule/http http://www.mulesoft.org/schema/mule/http/current/mule-http.xsd
		http://www.mulesoft.org/schema/mule/ee/dw http://www.mulesoft.org/schema/mule/ee/dw/current/dw.xsd
		http://www.mulesoft.org/schema/mule/ee/tracking http://www.mulesoft.org/schema/mule/ee/tracking/current/mule-tracking-ee.xsd">
	<!-- The global elements HTTP_Listener_Configuration and Web_Service_Consumer are referenced below but not shown in this listing; define them in your project. -->
	<flow name="scatter-gather-exampleFlow">
		<http:listener config-ref="HTTP_Listener_Configuration" path="sgpoc" doc:name="HTTP"/>
		<object-to-string-transformer doc:name="Object to String"/>
		<scatter-gather doc:name="Scatter-Gather">
			<custom-aggregation-strategy class="com.example.poc.CustomSGAggregationStrategy"/>
			<processor-chain>
				<logger message="#[payload]" level="INFO" doc:name="Logger"/>
				<ws:consumer config-ref="Web_Service_Consumer" operation="getTokyoMarketRate" doc:name="Tokyo Market Rate"/>
				<dw:transform-message metadata:id="4250084e-3fc9-448a-8bf4-f015d0916d1d" doc:name="Transform Message">
					<dw:set-payload>
						<![CDATA[%dw 1.0
						%output application/java
						%namespace ns0 http://ws.example.poc.com/
						---
						payload.ns0#getTokyoMarketRateResponse.return as :string]]>
					</dw:set-payload>
				</dw:transform-message>
				<set-payload value="#['Tokyo,' + payload]" doc:name="Set Payload"/>
				<logger message="#[payload]" level="INFO" doc:name="Logger"/>
			</processor-chain>
			<processor-chain>
				<logger message="#[payload]" level="INFO" doc:name="Logger"/>
				<ws:consumer config-ref="Web_Service_Consumer" operation="getNewYorkMarketRate" doc:name="New York Market Rate"/>
				<dw:transform-message metadata:id="a94c93e1-404e-4849-9d29-bd17fc05482b" doc:name="Transform Message">
					<dw:set-payload>
						<![CDATA[%dw 1.0
						%output application/java
						%namespace ns0 http://ws.example.poc.com/
						---
						payload.ns0#getNewYorkMarketRateResponse.return as :string]]>
					</dw:set-payload>
				</dw:transform-message>
				<set-payload value="#['NewYork,' + payload]" doc:name="Set Payload"/>
				<logger message="#[payload]" level="INFO" doc:name="Logger"/>
			</processor-chain>
		</scatter-gather>
		<logger message="#[payload]" level="INFO" doc:name="Logger"/>
	</flow>
	<flow name="NewYorkRate">
		<http:listener config-ref="HTTP_Listener_Configuration" path="newyorkrate" doc:name="HTTP"/>
		<ws:consumer config-ref="Web_Service_Consumer" operation="getNewYorkMarketRate" doc:name="Web Service Consumer"/>
		<dw:transform-message metadata:id="2663a4b2-9427-4f79-b212-6cab941c8a9d" doc:name="Transform Message">
			<dw:set-payload>
				<![CDATA[%dw 1.0
				%output application/java
				%namespace ns0 http://ws.example.poc.com/
				---
				payload.ns0#getNewYorkMarketRateResponse.return as :string]]>
			</dw:set-payload>
		</dw:transform-message>
	</flow>
	<flow name="TokyoRate">
		<http:listener config-ref="HTTP_Listener_Configuration" path="tokyorate" doc:name="HTTP"/>
		<ws:consumer config-ref="Web_Service_Consumer" operation="getTokyoMarketRate" doc:name="Web Service Consumer"/>
		<dw:transform-message metadata:id="95e2d8f2-f447-433a-84a5-fb756aa2857e" doc:name="Transform Message">
			<dw:set-payload>
				<![CDATA[%dw 1.0
				%output application/java
				%namespace ns0 http://ws.example.poc.com/
				---
				payload.ns0#getTokyoMarketRateResponse.return as :string]]>
			</dw:set-payload>
		</dw:transform-message>
	</flow>
</mule>
