Splitting Up “Streams” Using jOOλ, Kotlin, and ReactiveX

How to reuse streams with Kotlin, jOOλ, or ReactiveX rather than with the Java 8 Streams API, which is designed for parallel execution.

The Java 8 Stream API is pretty cool, as you can see in my post BFS with Streams. But there is a catch: you cannot reuse streams. Consider the following Java code:

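import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;
import java.util.stream.Stream;

final List<String> helloList = 
    Arrays.asList("H", "e", "l", "l", "o", ", ", "W", "o", "r", "l", "d", "!");

final Stream<String> helloStream = helloList.stream();
// Despite its name, checkUpper matches strings whose first character is NOT upper case.
final Predicate<String> checkUpper = s -> !s.isEmpty() 
    && !s.substring(0,1).toUpperCase().equals(s.substring(0, 1));
helloStream.filter(checkUpper);
// Second operation on the same stream: this call throws the exception.
helloStream.filter(s -> !checkUpper.test(s));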

The second filter call results in an IllegalStateException: stream has already been operated upon or closed.

The problem with Java 8’s Stream API is that it is designed for parallel execution, and this decision introduced some constraints. Unfortunately, there is no sequential-only switch. What you can do is collect the results into a map with a grouping collector (Collectors.groupingBy or Collectors.partitioningBy) and then create two new streams from that. But collecting is a terminal operation: it is eager, not lazy, and therefore inefficient, especially in combination with early-exit operations like limit.
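The collect-then-restream workaround could look roughly like the sketch below. It assumes Collectors.partitioningBy as the grouping collector, and the class name CollectThenSplit and the predicate STARTS_UPPER are made up for this illustration. Note that split consumes the whole input before either result can be streamed.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;
import java.util.stream.Collectors;

// Hypothetical class name, used only for this illustration.
class CollectThenSplit {

    // True if the first character of the string is an upper-case letter.
    static final Predicate<String> STARTS_UPPER =
        s -> !s.isEmpty() && Character.isUpperCase(s.charAt(0));

    // Eagerly consumes the whole input and buckets it by the predicate.
    static Map<Boolean, List<String>> split(List<String> input) {
        return input.stream().collect(Collectors.partitioningBy(STARTS_UPPER));
    }

    public static void main(String[] args) {
        List<String> hello =
            Arrays.asList("H", "e", "l", "l", "o", ", ", "W", "o", "r", "l", "d", "!");
        Map<Boolean, List<String>> parts = split(hello);

        // Each bucket is a plain list, so it can be streamed as often as needed.
        System.out.println(parts.get(true).stream()
            .collect(Collectors.joining(" ")));
        System.out.println(parts.get(false).stream()
            .map(String::toUpperCase)
            .collect(Collectors.joining(" ")));
    }
}
```

The price is the one named above: both lists are fully materialized before the first element can be processed, so an early exit such as limit on one of the new streams saves nothing.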

You can also try to do the first filter, chain it with a peek, and finally do the second filter. But since only elements matching the first filter reach the second filter (i.e., a && !a, which is always false), no element gets past the second filter. If you have a so-called cold source (such as a collection), you can just use two different streams, which results in two iterations. But for hot sources (like a network or file I/O stream), this is not that easy. A possible solution is to cache the input in a collection, i.e., to cool it down, but this comes with a space and performance penalty. So let us see what our options are…
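The following sketch illustrates the first two options under stated assumptions: the class ColdSourceSplit, the predicate STARTS_UPPER, and the helper methods chained and pass are hypothetical names. It shows that the chained filters never let anything through, while a cold source can simply hand out a fresh stream per pass.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical class name, used only for this illustration.
class ColdSourceSplit {

    static final Predicate<String> STARTS_UPPER =
        s -> !s.isEmpty() && Character.isUpperCase(s.charAt(0));

    // Chains both filters; the second filter only sees what passed the first,
    // so the result is always empty. Elements seen by peek are recorded.
    static List<String> chained(List<String> input, List<String> seenByPeek) {
        return input.stream()
            .filter(STARTS_UPPER)
            .peek(seenByPeek::add)
            .filter(STARTS_UPPER.negate())
            .collect(Collectors.toList());
    }

    // Cold source: request a fresh stream for every pass (one iteration each).
    static List<String> pass(Supplier<Stream<String>> source, Predicate<String> p) {
        return source.get().filter(p).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> hello =
            Arrays.asList("H", "e", "l", "l", "o", ", ", "W", "o", "r", "l", "d", "!");

        List<String> seen = new ArrayList<>();
        System.out.println(chained(hello, seen)); // nothing passes both filters
        System.out.println(seen);                 // what passed the first filter

        System.out.println(pass(hello::stream, STARTS_UPPER));
        System.out.println(pass(hello::stream, STARTS_UPPER.negate()));
    }
}
```

The Supplier only works because the list can be iterated again at will. A hot source offers no such second pass unless it is buffered first.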

I couldn’t resist and tried it in Kotlin 1.0 with their sequences:

val helloArray = arrayOf("H", "e", "l", "l", "o", ", ", "W", "o", "r", "l", "d", "!")

val helloSeq = helloArray.asSequence()
val checkUpper: (String) -> Boolean = { it.firstOrNull()?.isUpperCase() ?: false }
val upperSeq = helloSeq.filter(checkUpper)
val lowerSeq = helloSeq.filter { !checkUpper(it) }.map(String::toUpperCase)

println(upperSeq.joinToString())
println(lowerSeq.joinToString())

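And hey, it works: a sequence backed by an array can be iterated more than once, so each joinToString call simply walks the array again. Output: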

H, W
E, L, L, O, , , O, R, L, D, !

But for Java we have a solution, too. You can use jOOλ, which provides some extensions to the Stream API, in particular for sequential streams via the Seq interface. Reusing streams does not work here either, but we get nice additional (lazy) operations like partition:

import org.jooq.lambda.Seq;
import org.jooq.lambda.tuple.Tuple2;
final Tuple2<Seq<String>, Seq<String>> partition =
    Seq.of("H", "e", "l", "l", "o", ", ", "W", "o", "r", "l", "d", "!").partition(checkUpper);
System.out.println(partition.v1().map(String::toUpperCase).toString(" "));
System.out.println(partition.v2().map(String::toLowerCase).toString(" "));

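This works and results in the following output (v1 holds the elements that match checkUpper, v2 the rest; the predicate from the first Java example matches the entries that do not start with an upper-case letter):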

E L L O O R L D
h ,  w !
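To make the idea of a lazy, sequential partition more concrete, here is a minimal sketch using only the JDK. It is not jOOλ's implementation. The class LazyPartition and its methods matching and rest are hypothetical, and the buffering strategy is an assumption chosen for illustration. Both result iterators share one source iterator, and each pulls from it only until it finds an element for its own side, parking elements for the other side in a queue.

```java
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.Queue;
import java.util.function.Predicate;

// Hypothetical helper (not jOOλ code): splits one iterator into two lazy ones.
// ArrayDeque is used for buffering, so null elements are not supported.
class LazyPartition<T> {
    private final Iterator<T> source;
    private final Predicate<? super T> predicate;
    private final Queue<T> matchingBuffer = new ArrayDeque<>();
    private final Queue<T> restBuffer = new ArrayDeque<>();

    LazyPartition(Iterator<T> source, Predicate<? super T> predicate) {
        this.source = source;
        this.predicate = predicate;
    }

    // Elements for which the predicate is true.
    Iterator<T> matching() {
        return side(matchingBuffer, restBuffer, true);
    }

    // Elements for which the predicate is false.
    Iterator<T> rest() {
        return side(restBuffer, matchingBuffer, false);
    }

    private Iterator<T> side(Queue<T> own, Queue<T> other, boolean wanted) {
        return new Iterator<T>() {
            @Override
            public boolean hasNext() {
                // Pull from the shared source only until this side has an element;
                // elements belonging to the other side are parked in its buffer.
                while (own.isEmpty() && source.hasNext()) {
                    T next = source.next();
                    (predicate.test(next) == wanted ? own : other).add(next);
                }
                return !own.isEmpty();
            }

            @Override
            public T next() {
                if (!hasNext()) {
                    throw new NoSuchElementException();
                }
                return own.remove();
            }
        };
    }

    public static void main(String[] args) {
        Iterator<String> hello =
            Arrays.asList("H", "e", "l", "l", "o", ", ", "W", "o", "r", "l", "d", "!").iterator();
        LazyPartition<String> partition = new LazyPartition<>(
            hello, s -> !s.isEmpty() && Character.isUpperCase(s.charAt(0)));

        Iterator<String> upper = partition.matching();
        System.out.println(upper.next());    // first upper-case entry
        System.out.println(hello.hasNext()); // the source has not been drained
        partition.rest().forEachRemaining(System.out::print);
        System.out.println();
    }
}
```

The source is traversed only once, which is what makes this approach usable for hot sources. The cost is the buffer: if one side is consumed completely before the other is touched, every element of the other side ends up in memory.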

Of course, we can reach even deeper into the toy/tool box and use RxJava and its groupBy feature, or more precisely RxKotlin in this case (a Java implementation is quite similar, but uses much uglier syntax):

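import rx.Observable

val helloArray = arrayOf("H", "e", "l", "l", "o", ", ", "W", "o", "r", "l", "d", "!")
Observable.from(helloArray)
        // Split the emissions into two groups, keyed by whether the first character is upper case.
        .groupBy { it.firstOrNull()?.isUpperCase() ?: false }
        .subscribe { group ->
            if (group.key) {
                // Upper-case group: print in lower case.
                group.map { it.toLowerCase() }.subscribe { print(it) }
            } else {
                // Everything else: print in upper case.
                group.map { it.toUpperCase() }.subscribe { print(it) }
            }
        }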

The output is:

hELLO, wORLD!

Using ReactiveX might be overkill in many situations, especially if you would like to collect the result into some kind of collection. ReactiveX is more of a (data-driven) programming paradigm, where observers react to the emissions of observables. So check first what you would like to accomplish before you switch to ReactiveX, because once you start using it, it will eventually change the complete control flow of your application. This might be good or not :).

Exciting.


Topics:
java, streams, kotlin

Published at DZone with permission of Johannes Neubauer, DZone MVB. See the original article here.

