Message Throttling with Mule ESB

While implementing the Mule ESB I ran into the following scenario:
Messages were picked up from a queue, each message was transformed and offered to a web service, and the response was transformed and put on a response queue.
Not an unusual use case in the world of integration. What made this one special was the (lack of) performance of the web service. As soon as we started sending messages to the service via the ESB, the response time for each message increased significantly. Investigation showed that the web service had serious issues when messages were sent concurrently, and sending concurrently is what the Mule ESB does by default: it starts off with a set of threads to process the messages as quickly as possible. Until the issue with the web service is solved, I decided to add throttling to my flow so I could manage the number of calls to the web service.
I think there are two ways to add throttling. One is to add a delay before a message is delivered; this approach is described here. Although I haven’t tried it, I expect it would lead to a situation where 16 concurrent web service calls are delayed by 10 seconds and then still fired all at once.
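To make the difference between delaying and limiting concrete, here is a minimal plain-Java sketch. It does not use Mule at all: the class `DelayVersusLimit`, the method `peakConcurrency` and the 200 ms stand-in for the slow service are all made up for illustration. It starts 16 simulated calls at once and reports how many were in flight at the same time, once with only a fixed delay and once with a `Semaphore` that caps concurrency at 2.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;

class DelayVersusLimit {

    /**
     * Starts `calls` simulated web service calls at the same moment and
     * returns the highest number that were in flight simultaneously.
     * A null `limit` means "delay only, no cap on concurrency".
     */
    static int peakConcurrency(int calls, long delayMillis, Semaphore limit) {
        AtomicInteger inFlight = new AtomicInteger();
        AtomicInteger peak = new AtomicInteger();
        ExecutorService pool = Executors.newFixedThreadPool(calls);
        CountDownLatch done = new CountDownLatch(calls);
        for (int i = 0; i < calls; i++) {
            pool.execute(() -> {
                try {
                    Thread.sleep(delayMillis);            // fixed delay before the call
                    if (limit != null) limit.acquire();   // optional cap on concurrency
                    try {
                        peak.accumulateAndGet(inFlight.incrementAndGet(), Math::max);
                        Thread.sleep(200);                // stand-in for the slow service
                    } finally {
                        inFlight.decrementAndGet();
                        if (limit != null) limit.release();
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    done.countDown();
                }
            });
        }
        try {
            done.await();                                 // wait for all calls to finish
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            pool.shutdown();
        }
        return peak.get();
    }

    public static void main(String[] args) {
        System.out.println("delay only : peak = " + peakConcurrency(16, 100, null));
        System.out.println("limit of 2 : peak = " + peakConcurrency(16, 0, new Semaphore(2)));
    }
}
```

With the delay alone the calls still tend to arrive together, just later, so the peak stays well above 2. With the semaphore the peak can never exceed 2, which is the behaviour I was after.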
So what I was looking for was a way to configure the number of concurrent threads that call the web service. Luckily this has been greatly simplified in Mule3 vs Mule2, as you can read here. To mimic my situation I created the following Mule flow:

<?xml version="1.0" encoding="UTF-8"?>
<mule xmlns="http://www.mulesoft.org/schema/mule/core"
      xmlns:vm="http://www.mulesoft.org/schema/mule/vm"
      xmlns:test="http://www.mulesoft.org/schema/mule/test"
      version="EE-3.4.1"
      xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
      xsi:schemaLocation="http://www.mulesoft.org/schema/mule/vm http://www.mulesoft.org/schema/mule/vm/current/mule-vm.xsd
        http://www.mulesoft.org/schema/mule/test http://www.mulesoft.org/schema/mule/test/current/mule-test.xsd
http://www.mulesoft.org/schema/mule/core http://www.mulesoft.org/schema/mule/core/current/mule.xsd">
    <vm:endpoint name="input" path="input" exchange-pattern="one-way" />
    <vm:endpoint name="ws-call" path="ws-call" exchange-pattern="request-response" />
    <vm:endpoint name="output" path="output" exchange-pattern="one-way" />
    <flow name="testFlow" processingStrategy="asynchronous">
        <inbound-endpoint ref="input" />
        <outbound-endpoint ref="ws-call" />
        <outbound-endpoint ref="output" />
    </flow>
    <flow name="wsFlow" processingStrategy="synchronous">
        <inbound-endpoint ref="ws-call" />
        <append-string-transformer message=" added to the payload" />
        <test:component waitTime="2000"/>
    </flow>
</mule>

As you can see, I have done nothing special related to threads or processing. The test class to run this flow looks like this:

package net.pascalalma.mule;
 
import org.junit.Test;
import org.mule.DefaultMuleMessage;
import org.mule.api.MuleMessage;
import org.mule.module.client.MuleClient;
import org.mule.tck.junit4.FunctionalTestCase;
 
import java.util.Date;
 
public class ThrottleTest extends FunctionalTestCase {
 
    @Test
    public void simplePassThrough() throws Exception
    {
        MuleClient client = new MuleClient(muleContext);
        long start = new Date().getTime();
        // Dispatch all messages to the 'input' endpoint without waiting for a response
        for (int i=0;i<30;i++) {
            MuleMessage inMsg = new DefaultMuleMessage("Message "+ i,muleContext);
            client.dispatch("input", inMsg);
        }
        // Read responses from 'output' until none arrives within 3 seconds
        MuleMessage result = client.request("output",3000);
        while (result != null) {
            // Report the elapsed time for this message before requesting the next one
            long end = new Date().getTime();
            System.out.println("Message took : " + (end - start)/1000);
            result = client.request("output",3000);
        }
        long end = new Date().getTime();
        System.out.println("Total Service took : " + (end - start)/1000);
    }
    @Override
    protected String getConfigResources() {
        return "mule-config.xml";
    }
}
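The receive loop in this test boils down to "poll with a timeout until nothing arrives". A minimal plain-Java sketch of that pattern follows, assuming a `BlockingQueue` as a stand-in for the Mule `output` endpoint. The class `DrainWithTimeout` and the method `drain` are hypothetical names and not part of the Mule API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

class DrainWithTimeout {

    /**
     * Polls `queue` until no message arrives within `timeoutMillis` and
     * returns the elapsed milliseconds at which each message was received.
     */
    static List<Long> drain(BlockingQueue<String> queue, long timeoutMillis) {
        List<Long> arrivals = new ArrayList<>();
        long start = System.nanoTime();
        try {
            while (queue.poll(timeoutMillis, TimeUnit.MILLISECONDS) != null) {
                // Record the time for this message before asking for the next one.
                arrivals.add(TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start));
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return arrivals;
    }

    public static void main(String[] args) {
        BlockingQueue<String> output = new LinkedBlockingQueue<>();
        for (int i = 0; i < 3; i++) {
            output.add("Message " + i);
        }
        List<Long> arrivals = drain(output, 300);
        System.out.println(arrivals.size() + " messages received at (ms): " + arrivals);
    }
}
```

Note that the final poll always waits for the full timeout, so a total time measured after the loop includes that timeout on top of the actual processing time.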

This generates the following output:

================================================================================
= Testing: simplePassThrough =
================================================================================
Message took : 3
Message took : 3
Message took : 3
Message took : 3
Message took : 3
Message took : 3
Message took : 3
Message took : 3
Message took : 3
Message took : 3
Message took : 3
Message took : 3
Message took : 3
Message took : 3
Message took : 3
Message took : 3
Message took : 3
Message took : 5
Message took : 5
Message took : 5
Message took : 5
Message took : 5
Message took : 5
Message took : 5
Message took : 7
Message took : 7
Message took : 7
Message took : 7
Message took : 7
Message took : 7
Total Service took : 10

As you can see, many messages are processed concurrently in a ‘batch’.
There are (at least) two ways to control the number of threads. The first is to limit the number of threads allowed for the flow. This is only possible if the processing strategy of the flow is asynchronous. In that case you can define your own processing strategy like this:

...
  <queued-asynchronous-processing-strategy name="allow2Threads" maxThreads="2" poolExhaustedAction="RUN"/>
 
  <flow name="testFlow" processingStrategy="allow2Threads">
   ...

The rest of the config is the same as before. If we run this we get the following output (for 10 messages):

================================================================================
= Testing: simplePassThrough =
================================================================================
Message took : 3
Message took : 3
Message took : 5
Message took : 5
Message took : 7
Message took : 9
Message took : 9
Message took : 11
Message took : 11
Message took : 13
Total Service took : 16

Process finished with exit code 0

As we can see, at most 2 messages are now processed concurrently.
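The two-at-a-time staircase in this output is what you would expect from two workers and a service that takes 2 seconds per call. The following plain-Java sketch, which is not Mule code, computes that ideal schedule and also runs simulated calls on a pool of two threads to confirm the cap. The class `TwoWorkerPool`, its methods and the shortened 50 ms service time are assumptions made for the illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;

class TwoWorkerPool {

    /** Ideal completion time of message `index` (0-based) when `workers` threads share the load. */
    static long expectedCompletionMillis(int index, int workers, long serviceMillis) {
        return (index / workers + 1) * serviceMillis;
    }

    /** Runs `messages` simulated calls on `workers` threads and returns the peak number in flight. */
    static int peakInFlight(int messages, int workers, long serviceMillis) {
        AtomicInteger inFlight = new AtomicInteger();
        AtomicInteger peak = new AtomicInteger();
        ExecutorService pool = Executors.newFixedThreadPool(workers);
        List<Future<?>> pending = new ArrayList<>();
        for (int i = 0; i < messages; i++) {
            pending.add(pool.submit(() -> {
                peak.accumulateAndGet(inFlight.incrementAndGet(), Math::max);
                try {
                    Thread.sleep(serviceMillis);          // stand-in for the web service call
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    inFlight.decrementAndGet();
                }
            }));
        }
        try {
            for (Future<?> f : pending) {
                f.get();                                  // wait for every call to finish
            }
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
        return peak.get();
    }

    public static void main(String[] args) {
        StringBuilder schedule = new StringBuilder();
        for (int i = 0; i < 10; i++) {
            schedule.append(expectedCompletionMillis(i, 2, 2000) / 1000).append(' ');
        }
        // prints "ideal completion (s): 2 2 4 4 6 6 8 8 10 10"
        System.out.println("ideal completion (s): " + schedule.toString().trim());
        System.out.println("peak in flight: " + peakInFlight(10, 2, 50));
    }
}
```

The measured times sit at or a little above this ideal schedule, and the reported total also includes the 3 second timeout of the last request in the test.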
If we need a synchronous flow (when using transactions, for instance), we cannot set maxThreads on the processing strategy. But we can set it on the connector, as I show here:

...
  <vm:connector name="myConnector">
    <receiver-threading-profile doThreading="true" maxThreadsActive="2" poolExhaustedAction="RUN" />
  </vm:connector>
  <flow name="testFlow" processingStrategy="synchronous">
...

If we run this (for 10 messages) we get the following output:

================================================================================
= Testing: simplePassThrough =
================================================================================
Message took : 2
Message took : 2
Message took : 4
Message took : 4
Message took : 6
Message took : 6
Message took : 9
Message took : 9
Message took : 11
Message took : 11
Total Service took : 14

We get a similar result to the previous case, but now the whole flow is processed synchronously.
With this test case you can try it yourself to see what best fits your situation. You can also play with the ‘poolExhaustedAction’ and other attributes.
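To get a feeling for what different exhausted-pool behaviours mean, here is a plain-Java analogy using the rejection handlers of `java.util.concurrent.ThreadPoolExecutor`. The names of Mule's `RUN`, `ABORT` and `DISCARD` settings suggest a close correspondence with `CallerRunsPolicy`, `AbortPolicy` and `DiscardPolicy`, but treating them as equivalent is an assumption of this sketch and not something verified against Mule's internals. The class `PoolExhaustedDemo` and the method `submitThirdTask` are made up for the example.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class PoolExhaustedDemo {

    /**
     * Keeps both threads of a two-thread pool busy, submits a third task and
     * reports what happened to it: "ran on caller", "ran on pool",
     * "rejected" or "never ran".
     */
    static String submitThirdTask(RejectedExecutionHandler whenExhausted) {
        // SynchronousQueue has no capacity, so a third task cannot be queued.
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                2, 2, 0L, TimeUnit.MILLISECONDS, new SynchronousQueue<>(), whenExhausted);
        CountDownLatch release = new CountDownLatch(1);
        Runnable blocker = () -> {
            try {
                release.await();                 // hold the worker until released
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        Thread caller = Thread.currentThread();
        String[] outcome = {"never ran"};
        try {
            pool.execute(blocker);               // occupies worker 1
            pool.execute(blocker);               // occupies worker 2
            pool.execute(() -> outcome[0] =
                    Thread.currentThread() == caller ? "ran on caller" : "ran on pool");
        } catch (RejectedExecutionException e) {
            outcome[0] = "rejected";
        } finally {
            release.countDown();                 // let the blocked workers finish
            pool.shutdown();
        }
        return outcome[0];
    }

    public static void main(String[] args) {
        System.out.println("CallerRunsPolicy: " + submitThirdTask(new ThreadPoolExecutor.CallerRunsPolicy()));
        System.out.println("AbortPolicy     : " + submitThirdTask(new ThreadPoolExecutor.AbortPolicy()));
        System.out.println("DiscardPolicy   : " + submitThirdTask(new ThreadPoolExecutor.DiscardPolicy()));
    }
}
```

In this sketch the caller-runs variant slows the submitter down instead of losing work, but it also means the submitting thread does work alongside the two pool threads. That is worth keeping in mind when the goal is a strict cap on concurrent calls.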

Published at DZone with permission of