DZone
Thanks for visiting DZone today,
Edit Profile
  • Manage Email Subscriptions
  • How to Post to DZone
  • Article Submission Guidelines
Sign Out View Profile
  • Post an Article
  • Manage My Drafts
Over 2 million developers have joined DZone.
Log In / Join
Refcards Trend Reports
Events Video Library
Refcards
Trend Reports

Events

View Events Video Library

Related

  • Spring Cloud Stream Channel Interceptor
  • Request Tracing in Spring Cloud Stream Data Pipelines With Kafka Binder
  • How Kafka Can Make Microservice Planet a Better Place
  • Introduction to Apache Kafka With Spring

Trending

  • How AI Is Rewriting Full-Stack Java Systems: Practical Patterns with Spring Boot, Kafka and WebSockets
  • Why Your QA Engineer Should Be the Most Stubborn Person on the Team
  • Content Lakes: Harness Unstructured Data for Enterprise AI Readiness
  • Evaluating SOC Effectiveness Using Detection Coverage and Response Metrics
  1. DZone
  2. Data Engineering
  3. Big Data
  4. Spring Cloud Stream Binding Kafka With EmbeddedKafkaRule Using In Tests

Spring Cloud Stream Binding Kafka With EmbeddedKafkaRule Using In Tests

Spring Cloud Stream is a framework built on top of Spring Boot and Spring Integration that helps in creating event-driven or message-driven microservices.

By 
Vitaly Kuznetsov (Ippolitov) user avatar
Vitaly Kuznetsov (Ippolitov)
·
Mar. 31, 21 · Tutorial
Likes (3)
Comment
Save
Tweet
Share
10.4K Views

Join the DZone community and get the full member experience.

Join For Free

Overview

In this article, we'll introduce the main concepts and constructs of Spring Cloud Stream with some simple test-examples based on EmbeddedKafkaRule using MessageCollector


Getting Started

Dependencies & Configuration

To get started, we'll need to add the Spring Cloud Starter Stream with the Kafka broker Gradle dependency to our build.gradle:

Java
 




xxxxxxxxxx
1
13


 
1
dependencies {
2
    implementation(kotlin("stdlib"))
3
    implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core:$kotlinCoroutinesVersion")
4
    implementation("com.fasterxml.jackson.module:jackson-module-kotlin")
5

          
6
    implementation("org.springframework.boot:spring-boot-starter-web")
7
    implementation("org.springframework.cloud:spring-cloud-starter-stream-kafka")
8

          
9
    testImplementation("org.springframework.boot:spring-boot-starter-test")
10
    testImplementation("org.springframework.cloud:spring-cloud-stream-test-support")
11
    testImplementation("org.springframework.kafka:spring-kafka-test:springKafkaTestVersion")
12
}



The Spring Cloud Stream project needs to be configured with the Kafka broker URL, topic, and other binder configurations. Below is an example of a configuration for the application.yaml:

Java
 




xxxxxxxxxx
1
19


 
1
spring:
2
  application:
3
    name: cloud-stream-binding-kafka-app
4
  cloud:
5
    stream:
6
      kafka:
7
        binder:
8
          brokers: 0.0.0.0:8080
9
          configuration:
10
            auto-offset-reset: latest
11
      bindings:
12
        customChannel:                   #Channel name
13
          destination: 0.0.0.0:8080      #Destination to which the message is sent (topic)
14
          group: input-group-N
15
          contentType: application/json
16
          consumer:
17
            max-attempts: 1
18
            autoCommitOffset: true
19
            autoCommitOnError: false



Constructs

This is a simple Spring Cloud Stream-based service that listens to input binding (SpringCloudStreamBindingKafkaApp.kt):

Java
 




xxxxxxxxxx
1


 
1
@EnableBinding(ProducerBinding::class)
2
@SpringBootApplication
3
class SpringCloudStreamBindingKafkaApp
4

          
5
fun main(args: Array<String>) {
6
    SpringApplication.run(SpringCloudStreamBindingKafkaApp::class.java, *args)
7
}



The annotation @EnableBinding configures the service to bind input and output channels.


Now let's see the main concepts:
Bindings: a collection of interfaces that identify the input and output channels declaratively
Binder: messaging middleware implementation such as Kafka or another
Channel: represents the communication pipe between messaging middleware and the application
StreamListeners: message-handling methods in beans that will be automatically invoked on a message from the channel after the MessageConverter does the serialization or deserialization between middleware specific events and domain object types or "POJO"
Message Schemas: used for serialization and deserialization of messages, these schemas can be statically read from a location or loaded dynamically


We will need at least one producer and a consumer to test the message and send and receive operations. Below is the sample code for a producer and consumer in its simplest form, developed using Spring Cloud Stream.

Producer

There is a producer bean that will send messages to a Kafka topic (ProducerBinding.kt):

Java
 




xxxxxxxxxx
1


 
1
interface ProducerBinding {
2

          
3
    @Output(BINDING_TARGET_NAME)
4
    fun messageChannel(): MessageChannel
5
}




Consumer

A consumer bean will listen to a Kafka topic and receive messages (ConsumerBinding.kt):

Java
 




