A Creative Way to Use Reactive Extensions in a Common Testing Setup

Check out this creative testing setup that uses reactive programming to handle an event stream.

I don’t know whether events entered software engineering with graphical user interface interactions, but they are certainly a very convenient way of modeling them. As systems become more and more interconnected, asynchronous event management has become an important issue to tackle. With functional programming also on the rise, this gave birth to libraries such as RxJava. However, modeling a problem as a stream of events shouldn’t be restricted to handling system events. It can also be used in testing in many different ways.

One common use case for a testing setup is to launch a program, for example an external dependency such as a mock server. In this case, we need to wait until the program has been successfully launched. Conversely, the test should stop as soon as the launch of the external program fails. If the program has a Java API, that’s easy. However, this is rarely the case, and more basic APIs are generally used, such as ProcessBuilder or Runtime.getRuntime().exec():

ProcessBuilder builder;
Process process;

protected void setUp() throws IOException {
    builder = new ProcessBuilder().command("script.sh");
    process = builder.start();
}

protected void tearDown() {
    process.destroy(); // stop the external program once the tests are done
}

The traditional way to handle this problem was to put a big Thread.sleep() just after the launch. Not only was this system dependent, as the launch time changes from system to system, it also didn’t tackle the case where the launch failed. In this latter case, precious computing time as well as manual relaunch time were lost. Better solutions exist, but they involve many lines of code and some (or a high) degree of complexity. Wouldn’t it be nice if we could have a simple and reliable way to start the program and, depending on the output, either continue the setup or fail the test? Rx to the rescue!

The first step is to create an Observable around the launched process’s input stream, that is, the stream connected to the process’s standard output:

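Observable<String> observable = Observable.create(subscriber -> {
    InputStream stream = process.getInputStream();
    try (BufferedReader reader = new BufferedReader(new InputStreamReader(stream))) {
        String line;
        while ((line = reader.readLine()) != null) {
            subscriber.onNext(line);      // one next event per line of output
        }
        subscriber.onCompleted();         // no more output: complete event
    } catch (Exception e) {
        subscriber.onError(e);            // any failure becomes an error event
    }
});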

In the above snippet:

  • Each time the process writes a line to its output, a next event is sent
  • When there is no more output, a complete event is sent
  • Finally, if an exception happens, an error event is sent

By Rx definition, either of the last two events marks the end of the sequence.
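
To make the three kinds of events concrete without pulling in RxJava, here is a minimal sketch in plain Java. LineEvents and its emit() method are hypothetical names of my own, not part of any library; the three callbacks simply stand in for a subscriber's next, complete and error handlers.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.Reader;
import java.util.function.Consumer;

// Hypothetical helper (not an RxJava class): mimics the next/complete/error
// contract with plain callbacks so the order of the signals is easy to follow.
final class LineEvents {

    private LineEvents() {}

    static void emit(Reader source,
                     Consumer<String> onNext,
                     Runnable onCompleted,
                     Consumer<Throwable> onError) {
        try (BufferedReader reader = new BufferedReader(source)) {
            String line;
            while ((line = reader.readLine()) != null) {
                onNext.accept(line);   // one "next" signal per line read
            }
        } catch (IOException e) {
            onError.accept(e);         // terminal signal: nothing follows an error
            return;
        }
        onCompleted.run();             // terminal signal: the source is exhausted
    }
}
```

Feeding it a java.io.StringReader holding two lines calls onNext twice and then onCompleted once, while a reader that throws an IOException on its first read triggers onError only.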

Once the observable has been created, it just needs to be observed for events. A simple script that emits a single event can be listened to with the following snippet:

BlockingObservable<String> blocking = observable.toBlocking();
blocking.first();

The point of interest here is the wrapping of the Observable instance in a BlockingObservable. While the former can be combined with other observables, the latter adds methods that block until events arrive. Here, the first() method waits for the first (and only) event.

For a more complex script that emits a random number of regular events terminated by a single end event, a code snippet could be:

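BlockingObservable<String> blocking = observable
    .filter(line -> "Done".equals(line)).toBlocking(); // "Done" is a placeholder for the script's end event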

In this case, whatever the number of regular events, the filter() method provides the way to listen to the only event we are interested in.
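
The same "ignore everything until the line we care about" idea can be sketched with the JDK alone, which may help to see what the filter-then-block combination amounts to. FirstMatch and awaitFirst() are hypothetical names, and the predicate is whatever identifies the end event of your own script. One difference I chose on purpose: as far as I know, BlockingObservable.first() throws when the sequence ends without a match, whereas this sketch returns an empty Optional.

```java
import java.io.BufferedReader;
import java.io.Reader;
import java.util.Optional;
import java.util.function.Predicate;

// Hypothetical helper (not an RxJava class): blocks the calling thread until
// a line satisfying the predicate is read, or until the source is exhausted.
final class FirstMatch {

    private FirstMatch() {}

    static Optional<String> awaitFirst(Reader source, Predicate<String> isEndEvent) {
        // The reader is deliberately left open: the caller owns the source.
        BufferedReader reader = new BufferedReader(source);
        // lines() is lazy and findFirst() short-circuits, so no further
        // lines are requested once the first match has been found.
        // An I/O failure surfaces as an UncheckedIOException.
        return reader.lines()
                .filter(isEndEvent)
                .findFirst();
    }
}
```

In a setup method, a call such as FirstMatch.awaitFirst(new InputStreamReader(process.getInputStream()), "Done"::equals) would block until the script prints its end line, with "Done" again being only a placeholder.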

The previous cases do not reflect reality, however. Most of the time, setup scripts should start before the tests and run in parallel with them, i.e. the end event is never sent, at least not until the tests have finished. In this case, some threading is involved. Rx handles that quite easily:

BlockingObservable<String> blocking = observable
    .subscribeOn(Schedulers.newThread()).take(5).toBlocking();

There is a simple difference here: thanks to the subscribeOn() method, the subscription, and therefore the code that reads the process output, runs on a new thread. Alternatively, the observeOn() method could have been used to deliver the events to the subscriber on another thread. Note that I replaced the filter() method with take() to pretend to be interested only in the first 5 events.
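
For comparison, here is a rough JDK-only sketch of the same arrangement: the output is read on a background thread while the calling thread waits for the first few lines. BackgroundLines and takeFirst() are hypothetical names, and the timeout is an addition of mine that the Rx snippet above does not have; it is only there so that a failed launch cannot block the setup forever.

```java
import java.io.BufferedReader;
import java.io.Reader;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;

// Hypothetical helper (not an RxJava class): reads lines on a background
// thread and hands the first `count` of them back to the caller.
final class BackgroundLines {

    private BackgroundLines() {}

    static List<String> takeFirst(Reader source, int count, long timeoutMillis) {
        // supplyAsync runs the read loop off the calling thread, on the
        // default asynchronous executor of CompletableFuture.
        CompletableFuture<List<String>> pending = CompletableFuture.supplyAsync(() ->
                new BufferedReader(source).lines()
                        .limit(count)   // stop after `count` lines, or earlier at end of input
                        .collect(Collectors.toList()));
        try {
            // The caller blocks here, but never longer than the timeout.
            return pending.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for output", e);
        } catch (ExecutionException | TimeoutException e) {
            // Note: on timeout the background read is not cancelled by this sketch.
            throw new IllegalStateException("Expected output was not received", e);
        }
    }
}
```

Called with a source of seven lines and a count of 5, it returns the first five; if the source ends after a single line, it returns just that line.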

At this point, the test setup is finished. Happy testing!

Published at DZone with permission of Nicolas Frankel, DZone MVB. See the original article here.
