How to Use JMS ActiveMQ With Mule 4: Part 3
In this post, we will see how to use the MuleSoft JMS connector to perform different JMS operations on topics instead of queues.
This is the third part of the JMS ActiveMQ with Mule 4 series. You can read the previous parts here: Part 1, Part 2. In this post, we will learn about JMS topics and how to use them with MuleSoft.
JMS topics are useful for implementing a pub/sub model in which the same message is broadcast to multiple subscribers. In simple words, a message published to a topic can be consumed by any number of subscribers.
Let's understand how Mule JMS operations work in the case of a JMS topic.
1. Publish
To publish a message to a JMS topic, we will create a simple flow as shown in the image below. The flow uses an HTTP Listener, a logger that displays the incoming message, a JMS Publish operation that publishes the message to the JMS topic (testTopic1), and a final logger that reports that the message was published to ActiveMQ.
Before sending the request, the ActiveMQ console (image below) shows that testTopic1 has 0 messages.
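As a rough sketch, the publish flow described above might look like this in the Mule XML configuration. The flow name, JMS_Config, and testTopic1 come from the logs and error output in this post; HTTP_Listener_config, the /publish path, and the exact logger expressions are assumptions for illustration. The fragment is meant to sit inside a Mule configuration file that already declares the http and jms namespaces.

```xml
<!-- Sketch only: HTTP_Listener_config and the /publish path are assumed names -->
<flow name="publish-Flow">
    <!-- Receives the request sent from Postman -->
    <http:listener config-ref="HTTP_Listener_config" path="/publish"/>
    <!-- Logs the incoming message -->
    <logger level="INFO" message="Message received is #[payload]"/>
    <!-- destinationType="TOPIC" sends to a topic instead of the default queue -->
    <jms:publish config-ref="JMS_Config" destination="testTopic1" destinationType="TOPIC"/>
    <!-- Confirms that the publish step completed -->
    <logger level="INFO" message="Message Published to JMS Topic"/>
</flow>
```

Without destinationType="TOPIC", the Publish operation treats the destination as a queue, so this attribute is the main difference from the queue examples in the earlier parts.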
This is the request that we are going to send from Postman.
In the Mule debugger, we can see the message triggered from Postman.
Once the flow has finished executing, we check the ActiveMQ console to confirm that the message was published.
The console now shows 1 message for testTopic1.
INFO 2021-07-14 21:05:08,938 [[MuleRuntime].uber.01: [jms-demo].publish-Flow.CPU_LITE @1d355755] [processor: publish-Flow/processors/0; event: 7728bbc0-b4c4-11eb-82b4-c8b29becab99] org.mule.runtime.core.internal.processor.LoggerMessageProcessor: " Message received is " { "msg":"This is a sample message for topic" }
INFO 2021-07-14 21:08:54,923 [[MuleRuntime].uber.01: [jms-demo].publish-Flow.CPU_LITE @1d355755] [processor: publish-Flow/processors/2; event: 7728bbc0-b4c4-11eb-82b4-c8b29becab99] org.mule.runtime.core.internal.processor.LoggerMessageProcessor: "Message Published to JMS Topic"
2. Consume
To consume a message from a topic, we create a Mule flow as shown below. It has an HTTP Listener, a JMS Consume operation, and a logger. The image also shows the configuration of the Consume operation, where Destination is the name of the topic from which we want to receive the message.
With this configuration we can consume from the JMS topic, but the message is received only if the Consume operation is already waiting at the moment the message is published to the topic.
In simpler words, the consumer must be active when the message is published; a message published while no consumer is waiting is not delivered to it later.
To demonstrate this, we first trigger the HTTP endpoint of consume-Flow and then publish a message to the topic by hitting the HTTP endpoint of publish-Flow.
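A minimal sketch of this flow in XML is given here. The jms:consume element is copied from the Element DSL that Mule prints in the timeout error further down in this post (minus the doc attributes), and the two loggers mirror the log output; HTTP_Listener_config and the /consume path are assumed names.

```xml
<!-- Sketch only: HTTP_Listener_config and the /consume path are assumed names -->
<flow name="consume-Flow">
    <http:listener config-ref="HTTP_Listener_config" path="/consume"/>
    <logger level="INFO" message="Flowstarted"/>
    <!-- Waits up to 2 minutes for a message to arrive on the topic -->
    <jms:consume config-ref="JMS_Config" destination="testTopic1"
                 contentType="application/json" encoding="UTF-8"
                 maximumWait="2" maximumWaitUnit="MINUTES" ackMode="IMMEDIATE">
        <jms:consumer-type>
            <!-- Marks the destination as a topic rather than a queue -->
            <jms:topic-consumer/>
        </jms:consumer-type>
    </jms:consume>
    <logger level="INFO" message="Message received from Topic is #[payload]"/>
</flow>
```

The jms:topic-consumer element is what tells the operation to subscribe to a topic; maximumWait and maximumWaitUnit control how long the flow blocks before giving up.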
INFO 2021-07-14 21:15:13,323 [[MuleRuntime].uber.04: [jms-demo].consume-Flow.CPU_LITE @76f9c23f] [processor: consume-Flow/processors/0; event: e9f55b90-c252-11eb-b4c2-c8b29becab99] org.mule.runtime.core.internal.processor.LoggerMessageProcessor: Flowstarted
INFO 2021-07-14 21:15:39,580 [[MuleRuntime].uber.02: [jms-demo].publish-Flow.CPU_LITE @4fe978b0] [processor: publish-Flow/processors/0; event: f99af140-c252-11eb-b4c2-c8b29becab99] org.mule.runtime.core.internal.processor.LoggerMessageProcessor: " Message received is " { "Message":"This is a sample message" }
INFO 2021-07-14 21:15:39,590 [[MuleRuntime].uber.03: [jms-demo].publish-Flow.CPU_LITE @4fe978b0] [processor: publish-Flow/processors/2; event: f99af140-c252-11eb-b4c2-c8b29becab99] org.mule.runtime.core.internal.processor.LoggerMessageProcessor: "Message Published to JMS Topic"
INFO 2021-07-14 21:15:58,159 [[MuleRuntime].uber.04: [jms-demo].consume-Flow.CPU_LITE @76f9c23f] [processor: consume-Flow/processors/2; event: e9f55b90-c252-11eb-b4c2-c8b29becab99] org.mule.runtime.core.internal.processor.LoggerMessageProcessor: Message received from Topic is { "Message":"This is a sample message" }
In the above demo, the Consume operation was already waiting for a new message (because it was triggered first) when the message was published to the topic by the publish flow.
If the Consume operation does not receive a message from the topic within the configured maximum wait (2 minutes in this example), it fails with a JMS:TIMEOUT error.
INFO 2021-07-14 21:18:54,441 [[MuleRuntime].uber.05: [jms-demo].consume-Flow.CPU_LITE @770a444f] [processor: consume-Flow/processors/0; event: 01c507d0-d3a0-11eb-a267-c8b29becab99] org.mule.runtime.core.internal.processor.LoggerMessageProcessor: Flowstarted
ERROR 2021-07-14 21:18:58,600 [[MuleRuntime].uber.05: [jms-demo].consume-Flow.CPU_LITE @770a444f] [processor: consume-Flow/processors/1; event: 01c507d0-d3a0-11eb-a267-c8b29becab99] org.mule.runtime.core.internal.exception.OnErrorPropagateHandler:
********************************************************************************
Message : Failed to retrieve a Message. Operation timed out after 120000 milliseconds
Element : consume-Flow/processors/1 @ jms-demo:jms-demo.xml:42 (Consume)
Element DSL : <jms:consume doc:name="Consume" doc:id="5b3757b1-4036-46db-b04f-02c7df96f85e" config-ref="JMS_Config" destination="testTopic1" contentType="application/json" encoding="UTF-8" maximumWait="2" maximumWaitUnit="MINUTES" ackMode="IMMEDIATE">
<jms:consumer-type>
<jms:topic-consumer></jms:topic-consumer>
</jms:consumer-type>
</jms:consume>
Error type : JMS:TIMEOUT
FlowStack : at consume-Flow(consume-Flow/processors/1 @ jms-demo:jms-demo.xml:42 (Consume))
(set debug level logging or '-Dmule.verbose.exceptions=true' for everything)
********************************************************************************
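If we would rather return a friendly response than let the flow fail, one possible approach (not part of the original demo) is to wrap the Consume operation in a Try scope and handle JMS:TIMEOUT there. The sketch below assumes the same JMS_Config and testTopic1 as above; the response payload is purely illustrative.

```xml
<!-- Sketch only: an assumed way to handle the timeout, not the demo's configuration -->
<try>
    <jms:consume config-ref="JMS_Config" destination="testTopic1"
                 maximumWait="2" maximumWaitUnit="MINUTES" ackMode="IMMEDIATE">
        <jms:consumer-type>
            <jms:topic-consumer/>
        </jms:consumer-type>
    </jms:consume>
    <error-handler>
        <!-- Runs only when no message arrives within the maximum wait -->
        <on-error-continue type="JMS:TIMEOUT">
            <!-- Illustrative payload returned to the caller instead of an error -->
            <set-payload value='#[{ "status": "No message received before the timeout" }]'/>
        </on-error-continue>
    </error-handler>
</try>
```

Because on-error-continue is used, the flow carries on after the Try scope with the replacement payload; other error types are not matched here and still propagate as before.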
3. On New Message
To receive each newly published message from the topic as it arrives, we can configure the On New Message listener as shown in the image below.
We use two flows, jms-demoFlow1 and jms-demoFlow2. Both point to the same topic and use the same On New Message listener configuration.
When we trigger the publish flow from Postman, the message is published to the topic testTopic1, and both jms-demoFlow1 and jms-demoFlow2 consume the same message.
We can see the details in the Mule logs below:
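A sketch of what these two flows could look like in XML follows. The flow names, JMS_Config, testTopic1, and the logger texts are taken from the logs in this post; setting numberOfConsumers to 1 is an assumption made here so that each flow holds a single subscription.

```xml
<!-- Sketch only: numberOfConsumers="1" is an assumption, not taken from the demo -->
<flow name="jms-demoFlow1">
    <!-- On New Message source: fires once for every message published to the topic -->
    <jms:listener config-ref="JMS_Config" destination="testTopic1" numberOfConsumers="1">
        <jms:consumer-type>
            <jms:topic-consumer/>
        </jms:consumer-type>
    </jms:listener>
    <logger level="INFO" message="Message Received from JMS by FirstFlow : #[payload]"/>
</flow>

<flow name="jms-demoFlow2">
    <!-- Same topic and same listener settings as the first flow -->
    <jms:listener config-ref="JMS_Config" destination="testTopic1" numberOfConsumers="1">
        <jms:consumer-type>
            <jms:topic-consumer/>
        </jms:consumer-type>
    </jms:listener>
    <logger level="INFO" message="Message Received from JMS by SecondFlow : #[payload]"/>
</flow>
```

Each listener creates its own subscription to testTopic1, which is why both flows receive a copy of every message, unlike two listeners on the same queue, which would compete for it.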
INFO 2021-05-14 21:38:47,211 [[MuleRuntime].uber.04: [jms-demo].publish-Flow.CPU_LITE @69d1501d] [processor: publish-Flow/processors/0; event: 0b08ee10-c257-11eb-a766-c8b29becab99] org.mule.runtime.core.internal.processor.LoggerMessageProcessor: " Message received is " { "Message":"This is a sample message" }
INFO 2021-05-14 21:38:47,360 [[MuleRuntime].uber.07: [jms-demo].publish-Flow.CPU_LITE @69d1501d] [processor: publish-Flow/processors/2; event: 0b08ee10-c257-11eb-a766-c8b29becab99] org.mule.runtime.core.internal.processor.LoggerMessageProcessor: "Message Published to JMS Topic"
INFO 2021-05-14 21:38:47,419 [[MuleRuntime].uber.03: [jms-demo].jms-demoFlow2.CPU_LITE @2fe1c3f0] [processor: jms-demoFlow2/processors/0; event: 0b08ee10-c257-11eb-a766-c8b29becab99] org.mule.runtime.core.internal.processor.LoggerMessageProcessor: Message Received from JMS by SecondFlow : { "Message":"This is a sample message" }
INFO 2021-05-14 21:38:47,419 [[MuleRuntime].uber.04: [jms-demo].jms-demoFlow1.CPU_LITE @7665a622] [processor: jms-demoFlow1/processors/0; event: 0b08ee10-c257-11eb-a766-c8b29becab99] org.mule.runtime.core.internal.processor.LoggerMessageProcessor: Message Received from JMS by FirstFlow : { "Message":"This is a sample message" }
Conclusion
This is how we can use the MuleSoft JMS connector to publish messages to a JMS topic, consume them on demand, and listen for new messages as they arrive.