DZone
Thanks for visiting DZone today,
Edit Profile
  • Manage Email Subscriptions
  • How to Post to DZone
  • Article Submission Guidelines
Sign Out View Profile
  • Post an Article
  • Manage My Drafts
Over 2 million developers have joined DZone.
Log In / Join
Please enter at least three characters to search
Refcards Trend Reports
Events Video Library
Refcards
Trend Reports

Events

View Events Video Library

Zones

Culture and Methodologies Agile Career Development Methodologies Team Management
Data Engineering AI/ML Big Data Data Databases IoT
Software Design and Architecture Cloud Architecture Containers Integration Microservices Performance Security
Coding Frameworks Java JavaScript Languages Tools
Testing, Deployment, and Maintenance Deployment DevOps and CI/CD Maintenance Monitoring and Observability Testing, Tools, and Frameworks
Culture and Methodologies
Agile Career Development Methodologies Team Management
Data Engineering
AI/ML Big Data Data Databases IoT
Software Design and Architecture
Cloud Architecture Containers Integration Microservices Performance Security
Coding
Frameworks Java JavaScript Languages Tools
Testing, Deployment, and Maintenance
Deployment DevOps and CI/CD Maintenance Monitoring and Observability Testing, Tools, and Frameworks

Last call! Secure your stack and shape the future! Help dev teams across the globe navigate their software supply chain security challenges.

Modernize your data layer. Learn how to design cloud-native database architectures to meet the evolving demands of AI and GenAI workloads.

Releasing software shouldn't be stressful or risky. Learn how to leverage progressive delivery techniques to ensure safer deployments.

Avoid machine learning mistakes and boost model performance! Discover key ML patterns, anti-patterns, data strategies, and more.

Related

  • Spring Cloud Stream Channel Interceptor
  • Spring Cloud Stream Binding Kafka With EmbeddedKafkaRule Using In Tests
  • Request Tracing in Spring Cloud Stream Data Pipelines With Kafka Binder
  • How Kafka Can Make Microservice Planet a Better Place

Trending

  • From Zero to Production: Best Practices for Scaling LLMs in the Enterprise
  • Performing and Managing Incremental Backups Using pg_basebackup in PostgreSQL 17
  • Integration Isn’t a Task — It’s an Architectural Discipline
  • Beyond ChatGPT, AI Reasoning 2.0: Engineering AI Models With Human-Like Reasoning
  1. DZone
  2. Data Engineering
  3. Big Data
  4. Kafka With Spring Cloud Stream

Kafka With Spring Cloud Stream

Everything you need to get started with Kafka's Spring Cloud Stream.

By 
Nakul Shukla user avatar
Nakul Shukla
·
Oct. 08, 19 · Tutorial
Likes (14)
Comment
Save
Tweet
Share
60.0K Views

Join the DZone community and get the full member experience.

Join For Free

clouds-out-of-plane-window-dawn


This post gives a step-by-step tutorial to enable messaging in a microservice using Kafka with Spring Cloud Stream.

Spring Cloud Stream is a framework under the umbrella project Spring Cloud, which enables developers to build event-driven microservices with messaging systems like Kafka and RabbitMQ.  

Asynchronous messaging systems are always an important part of any modern enterprise software solution. The evolution of microservices has shortened the time-to-market for any software product, but this is not possible without the necessary tools and frameworks.

You may also like: Event-Driven Microservices Using Spring Cloud Stream and RabbitMQ.

Spring Cloud Stream is a framework built on top of Spring Integration. It integrates with Spring Boot seamlessly to build efficient microservices in less time to connect with shared messaging systems. Spring Cloud Stream provides multiple binder implementations such as Kafka, RabbitMQ and various others. The details are provided here.

Here is a step-by-step tutorial on building a simple microservice application based on Spring Boot and uses Spring Cloud Stream to connect with a Kafka instance.

Getting Started

Install Kafka and create a topic. I am using a Kafka broker running on my local windows machine for this demonstration, but it can be an installation on a Unix machine as well. Steps for Kafka installation on windows machine are provided here.

Create a Spring Boot starter project either using STS IDE or Spring Initializr. I am providing the pom.xml for reference.

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.1.8.RELEASE</version>
<relativePath /> <!-- lookup parent from repository -->
</parent>
<groupId>com.techwording</groupId>
<artifactId>spring-cloud-stream-kafka-example</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>spring-cloud-stream-kafka-example</name>
<description>Demo project for Spring Cloud Stream and Kafka</description>

<properties>
<java.version>1.8</java.version>
<spring-cloud.version>Greenwich.SR3</spring-cloud.version>
</properties>

<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka-streams</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-test-support</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>

