RxJava 1.x to 2.x Migration: Observable vs. Observable
RxJava 1.x to 2.x Migration: Observable vs. Observable
RxJava users moving from 1.x to 2.x might have noticed that Observable isn't what it once was. Here are the changes made to accommodate backpressure.
Join the DZone community and get the full member experience.Join For Free
Get the Edge with a Professional Java IDE. 30-day free trial.
The title is not a mistake.
rx.Observable from RxJava 1.x is a completely different beast than
io.reactivex.Observable from 2.x.
Blindly upgrading the rx dependency and renaming all the imports in your project will compile (with minor changes), but that does not guarantee the same behavior. In the very early days of the project, Observable in 1.x had no notion of backpressure. But later on, backpressure was included. What does it actually mean? Let's imagine we have a stream that produces one event every 1 millisecond, but it takes 1 second to process one such item. You see it can't possibly work this way in the long run:
import rx.Observable; //RxJava 1.x import rx.schedulers.Schedulers; Observable .interval(1, MILLISECONDS) .observeOn(Schedulers.computation()) .subscribe( x -> sleep(Duration.ofSeconds(1)));
MissingBackpressureExceptioncreeps in within few hundred milliseconds. But what does this exception mean? Well, basically, it's a safety net (or sanity check, if you will) that prevents you from hurting your application. RxJava automatically discovers that the producer is overflowing the consumer and proactively terminates the stream to avoid further damage. So what if we simply search and replace few imports here and there?
import io.reactivex.Observable; //RxJava 2.x import io.reactivex.schedulers.Schedulers; Observable .interval(1, MILLISECONDS) .observeOn(Schedulers.computation()) .subscribe( x -> sleep(Duration.ofSeconds(1)));
The exception is gone! And So is our throughput! The application stalls after a while, staying in an endless GC loop. You see, Observable in RxJava 1.x has assertions (bounded queues, checks, etc.) all over the place, making sure you are not overflowing anywhere. For example, the observeOn() operator in 1.x has a queue limited to 128 elements by default. When the observeOn() operator asks upstream to deliver not more than 128 elements to fill in its internal buffer. Then separate threads (workers) from this scheduler are picking up events from this queue. When the queue becomes almost empty, observeOn() operator asks (request() method) for more. This mechanism breaks apart when the producer does not respect backpressure requests and sends more data than it was allowed, effectively overflowing the consumer.
The internal queue inside observeOn() operator is full, yet interval() operator keeps emitting new events — because that's what interval() is suppose to do. Observable in 1.x discovers such overflows and fails fast with MissingBackpressureException. It literally means: I tried so hard to keep the system in a healthy state, but my upstream is not respecting backpressure — backpressure implementation is missing. However Observable in 2.x has no such safety mechanism. It's a vanilla stream that hopes you will be a good citizen and either have slow producers or fast consumers.
When a system is healthy, both Observables behave the same way. However, under load, 1.x fails fast, whereas 2.x fails slowly and painfully.
Does it mean RxJava 2.x is a step back? Quite the contrary! In 2.x, an important distinction was made:
Observabledoesn't care about backpressure, which greatly simplifies its design and implementation. It should be used to model streams that can't support backpressure by definition, e.g. user interface events.
Flowabledoes support backpressure and has all the safety measures in place. In other words, all steps in the computation pipeline make sure you are not overflowing the consumer.
2.x makes an important distinction between streams that can support backpressure ("can slow down if needed" in simple words) and those that don't. From the type system perspective, it becomes clear what kind of source are we dealing with and what its guarantees are. So how should we migrate our interval() example to RxJava 2.x? Easier than you think:
Flowable .interval(1, MILLISECONDS) .observeOn(Schedulers.computation()) .subscribe( x -> sleep(Duration.ofSeconds(1)));
That simple. You may ask yourself, "How come Flowable can have an interval() operator that, by definition, can't support backpressure? After all, interval() is supposed to deliver events at a constant rate, it can't slow down!" Well, if you look at the declaration of interval(), you'll notice:
Simply put, this tells us that whenever backpressure can no longer be guaranteed, RxJava will take care of it and throw MissingBackpressureException. That's precisely what happens when we run the Flowable.interval() program — it fails fast, as opposed to destabilizing the whole application.
So, to wrap up, whenever you see an Observable from 1.x, what you probably want is a Flowable from 2.x. At least unless your stream by definition does not support backpressure. Despite the same name, Observables in these two major releases are quite different. But once you do a search and replace from Observable to Flowable, you'll notice that migration isn't that straightforward. It's not about API changes — the differences are more profound.
There is no simple Flowable.create() directly equivalent to Observable.create() in 2.x. I made a mistake myself to overuse the Observable.create() factory method in the past. create() allows you to emit events at an arbitrary rate, entirely ignoring backpressure. 2.x has some friendly facilities to deal with backpressure requests, but they require careful design of your streams. This will be covered in another article.
Published at DZone with permission of Tomasz Nurkiewicz , DZone MVB. See the original article here.
Opinions expressed by DZone contributors are their own.