
RxJava: Generating Backpressure-Aware Streams


Make sure backpressure works for you throughout your entire system (without crashing) with this guide to proper RxJava use.


RxJava is missing a factory to create an infinite stream of natural numbers. Such a stream is useful, e.g. when you want to assign unique sequence numbers to a possibly infinite stream of events by zipping both of them:

Flowable<Long> naturalNumbers = //???

Flowable<Event> someInfiniteEventStream = //...
Flowable<Pair<Long, Event>> sequenced = Flowable.zip(
        naturalNumbers,
        someInfiniteEventStream,
        Pair::of
);


Implementing naturalNumbers is surprisingly complex. In RxJava 1.x, you could briefly get away with an Observable, which does not respect backpressure:

import rx.Observable;  //RxJava 1.x

Observable<Long> naturalNumbers = Observable.create(subscriber -> {
    long state = 0;
    //poor solution :-(
    while (!subscriber.isUnsubscribed()) {
        subscriber.onNext(state++);
    }
});


What does it mean that such a stream is not backpressure-aware? Basically, the stream produces events (an ever-incrementing state variable) as fast as a CPU core permits, easily millions per second. However, when consumers can't consume events that fast, a growing backlog of unprocessed events starts to appear:

naturalNumbers
//      .observeOn(Schedulers.io())
        .subscribe(
                x -> {
                    //slooow, 1 millisecond
                }
        );


The program above (with the observeOn() operator commented out) runs just fine because it has accidental backpressure. By default, everything is single-threaded in RxJava, so the producer and the consumer work within the same thread. Invoking subscriber.onNext() actually blocks, so the while loop throttles itself automatically. But try uncommenting observeOn() and disaster strikes a few milliseconds later. The subscription callback is single-threaded by design. Every element needs at least 1 millisecond, so this stream can process no more than 1,000 events per second. We are somewhat lucky: RxJava quickly discovers this disastrous condition and fails fast with a MissingBackpressureException.

Our biggest mistake was producing events without taking into account how slow the consumer is. By the way, this is the core idea behind reactive streams: The producer is not allowed to emit more events than requested by the consumer. In RxJava 1.x, implementing even the simplest stream that respected backpressure from scratch was a non-trivial task. RxJava 2.x brought several convenient operators that built on top of the experience from previous versions. First of all, RxJava 2.x does not allow you to implement a Flowable (backpressure-aware) the same way as you can with an Observable. It's not possible to create a Flowable that overloads the consumer with messages:

Flowable<Long> naturalNumbers = Flowable.create(subscriber -> {
    long state = 0;
    while (!subscriber.isCancelled()) {
        subscriber.onNext(state++);
    }
}, BackpressureStrategy.DROP);


Did you spot this extra DROP parameter? Before we explain it, let's see the output when we subscribe with a slow consumer:

0
1
2
3
//...continuous numbers...
126
127
101811682
//...where did my 100M events go?!?
101811683
101811684
101811685
//...continuous numbers...
101811776
//...17M events disappeared again...
101811777
//...


Your mileage may vary. What happens? The observeOn() operator switches between schedulers (thread pools). Each scheduler is backed by a pool of threads that are fed from a queue of pending events. This queue is finite, with a capacity of 128 elements. The observeOn() operator, knowing about this limitation, requests only 128 elements from upstream (our custom Flowable). At this point, it lets our subscriber process the events, 1 per millisecond. So after around 100 milliseconds, observeOn() discovers its internal queue is almost empty and asks for more. Does it get 128, 129, 130...? No! Our Flowable was producing events like crazy during this 0.1-second window and (astonishingly) managed to generate more than 100 million numbers in that time frame. Where did they go? Well, observeOn() was not asking for them, so the DROP strategy (a mandatory parameter) simply discarded unwanted events.

BackpressureStrategy

That doesn't sound right. Are there any other strategies? Yes, many:

  • BackpressureStrategy.BUFFER: If the upstream produces too many events, they are buffered in an unbounded queue. No events are lost, but your whole application most likely is. If you are lucky, OutOfMemoryError will save you. I got stuck on 5-second-long GC pauses.
  • BackpressureStrategy.ERROR: If over-production of events is discovered, MissingBackpressureException will be thrown. It's a sane (and safe) strategy.
  • BackpressureStrategy.LATEST: Similar to DROP, but it remembers the last dropped event. If a request for more data arrives right after we dropped everything, we at least have the last seen value.
  • BackpressureStrategy.MISSING: No safety measures, deal with it. Most likely one of the downstream operators (like observeOn()) will throw MissingBackpressureException.
  • BackpressureStrategy.DROP: drops events that were not requested.
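The difference between the strategies is easiest to see in isolation. Here is a minimal sketch (assuming RxJava 2.x on the classpath): the same infinite producer, built with BackpressureStrategy.ERROR this time, fails immediately when subscribed with a downstream that requests nothing:

```java
import io.reactivex.BackpressureStrategy;
import io.reactivex.Flowable;
import io.reactivex.exceptions.MissingBackpressureException;

public class ErrorStrategyDemo {

    // Same infinite producer as before, but with the ERROR strategy
    static Flowable<Long> naturals() {
        return Flowable.create(emitter -> {
            long state = 0;
            while (!emitter.isCancelled()) {
                emitter.onNext(state++);
            }
        }, BackpressureStrategy.ERROR);
    }

    public static void main(String[] args) {
        // test(0) subscribes but requests zero elements; the very first
        // onNext() finds no outstanding request, so ERROR signals the
        // exception instead of dropping or buffering
        naturals().test(0)
                  .assertError(MissingBackpressureException.class);
        System.out.println("got MissingBackpressureException");
    }
}
```

The TestSubscriber returned by test() is handy for such experiments because it lets you control the requested amount explicitly.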

By the way, when you are turning an Observable into a Flowable, you must also provide a BackpressureStrategy. RxJava must know how to limit over-producing Observables. OK, so what is the correct implementation of such a simple stream of sequential natural numbers?
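The conversion itself is a one-liner; a quick sketch (assuming RxJava 2.x on the classpath):

```java
import io.reactivex.BackpressureStrategy;
import io.reactivex.Flowable;
import io.reactivex.Observable;

public class ToFlowableDemo {
    public static void main(String[] args) {
        Observable<Integer> observable = Observable.just(1, 2, 3);
        // The strategy is a mandatory parameter: RxJava must know what to do
        // if this Observable outpaces the Flowable's subscribers
        Flowable<Integer> flowable = observable.toFlowable(BackpressureStrategy.BUFFER);
        System.out.println(flowable.toList().blockingGet());  // prints [1, 2, 3]
    }
}
```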

Meet Flowable.generate()

The difference between create() and generate() lies in responsibility. Flowable.create() is supposed to generate the stream in its entirety with no respect to backpressure. It simply produces events whenever it wishes to do so.

Flowable.generate(), on the other hand, is only allowed to generate one event at a time (or complete a stream). The backpressure mechanism transparently figures out how many events it needs at the moment.

generate() is called an appropriate number of times — for example, 128 times in the case of observeOn().

Because this operator produces events one at a time, typically it needs some sort of state to figure out where it was the last time. This is what generate() is: a holder for (im)mutable state and a function that generates the next event based on it:

Flowable<Long> naturalNumbers =
    Flowable.generate(() -> 0L, (state, emitter) -> {
        emitter.onNext(state);
        return state + 1;
    });


The first argument to generate() is an initial state (factory), 0L in our case. Now, every time a subscriber or any downstream operator asks for some number of events, the lambda expression is invoked. Its responsibility is to call onNext() at most once (emit at most one event) somehow based on the supplied state.

When the lambda is invoked for the first time, the state is equal to the initial value of 0L. However, we are allowed to modify the state and return its new value. In this example, we increment the long so that a subsequent invocation of the lambda expression receives the state = 1L. Obviously, this goes on and on, producing consecutive natural numbers.

Such a programming model is obviously harder than a while loop. It also fundamentally changes the way you implement your sources of events. Rather than pushing events whenever you feel like it, you are only passively waiting for requests. Downstream operators and subscribers are pulling data from your stream. This shift enables backpressure at all levels of your pipeline.
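This pull-driven nature is easy to observe: a bounded operator downstream translates into a bounded number of generator invocations. A minimal sketch (assuming RxJava 2.x on the classpath):

```java
import io.reactivex.Flowable;

public class GenerateTakeDemo {
    public static void main(String[] args) {
        Flowable<Long> naturalNumbers =
            Flowable.generate(() -> 0L, (state, emitter) -> {
                emitter.onNext(state);
                return state + 1;
            });

        // take(5) bounds the demand, so the generator lambda runs only as
        // many times as needed before the subscription is cancelled
        System.out.println(naturalNumbers.take(5).toList().blockingGet());
        // prints [0, 1, 2, 3, 4]
    }
}
```

Even though the source is conceptually infinite, no runaway loop occurs: cancellation from take() stops the generator.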

generate() has a few flavors. First of all, if your state is a mutable object, you can use an overloaded version that does not require returning a new state value. Despite being less functional, mutable state tends to produce far less garbage. This assumes your state is constantly mutated and the same state object instance is passed every time. For example, you can easily turn an Iterator (also pull-based!) into a stream with all the wonders of backpressure:

Iterator<Integer> iter = //...

Flowable<String> strings = Flowable.generate(() -> iter, (iterator, emitter) -> {
    if (iterator.hasNext()) {
        emitter.onNext(iterator.next().toString());
    } else {
        emitter.onComplete();
    }
}); 


Notice that the type of stream (<String>) doesn't have to be the same as the type of state (Iterator<Integer>). Of course, if you have a Java Collection and want to turn it into a stream, you don't have to create an iterator first. It's enough to use Flowable.fromIterable(). An even simpler version of generate() assumes you have no state at all. For example, a stream of random numbers:
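A quick sketch of the collection case (assuming RxJava 2.x on the classpath); fromIterable() does the iteration and the backpressure bookkeeping for you:

```java
import io.reactivex.Flowable;

import java.util.Arrays;
import java.util.List;

public class FromIterableDemo {
    public static void main(String[] args) {
        List<Integer> numbers = Arrays.asList(1, 2, 3);
        // Elements are pulled from the underlying Iterator on demand,
        // one per downstream request
        Flowable<String> strings = Flowable.fromIterable(numbers)
                .map(Object::toString);
        System.out.println(strings.toList().blockingGet());  // prints [1, 2, 3]
    }
}
```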

Flowable<Double> randoms = Flowable
        .generate(emitter -> emitter.onNext(Math.random()));


But honestly, you will probably need an instance of Random after all:

Flowable.generate(Random::new, (random, emitter) -> {
    emitter.onNext(random.nextBoolean());
});


Summary

As you can see, Observable.create() in RxJava 1.x and Flowable.create() have some shortcomings. If you really care about scalability and the health of your heavily concurrent system (and otherwise you wouldn't be reading this!), you must be aware of backpressure. If you really need to create streams from scratch, as opposed to using the from*() family of methods or various libraries that do the heavy lifting, familiarize yourself with generate(). In essence, you must learn how to model certain types of data sources as fancy iterators. Expect more articles explaining how to implement more real-life streams.

This is similar to the stateless HTTP protocol, which uses small pieces of state called sessions on the server to keep track of past requests.


Topics:
java ,rxjava ,backpressure ,streams ,concurrency ,tutorial

Published at DZone with permission of Tomasz Nurkiewicz, DZone MVB. See the original article here.

Opinions expressed by DZone contributors are their own.
