Over a million developers have joined DZone.
{{announcement.body}}
{{announcement.title}}

RxJava: Idiomatic Concurrency — flatMap() vs. parallel()

DZone's Guide to

RxJava: Idiomatic Concurrency — flatMap() vs. parallel()

When it comes to concurrent programming using the RxJava library, here's a breakdown of flatMap() and parallel() and some guidance as to which is the most helpful.

· Java Zone ·
Free Resource

Download Microservices for Java Developers: A hands-on introduction to frameworks and containers. Brought to you in partnership with Red Hat.

Simple, effective, and safe concurrency was one of the design principles of RxJava. Yet, ironically, it's probably one of the most misunderstood aspects of this library. Let's take a simple example: Imagine we have a bunch of UUIDs, and for each one of them, we must perform a set of tasks. The first problem is to perform I/O-intensive operations per each UUID, for example loading an object from a database:

Flowable<UUID> ids = Flowable
        .fromCallable(UUID::randomUUID)
        .repeat()
        .take(100);

ids.subscribe(id -> slowLoadBy(id));


First, I'm generating 100 random UUIDs just for the sake of testing. Then, for each UUID, I'd like to load a record using the following method:

Person slowLoadBy(UUID id) {
    //...
}


The implementation of slowLoadBy() is irrelevant — just keep in mind it's slow and blocking. Using subscribe() to invoke slowLoadBy() has many disadvantages:

  • subscribe() is single-threaded by design and there is no way around it. Each UUID is loaded sequentially.

  • When you call subscribe(), you cannot transform the Person object further. It's a terminal operation.

A more robust, and even more broken, approach is to map() each UUID:

Flowable<Person> people = ids
        .map(id -> slowLoadBy(id));  //BROKEN


This is very readable, but unfortunately broken. Operators, just like subscribers, are single-threaded. This means at any given time, only one UUID can be mapped — no concurrency is allowed here as well. To make matters worse, we are inheriting a thread/worker from upstream. This has several drawbacks. If the upstream produces events using some dedicated scheduler, we will hijack threads from that scheduler. For example, many operators, like interval(), use the Schedulers.computation() thread pool transparently. We suddenly start to perform I/O-intensive operations on a pool that is totally not suitable for that. Moreover, we slow down the whole pipeline with this one blocking, sequential step. Very, very bad.

You might have heard about this subscribeOn() operator and how it enables concurrency. Indeed, but you have to be very careful when applying it. The following sample is (again) wrong:

import io.reactivex.schedulers.Schedulers;


Flowable<Person> people = ids
        .subscribeOn(Schedulers.io())
        .map(id -> slowLoadBy(id)); //BROKEN


The code snippet above is still broken. subscribeOn() (and observeOn() for that matter) barely switch execution to a different worker (thread) without introducing any concurrency. The stream still sequentially processes all events, but on a different thread. In other words, rather than consuming events sequentially on a thread inherited from upstream, we now consume them sequentially on the io() thread. So what about this mythical flatMap() operator?

flatMap() Operator to the Rescue

The flatMap() operator enables concurrency by splitting a stream of events into a stream of sub-streams. But first, one more broken example:

Flowable<Person> asyncLoadBy(UUID id) {
    return Flowable.fromCallable(() -> slowLoadBy(id));
}

Flowable<Person> people = ids
        .subscribeOn(Schedulers.io())
        .flatMap(id -> asyncLoadBy(id)); //BROKEN


Oh gosh, this is still broken! The flatMap() operator logically does two things:

  • Applies the transformation (id -> asyncLoadBy(id)) on each upstream event. This produces Flowable<Flowable<Person>>. This makes sense — for each upstream UUID, we get a Flowable<Person>, so we end up with a stream of streams of Person objects.

  • Then, flatMap() tries to subscribe to all of these inner sub-streams at once. Whenever any of the sub-streams emit a Person event, it is transparently passed as an outcome of the outer Flowable.

Technically, flatMap() only creates and subscribes to the first 128 (the default of the optional maxConcurrency parameter) sub-streams. Also, when the last substream completes, the outer stream of Person completes as well. Now, why on Earth is this broken? RxJava doesn't introduce any thread pool unless explicitly asked for. For example, this piece of code is still blocking:

log.info("Setup");
Flowable<String> blocking = Flowable
        .fromCallable(() -> {
            log.info("Starting");
            TimeUnit.SECONDS.sleep(1);
            log.info("Done");
            return "Hello, world!";
        });
log.info("Created");
blocking.subscribe(s -> log.info("Received {}", s));
log.info("Done");


Look at the output carefully, especially on the order of events and threads involved:

19:57:28.847 | INFO  | main | Setup
19:57:28.943 | INFO  | main | Created
19:57:28.949 | INFO  | main | Starting
19:57:29.954 | INFO  | main | Done
19:57:29.955 | INFO  | main | Received Hello, world!
19:57:29.957 | INFO  | main | Done


No concurrency whatsoever, no extra threads. Merely wrapping blocking code in a Flowable doesn't magically add concurrency. You have to explicitly use... subscribeOn():

log.info("Setup");
Flowable<String> blocking = Flowable
        .fromCallable(() -> {
            log.info("Starting");
            TimeUnit.SECONDS.sleep(1);
            log.info("Done");
            return "Hello, world!";
        })
        .subscribeOn(Schedulers.io());
log.info("Created");
blocking.subscribe(s -> log.info("Received {}", s));
log.info("Done");


