DZone
Parallel Streams Done Right

See how to regain control over parallelism and thread pool choice when using the Stream API.

by Grzegorz Piwowarek · Jun. 04, 2020 · Tutorial

Downsides of Parallel Streams

Parallel collection processing using the Stream API can be as easy as:

```java
List<Integer> result = list.parallelStream()
  .map(toSomething())
  .collect(Collectors.toList());
```


Unfortunately, we can't choose an execution facility, define per-instance parallelism, or avoid blocking the calling thread.

What's more, the hardwired thread pool turns out to be the static common ForkJoinPool, which is the default execution facility not only for parallel streams but also for several CompletableFuture methods (the async variants called without an explicit Executor), and potentially even for Project Loom's virtual threads (fibers). Polluting such a shared thread pool with long-running operations could be disastrous for performance.
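To see the shared pool in action, here is a small JDK-only sketch that records which threads execute a parallel stream's map step. The class and method names are illustrative, and the worker thread names it prints are an OpenJDK implementation detail rather than a guaranteed contract.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ForkJoinPool;
import java.util.stream.IntStream;

public class CommonPoolDemo {

    // Runs a parallel stream and records the names of the threads that executed the map step.
    static Set<String> threadsUsedByParallelStream() {
        Set<String> names = ConcurrentHashMap.newKeySet();
        IntStream.range(0, 10_000)
          .parallel()
          .map(i -> {
              names.add(Thread.currentThread().getName());
              return i * 2;
          })
          .sum(); // terminal operation: blocks the calling thread until every element is processed
        return names;
    }

    public static void main(String[] args) {
        System.out.println("common pool parallelism: " + ForkJoinPool.getCommonPoolParallelism());
        System.out.println("calling thread: " + Thread.currentThread().getName());
        // Apart from the calling thread, which also takes part in the work, OpenJDK
        // names the threads "ForkJoinPool.commonPool-worker-N": the same pool other code shares.
        threadsUsedByParallelStream().forEach(System.out::println);
    }
}
```

There is no parameter to point this stream at a different pool, which is exactly the lack of control described above.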

We could mitigate the issue by using a ManagedBlocker, but it turns out that running on a ForkJoinPool is not part of the public contract of parallel streams:

Note, however, that this technique (...) is an implementation "trick" and is not guaranteed to work. Indeed, the threads or thread pool that is used for execution of parallel streams is unspecified.

Source: Stuart Marks on Stack Overflow
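For reference, here is roughly what the ManagedBlocker workaround looks like: a JDK-only sketch that wraps a blocking call so that a ForkJoinPool worker can let the pool compensate with an extra thread while it waits. The class names and the `slowSquare` stand-in for I/O are hypothetical, and because the pool behind parallel streams is unspecified, this relies on current implementation behavior rather than on a guarantee.

```java
import java.util.List;
import java.util.concurrent.ForkJoinPool;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class ManagedBlockerDemo {

    // Adapts a blocking Supplier to ForkJoinPool.ManagedBlocker so that a pool worker
    // calling it can let the pool activate a compensating thread while it blocks.
    static final class BlockingCall<T> implements ForkJoinPool.ManagedBlocker {
        private final Supplier<T> supplier;
        private volatile T result;
        private volatile boolean done;

        BlockingCall(Supplier<T> supplier) {
            this.supplier = supplier;
        }

        @Override
        public boolean block() {
            result = supplier.get(); // the actual blocking work
            done = true;
            return true; // no further blocking is necessary
        }

        @Override
        public boolean isReleasable() {
            return done;
        }
    }

    // Runs a blocking call through ForkJoinPool.managedBlock; on a non-pool thread it simply runs the call.
    static <T> T managed(Supplier<T> blockingCall) {
        BlockingCall<T> blocker = new BlockingCall<>(blockingCall);
        try {
            ForkJoinPool.managedBlock(blocker);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return blocker.result;
    }

    // Hypothetical stand-in for a blocking I/O call.
    static int slowSquare(int i) {
        try {
            Thread.sleep(50);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return i * i;
    }

    public static void main(String[] args) {
        List<Integer> result = IntStream.range(0, 8).boxed()
          .parallel()
          .map(i -> managed(() -> slowSquare(i)))
          .collect(Collectors.toList());
        System.out.println(result); // [0, 1, 4, 9, 16, 25, 36, 49]
    }
}
```

Note that this still runs on the common pool and still blocks the calling thread; it only softens the thread starvation.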

But what should we do if we want to process collections using long-running and/or blocking tasks? That's why I created parallel-collectors.

Parallel Collectors

Simply put, parallel-collectors is a zero-dependency toolkit that eases parallel collection processing in Java using the Stream API and CompletableFuture, allowing you to choose your own parallelization strategy, maximum parallelism, executor, and much more.

All of this is provided via non-invasive custom implementations of the Collector interface:

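```java
import static com.pivovarit.collectors.ParallelCollectors.parallel;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.stream.Collectors.toList;

list.stream()
  .collect(parallel(i -> foo(i), toList(), executor, parallelism))
  // ..
  .orTimeout(1000, MILLISECONDS)
  .thenAcceptAsync(System.out::println, otherExecutor)
  .thenRun(() -> System.out.println("Finished!"));
```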
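To make the idea of a Collector that hands back a CompletableFuture more concrete, here is a minimal JDK-only sketch. It is not how parallel-collectors is implemented: `mapAsync` is a hypothetical name, and the sketch deliberately leaves out the parallelism limit and the short-circuiting the library provides, so the executor's own size is the only bound on concurrency.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Function;
import java.util.stream.Collector;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class AsyncCollectorSketch {

    // Hypothetical helper: gathers the elements, then maps each one asynchronously on the given executor.
    // No parallelism cap and no short-circuiting, unlike parallel-collectors.
    static <T, R> Collector<T, ?, CompletableFuture<List<R>>> mapAsync(Function<T, R> mapper, Executor executor) {
        return Collector.<T, List<T>, CompletableFuture<List<R>>>of(
          ArrayList::new,
          List::add,
          (left, right) -> { left.addAll(right); return left; },
          elements -> {
              List<CompletableFuture<R>> futures = elements.stream()
                .map(e -> CompletableFuture.supplyAsync(() -> mapper.apply(e), executor))
                .collect(Collectors.toList());
              // Completes once every task has completed; join() below never waits because all are done.
              return CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[0]))
                .thenApply(ignored -> futures.stream()
                  .map(CompletableFuture::join)
                  .collect(Collectors.toList()));
          });
    }

    public static void main(String[] args) {
        ExecutorService executor = Executors.newFixedThreadPool(4);
        try {
            CompletableFuture<List<Integer>> result = Stream.of(1, 2, 3, 4)
              .collect(mapAsync((Integer i) -> i * 10, executor)); // returns once tasks are submitted
            System.out.println(result.join()); // [10, 20, 30, 40]
        } finally {
            executor.shutdown();
        }
    }
}
```

The stream pipeline itself returns as soon as the tasks are submitted; only the explicit join() waits for the results.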


To start using it, add the following Maven dependency:

```xml
<dependency>
    <groupId>com.pivovarit</groupId>
    <artifactId>parallel-collectors</artifactId>
    <version>2.3.3</version>
</dependency>
```


Double-check that you are using the latest version.

Features

The library combines the Stream API and CompletableFuture to provide functionality missing from standard parallel streams.

Parallel Collectors are:

  • non-invasive - the functionality is provided using custom implementations of public APIs
  • lightweight - if your goal is to apply straightforward parallel processing, it's the sweet spot between parallel streams and heavyweight technologies like RxJava or Project Reactor
  • configurable - choose your own Executor, parallelism level, and batching/completion strategies
  • non-blocking - results can be returned as CompletableFutures so that the calling thread is not blocked
  • short-circuiting - if one of the subtasks throws an exception, the remaining subtasks get interrupted and/or skipped

Parallel Collectors are unopinionated by design, so it's up to their users to use them responsibly, which involves things like:

  • configuring the provided Executor properly and managing its lifecycle
  • choosing the appropriate parallelism level
  • making sure that the tool is applied in the right context

Make sure to read the API documentation before using them in production.
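Since managing the Executor is left to the user, here is one common way to do it, sketched with plain JDK APIs: a fixed-size pool with named daemon threads and a shutdown helper that follows the shutdown/awaitTermination/shutdownNow pattern described in the ExecutorService Javadoc. The `parallel-io-` prefix, the pool size, the timeout, and the helper names are arbitrary assumptions; pick values that suit your workload.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class ExecutorLifecycle {

    // Fixed-size pool with recognizable thread names; the "parallel-io-" prefix is arbitrary.
    static ExecutorService newBoundedPool(int threads) {
        AtomicInteger counter = new AtomicInteger();
        ThreadFactory factory = task -> {
            Thread thread = new Thread(task, "parallel-io-" + counter.incrementAndGet());
            thread.setDaemon(true); // don't keep the JVM alive if a shutdown is forgotten
            return thread;
        };
        return Executors.newFixedThreadPool(threads, factory);
    }

    // Stops accepting new tasks, waits for running ones, then interrupts whatever is still left.
    static void shutdownGracefully(ExecutorService executor, long timeoutSeconds) {
        executor.shutdown();
        try {
            if (!executor.awaitTermination(timeoutSeconds, TimeUnit.SECONDS)) {
                executor.shutdownNow();
            }
        } catch (InterruptedException e) {
            executor.shutdownNow();
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        ExecutorService executor = newBoundedPool(4);
        try {
            executor.submit(() -> System.out.println("running on " + Thread.currentThread().getName()));
        } finally {
            shutdownGracefully(executor, 5);
        }
        System.out.println("terminated: " + executor.isTerminated());
    }
}
```

Named threads make the pool easy to spot in thread dumps, which helps when checking that blocking work stays off the common pool.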

API Overview

The com.pivovarit.collectors.ParallelCollectors class serves as the single entry point to the whole library, mimicking the java.util.stream.Collectors class.

So, classic, straightforward parallel stream usage like this:

```java
List<Integer> result = list.stream()
  .parallel()
  .map(i -> foo(i))
  .collect(toList());
```


Ends up being replaced by:

```java
private static ExecutorService e = ...
// ...

CompletableFuture<List<Integer>> result = list.stream()
  .collect(parallel(i -> foo(i), toList(), e, 4));
```


The library features four types of collectors:

  1. ParallelCollectors.parallel() - process the stream, collect it using a provided Collector, and receive the result as a CompletableFuture
  2. ParallelCollectors.parallelToStream() - process the stream and stream the results in completion order
  3. ParallelCollectors.parallelToOrderedStream() - process the stream and stream the results in the original order
  4. ParallelCollectors.toFuture() - collect a stream of CompletableFutures using a custom Collector
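The difference between completion order and original order can be illustrated with the JDK's ExecutorCompletionService, which hands back results as tasks finish. This is a swapped-in JDK technique for illustration only, not how parallelToStream() is implemented, and `inCompletionOrder` is a hypothetical helper that collects into a List instead of streaming lazily.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Function;

public class CompletionOrderSketch {

    // Hypothetical helper: applies mapper to every input on the executor and
    // returns the results in the order the tasks finish, not the input order.
    static <T, R> List<R> inCompletionOrder(List<T> inputs, Function<T, R> mapper, ExecutorService executor) {
        CompletionService<R> completion = new ExecutorCompletionService<>(executor);
        for (T input : inputs) {
            completion.submit(() -> mapper.apply(input));
        }
        List<R> results = new ArrayList<>();
        try {
            for (int i = 0; i < inputs.size(); i++) {
                results.add(completion.take().get()); // blocks until the next task completes
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        }
        return results;
    }

    // Sleeps for the given number of milliseconds and returns it, simulating uneven task durations.
    static int sleepThenReturn(int millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return millis;
    }

    public static void main(String[] args) {
        ExecutorService executor = Executors.newFixedThreadPool(3);
        try {
            List<Integer> delays = Arrays.asList(300, 100, 200);
            // Results appear in finishing order, so the shortest delay usually comes first.
            System.out.println(inCompletionOrder(delays, CompletionOrderSketch::sleepThenReturn, executor));
        } finally {
            executor.shutdown();
        }
    }
}
```

Collecting the futures in submission order and joining them one by one would give the original order instead, at the cost of waiting on slow early elements.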

By default, all upstream elements are processed separately. If you want to process them in batches instead, use the ParallelCollectors.Batching namespace, which contains drop-in alternatives that put less stress on the executor at the cost of losing work-stealing.
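To illustrate the batching trade-off, here is a JDK-only sketch that splits the input into a fixed number of contiguous chunks and submits each chunk as a single task. `mapInBatches` is a hypothetical helper, not the Batching API itself: fewer tasks reach the executor, but a slow chunk cannot be rebalanced onto idle threads.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Function;
import java.util.stream.Collectors;

public class BatchingSketch {

    // Splits inputs into at most `batches` contiguous chunks; each chunk becomes one executor task.
    static <T, R> CompletableFuture<List<R>> mapInBatches(
            List<T> inputs, Function<T, R> mapper, Executor executor, int batches) {
        if (batches < 1) {
            throw new IllegalArgumentException("batches must be at least 1");
        }
        int size = inputs.size();
        int chunk = Math.max(1, (size + batches - 1) / batches); // ceiling division
        List<CompletableFuture<List<R>>> futures = new ArrayList<>();
        for (int from = 0; from < size; from += chunk) {
            List<T> slice = inputs.subList(from, Math.min(from + chunk, size));
            futures.add(CompletableFuture.supplyAsync(
              () -> slice.stream().map(mapper).collect(Collectors.toList()), executor));
        }
        // Chunks are concatenated in submission order, so the original order is preserved.
        return CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[0]))
          .thenApply(ignored -> futures.stream()
            .flatMap(f -> f.join().stream())
            .collect(Collectors.toList()));
    }

    public static void main(String[] args) {
        ExecutorService executor = Executors.newFixedThreadPool(2);
        try {
            List<Integer> inputs = Arrays.asList(1, 2, 3, 4, 5, 6, 7);
            // 7 elements in 3 batches: chunks of 3, 3, and 1 element, submitted as 3 tasks
            System.out.println(mapInBatches(inputs, i -> i * i, executor, 3).join()); // [1, 4, 9, 16, 25, 36, 49]
        } finally {
            executor.shutdown();
        }
    }
}
```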

More examples can be found in the project's README on GitHub.


Opinions expressed by DZone contributors are their own.
