
Testing Spring Boot Apps With Kafka and Awaitility

Learn more about testing Spring Boot apps with Kafka and Awaitility!


This is the second article in the Spring Cloud Stream and Kafka series. This post talks about different ways in which we can test Spring Boot applications using EmbeddedKafka and Awaitility.

Testing a synchronous application is all about “call and wait”: we invoke a particular API or endpoint and wait for the response. The test blocks the main execution thread until the API returns, and once processing completes, we compare the result with the expected output.
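For instance, a blocking test for a hypothetical synchronous GreetingService (the class and method names here are made up purely for illustration and are not part of this article's code) simply calls the method and asserts on the returned value:

Java

import static org.assertj.core.api.Assertions.assertThat;

import org.junit.Test;

public class GreetingServiceTest {

    // Hypothetical synchronous service, used only to illustrate "call and wait".
    static class GreetingService {
        String greet(String name) {
            return "Hello, " + name;
        }
    }

    @Test
    public void greet_returnsGreeting() {
        GreetingService service = new GreetingService();

        // The call blocks until the result is available, so we can assert right away.
        String result = service.greet("world");

        assertThat(result).isEqualTo("Hello, world");
    }
}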


Asynchronous applications are tested differently from synchronous or blocking ones: the test does not simply block the main execution thread and wait for the API to respond. Instead, we have to program the test to hold execution at a certain point and wait for the results of all the non-blocking operations, and only then write the assertions.

Doing this by hand means managing multiple threads and concurrency issues, which makes it hard to write a concise, readable unit test.

There are a few ways to write such tests for Spring Boot and Spring Cloud Stream-based microservices that connect to Kafka.

Let’s consider a simple use case for this purpose.

Example Use Case

There is a producer bean that will send messages to a Kafka topic. 

Java

package com.techwording.scs;

import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;

@EnableBinding(Source.class)
public class Producer {

    private Source mySource;

    public Producer(Source mySource) {
        super();
        this.mySource = mySource;
    }

    public Source getMysource() {
        return mySource;
    }

    public void setMysource(Source mysource) {
        mySource = mysource;
    }

}

A consumer bean will listen to the Kafka topic and receive messages.

Java

@EnableBinding(Sink.class)
public class Consumer {

    private String receivedMessage;

    @StreamListener(target = Sink.INPUT)
    public void consume(String message) {
        receivedMessage = message;
        // 'latch' is the CountDownLatch used by the CountDownLatch-based test below
        // to signal that a message has arrived.
        latch.countDown();
    }

    public String getReceivedMessage() {
        return receivedMessage;
    }

}

A Kafka broker with a topic is created. For this test, we will use an embedded Kafka broker provided by spring-kafka-test.

Java

@ClassRule
public static EmbeddedKafkaRule embeddedKafka = new EmbeddedKafkaRule(1, true, TOPIC1);

EmbeddedKafkaRule

spring-kafka-test provides an embedded Kafka broker that we can create with a JUnit @ClassRule annotation. The rule starts Kafka and ZooKeeper on a random port before the tests execute and shuts them down once the tests are complete, so no real Kafka or ZooKeeper instance needs to be running while the test runs.

Coming back to the tests, I have implemented this test in two ways: using Awaitility and using a CountDownLatch.

Test Using Awaitility

Awaitility is a DSL library that provides features to assist in writing JUnit tests for asynchronous Java applications. You can check out their official GitHub page here. Below, after a quick standalone sketch of the DSL, is an implementation of the test using Awaitility.
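To get a feel for the DSL before applying it to Kafka, here is a minimal, self-contained sketch; the AtomicBoolean flag, the background thread, and the timeout value are illustrative assumptions rather than code from this article's application:

Java

import static org.awaitility.Awaitility.await;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

import org.junit.Test;

public class AwaitilitySketchTest {

    @Test
    public void waitsForAnAsynchronousFlag() {
        AtomicBoolean done = new AtomicBoolean(false);

        // Simulate an asynchronous operation that completes after a short delay.
        new Thread(() -> {
            try {
                TimeUnit.MILLISECONDS.sleep(500);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            done.set(true);
        }).start();

        // Poll the condition until it becomes true, failing the test after 5 seconds.
        await().atMost(5, TimeUnit.SECONDS).untilTrue(done);
    }
}

The waitAtMost(...).untilAsserted(...) call in the Kafka test below uses the same fluent API, with the consumer's received message as the polled condition.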

Java

package com.techwording.scs;

import java.util.concurrent.TimeUnit;

import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.cloud.stream.test.binder.TestSupportBinderAutoConfiguration;
import org.springframework.kafka.test.rule.EmbeddedKafkaRule;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.test.context.junit4.SpringRunner;

import static org.assertj.core.api.BDDAssertions.then;

import static org.awaitility.Awaitility.waitAtMost;

@RunWith(SpringRunner.class)
@SpringBootTest(classes = { EmbeddedKafkaAwaitilityTest.App.class, EmbeddedKafkaAwaitilityTest.Producer.class, EmbeddedKafkaAwaitilityTest.Consumer.class })
@EnableBinding(Source.class)
public class EmbeddedKafkaAwaitilityTest {

    @SpringBootApplication(exclude = TestSupportBinderAutoConfiguration.class)
    static class App {

    }

    private static final String TOPIC1 = "test-topic-1";

    @ClassRule
    public static EmbeddedKafkaRule embeddedKafka = new EmbeddedKafkaRule(1, true, TOPIC1);

    @BeforeClass
    public static void setup() {
        // Point the Spring Cloud Stream Kafka binder at the embedded broker and configure the bindings.
        System.setProperty("spring.cloud.stream.kafka.binder.brokers",
            embeddedKafka.getEmbeddedKafka()
                .getBrokersAsString());
        System.setProperty("spring.cloud.stream.bindings.input.destination", TOPIC1);
        System.setProperty("spring.cloud.stream.bindings.input.content-type", "text/plain");
        System.setProperty("spring.cloud.stream.bindings.input.group", "input-group-1");
        System.setProperty("spring.cloud.stream.bindings.output.destination", TOPIC1);
        System.setProperty("spring.cloud.stream.bindings.output.content-type", "text/plain");
        System.setProperty("spring.cloud.stream.bindings.output.group", "output-group-1");
    }

    @Autowired
    private Producer producer;

    @Autowired
    private Consumer consumer;

    @Test
    public void testMessageSendReceive_Awaitility() {
        producer.getMysource()
            .output()
            .send(MessageBuilder.withPayload("payload")
                .setHeader("type", "string")
                .build());

        // Poll for up to 5 seconds until the consumer has received the expected payload.
        waitAtMost(5, TimeUnit.SECONDS)
            .untilAsserted(() -> {
                then("payload").isEqualTo(
                    EmbeddedKafkaAwaitilityTest.this.consumer.getReceivedMessage());
            });
    }

}

Test Using CountDownLatch

As per the Java documentation, CountDownLatch is a synchronization aid that allows one or more threads to wait until a set of operations being performed in other threads completes. To write this test using CountDownLatch, we first initialize the latch with a counter.

The value of this counter depends on the number of tasks our test needs to wait for; here, we initialize it to 1. Once the producer has sent the message, the test thread waits on the latch until the count reaches 0. The consumer is responsible for decreasing the count, so when it is done with its part, the main thread resumes and performs the assertion.
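As a standalone illustration of the mechanism (the worker thread and the sleep are assumptions made for this sketch, not part of the article's code), the waiting thread blocks on await() until another thread calls countDown():

Java

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class CountDownLatchSketch {

    public static void main(String[] args) throws InterruptedException {
        // One outstanding task, so the latch starts with a count of 1.
        CountDownLatch latch = new CountDownLatch(1);

        // A worker thread does some asynchronous work and then counts the latch down.
        new Thread(() -> {
            try {
                TimeUnit.MILLISECONDS.sleep(500);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            latch.countDown();
        }).start();

        // The main thread blocks here until the count reaches 0.
        latch.await();
        System.out.println("Worker finished, main thread resumes");
    }
}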

Below is an implementation of the test using CountDownLatch:

Java

package com.techwording.scs;

import java.util.concurrent.CountDownLatch;

import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.cloud.stream.test.binder.TestSupportBinderAutoConfiguration;
import org.springframework.kafka.test.rule.EmbeddedKafkaRule;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.test.context.junit4.SpringRunner;

import static org.assertj.core.api.Assertions.assertThat;

@RunWith(SpringRunner.class)
@SpringBootTest(classes = { EmbeddedKafkaLatchTest.App.class, EmbeddedKafkaLatchTest.Producer.class, EmbeddedKafkaLatchTest.Consumer.class })
@EnableBinding(Source.class)
public class EmbeddedKafkaLatchTest {

    @SpringBootApplication(exclude = TestSupportBinderAutoConfiguration.class)
    static class App {

    }

    private static final String TOPIC1 = "test-topic-1";

    @ClassRule
    public static EmbeddedKafkaRule embeddedKafka = new EmbeddedKafkaRule(1, true, TOPIC1);

    private static CountDownLatch latch = new CountDownLatch(1);

    @BeforeClass
    public static void setup() {
        // Point the Spring Cloud Stream Kafka binder at the embedded broker and configure the bindings.
        System.setProperty("spring.cloud.stream.kafka.binder.brokers",
            embeddedKafka.getEmbeddedKafka()
                .getBrokersAsString());
        System.setProperty("spring.cloud.stream.bindings.input.destination", TOPIC1);
        System.setProperty("spring.cloud.stream.bindings.input.content-type", "text/plain");
        System.setProperty("spring.cloud.stream.bindings.input.group", "input-group-1");
        System.setProperty("spring.cloud.stream.bindings.output.destination", TOPIC1);
        System.setProperty("spring.cloud.stream.bindings.output.content-type", "text/plain");
        System.setProperty("spring.cloud.stream.bindings.output.group", "output-group-1");
    }

    @Autowired
    private Producer producer;

    @Autowired
    private Consumer consumer;

    @Test
    public void testMessageSendReceive() throws InterruptedException {
        producer.getMysource()
            .output()
            .send(MessageBuilder.withPayload("payload")
                .setHeader("type", "string")
                .build());

        // Block the test thread until the consumer counts the latch down.
        latch.await();
        assertThat(consumer.getReceivedMessage()).isEqualTo("payload");
    }
}
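One caveat about this version: latch.await() with no arguments blocks indefinitely, so if the message never arrives, the test hangs rather than fails. A common alternative (a suggestion on my part, not something the article's code does) is the timed overload with an assertion on its boolean result, for example assertThat(latch.await(5, TimeUnit.SECONDS)).isTrue(), which needs an extra java.util.concurrent.TimeUnit import but turns a missing message into a test failure.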


You can find the complete source code here.

Further Reading

Kafka With Spring Cloud Stream

5 Spring Cloud Annotations Java Programmers Should Know

Microservice Architectures With Spring Cloud and Docker


