Parallel RxJava and Spock Oddity

Check out this post where we investigate a peculiarity that can arise when using RxJava and Spock together. Come take a look with us to learn more.

I use RxJava a lot nowadays — back in the day before I joined Netflix, I was struggling with it a bit and mostly watched from the sideline. But now, I find myself pulling in that dependency in a lot of the Java code that I’m writing. And, I also use the Spock framework more for testing. I know there are a lot of Mockito die-hards out there, but, for me, Spock feels like the closest to writing a test spec, and also, because of Groovy, I end up writing much (much!) less code.

And it was in one of these projects that I came across an oddity with this combination, which I thought I would share.

Consider this scenario: you have a collection of inputs arriving in your code and you need to do some processing on each of them. Since the processing of each input is independent of the others, I could do it in parallel, and this is where RxJava comes in handy: I use flatMap over the inputs and, inside the flatMap, call subscribeOn with a scheduler (say, the io scheduler if the processing makes a call over the wire). The example below uses the computation scheduler, but I can assure you that io and most of the other schedulers show the same behavior.

If we assume that our input data is a list of Strings, you could end up with something like this:

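// RxJava 1.x; needs java.util.List, rx.Observable and rx.schedulers.Schedulers
public String parallelize(List<String> lst) {
    return Observable.from(lst)
            // each input gets its own inner Observable, subscribed on a scheduler thread
            .flatMap(s -> Observable.just(s)
                    .map(this::lower)
                    .map(str -> "STRING " + str)
                    .subscribeOn(Schedulers.computation()))
            // concatenate the results in whatever order they arrive
            .reduce("", (prev, another) -> prev + another)
            // block the calling thread until the single result is available
            .toBlocking()
            .single();
}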


As you can see, for the purpose of this exercise, I have just assumed that the “processing” means lowercasing a string via a handcrafted method (notice that I am NOT using String.toLowerCase) and then prepending a prefix via string concatenation. At the end, after all the inputs have been processed, I’m just concatenating the results together.
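The handcrafted method itself is not shown in the post, so here is a minimal sketch of what it might look like. The class name `LowerSketch`, the ASCII-only behavior, and the `process` helper are assumptions made for illustration; the real `lower` in the author's repository may differ.

```java
public class LowerSketch {

    // Hypothetical stand-in for the post's handcrafted lower():
    // lowercases ASCII letters only and leaves every other character untouched.
    public String lower(String s) {
        StringBuilder sb = new StringBuilder(s.length());
        for (int i = 0; i < s.length(); i++) {
            char c = s.charAt(i);
            sb.append(c >= 'A' && c <= 'Z' ? (char) (c + ('a' - 'A')) : c);
        }
        return sb.toString();
    }

    // Mirrors the two map() steps of the pipeline for a single input.
    public String process(String s) {
        return "STRING " + lower(s);
    }

    public static void main(String[] args) {
        LowerSketch sketch = new LowerSketch();
        for (String arg : args) {
            System.out.println(sketch.process(arg));
        }
    }
}
```

Because `lower` is a public, non-final instance method, it is the kind of method a Spy can intercept, which matters for the tests that follow.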

The only tricky bit here is that I’m doing the subscribeOn inside the flatMap. This is what triggers parallel processing of the inputs: without it, the inner observables run on the subscribing thread, so the inputs would be processed one after another rather than in parallel. With it, the order in which the results arrive is not guaranteed.

Now, you can test the code for yourself, and you will see it runs absolutely fine if you run the main() method. Pass some string parameters on the command line and you will see that the code itself is okay to use.

The problem, however, starts exhibiting itself when I write a unit test for this using Spock (and CGLIB). If I write a test like this:

def "parallel"() {
  List<String> lst = ["ABC", "XYz", "one"]
  def x = new ParallelRx()

  when:
    def r = x.parallelize(lst)

  then:
    r == "STRING xyzSTRING abcSTRING one"
}


Then, actually, everything works fine. The problem starts when I change to using a Spy:

def "parallel"() {
  List<String> lst = ["ABC", "XYz", "one"]
  def x = Spy(ParallelRx)

  when:
    def r = x.parallelize(lst)

  then:
    r == "STRING xyzSTRING abcSTRING one"
}


You will notice that your test never finishes running! Even if you stub the methods on the Spy, it doesn’t make a difference:

def "parallel"() {
  List<String> lst = ["ABC", "XYz", "one"]
  def x = Spy(ParallelRx)

  when:
    def r = x.parallelize(lst)

  then:
    r == "STRING xyzSTRING abcSTRING one"
    1 * x.lower("ABC") >> "abc"
    1 * x.lower("XYz") >> "xyz"
    1 * x.lower("one") >> "one"
}


This renders the same result. As I said before, you can try other schedulers, and they all fail, with the exception of trampoline! This was my first hint that something somewhere gets screwed up, because trampoline basically executes everything on the same thread.

I tried debugging this, and I got as far as noticing that the BlockingObservable in RxJava has a countdown latch, which it waits on. It seems that with a single-threaded model, the operations that wait on this latch and release it happen in the right order. Once we start using a different scheduler with multiple threads, something gets messed up. I suspect it is the proxying that Spock does via CGLIB, but I have found it difficult to confirm that.
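To make the latch observation more concrete, here is a simplified JDK-only sketch of the general pattern: a caller blocks on a CountDownLatch until a worker thread signals completion. This is not RxJava's actual implementation; the class `LatchSketch`, the method `awaitResult`, and the `signalCompletion` flag are made up for illustration. A timeout is used so that the sketch returns instead of hanging when the signal never arrives.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

public class LatchSketch {

    // Blocks the caller until the worker counts the latch down or the timeout elapses.
    // Returns the worker's result, or null if the completion signal never arrived.
    public static String awaitResult(boolean signalCompletion, long timeoutMillis) {
        ExecutorService worker = Executors.newSingleThreadExecutor();
        CountDownLatch latch = new CountDownLatch(1);
        AtomicReference<String> result = new AtomicReference<>();
        try {
            worker.execute(() -> {
                result.set("done");
                if (signalCompletion) {
                    latch.countDown(); // the "completed" signal the blocked caller waits for
                }
            });
            // Without a timeout, a missing countDown() would block this thread forever.
            return latch.await(timeoutMillis, TimeUnit.MILLISECONDS) ? result.get() : null;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        } finally {
            worker.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("signal delivered: " + awaitResult(true, 2000));
        System.out.println("signal lost:      " + awaitResult(false, 200));
    }
}
```

If the completion signal is lost for any reason, a caller that waits without a timeout never wakes up, which matches the symptom of a test that never finishes. Whether that is what actually happens with the Spy remains unconfirmed.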

Bottom line: code that uses a parallel Rx approach like the one above can only be tested without Spy instances. There are, of course, ways around this, such as pulling the code you are trying to spy on out into a different class, but if you spy on a class written like the one above, you will find that your tests run forever!
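One possible shape for the "different class" workaround is sketched below. To keep the example self-contained it uses CompletableFuture from the JDK as a stand-in for the Rx pipeline, so it is not the code from this post. The names `ExtractedCollaboratorSketch` and `Lowercaser` are hypothetical. The point is only the structure: the logic you would otherwise spy on lives behind an injected collaborator that a test can replace.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

public class ExtractedCollaboratorSketch {

    // Hypothetical collaborator holding the logic one would otherwise spy on.
    public interface Lowercaser {
        String lower(String s);
    }

    private final Lowercaser lowercaser;

    public ExtractedCollaboratorSketch(Lowercaser lowercaser) {
        this.lowercaser = lowercaser;
    }

    public String parallelize(List<String> inputs) {
        // Start one asynchronous task per input (stand-in for flatMap + subscribeOn).
        List<CompletableFuture<String>> futures = inputs.stream()
                .map(s -> CompletableFuture.supplyAsync(() -> "STRING " + lowercaser.lower(s)))
                .collect(Collectors.toList());
        // Join in input order, so unlike the Rx reduce() the result order is deterministic.
        return futures.stream()
                .map(CompletableFuture::join)
                .collect(Collectors.joining());
    }

    public static void main(String[] args) {
        // A lambda plays the role a test double would play in a test.
        Lowercaser stub = s -> s.toLowerCase();
        System.out.println(new ExtractedCollaboratorSketch(stub).parallelize(Arrays.asList(args)));
    }
}
```

In a Spock test, the collaborator could then be a plain Mock or Stub passed to the constructor, while the class under test stays a regular instance rather than a Spy. I have not verified that this avoids the hang in the Rx version.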

The code for this post is on GitHub.

Published at DZone with permission of
