Simple and Fast Parallel Processing (Fork-Join Pattern) for Mule

During my time working as a consultant, I have learned that one of the most popular patterns clients ask for is the fork-join pattern. The idea is that, in the middle of a Mule flow, we need to run several pieces of work in parallel, wait for all paths to finish executing, and then continue.

Why would you need the fork-join pattern? There are at least a hundred and one reasons, but let me give you a small example. Imagine you have a flow that needs to do some lookups online, perhaps by calling two different external web services. Since external web services tend to be slow compared to internal services, you might decide to do these lookups in parallel to reduce response times: if each lookup takes roughly 400ms, calling them one after the other costs about 800ms, whereas calling them in parallel costs only as long as the slower of the two. Once you have collected the lookup data from both services, you carry on with your flow.

A very good explanation of how this can currently be achieved in Mule can be found here.

This is all well and good, but I’m of the opinion that this is way too complicated to implement for a simple pattern. And complication usually also means worse performance, higher implementation costs and higher maintenance costs. I knew I had to do something, and I did.

First I created a simple flow with the fork-join pattern as you would normally build it in Mule, and stress-tested it. I discovered that, on average, just doing the fork-join took Mule about 100ms under light load on a Mac with a dual-core Hyper-Threaded i7 processor. In my book, that looked a bit on the high side.

Then I decided to implement a custom message processor (all the code is available in the GIST at the end of this blog). This message processor is quite simple. Its configuration accepts a list of message processors; each of them receives a copy of the current message, and they are all executed in parallel, using threads taken from a thread pool that is itself configurable. The result is a MuleMessageCollection containing a MuleMessage with the response from each execution, in the same order as the processors were configured. The following GIST shows an example of how you would configure this message processor.

<flow name="parallelWsLookupFlow" doc:name="parallelWsLookupFlow">
	<enricher doc:name="Parallel Lookups">
		<processor-chain>
	        	<custom-processor class="com.ricston.processor.ParallelMessageProcessor" doc:name="ParallelMessageProcessor">
	        		<spring:property name="processors">
	        			<spring:list>
	        				<spring:ref bean="lookupWs1"/>
	        				<spring:ref bean="lookupWs2"/>
	        			</spring:list>
	        		</spring:property>
	        		<spring:property name="maxActive" value="100"/>
	        	</custom-processor>
	        	<combine-collections-transformer />
		</processor-chain>
	    <enrich target="#[flowVars.lookup0]" source="#[message.payload[0]]" />
	    <enrich target="#[flowVars.lookup1]" source="#[message.payload[1]]" />
	</enricher>
</flow>
    
<flow name="lookupWs1" doc:name="lookupWs1">
	<cxf:jaxws-client 
		clientClass="${lookupWs1.clientClass}" 
		operation="${lookupWs1.operation}" port="${lookupWs1.port}" >
	</cxf:jaxws-client>
	<http:outbound-endpoint exchange-pattern="request-response" 
		host="${lookupWs1.host}" port="${lookupWs1.port}" path="${lookupWs1.path}"
		method="POST" responseTimeout="${lookupWs1.responsetimeout}" doc:name="HTTP" />
</flow>

<flow name="lookupWs2" doc:name="lookupWs2">
	<cxf:jaxws-client 
		clientClass="${lookupWs2.clientClass}" 
		operation="${lookupWs2.operation}" port="${lookupWs2.port}" >
	</cxf:jaxws-client>
	<http:outbound-endpoint exchange-pattern="request-response" 
		host="${lookupWs2.host}" port="${lookupWs2.port}" path="${lookupWs2.path}"
		method="POST" responseTimeout="${lookupWs2.responsetimeout}" doc:name="HTTP" />
</flow>

Here we are invoking two web services in parallel and storing the result of each lookup in a flow variable (with the help of an enricher). As you can see, the ParallelMessageProcessor accepts a list of MessageProcessors, in this case the flows lookupWs1 and lookupWs2. These can be any kind of message processors, but here we are calling other flows, which internally invoke the external web services.

Once we get the response from each individual web service, we use the combine-collections-transformer to turn the MuleMessageCollection into a single MuleMessage, and then pick out each individual result using MEL. The maximum number of active threads in the internal thread pool is also configurable.
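To give an idea of how the enriched flow variables could be consumed further down the flow, here is a hypothetical step (not part of the original configuration) that could be placed right after the enricher in parallelWsLookupFlow; the logger and its message are purely illustrative.

	<!-- Hypothetical step after the enricher: the original payload is left
	     untouched, and each lookup result is now available as a flow variable. -->
	<logger level="INFO" doc:name="Log lookup results"
		message="lookup0: #[flowVars.lookup0], lookup1: #[flowVars.lookup1]" />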

The following GIST is the code for this message processor. As you can see, it is very simple and easy to follow. The only restriction is that we need Java 1.6 (or above), as we use a ThreadPoolExecutor to execute the message processors in parallel. Having said that, this is not really an issue, since Java 1.6 has become a requirement for the later versions of Mule anyway.

The process() method is where all the magic happens. For each configured message processor, we create a ProcessorRunner, which clones the current event and uses the clone to execute the message processor. Once the ProcessorRunners are created, we pass them to invokeAll() on the ThreadPoolExecutor.

invokeAll() blocks until all the submitted tasks have completed and returns a list of Future objects, one per message processor, from which the responses are collected in order.
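To make these semantics concrete, here is a minimal, self-contained Java sketch, independent of Mule and not part of the GIST, showing the same invokeAll() behaviour with two plain Callables; the class name, task results, and sleep times are invented purely for illustration.

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class InvokeAllDemo
{
    public static void main(String[] args) throws Exception
    {
        ExecutorService pool = Executors.newFixedThreadPool(2);

        List<Callable<String>> tasks = new ArrayList<Callable<String>>();
        tasks.add(new Callable<String>()
        {
            public String call() throws Exception
            {
                Thread.sleep(300); // stand-in for the first web service lookup
                return "lookup1 result";
            }
        });
        tasks.add(new Callable<String>()
        {
            public String call() throws Exception
            {
                Thread.sleep(500); // stand-in for the second, slower lookup
                return "lookup2 result";
            }
        });

        // invokeAll() blocks until both tasks have completed (roughly 500ms here,
        // the duration of the slowest task) and returns one Future per task,
        // in the same order as the tasks were submitted.
        List<Future<String>> results = pool.invokeAll(tasks);

        for (Future<String> f : results)
        {
            System.out.println(f.get()); // tasks are already done, so get() does not block
        }

        pool.shutdown();
    }
}

Because invokeAll() only returns once every task has finished, the subsequent get() calls do not block; this is why the ParallelMessageProcessor below can simply iterate over the returned Futures to build its MuleMessageCollection.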

Using the exact same tests as before, the new ParallelMessageProcessor happily invoked the web services in parallel, and the fork-join pattern was taking less than 5ms under the same load.

Hope you find this ParallelMessageProcessor as useful as I did!

package com.ricston.processor;

/**
 * imports
 */

/**
 * ParallelMessageProcessor is a message processor that will execute the inner message 
 * processors in parallel. Before returning, it will wait for all the processors to finish
 * execution. This implements the fork-join pattern.
 * 
 * The result will be a MuleMessageCollection with an entry for each inner 
 * message processor, in the same order they were listed in processors. 
 * In simple terms, this is very similar to what a parallel ALL could look like.
 * 
 * The message that comes in is replicated and routed to each message processor. However
 * all the inner message processors are executed using a thread pool. The thread pool is
 * configurable.
 * 
 * Usage: when you need to do multiple lookups in parallel, such as web service calls, JDBC calls...
 */
public class ParallelMessageProcessor implements MessageProcessor, MuleContextAware, Initialisable
{
    private Log logger = LogFactory.getLog(getClass());
    /**
     * List of MessageProcessors to be executed in parallel 
     */
    private List<MessageProcessor> processors;
    
    /**
     * MuleContext used to create events, messages ...
     */
    private MuleContext muleContext;
    
    /**
     * ThreadPoolExecutor to be used to run the processors in parallel
     */
    protected ThreadPoolExecutor threadPool;
    
    /**
     * Max threads active in the pool
     */
    private int maxActive = 100;
    
    /**
     * The length of the queue used to queue the work for the thread pool
     */
    private int queueLength = 1000000;
    
    
    public ParallelMessageProcessor()
    {
        super();
    }
    
    /**
     * Initialise the thread pool
     */
    @Override
    public void initialise() throws InitialisationException
    {
        BlockingQueue<Runnable> queue = new ArrayBlockingQueue<Runnable>(queueLength);
        threadPool = new ThreadPoolExecutor(maxActive, maxActive, 10000, TimeUnit.MILLISECONDS, queue);
    }

    /**
     * Send the message to all processors to be processed in parallel using the pool. Also
     * wait for all processors to finish processing, and return the result in a 
     * MuleMessageCollection, in the same order as the processors were configured.
     */
    @Override
    public MuleEvent process(MuleEvent event) throws MuleException
    {
        //create list of ProcessorRunner, each one will execute a message processor
        int noOfProcessors = processors.size();
        List<ProcessorRunner> threads = new ArrayList<ProcessorRunner>(noOfProcessors);
        
        //create a MuleMessageCollection, to be used to return the results
        MuleMessageCollection resultMessages = new DefaultMessageCollection(muleContext);

        try
        {
            //create a ProcessorRunner for each message processor, initialising it 
            //with the message processor to execute, and the current MuleEvent
            for (MessageProcessor mp : processors)
            {
                ProcessorRunner t = new ProcessorRunner(mp, event);
                threads.add(t);
            }
            
            logDebugStart();
            
            //invoke the message processors using the thread pool
            List<Future<MuleEvent>> future = threadPool.invokeAll(threads);
            
            //collect the results into a MuleMessageCollection, wait if necessary
            for (Future<MuleEvent> f : future)
            {
                resultMessages.addMessage(f.get().getMessage());
            }
            
            logDebugEnd(resultMessages);
        }
        catch (InterruptedException e)
        {
            throw new MessagingException(event, e);
        }
        catch (Exception e)
        {
            throw new MessagingException(event, e);
        }

        //return the MuleMessageCollection as a result 
        return new DefaultMuleEvent(resultMessages, event);
    }
    
    protected void logDebugStart()
    {
        if (logger.isDebugEnabled())
        {
            
            logger.debug("Firing parallel requests");
        }
    }
    
    protected void logDebugEnd(MuleMessageCollection resultMessages)
    {
        if (logger.isDebugEnabled())
        {
            logger.debug("Collected " + resultMessages.getMessagesAsArray().length + " messages");
        }
    }

    @Override
    public void setMuleContext(MuleContext context)
    {
        this.muleContext = context;
    }

    /**
     * getters and setters here
     */

}
package com.ricston.processor;

/**
 * imports
 */

/**
 * A simple class implementing Callable which will execute a MessageProcessor
 * given a MuleEvent as input. The MessageProcessor and MuleEvent are configured on
 * creation of this class, or using the setter methods.
 */
public class ProcessorRunner implements Callable<MuleEvent>
{
    /**
     * The MessageProcessor to be executed
     */
    private MessageProcessor processor;
    
    /**
     * The MuleEvent to be passed on to the MessageProcessor
     */
    private MuleEvent event;
    
    public ProcessorRunner()
    {
        super();
    }
    
    /**
     * Initialise the class with the given parameters.
     * 
     * @param processor The MessageProcessor to be executed
     * @param event The MuleEvent to be passed to the processor
     */
    public ProcessorRunner(MessageProcessor processor, MuleEvent event)
    {
        this();
        this.processor = processor;
        this.event = event;
    }
    
    /**
     * Create a clone of the MuleEvent passed in (an event must not be shared between
     * multiple threads) and execute the processor using the cloned event.
     */
    @Override
    public MuleEvent call() throws Exception
    {
        try
        {
            MuleEvent clonedEvent = DefaultMuleEvent.copy(event);
            MuleEvent result = this.processor.process(clonedEvent);
            
            return result;
        }
        catch (MuleException e)
        {
            throw new RuntimeException(e);
        }
    }
    
    /**
     * getters and setters
     */

}

