DZone
Thanks for visiting DZone today,
Edit Profile
  • Manage Email Subscriptions
  • How to Post to DZone
  • Article Submission Guidelines
Sign Out View Profile
  • Post an Article
  • Manage My Drafts
Over 2 million developers have joined DZone.
Log In / Join
Please enter at least three characters to search
Refcards Trend Reports
Events Video Library
Refcards
Trend Reports

Events

View Events Video Library

Zones

Culture and Methodologies Agile Career Development Methodologies Team Management
Data Engineering AI/ML Big Data Data Databases IoT
Software Design and Architecture Cloud Architecture Containers Integration Microservices Performance Security
Coding Frameworks Java JavaScript Languages Tools
Testing, Deployment, and Maintenance Deployment DevOps and CI/CD Maintenance Monitoring and Observability Testing, Tools, and Frameworks
Culture and Methodologies
Agile Career Development Methodologies Team Management
Data Engineering
AI/ML Big Data Data Databases IoT
Software Design and Architecture
Cloud Architecture Containers Integration Microservices Performance Security
Coding
Frameworks Java JavaScript Languages Tools
Testing, Deployment, and Maintenance
Deployment DevOps and CI/CD Maintenance Monitoring and Observability Testing, Tools, and Frameworks

Because the DevOps movement has redefined engineering responsibilities, SREs now have to become stewards of observability strategy.

Apache Cassandra combines the benefits of major NoSQL databases to support data management needs not covered by traditional RDBMS vendors.

The software you build is only as secure as the code that powers it. Learn how malicious code creeps into your software supply chain.

Generative AI has transformed nearly every industry. How can you leverage GenAI to improve your productivity and efficiency?

Related

  • While Performing Dependency Selection, I Avoid the Loss Of Sleep From Node.js Libraries' Dangers
  • Understanding ldd: The Linux Dynamic Dependency Explorer
  • Practical Use of Weak Symbols
  • Python and Open-Source Libraries for Efficient PDF Management

Trending

  • Introducing Graph Concepts in Java With Eclipse JNoSQL
  • The Evolution of Scalable and Resilient Container Infrastructure
  • How To Introduce a New API Quickly Using Quarkus and ChatGPT
  • Building a Real-Time Audio Transcription System With OpenAI’s Realtime API

Creating an Observable in RxJava

In RxJava, Observables are at the center. These data sources can be created in a number of ways. Here's a breakdown of creating Observables in RxJava.

By 
Praveer Gupta user avatar
Praveer Gupta
·
Feb. 25, 16 · Analysis
Likes (17)
Comment
Save
Tweet
Share
40.1K Views

Join the DZone community and get the full member experience.

Join For Free

An Observable is the heart of RxJava. It is the source of data or events in reactive programming. RxJava provides many methods in its library to create an Observable. Choosing which one to use can be difficult. My goal from this article is to help you in making this choice simpler by providing you with a mental map of different scenarios and which methods to use in each scenario.

The code samples for this article can be found here. This article is second in the series of RxJava articles that I am writing. 

Observable and OnSubscribe

Think of Observable as an action which starts getting executed as soon as a Subscriber subscribes to it. During the execution of this action, data/events are generated and passed on to the subscriber. Let’s try to understand it by an example. I will be using Observable.create() method to create an Observable.

// Note that below code is not optimal but it helps in demonstration of concepts
// A better version is shown in the next section
Observable.create(new Observable.OnSubscribe<String>() {
    @Override
    public void call(Subscriber<? super String> subscriber) {
        try {
            String result = doSomeTimeTakingIoOperation();
            subscriber.onNext(result);    // Pass on the data to subscriber
            subscriber.onCompleted();     // Signal about the completion subscriber
        } catch (Exception e) {
            subscriber.onError(e);        // Signal about the error to subscriber
        }
    }
});

In the code above, the following are worth noticing:

  • The Action - The create method receives an implementation of Observable.OnSubscribe interface. This implementation defines what action will be taken when a subscriber subscribes to the Observable.
  • The Action is lazy - The call method is called by library each time a subscriber subscribes to the Observable. Till then the action is not executed, i.e. it is lazy.
  • Events are pushed to the subscriber - onNext, onError and onCompleted methods on Subscriber are used to push the events onto it. As per Rx Design Guidelines, the events pushed from Observable should follow the below rules:
    • Zero, one or more than one calls to onNext
    • Zero or only one call to either of onCompleted or onError

