DZone
Introduction to Apache Kafka With Spring


By Otavio Santana · May 20, 2021 · Tutorial

Apache Kafka is a community-developed, distributed streaming platform with three key capabilities: publishing and subscribing to streams of records, storing streams of records in a fault-tolerant, durable way, and processing streams of records as they occur. Apache Kafka has many success stories in the Java world. This post will cover how to benefit from this powerful tool in the Spring universe.

Apache Kafka Core Concepts

Kafka runs as a cluster on one or more servers that can span multiple data centers. The cluster stores streams of records in categories called topics, and each record consists of a key, a value, and a timestamp.

From the documentation, Kafka has four core APIs: 

  • The Producer API allows an application to publish a stream of records to one or more Kafka topics.
  • The Consumer API allows an application to subscribe to one or more topics and process the stream of records produced to them.
  • The Streams API allows an application to act as a stream processor, consuming an input stream from one or more topics and producing an output stream to one or more output topics, effectively transforming the input streams to output streams.
  • The Connector API allows building and running reusable producers or consumers that connect Kafka topics to existing applications or data systems.

Figure: Apache Kafka's core APIs.

Using Docker

There is also the possibility of using Docker. Since the setup requires two images, one for ZooKeeper and one for Apache Kafka, this tutorial will use docker-compose. Follow these instructions:

  • Install Docker 
  • Install Docker-compose 
  • Create a docker-compose.yml file with the configuration below:
```yaml
version: '2.1'

services:
  zoo:
    image: zookeeper:3.4.9
    hostname: zoo1
    ports:
      - "2181:2181"
    environment:
      ZOO_MY_ID: 1
      ZOO_PORT: 2181
      ZOO_SERVERS: server.1=zoo1:2888:3888

  kafka:
    image: confluentinc/cp-kafka:5.5.1
    hostname: kafka1
    ports:
      - "9092:9092"
    environment:
      KAFKA_ADVERTISED_LISTENERS: LISTENER_DOCKER_INTERNAL://kafka1:19092,LISTENER_DOCKER_EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: LISTENER_DOCKER_INTERNAL:PLAINTEXT,LISTENER_DOCKER_EXTERNAL:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: LISTENER_DOCKER_INTERNAL
      KAFKA_ZOOKEEPER_CONNECT: "zoo1:2181"
      KAFKA_BROKER_ID: 1
      KAFKA_LOG4J_LOGGERS: "kafka.controller=INFO,kafka.producer.async.DefaultEventHandler=INFO,state.change.logger=INFO"
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
    depends_on:
      - zoo
```

Then, run the command: 

```shell
docker-compose -f docker-compose.yml up -d
```


To connect as localhost, also define kafka as a local host name. On Linux, append the line below to /etc/hosts:

```properties
127.0.0.1       localhost kafka
```



Application With Spring 

To explore Kafka, we'll use the Spring for Apache Kafka (spring-kafka) project. We'll build a simple name counter where each request fires an event that updates an in-memory counter.

The Spring for Apache Kafka (spring-kafka) project applies core Spring concepts to the development of Kafka-based messaging solutions. It provides a "template" as a high-level abstraction for sending messages. It also provides support for message-driven POJOs with @KafkaListener annotations and a "listener container". These libraries promote the use of dependency injection and declarative programming. In all of these cases, you will see similarities to the JMS support.

The first step is to create a Maven-based Spring Boot project that includes the spring-kafka and spring-boot-starter-web dependencies.
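
In pom.xml, the dependencies section could look like the sketch below. This is a minimal illustration; the Spring Boot parent POM is assumed to manage the dependency versions.

```xml
<!-- Minimal sketch: versions are assumed to be managed by the
     spring-boot-starter-parent POM. -->
<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
    </dependency>
</dependencies>
```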

By default, spring-kafka uses String for both the serializer and the deserializer. We'll override this configuration so we can send Java objects as JSON.

```properties
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer
spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer
spring.kafka.producer.properties.spring.json.add.type.headers=false
spring.kafka.consumer.properties.spring.json.trusted.packages=*
```



The first class is a configuration that creates the topics if they do not exist. Spring provides a TopicBuilder to define the name, partitions, and replicas.

```java
import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.TopicBuilder;

@Configuration
public class TopicProducer {

    static final String NAME_INCREMENT = "name_increment";
    static final String NAME_DECREMENT = "name_decrement";

    @Bean
    public NewTopic increment() {
        return TopicBuilder.name(NAME_INCREMENT)
                .partitions(10)
                .replicas(1)
                .build();
    }

    @Bean
    public NewTopic decrement() {
        return TopicBuilder.name(NAME_DECREMENT)
                .partitions(10)
                .replicas(1)
                .build();
    }

}
```



KafkaTemplate is a template for executing high-level operations against Apache Kafka. We'll use this class in the name service to fire two events to Kafka: one to increment and another to decrement.

```java
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

import java.util.List;
import java.util.stream.Collectors;

@Service
public class NameService {

    private final NameCounter counter;

    private final KafkaTemplate<String, Name> template;

    public NameService(NameCounter counter, KafkaTemplate<String, Name> template) {
        this.counter = counter;
        this.template = template;
    }

    public List<NameStatus> findAll() {
        return counter.getValues()
                .map(NameStatus::of)
                .collect(Collectors.toUnmodifiableList());
    }

    public NameStatus findByName(String name) {
        return new NameStatus(name, counter.get(name));
    }

    public void decrement(String name) {
        template.send(TopicProducer.NAME_DECREMENT, new Name(name));
    }

    public void increment(String name) {
        template.send(TopicProducer.NAME_INCREMENT, new Name(name));
    }
}
```
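
The NameService above depends on a NameCounter, which the article does not list. Below is a minimal sketch of what it could look like; the ConcurrentHashMap storage and the shape of getValues() are assumptions inferred from how NameService calls it. In the application, it would be registered as a Spring bean (e.g., with @Component).

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Stream;

// Hypothetical in-memory counter (not shown in the original article).
// Thread-safe: ConcurrentHashMap plus AtomicInteger per name.
class NameCounter {

    private final Map<String, AtomicInteger> counters = new ConcurrentHashMap<>();

    public void increment(String name) {
        counters.computeIfAbsent(name, k -> new AtomicInteger()).incrementAndGet();
    }

    public void decrement(String name) {
        counters.computeIfAbsent(name, k -> new AtomicInteger()).decrementAndGet();
    }

    public int get(String name) {
        AtomicInteger value = counters.get(name);
        return value == null ? 0 : value.get();
    }

    // Assumed to return a stream of entries that NameStatus::of can adapt.
    public Stream<Map.Entry<String, AtomicInteger>> getValues() {
        return counters.entrySet().stream();
    }
}
```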



Now that we've covered the producer with the KafkaTemplate, the next step is to define a consumer class. A consumer class listens to Kafka events and executes an operation for each one. In this sample, NameConsumer listens to events with the @KafkaListener annotation.

```java
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

import java.util.logging.Logger;

@Component
public class NameConsumer {

    private static final Logger LOGGER = Logger.getLogger(NameConsumer.class.getName());

    private final NameCounter counter;

    public NameConsumer(NameCounter counter) {
        this.counter = counter;
    }

    @KafkaListener(id = "increment", topics = TopicProducer.NAME_INCREMENT)
    public void increment(Name name) {
        LOGGER.info("Increment listener to the name " + name);
        counter.increment(name.get());
    }

    @KafkaListener(id = "decrement", topics = TopicProducer.NAME_DECREMENT)
    public void decrement(Name name) {
        LOGGER.info("Decrement listener to the name " + name);
        counter.decrement(name.get());
    }
}
```
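
The Name and NameStatus classes are also not listed in the article; a plausible sketch follows. The field names and the static of factory are assumptions based on how the other classes use them. Name is the event payload: the default constructor and setter let the JsonDeserializer instantiate it from the message body, and NameStatus is the read model that pairs a name with its current count.

```java
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical event payload (not shown in the original article).
class Name {

    private String value;

    Name() {
        // Required by the JSON deserializer.
    }

    Name(String value) {
        this.value = value;
    }

    public String get() {
        return value;
    }

    public void set(String value) {
        this.value = value;
    }

    @Override
    public String toString() {
        return "Name{value='" + value + "'}";
    }
}

// Hypothetical read model; of(...) adapts the entries exposed by
// NameCounter.getValues().
class NameStatus {

    private final String name;
    private final int counter;

    NameStatus(String name, int counter) {
        this.name = name;
        this.counter = counter;
    }

    static NameStatus of(Map.Entry<String, AtomicInteger> entry) {
        return new NameStatus(entry.getKey(), entry.getValue().get());
    }

    public String getName() {
        return name;
    }

    public int getCounter() {
        return counter;
    }
}
```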


To conclude, we've seen the potential of Apache Kafka and why it has become so popular among big data players. This simple example shows how straightforward it is to integrate Kafka with Spring.

References:

  • https://docs.spring.io/spring-boot/docs/current/reference/html/spring-boot-features.html#boot-features-kafka 
  • https://spring.io/projects/spring-kafka 
  • https://docs.spring.io/spring-kafka/reference/html/ 
  • Source 

Opinions expressed by DZone contributors are their own.
