First-in, First-out Messaging With Amazon SQS
If you have used Amazon Web Services, you probably know Simple Queue Service (SQS), a reliable, highly scalable hosted queue for storing messages. One of its main drawbacks is that it does not guarantee first-in, first-out (FIFO) access to messages, as the Amazon documentation clearly states:
Amazon SQS does not guarantee FIFO access to messages in Amazon SQS queues, mainly because of the distributed nature of the Amazon SQS. If you require specific message ordering, you should design your application to handle it.
-- Amazon SQS Docs
Here is a quick example of how the Apache Camel resequencer pattern can help you overcome this drawback in two or three lines of code. To set up our example scenario, let's first create a route that populates our queue with 100 messages, each containing its message number:
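    <route>
        <!-- fire 100 times, 1 ms apart; '&' must be escaped as '&amp;' inside an XML attribute -->
        <camel:from uri="timer://publish?repeatCount=100&amp;period=1&amp;fixedRate=true"/>
        <setBody>
            <!-- CamelTimerCounter is the value of the Exchange.TIMER_COUNTER constant -->
            <property>CamelTimerCounter</property>
        </setBody>
        <camel:to uri="aws-sqs://{{aws.queue}}?amazonSQSClient=#amazonSQSClient"/>
    </route>

Then create a consumer route that reads from the queue and logs each message: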
    <camel:route>
        <camel:from uri="aws-sqs://{{aws.queue}}?amazonSQSClient=#amazonSQSClient"/>
        <convertBodyTo type="java.lang.Integer"/>
        <camel:to uri="log:com.ofbizian.before?level=INFO"/>
        <camel:to uri="mock:result"/>
    </camel:route>

To prove that Amazon doesn't guarantee FIFO ordering, we will write a test:
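    import org.apache.camel.EndpointInject;
    import org.apache.camel.component.mock.MockEndpoint;
    // The package of CamelSpringTestSupport depends on the Camel version in use
    // (later 2.x releases ship it in org.apache.camel.test.spring).
    import org.apache.camel.test.junit4.CamelSpringTestSupport;
    import org.junit.Test;
    import org.springframework.context.support.ClassPathXmlApplicationContext;

    public class SQSFIFOTest extends CamelSpringTestSupport {

        @EndpointInject(uri = "mock:result")
        private MockEndpoint result;

        @Test
        public void receivesMessageInAscendingOrder() throws Exception {
            // expect all 100 messages, with strictly ascending numeric bodies
            result.expectedMessageCount(100);
            result.expectsAscending(body().convertTo(Number.class));
            assertMockEndpointsSatisfied();
        }

        @Override
        protected ClassPathXmlApplicationContext createApplicationContext() {
            return new ClassPathXmlApplicationContext("META-INF/spring/camel-context.xml");
        }
    }

Don't be misled by how short the test is: it starts both routes and verifies that all 100 messages arrive in ascending order.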
The above test fails in most runs, which shows that SQS doesn't preserve FIFO order (at least when the messages are sent in quick succession). The idea in this example is that the producer indexes the messages, or provides some other kind of sequencing information, so that the consumer can interpret it and put the messages back in order. Here we simply send a number as the message body, but in a real-world application it could be a field in a JSON or XML message. To make the test pass and ensure that the messages are received in the same order as they were sent, all we have to do is add the resequencer pattern to our consumer route:
    <camel:route>
        <camel:from uri="aws-sqs://{{aws.queue}}?amazonSQSClient=#amazonSQSClient"/>
        <convertBodyTo type="java.lang.Integer"/>
        <camel:to uri="log:com.ofbizian.before?level=INFO"/>
        <camel:resequence>
            <!--<camel:batch-config batchSize="100" batchTimeout="1000" />-->
            <stream-config capacity="100" timeout="1000"/>
            <camel:simple>body</camel:simple>
            <!-- logged under a separate category so the reordered output can be told apart -->
            <camel:to uri="log:com.ofbizian.after?level=INFO"/>
            <camel:to uri="mock:result"/>
        </camel:resequence>
    </camel:route>

The stream-based resequencer lets messages through without any delay as long as they arrive in the right order. If they are out of sequence, it can hold up to 100 messages for up to 1 second while waiting for the missing one. Depending on your message load, adjust these numbers so that the resequencer holds enough messages while waiting for the missing one, but not for so long that throughput suffers. Another option is to try the non-streaming, batch-based resequencer, which always collects a number of messages before sorting and releasing them.
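To make the streaming behavior more concrete, here is a minimal plain-Java sketch of the idea. It is not Camel's implementation: the class name StreamResequencerSketch and its methods are hypothetical, the sequence is assumed to start at a known number, the capacity handling is a simplification, and the timeout is modeled by an explicit flush() call instead of a real timer.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;
import java.util.function.Consumer;

/** Simplified, hypothetical model of a stream resequencer (not Camel's code). */
class StreamResequencerSketch {
    // Held messages, kept sorted by their sequence number.
    private final TreeMap<Long, String> pending = new TreeMap<>();
    private final int capacity;
    private final Consumer<String> downstream;
    private long nextExpected;

    StreamResequencerSketch(long firstSequence, int capacity, Consumer<String> downstream) {
        this.nextExpected = firstSequence;
        this.capacity = capacity;
        this.downstream = downstream;
    }

    /** Delivers the message right away if it is next in line, otherwise holds it. */
    void onMessage(long sequence, String body) {
        if (sequence < nextExpected) {
            // Assumption of this sketch: a late arrival is passed on out of order rather than lost.
            downstream.accept(body);
            return;
        }
        pending.put(sequence, body);
        drainConsecutive();
        if (pending.size() > capacity) {
            // Buffer is over capacity: stop waiting for the gap and release the oldest run.
            skipGap();
        }
    }

    /** Models the timeout: give up on missing messages and release everything held, in order. */
    void flush() {
        while (!pending.isEmpty()) {
            skipGap();
        }
    }

    private void skipGap() {
        nextExpected = pending.firstKey();
        drainConsecutive();
    }

    private void drainConsecutive() {
        while (!pending.isEmpty() && pending.firstKey() == nextExpected) {
            downstream.accept(pending.pollFirstEntry().getValue());
            nextExpected++;
        }
    }

    public static void main(String[] args) {
        List<String> delivered = new ArrayList<>();
        StreamResequencerSketch resequencer = new StreamResequencerSketch(1, 100, delivered::add);
        resequencer.onMessage(1, "m1"); // in order: delivered immediately
        resequencer.onMessage(3, "m3"); // out of order: held until 2 arrives
        resequencer.onMessage(2, "m2"); // fills the gap: releases 2, then 3
        resequencer.onMessage(5, "m5"); // held, 4 is missing
        resequencer.flush();            // "timeout": release 5 without waiting for 4
        System.out.println(delivered);
    }
}
```

In-order messages pass straight through, which is why the streaming variant adds no delay in the happy path. A batch-based variant would instead collect a fixed number of messages, sort them, and release them together, trading latency for simplicity.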

Published at DZone with permission of Bilgin Ibryam, DZone MVB. See the original article here.