
Server-sent Events With RxJava and SseEmitter

The new release of Spring introduces features to make server-sent events simple.

By Tomasz Nurkiewicz · Aug. 03, 15 · Tutorial


Spring framework 4.2 GA is almost released, so let's look at some of the new features it provides. The one that caught my attention is a simple new class, SseEmitter - an abstraction over server-sent events that is easy to use in Spring MVC controllers. SSE is a technology that lets you stream data from the server to the browser within one HTTP connection, in one direction only. It sounds like a subset of what WebSockets can do. However, since it's a much simpler protocol, it can be used whenever full-duplex communication is not necessary, e.g. pushing stock price changes in real time or showing the progress of a long-running process. This is going to be our example.


Imagine we have a virtual coin miner with the following API:

public class CoinMiner {
 
    public BigDecimal mine() {
        //...
    }
}


Every time we call mine() we have to wait a few seconds and we get around 1 coin in return (on average). If we want to mine multiple coins, we have to call this method multiple times:

@RestController
public class MiningController {
 
    //...
 
    @RequestMapping("/mine/{count}")
    void mine(@PathVariable int count) {
        IntStream
                .range(0, count)
                .forEach(x -> coinMiner.mine());
    }
 
}

To speed things up we call an asynchronous mineAsync(ExecutorService) variant that returns a CompletableFuture<BigDecimal> (a sketch follows the controller below). The client code must supply the ExecutorService explicitly (just a design choice):

@RequestMapping("/mine/{count}")
void mine(@PathVariable int count) {
    final List<CompletableFuture<BigDecimal>> futures = IntStream
            .range(0, count)
            .mapToObj(x -> coinMiner.mineAsync(executorService))
            .collect(toList());
    futures.forEach(CompletableFuture::join);
}
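In case you wonder what mineAsync() looks like at this stage, here is a minimal sketch, assuming it simply wraps the blocking mine() call with supplyAsync() (the same call we will use later in the Observable variant):

//sketch (assumption): asynchronous variant of mine(), running on the caller-supplied executor
CompletableFuture<BigDecimal> mineAsync(ExecutorService executorService) {
    return CompletableFuture.supplyAsync(this::mine, executorService);
}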

It's insanely important to first call mineAsync multiple times and then (as a second stage) wait for all futures to complete with join. It's tempting to write:

IntStream
        .range(0, count)
        .mapToObj(x -> coinMiner.mineAsync(executorService))
        .forEach(CompletableFuture::join);

However, due to the lazy nature of streams in Java 8, these tasks would be executed sequentially! If you don't grok the laziness of streams yet, always read them from bottom to top: we ask to join some future, so the stream goes up and calls mineAsync() just once (lazy!), passing the result to join(). When that join() finishes, it goes up again asking for another future. By using collect() we force all mineAsync() invocations up front, starting all asynchronous computations. Only later do we wait for each and every one of them.
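If you want to convince yourself, here is a small standalone experiment (hypothetical timing code, not part of the miner): each task sleeps for about a second on a three-thread pool, yet the lazy pipeline takes roughly three seconds because every join() completes before the stream even asks for the next supplyAsync().

ExecutorService pool = Executors.newFixedThreadPool(3);
long start = System.currentTimeMillis();
IntStream
        .range(0, 3)
        .mapToObj(x -> CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(1000);    //simulate ~1 second of mining
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
            return BigDecimal.ONE;
        }, pool))
        .forEach(CompletableFuture::join);   //lazy: tasks run one after another, ~3000 ms
System.out.println("took " + (System.currentTimeMillis() - start) + " ms");
pool.shutdown();

Switching the last step to collect(toList()) followed by a separate forEach(CompletableFuture::join) brings the time down to roughly one second, because all three tasks are started before we wait on any of them.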


Introducing SseEmitter

Now it's time to be more reactive (there, I said it). A controller can return an instance of SseEmitter. Once we return from the handler method, the container thread is released and can serve more incoming requests, but the connection is not closed and the client keeps waiting! What we should do is keep a reference to the SseEmitter instance and call its send() and complete() methods later, from a different thread. For example we can start a long-running process and keep send()-ing progress from arbitrary threads. Once the process is done, we complete() the SseEmitter and finally the HTTP connection is closed (at least logically - remember about keep-alive). In the example below we have a bunch of CompletableFutures, and when each one completes we simply send 1 to the client (notifyProgress()). When all futures are done, we complete the stream with thenRun(sseEmitter::complete), closing the connection:

@RequestMapping("/mine/{count}")
SseEmitter mine(@PathVariable int count) {
    final SseEmitter sseEmitter = new SseEmitter();
    final List<CompletableFuture<BigDecimal>> futures = mineAsync(count);
    futures.forEach(future ->
            future.thenRun(() -> notifyProgress(sseEmitter)));
 
    final CompletableFuture[] futuresArr = futures.toArray(new CompletableFuture[futures.size()]);
    CompletableFuture
            .allOf(futuresArr)
            .thenRun(sseEmitter::complete);
 
    return sseEmitter;
}
 
private void notifyProgress(SseEmitter sseEmitter) {
    try {
        sseEmitter.send(1);
    } catch (IOException e) {
        throw new RuntimeException(e);
    }
}
 
private List<CompletableFuture<BigDecimal>> mineAsync(int count) {
    return IntStream
            .range(0, count)
            .mapToObj(x -> coinMiner.mineAsync(executorService))
            .collect(toList());
}

Calling this method results in the following response (notice the Content-Type):

< HTTP/1.1 200 OK
< Content-Type: text/event-stream;charset=UTF-8
< Transfer-Encoding: chunked
< 
data:1
 
data:1
 
data:1
 
data:1
 
* Connection #0 to host localhost left intact

We will learn later how to interpret such a response on the client side. For the time being, let's clean up the design a little bit.


Introducing RxJava with Observable progress

The code above works but looks quite messy. What we actually have is a series of events, each representing the progress of the computation. The computation eventually finishes, so the stream should also signal its end. Sounds exactly like an Observable! We start by refactoring CoinMiner to return Observable<BigDecimal>:

Observable<BigDecimal> mineMany(int count, ExecutorService executorService) {
    final ReplaySubject<BigDecimal> subject = ReplaySubject.create();
    final List<CompletableFuture<BigDecimal>> futures = IntStream
            .range(0, count)
            .mapToObj(x -> mineAsync(executorService))
            .collect(toList());
    futures
            .forEach(future ->
                    future.thenRun(() -> subject.onNext(BigDecimal.ONE)));
 
    final CompletableFuture[] futuresArr = futures.toArray(new CompletableFuture[futures.size()]);
    CompletableFuture
            .allOf(futuresArr)
            .thenRun(subject::onCompleted);
 
    return subject;
}


Every time an event appears in the Observable returned from mineMany(), one more coin has just been mined. When all futures are done, we complete the stream as well. This doesn't look much better yet on the implementation side, but look how clean it is from the controller's perspective:

@RequestMapping("/mine/{count}")
SseEmitter mine(@PathVariable int count) {
    final SseEmitter sseEmitter = new SseEmitter();
    coinMiner
            .mineMany(count, executorService)
            .subscribe(
                    value -> notifyProgress(sseEmitter),
                    sseEmitter::completeWithError,
                    sseEmitter::complete
            );
    return sseEmitter;
}


After calling coinMiner.mineMany() we simply subscribe to events. It turns out that the Observable and SseEmitter methods match 1:1. What happens here is pretty self-explanatory: start the asynchronous computation and every time the background computation signals any progress, forward it to the client. OK, let's go back to the implementation. It looks messy because we mix CompletableFuture and Observable. I already described how to convert a CompletableFuture into an Observable with just one element. Here is a recap, including the rx.Single abstraction available since RxJava 1.0.13 (not used here):

public class Futures {
 
    public static <T> Observable<T> toObservable(CompletableFuture<T> future) {
        return Observable.create(subscriber ->
                future.whenComplete((result, error) -> {
                    if (error != null) {
                        subscriber.onError(error);
                    } else {
                        subscriber.onNext(result);
                        subscriber.onCompleted();
                    }
                }));
    }
 
    public static <T> Single<T> toSingle(CompletableFuture<T> future) {
        return Single.create(subscriber ->
                future.whenComplete((result, error) -> {
                    if (error != null) {
                        subscriber.onError(error);
                    } else {
                        subscriber.onSuccess(result);
                    }
                }));
    }
 
}

With these utility methods in place we can improve the implementation and avoid mixing the two APIs:

Observable<BigDecimal> mineMany(int count, ExecutorService executorService) {
    final List<Observable<BigDecimal>> observables = IntStream
            .range(0, count)
            .mapToObj(x -> mineAsync(executorService))
            .collect(toList());
    return Observable.merge(observables);
}
 
Observable<BigDecimal> mineAsync(ExecutorService executorService) {
    final CompletableFuture<BigDecimal> future = 
         CompletableFuture.supplyAsync(this::mine, executorService);
    return Futures.toObservable(future);
}

RxJava has a built-in operator for merging multiple Observables into one; it doesn't matter that each of our underlying Observables emits just one event.
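As a quick standalone illustration (hypothetical values, unrelated to the miner), merging single-element Observables just produces a stream of all their values, completing once all sources complete:

Observable<BigDecimal> first = Observable.just(BigDecimal.ONE);
Observable<BigDecimal> second = Observable.just(BigDecimal.ONE);
Observable
        .merge(Arrays.asList(first, second))   //emits 1, 1 and then completes
        .subscribe(System.out::println);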


Deep-dive into RxJava operators

Let's use the power of RxJava to improve our streaming a little bit.


scan()

Currently, every time we mine one coin, we send(1) to the client. This means that every client has to track how many events it has already received in order to calculate the total amount mined. It would be nicer if the server always sent the running total rather than deltas. However, we don't want to change the CoinMiner implementation. It turns out this is pretty straightforward with the Observable.scan() operator:

@RequestMapping("/mine/{count}")
SseEmitter mine(@PathVariable int count) {
    final SseEmitter sseEmitter = new SseEmitter();
    coinMiner
            .mineMany(count, executorService)
            .scan(BigDecimal::add)
            .subscribe(
                    value -> notifyProgress(sseEmitter, value),
                    sseEmitter::completeWithError,
                    sseEmitter::complete
            );
    return sseEmitter;
}
 
private void notifyProgress(SseEmitter sseEmitter, BigDecimal value) {
    try {
        sseEmitter.send(value);
    } catch (IOException e) {
        e.printStackTrace();
    }
}

The scan() operator takes the previous event and the current one and combines them together. By applying BigDecimal::add we simply add all the numbers: 1, 1 + 1, (1 + 1) + 1, and so on. scan() is like reduce(), but it emits all intermediate values.
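A minimal standalone illustration of scan() (hypothetical hard-coded values, unrelated to the miner):

Observable
        .just(BigDecimal.ONE, BigDecimal.ONE, BigDecimal.ONE)
        .scan(BigDecimal::add)                 //emits the running totals: 1, 2, 3
        .subscribe(System.out::println);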
 

Sampling with sample()

It might be the case that our back-end service produces way more progress updates than we can consume. We don't want to flood the client with irrelevant updates and saturate the bandwidth. Sending an update at most twice a second sounds reasonable. Luckily RxJava has a built-in operator for that as well:

Observable<BigDecimal> obs = coinMiner.mineMany(count, executorService);
obs
        .scan(BigDecimal::add)
        .sample(500, TimeUnit.MILLISECONDS)
        .subscribe(
            //...
        );


sample() will periodically look at the underlying stream and emit only the most recent item, discarding the intermediate ones. Fortunately we aggregate items on the fly with scan(), so we don't lose any updates.
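For intuition, here is a tiny standalone sketch of sample() (hypothetical tick source, nothing to do with the miner): a value is produced every 100 ms, but only the latest one seen in each 500 ms interval is forwarded, so roughly every fifth value gets through.

Observable
        .interval(100, TimeUnit.MILLISECONDS)  //0, 1, 2, ... every 100 ms on a background scheduler
        .sample(500, TimeUnit.MILLISECONDS)    //forwards only the latest item per 500 ms
        .subscribe(System.out::println);

In a plain main() you would have to keep the JVM alive to see any output, since interval() runs on RxJava's computation scheduler.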

window() - constant emit intervals

There is one catch, though. sample() will not emit the same item twice if nothing new appeared within the last 500 milliseconds. That's fine, but remember that we are pushing these updates over a TCP/IP connection. It's a good idea to periodically send an update to the client even if nothing happened in the meantime - just to keep the connection alive, a sort of ping. There are probably many ways of achieving this requirement, e.g. involving the timeout() operator. I chose to group all events into 500 ms windows using the window() operator:


Observable<BigDecimal> obs = coinMiner.mineMany(count, executorService);
obs
        .window(500, TimeUnit.MILLISECONDS)
        .flatMap(window -> window.reduce(BigDecimal.ZERO, BigDecimal::add))
        .scan(BigDecimal::add)
        .subscribe(
            //...
        );

This one is tricky. First we group all progress updates into 500 millisecond windows. Then we calculate the total number of coins mined within each window using reduce(). If no coins were mined in a given window, we simply emit ZERO. Finally we use scan() to aggregate the sub-totals of all windows into a running total. We no longer need sample(), since window() makes sure an event is emitted every 500 ms.


Client-side

There are plenty of examples of SSE usage in JavaScript, so here is just a quick snippet calling our controller (each event's payload, e.g. data:1, is available as event.data):

var source = new EventSource("/mine/10");
source.onmessage = function (event) {
    console.info(event);
};


I believe SseEmitter is a major improvement in Spring MVC that will allow us to write more robust and faster web applications requiring instant, one-directional updates.


Published at DZone with permission of Tomasz Nurkiewicz, DZone MVB. See the original article here.

Opinions expressed by DZone contributors are their own.
