Event-Driven Microservices Using Spring Cloud Stream and RabbitMQ

Propagating one trace and span ID between all calls of a single order isn't that difficult when you have the right knowledge and toolset.

Before we start, take a look at the Spring Cloud Quick Start page, which lists the available Spring Cloud releases grouped into release trains. We use Camden.SR5, the newest release at the time of writing, together with Spring Boot 1.4.4.RELEASE and Spring Cloud Stream Brooklyn.SR2.

<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>1.4.4.RELEASE</version>
</parent>

<dependencyManagement>
    <dependencies>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-dependencies</artifactId>
            <version>Camden.SR5</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
    </dependencies>
</dependencyManagement>

Here’s our architecture visualization: The order service sends a message to a RabbitMQ topic exchange. The product and shipment services listen on that topic for incoming order messages and process them. After processing, they send a reply message to the topic that the payment service listens to. The payment service stores incoming messages, aggregates the replies from the product and shipment services, then calculates the total price and sends the final response.

[Image: architecture diagram of the order, product, shipment, and payment services]

Each service has the following dependencies. The sample-common module holds the message objects sent to the topics, which are shared between all services. We’re also using Spring Cloud Sleuth for distributed tracing, so that one request ID is shared between all microservices.

<dependencies>
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-stream</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-stream-binder-rabbit</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-starter-sleuth</artifactId>
    </dependency>
    <dependency>
        <groupId>pl.piomin.services</groupId>
        <artifactId>sample-common</artifactId>
        <version>${project.version}</version>
    </dependency>
</dependencies>

Let me start with a few words on the theory behind Spring Cloud Stream. The Spring Cloud Stream Reference Guide provides a short reference to the framework, which is built on Spring Integration. It provides three predefined interfaces out of the box:

  • Source: Can be used for an application which has a single outbound channel.
  • Sink: Can be used for an application which has a single inbound channel.
  • Processor: Can be used for an application which has both an inbound channel and an outbound channel.
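As a rough analogy for the three roles listed above, the sketch below maps them onto plain java.util.function types. This is not the Spring Cloud Stream API: the class name ChannelRoles, its methods, and the string messages are all made up for illustration.

```java
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;

// Plain-Java analogy only; these are NOT the Spring Cloud Stream interfaces.
final class ChannelRoles {

    // Source-like role: only produces messages (single outbound channel).
    static Supplier<String> source() {
        return () -> "order-1";
    }

    // Processor-like role: receives a message and emits a new one
    // (one inbound and one outbound channel).
    static Function<String, String> processor() {
        return order -> order + ":processed";
    }

    // Sink-like role: only consumes messages (single inbound channel).
    static Consumer<String> sink(StringBuilder log) {
        return message -> log.append(message);
    }

    public static void main(String[] args) {
        StringBuilder log = new StringBuilder();
        // Chain the roles by hand; in the real framework the broker sits between them.
        sink(log).accept(processor().apply(source().get()));
        System.out.println(log);
    }
}
```

In the real application the three roles live in separate services, and the hand-written chaining in main is replaced by exchanges and queues on the broker.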

I’m going to show you sample usage of all of these interfaces. The order service uses the Source interface. With the @InboundChannelAdapter and @Poller annotations, it sends an order message to the output channel every 10 seconds.

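import java.time.LocalDateTime;
import java.util.logging.Logger;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.sleuth.sampler.AlwaysSampler;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.context.annotation.Bean;
import org.springframework.integration.annotation.InboundChannelAdapter;
import org.springframework.integration.annotation.Poller;
import org.springframework.integration.core.MessageSource;
import org.springframework.messaging.support.GenericMessage;
// Order, OrderType, OrderStatus, Product, Shipment, and ShipmentType come from the sample-common module.

@SpringBootApplication
@EnableBinding(Source.class)
public class Application {

    protected Logger logger = Logger.getLogger(Application.class.getName());

    // Incremented for every generated order so that each one gets a new ID.
    private int index = 0;

    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }

    // Polled every 10 seconds; each poll emits one order to the output channel.
    @Bean
    @InboundChannelAdapter(value = Source.OUTPUT, poller = @Poller(fixedDelay = "10000", maxMessagesPerPoll = "1"))
    public MessageSource<Order> orderSource() {
        return () -> {
            Order o = new Order(index++, OrderType.PURCHASE, LocalDateTime.now(), OrderStatus.NEW, new Product("Example#2"), new Shipment(ShipmentType.SHIP));
            logger.info("Sending order: " + o);
            return new GenericMessage<>(o);
        };
    }

    @Bean
    public AlwaysSampler defaultSampler() {
        return new AlwaysSampler();
    }

}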

Here’s the output configuration in an application.yml file:

spring:
  cloud:
    stream:
      bindings:
        input:
          destination: ex.stream.in
          binder: rabbit1
        output:
          destination: ex.stream.out
          group: order
          binder: rabbit1
      binders:
        rabbit1:
          type: rabbit
          environment:
            spring:
              rabbitmq:
                host: 192.168.99.100
                port: 30000
                username: guest
                password: guest

Product and shipment services use the Processor interface. They listen to stream input and, after processing, send a message to their outputs.

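import java.util.logging.Logger;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.sleuth.sampler.AlwaysSampler;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Processor;
import org.springframework.context.annotation.Bean;
import org.springframework.messaging.handler.annotation.SendTo;
// Order comes from the sample-common module; ProductService belongs to this service.

@SpringBootApplication
@EnableBinding(Processor.class)
public class Application {

    @Autowired
    private ProductService productService;

    protected Logger logger = Logger.getLogger(Application.class.getName());

    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }

    // Receives an order from the input channel and sends the processed order to the output channel.
    @StreamListener(Processor.INPUT)
    @SendTo(Processor.OUTPUT)
    public Order processOrder(Order order) {
        logger.info("Processing order: " + order);
        order.setProduct(productService.processOrder(order));
        logger.info("Output order: " + order);
        return order;
    }

    @Bean
    public AlwaysSampler defaultSampler() {
        return new AlwaysSampler();
    }

}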

Here’s the product service configuration. It listens on the order service’s output exchange, ex.stream.out, and defines its consumer group, named product. That group name is used for automatic queue creation and exchange binding on RabbitMQ. An output exchange, ex.stream.out2, is also defined.

spring:
  cloud:
    stream:
      bindings:
        input:
          destination: ex.stream.out
          group: product
          binder: rabbit1
        output:
          destination: ex.stream.out2
          binder: rabbit1
      binders:
        rabbit1:
          type: rabbit
          environment:
            spring:
              rabbitmq:
                host: 192.168.99.100
                port: 30000
                username: guest
                password: guest
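To picture what the group setting does, here is a toy in-memory model of one exchange with one queue per consumer group. It assumes the Rabbit binder's naming for grouped consumers, where the queue is called destination.group (for example ex.stream.out.product). The class TopicExchangeSketch and the shipment group name are assumptions made for this illustration; nothing here talks to RabbitMQ.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy in-memory model, not RabbitMQ or Spring code.
final class TopicExchangeSketch {
    private final String destination;
    // One queue per consumer group, keyed by queue name.
    private final Map<String, List<String>> queues = new LinkedHashMap<>();

    TopicExchangeSketch(String destination) {
        this.destination = destination;
    }

    // Creates the queue for a group if it does not exist yet and returns its name,
    // following the destination.group naming assumed above.
    String bindGroup(String group) {
        String queueName = destination + "." + group;
        queues.putIfAbsent(queueName, new ArrayList<>());
        return queueName;
    }

    // Every bound queue receives its own copy of the message.
    void publish(String message) {
        for (List<String> queue : queues.values()) {
            queue.add(message);
        }
    }

    List<String> queue(String queueName) {
        return queues.get(queueName);
    }

    public static void main(String[] args) {
        TopicExchangeSketch exchange = new TopicExchangeSketch("ex.stream.out");
        String productQueue = exchange.bindGroup("product");
        String shipmentQueue = exchange.bindGroup("shipment"); // hypothetical group name
        exchange.publish("order-1");
        System.out.println(productQueue + " -> " + exchange.queue(productQueue));
        System.out.println(shipmentQueue + " -> " + exchange.queue(shipmentQueue));
    }
}
```

Because each group has its own queue, both the product and the shipment service see every order, while several instances of the same service would share one queue and split the messages between them.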

We use a Docker container for running a RabbitMQ instance.

docker run -d --name rabbit1 -p 30001:15672 -p 30000:5672 rabbitmq:management

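Let’s look at the management console. It’s available at http://192.168.99.100:30001. Here’s the ex.stream.out topic exchange configuration, followed by the list of declared queues.

[Image: ex.stream.out topic exchange configuration in the RabbitMQ management console]

[Image: list of declared queues in the RabbitMQ management console]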

Here’s the main application class from the payment service. We use the Sink interface for listening to incoming messages. The input order is processed and we print the final price of the order sent by the order service. (The sample application source code is available on GitHub!)

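import java.util.logging.Logger;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.sleuth.sampler.AlwaysSampler;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.context.annotation.Bean;
// Order comes from the sample-common module; PaymentService belongs to this service.

@SpringBootApplication
@EnableBinding(Sink.class)
public class Application {

    @Autowired
    private PaymentService paymentService;

    protected Logger logger = Logger.getLogger(Application.class.getName());

    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }

    // Logs the final price only when the payment service returns an order.
    @StreamListener(Sink.INPUT)
    public void processOrder(Order order) {
        logger.info("Processing order: " + order);
        Order o = paymentService.processOrder(order);
        if (o != null) {
            logger.info("Final response: " + (o.getProduct().getPrice() + o.getShipment().getPrice()));
        }
    }

    @Bean
    public AlwaysSampler defaultSampler() {
        return new AlwaysSampler();
    }

}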
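PaymentService itself is not shown above. One way the aggregation could work, consistent with the null check in processOrder, is to hold the first reply for an order and return a merged result only when the second one arrives. The sketch below is a guess at that idea, not the author's implementation: OrderReply is a simplified, hypothetical stand-in for Order, and PaymentAggregator is an invented name.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified stand-in for the article's Order class.
final class OrderReply {
    final int orderId;
    final Integer productPrice;  // null when the reply came from the shipment service
    final Integer shipmentPrice; // null when the reply came from the product service

    OrderReply(int orderId, Integer productPrice, Integer shipmentPrice) {
        this.orderId = orderId;
        this.productPrice = productPrice;
        this.shipmentPrice = shipmentPrice;
    }
}

// Sketch of an aggregator; not the PaymentService from the sample project.
final class PaymentAggregator {
    // Replies still waiting for their counterpart, keyed by order ID.
    private final Map<Integer, OrderReply> pending = new HashMap<>();

    // Returns the merged reply once both halves have arrived, otherwise null.
    synchronized OrderReply process(OrderReply reply) {
        OrderReply first = pending.remove(reply.orderId);
        if (first == null) {
            pending.put(reply.orderId, reply);
            return null;
        }
        Integer product = first.productPrice != null ? first.productPrice : reply.productPrice;
        Integer shipment = first.shipmentPrice != null ? first.shipmentPrice : reply.shipmentPrice;
        return new OrderReply(reply.orderId, product, shipment);
    }

    public static void main(String[] args) {
        PaymentAggregator aggregator = new PaymentAggregator();
        // The first reply is held, so nothing is returned yet.
        System.out.println(aggregator.process(new OrderReply(1, 100, null)) == null); // prints true
        OrderReply merged = aggregator.process(new OrderReply(1, null, 20));
        System.out.println(merged.productPrice + merged.shipmentPrice); // prints 120
    }
}
```

The sketch does not handle duplicate replies from the same service or replies that never arrive; a real service would need a timeout or cleanup policy for the pending map.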

By declaring an AlwaysSampler bean in the main class of every microservice, we make sure every request is sampled, and Sleuth propagates one trace ID between all calls of a single order.

Also, I get the following warning message, which I don't understand:

Deprecated trace headers detected. Please upgrade Sleuth to 1.1 or start sending headers present in the TraceMessageHeaders class.

Doesn’t the Camden.SR5 release already include version 1.1.2.RELEASE of Spring Cloud Sleuth? Let me know what you think.

Here's a fragment from our microservices logging console: 

[Image: fragment of the microservices logging console]

Published at DZone with permission of Piotr Mińkowski, DZone MVB. See the original article here.
