DZone
Thanks for visiting DZone today,
Edit Profile
  • Manage Email Subscriptions
  • How to Post to DZone
  • Article Submission Guidelines
Sign Out View Profile
  • Post an Article
  • Manage My Drafts
Over 2 million developers have joined DZone.
Log In / Join
Refcards Trend Reports
Events Video Library
Refcards
Trend Reports

Events

View Events Video Library

Related

  • Integrating Pub/Sub With MuleSoft
  • MuleSoft Integration With RabbitMQ
  • How to Use IBM App Connect's ‘Mapping Assist’ to Automatically Map Your Integration Flows
  • Building A Simple AI Application in 2023 for Fun and Profit

Trending

  • Logging What AI Agents Do in Salesforce: A Simple One-Object Audit Framework
  • The Hidden Cost of AI Tokens: Engineering Patterns for 10x Resource Efficiency
  • Building a High-Throughput Distributed Sequence Generator Using the Hi-Lo Algorithm
  • The Missing `bandit` for AI Agents: How I Built a Static Analyzer for Prompt Injection
  1. DZone
  2. Data Engineering
  3. Data
  4. How to Use Mulesoft VM Connector

How to Use Mulesoft VM Connector

In this article, you will develop an understanding of how to use a VM connector to implement a VM queue in Mulesoft through a quick tutorial.

By 
Anand Joshi user avatar
Anand Joshi
·
Updated Jan. 25, 21 · Tutorial
Likes (8)
Comment
Save
Tweet
Share
38.9K Views

Join the DZone community and get the full member experience.

Join For Free

VM Connector is used for intra-app (within the app) and inter-app communication through either Transient or Persistent asynchronous queues.               

  • Transient queues: This type of queue is volatile, meaning the data would be lost if the system crashed or restarted. Transient queues are faster than Persistent queues.
  • Persistent queues: This type of queue is more reliable; data would be persisted if the system crashed, failed, or restarted. These queues are slower than Transient queues.

VM Connector is mainly used for the following: 

  • To transfer the message from one flow to another.
  • To distribute workload (load balancing) across a cluster.
  • To communicate with different apps running in the same Mule domain.
  • To perform simple queueing.

VM Connector provides the following operations:

  • Publish: To publish the data into the queue.
  • Consume: To pull data from the queue. If there is no message in the queue, then the consume operation waits for up to the configured queueTimeout value, after which the VM:QUEUE_TIMEOUT error is thrown.
  • Listener: To listen to the queue and pick the message.
  • Publish Consume: To publish the message and then wait for a response from the consuming operation. This operation publishes the content into the queue and waits for a response up to the configured queueTimeout value. If the response is not received in the given time then the VM:QUEUE_TIMEOUT error is thrown.

Let's try to understand these operations and how VM queues work with a demo.

1. Publish the Message Into the VM Queue and Listen to the Message From the Queue

We will create Mule flows as shown in the below image. Publish-flow is a flow for publishing the message into the VM queue. Here, we have an HTTP Listener, we receive the message by triggering the HTTP endpoint, then, in the logger, the received message gets printed. Next, we have a transforming message where we are adding the current time in an additional field named "Time." Next, we have the VM publish operation, which publishes the message to the queue, and, lastly, we print the published message in a logger.

In the Persistent-Queue-Flow, we have a VM listener source that listens for the VM queue and picks the available message from the queue. Next, we have a logger that displays the message picked from by the listener source.

Logger displays the message picked from by the listener source screenshot.

Let's have a look into the configuration of VM Queue when we drag and drop the VM publish component into a Mule flow from the Mule Palette to create the flow. Initially, it would give an error, as we don’t have a configuration set up. We will create the VM configuration by clicking on the Plus (+) icon on the 'Connector configuration,' as highlighted in the below image.

Clicking on the Plus (+) icon on connector Configuration screenshot.

In the VM Connector configuration, inside the General section, for the Queue field, we will select Edit inline from the drop-down. And then, we can click on the plus (+) as shown in the below image.

Edit  inline and click on the plus (+) screenshot.

The moment we click, a pop-up, as shown in the below image, will appear for the Queue where we can see three fields: Queue name, Queue type (Transient or Persistent), and Max outstanding messages. We enter a value for these fields and click on finish.