xxxxxxxxxx
1


 
1
interface ConsumerBinding {
2

          
3
    companion object {
4
        const val BINDING_TARGET_NAME = "customChannel"
5
    }
6

          
7
    @Input(BINDING_TARGET_NAME)
8
    fun messageChannel(): MessageChannel
9
}






(Consumer.kt):

Java
 




xxxxxxxxxx
1
11


 
1
@EnableBinding(ConsumerBinding::class)
2
class Consumer(val messageService: MessageService) {
3

          
4
    @StreamListener(target = ConsumerBinding.BINDING_TARGET_NAME)
5
    fun process(
6
        @Payload message: Map<String, Any?>,
7
        @Header(value = KafkaHeaders.OFFSET, required = false) offset: Int?
8
    ) {
9
        messageService.consume(message)
10
    }
11
}



A Kafka broker with a topic is created. For this test, we will use an Embedded Kafka server with spring-kafka-test


Functional Testing Using MessageCollector

This is a binder implementation that allows interaction with channels and reception of the messages. We send a message to the producer binding message channel and then receive it as payload (ProducerTest.kt):

Java
 




xxxxxxxxxx
1
29


 
1
@SpringBootTest
2
class ProducerTest {
3

          
4
    @Autowired
5
    lateinit var producerBinding: ProducerBinding
6

          
7
    @Autowired
8
    lateinit var messageCollector: MessageCollector
9

          
10
    @Test
11
    fun `should produce somePayload to channel`() {
12
        // ARRANGE
13
        val request = mapOf(1 to "foo", 2 to "bar", "three" to 10101)
14

          
15
        // ACT
16
        producerBinding.messageChannel().send(MessageBuilder.withPayload(request).build())
17
        val payload = messageCollector.forChannel(producerBinding.messageChannel())
18
            .poll()
19
            .payload
20

          
21
        // ASSERT
22
        val payloadAsMap = jacksonObjectMapper().readValue(payload.toString(), Map::class.java)
23
        assertTrue(request.entries.stream().allMatch { re ->
24
            re.value == payloadAsMap[re.key.toString()]
25
        })
26

          
27
        messageCollector.forChannel(producerBinding.messageChannel()).clear()
28
    }
29
}



Embedded Kafka broker testing

We use @ClassRule annotation to create this Kafka broker.

The rule starts the Kafka and Zookeeper servers on a random port before starting the tests and shuts them down after complete. Embedded Kafka broker eliminates the need to have a real instance of Kafka and zookeeper while the test is running (ConsumerTest.kt):

Java
 




xxxxxxxxxx
1
36


 
1
@SpringBootTest
2
@ActiveProfiles("test")
3
@EnableAutoConfiguration(exclude = [TestSupportBinderAutoConfiguration::class])
4
@EnableBinding(ProducerBinding::class)
5
class ConsumerTest {
6

          
7
    @Autowired
8
    lateinit var producerBinding: ProducerBinding
9

          
10
    @Autowired
11
    lateinit var objectMapper: ObjectMapper
12

          
13
    @MockBean
14
    lateinit var messageService: MessageService
15

          
16
    companion object {
17
        @ClassRule @JvmField
18
        var embeddedKafka = EmbeddedKafkaRule(1, true, "any-name-of-topic")
19
    }
20

          
21
    @Test
22
    fun `should consume via txConsumer process`() {
23
        // ACT
24
        val request = mapOf(1 to "foo", 2 to "bar")
25
        producerBinding.messageChannel().send(MessageBuilder.withPayload(request)
26
            .setHeader("someHeaderName", "someHeaderValue")
27
            .build())
28

          
29
        // ASSERT
30
        val requestAsMap = objectMapper.readValue<Map<String, Any?>>(objectMapper.writeValueAsString(request))
31
        runBlocking {
32
            delay(20)
33
            verify(messageService).consume(requestAsMap)
34
        }
35
    }
36
}



Conclusion

In this tutorial, we demonstrated concepts of Spring Cloud Stream and showed how to use it with Kafka, and demonstrated how to use the complete JUnit testing based on EmbeddedKafkaRule with using MessageCollector.

You can find the complete source code here.

kafka Spring Cloud Spring Framework Stream (computing) Testing Binding (linguistics)

Opinions expressed by DZone contributors are their own.

Related

  • Spring Cloud Stream Channel Interceptor
  • Request Tracing in Spring Cloud Stream Data Pipelines With Kafka Binder
  • How Kafka Can Make Microservice Planet a Better Place
  • Introduction to Apache Kafka With Spring

Partner Resources

×

Comments

The likes didn't load as expected. Please refresh the page and try again.

  • RSS
  • X
  • Facebook

ABOUT US

  • About DZone
  • Support and feedback
  • Community research

ADVERTISE

  • Advertise with DZone

CONTRIBUTE ON DZONE

  • Article Submission Guidelines
  • Become a Contributor
  • Core Program
  • Visit the Writers' Zone

LEGAL

  • Terms of Service
  • Privacy Policy

CONTACT US

  • 3343 Perimeter Hill Drive
  • Suite 215
  • Nashville, TN 37211
  • [email protected]

Let's be friends:

  • RSS
  • X
  • Facebook