DZone
Thanks for visiting DZone today,
Edit Profile
  • Manage Email Subscriptions
  • How to Post to DZone
  • Article Submission Guidelines
Sign Out View Profile
  • Post an Article
  • Manage My Drafts
Over 2 million developers have joined DZone.
Log In / Join
Refcards Trend Reports
Events Video Library
Over 2 million developers have joined DZone. Join Today! Thanks for visiting DZone today,
Edit Profile Manage Email Subscriptions Moderation Admin Console How to Post to DZone Article Submission Guidelines
View Profile
Sign Out
Refcards
Trend Reports
Events
View Events Video Library
Zones
Culture and Methodologies Agile Career Development Methodologies Team Management
Data Engineering AI/ML Big Data Data Databases IoT
Software Design and Architecture Cloud Architecture Containers Integration Microservices Performance Security
Coding Frameworks Java JavaScript Languages Tools
Testing, Deployment, and Maintenance Deployment DevOps and CI/CD Maintenance Monitoring and Observability Testing, Tools, and Frameworks
Culture and Methodologies
Agile Career Development Methodologies Team Management
Data Engineering
AI/ML Big Data Data Databases IoT
Software Design and Architecture
Cloud Architecture Containers Integration Microservices Performance Security
Coding
Frameworks Java JavaScript Languages Tools
Testing, Deployment, and Maintenance
Deployment DevOps and CI/CD Maintenance Monitoring and Observability Testing, Tools, and Frameworks

Integrating PostgreSQL Databases with ANF: Join this workshop to learn how to create a PostgreSQL server using Instaclustr’s managed service

Mobile Database Essentials: Assess data needs, storage requirements, and more when leveraging databases for cloud and edge applications.

Monitoring and Observability for LLMs: Datadog and Google Cloud discuss how to achieve optimal AI model performance.

Automated Testing: The latest on architecture, TDD, and the benefits of AI and low-code tools.

Related

  • Handling RxJava Observables in a Web UI
  • Reactive Event Streaming Architecture With Kafka, Redis Streams, Spring Boot, and HTTP Server-Sent Events (SSE)
  • Microsoft Azure Event Hubs
  • Reactive Kafka With Streaming in Spring Boot

Trending

  • How To Validate Archives and Identify Invalid Documents in Java
  • Agile Metrics and KPIs in Action
  • The Convergence of Testing and Observability
  • Spring WebFlux Retries
  1. DZone
  2. Coding
  3. JavaScript
  4. RxJava's Side Effect Methods

RxJava's Side Effect Methods

Learn about some examples of methods that don't affect your stream.

Wolfram Rittmeyer user avatar by
Wolfram Rittmeyer
·
Wolfram Rittmeyer user avatar by
Wolfram Rittmeyer
·
Nov. 11, 15 · Tutorial
Like (6)
Save
Tweet
Share
13.35K Views

Join the DZone community and get the full member experience.

Join For Free

RxJava’s Observable class has plenty of methods that can be used to transform the stream of emitted items to the kind of data that you need. Those methods are at the very core of RxJava and form a big part of it’s attraction.

But there are other methods that do not change the stream of items in any way – I call those methods side effect methods.

What Do I Mean By Side Effect Methods?

Side effect methods do not affect your stream in itself. Instead they are invoked when certain events occur to allow you to react to those events.

For example: if you’re interested in doing something outside of your Subscriber‘s callbacks whenever some error occurs, you would use the doOnError() method and pass to it the functional interface to be used whenever an error occurs:

someObservable
      .doOnError(new Action1() {
         @Override
         public void call(Throwable t) {
            // use this callback to clean up resources,
            // log the event or or report the
            // problem to the user
         }
      })
      //…


The most important part is the call() method. The code of this method will be executed before the Subscriber‘s onError() method is called.

In addition to exceptions RxJava offers many more events to which you can react:

Events and their corresponding side effect operations
MethodFunctional InterfaceEvent
doOnSubscribe()Action0A subscriber subscribes to the Observable
doOnUnsubscribe()Action0A subscriber unsubscribes from the subscription
doOnNext()Action1<T>The next item is emitted
doOnCompleted()Action0The Observable will emit no more items
doOnError()Action1<T>An error occurred
doOnTerminate()Action0Either an error occurred or the Observable will emit no more items
doOnEach()Action1<Notification<T>>Either an item was emitted, the Observable completes or an error occurred. The Notification object contains information about the type of event
doOnRequest()Action1<Long>A downstream operator requests to emit more items

The <T> refers either to the type of the item emitted or, in the case of the onError()method, the type of the Throwable thrown.

The functional interfaces are all of type Action0 or Action1. This means that the single methods of these interfaces do not return anything and take either zero arguments or one argument, depending on the specific event.

Since those methods do not return anything, they cannot be used to change the emitted items and thus do not change the stream of items in any way. Instead these methods are intended to cause side effects like writing something on disk, cleaning up state or anything else that manipulates the state of the system itself instead of the stream of events.

