Rx-java subscribeOn and observeOn
If you have been confused by the Rx-java Observable operators subscribeOn and observeOn, one of the blog articles that helped me understand these operations is this one by Graham Lea. I wanted to recreate a very small part of the article here, so consider a service which emits a value every 200 milliseconds:
    package obs.threads;

    import obs.Util;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import rx.Observable;

    public class GeneralService {
        private static final Logger logger = LoggerFactory.getLogger(GeneralService.class);

        public Observable<String> getData() {
            return Observable.<String>create(s -> {
                logger.info("Start: Executing a Service");
                for (int i = 1; i <= 3; i++) {
                    Util.delay(200);
                    logger.info("Emitting {}", "root " + i);
                    s.onNext("root " + i);
                }
                logger.info("End: Executing a Service");
                s.onCompleted();
            });
        }
    }
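The service above calls Util.delay, a helper whose source is not shown in the article. A minimal sketch of what such a helper might look like follows, assuming it simply blocks the calling thread for the given number of milliseconds. The body is a guess, not the author's code, and the package declaration is left out so the class compiles on its own.

```java
// Hypothetical stand-in for the article's obs.Util helper (its real source is not shown).
public final class Util {

    private Util() {
        // static helper, not meant to be instantiated
    }

    /** Blocks the calling thread for roughly the given number of milliseconds. */
    public static void delay(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers can still observe the interruption.
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
    }
}
```

Because the delay blocks whichever thread calls it, the timestamps in the logs show directly which thread is doing the waiting.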
Now, if I were to subscribe to this service, this way:
    @Test
    public void testThreadedObservable1() throws Exception {
        Observable<String> ob1 = aService.getData();
        CountDownLatch latch = new CountDownLatch(1);
        ob1.subscribe(s -> {
            Util.delay(500);
            logger.info("Got {}", s);
        }, e -> logger.error(e.getMessage(), e), () -> latch.countDown());
        latch.await();
    }
All of the emissions and subscriptions will act on the main thread and something along the following lines will be printed:
    20:53:29.380 [main] INFO o.t.GeneralService - Start: Executing a Service
    20:53:29.587 [main] INFO o.t.GeneralService - Emitting root 1
    20:53:30.093 [main] INFO o.t.ThreadedObsTest - Got root 1
    20:53:30.298 [main] INFO o.t.GeneralService - Emitting root 2
    20:53:30.800 [main] INFO o.t.ThreadedObsTest - Got root 2
    20:53:31.002 [main] INFO o.t.GeneralService - Emitting root 3
    20:53:31.507 [main] INFO o.t.ThreadedObsTest - Got root 3
    20:53:31.507 [main] INFO o.t.GeneralService - End: Executing a Service
By default the emissions are not asynchronous in nature. So now, what is the behavior if subscribeOn is used:
    public class ThreadedObsTest {
        private GeneralService aService = new GeneralService();
        private static final Logger logger = LoggerFactory.getLogger(ThreadedObsTest.class);
        private ExecutorService executor1 = Executors.newFixedThreadPool(5,
                new ThreadFactoryBuilder().setNameFormat("SubscribeOn-%d").build());

        @Test
        public void testSubscribeOn() throws Exception {
            Observable<String> ob1 = aService.getData();
            CountDownLatch latch = new CountDownLatch(1);
            ob1.subscribeOn(Schedulers.from(executor1)).subscribe(s -> {
                Util.delay(500);
                logger.info("Got {}", s);
            }, e -> logger.error(e.getMessage(), e), () -> latch.countDown());
            latch.await();
        }
    }
Here I am using Guava's ThreadFactoryBuilder to give each thread in the thread pool a recognizable name. If I were to execute this code, the output would be along these lines:
    20:56:47.117 [SubscribeOn-0] INFO o.t.GeneralService - Start: Executing a Service
    20:56:47.322 [SubscribeOn-0] INFO o.t.GeneralService - Emitting root 1
    20:56:47.828 [SubscribeOn-0] INFO o.t.ThreadedObsTest - Got root 1
    20:56:48.032 [SubscribeOn-0] INFO o.t.GeneralService - Emitting root 2
    20:56:48.535 [SubscribeOn-0] INFO o.t.ThreadedObsTest - Got root 2
    20:56:48.740 [SubscribeOn-0] INFO o.t.GeneralService - Emitting root 3
    20:56:49.245 [SubscribeOn-0] INFO o.t.ThreadedObsTest - Got root 3
    20:56:49.245 [SubscribeOn-0] INFO o.t.GeneralService - End: Executing a Service
Now, the execution has moved away from the main thread, and both the emissions and the subscriptions are being processed on a single thread borrowed from the thread pool.
And what happens if observeOn is used:
    public class ThreadedObsTest {
        private GeneralService aService = new GeneralService();
        private static final Logger logger = LoggerFactory.getLogger(ThreadedObsTest.class);
        // A second pool, named so that its threads are recognizable in the log
        private ExecutorService executor2 = Executors.newFixedThreadPool(5,
                new ThreadFactoryBuilder().setNameFormat("ObserveOn-%d").build());

        @Test
        public void testObserveOn() throws Exception {
            Observable<String> ob1 = aService.getData();
            CountDownLatch latch = new CountDownLatch(1);
            ob1.observeOn(Schedulers.from(executor2)).subscribe(s -> {
                Util.delay(500);
                logger.info("Got {}", s);
            }, e -> logger.error(e.getMessage(), e), () -> latch.countDown());
            latch.await();
        }
    }
The output is along these lines:
    21:03:08.655 [main] INFO o.t.GeneralService - Start: Executing a Service
    21:03:08.860 [main] INFO o.t.GeneralService - Emitting root 1
    21:03:09.067 [main] INFO o.t.GeneralService - Emitting root 2
    21:03:09.268 [main] INFO o.t.GeneralService - Emitting root 3
    21:03:09.269 [main] INFO o.t.GeneralService - End: Executing a Service
    21:03:09.366 [ObserveOn-1] INFO o.t.ThreadedObsTest - Got root 1
    21:03:09.872 [ObserveOn-1] INFO o.t.ThreadedObsTest - Got root 2
    21:03:10.376 [ObserveOn-1] INFO o.t.ThreadedObsTest - Got root 3
The emissions are now back on the main thread but the subscriptions are being processed in a threadpool.
That is the difference: when subscribeOn is used, the emissions are performed on the specified Scheduler; when observeOn is used, the subscriptions are processed on the specified Scheduler!
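As a rough mental model only, the same split can be imitated with plain java.util.concurrent. This is not RxJava and not how RxJava is implemented. In the sketch below, running the whole producing loop on one executor plays the role of subscribeOn, and handing each value to a second executor plays the role of observeOn. The class name HandOffSketch and the thread names emit-thread and consume-thread are hypothetical.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class HandOffSketch {

    /** Returns a log of which thread emitted and which thread received each value. */
    public static List<String> run() {
        List<String> events = new CopyOnWriteArrayList<>();
        // Single-thread executors with recognizable (hypothetical) thread names.
        ExecutorService emitter =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "emit-thread"));
        ExecutorService consumer =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "consume-thread"));

        // Analogue of subscribeOn: the whole producing loop runs on the emitter thread.
        emitter.execute(() -> {
            for (int i = 1; i <= 3; i++) {
                String value = "root " + i;
                events.add("emit " + value + " on " + Thread.currentThread().getName());
                // Analogue of observeOn: each value is handed to another thread.
                consumer.execute(() ->
                        events.add("got " + value + " on " + Thread.currentThread().getName()));
            }
        });

        try {
            // Wait for the producer first, so every hand-off has been queued,
            // then let the consumer drain its queue.
            emitter.shutdown();
            emitter.awaitTermination(5, TimeUnit.SECONDS);
            consumer.shutdown();
            consumer.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        }
        return events;
    }
}
```

Printing the list returned by HandOffSketch.run() shows every "emit" entry tagged with one thread name and every "got" entry tagged with the other. The interleaving of the two kinds of entries can vary from run to run.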
And the output when both are specified is equally predictable. Now, in all cases I had created a Scheduler using a thread pool with 5 threads, but only one of the threads has really been used, both for emitting values and for processing subscriptions. This is actually the normal behavior of Observables. If you want to make more efficient use of the thread pool, one approach may be to create multiple Observables. Say, for example, I have a service which returns pages of data this way:
    public Observable<Integer> getPages(int totalPages) {
        return Observable.create(new Observable.OnSubscribe<Integer>() {
            @Override
            public void call(Subscriber<? super Integer> subscriber) {
                logger.info("Getting pages");
                for (int i = 1; i <= totalPages; i++) {
                    subscriber.onNext(i);
                }
                subscriber.onCompleted();
            }
        });
    }
and another service which acts on each page of the data:
    public Observable<String> actOnAPage(int pageNum) {
        return Observable.<String>create(s -> {
            Util.delay(200);
            logger.info("Acting on page {}", pageNum);
            s.onNext("Page " + pageNum);
            s.onCompleted();
        });
    }
a way to use a Threadpool to process each page of data would be to chain it this way:
    getPages(5).flatMap(
            page -> aService.actOnAPage(page).subscribeOn(Schedulers.from(executor1))
        )
        .subscribe(s -> {
            logger.info("Completed Processing page: {}", s);
        });
See how the subscribeOn is applied to each Observable acting on a page. With this change, the output would look like this:
    21:15:45.572 [main] INFO o.t.ThreadedObsTest - Getting pages
    21:15:45.787 [SubscribeOn-1] INFO o.t.GeneralService - Acting on page 2
    21:15:45.787 [SubscribeOn-0] INFO o.t.GeneralService - Acting on page 1
    21:15:45.787 [SubscribeOn-4] INFO o.t.GeneralService - Acting on page 5
    21:15:45.787 [SubscribeOn-3] INFO o.t.GeneralService - Acting on page 4
    21:15:45.787 [SubscribeOn-2] INFO o.t.GeneralService - Acting on page 3
    21:15:45.789 [SubscribeOn-1] INFO o.t.ThreadedObsTest - Completed Processing page: Page 2
    21:15:45.790 [SubscribeOn-1] INFO o.t.ThreadedObsTest - Completed Processing page: Page 1
    21:15:45.790 [SubscribeOn-1] INFO o.t.ThreadedObsTest - Completed Processing page: Page 3
    21:15:45.790 [SubscribeOn-1] INFO o.t.ThreadedObsTest - Completed Processing page: Page 4
    21:15:45.791 [SubscribeOn-1] INFO o.t.ThreadedObsTest - Completed Processing page: Page 5
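For comparison, the same fan-out idea can be sketched without RxJava by submitting one independent task per page to a fixed thread pool. This is an analogy under assumptions, not the article's code: the class PageFanOutSketch and its processPages method are hypothetical, and its actOnAPage is a stand-in that only sleeps and returns a label. One deliberate difference is that the sketch returns results in page order, whereas the flatMap output above arrives in completion order.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class PageFanOutSketch {

    /** Processes pages 1..totalPages concurrently and returns the results in page order. */
    public static List<String> processPages(int totalPages, int poolSize) {
        ExecutorService pool = Executors.newFixedThreadPool(poolSize);
        try {
            List<CompletableFuture<String>> futures = new ArrayList<>();
            for (int page = 1; page <= totalPages; page++) {
                final int pageNum = page;
                // One independent task per page, much like one inner Observable per page.
                futures.add(CompletableFuture.supplyAsync(() -> actOnAPage(pageNum), pool));
            }
            List<String> results = new ArrayList<>();
            for (CompletableFuture<String> future : futures) {
                // Joining in submission order fixes the order of the results.
                results.add(future.join());
            }
            return results;
        } finally {
            pool.shutdown();
        }
    }

    // Stand-in for the article's actOnAPage: pretend to work, then return a label.
    static String actOnAPage(int pageNum) {
        try {
            Thread.sleep(200);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return "Page " + pageNum;
    }
}
```

With a pool of 5 threads and 5 pages, the five 200 ms tasks can overlap instead of running back to back, which is the effect the per-page subscribeOn achieves in the Rx-java version.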
Published at DZone with permission of Biju Kunjummen, DZone MVB. See the original article here.