Testing Spring Boot Apps With Kafka and Awaitility

Learn more about testing Spring Boot apps with Kafka and Awaitility!

By Nakul Shukla · Updated Jan. 30, 20 · Analysis

This is the second article in the Spring Cloud Stream and Kafka series. This post talks about different ways in which we can test Spring Boot applications using EmbeddedKafka and Awaitility.

While testing any synchronous application, it is all about “call and wait.” We invoke a particular API or endpoint and wait for the response. The test blocks the main execution thread until the API returns the response. Once the processing completes, we get the response and can compare the result with the expected output.

Asynchronous applications are tested differently from synchronous (blocking) ones because the call does not block the main execution thread. In other words, the test will not wait for the response on its own; we have to program it explicitly to hold execution at a certain point and wait for the results of all the non-blocking operations. Only at that stage can we write the assertions.
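To make the difference concrete, here is a minimal, library-free sketch. The class `AsyncWaitSketch` and its methods are hypothetical and not part of the article's project; a `CompletableFuture` completed on a background thread stands in for a non-blocking API. The call returns immediately, so the caller has to wait explicitly (here with a timeout) before it can check the result.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical illustration, not part of the article's project.
class AsyncWaitSketch {

    // Simulates a non-blocking API: returns at once, completes later on another thread.
    static CompletableFuture<String> sendAsync(String payload) {
        return CompletableFuture.supplyAsync(() -> {
            try {
                TimeUnit.MILLISECONDS.sleep(200); // pretend broker/network latency
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return payload;
        });
    }

    // The caller must hold its own thread until the result is available.
    static String sendAndWait(String payload, long timeoutMillis) {
        CompletableFuture<String> result = sendAsync(payload);
        try {
            return result.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        } catch (ExecutionException | TimeoutException e) {
            throw new IllegalStateException("result not available in time", e);
        }
    }

    public static void main(String[] args) {
        CompletableFuture<String> pending = sendAsync("payload");
        System.out.println("done right after the call? " + pending.isDone());
        System.out.println("after waiting: " + sendAndWait("payload", 2000));
    }
}
```

With Kafka there is usually no future to wait on, because the consumer is a separate listener; that is why the tests later in this article rely on polling (Awaitility) or on a latch instead.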

Managing threads and concurrency issues by hand while still keeping a unit test concise and readable is hard.

There are a few ways to write tests for Spring Boot microservices that use Spring Cloud Stream to connect to Kafka.

Let’s consider a simple use case for this purpose.

Example Use Case

There is a producer bean that will send messages to a Kafka topic. 

Java

package com.techwording.scs;

import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;

// Binds the standard Source interface, whose output() channel is mapped to a Kafka topic.
@EnableBinding(Source.class)
public class Producer {

    private Source mySource;

    // Spring injects the bound Source through the constructor.
    public Producer(Source mySource) {
        super();
        this.mySource = mySource;
    }

    public Source getMysource() {
        return mySource;
    }

    public void setMysource(Source mysource) {
        mySource = mysource;
    }
}


 

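A consumer bean will listen to a Kafka topic and receive messages.

Java

// Excerpt: the tests refer to this class as a nested class
// (for example EmbeddedKafkaLatchTest.Consumer), so its imports live in the test class.
@EnableBinding(Sink.class)
public class Consumer {

    // volatile: written on the listener thread, read on the test thread
    private volatile String receivedMessage;

    @StreamListener(target = Sink.INPUT)
    public void consume(String message) {
        receivedMessage = message;
        // `latch` is the static CountDownLatch declared in the CountDownLatch test below;
        // this line is only needed for that variant of the test.
        latch.countDown();
    }

    public String getReceivedMessage() {
        return receivedMessage;
    }
}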



A Kafka broker with a topic is created. For this test, we will use an embedded Kafka server from spring-kafka-test.

Java

@ClassRule
public static EmbeddedKafkaRule embeddedKafka = new EmbeddedKafkaRule(1, true, TOPIC1);


 

EmbeddedKafkaRule

Spring-kafka-test provides an embedded Kafka broker. We can use a JUnit @ClassRule annotation to create this Kafka broker. In the declaration above, the constructor arguments are the number of brokers (1), whether to use controlled shutdown (true), and the topics to create (TOPIC1). This rule starts the Kafka and Zookeeper servers on a random port before the tests execute and shuts them down after the tests are complete. The embedded Kafka broker eliminates the need to have a real Kafka and Zookeeper instance running while running the test.

Coming back to the tests, I have implemented this test in two ways, using Awaitility and using a countdown latch.

Test Using Awaitility

Awaitility is a DSL library that helps in writing JUnit tests for asynchronous Java applications. You can find more on its official GitHub page. Below is an implementation of the test using Awaitility.

Java

package com.techwording.scs;

import java.util.concurrent.TimeUnit;

import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.cloud.stream.test.binder.TestSupportBinderAutoConfiguration;
import org.springframework.kafka.test.rule.EmbeddedKafkaRule;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.test.context.junit4.SpringRunner;

import static org.assertj.core.api.BDDAssertions.then;
import static org.awaitility.Awaitility.waitAtMost;

@RunWith(SpringRunner.class)
@SpringBootTest(classes = { EmbeddedKafkaAwaitilityTest.App.class, EmbeddedKafkaAwaitilityTest.Producer.class, EmbeddedKafkaAwaitilityTest.Consumer.class })
@EnableBinding(Source.class)
public class EmbeddedKafkaAwaitilityTest {

    // Exclude the test binder so that the real Kafka binder talks to the embedded broker.
    @SpringBootApplication(exclude = TestSupportBinderAutoConfiguration.class)
    static class App {
    }

    // Producer and Consumer are reproduced from the example use case as nested classes,
    // because @SpringBootTest above refers to them as EmbeddedKafkaAwaitilityTest.Producer/Consumer.
    @EnableBinding(Source.class)
    public static class Producer {

        private final Source mySource;

        public Producer(Source mySource) {
            this.mySource = mySource;
        }

        public Source getMysource() {
            return mySource;
        }
    }

    @EnableBinding(Sink.class)
    public static class Consumer {

        // volatile: written on the listener thread, read on the test thread
        private volatile String receivedMessage;

        @StreamListener(target = Sink.INPUT)
        public void consume(String message) {
            receivedMessage = message;
        }

        public String getReceivedMessage() {
            return receivedMessage;
        }
    }

    private static final String TOPIC1 = "test-topic-1";

    @ClassRule
    public static EmbeddedKafkaRule embeddedKafka = new EmbeddedKafkaRule(1, true, TOPIC1);

    @BeforeClass
    public static void setup() {
        // Point the binder at the embedded broker and map both channels to the same topic.
        System.setProperty("spring.cloud.stream.kafka.binder.brokers",
            embeddedKafka.getEmbeddedKafka()
                .getBrokersAsString());
        System.setProperty("spring.cloud.stream.bindings.input.destination", TOPIC1);
        System.setProperty("spring.cloud.stream.bindings.input.content-type", "text/plain");
        System.setProperty("spring.cloud.stream.bindings.input.group", "input-group-1");
        System.setProperty("spring.cloud.stream.bindings.output.destination", TOPIC1);
        System.setProperty("spring.cloud.stream.bindings.output.content-type", "text/plain");
        System.setProperty("spring.cloud.stream.bindings.output.group", "output-group-1");
    }

    @Autowired
    private Producer producer;

    @Autowired
    private Consumer consumer;

    @Test
    public void testMessageSendReceive_Awaitility() {

        producer.getMysource()
            .output()
            .send(MessageBuilder.withPayload("payload")
                .setHeader("type", "string")
                .build());

        // Re-run the assertion until it passes or 5 seconds elapse.
        waitAtMost(5, TimeUnit.SECONDS)
            .untilAsserted(() -> then(consumer.getReceivedMessage()).isEqualTo("payload"));
    }
}
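Conceptually, `untilAsserted` keeps re-running the assertion until it passes or the time limit is reached. The sketch below is a simplified, library-free stand-in for that idea and not Awaitility's actual implementation; the class `PollingAssertSketch`, its methods, and the poll interval are assumptions made for illustration, and a plain background thread plays the role of the Kafka consumer.

```java
import java.util.concurrent.atomic.AtomicReference;

// Simplified stand-in for the polling idea; NOT Awaitility's implementation.
class PollingAssertSketch {

    // Re-runs the assertion every pollMillis until it passes or timeoutMillis elapses.
    static void untilAsserted(long timeoutMillis, long pollMillis, Runnable assertion) {
        long deadline = System.nanoTime() + timeoutMillis * 1_000_000L;
        while (true) {
            try {
                assertion.run();
                return; // assertion passed
            } catch (AssertionError e) {
                if (System.nanoTime() >= deadline) {
                    throw e; // give up and surface the last failure
                }
            }
            try {
                Thread.sleep(pollMillis);
            } catch (InterruptedException ie) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while polling", ie);
            }
        }
    }

    // Starts a background "consumer" that stores the message after a delay, then polls for it.
    static String receiveWithPolling(String payload) {
        AtomicReference<String> received = new AtomicReference<>();
        Thread consumer = new Thread(() -> {
            try {
                Thread.sleep(150); // simulated delivery delay
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            received.set(payload);
        });
        consumer.start();

        untilAsserted(5000, 50, () -> {
            if (!payload.equals(received.get())) {
                throw new AssertionError("expected " + payload + " but was " + received.get());
            }
        });
        return received.get();
    }

    public static void main(String[] args) {
        System.out.println(receiveWithPolling("payload"));
    }
}
```

The appeal of this style is that the test states only the expected end state and an upper bound on the wait; it needs no hook inside the consumer.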



Test Using CountDownLatch

As per the Java documentation, CountDownLatch is a synchronization aid that allows one or more threads to wait until a set of operations being performed in other threads completes. To write this test using CountDownLatch, we first initialize the latch with a counter.

The value of this counter depends on the number of tasks our test needs to wait for. Here, we initialize the counter to 1. Once the producer has sent the message, the main thread waits on the latch until the count reaches 0. The consumer is responsible for decreasing the count. Hence, when the consumer is done with its part, the main thread resumes and performs the assertion.
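Before looking at the Kafka test itself, the mechanism can be sketched in plain Java. The class `LatchSketch` and the simulated work are hypothetical and not part of the article's project; worker threads stand in for the consumer, and the counter is set to the number of tasks to wait for.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical illustration, not part of the article's project.
class LatchSketch {

    // Starts `tasks` worker threads; each counts the latch down once when it finishes.
    // Returns true if every worker finished before the timeout.
    static boolean runAndAwait(int tasks, long timeoutMillis) {
        CountDownLatch latch = new CountDownLatch(tasks);
        for (int i = 0; i < tasks; i++) {
            new Thread(() -> {
                try {
                    Thread.sleep(100); // simulated work
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    latch.countDown(); // signal completion even if interrupted
                }
            }).start();
        }
        try {
            // The calling thread blocks here until the count reaches 0 or the timeout expires.
            return latch.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(runAndAwait(1, 2000));
        System.out.println(runAndAwait(3, 2000));
    }
}
```

Using `await` with a timeout rather than the no-argument version is a design choice in this sketch: if a worker never counts down, the caller gets `false` back instead of hanging.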

Below is an implementation of the test using CountDownLatch:

Java

package com.techwording.scs;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.cloud.stream.test.binder.TestSupportBinderAutoConfiguration;
import org.springframework.kafka.test.rule.EmbeddedKafkaRule;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.test.context.junit4.SpringRunner;

import static org.assertj.core.api.Assertions.assertThat;

@RunWith(SpringRunner.class)
@SpringBootTest(classes = { EmbeddedKafkaLatchTest.App.class, EmbeddedKafkaLatchTest.Producer.class, EmbeddedKafkaLatchTest.Consumer.class })
@EnableBinding(Source.class)
public class EmbeddedKafkaLatchTest {

    // Exclude the test binder so that the real Kafka binder talks to the embedded broker.
    @SpringBootApplication(exclude = TestSupportBinderAutoConfiguration.class)
    static class App {
    }

    // Producer and Consumer are reproduced from the example use case as nested classes,
    // because @SpringBootTest above refers to them as EmbeddedKafkaLatchTest.Producer/Consumer.
    @EnableBinding(Source.class)
    public static class Producer {

        private final Source mySource;

        public Producer(Source mySource) {
            this.mySource = mySource;
        }

        public Source getMysource() {
            return mySource;
        }
    }

    @EnableBinding(Sink.class)
    public static class Consumer {

        private volatile String receivedMessage;

        @StreamListener(target = Sink.INPUT)
        public void consume(String message) {
            receivedMessage = message;
            latch.countDown(); // releases the waiting test thread
        }

        public String getReceivedMessage() {
            return receivedMessage;
        }
    }

    private static final String TOPIC1 = "test-topic-1";

    @ClassRule
    public static EmbeddedKafkaRule embeddedKafka = new EmbeddedKafkaRule(1, true, TOPIC1);

    // One message is expected, so the latch starts at 1.
    private static CountDownLatch latch = new CountDownLatch(1);

    @BeforeClass
    public static void setup() {
        // Point the binder at the embedded broker and map both channels to the same topic.
        System.setProperty("spring.cloud.stream.kafka.binder.brokers",
            embeddedKafka.getEmbeddedKafka()
                .getBrokersAsString());
        System.setProperty("spring.cloud.stream.bindings.input.destination", TOPIC1);
        System.setProperty("spring.cloud.stream.bindings.input.content-type", "text/plain");
        System.setProperty("spring.cloud.stream.bindings.input.group", "input-group-1");
        System.setProperty("spring.cloud.stream.bindings.output.destination", TOPIC1);
        System.setProperty("spring.cloud.stream.bindings.output.content-type", "text/plain");
        System.setProperty("spring.cloud.stream.bindings.output.group", "output-group-1");
    }

    @Autowired
    private Producer producer;

    @Autowired
    private Consumer consumer;

    @Test
    public void testMessageSendReceive() throws InterruptedException {

        producer.getMysource()
            .output()
            .send(MessageBuilder.withPayload("payload")
                .setHeader("type", "string")
                .build());

        // Wait with an upper bound so a lost message fails the test instead of hanging it.
        // The 10-second limit is an assumed value; the original called latch.await() with no timeout.
        assertThat(latch.await(10, TimeUnit.SECONDS)).isTrue();
        assertThat(consumer.getReceivedMessage()).isEqualTo("payload");
    }
}


You can find the complete source code here. 

Further Reading

Kafka With Spring Cloud Stream

5 Spring Cloud Annotations Java Programmers Should Know

Microservice Architectures With Spring Cloud and Docker
