DZone
Thanks for visiting DZone today,
Edit Profile
  • Manage Email Subscriptions
  • How to Post to DZone
  • Article Submission Guidelines
Sign Out View Profile
  • Post an Article
  • Manage My Drafts
Over 2 million developers have joined DZone.
Log In / Join
Refcards Trend Reports Events Over 2 million developers have joined DZone. Join Today! Thanks for visiting DZone today,
Edit Profile Manage Email Subscriptions Moderation Admin Console How to Post to DZone Article Submission Guidelines
View Profile
Sign Out
Refcards
Trend Reports
Events
Zones
Culture and Methodologies Agile Career Development Methodologies Team Management
Data Engineering AI/ML Big Data Data Databases IoT
Software Design and Architecture Cloud Architecture Containers Integration Microservices Performance Security
Coding Frameworks Java JavaScript Languages Tools
Testing, Deployment, and Maintenance Deployment DevOps and CI/CD Maintenance Monitoring and Observability Testing, Tools, and Frameworks
Culture and Methodologies
Agile Career Development Methodologies Team Management
Data Engineering
AI/ML Big Data Data Databases IoT
Software Design and Architecture
Cloud Architecture Containers Integration Microservices Performance Security
Coding
Frameworks Java JavaScript Languages Tools
Testing, Deployment, and Maintenance
Deployment DevOps and CI/CD Maintenance Monitoring and Observability Testing, Tools, and Frameworks
  1. DZone
  2. Coding
  3. Java
  4. RxJava — Reactive Extensions for Java

RxJava — Reactive Extensions for Java

Want to learn more about reactive extensions for Java? Click here for this guide on RxJava where we cover schedulers, Observables, and more!

Alaattin KAYRAK user avatar by
Alaattin KAYRAK
·
Oct. 29, 18 · Presentation
Like (10)
Save
Tweet
Share
12.44K Views

Join the DZone community and get the full member experience.

Join For Free

rxjava is a reactive extensions implementation for java projects. a combination of functional and reactive techniques can represent an elegant approach to event-driven programming.

functional programming is the process of building software by composing pure functions, avoiding shared state, mutable data, and side-effects.

reactive programming is basically event-based asynchronous programming or an asynchronous programming paradigm concerned with data streams and the propagation of change.

there are two key types for reactive programming: observable and observer. an observable emits items; an observer consumes those items.

observerinterface methods

  • onsubscribe
  • onnext: on the new item is emitted from the observable
  • oncomplete
  • onerror

important headache : backpressure problem

in rxjava it is not difficult to get into a situation in which an observable is emitting items more rapidly than an operator or subscriber can consume them. this presents the problem of what to do with such a growing backlog of unconsumed items.

creating observable

observable types:

type description
* flowable<t> emits 0 or n items and terminates with an success or an error event. supports backpressure , which allows to control how fast a source emits items.
* observable<t> emits 0 or n items and terminates with an success or an error event. no support for backpressure.
+ single<t> emits either a single item or an error event. the reactive version of a method call.
+ maybe<t> succeeds with a singleitem, or 0 item , or errors. the reactive version of an optional .
# completable either completes with an success or with an error event. it never emits items. the reactive version of a runnable .

convenience methods to create observables:

  • .just() : produces an observable that emits asingle generic.
  • .fromiterable() : produces n emits .
  • .fromarray()
  • .from...()
  • .interval()
  • .range...()
  • .repeat()

subscription and disposing

to receive the data emitted from an observable, you need to subscribe to it: .subscribe() and .subscribewith() .

when listers or subscribers are attached they usually are not supposed to listen eternally, so dispose of them to avoiding memory leaks: .dispose() .

when working with multiple subscriptions, use compositedisposable . add the return of subscribes to composite disposable. then, use compositedisposable.dispose() or compositedisposable.clear() to dispose them all.

single caching

when working with observables, doing async calls on every subscription on an observable is often not necessary. expensive works, like web request, use the cache method so that the single instance keeps its result once it was successful for the first time.

rxandroid providers a scheduler to run code in the main thread of android. it also provides the ability to create a scheduler that runs on a android handler class. with this schedulers, you can define an observable that does its work in a background thread and post our results to the main thread. this allows for example to replace a asynctask implementations which rxjava.

next, let's look at androidschedulers.mainthread() and schedulers.io()

schedulers

they play a major role in supporting the multithreading concept in android applications. schedulers basically decide the thread that a particular code runs on.

  • schedulers.io() : to perform non-cpu-intensive operations
  • androidschedulers.mainthread() : to access android main thread / ui thread.
  • schedulers.computation() : to perform cpu-intensive-operations, like processing huge data, bitmap processing, etc.
  • schedulers.newthread()
  • schedulers.single()
  • schedulers.immediate()
  • schedulers.trampoline()
  • schedulers.from()

operator/transformation

operators modify the data emitted by the observable before an observer receives them, using:

  • filter
  • map
  • scan
  • groupby
  • conditional operators : defaultifempty , takewhile , skipuntil , etc.

testing

flowable can be tested with io.reactivex.subscribers.testsubscriber for the test object subscribewith it.

non-back-pressured observable, single, maybe, and completable can be tested with io.reactivex.observers.testobserver .

