{{announcement.body}}
{{announcement.title}}

Spring Cloud Stream + RabbitMQ

DZone 's Guide to

Spring Cloud Stream + RabbitMQ

We'll use Spring Cloud Stream and RabbitMQ. Integrating Spring Boot Applications with Messaging Systems like Apache Kafka and RabbitMQ.

· Cloud Zone ·
Free Resource

In this example, we'll use Spring Cloud Stream and RabbitMQ. Integrating Spring Boot Applications with Messaging Systems like Apache Kafka and RabbitMQ.

If you look at these examples these required a lot of configuration code which was Broker specific. For example, in the case of RabbitMQ integration with Spring Boot, we had to write code to create AmqpTemplate Template and Bindings. So if tomorrow the Messaging System changes we will also need to make application code changes.

Spring Cloud helps solve this problem using Spring Cloud Stream. Using Spring Cloud Stream we can develop applications where we do not need to specify the implementation details of the messaging system we want to use. We just need to specify the required binding dependencies and Spring Cloud Stream will integrate the messaging systems to Spring Boot Application.

spring cloud

Spring Cloud Concepts

  • Binder— Depending upon the messaging system we will have to specify the messaging platform dependency, in this case, it's RabbitMQ
  • Source— When a message is needed to be published it is done using  Source . The  Source  is an interface having a method annotated with  @Output . The  @Output annotation is used to identify output channels. The Source takes a POJO object, serializes it, and then publishes it to the output channel.
Java
 




xxxxxxxxxx
1


1
public interface PaymentSource {
2
    // Defines methods for sending messages.
3
    @Output("paymentChannel")
4
    MessageChannel paymentChannel();
5
}
6
 
          



Channel— A channel represents an input and output pipe between the Spring Cloud Stream application and the Middleware Platform. A channel abstracts the queue that will either publish or consume the message. A channel is always associated with a queue. With this approach, we do not need to use the queue name in the application code. So if tomorrow, the queue needs to be changed, we don't need to change the application code.

For example, in the PaymentSource we have specified the channel name as  paymentChannel. In application.properties we have associated this channel with a RabbitMQ Exchange.

Start the Consumer first,

PaymentConsumerApplication.java — @EnableBinding annotation tells Spring Cloud Stream that you want to bind the Controller to a message broker. 

Java
 




x
22


 
1
package com.example;
2
 
          
3
import org.springframework.boot.SpringApplication;
4
import org.springframework.boot.autoconfigure.SpringBootApplication;
5
import org.springframework.cloud.stream.annotation.EnableBinding;
6
import org.springframework.cloud.stream.annotation.StreamListener;
7
import org.springframework.cloud.stream.messaging.Sink;
8
 
          
9
 
          
10
@EnableBinding(Sink.class)
11
@SpringBootApplication
12
public class PaymentConsumerApplication {
13
 
          
14
    public static void main(String[] args) {
15
        SpringApplication.run(PaymentConsumerApplication.class, args);
16
    }
17
 
          
18
    @StreamListener(target = Sink.INPUT)
19
    public void processRegisterEmployees(String payment) {
20
        System.out.println("Employees Registered by Client " + payment);
21
    }
22
}



application.properties

Java
 




xxxxxxxxxx
1


1
server.port=8090
2
spring.rabbitmq.host=localhost
3
spring.rabbitmq.port=5672
4
spring.rabbitmq.username=guest
5
spring.rabbitmq.password=guest
6
 
          
7
spring.cloud.stream.bindings.input.destination=payment
8
spring.cloud.stream.bindings.input.group=paymentQueue



pom.xml

Java
 




xxxxxxxxxx
1
87


1
<?xml version="1.0" encoding="UTF-8"?>
2
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
3
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
4
    <modelVersion>4.0.0</modelVersion>
5
    <parent>
6
        <groupId>org.springframework.boot</groupId>
7
        <artifactId>spring-boot-starter-parent</artifactId>
8
        <version>2.2.2.RELEASE</version>
9
        <relativePath/> <!-- lookup parent from repository -->
10
    </parent>
11
    <groupId>com.example</groupId>
12
    <artifactId>employee-registration-consumer</artifactId>
13
    <version>0.0.1-SNAPSHOT</version>
14
    <name>employee-registration-consumer</name>
15
    <description>Spring Boot Open API Spec</description>
16
 
          
17
    <properties>
18
        <java.version>1.8</java.version>
19
        <spring-cloud.version>Hoxton.SR3</spring-cloud.version>
20
        <maven-jar-plugin.version>3.1.1</maven-jar-plugin.version>
21
    </properties>
22
 
          
23
    <dependencies>
24
        <dependency>
25
            <groupId>org.springframework.boot</groupId>
26
            <artifactId>spring-boot-starter-amqp</artifactId>
27
        </dependency>
28
        <dependency>
29
            <groupId>org.springframework.cloud</groupId>
30
            <artifactId>spring-cloud-stream</artifactId>
31
        </dependency>
32
        <dependency>
33
            <groupId>org.springframework.cloud</groupId>
34
            <artifactId>spring-cloud-stream-binder-rabbit</artifactId>
35
        </dependency>
