
Developing Event Driven Microservices With (Almost) No Code


Learn about the uses of the event driven microservices architecture pattern, and how to implement it with the Spring Cloud Stream framework.


Microservices are all the rage these days. One of the key microservices architecture patterns is the Event Driven Microservices pattern, and it is easy to see why it is needed. Consider another microservices pattern, the database-per-service pattern, where each microservice owns its own database. Business transactions usually span multiple services, and hence multiple databases. The question then is how to maintain consistency among those databases.

For example, say we have an Order service with its own database and a Customer service with a customer database. When an order is placed, we need to make sure the customer has not exceeded her credit limit. But the credit limit is in the customer database. The typical solution is to put the Order in a pending state and publish an Order event. The Customer service subscribes to the event, and in response, publishes a Credit allowed or disallowed event. Depending on the case, the Order service allows the order to go through, or not.
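The choreography just described can be sketched in plain Java, outside any framework. All names here (OrderService, CustomerService, OrderState) are illustrative, not Spring Cloud Stream API; event publication is modeled as a simple callback:

```java
import java.util.function.Consumer;

// Minimal sketch of the pending-order flow described above.
enum OrderState { PENDING, APPROVED, REJECTED }

class OrderService {
    OrderState state = OrderState.PENDING; // order starts pending

    // Invoked when the Customer service publishes its credit decision event.
    void onCreditDecision(boolean creditAllowed) {
        state = creditAllowed ? OrderState.APPROVED : OrderState.REJECTED;
    }
}

class CustomerService {
    private final double creditLimit;
    private final double usedCredit;

    CustomerService(double creditLimit, double usedCredit) {
        this.creditLimit = creditLimit;
        this.usedCredit = usedCredit;
    }

    // Subscribes to Order events; publishes a credit allowed/disallowed event.
    void onOrderPlaced(double amount, Consumer<Boolean> publishDecision) {
        publishDecision.accept(usedCredit + amount <= creditLimit);
    }
}
```

In the real pattern, the callback would be an event published to a broker rather than a direct method call; the point is only that the Order service never reads the customer database directly.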

Spring Cloud Stream (SCS) is the Spring framework for building event driven microservices. The easiest way to think of SCS is as the bootification of Spring Integration. Spring Integration is a framework implementing the Enterprise Integration patterns set forth in Hohpe and Woolf’s classic “Enterprise Integration Patterns.”

An important feature of SCS is that the message producer and consumer code is independent of any message-broker-specific code. The application communicates with the outside world through input and output channels injected into it by Spring Cloud Stream. Channels are connected to external brokers through middleware-specific Binder implementations. Simply adding the required binder dependency lets SCS create the connectivity and communications. Kafka and RabbitMQ binders are provided out of the box. Here is a diagram to illustrate these concepts:
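For example, with Maven, wiring an application to RabbitMQ is a matter of one binder dependency (the version is typically managed by the Spring Cloud BOM):

```xml
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream-binder-rabbit</artifactId>
</dependency>
```

Swapping this for the Kafka binder dependency would move the same application to Kafka without touching the producer or consumer code.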

(Diagram: a Spring Cloud Stream application whose input and output channels are connected to the message broker through binders.)

A source provides data to the system (from a database, file system, etc.) and sends the data to an output channel. A sink receives data from an input channel and sends it to some destination (database, file system, etc.). A processor receives data on an input channel, does something with it (filters it, transforms it, etc.), and sends the result to an output channel.
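Conceptually, a source, processor, and sink compose like functions over a stream of messages. A plain-Java sketch of that idea (Pipeline is illustrative, not the SCS API; the channels between the stages are modeled as direct method calls):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;
import java.util.function.Supplier;

// Illustrative model: a source supplies messages, a processor transforms
// each one, and a sink collects them.
class Pipeline {
    static List<String> run(Supplier<List<String>> source,
                            Function<String, String> processor,
                            List<String> sink) {
        for (String message : source.get()) {
            sink.add(processor.apply(message)); // processor sits between the channels
        }
        return sink;
    }
}
```

In SCS the "wiring" between the stages is done by binding channel names to broker destinations rather than by passing objects around, but the shape of the computation is the same.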

While in principle there is an unlimited number of sources and sinks, in practice the list is finite. For instance, we get data by polling a database, a file system, and so on. Likewise, the system sends its data to a log, to an HTTP/FTP endpoint, to a database, etc. Pivotal engineers have created implementations for commonly occurring sources and sinks: for instance, the File source, FTP source, HTTP source, JDBC source, JMS source, Mail source, MongoDB source, and Syslog source, and likewise the Cassandra sink, File sink, FTP sink, HDFS sink, Log sink, RabbitMQ sink, Redis sink, and so on. For a complete list, see this documentation.

Each of these has configuration options, provided as properties either on the command line or via an application.properties (or .yml) file. For instance, the File source has options for the directory to poll for new files and for the file-name pattern; the HTTP source has options for the path pattern and server port. See the above-mentioned reference for the list of available options for each source and sink. One option deserving special mention is the name of the output queue (for a source) or the input queue (for a sink). Each of the sources and sinks is available in a Rabbit and a Kafka version. See this repo for the jars.
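For instance, the File source options used later in this article could be collected in an application.properties file placed next to the jar instead of being passed on the command line:

```properties
# Options for the File source (Rabbit version)
file.consumer.mode=lines
file.directory=.
file.filename-pattern=*.txt
# The output queue the source publishes to
spring.cloud.stream.bindings.output.destination=simple-demo
server.port=8082
```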

We can choose a prebuilt source, put the event stream through a sequence of processors, and send the output to a sink. Unsurprisingly, there are prebuilt jars for common processors as well: a Splitter processor, a Filter processor, a Transform processor, and so on. However, unlike the prebuilt sources and sinks, some of these need code from you. If you want a Filter to decide which messages to pass to the output channel, you need to provide the code for that decision; likewise, if you want a Transform processor to transform the message, you need to provide the transformation code. The documentation referenced above lists the available processors and their configuration options, and the same repo provides the jars.
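The code you supply to a Filter processor boils down to a predicate over the message. A minimal plain-Java sketch of that idea (MessageFilter is illustrative, not the SCS API; the framework handles the channels and binding around it):

```java
import java.util.List;
import java.util.function.Predicate;
import java.util.stream.Collectors;

// Only messages matching the predicate reach the output channel.
class MessageFilter {
    static List<String> filter(List<String> input, Predicate<String> accept) {
        return input.stream().filter(accept).collect(Collectors.toList());
    }
}
```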

Let us work through an example. Create directories source, sink, and processor. Download the source jar file-source-rabbit-<version>.RELEASE.jar and put it in the source directory. Likewise, download the sink jar log-sink-rabbit-<version>.RELEASE.jar and place it in the sink directory. In the source directory, create two files. The first, G.txt, contains some sample content, say

hello
hello one
hello twice
hello thrice
hello fourth
hello fifth
hello sixth
hello seventh
hello eighth
hello ninth
hello tenth

And add a file, H.txt, with content, say

this is a second file
this is the second line in the second file
this is the third line in the second file

Now add a script, start.sh, to the source directory to start the application:

#!/bin/bash
java -jar file-source-rabbit-1.2.0.RELEASE.jar --file.consumer.mode=lines --spring.cloud.stream.bindings.output.destination=simple-demo --server.port=8082 --file.directory=. --file.filename-pattern=*.txt

We are starting the Rabbit version of the file source application, binding its output channel to the “simple-demo” queue, running it on port 8082, and polling the current directory for files ending in .txt.

In the sink directory, add the script start.sh to start the application:

#!/bin/bash

java -jar log-sink-rabbit-1.1.1.RELEASE.jar --spring.cloud.stream.bindings.input.destination=simple2-demo --server.port=8080

Here we are binding the Rabbit version of the logger to the input queue simple2-demo.

Now for the processor. In Spring Tool Suite, create a Spring Starter project called Spring-Cloud-Stream-Processor with spring-cloud-starter-stream-rabbit and spring-boot-starter-web in the pom (Stream Rabbit is under Cloud Messaging). As usual, you will get a generated application class in src/main/java. Now create a SimpleProcessor.java class with this content:

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Processor;
import org.springframework.integration.annotation.Transformer;

@EnableBinding(Processor.class)
public class SimpleProcessor {

    private static final Logger log = LoggerFactory.getLogger(SimpleProcessor.class);

    @Transformer(inputChannel = Processor.INPUT, outputChannel = Processor.OUTPUT)
    public String transformToUpperCase(String message) {
        log.info("The input message is " + message);
        return message.toUpperCase();
    }
}

We add @EnableBinding(Processor.class) to make the class a Spring Cloud Stream class, and this transformer uses the Processor.INPUT channel to receive the messages sent by the file source and sends the transformed message to Processor.OUTPUT. The transformation here is trivial; it just turns the input string to uppercase.

Now, all we have to do is to bind the Input and Output channels. So in src/main/resources, we add an application.properties file:

server.port=8081
spring.application.name=processor

spring.cloud.stream.bindings.input.destination=simple-demo
spring.cloud.stream.bindings.output.destination=simple2-demo
spring.cloud.stream.bindings.input.content-type=text/plain

We could have supplied these properties on the command line as before, but we put them in application.properties for a change of pace. Note that the input destination here is simple-demo, which matches the output destination of the file source. Likewise, the output destination is simple2-demo, which matches the input destination of the log sink. Also, we specified the content-type to be text/plain, since the default in SCS is a byte array.

Package the application as usual to get a jar file: Spring-Cloud-Stream-Processor-0.0.1-SNAPSHOT.jar.

 Copy this jar file into the processor directory and add a startup script, start.sh:

#!/bin/bash
java -jar Spring-Cloud-Stream-Processor-0.0.1-SNAPSHOT.jar

Note that instead of system properties specified on the command line, it is all specified in the application.properties inside the jar.

Now go to the sink directory and start the log sink application.

Go to the processor directory and start the processor.

Finally, go to the source directory and start the file source application.

You will see the processor log the incoming messages.

Likewise, the log sink logs the messages in uppercase.

You can write many more processors and chain them together, as long as each stage's input destination matches the output destination of the stage before it.

You may feel, with some justification, that we cheated in the title. After all, we wrote some code for the processor. But the essence of that code is the one-line transformation; the rest could be considered boilerplate. There are a Groovy Filter processor and a Groovy Transform processor where you just provide a Groovy script as a command-line property to the application jar. Nor is the scripting language restricted to Groovy: with the Scriptable Transform processor you specify the language (Groovy, Ruby, JavaScript, or Python) in the transformer language property at runtime, and the script itself in the transformer script property.
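For instance, the Rabbit build of the Groovy Transform processor could replace our custom processor entirely. A sketch of the invocation, assuming a one-line script toUpper.groovy containing payload.toUpperCase() (the script property name below is an assumption; check it against the app starters reference before use):

```
java -jar groovy-transform-processor-rabbit-<version>.RELEASE.jar \
  --transformer.script=file:toUpper.groovy \
  --spring.cloud.stream.bindings.input.destination=simple-demo \
  --spring.cloud.stream.bindings.output.destination=simple2-demo \
  --server.port=8081
```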

