
Spring Boot + RabbitMQ Tutorial — Retry and Error Handling Example

In a previous tutorial, we implemented a Spring Boot + RabbitMQ example to understand the various exchange types. In this tutorial, we will implement a Spring Boot + RabbitMQ example that retries messages when an exception occurs. If the exception persists after the maximum number of retries, the message is put in a dead letter queue, where it can be analyzed and corrected later.

What Is a Dead Letter Queue?

In postal terms, dead letter mail is mail that cannot be delivered to the addressee. A dead-letter queue (DLQ), sometimes known as an undelivered-message queue, is a holding queue for messages that cannot be delivered to their destinations or processed successfully.
According to Wikipedia, in message queueing the dead letter queue is a service implementation to store messages that meet one or more of the following failure criteria:

  • Message that is sent to a queue that does not exist
  • Queue length limit exceeded
  • Message length limit exceeded
  • Message is rejected by another queue exchange
  • Message reaches a threshold read counter number because it is not consumed. Sometimes this is called a “back out queue”

Later on, we can analyze the messages in the DLQ to know the reason why the messages are failing.
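
To make one of the criteria above concrete ("queue length limit exceeded"), here is a minimal in-memory sketch in plain Java. It is only an illustration under stated assumptions: the class name BoundedQueueWithDlq and its methods are hypothetical, and it does not use or reproduce RabbitMQ itself. When the queue is full, the oldest message is moved to a dead letter list before the new one is accepted.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical in-memory model for illustration; not RabbitMQ's implementation.
class BoundedQueueWithDlq {
    private final int maxLength;
    private final Deque<String> queue = new ArrayDeque<>();
    private final List<String> deadLetters = new ArrayList<>();

    BoundedQueueWithDlq(int maxLength) {
        if (maxLength < 1) {
            throw new IllegalArgumentException("maxLength must be at least 1");
        }
        this.maxLength = maxLength;
    }

    // Accepts a message; if the queue is already full, the oldest
    // message is moved to the dead letter list first.
    void publish(String message) {
        if (queue.size() == maxLength) {
            deadLetters.add(queue.removeFirst());
        }
        queue.addLast(message);
    }

    // Defensive copies so callers cannot modify internal state.
    List<String> queued() {
        return new ArrayList<>(queue);
    }

    List<String> deadLetters() {
        return new ArrayList<>(deadLetters);
    }

    public static void main(String[] args) {
        BoundedQueueWithDlq q = new BoundedQueueWithDlq(2);
        q.publish("m1");
        q.publish("m2");
        q.publish("m3"); // queue is full, so "m1" is dead-lettered
        System.out.println("queued = " + q.queued());            // queued = [m2, m3]
        System.out.println("dead letters = " + q.deadLetters()); // dead letters = [m1]
    }
}
```

The dead letter list can then be inspected separately, which is the same idea as analyzing the DLQ later.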


Dead Letter Queue Tutorial


This tutorial is explained in the below YouTube video.

We will be implementing two modules:

  1. Spring Boot Producer Module: It produces a message and puts it in the RabbitMQ queue. It is also responsible for creating the required queues, including the dead letter queue.
  2. Spring Boot Consumer Module: It consumes a message from the RabbitMQ queue. We throw an exception and then retry the message. After the maximum number of attempts, the message is put in the dead letter queue.
    Spring Boot + RabbitMQ Error Handling Application

Spring Boot + RabbitMQ Producer Module

The Maven project will be as follows:

Spring Boot + RabbitMQ Producer Tutorial 

The pom.xml will have the following dependencies:

XML

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>com.javainuse</groupId>
  <artifactId>spring-boot-rabbitmq-producer</artifactId>
  <version>0.0.1-SNAPSHOT</version>
  <packaging>jar</packaging>

  <parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.1.1.RELEASE</version>
    <relativePath />
  </parent>

  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
    <java.version>1.8</java.version>
  </properties>

  <dependencies>
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-logging</artifactId>
    </dependency>
  </dependencies>

  <build>
    <plugins>
      <plugin>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-maven-plugin</artifactId>
      </plugin>
    </plugins>
  </build>
</project>



Define the domain class Employee as follows:

Java

package com.javainuse.model;

import com.fasterxml.jackson.annotation.JsonIdentityInfo;
import com.fasterxml.jackson.annotation.ObjectIdGenerators;

@JsonIdentityInfo(generator = ObjectIdGenerators.IntSequenceGenerator.class, property = "@id", scope = Employee.class)
public class Employee {

    private String empName;
    private String empId;
    private int salary;

    public String getEmpName() {
        return empName;
    }

    public void setEmpName(String empName) {
        this.empName = empName;
    }

    public String getEmpId() {
        return empId;
    }

    public void setEmpId(String empId) {
        this.empId = empId;
    }

    public int getSalary() {
        return salary;
    }

    public void setSalary(int salary) {
        this.salary = salary;
    }

    @Override
    public String toString() {
        return "Employee [empName=" + empName + ", empId=" + empId + ", salary=" + salary + "]";
    }
}



Next, define the configuration class where we:

  • Create direct exchanges named deadLetterExchange and javainuseExchange.
  • Create queues named javainuse.queue and deadLetter.queue. For javainuse.queue, set the x-dead-letter-exchange argument to deadLetterExchange and the x-dead-letter-routing-key argument to deadLetter. This means that any message in javainuse.queue that is rejected without being requeued will be republished to deadLetterExchange with that routing key.
  • Bind javainuse.queue to javainuseExchange with the routing key javainuse, and deadLetter.queue to deadLetterExchange with the routing key deadLetter.
Java

package com.javainuse.config;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.QueueBuilder;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitMQConfig {

    // Exchange that receives messages dead-lettered from javainuse.queue
    @Bean
    DirectExchange deadLetterExchange() {
        return new DirectExchange("deadLetterExchange");
    }

    // Exchange the producer publishes to
    @Bean
    DirectExchange exchange() {
        return new DirectExchange("javainuseExchange");
    }

    // Dead letter queue
    @Bean
    Queue dlq() {
        return QueueBuilder.durable("deadLetter.queue").build();
    }

    // Main queue; rejected messages are republished to deadLetterExchange
    // with the routing key "deadLetter"
    @Bean
    Queue queue() {
        return QueueBuilder.durable("javainuse.queue")
                .withArgument("x-dead-letter-exchange", "deadLetterExchange")
                .withArgument("x-dead-letter-routing-key", "deadLetter")
                .build();
    }

    @Bean
    Binding DLQbinding() {
        return BindingBuilder.bind(dlq()).to(deadLetterExchange()).with("deadLetter");
    }

    @Bean
    Binding binding() {
        return BindingBuilder.bind(queue()).to(exchange()).with("javainuse");
    }

    // Serializes Employee objects as JSON
    @Bean
    public MessageConverter jsonMessageConverter() {
        return new Jackson2JsonMessageConverter();
    }

    // Declared as a bean (and typed as RabbitTemplate) so that it replaces
    // the auto-configured template and is the one injected into the controller
    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
        final RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        rabbitTemplate.setMessageConverter(jsonMessageConverter());
        return rabbitTemplate;
    }
}
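
The routing that this configuration sets up can be traced with a small in-memory sketch. This is only a simplified model under stated assumptions, not Spring AMQP or RabbitMQ code: the class DirectRoutingSketch and its methods are hypothetical, and it allows one queue per routing key, whereas a real direct exchange can deliver to several queues bound with the same key. The exchange, queue, and routing key names are the ones used in the configuration class.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical in-memory model of the tutorial's topology, for illustration only.
class DirectRoutingSketch {
    // exchange name -> (binding key -> queue name)
    private final Map<String, Map<String, String>> bindings = new HashMap<>();
    // queue name -> queue arguments such as x-dead-letter-exchange
    private final Map<String, Map<String, String>> queueArgs = new HashMap<>();

    void declareQueue(String queue, Map<String, String> args) {
        queueArgs.put(queue, args);
    }

    void bind(String queue, String exchange, String routingKey) {
        bindings.computeIfAbsent(exchange, e -> new HashMap<>()).put(routingKey, queue);
    }

    // A direct exchange delivers to the queue whose binding key equals
    // the message's routing key. Returns null when nothing matches.
    String route(String exchange, String routingKey) {
        Map<String, String> byKey = bindings.get(exchange);
        return byKey == null ? null : byKey.get(routingKey);
    }

    // Queue that receives a message rejected from the given queue,
    // or null if the queue has no dead letter exchange configured.
    String deadLetterTarget(String queue) {
        Map<String, String> args = queueArgs.get(queue);
        if (args == null || !args.containsKey("x-dead-letter-exchange")) {
            return null;
        }
        return route(args.get("x-dead-letter-exchange"), args.get("x-dead-letter-routing-key"));
    }

    // Builds the same exchanges, queues, and bindings as RabbitMQConfig.
    static DirectRoutingSketch tutorialTopology() {
        DirectRoutingSketch broker = new DirectRoutingSketch();
        Map<String, String> args = new HashMap<>();
        args.put("x-dead-letter-exchange", "deadLetterExchange");
        args.put("x-dead-letter-routing-key", "deadLetter");
        broker.declareQueue("javainuse.queue", args);
        broker.declareQueue("deadLetter.queue", new HashMap<String, String>());
        broker.bind("javainuse.queue", "javainuseExchange", "javainuse");
        broker.bind("deadLetter.queue", "deadLetterExchange", "deadLetter");
        return broker;
    }

    public static void main(String[] args) {
        DirectRoutingSketch broker = tutorialTopology();
        System.out.println(broker.route("javainuseExchange", "javainuse")); // javainuse.queue
        System.out.println(broker.deadLetterTarget("javainuse.queue"));     // deadLetter.queue
    }
}
```

A message published to javainuseExchange with the key javainuse lands in javainuse.queue, and a message rejected from that queue ends up in deadLetter.queue.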



Create the RabbitMQWebController class, which exposes an API to send a message to the RabbitMQ exchange.

Java

package com.javainuse.controller;

import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

import com.javainuse.model.Employee;

@RestController
@RequestMapping(value = "/javainuse-rabbitmq/")
public class RabbitMQWebController {

    @Autowired
    private AmqpTemplate amqpTemplate;

    @GetMapping(value = "/producer")
    public String producer(@RequestParam("empName") String empName, @RequestParam("empId") String empId,
            @RequestParam("salary") int salary) {
        Employee emp = new Employee();
        emp.setEmpId(empId);
        emp.setEmpName(empName);
        emp.setSalary(salary);

        // Publish to javainuseExchange with the routing key bound to javainuse.queue
        amqpTemplate.convertAndSend("javainuseExchange", "javainuse", emp);
        return "Message sent to the RabbitMQ Successfully";
    }
}
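
To call this endpoint from code rather than from a browser, the request URL can be assembled as in the following sketch. It assumes the producer runs locally on port 8080 (Spring Boot's default, since the producer sets no port); the class ProducerUrlSketch and its method are hypothetical helpers, not part of the tutorial project. String parameters are URL-encoded so that names containing spaces or ampersands stay intact.

```java
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;

// Hypothetical helper for illustration; not part of the tutorial project.
class ProducerUrlSketch {
    // Assumption: the producer application runs locally on port 8080.
    static final String BASE = "http://localhost:8080/javainuse-rabbitmq/producer";

    // Builds the GET URL with the three request parameters the controller expects.
    static String producerUrl(String empName, String empId, int salary) {
        return BASE + "?empName=" + encode(empName)
                + "&empId=" + encode(empId)
                + "&salary=" + salary;
    }

    private static String encode(String value) {
        try {
            return URLEncoder.encode(value, "UTF-8");
        } catch (UnsupportedEncodingException e) {
            // UTF-8 support is mandatory on every Java platform
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        // prints http://localhost:8080/javainuse-rabbitmq/producer?empName=emp1&empId=emp001&salary=-50
        System.out.println(producerUrl("emp1", "emp001", -50));
    }
}
```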



Create the Spring Boot bootstrap class with the @SpringBootApplication annotation.

Java

package com.javainuse;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class SpringBootHelloWorldApplication {

    public static void main(String[] args) {
        SpringApplication.run(SpringBootHelloWorldApplication.class, args);
    }
}



Spring Boot Consumer Module

The project will be as follows:
Spring Boot RabbitMQ consume Eclipse Setup
Define the pom.xml as follows, adding the spring-boot-starter-amqp dependency:

XML

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <groupId>com.javainuse</groupId>
  <artifactId>spring-boot-rabbitmq-consumer</artifactId>
  <version>0.0.1-SNAPSHOT</version>
  <packaging>jar</packaging>

  <name>spring-boot-rabbitmq-consumer</name>
  <description>SpringBootRabbitMQConsumer</description>

  <parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.1.1.RELEASE</version>
    <relativePath /> <!-- lookup parent from repository -->
  </parent>

  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
    <java.version>1.8</java.version>
  </properties>

  <dependencies>
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-logging</artifactId>
    </dependency>
  </dependencies>

  <build>
    <plugins>
      <plugin>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-maven-plugin</artifactId>
      </plugin>
    </plugins>
  </build>
</project>



Define the domain class Employee as follows:

Java

package com.javainuse.model;

import com.fasterxml.jackson.annotation.JsonIdentityInfo;
import com.fasterxml.jackson.annotation.ObjectIdGenerators;

@JsonIdentityInfo(generator = ObjectIdGenerators.IntSequenceGenerator.class, property = "@id", scope = Employee.class)
public class Employee {

    private String empName;
    private String empId;
    private int salary;

    public String getEmpName() {
        return empName;
    }

    public void setEmpName(String empName) {
        this.empName = empName;
    }

    public String getEmpId() {
        return empId;
    }

    public void setEmpId(String empId) {
        this.empId = empId;
    }

    @Override
    public String toString() {
        return "Employee [empName=" + empName + ", empId=" + empId + ", salary=" + salary + "]";
    }

    public int getSalary() {
        return salary;
    }

    public void setSalary(int salary) {
        this.salary = salary;
    }
}



Define a custom checked exception named InvalidSalaryException as follows:

Java

package com.javainuse.exception;

public class InvalidSalaryException extends Exception {

    private static final long serialVersionUID = -3154618962130084535L;
}



Define the RabbitMQConsumer class, which consumes messages from RabbitMQ using the @RabbitListener annotation. The listener watches the RabbitMQ queue for incoming messages; for the basic configuration, we only specify the name of the queue to consume from. Here we also check the salary field of the incoming message. If it is negative, we throw an InvalidSalaryException.

Java

package com.javainuse.service;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import com.javainuse.exception.InvalidSalaryException;
import com.javainuse.model.Employee;

@Component
public class RabbitMQConsumer {

    private static final Logger logger = LoggerFactory.getLogger(RabbitMQConsumer.class);

    @RabbitListener(queues = "javainuse.queue")
    public void recievedMessage(Employee employee) throws InvalidSalaryException {
        logger.info("Received Message From RabbitMQ: {}", employee);
        // A negative salary is invalid; the exception triggers the retry mechanism
        if (employee.getSalary() < 0) {
            throw new InvalidSalaryException();
        }
    }
}



Next, define the following properties in application.yml. Here we enable the Spring Boot RabbitMQ retry mechanism and set a few additional parameters:

  • Initial interval: The first retry happens 3s after the initial failed attempt.
  • Max-attempts: The message is delivered to the listener at most 6 times (the first attempt plus up to 5 retries). After that, it is sent to the dead letter queue.
  • Max-interval: The interval between two attempts never exceeds 10s.
  • Multiplier: Each retry interval is the previous one multiplied by 2, capped at max-interval. So the retry intervals will be 3s, 6s, 10s, 10s, 10s, because 10s is the maximum interval specified.
YAML

spring:
  rabbitmq:
    listener:
      simple:
        retry:
          enabled: true
          initial-interval: 3s
          max-attempts: 6
          max-interval: 10s
          multiplier: 2

server:
  port: 8081
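
The retry schedule that results from these values can be checked with a few lines of plain Java. The sketch below only reproduces the arithmetic described in the list above (multiply by the multiplier, cap at the maximum interval); it is not the Spring Retry implementation, and the class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper that reproduces the back-off arithmetic; not Spring Retry code.
class BackoffScheduleSketch {

    // Returns the wait in milliseconds before each retry.
    // With maxAttempts attempts there are maxAttempts - 1 waits.
    static List<Long> intervals(long initialMs, double multiplier, long maxMs, int maxAttempts) {
        List<Long> waits = new ArrayList<>();
        double next = initialMs;
        for (int retry = 1; retry < maxAttempts; retry++) {
            waits.add(Math.min((long) next, maxMs));
            // Grow the interval, never beyond the configured maximum
            next = Math.min(next * multiplier, maxMs);
        }
        return waits;
    }

    public static void main(String[] args) {
        // initial-interval: 3s, multiplier: 2, max-interval: 10s, max-attempts: 6
        System.out.println(intervals(3000, 2.0, 10000, 6)); // [3000, 6000, 10000, 10000, 10000]
    }
}
```

The third interval would be 12s without the cap, which is why it and all later intervals are 10s.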



Finally, define the Spring Boot Class with @SpringBootApplication annotation:

Java

package com.javainuse;

import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;

@SpringBootApplication
public class SpringBootConsumerApplication {

    public static void main(String[] args) {
        SpringApplication.run(SpringBootConsumerApplication.class, args);
    }

    @Bean
    public Jackson2JsonMessageConverter converter() {
        return new Jackson2JsonMessageConverter();
    }
}



In a previous tutorial, we showed how to install RabbitMQ and get started.
Start the producer and consumer applications, then go to http://localhost:8080/javainuse-rabbitmq/producer?empName=emp1&empId=emp001&salary=-50. The message will be sent to the RabbitMQ queue named javainuse.queue and consumed by the consumer application. Because the salary is negative, InvalidSalaryException will be thrown. The consumer will attempt the message 6 times in total, and then the message will be put in the dead letter queue (deadLetter.queue).
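
The flow just described (attempt, fail, retry, and finally dead-letter) can be condensed into a plain-Java sketch. It is a simplified model under stated assumptions: the class RetryThenDeadLetterSketch is hypothetical, the message is reduced to its salary value, the waits between attempts are omitted, and none of this is the Spring AMQP implementation.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical simulation of the retry flow; not Spring AMQP code.
class RetryThenDeadLetterSketch {

    static class InvalidSalaryException extends Exception {
        private static final long serialVersionUID = 1L;
    }

    // Salaries of messages that exhausted all attempts
    final List<Integer> deadLetters = new ArrayList<>();
    int attemptsMade = 0;

    // Mirrors the listener's check: a negative salary is invalid.
    void handle(int salary) throws InvalidSalaryException {
        attemptsMade++;
        if (salary < 0) {
            throw new InvalidSalaryException();
        }
    }

    // Tries the handler up to maxAttempts times. Returns true on success;
    // otherwise records the message as dead-lettered and returns false.
    boolean deliver(int salary, int maxAttempts) {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                handle(salary);
                return true;
            } catch (InvalidSalaryException e) {
                // A real listener would wait for the back-off interval here
            }
        }
        deadLetters.add(salary);
        return false;
    }

    public static void main(String[] args) {
        RetryThenDeadLetterSketch consumer = new RetryThenDeadLetterSketch();
        boolean processed = consumer.deliver(-50, 6);
        System.out.println("processed = " + processed);                // processed = false
        System.out.println("attempts = " + consumer.attemptsMade);     // attempts = 6
        System.out.println("dead letters = " + consumer.deadLetters);  // dead letters = [-50]
    }
}
```

A message with a valid salary succeeds on the first attempt and never reaches the dead letter list.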

Spring Boot RabbitMQ Retry Example 


Spring Boot DLQ Example

Thanks for reading!
