RSocket With Spring Boot + JS: Zero to Hero
A tutorial.
Join the DZone community and get the full member experience.
Join For FreeWhat’s RSocket?
RSocket is a binary, asynchronous, one-to-one, stateless, symmetrical protocol that relies on one between TCP, WebSocket, Aeron, and HTTP/2 streams to be used as a transport protocol behind the curtains.
It’s been developed to match the Reactive Streams semantics, therefore integrating seamlessly in applications that depend heavily on Reactive libraries such as Project Reactor or ReactiveX.
Why Should I Use RSocket?
BareHTTP just doesn’t cut it, especially in the modern world where software architecture leans heavily towards microservices.
Microservices need to communicate potentially with a myriad of other microservices, in a tangled and twisted dance that doesn’t always go along with the core principles upon which HTTP has been built: sending text over the wire, in a request ⇄ response fashion.
It is often required that microservices send out events in a fire-and-forget manner (brokers and advanced messaging protocols help with this but at the cost of adding significant complexity to infrastructure and applications relying on them), or request some data and hold onto the connection expecting a stream of data coming through as a response, over time.
HTTP is not an efficient solution in either of these scenarios, whereas a transport protocol that’s been built specifically for computers talking to other computers asynchronously and with high-performance in mind, such as WebSocket, seems to be a very good fit.
RSocket provides all the advantages of choosing the best transport protocol for the task, and builds things like Reactive Streams semantics, backpressure management, load-balancing hints and resumability on top of it! Great stuff!
Interaction Models
RSocket is based on four main interaction models that enable symmetric interaction over a single connection:
- Request/Response: similar to HTTP, but the client waits for the response (a stream of one element) in a non-blocking manner;
- Request/Stream: the client receives elements that compose a stream of many, over time;
- Fire-and-Forget: the client sends some data and expects no response;
- Channel: the most customizable interaction model, where the client and the server can exchange data in any way that seems fit for a specific task (eg. server sends 2 frames for each frame* sent by the client).
*a frame is a single message containing either a request or a response
Resilience and High Availability
RSocket satisfies resiliency and high availability requirements by providing features such as connection/request resumption and load-balancing hints through leasing.
Resumption is the ability to resume operation in case of failure (eg. recovering an abruptly closed connection).
When using RSocket, clients are completely in charge of resumption, and it is not enabled by default.
It is particularly useful as, when sending a RESUME frame containing information about the last received frame, the client is able to resume the connection and only request the data it hasn’t already received, avoiding unnecessary load on the server and wasting time trying to retrieve data that was already retrieved.
It should be used everywhere, where it makes sense to do so.
Leasing can be enabled so that servers issue leases to clients, making sure said clients don’t exceed the defined request rates (the client should not send more than Number of Requests in any particular Time-To-Live timeframe, or else its requests will be rejected until a new, valid lease can be issued).
Servers are completely in charge of leasing, and, as it is the case for resumption, it is not enabled by default either.
When having clusters of machines exposing the same API via RSocket, it would be wise to enable leasing and use the server-provided responses as load-balancing hints for smart request routing, targeting the machines that are more likely to issue a valid lease with higher frequency.
data:image/s3,"s3://crabby-images/120ba/120ba811274855210ce37608105e292aac7ccc2c" alt=""
I Want to See It in Action!
Worry not! I’ve prepared a reactive demo application based on Spring Boot 2.2 that shows the request/stream interaction mode of RSocket in action, over WebSocket transport, with a Java RSocket Server, a Java RSocket client and a Javascript RSocket client.
I’ll explain all the steps required to create an app similar to the demo one further down, but, if you’re impatient, you can jump right into it.
Either dig into the source code yourself or follow the quickstart to see RSocket in action.
Quickstart
- Clone the repository: https://github.com/dsibilio/rsocket-demo.git, and move inside the cloned directory
- Deploy the Java RSocket Server and spin up both the Java RSocket client and the Javascript RSocket client by running:
mvn spring-boot:run
What Can I Do Now?
The Java backend exposes the following APIs*:
- HTTP http://localhost:8080/socket/{author}: the HTTP request triggers the Java RSocket client to pull data from the Java RSocket server
- HTTP http://localhost:8080/tweets/{author}: the same API as above but without any socket interaction, pure SSE over HTTP (for comparison)
- WS
ws://localhost:8080/tweetsocket
- route:tweets.by.author
: WebSocket transport employed by the Javascript RSocket client to pull data from the Java RSocket server
The Javascript client can be seen in action by going to http://localhost:8080/index.html; it connects to the Java RSocket server directly to pull data from it in a request/stream fashion.
How To: Build Your App With RSocket
This section will help you develop an application like the demo one, using Java to host an RSocket server, and both Java and Javascript to pull data from the said server via RSocket clients.
NOTE: both Spring Boot 2.2 and RSocket are products under development at the time of writing this article, therefore the following steps might slightly change in the future!
Java RSocket Client and Java RSocket Server
In order to develop a working Java RSocket Server and Client implementation, go through the following steps.
- Generate the backbone of your project by heading to https://start.spring.io/, selecting Spring Boot 2.2.0 M6 as your Spring Boot version, and adding Spring Reactive Web and RSocket as selected dependencies.
- Replace the src/main/resources/application.properties file with an application.yml like the following:1
local
2server
3port8080
4spring
5rsocket
6server
7mapping-path /tweetsocket
8transport websocket
ws://localhost:8080/tweetsocket
endpoint available to incoming connections
- Create an RSocketConfiguration class inside the com.example.demo.configpackage:331
package com.example.demo.config;
2
3import reactor.core.publisher.Mono;
4
5import org.springframework.boot.autoconfigure.rsocket.RSocketProperties;
6import org.springframework.boot.web.server.LocalServerPort;
7import org.springframework.context.annotation.Bean;
8import org.springframework.context.annotation.Configuration;
9import org.springframework.messaging.rsocket.RSocketRequester;
10import org.springframework.messaging.rsocket.RSocketStrategies;
11
12import java.net.URI;
13
1415public class RSocketConfiguration {
1617private int port;
18
1920public Mono<RSocketRequester> rSocketRequester(
21RSocketStrategies rSocketStrategies,
22RSocketProperties rSocketProps) {
23return RSocketRequester.builder()
24.rsocketStrategies(rSocketStrategies)
25.connectWebSocket(getURI(rSocketProps));
26}
27
28private URI getURI(RSocketProperties rSocketProps) {
29return URI.create(String.format("ws://localhost:%d%s",
30port, rSocketProps.getServer().getMappingPath()));
31}
32
33}
- Create a simple Tweet POJO, like the one below, and put it inside the com.example.demo.domain package:541
package com.example.demo.domain;
2
3import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
4import com.fasterxml.jackson.databind.annotation.JsonSerialize;
5import com.fasterxml.jackson.datatype.jsr310.deser.LocalDateDeserializer;
6import com.fasterxml.jackson.datatype.jsr310.ser.LocalDateSerializer;
7import io.netty.util.internal.ThreadLocalRandom;
8
9import java.time.LocalDate;
10import java.util.UUID;
11
12public class Tweet {
13private String id;
14private String author;
15private String body;
16using = LocalDateDeserializer.class) (
17using = LocalDateSerializer.class) (
18private LocalDate date;
19
20public Tweet() {}
21
22public Tweet(String author, String body) {
23this.id = UUID.randomUUID().toString();
24this.author = author;
25this.body = body;
26this.date = getRandomDate();
27}
28
29public static Tweet of(Tweet tweet) {
30return new Tweet(tweet.getAuthor(), tweet.getBody());
31}
32
33private LocalDate getRandomDate() {
34ThreadLocalRandom r = ThreadLocalRandom.current();
35return LocalDate.of(r.nextInt(1990, 2020), r.nextInt(1, 13), r.nextInt(1, 29));
36}
37
38public String getId() {
39return id;
40}
41
42public String getAuthor() {
43return author;
44}
45
46public String getBody() {
47return body;
48}
49
50public LocalDate getDate() {
51return date;
52}
53
54}
- Create a TweetRequest class to be used as a filter for tweets, and move it inside the com.example.demo.domain package:201
package com.example.demo.domain;
2
3public class TweetRequest {
4private String author;
5
6public TweetRequest() {}
7
8public TweetRequest(String author) {
9this.author = author;
10}
11
12public String getAuthor() {
13return author;
14}
15
16public void setAuthor(String author) {
17this.author = author;
18}
19
20}
- Create a TweetService class that will return an endless stream of Tweets, and put it inside the com.example.demo.servicepackage:x1
package com.example.demo.service;
2
3import com.example.demo.domain.Tweet;
4import reactor.core.publisher.Flux;
5
6import org.springframework.stereotype.Service;
7
8import java.time.Duration;
9import java.util.HashMap;
10import java.util.Map;
11
1213public class TweetService {
14
15private static final Map<String, Tweet> tweets = new HashMap<String, Tweet>() {
16{
17put("linustorvalds", new Tweet("Linus Torvalds", "Talk is cheap. Show me the code."));
18put("robertmartin", new Tweet("Robert Martin", "Truth can only be found in one place: the code."));
19put("martinfowler", new Tweet("Martin Fowler", "Any fool can write code that a computer can understand. Good programmers write code that humans can understand."));
20}
21};
22
23public Flux<Tweet> getByAuthor(String author) {
24return Flux
25.interval(Duration.ZERO, Duration.ofSeconds(1))
26.map(i -> Tweet.of(tweets.get(author)));
27}
28
29}
- Create a TweetSocketController class that sets up a route —
tweets.by.author
— for our socket to receive TweetRequests; place it inside the com.example.demo.api.rsocketpackage:251package com.example.demo.api.rsocket;
2
3import com.example.demo.domain.Tweet;
4import com.example.demo.domain.TweetRequest;
5import com.example.demo.service.TweetService;
6import reactor.core.publisher.Flux;
7
8import org.springframework.messaging.handler.annotation.MessageMapping;
9import org.springframework.stereotype.Controller;
10
1112public class TweetSocketController {
13
14private final TweetService service;
15
16public TweetSocketController(TweetService service) {
17this.service = service;
18}
19
20"tweets.by.author") (
21public Flux<Tweet> getByAuthor(TweetRequest request) {
22return service.getByAuthor(request.getAuthor());
23}
24
25}
- Finally, create a TweetController class that serves as a Rest controller to expose an SSE endpoint that triggers the Java RSocket client to pull data from the Java RSocket server; place it inside the com.example.demo.api.rest package:
You can test the end result by running mvn spring-boot:run
and heading to http://localhost:8080/tweets/linustorvalds.
You should see something like this:
data:image/s3,"s3://crabby-images/75739/757396ba661a3416fbcaf4a396b991da427033c5" alt=""
Javascript RSocket Client
It is now time to put together our Javascript RSocket client, so that the browser will be able to pull data directly from the Java RSocket server without ever issuing any HTTP request!
PREREQUISITES: npm and browserify must be installed!
- Create a new folder, named public, under src/main/resources and place the following index.htmlfile inside it — it’s pretty straightforward and shouldn’t need any explanation:251
<html>
2
3<head>
4<title>RSocket Demo</title>
5</head>
6
7<body>
8<h2>Filtering by Author</h2>
9<select id="author-filter">
10<option value="linustorvalds">Linus Torvalds</option>
11<option value="martinfowler">Martin Fowler</option>
12<option value="robertmartin">Robert Martin</option>
13</select>
14
15<h2>Messages</h2>
16<p>
17<ul id="messages">
18
19</ul>
20</p>
21
22<script src="app.js"></script>
23</body>
24
25</html>
- Inside the same folder, add the index.jsfile which represents our Javascript RSocket Client implementation:871
const {
2RSocketClient,
3JsonSerializer,
4IdentitySerializer
5} = require('rsocket-core');
6const RSocketWebSocketClient = require('rsocket-websocket-client').default;
7var client = undefined;
8
9function addErrorMessage(prefix, error) {
10var ul = document.getElementById("messages");
11var li = document.createElement("li");
12li.appendChild(document.createTextNode(prefix + error));
13ul.appendChild(li);
14}
15
16function addMessage(message) {
17var ul = document.getElementById("messages");
18
19var li = document.createElement("li");
20li.appendChild(document.createTextNode(JSON.stringify(message)));
21ul.appendChild(li);
22}
23
24function main() {
25if (client !== undefined) {
26client.close();
27document.getElementById("messages").innerHTML = "";
28}
29
30// Create an instance of a client
31client = new RSocketClient({
32serializers: {
33data: JsonSerializer,
34metadata: IdentitySerializer
35},
36setup: {
37// ms btw sending keepalive to server
38keepAlive: 60000,
39// ms timeout if no keepalive response
40lifetime: 180000,
41// format of `data`
42dataMimeType: 'application/json',
43// format of `metadata`
44metadataMimeType: 'message/x.rsocket.routing.v0',
45},
46transport: new RSocketWebSocketClient({
47url: 'ws://localhost:8080/tweetsocket'
48}),
49});
50
51// Open the connection
52client.connect().subscribe({
53onComplete: socket => {
54// socket provides the rsocket interactions fire/forget, request/response,
55// request/stream, etc as well as methods to close the socket.
56socket.requestStream({
57data: {
58'author': document.getElementById("author-filter").value
59},
60metadata: String.fromCharCode('tweets.by.author'.length) + 'tweets.by.author',
61}).subscribe({
62onComplete: () => console.log('complete'),
63onError: error => {
64console.log(error);
65addErrorMessage("Connection has been closed due to ", error);
66},
67onNext: payload => {
68console.log(payload.data);
69addMessage(payload.data);
70},
71onSubscribe: subscription => {
72subscription.request(2147483647);
73},
74});
75},
76onError: error => {
77console.log(error);
78addErrorMessage("Connection has been refused due to ", error);
79},
80onSubscribe: cancel => {
81/* call cancel() to abort */
82}
83});
84}
85
86document.addEventListener('DOMContentLoaded', main);
87document.getElementById('author-filter').addEventListener('change', main);
Whenever the page is loaded or the author-filter select list value is updated, the main()
function is invoked, which in turn disconnects any previously connected RSocket client and opens a new connection to ws://localhost:8080/tweetsocket.
Once a connection is successfully obtained, the request/stream interaction mode is used to obtain a stream of tweets over time, specifying the WebSocket route as the message metadata with MIME type message/x.rsocket.routing.v0
, and showing each received tweet on the page.
- Add the following package.json file inside the publicfolder:211
{
2"name": "rsocket-demo",
3"private": true,
4"description": "RSocket Demo",
5"version": "0.0.1",
6"repository": {
7"type": "git",
8"url": "https://github.com/dsibilio/rsocket-demo.git"
9},
10"license": "BSD-3-Clause",
11"dependencies": {
12"fbjs": "^0.8.12",
13"rsocket-core": "^0.0.10",
14"rsocket-flowable": "^0.0.10",
15"rsocket-tcp-server": "^0.0.10",
16"rsocket-types": "^0.0.10",
17"rsocket-websocket-client": "^0.0.10",
18"rsocket-websocket-server": "^0.0.10",
19"ws": "^5.2.1"
20}
21}
- Move inside the src/main/resources/publicdirectory and run
npm install
to download all the required dependencies - Without changing directory, run
browserify index.js > app.js
You can now deploy your application as you did earlier with mvn spring-boot:run
, and head over to http://localhost:8080/index.html to see the fruits of your labor and play around until you get bored!
data:image/s3,"s3://crabby-images/a04f8/a04f883a334783d049e65305b4df644d0eda1083" alt=""
Conclusion
RSocket could very well become the future of transport protocols as it rapidly transitions into a mature product, with an active community regularly contributing to its growth.
I hope you got the gist of it after reading this article, but if you still want to learn more don’t be afraid to head over to:
…or dig some more into my rsocket-demo source code!
Further Reading
Reactive Service-to-Service Communication With RSocket (Part 1)
Published at DZone with permission of Domenico Sibilio. See the original article here.
Opinions expressed by DZone contributors are their own.
Comments