Asynchronous Retries With AWS SQS
In this article, I will explain asynchronous retry mechanisms and demonstrate how to achieve retries by using the native functions of a broker using AWS SQS.
While designing our integration points against remote APIs, we also need to consider error scenarios. Two types of errors/failures can occur during API communication: functional and non-functional. Most non-functional errors (like temporary network outages, infrastructure problems, and slow resources) are retriable.
We can implement two kinds of retry mechanisms against these failures: synchronous and asynchronous.
Synchronous retries are straightforward: you block your thread and retry on the spot, while "sleeping" in between attempts. You can write your own algorithm or rely on libraries such as Spring Retry to implement these kinds of retries. These libraries are also pre-packed with advanced policies which enable exponential backoffs (send, retry after 3 seconds, next retry after 6 seconds, etc.).
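As a rough illustration of the "write your own algorithm" option, here is a minimal sketch of a blocking retry loop with a doubling delay. The class and method names (SyncRetry, callWithRetry, backoffMillis) are made up for this example and do not come from Spring Retry or any other library.

```java
import java.util.concurrent.Callable;

public final class SyncRetry {

    /** Delay before the next attempt: the initial delay doubled for each attempt already made. */
    public static long backoffMillis(long initialDelayMs, int attemptsMade) {
        return initialDelayMs << (attemptsMade - 1);
    }

    /** Calls the task until it succeeds or maxAttempts is reached, sleeping between attempts. */
    public static <T> T callWithRetry(Callable<T> task, int maxAttempts, long initialDelayMs)
            throws Exception {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        Exception lastFailure = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return task.call();
            } catch (Exception e) {
                lastFailure = e;
                if (attempt < maxAttempts) {
                    // The calling thread is blocked here; this is the cost of synchronous retries.
                    Thread.sleep(backoffMillis(initialDelayMs, attempt));
                }
            }
        }
        throw lastFailure;
    }

    public static void main(String[] args) throws Exception {
        int[] calls = {0};
        // Simulated remote call that fails twice and then succeeds.
        String result = callWithRetry(() -> {
            if (++calls[0] < 3) {
                throw new IllegalStateException("remote API unavailable");
            }
            return "ok";
        }, 5, 10);
        System.out.println(result + " after " + calls[0] + " attempts");
    }
}
```

With an initial delay of 3000 ms, backoffMillis yields 3000 and then 6000, which matches the 3-second and 6-second example above.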
Synchronous retries are expensive but sometimes inevitable. For example, if the second operation after the failed one requires an input from the first, you need to block your flow and follow synchronous retries.
If the remote API's nature is asynchronous and the order of API operations is not that important, we should leverage asynchronous retry mechanisms for better scalability. Asynchronous retries can be implemented in multiple ways. The one I will concentrate on in this article uses message queues.
Queueing is a simple concept. Publishers publish a message to a queue; consumers consume from the queue. However, modern brokers that provide queues (RabbitMQ, ActiveMQ, some JMS providers such as JBoss, AWS SQS, etc.) also put advanced features on top of this queue concept. TTLs, dead letter exchanges or queues, routing, and others allow us to cover some non-functional requirements without writing any code.
AWS Simple Queue Service (SQS) also has its own additional features, and the one we will use here is called "Visibility Timeout" (you can refer to the AWS documentation for a detailed explanation of AWS SQS and its features).
In short, visibility timeout refers to the time period that a message is “invisible” to other consumers of the queue. As soon as a consumer fetches a message from the queue, the visibility timer starts ticking.
If the consumer has not deleted the message from the queue by the end of the visibility timeout period, the message becomes visible again for the other consumers (or for the same consumer).
We can leverage this mechanism in a sample retry scenario. The scenario will include the following steps:
- A producer will put a message on the queue.
- The consumer will pick it up and start processing it.
- The consumer will fail to process the message (due to external web service failure, DB failure, etc.).
- The consumer will move to the next message in the queue, without deleting the failed one.
- The leftover message becomes visible again to this consumer and other consumers after the visibility timeout is reached. The same cycle starts again which will lead to a retry of the same message.
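The steps above can be imitated with a small in-memory model. This is only a sketch of the idea, not of how SQS is implemented: the class VisibilityTimeoutDemo, its Entry type, and the integer "clock" passed to receive are all invented for this example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

public final class VisibilityTimeoutDemo {

    /** A queued message plus the bookkeeping a broker would keep for it. */
    public static final class Entry {
        public final String body;
        public long visibleAt = 0;    // time from which a consumer may receive the message
        public int receiveCount = 0;  // how many times the message has been handed out

        Entry(String body) {
            this.body = body;
        }
    }

    private final List<Entry> entries = new ArrayList<>();
    private final long visibilityTimeout;

    public VisibilityTimeoutDemo(long visibilityTimeout) {
        this.visibilityTimeout = visibilityTimeout;
    }

    public void send(String body) {
        entries.add(new Entry(body));
    }

    /** Hands out the first visible message and hides it for visibilityTimeout ticks. */
    public Optional<Entry> receive(long now) {
        for (Entry e : entries) {
            if (e.visibleAt <= now) {
                e.visibleAt = now + visibilityTimeout;
                e.receiveCount++;
                return Optional.of(e);
            }
        }
        return Optional.empty();
    }

    /** A successful consumer deletes the message; a failed consumer simply never calls this. */
    public void delete(Entry e) {
        entries.remove(e);
    }

    public static void main(String[] args) {
        VisibilityTimeoutDemo queue = new VisibilityTimeoutDemo(10);
        queue.send("order-1");
        System.out.println(queue.receive(0).isPresent());          // true: first delivery
        System.out.println(queue.receive(5).isPresent());          // false: still invisible
        System.out.println(queue.receive(10).get().receiveCount);  // 2: redelivered, i.e. a retry
    }
}
```

Because the consumer in main never calls delete, the message reappears once the timeout has elapsed, which is exactly the retry described in the last step.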
This setup can lead to endless loops, which we should avoid. To tackle this problem, we need to introduce a “maximum retries” feature. The message header or an external system should keep track of the re-delivery count of the same message. On every consumption, this counter is increased by 1. If the consumer realizes that the message has reached its maximum retries, the consumer will move that message to another queue (dead letter queue). This will allow for the replaying of the failed messages in the future.
JMS 2.0 defines a managed header attribute for broker re-deliveries, but such an attribute is not available on every non-JMS broker implementation. For these kinds of brokers, we can maintain a Map of message_id vs. delivery_count in the consumer application. If we are dealing with a consumer fleet, we need to move this Map to a central repository such as Redis or an RDBMS. Luckily, SQS maintains just such a managed attribute at the message level (ApproximateReceiveCount) and we will use this in our sample retry setup.
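For brokers without a managed delivery counter, the Map of message_id vs. delivery_count could look roughly like the sketch below. DeliveryCounter and its methods are hypothetical names chosen for this example, and the map lives in a single JVM, so a consumer fleet would still need the shared store mentioned above.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

public final class DeliveryCounter {
    private final ConcurrentMap<String, Integer> counts = new ConcurrentHashMap<>();
    private final int maxRetries;

    public DeliveryCounter(int maxRetries) {
        this.maxRetries = maxRetries;
    }

    /** Records one more delivery of the message and returns the updated count. */
    public int recordDelivery(String messageId) {
        return counts.merge(messageId, 1, Integer::sum);
    }

    /** True once the message has been delivered maxRetries times or more. */
    public boolean isExhausted(String messageId) {
        return counts.getOrDefault(messageId, 0) >= maxRetries;
    }

    /** Drops the entry after success or dead-lettering so the map does not grow forever. */
    public void forget(String messageId) {
        counts.remove(messageId);
    }

    public static void main(String[] args) {
        DeliveryCounter counter = new DeliveryCounter(3);
        for (int i = 0; i < 3; i++) {
            int deliveries = counter.recordDelivery("msg-1");
            System.out.println("delivery " + deliveries + ", exhausted=" + counter.isExhausted("msg-1"));
        }
    }
}
```

A consumer would call recordDelivery on every receive and move the message to the dead letter queue once isExhausted returns true.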
SQS also offers Redrive Policy which employs a “Maximum Receives” attribute representing the maximum number of receives before the message is sent to a dead letter queue. You can also use this policy to avoid loops, rather than managing your own counters. However, this article approaches the problem from a more generic perspective.
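For readers who prefer the built-in route, a redrive policy is attached to the source queue as a small JSON document stored in the queue's RedrivePolicy attribute. The helper below only builds that JSON; the class name RedrivePolicyJson and the ARN are placeholders invented for this sketch.

```java
public final class RedrivePolicyJson {

    /** Builds the JSON value for the source queue's RedrivePolicy attribute. */
    public static String build(String deadLetterQueueArn, int maxReceiveCount) {
        if (maxReceiveCount < 1) {
            throw new IllegalArgumentException("maxReceiveCount must be at least 1");
        }
        return String.format(
                "{\"deadLetterTargetArn\":\"%s\",\"maxReceiveCount\":\"%d\"}",
                deadLetterQueueArn, maxReceiveCount);
    }

    public static void main(String[] args) {
        // Hypothetical ARN; replace it with the ARN of your own dead letter queue.
        String policy = build("arn:aws:sqs:us-west-2:123456789012:standardq-dlq", 3);
        System.out.println(policy);
        // With the AWS SDK for Java, the value would be applied along these lines:
        // sqsClient.setQueueAttributes(new SetQueueAttributesRequest()
        //         .withQueueUrl(queueUrl)
        //         .addAttributesEntry("RedrivePolicy", policy));
    }
}
```

With such a policy in place, SQS itself moves a message to the dead letter queue after the configured number of receives, so the consumer no longer needs to compare counters.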
The below figure shows the flow of the messages in our sample scenario.
An AWS SQS queue's visibility timeout can be set from 0 seconds up to 12 hours, which gives you a wide range of retry frequencies. If you set the visibility timeout (called TTL in the sample code) to 1 minute, for example, a failed message will be retried roughly every minute.
In this type of setup, the broker has no built-in retry policies such as exponential backoff. Backoffs can, however, be calculated by the consumer and applied to the message's visibility timeout after it is consumed, because the AWS API allows the visibility timeout to be changed on a per-message basis.
The Java code below demonstrates a consumer reading messages from an SQS queue. Please note that the code polls in an endless loop without sleeping because I previously enabled long polling on my queue by setting the Receive Message Wait Time parameter to 5 seconds. If you don’t use long polling and your consumer polls continuously, you should add scheduling to your code (sleeps, timers, etc.) to avoid unnecessary bills.
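One way a consumer could calculate such a backoff is sketched below: the timeout doubles with every earlier receive and is capped at the 12-hour limit. VisibilityBackoff and exponential are hypothetical names for this example; the result is meant to be passed as the new visibility timeout when changing the visibility of an individual message.

```java
public final class VisibilityBackoff {

    /** Upper bound for a visibility timeout: 12 hours, expressed in seconds. */
    public static final int MAX_VISIBILITY_SECONDS = 12 * 60 * 60;

    /**
     * Returns baseSeconds doubled once for each earlier receive, capped at the maximum.
     * receiveCount is 1 on the first delivery, as with ApproximateReceiveCount.
     */
    public static int exponential(int baseSeconds, int receiveCount) {
        long timeout = baseSeconds;
        for (int i = 1; i < receiveCount && timeout < MAX_VISIBILITY_SECONDS; i++) {
            timeout *= 2;
        }
        return (int) Math.min(timeout, MAX_VISIBILITY_SECONDS);
    }

    public static void main(String[] args) {
        for (int receiveCount = 1; receiveCount <= 4; receiveCount++) {
            System.out.println("receive " + receiveCount + " -> retry in "
                    + exponential(10, receiveCount) + " seconds");
        }
    }
}
```

With a base of 10 seconds this yields 10, 20, 40, and 80 seconds for the first four receives. A linear scheme that adds a fixed number of seconds per receive is a simpler alternative when retries should stay close together.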
import java.util.List;

import com.amazonaws.auth.AWSStaticCredentialsProvider;
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.regions.Regions;
import com.amazonaws.services.sqs.AmazonSQS;
import com.amazonaws.services.sqs.AmazonSQSClientBuilder;
import com.amazonaws.services.sqs.model.DeleteMessageRequest;
import com.amazonaws.services.sqs.model.Message;
import com.amazonaws.services.sqs.model.ReceiveMessageRequest;

public final class SQSWorker {
    private static AmazonSQS sqsClient;
    private final static String queueUrl = "https://sqs.us-west-2.amazonaws.com/123/standardq";
    private final static int queueTtl = 10; //default visibility timeout of the queue, in seconds.
    private final static int maxRetries = 3;

    public static void main(String[] args) {
        //placeholder credentials; do not hard-code real keys.
        BasicAWSCredentials awsCreds = new BasicAWSCredentials("ABC", "DFG");
        sqsClient = AmazonSQSClientBuilder.standard()
                .withCredentials(new AWSStaticCredentialsProvider(awsCreds))
                .withRegion(Regions.US_WEST_2)
                .build();
        System.out.println("Worker started listening the queue");
        while (true) {
            //request all attributes so that ApproximateReceiveCount is returned with each message.
            ReceiveMessageRequest receiveMessageRequest = new ReceiveMessageRequest(queueUrl)
                    .withAttributeNames("All")
                    .withMessageAttributeNames("All");
            List<Message> messages = sqsClient.receiveMessage(receiveMessageRequest).getMessages();
            for (Message message : messages) {
                String messageId = message.getMessageId();
                //ApproximateReceiveCount includes the current delivery, so it is 1 on the first attempt.
                int previousDeliveries = Integer.parseInt(message.getAttributes().get("ApproximateReceiveCount"));
                System.out.println(" Message Received : " + message.getBody() + ", Previous Delivery Count:" + previousDeliveries);
                boolean isSuccess = callExternalWebService();
                if (isSuccess) {
                    //delete the message from the queue.
                    sqsClient.deleteMessage(new DeleteMessageRequest(queueUrl, message.getReceiptHandle()));
                } else {
                    //unsuccessful attempt. Check if we have reached max retry count.
                    if (previousDeliveries >= maxRetries) {
                        System.out.println("Cannot reach the remote API. Max retries reached");
                        //If feasible, publish the same content to the dead letter queue here, before deleting it.
                        sqsClient.deleteMessage(new DeleteMessageRequest(queueUrl, message.getReceiptHandle()));
                        continue;
                    }
                    //do nothing or alternatively, apply a backoff by increasing the visibility timeout of this individual message.
                    //our simple (linear) algorithm adds 5 seconds at every iteration to the queue default ttl of 10 seconds.
                    int retryDelay = queueTtl + 5 * (previousDeliveries - 1);
                    sqsClient.changeMessageVisibility(queueUrl, message.getReceiptHandle(), retryDelay);
                    System.out.println("Cannot reach the remote API. Will retry the message (" + messageId + ") in " + retryDelay + " seconds. Moving to the next message");
                }
            }
        }
    }

    private static boolean callExternalWebService() {
        return false; //external service is down
    }
}
My console output looks like this:
Worker started listening the queue
Message Received : {'subject':'test message','Body'='anotherJson123245'}, Previous Delivery Count:1
Cannot reach the remote API. Will retry the message (ee5a8b04-e773-4083-86e5-73a2b2e90438) in 10 seconds. Moving to the next message
Message Received : {'subject':'test message','Body'='anotherJson123245'}, Previous Delivery Count:2
Cannot reach the remote API. Will retry the message (ee5a8b04-e773-4083-86e5-73a2b2e90438) in 15 seconds. Moving to the next message
Message Received : {'subject':'test message','Body'='anotherJson123245'}, Previous Delivery Count:3
Cannot reach the remote API. Max retries reached
In this article, I tried to explain asynchronous retry mechanisms and demonstrate how to achieve retries by using the native functions of a broker. I used the serverless AWS SQS product for this demonstration; however, the same concepts apply to other brokers as well. For more information about asynchronous communication mechanisms, please take a look at my other article here.