Aggregation with Mule – “Fork and Join Pattern”

DZone 's Guide to

Aggregation with Mule – “Fork and Join Pattern”

If you dabble in integration, you likely use a smattering of different patterns. Here's a glance at Mule aggregation with a few neat examples.

· Integration Zone ·
Free Resource

In your daily work as an integration developer you’re working with different kinds of patterns, even if you’re not aware of it. Since Mule is based on EIP (Enterprise Integration Patterns) you’re most definitely using patterns when using Mule.

One of those patterns that seems to raise a lot of questions is the “fork and join pattern.” The purpose of the fork and join pattern is to send a request to different targets, in parallel, and wait for a aggregated response from all the targets.

This is very useful when you want to merge information from different systems, perform calculation based on information from different targets or compare data from a range of data sources.

Fork and join flowchart









Flowchart describing the fork and join pattern

Since I’ve noticed from questions on the forum and from colleagues in the business that this is tricky thing to do I thought I would show you guys a simple example on how it’s done.

In my example there’s a http request coming in that should return the lowest price from different shops. In the example the request is delegated to 2 different shops, shop1 and shop2, in parallel (asynchronous) and collected for comparison to return the lowest price. This is easily done with Mule and the key feature is the request-reply router. So let’s dig in to the configuration…

<?xml version="1.0" encoding="UTF-8"?>
<mule xmlns="http://www.mulesoft.org/schema/mule/core" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:http="http://www.mulesoft.org/schema/mule/http" xmlns:https="http://www.mulesoft.org/schema/mule/https"
xmlns:mulexml="http://www.mulesoft.org/schema/mule/xml" xmlns:script="http://www.mulesoft.org/schema/mule/scripting"
xmlns:spring="http://www.springframework.org/schema/beans" xmlns:vm="http://www.mulesoft.org/schema/mule/vm"
          http://www.mulesoft.org/schema/mule/core http://www.mulesoft.org/schema/mule/core/3.2/mule.xsd
          http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.0.xsd
      http://www.mulesoft.org/schema/mule/http http://www.mulesoft.org/schema/mule/http/3.2/mule-http.xsd
          http://www.mulesoft.org/schema/mule/https http://www.mulesoft.org/schema/mule/https/3.2/mule-https.xsd
      http://www.mulesoft.org/schema/mule/xml http://www.mulesoft.org/schema/mule/xml/3.2/mule-xml.xsd
      http://www.mulesoft.org/schema/mule/scripting http://www.mulesoft.org/schema/mule/scripting/3.2/mule-scripting.xsd 
      http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
      http://www.mulesoft.org/schema/mule/vm http://www.mulesoft.org/schema/mule/vm/3.2/mule-vm.xsd

  <flow name="forkAndJoinFlow">
    <http:inbound-endpoint exchange-pattern="request-response" host="localhost" port="81" path="lowestprice" />
        <wildcard-filter pattern="*favicon*" />
      <all enableCorrelation="ALWAYS">
          <vm:outbound-endpoint path="shop1"/>
          <vm:outbound-endpoint path="shop2"/>
      <vm:inbound-endpoint path="response">
              <add-message-property key="MULE_CORRELATION_GROUP_SIZE" value="2" />
          <collection-aggregator />
    <expression-transformer evaluator="groovy" expression="java.util.Collections.min(payload)" />
    <logger level="WARN" message="#[string:Lowest price: #[payload]]" />

  <flow name="shop1Flow">
    <vm:inbound-endpoint path="shop1"/>
    <expression-transformer evaluator="groovy" expression="new java.lang.Double(1000.0 * Math.random()).intValue()" />
    <logger level="WARN" message="#[string:Price from shop 1: #[payload]]" />

  <flow name="shop2Flow">
    <vm:inbound-endpoint path="shop2" />
    <expression-transformer evaluator="groovy" expression="new java.lang.Double(1000.0 * Math.random()).intValue()" />
    <logger level="WARN" message="#[string:Price from shop 2: #[payload]]" />

In this example I’m using the vm transport to simulate communication with external systems and a random price is returned from the different shops.

Notice that the vm endpoints could be replaced with whatever protocol you desire and the number of external sources could be as many as you like, just be sure to tell Mule how many results to wait for by using the property MULE_CORRELATION_GROUP_SIZE.

So by opening a browser and enter http://localhost:81/lowestprice a number is returned, not very useful but that’s not the point of the example.

When looking at the log the following can be seen:

10:23:10,669  WARN LoggerMessageProcessor,
                   [forkandjoin-0.0.1-SNAPSHOT].shop1Flow.stage1.02:269 – 
                   Price from shop 1: 362
10:23:10,679  WARN LoggerMessageProcessor,
                   [forkandjoin-0.0.1-SNAPSHOT].shop2Flow.stage1.02:269 – 
                   Price from shop 2: 109
10:23:10,729  WARN LoggerMessageProcessor,
                   [forkandjoin-0.0.1-SNAPSHOT].connector.http.mule.default.receiver.02:269 – 
                   Lowest price: 109

That tells us that the request has been sent to both shop1 and shop2 and in this case the lowest price was received from shop2.

A simple example but I hope you get the idea on how fork and join pattern can be used and how to use it in more advanced scenarios.

enterprise integration patterns ,fork and join ,java ,mulesoft

Published at DZone with permission of Tomas Blohm , DZone MVB. See the original article here.

Opinions expressed by DZone contributors are their own.

{{ parent.title || parent.header.title}}

{{ parent.tldr }}

{{ parent.urlSource.name }}