Over a million developers have joined DZone.

RxJava in Action with Live Financial Market Data From IB (Part 2)

RxJava can be a really useful JVM extension. Check out this great tutorial of RxJava in action using live data from the financial market.

· IoT Zone

Access the survey results 'State of Industrial Internet Application Development' to learn about latest challenges, trends and opportunities with Industrial IoT, brought to you in partnership with GE Digital.

In this post we will make our observable receive live data from AAPL and GOOG, nothing new here, but with the help of the observeOn operator we can aggregate the tick data of each instrument on its own thread with little effort. Multi-core programming made easy:

  • Subscribe to APPL and GOOG ticks with the IB API
  • Modify marketDataObservable to make the 1-minute bar aggregation on its own thread.

1. Subscribe to APPL and GOOG Ticks With the IB API


Now we will receive ticks from GOOG and AAPL as they get in live.

2. Modify marketDataObservable to Make the 1-minute Bar Aggregation on its Own Thread.

public void aggregateLiveMinuteBar() {

            ofType(LivePriceEvent.class). //filter on live ticks
            groupBy(LivePriceEvent::getInstrument). // group by instrument i.e AAPL, GOOG
            flatMap(grouped -> grouped.buffer(2, 1)). // take each 2 consecutive events
            //until here same as Part 1

            //1. instead of subscribe we use flatMap , to produce a fresh observable
            flatMap(listOf2 ->

                    //2. ---Generate One Minute Bar Task -----
                    Observable.defer(() -> {
                        //Logic to check if the minute has been crossed

                        //simulate heavy computation

                        LivePriceEvent lastEvent = listOf2.get(0);
                        int lastMinute = new DateTime(lastEvent.getCreateTimestamp()).minuteOfHour().get();
                        int currentMinute = new DateTime(listOf2.get(1).getCreateTimestamp()).minuteOfHour().get();
                        //3. when minute is crossed , we create a new observable with the minute in it
                        if (lastMinute != currentMinute) {
                            System.out.println("[generating minute bar]" + Thread.currentThread().getName());
                            return Observable.just(new LiveBarEvent(TimeUnit.MINUTES, lastEvent.createTimestamp, lastEvent.getInstrument(), lastEvent.getPrice()));

                        System.out.println("[ignoring event ]" + Thread.currentThread().getName());
                        return Observable.empty();

                            //6. every Task get executed on the computation thread pool

    ).//7. end of flatmap

            //8. push the result back to make it available for subscribers

2. We use defer to create a fresh observable each time data from a distinct Instrument arrives.

4. and 5. We return also Observables to abide to the defer signature (it needs to return an observable).

6. That's where all the magic happens subscribeOn(Schedulers.computation()) tell RxJava to use the thread pool computation each time someone subscribe to the previous observable effectively this makes all the code from 2. to 5. runs on a new thread.

3. Running it Against IB Demo Feed

Just follow the instructions from the github

$ git clone [https://github.com/dsebban/blog-post-2] rx-ib-2
$ cd rx-ib-2
$ mvn package
$ foreman start

You should see something like this pay attention to the name of the thread you can see it's different each time some data is aggregated , here RxComputationThreadPool-4 and RxComputationThreadPool-5.

daniel@daniel-desktop:~/Projects/dice_bot/blog-post-1$ foreman start
22:41:01 app.1  | INFO: GOOG IB tick Wed Jul 29 22:41:01 IDT 2015 price 628.09
22:41:01 app.1  | [generating minute bar]RxComputationThreadPool-4
22:41:01 app.1  |           [TID:  16 : RxComputationThreadPool-4] LiveBarEvent{min:40-sec:58-ms:537, instrument=GOOG, price=628.121}
22:41:01 app.1  | INFO: minute = 40 val=LiveBarEvent{min:40-sec:58-ms:537, instrument=GOOG, price=628.121}
22:41:03 app.1  | INFO: AAPL IB tick Wed Jul 29 22:41:03 IDT 2015 price 124.77
22:41:03 app.1  | [generating minute bar]RxComputationThreadPool-5
22:41:03 app.1  |           [TID:  17 : RxComputationThreadPool-5] LiveBarEvent{min:40-sec:50-ms:528, instrument=APPL, price=124.781}

To be continued... (Part 3 writing a simple algotrading strategy using RxJava and IB)

The IoT Zone is brought to you in partnership with GE Digital.  Discover how IoT developers are using Predix to disrupt traditional industrial development models.

rxjava,reactive programming,java

The best of DZone straight to your inbox.

Please provide a valid email address.

Thanks for subscribing!

Awesome! Check your inbox to verify your email so you can start receiving the latest in tech news and resources.

{{ parent.title || parent.header.title}}

{{ parent.tldr }}

{{ parent.urlSource.name }}