Over a million developers have joined DZone.
{{announcement.body}}
{{announcement.title}}

Message Throttling with Mule ESB

DZone's Guide to

Message Throttling with Mule ESB

· Integration Zone
Free Resource

Modernize your application architectures with microservices and APIs with best practices from this free virtual summit series. Brought to you in partnership with CA Technologies.

 mulesoft-logoWhile implementing the Mule ESB I ran into the following scenario:
Messages were picked up from a queue, the message was transformed, offered to a web service, the response was transformed and put on a response queue.
Not an unusual use case in the world of integration. One thing that made this one special was the (lack of) performance of the web service. It appeared as soon as we started to send messages to the services via the ESB the response time for each of the messages increased significally. Investigation showed that the web service was having serious issues when concurrent messages were sent. This is what the Mule ESB do by default. It will start of with a set of threads to process the messages as quickly as possible. Until the issue with the web service is solved I decided to add throttling functionality to my flow so I could manage the number of calls to the web service.
Anyway to add the throttling I think there are two ways: one is by adding some delay before a message is delivered. This way is described here. But although I haven’t tried this one I expect it would lead to the situation where 16 concurrent web service calls are delayed 10 seconds and then still fired all at once.
So what I was looking for was to be able to configure the number of concurrent threads that would call the web service. Luckily this has been greatly simplified in Mule3 vs Mule2 as you can read here. To mimic my situation I created the following Mule flow:

<?xml version="1.0" encoding="UTF-8"?>
<mule xmlns="http://www.mulesoft.org/schema/mule/core"
      xmlns:vm="http://www.mulesoft.org/schema/mule/vm"
      xmlns:test="http://www.mulesoft.org/schema/mule/test"
      version="EE-3.4.1"
      xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
      xsi:schemaLocation="http://www.mulesoft.org/schema/mule/vm http://www.mulesoft.org/schema/mule/vm/current/mule-vm.xsd
        http://www.mulesoft.org/schema/mule/test http://www.mulesoft.org/schema/mule/test/current/mule-test.xsd
http://www.mulesoft.org/schema/mule/core http://www.mulesoft.org/schema/mule/core/current/mule.xsd">
    <vm:endpoint name="input" path="input" exchange-pattern="one-way" />
    <vm:endpoint name="ws-call" path="ws-call" exchange-pattern="request-response" />
    <vm:endpoint name="output" path="output" exchange-pattern="one-way" />
    <flow name="testFlow" processingStrategy="asynchronous">
        <inbound-endpoint ref="input" />
        <outbound-endpoint ref="ws-call" />
        <outbound-endpoint ref="output" />
    </flow>
    <flow name="wsFlow" processingStrategy="synchronous">
        <inbound-endpoint ref="ws-call" />
        <append-string-transformer message=" added to the payload" />
        <test:component waitTime="2000"/>
    </flow>
</mule>

As you can see I have done nothing special related to threads, processing etc. The test class to run this flow looks like this:

package net.pascalalma.mule;
 
import org.junit.Test;
import org.mule.DefaultMuleMessage;
import org.mule.api.MuleMessage;
import org.mule.module.client.MuleClient;
import org.mule.tck.junit4.FunctionalTestCase;
 
import java.util.Date;
 
public class ThrottleTest extends FunctionalTestCase {
 
    @Test
    public void simplePassThrough() throws Exception
    {
        MuleClient client = new MuleClient(muleContext);
        long start = new Date().getTime();
        for (int i=0;i<30;i++) {
            MuleMessage inMsg = new DefaultMuleMessage("Message "+ i,muleContext);
            client.dispatch("input", inMsg);
        }
        MuleMessage result = client.request("output",3000);
        while (result != null) {
            result = client.request("output",3000);
            long end = new Date().getTime();
            System.out.println("Message took : " + (end - start)/1000);
        }
        long end = new Date().getTime();
        System.out.println("Total Service took : " + (end - start)/1000);
    }
    @Override
    protected String getConfigResources() {
        return "mule-config.xml";
    }
}

This generates the following output:

================================================================================
= Testing: simplePassThrough =
================================================================================
Message took : 3
Message took : 3
Message took : 3
Message took : 3
Message took : 3
Message took : 3
Message took : 3
Message took : 3
Message took : 3
Message took : 3
Message took : 3
Message took : 3
Message took : 3
Message took : 3
Message took : 3
Message took : 3
Message took : 3
Message took : 5
Message took : 5
Message took : 5
Message took : 5
Message took : 5
Message took : 5
Message took : 5
Message took : 7
Message took : 7
Message took : 7
Message took : 7
Message took : 7
Message took : 7
Total Service took : 10

As you can see a lot of messages are processed concurrently in a ‘batch’.
To have control about the number of threads you can do two things (at least). The first is to make sure there are only a certain number of threads for the flow allowed. This can only be done if the processing strategy of the flow is asynchronous. In that case you can define your own processing strategy like this:

...
  <queued-asynchronous-processing-strategy name="allow2Threads" maxThreads="2" poolExhaustedAction="RUN"/>
 
  <flow name="testFlow" processingStrategy="allow2Threads">
   ...

The rest of the config is the same as before. If we run this we get the following output (for 10 messages):

================================================================================
= Testing: simplePassThrough =
================================================================================
Message took : 3
Message took : 3
Message took : 5
Message took : 5
Message took : 7
Message took : 9
Message took : 9
Message took : 11
Message took : 11
Message took : 13
Total Service took : 16

Process finished with exit code 0

As we can see there are now max 2 concurrent messages.
In case we have to have a synchronous flow (when using transactions for instance) we cannot set maxthreads on the processing strategy. But we can set it on the connector as I show here:

...
  <vm:connector name="myConnector">
    <receiver-threading-profile doThreading="true" maxThreadsActive="2" poolExhaustedAction="RUN" />
  </vm:connector>
  <flow name="testFlow" processingStrategy="synchronous">
...

If we run this (for 10 messages) we get the following output:

================================================================================
= Testing: simplePassThrough =
================================================================================
Message took : 2
Message took : 2
Message took : 4
Message took : 4
Message took : 6
Message took : 6
Message took : 9
Message took : 9
Message took : 11
Message took : 11
Total Service took : 14
We get a similar result as the previous case but now the whole flow is processed synchronously.
With this test case you are able to test it your self to see what best fits your situation. You can also play with the ‘poolExhaustedAction’ and other attributes.

The Integration Zone is proudly sponsored by CA Technologies. Learn from expert microservices and API presentations at the Modernizing Application Architectures Virtual Summit Series.

Topics:

Published at DZone with permission of Pascal Alma. See the original article here.

Opinions expressed by DZone contributors are their own.

{{ parent.title || parent.header.title}}

{{ parent.tldr }}

{{ parent.urlSource.name }}