Pop-up screenshot.

We can see our newly created Pqueue1 is visible in the Queue section, as shown in the below image.

Newly created Pqueue1 is visible in the Queue section screenshot.

If we want to create more queues, we will again click on the Plus (+) icon and add more queues. We can see the queues in the queue section of the VM Configuration.

Once the Connector configuration is done, we can see the queue name is visible as a drop-down inside Publish operation connector.  We will select the queue Pqueue1.Pqueue1 screenshot.

In the transform message component, we have written the below dataweave to add a timestamp:

Java
 




x
13
9


 
1
%dw 2.0
2
output application/json
3

          
4
---
5
{
6
       Message : payload.Message,
7
       Time : now()
8
}



In the Listener flow below, we set the configuration:

Configuration screenshot.

Now, let's startup the API in debug mode and try to trigger the request from Postman and see how the VM queue is working.

The request triggered by Postman is shown below:

Triggered request by the postman screenshot.

We can see the same message in our Publish flow; notice that we have only one field named "Message" in the payload.

One field named message screenshot.

After executing the Transform Message component, we have added one more field in the payload named "Time."  

Adding Time to the Playload screenshot.

Now we can publish it. Once we process the Publish and Logger components of the Publish flow, we will see the same message is being picked up by the VM listener.

Message picked up by VM listener screenshot.

We can get a clearer picture of this by looking at the Mule logs below: 

Shell
 




x
13


 
1
INFO  2021-01-09 19:40:40,451 [[MuleRuntime].uber.01: [vm-demo-app].publish-Flow.CPU_LITE @32c58043] [processor: publish-Flow/processors/0; event: 5ebbaa90-5284-11eb-808e-568d5a57601b] org.mule.runtime.core.internal.processor.LoggerMessageProcessor: Received message is {
2
       "Message" : "This is a sample message"
3
}
4

          
5
INFO  2021-01-09 19:49:09,877 [[MuleRuntime].uber.01: [vm-demo-app].publish-Flow.BLOCKING @69d02fe8] [processor: publish-Flow/processors/3; event: 5ebbaa90-5284-11eb-808e-568d5a57601b] org.mule.runtime.core.internal.processor.LoggerMessageProcessor: "Message Published Successfully and Published Message is " {
6
   "Message": "This is a sample message",
7
   "Time": "2021-01-09T19:46:06.277+05:30"
8
} 
9

          
10
INFO  2021-01-09 19:50:33,301 [[MuleRuntime].uber.03: [vm-demo-app].Persistent-Queue-Flow.CPU_LITE @3b19e87c] [processor: Persistent-Queue-Flow/processors/0; event: 5ebbaa90-5284-11eb-808e-568d5a57601b] org.mule.runtime.core.internal.processor.LoggerMessageProcessor: "Received message from VM listener is " {
11
   "Message": "This is a sample message",
12
   "Time": "2021-01-09T19:46:06.277+05:30"
13
}



In this way, we can send messages to the VM queue and consume the same message from the queue.

2. The Publish Consume Operation 

Upon using Publish Consume, the published message will wait for the defined time to get a response from the queue.

For the publish consume operation demo, we replace the publish component of our Publish flow with the Publish consume component. We will use the VM Listener component to receive messages from the queue.

VM Listener Component screenshot.

We are publishing the message into the queue, Pqueue1, and the defined timeout is set to 15 seconds, which means, after publishing, the message to the queue (i.e. the publish consume operation) waits for 15 seconds to receive the response. If it fails to receive anything, then it will throw a QUEUE_TIMEOUT error.

Below is the configuration of VM Listener. We can see that it has a section named "Response," with a default value of #[payload]. This is used to inform Publish Consumer that the message has successfully been picked up from the queue.

The message has been picked successfully screenshot.

Let's run the application to trigger the same sample message from Postman and see how it works: 

Shell
 




