RxJava — Reactive Extensions for Java
Want to learn more about reactive extensions for Java? Click here for this guide on RxJava where we cover schedulers, Observables, and more!
Join the DZone community and get the full member experience.
Join For Freerxjava is a reactive extensions implementation for java projects. a combination of functional and reactive techniques can represent an elegant approach to event-driven programming.
functional programming is the process of building software by composing pure functions, avoiding shared state, mutable data, and side-effects.
reactive programming is basically event-based asynchronous programming or an asynchronous programming paradigm concerned with data streams and the propagation of change.
there are two key types for reactive programming: observable and observer. an observable emits items; an observer consumes those items.
observerinterface methods
- onsubscribe
- onnext: on the new item is emitted from the observable
- oncomplete
- onerror
important headache : backpressure problem
in rxjava it is not difficult to get into a situation in which an observable is emitting items more rapidly than an operator or subscriber can consume them. this presents the problem of what to do with such a growing backlog of unconsumed items.
creating observable
observable types:
type | description | |
---|---|---|
* |
flowable<t>
|
emits 0 or n items and terminates with an success or an error event. supports backpressure , which allows to control how fast a source emits items. |
* |
observable<t>
|
emits 0 or n items and terminates with an success or an error event. no support for backpressure. |
+ |
single<t>
|
emits either a single item or an error event. the reactive version of a method call. |
+ |
maybe<t>
|
succeeds with a
singleitem,
or 0 item
, or errors. the reactive version of an
optional
.
|
# |
completable
|
either completes with an success or with an error event. it
never
emits items. the reactive version of a
runnable
.
|
convenience methods to create observables:
-
.just()
: produces an observable that emits asingle generic. -
.fromiterable()
: produces n emits . -
.fromarray()
-
.from...()
-
.interval()
-
.range...()
-
.repeat()
subscription and disposing
to receive the data emitted from an observable, you need to subscribe to it:
.subscribe()
and
.subscribewith()
.
when listers or subscribers are attached they usually are not supposed to listen eternally, so dispose of them to avoiding memory leaks:
.dispose()
.
when working with multiple subscriptions, use
compositedisposable
. add the return of subscribes to composite disposable. then, use
compositedisposable.dispose()
or
compositedisposable.clear()
to dispose them all.
single caching
when working with observables, doing async calls on every subscription on an observable is often not necessary. expensive works, like web request, use the
cache
method so that the
single
instance keeps its result once it was successful for the first time.
rxandroid providers a scheduler to run code in the main thread of android. it also provides the ability to create a scheduler that runs on a android handler class. with this schedulers, you can define an observable that does its work in a background thread and post our results to the main thread. this allows for example to replace a
asynctask
implementations which rxjava.
next, let's look at
androidschedulers.mainthread()
and
schedulers.io()
schedulers
they play a major role in supporting the multithreading concept in android applications. schedulers basically decide the thread that a particular code runs on.
-
schedulers.io()
: to perform non-cpu-intensive operations -
androidschedulers.mainthread()
: to access android main thread / ui thread. -
schedulers.computation()
: to perform cpu-intensive-operations, like processing huge data, bitmap processing, etc. -
schedulers.newthread()
-
schedulers.single()
-
schedulers.immediate()
-
schedulers.trampoline()
-
schedulers.from()
operator/transformation
operators modify the data emitted by the observable before an observer receives them, using:
- filter
- map
- scan
- groupby
-
conditional operators :
defaultifempty
,takewhile
,skipuntil
, etc.
testing
flowable can be tested with
io.reactivex.subscribers.testsubscriber
for the test object
subscribewith
it.
non-back-pressured observable, single, maybe, and completable can be tested with
io.reactivex.observers.testobserver
.
the
testschedular
class is very useful when testing time-based operators (e.g.
timeout()
,
buffer()
,
window()
, etc.).
flatmap vs. map operators
map modifies each item emitted by the source observable and emits the modified item.
flatmap
,
switchmap
, and
concatmap
also apply a function on each emitted item, but instead of returning the modified item, it returns the observable itself, which can emit data again.
flatmap
is used to map over asynchronous operations.
all previously-generated, intermediate streams are terminated.
parallelization of requests with multi-thread
-
use
flatmap
, create an observable off it, and perform asubscribeon()
to the scheduler.
2. use
parallel
with
flowable
zip operator
the zip operator strictly pairs emitted items from observables. it waits for both (or more) items to arrive and then merges them.

