Ping-Pong Implementation: JSR-356
This article takes a look at the server-side implementation for ping-pong using Java API for WebSockets (JSR-356).
This article was originally published in August 2021.
Prerequisites
For this blog post, I recommend that you have basic Java knowledge and an understanding of how WebSockets work.
Problem
This post explains a challenge we faced in a cloud environment where WebSocket connections were being dropped after sitting idle for roughly 55-60 seconds. To handle this, we enhanced our WebSocket endpoint by implementing ping-pong on our platform, which kept our connections alive in that environment for a longer, configurable duration.
"What is Ping-Pong?"
When I asked Google the same question, it rightly pointed to table tennis. Players on either side pass the ball across the table and abide by the rules of the game. Similarly, in our implementation the "players" are the server and the client, passing messages over the network and abiding by the rules set by the WebSocket protocol specification (RFC 6455).
I assume that you, as a reader, are quite familiar with the annotations provided by the Java API for WebSockets:
- @OnOpen
- @OnMessage
- @OnMessage with a PongMessage parameter (JSR-356 has no separate @OnPongMessage annotation)
- @OnClose
- @OnError
- @ServerEndpoint
Using these annotations, we will build our own basic ping-pong implementation, managed completely by the server. I will try my best to keep things as simple and understandable as possible.
Approach:
Our requirement was that the server manages the connections on its end and periodically pings the registered clients. The clients had only one job to do: connect to the server.
For every WebSocket connection opened at the server, the server wrapped the javax.websocket.Session object in a plain POJO, which we named SessionHeartbeat.
In our implementation, we authenticated every incoming WebSocket connection in the method annotated with @OnOpen. In this method, the JWT token and clientUUID were validated for authenticity. If the request was invalid, we threw a custom exception and handled it in the @OnError method.
Since every developer/platform might not have the same requirement or could also have their own custom implementation for security, I will leave you, as a reader, to decide how you want to handle authentication on your WebSocket connection.
Implementation:
You may be familiar with how to create a @ServerEndpoint class:
import javax.websocket.OnClose;
import javax.websocket.OnError;
import javax.websocket.OnMessage;
import javax.websocket.OnOpen;
import javax.websocket.PongMessage;
import javax.websocket.Session;
import javax.websocket.server.ServerEndpoint;
@ServerEndpoint(value = "/events")
public class WebsocketEventsEndpoint {
    @OnOpen
    public void onOpen(Session session) {
        // Authenticate request
        // Register the session
    }
    @OnMessage
    public void onMessage(Session session, String message) {
        // Authenticate request
        // Perform any business logic
    }
    @OnMessage
    public void onPongMessage(Session session, PongMessage message) {
        // Handle ping-pong
    }
    @OnClose
    public void onClose(Session session) {
        // De-register session from ping-pong
    }
    @OnError
    public void onError(Session session, Throwable throwable) {
        // Handle errors
    }
}
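As you may notice, two different methods (onMessage and onPongMessage) carry the same @OnMessage annotation; only their arguments differ. JSR-356 has no dedicated pong annotation: an @OnMessage method that takes a PongMessage parameter is the one that receives pong frames.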
How does the underlying server decide whether an incoming frame is a PongMessage or a plain text message? If you go through RFC 6455, you will see that every "message" transmitted over the network is a frame. Each frame carries some metadata of its own along with the real payload. Part of that metadata is the "opcode". The opcode value tells the receiver whether the frame is a text, binary, continuation, close, ping, or pong frame. If you are curious to know more about the underlying implementation, I recommend going through the server's code.
I happened to stumble upon the Tyrus implementation. I used it standalone for unit testing and went through the call hierarchy of TyrusEndpointWrapper.onPong and org.glassfish.tyrus.core.frame.TyrusFrame.wrap(Frame, byte, ByteBuffer). You may also choose to look around in the Tomcat or JBoss libraries.
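To make the opcode idea concrete, here is a minimal sketch of how a receiver could classify a frame from its first byte. It is not the Tyrus code; the class name FrameOpcode and its methods are made up for illustration. The opcode values themselves come from RFC 6455, Section 5.2, where the opcode occupies the low four bits of the first byte.

```java
import java.util.Optional;

public final class FrameOpcode {
    // Opcode values as defined in RFC 6455, Section 5.2.
    public enum Type {
        CONTINUATION(0x0), TEXT(0x1), BINARY(0x2), CLOSE(0x8), PING(0x9), PONG(0xA);

        private final int code;

        Type(int code) {
            this.code = code;
        }
    }

    private FrameOpcode() {
    }

    /** Reads the opcode from the low four bits of the first frame byte. */
    public static Optional<Type> fromFirstByte(byte firstByte) {
        int opcode = firstByte & 0x0F;
        for (Type type : Type.values()) {
            if (type.code == opcode) {
                return Optional.of(type);
            }
        }
        return Optional.empty(); // reserved opcode, not assigned by the RFC
    }

    /** Control frames (close, ping, pong) have the highest opcode bit set. */
    public static boolean isControl(byte firstByte) {
        return (firstByte & 0x08) != 0;
    }

    public static void main(String[] args) {
        // 0x8A = FIN bit (0x80) plus opcode 0xA
        System.out.println(fromFirstByte((byte) 0x8A)); // prints Optional[PONG]
        // 0x81 = FIN bit (0x80) plus opcode 0x1
        System.out.println(fromFirstByte((byte) 0x81)); // prints Optional[TEXT]
    }
}
```

A real server does much more per frame (masking, payload length, fragmentation), so treat this only as an illustration of the dispatch decision.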
Registering Session
As mentioned earlier, we register every connection by creating a heartbeat for it. A heartbeat is nothing more than a plain POJO holding the metadata that the ping-pong scheduler will use.
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import javax.websocket.Session;
public final class SessionHeartbeat {
    private Session userSession;
    private AtomicInteger retry = new AtomicInteger(WebsocketProperties.MAX_RETRY_COUNT);
    private AtomicReference<Long> lastPingAt = new AtomicReference<>(System.currentTimeMillis());
    private AtomicReference<Long> lastPongReceived = new AtomicReference<>(
            System.currentTimeMillis());
    private AtomicReference<Long> lastMessageOnInMillis = new AtomicReference<>(
            System.currentTimeMillis());
    public SessionHeartbeat(Session session) {
        // assign the session
    }
    // getters & "mandatory" setters
    // MUST : override equals & hash-code
}
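The "MUST: override equals & hash-code" note deserves a small illustration. The sketch below is a simplified stand-in, not our actual class: HeartbeatSketch is a hypothetical name, it keeps only two of the fields, and it uses a plain String session id in place of javax.websocket.Session so that it compiles without the WebSocket API on the classpath. It bases equality on the session id alone, which is one reasonable choice when a heartbeat should be identified by its session.

```java
import java.util.Objects;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

public final class HeartbeatSketch {
    private final String sessionId; // stand-in for Session.getId()
    private final AtomicInteger retry;
    private final AtomicLong lastPongReceived = new AtomicLong(System.currentTimeMillis());

    public HeartbeatSketch(String sessionId, int maxRetry) {
        this.sessionId = Objects.requireNonNull(sessionId, "sessionId");
        this.retry = new AtomicInteger(maxRetry);
    }

    public String getSessionId() {
        return sessionId;
    }

    public AtomicInteger getRetry() {
        return retry;
    }

    public AtomicLong getLastPongReceived() {
        return lastPongReceived;
    }

    // Two heartbeats are equal when they belong to the same session,
    // regardless of their mutable counters and timestamps.
    @Override
    public boolean equals(Object other) {
        if (this == other) {
            return true;
        }
        if (!(other instanceof HeartbeatSketch)) {
            return false;
        }
        return sessionId.equals(((HeartbeatSketch) other).sessionId);
    }

    @Override
    public int hashCode() {
        return sessionId.hashCode();
    }
}
```

Keeping the mutable fields out of equals and hashCode matters here: if they took part, a heartbeat stored in a hash-based collection could no longer be found after its retry count or timestamps changed.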
Websocket PingPong Scheduler Service
For the ping-pong implementation to work, its core runs as a background job that periodically pings all registered clients. We kept these client sessions in a ConcurrentHashMap, with the Session as the key and its heartbeat as the value.
We used 2 different executor services:
- Fixed thread pool: The worker thread pool, whose job was to ping all the clients and update the metadata (SessionHeartbeat) accordingly.
- Scheduled executor service: Runs periodically to get all the connections and submits each one to a worker only if it is allowed to ping.
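The interplay of the two executors can be sketched with the JDK alone. This is an illustrative skeleton under assumptions, not our production service: TwoExecutorSketch and runFor are hypothetical names, the pool size of 4 is arbitrary, and the "ping" is replaced by counting down a latch so the example can run without any WebSocket connection.

```java
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public final class TwoExecutorSketch {
    private TwoExecutorSketch() {
    }

    /**
     * Starts a scheduler that, every periodMillis, hands one task per client
     * to a worker pool. Waits until expectedPings tasks have run (or the
     * timeout elapses), then shuts both pools down.
     *
     * @return true if all expected "pings" happened within the timeout
     */
    public static boolean runFor(List<String> clientIds, int expectedPings,
                                 long periodMillis, long timeoutMillis) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        ExecutorService workers = Executors.newFixedThreadPool(4); // arbitrary size
        CountDownLatch pings = new CountDownLatch(expectedPings);
        try {
            scheduler.scheduleAtFixedRate(() -> {
                for (String clientId : clientIds) {
                    // A real worker would ping the client identified by clientId here.
                    workers.execute(() -> pings.countDown());
                }
            }, 0, periodMillis, TimeUnit.MILLISECONDS);
            return pings.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt status
            return false;
        } finally {
            scheduler.shutdownNow();
            workers.shutdownNow();
        }
    }
}
```

The scheduler thread stays cheap because it only dispatches; the potentially slow network calls happen on the worker pool, so one unresponsive client does not delay the next scheduling round.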
Approach
The scheduler service takes each session and checks two things: whether the session has been idle for a specified duration (e.g., 30 minutes in which the server received nothing from the client except pong messages), or whether the server has used up its retries (e.g., max retry = 3) trying to ping the client without success. In either case, it makes no sense for the server to keep the session in its list, so it closes the connection. Otherwise, it continues pinging.
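The decision described above can be written as a small pure function. The sketch below is an assumption about what such a check might look like, not the body of our hasIdleTimeExpired method, which is not shown in this post. HeartbeatPolicy and its parameter names are hypothetical, and the current time is passed in explicitly so the logic is easy to test.

```java
public final class HeartbeatPolicy {
    private HeartbeatPolicy() {
    }

    /** True when no business message has arrived for at least maxIdleMillis. */
    public static boolean hasIdleTimeExpired(long lastMessageAtMillis, long nowMillis,
                                             long maxIdleMillis) {
        return nowMillis - lastMessageAtMillis >= maxIdleMillis;
    }

    /**
     * True when the session should be closed: either the retries are used up
     * or the session has been idle for too long. Uses <= 0 rather than == 0
     * as a defensive choice in case the counter is decremented past zero.
     */
    public static boolean shouldClose(int retriesLeft, long lastMessageAtMillis,
                                      long nowMillis, long maxIdleMillis) {
        return retriesLeft <= 0
                || hasIdleTimeExpired(lastMessageAtMillis, nowMillis, maxIdleMillis);
    }

    public static void main(String[] args) {
        long thirtyMinutes = 30L * 60L * 1000L;
        long now = System.currentTimeMillis();
        // Active one minute ago with retries left: keep pinging.
        System.out.println(shouldClose(3, now - 60_000L, now, thirtyMinutes)); // prints false
        // Idle for 31 minutes: close the session.
        System.out.println(shouldClose(3, now - 31L * 60L * 1000L, now, thirtyMinutes)); // prints true
    }
}
```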
Implementation
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import javax.websocket.CloseReason;
import javax.websocket.CloseReason.CloseCodes;
import javax.websocket.Session;
// LOGGER, JsonObject, SchedulerService, WebsocketProperties and EventProperties are project-specific.
public final class WebsocketPingPongSchedulerService {
    private static SchedulerService schedulerService;
    private static ScheduledExecutorService websocketHeartbeatMainExecutorService;
    private static ExecutorService webSocketHeartbeatWorkerExecutorService;
    private final Map<Session, SessionHeartbeat> sessionHeartbeats = new ConcurrentHashMap<>();
    // initialize & start the executor service - leaving upto the implementor.
    // have the shutdown & cleanup methods
    // add/modify/remove functionality for the heartbeats - i.e., modifying the Map.
    // schedule the ping functionality method
    public void schedulePingMessages() {
        websocketHeartbeatMainExecutorService.scheduleAtFixedRate(this::pingClients,
                WebsocketProperties.WEBSOCKET_PING_SCHEDULED_TIME_IN_SECONDS,
                WebsocketProperties.WEBSOCKET_PING_SCHEDULED_TIME_IN_SECONDS,
                TimeUnit.SECONDS);
    }
    private void pingClients() {
        LOGGER.debug("Starting to ping");
        // A Map has no parallelStream(); stream over its values (the heartbeats).
        sessionHeartbeats.values().parallelStream().forEach(heartbeat -> {
            submitToWorker(heartbeat);
        });
    }
    private void submitToWorker(SessionHeartbeat heartbeat) {
        webSocketHeartbeatWorkerExecutorService.submit(() -> {
            if (heartbeat.getRetry().get() == 0 || hasIdleTimeExpired(heartbeat)) {
                closeSession(heartbeat);
            } else {
                pingToClient(heartbeat);
            }
        });
    }
    private void pingToClient(SessionHeartbeat heartbeat) {
        Session session = heartbeat.getUserSession();
        try {
            if (session.isOpen()) {
                LOGGER.trace("Going to ping client with session {}.", session.getId());
                JsonObject payloadJson = createPingPayload(session); // Maximum allowed payload of 125 bytes only
                ByteBuffer payload = ByteBuffer.wrap(
                        payloadJson.toString().getBytes(StandardCharsets.UTF_8));
                session.getBasicRemote().sendPing(payload);
                heartbeat.getLastPingAt().set(System.currentTimeMillis());
                heartbeat.getRetry().set(EventProperties.MAX_RETRY_COUNT.getFromConfig());
                LOGGER.debug("Ping to client for session {} is successful.", session.getId());
            } else {
                LOGGER.debug("Session has been closed! Retrying again for session {}.", session.getId());
                heartbeat.getRetry().decrementAndGet();
            }
        } catch (Exception e) {
            LOGGER.warn("Error in pinging clients for session {}.", session.getId(), e);
            heartbeat.getRetry().decrementAndGet();
        }
    }
    private void closeSession(SessionHeartbeat heartbeat) {
        Session session = heartbeat.getUserSession();
        try {
            LOGGER.debug("Session retry over. Closing session with id {}.", session.getId());
            session.close(new CloseReason(CloseCodes.NORMAL_CLOSURE,
                    "SessionId: " + session.getId() + ", Client does not respond"));
        } catch (Exception e) {
            LOGGER.warn("Error closing session for session {}.", session.getId(), e);
        }
    }
}
Make sure that the ping payload does not exceed 125 bytes; otherwise you will get an exception. What you send in the ping payload is entirely up to the contract between the client and the server. You may also wonder whether the client needs any implementation of its own to send a pong message back to the server. It does not.
As per RFC 6455, Section 5.5.2:
Upon receipt of a Ping frame, an endpoint MUST send a Pong frame in response, unless it already received a Close frame.
Therefore, you need not worry: a compliant client replies to every ping with a pong on its own.
However, if your requirements call for an explicit pong implementation, check whether your web client offers a mechanism for creating a pong message. If the client is Android or iOS, you can check the feasibility of neovisionaries-client in your project. I personally used this library heavily while testing the ping-pong functionality. You may access the code repo here.
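Coming back to the 125-byte limit on the ping payload: one way to avoid the exception at send time is to validate the payload before handing it to sendPing. The helper below is a minimal sketch with hypothetical names (PingPayloads, toPingPayload); the limit itself comes from RFC 6455, Section 5.5, which caps the payload of every control frame at 125 bytes.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public final class PingPayloads {
    /** Maximum payload length of a control frame (RFC 6455, Section 5.5). */
    public static final int MAX_CONTROL_PAYLOAD_BYTES = 125;

    private PingPayloads() {
    }

    /**
     * Encodes the text as UTF-8 and wraps it for use as a ping payload.
     * The limit applies to encoded bytes, not characters, so non-ASCII
     * text reaches the limit sooner than its length suggests.
     *
     * @throws IllegalArgumentException if the encoded payload exceeds 125 bytes
     */
    public static ByteBuffer toPingPayload(String text) {
        byte[] bytes = text.getBytes(StandardCharsets.UTF_8);
        if (bytes.length > MAX_CONTROL_PAYLOAD_BYTES) {
            throw new IllegalArgumentException("Ping payload is " + bytes.length
                    + " bytes; the maximum is " + MAX_CONTROL_PAYLOAD_BYTES);
        }
        return ByteBuffer.wrap(bytes);
    }
}
```

The returned buffer could then be passed to session.getBasicRemote().sendPing(...), as in the scheduler code above.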