DZone
Java Zone
Thanks for visiting DZone today,
Edit Profile
  • Manage Email Subscriptions
  • How to Post to DZone
  • Article Submission Guidelines
Sign Out View Profile
  • Post an Article
  • Manage My Drafts
Over 2 million developers have joined DZone.
Log In / Join
  • Refcardz
  • Trend Reports
  • Webinars
  • Zones
  • |
    • Agile
    • AI
    • Big Data
    • Cloud
    • Database
    • DevOps
    • Integration
    • IoT
    • Java
    • Microservices
    • Open Source
    • Performance
    • Security
    • Web Dev
DZone > Java Zone > Splitting Up “Streams” Using jOOλ, Kotlin, and ReactiveX

Splitting Up “Streams” Using jOOλ, Kotlin, and ReactiveX

How to use reuse streams when using Kotlin, rather than using the Java 8 Streams API, which is designed for parallel execution.

Johannes Neubauer user avatar by
Johannes Neubauer
·
Aug. 03, 16 · Java Zone · Tutorial
Like (1)
Save
Tweet
4.50K Views

Join the DZone community and get the full member experience.

Join For Free

The Java 8 Stream API is pretty cool as you can see in my post BFS with Streams. But there is a catch. You cannot reuse Streams. Consider the following code Java-Code:  

final List<String> helloList = 
    Arrays.asList("H", "e", "l", "l", "o", ", ", "W", "o", "r", "l", "d", "!");

final Stream<String> helloStream = helloList.stream();
final Predicate<String> checkUpper = s -> !s.isEmpty() 
    && !s.substring(0,1).toUpperCase().equals(s.substring(0, 1));
helloStream.filter(checkUpper);
helloStream.filter(s -> !checkUpper.test(s));

This results in IllegalStateException: stream has already been operated upon or closed.

The problem with Java 8’s Stream API is that it is designed for parallel execution. This decision introduced some constraints. Unfortunately, there is no sequential only-switch. What you can do is collect the results with groupBy into a map and then create two new streams from that. But collecting is a terminal operation, not lazy, and therefore inefficient (especially in combination, with early-exit operations like limit).

You can also try to do the first filter, chain it with a peek, and finally do the second filter. But since only elements matching the first filter will reach the second filter (i.e. a && !a which is equal to false), you won’t get any elements past the second filter. If you have a so called cold source (i.e. like a collection), you can just use two different streams which results in two iterations. But for hot sources (like a network or file i/o stream), this is not that easy. A possible solution is to cache the input in a collection, i.e., cool it down. But this comes with a space and performance penalty. So let us see, what our options are…

I couldn’t resist and tried it in Kotlin 1.0 with their sequences:

val helloArray = arrayOf("H", "e", "l", "l", "o", ", ", "W", "o", "r", "l", "d", "!")

val helloSeq = helloArray.asSequence()
val checkUpper: (String) -> Boolean = { it.firstOrNull()?.isUpperCase() ?: false }
val upperSeq = helloSeq.filter(checkUpper)
val lowerSeq = helloSeq.filter { !checkUpper(it) }.map(String::toUpperCase)

println(upperSeq.joinToString())
println(lowerSeq.joinToString())

And hey, it works. Output:

H, W
E, L, L, O, , , O, R, L, D, !

But for Java we have a solution, too. You can use jOOλ, which provides some extensions to the Stream API in particular for sequential streams via the Seq interface. Reusing streams does not work here either. But we have nice additional (lazy) operations like partition:

final Tuple2<Seq<String>, Seq<String>> partition = Seq.of("H", "e", "l", "l", "o", ", ", "W", "o", "r", "l", "d", "!").partition(checkUpper);
System.out.println(partition.v1().map(String::toUpperCase).toString(" "));
System.out.println(partition.v2().map(String::toLowerCase).toString(" "));

This works and results in the following output:

E L L O O R L D
h ,  w !

Of course we can grab even deeper into the toy/tool box and use RXJava and the groupBy feature or more precisely RXKotlin in this case (a Java implementation is quite similar, but uses much uglier syntax):

val helloArray = arrayOf("H", "e", "l", "l", "o", ", ", "W", "o", "r", "l", "d", "!")
Observable.from(helloArray)
        .groupBy {
            it.firstOrNull()?.isUpperCase() ?: false
        }.subscribe {
    if (it.key) {
       it.subscribe { print("$it") }
    }
    else {
        it.map { it.toUpperCase() }.subscribe {
            print("$it")
        }
    }
}

The output is:

hELLO, wORLD!

Using ReactiveX might be overkill in many situations, especially, if you would like to collect the result into some kind of collection. ReactiveX is more a (data-driven) programming paradigm, where observers can react to emissions of observables. So check first what you would like to accomplish before you switch to reactiveX as (after beginning to use it) it eventually will change the complete control flow of your application. This might be good or not :).

Exciting.

Kotlin (programming language) Stream (computing)

Published at DZone with permission of Johannes Neubauer, DZone MVB. See the original article here.

Opinions expressed by DZone contributors are their own.

Popular on DZone

  • Software Methodologies — Waterfall vs Agile vs DevOps
  • A Guide to Parsing: Algorithms and Technology
  • No Sprint Goal, No Cohesion, No Collaboration
  • Get Started With Kafka and Docker in 20 Minutes

Comments

Java Partner Resources

X

ABOUT US

  • About DZone
  • Send feedback
  • Careers
  • Sitemap

ADVERTISE

  • Advertise with DZone

CONTRIBUTE ON DZONE

  • Article Submission Guidelines
  • MVB Program
  • Become a Contributor
  • Visit the Writers' Zone

LEGAL

  • Terms of Service
  • Privacy Policy

CONTACT US

  • 600 Park Offices Drive
  • Suite 300
  • Durham, NC 27709
  • support@dzone.com
  • +1 (919) 678-0300

Let's be friends:

DZone.com is powered by 

AnswerHub logo