RabbitMQ - Processing Messages Serially Using Spring Integration Java DSL
Join the DZone community and get the full member experience.
Join For FreeIf you ever have a need to process messages serially with RabbitMQ with a cluster of listeners processing the messages, the best way that I have seen is to use a "exclusive consumer" flag on a listener with 1 thread on each listener processing the messages.
Exclusive consumer flag ensures that only 1 consumer can read messages from the specific queue, and 1 thread on that consumer ensures that the messages are processed serially. There is a catch however, I will go over it later.
Let me demonstrate this behavior with a Spring Boot and Spring Integration based RabbitMQ message consumer.
First, this is the configuration for setting up a queue using Spring java configuration, note that since this is a Spring Boot application, it automatically creates a RabbitMQ connection factory when the Spring-amqp library is added to the list of dependencies:
@Configuration @Configuration public class RabbitConfig { @Autowired private ConnectionFactory rabbitConnectionFactory; @Bean public Queue sampleQueue() { return new Queue("sample.queue", true, false, false); } }
Given this sample queue, a listener which gets the messages from this queue and processes them looks like this, the flow is written using the excellent Spring integration Java DSL library:
@Configuration public class RabbitInboundFlow { private static final Logger logger = LoggerFactory.getLogger(RabbitInboundFlow.class); @Autowired private RabbitConfig rabbitConfig; @Autowired private ConnectionFactory connectionFactory; @Bean public SimpleMessageListenerContainer simpleMessageListenerContainer() { SimpleMessageListenerContainer listenerContainer = new SimpleMessageListenerContainer(); listenerContainer.setConnectionFactory(this.connectionFactory); listenerContainer.setQueues(this.rabbitConfig.sampleQueue()); listenerContainer.setConcurrentConsumers(1); listenerContainer.setExclusive(true); return listenerContainer; } @Bean public IntegrationFlow inboundFlow() { return IntegrationFlows.from(Amqp.inboundAdapter(simpleMessageListenerContainer())) .transform(Transformers.objectToString()) .handle((m) -> { logger.info("Processed {}", m.getPayload()); }) .get(); } }
The flow is very concisely expressed in the inboundFlow method, a message payload from RabbitMQ is transformed from byte array to String and finally processed by simply logging the message to the logs
The important part of the flow is the listener configuration, note the flag which sets the consumer to be an exclusive consumer and within this consumer the number of threads processing is set to 1. Given this even if multiple instances of the application is started up only 1 of the listeners will be able to connect and process messages.
Now for the catch, consider a case where the processing of messages takes a while to complete and rolls back during processing of the message. If the instance of the application handling the message were to be stopped in the middle of processing such a message, then the behavior is a different instance will start handling the messages in the queue, when the stopped instance rolls back the message, the rolled back message is then delivered to the new exclusive consumer, thus getting a message out of order.
If you are interested in exploring this further, here is a github project to play with this feature: https://github.com/bijukunjummen/test-rabbit-exclusive
Published at DZone with permission of Biju Kunjummen, DZone MVB. See the original article here.
Opinions expressed by DZone contributors are their own.
Comments