Over a million developers have joined DZone.
{{announcement.body}}
{{announcement.title}}

Creating an Observable in RxJava

DZone's Guide to

Creating an Observable in RxJava

In RxJava, Observables are at the center. These data sources can be created in a number of ways. Here's a breakdown of creating Observables in RxJava.

· Java Zone
Free Resource

Learn how to troubleshoot and diagnose some of the most common performance issues in Java today. Brought to you in partnership with AppDynamics.

An Observable is the heart of RxJava. It is the source of data or events in reactive programming. RxJava provides many methods in its library to create an Observable. Choosing which one to use can be difficult. My goal from this article is to help you in making this choice simpler by providing you with a mental map of different scenarios and which methods to use in each scenario.

The code samples for this article can be found here. This article is second in the series of RxJava articles that I am writing. Part 1 can be found here.

Observable and OnSubscribe

Think of Observable as an action which starts getting executed as soon as a Subscriber subscribes to it. During the execution of this action, data/events are generated and passed on to the subscriber. Let’s try to understand it by an example. I will be using Observable.create() method to create an Observable.

// Note that below code is not optimal but it helps in demonstration of concepts
// A better version is shown in the next section
Observable.create(new Observable.OnSubscribe<String>() {
    @Override
    public void call(Subscriber<? super String> subscriber) {
        try {
            String result = doSomeTimeTakingIoOperation();
            subscriber.onNext(result);    // Pass on the data to subscriber
            subscriber.onCompleted();     // Signal about the completion subscriber
        } catch (Exception e) {
            subscriber.onError(e);        // Signal about the error to subscriber
        }
    }
});

In the code above, the following are worth noticing:

  • The Action - The create method receives an implementation of Observable.OnSubscribe interface. This implementation defines what action will be taken when a subscriber subscribes to the Observable.
  • The Action is lazy - The call method is called by library each time a subscriber subscribes to the Observable. Till then the action is not executed, i.e. it is lazy.
  • Events are pushed to the subscriber - onNextonError and onCompleted methods on Subscriber are used to push the events onto it. As per Rx Design Guidelines, the events pushed from Observable should follow the below rules:
    • Zero, one or more than one calls to onNext
    • Zero or only one call to either of onCompleted or onError

In all the different methods for creation of Observable that RxJava library provides, an implementation of theObservable.OnSubscribe interface is created internally. We will be seeing these methods in the next section.

Scenarios for Creating an Observable

Grouping the different scenarios into categories helps in deciding which method to use for creating an Observable. In the following sections, I will be providing the scenarios and what methods are available in the library to help in each scenario.

An Observable That Emits a Single Value After Computation

Among all the scenarios, the one that you will come across the most is where you define a computation which returns a single value and then completes. Defining a function that makes an HTTP call to retrieve some information is an example of such a scenario. Such an action can be defined by using the Callable interface. RxJava provides a fromCallable factory method for creating an Observable from a Callable.

The code example which I have shown above emits only one value. We can replace it by the fromCallablemethod as shown below. The emission of value to the subscriber using onNext and either onCompleted or onError is handled by the library.

Observable.fromCallable(new Callable<String>() {
    @Override
    public String call() throws Exception {
        return doSomeTimeTakingIoOperation();
    }
});

An Observable That Emits Multiple Values During Computation

In a few cases, more than one value is passed to the Subscriber from the Observer. An example can be making an HTTP call to retrieve a list but the list in the response is paginated. In this case, you will have to make multiple requests to retrieve the complete list and keep on calling Subscriber’s onNext method multiple times till all the values have been emitted. In such a scenario Observable.create() can be used. However creating an Observable using the create() factory method requires some advanced handling which might be difficult to write. RxJava library has introduced SyncOnSubscribe and AsyncOnSubscribe to handle the difficulties.

An Observable Created by Combining Two or More Observables

There are scenarios where you have two or more observables and you would want to combine the result from all of them before doing any further processing. An example of this can be on a server side implementation where a request has to be processed by collecting information from two external services which can be run in parallel. In such a case you can start two parallel computations using two Observables and combine the results of each by using a method like Observable.concat().

There are many other methods which help in combining observables. Their differences among them are in the way they combine the observables. Observable.amb()Observable.combineLatest()Observable.merge() and Observable.zip() are few that are very useful in combining observables.

Some Useful Pre-defined Observables

The RxJava library provides few methods for pre-defined Observables. One such method is Observable.interval(). This observable emits a sequential number every specified interval of time. Other such methods are Observable.empty()Observable.never()Observable.error()Observable.just()Observable.from()Observable.timer() and Observable.range().

An Observable From Applying Operator on an Instance of Observable

All the methods that we have seen for Observable creation till now are static factory methods on Observable. There is another group of methods which are on instances of an Observable. These are called operators. Whenever an operator is applied in an instance, a new instance of Observable, chained to the previous one, is provided. Internally all the operators use lift function to do the chaining. I will be writing about operators in detail in other blog posts in this series.

Summary

There are many methods that are provided by the RxJava library for Observable creation. Using the mental map of scenarios can help in deciding which method to use in a particular scenario.

Understand the needs and benefits around implementing the right monitoring solution for a growing containerized market. Brought to you in partnership with AppDynamics.

Topics:
rxjava ,java ,reactive programming ,reactive extensions ,reactive streams ,observable ,functional programing

Published at DZone with permission of Praveer Gupta. See the original article here.

Opinions expressed by DZone contributors are their own.

The best of DZone straight to your inbox.

SEE AN EXAMPLE
Please provide a valid email address.

Thanks for subscribing!

Awesome! Check your inbox to verify your email so you can start receiving the latest in tech news and resources.
Subscribe

{{ parent.title || parent.header.title}}

{{ parent.tldr }}

{{ parent.urlSource.name }}