Over a million developers have joined DZone.
{{announcement.body}}
{{announcement.title}}

RxJava in Action with Live Financial Market Data From IB (Part 2)

DZone's Guide to

RxJava in Action with Live Financial Market Data From IB (Part 2)

RxJava can be a really useful JVM extension. Check out this great tutorial of RxJava in action using live data from the financial market.

· IoT Zone
Free Resource

Download Red Hat’s blueprint for building an open IoT platform—open source from cloud to gateways to devices.

In this post we will make our observable receive live data from AAPL and GOOG, nothing new here, but with the help of the observeOn operator we can aggregate the tick data of each instrument on its own thread with little effort. Multi-core programming made easy:

  • Subscribe to APPL and GOOG ticks with the IB API
  • Modify marketDataObservable to make the 1-minute bar aggregation on its own thread.

1. Subscribe to APPL and GOOG Ticks With the IB API

 InteractiveBrokersFeed.getInstance().connect();
 InteractiveBrokersFeed.getInstance().subscribeRealTimeData(Instruments.APPL.val());
 InteractiveBrokersFeed.getInstance().subscribeRealTimeData(Instruments.GOOG.val());

Now we will receive ticks from GOOG and AAPL as they get in live.

2. Modify marketDataObservable to Make the 1-minute Bar Aggregation on its Own Thread.

public void aggregateLiveMinuteBar() {

    observable().
            ofType(LivePriceEvent.class). //filter on live ticks
            groupBy(LivePriceEvent::getInstrument). // group by instrument i.e AAPL, GOOG
            flatMap(grouped -> grouped.buffer(2, 1)). // take each 2 consecutive events
            //until here same as Part 1


            //1. instead of subscribe we use flatMap , to produce a fresh observable
            flatMap(listOf2 ->

                    //2. ---Generate One Minute Bar Task -----
                    Observable.defer(() -> {
                        //Logic to check if the minute has been crossed

                        //simulate heavy computation
                        sleepALittle();

                        LivePriceEvent lastEvent = listOf2.get(0);
                        int lastMinute = new DateTime(lastEvent.getCreateTimestamp()).minuteOfHour().get();
                        int currentMinute = new DateTime(listOf2.get(1).getCreateTimestamp()).minuteOfHour().get();
                        //3. when minute is crossed , we create a new observable with the minute in it
                        if (lastMinute != currentMinute) {
                            System.out.println("[generating minute bar]" + Thread.currentThread().getName());
                            //4.
                            return Observable.just(new LiveBarEvent(TimeUnit.MINUTES, lastEvent.createTimestamp, lastEvent.getInstrument(), lastEvent.getPrice()));
                        }

                        System.out.println("[ignoring event ]" + Thread.currentThread().getName());
                        //5.
                        return Observable.empty();

                    }).
                            //6. every Task get executed on the computation thread pool
                            subscribeOn(Schedulers.computation()).
                            doOnEach(Utils::printInfo)


    ).//7. end of flatmap

            //8. push the result back to make it available for subscribers
            subscribe(this::push);
}

2. We use defer to create a fresh observable each time data from a distinct Instrument arrives.

4. and 5. We return also Observables to abide to the defer signature (it needs to return an observable).

6. That's where all the magic happens subscribeOn(Schedulers.computation()) tell RxJava to use the thread pool computation each time someone subscribe to the previous observable effectively this makes all the code from 2. to 5. runs on a new thread.

3. Running it Against IB Demo Feed

Just follow the instructions from the github

$ git clone [https://github.com/dsebban/blog-post-2] rx-ib-2
$ cd rx-ib-2
$ mvn package
$ foreman start

You should see something like this pay attention to the name of the thread you can see it's different each time some data is aggregated , here RxComputationThreadPool-4 and RxComputationThreadPool-5.

daniel@daniel-desktop:~/Projects/dice_bot/blog-post-1$ foreman start
22:41:01 app.1  | INFO: GOOG IB tick Wed Jul 29 22:41:01 IDT 2015 price 628.09
22:41:01 app.1  | [generating minute bar]RxComputationThreadPool-4
22:41:01 app.1  |           [TID:  16 : RxComputationThreadPool-4] LiveBarEvent{min:40-sec:58-ms:537, instrument=GOOG, price=628.121}
22:41:01 app.1  | INFO: minute = 40 val=LiveBarEvent{min:40-sec:58-ms:537, instrument=GOOG, price=628.121}
22:41:03 app.1  | INFO: AAPL IB tick Wed Jul 29 22:41:03 IDT 2015 price 124.77
22:41:03 app.1  | [generating minute bar]RxComputationThreadPool-5
22:41:03 app.1  |           [TID:  17 : RxComputationThreadPool-5] LiveBarEvent{min:40-sec:50-ms:528, instrument=APPL, price=124.781}

To be continued... (Part 3 writing a simple algotrading strategy using RxJava and IB)

Build an open IoT platform with Red Hat—keep it flexible with open source software.

Topics:
rxjava ,reactive programming ,java

Opinions expressed by DZone contributors are their own.

{{ parent.title || parent.header.title}}

{{ parent.tldr }}

{{ parent.urlSource.name }}