DZone
Thanks for visiting DZone today,
Edit Profile
  • Manage Email Subscriptions
  • How to Post to DZone
  • Article Submission Guidelines
Sign Out View Profile
  • Post an Article
  • Manage My Drafts
Over 2 million developers have joined DZone.
Log In / Join
Refcards Trend Reports
Events Video Library
Refcards
Trend Reports

Events

View Events Video Library

Zones

Culture and Methodologies Agile Career Development Methodologies Team Management
Data Engineering AI/ML Big Data Data Databases IoT
Software Design and Architecture Cloud Architecture Containers Integration Microservices Performance Security
Coding Frameworks Java JavaScript Languages Tools
Testing, Deployment, and Maintenance Deployment DevOps and CI/CD Maintenance Monitoring and Observability Testing, Tools, and Frameworks
Culture and Methodologies
Agile Career Development Methodologies Team Management
Data Engineering
AI/ML Big Data Data Databases IoT
Software Design and Architecture
Cloud Architecture Containers Integration Microservices Performance Security
Coding
Frameworks Java JavaScript Languages Tools
Testing, Deployment, and Maintenance
Deployment DevOps and CI/CD Maintenance Monitoring and Observability Testing, Tools, and Frameworks

Related

  • WebSocket vs. Server-Sent Events: Choosing the Best Real-Time Communication Protocol
  • Reactive Event Streaming Architecture With Kafka, Redis Streams, Spring Boot, and HTTP Server-Sent Events (SSE)
  • Server-Sent Events Using Spring
  • Reactive Messaging Examples for Quarkus

Trending

  • Resilience Pattern: Circuit Breaker
  • Designing AI Multi-Agent Systems in Java
  • Modern Test Automation With AI (LLM) and Playwright MCP
  • The Rise of Self‐Service Platforms: How Cloud Development Environments Are Reshaping Dev Culture

Micronaut Mastery: Consuming Server-Sent Events (SSE)

Let's take a look at how to consume SSEs from the server itself in microservices with Micronaut.

By 
Hubert Klein Ikkink user avatar
Hubert Klein Ikkink
·
Oct. 03, 18 · Tutorial
Likes (6)
Comment
Save
Tweet
Share
7.0K Views

Join the DZone community and get the full member experience.

Join For Free

Normally, we would consume server-sent events (SSE) in a web browser, but we can also consume them in our code on the server. Micronaut has a low-level HTTP client with a SseClient interface that we can use to get server-sent events. The interface has an eventStream method with different arguments that return a Publisher type of the Reactive Streams API. We can use the RxSseClient interface to get back RxJava2 Flowable return type instead of Publisher type. We can also use Micronaut's declarative HTTP client, which we define using the @Client annotation that supports server-sent events with the correct annotation attributes.

In our example, we first create a controller in Micronaut to send out server-sent events. We must create method that returns a Publisher type with Event objects. These Event objects can contains some attributes like id and name, but also the actual object we want to send:

// File: src/main/java/mrhaki/ConferencesController.java
package mrhaki;

import io.micronaut.http.annotation.Controller;
import io.micronaut.http.annotation.Get;
import io.micronaut.http.sse.Event;
import io.reactivex.Flowable;

import java.util.concurrent.TimeUnit;

@Controller("/conferences")
public class ConferencesController {

    private final ConferenceRepository repository;

    public ConferencesController(final ConferenceRepository repository) {
        this.repository = repository;
    }

    /**
     * Send each second a random Conference.
     * 
     * @return Server-sent events each second where the event is a randomly
     * selected Conference object from the repository.
     */
    @Get("/random")
    Flowable<Event<Conference>> events() {
        final Flowable<Long> tick = Flowable.interval(1, TimeUnit.SECONDS);
        final Flowable<Conference> randomConferences = repository.random().repeat();

        return tick.zipWith(randomConferences, this::createEvent);
    }

    /**
     * Create a server-sent event with id, name and the Conference data.
     * 
     * @param counter Counter used as id for event.
     * @param conference Conference data as payload for the event.
     * @return Event with id, name and Conference object.
     */
    private Event<Conference> createEvent(Long counter, final Conference conference) {
        return Event.of(conference)
                    .id(String.valueOf(counter))
                    .name("randomEvent");
    }

}

Notice how easy it is in Micronaut to use server-sent events. Let's add a declarative HTTP client that can consume the server-sent events. We must set the processes attribute of the @Get annotation with the value text/event-stream. This way Micronaut can create an implementation of this interface with the correct code to consume server-sent events:

// File: src/main/java/mrhaki/ConferencesClient.java
package mrhaki;

import io.micronaut.http.MediaType;
import io.micronaut.http.annotation.Get;
import io.micronaut.http.client.annotation.Client;
import io.micronaut.http.sse.Event;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;

