Ping-Pong Implementation: JSR-356

This article takes a look at the server-side implementation for ping-pong using Java API for WebSockets (JSR-356).

By Abhijeet Ashri · Updated Apr. 21, 22 · Tutorial

This article was originally published in August 2021.

Prerequisites

For this blog post, I recommend that you have basic Java knowledge and an understanding of how WebSockets work.

Problem

This post explains a challenge we faced in a cloud environment, where WebSocket connections were dropped after being idle for a certain duration (roughly 55-60 seconds). To handle this, we enhanced our WebSocket endpoint by implementing ping-pong on our platform, which kept connections alive in that environment for a longer, configurable duration.

"What is Ping-Pong?"

When I asked Google the same question, it rightly pointed to table tennis. Players on either side pass the ball across the table and abide by the rules of the game. Similarly, in our implementation, the "players" are the server and the client, passing messages over the network and abiding by the rules set by the WebSocket RFC (RFC 6455).

I assume that you, as a reader, are quite familiar with the annotations provided by the Java API for WebSockets:

  • @OnOpen
  • @OnMessage (also used for pong messages, via a PongMessage parameter)
  • @OnClose
  • @OnError
  • @ServerEndpoint

Using these annotations, we will build our own basic ping-pong implementation, managed completely by the server. I will try my best to keep things as simple and understandable as possible.

Approach:

Our requirement was that the server manages the connections on its end and periodically pings the registered clients. The clients had only one job to do: connect to the server.
For every WebSocket connection opened at the server, the server wraps the javax.websocket.Session object in a plain POJO, which we named SessionHeartbeat.

In our implementation, we authenticated every incoming WebSocket connection in the method annotated with @OnOpen, where the JWT token and clientUUID were validated for authenticity. If the request was invalid, we threw a custom exception and handled it in the @OnError method.

Since every developer/platform might not have the same requirement or could also have their own custom implementation for security, I will leave you, as a reader, to decide how you want to handle authentication on your WebSocket connection.

Implementation:

You may be familiar with how to create a @ServerEndpoint class. 

Java
 
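import javax.websocket.OnClose;
import javax.websocket.OnError;
import javax.websocket.OnMessage;
import javax.websocket.OnOpen;
import javax.websocket.PongMessage;
import javax.websocket.Session;
import javax.websocket.server.ServerEndpoint;

@ServerEndpoint(value = "/events")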
public class WebsocketEventsEndpoint {
  @OnOpen
  public void onOpen(Session session){
    // Authenticate request
    // Register the session
  }
  
  @OnMessage
  public void onMessage(Session session, String message){
    // Authenticate request
    // Perform any business logic
  }
  
  @OnMessage
  public void onPongMessage(Session session, PongMessage message){
    // Handle ping-pong
  }
    
  @OnClose
  public void onClose(Session session){
    // De-register session from ping-pong
  }
  
  @OnError
  public void onError(Session session, Throwable throwable){
    // Handle errors
  }
}


As you can see, we have two different methods (onMessage and onPongMessage) with the same @OnMessage annotation. However, their arguments are different: one takes a String, the other a PongMessage.

How does the underlying server decide whether an incoming frame is a PongMessage or a plain text message? If you go through the RFC, you will see that every "message" transmitted over the network is basically a frame, which carries some metadata of its own along with the actual payload. Part of that metadata is the "opcode". The value of the opcode determines whether the frame is a text, binary, continuation, close, ping, or pong frame. If you are curious to know more about the underlying implementation, I recommend going through the server's code.
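To make the opcode idea concrete, here is a minimal sketch, using only the JDK, of how the low four bits of a frame's first byte map to a frame type under RFC 6455. The class and method names (FrameOpcodes, opcodeOf, isControl, describe) are hypothetical and are not part of JSR-356 or Tyrus; a real server does this inside its own frame parser.

```java
/** Hypothetical helper: maps the first byte of a WebSocket frame to its frame type. */
final class FrameOpcodes {
  private FrameOpcodes() {}

  /** The opcode is the low 4 bits of the first byte; the highest bit is the FIN flag. */
  static int opcodeOf(byte firstByte) {
    return firstByte & 0x0F;
  }

  /** Control frames (close, ping, pong) have the top bit of the opcode set. */
  static boolean isControl(byte firstByte) {
    return (opcodeOf(firstByte) & 0x08) != 0;
  }

  /** Opcode values as assigned by RFC 6455; everything else is reserved. */
  static String describe(byte firstByte) {
    switch (opcodeOf(firstByte)) {
      case 0x0: return "continuation";
      case 0x1: return "text";
      case 0x2: return "binary";
      case 0x8: return "close";
      case 0x9: return "ping";
      case 0xA: return "pong";
      default:  return "reserved";
    }
  }

  public static void main(String[] args) {
    // 0x89 = FIN bit set + opcode 0x9, i.e., an unfragmented ping frame
    System.out.println(describe((byte) 0x89)); // prints "ping"
    System.out.println(describe((byte) 0x8A)); // prints "pong"
    System.out.println(describe((byte) 0x81)); // prints "text"
  }
}
```

This is why the container can route a frame with opcode 0xA to the method taking a PongMessage and a frame with opcode 0x1 to the method taking a String, even though both carry the same annotation.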

I happened to stumble upon the Tyrus implementation. I used it standalone for unit testing and went through the call hierarchy of TyrusEndpointWrapper.onPong and org.glassfish.tyrus.core.frame.TyrusFrame.wrap(Frame, byte, ByteBuffer). You may also choose to look around in the Tomcat or JBoss libraries.

Registering Session

As mentioned earlier, we register every connection by creating a heartbeat for it. The heartbeat is nothing more than a plain POJO holding the metadata that the ping-pong scheduler uses.

Java
 
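import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import javax.websocket.Session;

public final class SessionHeartbeat {
  private final Session userSession;
  // Remaining ping attempts before the scheduler closes the session
  private final AtomicInteger retry = new AtomicInteger(WebsocketProperties.MAX_RETRY_COUNT);
  // Timestamps (epoch millis) of the last ping sent, last pong received, and last business message
  private final AtomicReference<Long> lastPingAt = new AtomicReference<>(System.currentTimeMillis());
  private final AtomicReference<Long> lastPongReceived = new AtomicReference<>(
      System.currentTimeMillis());
  private final AtomicReference<Long> lastMessageOnInMillis = new AtomicReference<>(
      System.currentTimeMillis());
  
  public SessionHeartbeat(Session session){
    this.userSession = session;
  }
  
  // getters & "mandatory" setters
  // MUST : override equals & hash-code
}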
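The class above leaves equals and hashCode to the reader. As a minimal sketch, assuming a heartbeat's identity is the ID of the session it wraps (the value returned by Session.getId()), the overrides could look like the following. HeartbeatIdentity is a hypothetical stand-in that holds only a session ID string, so the example compiles without the WebSocket API on the classpath.

```java
import java.util.Objects;

/** Hypothetical stand-in for SessionHeartbeat; sessionId plays the role of Session.getId(). */
final class HeartbeatIdentity {
  private final String sessionId;

  HeartbeatIdentity(String sessionId) {
    this.sessionId = Objects.requireNonNull(sessionId, "sessionId");
  }

  /** Two heartbeats are equal when they refer to the same session ID. */
  @Override
  public boolean equals(Object other) {
    if (this == other) {
      return true;
    }
    if (!(other instanceof HeartbeatIdentity)) {
      return false;
    }
    return sessionId.equals(((HeartbeatIdentity) other).sessionId);
  }

  /** Must agree with equals: equal objects produce equal hash codes. */
  @Override
  public int hashCode() {
    return sessionId.hashCode();
  }
}
```

With this in place, two heartbeat objects created for the same session collapse into one entry in a HashSet or compare equal in a list lookup. The mutable timestamps and the retry counter are deliberately left out of the comparison.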


Websocket PingPong Scheduler Service

For the ping-pong implementation to work, its core runs as a background job that periodically pings all the registered clients. We registered the client sessions in a very basic way, using a ConcurrentHashMap with the Session as the key and its heartbeat as the value.

We used two different executor services:

  • Fixed thread pool: The worker thread pool, whose job was to ping the clients and update the metadata (SessionHeartbeat) accordingly.
  • Scheduled executor service: Runs periodically, collects all the connections, and submits each one to the worker pool, where it is pinged only if it is still eligible.
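The division of labor between the two executors can be sketched with JDK classes alone. This is an illustration under assumptions, not the code we ran in production: PingSchedulerSketch and its methods are hypothetical names, the type parameter K stands in for Session, V stands in for SessionHeartbeat, and the Consumer passed to the constructor stands in for the real ping logic.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

/** Hypothetical sketch: K stands in for Session, V for SessionHeartbeat. */
final class PingSchedulerSketch<K, V> {
  private final Map<K, V> registry = new ConcurrentHashMap<>();
  private final ScheduledExecutorService main = Executors.newSingleThreadScheduledExecutor();
  private final ExecutorService workers;
  private final Consumer<V> pingTask;

  PingSchedulerSketch(int workerThreads, Consumer<V> pingTask) {
    this.workers = Executors.newFixedThreadPool(workerThreads);
    this.pingTask = pingTask;
  }

  void register(K key, V heartbeat) {
    registry.put(key, heartbeat);
  }

  void deregister(K key) {
    registry.remove(key);
  }

  int size() {
    return registry.size();
  }

  /** Starts the periodic job; the first run happens after one full period. */
  void start(long period, TimeUnit unit) {
    main.scheduleAtFixedRate(this::submitAll, period, period, unit);
  }

  /** The scheduler thread only hands work over; the workers do the (possibly slow) pinging. */
  private void submitAll() {
    for (V heartbeat : registry.values()) {
      workers.submit(() -> pingTask.accept(heartbeat));
    }
  }

  /** Stops both executors; queued ping tasks are discarded. */
  void shutdown() {
    main.shutdownNow();
    workers.shutdownNow();
  }
}
```

Keeping the scheduled thread free of network I/O means that one slow or unresponsive client delays only a worker thread, not the next scheduled round.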

Approach

The scheduler service takes a session and checks whether it has been idle for a specified duration (meaning that the server has received only pong messages from the client for, e.g., 30 minutes) or whether the server has already tried to reach the client multiple times (e.g., max retry = 3) with no response. In either case, it makes no sense for the server to keep the session in the list, so it closes that connection. Otherwise, it continues pinging.
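The close-or-ping decision described above can be sketched as a small, side-effect-free policy class. The names here (HeartbeatPolicy, Action, decide) are hypothetical, and the idle check is an assumption: it compares the current time with the timestamp of the last business message, which is what lastMessageOnInMillis in SessionHeartbeat appears to track.

```java
import java.time.Duration;

/** Hypothetical sketch of the close-or-ping decision; the idle rule is an assumption. */
final class HeartbeatPolicy {
  enum Action { PING, CLOSE }

  private final Duration maxIdle;

  HeartbeatPolicy(Duration maxIdle) {
    this.maxIdle = maxIdle;
  }

  /** True when no business message has arrived within the allowed idle window. */
  boolean hasIdleTimeExpired(long lastMessageAtMillis, long nowMillis) {
    return nowMillis - lastMessageAtMillis >= maxIdle.toMillis();
  }

  /** Close when the retries are used up or the session has been idle for too long. */
  Action decide(int retriesLeft, long lastMessageAtMillis, long nowMillis) {
    if (retriesLeft <= 0 || hasIdleTimeExpired(lastMessageAtMillis, nowMillis)) {
      return Action.CLOSE;
    }
    return Action.PING;
  }
}
```

For example, with new HeartbeatPolicy(Duration.ofMinutes(30)), a session with retries left whose last message arrived one minute ago yields PING, while a session with zero retries left, or one whose last message is 30 minutes old, yields CLOSE. Passing the current time in as a parameter keeps the rule easy to unit test.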

Implementation

Java
 
public final class WebsocketPingPongSchedulerService {
  private static SchedulerService schedulerService;
  private static ScheduledExecutorService websocketHeartbeatMainExecutorService;
  private static ExecutorService webSocketHeartbeatWorkerExecutorService;
  private final Map<Session, SessionHeartbeat> sessionHeartbeats = new ConcurrentHashMap<>();
  
  // initialize & start the executor services - left up to the implementor.
  
  // have the shutdown & cleanup methods
  
  // add/modify/remove functionality for the heartbeats - i.e., modifying the Map.
  
  // schedule the ping functionality method
  public void schedulePingMessages() {
    websocketHeartbeatMainExecutorService.scheduleAtFixedRate(this::pingClients,
        WebsocketProperties.WEBSOCKET_PING_SCHEDULED_TIME_IN_SECONDS,
        WebsocketProperties.WEBSOCKET_PING_SCHEDULED_TIME_IN_SECONDS,
        TimeUnit.SECONDS);
  }
  private void pingClients() {
    LOGGER.debug("Starting to ping");
    // Stream over the map's values, i.e., the registered heartbeats
    sessionHeartbeats.values().parallelStream().forEach(this::submitToWorker);
  }

  private void submitToWorker(SessionHeartbeat heartbeat) {
    webSocketHeartbeatWorkerExecutorService.submit(() -> {
      if (heartbeat.getRetry().get() == 0 || hasIdleTimeExpired(heartbeat)) {
        closeSession(heartbeat);
      } else {
        pingToClient(heartbeat);
      }
    });
  }
  
  private void pingToClient(SessionHeartbeat heartbeat) {
    Session session = heartbeat.getUserSession();
    try {
      if (session.isOpen()) {
        LOGGER.trace("Going to ping client with session {}.", session.getId());
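        JsonObject payloadJson = createPingPayload(session); // Maximum allowed payload of 125 bytes only
        // Encode with an explicit charset (java.nio.charset.StandardCharsets) rather than the platform default
        ByteBuffer payload = ByteBuffer.wrap(payloadJson.toString().getBytes(StandardCharsets.UTF_8));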

        session.getBasicRemote().sendPing(payload);
        heartbeat.getLastPingAt().set(System.currentTimeMillis());
        heartbeat.getRetry().set(EventProperties.MAX_RETRY_COUNT.getFromConfig());
        LOGGER.debug("Ping to client for session {} is successful.", session.getId());
      } else {
        LOGGER.debug("Session has been closed! Retrying again for session {}.", session.getId());
        heartbeat.getRetry().decrementAndGet();
      }
    } catch (Exception e) {
      LOGGER.warn("Error in pinging clients for session {}.", session.getId(), e);
      heartbeat.getRetry().decrementAndGet();
    }
  }
  
  private void closeSession(SessionHeartbeat heartbeat) {
    Session session = heartbeat.getUserSession();
    try {
      LOGGER.debug("Session retry over. Closing session with id {}.", session.getId());
      session.close(new CloseReason(CloseCodes.NORMAL_CLOSURE,
          "SessionId: " + session.getId() + ", Client does not respond"));
    } catch (Exception e) {
      LOGGER.warn("Error closing session for session {}.", session.getId(), e);
    }
  }
}


Make sure that the ping payload does not exceed 125 bytes; otherwise, you will get an exception. What you send in the ping payload is entirely up to the contract between the client and the server. You may also wonder whether the client needs any implementation to send a pong message back to the server. It does not.

As per RFC 6455, Section 5.5.2:

Upon receipt of a Ping frame, an endpoint MUST send a Pong frame in response, unless it already received a Close frame.


Therefore, you need not worry: a compliant client replies with a pong on its own, without any code on your side.
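Coming back to the 125-byte limit on the ping payload: a small guard can catch an oversized payload before it reaches sendPing. The sketch below is a hypothetical helper (PingPayloads is not part of our code base or of JSR-356) and assumes the payload is text encoded as UTF-8. Note that the limit applies to encoded bytes, not characters.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

/** Hypothetical helper that guards the 125-byte limit on control-frame payloads. */
final class PingPayloads {
  static final int MAX_CONTROL_PAYLOAD_BYTES = 125;

  private PingPayloads() {}

  /** Encodes the text as UTF-8 and rejects it if the encoded form is too large. */
  static ByteBuffer of(String text) {
    byte[] bytes = text.getBytes(StandardCharsets.UTF_8);
    if (bytes.length > MAX_CONTROL_PAYLOAD_BYTES) {
      throw new IllegalArgumentException(
          "Ping payload is " + bytes.length + " bytes; the limit is " + MAX_CONTROL_PAYLOAD_BYTES);
    }
    return ByteBuffer.wrap(bytes);
  }
}
```

A payload of 63 accented characters such as "é" is rejected here even though it is well under 125 characters, because each of those characters takes two bytes in UTF-8.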

However, if your requirements call for a custom pong implementation, please check whether your web client offers a mechanism for creating pong messages. If the client is Android or iOS, you can check the feasibility of neovisionaries-client in your project. I personally used this library heavily while testing the ping-pong functionality. You may access the code repo here.
