Full-Duplex Scalable Client-Server Communication with WebSockets and Spring Boot (Part I)
In the first part of this tutorial, we take a look at how to configure a Java WebSocket that communicates with a microservices infrastructure.
Introduction
In this article, we will be covering the steps to create a Java WebSocket server, powered by Spring Boot, that communicates with Java clients and supports load balancing across multiple instances. We’ll start by describing a use case scenario, analyzing the requirements, and then we’ll proceed to choose the technology stack that suits the case. We’ll implement the code, and finally test the application to get some performance and durability benchmarks.
Use Case: A Smart Home
Our first thought when writing this was to go with the classic demonstration of WebSockets: the chat application. But if you search the internet for WebSocket examples, the vast majority of them are chat apps. So we needed another idea, one that would be more fascinating and relevant to today’s technology, and would cover both point-to-point communication and one-to-many broadcasting. That idea is a smart home device network.
In our scenario, all the smart devices have a persistent connection to a server. The server is responsible for sending commands to specific devices, such as turning on the living room lights, or enabling the alarm. It can also receive information from devices. For example, there can be a temperature sensor that takes readings every minute or an oven that sends alerts if the temperature is too high. Finally, the server may also issue commands to all devices, such as turn on/off.
The server also exposes a REST API, to be used by the user to collect information and control the devices.
From now on, we’ll refer to the smart devices as clients. So let’s write down the requirements:
Client login with credentials: we assume that each client has a pair of credentials (username and password) that’s used to authenticate with the server.
Persistent connection to the server: the clients keep a bi-directional persistent connection to the server.
Point-to-point communication: the server must be able to send messages to (and receive messages from) a specific client.
Broadcast messages: the server must be able to broadcast messages to all clients.
Dropped Connection Detection and Recovery: the client should be able to detect a dropped connection and attempt to reconnect automatically.
Server scalability: should the network traffic become too high, scaling the number of instances of the server application must be effortless.
Technology Stack
Now that the requirements are established, we’ll describe the tech stack we used to implement our solution. For the server, we’ll be using the microservices pattern. Each microservice (MS) is written in Java 11, using the Spring Boot framework and more specifically the Web on Servlet stack. The communication with the clients is handled by the Device Management MS. The Control MS exposes the REST API, and communicates with the Device Mgmt MS using an ActiveMQ Artemis message broker. For incoming traffic routing, service discovery, and load balancing we’ll be using Spring Cloud Gateway and Eureka.
As mentioned, the server and clients communicate via WebSocket. This protocol allows for persistent, full-duplex communication as well as detection of dropped connections by both the server and the clients.
Because WebSocket is a low-level protocol that doesn’t specify the structure of the transmitted messages, we also need a higher-level protocol that acts as the “contract” between the sender and the receiver. For that, we will be using STOMP (Simple Text Oriented Messaging Protocol). We will also configure Spring to work with a dedicated STOMP broker for the actual broadcasting of messages. The simplest approach would be to use the built-in in-memory broker. This approach falls short, though, when you scale up and add additional servers. Clients connected to different servers would have no way of communicating or getting updates pushed to them for something that happened on another server. Therefore, we will use an external broker (ActiveMQ Artemis). For more details see here: Spring Framework Reference - External Broker
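To make the idea of a message "contract" more concrete, here is a minimal sketch of what a STOMP frame looks like on the wire: a command line, header lines, a blank line, the body, and a terminating NUL character. The class and method names are hypothetical, and the sketch deliberately ignores header escaping and the content-length header; in the application itself, Spring builds these frames for us.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Illustrative only: builds the text of a STOMP frame by hand. */
class StompFrameSketch {

    /** Serializes COMMAND, header lines, a blank line, the body, and a NUL terminator. */
    static String frame(String command, Map<String, String> headers, String body) {
        StringBuilder sb = new StringBuilder(command).append('\n');
        for (Map.Entry<String, String> header : headers.entrySet()) {
            sb.append(header.getKey()).append(':').append(header.getValue()).append('\n');
        }
        sb.append('\n');   // a blank line separates the headers from the body
        sb.append(body);
        sb.append('\0');   // every STOMP frame ends with a NUL character
        return sb.toString();
    }

    public static void main(String[] args) {
        Map<String, String> headers = new LinkedHashMap<>();
        headers.put("destination", "/topic/messages/outgoing");
        headers.put("content-type", "application/json");
        String frame = frame("SEND", headers, "{\"command\":\"turn_on\"}");
        // Show the NUL terminator as ^@ so it is visible on a console
        System.out.println(frame.replace("\0", "^@"));
    }
}
```

The destination header is what lets a broker route the message, which is why the rest of this article spends so much time on destination prefixes.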
Let’s Write Some Code: Java WebSocket Server
The first part consists of writing the code for the server-side part. In this article, we are focusing mostly on the Device Management MS and Control MS, as there are many tutorials on how to set up a Eureka/gateway server.
Configuring Gateway/Eureka
The Eureka server runs on port 8761. The routing configuration of the gateway service is defined in application.yml:
spring:
  application:
    name: gateway-server
  cloud:
    gateway:
      discovery:
        locator:
          enabled: true
      routes:
        - id: device-management-ms
          uri: lb://device-management-service
          predicates:
            - Path=/device-management-service/**
          filters:
            - StripPrefix=1
        - id: control-ms
          uri: lb://control-service
          predicates:
            - Path=/control-service/**
          filters:
            - StripPrefix=1
      globalcors:
        cors-configurations:
          '[/**]':
            allowedOrigins: "*"
            allowedMethods: "*"
            allowedHeaders: "*"
            allowCredentials: true
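The StripPrefix=1 filter in the routes above removes the first path segment before the request is forwarded, so the downstream service never sees the service name in the path. The following is a rough sketch of that path rewrite in plain Java; the class and method names are hypothetical, and this is a simplified illustration rather than the gateway's actual implementation.

```java
/** Illustrative only: mimics the effect of a StripPrefix=N filter on a request path. */
class StripPrefixSketch {

    /** Removes the first {@code parts} non-empty path segments from {@code path}. */
    static String stripPrefix(String path, int parts) {
        StringBuilder sb = new StringBuilder();
        int skipped = 0;
        for (String segment : path.split("/")) {
            if (segment.isEmpty()) {
                continue;              // ignore the empty piece before the leading slash
            }
            if (skipped < parts) {
                skipped++;             // drop this segment
                continue;
            }
            sb.append('/').append(segment);
        }
        return sb.length() == 0 ? "/" : sb.toString();
    }

    public static void main(String[] args) {
        // The gateway receives the first path and forwards the second one
        System.out.println(stripPrefix("/device-management-service/websocket", 1));
    }
}
```

With this rewrite, a client connects to /device-management-service/websocket on the gateway while the Device Management MS only needs to expose /websocket.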
Creating the Device Management MS
This microservice will play the role of the WebSocket server. First, we have to add the following dependencies to the pom.xml:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-websocket</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-artemis</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-security</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.security</groupId>
<artifactId>spring-security-messaging</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
</dependency>
WebSocket Configuration
Then we need to create a configuration class for WebSockets. This class will be annotated with the @EnableWebSocketMessageBroker annotation and will implement the interface org.springframework.web.socket.config.annotation.WebSocketMessageBrokerConfigurer.
Let's see what we need to set within the configureMessageBroker method. We start by enabling a STOMP broker relay and setting the host and credentials for ActiveMQ Artemis:
WebSocketConfig.java
@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {

    @Value("${broker.relay.host}")
    private String brokerRelayHost;

    @Value("${broker.relay.user}")
    private String brokerRelayUser;

    @Value("${broker.relay.password}")
    private String brokerRelayPass;

    @Override
    public void configureMessageBroker(MessageBrokerRegistry config) {
        // Relay /queue and /topic destinations to the external STOMP broker (ActiveMQ Artemis)
        config.enableStompBrokerRelay("/queue", "/topic")
                .setRelayHost(brokerRelayHost)
                .setClientLogin(brokerRelayUser)
                .setClientPasscode(brokerRelayPass)
                .setSystemLogin(brokerRelayUser)
                .setSystemPasscode(brokerRelayPass)
                .setUserDestinationBroadcast("/topic/unresolved-user")
                .setUserRegistryBroadcast("/topic/log-user-registry");
        // Messages sent to destinations starting with /cdm are routed to @MessageMapping methods
        config.setApplicationDestinationPrefixes("/cdm");
    }

    @Override
    public void registerStompEndpoints(StompEndpointRegistry registry) {
        registry.addEndpoint("/websocket").setAllowedOrigins("*");
        registry.addEndpoint("/websocket").withSockJS();
    }
}
We continue by configuring the destination prefixes of the queues/topics that are used for WebSocket communication. When a client initiates a connection, it subscribes to a queue (or topic) to receive messages from the server. In our case, for point-to-point messaging the client would subscribe to a destination like “/user/queue/device”. Spring automatically recognizes the prefix “/user/” and creates a unique queue belonging to this specific WebSocket session. An example would be “/queue/device-user1234-asdf-5678-asdf”, where the suffix 1234-asdf-5678-asdf is the server-generated session-id belonging to this client. We’ll describe how this session-id is associated with the username that the client provides, during the authentication phase, later on.
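The mapping from the generic “/user/queue/device” destination to a session-specific queue can be pictured with a small sketch. It only reproduces the naming pattern from the example above (destination, then "-user", then the session id); the class and method names are hypothetical and this is not Spring's resolver code.

```java
/** Illustrative only: reproduces the naming pattern of session-specific user queues. */
class UserDestinationSketch {

    static final String USER_PREFIX = "/user";

    /** Maps e.g. "/user/queue/device" and a session id to "/queue/device-user" + sessionId. */
    static String resolve(String subscribedDestination, String sessionId) {
        if (!subscribedDestination.startsWith(USER_PREFIX + "/")) {
            throw new IllegalArgumentException("Not a user destination: " + subscribedDestination);
        }
        // Drop the "/user" prefix, then append a suffix that is unique per session
        String actualDestination = subscribedDestination.substring(USER_PREFIX.length());
        return actualDestination + "-user" + sessionId;
    }

    public static void main(String[] args) {
        System.out.println(resolve("/user/queue/device", "1234-asdf-5678-asdf"));
    }
}
```

Because the suffix is unique per session, two devices subscribing to the same “/user/queue/device” destination never see each other's messages.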
Behind the scenes, Spring keeps these associations in a "User Registry". There are two options:
- DefaultSimpUserRegistry (default strategy): stores everything in memory
- MultiServerUserRegistry: shares user registries across multiple servers utilizing the selected message broker
We go with the second option by setting the following properties on the broker relay registration returned by enableStompBrokerRelay:
.setUserDestinationBroadcast("/topic/unresolved-user")
.setUserRegistryBroadcast("/topic/log-user-registry");
These create two "special" topics on the message broker and must be set if you intend to have multiple instances of the server running. Let’s explain why:
- Suppose there are two instances of the Device Management MS and Control MS sends an event to turn off a device with id: “X.” The event will be picked up by one instance of the Device Mgmt MS in order to send the corresponding message to the Smart Device client. Because the WebSocket sessions are load-balanced across the instances, there is a 50% chance that this instance doesn’t have the WebSocket connection with the client “X.”
- To overcome this issue, special topics are used to forward unresolved messages to other instances, and keep track of connected users.
You can read more in Spring’s documentation here: websocket-stomp-user-destination
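The forwarding idea can be sketched without any Spring classes. In the toy model below, each instance knows only its own connected users and hands a message it cannot deliver to a shared topic that the other instances listen to. All class names are hypothetical, and the in-memory "topic" merely stands in for the broker topic configured above.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Illustrative only: two server instances sharing an "unresolved user" topic. */
class MultiInstanceSketch {

    /** Stand-in for the broker topic that carries messages no local session could take. */
    static class UnresolvedTopic {
        private final List<Instance> subscribers = new ArrayList<>();

        void subscribe(Instance instance) {
            subscribers.add(instance);
        }

        void publish(Instance origin, String user, String payload) {
            for (Instance instance : subscribers) {
                if (instance != origin) {
                    instance.deliverLocally(user, payload);
                }
            }
        }
    }

    /** One server instance with its own in-memory view of connected users. */
    static class Instance {
        final Map<String, List<String>> inboxByUser = new HashMap<>();
        private final UnresolvedTopic topic;

        Instance(UnresolvedTopic topic) {
            this.topic = topic;
            topic.subscribe(this);
        }

        void connect(String user) {
            inboxByUser.put(user, new ArrayList<>());
        }

        /** Delivers only if the user is connected to this instance. */
        boolean deliverLocally(String user, String payload) {
            List<String> inbox = inboxByUser.get(user);
            if (inbox == null) {
                return false;
            }
            inbox.add(payload);
            return true;
        }

        /** Tries the local sessions first, then forwards through the shared topic. */
        void sendToUser(String user, String payload) {
            if (!deliverLocally(user, payload)) {
                topic.publish(this, user, payload);
            }
        }
    }

    public static void main(String[] args) {
        UnresolvedTopic topic = new UnresolvedTopic();
        Instance first = new Instance(topic);
        Instance second = new Instance(topic);
        second.connect("X");                 // device X is connected to the second instance
        first.sendToUser("X", "turn_off");   // but the event lands on the first one
        System.out.println(second.inboxByUser.get("X"));
    }
}
```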
WebSocket Controller
This is a typical controller class that handles incoming messages from the clients. We can add a java.security.Principal argument to the method, and Spring will inject it for use by our application. The principal information comes from the authentication performed during the HTTP handshake.
WebsocketController.java
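// Lombok's @Slf4j and @RequiredArgsConstructor are assumed here: they supply `log` and the constructor
@Slf4j
@Controller
@RequiredArgsConstructor
public class WebsocketController {

    private final DeviceMgmtService deviceMgmtService;

    // Combined with the "/cdm" application prefix, this handles messages sent to /cdm/device
    @MessageMapping("/device")
    public void handleMessageFromDevice(@Payload ResponseMessage message, Principal principal) {
        log.info("Received message from device: {}", principal.getName());
        deviceMgmtService.handleMessageFromDevice(message);
    }
}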
Spring Security Configuration
We are now going to secure the WebSocket endpoints and enable username-password authentication. A simplistic configuration is shown below:
SecurityConfig.java
@Configuration
@EnableWebSecurity
public class SecurityConfig extends WebSecurityConfigurerAdapter {

    @Autowired
    private CustomAuthenticationProvider authProvider;

    @Override
    protected void configure(AuthenticationManagerBuilder auth) {
        auth.authenticationProvider(authProvider);
    }

    @Override
    protected void configure(HttpSecurity http) throws Exception {
        http.csrf().disable().authorizeRequests()
                .antMatchers("/", "/index", "/server/**", "/websocket", "/websocket/**").permitAll()
                .and()
                .authorizeRequests()
                .anyRequest().authenticated()
                .and().httpBasic();
    }
}
Here we are using Basic Authentication with a custom authentication provider. A custom authentication provider is useful when you have to authenticate against third-party systems or perform other non-trivial tasks during authentication. Our authentication provider looks like this:
CustomAuthenticationProvider.java
@Component
public class CustomAuthenticationProvider implements AuthenticationProvider {

    @Override
    public Authentication authenticate(Authentication authentication) throws AuthenticationException {
        String username = authentication.getName();
        String password = authentication.getCredentials().toString();
        /*
         * Insert custom authentication logic here
         */
        return new UsernamePasswordAuthenticationToken(username, password, new ArrayList<>());
    }

    @Override
    public boolean supports(Class<?> authentication) {
        return authentication.equals(UsernamePasswordAuthenticationToken.class);
    }
}
You can refer to Authentication and Token Authentication for a full list of options and capabilities.
Remember the auto-generated session-id we mentioned earlier? Behind the scenes, Spring associates the username provided during the authentication, with this id. Therefore, you can send a message to any client by simply referring to its username.
Message-Driven Communication
So far, we’ve configured the WebSocket server to work with ActiveMQ, accept messages at the “/cdm/device” destination (the application prefix plus the controller mapping), and support authentication with Spring Security. The final step is to configure the communication between the Device Management MS and Control MS.
The Device Management MS must consume the events that Control MS sends to a specific queue. Note that only one instance of the service must consume each event, to avoid repetition. This is known as the competing consumers pattern, and it is how ActiveMQ queues behave by default.
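As a rough illustration of competing consumers, the sketch below uses a plain in-memory BlockingQueue in place of the ActiveMQ queue and threads in place of service instances. Every event is taken by exactly one consumer. The names are hypothetical and event ids are assumed to be unique.

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

/** Illustrative only: several consumers compete for events on one shared queue. */
class CompetingConsumersSketch {

    /** Returns a map of event id to the consumer(s) that handled it. */
    static ConcurrentMap<String, String> consume(List<String> events, int consumers) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>(events);
        ConcurrentMap<String, String> handledBy = new ConcurrentHashMap<>();
        CountDownLatch done = new CountDownLatch(events.size());

        for (int i = 0; i < consumers; i++) {
            String name = "instance-" + i;
            new Thread(() -> {
                String event;
                // poll() hands each queued element to exactly one caller
                while ((event = queue.poll()) != null) {
                    // A comma in the value would mean an event was handled twice
                    handledBy.merge(event, name, (first, second) -> first + "," + second);
                    done.countDown();
                }
            }).start();
        }

        try {
            done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return handledBy;
    }

    public static void main(String[] args) {
        System.out.println(consume(List.of("e1", "e2", "e3", "e4", "e5"), 3));
    }
}
```

Which instance handles which event varies from run to run; the point is only that no event is handled twice.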
The ActionEvent represents the messages that are sent to the queue. We add the following to the ArtemisConfig class for listening to the queue and converting the messages using Jackson:
ArtemisConfig.java
@Bean
public ActiveMQConnectionFactory receiverActiveMQConnectionFactory() {
    ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(brokerUrl);
    connectionFactory.setUser(username);
    connectionFactory.setPassword(password);
    connectionFactory.setConnectionTTL(120000L);
    return connectionFactory;
}

@Bean
public DefaultJmsListenerContainerFactory jmsListenerContainerFactory() {
    DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
    factory.setConnectionFactory(receiverActiveMQConnectionFactory());
    factory.setMessageConverter(jacksonJmsMessageConverter());
    // Between 3 and 10 concurrent consumers per instance
    factory.setConcurrency("3-10");
    return factory;
}

@Bean
public MessageConverter jacksonJmsMessageConverter() {
    MappingJackson2MessageConverter converter = new MappingJackson2MessageConverter();
    converter.setTargetType(MessageType.TEXT);
    // Map the "_type" property of incoming messages to the ActionEvent class
    HashMap<String, Class<?>> typeIdMappings = new HashMap<>();
    typeIdMappings.put(ActionEvent.class.getSimpleName(), ActionEvent.class);
    converter.setTypeIdMappings(typeIdMappings);
    converter.setTypeIdPropertyName("_type");
    return converter;
}

@Bean
public CachingConnectionFactory cachingConnectionFactory() {
    return new CachingConnectionFactory(receiverActiveMQConnectionFactory());
}
Finally, we configure a @JmsListener on the queue, which is triggered when a message is consumed.
AMQMessagingService.java
@JmsListener(destination = "${broker.queue}")
public void receiveMessage(ActionEvent message) {
    log.info("Received event from Control MS");
    deviceMgmtService.sendMessageToDevice(message);
}
The method calls the corresponding service that creates a STOMP message and delivers it over the WebSocket to the client. You can send a STOMP message to a client from any point inside the application by calling convertAndSendToUser.
DeviceMgmtServiceImpl.java
public void sendMessageToDevice(ActionEvent event) {
SimpMessageHeaderAccessor headerAccessor = SimpMessageHeaderAccessor.create(SimpMessageType.MESSAGE);
headerAccessor.setLeaveMutable(true);
String client = event.getDestination();
/*
* Create a dummy message to send to client
*/
CommandMessage message = CommandMessage.builder()
.time(LocalDateTime.now())
.command(CommandEnum.ACTIVATE.toString())
.build();
log.info("Sending message to device: {}", client);
simpMessagingTemplate.convertAndSendToUser(client, "/queue/device", message, headerAccessor.getMessageHeaders());
}
Notice that client is the username (provided at the authentication step) of the client we want to send a message to, and the “/queue/device” is the destination that the client has subscribed to. The message is the object that contains the content of the STOMP message.
Spring has some out-of-the-box annotations like @SendToUser or @SendTo that are meant to be used on a @Controller method when it is invoked by a client. For example, the @SendToUser annotation can be used to reply to the same client that hit the WebSocket endpoint. But in this case, the server sends a message at will (rather than replying to a client), so the annotations cannot be used.
This covers all of the key highlights of the Device Management MS development.
We will now make a short reference to the development of Control MS, but you can skip this part and jump right into the Java WebSocket Client, if you prefer.
Control MS
Creating the control microservice is pretty straightforward. The only points worth noting are the Artemis configuration class and the definition of a jmsTemplate Bean, which uses Jackson for message conversion.
ArtemisConfig.java
@Bean
public ActiveMQConnectionFactory senderActiveMQConnectionFactory() {
    ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(brokerUrl);
    connectionFactory.setUser(username);
    connectionFactory.setPassword(password);
    connectionFactory.setConnectionTTL(120000L);
    return connectionFactory;
}

@Bean
public MessageConverter jacksonJmsMessageConverter() {
    MappingJackson2MessageConverter converter = new MappingJackson2MessageConverter();
    converter.setTargetType(MessageType.TEXT);
    // Write the simple class name into the "_type" property so the receiver can map it back
    HashMap<String, Class<?>> typeIdMappings = new HashMap<>();
    typeIdMappings.put(ActionEvent.class.getSimpleName(), ActionEvent.class);
    converter.setTypeIdMappings(typeIdMappings);
    converter.setTypeIdPropertyName("_type");
    return converter;
}

@Bean
public CachingConnectionFactory cachingConnectionFactory() {
    return new CachingConnectionFactory(senderActiveMQConnectionFactory());
}

@Bean("jmsTemplate")
public JmsTemplate jmsTemplate() {
    JmsTemplate jmsTemplate = new JmsTemplate(cachingConnectionFactory());
    jmsTemplate.setMessageConverter(jacksonJmsMessageConverter());
    return jmsTemplate;
}
We also create the REST endpoints that the users "hit" to talk to the smart devices. Of course, in reality, there will be a browser and/or mobile-based application for the users to interact with. For the sake of simplicity, we will use a single dummy endpoint for the moment, which will be invoked via curl or a tool like Postman:
DeviceController.java
// Lombok's @Slf4j and @RequiredArgsConstructor are assumed here: they supply `log` and the constructor
@Slf4j
@RestController
@RequestMapping("/device")
@RequiredArgsConstructor
public class DeviceController {

    private final AMQMessagingService messagingService;

    @PostMapping
    public ResponseEntity<Void> sendCommandToDevice(@RequestBody ActionDTO actionDTO) {
        log.info("Received POST request with body: {}", actionDTO.toString());
        /* Create Action event */
        ActionEvent event = ActionEvent.builder()
                .destination(actionDTO.getDestination())
                .command(actionDTO.getCommand())
                .args(actionDTO.getArgs())
                .id(UUID.randomUUID())
                .build();
        messagingService.send(event);
        return ResponseEntity.ok().build();
    }
}
The ActionDTO contains the following fields:
- destination: an identifier of the smart device that this message should get to, e.g. "lights_living_room"
- command: a command to be executed when delegated to the device, like "turn_on"
- args: optional Map of extra arguments if needed
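For orientation, a DTO with these three fields could look roughly like the sketch below. It is written as a plain Java class so that it stands alone; the value type of the args map (String to String) and the "brightness" argument in the usage example are assumptions for illustration, not taken from the project's sources.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

/** Illustrative only: a plain-Java shape for the request body described above. */
class ActionDtoSketch {

    static class ActionDTO {
        private final String destination;
        private final String command;
        private final Map<String, String> args;   // assumed value type

        ActionDTO(String destination, String command, Map<String, String> args) {
            this.destination = destination;
            this.command = command;
            // args is optional: treat a missing map as empty and copy it defensively
            this.args = args == null
                    ? Collections.<String, String>emptyMap()
                    : Collections.unmodifiableMap(new HashMap<>(args));
        }

        String getDestination() { return destination; }
        String getCommand() { return command; }
        Map<String, String> getArgs() { return args; }
    }

    public static void main(String[] args) {
        Map<String, String> extra = new HashMap<>();
        extra.put("brightness", "80");   // hypothetical argument
        ActionDTO dto = new ActionDTO("lights_living_room", "turn_on", extra);
        System.out.println(dto.getDestination() + " " + dto.getCommand() + " " + dto.getArgs());
    }
}
```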
Let’s Write Some Code: Java WebSocket Client
The server is ready to accept WebSocket connections at: ws://gateway.server:8000/device-management-service/websocket
At first, our client should do the following:
Connect to the URL above using its authentication credentials
Subscribe to “/user/queue/device” to receive personal messages
Subscribe to “/topic/messages/outgoing” to receive broadcasts
Adding Dependencies
Ensure that the following dependencies are added to the pom.xml:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-websocket</artifactId>
</dependency>
Configuring WebSocket Client
First, we need to create an org.springframework.web.socket.messaging.WebSocketStompClient Bean. The key points here are to set the message converter to Jackson (because the content of the exchanged messages is in JSON) and set a TaskScheduler. This scheduler is used to send heartbeat messages to the server (in times of inactivity), in order to keep the connection alive.
WebSocketConfig.java
@Bean
public WebSocketStompClient stompClient() {
    // SockJS client restricted to the native WebSocket transport
    WebSocketClient simpleWebSocketClient = new StandardWebSocketClient();
    List<Transport> transports =
            List.of(new WebSocketTransport(simpleWebSocketClient));
    SockJsClient sockJsClient = new SockJsClient(transports);

    WebSocketStompClient stompClient = new WebSocketStompClient(sockJsClient);
    stompClient.setMessageConverter(new MappingJackson2MessageConverter());

    // The scheduler is what sends the heartbeats
    ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler();
    scheduler.initialize();
    stompClient.setTaskScheduler(scheduler);
    return stompClient;
}
Note about heartbeats: As of now (April 2020) there is an open issue in Spring https://github.com/spring-projects/spring-framework/issues/22822, related to heartbeats. As we mentioned, the client periodically sends heartbeat messages to the server, when it detects channel inactivity. Therefore, when a client is sending actual messages, the channel is considered active and no heartbeat is sent during this time. This poses a problem when the messages are handled by the server like this:
@MessageMapping("/device")
public void handleMessageFromDevice(@Payload ResponseMessage message, Principal principal) {
    log.info("Received message from device: {}", principal.getName());
}
As you can see, this method doesn’t return anything to the client. Therefore, the message broker is not triggered, so it won’t know that a message has been received. This may cause a timeout and lead the broker to disconnect the client. There are two possible ways to fix this:
- Always send ACK messages back to the client, which utilizes the queue and keeps the connection alive.
- Client-side, don’t use the default taskScheduler. Instead, create a special server endpoint just for ping-pong messages and a client task scheduler that talks to this endpoint.
Re-connection Handling
The initSession method demonstrated below returns a StompSession object and is used to initiate a WebSocket connection over STOMP with the server.
ConnectionHandler.java
@Value("${ws.server.host}")
private String serverURL;

private final StompSessionHandler sessionHandler;

@Retryable(
        value = {Exception.class},
        maxAttempts = 10,
        backoff = @Backoff(delay = 1000, multiplier = 2, random = true))
public StompSession initSession(String username, String password) throws Exception {
    // Build the HTTP Basic credentials for the WebSocket handshake
    String plainCredentials = username + ":" + password;
    String base64Credentials = Base64.getEncoder()
            .encodeToString(plainCredentials.getBytes(StandardCharsets.UTF_8));
    final WebSocketHttpHeaders headers = new WebSocketHttpHeaders();
    headers.add("Authorization", "Basic " + base64Credentials);
    // Block until the STOMP session is established (or an exception triggers a retry)
    return stompClient().connect(serverURL, headers, sessionHandler).get();
}
The method takes the username and password of the device as arguments, adds the Base64-encoded credentials to the Authorization header, and initializes the connection to the server. Notice the @Retryable annotation. This is part of the Spring Retry utility and it’s used to rerun the method, should an exception be thrown. This annotation is not mandatory but adds to the stability of our solution. This particular policy makes up to ten attempts in total, and the delay between attempts grows exponentially (starting at one second and doubling each time, with random jitter). To include Spring Retry, add these dependencies and add @EnableRetry above any configuration class:
pom.xml
<dependency>
<groupId>org.springframework.retry</groupId>
<artifactId>spring-retry</artifactId>
<version>1.1.5.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-aspects</artifactId>
</dependency>
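To get a feel for the retry policy configured on initSession, the following sketch computes the waits between attempts for a given initial delay and multiplier. It is a simplified model and not Spring Retry's code: the jitter added by random = true is left out, and the 30-second cap passed in the usage example is an assumed upper bound that should be checked against the Spring Retry version in use. The class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

/** Illustrative only: un-jittered exponential backoff delays. */
class BackoffSketch {

    /** Returns the wait in milliseconds before each retry, capped at maxDelay. */
    static List<Long> delays(int maxAttempts, long delay, double multiplier, long maxDelay) {
        List<Long> result = new ArrayList<>();
        double next = delay;
        // maxAttempts counts the first call too, so there are maxAttempts - 1 waits
        for (int retry = 1; retry < maxAttempts; retry++) {
            result.add(Math.min((long) next, maxDelay));
            next = next * multiplier;
        }
        return result;
    }

    public static void main(String[] args) {
        // Same numbers as the @Retryable policy above, with an assumed 30 s cap
        System.out.println(delays(10, 1000L, 2.0, 30000L));
    }
}
```

Under these assumptions a device that cannot reach the server gives up after roughly two and a half minutes, which is worth keeping in mind when choosing maxAttempts.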
Furthermore, we need to configure a Bean of type org.springframework.messaging.simp.stomp.StompSessionHandler. In this Bean we can implement the following methods:
- afterConnected: Invoked right after the initial connection to the WS server. Can be used to subscribe to queues and topics.
- handleTransportError: Invoked when a communication error occurs, e.g. a lost connection to the server.
- handleFrame: Called when a message arrives. This is where the message handling is done.
- getPayloadType: Returns the class that the message content should be mapped to.
StompSessionHandlerImpl.java
@Override
public void afterConnected(StompSession session, StompHeaders connectedHeaders) {
    this.isConnected.set(true);
    // Broadcasts and personal messages are both handled by this handler
    session.subscribe("/topic/messages/outgoing", this);
    session.subscribe("/user/queue/device", this);
}

@Override
public void handleException(StompSession session, StompCommand command, StompHeaders headers, byte[] payload, Throwable exception) {
    exception.printStackTrace();
}

@Override
public void handleTransportError(StompSession session, Throwable exception) {
    // Reconnect only once per dropped connection
    if (this.isConnected.get()) {
        this.isConnected.set(false);
        webSocketService.initSession("some_device", "some_pass");
    }
}

@Override
public Type getPayloadType(StompHeaders headers) {
    return CommandMessage.class;
}

@Override
public void handleFrame(StompHeaders headers, Object payload) {
    if (payload == null) return;
    CommandMessage msg = (CommandMessage) payload;
    log.info("Received: " + msg.getCommand() + ", time: " + msg.getTime());
}
The callback method handleTransportError allows us to create custom re-connection logic. The flow is simple: when an error occurs, call the initSession method to reinitialize a connection. Because handleTransportError can be invoked multiple times, we need an extra variable that acts as a lock and ensures that initSession will be called only once:
private final AtomicBoolean isConnected = new AtomicBoolean(false);
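One possible refinement of this guard, shown here as a sketch rather than as the project's code, is to use compareAndSet so that the check and the flip from true to false happen as one atomic step. With separate get() and set(false) calls, two error callbacks arriving at the same moment could both pass the check. The class name, the reconnect Runnable, and the attempt counter are hypothetical and exist only to make the behaviour observable.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

/** Illustrative only: triggers a reconnect at most once per dropped connection. */
class ReconnectGuardSketch {

    private final AtomicBoolean isConnected = new AtomicBoolean(false);
    private final AtomicInteger reconnectAttempts = new AtomicInteger();
    private final Runnable reconnect;

    ReconnectGuardSketch(Runnable reconnect) {
        this.reconnect = reconnect;
    }

    /** Call when the session has been established. */
    void afterConnected() {
        isConnected.set(true);
    }

    /** Call on every transport error; only the first call after a connect reconnects. */
    void handleTransportError() {
        // compareAndSet flips true -> false atomically, so only one caller wins
        if (isConnected.compareAndSet(true, false)) {
            reconnectAttempts.incrementAndGet();
            reconnect.run();
        }
    }

    int reconnectAttempts() {
        return reconnectAttempts.get();
    }

    public static void main(String[] args) {
        ReconnectGuardSketch guard = new ReconnectGuardSketch(() -> System.out.println("reconnecting"));
        guard.afterConnected();
        guard.handleTransportError();
        guard.handleTransportError();   // ignored: a reconnect is already under way
        System.out.println(guard.reconnectAttempts());
    }
}
```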
Time to "Git" It a Try!
The source code can be found here: smart-home-websockets, along with a README file describing how to start the whole thing using Docker and perform basic testing on your laptop. For more extended testing, stay tuned for Part II, where we deploy the microservices on GCE (Google Compute Engine) and use Locust to simulate various traffic scenarios.
Stay Tuned!
Stay tuned for the second part where we will cover the following topics:
- Load testing: We will present the tests performed to answer questions like: How does the system operate with thousands of clients? How much CPU/RAM is used? How does scaling the server help in handling the request load?
- Secure WebSocket and HTTPS: Add NGINX in front of Spring Cloud Gateway and configure it to proxy WSS connections.
- Server-Sent Events: When a device app sends a command to Control MS, it needs to receive asynchronous callbacks (push notifications) from the server as a result. In this case, there is no need for full-duplex communication, so we will show how to accommodate this by leveraging SSEs.
- Production usage extra tips: 1) The need to increase limit on file handles on the servers if you expect a lot of concurrent connections, 2) how to manage inactive queues on ActiveMQ, so that, when the user session is over, all unique user queues are removed.