Over a million developers have joined DZone.
{{announcement.body}}
{{announcement.title}}

RabbitMQ and Spring Boot Integration With Fault Tolerance and Concurrency Capabilities

DZone's Guide to

RabbitMQ and Spring Boot Integration With Fault Tolerance and Concurrency Capabilities

Read this article in order to view a tutorial on how to integrate RabbitMQ in a Spring Boot application, and how to handle fault tolerance.

· Integration Zone ·
Free Resource

SnapLogic is the leading self-service enterprise-grade integration platform. Download the 2018 GartnerMagic Quadrant for Enterprise iPaaS or play around on the platform, risk free, for 30 days.

This article is a continuation of my previous article on asynchronous messaging using RabbitMQ. In the previous article, I explained the theoretical concept and internal mechanism of asynchronous messaging using RabbitMQ along with how to set-up to get started with it. In this article, I will mainly put emphasis on “RabbitMQ in action” part using Java and Spring Boot. To understand the high-level concept of async messaging and RabbitMQ you can visit my previous article. So, let’s look at the following aspects of this article:

  • How to integrate RabbitMQ in Spring Boot application
  • How to create producer/consumer to send/receive messages of String, JSON or Java Object
  • How to handle fault tolerance
  • How to provide concurrency

Create Spring Boot Project and Add RabbitMQ Dependencies

The first step required to make use of RabbitMQ in your application is to create a spring boot application and add the required dependencies of RabbitMQ into it. If you already have a Spring Boot application and wanted to just integrate RabbitMQ into it, then you can simply add the dependencies without creating a new project. However, creating a separate project for RabbitMQ instead of integrating into the existing project has one advantage that your message queue related code will be outside your actual application, hence, it can be easily shared or pluggable to any of your other application if required.

You can either create a Spring Boot project from spring initializer and import in your IDE, or you can create directly from Spring Tool Suite IDE (if you are using it). Add RabbitMQ dependency spring-boot-starter-amqpin pom.xml.

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency> 

Add and Set Some Configs. In application.properties

These properties names are self-explanatory. The properties will be used in the application for exchange name, queue name, biding, etc.

# Message Queue specific configs for app1
app1.exchange.name=app1-exchange
app1.queue.name=app1-queue
app1.routing.key=app1-routing-key

# Message Queue specific configs for app2
app2.exchange.name=app2-exchange
app2.queue.name=app2-queue
app2.routing.key=app2-routing-key

#AMQP RabbitMQ configuration 
spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest

# Additional RabbitMQ properties
spring.rabbitmq.listener.simple.concurrency=4
spring.rabbitmq.listener.simple.max-concurrency=8
spring.rabbitmq.listener.simple.retry.initial-interval=5000

Create a Properties File Reader Class

After creating the properties file, let's create a property file reader class to read the properties.

package com.dpk.config;

import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.PropertySource;

@Configuration
@PropertySource("classpath:application.properties")
public class ApplicationConfigReader {

@Value("${app1.exchange.name}")
private String app1Exchange;

@Value("${app1.queue.name}")
private String app1Queue;

@Value("${app1.routing.key}")
private String app1RoutingKey;

@Value("${app2.exchange.name}")
private String app2Exchange;

@Value("${app2.queue.name}")
private String app2Queue;

@Value("${app2.routing.key}")
private String app2RoutingKey;

// All getters and setters

}

Create the Beans for Queue, Exchange, Routing Key, and Binding

package com.dpk;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.rabbit.annotation.EnableRabbit;
import org.springframework.amqp.rabbit.annotation.RabbitListenerConfigurer;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.RabbitListenerEndpointRegistrar;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.builder.SpringApplicationBuilder;
import org.springframework.boot.web.servlet.support.SpringBootServletInitializer;
import org.springframework.context.annotation.Bean;
import org.springframework.messaging.converter.MappingJackson2MessageConverter;
import org.springframework.messaging.handler.annotation.support.DefaultMessageHandlerMethodFactory;

