Event-Driven Microservices Using Spring Cloud Stream and RabbitMQ
Propagating one trace and span ID between all calls of a single order isn't that difficult when you have the right knowledge and toolset.
Join the DZone community and get the full member experience.
Join For FreeBefore we start, let’s look at the site Spring Cloud Quick Start. There is a list of Spring-cloud releases available grouped as release trains. We use the newest release, Camden.SR5, with 1.4.4.RELEASE Spring Boot and Brooklyn.SR2 Spring Cloud Stream version.
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>1.4.4.RELEASE</version>
</parent>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-dependencies</artifactId>
<version>Camden.SR5</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
Here’s our architecture visualization: The order service sends a message to RabbitMQ topic exchange. Product and shipment services listen on that topic for incoming order messages and then process them. After processing, they send a reply message to the topic that the payment service listens to. The payment service stores incoming messages aggregating replies from product and shipment services, then count prices and sends the final response.
Each service has the following dependencies. We have a sample-common
module where the object for messages sent to topics is stored. They’re shared between all services. We’re also using Spring Cloud Sleuth for distributed tracing with one request ID between all microservices.
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-rabbit</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-sleuth</artifactId>
</dependency>
<dependency>
<groupId>pl.piomin.services</groupId>
<artifactId>sample-common</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
Let me start with a few words on the theoretical aspects of the Spring cloud stream. The Spring Cloud Stream Reference Guide provides a short reference to that framework. It’s based on Spring integration. It provides three predefined interfaces out of the box:
- Source: Can be used for an application which has a single outbound channel.
- Sink: Can be used for an application which has a single inbound channel.
- Processor: Can be used for an application which has both an inbound channel and an outbound channel.
I’m going to show you sample usage of all of these interfaces. In the order service, we’re using Source
class. Using @InboundChannelAdapter
and @Poller
annotations, we’re sending the order message to output once per 10 seconds.
@SpringBootApplication
@EnableBinding(Source.class)
public class Application {
protected Logger logger = Logger.getLogger(Application.class.getName());
private int index = 0;
public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}
@Bean
@InboundChannelAdapter(value = Source.OUTPUT, poller = @Poller(fixedDelay = "10000", maxMessagesPerPoll = "1"))
public MessageSource<Order> orderSource() {
return () -> {
Order o = new Order(index++, OrderType.PURCHASE, LocalDateTime.now(), OrderStatus.NEW, new Product("Example#2"), new Shipment(ShipmentType.SHIP));
logger.info("Sending order: " + o);
return new GenericMessage<>(o);
};
}
@Bean
public AlwaysSampler defaultSampler() {
return new AlwaysSampler();
}
}
Here’s the output configuration in an application.yml
file:
spring:
cloud:
stream:
bindings:
input:
destination: ex.stream.in
binder: rabbit1
output:
destination: ex.stream.out
group: order
binder: rabbit1
binders:
rabbit1:
type: rabbit
environment:
spring:
rabbitmq:
host: 192.168.99.100
port: 30000
username: guest
password: guest
Product and shipment services use the Processor
interface. They listen to stream input and, after processing, send a message to their outputs.
@SpringBootApplication
@EnableBinding(Processor.class)
public class Application {
@Autowired
private ProductService productService;
protected Logger logger = Logger.getLogger(Application.class.getName());
public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}
@StreamListener(Processor.INPUT)
@SendTo(Processor.OUTPUT)
public Order processOrder(Order order) {
logger.info("Processing order: " + order);
order.setProduct(productService.processOrder(order));
logger.info("Output order: " + order);
return order;
}
@Bean
public AlwaysSampler defaultSampler() {
return new AlwaysSampler();
}
}
Here’s service configuration. It listens on order service output exchange and also defines its group, named Product
. That group name will be used for automatic queue creation and exchange binding on RabbitMQ. There is also output exchange defined.
spring:
cloud:
stream:
bindings:
input:
destination: ex.stream.out
group: product
binder: rabbit1
output:
destination: ex.stream.out2
binder: rabbit1
binders:
rabbit1:
type: rabbit
environment:
spring:
rabbitmq:
host: 192.168.99.100
port: 30000
username: guest
password: guest
We use a Docker container for running a RabbitMQ instance.
docker run -d --name rabbit1 -p 30001:15672 -p 30000:5672 rabbitmq:management
Let’s look at the management console. It’s available on http://192.168.99.100:30001
. Here’s the ex.stream.out
topic exchange configuration. Below we see the list of declared queues.
Here’s the main application class from the payment service. We use the Sink interface for listening to incoming messages. The input order is processed and we print the final price of the order sent by the order service. (The sample application source code is available on GitHub!)
@SpringBootApplication
@EnableBinding(Sink.class)
public class Application {
@Autowired
private PaymentService paymentService;
protected Logger logger = Logger.getLogger(Application.class.getName());
public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}
@StreamListener(Sink.INPUT)
public void processOrder(Order order) {
logger.info("Processing order: " + order);
Order o = paymentService.processOrder(order);
if (o != null)
logger.info("Final response: " + (o.getProduct().getPrice() + o.getShipment().getPrice()));
}
@Bean
public AlwaysSampler defaultSampler() {
return new AlwaysSampler();
}
}
By using @Bean AlwaysSampler
in every main class of our microservices, we propagate one trace and span ID between all calls of a single order.
Also, I get the following warning message, which I don't understand:
Deprecated trace headers detected. Please upgrade Sleuth to 1.1 or start sending headers present in the TraceMessageHeaders class’.
Version 1.1.2.RELEASE of Spring Cloud Sleuth is not an applicable Camden.SR5 release? Let me know what you think.
Here's a fragment from our microservices logging console:
Published at DZone with permission of Piotr Mińkowski, DZone MVB. See the original article here.
Opinions expressed by DZone contributors are their own.
Comments