36
 
          
37
        <dependency>
38
            <groupId>org.projectlombok</groupId>
39
            <artifactId>lombok</artifactId>
40
            <optional>true</optional>
41
        </dependency>
42
        <dependency>
43
            <groupId>org.springframework.boot</groupId>
44
            <artifactId>spring-boot-starter-test</artifactId>
45
            <scope>test</scope>
46
            <exclusions>
47
                <exclusion>
48
                    <groupId>org.junit.vintage</groupId>
49
                    <artifactId>junit-vintage-engine</artifactId>
50
                </exclusion>
51
            </exclusions>
52
        </dependency>
53
        <dependency>
54
            <groupId>org.springframework.amqp</groupId>
55
            <artifactId>spring-rabbit-test</artifactId>
56
            <scope>test</scope>
57
        </dependency>
58
        <dependency>
59
            <groupId>org.springframework.cloud</groupId>
60
            <artifactId>spring-cloud-stream-test-support</artifactId>
61
            <scope>test</scope>
62
        </dependency>
63
    </dependencies>
64
 
          
65
    <dependencyManagement>
66
        <dependencies>
67
            <dependency>
68
                <groupId>org.springframework.cloud</groupId>
69
                <artifactId>spring-cloud-dependencies</artifactId>
70
                <version>${spring-cloud.version}</version>
71
                <type>pom</type>
72
                <scope>import</scope>
73
            </dependency>
74
        </dependencies>
75
    </dependencyManagement>
76
 
          
77
    <build>
78
        <plugins>
79
            <plugin>
80
                <groupId>org.springframework.boot</groupId>
81
                <artifactId>spring-boot-maven-plugin</artifactId>
82
            </plugin>
83
        </plugins>
84
    </build>
85
 
          
86
</project>
87
 
          



RabbitMQ — http://localhost:15672/, username/password = guests/guests

rabbitMQ

Spring


Let's start the Producer Now.

Payment.java

Java
 




xxxxxxxxxx
1
10


 
1
@Data
2
@AllArgsConstructor
3
@NoArgsConstructor
4
@Builder
5
@JsonIgnoreProperties(ignoreUnknown = true)
6
public class Payment {
7
    private Integer customerNumber;
8
    private String checkNumber;
9
    private Double amount;
10
}


PaymentSource.java

Java
 




xxxxxxxxxx
1
10


1
package com.example;
2
 
          
3
import org.springframework.cloud.stream.annotation.Output;
4
import org.springframework.messaging.MessageChannel;
5
 
          
6
public interface PaymentSource {
7
    // Defines methods for sending messages.
8
    @Output("paymentChannel")
9
    MessageChannel paymentChannel();
10
}



PaymentController.java

Java
 




xxxxxxxxxx
1
25


1
package com.example;
2
 
          
3
import org.springframework.beans.factory.annotation.Autowired;
4
import org.springframework.cloud.stream.annotation.EnableBinding;
5
import org.springframework.integration.support.MessageBuilder;
6
import org.springframework.web.bind.annotation.RequestBody;
7
import org.springframework.web.bind.annotation.RequestMapping;
8
import org.springframework.web.bind.annotation.ResponseBody;
9
import org.springframework.web.bind.annotation.RestController;
10
 
          
11
@RestController
12
@EnableBinding(PaymentSource.class)
13
public class PaymentController {
14
 
          
15
    @Autowired
16
    PaymentSource paymentSource;
17
 
          
18
    @RequestMapping("/register")
19
    @ResponseBody
20
    public String orderFood(@RequestBody Payment payment) {
21
        paymentSource.paymentChannel().send(MessageBuilder.withPayload(payment).build());
22
        //System.out.println(payment.toString());
23
        return "Patment Details Provided";
24
    }
25
}



PaymentProducersApplication.java

@EnableBinding annotation tells Spring Cloud Stream that you want to bind the Controller to a message broker.

Java
 




xxxxxxxxxx
1
14


1
package com.example;
2
 
          
3
import org.springframework.boot.SpringApplication;
4
import org.springframework.boot.autoconfigure.SpringBootApplication;
5
 
          
6
@SpringBootApplication
7
public class PaymentProducersApplication {
8
 
          
9
    public static void main(String[] args) {
10
        SpringApplication.run(PaymentProducersApplication.class, args);
11
    }
12
 
          
13
}
14
 
          


rabbitMQ

application.properties

Java
 




xxxxxxxxxx
1


1
server.port=8080
2
spring.rabbitmq.host=localhost
3
spring.rabbitmq.port=5672
4
spring.rabbitmq.username=guest
5
spring.rabbitmq.password=guest
6
 
          
7
spring.cloud.stream.bindings.paymentChannel.destination=payment
8
spring.cloud.stream.default.contentType=application/json



POST

Make the POST request — 

POST request

Here is the output from

 Employees Registered by Client {"customerNumber":101,

"checkNumber":"HQ336336","amount":5000.0} 

Topics:
cloud, java, spring, spring cloud stream

Opinions expressed by DZone contributors are their own.

{{ parent.title || parent.header.title}}

{{ parent.tldr }}

{{ parent.urlSource.name }}