The output this time is more promising:

19:59:10.547 | INFO  | main | Setup
19:59:10.653 | INFO  | main | Created
19:59:10.662 | INFO  | main | Done
19:59:10.664 | INFO  | RxCachedThreadScheduler-1 | Starting
19:59:11.668 | INFO  | RxCachedThreadScheduler-1 | Done
19:59:11.669 | INFO  | RxCachedThreadScheduler-1 | Received Hello, world!


But we did use subscribeOn() last time, so what's going on? Well, subscribeOn() on the outer stream level basically said that all events should be processed sequentially, within this stream, on a different thread. We didn't say that there should many sub-streams running concurrently. And because all sub-streams are blocking, when RxJava tries to subscribe to all of them, it effectively subscribes sequentially to one after another. asyncLoadBy() is not really async, thus it blocks when the flatMap() operator tries to subscribe to it. The fix is easy. Normally, you would put subscribeOn() inside asyncLoadBy() but for educational purposes, I'll place it directly in the main pipeline:

Flowable<Person> people = ids
    .flatMap(id -> asyncLoadBy(id).subscribeOn(Schedulers.io()));


Now it works like a charm! By default, RxJava will take first 128 upstream events (UUIDs), turn them into sub-streams, and subscribe to all of them. If sub-streams are asynchronous and highly parallelizable (e.g. network calls), we get 128 concurrent invocations of asyncLoadBy(). The concurrency level (128) is configurable via the maxConcurrency parameter:

Flowable<Person> people = ids
    .flatMap(id ->
                asyncLoadBy(id).subscribeOn(Schedulers.io()),
                10  //maxConcurrency
    );


That was a lot of work, don't you think? Shouldn't concurrency be even more declarative? We no longer deal with Executors and futures, but still, it seems this approach is too error prone. Can't it be as simple as parallel() in Java 8 streams?

Enter ParallelFlowable

Let's look again at our example and make it even more complex by adding filter():

Flowable<Person> people = ids
        .map(this::slowLoadBy)     //BROKEN
        .filter(this::hasLowRisk); //BROKEN


Where hasLowRisk() is a slow predicate:

boolean hasLowRisk(Person p) {
    //slow...
}


We already know that the idiomatic approach to this problem is using flatMap() twice:

Flowable<Person> people = ids
        .flatMap(id -> asyncLoadBy(id).subscribeOn(io()))
        .flatMap(p -> asyncHasLowRisk(p).subscribeOn(io()));


asyncHasLowRisk() is rather obscure — it either returns a single-element stream when the predicate passes or an empty stream when it fails. This is how you emulate filter() using flatMap(). Can we do better? Introduced in RxJava 2.0.5, there is a new operator called... parallel()! It's quite surprising because an operator with the same name was removed before RxJava became 1.0 due to many misconceptions surrounding it, leading to it being misused. parallel() in 2.x seems to finally address the problem of idiomatic concurrency in a safe and declarative way. First, let's see some beautiful code!

Flowable<Person> people = ids
        .parallel(10)
        .runOn(Schedulers.io())
        .map(this::slowLoadBy)
        .filter(this::hasLowRisk)
        .sequential();


Just like that! A block of code between parallel() and sequential() runs... in parallel. What do we have here? First of all, the new parallel() operator turns Flowable<UUID> into ParallelFlowable<UUID>, which has a much smaller API than Flowable. You'll see in a second why. The optional int parameter (10 in our case) defines concurrency, or (as the documentation puts it) how many concurrent "rails" are created. So for us, we split single Flowable<Person> into 10 concurrent, independent rails (think: threads). Events from original stream of UUIDs are split (modulo 10) into different rails — sub-streams that are independent of each other. Think of them as sending upstream events into 10 separate threads. But first, we have to define where these threads come from — using the handy runOn() operator. This is so much better than parallel() on Java 8 streams where you have no control over concurrency level.

At this point, we have a ParallelFlowable. When an event appears upstream (UUID), it is delegated to one of 10 "rails" — concurrent, independent pipelines. Pipeline provides a limited subset of operators that are safe to run concurrently, e.g. map() and filter(), but also reduce(). There is no buffer(), take(), etc. as their semantics are unclear when invoked on many sub-streams at once. Our blocking slowLoadBy(), as well as hasLowRisk(), are still invoked sequentially, but only within a single "rail". Because we now have 10 concurrent "rails", we effectively parallelized them without much effort.

When events reach the end of a sub-stream ("rail"), they encounter a sequential() operator. This operator turns ParallelFlowable back into Flowable. As long as our mappers and filters are thread-safe, a parallel()/sequential() pair provides a very easy way of parallelizing streams. One small caveat — you will inevitably get messages reordered. Sequential map() and filter() always preserve order (like most operators). But once you run them within a parallel() block, the order is lost. This allows for greater concurrency, but you have to keep that in mind.

Should you use parallel() rather than nested flatMap() to parallelize your code? It's up to you, but parallel() seems to be much easier to read and grasp.

Download Building Reactive Microservices in Java: Asynchronous and Event-Based Application Design. Brought to you in partnership with Red Hat

Topics:
java ,rxjava ,concurrency ,parallel ,flatmap ,tutorial

Published at DZone with permission of

Opinions expressed by DZone contributors are their own.

{{ parent.title || parent.header.title}}

{{ parent.tldr }}

{{ parent.urlSource.name }}