xxxxxxxxxx
1
13


 
1
INFO  2021-01-09 22:15:40,907 [[MuleRuntime].uber.09: [vm-demo-app].publish-Flow.CPU_LITE @47090f15] [processor: publish-Flow/processors/0; event: 1ba15051-529a-11eb-8433-568d5a57601b] org.mule.runtime.core.internal.processor.LoggerMessageProcessor: Received message is {
2
       "Message" : "This is a sample message"
3
}
4

          
5
INFO  2021-01-09 22:15:41,178 [[MuleRuntime].uber.01: [vm-demo-app].Persistent-Queue-Flow.CPU_LITE @388d8b85] [processor: Persistent-Queue-Flow/processors/0; event: 1ba15051-529a-11eb-8433-568d5a57601b] org.mule.runtime.core.internal.processor.LoggerMessageProcessor: "Received message from VM listener is " {
6
   "Message": "This is a sample message",
7
   "Time": "2021-01-09T22:15:41.015+05:30"
8
}
9

          
10
INFO  2021-01-09 22:15:41,247 [[MuleRuntime].uber.09: [vm-demo-app].publish-Flow.BLOCKING @6a8f14d2] [processor: publish-Flow/processors/3; event: 1ba15051-529a-11eb-8433-568d5a57601b] org.mule.runtime.core.internal.processor.LoggerMessageProcessor: "Message Published Successfully and Published Message is " {
11
   "Message": "This is a sample message",
12
   "Time": "2021-01-09T22:15:41.015+05:30"
13
}



If we look closely into the console logs, we can find that the first log is the message that is received from the HTTP listener for the publish-Flow. Then, we have logs for the Persistent-Queue-Flow, and it is printing the received message. In the last log, the publish consume operation in the publish-Flow is processed.

3. The Consume Operation 

For the consume operation, we will change the publish-consume component with the publish component in the Publish flow. We create one more flow named consume-flow, as shown in the image below. Here, we are using a scheduler that will run every 15 seconds, and, next to it, we have the VM consume operation where we will consume the message from the queue, and, in the logger, we are displaying the message received from the queue.

Message received by the queue screenshot.

In the consume operation, we perform the configuration as shown in the below image. We are using the same VM configuration, created as a publishing operation, and we are using Pqueue1. We have set the timeout to 20 Seconds, which means that once the flow is triggered, the consume operation will wait for 20 seconds to receive the message. If no message is received within this time frame, the VM:EMPTY_QUEUE error will be thrown. 

Error screenshot.

Note: We have removed the persistent-queue-flow, as the VM Listener is pointing to the same queue, Pqueue1.  

Now, let’s run the app and trigger a request from Postman to see how it works.

When we sent the same message from Postman, it was received by the publish flow and displayed in the logs. Then, after the addition of the time field, this message was published to the queue. In the logs, we can see that the published message had a time field.  

The same message is picked up by the consumer, which we can see in the logs. 

Here, after our defined time of 20 seconds, the consume operation will throw a VM:EMPTY_QUEUE error, as there was no other message in the queue to pick up. 

Shell
 




