Ways To Stop and Resume Your Kafka Producer/Consumer at Run-Time

In this blog, the reader will learn how to stop and resume a Kafka producer or consumer at runtime using two distinct methods.

By Ashok Gudise · May. 09, 23 · Tutorial

Imagine you are running a Kafka cluster, and suddenly you need to perform maintenance on one of your Kafka producers or consumers. What do you do? Restarting the whole application is one option, but it is often more convenient to stop and resume just the messaging endpoint while the application keeps running.

Kafka has become an indispensable building block for streaming data pipelines. Its high throughput, fault tolerance, and scalability make it an excellent option for processing large volumes of data in real time. It also has client libraries for several programming languages, including Java, Python, Kotlin, Rust, and others.

In this blog, we will discuss how to stop and resume a Kafka producer or consumer at runtime. We will explore two distinct methods: one uses REST service endpoints, while the other uses Spring Boot Actuator endpoints.

Tech Stack

  • Spring Boot
  • Spring Integration
  • Kafka Cluster (running in Docker)
  • Java 17 (or 8)

Demo Scene

Let’s begin by creating a Kafka producer. Here I am using Spring Integration to create it. As I have mentioned in my previous blogs, Spring Integration is one of the most powerful modules that Spring has introduced. It follows a message-driven approach backed by Enterprise Integration Patterns.

ProducerIntegrationConfig.java

Java
 
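import java.time.Duration;
import java.util.Date;

import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.Pollers;
import org.springframework.integration.kafka.dsl.Kafka;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.messaging.support.GenericMessage;

@Configuration
public class ProducerIntegrationConfig {

    private final KafkaProperties kafkaProperties;
    private final String kafkaTopic;

    public ProducerIntegrationConfig(KafkaProperties kafkaProperties, @Value("${app.topic-name}") String kafkaTopic) {
        this.kafkaProperties = kafkaProperties;
        this.kafkaTopic = kafkaTopic;
    }

    // Polls every 5 seconds and sends the current date as the message payload.
    // The id "kafkaProducerBean" is the endpoint name that the start/stop commands target later.
    @Bean
    public IntegrationFlow producerIntegrationFlow() {
        return IntegrationFlow.from(() -> new GenericMessage<>(""),
                        c -> c.poller(Pollers.fixedRate(Duration.ofSeconds(5)))
                                .id("kafkaProducerBean"))
                .transform(message -> new Date().toString())
                .log()
                .channel("to-kafka-producer-template")
                .get();
    }

    // Publishes every message arriving on "to-kafka-producer-template" to the default topic.
    @Bean
    public IntegrationFlow kafkaProducerTemplate(KafkaTemplate<?, ?> kafkaTemplate) {
        kafkaTemplate.setDefaultTopic(this.kafkaTopic);
        return IntegrationFlow.from("to-kafka-producer-template")
                .handle(Kafka.outboundChannelAdapter(kafkaTemplate))
                .get();
    }
}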


application.yml

YAML
 
spring:
  kafka:
    bootstrap-servers: localhost:9092
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      retries: 3

app:
  topic-name: demo-topic


Now let's create a simple streams processor by configuring the binders. I am using Spring Cloud Stream to create the streaming processor.

StreamConsumer.java

Java
 
@Configuration
@Slf4j
public class StreamConsumer {
    
    @Bean
    public Consumer<KStream<?,String>> myConsumer(){
        return input -> 
                input.foreach((key, value) -> {
                    log.debug("Key: {} Value: {}", key, value);
                });
    }
}


application.yml

YAML
 
spring:
  application:
    name: processor-demo

  cloud:
    stream:
      bindings:
        myConsumer-in-0:
          destination: demo-topic
          binder: kstream-consumer
          group: processor-group
      kafka:
        streams:
          binder:
            brokers: localhost:9092

      binders:
        # The binder name must match the "binder" value used by the binding above
        kstream-consumer:
          type: kstream
          environment:
            spring.cloud.stream.kafka.streams.binder.brokers: localhost:9092


With REST Service Endpoints…

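Spring Integration allows us to control and monitor the messaging endpoints that we created. It does this through a control bus: a message sent to a dedicated channel is interpreted as a command against a named endpoint. This can be set up in two steps.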

Step 1: Create a Control Bus Message Channel, Define a Flow, and Finally, a Gateway Function

ProducerIntegrationConfig.java (modify the producer’s integration config)

Java
 
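    // Add this channel to the integration config
    @Bean
    public MessageChannel controlChannel() {
        return MessageChannels.direct().get();
    }

    @Bean // Assumed wiring (not shown in the original snippet): route the channel to the control bus
    public IntegrationFlow controlBusFlow() {
        return IntegrationFlow.from(controlChannel()).controlBus().get();
    }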


Step 2: Send Commands to the Control Channel From a REST Controller

ProducerDemoController.java

Java
 
@RestController
public class ProducerDemoController {
    
    private MessageChannel controlChannel;
    
    public ProducerDemoController(@Qualifier("controlChannel") MessageChannel controlChannel){
        this.controlChannel = controlChannel;
    }
    
    @GetMapping("/stopProducer")
    public  void stopProducer(){
        controlChannel.send(new GenericMessage<>("@kafkaProducerBean.stop()"));
    }
    
    @GetMapping("/startProducer")
    public void startProducer(){
        controlChannel.send(new GenericMessage<>("@kafkaProducerBean.start()"));
    }
}
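
The command strings above, such as "@kafkaProducerBean.stop()", name an endpoint and a lifecycle method to call on it. The following is a toy model in plain Java of that idea, not Spring Integration's implementation: Switchable, ToyEndpoint, and ToyControlBus are hypothetical names introduced only for illustration, and the parser accepts just the two commands used in this article.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical stand-in for an endpoint that can be started and stopped. */
interface Switchable {
    void start();
    void stop();
    boolean isRunning();
}

/** Hypothetical endpoint that only tracks whether it is running. */
final class ToyEndpoint implements Switchable {
    private volatile boolean running = true;

    @Override public void start() { running = true; }
    @Override public void stop() { running = false; }
    @Override public boolean isRunning() { return running; }
}

/** Toy control bus: resolves "@name.start()" or "@name.stop()" against a registry of endpoints. */
final class ToyControlBus {
    private static final Pattern COMMAND = Pattern.compile("@(\\w+)\\.(start|stop)\\(\\)");
    private final Map<String, Switchable> registry = new ConcurrentHashMap<>();

    void register(String name, Switchable endpoint) {
        registry.put(name, endpoint);
    }

    void send(String command) {
        Matcher matcher = COMMAND.matcher(command);
        if (!matcher.matches()) {
            throw new IllegalArgumentException("Unsupported command: " + command);
        }
        Switchable target = registry.get(matcher.group(1));
        if (target == null) {
            throw new IllegalArgumentException("No endpoint named " + matcher.group(1));
        }
        // Dispatch to the lifecycle method named in the command.
        if (matcher.group(2).equals("start")) {
            target.start();
        } else {
            target.stop();
        }
    }
}
```

Registering a ToyEndpoint under the name "kafkaProducerBean" and sending "@kafkaProducerBean.stop()" flips its running flag to false. In the real application, stopping the polling endpoint means no new messages are generated until it is started again.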


Stop and Resume the Producer Through REST Endpoints

http://localhost:8080/stopProducer
http://localhost:8080/startProducer

With Spring Boot’s Actuator Endpoints…

Add the following block to application.yml to expose the bindings endpoint through Actuator.

application.yml

YAML
 
management:
  endpoints:
    web:
      exposure:
        include:
          - bindings


Stop and Resume the Consumer Function Through Actuator Endpoint

curl -d '{"state":"STOPPED"}' -H "Content-Type: application/json" -X POST localhost:8282/actuator/bindings/myConsumer-in-0
curl -d '{"state":"STARTED"}' -H "Content-Type: application/json" -X POST localhost:8282/actuator/bindings/myConsumer-in-0
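
The same calls can be made from Java instead of curl. The sketch below uses the JDK's built-in java.net.http.HttpClient, which requires Java 11 or later. BindingStateClient is a hypothetical helper name, and the base URL and binding name are simply the ones used in the curl commands above.

```java
import java.io.IOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

/** Hypothetical helper that changes a binding's state through the Actuator bindings endpoint. */
final class BindingStateClient {
    private final String baseUrl;
    private final HttpClient http = HttpClient.newHttpClient();

    BindingStateClient(String baseUrl) {
        this.baseUrl = baseUrl;
    }

    /** Builds the POST request equivalent to the curl command; state is STOPPED or STARTED. */
    HttpRequest changeStateRequest(String bindingName, String state) {
        String json = "{\"state\":\"" + state + "\"}";
        return HttpRequest.newBuilder(URI.create(baseUrl + "/actuator/bindings/" + bindingName))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(json))
                .build();
    }

    /** Sends the request and returns the HTTP status code. Needs the application to be running. */
    int changeState(String bindingName, String state) throws IOException, InterruptedException {
        HttpResponse<Void> response =
                http.send(changeStateRequest(bindingName, state), HttpResponse.BodyHandlers.discarding());
        return response.statusCode();
    }
}
```

For example, new BindingStateClient("http://localhost:8282").changeState("myConsumer-in-0", "STOPPED") sends the same request as the first curl command. Building the request is kept separate from sending it so the URL and body can be checked without a running server.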

Summary

In this blog, we discussed how to stop and resume a Kafka producer or consumer at runtime using REST endpoints and Spring Boot Actuator. Being able to stop and resume individual producers and consumers without restarting the application is useful during maintenance and helps keep real-time data pipelines running smoothly.

The source code can be found on my GitHub. Also, you can reach out to me on LinkedIn for any questions or suggestions.

That’s all for now. Happy Learning!
