Live Dashboard Using Apache Kafka and Spring WebSocket
Want to learn more about using Apache Kafka and Spring WebSocket? Check out this tutorial on how to create a live dashboard of real-time temperature values.
This article walks through a demo project that I have been working on over the last week: a live web dashboard built with Apache Kafka and Spring WebSocket. I have set the topic as the "Live Temperature Update." Kafka feeds real-time temperature values, and the application reads each value as it arrives and updates the view.
Apache Kafka
Apache Kafka is a distributed publish-subscribe streaming platform that can be used to build an enterprise messaging system.
Spring Support for Apache Kafka
The Spring for Apache Kafka project (spring-kafka) provides support for publishing data to Kafka and subscribing to it in real time.
Spring WebSocket
The Spring Framework also includes a WebSocket module, which can be used to send messages back and forth between the client and server in real time. In this project, the messages travel over WebSocket using STOMP, a simple text-based messaging protocol.
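To make the protocol concrete: a STOMP frame is plain text made of a command line, `name:value` header lines, a blank line, an optional body, and a terminating NUL byte. The sketch below is not part of the demo project. The class name `StompFrameSketch`, the `frame` helper, and the header values are made up for illustration, and the header escaping required by STOMP 1.1 and later is skipped. It only shows roughly what travels over the WebSocket when a temperature reading is delivered to a subscriber.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Illustration only: builds the text of a STOMP frame by hand.
// Spring and stomp.js do this for you; header escaping is omitted here.
final class StompFrameSketch {

    static String frame(String command, Map<String, String> headers, String body) {
        StringBuilder sb = new StringBuilder(command).append('\n');
        headers.forEach((name, value) -> sb.append(name).append(':').append(value).append('\n'));
        // A blank line separates the headers from the body; a NUL character ends the frame.
        return sb.append('\n').append(body).append('\0').toString();
    }

    public static void main(String[] args) {
        Map<String, String> headers = new LinkedHashMap<>();
        headers.put("destination", "/topic/temperature");
        headers.put("subscription", "sub-0");    // hypothetical subscription id
        headers.put("message-id", "example-1");  // hypothetical message id
        headers.put("content-type", "text/plain");
        // Print the frame with the NUL terminator shown as ^@ for readability.
        System.out.println(frame("MESSAGE", headers, "23.5").replace("\0", "^@"));
    }
}
```

In the demo itself nobody assembles frames by hand: the server-side messaging template and the browser's STOMP client handle framing, so application code only deals with destinations and message bodies.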
My Demo Project
I have created a simple demo application to explore these technologies. In this application, the view page shows the current temperature as text and a simple line chart (drawn with Chart.js), both of which update in real time.
This article does not cover installing and configuring Kafka on your workstation; plenty of articles on the Internet already explain that.
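To try the dashboard without a real sensor, something has to publish numeric strings to the topic. The sketch below is not part of the original project: `FakeTemperatureSource` and its `readings` helper are hypothetical names, and the starting value and step size are arbitrary. It only generates plausible readings, one per line. Sending them to Kafka, for example by piping the output into Kafka's console producer, is left out.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import java.util.Random;

// Illustration only: produces fake temperature readings as numeric strings.
final class FakeTemperatureSource {

    // Returns `count` readings that drift randomly from `start`,
    // moving at most `maxStep` degrees between consecutive values.
    static List<String> readings(long seed, int count, double start, double maxStep) {
        Random random = new Random(seed);
        List<String> result = new ArrayList<>();
        double current = start;
        for (int i = 0; i < count; i++) {
            current += (random.nextDouble() * 2 - 1) * maxStep;
            // Locale.ROOT forces a '.' decimal separator so the text parses as a double.
            result.add(String.format(Locale.ROOT, "%.1f", current));
        }
        return result;
    }

    public static void main(String[] args) {
        // One reading per line; a real feeder would also pace its output.
        for (String reading : readings(42L, 10, 22.0, 0.5)) {
            System.out.println(reading);
        }
    }
}
```

A fixed seed makes each run repeatable, which helps when comparing what the chart shows against what was sent.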
Maven Dependencies Needed
<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-websocket</artifactId>
    </dependency>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-streams</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>org.webjars</groupId>
        <artifactId>webjars-locator-core</artifactId>
    </dependency>
    <dependency>
        <groupId>org.webjars</groupId>
        <artifactId>sockjs-client</artifactId>
        <version>1.0.2</version>
    </dependency>
    <dependency>
        <groupId>org.webjars</groupId>
        <artifactId>stomp-websocket</artifactId>
        <version>2.3.3</version>
    </dependency>
    <dependency>
        <groupId>org.webjars</groupId>
        <artifactId>bootstrap</artifactId>
        <version>3.3.7</version>
    </dependency>
    <dependency>
        <groupId>org.webjars</groupId>
        <artifactId>jquery</artifactId>
        <version>3.1.0</version>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-thymeleaf</artifactId>
        <version>2.0.4.RELEASE</version>
    </dependency>
</dependencies>
Kafka Consumer Configuration in Spring
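import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;

@Configuration
@EnableKafka
public class KafkaConsumerConfig {
    @Value("${kafka.bootstrapserver}")
    public String bootstrapServer;

    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "temp-groupid.group");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
        return props;
    }
    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }
    // Container factory used by methods annotated with @KafkaListener
    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }
}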
The configuration sets the following consumer properties:
Use BOOTSTRAP_SERVERS_CONFIG to specify the address of the server on which Kafka is running.
Use KEY_DESERIALIZER_CLASS_CONFIG and VALUE_DESERIALIZER_CLASS_CONFIG to specify how the key and value of each record read from the Kafka topic are deserialized.
Use GROUP_ID_CONFIG to specify the Kafka consumer group ID.
Use AUTO_OFFSET_RESET_CONFIG to specify where reading starts when the consumer group has no committed offset. This project uses the value "latest", so a new group receives only the values published after it connects. Use "earliest" instead to read all the values in the topic from the beginning.
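The effect of the offset reset setting can be modeled in a few lines. The sketch below is a simplified stand-in, not Kafka's actual implementation: `OffsetResetSketch` and `startingOffset` are hypothetical names, it looks at a single partition, assumes the log still begins at offset 0, and handles only the "latest" and "earliest" values discussed above.

```java
import java.util.OptionalLong;

// Illustration only: models where a consumer starts reading in one partition.
final class OffsetResetSketch {

    // logEndOffset: the offset the next produced record will receive.
    // committed:    the offset stored for the consumer group, if there is one.
    // policy:       the auto.offset.reset value ("latest" or "earliest").
    static long startingOffset(long logEndOffset, OptionalLong committed, String policy) {
        if (committed.isPresent()) {
            return committed.getAsLong(); // the policy is not consulted when an offset is committed
        }
        switch (policy) {
            case "earliest": return 0L;           // assumes the log still starts at offset 0
            case "latest":   return logEndOffset; // only records produced from now on
            default: throw new IllegalArgumentException("unsupported policy: " + policy);
        }
    }

    public static void main(String[] args) {
        System.out.println(startingOffset(5, OptionalLong.empty(), "latest"));   // prints 5
        System.out.println(startingOffset(5, OptionalLong.empty(), "earliest")); // prints 0
        System.out.println(startingOffset(5, OptionalLong.of(3), "latest"));     // prints 3
    }
}
```

The third call is the case that is easy to overlook: once the group has committed an offset, restarting the application resumes from that offset, so changing the setting later has no visible effect unless the group ID changes as well.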
Spring WebSocket Configuration
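import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.simp.config.MessageBrokerRegistry;
import org.springframework.web.socket.config.annotation.*;

@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {
    @Override
    public void registerStompEndpoints(StompEndpointRegistry registry) {
        registry.addEndpoint("/live-temperature").withSockJS(); // STOMP endpoint with SockJS fallback
    }
    @Override
    public void configureMessageBroker(MessageBrokerRegistry registry) {
        registry.enableSimpleBroker("/topic"); // in-memory broker for /topic destinations
    }
}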
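This class implements WebSocketMessageBrokerConfigurer to configure the WebSocket. registerStompEndpoints exposes the /live-temperature endpoint with SockJS support, and configureMessageBroker enables a simple in-memory broker for destinations that start with /topic.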
Kafka Consumer and Message Publisher for WebSocket
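import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.messaging.simp.SimpMessagingTemplate;
import org.springframework.stereotype.Service;

@Service
public class KafkaConsumerService {

    @Autowired
    SimpMessagingTemplate template;

    // Invoked for every record that arrives on the configured Kafka topic
    @KafkaListener(topics = "${kafka.topic}")
    public void consume(@Payload String message) {
        // Forward only numeric payloads to the WebSocket subscribers
        if (isNumeric(message)) {
            template.convertAndSend("/topic/temperature", message);
        }
    }

    // Returns true if the string can be parsed as a double
    public boolean isNumeric(String str) {
        try {
            Double.parseDouble(str);
            return true;
        } catch (NumberFormatException nfe) {
            return false;
        }
    }
}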
In this class, the @KafkaListener annotation marks the method that listens for messages on the Kafka topic, and template.convertAndSend converts each numeric message and sends it to the WebSocket destination /topic/temperature.
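One detail worth knowing about the `isNumeric` check in the listing above: it accepts every string that `Double.parseDouble` accepts, which includes "NaN" and "Infinity", and a null argument raises a `NullPointerException` instead of returning false. If that matters for your data, a stricter parser is one option. The sketch below is an assumption about how that could look, not the project's code; `TemperaturePayloads` and `parse` are hypothetical names.

```java
import java.util.OptionalDouble;

// Illustration only: a stricter alternative to the isNumeric check.
final class TemperaturePayloads {

    // Returns the payload as a finite double, or empty if it is null,
    // not a number, NaN, or infinite.
    static OptionalDouble parse(String payload) {
        if (payload == null) {
            return OptionalDouble.empty();
        }
        try {
            double value = Double.parseDouble(payload.trim());
            return Double.isFinite(value) ? OptionalDouble.of(value) : OptionalDouble.empty();
        } catch (NumberFormatException e) {
            return OptionalDouble.empty();
        }
    }

    public static void main(String[] args) {
        System.out.println(parse("23.5").isPresent()); // prints true
        System.out.println(parse("NaN").isPresent());  // prints false
        System.out.println(parse("abc").isPresent());  // prints false
    }
}
```

Returning an `OptionalDouble` rather than a boolean also hands the parsed value to the caller, so the string does not have to be parsed a second time.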
View Page
<html>
<head>
<meta charset="ISO-8859-1">
<title>Home</title>
<link href="/webjars/bootstrap/css/bootstrap.min.css" rel="stylesheet">
<script src="/webjars/jquery/jquery.min.js"></script>
<script src="/webjars/sockjs-client/sockjs.min.js"></script>
<script src="/webjars/stomp-websocket/stomp.min.js"></script>
<script
src="https://cdnjs.cloudflare.com/ajax/libs/moment.js/2.22.2/moment.min.js"></script>
<script
src="https://cdnjs.cloudflare.com/ajax/libs/Chart.js/2.7.2/Chart.min.js"></script>
<script type="text/javascript">
var stompClient;
/* Chart Configuration */
var config = {
type : 'line',
data : {
labels : [],
datasets : [ {
label : 'Temperature',
backgroundColor : 'rgb(255, 99, 132)',
borderColor : 'rgb(255, 99, 132)',
data : [],
fill : false
} ]
},
options : {
responsive : true,
title : {
display : true,
text : 'Temperature'
},
tooltips : {
mode : 'index',
intersect : false
},
hover : {
mode : 'nearest',
intersect : true
},
scales : {
xAxes : [ {
display : true,
type : 'time',
time : {
displayFormats : {
quarter : 'h:mm:ss a'
}
},
scaleLabel : {
display : true,
labelString : 'Time'
}
} ],
yAxes : [ {
display : true,
scaleLabel : {
display : true,
labelString : 'Value'
}
} ]
}
}
};
/* Document Ready Event */
$(document).ready(function() {
var ctx = document.getElementById('lineChart').getContext('2d');
window.myLine = new Chart(ctx, config);
/* Configuring WebSocket on Client Side */
var socket = new SockJS('/live-temperature');
stompClient = Stomp.over(socket);
stompClient.connect({}, function(frame) {
stompClient.subscribe('/topic/temperature', function(temperature) {
$('#temperature').text(temperature.body);
/* Push new data On X-Axis of Chart */
config.data.labels.push(new Date());
/* Push new data on Y-Axis of chart */
config.data.datasets.forEach(function(dataset) {
dataset.data.push(temperature.body);
});
window.myLine.update();
});
});
});
</script>
</head>
<body>
<div class="alert alert-danger" role="alert" style="width:300px;margin-left:40%;margin-top:10px;">
<p class="text-center">Current Temperature : <b id="temperature">0</b></p>
</div>
<div class="model">
<div class="modal-dialog" style="width:80%;height:auto">
<div class="modal-content">
<div class="modal-header">
<h5 class="modal-title">Temperature</h5>
</div>
<div class="model-body">
<div class="container" style="width:80%">
<canvas id="lineChart"></canvas>
</div>
</div>
</div>
</div>
</div>
</body>
</html>
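On the client side, the page opens a SockJS connection to /live-temperature, wraps it in a STOMP client, and subscribes to /topic/temperature. Each message sent from the server-side WebSocket updates the displayed temperature and adds a new point to the chart.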
Properties File
server.port=5656
#Kafka Topic and Server Port
kafka.topic=livetemperature
kafka.bootstrapserver=localhost:9092
You can find the complete source code on GitHub.