How to Use the Circuit Breaker Functionality in Anypoint MQ
Circuit breaker design pattern detects failures and prevents a failure from constantly recurring. Here's how to use this functionality in Anypoint MQ.
Join the DZone community and get the full member experience.
Join For FreeCircuit breaker design pattern is used to detect failures and encapsulates the logic of preventing a failure from constantly recurring during maintenance, temporary external system failure, or unexpected system difficulties. Here's how it works and how you can use this functionality with Anypoint MQ.
How Does Circuit Breaker Design Pattern Work?
Circuit breaker handles faults that might take a variable amount of time to recover from when connecting to a remote service or resource. This can improve the stability and resiliency of an application.
A circuit breaker wraps a protected function call in a circuit breaker object, which monitors for failures. The breaker trips once the failures reach a certain threshold. From there, all further calls to the circuit breaker return with an error without a protected call.
The purpose of the Circuit Breaker pattern is not the same as the Retry pattern. The Retry pattern enables an application to retry an operation in the expectation that it will succeed. The Circuit Breaker pattern prevents an application from performing an operation that is likely to fail.
Anypoint MQ Circuit Breaker Functionality
The Circuit Breaker functionality in Anypoint MQ is available from Mulesoft 4.X.X version. We can enable this design pattern while subscribing to any message in MQ. For more details, you can follow this Mulesoft document.
I have prepared this simple POC which will give you some insight about Anypoint MQ subscription flow by using Circuit breaker configuration, ACK and NACK operation:
- Define a subscription flow that will get a message from the configured queue and call a backend API(developed in Spring boot) using http requester.
- Define the circuit breaker configuration in the component level as below:
<anypoint-mq:subscriber doc:name="Subscribe Msgs from Queue"
config-ref="Anypoint_MQ_Config"
destination="${anypoint.MQ.circuit_breaker_queue}"
acknowledgementMode="MANUAL">
<anypoint-mq:circuit-breaker
tripTimeout="2"
onErrorTypes="HTTP:CONNECTIVITY,MULE:RETRY_EXHAUSTED"
errorsThreshold="3"
tripTimeoutUnit="MINUTES"/>
</anypoint-mq:subscriber>
The above configuration explains that, during the flow processing, if the specified Error is raised within the flow three times, we can stop the flow processing. Once the flow stops, it will wait for two minutes for the service to recover before retrying with a new message.
Mulesoft Flow Design
<configuration-properties doc:name="Configuration properties" doc:id="7a20f706-1883-41fc-9727-a2f357a02bc6" file="properties/${mule.env}-properties.yaml" />
<anypoint-mq:config name="Anypoint_MQ_Config" doc:name="Anypoint MQ Config" doc:id="dcee94a5-274c-492f-b344-1a2a0bdff335" >
<anypoint-mq:connection clientId="#[p('anypoint.MQ.client_id')]" clientSecret="#[p('anypoint.MQ.client_secret')]" />
</anypoint-mq:config>
<http:request-config name="HTTP_Request_configuration" doc:name="HTTP Request configuration" doc:id="1c4bac30-c1bc-4c48-ba40-f9b0b75dd9d0" basePath="/api" >
<http:request-connection host="localhost" port="8081" />
</http:request-config>
<flow name="circuit-breaker-test-flow" doc:id="3d0d34ea-4a41-4ed4-a49f-96c86eb661bd" >
<anypoint-mq:subscriber doc:name="Subscribe Msgs from Queue" doc:id="134b0b50-4d18-4c6c-b499-16001c44c491" config-ref="Anypoint_MQ_Config" destination="${anypoint.MQ.circuit_breaker_queue}" acknowledgementMode="MANUAL">
<anypoint-mq:circuit-breaker tripTimeout="${circuit_breaker.trip_timeout}" onErrorTypes="${circuit_breaker.error_types}" errorsThreshold="${circuit_breaker.error_threshold}" tripTimeoutUnit="MINUTES"/>
</anypoint-mq:subscriber>
<logger level="INFO" doc:name="Read Queue Message" doc:id="7fcd08b1-e27b-4ec3-bda5-b917c513470e" message="=============== Input Payload from Queue : #[payload] === correlationId : #[correlationId]"/>
<flow-ref doc:name="set-acknowledgement-token-subflow" doc:id="147e4fa3-2eeb-4020-b684-6302b1ad817e" name="set-acknowledgement-token-subflow" />
<http:request method="GET" doc:name="Request" doc:id="9af99c61-d2ff-4ead-aac3-1d85df8fe585" config-ref="HTTP_Request_configuration" path="/getEmployees"/>
<flow-ref doc:name="send-acknowledgement-token-subflow" doc:id="345246fe-7d16-46c3-8bad-1f8b6ad81da7" name="send-acknowledgement-token-subflow" />
<flow-ref doc:name="remove-variables-subflow" doc:id="c83adb13-dd92-4e21-8cc1-526d89315107" name="remove-variables-subflow" />
<error-handler >
<on-error-continue enableNotifications="true" logException="true" doc:name="On Error Continue" doc:id="1815f743-9e31-4ce7-974e-86cedde4e9fe" type="ANY">
<flow-ref doc:name="do-not-send-acknowledgement-token-subflow" doc:id="666c41c7-8bbb-449b-8898-2adef69ad434" name="do-not-send-acknowledgement-token-subflow" />
<flow-ref doc:name="remove-variables-subflow" doc:id="a3e49420-85d9-4bec-9076-c31b365bad51" name="remove-variables-subflow" />
</on-error-continue>
</error-handler>
</flow>
<sub-flow name="set-acknowledgement-token-subflow" doc:id="de6e2aff-edc2-4225-9133-5d684e991db4" >
<set-variable value="#[attributes.ackToken]" doc:name="Set Acknowledgement Token" doc:id="86e375d9-1ed9-47e1-a2e4-5c7dc45cc107" variableName="ackToken"/>
</sub-flow>
<sub-flow name="send-acknowledgement-token-subflow" doc:id="4c979475-bbf6-4393-b042-0547f7fcdfe2" >
<logger level="INFO" doc:name="Sending Acknowledgement" doc:id="71dee82d-d818-47ae-9bb5-85e999306517" message="=============== Sending Acknowledgement === correlationId : #[correlationId]"/>
<anypoint-mq:ack doc:name="Send Acknowledgement" doc:id="2057ed55-773f-40bd-b60b-5abac743762c" config-ref="Anypoint_MQ_Config" ackToken="#[vars.ackToken]"/>
</sub-flow>
<sub-flow name="do-not-send-acknowledgement-token-subflow" doc:id="7d97fb3e-0c83-4d51-9fde-fbe3156161bd" >
<logger level="INFO" doc:name="Not Sending Acknowledgement" doc:id="743edfc2-37e1-4959-9af1-5b05880a1a0e" message="=============== Not Sending Acknowledgement === correlationId : #[correlationId]"/>
<anypoint-mq:nack doc:name="Send Nack Acknowledgement" doc:id="2c3345e3-a82e-419b-8538-79d1a9a5a783" config-ref="Anypoint_MQ_Config" ackToken="#[vars.ackToken]"/>
</sub-flow>
<sub-flow name="remove-variables-subflow" doc:id="a66095b2-1254-4498-bd87-fe95f6d4e218" >
<remove-variable doc:name="Remove Variable 'ackToken'" doc:id="a5e6a741-cfc1-4d0b-a6f8-0ab0fb916fdc" variableName="ackToken" />
</sub-flow>
How to Test the POC
- Once you download the POC from here, you have to run the spring boot application (behaving as an end system) and Mulesoft application at the same time.
- Once both apps run successfully, you can send messages from the configured queue using an Anypoint MQ window.
Happy Flow
Once the Mule API flow consumes the message from the configured queue, it will try to hit the backend API(spring boot API). If it gets the response back, it will acknowledge the message by using anypoint-mq:ack, and flow processing will finish.
Error Flow
Before sending the message to the queue, stop the spring boot application. This means we are trying to create a backend failure scenario. As soon as the message arrives from the queue and tries to hit the backend API, it will get an HTTP:CONNECTIVITY error and flow control will go to the error handling section. It will not send an acknowledgment by using anypoint-mq:nack to the queue. This means, it will try to connect to the backend and the Mule app will follow the circuit breaker pattern based on the configuration).
This POC is just to give a background on how to use the circuit breaker configuration, ACK and NACK operation of Anypoint MQ. I would recommend executing the POC and I hope it will be helpful to someone.
Opinions expressed by DZone contributors are their own.
Comments