Over a million developers have joined DZone.
{{announcement.body}}
{{announcement.title}}

RxJava: Detecting and Testing Stalled Streams

DZone's Guide to

RxJava: Detecting and Testing Stalled Streams

RxJava's time-oriented suite of capabilities can make it easy to determine and test for silences in your streams — and to make sure the streams still work.

· Java Zone
Free Resource

Try Okta to add social login, MFA, and OpenID Connect support to your Java app in minutes. Create a free developer account today and never build auth again.

Imagine you have a stream that publishes events with unpredictable frequency. Sometimes you can expect dozens of messages per second, but occasionally, no events can be seen for several seconds. This can be an issue if your stream is transmitted via WebSocket, SSE, or any other network protocol. A silent period taking too long (stall) can be interpreted as a network issue. Therefore, we often send artificial events (pings) once in a while just to make sure:

  • clients are still alive
  • let clients know we are still alive

For a more concrete example, imagine we have a Flowable<String> stream that produces some events. When there is no event for more than one second, we should send a placeholder "PING" message. When the silence is even longer, there should be a "PING" message every second. How can we implement such a requirement in RxJava? The most obvious, but incorrect, solution is to the merge original stream with pings:

Flowable<String> events = //...
Flowable<String> pings = Flowable
            .interval(1, SECONDS)
            .map(x -> "PING");

Flowable<String> eventsWithPings = events.mergeWith(pings);


The mergeWith() operator is crucial: It takes genuine events and combines them with a constant stream of pings. Surely, when no genuine events are presents, "PING" messages will appear. Unfortunately, they are entirely unrelated to the original stream. This means we keep sending pings even when there are plenty of normal events. Moreover, when the silence begins, we do not send "PING" precisely after one second. If you are OK with such a mechanism, you may stop reading here.

debounce() Operator

A more sophisticated approach requires discovering a silence that lasts for more than 1 second. We can use a timeout() operator for that. Unfortunately, it yields a TimeoutException and unsubscribes from upstream — way too aggressive behavior. We just want to get some sort of notification.

It turns out the debounce() operator can be used for that. Normally, this operator postpones the emission of new events just in case new events arrive, overriding the old ones. So if I say:

Flowable<String> events = //...
Flowable<String> delayed = events.debounce(1, SECONDS);


This means the delayed stream will only emit an event if it was not followed by another event within 1 second. Technically, delayed may never emit anything if the events stream keeps producing events fast enough. We will use the delayed stream to discover silence in the following way:

Flowable<String> events = //...
Flowable<String> delayed = events.debounce(1, SECONDS);
Flowable<String> pings = delayed.map(ev -> "PING");
Flowable<String> eventsWithPings = Flowable.merge(events, pings);


Keep in mind that there is no difference between mergeWith() and its static merge() counterpart. So we are getting somewhere. If the stream is busy, the delayed stream never receives any events, therefore no "PING" messages are sent. However, when the original stream does not send any events for more than 1 second, delayed receives the last seen event, ignores it, and transforms into "PING". Clever, but broken. This implementation only sends one "PING" after discovering a stall, as opposed to sending periodic pings every second. Fairly easy to fix! Rather than transforming the last-seen event into a single "PING", we can transform it into a sequence of periodic pings:

Flowable<String> events = //...
Flowable<String> delayed = events.debounce(1, SECONDS);
Flowable<String> pings = delayed
        .flatMap(x -> Flowable
                .interval(0, 1, SECONDS)
                .map(e -> "PING")
        );
Flowable<String> eventsWithPings = Flowable.merge(events, pings);


Can you see where the flaw is? Every time a bit of silence appears in the original stream, we start emitting pings every second. However, we should stop doing so once some genuine events appear. We don't. Every stall in the upstream causes a new infinite stream of pings to appear on the final merged stream. We must somehow tell the pings stream that it should stop emitting pings because the original stream emitted a genuine event. Guess what? There is the takeUntil() operator that does just that!

Flowable<String> events = //...
Flowable<String> delayed = events.debounce(1, SECONDS);
Flowable<String> pings = delayed
        .flatMap(x -> Flowable
                .interval(0, 1, SECONDS)
                .map(e -> "PING")
                .takeUntil(events)
        );
Flowable<String> eventsWithPings = Flowable.merge(events, pings);


Take a moment to fully grasp the above code snippet. The delayed stream emits an event every time nothing happens on the original stream for more than 1 second. The pings stream emits a sequence of "PING" events every second for each event emitted from delayed. However, the pings stream is terminated the moment an event appears on the events stream. You can even define all of this as a single expression:

Flowable<String> events = //...
Flowable<String> eventsWithPings = events
        .mergeWith(
                events
                        .debounce(1, SECONDS)
                        .flatMap(x1 -> Flowable
                                .interval(0, 1, SECONDS)
                                .map(e -> "PING")
                                .takeUntil(events)
                        ));


