
Building Real-Time Web Applications With Cettia, Part 2

We go through various patterns and features required to build real-time oriented applications using this open source framework.

Welcome back! If you missed Part 1, you can check it out here.

Broadcasting Events

To send an event to multiple sockets, you could create a set, add sockets to it, and send events while iterating over the set. That works, but a socket is stateful and not serializable, which means that the caller must check whether each socket is available every time, and the socket can’t be handled on the other side of the wire. Cettia resolves these issues in a functional way.

  1. The application creates and passes a socket action to one of the server’s finder methods.
  2. The server finds corresponding sockets and executes the passed action, passing sockets one by one.

Here, the action means a functional interface used to handle a given argument. In this way, you can delegate state management to the server and focus on socket handling by constructing a socket action; you can also serialize and broadcast an action instead of a socket to other servers in the cluster, and let the servers execute the action for their own sockets.

The finder method to find all sockets in the server is server.all(Action<ServerSocket> action). Add the following chat event handler to the socket handler to send a given chat event to every socket in the server:

socket.on("chat", data -> {
  server.all((Action<ServerSocket> & Serializable) s -> {
    s.send("chat", data);
  });
});

Don’t confuse it with the socket handler registered via server.onsocket(Action<ServerSocket> action). Finder methods, including server.all, handle existing sockets in the server (or in every server in the cluster if clustered), whereas server.onsocket initializes sockets newly accepted by this server.

In fact, writing and submitting an action is only useful when you need to do something more complicated than sending events. If it’s not that complicated, it can be done in one line of code using Sentence. Rewrite the above chat event handler in one line:

socket.on("chat", data -> server.all().send("chat", data));

Sentence is created and returned by the server when its finder methods are called without an action, i.e. server.all(). Each method on Sentence, like the above send, is mapped to a pre-implemented common socket action, so if the method is executed, its mapped action is executed with the sockets found by the server according to the called finder method. That’s why the above two code snippets do exactly the same thing.
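
To make the relationship between the two styles concrete, here is a minimal plain-Java sketch of the pattern. It does not use Cettia at all: MiniServer, MiniSocket, and MiniSentence are hypothetical stand-ins invented for illustration, and Cettia's real classes do considerably more. The point is only that a finder called without an action can return an object whose methods each delegate to a ready-made action.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.function.Consumer;

// Hypothetical model of the finder/action/sentence pattern; not Cettia's implementation.
public class MiniServer {
  // Stand-in for a socket: it only records what was sent to it.
  public static class MiniSocket {
    public final List<String> sent = new ArrayList<>();

    public void send(String event, Object data) {
      sent.add(event + ":" + data);
    }
  }

  // Stand-in for Sentence: each method wraps a pre-built action.
  public class MiniSentence {
    public MiniSentence send(String event, Object data) {
      // Delegates to the enclosing server's finder with a ready-made action.
      all(socket -> socket.send(event, data));
      return this;
    }
  }

  private final Set<MiniSocket> sockets = new CopyOnWriteArraySet<>();

  // Registers a new socket, as if a client had just connected.
  public MiniSocket accept() {
    MiniSocket socket = new MiniSocket();
    sockets.add(socket);
    return socket;
  }

  // Finder with an action: the server owns the iteration.
  public void all(Consumer<MiniSocket> action) {
    sockets.forEach(action);
  }

  // Finder without an action: returns the fluent helper.
  public MiniSentence all() {
    return new MiniSentence();
  }

  public static void main(String[] args) {
    MiniServer server = new MiniServer();
    MiniSocket a = server.accept();
    MiniSocket b = server.accept();
    server.all(socket -> socket.send("chat", "hi")); // explicit action
    server.all().send("chat", "hi");                 // same effect via the sentence
    System.out.println(a.sent);
    System.out.println(b.sent);
  }
}
```

Both calls in main end up running the same kind of action against the same set of sockets, which is the equivalence described above.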

To demonstrate the chat event handler, open two sockets in one tab, or two browsers and one socket per browser. When tracking a socket’s state, it is convenient to add the above logState event handler to built-in events.

var socket1 = cettia.open("http://127.0.0.1:8080/cettia");
socket1.on("chat", data => console.log("socket1", data));
var socket2 = cettia.open("http://127.0.0.1:8080/cettia");
socket2.on("chat", data => console.log("socket2", data));

Once all the sockets are opened, select one of them and send a chat event. You should see that the chat event sent by socket1 is broadcast to both socket1 and socket2.

socket1.send("chat", "Is it safe to invest in Bitcoin?");

You may be dying to answer the question. Try it on the console.

Working With Specific Sockets

In most cases, you are likely to handle a group of sockets representing a specific entity rather than simply all sockets. The entity could be, for example, a user signed in on multiple browsers, the users in a chat room, the red-team players in a game, and so on. As explained, the server’s finder methods accept a criterion to find sockets and an action to execute with the found sockets, and the criterion used here is a tag. Cettia lets you add a tag to and remove a tag from a socket, and provides finder methods to find tagged sockets, much like querying a database.

As a simple example, let’s write the myself event handler, which sends a given event to sockets tagged with my username. Here, these sockets represent an entity called myself. Assume that the username is included in the query parameter named username in the URI and is URI-encoding safe. For instance, if the socket’s URI is /cettia?username=alice, the socket handler will add the alice tag to the socket through socket.tag(String tagName), and when a myself event is dispatched, the server will find sockets containing the alice tag with server.byTag(String... names) and send the event to them.

Here’s an implementation of the myself event handler. Assume that there is a method called findUsernameParameter that finds the username parameter in a given URI.

String username = findUsernameParameter(socket.uri());
socket.tag(username).on("myself", data -> server.byTag(username).send("myself", data));
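
The article leaves findUsernameParameter to the reader. One possible sketch, using only the JDK, is shown below. It assumes the URI is passed as a String and that the parameter name is exactly username; the class name UsernameParameter and the choice to return null when the parameter is absent are illustrative decisions, not part of Cettia.

```java
import java.io.UnsupportedEncodingException;
import java.net.URI;
import java.net.URLDecoder;

// Hypothetical helper; the starter kit may implement this differently.
public class UsernameParameter {
  // Returns the decoded value of the "username" query parameter, or null if it is absent.
  public static String findUsernameParameter(String uri) {
    String query = URI.create(uri).getRawQuery();
    if (query == null) {
      return null;
    }
    for (String pair : query.split("&")) {
      int eq = pair.indexOf('=');
      String name = eq < 0 ? pair : pair.substring(0, eq);
      if (name.equals("username")) {
        String value = eq < 0 ? "" : pair.substring(eq + 1);
        try {
          // Decoding is harmless for URI-safe names and helps with encoded ones.
          return URLDecoder.decode(value, "UTF-8");
        } catch (UnsupportedEncodingException e) {
          throw new IllegalStateException("UTF-8 is always supported", e);
        }
      }
    }
    return null;
  }

  public static void main(String[] args) {
    System.out.println(findUsernameParameter("/cettia?username=alice"));
  }
}
```

In a real application you would probably reject sockets without a username instead of tagging them with null.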

To test the myself event handler, open three sockets in one tab, or three browsers and one socket per browser:

var socket1 = cettia.open("http://127.0.0.1:8080/cettia?username=alice");
socket1.on("myself", data => console.log("socket1", data));
var socket2 = cettia.open("http://127.0.0.1:8080/cettia?username=alice");
socket2.on("myself", data => console.log("socket2", data));
var socket3 = cettia.open("http://127.0.0.1:8080/cettia?username=bob");
socket3.on("myself", data => console.log("socket3", data));

Once all sockets are opened, select one of them and send a myself event.

socket2.send("myself", "A private message for me");

You should see that an event sent by socket2 is broadcast to socket1 and socket2, but not to socket3, whose username is different. In this way, if you send a direct message to yourself, no matter which browser or device you use, it will be broadcast to every browser and device where you have opened a socket, which is very useful to improve the multi-device user experience.

You may want to compare the myself event with the above echo and chat events. Run the following code and figure out what’s different between these events.

[socket1, socket2, socket3].forEach((socket, i) => {
  const log = data => console.log(`socket${i + 1}`, data);
  socket.on("echo", log).on("chat", log);
});

Disconnection Handling

We have only dealt with sockets in the opened state so far, but disconnection is inevitable. If events fail to reach the user because of a disconnection and must still be delivered, despite the delay, once the connection is recovered, the situation becomes complex. Not all disconnections are the same; they vary in the period of time between disconnection and reconnection. In general, a temporary disconnection is more common than a permanent disconnection, especially in the mobile environment, and the user experience for each case is different. If some events are delivered after a delay of a few seconds due to a temporary disconnection, the client could treat them as if they were delivered on time, but if the delay is minutes or hours due to a permanent disconnection, it might be better to send an email about the missed events.

Cettia defines a temporary disconnection as one that is followed by reconnection within 60 seconds. It designs a socket’s lifecycle to be unaffected by temporary disconnections and provides an event-driven way to handle disconnections. Here’s a server-side example that resends, on the next connection, events that failed to be sent due to disconnection. Append the following imports:

import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

And the following code to the socket handler:

Queue<Object[]> queue = new ConcurrentLinkedQueue<>();
socket.oncache(args -> queue.offer(args));
socket.onopen(v -> {
  while (socket.state() == ServerSocket.State.OPENED && !queue.isEmpty()) {
    Object[] args = queue.poll();
    socket.send((String) args[0], args[1], (Action<?>) args[2], (Action<?>) args[3]);
  }
});
socket.ondelete(v -> queue.forEach(args -> System.out.println(socket + " missed event - name: " + args[0] + ", data: " + args[1])));

Refer to the socket lifecycle section for the difference between the server’s socket event and the socket’s open event, and for when the socket’s open and delete events are dispatched. By default, the client reconnects to the server with a delay interval determined by a geometric progression with the initial delay 500 and ratio 2 (500, 1000, 2000, 4000 …).

  • If the socket has no active connection when the send method is called, the cache event is fired with an argument array used to call the send method. In this event, you can decide and collect events to send on the next reconnection into the queue.
  • If an open event is fired, flush the queue by sending items one by one via the new connection. Even within the open event, you should check that the socket is opened so as not to disrupt the queue.
  • If a delete event is fired and the queue is not empty, you have to work with other building blocks of your application according to the user experience you want to provide with the remaining events. For example, a database could be used to store missed events and show them on the next visit to the service. A push notification system could be used to notify a user of missed events, and an SMTP server could be used to send a digest email of missed events.

Note that when writing and submitting socket actions to the server, you don’t need to take care of a given socket’s state. Even if a socket has no connection and fails to send events, you can safely handle them within the cache handler.
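
As a side note on the default reconnection schedule mentioned earlier (initial delay 500, ratio 2), the delay before each attempt can be computed as in the following sketch. The class and method names are hypothetical, and the unit is assumed to be milliseconds, which the text does not state explicitly; the actual client computes this internally.

```java
// Hypothetical illustration of a geometric reconnect schedule; not Cettia client code.
public class ReconnectDelays {
  // Delay before the given attempt (0-based): initial * ratio^attempt.
  public static long delay(int attempt, long initial, long ratio) {
    long result = initial;
    for (int i = 0; i < attempt; i++) {
      result *= ratio;
    }
    return result;
  }

  public static void main(String[] args) {
    // Print the delays for the first five attempts.
    for (int attempt = 0; attempt < 5; attempt++) {
      System.out.println("attempt " + attempt + ": " + delay(attempt, 500, 2));
    }
  }
}
```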

The easiest way to simulate a temporary disconnection is to set a name option when opening a socket and then refresh the webpage. The name option is an identifier within the browsing context that lets a socket opened with the same name on the next page inherit the lifecycle of the socket on the current page. Because this option can help restore missed events during page navigation, it’s useful when you add real-time web features to multi-page applications. Open the developer tools at index.html and run the following code snippet:

var socket1 = cettia.open("http://127.0.0.1:8080/cettia", {name: "main"});
socket1.on("chat", data => console.log("socket1", "message", data.message, "with", Date.now() - data.sentAt, "ms delay"));

Refresh the webpage, then socket1 should be disconnected. Run the following code snippet on the refreshed page:

var socket2 = cettia.open("http://127.0.0.1:8080/cettia");
socket2.once("open", () => socket2.send("chat", {message: "ㅇㅅㅇ", sentAt: Date.now()}));
socket2.on("chat", data => console.log("socket2", "message", data.message, "with", Date.now() - data.sentAt, "ms delay"));

A chat event sent from socket2 can’t reach socket1 because it has no active connection, and instead the event is cached in a queue for socket1. If you run the first code snippet again on the refreshed page so that socket1’s lifecycle is extended, you should see that socket1 receives the cached events. Of course, if you defer running the first code snippet for one minute, you will see that socket1 dispatches the delete event, so its cached events are logged as missed events in the server.

Scaling a Cettia Application

Last but not least is scaling an application. As mentioned earlier, any publish-subscribe messaging system can be used to scale a Cettia application horizontally, and it doesn’t require any modification in the existing application. The idea behind scaling a Cettia application is very simple:

  • When one of the finder methods of the server is called, it serializes this method invocation to a message and publishes it to the cluster.
  • When a server receives some message from the cluster, it deserializes to the method invocation and applies it to its own sockets.
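
The two steps above can be modeled in a few lines of plain Java. This is a deliberately simplified sketch: Topic and Node are hypothetical stand-ins for a publish-subscribe system and a clustered server, a socket is reduced to a list of received messages, and the action is handed over by reference instead of being serialized as it would be in a real cluster.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical in-memory model of the scaling idea; not Cettia or Hazelcast code.
public class MiniCluster {
  // Stand-in for a publish-subscribe topic shared by every node.
  public static class Topic<T> {
    private final List<Consumer<T>> listeners = new ArrayList<>();

    public void subscribe(Consumer<T> listener) {
      listeners.add(listener);
    }

    public void publish(T message) {
      listeners.forEach(listener -> listener.accept(message));
    }
  }

  // Stand-in for one server in the cluster; each socket is just an inbox.
  public static class Node {
    private final Topic<Consumer<List<String>>> topic;
    private final List<List<String>> sockets = new ArrayList<>();

    public Node(Topic<Consumer<List<String>>> topic) {
      this.topic = topic;
      // Step 2: apply every published invocation to this node's own sockets.
      topic.subscribe(this::applyLocally);
    }

    public List<String> accept() {
      List<String> inbox = new ArrayList<>();
      sockets.add(inbox);
      return inbox;
    }

    // Step 1: a finder call is published to the cluster instead of run directly.
    public void all(Consumer<List<String>> action) {
      topic.publish(action);
    }

    private void applyLocally(Consumer<List<String>> action) {
      sockets.forEach(action);
    }
  }

  public static void main(String[] args) {
    Topic<Consumer<List<String>>> topic = new Topic<>();
    Node node8080 = new Node(topic);
    Node node8090 = new Node(topic);
    List<String> socket1 = node8080.accept();
    List<String> socket2 = node8090.accept();
    node8080.all(inbox -> inbox.add("chat: Greetings from 8080"));
    System.out.println(socket1);
    System.out.println(socket2);
  }
}
```

Because the publishing node is also a subscriber, its own sockets are handled through the same path as everyone else's, so the calling code does not need to know whether it is running in a cluster.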

No matter how complicated the socket action passed to the finder method is, it can be executed with sockets in other servers as long as it is serializable, so you don’t need to worry much about serialization. Actions provided by Sentence are all serializable, and even if you have to use plain actions, you can make them serializable simply with a Java 8 cast expression, like (Action<ServerSocket> & Serializable) socket -> {}.
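
To see what the intersection cast buys you, the following self-contained sketch serializes a lambda with standard Java serialization and runs the deserialized copy. It uses java.util.function.Consumer in place of Cettia's Action so that it runs without any dependency; the class and method names are made up for this illustration.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.function.Consumer;

// Hypothetical demo of serializable lambdas; Consumer stands in for Cettia's Action.
public class SerializableActionDemo {
  // Builds an action that captures `data`; the intersection cast makes the lambda serializable.
  public static Consumer<StringBuilder> appendAction(String data) {
    return (Consumer<StringBuilder> & Serializable) target -> target.append(data);
  }

  // Writes a value to bytes and reads it back, roughly what happens to a message crossing the cluster.
  @SuppressWarnings("unchecked")
  public static <T> T roundTrip(T value) {
    try {
      ByteArrayOutputStream bytes = new ByteArrayOutputStream();
      try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
        out.writeObject(value);
      }
      try (ObjectInputStream in =
          new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
        return (T) in.readObject();
      }
    } catch (IOException | ClassNotFoundException e) {
      throw new IllegalStateException(e);
    }
  }

  public static void main(String[] args) {
    Consumer<StringBuilder> copy = roundTrip(appendAction("Greetings from 8080"));
    StringBuilder received = new StringBuilder();
    copy.accept(received); // the copy carries the captured data with it
    System.out.println(received);
  }
}
```

Without the & Serializable part of the cast, writeObject would fail with a NotSerializableException. Note also that everything the lambda captures (here, a String) must itself be serializable.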

In this tutorial, we will use Hazelcast as a publish-subscribe messaging system. Add the following dependencies:

<dependencies>
  <dependency>
    <groupId>com.hazelcast</groupId>
    <artifactId>hazelcast</artifactId>
    <version>3.9.3</version>
  </dependency>
  <dependency>
    <groupId>com.hazelcast</groupId>
    <artifactId>hazelcast-client</artifactId>
    <version>3.9.3</version>
  </dependency>
</dependencies>

Now, add the following imports:

import com.hazelcast.config.Config;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.core.ITopic;
import com.hazelcast.instance.HazelcastInstanceFactory;
import io.cettia.ClusteredServer;
import java.util.Map;

Replace the first line of the Cettia part, Server server = new DefaultServer();, with the following line:

ClusteredServer server = new ClusteredServer();

The ClusteredServer class has two methods:

  1. onpublish(Action<Map<String,Object>> action) - The server intercepts and serializes the finder method calls to the wrapped server and passes them to the argument action. The action should publish it to the cluster.
  2. messageAction() - This action deserializes a published message and calls the wrapped server’s finder method. It should be called with a message when it arrives from the cluster.

Just to give you an idea, with server.onpublish(message -> server.messageAction().on(message)); ClusteredServer will behave exactly the same as DefaultServer. Add the following code to the CettiaConfigListener#contextInitialized method:

// Hazelcast part
HazelcastInstance hazelcast = HazelcastInstanceFactory.newHazelcastInstance(new Config());
ITopic<Map<String, Object>> topic = hazelcast.getTopic("cettia");
// It publishes messages given by the server
server.onpublish(message -> topic.publish(message));
// It relays published messages to the server
topic.addMessageListener(message -> server.messageAction().on(message.getMessageObject()));

Now, if the application calls server.all with an action, the passed action will be serialized and broadcast to all servers in the cluster and deserialized and executed by each server in the cluster. Let’s restart the server on port 8080, open a new shell, and start up one more server on port 8090 by running mvn jetty:run -Djetty.port=8090. Then you’ll see Hazelcast nodes on 8080 and 8090 form the cluster.

To test the implementation, open two sockets in one tab, one per port, or use two browsers with one socket each:

var socket1 = cettia.open("http://127.0.0.1:8080/cettia");
socket1.on("chat", data => console.log("socket1", data));
var socket2 = cettia.open("http://127.0.0.1:8090/cettia");
socket2.on("chat", data => console.log("socket2", data));

Once all sockets are opened, select one of them and send a chat event:

socket1.send("chat", "Greetings from 8080");

As you can see, a chat event sent from a client connected to the server on 8080 propagates to clients connected to the server on 8090 as well as 8080.

As for deployment, it’s just a web application, after all, so you can deploy the application and configure the environment as usual. Just keep in mind that you should enable ‘sticky session’ to deploy a clustered Cettia application. It’s required to manage a socket lifecycle that consists of multiple transports and to enable HTTP transports that consist of multiple HTTP request-response exchanges.

Conclusion

Cettia is a full-featured real-time web application framework that you can use to exchange events between the server and client in real-time. Following the separation of concerns principle, the framework is separated into three layers: 1. I/O framework agnostic layer to run a Cettia application on any I/O framework on JVM; 2. Transport layer to provide a reliable full duplex message channel; 3. Socket layer to offer elegant patterns to achieve better user experience in the real-time web. This multi-layered architecture allows for focusing on application-level real-time event handling only, as well as a greater freedom of choice on technical stacks.

In this tutorial, we’ve walked through the reasons behind the key design decisions that the Cettia team has made, as well as various patterns and features required to build real-time oriented applications without compromise with Cettia, and as a result, we’ve built the starter kit. The source code for the starter kit is available at https://github.com/cettia/cettia-starter-kit.

If you have any questions or feedback, please feel free to share them on Cettia Groups.
