A Quick Introduction to Reactive Java: From Reactive Programming to RxJava in Action
The RxJava library helps developers write resilient, concurrent, asynchronous applications. Here's a look at RxJava basics and a real-world example.
RxJava is a library that helps programmers write asynchronous, concurrent, and resilient applications. With RxJava, you write programs in the reactive programming paradigm. In this article, I give a quick introduction to reactive programming and RxJava.
Before we dive into the details, let's look at a real-world example.
A Real World Example
Suppose you go to an ATM (Automated Teller Machine) to withdraw some cash. You insert your debit card into the machine, enter your PIN, enter the amount you want to withdraw, and hit the done button. After you hit the button, there are two possible outcomes:
- Either the ATM has the requested amount of cash, in which case it will dispense the cash to you. Once all the money is dispensed it will signal to you with a message about the successful transaction completion.
- Or the ATM does not have sufficient cash left, in which case it will signal with a message of transaction failure.
In this example, the ATM is the source of cash and you are the consumer of cash. Based on the transaction details, cash flows from the ATM to you when you hit the done button. I will use this example to explain the concepts below.
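Before bringing in any library, the ATM scenario can be sketched in plain Java. This is only a minimal illustration: the names AtmSketch, Atm, CashConsumer, and run are hypothetical, the bill denominations are arbitrary, and the requested amount is assumed to be a multiple of 10.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical names throughout: a plain-Java model of the ATM analogy, not RxJava.
class AtmSketch {

    /** The three signals a consumer of cash can receive. */
    interface CashConsumer {
        void onBill(int denomination);   // one banknote was dispensed
        void onFailure(String reason);   // the transaction failed
        void onSuccess();                // all requested cash was dispensed
    }

    /** The source: an ATM holding some amount of cash. */
    static final class Atm {
        private int cashAvailable;

        Atm(int cashAvailable) {
            this.cashAvailable = cashAvailable;
        }

        // Hitting "done": the ATM pushes bills to the consumer or signals failure.
        // Assumption: amount is a multiple of 10.
        void withdraw(int amount, CashConsumer consumer) {
            if (amount > cashAvailable) {
                consumer.onFailure("insufficient cash");
                return;
            }
            cashAvailable -= amount;
            int remaining = amount;
            for (int bill : new int[] {100, 50, 20, 10}) {
                while (remaining >= bill) {
                    consumer.onBill(bill);
                    remaining -= bill;
                }
            }
            consumer.onSuccess();
        }
    }

    /** Runs one withdrawal and records every signal the consumer receives. */
    static List<String> run(Atm atm, int amount) {
        List<String> events = new ArrayList<>();
        atm.withdraw(amount, new CashConsumer() {
            @Override public void onBill(int denomination) { events.add("bill " + denomination); }
            @Override public void onFailure(String reason) { events.add("failed: " + reason); }
            @Override public void onSuccess() { events.add("success"); }
        });
        return events;
    }

    public static void main(String[] args) {
        Atm atm = new Atm(200);
        System.out.println(run(atm, 170)); // [bill 100, bill 50, bill 20, success]
        System.out.println(run(atm, 50));  // [failed: insufficient cash]
    }
}
```

The second withdrawal fails because only 30 is left in the machine after the first one. Note that the consumer never asks for the next bill: the source pushes each bill and then a final success or failure signal.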
What is Reactive Programming
It is a style of programming where you define a source of data and a consumer of that data. Once you connect the consumer to the source, the library (which in this blog is RxJava) takes care of pushing the data, generated by the source, to the consumer.
The above definition talks about three important things. I will be explaining each of these in detail.
- Source of data
- Consumer of data
- Connecting Consumer to Source
RxJava in Action
Let's look at each of the points above using example code written with RxJava. The example uses the RxJava 1.x API. I have intentionally made the code verbose to highlight the details. At the end of this post, I also provide a concise version of the same code.
import rx.Observable;
import rx.Subscriber;

// defining the source: emits the integers 1 through 5, then completes
Observable<Integer> source = Observable.range(1, 5);

// defining the consumer
Subscriber<Integer> consumer = new Subscriber<Integer>() {
    @Override
    public void onNext(Integer number) { System.out.println(number); }

    @Override
    public void onError(Throwable e) { System.out.println("error"); }

    @Override
    public void onCompleted() { System.out.println("completed"); }
};

// connecting the consumer to source
source.subscribe(consumer);
Source of Data
In the ATM example, the machine, along with the configured transaction details, serves as the source. Similarly, in the code example, Observable<T> represents a source. An Observable can be created using one of the many factory methods it provides; Observable.range(int start, int count) is one of them. In the example above, the source emits five numbers, 1 through 5, and then finishes.
Consumer of Data
Subscriber<T> serves as a consumer of data. RxJava calls the onNext(T data) method on the Subscriber to push each item emitted by the source (the Observable) to the consumer (the Subscriber). In the example above, the consumer prints each received number to the console. This is similar to the ATM dispensing banknotes of different denominations.
Once the source has emitted all of its data, RxJava signals completion by calling the onCompleted() method on the Subscriber. In the example above, the consumer just prints "completed". In the ATM example, completion is signalled by the successful transaction message.
If an error occurs during the emission of data, RxJava forwards it to the onError(Throwable e) method on the Subscriber. In the example above, the consumer handles the exception by printing "error" to the console. In the ATM example, the error is signalled by the transaction failure message.
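The relationship between the three signals can be sketched in plain Java: zero or more onNext calls, followed by exactly one of onError or onCompleted. This is an illustrative model, not RxJava's implementation, and the names SignalSketch, NumberConsumer, emitRange, and record are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.IntUnaryOperator;

// Hypothetical plain-Java model of the three signals; not RxJava's actual implementation.
class SignalSketch {

    interface NumberConsumer {
        void onNext(int number);
        void onError(Throwable e);
        void onCompleted();
    }

    /** Pushes step(start) .. step(start + count - 1), then exactly one terminal signal. */
    static void emitRange(int start, int count, IntUnaryOperator step, NumberConsumer consumer) {
        try {
            for (int i = start; i < start + count; i++) {
                consumer.onNext(step.applyAsInt(i));
            }
        } catch (RuntimeException e) {
            consumer.onError(e);   // terminal: nothing more is emitted after an error
            return;
        }
        consumer.onCompleted();    // terminal: reached only if no error occurred
    }

    /** Emits five values starting at 1 and records every signal as text. */
    static List<String> record(IntUnaryOperator step) {
        List<String> events = new ArrayList<>();
        emitRange(1, 5, step, new NumberConsumer() {
            @Override public void onNext(int number) { events.add(String.valueOf(number)); }
            @Override public void onError(Throwable e) { events.add("error"); }
            @Override public void onCompleted() { events.add("completed"); }
        });
        return events;
    }

    public static void main(String[] args) {
        System.out.println(record(n -> n));            // [1, 2, 3, 4, 5, completed]
        System.out.println(record(n -> 10 / (3 - n))); // [5, 10, error]
    }
}
```

In the second run, the third value triggers a division by zero, so the consumer sees two numbers followed by the error and never receives "completed".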
Connecting Consumer to Source
The connection is established using the subscribe(Subscriber s) method on Observable. In RxJava, the computations defined as part of the Observable and the Subscriber are executed only when the connection between the two is established. In other words, the computations are lazy. In the example above, the source starts emitting numbers only when a consumer subscribes to it. In the ATM example, pressing the done button after configuring the transaction details is analogous to subscribing. Until then, the machine dispenses no cash.
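Laziness can also be illustrated without the library. In the plain-Java sketch below, the hypothetical LazyRange class only describes what to emit, and the work happens inside subscribe. The names LazySketch and LazyRange and the log list are assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.IntConsumer;

// Hypothetical plain-Java model of a lazy source; not RxJava's actual implementation.
class LazySketch {

    /** A source is only a description of work; nothing runs until subscribe() is called. */
    static final class LazyRange {
        private final int start;
        private final int count;
        private final List<String> log;

        LazyRange(int start, int count, List<String> log) {
            this.start = start;
            this.count = count;
            this.log = log;
        }

        void subscribe(IntConsumer consumer) {
            log.add("emission started");
            for (int i = start; i < start + count; i++) {
                consumer.accept(i);
            }
        }
    }

    public static void main(String[] args) {
        List<String> log = new ArrayList<>();
        LazyRange source = new LazyRange(1, 3, log);
        System.out.println(log); // [] because defining the source emits nothing

        source.subscribe(n -> log.add("received " + n));
        System.out.println(log); // [emission started, received 1, received 2, received 3]
    }
}
```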
A concise version of the above code, written with Java 8 lambdas, looks like this:
Observable.range(1, 5).subscribe(
    number -> System.out.println(number),
    error -> System.out.println("error"),
    () -> System.out.println("completed")
);
Summary
We saw that writing a program with RxJava means defining an Observable and a Subscriber, and then connecting the two with the subscribe method on Observable. From that point, the Observable starts emitting data and RxJava pushes it to the Subscriber. I hope this introduction is enough to get you started with RxJava. I have created a repository here which you can use to experiment with RxJava.
In upcoming blog posts, I will be explaining concepts around concurrency, composition and resilience in the context of RxJava.
Published at DZone with permission of Praveer Gupta. See the original article here.