
Spring Boot and Kafka Configuration Tuning


See a setup for configuration tuning in an isolated environment, and determine Spring Boot and Kafka configuration best practices for moderate workloads.


Summary

The goal of this exercise is to provide a setup for configuration tuning in an isolated environment and to determine Spring Boot and Kafka configuration best practices for moderate workloads. The high-level observations are:

  1. Partition your Kafka topic and design the system to be stateless for higher concurrency. For most moderate use cases (we have 100,000 messages per hour), you won't need more than 10 partitions. As a rule of thumb, the benefit of higher concurrency is proportional to the time the consumer spends processing a single message. 
  2. Spring Boot's default configurations are very reasonable for moderate uses. Avoid changing them without a proper root cause analysis. 
  3. Although it differs from use case to use case, it is recommended to have the producer receive an acknowledgment from at least the Kafka partition leader, and to acknowledge manually on the consumer side. 
  4. Size Kafka topics appropriately by using small messages, less than 0.5 MB, to avoid Kafka broker timeout errors. If required, chunk large payloads into small messages, send them through a single partition, and reconstruct the message on the consumer side. 
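To illustrate point 4, here is a minimal, hypothetical sketch of the chunk-and-reassemble approach (the ChunkingSketch class and its method names are mine, not part of Kafka or spring-kafka). In a real producer you would send every chunk with the same message key, so that all chunks land on one partition and arrive in order.

```java
import java.util.ArrayList;
import java.util.List;

public class ChunkingSketch {

    static final int CHUNK_SIZE = 500_000; // keep each message under ~0.5 MB

    // Producer side: split the payload into fixed-size chunks.
    static List<String> chunk(String payload, int chunkSize) {
        List<String> chunks = new ArrayList<>();
        for (int i = 0; i < payload.length(); i += chunkSize) {
            chunks.add(payload.substring(i, Math.min(payload.length(), i + chunkSize)));
        }
        return chunks;
    }

    // Consumer side: concatenate chunks back in arrival order
    // (ordering is guaranteed only within a single partition).
    static String reassemble(List<String> chunks) {
        StringBuilder sb = new StringBuilder();
        chunks.forEach(sb::append);
        return sb.toString();
    }

    public static void main(String[] args) {
        String payload = "x".repeat(1_200_000);           // a 1.2 MB logical message
        List<String> chunks = chunk(payload, CHUNK_SIZE);
        System.out.println(chunks.size());                 // 3
        System.out.println(reassemble(chunks).equals(payload)); // true
    }
}
```

A real implementation would also carry a chunk index and total count in message headers, so the consumer can detect a complete message.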

Challenge

I recently got involved in an initiative where we needed to hydrate two or more clusters of an In-Memory Data Grid (IMDG) with transient data from a combination of data sources. [Please don’t ask me why. :)]

Given that all IMDG clusters must always be in an identical state, sourcing the data directly from each IMDG was out of the question for obvious reasons. I needed a common memory space that is accessible from, or can flow the data to, all IMDGs. In our infrastructure, Kafka was the only available system accessible from all IMDGs. Hence, it was decided to fill a Kafka topic once from all data sources and then use it as a single data source to hydrate all IMDGs. 

Purpose, Performance Needs, and Scope

Our performance needs are not very high. As of now, the system needs to move less than 40 GB of data (fewer than 100,000 messages). The purpose was to determine the impact of popular configurations like concurrency, single/batch mode, and auto/manual ack mode for the consumer; and concurrency, retries, and acks (none/1/all) for the producer. 

Given my needs, and to time-box this exercise, I didn’t experiment much with tuning batch and custom buffer sizing in the consumer or producer. Also, all the experimenting was done within a single JVM; the configuration impact on multiple processes (horizontal scaling) with multiple Kafka topics and multiple partitions is out of scope for this exercise. I also didn’t check whether compression makes much of a difference. Kafka topics usually have a message-size limit, and it should be considered, especially if messages are traveling a long distance. 
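Though I didn’t benchmark it, producer-side compression is a one-line switch in application.yml if you want to experiment with it yourself (gzip shown here; Kafka also supports snappy, lz4, and zstd):

```yaml
spring:
  kafka:
    producer:
      compression-type: gzip
```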

PS: If someone has done the hard work and is willing to share observations about any configurations I missed, I’d be happy to hear about them. 

Test Environment 

All the tests below were performed using:

  1. Kafka 2.12-2.5.0 (https://kafka.apache.org/quickstart)
  2. Spring Boot 2.2.8.RELEASE
  3. spring-kafka (https://spring.io/projects/spring-kafka#overview)
  4. MacBook Pro; macOS Mojave 10.14.6; 16 GB RAM; 2.8 GHz Intel Core i7

Observations

As a baseline, I ran one million (1,000,000) custom messages, where each message is 1,000 bytes, with one topic and one default partition. The entire process completed in ~45,217 ms (roughly 22,000 messages per second), which can vary depending on your computer specs.

Concurrency Observation: Increasing the consumer concurrency won’t speed up the consumption of messages from Kafka topics. If your consumers are fast, keeping the concurrency low will yield better results, but if your consumer spends a significant time processing the message, higher concurrency will improve the throughput. Keep in mind that higher concurrency is not directly proportional to higher throughput, so you will have to find the right balance.

Batch Observation: Within my setup, introducing batching (spring.kafka.listener.type: batch) with most of Spring Boot’s default settings didn’t make much of a difference in performance. 

Consumer Manual Ack Observation: I didn’t notice any performance impact from using manual ack, so I would recommend using it in all non-trivial scenarios. In case of consumer failure, it helps the consumer start again where it left off. 

Producer Ack and Retries Observation: I noticed considerable performance degradation when using acks=1 (confirmation by the leader) and almost none when using retries. Even with the additional cost, I would recommend that every non-trivial application leverage it.

Kafka Broker and Message Size: I have observed issues in terms of performance and broker timeouts with large message sizes. 0.5 MB turned out to be a good size for our volume. If message frequency is higher, try to keep the message size smaller. Also, size your topics and message retention appropriately. In my experiments, I am using a 5 GB topic (Kafka will roll over as soon as it reaches the defined limit) with one-day retention. 
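For reference, topic size and retention can be set when the topic is created; the command below sketches the 5 GB / one-day setup described above (note that retention.bytes applies per partition, so adjust it for partitioned topics):

```shell
bin/kafka-topics.sh --create --bootstrap-server localhost:9092 \
  --replication-factor 1 --partitions 1 --topic topic-1p \
  --config retention.bytes=5368709120 \
  --config retention.ms=86400000
```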

Overall: Spring Boot’s default configuration is quite reasonable for any moderate use of Kafka. Based on my observations, the only tweaks I would recommend are keeping acks=1 on the producer side (to get confirmation from the partition leader) and committing manually on the consumer side. 

If your consumer suddenly stops receiving traffic, you may consider tweaking the Kafka/Spring Boot consumer connectivity configurations like fetch.max.wait.ms, heartbeat.interval.ms, session.timeout.ms, and max.poll.interval.ms.
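These are standard Kafka consumer properties, so in Spring Boot they go under spring.kafka.consumer.properties. The values below are Kafka’s defaults at the time of writing and are shown only as a starting point, not a recommendation:

```yaml
spring:
  kafka:
    consumer:
      properties:
        fetch.max.wait.ms: 500        # how long the broker may wait to fill fetch.min.bytes
        heartbeat.interval.ms: 3000   # must be well below session.timeout.ms
        session.timeout.ms: 10000     # consumer considered dead after this much silence
        max.poll.interval.ms: 300000  # max time between polls before a rebalance
```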

Code Setup and Tips

To set up a simple Maven-based Spring Boot application, create a new Spring Boot project with the dependencies spring-boot-starter and spring-boot-starter-web. The package name should be “learn.kafka”, which will create the main class as “learn.kafka.TestApp”. You should have the application running at port 8080. There are plenty of tutorials, and Spring Initializr, if you need help. 

To download Kafka, follow the instructions at https://kafka.apache.org/quickstart to start ZooKeeper and the Kafka server on your local machine, and create a topic using the following command.

Shell

bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic topic-1p



Tip: if required, you can delete a topic using the following command.

Shell

./bin/kafka-topics.sh --delete --topic topic-1p --bootstrap-server localhost:9092



Once you have a basic Spring Boot application and Kafka ready to roll, it’s time to add the producer and the consumer to the Spring Boot application. We will use the same Spring Boot application as both producer and consumer for this setup. You may choose to have two different Spring Boot applications as producer and consumer, respectively. 

To add Kafka support to the “TestApp” application, open pom.xml and add the following dependency. 

XML

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>



Add Lombok as a dependency. We will be leveraging Lombok for our CustomMessage class. 

XML

<dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
</dependency>



We have enabled Kafka support for our “TestApp” application. Now, let’s create a custom message class under the package “learn.kafka.model”. 

Java

package learn.kafka.model;

import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;

import java.util.Date;

@Data
@NoArgsConstructor
@AllArgsConstructor
@Builder
public class CustomMessage {
    private String value;      // payload
    private Date startTime;    // when the producer started
}



Being a minimalist, let’s use the “TestApp” class to hold both producer and consumer in the same class. Trust me! It’s not a lot of code, thanks to Spring Boot’s magic! 

To add the consumer, add the consumer factory and consumer method as below.

Java

@Bean
public ConsumerFactory<String, CustomMessage> consumerFactory(KafkaProperties kafkaProperties) {
    return new DefaultKafkaConsumerFactory<>(kafkaProperties.buildConsumerProperties());
}



Tips:

  1. The ConsumerFactory bean is required only because of “CustomMessage”. You don’t have to create this bean explicitly if your value is of type String.
  2. Unlike many tutorials online, DO NOT hard-code properties in the “ConsumerFactory”. Instead, build it using “kafkaProperties.buildConsumerProperties()” and enhance it based on your needs. This will allow you to control your consumer from application.yml. 

Hot Tip: Most tutorials online create the ConcurrentKafkaListenerContainerFactory bean explicitly. You don’t need to do so; Spring Boot will do it for you as soon as you specify the concurrency property for the consumer.

Java

@KafkaListener(topics = "#{'${spring.kafka.consumer.topic}'}")
public void singleMessageConsumerWithManualAck(CustomMessage message, Acknowledgment acknowledgment, @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition) {
    if (receivedMessagedCount.get() < 1000) log.info("Partition: {} receivedMessagedCount: {}", partition, receivedMessagedCount.get());
    consumeMessage(message);
    acknowledgment.acknowledge();
}



Tip: You don’t have to specify a topic name here and could use a common property, but I decided to keep it separate because in most practical scenarios the producer and consumer will be two different applications. 

Java

private void consumeMessage(CustomMessage message) {
    // log once to make sure that the consumer has started
    if (receivedMessagedCount.get() == 0) log.info("Consumer started...");

    // keep count of received messages
    receivedMessagedCount.getAndIncrement();

    // calculate the time elapsed since the producer started
    long diffInMillis = Math.abs(new Date().getTime() - message.getStartTime().getTime());
    if (receivedMessagedCount.get() == MESSAGE_COUNT) {
        log.info("receivedMessagedCount: {} Took: {} ms", receivedMessagedCount.get(), diffInMillis);
        log.info("Consumer finished.");
    }
    // the consumer has to do something, doesn't it?
    // Thread.sleep(20);
}



The actual consuming method has been separated out so I can reuse the same logic with different settings, like single vs. batch mode. 

And finally, add the below configuration in your application.yml. 

YAML

spring:
  kafka:
    bootstrap-servers: localhost:9092
    listener:
      ack-mode: manual_immediate
    consumer:
      group-id: test-group-1
      topic: topic-1p
      enable-auto-commit: false
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
      properties:
        spring:
          json:
            trusted:
              packages: 'learn.kafka.model'



Restart the application, and your consumer should be up and running. Please make sure ZooKeeper and the Kafka server are running on your local machine, as mentioned above. 

If your application is running, add the producer to the same class by injecting the topic name, KafkaTemplate, and producerFactory into TestApp.class as below. 

Java

@Value("${spring.kafka.producer.topic}") private String topic;

@Autowired private KafkaTemplate<String, CustomMessage> kafkaTemplate;

@Bean
public KafkaTemplate<String, CustomMessage> kafkaTemplate(ProducerFactory<String, CustomMessage> producerFactory) {
    return new KafkaTemplate<>(producerFactory);
}

@Bean
public ProducerFactory<String, CustomMessage> producerFactory(KafkaProperties kafkaProperties) {
    return new DefaultKafkaProducerFactory<>(kafkaProperties.buildProducerProperties());
}



Special note: please add commons-lang3 to your dependencies; we will use it to generate random 1,000-character strings. 

XML

<dependency>
    <groupId>org.apache.commons</groupId>
    <artifactId>commons-lang3</artifactId>
    <version>3.10</version>
</dependency>



and a method to send the messages to the Kafka topic. 

Java

private void sendMessages(Date startTime, long messageCount) {
    log.info("Producer started...");
    for (int i = 0; i < messageCount; i++) {
        String value = RandomStringUtils.random(1000, true, true);
        CustomMessage message = CustomMessage.builder().value(value).startTime(startTime).build();
        kafkaTemplate.send(topic, message);
    }
    log.info("Producer finished.");
}



And add the producer configuration to your application.yml. Your combined application.yml, with consumer and producer properties, should look like this:

YAML

spring:
  kafka:
    bootstrap-servers: localhost:9092
    listener:
      ack-mode: manual_immediate
    producer:
      topic: topic-1p
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
      retries: 3
      acks: 1
    consumer:
      group-id: test-group-1
      topic: topic-1p
      enable-auto-commit: false
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
      properties:
        spring:
          json:
            trusted:
              packages: 'learn.kafka.model'



And add an “ApplicationRunner”, which will kick off the producer and consumer on application startup. 

Java

@Bean
ApplicationRunner runAdditionalClientCacheInitialization() {
    return args -> {
        final Date startTime = new Date();
        sendMessages(startTime, 1000000);
        // sendMessagesWithThread(startTime, 1000000, 10);
    };
}



Run your application; depending on your hardware, the time taken could be different for you. 

You should see how long it took in the console logs: 

“receivedMessagedCount:1000000 Took: 45217 ms”

To add multiple consumer threads, modify your application properties file as below. Also, point the application to a topic with 10 partitions. Remember: the consumer concurrency should be no greater than the number of topic partitions, or some threads will sit idle. You can create a new topic with 10 partitions using this command: 

Shell

bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 10 --topic topic-10p


 
YAML

spring:
  kafka:
    bootstrap-servers: localhost:9092
    listener:
      concurrency: 10
      ack-mode: manual_immediate
    producer:
      topic: topic-10p
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
      retries: 3
      acks: 1
    consumer:
      group-id: test-group-1
      topic: topic-10p
      enable-auto-commit: false
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
      properties:
        spring:
          json:
            trusted:
              packages: 'learn.kafka.model'



Run your application; depending on your hardware, the time taken could be different for you. 

You should see how long it took in the console logs: 

“receivedMessagedCount:1000000 Took: 69660 ms”

To test the consumer’s batch based configuration, you can add the Kafka listener property to application.yml and add a new consumer method that can accept the list of Custom messages.

YAML

spring:
  kafka:
    listener:
      type: batch



Java

@KafkaListener(topics = "#{'${spring.kafka.consumer.topic}'}")
public void batchConsumerWithManualAck(List<CustomMessage> messages, Acknowledgment acknowledgment, @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition) {
    // sample the partitions, batch sizes, overall message count, and message size
    if (receivedMessagedCount.get() < 1000) log.info("Partition: {} batchSize: {} receivedMessagedCount: {}", partition, messages.size(), receivedMessagedCount.get());

    // process the batch
    messages.forEach(this::consumeMessage);

    // acknowledge once, after the whole batch has been processed
    acknowledgment.acknowledge();
}



TIP: Once you add this method, don’t forget to comment out the “singleMessageConsumerWithManualAck” method. 

Run your application; depending on your hardware, the time taken could be different for you. 

You should see how long it took in the console logs: 

“receivedMessagedCount:1000000 Took: 77055 ms”

To test producer concurrency, simply add more threads using Spring Boot’s task executor (inject a TaskExecutor bean as taskExecutor for the method below). KafkaTemplate is thread-safe and can be used by multiple threads. 

Java

private void sendMessagesWithThread(Date startTime, long totalMessages, int threads) {
    final long messagePerThread = totalMessages / threads;
    log.info("messagePerThread: {}", messagePerThread);

    // final CountDownLatch latch = new CountDownLatch(threads);

    for (int i = 0; i < threads; i++) {
        taskExecutor.execute(() -> {
            sendMessages(startTime, messagePerThread);
            // latch.countDown();
        });
    }
}



and update the ApplicationRunner to use the method above. 

Java

@Bean
ApplicationRunner runAdditionalClientCacheInitialization() {
    return args -> {
        final Date startTime = new Date();
        // sendMessages(startTime, 1000000);
        sendMessagesWithThread(startTime, 1000000, 10);
    };
}



This is just the tip of the iceberg. I hope this setup will help to tune your Kafka configuration for moderate usages.

Topics:
big data, java, kafka, performance, spring boot, tutorial
