
A Reactive Emoji Tracker With WebClient and Reactor: Consuming SSE


This lesson in reactive programming using Project Reactor and Spring WebClient focuses on an infinite stream of SSEs and the most important tech of all time: emojis.


In this article, we will learn how to consume an infinite SSE (server-sent events) stream with Spring's WebClient and Project Reactor. WebClient is a new HTTP client in Spring 5 that is entirely asynchronous and natively supports the Flux and Mono types. You can technically keep thousands of concurrent HTTP connections open with just a handful of threads. With the standard, blocking RestTemplate, each in-flight HTTP request occupies at least one thread.

As an example, let's connect to this cute little site called emojitracker.com. It shows emojis being used in real time on Twitter. Looks quite cool! All credit goes to Matthew Rothenberg, the creator of that site. It's very dynamic, so there obviously has to be some push mechanism underneath. I put on my hacker glasses, and after hours of penetration testing, I discovered the following URL in Chrome DevTools: http://emojitrack-gostreamer.herokuapp.com/subscribe/eps. If you connect to it, you'll get a fast stream of emoji counters:

$ curl -v http://emojitrack-gostreamer.herokuapp.com/subscribe/eps
> GET /subscribe/eps HTTP/1.1
> Host: emojitrack-gostreamer.herokuapp.com
> User-Agent: curl/7.54.0
> Accept: */*
> 
< HTTP/1.1 200 OK
< Connection: keep-alive
< Content-Type: text/event-stream; charset=utf-8
< Transfer-Encoding: chunked
<
data:{"1F3C6":1,"1F440":1,"1F64F":1}

data:{"267B":1}

data:{"1F4B0":1}

data:{"267B":2}

data:{"1F49B":1,"1F612":1}

data:{"1F331":1,"1F44D":1,"1F49E":1,"1F4F9":1,"1F51E":1,"1F525":1}

data:{"1F609":1}

data:{"2764":1}

data:{"1F331":1,"267B":2}

data:{"1F498":1,"1F60A":1}


Dozens of data points per second, ready to be consumed via a convenient SSE stream. Each event represents the number of emojis that appeared on Twitter since the last event. For example, {"1F604":1,"267B":2} means 😄 once and ♻ twice.
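
To make the wire format in the curl output concrete, here is a minimal JDK-only sketch of how data: lines group into events. It is not how WebClient does it internally, and the names SseLines and payloads are made up for this illustration. It only applies the basic SSE rules: a line starting with data: contributes to the current event, and a blank line ends the event. Other SSE fields (event:, id:, retry:) and comment lines are simply ignored.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, for illustration only: extracts the payload of each SSE event.
final class SseLines {

    static List<String> payloads(String raw) {
        List<String> events = new ArrayList<>();
        StringBuilder current = new StringBuilder();
        // \R matches any line break (\n, \r\n, \r); -1 keeps trailing empty lines
        for (String line : raw.split("\\R", -1)) {
            if (line.startsWith("data:")) {
                String value = line.substring("data:".length());
                // A single space right after the colon is not part of the value
                if (value.startsWith(" ")) {
                    value = value.substring(1);
                }
                // Several data lines within one event are joined with a newline
                if (current.length() > 0) {
                    current.append('\n');
                }
                current.append(value);
            } else if (line.isEmpty() && current.length() > 0) {
                // A blank line terminates the current event
                events.add(current.toString());
                current.setLength(0);
            }
        }
        return events;
    }

    public static void main(String[] args) {
        String raw = "data:{\"267B\":1}\n\ndata:{\"1F4B0\":1}\n\n";
        payloads(raw).forEach(System.out::println);
    }
}
```

An event that is not yet closed by a blank line is left out of the result, which mirrors how an SSE client waits for the terminating blank line before dispatching.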

We would like to read this stream in Java efficiently and make something useful out of it. Well, maybe not useful (it's emojis after all), but at least fun. Consuming SSE streams with WebClient is pretty simple:

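import java.util.concurrent.TimeUnit;

import org.springframework.http.codec.ServerSentEvent;
import org.springframework.web.reactive.function.client.WebClient;
import reactor.core.publisher.Flux;

// `log` is assumed to be an SLF4J logger field of the enclosing class
public static void main(String[] args) throws InterruptedException {
    final Flux<ServerSentEvent> stream = WebClient
            .create("http://emojitrack-gostreamer.herokuapp.com")
            .get().uri("/subscribe/eps")
            .retrieve()
            .bodyToFlux(ServerSentEvent.class);

    // subscribe() returns immediately; events are logged as they arrive
    stream.subscribe(sse -> log.info("Received: {}", sse));

    // Keep the main thread alive so the JVM does not exit right away
    TimeUnit.MINUTES.sleep(10);
}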


The sleep(10) call is important. subscribe() returns immediately, so without the sleep the application terminates right away because the only non-daemon thread (main) dies. In web applications, this is not a problem.
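
A fixed sleep is the bluntest way to keep main alive. Below is a minimal JDK-only sketch of an alternative: main waits on a CountDownLatch that the background work releases when it is done. The daemon worker thread merely stands in for the client's I/O threads, and KeepAlive and runAndWait are names invented for this example, not part of Spring or Reactor.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Illustration only: a daemon worker stands in for the HTTP client's I/O threads.
final class KeepAlive {

    // Returns true if the worker signalled completion before the timeout elapsed.
    static boolean runAndWait(long timeoutSeconds) {
        CountDownLatch done = new CountDownLatch(1);

        Thread worker = new Thread(() -> {
            System.out.println("Received: fake event");
            done.countDown(); // signal completion, as a completion or error callback would
        });
        worker.setDaemon(true); // daemon threads do not keep the JVM alive
        worker.start();

        try {
            // Without this wait, main could return (and the JVM exit) before the worker runs
            return done.await(timeoutSeconds, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt flag
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("finished in time: " + runAndWait(5));
    }
}
```

For an infinite stream like this one the latch would only ever be released on error or cancellation, so the timeout still acts as the upper bound on how long main waits.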

At this point, you'll see a bunch of logs appearing on your console:

Received: ServerSentEvent [... data={1F1EC-1F1E7=1, 1F614=1, 2764=1}]
Received: ServerSentEvent [... data={1F49C=1}]
Received: ServerSentEvent [... data={1F605=1, 1F60D=1, 1F60E=1, 2665=1}]
Received: ServerSentEvent [... data={267B=2}]
Received: ServerSentEvent [... data={1F1FA-1F1F8=1, 1F34B=1, 1F604=1, 1F608=1, 1F60A=1, 25B6=1}]
Received: ServerSentEvent [... data={1F525=1, 1F602=1, 25B6=1, 2705=1, 274C=1}]
Received: ServerSentEvent [... data={267B=1}]
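
The keys in these log lines are hexadecimal Unicode code points. As a small JDK-only sketch, here is one way to turn such a key back into the character it stands for. The assumption (not confirmed by the API documentation quoted here) is that a hyphenated key such as 1F1EC-1F1E7 is a sequence of code points forming a single emoji, for example a flag. EmojiCodes and toEmoji are hypothetical names.

```java
// Hypothetical helper, for illustration only.
final class EmojiCodes {

    // Assumption: a key is one or more hex code points joined by '-'
    static String toEmoji(String key) {
        StringBuilder sb = new StringBuilder();
        for (String hex : key.split("-")) {
            // appendCodePoint handles code points above U+FFFF (surrogate pairs)
            sb.appendCodePoint(Integer.parseInt(hex, 16));
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        System.out.println(toEmoji("1F604"));        // single code point
        System.out.println(toEmoji("267B"));         // fits in one UTF-16 char
        System.out.println(toEmoji("1F1EC-1F1E7"));  // two regional indicator symbols
    }
}
```

Whether the result renders as a picture depends on the terminal's font and encoding; the string itself is the same either way.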


Now that we can connect to the live SSE stream, let's apply some transformations on top of it. First of all, we would like to parse the JSON data inside each message pushed from the server:

final Flux<Map<String, Integer>> stream = WebClient
         //...see above for missing lines...
        .bodyToFlux(ServerSentEvent.class)
        .flatMap(e -> Mono.justOrEmpty(e.data()))
        .map(x -> (Map<String, Integer>)x);


There's no explicit JSON parsing here; Spring does its magic for us! At this point, we have a stream of Map<String, Integer> instances, not raw ServerSentEvent objects. Two caveats. First, we need flatMap(e -> Mono.justOrEmpty(e.data())) rather than a simple map(ServerSentEvent::data), because ServerSentEvent.data() sometimes returns null and a Reactor stream must never carry null elements. Second, .map(x -> (Map<String, Integer>) x) needs to be used as opposed to a simple .cast(Map.class) because of type erasure: cast(Map.class) can only produce a raw Flux<Map>, not a Flux<Map<String, Integer>>.
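
The type-erasure caveat is plain Java rather than anything Reactor-specific, so it can be sketched with the JDK alone. In the example below (ErasureDemo and its methods are invented names), the cast to Map<String, Integer> is unchecked: at runtime it only verifies that the object is a Map. A wrong value type goes unnoticed until a value is actually read.

```java
import java.util.HashMap;
import java.util.Map;

// Illustration only: what an unchecked generic cast does and does not verify.
final class ErasureDemo {

    @SuppressWarnings("unchecked")
    static Map<String, Integer> asCounts(Object data) {
        // Unchecked cast: the JVM checks "is a Map", not the key or value types
        return (Map<String, Integer>) data;
    }

    // Returns true if the cast succeeds but reading a value later fails.
    static boolean failsOnRead() {
        Map<String, String> wrong = new HashMap<>();
        wrong.put("267B", "two");

        Map<String, Integer> counts = asCounts(wrong); // no exception here
        try {
            Integer n = counts.get("267B"); // the compiler-inserted cast to Integer fails here
            return false;
        } catch (ClassCastException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println("cast succeeded, read failed: " + failsOnRead());
    }
}
```

In other words, the cast in the stream is a promise to the compiler that the JSON really holds string keys and integer counts; nothing verifies it at the point of the cast.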

Alright, our stream is a bit too complex right now. Rather than having nested data (each event contains a map, each map contains entries, each entry contains a count), we'd like to have a single event per emoji appearance. Easy!

final Flux<Map.Entry<String, Integer>> stream = WebClient
         //...see above for missing lines...
        .flatMap(e -> Mono.justOrEmpty(e.data()))
        .map(x -> (Map<String, Integer>) x)
        .flatMapIterable(Map::entrySet);


This gives us a stream of map entries (Map.Entry<String, Integer>). Next, we emit each key as many times as its count says:

final Flux<String> stream = WebClient
         //...see above for missing lines...
        .map(x -> (Map<String, Integer>) x)
        .flatMapIterable(Map::entrySet)
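        // Emit each key exactly `count` times. nCopies (java.util.Collections) is used instead of
        // Flux.just(key).repeat(count): recent Reactor Javadoc documents repeat(n) as n + 1 subscriptions.
        .flatMapIterable(entry -> Collections.nCopies(entry.getValue(), entry.getKey()));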
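
The same expansion can be tried out on a plain Map without any network or Reactor involved. A minimal sketch using JDK streams; ExpandCounts and expand are hypothetical names, and a LinkedHashMap is used only to keep the output order predictable.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Illustration only: turns {key: count} into a flat list with each key repeated count times.
final class ExpandCounts {

    static List<String> expand(Map<String, Integer> counts) {
        return counts.entrySet().stream()
                // nCopies(n, key) is an immutable list holding `key` n times
                .flatMap(e -> Collections.nCopies(e.getValue(), e.getKey()).stream())
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        Map<String, Integer> event = new LinkedHashMap<>();
        event.put("1F604", 1);
        event.put("267B", 2);
        System.out.println(expand(event)); // prints [1F604, 267B, 267B]
    }
}
```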


With just a few lines of code, we transformed one event, {"1F604":1,"267B":2}, into three: "1F604", "267B", "267B". I was feeling a bit guilty at this point, reverse-engineering emojitracker.com. Then I discovered that the source code of the website is on GitHub and the API is documented. Moreover, there is already an endpoint that sends individual emojis, as opposed to aggregated JSON maps:

$ curl -v http://emojitrack-gostreamer.herokuapp.com/subscribe/raw
> GET /subscribe/raw HTTP/1.1
> Host: emojitrack-gostreamer.herokuapp.com
> User-Agent: curl/7.54.0
> Accept: */*
> 
< HTTP/1.1 200 OK
< Connection: keep-alive
< Content-Type: text/event-stream; charset=utf-8
< Transfer-Encoding: chunked
< 
data:1F604

data:267B

data:2665

data:1F60E

...


You know what they say, hours of coding can save you from minutes of reading the documentation. But we had fun! The full source code we have so far looks as follows:

final Flux<String> stream = WebClient
        .create("http://emojitrack-gostreamer.herokuapp.com")
        .get().uri("/subscribe/eps")
        .retrieve()
        .bodyToFlux(ServerSentEvent.class)
        .flatMap(e -> Mono.justOrEmpty(e.data()))
        .map(x -> (Map<String, Integer>) x)
        .flatMapIterable(Map::entrySet)
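        // Emit each key exactly `count` times. nCopies (java.util.Collections) is used instead of
        // Flux.just(key).repeat(count): recent Reactor Javadoc documents repeat(n) as n + 1 subscriptions.
        .flatMapIterable(entry -> Collections.nCopies(entry.getValue(), entry.getKey()));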

stream.subscribe(sse -> log.info("Received: {}", sse));

TimeUnit.SECONDS.sleep(10);


In the next part, we will parse the emoji data even further and run some aggregations on top of it — all using Flux's magic.