xxxxxxxxxx
1
32


 
1
INFO  2021-01-09 20:20:42,656 [[MuleRuntime].uber.05: [vm-demo-app].publish-Flow.CPU_LITE @33648a8f] [processor: publish-Flow/processors/0; event: 0c0c28a1-528a-11eb-b090-568d5a57601b] org.mule.runtime.core.internal.processor.LoggerMessageProcessor: Received message is {
2
       "Message" : "This is a sample message"
3
}
4

          
5
INFO  2021-01-09 20:20:42,740 [[MuleRuntime].uber.05: [vm-demo-app].publish-Flow.BLOCKING @4b4fc8df] [processor: publish-Flow/processors/3; event: 0c0c28a1-528a-11eb-b090-568d5a57601b] org.mule.runtime.core.internal.processor.LoggerMessageProcessor: "Message Published Successfully and Published Message is " {
6
   "Message": "This is a sample message",
7
   "Time": "2021-01-09T20:20:42.714+05:30"
8
}
9

          
10
INFO  2021-01-09 20:20:42,756 [[MuleRuntime].uber.03: [vm-demo-app].consume-Flow.BLOCKING @5f53f894] [processor: consume-Flow/processors/1; event: 0994c000-528a-11eb-b090-568d5a57601b] org.mule.runtime.core.internal.processor.LoggerMessageProcessor: "The message received from the vm queues is " {
11
   "Message": "This is a sample message",
12
   "Time": "2021-01-09T20:20:42.714+05:30"
13
}
14

          
15
ERROR 2021-01-09 20:21:13,482 [[MuleRuntime].uber.05: [vm-demo-app].consume-Flow.BLOCKING @5f53f894] [processor: ; event: 1284a720-528a-11eb-b090-568d5a57601b] org.mule.runtime.core.internal.exception.OnErrorPropagateHandler: 
16

          
17
********************************************************************************
18

          
19
Message               : Tried to consume messages from VM queue 'Pqueue1' but it was empty after timeout of 20 SECONDS
20

          
21
Element               : consume-Flow/processors/0 @ vm-demo-app:vm-demo-app.xml:55 (Consume)
22

          
23
Element DSL           : <vm:consume queueName="Pqueue1" doc:name="Consume" doc:id="d1a15f72-43be-4f92-aa3c-a9d0ffbbc947" config-ref="VM_Config" timeout="20"></vm:consume>
24

          
25
Error type            : VM:EMPTY_QUEUE
26

          
27
FlowStack             : at consume-Flow(consume-Flow/processors/0 @ vm-demo-app:vm-demo-app.xml:55 (Consume))
28

          
29
 
30

          
31
##  (set debug level logging or '-Dmule.verbose.exceptions=true' for everything)



We said that Persistent Queue holds the data even in case of a restart or crash. Let’s test this hypothesis. For this, make a small alteration to the existing Mule flow by changing the scheduler frequency to 60 seconds.


Changing the scheduler frequency to 60 seconds screenshot.



Changing the scheduler frequency to 60 seconds screenshot 2.

After these changes, when we run the application, the scheduler starts, and the consume operation will wait for 20 seconds to get a message from the queue. If nothing is found, then it will throw an error. And, again, after one minute from the start time elapses, the scheduler starts and consumes the operation, then waits for the message from the queue, and so on. 

Here, we will wait for the timeout and, after that, we will trigger the message from Postman to stop the application just after publishing the message. Upon restart, we will see whether the VM consumers receive the same message or not.


Logs screenshot.


From the above logs, we can say that the application started and triggered the consume flow, but as the consume operation couldn’t find any messages from the queue in the given time, the EMPTY_QUEUE error was thrown. After this, we triggered the message from Postman, and we can see in the logs that the message was published in the queue with the timestamp. After publishing, we stopped the app.

On restarting the application again after five minutes, when the scheduler triggered the flow, the consume operation picked up the message that was published before the app stopped. We can confirm that it is the same message by looking at the time field, which has the time when the message was published, and it is the same as in the last image.

In this way, by using a Persistent queue, we can process our messages even after an application restarts or crashes.

Note: In the same scenario, if we use a Transient queue in the place of a Persistent queue, the published message would be lost upon restarting the application.

Now we know how to use a VM queue in Mule flow and how we can perform various operations provided by the VM connector.

Virtual Machine Connector (mathematics) Flow (web browser) application app MuleSoft Data (computing) job scheduling Transient (computer programming)

Opinions expressed by DZone contributors are their own.

Related

  • Integrating Pub/Sub With MuleSoft
  • MuleSoft Integration With RabbitMQ
  • How to Use IBM App Connect's ‘Mapping Assist’ to Automatically Map Your Integration Flows
  • Building A Simple AI Application in 2023 for Fun and Profit

Partner Resources

×

Comments

The likes didn't load as expected. Please refresh the page and try again.

  • RSS
  • X
  • Facebook

ABOUT US

  • About DZone
  • Support and feedback
  • Community research

ADVERTISE

  • Advertise with DZone

CONTRIBUTE ON DZONE

  • Article Submission Guidelines
  • Become a Contributor
  • Core Program
  • Visit the Writers' Zone

LEGAL

  • Terms of Service
  • Privacy Policy

CONTACT US

  • 3343 Perimeter Hill Drive
  • Suite 215
  • Nashville, TN 37211
  • [email protected]

Let's be friends:

  • RSS
  • X
  • Facebook