Step by Step Guide to Use Anypoint MQ - Part 3
In this article, the third installment, we will learn about concepts like ACK and NACK, how to use them in a Mule API, and how to delete messages and queues.
1. ACK/NACK in Anypoint MQ
Let's first try to understand what ACK/NACK is and why it is required.
Queues ensure that messages are processed even when a consumer fails. Anypoint MQ provides this reliability through an acknowledgment mechanism based on ACK and NACK, which tells the queue whether or not the app has successfully received a message.
ACK stands for Acknowledgment: It indicates that the message has been processed and can be deleted from the queue.
NACK stands for Negative Acknowledgment: It indicates that the current message was not processed, so Anypoint MQ returns the message to the queue for redelivery.
The Anypoint MQ connectors provide three acknowledgment modes:
Immediate: Acknowledges messages as soon as they are consumed, regardless of the processing.
Automatic: Acknowledges messages automatically only if the flow execution finishes successfully.
Manual: The app must acknowledge the message explicitly using ACK.
Note: By default, the acknowledgment mode is Immediate for the Consume operation, which means the message is deleted from the queue as soon as it is consumed. For the Subscriber source, the default acknowledgment mode is Automatic: if the message is processed successfully within the defined acknowledgment time, the acknowledgment is sent to the queue automatically.
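As a rough illustration, the acknowledgment mode is set on the Subscriber source or the Consume operation in the Mule XML. The fragment below is a sketch under assumptions: the configuration name Anypoint_MQ_Config is a placeholder, and the attribute name and values should be checked against the connector version in use.

```xml
<!-- Fragment only: assumes an Anypoint MQ configuration named Anypoint_MQ_Config
     and a queue named test-queue; the enclosing flows are omitted. -->

<!-- Subscriber source: AUTO is the default; IMMEDIATE and MANUAL are the alternatives -->
<anypoint-mq:subscriber config-ref="Anypoint_MQ_Config"
                        destination="test-queue"
                        acknowledgementMode="AUTO"/>

<!-- Consume operation: IMMEDIATE is the default; MANUAL leaves the message in flight -->
<anypoint-mq:consume config-ref="Anypoint_MQ_Config"
                     destination="test-queue"
                     acknowledgementMode="MANUAL"/>
```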
ACK:
To use Manual ACK mode, we need to add the Ack component of MQ to the flow. To send an ACK or NACK to the queue, we need to pass the acknowledgment token of the message, which we can obtain from the #[attributes] of the received message.
Let's try to understand it with the help of the example below.
We will create a sample flow with an Anypoint MQ Subscriber source that picks up messages as soon as they arrive in the queue. A logger then displays the message picked up by the subscriber. Next, the Ack component of MQ sends the acknowledgment to the queue before the rest of the processing completes, and a final logger records that the flow has ended.
In the ACK component, we have set the value of Ack token to #[attributes.ackToken]. As soon as the message passes through this component, the ACK is sent to the queue.
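In XML form, the flow described above might look roughly like the following. This is a minimal sketch, not the exact configuration from the example: the flow name, the configuration name Anypoint_MQ_Config, the property placeholders, and the logger messages are assumptions, and the enclosing mule element with its namespace declarations is omitted.

```xml
<!-- Connection details are read from hypothetical properties -->
<anypoint-mq:config name="Anypoint_MQ_Config">
    <anypoint-mq:connection url="${mq.url}"
                            clientId="${mq.clientId}"
                            clientSecret="${mq.clientSecret}"/>
</anypoint-mq:config>

<flow name="anypointmq-subscriber-demoFlow">
    <!-- MANUAL mode: the message stays in flight until the flow sends an ACK or NACK -->
    <anypoint-mq:subscriber config-ref="Anypoint_MQ_Config"
                            destination="test-queue"
                            acknowledgementMode="MANUAL"/>

    <!-- Display the message picked up by the subscriber -->
    <logger level="INFO" message="#[payload]"/>

    <!-- Acknowledge the message using the token carried in the message attributes -->
    <anypoint-mq:ack config-ref="Anypoint_MQ_Config"
                     ackToken="#[attributes.ackToken]"/>

    <logger level="INFO" message="Flow ended"/>
</flow>
```

Because the token is read from #[attributes], any component placed before the Ack that replaces the message attributes would lose it. Storing the token in a variable first is one way to guard against that.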
Now, let's try to publish a message to the queue test-queue from the MQ console.
After clicking on send, the message is published to the queue and we can see the same message in the queue.
Now, test it by running the application in DEBUG mode. We can see that the message received from the queue has an attribute named ackToken, which we use for sending the ACK.
If we look at the queue status, we can see that 1 message is in flight.
Let's process the message to the next component of the flow.
If we check the status of the queue again, we find that there is no in-flight message. This confirms that the message being processed has been acknowledged and deleted from the queue.
In this way, we can use manual acknowledgment in Anypoint MQ.
NACK:
There might be cases where we want to send a Negative Acknowledgment (NACK) to the queue, such as when the flow fails. For this, we can use the NACK component of MQ in the flow.
In the example below, when processing fails, we want to send a NACK so that the message can be routed to the dead letter queue (DLQ) assigned to the queue. Note that Anypoint MQ moves a message to the DLQ once the maximum number of delivery attempts configured for the queue is reached.
In the flow below, the HTTP listener starts the flow anypointmq-consume-demoFlow. The Anypoint MQ Consume operation then picks up the first available message from the queue, and a logger logs the received message. Next, the flow calls an HTTP requester whose configuration is intentionally incorrect, so the flow fails at this point and control passes to the error handling section. Inside the error scope, we have a logger followed by a NACK component.
The NACK component is configured as shown below, in the same way as the ACK component.
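A rough XML reconstruction of this flow is sketched below. The flow name, the request configuration name, the target http://localhost:8084/abc/demo, and the logger messages are taken from the application logs later in this section. The listener configuration and path, the Anypoint_MQ_Config name, and the MANUAL acknowledgment mode on Consume are assumptions, and the enclosing mule element is omitted.

```xml
<!-- Fragment only: the definitions of Anypoint_MQ_Config and
     HTTP_Listener_config are assumed to exist elsewhere. -->

<!-- Points at http://localhost:8084/abc, where nothing is listening -->
<http:request-config name="HTTP_Request_configuration" basePath="/abc">
    <http:request-connection host="localhost" port="8084"/>
</http:request-config>

<flow name="anypointmq-consume-demoFlow">
    <!-- The listener path is a placeholder -->
    <http:listener config-ref="HTTP_Listener_config" path="/consume"/>

    <!-- MANUAL mode is assumed so the message stays in flight until ACK or NACK -->
    <anypoint-mq:consume config-ref="Anypoint_MQ_Config"
                         destination="test-queue"
                         acknowledgementMode="MANUAL"/>

    <logger level="INFO" message="message received from the queue is #[payload]"/>

    <!-- Fails with HTTP:CONNECTIVITY because the target is unreachable -->
    <http:request method="GET" config-ref="HTTP_Request_configuration" path="/demo"/>

    <error-handler>
        <on-error-propagate>
            <logger level="INFO" message="In the Error Section"/>
            <!-- Same token expression as the ACK component -->
            <anypoint-mq:nack config-ref="Anypoint_MQ_Config"
                              ackToken="#[attributes.ackToken]"/>
            <logger level="INFO" message="Message sent to DLQ after failure"/>
        </on-error-propagate>
    </error-handler>
</flow>
```

With the default Immediate mode on Consume, the message would already be acknowledged when it is read, so a later NACK would have nothing to return to the queue. That is why the sketch assumes MANUAL.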
Now, we will try to send the below message to the queue from Anypoint Platform and trigger the HTTP request using Postman to see how NACK works.
Upon clicking Send, the message will be published to the queue, and we can see the same message in the queue.
We can see that test-queue has 1 in-flight message and DLQ-test-queue has 0 messages.
After running the application in DEBUG mode and triggering the HTTP Listener endpoint, we can see that the same message is consumed in the flow.
Now, let's move the processing to the next component, i.e., the HTTP Requester. It should fail here, as we have given it an incorrect configuration.
After failing, the message is routed to the error handler section of the flow.
Once the NACK component has been processed, we check the status of the queue.
In the below application logs, we can see the details of that message.
INFO 2020-12-16 12:19:45,461 [[MuleRuntime].uber.05: [anypointmq-demo].anypointmq-consume-demoFlow.CPU_LITE @f8b7fac] [processor: anypointmq-consume-demoFlow/processors/1; event: b9d37b60-3f6a-11eb-b027-568d5a57601b] org.mule.runtime.core.internal.processor.LoggerMessageProcessor: message received from the queue is {"Message" : "Sample message to test NACK"}
ERROR 2020-12-16 12:19:49,024 [[anypointmq-demo].http.requester.HTTP_Request_configuration.08 SelectorRunner] [processor: anypointmq-consume-demoFlow/processors/2; event: b9d37b60-3f6a-11eb-b027-568d5a57601b] org.mule.extension.http.internal.request.HttpRequester: Error sending HTTP request to http://localhost:8084/abc/demo
ERROR 2020-12-16 12:20:36,283 [[anypointmq-demo].http.requester.HTTP_Request_configuration.08 SelectorRunner] [processor: anypointmq-consume-demoFlow/processors/2; event: b9d37b60-3f6a-11eb-b027-568d5a57601b] org.mule.runtime.core.internal.exception.OnErrorPropagateHandler:
********************************************************************************
Message : HTTP GET on resource 'http://localhost:8084/abc/demo' failed: Connection refused: no further information.
Element : anypointmq-consume-demoFlow/processors/2 @ anypointmq-demo:anypointmq-demo.xml:36 (Request)
Element DSL : <http:request method="GET" doc:name="Request" doc:id="dbb69399-37e0-43a4-8ddf-317e1a9349c3" config-ref="HTTP_Request_configuration" path="/demo"></http:request>
Error type : HTTP:CONNECTIVITY
FlowStack : at anypointmq-consume-demoFlow(anypointmq-consume-demoFlow/processors/2 @ anypointmq-demo:anypointmq-demo.xml:36 (Request))
(set debug level logging or '-Dmule.verbose.exceptions=true' for everything)
********************************************************************************
INFO 2020-12-16 12:20:48,797 [[anypointmq-demo].http.requester.HTTP_Request_configuration.08 SelectorRunner] [processor: anypointmq-consume-demoFlow/errorHandler/0/processors/0; event: b9d37b60-3f6a-11eb-b027-568d5a57601b] org.mule.runtime.core.internal.processor.LoggerMessageProcessor: In the Error Section
INFO 2020-12-16 12:22:29,514 [[MuleRuntime].uber.04: [anypointmq-demo].anypointmq-consume-demoFlow.CPU_LITE @f8b7fac] [processor: anypointmq-consume-demoFlow/errorHandler/0/processors/2; event: b9d37b60-3f6a-11eb-b027-568d5a57601b] org.mule.runtime.core.internal.processor.LoggerMessageProcessor: "Message sent to DLQ after failure"
If we look at the DLQ, we can find the same message there. We can confirm that it is the same message by looking at the message content or by checking the ID, which is unique for each message. In our example, the ID matches the one we saw in test-queue when publishing the message.
Note: If we are not using a DLQ, the message is returned to the same queue from which it was consumed.
In this way, we can use Anypoint MQ for asynchronous messaging in our Mule API.
2. Delete Message and Queue
We can also delete all the messages present in a queue. To do this, we click on the queue name. On the right-hand side, we see a graphical status, and there we can select the option Purge Message.
A pop-up will appear stating that this action will delete all the messages present in the queue. Check the box, and then click on delete to proceed with the deletion of all the messages.
Similarly, if we want to delete a queue, we can do that by clicking on the Delete option.
Once we click this, a pop-up will appear warning that this action cannot be undone and that all the messages present in the queue will be deleted along with the queue.
So in this way, we can use manual acknowledgment in Anypoint MQ using ACK/NACK, and delete messages and queues from Anypoint MQ.