RxJava — Reactive Extensions for Java
Want to learn more about reactive extensions for Java? Click here for this guide on RxJava where we cover schedulers, Observables, and more!
RxJava is a Reactive Extensions implementation for Java projects. A combination of functional and reactive techniques can represent an elegant approach to event-driven programming.
Functional programming is the process of building software by composing pure functions, avoiding shared state, mutable data, and side effects.
Reactive programming is event-based asynchronous programming: a paradigm concerned with data streams and the propagation of change.
There are two key types in reactive programming: Observable and Observer. An Observable emits items; an Observer consumes those items.
- onNext: called each time a new item is emitted from the observable
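To make the Observable/Observer relationship concrete, here is a minimal sketch. It assumes RxJava 3 (the `io.reactivex.rxjava3` packages) is on the classpath; the class name `ObserverDemo` is illustrative only. Besides `onNext`, the Observer interface also declares `onSubscribe`, `onError`, and `onComplete`, which the sketch records as well.

```java
import io.reactivex.rxjava3.core.Observable;
import io.reactivex.rxjava3.core.Observer;
import io.reactivex.rxjava3.disposables.Disposable;
import java.util.ArrayList;
import java.util.List;

final class ObserverDemo {
    // Subscribes a full Observer and records every callback it receives.
    static List<String> run() {
        List<String> log = new ArrayList<>();
        Observable.just("a", "b").subscribe(new Observer<String>() {
            @Override public void onSubscribe(Disposable d) { log.add("onSubscribe"); }
            @Override public void onNext(String item) { log.add("onNext:" + item); }
            @Override public void onError(Throwable e) { log.add("onError"); }
            @Override public void onComplete() { log.add("onComplete"); }
        });
        return log;
    }

    public static void main(String[] args) {
        // [onSubscribe, onNext:a, onNext:b, onComplete]
        System.out.println(run());
    }
}
```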
Important headache: the backpressure problem
In RxJava, it is not difficult to get into a situation in which an observable is emitting items more rapidly than an operator or subscriber can consume them. This presents the problem of what to do with such a growing backlog of unconsumed items.
|Flowable|Emits 0 or n items and terminates with a success or an error event. Supports backpressure, which allows you to control how fast a source emits items.|
|Observable|Emits 0 or n items and terminates with a success or an error event. No support for backpressure.|
|Single|Emits either a single item or an error event. The reactive version of a method call.|
|Maybe|Succeeds with an item or no item, or errors. The reactive version of an Optional.|
|Completable|Either completes with a success or with an error event. It never emits items. The reactive version of a Runnable.|
Convenience methods to create observables:
.just(): produces an observable that emits a single item of a generic type.
.fromIterable(): produces an observable that emits each of the n items of an Iterable.
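The two factory methods can be tried out as follows. This is a small sketch assuming RxJava 3 on the classpath; `CreationDemo` and its method names are made up for illustration, and `toList().blockingGet()` is used only to collect the emissions for inspection.

```java
import io.reactivex.rxjava3.core.Observable;
import java.util.Arrays;
import java.util.List;

final class CreationDemo {
    // just(): one item in, one emission out.
    static List<String> single() {
        return Observable.just("only").toList().blockingGet();
    }

    // fromIterable(): one emission per element of the Iterable.
    static List<Integer> many() {
        return Observable.fromIterable(Arrays.asList(1, 2, 3)).toList().blockingGet();
    }

    public static void main(String[] args) {
        System.out.println(single()); // [only]
        System.out.println(many());   // [1, 2, 3]
    }
}
```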
Subscription and disposing
To receive the data emitted from an observable, you need to subscribe to it.
Attached listeners or subscribers are usually not supposed to listen forever, so dispose of them to avoid memory leaks.
When working with multiple subscriptions, use a CompositeDisposable. Add the Disposable returned by each subscribe() call to the CompositeDisposable. Then, call dispose() on it to dispose of them all.
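The subscribe-then-dispose cycle might look like the sketch below. It assumes RxJava 3; the `PublishSubject` is only a stand-in for any long-lived source, and `DisposeDemo` is a hypothetical name.

```java
import io.reactivex.rxjava3.disposables.CompositeDisposable;
import io.reactivex.rxjava3.disposables.Disposable;
import io.reactivex.rxjava3.subjects.PublishSubject;
import java.util.ArrayList;
import java.util.List;

final class DisposeDemo {
    static List<String> run() {
        List<String> received = new ArrayList<>();
        PublishSubject<String> source = PublishSubject.create(); // stand-in for a long-lived source
        CompositeDisposable disposables = new CompositeDisposable();

        // Each subscribe() call returns a Disposable that is collected in the composite.
        Disposable first = source.subscribe(item -> received.add("first:" + item));
        Disposable second = source.subscribe(item -> received.add("second:" + item));
        disposables.add(first);
        disposables.add(second);

        source.onNext("x");    // delivered to both subscribers
        disposables.dispose(); // disposes both subscriptions at once
        source.onNext("y");    // nobody is listening any more
        return received;
    }

    public static void main(String[] args) {
        System.out.println(run()); // [first:x, second:x]
    }
}
```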
When working with observables, repeating an asynchronous call on every subscription is often unnecessary. For expensive work, like a web request, use the cache() method so that the Single instance keeps its result once it has succeeded for the first time.
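A short sketch of caching, assuming RxJava 3. The "expensive call" here is hypothetical: it only counts how often it runs, so the effect of `cache()` becomes visible.

```java
import io.reactivex.rxjava3.core.Single;
import java.util.concurrent.atomic.AtomicInteger;

final class CacheDemo {
    // Returns how many times the underlying work ran for two subscriptions.
    static int run() {
        AtomicInteger calls = new AtomicInteger();
        // Hypothetical expensive call, e.g. a web request.
        Single<String> request = Single.fromCallable(() -> {
            calls.incrementAndGet();
            return "payload";
        });
        Single<String> cached = request.cache();
        cached.blockingGet(); // first subscription runs the work
        cached.blockingGet(); // second subscription replays the stored result
        return calls.get();
    }

    public static void main(String[] args) {
        System.out.println(run()); // 1
    }
}
```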
RxAndroid provides a scheduler to run code on the main thread of Android. It also provides the ability to create a scheduler that runs on an Android Handler class. With these schedulers, you can define an observable that does its work in a background thread and posts its results to the main thread. This allows you, for example, to replace AsyncTask implementations with RxJava.
Next, let's look at schedulers.
They play a major role in supporting multithreading in Android applications. Schedulers decide which thread a particular piece of code runs on.
Schedulers.io(): to perform non-CPU-intensive operations, such as I/O
AndroidSchedulers.mainThread(): to access the Android main thread (UI thread)
Schedulers.computation(): to perform CPU-intensive operations, like processing huge data sets, bitmap processing, etc.
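The following sketch shows how `subscribeOn()` and `observeOn()` move work between schedulers. It assumes RxJava 3 on a plain JVM, so `Schedulers.computation()` stands in for the result thread; on Android, one would typically pass `AndroidSchedulers.mainThread()` to `observeOn()` instead. The class name is illustrative.

```java
import io.reactivex.rxjava3.core.Observable;
import io.reactivex.rxjava3.schedulers.Schedulers;

final class SchedulerDemo {
    // Returns the names of the threads that ran the source and the downstream map().
    static String[] run() {
        String[] threads = new String[2];
        Observable.fromCallable(() -> {
                    threads[0] = Thread.currentThread().getName(); // source work
                    return 1;
                })
                .subscribeOn(Schedulers.io())         // where the source runs
                .observeOn(Schedulers.computation())  // where downstream operators run
                .map(v -> {
                    threads[1] = Thread.currentThread().getName();
                    return v;
                })
                .blockingFirst(); // wait for the result so both names are recorded
        return threads;
    }

    public static void main(String[] args) {
        String[] t = run();
        System.out.println("source on " + t[0] + ", downstream on " + t[1]);
    }
}
```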
Operators modify the data emitted by the observable before an observer receives it, using:
Conditional operators:
Flowable can be tested with TestSubscriber; calling test() on the source returns the test object.
A non-backpressured Observable, Single, Maybe, and Completable can be tested with TestObserver.
The TestScheduler class is very useful when testing time-based operators.
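As a sketch of these test utilities, assuming RxJava 3: `test()` returns a `TestObserver` for an Observable and a `TestSubscriber` for a Flowable, and a `TestScheduler` lets virtual time be advanced by hand. The use of `interval()` as the time-based operator is my own choice for the example.

```java
import io.reactivex.rxjava3.core.Flowable;
import io.reactivex.rxjava3.core.Observable;
import io.reactivex.rxjava3.observers.TestObserver;
import io.reactivex.rxjava3.schedulers.TestScheduler;
import io.reactivex.rxjava3.subscribers.TestSubscriber;
import java.util.concurrent.TimeUnit;

final class TestingDemo {
    // Throws an AssertionError if any expectation fails.
    static void run() {
        TestObserver<Integer> observer = Observable.just(1, 2, 3).test();
        observer.assertValues(1, 2, 3).assertComplete();

        TestSubscriber<Integer> subscriber = Flowable.range(1, 3).test();
        subscriber.assertValues(1, 2, 3).assertComplete();

        // Virtual time: nothing is emitted until the clock is advanced.
        TestScheduler clock = new TestScheduler();
        TestObserver<Long> ticks = Observable.interval(1, TimeUnit.SECONDS, clock).test();
        ticks.assertNoValues();
        clock.advanceTimeBy(3, TimeUnit.SECONDS);
        ticks.assertValues(0L, 1L, 2L);
    }

    public static void main(String[] args) {
        run();
        System.out.println("all assertions passed");
    }
}
```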
flatMap vs. map operators
map modifies each item emitted by the source observable and emits the modified item.
flatMap also applies a function to each emitted item, but instead of returning the modified item, the function returns an observable, which can itself emit items.
flatMap is used to map over asynchronous operations.
With switchMap, all previously generated intermediate streams are terminated whenever the source emits a new item.
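The difference between the mapping operators can be sketched as follows, assuming RxJava 3. The `switchMap` part is included on the assumption that the last sentence above describes that operator; subjects are used there only to control when each inner stream emits.

```java
import io.reactivex.rxjava3.core.Observable;
import io.reactivex.rxjava3.subjects.PublishSubject;
import java.util.ArrayList;
import java.util.List;

final class MappingDemo {
    // map: one item in, one transformed item out.
    static List<Integer> mapped() {
        return Observable.just(1, 2, 3).map(x -> x * 10).toList().blockingGet();
    }

    // flatMap: each item becomes an observable whose emissions are merged.
    static List<Integer> flatMapped() {
        return Observable.just(1, 2, 3)
                .flatMap(x -> Observable.just(x, x * 10))
                .toList().blockingGet();
    }

    // switchMap: a new source item drops the previous inner observable.
    static List<String> switched() {
        PublishSubject<Integer> source = PublishSubject.create();
        PublishSubject<String> innerA = PublishSubject.create();
        PublishSubject<String> innerB = PublishSubject.create();
        List<String> out = new ArrayList<>();
        source.switchMap(i -> i == 1 ? innerA : innerB).subscribe(out::add);
        source.onNext(1);
        innerA.onNext("a1");
        source.onNext(2);    // innerA is unsubscribed here
        innerA.onNext("a2"); // ignored
        innerB.onNext("b1");
        return out;
    }

    public static void main(String[] args) {
        System.out.println(mapped());     // [10, 20, 30]
        System.out.println(flatMapped()); // [1, 10, 2, 20, 3, 30]
        System.out.println(switched());   // [a1, b1]
    }
}
```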
Parallelization of requests with multiple threads
Inside flatMap, create an observable from each item and apply subscribeOn() with the desired scheduler.
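A minimal sketch of this pattern, assuming RxJava 3. The "request" is a hypothetical placeholder that just squares its input; because the inner observables run on different threads, the arrival order is not deterministic, so the sketch returns a sorted set.

```java
import io.reactivex.rxjava3.core.Observable;
import io.reactivex.rxjava3.schedulers.Schedulers;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

final class ParallelDemo {
    static Set<Integer> run() {
        List<Integer> results = Observable.range(1, 5)
                .flatMap(id -> Observable
                        .fromCallable(() -> id * id)   // hypothetical per-item request
                        .subscribeOn(Schedulers.io())) // each inner observable gets its own worker
                .toList()
                .blockingGet();
        return new TreeSet<>(results); // order of arrival may vary, so sort
    }

    public static void main(String[] args) {
        System.out.println(run()); // [1, 4, 9, 16, 25]
    }
}
```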
The zip operator strictly pairs emitted items from observables. It waits for both (or more) items to arrive and then merges them.
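The strict pairing can be seen in a small sketch (assuming RxJava 3; `ZipDemo` is an illustrative name). The third letter never finds a partner, so it is not emitted.

```java
import io.reactivex.rxjava3.core.Observable;
import java.util.List;

final class ZipDemo {
    static List<String> run() {
        Observable<String> letters = Observable.just("a", "b", "c");
        Observable<Integer> numbers = Observable.just(1, 2);
        // Items are combined pairwise; zip completes when the shorter source is exhausted.
        return Observable.zip(letters, numbers, (s, n) -> s + n)
                .toList()
                .blockingGet();
    }

    public static void main(String[] args) {
        System.out.println(run()); // [a1, b2]
    }
}
```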
For non-blocking observables, asynchronous execution is supported, and you are allowed to unsubscribe at any point. For blocking observables, however, all observer calls are synchronous, and it is not possible to unsubscribe in the middle of an event stream.
Hot and cold observables
A cold observable emits its sequence of items only when an observer demands it, so every observer sees the whole sequence intact. For example, the observable might wrap the results of a database query, file retrieval, or web request.
A hot observable starts emitting data as soon as it is created, whether or not anyone is subscribed. For example, the observable might carry mouse and keyboard events, system events, or stock prices.
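The contrast can be sketched like this, assuming RxJava 3. A `PublishSubject` plays the role of the hot source here, which is a simplification chosen for the example rather than the only way to obtain a hot observable.

```java
import io.reactivex.rxjava3.core.Observable;
import io.reactivex.rxjava3.subjects.PublishSubject;
import java.util.ArrayList;
import java.util.List;

final class HotColdDemo {
    static List<String> run() {
        List<String> log = new ArrayList<>();

        // Cold: every subscriber triggers its own full run of the sequence.
        Observable<Integer> cold = Observable.just(1, 2);
        cold.subscribe(i -> log.add("cold-A:" + i));
        cold.subscribe(i -> log.add("cold-B:" + i));

        // Hot: items flow regardless of subscribers; late subscribers miss earlier items.
        PublishSubject<Integer> hot = PublishSubject.create();
        hot.subscribe(i -> log.add("hot-A:" + i));
        hot.onNext(1);
        hot.subscribe(i -> log.add("hot-B:" + i)); // subscribed too late for item 1
        hot.onNext(2);
        return log;
    }

    public static void main(String[] args) {
        // [cold-A:1, cold-A:2, cold-B:1, cold-B:2, hot-A:1, hot-A:2, hot-B:2]
        System.out.println(run());
    }
}
```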
How to avoid backpressure
By operators: throttling
You can avoid backpressure by using throttling operators. Operators like sample(), throttleFirst(), and debounce() allow you to regulate the rate at which an observable emits items.
The sample() operator periodically “dips” into the sequence and emits only the most recently emitted item during each dip.
The throttleFirst() operator is similar, but it emits not the most recently emitted item but the first item that was emitted after the previous “dip.”
The debounce() operator emits only those items from the source observable that are not followed by another item within a specified duration.
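To compare the three throttling behaviors side by side, the sketch below feeds the same four items through `sample()`, `throttleFirst()`, and `debounce()`, each with a 100 ms period. It assumes RxJava 3 and uses a `TestScheduler` so the timing is virtual and repeatable; the timeline (items at 0, 50, 120, and 400 ms) is invented for the example.

```java
import io.reactivex.rxjava3.schedulers.TestScheduler;
import io.reactivex.rxjava3.subjects.PublishSubject;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.TimeUnit;

final class ThrottleDemo {
    // Returns the items that passed sample, throttleFirst, and debounce, in that order.
    static List<List<Integer>> run() {
        TestScheduler clock = new TestScheduler();
        PublishSubject<Integer> source = PublishSubject.create();
        List<Integer> sampled = new ArrayList<>();
        List<Integer> throttled = new ArrayList<>();
        List<Integer> debounced = new ArrayList<>();
        source.sample(100, TimeUnit.MILLISECONDS, clock).subscribe(sampled::add);
        source.throttleFirst(100, TimeUnit.MILLISECONDS, clock).subscribe(throttled::add);
        source.debounce(100, TimeUnit.MILLISECONDS, clock).subscribe(debounced::add);

        source.onNext(1);                                // t = 0 ms
        clock.advanceTimeTo(50, TimeUnit.MILLISECONDS);
        source.onNext(2);                                // t = 50 ms
        clock.advanceTimeTo(120, TimeUnit.MILLISECONDS);
        source.onNext(3);                                // t = 120 ms
        clock.advanceTimeTo(400, TimeUnit.MILLISECONDS); // long quiet period
        source.onNext(4);                                // t = 400 ms
        clock.advanceTimeTo(600, TimeUnit.MILLISECONDS);
        return Arrays.asList(sampled, throttled, debounced);
    }

    public static void main(String[] args) {
        // [[2, 3, 4], [1, 3, 4], [3, 4]]
        System.out.println(run());
    }
}
```

In this timeline, sample() keeps the latest item seen at each 100 ms tick, throttleFirst() keeps the first item after each quiet window, and debounce() keeps only items followed by 100 ms of silence.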
By operators: buffers and windows
buffer(): you could, for example, close and emit a buffer of items from the bursty observable periodically, at a regular interval of time.
window() is similar to buffer() and allows you to periodically emit observable windows of items at a regular interval of time.
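A sketch of time-based buffering and windowing, assuming RxJava 3 and a `TestScheduler` for virtual time. Each window is collected with `toList()` purely so that both variants produce comparable output; the 100 ms interval and the item timeline are arbitrary.

```java
import io.reactivex.rxjava3.core.Observable;
import io.reactivex.rxjava3.schedulers.TestScheduler;
import io.reactivex.rxjava3.subjects.PublishSubject;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;

final class BufferWindowDemo {
    // Chunks a bursty source every 100 ms, with either window() or buffer().
    static List<List<Integer>> run(boolean useWindow) {
        TestScheduler clock = new TestScheduler();
        PublishSubject<Integer> bursty = PublishSubject.create();
        Observable<List<Integer>> chunks = useWindow
                // window(): emits an Observable per interval; collect each into a list
                ? bursty.window(100, TimeUnit.MILLISECONDS, clock).flatMapSingle(w -> w.toList())
                // buffer(): emits a List per interval directly
                : bursty.buffer(100, TimeUnit.MILLISECONDS, clock);
        List<List<Integer>> out = new ArrayList<>();
        chunks.subscribe(out::add);

        bursty.onNext(1);
        bursty.onNext(2);
        clock.advanceTimeTo(100, TimeUnit.MILLISECONDS); // first chunk closes
        bursty.onNext(3);
        clock.advanceTimeTo(200, TimeUnit.MILLISECONDS); // second chunk closes
        return out;
    }

    public static void main(String[] args) {
        System.out.println(run(false)); // [[1, 2], [3]]
        System.out.println(run(true));  // [[1, 2], [3]]
    }
}
```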
Establish the “reactive pull” backpressure mechanism to the subscriber
Another way of handling an overproductive observable is to block the call stack (parking the thread that governs the overproductive observable).
If the observable, all of the operators that operate on it, and the observer that is subscribed to it are all operating in the same thread, this effectively establishes a form of backpressure by means of call-stack blocking.
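The “reactive pull” mechanism named above can be sketched with a Flowable and a Reactive Streams `Subscriber` that requests items one at a time. This assumes RxJava 3 (which brings `org.reactivestreams` along); the limit of five items is arbitrary and only there to show that the source emits no more than what was requested.

```java
import io.reactivex.rxjava3.core.Flowable;
import java.util.ArrayList;
import java.util.List;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

final class ReactivePullDemo {
    static List<Integer> run() {
        List<Integer> received = new ArrayList<>();
        Flowable.range(1, 100).subscribe(new Subscriber<Integer>() {
            private Subscription upstream;

            @Override public void onSubscribe(Subscription s) {
                upstream = s;
                s.request(1); // pull the first item
            }

            @Override public void onNext(Integer item) {
                received.add(item);
                if (received.size() < 5) {
                    upstream.request(1); // pull the next item only when ready
                } else {
                    upstream.cancel();   // stop after five items
                }
            }

            @Override public void onError(Throwable t) { }

            @Override public void onComplete() { }
        });
        return received;
    }

    public static void main(String[] args) {
        System.out.println(run()); // [1, 2, 3, 4, 5]
    }
}
```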
Implementing a custom operator
Sequence operator: implement Operator<T> and apply it with .lift(myOperator) to emit altered T items.
Transformational operator: implement Transformer<Cls1, Cls2> and apply it with .compose(myTransformer) to emit Cls2 items transformed from Cls1 items.
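As an illustration of the transformational variant, here is a sketch that assumes RxJava 3, where the interface is named `ObservableTransformer` rather than `Transformer`. The transformer itself (`evenLabels`, turning Integer items into String labels) is a made-up example.

```java
import io.reactivex.rxjava3.core.Observable;
import io.reactivex.rxjava3.core.ObservableTransformer;
import java.util.List;

final class ComposeDemo {
    // A reusable transformer: Integer items in, String items out.
    static ObservableTransformer<Integer, String> evenLabels() {
        return upstream -> upstream
                .filter(n -> n % 2 == 0)
                .map(n -> "even:" + n);
    }

    static List<String> run() {
        return Observable.range(1, 6)
                .compose(evenLabels()) // applies the whole operator chain in one step
                .toList()
                .blockingGet();
    }

    public static void main(String[] args) {
        System.out.println(run()); // [even:2, even:4, even:6]
    }
}
```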
Error handling operators
onErrorResumeNext(): instructs an observable to emit a sequence of items if it encounters an error
onErrorReturn(): instructs an observable to emit a particular item when it encounters an error
onExceptionResumeNext(): instructs an observable to continue emitting items after it encounters an exception (but not another variety of throwable)
retry(): if a source observable emits an error, resubscribe to it in the hope that it will complete without error
retryWhen(): if a source observable emits an error, pass that error to another observable to determine whether to resubscribe to the source
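Three of these operators are sketched below, assuming RxJava 3. `onExceptionResumeNext` is left out because it is not available in RxJava 3; the failing and flaky sources are hypothetical stand-ins.

```java
import io.reactivex.rxjava3.core.Observable;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

final class ErrorHandlingDemo {
    // Emits 1, 2 and then fails.
    static Observable<Integer> failing() {
        return Observable.just(1, 2)
                .concatWith(Observable.error(new IllegalStateException("boom")));
    }

    // onErrorReturn: replace the error with a single fallback item.
    static List<Integer> withReturn() {
        return failing().onErrorReturn(e -> -1).toList().blockingGet();
    }

    // onErrorResumeNext: replace the error with a fallback sequence.
    static List<Integer> withResume() {
        return failing().onErrorResumeNext(e -> Observable.just(10, 20)).toList().blockingGet();
    }

    // retry: resubscribe until the source succeeds (here, on the third attempt).
    static List<Integer> withRetry() {
        AtomicInteger attempts = new AtomicInteger();
        return Observable.fromCallable(() -> {
                    if (attempts.incrementAndGet() < 3) {
                        throw new IllegalStateException("flaky");
                    }
                    return attempts.get();
                })
                .retry(5)
                .toList()
                .blockingGet();
    }

    public static void main(String[] args) {
        System.out.println(withReturn()); // [1, 2, -1]
        System.out.println(withResume()); // [1, 2, 10, 20]
        System.out.println(withRetry());  // [3]
    }
}
```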
Unhandled exceptions are routed to the global error handler (in RxJava 2 and later, the one installed with RxJavaPlugins.setErrorHandler()). Unhandled exceptions from parallel processing are handled by the same function.
A subject is a sort of bridge or proxy, available in some implementations of ReactiveX, that acts both as an observer and as an observable. Because it is an observer, it can subscribe to one or more observables, and because it is an observable, it can pass through the items it observes by re-emitting them. It can also emit new items.
PublishSubject: emits to an observer only the items that the source observable emits after the time of subscription.
ReplaySubject: emits all the items of the source observable, regardless of when the subscriber subscribes.
BehaviorSubject: emits the most recently emitted item and all the subsequent items of the source observable when an observer subscribes to it.
AsyncSubject: emits only the last value of the source observable, and only after that source observable completes.
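The four subject types can be compared by running the same scenario against each: two items are pushed, an observer subscribes late, then two more items and a completion follow. This is a sketch assuming RxJava 3; the helper names are illustrative.

```java
import io.reactivex.rxjava3.subjects.AsyncSubject;
import io.reactivex.rxjava3.subjects.BehaviorSubject;
import io.reactivex.rxjava3.subjects.PublishSubject;
import io.reactivex.rxjava3.subjects.ReplaySubject;
import io.reactivex.rxjava3.subjects.Subject;
import java.util.ArrayList;
import java.util.List;

final class SubjectDemo {
    // Returns what an observer sees when it subscribes after items 1 and 2.
    static List<Integer> lateObserver(Subject<Integer> subject) {
        List<Integer> seen = new ArrayList<>();
        subject.onNext(1);
        subject.onNext(2);
        subject.subscribe(seen::add); // late subscription
        subject.onNext(3);
        subject.onNext(4);
        subject.onComplete();
        return seen;
    }

    static List<Integer> publish()  { return lateObserver(PublishSubject.create()); }
    static List<Integer> replay()   { return lateObserver(ReplaySubject.create()); }
    static List<Integer> behavior() { return lateObserver(BehaviorSubject.create()); }
    static List<Integer> async()    { return lateObserver(AsyncSubject.create()); }

    public static void main(String[] args) {
        System.out.println(publish());  // [3, 4]
        System.out.println(replay());   // [1, 2, 3, 4]
        System.out.println(behavior()); // [2, 3, 4]
        System.out.println(async());    // [4]
    }
}
```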
When should I use a subject?
You should use a subject when all of the following are true:
- You don’t have an observable or anything that can be converted into one.
- You require a hot observable.
- The scope of your observable is a type (e.g., it’s exposed as a public property and backed by a field).
- You don’t need to define a similar event, and no similar event already exists.
this is a subject without the ability to call
That's all for this guide on RxJava. Hope this helps!