@Client("/conferences")
interface ConferencesSseClient {

    /**
     * Return Publisher with SSE containing Conference data.
     * We must set the processes attribute with the value
     * text/event-stream so Micronaut can generate an implementation
     * to support server-sent events.
     * We could also return Publisher implementation class
     * like Flowable or Flux, Micronaut will do the conversion.
     * 
     * @return Publisher with Event objects with Conference data.
     */
    @Get(value = "/random", processes = MediaType.TEXT_EVENT_STREAM)
    Publisher<Event<Conference>> randomEvents();

    /**
     * Here we use a Publisher implementation Flux. Also we don't
     * add the Event in the return type: Micronaut will leave out
     * the event metadata and we get the data that is part of
     * the event as object.
     *
     * @return Flux with Conference data.
     */
    @Get(value = "/random", processes = MediaType.TEXT_EVENT_STREAM)
    Flux<Conference> randomConferences();

}

Next we create a Spock specification to test our controller with server-sent events. In the specification we use the low-level HTTP client and the declarative client:

// File: src/test/groovy/mrhaki/ConferencesControllerSpec.groovy
package mrhaki

import io.micronaut.context.ApplicationContext
import io.micronaut.http.client.sse.RxSseClient
import io.micronaut.http.client.sse.SseClient
import io.micronaut.http.sse.Event
import io.micronaut.runtime.server.EmbeddedServer
import io.reactivex.Flowable
import spock.lang.AutoCleanup
import spock.lang.Shared
import spock.lang.Specification

class ConferencesControllerSpec extends Specification {

    @Shared
    @AutoCleanup
    private EmbeddedServer server = ApplicationContext.run(EmbeddedServer)

    /**
     * Low level client to interact with server
     * that returns server side events, that supports
     * RxJava2.
     */
    @Shared
    @AutoCleanup
    private RxSseClient sseLowLevelClient =
            server.applicationContext
                  .createBean(RxSseClient, server.getURL())

    /**
     * Declarative client for interacting
     * with server that send server side events.
     */
    @Shared
    private ConferencesSseClient sseClient =
            server.applicationContext
                  .getBean(ConferencesSseClient)

    void "test event stream with low level SSE client"() {
        when:
        // Use eventStream method of RxSseClient to get SSE
        // and convert data in event to Conference objects by 
        // setting second argument to Conference.class.
        final List<Event<Conference>> result =
                sseLowLevelClient.eventStream("/conferences/random", Conference.class)
                                 .take(2)
                                 .toList()
                                 .blockingGet()

        then:
        result.name.every { name -> name == "randomEvent" }
        result.id == ["0", "1"]
        result.data.every { conference -> conference instanceof Conference }
    }

    void "test event stream with declarative SSE client"() {
        when:
        // Use declarative client (using @Client)
        // with SSE support.
        List<Event<Conference>> result =
                Flowable.fromPublisher(sseClient.randomEvents())
                        .take(2)
                        .toList()
                        .blockingGet();

        then:
        result.name.every { name -> name == "randomEvent" }
        result.id == ["0", "1"]
        result.data.every { conference -> conference instanceof Conference }
    }

    void "test conference stream with declarative SSE client"() {
        when:
        // Use declarative client (using @Client)
        // with built-in extraction of data in event.
        List<Conference> result =
                sseClient.randomConferences()
                         .take(2)
                         .collectList()
                         .block();

        then:
        result.id.every(Closure.IDENTITY) // Check every id property is set.
        result.every { conference -> conference instanceof Conference }
    }
}

Written with Micronaut 1.0.0.RC1.

Server-sent events Event

Published at DZone with permission of Hubert Klein Ikkink, DZone MVB. See the original article here.

Opinions expressed by DZone contributors are their own.

Related

  • WebSocket vs. Server-Sent Events: Choosing the Best Real-Time Communication Protocol
  • Reactive Event Streaming Architecture With Kafka, Redis Streams, Spring Boot, and HTTP Server-Sent Events (SSE)
  • Server-Sent Events Using Spring
  • Reactive Messaging Examples for Quarkus

Partner Resources

×

Comments

The likes didn't load as expected. Please refresh the page and try again.

ABOUT US

  • About DZone
  • Support and feedback
  • Community research
  • Sitemap

ADVERTISE

  • Advertise with DZone

CONTRIBUTE ON DZONE

  • Article Submission Guidelines
  • Become a Contributor
  • Core Program
  • Visit the Writers' Zone

LEGAL

  • Terms of Service
  • Privacy Policy

CONTACT US

  • 3343 Perimeter Hill Drive
  • Suite 100
  • Nashville, TN 37211
  • [email protected]

Let's be friends: