Java Stream API Was Broken Before JDK 10

Stream API bugs can affect anyone still residing on JDK 8 and JDK 9.

By Grzegorz Piwowarek · Apr. 02, 19 · Analysis

Of course, not all of it was broken, but history has shown that the Stream API featured a few interesting bugs and deficiencies that can affect anyone still residing on JDK 8 and JDK 9.

Stream#flatMap

Unfortunately, it turns out that Stream#flatMap was not as lazy as advertised, which allowed for several crazy situations to exist.

For example, let’s take this one:

Stream.of(1)
  .flatMap(i -> Stream.generate(() -> 42))
  .findAny()
  .ifPresent(System.out::println);


In JDK 8 and JDK 9, the above code snippet spins forever waiting for the evaluation of the inner infinite Stream.

One would expect O(1) time complexity from a trivial operation of taking a single element from an infinite sequence — and this is how it works as long as we don’t process an infinite Stream inside  Stream#flatMap:

Stream.generate(() -> 42)
  .findAny()
  .ifPresent(System.out::println);

// completes "immediately" and prints 42


What’s more, it gets worse if we insert some additional processing after a short-circuited Stream#flatMap call:

Stream.of(1)
  .flatMap(i -> Stream.generate(() -> 42))
  .map(i -> process(i))
  .findAny()
  .ifPresent(System.out::println);

private static <T> T process(T input) {
    System.out.println("Processing...");
    return input;
}


Now, not only are we stuck in an infinite evaluation loop, but we’re also processing all of the items coming through:

Processing...
Processing...
Processing...
Processing...
Processing...
Processing...
Processing...
Processing...
Processing...
Processing...
Processing...
Processing...
...


Imagine the consequences if the process() method contained some blocking operations and unwanted side effects like sending emails or logging.
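
As a side note, if you're stuck on JDK 8 or JDK 9, one possible mitigation (a minimal sketch, not a general fix) is to keep the inner Streams finite yourself, for example by bounding them with limit():

Stream.of(1)
  .flatMap(i -> Stream.generate(() -> 42).limit(1)) // the bound of 1 is arbitrary, just enough for findAny(); a real pipeline may need more
  .findAny()
  .ifPresent(System.out::println);

// completes and prints 42 even on JDK 8/9, because the eager inner evaluation is now finite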

Explanation

The internal implementation of Stream#flatMap is to blame, especially the following code:

@Override
public void accept(P_OUT u) {
    try (Stream<? extends R> result = mapper.apply(u)) {
        // We can do better that this too; optimize for depth=0 case and just grab spliterator and forEach it
        if (result != null)
            result.sequential().forEach(downstream);
    }
}


As you can see, the inner Stream is consumed eagerly using Stream#forEach (not even mentioning the lack of curly braces around the conditional statement. Ugh!).

The problem remained unaddressed in JDK 9, but luckily, the solution was shipped with JDK 10:

@Override
public void accept(P_OUT u) {
    try (Stream<? extends R> result = mapper.apply(u)) {
        if (result != null) {
            if (!cancellationRequestedCalled) {
                result.sequential().forEach(downstream);
            }
            else {
                var s = result.sequential().spliterator();
                do { } while (!downstream.cancellationRequested() && s.tryAdvance(downstream));
            }
        }
    }
}
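
The gist of the fix: once a downstream operation has requested cancellation, the inner Stream is pulled through its Spliterator one element at a time instead of being pushed wholesale with forEach. A rough sketch of the same pull-based idea outside the JDK (an illustrative helper, not JDK code; assumes imports for Optional, Spliterator, AtomicReference, and Stream):

private static <T> Optional<T> findAnyLazily(Stream<T> inner) {
    // Pull at most one element from the (possibly infinite) Stream via its Spliterator.
    // tryAdvance() returns after a single element, so it never spins forever.
    Spliterator<T> spliterator = inner.spliterator();
    AtomicReference<T> found = new AtomicReference<>();
    spliterator.tryAdvance(found::set);
    return Optional.ofNullable(found.get());
}

// findAnyLazily(Stream.generate(() -> 42)) returns Optional[42] immediately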


Stream#takeWhile/dropWhile

This one is directly connected to the previous issue and Stream#flatMap’s unwanted eager evaluation.

Let’s say we have a list of lists:

List<List<String>> list = List.of(
  List.of("1", "2"),
  List.of("3", "4", "5", "6", "7"));


And, we want to flatten them into a single one:

list.stream()
  .flatMap(Collection::stream)
  .forEach(System.out::println);

// 1
// 2
// 3
// 4
// 5
// 6
// 7


It works just as expected.

Now, let’s take the flattened Stream and simply keep taking elements until we encounter "4":

Stream.of("1", "2", "3", "4", "5", "6", "7")
  .takeWhile(i -> !i.equals("4"))
  .forEach(System.out::println);

// 1
// 2
// 3


Again, it works just as we expected.

Let’s now try to combine these two; what could go wrong?

List<List<String>> list = List.of(
  List.of("1", "2"),
  List.of("3", "4", "5", "6", "7"));

list.stream()
  .flatMap(Collection::stream)
  .takeWhile(i -> !i.equals("4"))
  .forEach(System.out::println);

// 1
// 2
// 3
// 5
// 6
// 7


That’s an unexpected turn of events and can be fully attributed to the original issue with Stream#flatMap.

Some time ago, I ran a short poll on Twitter, and most of you were quite surprised by the result:

[Image: Twitter poll results]
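
Until you're on JDK 10+, one possible workaround (a sketch, with the obvious cost of losing laziness) is to materialize the flattened Stream before applying the short-circuiting operation, so takeWhile works against a well-behaved source again. Here, list is the list of lists from the snippet above and Collectors is java.util.stream.Collectors:

list.stream()
  .flatMap(Collection::stream)
  .collect(Collectors.toList()) // materializing defeats laziness, but restores the expected semantics
  .stream()
  .takeWhile(i -> !i.equals("4"))
  .forEach(System.out::println);

// 1
// 2
// 3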

Parallel Streams on Custom ForkJoinPool Instances

There’s one commonly known hack (which you should not be using, since it relies on internal implementation details of the Stream API) that makes it possible to hijack parallel Stream tasks and run them on a custom fork-join pool, simply by submitting them from within your own FJP instance:

ForkJoinPool customPool = new ForkJoinPool(42);

customPool.submit(() -> list.parallelStream() /*...*/);
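
For completeness, a minimal self-contained sketch of what the full hack looks like (the pool size of 42 and the summing pipeline are arbitrary illustrations):

List<Integer> list = List.of(1, 2, 3, 4, 5);
ForkJoinPool customPool = new ForkJoinPool(42);

// submit() hands the whole pipeline to the custom pool; join() waits for the result
int sum = customPool.submit(() -> list.parallelStream().mapToInt(Integer::intValue).sum())
  .join();

System.out.println(sum); // 15
customPool.shutdown();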


If you thought that you managed to trick everyone already, you were partially right.

It turns out that even though tasks were running on a custom pool instance, they were still coupled to the shared common pool: the parallelism of the computation remained proportional to the size of the common pool rather than the custom pool (JDK-8190974).

So, even if you were using these when you shouldn't have, the fix for that arrived in JDK 10. Additionally, if you really need to use the Stream API to run parallel computations, you could use parallel-collectors instead.



If you enjoyed this article and want to learn more about Java Streams, check out this collection of tutorials and articles on all things Java Streams.


Published at DZone with permission of Grzegorz Piwowarek, DZone MVB. See the original article here.

Opinions expressed by DZone contributors are their own.