for non-blocking, asynchronous execution is supported, and you are allowed to unsubscribe at any point. however, for blocking, all
onnext
observer calls will be synchronous. it is not possible to unsubscribe in the middle of an event stream.
hot and cold observables
the cold observable emits a sequence of items when the observer demands without disrupting the integrity of sequence. for example, the observable might include the results of a database query , file retrieval, or web request.
a hot observable starts emitting data immediately when its created. for example, the observable might include mouse and keyboard events, system events, or stock prices .
how to avoid backpressure
by operators: throttling
you can avoid backpressure using operators and throttling to emit an item. operators like
sample( )
or
throttlelast( )
,
throttlefirst( )
, and
throttlewithtimeout( )
or
debounce( )
allow you to regulate the rate at which an observable emits items.
the
sample
operator periodically “dips” into the sequence and emits only the most recently emitted item during each dip.
the
throttlefirst
operator is similar, but it does not emit the most recently emitted item but the first item that was emitted after the previous “dip."
the
debounce
operator emits only those items from the source observable that are not followed by another item within a specified duration.
by operators: buffers and windows
you can also use an operator like
buffer( )
or
window( )
to collect items from the over-producing observable and then emit them less-frequently as collections (or observables) of items.
buffer
: you could, for example, close and emit a buffer of
items
from the bursty observable periodically and at a regular interval of time.
window
is similar to
buffer
and allows you to periodically emit observable windows of
items
at a regular interval of time.
establish the “reactive pull” backpressure mechanism to the subscriber
another way of handling an overproductive observable is to block the
callstack
(parking the thread that governs the overproductive observable).
if the observable, all of the operators that operate on it, and the observer that is subscribed to it are all operating in the same thread, this effectively establishes a form of backpressure by means of callstack blocking.
implementing a custom operator
sequence operator : operator<t> by using .lift(myoperator<t>) for emiting altered t item.
transformational operator: transformer <cls1, cls2> by using . compose(mytransformer<cls1, cls2>) for emitting cls2 by transforming from cls1 item.
error handling operators
-
onerrorresumenext( )
— instructs an observable to emit a sequence of items if it encounters an error -
onerrorreturn( )
— instructs an observable to emit a particular item when it encounters an error -
onexceptionresumenext( )
— instructs an observable to continue emitting items after it encounters an exception (but not another variety of throwable) -
retry( )
— if a source observable emits an error, resubscribe to it in the hopes that it will complete without error -
retrywhen( )
— if a source observable emits an error, pass that error to another observable to determine whether to resubscribe to the source
unhandled exceptions will be handled by the global
rxjavaplugins.seterrorhandler(throwable)
method. unhandled exceptions of the parallel processes will be handled by that function.
subjects
a subject is a sort of bridge or proxy that is available in some implementations of reactivex that acts both as an observer and as an observable. because it is an observer, it can subscribe to one or more observables, and because it is an observable, it can pass through the items it observes by re-emitting them. it can also emit new items.
-
publish subjectit
emits all the subsequent items of the source observable at the time of subscription. -
replay subjectit
emits all the items of the source observable, regardless of when the subscriber subscribes. -
behavior subjectit
emits the most recently emitted item and all the subsequent items of the source observable when an observer subscribes to it. -
async subjectit
only emits the last value of the source observable (and the last value) only after that source observable completes.
when should i use a subject?
you should use a subject when all of the following are true:
- you don’t have an observable or anything that can be converted into one.
- you require a hot observable.
- the scope of your observable is a type (e.g., it’s exposed as a public property and backed by a field).
- you don’t need to define a similar event, and no similar event already exists.
rxrelays
this is a subject without the ability to call
oncomplete
or
onerror
.
that's all for this guide on rxjava. hope this helps!
Opinions expressed by DZone contributors are their own.
Comments