Asynchronous Message Processing with Mule
Processing messages asynchronously is an important technique when developing integration applications. Asynchronous applications are typically easier to scale, allow for the implementation of reliability patterns, and sometimes better reflect real-world use cases. Mule, not surprisingly, offers a wealth of opportunities to process messages asynchronously.
Setting the exchange-pattern of a message source to “one-way” enables asynchronous processing for a flow. Some transports and connectors, like JMS or the VM transport, are asynchronous by default. Other transports that are inherently synchronous, like HTTP, need their exchange pattern set explicitly. Setting one-way exchange patterns on these transports allows you to simulate asynchronous behavior with protocols that would otherwise not be asynchronous. The following snippet demonstrates how to asynchronously bridge an HTTP request to JMS.
<flow name="HTTP to JMS Flow">
    <!-- one-way: the HTTP caller does not wait for the rest of the flow -->
    <http:inbound-endpoint address="http://localhost:8080/foo" exchange-pattern="one-way"/>
    <jms:outbound-endpoint queue="messages"/>
</flow>
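To round out the bridge, here is a minimal sketch of a flow that could consume what the HTTP flow dispatches. The flow name and the log message are hypothetical, and the sketch assumes a JMS connector is already configured elsewhere in the application.

```xml
<!-- Hypothetical consumer: the flow name and log text are illustrative only -->
<flow name="consume.messages">
    <!-- JMS endpoints are one-way by default, so no exchange-pattern is needed -->
    <jms:inbound-endpoint queue="messages"/>
    <!-- Log each payload that arrived through the HTTP to JMS bridge -->
    <logger level="INFO" message="Received: #[payload]"/>
</flow>
```

Because the two flows share only the queue name, the consumer can be scaled or redeployed without touching the HTTP side.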
You can process asynchronously dispatched messages in groups by using the collection-aggregator. Message groups are defined by setting the correlationId property of a MuleMessage or by setting the MULE_CORRELATION_ID outbound header. The correlationGroupSize property of MuleMessage, or the MULE_CORRELATION_GROUP_SIZE header, defines the number of messages in a group.
The following demonstrates how the collection-aggregator can be used to asynchronously wait and collect the contents of a correlation group arriving on a VM inbound-endpoint.
<flow name="aggregate.messages">
    <vm:inbound-endpoint path="foo.bar" exchange-pattern="one-way"/>
    <!-- Wait up to 6000 ms for the group; on timeout, pass along what has arrived -->
    <collection-aggregator timeout="6000" failOnTimeout="false"/>
    <vm:outbound-endpoint path="messages.out"/>
</flow>
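For the aggregator to have something to group, the dispatching side has to stamp each message with the correlation headers described above. The following is a sketch of one way to do that with set-property, which writes outbound-scoped properties. The flow name, the orders.in path, the batchId inbound property, and the group size of 3 are all assumptions made for illustration.

```xml
<!-- Hypothetical producer: names, paths, and values are illustrative only -->
<flow name="dispatch.group.member">
    <vm:inbound-endpoint path="orders.in" exchange-pattern="one-way"/>
    <!-- Messages that share this ID belong to the same correlation group -->
    <set-property propertyName="MULE_CORRELATION_ID"
                  value="#[message.inboundProperties['batchId']]"/>
    <!-- Number of messages the aggregator should expect in the group -->
    <set-property propertyName="MULE_CORRELATION_GROUP_SIZE" value="#[3]"/>
    <vm:outbound-endpoint path="foo.bar" exchange-pattern="one-way"/>
</flow>
```

With this in place, three messages carrying the same batchId would form one group on the foo.bar path.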
Some message payloads, like collections or XML documents, can be split and dispatched asynchronously. Here are some of the message splitters Mule supports:
- collection-splitter: Splits a List payload into individual messages.
- splitter: Splits a message using the Mule Expression Language.
- mulexml:filter-based-splitter: Splits an XML document payload using an XPath expression.
The following snippet demonstrates splitting a java.util.List and routing each element to a JMS queue.
<collection-splitter/>
<jms:outbound-endpoint queue="order.queue"/>
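The expression-based splitter from the list above works in much the same way. As a rough sketch, assume the payload is an object exposing a collection through a hypothetical items property; the flow name and the orders.in path are likewise made up for the example.

```xml
<!-- Hypothetical flow: assumes the payload exposes a collection named "items" -->
<flow name="split.order.items">
    <vm:inbound-endpoint path="orders.in" exchange-pattern="one-way"/>
    <!-- Evaluate the MEL expression and emit one message per element of the result -->
    <splitter expression="#[payload.items]"/>
    <jms:outbound-endpoint queue="order.queue"/>
</flow>
```

The expression decides what counts as an element, so the same element can split maps, arrays, or nested collections depending on what it returns.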
Split message payloads can be reassembled by using the message-chunk-aggregator. By default, the message-chunk-aggregator uses the correlationId and correlationGroupSize properties of the MuleMessage for reassembly. You can define an optional “correlationIdExpression” to reassemble with a different message property.
The following flow illustrates how to assemble a group of split messages back together.
<flow name="aggregate.chunks">
    <vm:inbound-endpoint path="foo.bar"/>
    <!-- Reassemble chunks that share a correlationId into one message -->
    <message-chunk-aggregator/>
    <vm:outbound-endpoint path="chunk.out"/>
</flow>
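For context, the chunks that this flow reassembles have to be produced somewhere. A minimal sketch of the producing side follows, assuming Mule 3's message-chunk-splitter element and its messageSize attribute. The flow name, the large.in path, and the chunk size of 1024 are hypothetical.

```xml
<!-- Hypothetical producer of chunks: names and sizes are illustrative only -->
<flow name="split.into.chunks">
    <vm:inbound-endpoint path="large.in" exchange-pattern="one-way"/>
    <!-- Break the payload into chunks of the given size -->
    <message-chunk-splitter messageSize="1024"/>
    <vm:outbound-endpoint path="foo.bar" exchange-pattern="one-way"/>
</flow>
```

The splitter is expected to set the correlation properties on each chunk, which is what lets the aggregator work without extra configuration.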
Tuning Asynchronous Flows
Asynchronous processing for a flow can be tuned by defining a queued-asynchronous-processing-strategy. Multiple queued-asynchronous-processing-strategy elements can be defined, and a flow selects one by name through its “processingStrategy” attribute. The following illustrates how to configure a flow to use up to 500 threads to asynchronously process messages arriving on a VM inbound-endpoint.
<queued-asynchronous-processing-strategy name="allow500Threads" maxThreads="500"/>

<flow name="acceptOrders" processingStrategy="allow500Threads">
    <vm:inbound-endpoint path="acceptOrders" exchange-pattern="one-way"/>
    <vm:outbound-endpoint path="commonProcessing" exchange-pattern="one-way"/>
</flow>
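Thread count is not the only knob. As a sketch, and assuming the Mule 3 attributes minThreads, maxQueueSize, and poolExhaustedAction behave as their names suggest, a more conservative strategy might bound both the pool and the queue. The strategy name, flow name, and every numeric value below are hypothetical.

```xml
<!-- Hypothetical tuning: all names and numbers are illustrative only -->
<queued-asynchronous-processing-strategy name="boundedAsync"
    minThreads="4"
    maxThreads="32"
    maxQueueSize="1000"
    poolExhaustedAction="WAIT"/>

<flow name="acceptOrdersBounded" processingStrategy="boundedAsync">
    <vm:inbound-endpoint path="acceptOrdersBounded" exchange-pattern="one-way"/>
    <vm:outbound-endpoint path="commonProcessing" exchange-pattern="one-way"/>
</flow>
```

A smaller pool with a bounded queue trades peak throughput for predictable memory use, which may suit flows whose downstream systems cannot absorb hundreds of concurrent calls.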
Asynchronous message handling is one of the keys to using Mule effectively. Hopefully this post illustrated some of Mule’s features that make dispatching, sending and tuning asynchronous message flows easy.