Aggregation with Mule – “Fork and Join Pattern”
Aggregation with Mule – “Fork and Join Pattern”
If you dabble in integration, you likely use a smattering of different patterns. Here's a glance at Mule aggregation with a few neat examples.
Join the DZone community and get the full member experience.Join For Free
The Future of Enterprise Integration: Learn how organizations are re-architecting their integration strategy with data-driven app integration for true digital transformation.
In your daily work as an integration developer you’re working with different kinds of patterns, even if you’re not aware of it. Since Mule is based on EIP (Enterprise Integration Patterns) you’re most definitely using patterns when using Mule.
One of those patterns that seems to raise a lot of questions is the “fork and join pattern.” The purpose of the fork and join pattern is to send a request to different targets, in parallel, and wait for a aggregated response from all the targets.
This is very useful when you want to merge information from different systems, perform calculation based on information from different targets or compare data from a range of data sources.
Flowchart describing the fork and join pattern
Since I’ve noticed from questions on the forum and from colleagues in the business that this is tricky thing to do I thought I would show you guys a simple example on how it’s done.
In my example there’s a http request coming in that should return the lowest price from different shops. In the example the request is delegated to 2 different shops, shop1 and shop2, in parallel (asynchronous) and collected for comparison to return the lowest price. This is easily done with Mule and the key feature is the request-reply router. So let’s dig in to the configuration…
<?xml version="1.0" encoding="UTF-8"?> <mule xmlns="http://www.mulesoft.org/schema/mule/core" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context" xmlns:http="http://www.mulesoft.org/schema/mule/http" xmlns:https="http://www.mulesoft.org/schema/mule/https" xmlns:mulexml="http://www.mulesoft.org/schema/mule/xml" xmlns:script="http://www.mulesoft.org/schema/mule/scripting" xmlns:spring="http://www.springframework.org/schema/beans" xmlns:vm="http://www.mulesoft.org/schema/mule/vm" xsi:schemaLocation=" http://www.mulesoft.org/schema/mule/core http://www.mulesoft.org/schema/mule/core/3.2/mule.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.0.xsd http://www.mulesoft.org/schema/mule/http http://www.mulesoft.org/schema/mule/http/3.2/mule-http.xsd http://www.mulesoft.org/schema/mule/https http://www.mulesoft.org/schema/mule/https/3.2/mule-https.xsd http://www.mulesoft.org/schema/mule/xml http://www.mulesoft.org/schema/mule/xml/3.2/mule-xml.xsd http://www.mulesoft.org/schema/mule/scripting http://www.mulesoft.org/schema/mule/scripting/3.2/mule-scripting.xsd http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd http://www.mulesoft.org/schema/mule/vm http://www.mulesoft.org/schema/mule/vm/3.2/mule-vm.xsd "> <flow name="forkAndJoinFlow"> <http:inbound-endpoint exchange-pattern="request-response" host="localhost" port="81" path="lowestprice" /> <not-filter> <wildcard-filter pattern="*favicon*" /> </not-filter> <request-reply> <all enableCorrelation="ALWAYS"> <vm:outbound-endpoint path="shop1"/> <vm:outbound-endpoint path="shop2"/> </all> <vm:inbound-endpoint path="response"> <message-properties-transformer> <add-message-property key="MULE_CORRELATION_GROUP_SIZE" value="2" /> </message-properties-transformer> <collection-aggregator /> </vm:inbound-endpoint> </request-reply> <expression-transformer evaluator="groovy" expression="java.util.Collections.min(payload)" /> <object-to-string-transformer/> <logger level="WARN" message="#[string:Lowest price: #[payload]]" /> </flow> <flow name="shop1Flow"> <vm:inbound-endpoint path="shop1"/> <expression-transformer evaluator="groovy" expression="new java.lang.Double(1000.0 * Math.random()).intValue()" /> <logger level="WARN" message="#[string:Price from shop 1: #[payload]]" /> </flow> <flow name="shop2Flow"> <vm:inbound-endpoint path="shop2" /> <expression-transformer evaluator="groovy" expression="new java.lang.Double(1000.0 * Math.random()).intValue()" /> <logger level="WARN" message="#[string:Price from shop 2: #[payload]]" /> </flow> </mule>
In this example I’m using the vm transport to simulate communication with external systems and a random price is returned from the different shops.
Notice that the vm endpoints could be replaced with whatever protocol you desire and the number of external sources could be as many as you like, just be sure to tell Mule how many results to wait for by using the property MULE_CORRELATION_GROUP_SIZE.
So by opening a browser and enter http://localhost:81/lowestprice a number is returned, not very useful but that’s not the point of the example.
When looking at the log the following can be seen:
10:23:10,669 WARN LoggerMessageProcessor, [forkandjoin-0.0.1-SNAPSHOT].shop1Flow.stage1.02:269 – Price from shop 1: 362 10:23:10,679 WARN LoggerMessageProcessor, [forkandjoin-0.0.1-SNAPSHOT].shop2Flow.stage1.02:269 – Price from shop 2: 109 10:23:10,729 WARN LoggerMessageProcessor, [forkandjoin-0.0.1-SNAPSHOT].connector.http.mule.default.receiver.02:269 – Lowest price: 109
That tells us that the request has been sent to both shop1 and shop2 and in this case the lowest price was received from shop2.
A simple example but I hope you get the idea on how fork and join pattern can be used and how to use it in more advanced scenarios.
Published at DZone with permission of Tomas Blohm , DZone MVB. See the original article here.
Opinions expressed by DZone contributors are their own.