In all the different methods for creation of Observable that RxJava library provides, an implementation of theObservable.OnSubscribe interface is created internally. We will be seeing these methods in the next section.

Scenarios for Creating an Observable

Grouping the different scenarios into categories helps in deciding which method to use for creating an Observable. In the following sections, I will be providing the scenarios and what methods are available in the library to help in each scenario.

An Observable That Emits a Single Value After Computation

Among all the scenarios, the one that you will come across the most is where you define a computation which returns a single value and then completes. Defining a function that makes an HTTP call to retrieve some information is an example of such a scenario. Such an action can be defined by using the Callable interface. RxJava provides a fromCallable factory method for creating an Observable from a Callable.

The code example which I have shown above emits only one value. We can replace it by the fromCallablemethod as shown below. The emission of value to the subscriber using onNext and either onCompleted or onError is handled by the library.

Observable.fromCallable(new Callable<String>() {
    @Override
    public String call() throws Exception {
        return doSomeTimeTakingIoOperation();
    }
});

An Observable That Emits Multiple Values During Computation

In a few cases, more than one value is passed to the Subscriber from the Observer. An example can be making an HTTP call to retrieve a list but the list in the response is paginated. In this case, you will have to make multiple requests to retrieve the complete list and keep on calling Subscriber’s onNext method multiple times till all the values have been emitted. In such a scenario Observable.create() can be used. However creating an Observable using the create() factory method requires some advanced handling which might be difficult to write. RxJava library has introduced SyncOnSubscribe and AsyncOnSubscribe to handle the difficulties.

An Observable Created by Combining Two or More Observables

There are scenarios where you have two or more observables and you would want to combine the result from all of them before doing any further processing. An example of this can be on a server side implementation where a request has to be processed by collecting information from two external services which can be run in parallel. In such a case you can start two parallel computations using two Observables and combine the results of each by using a method like Observable.concat().

There are many other methods which help in combining observables. Their differences among them are in the way they combine the observables. Observable.amb(), Observable.combineLatest(), Observable.merge() and Observable.zip() are few that are very useful in combining observables.

Some Useful Pre-defined Observables

The RxJava library provides few methods for pre-defined Observables. One such method is Observable.interval(). This observable emits a sequential number every specified interval of time. Other such methods are Observable.empty(), Observable.never(), Observable.error(), Observable.just(), Observable.from(), Observable.timer() and Observable.range().

An Observable From Applying Operator on an Instance of Observable

All the methods that we have seen for Observable creation till now are static factory methods on Observable. There is another group of methods which are on instances of an Observable. These are called operators. Whenever an operator is applied in an instance, a new instance of Observable, chained to the previous one, is provided. Internally all the operators use lift function to do the chaining. I will be writing about operators in detail in other blog posts in this series.

Summary

There are many methods that are provided by the RxJava library for Observable creation. Using the mental map of scenarios can help in deciding which method to use in a particular scenario.

Library

Published at DZone with permission of Praveer Gupta. See the original article here.

Opinions expressed by DZone contributors are their own.

Related

  • While Performing Dependency Selection, I Avoid the Loss Of Sleep From Node.js Libraries' Dangers
  • Understanding ldd: The Linux Dynamic Dependency Explorer
  • Practical Use of Weak Symbols
  • Python and Open-Source Libraries for Efficient PDF Management

Partner Resources

×

Comments
Oops! Something Went Wrong

The likes didn't load as expected. Please refresh the page and try again.

ABOUT US

  • About DZone
  • Support and feedback
  • Community research
  • Sitemap

ADVERTISE

  • Advertise with DZone

CONTRIBUTE ON DZONE

  • Article Submission Guidelines
  • Become a Contributor
  • Core Program
  • Visit the Writers' Zone

LEGAL

  • Terms of Service
  • Privacy Policy

CONTACT US

  • 3343 Perimeter Hill Drive
  • Suite 100
  • Nashville, TN 37211
  • support@dzone.com

Let's be friends:

Likes
There are no likes...yet! 👀
Be the first to like this post!
It looks like you're not logged in.
Sign in to see who liked this post!