Ways To Stop and Resume Your Kafka Producer/Consumer at Run-Time

In this blog, the reader will learn how to stop and resume a Kafka client or producer at runtime using two distinct methods.

By Ashok Gudise · May 09, 2023 · Tutorial

Imagine you are running a Kafka cluster, and suddenly you need to perform maintenance on one of your Kafka clients or producers. What do you do? In this blog, we will explore how to stop and resume a Kafka client or producer at runtime using the Java client API.

Kafka has become an indispensable building block for streaming data pipelines due to its high throughput, fault tolerance, and scalability, which make it an excellent option for processing large volumes of data in real time. Additionally, it offers the significant advantage of supporting several programming languages, including Java, Python, Kotlin, Rust, and others.

In this blog, we will discuss how to stop and resume a Kafka client or producer at runtime. We will explore two distinct methods: one involves utilizing REST service endpoints, while the other involves using Spring Actuator endpoints.

Tech Stack

  • Spring Boot
  • Spring Integration
  • Kafka Cluster (running in Docker; see the Compose sketch just after this list)
  • Java 17 (or 8)
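
The examples below expect a broker listening on localhost:9092. If you do not already have one, a minimal single-broker Compose file along these lines will do; the image tags and internal listener names here are illustrative and not part of the original setup.

docker-compose.yml

YAML
 
version: "3"
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.3.2
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
  kafka:
    image: confluentinc/cp-kafka:7.3.2
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      # Advertise localhost:9092 to clients on the host and kafka:29092 inside the Compose network
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:29092,PLAINTEXT_HOST://0.0.0.0:9092
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1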

Demo Scene

Let’s begin by creating a Kafka producer. Here I am using Spring Integration to create the producer. As I have mentioned in my previous blogs, Spring Integration is one of the most powerful modules Spring offers: it follows a message-driven approach backed by Enterprise Integration Patterns.

ProducerIntegrationConfig.java

Java
 
@Configuration
public class ProducerIntegrationConfig {

    private final KafkaProperties kafkaProperties;
    private final String kafkaTopic;

    public ProducerIntegrationConfig(KafkaProperties kafkaProperties, @Value("${app.topic-name}") String kafkaTopic) {
        this.kafkaProperties = kafkaProperties;
        this.kafkaTopic = kafkaTopic;
    }

    // Polls every 5 seconds and emits the current timestamp; the adapter id
    // "kafkaProducerBean" is what we will stop and start through the control bus later
    @Bean
    public IntegrationFlow producerIntegrationFlow() {
        return IntegrationFlow.from(() -> new GenericMessage<>(""),
                        c -> c.poller(Pollers.fixedRate(Duration.ofSeconds(5)))
                                .id("kafkaProducerBean"))
                .transform(message -> new Date().toString())
                .log()
                .channel("to-kafka-producer-template")
                .get();
    }

    // Publishes every message arriving on "to-kafka-producer-template" to the configured topic
    @Bean
    public IntegrationFlow kafkaProducerTemplate(KafkaTemplate<?, ?> kafkaTemplate) {
        kafkaTemplate.setDefaultTopic(this.kafkaTopic);
        return IntegrationFlow.from("to-kafka-producer-template")
                .handle(Kafka.outboundChannelAdapter(kafkaTemplate))
                .get();
    }

}


application.yml

YAML
 
spring:
  kafka:
    bootstrap-servers: localhost:9092
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      retries: 3

app:
  topic-name: demo-topic


Now let's create a simple streams processor by configuring the binders. I am using Spring Cloud Stream to create the streaming processor.

StreamConsumer.java

Java
 
@Configuration
@Slf4j
public class StreamConsumer {

    // Functional consumer bound to the "myConsumer-in-0" binding; logs every record it receives
    @Bean
    public Consumer<KStream<?, String>> myConsumer() {
        return input ->
                input.foreach((key, value) -> log.debug("Key: {} Value: {}", key, value));
    }
}


application.yml

YAML
 
spring:
  application:
    name: processor-demo

  cloud:
    stream:
      bindings:
        myConsumer-in-0:
          destination: demo-topic
          binder: kstream
          group: processor-group
      kafka:
        streams:
          binder:
            brokers: localhost:9092
      binders:
        kstream:
          type: kstream
          environment:
            spring.cloud.stream.kafka.streams.binder.brokers: localhost:9092


With REST Service Endpoints…

Spring Integration allows us to control and monitor the messaging endpoints that we created. This can be done in two steps.

Step 1: Create a Control Bus Message Channel and Define a Control Bus Flow

ProducerIntegrationConfig.java (modify the producer’s integration config)

Java
 
    // Add this channel to the integration config
    @Bean
    public MessageChannel controlChannel() {
        return MessageChannels.direct().get();
    }
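
The channel on its own is not enough: a control bus flow must consume from controlChannel so that incoming command messages, such as "@kafkaProducerBean.stop()", are evaluated and executed against the polling adapter we registered with .id("kafkaProducerBean") in the producer flow. A minimal sketch, added to the same ProducerIntegrationConfig.java:

Java
 
    // Control bus: executes command expressions received on controlChannel,
    // e.g. "@kafkaProducerBean.stop()" and "@kafkaProducerBean.start()"
    @Bean
    public IntegrationFlow controlBusFlow() {
        return IntegrationFlow.from("controlChannel")
                .controlBus()
                .get();
    }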


Step 2: Send Control Commands to the Channel From a REST Controller

ProducerDemoController.java

Java
 
@RestController
public class ProducerDemoController {

    private final MessageChannel controlChannel;

    public ProducerDemoController(@Qualifier("controlChannel") MessageChannel controlChannel) {
        this.controlChannel = controlChannel;
    }

    // Sends a control bus command that stops the polling adapter "kafkaProducerBean"
    @GetMapping("/stopProducer")
    public void stopProducer() {
        controlChannel.send(new GenericMessage<>("@kafkaProducerBean.stop()"));
    }

    // Sends a control bus command that resumes the polling adapter
    @GetMapping("/startProducer")
    public void startProducer() {
        controlChannel.send(new GenericMessage<>("@kafkaProducerBean.start()"));
    }
}


Stop and Resume the Producer Through the REST Endpoints

http://localhost:8080/startProducer
http://localhost:8080/stopProducer

With Spring Boot’s Actuator Endpoints…

Add the block below to application.yml to expose the bindings endpoint through Spring Boot Actuator.

application.yml

YAML
 
management:
  endpoints:
    web:
      exposure:
        include:
          - bindings


Stop and Resume the Consumer Binding Through the Actuator Endpoint

curl -d '{"state":"STOPPED"}' -H "Content-Type: application/json" -X POST localhost:8282/actuator/bindings/myConsumer-in-0
curl -d '{"state":"STARTED"}' -H "Content-Type: application/json" -X POST localhost:8282/actuator/bindings/myConsumer-in-0

Summary

In this blog, we discussed how to stop and resume a Kafka producer or consumer at runtime, using Spring Integration’s control bus behind a REST endpoint and Spring Cloud Stream’s Actuator bindings endpoint. The ability to stop and resume Kafka clients or producers is essential for maintaining the health of a Kafka cluster and ensuring the smooth operation of real-time data pipelines.

The source code can be found on my GitHub. Also, you can reach out to me on LinkedIn for any questions or suggestions.

That’s all for now. Happy Learning!

