Async Abstractions using rx-java
One of the big benefits of using rx-java, for me, has been that the calling code looks exactly the same whether the underlying calls are synchronous or asynchronous, hence the title of this entry.
Consider a very simple use case: client code makes three slow-running calls and combines the results into a list:
    String op1 = service1.operation();
    String op2 = service2.operation();
    String op3 = service3.operation();
    List<String> lst = Arrays.asList(op1, op2, op3);
Since the calls are synchronous, the total time taken is the sum of the three calls. To simulate a slow call, each service method is implemented along these lines:
    public String operation() {
        logger.info("Start: Executing slow task in Service 1");
        Util.delay(7000);
        logger.info("End: Executing slow task in Service 1");
        return "operation1";
    }
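To make the additive cost concrete, here is a minimal, self-contained sketch that times three blocking calls made one after another. The class name `SequentialCallsSketch`, the `delay` helper (a guess at what the article's `Util.delay` does, namely sleep the current thread), and the short delay value are all assumptions for illustration and are not taken from the original sample.

```java
import java.util.Arrays;
import java.util.List;

public class SequentialCallsSketch {

    // Hypothetical stand-in for the article's Util.delay: block the calling thread.
    public static void delay(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while simulating a slow call", e);
        }
    }

    // Simulated slow service call: blocks, then returns its label.
    public static String operation(String label, long delayMillis) {
        delay(delayMillis);
        return label;
    }

    // Makes three calls one after another and returns the elapsed time in milliseconds.
    public static long timeSequentialCalls(long delayMillis) {
        long start = System.nanoTime();
        String op1 = operation("operation 1", delayMillis);
        String op2 = operation("operation 2", delayMillis);
        String op3 = operation("operation 3", delayMillis);
        List<String> results = Arrays.asList(op1, op2, op3);
        long elapsedMillis = (System.nanoTime() - start) / 1_000_000;
        System.out.println(results + " took about " + elapsedMillis + " ms");
        return elapsedMillis;
    }

    public static void main(String[] args) {
        // A short delay keeps the demo quick; the article's services use 7000 ms.
        timeSequentialCalls(200);
    }
}
```

With three calls of the same duration, the elapsed time should come out at roughly three times the single-call delay, since each call has to finish before the next one starts.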
So the first attempt at using rx-java with these implementations is simply to have the long-running operations return the versatile type Observable. A bad implementation would look like this:
    public Observable<String> operation() {
        logger.info("Start: Executing slow task in Service 1");
        Util.delay(7000);
        logger.info("End: Executing slow task in Service 1");
        return Observable.just("operation 1");
    }

With this, the caller implementation changes to the following:
    Observable<String> op1 = service1.operation();
    Observable<String> op2 = service2.operation();
    Observable<String> op3 = service3.operation();
    Observable<List<String>> lst = Observable.merge(op1, op2, op3).toList();
See how the caller composes the results using the merge method.
However, each of the service calls is still synchronous at this point. To make the calls asynchronous, the services can be made to use a thread pool, in the following way:
    // Imports assume RxJava 1.x and SLF4J; Util is the sample's own helper class.
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import rx.Observable;
    import rx.schedulers.Schedulers;

    public class Service1 {
        private static final Logger logger = LoggerFactory.getLogger(Service1.class);

        public Observable<String> operation() {
            return Observable.<String>create(s -> {
                logger.info("Start: Executing slow task in Service 1");
                Util.delay(7000);
                s.onNext("operation 1");
                logger.info("End: Executing slow task in Service 1");
                s.onCompleted();
            }).subscribeOn(Schedulers.computation());
        }
    }

subscribeOn uses the specified Scheduler to run the actual operation.
The beauty of this approach is that the calling code of the service does not change at all: it remains exactly the same as before, while the service calls are now asynchronous. If you are interested in exploring this sample further, here is a GitHub repo with working examples.
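The "same caller, different timing" effect can be tried out without the rx-java dependency. The sketch below is only an analogy: it swaps in the JDK's `CompletableFuture` for `Observable` and a fixed thread pool for the Scheduler. The names `SameCallerSketch`, `SlowService`, `blocking`, `pooled` and `callAll` are hypothetical and do not come from the article's repo.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class SameCallerSketch {

    // Hypothetical service contract: the return type hides whether the work is sync or async.
    interface SlowService {
        CompletableFuture<String> operation();
    }

    // Stand-in for the article's Util.delay (assumed to sleep the current thread).
    static void delay(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while simulating a slow call", e);
        }
    }

    // Like the "bad" version: blocks the caller's thread, then wraps the finished value.
    static SlowService blocking(String label, long delayMillis) {
        return () -> {
            delay(delayMillis);
            return CompletableFuture.completedFuture(label);
        };
    }

    // Like the thread-pool version: the slow work runs on the pool and the call returns at once.
    static SlowService pooled(String label, long delayMillis, ExecutorService pool) {
        return () -> CompletableFuture.supplyAsync(() -> {
            delay(delayMillis);
            return label;
        }, pool);
    }

    // The caller: identical code for both kinds of service.
    static List<String> callAll(SlowService s1, SlowService s2, SlowService s3) {
        CompletableFuture<String> op1 = s1.operation();
        CompletableFuture<String> op2 = s2.operation();
        CompletableFuture<String> op3 = s3.operation();
        return Arrays.asList(op1.join(), op2.join(), op3.join());
    }

    // Times callAll against either blocking or pooled services; returns elapsed milliseconds.
    public static long elapsedMillis(boolean usePool, long delayMillis) {
        ExecutorService pool = Executors.newFixedThreadPool(3);
        try {
            SlowService s1 = usePool ? pooled("operation 1", delayMillis, pool) : blocking("operation 1", delayMillis);
            SlowService s2 = usePool ? pooled("operation 2", delayMillis, pool) : blocking("operation 2", delayMillis);
            SlowService s3 = usePool ? pooled("operation 3", delayMillis, pool) : blocking("operation 3", delayMillis);
            long start = System.nanoTime();
            List<String> results = callAll(s1, s2, s3);
            long elapsed = (System.nanoTime() - start) / 1_000_000;
            System.out.println((usePool ? "pooled: " : "blocking: ") + results + " in about " + elapsed + " ms");
            return elapsed;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        elapsedMillis(false, 300); // roughly the sum of the three delays
        elapsedMillis(true, 300);  // roughly one delay, since the calls overlap
    }
}
```

One difference worth keeping in mind: a `CompletableFuture` created with `supplyAsync` starts its work immediately, whereas the `Observable` built with `create` in the article does nothing until it is subscribed to. The sketch mirrors the timing behaviour only, not rx-java's laziness.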
Published at DZone with permission of Biju Kunjummen, DZone MVB. See the original article here.