Mule ESB integrates with ActiveMQ out of the box, and there are plenty of examples on the internet that show how to use the two together. Here we will explore how to use a filter (a JMS selector) with ActiveMQ and Mule, so that a flow picks up only the JMS messages it needs.
Consider an environment where there is a JMS queue and there are multiple consumers listening to that queue; how can a Mule flow filter and pick up a particular JMS message of choice from all the messages available in the queue?
Let’s assume we have a JMS queue named MyQueue with many consumers listening to it.
The queue contains a few messages, and any of the consumers listening to it can consume them. But how can the JMS consumer in our Mule flow select and consume only particular messages?
We can filter JMS messages based on JMS properties such as JMS priority, JMS type, and custom headers. We will first look at filtering on JMS priority and then on a header.
To consume a JMS message based on its priority, the message must be put on the queue with that priority. Let’s say we only want to consume JMS messages whose priority is 9.
So, let’s first insert a couple of JMS messages into the queue MyQueue with priority 9.
Now our queue holds 5 messages in total: 2 with priority 9 and the remaining 3 with the default priority of 0.
With this in place, we will consume the messages in MyQueue with 2 consumers listening to the same queue: one that consumes only the messages with priority 9, and another that consumes the remaining messages with the default priority.
We will now create a flow that filters and consumes messages from the JMS queue based on priority, that is, only those messages in MyQueue whose JMS priority is 9. We use jms:selector to filter the messages as follows:
<flow name="JMSReceiver" doc:name="JMSReceiver">
    <jms:inbound-endpoint connector-ref="Active_MQ" doc:name="JMS" exchange-pattern="request-response" address="jms://tcp:MyQueue">
        <!-- Only messages matching this selector are delivered to the flow -->
        <jms:selector expression="JMSPriority = 9" />
    </jms:inbound-endpoint>
    <logger level="INFO" message="Received Payload :-#[message.payload]" doc:name="Logger"/>
</flow>
Now if we start the flow, we will find that the JMS consumer in our Mule flow has selectively consumed the messages from MyQueue whose JMS priority is 9, leaving the other messages in the queue.
The consumer has taken only the messages with priority 9; the remaining messages are ignored.
So we can set a particular priority on JMS messages and later consume them based on that priority.
Now let’s add another Mule flow with another JMS consumer, this time without a filter, listening to the same queue MyQueue:
<flow name="JMSReceiver2" doc:name="JMSReceiver">
    <jms:inbound-endpoint connector-ref="Active_MQ" doc:name="JMS" exchange-pattern="request-response" address="jms://tcp:MyQueue"/>
    <logger level="INFO" message="Received Payload :-#[message.payload]" doc:name="Logger"/>
</flow>
This time we haven’t given any filter, so this consumer picks up all the other messages, leaving the queue empty.
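Note that an unfiltered consumer competes for every message on the queue, including those with priority 9, so which consumer receives a given message can depend on timing. If the second consumer should never take the priority 9 messages, one option is to give it the complementary selector. The following is a minimal sketch under that assumption; the flow name JMSReceiverOthers is hypothetical, and the `<>` operator is written as `&lt;&gt;` because it sits inside an XML attribute:

```xml
<!-- Sketch only: the flow name is hypothetical; the endpoint settings mirror the flows above -->
<flow name="JMSReceiverOthers" doc:name="JMSReceiverOthers">
    <jms:inbound-endpoint connector-ref="Active_MQ" doc:name="JMS" exchange-pattern="request-response" address="jms://tcp:MyQueue">
        <!-- Matches every message whose JMS priority is not 9 -->
        <jms:selector expression="JMSPriority &lt;&gt; 9"/>
    </jms:inbound-endpoint>
    <logger level="INFO" message="Received Payload :-#[message.payload]" doc:name="Logger"/>
</flow>
```

With both selectors in place, the two consumers split the queue between them without overlapping.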
Using a dynamic JMS selector:-
Now the question arises: can we do this dynamically? Can we select messages by JMS properties whose values are set at runtime?
Yes, we can. To select dynamically, we pass the selector as an expression in the resource URL of the Mule Requester module, which can be called in the middle of a Mule flow to select and consume messages on demand:-
<flow name="JMSReceiverDynamic" >
    <http:listener config-ref="HTTP_Listener_Configuration" path="/dynamic" doc:name="HTTP"/>
    <set-variable variableName="priority" value="9" doc:name="Variable"/>
    <mulerequester:request config-ref="Mule_Requester" resource="jms://MyQueue?selector=Priority%3D'#[flowVars.priority]'" doc:name="Mule Requester" timeout="120000"/>
    <logger level="INFO" message="Received Payload with selecting Dynamically:-#[message.payload]" doc:name="Logger"/>
</flow>
Here we have selected the messages with priority=9 dynamically, taking the value from a flow variable through an expression (%3D in the resource URL is the URL-encoded = sign). The value can equally come from inbound properties, a properties file, or the message payload.
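As an illustration of taking the value from an inbound property, the sketch below reads the priority from a query parameter of the incoming HTTP request. It assumes the HTTP listener exposes query parameters through the `http.query.params` inbound property; the flow name, the path /dynamic-query, and the parameter name `priority` are made up for this example:

```xml
<!-- Sketch only: flow name, path and the "priority" query parameter are assumptions -->
<flow name="JMSReceiverDynamicFromQuery">
    <http:listener config-ref="HTTP_Listener_Configuration" path="/dynamic-query" doc:name="HTTP"/>
    <!-- Copy ?priority=... from the request into a flow variable -->
    <set-variable variableName="priority" value="#[message.inboundProperties.'http.query.params'.priority]" doc:name="Variable"/>
    <!-- Same requester call as in the flow above, now driven by the caller -->
    <mulerequester:request config-ref="Mule_Requester" resource="jms://MyQueue?selector=Priority%3D'#[flowVars.priority]'" doc:name="Mule Requester" timeout="120000"/>
    <logger level="INFO" message="Received Payload :- #[message.payload]" doc:name="Logger"/>
</flow>
```

A request such as http://localhost:8081/dynamic-query?priority=9 would then ask for priority 9 messages. Because the caller's value ends up inside the selector, it is worth validating it before use.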
Pushing a message to JMS queue with JMS priority from Mule :-
Next we will see how to insert a message into our JMS queue directly from a Mule flow, setting the JMS priority of the message to 9.
Here is the Mule flow that puts the message on the JMS queue with priority 9:-
<http:listener-config name="HTTP_Listener_Configuration" host="0.0.0.0" port="8081" doc:name="HTTP Listener Configuration"/>
<jms:activemq-connector name="Active_MQ" numberOfConcurrentTransactedReceivers="20" brokerURL="tcp://localhost:61616"/>
<flow name="JMSSender" >
    <http:listener config-ref="HTTP_Listener_Configuration" path="/jms" doc:name="HTTP"/>
    <set-payload value="This is a test JMS message with priority 9" doc:name="Set Payload"/>
    <logger message="Payload :- #[message.payload]" level="INFO" doc:name="Logger" />
    <jms:outbound-endpoint queue="MyQueue" connector-ref="Active_MQ" doc:name="JMS">
        <message-properties-transformer>
            <add-message-property key="Priority" value="9" />
        </message-properties-transformer>
    </jms:outbound-endpoint>
</flow>
Here the priority is set as a message property on the outbound endpoint, not in the payload itself. The relevant configuration is the following:-
<message-properties-transformer> <add-message-property key="Priority" value="9"/> </message-properties-transformer>
So, if we invoke the Mule flow by hitting the URL http://localhost:8081/jms, the message is pushed into the queue MyQueue with priority 9.
In a similar way, we can set other properties on JMS messages, such as a custom header, in our Mule flow as follows:
<message-properties-transformer> <add-message-property key="Header" value="MyCustom-Header" /> </message-properties-transformer>
Each message will be sent to the queue MyQueue with the header attached to it. If we check the properties of the message in the queue, we can see that our header is attached.
We can then consume the message based on the header:
<jms:inbound-endpoint connector-ref="Active_MQ" doc:name="JMSHeader" exchange-pattern="request-response" address="jms://tcp:MyQueue">
    <jms:selector expression="Header = 'MyCustom-Header'"/>
</jms:inbound-endpoint>
So here messages from the queue MyQueue will be consumed only if their header is 'MyCustom-Header', in the same way that messages were filtered and consumed based on priority.
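Selector conditions can also be combined with AND and OR, since JMS selectors use a subset of SQL-92 conditional syntax. As a small sketch that reuses the property names from this article, the endpoint below would consume only messages that carry the custom header and also have priority 9; the doc:name value is made up for the example:

```xml
<!-- Sketch only: combines the two filters used in this article in one selector -->
<jms:inbound-endpoint connector-ref="Active_MQ" doc:name="JMSHeaderAndPriority" exchange-pattern="request-response" address="jms://tcp:MyQueue">
    <jms:selector expression="Header = 'MyCustom-Header' AND JMSPriority = 9"/>
</jms:inbound-endpoint>
```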
Using dynamic JMS selector again:-
Again, if we want dynamic selection, we can use an expression in Mule Requester and select the message based on the header, fetching the header value from a variable:-
<!-- The variable name avoids a hyphen, which MEL would read as subtraction in flowVars.Custom-Header -->
<set-variable variableName="customHeader" value="MyCustom-Header" doc:name="Variable"/>
<mulerequester:request config-ref="Mule_Requester" resource="jms://MyQueue?selector=Header%3D'#[flowVars.customHeader]'" doc:name="Mule Requester" timeout="120000"/>
At this point, you should have a good idea of how to set JMS properties on messages and how to consume messages from a queue in a multi-consumer environment based on those properties, applying filters both statically and dynamically.
Now you can experiment on your own: configure JMS messages and implement the example.
Please share your feedback and experiences in the comments section below.
This article was written by Anirban