import com.dpk.config.ApplicationConfigReader;


@EnableRabbit
@SpringBootApplication
public class MsgqApplication extends SpringBootServletInitializer implements RabbitListenerConfigurer {

@Autowired
private ApplicationConfigReader applicationConfig;

public ApplicationConfigReader getApplicationConfig() {
return applicationConfig;
}

public void setApplicationConfig(ApplicationConfigReader applicationConfig) {
this.applicationConfig = applicationConfig;
}

public static void main(String[] args) {
SpringApplication.run(MsgqApplication.class, args);
}

protected SpringApplicationBuilder configure(SpringApplicationBuilder application) {
return application.sources(MsgqApplication.class);
}

/* This bean is to read the properties file configs */
@Bean
public ApplicationConfigReader applicationConfig() {
return new ApplicationConfigReader();
}

/* Creating a bean for the Message queue Exchange */
@Bean
public TopicExchange getApp1Exchange() {
return new TopicExchange(getApplicationConfig().getApp1Exchange());
}

/* Creating a bean for the Message queue */
@Bean
public Queue getApp1Queue() {
return new Queue(getApplicationConfig().getApp1Queue());
}

/* Binding between Exchange and Queue using routing key */
@Bean
public Binding declareBindingApp1() {
return BindingBuilder.bind(getApp1Queue()).to(getApp1Exchange()).with(getApplicationConfig().getApp1RoutingKey());
}

/* Creating a bean for the Message queue Exchange */
@Bean
public TopicExchange getApp2Exchange() {
return new TopicExchange(getApplicationConfig().getApp2Exchange());
}

/* Creating a bean for the Message queue */
@Bean
public Queue getApp2Queue() {
return new Queue(getApplicationConfig().getApp2Queue());
}

/* Binding between Exchange and Queue using routing key */
@Bean
public Binding declareBindingApp2() {
return BindingBuilder.bind(getApp2Queue()).to(getApp2Exchange()).with(getApplicationConfig().getApp2RoutingKey());
}

/* Bean for rabbitTemplate */
@Bean
public RabbitTemplate rabbitTemplate(final ConnectionFactory connectionFactory) {
final RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
rabbitTemplate.setMessageConverter(producerJackson2MessageConverter());
return rabbitTemplate;
}

@Bean
public Jackson2JsonMessageConverter producerJackson2MessageConverter() {
return new Jackson2JsonMessageConverter();
}

@Bean
public MappingJackson2MessageConverter consumerJackson2MessageConverter() {
return new MappingJackson2MessageConverter();
}

@Bean
public DefaultMessageHandlerMethodFactory messageHandlerMethodFactory() {
DefaultMessageHandlerMethodFactory factory = new DefaultMessageHandlerMethodFactory();
factory.setMessageConverter(consumerJackson2MessageConverter());
return factory;
}

@Override
public void configureRabbitListeners(final RabbitListenerEndpointRegistrar registrar) {
registrar.setMessageHandlerMethodFactory(messageHandlerMethodFactory());
}

}

Create a Message Sender

MessageSender is pretty simple. It just makes use of convertAndSend() method of RabbitTemplate to send the message to the queue using exchange, routing-key, and data.

package com.dpk;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;

/**
 * Message sender to send message to queue using exchange.
 */

@Component
public class MessageSender {

private static final Logger log = LoggerFactory.getLogger(MessageSender.class);

/**
 * 
 * @param rabbitTemplate
 * @param exchange
 * @param routingKey
 * @param data
 */
public void sendMessage(RabbitTemplate rabbitTemplate, String exchange, String routingKey, Object data) {
log.info("Sending message to the queue using routingKey {}. Message= {}", routingKey, data);
rabbitTemplate.convertAndSend(exchange, routingKey, data);
log.info("The message has been sent to the queue.");
}

}

Create Message Listeners

