Aggregation with Mule – The “Fork and Join Pattern”
Aggregation with Mule – The “Fork and Join Pattern”
Join the DZone community and get the full member experience.Join For Free
In your daily work as an integration developer, you’re working with different kinds of patterns, even if you’re not aware of it.
Since Mule is based on EIP (Enterprise Integration Patterns), you’re most definitely using patterns when using Mule.
One of the patterns that seems to raise a lot of questions is the “fork and join pattern.” The purpose of the fork and join pattern is to send a request to different targets, in parallel, and wait for an aggregated response from all the targets.
This is very useful when you want to merge information from different systems, perform calculations based on information from different targets or compare data from a range of data sources.
See the flowchart describing the fork and join pattern
Since I’ve noticed questions on the MuleSoft forum and questions from colleagues saying that this is a tricky thing to do, I thought I would show you a simple example of how it’s done.
In my example, there’s an HTTP request coming in that should return the lowest price from different shops. The request is delegated to two different shops, shop1 and shop2, in parallel, (asynchronous), and then it is collected for comparison to return the lowest price. This is easily done with Mule using the request-reply router. Let’s dig in to the configuration…
<?xml version="1.0" encoding="UTF-8"?> <mule xmlns="http://www.mulesoft.org/schema/mule/core" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context" xmlns:http="http://www.mulesoft.org/schema/mule/http" xmlns:https="http://www.mulesoft.org/schema/mule/https" xmlns:mulexml="http://www.mulesoft.org/schema/mule/xml" xmlns:script="http://www.mulesoft.org/schema/mule/scripting" xmlns:spring="http://www.springframework.org/schema/beans" xmlns:vm="http://www.mulesoft.org/schema/mule/vm" xsi:schemaLocation=" http://www.mulesoft.org/schema/mule/core http://www.mulesoft.org/schema/mule/core/3.2/mule.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.0.xsd http://www.mulesoft.org/schema/mule/http http://www.mulesoft.org/schema/mule/http/3.2/mule-http.xsd http://www.mulesoft.org/schema/mule/https http://www.mulesoft.org/schema/mule/https/3.2/mule-https.xsd http://www.mulesoft.org/schema/mule/xml http://www.mulesoft.org/schema/mule/xml/3.2/mule-xml.xsd http://www.mulesoft.org/schema/mule/scripting http://www.mulesoft.org/schema/mule/scripting/3.2/mule-scripting.xsd http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd http://www.mulesoft.org/schema/mule/vm http://www.mulesoft.org/schema/mule/vm/3.2/mule-vm.xsd "> <flow name="forkAndJoinFlow"> <http:inbound-endpoint exchange-pattern="request-response" host="localhost" port="81" path="lowestprice" /> <not-filter> <wildcard-filter pattern="*favicon*" /> </not-filter> <request-reply> <all enableCorrelation="ALWAYS"> <vm:outbound-endpoint path="shop1"/> <vm:outbound-endpoint path="shop2"/> </all> <vm:inbound-endpoint path="response"> <message-properties-transformer> <add-message-property key="MULE_CORRELATION_GROUP_SIZE" value="2" /> </message-properties-transformer> <collection-aggregator /> </vm:inbound-endpoint> </request-reply> <expression-transformer evaluator="groovy" expression="java.util.Collections.min(payload)" /> <object-to-string-transformer/> <logger level="WARN" message="#[string:Lowest price: #[payload]]" /> </flow> <flow name="shop1Flow"> <vm:inbound-endpoint path="shop1"/> <expression-transformer evaluator="groovy" expression="new java.lang.Double(1000.0 * Math.random()).intValue()" /> <logger level="WARN" message="#[string:Price from shop 1: #[payload]]" /> </flow> <flow name="shop2Flow"> <vm:inbound-endpoint path="shop2" /> <expression-transformer evaluator="groovy" expression="new java.lang.Double(1000.0 * Math.random()).intValue()" /> <logger level="WARN" message="#[string:Price from shop 2: #[payload]]" /> </flow> </mule>
In this example, I’m using the VM transport to simulate communication with external systems, and a random price is returned from the different shops.
Notice that the VM endpoints can be replaced with whatever protocol you desire, and the number of external sources can be as many as you like, just be sure to tell Mule how many results it should wait for by using the property MULE_CORRELATION_GROUP_SIZE.
By opening a browser and entering http://localhost:81/lowestprice, a number is returned. That's not very useful, but that’s not the point of the example.
When looking at the log the following can be seen:
10:23:10,669 WARN LoggerMessageProcessor,[forkandjoin-0.0.1-SNAPSHOT].shop1Flow.stage1.02:269 – Price from shop 1: 362
10:23:10,679 WARN LoggerMessageProcessor,[forkandjoin-0.0.1-SNAPSHOT].shop2Flow.stage1.02:269 – Price from shop 2: 109
10:23:10,729 WARN LoggerMessageProcessor,[forkandjoin-0.0.1-SNAPSHOT].connector.http.mule.default.receiver.02:269 – Lowest price: 109
That tells us that the request has been sent to both shop1 and shop2. In this case, the lowest price was received from shop2.
It's a very simple example, but I hope you get the idea on how the fork and join pattern can be used and how to use it in more advanced scenarios.
Published at DZone with permission of Tomas Blohm , DZone MVB. See the original article here.
Opinions expressed by DZone contributors are their own.