
Spring Cloud Stream Binding Kafka With EmbeddedKafkaRule in Tests

Spring Cloud Stream is a framework built on top of Spring Boot and Spring Integration that helps in creating event-driven or message-driven microservices.

By Vitaly Kuznetsov (Ippolitov) · Mar. 31, 21 · Tutorial

Overview

In this article, we'll introduce the main concepts and constructs of Spring Cloud Stream, with simple test examples based on EmbeddedKafkaRule and MessageCollector.


Getting Started

Dependencies & Configuration

To get started, we'll need to add the Spring Cloud Starter Stream Kafka dependency to our build.gradle.kts:

Kotlin

dependencies {
    implementation(kotlin("stdlib"))
    implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core:$kotlinCoroutinesVersion")
    implementation("com.fasterxml.jackson.module:jackson-module-kotlin")

    implementation("org.springframework.boot:spring-boot-starter-web")
    implementation("org.springframework.cloud:spring-cloud-starter-stream-kafka")

    testImplementation("org.springframework.boot:spring-boot-starter-test")
    testImplementation("org.springframework.cloud:spring-cloud-stream-test-support")
    testImplementation("org.springframework.kafka:spring-kafka-test:$springKafkaTestVersion")
}



The Spring Cloud Stream project needs to be configured with the Kafka broker URL, topic, and other binder settings. Below is an example configuration in application.yaml:

YAML

spring:
  application:
    name: cloud-stream-binding-kafka-app
  cloud:
    stream:
      kafka:
        binder:
          brokers: 0.0.0.0:8080
          configuration:
            auto-offset-reset: latest
      bindings:
        customChannel:                   # Channel name
          destination: 0.0.0.0:8080      # Destination to which the message is sent (topic)
          group: input-group-N
          contentType: application/json
          consumer:
            max-attempts: 1
            autoCommitOffset: true
            autoCommitOnError: false



Constructs

This is a simple Spring Cloud Stream-based service that listens to input binding (SpringCloudStreamBindingKafkaApp.kt):

Kotlin

@EnableBinding(ProducerBinding::class)
@SpringBootApplication
class SpringCloudStreamBindingKafkaApp

fun main(args: Array<String>) {
    SpringApplication.run(SpringCloudStreamBindingKafkaApp::class.java, *args)
}



The @EnableBinding annotation configures the service to bind input and output channels.

Now let's look at the main concepts:

  • Bindings: a collection of interfaces that declaratively identify the input and output channels
  • Binder: the messaging-middleware implementation, such as Kafka
  • Channel: represents the communication pipe between the messaging middleware and the application
  • StreamListeners: message-handling methods on beans that are automatically invoked on a message from the channel, after the MessageConverter has converted between middleware-specific events and domain object types ("POJOs")
  • Message Schemas: used for serialization and deserialization of messages; these schemas can be statically read from a location or loaded dynamically
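To make the channel concept concrete before wiring in Spring, here is a minimal sketch in plain Kotlin (no Spring Cloud Stream involved; ToyChannel is a made-up name, purely illustrative): a channel decouples the sender from the handler, just as a bound MessageChannel decouples the producer from the @StreamListener.

```kotlin
// Purely illustrative stand-in for a message channel: subscribers play
// the role of @StreamListener methods, send() plays the producer side.
class ToyChannel {
    private val listeners = mutableListOf<(Map<String, Any?>) -> Unit>()

    fun subscribe(listener: (Map<String, Any?>) -> Unit) {
        listeners += listener
    }

    fun send(payload: Map<String, Any?>) {
        // Deliver the payload to every registered handler
        listeners.forEach { it(payload) }
    }
}

fun main() {
    val channel = ToyChannel()
    channel.subscribe { println("received: $it") }
    channel.send(mapOf("id" to 1, "body" to "foo"))
}
```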


We will need at least one producer and one consumer to test message send and receive operations. Below is sample code for a producer and a consumer in their simplest form, developed using Spring Cloud Stream.

Producer

There is a producer bean that will send messages to a Kafka topic (ProducerBinding.kt):

Kotlin

interface ProducerBinding {

    companion object {
        const val BINDING_TARGET_NAME = "customChannel"
    }

    @Output(BINDING_TARGET_NAME)
    fun messageChannel(): MessageChannel
}




Consumer

A consumer bean will listen to a Kafka topic and receive messages (ConsumerBinding.kt):

Kotlin

interface ConsumerBinding {

    companion object {
        const val BINDING_TARGET_NAME = "customChannel"
    }

    @Input(BINDING_TARGET_NAME)
    fun messageChannel(): MessageChannel
}






And the listener bean that processes incoming messages (Consumer.kt):

Kotlin

@EnableBinding(ConsumerBinding::class)
class Consumer(val messageService: MessageService) {

    @StreamListener(target = ConsumerBinding.BINDING_TARGET_NAME)
    fun process(
        @Payload message: Map<String, Any?>,
        @Header(value = KafkaHeaders.OFFSET, required = false) offset: Int?
    ) {
        messageService.consume(message)
    }
}



A Kafka broker with a topic is needed. For this test, we will use an embedded Kafka server provided by spring-kafka-test.
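For the binder to talk to the embedded broker rather than the 0.0.0.0:8080 address from application.yaml, the test profile has to override the broker list. One common approach (this file is an assumption, not shown in the article) is to reference the spring.embedded.kafka.brokers system property that spring-kafka-test sets once the embedded broker starts:

```yaml
# Hypothetical src/test/resources/application-test.yaml
spring:
  cloud:
    stream:
      kafka:
        binder:
          # spring-kafka-test exposes the embedded broker's address
          # through this system property when the broker is up
          brokers: ${spring.embedded.kafka.brokers}
```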


Functional Testing Using MessageCollector

MessageCollector is part of the test binder implementation that allows interaction with channels and inspection of the messages they carry. We send a message to the producer binding's message channel and then read it back as a payload (ProducerTest.kt):

Kotlin

@SpringBootTest
class ProducerTest {

    @Autowired
    lateinit var producerBinding: ProducerBinding

    @Autowired
    lateinit var messageCollector: MessageCollector

    @Test
    fun `should produce somePayload to channel`() {
        // ARRANGE
        val request = mapOf(1 to "foo", 2 to "bar", "three" to 10101)

        // ACT
        producerBinding.messageChannel().send(MessageBuilder.withPayload(request).build())
        val payload = messageCollector.forChannel(producerBinding.messageChannel())
            .poll()
            .payload

        // ASSERT
        val payloadAsMap = jacksonObjectMapper().readValue(payload.toString(), Map::class.java)
        assertTrue(request.entries.stream().allMatch { re ->
            re.value == payloadAsMap[re.key.toString()]
        })

        messageCollector.forChannel(producerBinding.messageChannel()).clear()
    }
}



Embedded Kafka Broker Testing

We use the @ClassRule annotation to create the Kafka broker.

The rule starts the Kafka and ZooKeeper servers on a random port before the tests run and shuts them down after they complete. The embedded broker eliminates the need for a real Kafka and ZooKeeper instance while the test is running (ConsumerTest.kt):

Kotlin

@SpringBootTest
@ActiveProfiles("test")
@EnableAutoConfiguration(exclude = [TestSupportBinderAutoConfiguration::class])
@EnableBinding(ProducerBinding::class)
class ConsumerTest {

    @Autowired
    lateinit var producerBinding: ProducerBinding

    @Autowired
    lateinit var objectMapper: ObjectMapper

    @MockBean
    lateinit var messageService: MessageService

    companion object {
        @ClassRule @JvmField
        var embeddedKafka = EmbeddedKafkaRule(1, true, "any-name-of-topic")
    }

    @Test
    fun `should consume via txConsumer process`() {
        // ACT
        val request = mapOf(1 to "foo", 2 to "bar")
        producerBinding.messageChannel().send(MessageBuilder.withPayload(request)
            .setHeader("someHeaderName", "someHeaderValue")
            .build())

        // ASSERT
        val requestAsMap = objectMapper.readValue<Map<String, Any?>>(objectMapper.writeValueAsString(request))
        runBlocking {
            delay(20)
            verify(messageService).consume(requestAsMap)
        }
    }
}
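A side note on the fixed delay(20) before the verify call: it can be flaky on a slow CI machine. Since spring-boot-starter-test already pulls in Mockito, a hedged alternative is Mockito's timeout() verification mode, which polls until the interaction is observed or the timeout elapses:

```kotlin
// Polls for up to one second instead of sleeping a fixed 20 ms;
// fails only if consume() was not called within that window.
verify(messageService, timeout(1000)).consume(requestAsMap)
```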



Conclusion

In this tutorial, we covered the main concepts of Spring Cloud Stream, showed how to use it with Kafka, and demonstrated JUnit testing based on EmbeddedKafkaRule and MessageCollector.

You can find the complete source code here.


Opinions expressed by DZone contributors are their own.