Note: The side effect methods themselves (doOnNext(), doOnCompleted() and so on) doreturn an Observable. That’s to keep the interface fluent. But the returned Observable is of the same type and emits the same items as the source Observable.

What Are They Useful For?

Now since they do not change the stream of items there must be other uses for them. I present here three examples of what you can achieve using these methods:

  • Use doOnNext() for debugging
  • Use doOnError() within flatMap() for error handling
  • Use doOnNext() to save/cache network results

So let’s see these examples in detail.

Use doOnNext() for debugging

With RxJava you sometimes wonder why your Observable isn’t working as expected. Especially when you are just starting out. Since you use a fluent API to transform some source into something that you want to subscribe to, you only see what you get at the end of this transformation pipeline.

When I was learning about RxJava I had some initial experience with Java’s Streams. Basically you have the same problem there. You have a fluid API to move from one Stream of some type to another Stream of another type. But what if it doesn’t work as expected?

With Java 8 Streams you have the peek() method. So when starting out with RxJava I wondered if something comparable is available. Well, there is. Actually, RxJava offers much more!

You can use the doOnNext() method anywhere in your processing pipeline to see what is happening and what the intermediary result is.

Here’s an example of this:

Observable someObservable = Observable
            .from(Arrays.asList(new Integer[]{2, 3, 5, 7, 11}))
            .doOnNext(System.out::println)
            .filter(prime -> prime % 2 == 0)
            .doOnNext(System.out::println)
            .count()
            .doOnNext(System.out::println)
            .map(number -> String.format(“Contains %d elements”, number));

Subscription subscription = o.subscribe(
            System.out::println,
            System.out::println,
            () -> System.out.println(“Completed!”));


And here is the output of that code:

2
3
3
5
5
7
7
11
11
4
Contains 4 elements
Completed!

That way you can glean valuable information about what is going on when your Observable doesn’t behave as you expected.

The doOnError() and doOnCompleted() methods can also be useful for debugging the state of your pipeline.

Note: If you’re using RxJava while developing for Android please have a look at the Frodo and Fernando Ceja’s post explaining about the motivation for and usage of Frodo. With Frodo you can use annotations to debug your Observables and Subscribers. 

The shown way of using doOnNext() and doOnError() does not change much of the system state – apart from bloating your log and slowing everything down. 

But there are other uses for these operators. And in those cases you use those methods to actually change the state of your system. Let’s have a look at them.

Use doOnError() within flatMap()

Say you’re using Retrofit to access some resource over the network. Since Retrofit supports observables, you can easily use those calls within your processing chain usingflatMap().

Alas, network related calls can go wrong in many ways – especially on mobiles. In this case you might not want the Observable to stop working, which it would if you were to rely on your subscriber’s onError() callback alone.

But keep in mind that you have an Observable within your flatMap() method. Thus you could use the doOnError() method to change the UI in some way, yet still have a working Observable stream for future events.

So what this looks like is this:

flatMap(id -> service.getPost()
       .doOnError(t -> {
          // report problem to UI
       })
       .onErrorResumeNext(Observable.empty())
)

This method is especially useful if you query your remote resource as a result of potentially recurring UI events.

Use doOnNext() to save/cache network results

If at some point in your chain you make network calls, you could use doOnNext() to store the incoming results to your local database or put them in some cache.

It would be as simple as the following lines:

// getOrderById is getting a fresh order
// from the net and returns an observable of orders
// Observable<Order> getOrderById(long id) {…}

Observable.from(aListWithIds)
         .flatMap(id -> getOrderById(id)
                              .doOnNext(order -> cacheOrder(order))
         // carry on with more processing


See this pattern applied in more detail in Daniel Lew’s excellent blog post aboutaccessing multiple sources.

Wrap Up

As you’ve seen, you can use the side effect methods of RxJava in multiple ways. Even though they do not change the stream of emitted items, they change the state of your overall system. This can be something as simple as logging the current items of yourObservable at a certain point within your processing pipeline, up to writing objects to your database as a result of a network call.

In my next post I am going to show you how to use RxJava’s hooks to get further insights. Stay tuned!

Side effect (computer science) Stream (computing) Event

Published at DZone with permission of Wolfram Rittmeyer. See the original article here.

Opinions expressed by DZone contributors are their own.

Related

  • Handling RxJava Observables in a Web UI
  • Reactive Event Streaming Architecture With Kafka, Redis Streams, Spring Boot, and HTTP Server-Sent Events (SSE)
  • Microsoft Azure Event Hubs
  • Reactive Kafka With Streaming in Spring Boot

Comments

Partner Resources

X

ABOUT US

  • About DZone
  • Send feedback
  • Careers
  • Sitemap

ADVERTISE

  • Advertise with DZone

CONTRIBUTE ON DZONE

  • Article Submission Guidelines
  • Become a Contributor
  • Visit the Writers' Zone

LEGAL

  • Terms of Service
  • Privacy Policy

CONTACT US

  • 3343 Perimeter Hill Drive
  • Suite 100
  • Nashville, TN 37211
  • support@dzone.com

Let's be friends: