Over a million developers have joined DZone.
{{announcement.body}}
{{announcement.title}}

Async Abstractions using rx-java

DZone's Guide to

Async Abstractions using rx-java

· Java Zone ·
Free Resource

Verify, standardize, and correct the Big 4 + more– name, email, phone and global addresses – try our Data Quality APIs now at Melissa Developer Portal!

One of the big benefits in using  Rx-java for me has been the way the code looks exactly the same whether the underlying calls are synchronous or asynchronous and hence the title of this entry.

Consider a very simple use case of a client code making three slow running calls and combines the results into a list:

String op1 = service1.operation();
String op2 = service2.operation();
String op3 = service3.operation();
Arrays.asList(op1, op2, op3)

Since the calls are synchronous the time taken to do this would be additive. To simulate a slow call the following is the type of implementation in each of method calls:

public String operation() {
    logger.info("Start: Executing slow task in Service 1");
    Util.delay(7000);
    logger.info("End: Executing slow task in Service 1");
    return "operation1"
}

So the first attempt at using rx-java with these implementations is to simply have these long running operations return the versatile type  Observable, a bad implementation would look like this:

public Observable<string> operation() {
    logger.info("Start: Executing slow task in Service 1");
    Util.delay(7000);
    logger.info("End: Executing slow task in Service 1");
    return Observable.just("operation 1");
}
So with this the caller implementation changes to the following:

Observable<String> op1 = service1.operation();
Observable<String> op2 = service2.operation();
Observable<String> op3 = service3.operation();

Observable<List<String>> lst = Observable.merge(op1, op2, op3).toList();


See how the caller composes the results using the  merge method. 

However the calls to each of the service calls is still synchronous at this point, to make the call asynch the service calls can be made to use a Thread pool, the following way:

public class Service1 {
    private static final Logger logger = LoggerFactory.getLogger(Service1.class);
    public Observable<String> operation() {
        return Observable.<String>create(s -> {
            logger.info("Start: Executing slow task in Service 1");
            Util.delay(7000);
            s.onNext("operation 1");
            logger.info("End: Executing slow task in Service 1");
            s.onCompleted();
        }).subscribeOn(Schedulers.computation());
    }
}
subscribeOn uses the specified  Scheduler to run the actual operation.

The beauty of the approach is that the calling code of this service is not changed at all, the implementation there remains exactly same as before whereas the service calls are now asynchronous. If you are interested in exploring this sample further,  here is a github repo with working examples.

Developers! Quickly and easily gain access to the tools and information you need! Explore, test and combine our data quality APIs at Melissa Developer Portal – home to tools that save time and boost revenue. 

Topics:

Published at DZone with permission of

Opinions expressed by DZone contributors are their own.

{{ parent.title || parent.header.title}}

{{ parent.tldr }}

{{ parent.urlSource.name }}