Orchestrating Event Driven Microservices With Spring Dataflow

This tutorial will show you how to set up communication between your microservices with event notifications through the Spring Boot platform.

By Taruvai Subramaniam · DZone Core · Jan. 22, 18 · Tutorial

Event-driven microservices are an important pattern in microservices architecture. Although microservices communicate predominantly via APIs, there are cases where communicating via event notifications is a good fit. For instance, with the rise of IoT, we can have "event firehoses" that deal with a large volume of events in real time. Likewise, microservices can notify other microservices about data change events or even send asynchronous commands.

Spring Boot has become more or less the de facto Java platform for building microservices. There are enough books and articles on synchronous microservices available now to choke a horse, but event-driven microservices are somewhat underserved. The Spring framework for building such microservices is Spring Cloud Stream (SCS), which was the subject of an earlier post of mine, Developing Event Driven Microservices With (Almost) No Code.

To briefly summarize the earlier paper, SCS abstracts the message producer and consumer code away from the message-broker-specific code. The application communicates with the outside world through input and output channels injected into it by SCS. Channels are connected to external brokers through middleware-specific Binder implementations; simply adding the required binder dependency to the project lets SCS create the connectivity and communications. Kafka and RabbitMQ binders are provided out of the box. SCS also provides a common abstraction, applying to both RabbitMQ and Kafka, for concepts such as consumer groups (to scale the consumers) and partitioning (to provide message affinity).
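
Both consumer groups and partitioning are driven by configuration rather than code. As a rough sketch (the property names are standard Spring Cloud Stream settings, while the group name and partition count here are purely illustrative), a consumer that scales via a group and a producer that partitions by payload might declare:

# Consumer side: instances that register the same group divide the messages between them.
spring.cloud.stream.bindings.input.group=rainfall-consumers

# Producer side: partition outgoing messages by payload across two partitions
# (partitioned consumers would additionally set ...consumer.partitioned=true
# plus spring.cloud.stream.instanceIndex/instanceCount).
spring.cloud.stream.bindings.output.producer.partitionKeyExpression=payload
spring.cloud.stream.bindings.output.producer.partitionCount=2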

We showed in our earlier submission how to leverage SCS and its coterie of prebuilt app starters. In this companion submission, we show how to orchestrate these services using Spring Cloud Data Flow, both through the GUI and through the shell. We also provide some details that deserve to be better known than they are. We start by setting up an example that runs through this paper.

In view of the recent near-biblical flooding in Houston, the example simulates rainfall. The source generates a random integer to represent the rainfall in inches.

import java.util.concurrent.ThreadLocalRandom;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.context.annotation.Bean;
import org.springframework.integration.annotation.InboundChannelAdapter;
import org.springframework.integration.annotation.Poller;
import org.springframework.integration.core.MessageSource;
import org.springframework.integration.support.MessageBuilder;

@EnableBinding(Source.class)
@SpringBootApplication
public class SpringCloudDataFlowSrcApplication {

    public static void main(String[] args) {
        SpringApplication.run(SpringCloudDataFlowSrcApplication.class, args);
    }

    // Poll once per second and emit a random rainfall reading (in inches) on the output channel.
    @Bean
    @InboundChannelAdapter(value = Source.OUTPUT, poller = @Poller(fixedDelay = "1000", maxMessagesPerPoll = "1"))
    public MessageSource<Integer> timeMessageSource() {
        return () -> MessageBuilder.withPayload(randomIn(40, 100)).build();
    }

    // Random integer in the inclusive range [min, max].
    public int randomIn(int min, int max) {
        return ThreadLocalRandom.current().nextInt(min, max + 1);
    }
}

The processor will baseline the rainfall by 50 inches:

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Processor;
import org.springframework.integration.annotation.Transformer;

@EnableBinding(Processor.class)
@SpringBootApplication
public class SpringCloudDataFlowProcessorApplication {

    private static final Logger logger = LoggerFactory.getLogger(SpringCloudDataFlowProcessorApplication.class);

    public static void main(String[] args) {
        SpringApplication.run(SpringCloudDataFlowProcessorApplication.class, args);
    }

    // Subtract the 50-inch baseline from each incoming reading and pass the result downstream.
    @Transformer(inputChannel = Processor.INPUT, outputChannel = Processor.OUTPUT)
    public Object transform(Integer rainFall) {
        logger.info("The rain level is " + rainFall);
        return rainFall - 50;
    }
}

The sink logs a message depending on whether the rainfall is of epic proportions; if it is, it logs a message that global warming is real.

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;

@EnableBinding(Sink.class)
@SpringBootApplication
public class SpringCloudDataFlowLoggerApplication {

    private static final Logger logger = LoggerFactory.getLogger(SpringCloudDataFlowLoggerApplication.class);

    public static void main(String[] args) {
        SpringApplication.run(SpringCloudDataFlowLoggerApplication.class, args);
    }

    // Log a verdict based on the baselined rainfall received on the input channel.
    @StreamListener(Sink.INPUT)
    public void logMsg(Integer rainFall) {
        if (rainFall > 30) {
            logger.info("Global warming is REAL!!");
        } else {
            logger.info("Global warming is a HOAX");
        }
    }
}

The fourth application represents an analytics application that keeps a running average of the rainfall over the days. We blush to provide the trivial code here.
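
For readers who want something concrete anyway, here is a minimal sketch of what such an Average sink could look like. The original code is not part of this article, so the class and field names below are illustrative; a single consumer instance is assumed, and the imports are the same as in the logger sink above.

@EnableBinding(Sink.class)
@SpringBootApplication
public class SpringCloudDataFlowAverageApplication {

    private static final Logger logger = LoggerFactory.getLogger(SpringCloudDataFlowAverageApplication.class);

    // Running totals; assumes a single consumer instance.
    private long count = 0;
    private long sum = 0;

    public static void main(String[] args) {
        SpringApplication.run(SpringCloudDataFlowAverageApplication.class, args);
    }

    // Fold each baselined rainfall reading into a running average and log it.
    @StreamListener(Sink.INPUT)
    public void average(Integer rainFall) {
        count++;
        sum += rainFall;
        logger.info("Average rainfall so far: " + ((double) sum / count));
    }
}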

These are Spring Boot applications with a Rabbit binding:

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>

As Spring Cloud Data Flow works with a Maven repository, we will do a "mvn clean install" for all these projects and use the local .m2 repository. Download the local Data Flow server jar from GitHub.

Start the application with java -jar spring-cloud-dataflow-server-local-1.2.1.jar and, in a web browser, open the dashboard at http://localhost:9393/dashboard. The Data Flow server is just a simple Spring Boot app:

@EnableDataFlowServer
@SpringBootApplication
public class SpringDataFlowServerApplication {

    public static void main(String[] args) {
        SpringApplication.run(
          SpringDataFlowServerApplication.class, args);
    }
}

And, as usual, you can customize it with databases, ports, views, etc.
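
For example, moving the dashboard off the default port 9393 is just a matter of the standard Spring Boot property (the port below is arbitrary):

java -jar spring-cloud-dataflow-server-local-1.2.1.jar --server.port=9494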


Step 1

We will now register the applications we developed. Click on Register Applications and enter the application name, the type (Source, Sink, or Processor), and the Maven URL, which is of the form maven://<GroupId>:<ArtifactId>:jar:<version>. Doing this over and over four times can get tedious, so we do it all in bulk. We create a file called dataflow.config:

source.RainSrc=maven://org.tn.dataflow:Spring-Cloud-Data-Flow-src:jar:0.0.1-SNAPSHOT
processor.RainProcessor=maven://org.tn.dataflow:Spring-Cloud-Data-Flow-Processor:jar:0.0.1-SNAPSHOT
sink.Logger=maven://org.tn.dataflow:Spring-Cloud-Data-Flow-Sink:jar:0.0.1-SNAPSHOT
sink.Average=maven://org.tn.dataflow:Spring-Cloud-Data-Flow-Sink-Average:jar:0.0.1-SNAPSHOT

You can import a much more extensive list of pre-built applications from http://bit.ly/Celsius-M1-stream-applications-rabbit-maven for the Rabbit version, and there is likewise a Kafka version.


Step 2

Now that we have registered our applications and made the GUI aware of them, it is time to build the flow. Switch to the Streams tab at the top and then pick Create Stream. You will see all your apps in the left pane, grouped by type. Now drag and drop the applications and connect them.


Note that the upper panel shows the DSL corresponding to this flow. We use a 'named source,' :RainSrc, and connect it to the analytics sink Average via the DSL :RainSrc>Average.

Now click the Create Stream sub-menu, fill out a name for each stream, and optionally ask for the streams to be deployed. You will see a list of the flows you created; if one is not already deployed, deploy it. Go wash your car or wait for the swallows to come back to Capistrano, and eventually the legend will turn from 'deploying' to 'deployed'.


Step 3

Now go to the Runtime tab and click on the sink, say RainInHouston.logger, and you will see the stdout location. Open that file to see the log messages.


Now that we have used the GUI to register the applications and build out the streams, let us see how to migrate it all to another environment, say QA. I wish I could say that there is a way to serialize this configuration, but as far as I know there is no such action in the GUI. So now we will script everything we have done so far.

First create a file, say dataflowShell.config:

app register --name RainSrc --type source   --uri maven://org.tn.dataflow:Spring-Cloud-Data-Flow-src:jar:0.0.1-SNAPSHOT
app register --name RainProcessor --type processor  --uri maven://org.tn.dataflow:Spring-Cloud-Data-Flow-Processor:jar:0.0.1-SNAPSHOT
app register --name logger --type sink --uri maven://org.tn.dataflow:Spring-Cloud-Data-Flow-Sink:jar:0.0.1-SNAPSHOT
app register --name Average --type sink --uri maven://org.tn.dataflow:Spring-Cloud-Data-Flow-Sink-Average:jar:0.0.1-SNAPSHOT
stream create --name RainInHouston --definition "RainSrc| RainProcessor|logger"  --deploy
stream create --name RainInHoustonAverage --definition ":RainInHouston.RainSrc>Average" --deploy

This is self-explanatory. First, we register the applications and then we create the streams and deploy them in one fell swoop.

Now download the dataflow shell jar from the repo.

Start up the shell as a jar application (java -jar, etc.) with the option --dataflow.uri=<uri>, where the URI is that of the QA server; the default is localhost:9393, which was used above. Shut down that server and start it again; the freshly started local server will stand in for the QA server here.

Now, in the command line of the shell:
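
For example, assuming a shell jar whose version matches the server (the file name and host below are placeholders), you would start the shell pointed at the QA server and then replay the file with the shell's built-in script command:

java -jar spring-cloud-dataflow-shell-1.2.1.RELEASE.jar --dataflow.uri=http://<qa-host>:9393
dataflow:> script --file dataflowShell.config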


On the QA server dashboard, verify that the apps are registered under the Apps tab and that all the streams are deployed under the Streams tab. We have automated Steps 1 and 2. Just repeat Step 3 with the Runtime tab and see the logs as before.

One drawback with what we have shown is that the definitions are not persisted, so every time you restart the server the dashboard is empty. Given that we can script the flow, this is no big deal, but it happens because we are using the default H2 in-memory database. However, you can use a persistent store, say MySQL, by starting the local server like so:

java -jar spring-cloud-dataflow-server-local-1.0.0.BUILD-SNAPSHOT.jar \
    --spring.datasource.url=jdbc:mysql:<db-info> \
    --spring.datasource.username=<user> \
    --spring.datasource.password=<password> \
    --spring.datasource.driver-class-name=org.mariadb.jdbc.Driver

We have just scratched the surface. We can add analytics, tasks, jobs, etc. See the reference for many more details.


Opinions expressed by DZone contributors are their own.