<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-dependencies</artifactId>
<version>${spring-cloud.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>

<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>

</project>


The Spring Cloud Stream project needs to be configured with the Kafka broker URL, topic, and other binder configurations. Below is an example of configuration for the application.

spring:
  cloud:
    stream:
      default-binder: kafka
      kafka:
        binder:
          brokers:
          - localhost:9092
      bindings:
        input:
         binder: kafka
         destination: test
         content-type: text/plain
         group: input-group-1
        output:
          binder: kafka
          destination: test
          group: output-group-1
          content-type: text/plain


We will need at least one producer and a consumer to test the message and send and receive operations. Below is the sample code for a producer and consumer in its simplest form, developed using Spring Cloud Stream.

package com.techwording.scs;

import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;

@EnableBinding(Source.class)
public class Producer {

private Source mySource;

public Producer(Source mySource) {

super();
this.mySource = mySource;
}

public Source getMysource() {

return mySource;
}

public void setMysource(Source mysource) {

mySource = mySource;
}

}


package com.techwording.scs;

import java.time.Instant;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.time.format.FormatStyle;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.messaging.handler.annotation.Payload;

@EnableBinding(Sink.class)
public class Consumer {

private static final Logger logger = LoggerFactory.getLogger(Consumer.class);

@StreamListener(target = Sink.INPUT)
public void consume(String message) {

logger.info("recieved a string message : " + message);
}

@StreamListener(target = Sink.INPUT, condition = "headers['type']=='chat'")
public void handle(@Payload ChatMessage message) {

final DateTimeFormatter df = DateTimeFormatter.ofLocalizedTime(FormatStyle.MEDIUM)
.withZone(ZoneId.systemDefault());
final String time = df.format(Instant.ofEpochMilli(message.getTime()));
logger.info("recieved a complex message : [{}]: {}", time, message.getContents());
}

}


We will also create a Rest Controller class, which will accept the message over HTTP and pass it to the producer. This is just to make the testing convenient.

package com.techwording.scs;

import org.springframework.messaging.support.MessageBuilder;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class Controller {

private Producer producer;

public Controller(Producer producer) {

super();
this.producer = producer;
}

// get the message as a complex type via HTTP, publish it to broker using spring cloud stream
@RequestMapping(value = "/sendMessage/complexType", method = RequestMethod.POST)
public String publishMessageComplextType(@RequestBody ChatMessage payload) {

payload.setTime(System.currentTimeMillis());
producer.getMysource()
.output()
.send(MessageBuilder.withPayload(payload)
.setHeader("type", "chat")
.build());

return "success";
}

// get the String message via HTTP, publish it to broker using spring cloud stream
@RequestMapping(value = "/sendMessage/string", method = RequestMethod.POST)
public String publishMessageString(@RequestBody String payload) {

// send message to channel
producer.getMysource()
.output()
.send(MessageBuilder.withPayload(payload)
.setHeader("type", "string")
.build());

return "success";
}
}


Run the below maven commands to build and run this project.

mvn clean install
mvn spring-boot:run



Hit the POST endpoint /sendMessage/string and check the application console logs. Here is an example output the application produced when I hit this endpoint with message "hello" in the rest body.

2019-10-01 14:37:22.764  INFO 377456 --- [container-0-C-1] com.techwording.scs.Consumer             : received a string message : {"contents":"hello","time":1569920841187}


Hit the POST endpoint /sendMessage/complexType and check the application console logs.

2019-10-01 14:37:22.773  INFO 377456 --- [container-0-C-1] com.techwording.scs.Consumer             : received a complex message : [2:37:21 PM]: hello


The annotation @EnableBinding takes one or more interfaces as parameters. In this example, we have used Sink and Source interfaces, which declare input and output channels, respectively. You can also define your own interfaces for this purpose.

 @StreamListener annotation is a convenient way provided by Spring Cloud Stream for content-based routing. It works based on a pub-sub model, and every @StreamListener receives its own copy of the message.

I have used two stream listeners in this project — one for consuming plain string messages and another one for messages with a complex type, ChatMessage. The producer sends messages attached with a header "type" with a logical value and consumer can apply conditions to filter messages using  @StreamListener.

You can find the complete project here.


Further Reading

  • Building and Testing Message-Driven Microservices Using Spring Cloud Stream.
  • Building Data Pipelines With Spring Cloud Data Flow.
Spring Framework Spring Cloud kafka Stream (computing)

Opinions expressed by DZone contributors are their own.

Related

  • Spring Cloud Stream Channel Interceptor
  • Spring Cloud Stream Binding Kafka With EmbeddedKafkaRule Using In Tests
  • Request Tracing in Spring Cloud Stream Data Pipelines With Kafka Binder
  • How Kafka Can Make Microservice Planet a Better Place

Partner Resources

×

Comments
Oops! Something Went Wrong

The likes didn't load as expected. Please refresh the page and try again.

ABOUT US

  • About DZone
  • Support and feedback
  • Community research
  • Sitemap

ADVERTISE

  • Advertise with DZone

CONTRIBUTE ON DZONE

  • Article Submission Guidelines
  • Become a Contributor
  • Core Program
  • Visit the Writers' Zone

LEGAL

  • Terms of Service
  • Privacy Policy

CONTACT US

  • 3343 Perimeter Hill Drive
  • Suite 100
  • Nashville, TN 37211
  • support@dzone.com

Let's be friends:

Likes
There are no likes...yet! 👀
Be the first to like this post!
It looks like you're not logged in.
Sign in to see who liked this post!