RabbitMQ and Spring Boot Integration With Fault Tolerance and Concurrency Capabilities

This tutorial shows how to integrate RabbitMQ into a Spring Boot application and how to handle fault tolerance and concurrency.

By Deepak Kumar · May. 30, 18 · Tutorial

This article is a continuation of my previous article on asynchronous messaging using RabbitMQ. In that article, I explained the theory and internal mechanism of asynchronous messaging with RabbitMQ, along with how to set it up and get started. This article focuses on the "RabbitMQ in action" part using Java and Spring Boot. For the high-level concepts of async messaging and RabbitMQ, see my previous article. This article covers the following:

  • How to integrate RabbitMQ into a Spring Boot application
  • How to create a producer/consumer to send/receive messages as a String, JSON, or Java object
  • How to handle fault tolerance
  • How to provide concurrency

Create Spring Boot Project and Add RabbitMQ Dependencies

The first step is to create a Spring Boot application and add the required RabbitMQ dependencies to it. If you already have a Spring Boot application and just want to integrate RabbitMQ into it, you can simply add the dependencies without creating a new project. However, creating a separate project for RabbitMQ has one advantage: your message-queue code lives outside your actual application, so it can easily be shared with or plugged into any of your other applications if required.

You can either create a Spring Boot project with Spring Initializr and import it into your IDE, or create it directly from the Spring Tool Suite IDE (if you are using it). Add the RabbitMQ dependency spring-boot-starter-amqp in pom.xml.

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency> 

Add and Set Some Configs in application.properties

These property names are self-explanatory. The properties are used in the application for the exchange name, queue name, binding, etc.

# Message Queue specific configs for app1
app1.exchange.name=app1-exchange
app1.queue.name=app1-queue
app1.routing.key=app1-routing-key

# Message Queue specific configs for app2
app2.exchange.name=app2-exchange
app2.queue.name=app2-queue
app2.routing.key=app2-routing-key

#AMQP RabbitMQ configuration 
spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest

# Additional RabbitMQ properties
spring.rabbitmq.listener.simple.concurrency=4
spring.rabbitmq.listener.simple.max-concurrency=8
spring.rabbitmq.listener.simple.retry.initial-interval=5000

Create a Properties File Reader Class

After creating the properties file, let's create a property file reader class to read the properties.

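package com.dpk.config;

import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.PropertySource;

@Configuration
@PropertySource("classpath:application.properties")
public class ApplicationConfigReader {

    @Value("${app1.exchange.name}")
    private String app1Exchange;

    @Value("${app1.queue.name}")
    private String app1Queue;

    @Value("${app1.routing.key}")
    private String app1RoutingKey;

    @Value("${app2.exchange.name}")
    private String app2Exchange;

    @Value("${app2.queue.name}")
    private String app2Queue;

    @Value("${app2.routing.key}")
    private String app2RoutingKey;

    // Getters used by the other classes (setters omitted for brevity;
    // the fields are populated through @Value injection)
    public String getApp1Exchange() { return app1Exchange; }

    public String getApp1Queue() { return app1Queue; }

    public String getApp1RoutingKey() { return app1RoutingKey; }

    public String getApp2Exchange() { return app2Exchange; }

    public String getApp2Queue() { return app2Queue; }

    public String getApp2RoutingKey() { return app2RoutingKey; }

}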

Create the Beans for Queue, Exchange, Routing Key, and Binding

package com.dpk;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.rabbit.annotation.EnableRabbit;
import org.springframework.amqp.rabbit.annotation.RabbitListenerConfigurer;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.RabbitListenerEndpointRegistrar;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.builder.SpringApplicationBuilder;
import org.springframework.boot.web.servlet.support.SpringBootServletInitializer;
import org.springframework.context.annotation.Bean;
import org.springframework.messaging.converter.MappingJackson2MessageConverter;
import org.springframework.messaging.handler.annotation.support.DefaultMessageHandlerMethodFactory;

import com.dpk.config.ApplicationConfigReader;


@EnableRabbit
@SpringBootApplication
public class MsgqApplication extends SpringBootServletInitializer implements RabbitListenerConfigurer {

    @Autowired
    private ApplicationConfigReader applicationConfig;

    public ApplicationConfigReader getApplicationConfig() {
        return applicationConfig;
    }

    public void setApplicationConfig(ApplicationConfigReader applicationConfig) {
        this.applicationConfig = applicationConfig;
    }

    public static void main(String[] args) {
        SpringApplication.run(MsgqApplication.class, args);
    }

    @Override
    protected SpringApplicationBuilder configure(SpringApplicationBuilder application) {
        return application.sources(MsgqApplication.class);
    }

    /* This bean is to read the properties file configs */
    @Bean
    public ApplicationConfigReader applicationConfig() {
        return new ApplicationConfigReader();
    }

    /* Creating a bean for the Message queue Exchange */
    @Bean
    public TopicExchange getApp1Exchange() {
        return new TopicExchange(getApplicationConfig().getApp1Exchange());
    }

    /* Creating a bean for the Message queue */
    @Bean
    public Queue getApp1Queue() {
        return new Queue(getApplicationConfig().getApp1Queue());
    }

    /* Binding between Exchange and Queue using routing key */
    @Bean
    public Binding declareBindingApp1() {
        return BindingBuilder.bind(getApp1Queue()).to(getApp1Exchange()).with(getApplicationConfig().getApp1RoutingKey());
    }

    /* Creating a bean for the Message queue Exchange */
    @Bean
    public TopicExchange getApp2Exchange() {
        return new TopicExchange(getApplicationConfig().getApp2Exchange());
    }

    /* Creating a bean for the Message queue */
    @Bean
    public Queue getApp2Queue() {
        return new Queue(getApplicationConfig().getApp2Queue());
    }

    /* Binding between Exchange and Queue using routing key */
    @Bean
    public Binding declareBindingApp2() {
        return BindingBuilder.bind(getApp2Queue()).to(getApp2Exchange()).with(getApplicationConfig().getApp2RoutingKey());
    }

    /* Bean for rabbitTemplate */
    @Bean
    public RabbitTemplate rabbitTemplate(final ConnectionFactory connectionFactory) {
        final RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        rabbitTemplate.setMessageConverter(producerJackson2MessageConverter());
        return rabbitTemplate;
    }

    @Bean
    public Jackson2JsonMessageConverter producerJackson2MessageConverter() {
        return new Jackson2JsonMessageConverter();
    }

    @Bean
    public MappingJackson2MessageConverter consumerJackson2MessageConverter() {
        return new MappingJackson2MessageConverter();
    }

    @Bean
    public DefaultMessageHandlerMethodFactory messageHandlerMethodFactory() {
        DefaultMessageHandlerMethodFactory factory = new DefaultMessageHandlerMethodFactory();
        factory.setMessageConverter(consumerJackson2MessageConverter());
        return factory;
    }

    @Override
    public void configureRabbitListeners(final RabbitListenerEndpointRegistrar registrar) {
        registrar.setMessageHandlerMethodFactory(messageHandlerMethodFactory());
    }

}

Create a Message Sender

MessageSender is pretty simple. It just uses the convertAndSend() method of RabbitTemplate to send the message to the queue using the exchange, routing key, and data.

package com.dpk;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;

/**
 * Message sender to send message to queue using exchange.
 */
@Component
public class MessageSender {

    private static final Logger log = LoggerFactory.getLogger(MessageSender.class);

    /**
     * Converts the data and publishes it to the given exchange.
     *
     * @param rabbitTemplate template used to convert and publish the message
     * @param exchange name of the exchange to publish to
     * @param routingKey routing key used to route the message to a queue
     * @param data payload to send
     */
    public void sendMessage(RabbitTemplate rabbitTemplate, String exchange, String routingKey, Object data) {
        log.info("Sending message to the queue using routingKey {}. Message= {}", routingKey, data);
        rabbitTemplate.convertAndSend(exchange, routingKey, data);
        log.info("The message has been sent to the queue.");
    }

}

Create Message Listeners

Implementing a message listener is trickier, as it requires handling scenarios such as:

  • How to automatically deserialize the message to a POJO
  • What to do if the listener makes a REST call to an API that is unreachable, or if an error occurs on the API side while processing the request
  • How to make multiple listeners concurrently pop messages from the queue and process them
  • When and how to re-queue the message in failure scenarios

Deserializing Message to POJO

Spring provides the @RabbitListener annotation, which can be used to receive messages from a queue. It can deserialize the message to a POJO as it is received. The listener class at the end of this section illustrates this.
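The listener code in this article imports com.dpk.dto.UserDetails, but the class itself is not listed. As a minimal sketch, assuming two illustrative fields (name and email are my placeholders, not the author's actual fields), a POJO suitable for JSON deserialization might look like this. With Jackson's default configuration, a no-argument constructor and setters are the usual requirements.

```java
import java.io.Serializable;

// Hypothetical stand-in for com.dpk.dto.UserDetails; the field names are assumptions.
// In a real project this would be a public class in its own file under com.dpk.dto.
class UserDetails implements Serializable {

    private static final long serialVersionUID = 1L;

    private String name;   // assumed field
    private String email;  // assumed field

    // Jackson's default configuration instantiates the object through a
    // no-argument constructor and then populates it through the setters.
    public UserDetails() { }

    public String getName() { return name; }

    public void setName(String name) { this.name = name; }

    public String getEmail() { return email; }

    public void setEmail(String email) { this.email = email; }

    // The listener logs the received object, so a readable toString() helps.
    @Override
    public String toString() {
        return "UserDetails{name='" + name + "', email='" + email + "'}";
    }
}
```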

Error Handling and Message Re-Queuing Feature in Listener

Two failure scenarios commonly need handling in a listener:

  • The listener tries to call an API that is unreachable
  • The listener reaches the API, but an error occurs in the API while processing the request

In these situations, depending on your business requirements, you should either not re-queue the message at all, or re-queue it with a maximum number of attempts so that processing is retried only up to a limit.

To prevent a message from being re-queued, throw AmqpRejectAndDontRequeueException. To cap the number of attempts, you can add a field to the message that counts the attempts, increment it each time the message is received, and check that the total has not exceeded the maximum.
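The attempt-counter idea can be sketched in plain Java, independent of Spring. RetryableMessage and RetryPolicy below are hypothetical names introduced only for illustration; they are not part of the article's project or of Spring AMQP. The sketch assumes the counter starts at 0 when the message is first published and that the listener increments it on every receive. Because a broker-side requeue redelivers the message unchanged, the listener would have to re-publish the incremented copy itself for the counter to survive.

```java
// Hypothetical message envelope: the payload plus a counter of processing attempts.
final class RetryableMessage {

    private final String payload;
    private final int attempts;

    RetryableMessage(String payload, int attempts) {
        this.payload = payload;
        this.attempts = attempts;
    }

    String getPayload() { return payload; }

    int getAttempts() { return attempts; }

    // Returns a copy with the counter incremented; call this on every receive.
    RetryableMessage nextAttempt() {
        return new RetryableMessage(payload, attempts + 1);
    }
}

// Hypothetical policy that decides what to do after a failed processing attempt.
final class RetryPolicy {

    enum Decision { REQUEUE, DISCARD }

    private final int maxAttempts;

    RetryPolicy(int maxAttempts) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        this.maxAttempts = maxAttempts;
    }

    // The message's counter already includes the attempt that just failed.
    Decision onFailure(RetryableMessage message) {
        return message.getAttempts() < maxAttempts ? Decision.REQUEUE : Decision.DISCARD;
    }
}
```

In a listener, REQUEUE would map to re-publishing the incremented message, and DISCARD to throwing AmqpRejectAndDontRequeueException.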

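As an alternative to the above approach, you can specify the maximum number of attempts in application.properties. Note that Spring Boot applies the listener retry settings only when retry is enabled:

spring.rabbitmq.listener.simple.retry.enabled=true
spring.rabbitmq.listener.simple.retry.max-attempts=3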

Concurrency Capability

Concurrency can be implemented in two ways:

  • Create a thread pool with a specified maximum number of threads and use an executor (such as Java's ExecutorService) to call the methods/APIs that process the request.
  • Use the built-in concurrency feature. I believe this is the simplest approach. It requires just two properties in the application.properties file.

Note: You can set the values of these properties according to your application's scalability needs.

spring.rabbitmq.listener.simple.concurrency=4
spring.rabbitmq.listener.simple.max-concurrency=8
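The first option, a hand-rolled thread pool, can be sketched with the JDK's ExecutorService. PooledProcessor is a hypothetical helper introduced here for illustration; the article's project relies on the built-in properties instead. The sketch assumes the handler is safe to call from several threads at once.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Function;

// Hypothetical helper: processes a batch of messages on a bounded thread pool.
final class PooledProcessor {

    private final ExecutorService pool;

    PooledProcessor(int maxThreads) {
        // At most maxThreads messages are processed at the same time.
        this.pool = Executors.newFixedThreadPool(maxThreads);
    }

    // Submits one task per message and waits for all results, preserving input order.
    <T, R> List<R> processAll(List<T> messages, Function<T, R> handler) {
        List<Future<R>> futures = new ArrayList<>();
        for (T message : messages) {
            Callable<R> task = () -> handler.apply(message);
            futures.add(pool.submit(task));
        }
        List<R> results = new ArrayList<>();
        for (Future<R> future : futures) {
            try {
                results.add(future.get()); // blocks until that task completes
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // keep the interrupt flag set
                throw new IllegalStateException("Interrupted while waiting for a task", e);
            } catch (ExecutionException e) {
                throw new IllegalStateException("Task failed", e.getCause());
            }
        }
        return results;
    }

    // Stops accepting new tasks; tasks already submitted still run to completion.
    void shutdown() {
        pool.shutdown();
    }
}
```

Compared with this approach, the two properties above let the listener container manage the consumer threads, so no pool lifecycle code is needed in the application.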

The listener class below puts these pieces together:

package com.dpk;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.AmqpRejectAndDontRequeueException;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.HttpStatus;
import org.springframework.stereotype.Service;
import org.springframework.web.client.HttpClientErrorException;
import com.dpk.config.ApplicationConfigReader;
import com.dpk.dto.UserDetails;
import com.dpk.util.ApplicationConstant;

/**
 * Message Listener for RabbitMQ
 */
@Service
public class MessageListener {

    private static final Logger log = LoggerFactory.getLogger(MessageListener.class);

    @Autowired
    ApplicationConfigReader applicationConfigReader;

    /**
     * Message listener for app1
     * @param data a user defined object used for deserialization of message
     */
    @RabbitListener(queues = "${app1.queue.name}")
    public void receiveMessageForApp1(final UserDetails data) {
        log.info("Received message: {} from app1 queue.", data);

        try {
            log.info("Making REST call to the API");
            //TODO: Code to make REST call
            log.info("<< Exiting receiveMessageForApp1() after API call.");
        } catch (HttpClientErrorException ex) {

            if (ex.getStatusCode() == HttpStatus.NOT_FOUND) {
                log.info("Delay...");
                try {
                    Thread.sleep(ApplicationConstant.MESSAGE_RETRY_DELAY);
                } catch (InterruptedException e) {
                    // Restore the interrupt flag instead of swallowing it
                    Thread.currentThread().interrupt();
                }

                log.info("Throwing exception so that message will be requeued in the queue.");
                // Note: Typically Application specific exception should be thrown below
                throw new RuntimeException();
            } else {
                throw new AmqpRejectAndDontRequeueException(ex);
            }

        } catch (Exception e) {
            // The exception is passed as the last argument so SLF4J logs its stack trace
            log.error("Internal server error occurred in API call. Bypassing message requeue.", e);
            throw new AmqpRejectAndDontRequeueException(e);
        }
    }

    /**
     * Message listener for app2
     * @param reqObj the raw message body as a String
     */
    @RabbitListener(queues = "${app2.queue.name}")
    public void receiveMessageForApp2(String reqObj) {
        log.info("Received message: {} from app2 queue.", reqObj);

        try {
            log.info("Making REST call to the API");
            //TODO: Code to make REST call
            log.info("<< Exiting receiveMessageForApp2() after API call.");
        } catch (HttpClientErrorException ex) {

            if (ex.getStatusCode() == HttpStatus.NOT_FOUND) {
                log.info("Delay...");
                try {
                    Thread.sleep(ApplicationConstant.MESSAGE_RETRY_DELAY);
                } catch (InterruptedException e) {
                    // Restore the interrupt flag instead of swallowing it
                    Thread.currentThread().interrupt();
                }

                log.info("Throwing exception so that message will be requeued in the queue.");
                // Note: Typically Application specific exception can be thrown below
                throw new RuntimeException();
            } else {
                throw new AmqpRejectAndDontRequeueException(ex);
            }

        } catch (Exception e) {
            // The exception is passed as the last argument so SLF4J logs its stack trace
            log.error("Internal server error occurred in API call. Bypassing message requeue.", e);
            throw new AmqpRejectAndDontRequeueException(e);
        }
    }

}
    

Create the Service Endpoint

Finally, create the service class that contains the service endpoint, which will be called by the user. The service class calls the MessageSender to put the message into the queue.

package com.dpk;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.HttpStatus;
import org.springframework.http.MediaType;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;

import com.dpk.config.ApplicationConfigReader;
import com.dpk.dto.UserDetails;
import com.dpk.util.ApplicationConstant;


@RestController
@RequestMapping(path = "/userservice")
public class UserService {

    private static final Logger log = LoggerFactory.getLogger(UserService.class);

    private final RabbitTemplate rabbitTemplate;
    private ApplicationConfigReader applicationConfig;
    private MessageSender messageSender;

    public ApplicationConfigReader getApplicationConfig() {
        return applicationConfig;
    }

    @Autowired
    public void setApplicationConfig(ApplicationConfigReader applicationConfig) {
        this.applicationConfig = applicationConfig;
    }

    @Autowired
    public UserService(final RabbitTemplate rabbitTemplate) {
        this.rabbitTemplate = rabbitTemplate;
    }

    public MessageSender getMessageSender() {
        return messageSender;
    }

    @Autowired
    public void setMessageSender(MessageSender messageSender) {
        this.messageSender = messageSender;
    }


    @RequestMapping(path = "/add", method = RequestMethod.POST, produces = MediaType.APPLICATION_JSON_VALUE)
    public ResponseEntity<?> sendMessage(@RequestBody UserDetails user) {

        String exchange = getApplicationConfig().getApp1Exchange();
        String routingKey = getApplicationConfig().getApp1RoutingKey();

        /* Sending to Message Queue */
        try {
            messageSender.sendMessage(rabbitTemplate, exchange, routingKey, user);
            return new ResponseEntity<String>(ApplicationConstant.IN_QUEUE, HttpStatus.OK);

        } catch (Exception ex) {
            // The exception is passed as the last argument so SLF4J logs its stack trace
            log.error("Exception occurred while sending message to the queue.", ex);
            return new ResponseEntity<String>(ApplicationConstant.MESSAGE_QUEUE_SEND_ERROR,
                    HttpStatus.INTERNAL_SERVER_ERROR);
        }

    }

}
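Both the listener and the service endpoint reference com.dpk.util.ApplicationConstant, which is not listed in this article. The sketch below shows one way such a constants holder might look. The constant names come from the code above, but every value is a placeholder I have assumed for illustration.

```java
// Hypothetical sketch of com.dpk.util.ApplicationConstant; all values are placeholders.
// In a real project this would be a public class in its own file under com.dpk.util.
final class ApplicationConstant {

    // Delay in milliseconds before the listener rethrows to trigger a requeue (assumed value).
    static final long MESSAGE_RETRY_DELAY = 5000L;

    // Response bodies returned by the service endpoint (assumed wording).
    static final String IN_QUEUE = "Message has been queued for processing.";
    static final String MESSAGE_QUEUE_SEND_ERROR = "Message could not be sent to the queue.";

    // Constants holder; not meant to be instantiated.
    private ApplicationConstant() { }
}
```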
    

Wrap-Up

The complete code is available on GitHub. If you have any questions or suggestions, feel free to write in the comments below. I would be happy to interact with you. Thank you!
