Platinum Partner
java,enterprise-integration,patterns,integration,tips and tricks,mule esb

Aggregation with Mule – The “Fork and Join Pattern”

In your daily work as an integration developer, you’re working with different kinds of , even if you’re not aware of it.

Since Mule is based on EIP (Enterprise Integration Patterns), you’re most definitely using patterns when using Mule.

One of the patterns that seems to raise a lot of questions is the “fork and join pattern.” The purpose of the fork and join pattern is to send a request to different targets, in parallel, and wait for an aggregated response from all the targets.

This is very useful when you want to merge information from different systems, perform calculations based on information from different targets or compare data from a range of data sources.

Fork and join flowchart

See the flowchart describing the fork and join pattern

Since I’ve noticed questions on the MuleSoft forum and questions from colleagues saying that this is a tricky thing to do, I thought I would show you a simple example of how it’s done.

In my example, there’s an HTTP request coming in that should return the lowest price from different shops. The request is delegated to two different shops, shop1 and shop2, in parallel, (asynchronous), and then it is collected for comparison to return the lowest price. This is easily done with Mule using the request-reply router. Let’s dig in to the configuration…


<?xml version="1.0" encoding="UTF-8"?>
<mule xmlns="http://www.mulesoft.org/schema/mule/core" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xmlns:context="http://www.springframework.org/schema/context"
	xmlns:http="http://www.mulesoft.org/schema/mule/http" xmlns:https="http://www.mulesoft.org/schema/mule/https"
	xmlns:mulexml="http://www.mulesoft.org/schema/mule/xml" xmlns:script="http://www.mulesoft.org/schema/mule/scripting"
	xmlns:spring="http://www.springframework.org/schema/beans" xmlns:vm="http://www.mulesoft.org/schema/mule/vm"
	xsi:schemaLocation="
          http://www.mulesoft.org/schema/mule/core http://www.mulesoft.org/schema/mule/core/3.2/mule.xsd
          http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.0.xsd
    	  http://www.mulesoft.org/schema/mule/http http://www.mulesoft.org/schema/mule/http/3.2/mule-http.xsd
          http://www.mulesoft.org/schema/mule/https http://www.mulesoft.org/schema/mule/https/3.2/mule-https.xsd
    	  http://www.mulesoft.org/schema/mule/xml http://www.mulesoft.org/schema/mule/xml/3.2/mule-xml.xsd
    	  http://www.mulesoft.org/schema/mule/scripting http://www.mulesoft.org/schema/mule/scripting/3.2/mule-scripting.xsd 
    	  http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
    	  http://www.mulesoft.org/schema/mule/vm http://www.mulesoft.org/schema/mule/vm/3.2/mule-vm.xsd
       ">

	<flow name="forkAndJoinFlow">
		<http:inbound-endpoint exchange-pattern="request-response" host="localhost" port="81" path="lowestprice" />
		<not-filter>
			<wildcard-filter pattern="*favicon*" />
		</not-filter>
		<request-reply>
			<all enableCorrelation="ALWAYS">
				<vm:outbound-endpoint path="shop1"/>
				<vm:outbound-endpoint path="shop2"/>
			</all>
			<vm:inbound-endpoint path="response">
				<message-properties-transformer>
					<add-message-property key="MULE_CORRELATION_GROUP_SIZE" value="2" />
				</message-properties-transformer>
				<collection-aggregator />
			</vm:inbound-endpoint>
		</request-reply>
		<expression-transformer evaluator="groovy" expression="java.util.Collections.min(payload)" />
		<object-to-string-transformer/>
		<logger level="WARN" message="#[string:Lowest price: #[payload]]" />
	</flow>
	
	<flow name="shop1Flow">
		<vm:inbound-endpoint path="shop1"/>
		<expression-transformer evaluator="groovy" expression="new java.lang.Double(1000.0 * Math.random()).intValue()" />
		<logger level="WARN" message="#[string:Price from shop 1: #[payload]]" />
	</flow>
	
	<flow name="shop2Flow">
		<vm:inbound-endpoint path="shop2" />
		<expression-transformer evaluator="groovy" expression="new java.lang.Double(1000.0 * Math.random()).intValue()" />
		<logger level="WARN" message="#[string:Price from shop 2: #[payload]]" />
	</flow>

</mule>

In this example, I’m using the VM transport to simulate communication with external systems, and a random price is returned from the different shops.

Notice that the VM endpoints can be replaced with whatever protocol you desire, and the number of external sources can be as many as you like, just be sure to tell Mule how many results it should wait for by using the property MULE_CORRELATION_GROUP_SIZE.

By opening a browser and entering http://localhost:81/lowestprice, a number is returned.  That's not very useful, but that’s not the point of the example.

When looking at the log the following can be seen:

10:23:10,669  WARN LoggerMessageProcessor,[forkandjoin-0.0.1-SNAPSHOT].shop1Flow.stage1.02:269 – Price from shop 1: 362
10:23:10,679  WARN LoggerMessageProcessor,[forkandjoin-0.0.1-SNAPSHOT].shop2Flow.stage1.02:269 – Price from shop 2: 109
10:23:10,729  WARN LoggerMessageProcessor,[forkandjoin-0.0.1-SNAPSHOT].connector.http.mule.default.receiver.02:269 – Lowest price: 109

That tells us that the request has been sent to both shop1 and shop2.  In this case, the lowest price was received from shop2.

It's a very simple example, but I hope you get the idea on how the fork and join pattern can be used and how to use it in more advanced scenarios.


Published at DZone with permission of {{ articles[0].authors[0].realName }}, DZone MVB. (source)

Opinions expressed by DZone contributors are their own.

{{ tag }}, {{tag}},

{{ parent.title || parent.header.title}}

{{ parent.tldr }}

{{ parent.urlSource.name }}
{{ parent.authors[0].realName || parent.author}}

{{ parent.authors[0].tagline || parent.tagline }}

{{ parent.views }} ViewsClicks
Tweet

{{parent.nComments}}