Creating an Observable in RxJava
In RxJava, Observables are at the center. These data sources can be created in a number of ways. Here's a breakdown of creating Observables in RxJava.
An Observable is the heart of RxJava. It is the source of data or events in reactive programming. RxJava provides many methods in its library to create an Observable, and choosing which one to use can be difficult. My goal in this article is to make that choice simpler by giving you a mental map of different scenarios and the methods to use in each of them.
The code samples for this article can be found here. This article is the second in the series of RxJava articles that I am writing.
Observable and OnSubscribe
Think of an Observable as an action that starts executing as soon as a Subscriber subscribes to it. During the execution of this action, data/events are generated and passed on to the subscriber. Let's try to understand it with an example. I will be using the Observable.create() method to create an Observable.
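As a minimal sketch of this idea, the plain-Java stand-ins below mirror the shape of Observable.create() so that the sample runs without the library. MiniObservable, MiniOnSubscribe, MiniSubscriber, Recorder and parsed() are hypothetical names introduced for illustration, not RxJava classes. In RxJava 1.x the same pattern is written by passing an Observable.OnSubscribe implementation to Observable.create().

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Plain-Java stand-ins (hypothetical, not the RxJava classes) that mirror
// the shape of Observable.create() and Observable.OnSubscribe.
class CreateSketch {
    interface MiniSubscriber<T> {
        void onNext(T value);
        void onError(Throwable error);
        void onCompleted();
    }

    // Counterpart of Observable.OnSubscribe: the action run for each subscriber.
    interface MiniOnSubscribe<T> {
        void call(MiniSubscriber<T> subscriber);
    }

    static final class MiniObservable<T> {
        private final MiniOnSubscribe<T> action;

        private MiniObservable(MiniOnSubscribe<T> action) {
            this.action = action;
        }

        // Only stores the action; nothing is executed here.
        static <T> MiniObservable<T> create(MiniOnSubscribe<T> action) {
            return new MiniObservable<>(action);
        }

        // Subscribing is what triggers the action.
        void subscribe(MiniSubscriber<T> subscriber) {
            action.call(subscriber);
        }
    }

    // Records each event as text so the flow can be inspected.
    static final class Recorder<T> implements MiniSubscriber<T> {
        final List<String> events = new ArrayList<>();
        @Override public void onNext(T value) { events.add("onNext(" + value + ")"); }
        @Override public void onError(Throwable error) { events.add("onError(" + error.getMessage() + ")"); }
        @Override public void onCompleted() { events.add("onCompleted"); }
    }

    // Parses the text on subscription; 'runs' counts how often the action executes.
    static MiniObservable<Integer> parsed(String text, AtomicInteger runs) {
        return MiniObservable.create(subscriber -> {
            runs.incrementAndGet();
            int value;
            try {
                value = Integer.parseInt(text);
            } catch (NumberFormatException e) {
                subscriber.onError(e);   // failure: a single terminal event
                return;
            }
            subscriber.onNext(value);    // success: the value...
            subscriber.onCompleted();    // ...followed by completion
        });
    }

    public static void main(String[] args) {
        AtomicInteger runs = new AtomicInteger();
        MiniObservable<Integer> source = parsed("42", runs);
        System.out.println("runs before subscribe: " + runs.get());
        Recorder<Integer> recorder = new Recorder<>();
        source.subscribe(recorder);
        System.out.println(recorder.events + ", runs after subscribe: " + runs.get());
    }
}
```

The run counter is there only to make the laziness visible: it stays at zero until subscribe() is called and increases once per subscriber.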
In the code above, the following are worth noticing:
- The Action - The create method receives an implementation of the Observable.OnSubscribe interface. This implementation defines what action will be taken when a subscriber subscribes to the Observable.
- The Action is lazy - The call method is called by the library each time a subscriber subscribes to the Observable. Till then the action is not executed, i.e. it is lazy.
- Events are pushed to the subscriber - The onNext, onError and onCompleted methods on Subscriber are used to push the events onto it. As per the Rx Design Guidelines, the events pushed from an Observable should follow the below rules:
  - Zero, one or more than one calls to onNext
  - Zero or only one call to either of onCompleted or onError
In all the different methods for creating an Observable that the RxJava library provides, an implementation of the Observable.OnSubscribe interface is created internally. We will be seeing these methods in the next section.
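The event rules listed above can be made concrete with a small guard that forwards events until the first terminal event and drops anything that arrives afterwards. This is only a sketch in plain Java: MiniSubscriber and Guarded are hypothetical stand-ins introduced for illustration, not RxJava classes.

```java
import java.util.ArrayList;
import java.util.List;

class ContractSketch {
    // Stand-in for a subscriber (hypothetical, not the RxJava class).
    interface MiniSubscriber<T> {
        void onNext(T value);
        void onError(Throwable error);
        void onCompleted();
    }

    // Forwards events until the first terminal event, then drops the rest.
    static final class Guarded<T> implements MiniSubscriber<T> {
        private final MiniSubscriber<T> downstream;
        private boolean terminated;

        Guarded(MiniSubscriber<T> downstream) {
            this.downstream = downstream;
        }

        @Override public void onNext(T value) {
            if (!terminated) downstream.onNext(value);
        }

        @Override public void onError(Throwable error) {
            if (terminated) return;
            terminated = true;
            downstream.onError(error);
        }

        @Override public void onCompleted() {
            if (terminated) return;
            terminated = true;
            downstream.onCompleted();
        }
    }

    // Sends a well-formed sequence followed by two events that break the rules.
    static List<String> demo() {
        final List<String> events = new ArrayList<>();
        MiniSubscriber<String> recorder = new MiniSubscriber<String>() {
            @Override public void onNext(String value) { events.add("onNext(" + value + ")"); }
            @Override public void onError(Throwable error) { events.add("onError"); }
            @Override public void onCompleted() { events.add("onCompleted"); }
        };
        MiniSubscriber<String> guarded = new Guarded<>(recorder);
        guarded.onNext("a");
        guarded.onNext("b");
        guarded.onCompleted();
        guarded.onNext("late");                              // dropped: sequence already ended
        guarded.onError(new IllegalStateException("late"));  // dropped: second terminal event
        return events;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```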
Scenarios for Creating an Observable
Grouping the different scenarios into categories helps in deciding which method to use for creating an Observable. In the following sections, I will be providing the scenarios and what methods are available in the library to help in each scenario.
An Observable That Emits a Single Value After Computation
Among all the scenarios, the one that you will come across the most is where you define a computation which returns a single value and then completes. Defining a function that makes an HTTP call to retrieve some information is an example of such a scenario. Such an action can be defined by using the Callable interface. RxJava provides a fromCallable factory method for creating an Observable from a Callable.
The code example which I have shown above emits only one value. We can replace it by the fromCallable method as shown below. The emission of value to the subscriber using onNext and either onCompleted or onError is handled by the library.
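A sketch of how such a factory can take over the event handling, again with plain-Java stand-ins so that it runs without the library. MiniObservable, MiniSubscriber and eventsOf() are hypothetical names, and the fromCallable here is a simplified model, not the library's implementation. In RxJava 1.x the real call is Observable.fromCallable(callable).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;

class FromCallableSketch {
    // Stand-in for a subscriber (hypothetical, not the RxJava class).
    interface MiniSubscriber<T> {
        void onNext(T value);
        void onError(Throwable error);
        void onCompleted();
    }

    static final class MiniObservable<T> {
        private final Callable<T> computation;

        private MiniObservable(Callable<T> computation) {
            this.computation = computation;
        }

        // The caller supplies only the computation, not the event handling.
        static <T> MiniObservable<T> fromCallable(Callable<T> computation) {
            return new MiniObservable<>(computation);
        }

        // Runs the computation and translates its outcome into events.
        void subscribe(MiniSubscriber<T> subscriber) {
            T value;
            try {
                value = computation.call();
            } catch (Exception e) {
                subscriber.onError(e);   // a thrown exception becomes onError
                return;
            }
            subscriber.onNext(value);    // the returned value becomes onNext
            subscriber.onCompleted();    // followed by onCompleted
        }
    }

    // Subscribes to the computation and returns the events it produced.
    static List<String> eventsOf(Callable<String> computation) {
        final List<String> events = new ArrayList<>();
        MiniObservable.fromCallable(computation).subscribe(new MiniSubscriber<String>() {
            @Override public void onNext(String value) { events.add("onNext(" + value + ")"); }
            @Override public void onError(Throwable error) { events.add("onError(" + error.getMessage() + ")"); }
            @Override public void onCompleted() { events.add("onCompleted"); }
        });
        return events;
    }

    public static void main(String[] args) {
        System.out.println(eventsOf(() -> "user-42"));
        System.out.println(eventsOf(() -> { throw new java.io.IOException("timeout"); }));
    }
}
```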
An Observable That Emits Multiple Values During Computation
In a few cases, more than one value is passed to the Subscriber from the Observable. An example is an HTTP call that retrieves a list when the list in the response is paginated. In this case, you will have to make multiple requests to retrieve the complete list and keep calling the Subscriber's onNext method until all the values have been emitted. In such a scenario Observable.create() can be used. However, creating an Observable using the create() factory method requires some advanced handling which might be difficult to write. The RxJava library has introduced SyncOnSubscribe and AsyncOnSubscribe to handle these difficulties.
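The pagination case can be sketched as an action that keeps requesting pages and calls onNext for every item until an empty page comes back. Everything here is an assumption made for illustration: MiniSubscriber is a plain-Java stand-in, and fetchPage() is a hypothetical in-memory service in place of the HTTP call. The sketch deliberately ignores unsubscription and backpressure, which is presumably the kind of handling that makes a hand-written create() action difficult.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

class PagedEmissionSketch {
    // Stand-in for a subscriber (hypothetical, not the RxJava class).
    interface MiniSubscriber<T> {
        void onNext(T value);
        void onError(Throwable error);
        void onCompleted();
    }

    // Hypothetical paginated service: three pages of data, then empty pages.
    static final List<List<String>> PAGES = Arrays.asList(
            Arrays.asList("a", "b"),
            Arrays.asList("c", "d"),
            Arrays.asList("e"));

    static List<String> fetchPage(int page) {
        return page < PAGES.size() ? PAGES.get(page) : Collections.<String>emptyList();
    }

    // The action: one request per page, one onNext per item, one terminal event.
    static void emitAllPages(MiniSubscriber<String> subscriber) {
        try {
            int page = 0;
            while (true) {
                List<String> items = fetchPage(page);
                if (items.isEmpty()) {
                    break;                  // no more data
                }
                for (String item : items) {
                    subscriber.onNext(item);
                }
                page++;
            }
        } catch (RuntimeException e) {
            subscriber.onError(e);          // a failure ends the sequence
            return;
        }
        subscriber.onCompleted();           // all pages were emitted
    }

    // Runs the action against a recording subscriber.
    static List<String> collect() {
        final List<String> events = new ArrayList<>();
        emitAllPages(new MiniSubscriber<String>() {
            @Override public void onNext(String value) { events.add("onNext(" + value + ")"); }
            @Override public void onError(Throwable error) { events.add("onError"); }
            @Override public void onCompleted() { events.add("onCompleted"); }
        });
        return events;
    }

    public static void main(String[] args) {
        System.out.println(collect());
    }
}
```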
An Observable Created by Combining Two or More Observables
There are scenarios where you have two or more observables and you want to combine the results from all of them before doing any further processing. An example is a server-side implementation where a request has to be processed by collecting information from two external services which can be run in parallel. In such a case you can start two computations using two Observables and combine their results by using a method like Observable.concat(). Note that concat() subscribes to the second Observable only after the first one completes, so Observable.merge() or Observable.zip() is the better fit when both sources should really run at the same time.
There are many other methods which help in combining observables. They differ in the way they combine the observables. Observable.amb(), Observable.combineLatest(), Observable.merge() and Observable.zip() are a few that are very useful in combining observables.
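To illustrate the two-services scenario without the library, the sketch below swaps in the JDK's CompletableFuture: thenCombine() waits for both results and pairs them, which is roughly what Observable.zip() does for two single-value Observables. It is an analogy under stated assumptions, not RxJava code, and fetchProfile() and fetchOrderCount() are hypothetical services.

```java
import java.util.concurrent.CompletableFuture;

class CombineSketch {
    // Hypothetical external services, replaced by trivial in-memory functions.
    static String fetchProfile(String userId) {
        return "profile:" + userId;
    }

    static int fetchOrderCount(String userId) {
        return userId.length();
    }

    // Starts both lookups concurrently and combines the two results.
    static String summary(String userId) {
        CompletableFuture<String> profile =
                CompletableFuture.supplyAsync(() -> fetchProfile(userId));
        CompletableFuture<Integer> orders =
                CompletableFuture.supplyAsync(() -> fetchOrderCount(userId));
        // The combining function runs only after both futures have completed.
        return profile
                .thenCombine(orders, (p, count) -> p + " orders=" + count)
                .join();
    }

    public static void main(String[] args) {
        System.out.println(summary("u7"));
    }
}
```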
Some Useful Pre-defined Observables
The RxJava library provides a few methods for pre-defined Observables. One such method is Observable.interval(). This observable emits sequential numbers at a specified time interval. Other such methods are Observable.empty(), Observable.never(), Observable.error(), Observable.just(), Observable.from(), Observable.timer() and Observable.range().
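The event sequences produced by some of these factories can be sketched with plain-Java stand-ins. The just(), range(), empty() and error() below are simplified models written for illustration on the hypothetical MiniObservable type, not the library methods themselves. The time-based factories, interval() and timer(), are left out because they need a scheduler.

```java
import java.util.ArrayList;
import java.util.List;

class FactorySketch {
    // Stand-ins (hypothetical, not the RxJava classes).
    interface MiniSubscriber<T> {
        void onNext(T value);
        void onError(Throwable error);
        void onCompleted();
    }

    interface MiniObservable<T> {
        void subscribe(MiniSubscriber<T> subscriber);
    }

    // Emits one value, then completes.
    static <T> MiniObservable<T> just(T value) {
        return s -> { s.onNext(value); s.onCompleted(); };
    }

    // Emits 'count' consecutive integers beginning at 'start', then completes.
    static MiniObservable<Integer> range(int start, int count) {
        return s -> {
            for (int i = 0; i < count; i++) {
                s.onNext(start + i);
            }
            s.onCompleted();
        };
    }

    // Completes immediately without emitting anything.
    static <T> MiniObservable<T> empty() {
        return s -> s.onCompleted();
    }

    // Signals the given error immediately.
    static <T> MiniObservable<T> error(Throwable failure) {
        return s -> s.onError(failure);
    }

    // Subscribes to the source and returns the events it produced.
    static <T> List<String> eventsOf(MiniObservable<T> source) {
        final List<String> events = new ArrayList<>();
        source.subscribe(new MiniSubscriber<T>() {
            @Override public void onNext(T value) { events.add("onNext(" + value + ")"); }
            @Override public void onError(Throwable error) { events.add("onError(" + error.getMessage() + ")"); }
            @Override public void onCompleted() { events.add("onCompleted"); }
        });
        return events;
    }

    public static void main(String[] args) {
        System.out.println(eventsOf(just("x")));
        System.out.println(eventsOf(range(3, 3)));
        System.out.println(eventsOf(FactorySketch.<String>empty()));
        System.out.println(eventsOf(FactorySketch.<String>error(new RuntimeException("boom"))));
    }
}
```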
An Observable From Applying Operator on an Instance of Observable
All the methods that we have seen for Observable creation till now are static factory methods on Observable. There is another group of methods which are defined on instances of an Observable. These are called operators. Whenever an operator is applied to an instance, a new instance of Observable, chained to the previous one, is returned. Internally all the operators use the lift function to do the chaining. I will be writing about operators in detail in other blog posts in this series.
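The chaining can be sketched as follows: an operator turns the downstream subscriber into the subscriber that is handed to the upstream Observable, and lift wraps that step in a new Observable. This is a simplified plain-Java model built on assumptions. MiniObservable, MiniOperator and MiniSubscriber are hypothetical stand-ins, and the lift and map shown here only mimic the shape of the RxJava 1.x methods of the same name.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;

class OperatorSketch {
    // Stand-ins (hypothetical, not the RxJava classes).
    interface MiniSubscriber<T> {
        void onNext(T value);
        void onError(Throwable error);
        void onCompleted();
    }

    // An operator maps the downstream subscriber to the one handed upstream.
    interface MiniOperator<R, T> {
        MiniSubscriber<T> call(MiniSubscriber<R> downstream);
    }

    static final class MiniObservable<T> {
        private final Consumer<MiniSubscriber<T>> action;

        private MiniObservable(Consumer<MiniSubscriber<T>> action) {
            this.action = action;
        }

        static <T> MiniObservable<T> create(Consumer<MiniSubscriber<T>> action) {
            return new MiniObservable<>(action);
        }

        void subscribe(MiniSubscriber<T> subscriber) {
            action.accept(subscriber);
        }

        // Returns a NEW observable; subscribing to it subscribes to this one
        // with the subscriber produced by the operator.
        <R> MiniObservable<R> lift(MiniOperator<R, T> operator) {
            return create(downstream -> subscribe(operator.call(downstream)));
        }

        // A map operator expressed through lift.
        <R> MiniObservable<R> map(Function<T, R> mapper) {
            return lift(downstream -> new MiniSubscriber<T>() {
                @Override public void onNext(T value) { downstream.onNext(mapper.apply(value)); }
                @Override public void onError(Throwable error) { downstream.onError(error); }
                @Override public void onCompleted() { downstream.onCompleted(); }
            });
        }
    }

    // Applies map to a two-item source and records what the subscriber sees.
    static List<String> demo() {
        MiniObservable<Integer> source = MiniObservable.create(s -> {
            s.onNext(1);
            s.onNext(2);
            s.onCompleted();
        });
        MiniObservable<String> labelled = source.map(n -> "#" + n);
        final List<String> events = new ArrayList<>();
        labelled.subscribe(new MiniSubscriber<String>() {
            @Override public void onNext(String value) { events.add("onNext(" + value + ")"); }
            @Override public void onError(Throwable error) { events.add("onError"); }
            @Override public void onCompleted() { events.add("onCompleted"); }
        });
        return events;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```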
Summary
There are many methods that are provided by the RxJava library for Observable creation. Using the mental map of scenarios can help in deciding which method to use in a particular scenario.
Published at DZone with permission of Praveer Gupta. See the original article here.