Implementing message listener is tricky as it requires handling some of the scenarios like:

  • How to auto deserialize the message to a POJO
  • What if listener is making a REST call to some API which is unreachable, or what if an error occurred on the API side while processing the request?
  • How to make multiple listeners to concurrently pop the message from queue and process
  • When and how to re-queue the message in the message queue in failure scenarios

Deserializing Message to POJO

Spring provides an annotation @RabbitListener , which can be used to receive messages from the queue. It has a great feature of deserializing the message to a POJO while receiving. The below example illustrates that.

Error Handling and Message Re-Queuing Feature in Listener

  • Listener is trying to call an API to process the request that is unreachable
  • Listener has called the API but error occurred in API while processing the request
  • In this situation, depending on your business requirement, either you should not re-queue the message, or you should re-queue with max number of trial option to re-try to process it up to a limit.

    To not requeue the message in queue, you can throw exception AmqpRejectAndDontRequeueException . For max number of trial handling, you can add an additional parameter in the message to set max number of trial and use it while receiving the message by incrementing it’s value and checking whether total number of trial has not exceeded max limit.

    There is an alternative of above approach to add this properties in application.properties and specify max number of attempts:-

    spring.rabbitmq.listener.simple.retry.max-attempts=3

    Concurrency Capability

    Concurrency feature can be implemented in 2 ways:

    • Create a thread pool with a specified number of max threads and using ThreadExecutorcall the methods/APIs to process the request.
    • By using Inbuild concurrency feature. I believe this is the simplest approach to implement concurrency. This requires just making use of 2 properties in application.properties file.

    Note: You can set the values of these properties as per your application scalability.

    spring.rabbitmq.listener.simple.concurrency=4

    spring.rabbitmq.listener.simple.max-concurrency=8

    package com.dpk;
    
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.amqp.AmqpRejectAndDontRequeueException;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.http.HttpStatus;
    import org.springframework.stereotype.Service;
    import org.springframework.web.client.HttpClientErrorException;
    import com.dpk.config.ApplicationConfigReader;
    import com.dpk.dto.UserDetails;
    import com.dpk.util.ApplicationConstant;
    
    /**
     * Message Listener for RabbitMQ
     */
    
    @Service
    public class MessageListener {
    
        private static final Logger log = LoggerFactory.getLogger(MessageListener.class);
    
        @Autowired
        ApplicationConfigReader applicationConfigReader;
    
    
        /**
         * Message listener for app1
         * @param UserDetails a user defined object used for deserialization of message
         */
        @RabbitListener(queues = "${app1.queue.name}")
        public void receiveMessageForApp1(final UserDetails data) {
        log.info("Received message: {} from app1 queue.", data);
    
        try {
        log.info("Making REST call to the API");
        //TODO: Code to make REST call
            log.info("<< Exiting receiveMessageForApp1() after API call.");
        } catch(HttpClientErrorException  ex) {
    
        if(ex.getStatusCode() == HttpStatus.NOT_FOUND) {
            log.info("Delay...");
            try {
        Thread.sleep(ApplicationConstant.MESSAGE_RETRY_DELAY);
        } catch (InterruptedException e) { }
    
        log.info("Throwing exception so that message will be requed in the queue.");
        // Note: Typically Application specific exception should be thrown below
        throw new RuntimeException();
        } else {
        throw new AmqpRejectAndDontRequeueException(ex); 
        }
    
        } catch(Exception e) {
        log.error("Internal server error occurred in API call. Bypassing message requeue {}", e);
        throw new AmqpRejectAndDontRequeueException(e); 
        }
    
        }
    
    
        /**
         * Message listener for app2
         * 
         */
    
        @RabbitListener(queues = "${app2.queue.name}")
        public void receiveMessageForApp2(String reqObj) {
        log.info("Received message: {} from app2 queue.", reqObj);
    
        try {
        log.info("Making REST call to the API");
        //TODO: Code to make REST call
            log.info("<< Exiting receiveMessageCrawlCI() after API call.");
        } catch(HttpClientErrorException  ex) {
    
        if(ex.getStatusCode() == HttpStatus.NOT_FOUND) {
            log.info("Delay...");
            try {
        Thread.sleep(ApplicationConstant.MESSAGE_RETRY_DELAY);
        } catch (InterruptedException e) { }
    
        log.info("Throwing exception so that message will be requed in the queue.");
        // Note: Typically Application specific exception can be thrown below
        throw new RuntimeException();
        } else {
        throw new AmqpRejectAndDontRequeueException(ex); 
        }
    
        } catch(Exception e) {
        log.error("Internal server error occurred in python server. Bypassing message requeue {}", e);
        throw new AmqpRejectAndDontRequeueException(e); 
        }
    
        }
    
    }
    

    Create the Service Endpoint

    Finally, create the service class that contains the service endpoint, which will be called by the user. The service class calls the MessageSender to put the message into the queue.

    package com.dpk;
    
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.http.HttpStatus;
    import org.springframework.http.MediaType;
    import org.springframework.http.ResponseEntity;
    import org.springframework.web.bind.annotation.RequestBody;
    import org.springframework.web.bind.annotation.RequestMapping;
    import org.springframework.web.bind.annotation.RequestMethod;
    import org.springframework.web.bind.annotation.RestController;
    
    import com.dpk.config.ApplicationConfigReader;
    import com.dpk.dto.UserDetails;
    import com.dpk.util.ApplicationConstant;
    
    
    @RestController
    @RequestMapping(path = "/userservice")
    public class UserService {
    
    private static final Logger log = LoggerFactory.getLogger(UserService.class);
    
    private final RabbitTemplate rabbitTemplate;
    private ApplicationConfigReader applicationConfig;
    private MessageSender messageSender;
    
    public ApplicationConfigReader getApplicationConfig() {
    return applicationConfig;
    }
    
    @Autowired
    public void setApplicationConfig(ApplicationConfigReader applicationConfig) {
    this.applicationConfig = applicationConfig;
    }
    
    @Autowired
    public UserService(final RabbitTemplate rabbitTemplate) {
    this.rabbitTemplate = rabbitTemplate;
    }
    
    public MessageSender getMessageSender() {
    return messageSender;
    }
    
    @Autowired
    public void setMessageSender(MessageSender messageSender) {
    this.messageSender = messageSender;
    }
    
    
    @RequestMapping(path = "/add", method = RequestMethod.POST, produces = MediaType.APPLICATION_JSON_VALUE)
    public ResponseEntity<?> sendMessage(@RequestBody UserDetails user) {
    
    String exchange = getApplicationConfig().getApp1Exchange();
    String routingKey = getApplicationConfig().getApp1RoutingKey();
    
    /* Sending to Message Queue */
    try {
    messageSender.sendMessage(rabbitTemplate, exchange, routingKey, user);
    return new ResponseEntity<String>(ApplicationConstant.IN_QUEUE, HttpStatus.OK);
    
    } catch (Exception ex) {
    log.error("Exception occurred while sending message to the queue. Exception= {}", ex);
    return new ResponseEntity(ApplicationConstant.MESSAGE_QUEUE_SEND_ERROR,
    HttpStatus.INTERNAL_SERVER_ERROR);
    }
    
    }
    
    }
    

    Wrap-Up

    If you want to view the complete code, then I have uploaded the same on GitHub. You can log in and view. If you have any doubts or any suggestions, feel free to write in the comments below. I would be happy to interact with you. Thank you!

    With SnapLogic’s integration platform you can save millions of dollars, increase integrator productivity by 5X, and reduce integration time to value by 90%. Sign up for our risk-free 30-day trial!

    Topics:
    integration ,rabbitmq ,spring boot ,message queue ,message broker ,concurrency ,fault tolerance ,decouple ,java

    Opinions expressed by DZone contributors are their own.

    {{ parent.title || parent.header.title}}

    {{ parent.tldr }}

    {{ parent.urlSource.name }}