Develop a Full-Stack Java Application With Kafka and Spring Boot

This tutorial shows how to publish and subscribe to Kafka messages in a Spring Boot application and how to display the messages live in the browser.

By Marcus Hellberg · Updated Oct. 06, 2022 · Tutorial

What You Will Build

You will build a full-stack reactive web app that sends and receives messages through Kafka. The app uses Spring Boot and Java on the server, Lit and TypeScript on the client, and the Hilla framework for components and communication.


[Screenshot: a browser window showing one message, "Hello Kafka". At the bottom of the window, there are two inputs, one for name and one for message, and a button for sending messages.]


What You Will Need

  • 20 minutes
  • Java 11 or newer
  • Node 16.14 or newer
  • An IDE that supports both Java and TypeScript, such as VS Code.

Technology Overview

Kafka

Apache Kafka is a distributed event streaming platform. You can think of it as a publish/subscribe system on steroids. Kafka producers can send messages to a topic, and consumers can then read those messages. However, unlike most pub/sub systems, the messages do not get removed from the topic when you read them. This allows you to perform stream processing to analyze, aggregate, or transform data from different events in real-time.
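To make the "messages are not removed when you read them" point concrete, here is a toy, in-memory sketch (plain Java, no Kafka involved): a topic modeled as an append-only log, where each consumer tracks its own read offset instead of removing records.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of a Kafka-style topic: an append-only log that consumers
// read by offset. Reading never removes a record, so any number of
// consumers can replay the same data independently.
public class ToyTopic {
    private final List<String> log = new ArrayList<>();

    public void publish(String record) {
        log.add(record);
    }

    // Each consumer keeps its own position in the log.
    public class Consumer {
        private int offset = 0;

        public String poll() {
            return offset < log.size() ? log.get(offset++) : null;
        }
    }

    public static void main(String[] args) {
        ToyTopic topic = new ToyTopic();
        topic.publish("first");
        topic.publish("second");

        ToyTopic.Consumer a = topic.new Consumer();
        ToyTopic.Consumer b = topic.new Consumer();

        System.out.println(a.poll()); // first
        System.out.println(a.poll()); // second
        // b starts at offset 0 and still sees everything a already read:
        System.out.println(b.poll()); // first
    }
}
```

Real Kafka adds partitioning, replication, and durable storage on top of this idea, but the append-only log with per-consumer offsets is the core abstraction.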

If you want to learn the fundamentals of Kafka, I highly recommend watching Tim Berglund's introductory video on the topic.

Spring Boot and Spring Kafka

Spring Boot is an opinionated way of using Spring. It reduces configuration code to a bare minimum by favoring convention over configuration. In addition, Spring Kafka adds support for configuring Kafka producers and consumers and for listening to incoming messages by annotating methods.

Hilla

Hilla is a front-end framework built for Java. It combines a Spring Boot backend with a reactive TypeScript frontend built in Lit. Hilla automatically generates TypeScript types based on your server endpoint signatures, which helps keep your front end and back end in sync as you develop your application.

Download and Run Kafka

This tutorial uses a local Kafka broker. Follow the steps below to download and start Kafka on your computer:

  1. Go to the Kafka download page and download Kafka.
  2. Extract the downloaded archive: tar -xzf kafka_<version>.tgz
  3. Change into the extracted directory: cd kafka_<version>
  4. Start ZooKeeper, which manages the local Kafka cluster: bin/zookeeper-server-start.sh config/zookeeper.properties
  5. Open a second terminal and run bin/kafka-server-start.sh config/server.properties to start the Kafka broker.

You now have Kafka running and are ready to start building the application.
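If you want to sanity-check the broker before moving on, the scripts shipped with Kafka can talk to it directly (this assumes the default broker address, localhost:9092, and a running broker; it is an optional step, not part of the tutorial proper):

```
# List the topics currently known to the local broker
bin/kafka-topics.sh --bootstrap-server localhost:9092 --list
```

An empty result is fine at this point; the application will create its topic on startup.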

Create a New Project

Begin by creating a new Hilla project. This will give you a Spring Boot project configured with a TypeScript-Lit front end.

  1. Use the Vaadin CLI to initialize the project: npx @vaadin/cli init --hilla --empty hilla-kafka
  2. Open the project in your IDE of choice.
  3. Start the application using the included Maven wrapper: ./mvnw. The command will download Maven and npm dependencies and start the development server. Note: the initial launch can take several minutes, but subsequent starts are almost instantaneous.

Add Kafka Spring Dependencies

Add Kafka support to the application by adding the following dependencies to the <dependencies> section of the pom.xml file:

 
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>

<dependency>
    <groupId>com.fasterxml.jackson.datatype</groupId>
    <artifactId>jackson-datatype-jsr310</artifactId>
</dependency>


The Spring Boot parent BOM manages the versions of these dependencies, so you don't need to specify them explicitly.

Define a Data Model

Begin by creating a new Java package: com.example.application.model.

In this newly created package, create a new Java class, Message.java, to represent the message you will send over Kafka. Then, add the following content to the class:

 
package com.example.application.model;


import java.time.Instant;
import dev.hilla.Nonnull;

public class Message {

    private @Nonnull String text;
    private Instant time;
    private @Nonnull String userName;

    public String getText() {
        return text;
    }

    public void setText(String text) {
        this.text = text;
    }

    public Instant getTime() {
        return time;
    }

    public void setTime(Instant time) {
        this.time = time;
    }

    public String getUserName() {
        return userName;
    }

    public void setUserName(String userName) {
        this.userName = userName;
    }

}


The Hilla framework uses the @Nonnull annotations to guide TypeScript type generation: they do not have an impact on Java behavior.
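For illustration (this is an approximation, not the exact generated file), the TypeScript type Hilla derives from the class above looks roughly like this: @Nonnull fields become required properties, and the un-annotated Instant field becomes an optional string holding the serialized timestamp.

```typescript
// Rough sketch of Hilla's generated type for the Message class.
// Field names and optionality follow the Java class; the exact
// generated file may differ in detail.
interface Message {
  text: string;
  time?: string;
  userName: string;
}

// A conforming value, as the client will construct it before sending:
const m: Message = { text: "Hello Kafka", userName: "Marcus" };
console.log(m.text, m.userName);
```

Because the type is generated from the endpoint signatures, renaming or retyping a field in Java immediately surfaces as a compile error on the client.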

Sending Custom Objects With Kafka

In this tutorial, you will send a Java object as a message instead of a primitive type like a string or a number. To do this, you need to create a custom serializer and deserializer.

In the same package, create two new classes, MessageSerializer.java and MessageDeSerializer.java, with the following content:

 
package com.example.application.model;

import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Serializer;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.json.JsonMapper;

public class MessageSerializer implements Serializer<Message> {

    public static final ObjectMapper mapper = JsonMapper.builder()
            .findAndAddModules()
            .build();

    @Override
    public byte[] serialize(String topic, Message message) {
        try {
            return mapper.writeValueAsBytes(message);
        } catch (JsonProcessingException e) {
            throw new SerializationException(e);
        }
    }
}


 
package com.example.application.model;

import java.io.IOException;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Deserializer;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.json.JsonMapper;

public class MessageDeSerializer implements Deserializer<Message> {

    public static final ObjectMapper mapper = JsonMapper.builder()
            .findAndAddModules()
            .build();

    @Override
    public Message deserialize(String topic, byte[] data) {
        try {
            return mapper.readValue(data, Message.class);
        } catch (IOException e) {
            throw new SerializationException(e);
        }
    }
}


The serializer and deserializer use Jackson to convert the object to and from JSON. The findAndAddModules() builder method lets Jackson discover the jackson-datatype-jsr310 module you added earlier, which adds support for Java 8 date/time types such as the Instant field.

Configure Kafka

Next, configure Kafka by adding the following to the src/main/resources/application.properties file:

 
# A custom property to hold the name of our Kafka topic:
topic.name=chat

# Set up Kafka:
spring.kafka.bootstrap-servers=localhost:9092

# Configure the consumer:
spring.kafka.consumer.client-id=chat-consumer
spring.kafka.consumer.group-id=chat-group
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=com.example.application.model.MessageDeSerializer

# Configure the producer:
spring.kafka.producer.client-id=chat-producer
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=com.example.application.model.MessageSerializer


Update Application.java to configure the topic programmatically.

 
package com.example.application;

import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.TopicBuilder;
import com.vaadin.flow.component.page.AppShellConfigurator;
import com.vaadin.flow.server.PWA;
import com.vaadin.flow.theme.Theme;

/**
 * The entry point of the Spring Boot application.
 *
 * Use the @PWA annotation to make the application installable on phones, tablets, and some desktop
 * browsers.
 *
 */

@SpringBootApplication
@Theme(value = "hilla-kafka")
@PWA(name = "hilla-kafka", shortName = "hilla-kafka", offlineResources = {})
@Configuration
public class Application implements AppShellConfigurator {

    @Value("${topic.name}")
    private String topicName;

    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }

    @Bean
    NewTopic chatTopic() {

        return TopicBuilder
                .name(topicName)
                .partitions(1)
                .replicas(1)
                .build();
    }
}


Here are the essential parts explained:

  • Inject the topic name through Spring.
  • Use the TopicBuilder bean to define and configure a new topic. This example app sets up only one partition and one replica; in a real application, you would configure more partitions and replicas so the cluster performs well and tolerates failures.

Create a Server Endpoint

You are now ready to start using Kafka. Next, create the server endpoint that will communicate with the Kafka broker and the client web application.

Create a new Java file, MessageEndpoint.java, in the com.example.application package and add the following code to it:

 
package com.example.application;

import java.time.Instant;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.KafkaTemplate;
import com.example.application.model.Message;
import com.vaadin.flow.server.auth.AnonymousAllowed;
import dev.hilla.Endpoint;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Sinks;
import reactor.core.publisher.Sinks.EmitResult;
import reactor.core.publisher.Sinks.Many;

@Endpoint
@AnonymousAllowed
public class MessageEndpoint {

    @Value("${topic.name}")
    private String topicName;

    private final Many<Message> chatSink;
    private final Flux<Message> chat;

    private final KafkaTemplate<String, Message> kafkaTemplate;

    MessageEndpoint(KafkaTemplate<String, Message> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
        chatSink = Sinks.many().multicast().directBestEffort();
        chat = chatSink.asFlux();
    }

    public Flux<Message> join() {
        return chat;
    }

    public void send(Message message) {
        message.setTime(Instant.now());
        kafkaTemplate.send(topicName, message);
    }

    @KafkaListener(topics = "chat", groupId = "chat-group")
    private void consumer(Message message) {
        chatSink.emitNext(message,
                (signalType, emitResult) -> emitResult == EmitResult.FAIL_NON_SERIALIZED);
    }
}


Here are the essential parts explained:

  • The @Endpoint annotation tells Hilla to make all public methods available as TypeScript methods for the client. @AnonymousAllowed turns off authentication for this endpoint.
  • The chatSink is a programmatic way to pass data to the system. It emits messages so that any client that has subscribed to the associated chat Flux will receive them.
  • The constructor gets a KafkaTemplate injected by Spring and saves it to a field.
  • The join() method returns the chat Flux, which you will subscribe to on the client.
  • The send() method takes in a message, stamps it with the send time, and sends it using the kafkaTemplate.
  • The consumer() method has a @KafkaListener annotation, which tells Spring Kafka to invoke it for each incoming message. The method emits the received message to the chatSink, which notifies all clients subscribed to the chat Flux.
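Stripped of Reactor, the sink/flux pairing boils down to a multicast: every subscriber registered on the Flux receives each emitted message. The following plain-Java sketch (an illustration of the idea, not the actual Reactor implementation) shows that shape:

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

// Plain-Java sketch of the sink/flux idea: emitNext() pushes a value
// to every currently registered subscriber (best-effort multicast,
// analogous to Sinks.many().multicast().directBestEffort()).
public class ToySink<T> {
    private final List<Consumer<T>> subscribers = new CopyOnWriteArrayList<>();

    public void subscribe(Consumer<T> subscriber) {
        subscribers.add(subscriber);
    }

    public void emitNext(T value) {
        subscribers.forEach(s -> s.accept(value));
    }

    public static void main(String[] args) {
        ToySink<String> chatSink = new ToySink<>();
        StringBuilder clientA = new StringBuilder();
        StringBuilder clientB = new StringBuilder();
        chatSink.subscribe(clientA::append);
        chatSink.subscribe(clientB::append);
        chatSink.emitNext("hello");
        System.out.println(clientA + " / " + clientB); // hello / hello
    }
}
```

Reactor adds backpressure handling, threading, and error signals on top of this, which is why the real code passes an emit-failure handler to emitNext().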

Enable Reactive Endpoints

The current version of Hilla as of writing this tutorial (1.2) supports Flux endpoint methods behind a feature flag. Enable the feature by creating a new file, src/main/resources/vaadin-featureflags.properties, with the following content:

 
# Push support in Hilla
com.vaadin.experimental.hillaPush=true


Create a View for Sending and Receiving Messages

Now that you have configured Kafka and set up the server to send and receive messages, the final step is creating a web view that you can use to send and receive messages.

Hilla includes the Vaadin component set, which has over 40 components. You can use the <vaadin-message-list> and <vaadin-message-input> components to build out the main chat UI. You can also use the <vaadin-text-field> component to capture the current user's name.

Hilla uses Lit to create views. Lit is conceptually similar to React: components consist of a state and a template. The template gets re-rendered any time the state changes.

Begin by renaming the generated placeholder view: rename the frontend/views/empty folder and its empty-view.ts file to frontend/views/messages/messages-view.ts. Then replace the contents of the file with the following code:

 
import { View } from "Frontend/views/view";
import { customElement, state } from "lit/decorators.js";
import { html } from "lit";
import "@vaadin/message-list";
import "@vaadin/message-input";
import "@vaadin/text-field";
import { TextFieldChangeEvent } from "@vaadin/text-field";
import { MessageEndpoint } from "Frontend/generated/endpoints";
import Message from "Frontend/generated/com/example/application/model/Message";

@customElement("messages-view")
export class MessagesView extends View {
  @state() messages: Message[] = [];
  @state() userName = "";

  render() {
    return html`
      <h1 class="m-m">Kafka message center</h1>
      <vaadin-message-list
        class="flex-grow"
        .items=${this.messages}
      ></vaadin-message-list>
      <div class="flex p-s gap-s items-baseline">
        <vaadin-text-field
          placeholder="Your name"
          @change=${this.userNameChange}
        ></vaadin-text-field>
        <vaadin-message-input
          class="flex-grow"
          @submit=${this.submit}
        ></vaadin-message-input>
      </div>
    `;
  }

  userNameChange(e: TextFieldChangeEvent) {
    this.userName = e.target.value;
  }

  async submit(e: CustomEvent) {
    MessageEndpoint.send({
      text: e.detail.value,
      userName: this.userName,
    });
  }

  connectedCallback() {
    super.connectedCallback();

    this.classList.add("flex", "flex-col", "h-full", "box-border");

    MessageEndpoint.join().onNext(
      (message) => (this.messages = [...this.messages, message])
    );
  }
}


Here are the essential parts explained:

  • Lit tracks the @state() decorated properties, and any time they change, the template gets re-rendered.
  • The Message data type is generated by Hilla based on the Java object you created on the server.
  • The list of messages is bound to the message list component with .items=${this.messages}. The period in front of items tells Lit to pass the array as a property instead of an attribute.
  • The text field calls the userNameChange() method whenever its value changes, via @change=${this.userNameChange} (the @ prefix denotes an event listener).
  • The message input component calls MessageEndpoint.send() when submitted. Note that you are calling a TypeScript method. Hilla takes care of calling the underlying Java method on the server.
  • Finally, call MessageEndpoint.join() in connectedCallback to start receiving incoming chat messages.
  • In addition to the Vaadin components, you are also using Hilla CSS utility classes for basic layout (flex, flex-grow, flex-col).

Finally, update the routing to match the new name of the view. Replace the contents of routes.ts with the following:

 
import { Route } from "@vaadin/router";
import "./views/messages/messages-view";

export const routes: Route[] = [{ path: "", component: "messages-view" }];


Run the Completed Application

If your application is still running, re-start it with the Maven wrapper:

 
./mvnw


Once the server starts, you can access the application at http://localhost:8080. Try opening the app in multiple browsers to see the messages displayed in real time across them all.




Published at DZone with permission of Marcus Hellberg. See the original article here.