the testschedular class is very useful when testing time-based operators (e.g. timeout() , buffer() , window() , etc.).

flatmap vs. map operators

map modifies each item emitted by the source observable and emits the modified item.

flatmap , switchmap , and concatmap also apply a function on each emitted item, but instead of returning the modified item, it returns the observable itself, which can emit data again.

flatmap is used to map over asynchronous operations.

all previously-generated, intermediate streams are terminated.

parallelization of requests with multi-thread

  1. use flatmap , create an observable off it, and perform a subscribeon() to the scheduler.

2. use parallel with flowable

zip operator

the zip operator strictly pairs emitted items from observables. it waits for both (or more) items to arrive and then merges them.

for non-blocking, asynchronous execution is supported, and you are allowed to unsubscribe at any point. however, for blocking, all onnext observer calls will be synchronous. it is not possible to unsubscribe in the middle of an event stream.

hot and cold observables

the cold observable emits a sequence of items when the observer demands without disrupting the integrity of sequence. for example, the observable might include the results of a database query , file retrieval, or web request.

a hot observable starts emitting data immediately when its created. for example, the observable might include mouse and keyboard events, system events, or stock prices .

how to avoid backpressure

by operators: throttling

you can avoid backpressure using operators and throttling to emit an item. operators like sample( ) or throttlelast( ) , throttlefirst( ) , and throttlewithtimeout( ) or debounce( ) allow you to regulate the rate at which an observable emits items.

the sample operator periodically “dips” into the sequence and emits only the most recently emitted item during each dip.

the throttlefirst operator is similar, but it does not emit the most recently emitted item but the first item that was emitted after the previous “dip."

the debounce operator emits only those items from the source observable that are not followed by another item within a specified duration.

by operators: buffers and windows

you can also use an operator like buffer( ) or window( ) to collect items from the over-producing observable and then emit them less-frequently as collections (or observables) of items.

buffer : you could, for example, close and emit a buffer of items from the bursty observable periodically and at a regular interval of time.

window is similar to buffer and allows you to periodically emit observable windows of items at a regular interval of time.

establish the “reactive pull” backpressure mechanism to the subscriber

another way of handling an overproductive observable is to block the callstack (parking the thread that governs the overproductive observable).

if the observable, all of the operators that operate on it, and the observer that is subscribed to it are all operating in the same thread, this effectively establishes a form of backpressure by means of callstack blocking.

implementing a custom operator

sequence operator : operator<t> by using .lift(myoperator<t>) for emiting altered t item.

transformational operator: transformer <cls1, cls2> by using . compose(mytransformer<cls1, cls2>) for emitting cls2 by transforming from cls1 item.

error handling operators

  • onerrorresumenext( ) — instructs an observable to emit a sequence of items if it encounters an error
  • onerrorreturn( ) — instructs an observable to emit a particular item when it encounters an error
  • onexceptionresumenext( ) — instructs an observable to continue emitting items after it encounters an exception (but not another variety of throwable)
  • retry( ) — if a source observable emits an error, resubscribe to it in the hopes that it will complete without error
  • retrywhen( ) — if a source observable emits an error, pass that error to another observable to determine whether to resubscribe to the source

unhandled exceptions will be handled by the global rxjavaplugins.seterrorhandler(throwable) method. unhandled exceptions of the parallel processes will be handled by that function.

subjects

a subject is a sort of bridge or proxy that is available in some implementations of reactivex that acts both as an observer and as an observable. because it is an observer, it can subscribe to one or more observables, and because it is an observable, it can pass through the items it observes by re-emitting them. it can also emit new items.

  • publish subjectit emits all the subsequent items of the source observable at the time of subscription.
  • replay subjectit emits all the items of the source observable, regardless of when the subscriber subscribes.
  • behavior subjectit emits the most recently emitted item and all the subsequent items of the source observable when an observer subscribes to it.
  • async subjectit only emits the last value of the source observable (and the last value) only after that source observable completes.

when should i use a subject?

you should use a subject when all of the following are true:

  • you don’t have an observable or anything that can be converted into one.
  • you require a hot observable.
  • the scope of your observable is a type (e.g., it’s exposed as a public property and backed by a field).
  • you don’t need to define a similar event, and no similar event already exists.

rxrelays

this is a subject without the ability to call oncomplete or onerror .

that's all for this guide on rxjava. hope this helps!

Operator (extension) Java (programming language) Event

Opinions expressed by DZone contributors are their own.

Popular on DZone

  • PostgreSQL: Bulk Loading Data With Node.js and Sequelize
  • Top Five Tools for AI-based Test Automation
  • A Real-Time Supply Chain Control Tower Powered by Kafka
  • How Do the Docker Client and Docker Servers Work?

Comments

Partner Resources

X

ABOUT US

  • About DZone
  • Send feedback
  • Careers
  • Sitemap

ADVERTISE

  • Advertise with DZone

CONTRIBUTE ON DZONE

  • Article Submission Guidelines
  • Become a Contributor
  • Visit the Writers' Zone

LEGAL

  • Terms of Service
  • Privacy Policy

CONTACT US

  • 600 Park Offices Drive
  • Suite 300
  • Durham, NC 27709
  • support@dzone.com
  • +1 (919) 678-0300

Let's be friends: