Message sequencing with Mule and JMS Message Groups
The first solution that comes to mind is an exclusive consumer, but I’ve come to realise that exclusive consumers have some disadvantages. One is that you end up with active consumers doing nothing (I don’t like the idea of having workers ready to dig in when they are not allowed to). Another is that, with an exclusive consumer, a message might block another message even though the two have nothing to do with each other. For example, if we’re dealing with car orders, you probably don’t want an order for a Volvo that doesn’t go through to block an order for a Saab (I’m Swedish).
An alternative is to use JMS message groups. With message groups you get guaranteed ordering of related messages, load balancing across multiple consumers, and automatic failover to another consumer if a consumer’s JVM goes down. In other words, all messages in a group are delivered to the same consumer for as long as it is available, and the group switches to another consumer if that one goes away. Different groups are dispatched to different consumers depending on load.
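The stickiness described above can be pictured with a small model. The following Java sketch is a simplification, not ActiveMQ’s actual dispatch logic: it assumes new groups are handed out round-robin rather than by load, and the class and method names (`GroupDispatchSketch`, `dispatch`, `removeConsumer`) are made up for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Simplified model of sticky group-to-consumer assignment.
// Hypothetical names; this is NOT how ActiveMQ is implemented internally.
class GroupDispatchSketch {
    private final List<String> consumers = new ArrayList<>();
    private final Map<String, String> owners = new HashMap<>(); // group id -> consumer
    private int next = 0; // round-robin cursor (assumption: real brokers consider load)

    GroupDispatchSketch(List<String> initialConsumers) {
        consumers.addAll(initialConsumers);
    }

    // Returns the consumer that receives a message carrying the given group id.
    String dispatch(String groupId) {
        if (consumers.isEmpty()) {
            throw new IllegalStateException("no consumers available");
        }
        String owner = owners.get(groupId);
        if (owner == null) {
            // First message of a group (or its owner went away): pick a consumer and remember it.
            owner = consumers.get(next % consumers.size());
            next++;
            owners.put(groupId, owner);
        }
        return owner;
    }

    // Simulates a consumer going away: its groups become unassigned.
    void removeConsumer(String consumer) {
        consumers.remove(consumer);
        owners.values().removeIf(consumer::equals);
    }

    public static void main(String[] args) {
        GroupDispatchSketch broker =
                new GroupDispatchSketch(Arrays.asList("consumer-A", "consumer-B"));
        for (String group : new String[] {"Volvo", "Volvo", "Saab"}) {
            System.out.println(group + " -> " + broker.dispatch(group));
        }
        // Fail over: the Volvo group is reassigned to a remaining consumer.
        broker.removeConsumer("consumer-A");
        System.out.println("Volvo -> " + broker.dispatch("Volvo"));
    }
}
```

In this model both Volvo messages land on the same consumer, the Saab message goes to the other one, and the Volvo group moves only after its consumer is removed.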
So, how to implement message groups?
It’s very simple: the only thing you have to do is set the JMSXGroupID property on each message in the producer (the outbound endpoint) before sending it off. By default, messages are delivered in the same order as they arrived. There is also a JMSXGroupSeq property, which carries the sequence number of a message within its group.
To illustrate this with Mule and ActiveMQ, I will show you a config that reads orders from an XML file containing two orders for Volvo and one for Saab:
    <?xml version='1.0' encoding='UTF-8'?>
    <orders>
        <order>
            <type>Volvo</type>
            <description>Volvo V70</description>
        </order>
        <order>
            <type>Volvo</type>
            <description>Volvo C30</description>
        </order>
        <order>
            <type>Saab</type>
            <description>Saab 9-5</description>
        </order>
    </orders>
I will use type as the message group id. I will also use a retry policy
and provoke an exception for the first Volvo order, just to show that the
Volvo V70 is delivered three times (the initial delivery plus two
redeliveries) before the Volvo C30 gets consumed. Meanwhile the Saab order
slides through, since it’s not part of the same group.
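As a rough model of what the retry policy does to a single group’s stream, here is a minimal Java sketch. It assumes the failing order is redelivered a fixed number of times and then committed anyway, and it ignores timing and other groups. `RedeliverySketch` and `deliveries` are hypothetical names, not part of Mule or ActiveMQ.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Toy model of in-order delivery within ONE message group when one message keeps failing.
// Hypothetical helper for illustration only.
class RedeliverySketch {

    // Returns the sequence of delivery attempts for the group's messages.
    // The failing message is attempted once plus maxRedelivery more times
    // before the next message in the group is delivered.
    static List<String> deliveries(List<String> groupMessages, String failing, int maxRedelivery) {
        List<String> log = new ArrayList<>();
        for (String message : groupMessages) {
            int attempts = message.equals(failing) ? 1 + maxRedelivery : 1;
            for (int i = 0; i < attempts; i++) {
                log.add(message);
            }
        }
        return log;
    }

    public static void main(String[] args) {
        List<String> volvoGroup = Arrays.asList("Volvo V70", "Volvo C30");
        // Assumption: two redeliveries, matching the maxRedelivery value used in this article's config.
        for (String attempt : deliveries(volvoGroup, "Volvo V70", 2)) {
            System.out.println(attempt);
        }
    }
}
```

The point of the model is only the ordering: every attempt for the failing order comes before the next order of the same group, while orders in other groups are not part of this queue at all.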
Config:
    <?xml version="1.0" encoding="UTF-8"?>
    <mule xmlns="http://www.mulesoft.org/schema/mule/core"
          xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
          xmlns:file="http://www.mulesoft.org/schema/mule/file"
          xmlns:spring="http://www.springframework.org/schema/beans"
          xmlns:jms="http://www.mulesoft.org/schema/mule/jms"
          xmlns:test="http://www.mulesoft.org/schema/mule/test"
          xmlns:script="http://www.mulesoft.org/schema/mule/scripting"
          xsi:schemaLocation="
            http://www.mulesoft.org/schema/mule/core http://www.mulesoft.org/schema/mule/core/3.2/mule.xsd
            http://www.mulesoft.org/schema/mule/file http://www.mulesoft.org/schema/mule/file/3.2/mule-file.xsd
            http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
            http://www.mulesoft.org/schema/mule/jms http://www.mulesoft.org/schema/mule/jms/3.2/mule-jms.xsd
            http://www.mulesoft.org/schema/mule/test http://www.mulesoft.org/schema/mule/test/3.2/mule-test.xsd
            http://www.mulesoft.org/schema/mule/scripting http://www.mulesoft.org/schema/mule/scripting/3.2/mule-scripting.xsd
          ">

        <beans xmlns="http://www.springframework.org/schema/beans">
            <!-- Redelivery Policy -->
            <bean id="redeliveryPolicy" class="org.apache.activemq.RedeliveryPolicy">
                <property name="initialRedeliveryDelay" value="1000"/>
                <property name="redeliveryDelay" value="1000"/>
                <property name="useExponentialBackOff" value="false"/>
                <property name="backOffMultiplier" value="2"/>
            </bean>

            <!-- ActiveMQ Connection factory -->
            <bean id="amqFactory" class="org.apache.activemq.ActiveMQConnectionFactory" lazy-init="true">
                <property name="brokerURL" value="vm://localhost:61616" />
                <property name="redeliveryPolicy" ref="redeliveryPolicy"/>
            </bean>
        </beans>

        <jms:activemq-connector name="amqConnector"
                                maxRedelivery="2"
                                connectionFactory-ref="amqFactory"
                                persistentDelivery="false"
                                specification="1.1">
            <service-overrides sessionHandler="org.mule.session.NullSessionHandler"/>
        </jms:activemq-connector>

        <file:connector name="input"
                        pollingFrequency="100"
                        moveToDirectory="/tmp/carorders/processed"
                        moveToPattern="#[header:originalFilename]"/>

        <flow name="messageGroupsProducer">
            <file:inbound-endpoint connector-ref="input" path="/tmp/carorders" />
            <object-to-string-transformer/>
            <splitter evaluator="xpath" expression="/orders/order"/>
            <jms:outbound-endpoint queue="orders.car" connector-ref="amqConnector">
                <message-properties-transformer scope="outbound">
                    <add-message-property key="JMSXGroupID" value="#[xpath://type]"/>
                </message-properties-transformer>
                <expression-transformer>
                    <return-argument evaluator="xpath" expression="//description"/>
                </expression-transformer>
            </jms:outbound-endpoint>
        </flow>

        <flow name="messageGroupsConsumer">
            <jms:inbound-endpoint queue="orders.car">
                <jms:transaction action="ALWAYS_BEGIN" />
            </jms:inbound-endpoint>
            <choice>
                <when evaluator="groovy" expression="payload == 'Volvo V70'">
                    <echo-component />
                    <test:component throwException="true"/>
                </when>
                <otherwise>
                    <echo-component />
                </otherwise>
            </choice>
            <default-exception-strategy>
                <commit-transaction exception-pattern="org.mule.transport.jms.redelivery.MessageRedeliveredException"/>
            </default-exception-strategy>
        </flow>
    </mule>
If you take a look at the log after processing the file, it shows
that the Volvo V70 order is consumed 3 times before it’s committed,
and only then is the Volvo C30 order consumed (the sequence is
preserved). Meanwhile the Saab 9-5 order has been consumed as expected,
since it’s not part of the same group.
Log output (stripped of the provoked exception trace):
    ********************************************************************************
    * Message received in service: messageGroupsConsumer. Content is: 'Volvo V70'  *
    ********************************************************************************
    ********************************************************************************
    * Message received in service: messageGroupsConsumer. Content is: 'Saab 9-5'   *
    ********************************************************************************
    ********************************************************************************
    * Message received in service: messageGroupsConsumer. Content is: 'Volvo V70'  *
    ********************************************************************************
    ********************************************************************************
    * Message received in service: messageGroupsConsumer. Content is: 'Volvo V70'  *
    ********************************************************************************
    ********************************************************************************
    * Message received in service: messageGroupsConsumer. Content is: 'Volvo C30'  *
    ********************************************************************************
Note: JMSXGroupID is part of the JMS spec, but I’ve only evaluated this using ActiveMQ. From what I can see in the documentation, it should be supported by the most common platforms. Please feel free to comment if you have experience using message groups with other platforms.
Published at DZone with permission of Tomas Blohm, DZone MVB. See the original article here.