Micronaut Mastery: Consuming Server-Sent Events (SSE)
Let's take a look at how to consume server-sent events in server-side code with Micronaut.
Normally, we consume server-sent events (SSE) in a web browser, but we can also consume them in our own code on the server. Micronaut has a low-level HTTP client with an SseClient interface that we can use to receive server-sent events. The interface has overloaded eventStream methods that return a Publisher type from the Reactive Streams API. We can use the RxSseClient interface to get an RxJava 2 Flowable return type instead of a Publisher. We can also use Micronaut's declarative HTTP client, defined with the @Client annotation, which supports server-sent events when we set the correct annotation attributes.
In our example, we first create a controller in Micronaut to send out server-sent events. We must create a method that returns a Publisher type with Event objects. These Event objects can contain attributes like id and name, as well as the actual object we want to send:
// File: src/main/java/mrhaki/ConferencesController.java
package mrhaki;

import io.micronaut.http.annotation.Controller;
import io.micronaut.http.annotation.Get;
import io.micronaut.http.sse.Event;
import io.reactivex.Flowable;

import java.util.concurrent.TimeUnit;

@Controller("/conferences")
public class ConferencesController {

    private final ConferenceRepository repository;

    public ConferencesController(final ConferenceRepository repository) {
        this.repository = repository;
    }

    /**
     * Send each second a random Conference.
     *
     * @return Server-sent events each second where the event is a randomly
     * selected Conference object from the repository.
     */
    @Get("/random")
    Flowable<Event<Conference>> events() {
        final Flowable<Long> tick = Flowable.interval(1, TimeUnit.SECONDS);
        final Flowable<Conference> randomConferences = repository.random().repeat();
        return tick.zipWith(randomConferences, this::createEvent);
    }

    /**
     * Create a server-sent event with id, name and the Conference data.
     *
     * @param counter Counter used as id for event.
     * @param conference Conference data as payload for the event.
     * @return Event with id, name and Conference object.
     */
    private Event<Conference> createEvent(Long counter, final Conference conference) {
        return Event.of(conference)
                    .id(String.valueOf(counter))
                    .name("randomEvent");
    }
}
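To make the controller's output more concrete, here is a rough sketch of how an event with an id, a name and a data payload is laid out in the text/event-stream wire format. It is plain Java and not Micronaut's implementation. The class SseWireFormatSketch and its format method are hypothetical names, and the JSON payload in the usage note is made up, because the Conference class is not shown in this article.

```java
/**
 * Illustrative only: lays out one server-sent event in the
 * text/event-stream wire format. Not part of Micronaut.
 */
final class SseWireFormatSketch {

    private SseWireFormatSketch() { }

    /**
     * @param id   value for the "id" field, or null to leave it out
     * @param name value for the "event" field, or null to leave it out
     * @param data payload, already serialized to text (for example JSON)
     * @return the event as it would appear in the response body
     */
    static String format(String id, String name, String data) {
        final StringBuilder sb = new StringBuilder();
        if (id != null) {
            sb.append("id: ").append(id).append('\n');
        }
        if (name != null) {
            sb.append("event: ").append(name).append('\n');
        }
        // A payload that spans several lines becomes several "data" fields.
        for (String line : data.split("\n", -1)) {
            sb.append("data: ").append(line).append('\n');
        }
        // An empty line marks the end of the event.
        sb.append('\n');
        return sb.toString();
    }
}
```

Calling format("0", "randomEvent", "{\"name\":\"Gr8Conf\"}") gives the lines id: 0, event: randomEvent and data: {"name":"Gr8Conf"}, followed by an empty line. The id and event name match the values set in createEvent above.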
Notice how easy it is in Micronaut to use server-sent events. Let's add a declarative HTTP client that can consume the server-sent events. We must set the processes attribute of the @Get annotation to the value text/event-stream. This way, Micronaut can create an implementation of this interface with the correct code to consume server-sent events:
// File: src/main/java/mrhaki/ConferencesClient.java
package mrhaki;

import io.micronaut.http.MediaType;
import io.micronaut.http.annotation.Get;
import io.micronaut.http.client.annotation.Client;
import io.micronaut.http.sse.Event;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;

@Client("/conferences")
interface ConferencesSseClient {

    /**
     * Return Publisher with SSE containing Conference data.
     * We must set the processes attribute with the value
     * text/event-stream so Micronaut can generate an implementation
     * to support server-sent events.
     * We could also return Publisher implementation class
     * like Flowable or Flux, Micronaut will do the conversion.
     *
     * @return Publisher with Event objects with Conference data.
     */
    @Get(value = "/random", processes = MediaType.TEXT_EVENT_STREAM)
    Publisher<Event<Conference>> randomEvents();

    /**
     * Here we use a Publisher implementation Flux. Also we don't
     * add the Event in the return type: Micronaut will leave out
     * the event metadata and we get the data that is part of
     * the event as object.
     *
     * @return Flux with Conference data.
     */
    @Get(value = "/random", processes = MediaType.TEXT_EVENT_STREAM)
    Flux<Conference> randomConferences();
}
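The two client methods differ in what they hand back: the full event with its metadata, or only the data inside it. The following plain Java sketch parses a text/event-stream body by hand to illustrate that difference. It is a simplified illustration under stated assumptions, not the code Micronaut generates: SseParserSketch, ParsedEvent, parse and dataOnly are hypothetical names, the data stays a raw string instead of being converted to a Conference, and the retry field is ignored.

```java
import java.util.ArrayList;
import java.util.List;

/** Illustrative only: a hand-written parser for text/event-stream bodies. */
final class SseParserSketch {

    /** Hypothetical holder for one parsed event; not Micronaut's Event type. */
    static final class ParsedEvent {
        final String id;
        final String name;
        final String data;

        ParsedEvent(String id, String name, String data) {
            this.id = id;
            this.name = name;
            this.data = data;
        }
    }

    /** Parse a complete response body into events with id, name and data. */
    static List<ParsedEvent> parse(String body) {
        final List<ParsedEvent> events = new ArrayList<>();
        String id = null;   // the last seen id stays in effect until replaced
        String name = null;
        final StringBuilder data = new StringBuilder();
        boolean hasData = false;

        // Limit -1 keeps trailing empty strings, so the final empty line is seen.
        for (String line : body.split("\r?\n", -1)) {
            if (line.isEmpty()) {
                // An empty line ends the current event.
                if (hasData) {
                    events.add(new ParsedEvent(id, name, data.toString()));
                }
                name = null;
                data.setLength(0);
                hasData = false;
                continue;
            }
            if (line.startsWith(":")) {
                continue; // comment line
            }
            final int colon = line.indexOf(':');
            final String field = colon < 0 ? line : line.substring(0, colon);
            String value = colon < 0 ? "" : line.substring(colon + 1);
            if (value.startsWith(" ")) {
                value = value.substring(1); // one leading space is not part of the value
            }
            if (field.equals("id")) {
                id = value;
            } else if (field.equals("event")) {
                name = value;
            } else if (field.equals("data")) {
                if (hasData) {
                    data.append('\n'); // several data fields are joined by newlines
                }
                data.append(value);
                hasData = true;
            }
        }
        return events;
    }

    /** Drop the event metadata and keep only the payloads. */
    static List<String> dataOnly(List<ParsedEvent> events) {
        final List<String> result = new ArrayList<>();
        for (ParsedEvent event : events) {
            result.add(event.data);
        }
        return result;
    }
}
```

In this sketch, parse corresponds roughly to what randomEvents() exposes (id, name and data), while dataOnly corresponds to randomConferences(), where only the payload remains.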
Next we create a Spock specification to test our controller with server-sent events. In the specification we use the low-level HTTP client and the declarative client:
// File: src/test/groovy/mrhaki/ConferencesControllerSpec.groovy
package mrhaki

import io.micronaut.context.ApplicationContext
import io.micronaut.http.client.sse.RxSseClient
import io.micronaut.http.client.sse.SseClient
import io.micronaut.http.sse.Event
import io.micronaut.runtime.server.EmbeddedServer
import io.reactivex.Flowable
import spock.lang.AutoCleanup
import spock.lang.Shared
import spock.lang.Specification

class ConferencesControllerSpec extends Specification {

    @Shared
    @AutoCleanup
    private EmbeddedServer server = ApplicationContext.run(EmbeddedServer)

    /**
     * Low-level client with RxJava2 support to interact
     * with a server that returns server-sent events.
     */
    @Shared
    @AutoCleanup
    private RxSseClient sseLowLevelClient =
            server.applicationContext
                  .createBean(RxSseClient, server.getURL())

    /**
     * Declarative client for interacting
     * with a server that sends server-sent events.
     */
    @Shared
    private ConferencesSseClient sseClient =
            server.applicationContext
                  .getBean(ConferencesSseClient)

    void "test event stream with low level SSE client"() {
        when:
        // Use eventStream method of RxSseClient to get SSE
        // and convert data in event to Conference objects by
        // setting second argument to Conference.class.
        final List<Event<Conference>> result =
                sseLowLevelClient.eventStream("/conferences/random", Conference.class)
                                 .take(2)
                                 .toList()
                                 .blockingGet()

        then:
        result.name.every { name -> name == "randomEvent" }
        result.id == ["0", "1"]
        result.data.every { conference -> conference instanceof Conference }
    }

    void "test event stream with declarative SSE client"() {
        when:
        // Use declarative client (using @Client)
        // with SSE support.
        List<Event<Conference>> result =
                Flowable.fromPublisher(sseClient.randomEvents())
                        .take(2)
                        .toList()
                        .blockingGet()

        then:
        result.name.every { name -> name == "randomEvent" }
        result.id == ["0", "1"]
        result.data.every { conference -> conference instanceof Conference }
    }

    void "test conference stream with declarative SSE client"() {
        when:
        // Use declarative client (using @Client)
        // with built-in extraction of data in event.
        List<Conference> result =
                sseClient.randomConferences()
                         .take(2)
                         .collectList()
                         .block()

        then:
        result.id.every(Closure.IDENTITY) // Check every id property is set.
        result.every { conference -> conference instanceof Conference }
    }
}
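The controller never completes its stream, so each test relies on take(2) to end it after two events. As a rough illustration of that idea, the sketch below uses the JDK's java.util.concurrent.Flow interfaces (Java 9 and later), which mirror the Reactive Streams API. TakeSketch, counter and take are hypothetical names. The publisher here is synchronous to keep the example short, whereas the real SSE client is asynchronous, which is why the tests block on the result.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

/** Illustrative only: ending an endless stream after a fixed number of items. */
final class TakeSketch {

    /** Endless, synchronous publisher of 0, 1, 2, ... standing in for the SSE stream. */
    static Flow.Publisher<Long> counter() {
        return subscriber -> subscriber.onSubscribe(new Flow.Subscription() {
            private long next = 0;
            private boolean cancelled = false;

            @Override
            public void request(long n) {
                // Emit only as many items as requested, and stop once cancelled.
                for (long i = 0; i < n && !cancelled; i++) {
                    subscriber.onNext(next++);
                }
            }

            @Override
            public void cancel() {
                cancelled = true;
            }
        });
    }

    /**
     * Collect the first items of a synchronous publisher and then cancel.
     *
     * @param limit number of items to take, must be positive
     */
    static <T> List<T> take(Flow.Publisher<T> source, int limit) {
        final List<T> items = new ArrayList<>();
        source.subscribe(new Flow.Subscriber<T>() {
            private Flow.Subscription subscription;

            @Override
            public void onSubscribe(Flow.Subscription s) {
                subscription = s;
                s.request(1); // ask for one item at a time
            }

            @Override
            public void onNext(T item) {
                items.add(item);
                if (items.size() >= limit) {
                    subscription.cancel(); // enough items: stop the endless stream
                } else {
                    subscription.request(1);
                }
            }

            @Override
            public void onError(Throwable t) {
                throw new IllegalStateException(t);
            }

            @Override
            public void onComplete() {
                // Never called by the endless counter publisher.
            }
        });
        return items;
    }
}
```

With these definitions, TakeSketch.take(TakeSketch.counter(), 2) returns a list with 0 and 1, which matches the ids "0" and "1" asserted in the specification above.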
Written with Micronaut 1.0.0.RC1.
Published at DZone with permission of Hubert Klein Ikkink, DZone MVB. See the original article here.