
Error Handling in RxJava/RxKotlin


Let's check out the error handling options at your disposal and some recommended practices when working with reactive streams.


If you’ve worked with RxJava/RxKotlin, you are probably familiar with the three callbacks of a subscriber: onNext, onError, and onComplete. In a reactive stream, each element is delivered to onNext, and onComplete is called once when the stream ends. If an exception is thrown in onNext, the rest of the stream is abandoned and control passes to the onError method. For example, the code below runs without any errors:

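// RxJava 2 / RxKotlin 2 package names
import io.reactivex.Observable
import io.reactivex.rxkotlin.subscribeBy

fun main() {
    Observable.fromArray(1, 2, 3, 4)
        .subscribeBy(
            onNext = {
                println(it)
            },
            onComplete = {
                println("Completed")
            },
            onError = {
                println(it.message)
            }
        )
}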


The output will be:

1
2
3
4
Completed


Now suppose an exception is thrown in onNext: the rest of the observable chain is dropped. In the code below, we deliberately throw an exception when the input is 2:

Observable.fromArray(1,2,3,4)
    .subscribeBy(
        onNext = {
            if (it==2) {
                throw (RuntimeException("Exception on 2"))
            }
            println(it)
        },
        onComplete = {
            println("Completed")
        },
        onError = {
            println(it.message)
        }
    )


Output:

1
Exception on 2


This is all well and good if the error occurs in the subscriber, but what if it occurs in one of the operators above it, such as map or flatMap? In other words, what can you do about an error before it makes its way to the subscriber? RxJava offers several operators for this. The first one we will look at is onExceptionResumeNext() (an RxJava 2 operator; it was removed in RxJava 3):

onExceptionResumeNext

To understand this one, we need to know about doOnNext first. If you think it sounds a lot like the onNext of a subscriber, you are right: doOnNext is for side effects. Each item passes through doOnNext before it is finally consumed by the onNext method of the observer, which makes doOnNext a good place to log or inspect the items in the stream. Back to onExceptionResumeNext: if an exception is thrown in the observable chain (before the items reach the observer), this operator lets us plug in another observable. Let’s have a look at the code below:

Observable.fromArray(1,2,3)
    .doOnNext {
        if (it==2) {
            throw (RuntimeException("Exception on 2"))
        }
    }
    .onExceptionResumeNext(Observable.just(10))
    .subscribeBy(
        onNext = {
            println(it)
        }, onError = {
            println(it.message)
        }, onComplete = {
            println("Complete")
        }
    )


Output:

1
10
Complete


In the code above, we deliberately threw an exception in doOnNext when the item is 2, before it could be consumed by the observer. When the exception was raised, onExceptionResumeNext switched to the observable we plugged in, Observable.just(10), and its items were delivered to the same observer. Note that the original stream is not resumed, so 3 is never emitted.

One of the use cases of this can be a fallback mechanism. Suppose you want to get the latest news from an API, and if there is an error (suppose the network connection is down), you can plug in an observable that takes data from your local database. This way, you are still showing news (even though it is stale).
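The fallback idea can be sketched roughly as follows. This is only an illustration under stated assumptions: newsFromApi and newsFromDatabase are hypothetical stand-ins (not part of any real API) that simulate a failing network call and a local cache, and the imports assume RxJava 2 / RxKotlin 2.

```kotlin
import io.reactivex.Observable
import io.reactivex.rxkotlin.subscribeBy
import java.io.IOException

// Hypothetical remote source: always fails, simulating a dropped connection.
fun newsFromApi(): Observable<String> =
    Observable.error(IOException("Network is down"))

// Hypothetical local source standing in for a database query.
fun newsFromDatabase(): Observable<String> =
    Observable.just("Cached headline 1", "Cached headline 2")

// Try the API first; if it fails with an Exception, switch to the cached copy.
fun latestNews(): Observable<String> =
    newsFromApi().onExceptionResumeNext(newsFromDatabase())

fun main() {
    latestNews().subscribeBy(
        onNext = { println(it) },
        onError = { println(it.message) },
        onComplete = { println("Complete") }
    )
}
```

Because IOException is a java.lang.Exception, the observer should receive the two cached headlines followed by onComplete, and onError is never called.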

onErrorResumeNext

onErrorResumeNext is quite similar to onExceptionResumeNext, but it handles every kind of error. onExceptionResumeNext only switches to the fallback for throwables that are instances of java.lang.Exception; anything else is passed on to onError. onErrorResumeNext, by contrast, handles any java.lang.Throwable, which includes java.lang.Error and its subclasses.
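Here is a minimal sketch of onErrorResumeNext, modeled on the earlier onExceptionResumeNext example. It assumes RxJava 2, where onErrorResumeNext has an overload that accepts a fallback observable directly; the function name resumeAfterError is made up for this illustration. To show the difference, the sketch throws an AssertionError, which is an Error rather than an Exception.

```kotlin
import io.reactivex.Observable
import io.reactivex.rxkotlin.subscribeBy

// Illustrative helper: fails with an Error (not an Exception) on item 2.
fun resumeAfterError(): Observable<Int> =
    Observable.fromArray(1, 2, 3)
        .doOnNext {
            if (it == 2) {
                throw AssertionError("Error on 2")
            }
        }
        // Handles any Throwable, so the AssertionError triggers the fallback.
        .onErrorResumeNext(Observable.just(10))

fun main() {
    resumeAfterError().subscribeBy(
        onNext = { println(it) },
        onError = { println(it.message) },
        onComplete = { println("Complete") }
    )
}
```

This should print 1, 10, and Complete. With onExceptionResumeNext in the same position, the AssertionError would instead be passed straight to onError.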

doOnError

This is quite similar to doOnNext, but for errors: it lets you intercept an error before it makes its way to the consumer. Let’s have a look at an example:

Observable.fromArray(1,2,3)
    .doOnNext {
        if (it==2) {
            throw (RuntimeException("Exception on 2"))
        }
    }
    .doOnError {
        println("Doing on error")
    }
    .subscribeBy(
        onNext = {
            println(it)
        }, onError = {
            println(it.message)
        }, onComplete = {
            println("Complete")
        }
    )


Output:

1
Doing on error
Exception on 2


As you can see, the doOnError block ran before the onError of the observer. Note that doOnError only observes the error; it does not handle it, so the error still reaches onError. It’s a good place for side effects when an error is on its way, such as logging it or showing a toast message or a snackbar that informs the user.

onErrorReturnItem

As the name suggests, it emits a fixed item in place of the error and then completes the stream. Let’s see the example below:

Observable.fromArray(1,2,3)
    .doOnNext {
        if (it==2) {
            throw (RuntimeException("Exception on 2"))
        }
    }
    .onErrorReturnItem(-1)
    .subscribeBy(
        onNext = {
            println(it)
        }, onError = {
            println(it.message)
        }, onComplete = {
            println("Complete")
        }
    )


Output:

1
-1
Complete


As you can see, after encountering the error, the stream emits “-1” and completes. The placement of onErrorReturnItem is crucial: it needs to be downstream of where the error occurs. In the example above, if you had placed onErrorReturnItem before doOnNext, it wouldn’t have emitted the “-1”.
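To make the placement rule concrete, here is a small sketch of the misplaced variant, with onErrorReturnItem moved upstream of the failing doOnNext. The function name misplacedFallback is made up for this illustration, and the imports assume RxJava 2 / RxKotlin 2.

```kotlin
import io.reactivex.Observable
import io.reactivex.rxkotlin.subscribeBy

// Illustrative helper: the fallback sits upstream of the step that throws.
fun misplacedFallback(): Observable<Int> =
    Observable.fromArray(1, 2, 3)
        // No error ever passes through this operator, so it does nothing here.
        .onErrorReturnItem(-1)
        .doOnNext {
            if (it == 2) {
                throw RuntimeException("Exception on 2")
            }
        }

fun main() {
    misplacedFallback().subscribeBy(
        onNext = { println(it) },
        onError = { println(it.message) },
        onComplete = { println("Complete") }
    )
}
```

This should print 1 followed by "Exception on 2": the error is raised below onErrorReturnItem, so it travels straight down to the observer's onError.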

onErrorReturn

Sometimes, you need to produce the default item dynamically when an error occurs. onErrorReturn takes a lambda that receives the Throwable, which you can use to decide which value to return.

Observable.fromArray(1,2,3)
    .doOnNext {
        if (it==2) {
            throw (RuntimeException("Exception on 2"))
        }
    }
    .onErrorReturn { t: Throwable ->
        if (t.message == "<something you want>") {
            1 // Return a value based on error type
        } else {
            100 // Return another value based on different error type
        }
    }
    .subscribeBy(
        onNext = {
            println(it)
        }, onError = {
            println(it.message)
        }, onComplete = {
            println("Complete")
        }
    )


Similar to onErrorReturnItem, the position of onErrorReturn also matters. It needs to be downstream of the error in order to fall into the onErrorReturn lambda.
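The placeholder condition in the example above can be made concrete in many ways. One possible sketch, assuming you want to pick the fallback by exception type, is shown below; the function name withDynamicFallback and the values -1 and -100 are arbitrary choices for this illustration, and the imports assume RxJava 2 / RxKotlin 2.

```kotlin
import io.reactivex.Observable
import io.reactivex.rxkotlin.subscribeBy

// Illustrative helper: chooses the fallback item from the type of the error.
fun withDynamicFallback(): Observable<Int> =
    Observable.fromArray(1, 2, 3)
        .doOnNext {
            if (it == 2) {
                throw IllegalStateException("Exception on 2")
            }
        }
        .onErrorReturn { t: Throwable ->
            when (t) {
                is IllegalStateException -> -1 // the failure we expect here
                else -> -100                   // anything else
            }
        }

fun main() {
    withDynamicFallback().subscribeBy(
        onNext = { println(it) },
        onError = { println(it.message) },
        onComplete = { println("Complete") }
    )
}
```

This should print 1, -1, and Complete, since the IllegalStateException is mapped to -1 and the stream then completes.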

retry()

This is another way of handling errors. It simply resubscribes to the preceding Observable when an error occurs. You need to take care when using this operator, because without a limit it retries forever. Let’s take the previous example and add a retry operator:

Observable.fromArray(1,2,3)
    .doOnNext {
        if (it==2) {
            throw (RuntimeException("Exception on 2"))
        }
    }
    .retry()
    .subscribeBy(
        onNext = {
            println(it)
        }, onError = {
            println(it.message)
        }, onComplete = {
            println("Complete")
        }
    )


Output:

1
1
1
1
....(infinite loop)


As you can see, it went into an infinite loop: the retry operator keeps resubscribing to the Observable, which throws an exception at “2” every time.

To avoid this, you can pass a number to retry, which caps the number of retry attempts. It can be done as below:

Observable.fromArray(1,2,3)
    .doOnNext {
        if (it==2) {
            throw (RuntimeException("Exception on 2"))
        }
    }
    .retry(3)
    .subscribeBy(
        onNext = {
            println(it)
        }, onError = {
            println(it.message)
        }, onComplete = {
            println("Complete")
        }
    )


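Doing this limits the number of retries to 3. The source is subscribed to four times in total (the first attempt plus three retries), so “1” is printed four times before “Exception on 2” reaches onError. There are other variations, such as retryUntil and retryWhen, which let you decide when to stop or delay retrying. You can read more on that at http://reactivex.io/documentation/operators/retry.html.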
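As a rough sketch of one of these variations, retryUntil takes a condition that is checked after each failure and keeps resubscribing until the condition returns true. The example below is only an illustration: the function name retryUntilThreeFailures and the failure counter are made up, and the imports assume RxJava 2 / RxKotlin 2.

```kotlin
import io.reactivex.Observable
import io.reactivex.rxkotlin.subscribeBy
import java.util.concurrent.atomic.AtomicInteger

// Illustrative helper: gives up once the source has failed three times.
fun retryUntilThreeFailures(): Observable<Int> {
    val failures = AtomicInteger(0)
    return Observable.fromArray(1, 2, 3)
        .doOnNext {
            if (it == 2) {
                throw RuntimeException("Exception on 2")
            }
        }
        // Evaluated after every failure; returning true stops the retries.
        .retryUntil { failures.incrementAndGet() >= 3 }
}

fun main() {
    retryUntilThreeFailures().subscribeBy(
        onNext = { println(it) },
        onError = { println(it.message) },
        onComplete = { println("Complete") }
    )
}
```

Here the source fails three times in a row, so "1" should be printed three times before "Exception on 2" is delivered to onError.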

If you like this article, please share it to help others.


Published at DZone with permission of
