Tutorial: Reactive Spring Boot, Part 8: Kotlin RSocket Server
Learn more about building a reactive Spring Boot app with Kotlin RSocket server.
Join the DZone community and get the full member experience.
Join For FreeThis is the eighth part of our tutorial showing how to build a Reactive application using Spring Boot, Kotlin, Java, and JavaFX. The original inspiration was a 70-minute live demo.
Here is everything you've missed so far:
Tutorial: Reactive Spring Boot, Part 1: Building a Kotlin REST Service
Tutorial: Reactive Spring Boot, Part 2: A REST Client for Reactive Streams
Tutorial: Reactive Spring Boot, Part 3: A JavaFX Spring Boot Application
Tutorial: Reactive Spring Boot, Part 4: A JavaFX Line Chart
Tutorial: Reactive Spring Boot, Part 5: Auto-Configuration for Shared Beans
Tutorial: Reactive Spring Boot, Part 6: Displaying Reactive Data
Tutorial: Reactive Spring Boot, Part 7: Subscribing Multiple Subscribers
In this lesson, we add a new back-end service in Kotlin, this time emitting the prices via RSocket, a protocol for reactive streams.
This blog post contains a video showing the process step-by-step and a textual walk-through (adapted from the transcript of the video) for those who prefer a written format.
This tutorial is a series of steps during which we will build a full Spring Boot application featuring a Kotlin back-end, a Java client and a JavaFX user interface.
At this point in the tutorial series, we’ve successfully created an end to end application that publishes prices from a Kotlin Spring Boot service and shows them on a JavaFX line chart. This uses HTTP and server-sent events. However, since this is a reactive application, we might want to choose a protocol that’s better suited to streaming data.
In this step, we’re going to create a service that produces price data via the RSocket protocol.
Creating an RSocket Controller
We’re going to make these changes in the Kotlin Spring Boot application that we created back in part one of this tutorial, our StockServiceApplication.kt. Our existing service has a REST Controller. We’re going to create a similar class for RSocket.
- Inside StockServiceApplication.kt, create a new class
RSocketController
. - Annotated it as a Spring Controller.
- Create a new Kotlin function that takes a single argument.
- (Tip: we can use the fun1 Live Template to get IntelliJ IDEA to create the outline of this function for us.)
- Call the function
prices
, the same as theRestController
function. This takes a String symbol and returns a Flux ofStockPrice
.
x
class StockServiceApplication
// main function here...
class RestController() {
// controller body here...
}
class RSocketController() {
fun prices(symbol: String): Flux<StockPrice> {
}
}
// StockPrice data class here
(Note: This code will not compile, yet the function needs to return something)
Introducing a Price Service
This prices function is going to look a lot like the prices function in the RestController
since it’s actually going to do the same thing. The only difference is it’s going to publish the prices in a different way. To reduce duplication, let’s introduce a price service that contains the shared logic.
- Add a
priceService
constructor parameter of typePriceService
. - (Tip: If we type
priceService
into the prices method body, we can press Alt+Enter on the red text and get IntelliJ IDEA to “create propertypriceService
as a constructor parameter”) - Create the
PriceService
class inside this same file. - (Tip: We can press Alt+Enter on the red
PriceService
type in the constructor and get IntelliJ IDEA to “Create classPriceService
” in this StockServiceApplication.kt file) - Annotate the
PriceService
with @Service.
x
class StockServiceApplication
// main function here...
// @RestController here...
class RSocketController(val priceService: PriceService) {
fun prices(symbol: String): Flux<StockPrice> {
}
}
class PriceService {
}
// StockPrice data class here...
Moving Shared Code Into the PriceService
- Create a function called
generatePrices
in the service class. - (Tip: if we call
priceService.generatePrices
from inside the prices function ofRSocketController
, we can press Alt+Enter on the red function call and get IntelliJ IDEA to generate the function for us.) - This function needs to take a symbol of type String, and return a
Flux
ofStockPrice
, the same as ourprices
functions. - The logic for this function already exists in
RestController.prices
, so copy the body of that function into the newgeneratePrices
function. - This needs the
randomStockPrice
function too, so copy this fromRestController
intoPriceService
. - Make sure the prices method of
RSocketController
callsgeneratePrices
and returns the results.
x
class StockServiceApplication
// main function here...
// @RestController here...
class RSocketController(val priceService: PriceService) {
fun prices(symbol: String): Flux<StockPrice> {
return priceService.generatePrices(symbol)
}
}
class PriceService {
fun generatePrices(symbol: String): Flux<StockPrice> {
return Flux
.interval(Duration.ofSeconds(1))
.map { StockPrice(symbol, randomStockPrice(), now()) }
}
private fun randomStockPrice(): Double {
return ThreadLocalRandom.current().nextDouble(100.0)
}
}
// StockPrice data class here...
Reducing Duplicated Code
Now that everything here is compiling, we can remove the duplicated code in the RestController
.
- Introduce a
priceService
constructor parameter to theRestController
. - Call
generatePrices
from insideRestController.prices
instead of generating the prices there. - Remove the
randomStockPrice
function insideRestController
since it’s not being used. - (Tip: we can press Alt+Enter on the grey
randomStockPrices
function name and select Safe delete to remove this. Or we can use Alt+Delete/⌘⌦ on the function name).
x
class RestController(val priceService: PriceService) {
value = ["/stocks/{symbol}"], (
produces = [MediaType.TEXT_EVENT_STREAM_VALUE])
fun prices( symbol: String): Flux<StockPrice> {
return priceService.generatePrices(symbol)
}
}
class RSocketController(val priceService: PriceService) {
fun prices(symbol: String): Flux<StockPrice> {
return priceService.generatePrices(symbol)
}
}
class PriceService {
fun generatePrices(symbol: String): Flux<StockPrice> {
return Flux
.interval(Duration.ofSeconds(1))
.map { StockPrice(symbol, randomStockPrice(), now()) }
}
private fun randomStockPrice(): Double {
return ThreadLocalRandom.current().nextDouble(100.0)
}
}
Refactoring to Reduce Boilerplate
The prices functions on both the RestController
and the RSocketController
are now simply calling the PriceService, so all the common code is in one place. Kotlin allows us to simplify this code even further.
- Convert the
prices
function to an expression body and remove the declared return type. - (Tip: If we press Alt+Enter on the curly braces of the function, IntelliJ IDEA offers the option of “Convert to expression body”. Once we’ve done this, the return type will be highlighted and we can easily delete it.)
- Do this with both
prices
functions.
x
class RestController(val priceService: PriceService) {
value = ["/stocks/{symbol}"], (
produces = [MediaType.TEXT_EVENT_STREAM_VALUE])
fun prices( symbol: String) = priceService.generatePrices(symbol)
}
class RSocketController(val priceService: PriceService) {
fun prices(symbol: String) = priceService.generatePrices(symbol)
}
Because this function is a simple delegation, this might be a more useful, and certainly shorter, way to write it.
Setting Up the Message Mapping
The RestController
function is annotated with GetMapping
, which sets up the URL for clients to connect to consume this stream of prices. We need to do something similar for the RSocketController
function.
- Add a MessageMapping annotation onto
RSocketController.prices
. - Add the dependency
spring-boot-starter-rsocket
to the pom.xml file. - (Tip: IntelliJ IDEA can help us here with code completion in pom.xml, or you can generate a dependency)
x
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-rsocket</artifactId>
</dependency>
Back in our StockServiceApplication
file, we can add an import for MessageMapping
.
- Pass into the
@MessageMapping
annotation a String route so that clients can connect.
x
class RSocketController(val priceService: PriceService) {
"stockPrices") (
fun prices(symbol: String) = priceService.generatePrices(symbol)
}
Setting Up an RSocket Server
If we start the application now, we can see which servers have been started. At this time, we should only have Netty on port 8080. We want an RSocket server as well.
- Go to application.properties and define an RSocket server port as 7000.
xxxxxxxxxx
spring.rsocket.server.port=7000
Simply defining the port here is enough to get Spring Boot to start an RSocket server for us, so when we re-start the application, we will see a Netty RSocket server started on port 7000 (for an example, see the end of the video).
Now we have a prices service started on port 7000 ready for a client to connect to it to receive stock prices via RSocket. Stay tuned for the next lesson where we’ll connect to this server and consume the prices.
The full source code is available on GitHub.
Further Reading
Published at DZone with permission of Trisha Gee, DZone MVB. See the original article here.
Opinions expressed by DZone contributors are their own.
Trending
-
Demystifying SPF Record Limitations
-
Integration Architecture Guiding Principles, A Reference
-
Database Integration Tests With Spring Boot and Testcontainers
-
Prompt Engineering: Unlocking the Power of Generative AI Models
Comments