Testability

All right, we wrote all of this, but how are we supposed to test this triple-nested blob of event-driven code? How do we make sure that pings appear at the right moment and stop when the silence is over? How do we simulate various time-related scenarios? RxJava has many killer features, but testing how time passes through is probably the biggest one. First of all, let's make our pinging code a little bit more testable and generic:

<T> Flowable<T> withPings(Flowable<T> events, Scheduler clock, T ping) {
    return events
            .mergeWith(
                    events
                            .debounce(1, SECONDS, clock)
                            .flatMap(x1 -> Flowable
                                    .interval(0, 1, SECONDS, clock)
                                    .map(e -> ping)
                                    .takeUntil(events)
                            ));

}


This utility method takes an arbitrary stream of T and adds pings in case the stream doesn't produce any events for a longer period of time. We use it like this in our test:

PublishProcessor<String> events = PublishProcessor.create();
TestScheduler clock = new TestScheduler();
Flowable<String> eventsWithPings = withPings(events, clock, "PING");


Oh boy, PublishProcessor? TestScheduler? PublishProcessor is an interesting class that is a subtype of Flowable (so we can use it as an ordinary stream). On the other hand, we can imperatively emit events using its onNext() method:

events.onNext("A");


If someone listens to the events stream, they will receive the "A" event straight away. And what's with this clock thing? Every single operator in RxJava that deals with time in any way (e.g. debounce(), interval(), timeout(), window(), etc.) can take an optional Scheduler argument. It serves as an external source of time. Special TestScheduler is an artificial source of time that we have full control of. For instance, time stands still as long as we don't call advanceTimeBy() explicitly:

clock.advanceTimeBy(999, MILLISECONDS);


999 milliseconds is not a coincidence. Pings start to appear precisely after 1 second, so they should not be visible after 999 milliseconds. Now it's about time to reveal the full test case:

@Test
public void shouldAddPings() throws Exception {
    PublishProcessor<String> events = PublishProcessor.create();
    final TestScheduler clock = new TestScheduler();
    final Flowable<String> eventsWithPings = withPings(events, clock, "PING");

    final TestSubscriber<String> test = eventsWithPings.test();
    events.onNext("A");
    test.assertValues("A");

    clock.advanceTimeBy(999, MILLISECONDS);
    events.onNext("B");
    test.assertValues("A", "B");
    clock.advanceTimeBy(999, MILLISECONDS);
    test.assertValues("A", "B");

    clock.advanceTimeBy(1, MILLISECONDS);
    test.assertValues("A", "B", "PING");
    clock.advanceTimeBy(999, MILLISECONDS);
    test.assertValues("A", "B", "PING");

    events.onNext("C");
    test.assertValues("A", "B", "PING", "C");

    clock.advanceTimeBy(1000, MILLISECONDS);
    test.assertValues("A", "B", "PING", "C", "PING");
    clock.advanceTimeBy(999, MILLISECONDS);
    test.assertValues("A", "B", "PING", "C", "PING");

    clock.advanceTimeBy(1, MILLISECONDS);
    test.assertValues("A", "B", "PING", "C", "PING", "PING");
    clock.advanceTimeBy(999, MILLISECONDS);
    test.assertValues("A", "B", "PING", "C", "PING", "PING");

    events.onNext("D");
    test.assertValues("A", "B", "PING", "C", "PING", "PING", "D");

    clock.advanceTimeBy(999, MILLISECONDS);
    events.onNext("E");
    test.assertValues("A", "B", "PING", "C", "PING", "PING", "D", "E");
    clock.advanceTimeBy(999, MILLISECONDS);
    test.assertValues("A", "B", "PING", "C", "PING", "PING", "D", "E");

    clock.advanceTimeBy(1, MILLISECONDS);
    test.assertValues("A", "B", "PING", "C", "PING", "PING", "D", "E", "PING");

    clock.advanceTimeBy(3_000, MILLISECONDS);
    test.assertValues("A", "B", "PING", "C", "PING", "PING", "D", "E", "PING", "PING", "PING", "PING");
}


It looks like a wall of text, but it's actually a complete testing scenario of our logic. It makes sure pings appear precisely after 1000 milliseconds, are repeated when silence is very long, and quiet down when genuine events appear. But the most important part: The test is 100% predictable and blazingly fast. No Awaitility, busy waiting, polling, intermittent test failures, and slowness. An artificial clock that we have full control of makes sure all these combined streams work exactly as expected.

Build and launch faster with Okta’s user management API. Register today for the free forever developer edition!

Topics:
java ,rxjava ,streams ,java performance ,tutorial ,ping

Published at DZone with permission of Tomasz Nurkiewicz, DZone MVB. See the original article here.

Opinions expressed by DZone contributors are their own.

{{ parent.title || parent.header.title}}

{{ parent.tldr }}

{{ parent.urlSource.name }}