
Go Reactive With RxJava


Dive into RxJava with lessons on Observers, Observables, and tips for organizing, filtering, and grouping your data with an example DZone search service.


This article came from our Bounty Board, where you can find prompts on in-demand topics for each of our zones. If you like writing about software development for prizes, come check it out!

In the 21st century, every good application must support the following factors.

  1. Responsiveness

  2. Elasticness

  3. Resilience

These three terms are not new.

Since the birth of programming, programmers have paid attention to these three pillars. Let's see what we mean by these terms.

Responsiveness

We want our application to communicate with clients seamlessly and to keep the client engaged with our system at all times. Suppose you are searching DZone with the Java keyword. What would you expect? To wait for 10 minutes until DZone gives you the search results as a whole? Or for DZone to return data in chunks, so that while you are reading the first chunk, it returns the second chunk, and so on? Obviously, we want the second option, so the crux is to always give the client something rather than nothing.

Elasticness

Our system should function as usual under heavy load. Suppose you search DZone with the IoT keyword and get your results, but when you search for Java, you get an unexpected error because Java returns a huge result set that can't be handled. Or suppose it's the middle of the day, a huge number of readers are logged into DZone, and it can't show them results because of the number of users. Of course, you want to build a system that is equally responsive under high load.

Resilience

There should be no single point of failure. Say there is a search service that is responsible for searching the DZone database. If that search service goes down, users cannot search at all, which is a very bad experience. As a solution, there should be more than one search service instance so the system stays responsive. If one instance goes down, the others take care of searches.

These are the essential parts of the Reactive Manifesto. Reactive programming is all about handling event-based reactive streams. RxJava offers those, so it is high time to get our hands dirty with RxJava.

RxJava Configuration

To incorporate RxJava, we just need to add the following entry to our pom.xml:

<dependency>
    <groupId>io.reactivex.rxjava2</groupId>
    <artifactId>rxjava</artifactId>
    <version>2.1.0</version>
</dependency>


How RxJava Works

In this exercise, we will try to build a demo search application for DZone. Suppose DZone has a search service that returns all docs from the DZone database. Gradually, we will apply some best practices related to reactive programming.

Let's see the program first.

First, we will try to mock the DZone database.

package com.example.reactive.dzone;

public class DzoneDBDao {

    private static DzoneDBDao service = new DzoneDBDao();

    public static DzoneDBDao get() {
        return service;
    }
    DZoneDoc[] getAllDocFromDB() {
        return produceDocs();
    }

    private DZoneDoc[] produceDocs() {
        DZoneDoc[] array = {
            DZoneDoc.create("Java Microservice", "Refcardz"),
            DZoneDoc.create("RX Java", "Article"),
            DZoneDoc.create("IOT in Action", "Refcardz"),
            DZoneDoc.create("Java8 in Action", "Refcardz"),
        };
        return array;

    }

}


Here the produceDocs method returns an array of DZone doc objects. In our application, we will use this as a mock database.

The DZone doc looks like: 

package com.example.reactive.dzone;

public class DZoneDoc {
    private String name;
    private String type;

    private DZoneDoc() {}

    public static DZoneDoc create(String name, String type) {
        DZoneDoc doc = new DZoneDoc();
        doc.setName(name);
        doc.setType(type);
        return doc;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public String getType() {
        return type;
    }

    public void setType(String type) {
        this.type = type;
    }

    @Override
    public String toString() {
        return "DZoneDoc [name=" + name + ", type=" + type + "]";
    }

}


Now we will create a service that returns DZone docs in chunks:

package com.example.reactive.dzone;

import io.reactivex.Observable;

public class DZoneSearchService {

    Observable<DZoneDoc> getAllDocs() {
        return Observable.fromArray(DzoneDBDao.get().getAllDocFromDB());
    }

}


Our service class is not a conventional service class. Here, getAllDocs returns an Observable of DZoneDoc.

Welcome to RxJava! Observable is an abstract class of RxJava — and its most important concept.

Observable

Observable is an object that fetches data from a data source and emits it to interested parties. It decouples the data source from the data consumer. One of the great features of an Observable is that it emits each piece of data as soon as it is fetched, so consumers get notified multiple times, unlike a conventional call that returns only once all the data has been fetched.

Observables push data to the consumer when it arrives. The data source can be of two types: finite or infinite. For example, DZone search results are finite, but stock updates from a third-party API are infinite, as stock values change every now and then. For finite data sources, an Observable fetches and emits data until the source ends or an error occurs. For infinite streams, it keeps fetching and emitting data until an error occurs. Observable is lazy in nature: an Observable like the one in our example does nothing until an Observer subscribes to it.
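
The laziness described above can be illustrated without RxJava. The following is a minimal plain-Java sketch, not RxJava's implementation: LazySource and its subscribe method are hypothetical names made up for this illustration. The data source is not touched when the source object is created, only when a consumer subscribes, and the items are then pushed to the consumer one at a time.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Supplier;

public class LazySourceSketch {

    // Hypothetical stand-in for an Observable; not part of RxJava.
    static final class LazySource<T> {
        private final Supplier<T[]> fetch;

        LazySource(Supplier<T[]> fetch) {
            this.fetch = fetch; // only remembers how to fetch, fetches nothing yet
        }

        // The data source is touched here, when someone subscribes.
        void subscribe(Consumer<T> consumer) {
            for (T item : fetch.get()) {
                consumer.accept(item); // push items to the consumer one at a time
            }
        }
    }

    // Returns the order of events so the laziness is visible.
    public static List<String> run() {
        List<String> events = new ArrayList<>();
        LazySource<String> source = new LazySource<>(() -> {
            events.add("fetch");
            return new String[] {"Java Microservice", "RX Java"};
        });
        events.add("created"); // the source exists, but no fetch has happened
        source.subscribe(name -> events.add("got " + name));
        return events;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

Running main prints "created" before "fetch", which shows that creating the source and reading the data are separate steps.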

Here in our program, DzoneDBDao's getAllDocFromDB method acts as the source of data. In our DZone search service, we pass the array of DZone docs from that source into a static method called fromArray. This method takes an array as input and produces an Observable whose generic type is inferred from the array element type. As the array contains DZoneDoc objects, it returns an Observable<DZoneDoc>.

Observable.fromArray(DzoneDBDao.get().getAllDocFromDB());


There is another important concept in RxJava called an Observer.

Observer

An Observer is an object that wants to get notified when the state of the object it is observing changes, so it can take some action based on that.

So an Observer should subscribe to an Observable beforehand to get notified.

Now, an Observer wants to take an action when there are changes in the data source. There can be three possible courses of action, so the Observer interface provides three callback methods for them. The Observable calls these methods, depending on the situation, to carry out the strategy given by the Observer. (In RxJava 2, the Observer interface also declares onSubscribe, which is called once when the subscription starts.)

  • onNext: This method is invoked each time data or an event is emitted. The Observable fetches data from the data source and then emits it. At that point, the onNext callback is invoked, so the Observer acts on the data. Note that onNext is called for each emitted item.

  • onError: If there is an error while fetching data, the onError callback is called so the Observer can handle the error in an appropriate manner.

  • onComplete: onComplete (named onCompleted in RxJava 1.x) is called when the Observable has emitted all the data from a data source.
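
The callback sequence described above can be sketched in plain Java. This is an illustration under stated assumptions, not RxJava's code: SimpleObserver and emitAll are hypothetical names, and the null check is just a simple way to provoke an error. The point is the contract: zero or more onNext calls, followed by exactly one terminal call, either onError or onComplete.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class ObserverContractSketch {

    // Hypothetical interface mirroring the three callbacks described above.
    interface SimpleObserver<T> {
        void onNext(T item);
        void onError(Throwable error);
        void onComplete();
    }

    // Pushes every item to the observer, then signals completion.
    // If anything goes wrong, it signals the error instead and stops emitting.
    static <T> void emitAll(Iterable<T> items, SimpleObserver<T> observer) {
        try {
            for (T item : items) {
                if (item == null) {
                    throw new IllegalStateException("null item in source");
                }
                observer.onNext(item);
            }
        } catch (RuntimeException e) {
            observer.onError(e);
            return; // no onComplete after an error
        }
        observer.onComplete();
    }

    // Records which callbacks fire, in order, for a given input.
    public static List<String> record(List<String> items) {
        List<String> calls = new ArrayList<>();
        emitAll(items, new SimpleObserver<String>() {
            @Override public void onNext(String item) { calls.add("onNext(" + item + ")"); }
            @Override public void onError(Throwable error) { calls.add("onError(" + error.getMessage() + ")"); }
            @Override public void onComplete() { calls.add("onComplete"); }
        });
        return calls;
    }

    public static void main(String[] args) {
        System.out.println(record(Arrays.asList("RX Java", "IOT in Action")));
        System.out.println(record(Arrays.asList("RX Java", null, "IOT in Action")));
    }
}
```

The first call ends with onComplete. The second stops at the faulty item and ends with onError, so "IOT in Action" is never delivered.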

Now that we have a fair understanding of what Observable and Observer are, we will write an Observer that takes action on each DZone doc.

Say we write a DZone UI that shows all the DZone docs. So the action is that it displays the elements when they arrive. Let's see the code:

package com.example.reactive.dzone;

public class DZoneUI {

    private DZoneSearchService dzoneService = new DZoneSearchService();
    public void printAllDocs() {
        dzoneService.getAllDocs().subscribe(System.out::println); //dzonedocs
    }
    public static void main(String[] args) {

        DZoneUI UI = new DZoneUI();
        UI.printAllDocs();
    }

}


Hooray! We incorporated Reactive Programming in our DZone search service.

Output:

DZoneDoc [name=Java Microservice, type=Refcardz]
DZoneDoc [name=RX Java, type=Article]
DZoneDoc [name=IOT in Action, type=Refcardz]
DZoneDoc [name=Java8 in Action, type=Refcardz]


Note: In a real-world application, the search service would be exposed as a Reactive REST API. To implement RxJava with a REST API, we need the help of the Retrofit Project, or we can use Spring 5 Reactive Web.

To focus only on RxJava, I am skipping that here, but in a later article, I will show you how to implement a Reactive REST API.

I believe at this point we are comfortable with the basic operation of RxJava. Now I will add some tips that are very important while coding with RxJava.

Using Map Functions as Adapters

Sometimes you will face a situation where, from a service, you get a data structure that does not fit into your client. You need to transform that data structure to a client-specific data structure. That is very common in RxJava. When you try to show data in a UI, the data format always changes as per business requirements, so it is not possible to publish an API that exposes data in all formats. The solution is to transform the data on the fly.

Going back to our application, in our UI we show all the documents, both the names of the documents and types of the documents. But in another section of that UI, I just need to print the name of the document. Let's see how we can achieve this through a map function.

package com.example.reactive.dzone;

public class DZoneUI {

    private DZoneSearchService dzoneService = new DZoneSearchService();

    public void printAllDocsName() {
        dzoneService.getAllDocs().map(doc -> doc.getName()).subscribe(System.out::println); // doc names only
    }

    public static void main(String[] args) {

        DZoneUI UI = new DZoneUI();
        UI.printAllDocsName();
    }

}


Here, I pass a lambda expression to the map function, which applies it to each DZoneDoc, takes only the name from the doc, and returns an Observable<String> to which the UI subscribes. A function like map is called an Operator in RxJava.

Dynamic Filtering

Sometimes we need to filter some data based on some runtime criteria. With filter functions, we can achieve that. Please note that we can use a series of filters to further refine results. Filter functions take a predicate and return an Observable.

Suppose we need to print only Java-related Refcardz from all documents. Here is the code:

package com.example.reactive.dzone;

public class DZoneUI {

    private DZoneSearchService dzoneService = new DZoneSearchService();

    public void printJavaRefCardz() {
        dzoneService.getAllDocs()
            .filter(doc -> "Refcardz".equalsIgnoreCase(doc.getType())) // keep only Refcardz
            .filter(doc -> doc.getName().contains("Java")) // then keep only Java titles
            .subscribe(System.out::println);
    }

    public static void main(String[] args) {

        DZoneUI UI = new DZoneUI();
        UI.printJavaRefCardz();
    }

}


Here, I apply two filter functions consecutively. First, I keep only the Refcardz, then apply another filter to find the Refcardz whose titles contain the word Java. So, multiple filter functions act as a filter chain pattern. Each filter applies its predicate and produces an Observable on which the next filter acts. So, it is the same as piping.

Grouping Results on the Fly

Sometimes we need a more advanced data structure. Suppose we get data, but then we need to segregate or group that data based on some dynamic criteria. We can achieve that with the groupBy function.

Suppose we want to group all the docs based on the subject:

package com.example.reactive.dzone;

import io.reactivex.observables.GroupedObservable;

public class DZoneUI {

    private DZoneSearchService dzoneService = new DZoneSearchService();

    public void printGroupwiseDoc() {
        dzoneService.getAllDocs().groupBy(doc -> groupByName(doc))
            .subscribe(group -> group.subscribe(doc -> showDocs(group, doc)));
    }

    // Computes the group key from the title of the doc.
    private String groupByName(DZoneDoc doc) {
        return doc.getName().contains("Java") ? "JAVA" : "IOT";
    }

    // Prints the doc prefixed with the key of the group it belongs to.
    private void showDocs(GroupedObservable<String, DZoneDoc> observable, DZoneDoc currentDoc) {
        if ("JAVA".equalsIgnoreCase(observable.getKey())) {
            System.out.println("JAVA::" + currentDoc);
        } else {
            System.out.println("IOT::" + currentDoc);
        }
    }


    public static void main(String[] args) {

        DZoneUI UI = new DZoneUI();
        UI.printGroupwiseDoc();
    }

}


Here in the printGroupwiseDoc() method, I group the documents based on the title. The groupBy function returns an Observable that emits grouped Observables, and each of those emits the DZone docs that belong to its group. Here, two grouped Observables are created, as we only segregate the docs into two groups based on the title: Java and IOT. Then we subscribe to each grouped Observable and take the necessary action on its docs, which here means printing each doc with its group name.

Output:

JAVA::DZoneDoc [name=Java Microservice, type=Refcardz]
JAVA::DZoneDoc [name=RX Java, type=Article]
IOT::DZoneDoc [name=IOT in Action, type=Refcardz]
JAVA::DZoneDoc [name=Java8 in Action, type=Refcardz]
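
The idea of creating groups on the fly can also be sketched in plain Java. This is only an illustration, and groupOnTheFly is a hypothetical helper name: RxJava's groupBy emits grouped Observables as items arrive, whereas this sketch collects the items into a map. What the two share is that a group comes into existence the first time its key appears.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

public class GroupBySketch {

    // Routes each item to the bucket for its key, creating the bucket the
    // first time that key is seen. The LinkedHashMap keeps creation order.
    public static <K, T> Map<K, List<T>> groupOnTheFly(List<T> items, Function<T, K> keyOf) {
        Map<K, List<T>> groups = new LinkedHashMap<>();
        for (T item : items) {
            groups.computeIfAbsent(keyOf.apply(item), key -> new ArrayList<>()).add(item);
        }
        return groups;
    }

    // Groups the titles from the mock database with the same rule as groupByName.
    public static Map<String, List<String>> demo() {
        List<String> titles = Arrays.asList(
            "Java Microservice", "RX Java", "IOT in Action", "Java8 in Action");
        return groupOnTheFly(titles, title -> title.contains("Java") ? "JAVA" : "IOT");
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

With the four titles from the mock database, the JAVA group is created by the first title and the IOT group by the third.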


A Few Common Mistakes

  1. RxJava has a fluent API design, so handle errors properly in your code; otherwise, an unhandled error could break the method chain.

  2. While filtering, think about the sequence. Choose the order of the filters so that execution is optimal. Say there are more Refcardz than Java materials. It would then be optimal to filter the Java materials first and apply the Refcardz filter afterward, because the second filter then has fewer items to examine.

  3. RxJava is non-blocking in nature. Never call blocking operators such as blockingFirst() (reached through toBlocking() in RxJava 1.x) to block or wait for the response.
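
The second point can be made concrete by counting predicate evaluations. The sketch below uses java.util.stream rather than RxJava so that it runs with no extra dependency; the counting argument is the same for any pipeline that processes items one by one. The Doc class and the five documents are hypothetical data chosen so that Refcardz outnumber Java titles.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Predicate;

public class FilterOrderSketch {

    // Hypothetical minimal document type for this sketch.
    static final class Doc {
        final String name;
        final String type;
        Doc(String name, String type) { this.name = name; this.type = type; }
    }

    // Hypothetical data: four Refcardz, only one of them about Java.
    static final List<Doc> DOCS = Arrays.asList(
        new Doc("Java Microservice", "Refcardz"),
        new Doc("IOT in Action", "Refcardz"),
        new Doc("Docker Basics", "Refcardz"),
        new Doc("Git Essentials", "Refcardz"),
        new Doc("Kotlin Tips", "Article"));

    static final Predicate<Doc> IS_REFCARD = doc -> "Refcardz".equalsIgnoreCase(doc.type);
    static final Predicate<Doc> IS_JAVA = doc -> doc.name.contains("Java");

    // Runs both filters in the given order and returns the total number
    // of predicate evaluations that were needed.
    public static int evaluations(Predicate<Doc> first, Predicate<Doc> second) {
        AtomicInteger calls = new AtomicInteger();
        DOCS.stream()
            .filter(doc -> { calls.incrementAndGet(); return first.test(doc); })
            .filter(doc -> { calls.incrementAndGet(); return second.test(doc); })
            .forEach(doc -> { });
        return calls.get();
    }

    public static int refcardzFirst() { return evaluations(IS_REFCARD, IS_JAVA); }

    public static int javaFirst() { return evaluations(IS_JAVA, IS_REFCARD); }

    public static void main(String[] args) {
        System.out.println("Refcardz filter first: " + refcardzFirst() + " evaluations");
        System.out.println("Java filter first: " + javaFirst() + " evaluations");
    }
}
```

With this data, the Refcardz-first order needs 9 evaluations (5 for the first filter, 4 for the second), while the Java-first order needs 6 (5 plus 1). Both orders produce the same result.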

Conclusion

What we saw is just the tip of the iceberg. With Rx, we can do a lot more than what we saw. It can handle backpressure, where a client signals that it can't receive more events right now, saving us from data flooding (in RxJava 2, backpressure is supported by the Flowable type rather than by Observable). It also embraces functional programming and the revolutionary idea of sending data chunks as events. Sometimes, for complex logic, the code gets so terse that debugging becomes a problem, but apart from that, Rx can deal with huge amounts of data in a hassle-free manner.
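
The backpressure signal mentioned above can be sketched with the JDK's own java.util.concurrent.Flow API, which assumes Java 9 or later and is used here only as a dependency-free stand-in for RxJava. OneAtATime is a hypothetical subscriber name. It asks for a single item at a time through request(1), so the publisher never hands it more than it has asked for.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

public class BackpressureSketch {

    // Subscriber that requests one item at a time.
    static final class OneAtATime implements Flow.Subscriber<String> {
        final List<String> events = Collections.synchronizedList(new ArrayList<>());
        final CountDownLatch done = new CountDownLatch(1);
        private Flow.Subscription subscription;

        @Override public void onSubscribe(Flow.Subscription subscription) {
            this.subscription = subscription;
            subscription.request(1); // signal: ready for exactly one item
        }
        @Override public void onNext(String item) {
            events.add("onNext(" + item + ")");
            subscription.request(1); // ready for the next one
        }
        @Override public void onError(Throwable error) {
            events.add("onError");
            done.countDown();
        }
        @Override public void onComplete() {
            events.add("onComplete");
            done.countDown();
        }
    }

    // Publishes two titles and returns the callbacks the subscriber saw.
    public static List<String> run() {
        OneAtATime subscriber = new OneAtATime();
        try (SubmissionPublisher<String> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(subscriber);
            publisher.submit("Java Microservice");
            publisher.submit("RX Java");
        } // close() signals onComplete once the buffered items are delivered
        try {
            if (!subscriber.done.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("timed out waiting for completion");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return new ArrayList<>(subscriber.events);
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

Delivery happens on another thread, which is why run() waits on the latch before reading the recorded events.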


Topics:
java ,rxjava ,reactive programming ,observable ,tutorial
