Building and Testing Message-Driven Microservices Using Spring Cloud Stream
In this microservice testing tutorial, learn to build, scale, run and test messaging microservices based on the RabbitMQ broker.
Spring Boot and Spring Cloud give you a great opportunity to build microservices fast using different styles of communication. You can create synchronous REST microservices based on Spring Cloud Netflix libraries, as shown in one of my previous articles, Quick Guide to Microservices with Spring Boot 2.0, Eureka and Spring Cloud. You can create asynchronous, reactive microservices deployed on Netty with Spring WebFlux and combine them successfully with Spring Cloud libraries, as shown in my article Reactive Microservices with Spring WebFlux and Spring Cloud. And finally, you may implement message-driven microservices based on a publish/subscribe model using Spring Cloud Stream and a message broker like Apache Kafka or RabbitMQ. The last of the listed approaches is the main subject of this article. I'm going to show you how to effectively build, scale, run and test messaging microservices based on the RabbitMQ broker.
Architecture
For the purpose of demonstrating Spring Cloud Stream's features, we will design a sample system which uses a publish/subscribe model for inter-service communication. We have three microservices: order-service, product-service and account-service. The application order-service exposes an HTTP endpoint that is responsible for processing orders sent to our system. All the incoming orders are processed asynchronously: order-service prepares and sends a message to a RabbitMQ exchange and then responds to the calling client that the request has been accepted for processing. The applications account-service and product-service listen for the order messages arriving at the exchange. The microservice account-service is responsible for checking whether there are sufficient funds in the customer's account to fulfill the order and then withdrawing cash from this account. The microservice product-service checks whether there is a sufficient number of products in the store, and changes the number of available products after processing the order. Both account-service and product-service send an asynchronous response through a RabbitMQ exchange (this time it is one-to-one communication using a direct exchange) with the status of the operation. After receiving the response messages, order-service sets the appropriate status of the order and exposes it through the REST endpoint GET /order/{id} to the external client.
If the description of our sample system is hard to follow, here's an architecture diagram for clarification:
Enabling Spring Cloud Stream
The recommended way to include Spring Cloud Stream in the project is with a dependency management system. Spring Cloud Stream has its own release train, managed independently of the Spring Cloud framework as a whole. However, if we have declared spring-cloud-dependencies in the Finchley.RELEASE version inside the dependencyManagement section, we don't have to declare anything else in the pom.xml. If you prefer to use only the Spring Cloud Stream project, you should define the following section.
<dependencyManagement>
    <dependencies>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream-dependencies</artifactId>
            <version>Elmhurst.RELEASE</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
    </dependencies>
</dependencyManagement>
The next step is to add the spring-cloud-stream artifact to the project dependencies. I also recommend you include at least the spring-cloud-starter-sleuth library, so that messages are sent with the same traceId as the source request incoming to order-service.
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-sleuth</artifactId>
</dependency>
Spring Cloud Stream Programming Model
To enable connectivity to a message broker for your application, annotate the main class with @EnableBinding. The @EnableBinding annotation takes one or more interfaces as parameters. You may choose between three interfaces provided by Spring Cloud Stream:
- Sink: This is used for marking a service that receives messages from the inbound channel.
- Source: This is used for sending messages to the outbound channel.
- Processor: This can be used in case you need both an inbound channel and an outbound channel, as it extends the Source and Sink interfaces.
Because order-service sends messages, as well as receives them, its main class has been annotated with @EnableBinding(Processor.class).
Here's the main class of order-service that enables Spring Cloud Stream binding.
@SpringBootApplication
@EnableBinding(Processor.class)
public class OrderApplication {

    public static void main(String[] args) {
        new SpringApplicationBuilder(OrderApplication.class).web(true).run(args);
    }
}
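To make the relationship between the three interfaces more concrete, here is a minimal plain-Java sketch. The types below (Channel, Source, Sink, Processor, InMemoryProcessor) are simplified stand-ins written for this illustration. They are not the real Spring Cloud Stream interfaces, which work with Spring's message channel types and are bound to a broker by the framework.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Simplified stand-ins for illustration only, NOT the Spring Cloud Stream types.
final class BindingSketch {

    /** Minimal in-memory channel that hands each message to every subscriber. */
    static final class Channel {
        private final List<Consumer<String>> subscribers = new ArrayList<>();

        public void subscribe(Consumer<String> subscriber) {
            subscribers.add(subscriber);
        }

        /** Returns true when at least one subscriber received the message. */
        public boolean send(String payload) {
            for (Consumer<String> subscriber : subscribers) {
                subscriber.accept(payload);
            }
            return !subscribers.isEmpty();
        }
    }

    interface Source { Channel output(); }              // outbound side only
    interface Sink { Channel input(); }                 // inbound side only
    interface Processor extends Source, Sink { }        // both sides

    static final class InMemoryProcessor implements Processor {
        private final Channel in = new Channel();
        private final Channel out = new Channel();

        @Override public Channel input() { return in; }
        @Override public Channel output() { return out; }
    }

    public static void main(String[] args) {
        InMemoryProcessor processor = new InMemoryProcessor();
        processor.output().subscribe(m -> System.out.println("sent: " + m));
        // Whatever arrives on the input is answered on the output.
        processor.input().subscribe(m -> processor.output().send("processed " + m));
        processor.input().send("order-1");
    }
}
```

A service that only consumes would implement just the Sink side, and one that only publishes just the Source side. A service like order-service, which does both, needs the combined contract.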
Adding Message Broker
In Spring Cloud Stream nomenclature, the implementation responsible for integration with a specific message broker is called a binder. By default, Spring Cloud Stream provides binder implementations for Kafka and RabbitMQ. It is able to automatically detect and use a binder found on the classpath. Any middleware-specific settings can be overridden through external configuration properties in the form supported by Spring Boot, such as application arguments, environment variables, or just the application.yml file. To include support for RabbitMQ, which is used in this article as the message broker, you should add the following dependency to the project.
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
Now, our applications need to connect to one shared instance of the RabbitMQ broker. That's why I run a Docker image with RabbitMQ exposed on the default port 5672. It also launches the web dashboard, available at http://192.168.99.100:15672.
$ docker run -d --name rabbit -p 15672:15672 -p 5672:5672 rabbitmq:management
We need to override the default RabbitMQ address for every Spring Boot application by setting the property spring.rabbitmq.host to the Docker machine IP 192.168.99.100.
spring:
  rabbitmq:
    host: 192.168.99.100
    port: 5672
Implementing Message-Driven Microservices
Spring Cloud Stream is built on top of the Spring Integration project. Spring Integration extends the Spring programming model to support the well-known Enterprise Integration Patterns (EIP). EIP defines a number of components that are typically used for orchestration in distributed systems. You have probably heard about patterns such as message channels, routers, aggregators, or endpoints. Let's proceed to the implementation.
We begin with order-service, which is responsible for accepting orders, publishing them on a shared topic, and then collecting asynchronous responses from downstream services. Here's the @Service, which builds a message and publishes it to the remote topic using the Source bean.
@Service
public class OrderSender {

    @Autowired
    private Source source;

    public boolean send(Order order) {
        return this.source.output().send(MessageBuilder.withPayload(order).build());
    }
}
That @Service is called by the controller, which exposes the HTTP endpoints for submitting new orders and getting an order with status by id.
@RestController
public class OrderController {

    private static final Logger LOGGER = LoggerFactory.getLogger(OrderController.class);
    private ObjectMapper mapper = new ObjectMapper();

    @Autowired
    OrderRepository repository;
    @Autowired
    OrderSender sender;

    @PostMapping
    public Order process(@RequestBody Order order) throws JsonProcessingException {
        Order o = repository.add(order);
        LOGGER.info("Order saved: {}", mapper.writeValueAsString(order));
        boolean isSent = sender.send(o);
        LOGGER.info("Order sent: {}", mapper.writeValueAsString(Collections.singletonMap("isSent", isSent)));
        return o;
    }

    @GetMapping("/{id}")
    public Order findById(@PathVariable("id") Long id) {
        return repository.findById(id);
    }
}
Now, let's take a closer look at the consumer side. The message sent by the OrderSender bean from order-service is received by account-service and product-service. To receive the message from the topic exchange, we just have to annotate the method that takes the Order object as a parameter with @StreamListener. We also have to define the target channel for the listener; in this case, it is Processor.INPUT.
@SpringBootApplication
@EnableBinding(Processor.class)
public class AccountApplication {

    private static final Logger LOGGER = LoggerFactory.getLogger(AccountApplication.class);
    // Used to log the received order as JSON
    private ObjectMapper mapper = new ObjectMapper();

    @Autowired
    AccountService service;

    public static void main(String[] args) {
        new SpringApplicationBuilder(AccountApplication.class).web(true).run(args);
    }

    // Invoked for every message arriving on the input channel
    @StreamListener(Processor.INPUT)
    public void receiveOrder(Order order) throws JsonProcessingException {
        LOGGER.info("Order received: {}", mapper.writeValueAsString(order));
        service.process(order);
    }
}
The received order is then processed by the AccountService bean. The order may be accepted or rejected by account-service, depending on whether the customer's account holds sufficient funds to fulfill the order. The response with the acceptance status is sent back to order-service via an output channel invoked by the OrderSender bean.
@Service
public class AccountService {

    private static final Logger LOGGER = LoggerFactory.getLogger(AccountService.class);
    private ObjectMapper mapper = new ObjectMapper();

    @Autowired
    AccountRepository accountRepository;
    @Autowired
    OrderSender orderSender;

    public void process(final Order order) throws JsonProcessingException {
        LOGGER.info("Order processed: {}", mapper.writeValueAsString(order));
        List<Account> accounts = accountRepository.findByCustomer(order.getCustomerId());
        Account account = accounts.get(0);
        LOGGER.info("Account found: {}", mapper.writeValueAsString(account));
        // Accept and withdraw only when the balance covers the order price
        if (order.getPrice() <= account.getBalance()) {
            order.setStatus(OrderStatus.ACCEPTED);
            account.setBalance(account.getBalance() - order.getPrice());
        } else {
            order.setStatus(OrderStatus.REJECTED);
        }
        orderSender.send(order);
        LOGGER.info("Order response sent: {}", mapper.writeValueAsString(order));
    }
}
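The acceptance rule itself can be isolated from Spring and the broker, which makes its edge cases easy to check. The following is a minimal sketch of that rule in plain Java. The class and method names (FundsCheckSketch, process) and the use of int amounts are assumptions made for this illustration, not code from the sample repository.

```java
// Plain-Java sketch of the acceptance rule; names and types are hypothetical.
final class FundsCheckSketch {

    enum OrderStatus { ACCEPTED, REJECTED }

    static final class Account {
        private int balance;

        public Account(int balance) {
            this.balance = balance;
        }

        public int getBalance() {
            return balance;
        }
    }

    /**
     * Accepts the order and withdraws its price when the balance covers it.
     * A rejected order leaves the balance untouched.
     */
    public static OrderStatus process(Account account, int price) {
        if (price <= account.balance) {
            account.balance -= price;
            return OrderStatus.ACCEPTED;
        }
        return OrderStatus.REJECTED;
    }

    public static void main(String[] args) {
        Account account = new Account(1000);
        System.out.println(process(account, 500) + ", balance " + account.getBalance());
        System.out.println(process(account, 100000) + ", balance " + account.getBalance());
    }
}
```

Note that an order whose price equals the balance exactly is accepted, because the comparison is "less than or equal", the same as in the service above.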
The last step is configuration. It is provided in the application.yml file. We have to properly define destinations for the channels. While order-service assigns the orders-out destination to the output channel and the orders-in destination to the input channel, account-service and product-service do the opposite. This is logical, because a message sent by order-service via its output destination is received by the consuming services via their input destinations. It is still the same destination on the shared broker's exchange. Here are the configuration settings of order-service.
spring:
  cloud:
    stream:
      bindings:
        output:
          destination: orders-out
        input:
          destination: orders-in
      rabbit:
        bindings:
          input:
            consumer:
              exchangeType: direct
Here's the configuration provided for account-service and product-service.
spring:
  cloud:
    stream:
      bindings:
        output:
          destination: orders-in
        input:
          destination: orders-out
      rabbit:
        bindings:
          output:
            producer:
              exchangeType: direct
              routingKeyExpression: '"#"'
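The mirrored destinations are easy to get wrong, so it can help to state the rule explicitly: a producer reaches a consumer only when the producer's output destination equals the consumer's input destination. Here is a small plain-Java sketch of that check. The helper names (bindings, delivers) are hypothetical and exist only for this illustration; they are not part of Spring Cloud Stream.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper that models channel-to-destination bindings as a map.
final class DestinationWiringSketch {

    /** Bindings of one service: channel name mapped to destination name. */
    public static Map<String, String> bindings(String outputDestination, String inputDestination) {
        Map<String, String> bindings = new HashMap<>();
        bindings.put("output", outputDestination);
        bindings.put("input", inputDestination);
        return bindings;
    }

    /** True when messages sent by the producer arrive on the consumer's input channel. */
    public static boolean delivers(Map<String, String> producer, Map<String, String> consumer) {
        return producer.get("output").equals(consumer.get("input"));
    }

    public static void main(String[] args) {
        Map<String, String> orderService = bindings("orders-out", "orders-in");
        Map<String, String> accountService = bindings("orders-in", "orders-out");

        // Orders flow downstream, responses flow back.
        System.out.println("order -> account: " + delivers(orderService, accountService));
        System.out.println("account -> order: " + delivers(accountService, orderService));
    }
}
```

If both services used the same, non-mirrored configuration, the check would fail in both directions, because each output would point at a destination nobody reads from.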
Finally, you can run our sample microservices. For now, we just need to run a single instance of each microservice. You can easily generate some test requests by running the JUnit test class OrderControllerTest provided in my source code repository inside the module order-service. This case is simple. In the next section, we will study a more advanced sample with multiple running instances of the consuming services.
Scaling Up
To scale up our Spring Cloud Stream applications, we just need to launch additional instances of each microservice. They will still listen for incoming messages on the same topic exchange as the currently running instances. After adding one instance each of account-service and product-service, we may send a test order. The result of that test won't be satisfactory for us. Why? A single order is received by all the running instances of every microservice. This is exactly how topic exchanges work: a message sent to the topic is received by all the consumers listening on that topic. Fortunately, Spring Cloud Stream is able to solve that problem by providing a solution called a consumer group. It guarantees that only one of the instances handles a given message when they are placed in a competing consumer relationship. The transformation to the consumer group mechanism when running multiple instances of the service is visualized in the following figure.
Configuration of the consumer group mechanism is not very difficult. We just have to set the group parameter with the name of the group for a given destination. Here's the current binding configuration for account-service. The orders-in destination is a queue created for direct communication with order-service, so only orders-out is grouped using the spring.cloud.stream.bindings.<channelName>.group property.
spring:
  cloud:
    stream:
      bindings:
        output:
          destination: orders-in
        input:
          destination: orders-out
          group: account
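The difference between plain publish/subscribe delivery and a consumer group can be modelled in a few lines of plain Java. This is only a sketch under stated assumptions: the classes (Exchange, Group, Instance) are invented for this illustration, and the round-robin choice inside a group is a simplification of how a broker distributes messages among competing consumers. Instances without a group are modelled as one-member groups with unique names.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory model of consumer groups, not broker or Spring code.
final class ConsumerGroupSketch {

    /** One running service instance; remembers every message it handled. */
    static final class Instance {
        public final String name;
        public final List<String> received = new ArrayList<>();

        public Instance(String name) {
            this.name = name;
        }
    }

    /** Members of a group compete: each message goes to exactly one of them. */
    static final class Group {
        private final List<Instance> members = new ArrayList<>();
        private int next = 0;

        void deliver(String message) {
            if (members.isEmpty()) {
                return;
            }
            members.get(next).received.add(message);
            next = (next + 1) % members.size(); // simple round-robin
        }
    }

    /** Every group subscribed to the exchange gets its own copy of each message. */
    static final class Exchange {
        private final Map<String, Group> groups = new LinkedHashMap<>();

        public void subscribe(String groupName, Instance instance) {
            groups.computeIfAbsent(groupName, k -> new Group()).members.add(instance);
        }

        public void publish(String message) {
            for (Group group : groups.values()) {
                group.deliver(message);
            }
        }
    }

    public static void main(String[] args) {
        Exchange ordersOut = new Exchange();
        Instance account1 = new Instance("account-1");
        Instance account2 = new Instance("account-2");
        ordersOut.subscribe("account", account1); // same group: competing consumers
        ordersOut.subscribe("account", account2);

        ordersOut.publish("order-1");
        ordersOut.publish("order-2");
        System.out.println(account1.name + " handled " + account1.received);
        System.out.println(account2.name + " handled " + account2.received);
    }
}
```

With both account instances in the account group, each order is handled once by that service. Subscribing the same two instances under different group names would make both of them process every order, which is the unwanted behavior described above.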
The consumer group mechanism is a concept taken from Apache Kafka, and implemented in Spring Cloud Stream also for the RabbitMQ broker, which does not natively support it. So, I think it is pretty interesting how it is configured on RabbitMQ. If you run two instances of the service without setting a group name on the destination, there are two bindings created for a single exchange (one binding per instance), as shown in the picture below. Because two applications are listening on that exchange, there are four bindings assigned to that exchange in total.
If you set the group name for a selected destination, Spring Cloud Stream will create a single binding for all running instances of a given service. The name of binding will be suffixed with the group name.
Because we have included spring-cloud-starter-sleuth in the project dependencies, the same traceId header is sent with all the asynchronous messages exchanged while handling a single request incoming to the order-service POST endpoint. Thanks to that, we can easily correlate all logs by this header using Elastic Stack (Kibana).
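Sleuth takes care of this propagation automatically, so no code is needed in the services. Purely to illustrate the idea, here is a plain-Java sketch of what carrying a trace identifier across messages means. Everything in it is an assumption made for the illustration: the header key "traceId", the use of a UUID, and the method names are hypothetical and do not reflect how Sleuth is implemented.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

// Conceptual illustration only; Sleuth does this for you with its own headers.
final class TracePropagationSketch {

    public static final String TRACE_ID = "traceId"; // hypothetical header key

    /** At the entry point, keep an existing trace id or start a new trace. */
    public static Map<String, String> incoming(Map<String, String> headers) {
        Map<String, String> copy = new HashMap<>(headers);
        copy.computeIfAbsent(TRACE_ID, key -> UUID.randomUUID().toString());
        return copy;
    }

    /** Headers for a follow-up message carry over the trace id of the message being handled. */
    public static Map<String, String> outgoing(Map<String, String> handledHeaders) {
        Map<String, String> out = new HashMap<>();
        out.put(TRACE_ID, handledHeaders.get(TRACE_ID));
        return out;
    }

    public static void main(String[] args) {
        Map<String, String> httpRequest = incoming(new HashMap<>());      // POST to order-service
        Map<String, String> orderMessage = outgoing(httpRequest);         // order-service -> account-service
        Map<String, String> responseMessage = outgoing(orderMessage);     // account-service -> order-service

        // All three hops share one identifier, so their log lines can be correlated.
        System.out.println(httpRequest.get(TRACE_ID).equals(responseMessage.get(TRACE_ID)));
    }
}
```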
Automated Testing
You can easily test your microservice without connecting to a message broker. To achieve this, you need to include spring-cloud-stream-test-support in your project dependencies. It contains the TestSupportBinder bean that lets you interact with the bound channels and inspect any messages sent and received by the application.
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream-test-support</artifactId>
    <scope>test</scope>
</dependency>
In the test class, we need to declare the MessageCollector bean, which is responsible for receiving messages retained by TestSupportBinder. Here's my test class from account-service. Using the Processor bean, I send a test order to the input channel. Then MessageCollector receives the message that is sent back to order-service via the output channel. The test method testAccepted creates an order that should be accepted by account-service, while the testRejected method sets an order price that is too high, which results in rejecting the order.
@RunWith(SpringRunner.class)
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
public class OrderReceiverTest {

    private static final Logger LOGGER = LoggerFactory.getLogger(OrderReceiverTest.class);

    @Autowired
    private Processor processor;
    @Autowired
    private MessageCollector messageCollector;

    @Test
    @SuppressWarnings("unchecked")
    public void testAccepted() {
        Order o = new Order();
        o.setId(1L);
        o.setAccountId(1L);
        o.setCustomerId(1L);
        o.setPrice(500);
        o.setProductIds(Collections.singletonList(2L));
        processor.input().send(MessageBuilder.withPayload(o).build());
        // The response sent to the output channel is retained by the test binder
        Message<Order> received = (Message<Order>) messageCollector.forChannel(processor.output()).poll();
        LOGGER.info("Order response received: {}", received.getPayload());
        assertNotNull(received.getPayload());
        assertEquals(OrderStatus.ACCEPTED, received.getPayload().getStatus());
    }

    @Test
    @SuppressWarnings("unchecked")
    public void testRejected() {
        Order o = new Order();
        o.setId(1L);
        o.setAccountId(1L);
        o.setCustomerId(1L);
        o.setPrice(100000); // exceeds the account balance
        o.setProductIds(Collections.singletonList(2L));
        processor.input().send(MessageBuilder.withPayload(o).build());
        Message<Order> received = (Message<Order>) messageCollector.forChannel(processor.output()).poll();
        LOGGER.info("Order response received: {}", received.getPayload());
        assertNotNull(received.getPayload());
        assertEquals(OrderStatus.REJECTED, received.getPayload().getStatus());
    }
}
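One detail worth keeping in mind: the collector hands back a queue, and a plain poll() on an empty queue returns null, so a test that receives nothing fails with a NullPointerException rather than a clear message. A small helper can make that failure explicit. The sketch below uses only java.util.concurrent; the class and method names (PollSketch, pollOrFail) and the timeout values are hypothetical choices for this illustration, not part of the test support library.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical test helper built on the standard BlockingQueue API.
final class PollSketch {

    /**
     * Waits up to timeoutMillis for an element and fails with a descriptive
     * error instead of returning null when nothing arrives.
     */
    public static <T> T pollOrFail(BlockingQueue<T> queue, long timeoutMillis) {
        try {
            T item = queue.poll(timeoutMillis, TimeUnit.MILLISECONDS);
            if (item == null) {
                throw new AssertionError("No message received within " + timeoutMillis + " ms");
            }
            return item;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt flag
            throw new AssertionError("Interrupted while waiting for a message", e);
        }
    }

    public static void main(String[] args) {
        BlockingQueue<String> responses = new LinkedBlockingQueue<>();
        responses.add("ACCEPTED");
        System.out.println(pollOrFail(responses, 100));
    }
}
```

In a test like the one above, the queue passed in would be the one returned by messageCollector.forChannel(processor.output()).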
Conclusion
Message-driven microservices are a good choice whenever you don't need synchronous responses from your API. In this article, I have shown a sample use case of a publish/subscribe model in inter-service communication between your microservices. The source code is, as usual, available on GitHub (https://github.com/piomin/sample-message-driven-microservices.git). For more interesting examples of the Spring Cloud Stream library along with Apache Kafka, you can refer to Chapter 11 in my book Mastering Spring Cloud (https://www.packtpub.com/application-development/mastering-spring-cloud).
Published at DZone with permission of Piotr Mińkowski, DZone MVB. See the original article here.