Parallel Streams Done Right
See how to regain control of parallelism and thread pool choice when using Stream API.
Downsides of Parallel Streams
Parallel collection processing using Stream API can be as easy as:
List<Integer> result = list.parallelStream()
  .map(i -> foo(i)) // runs in parallel
  .collect(Collectors.toList());
Unfortunately, we can't choose an execution facility, define per-instance parallelism, or avoid blocking the calling thread.
What's more, the hardwired thread pool is the static common ForkJoinPool, which is the default execution facility not only for parallel streams but also for multiple methods of CompletableFuture, and potentially even for Project Loom's virtual threads (fibers). Polluting such a shared thread pool with long-running operations could be disastrous for performance.
We could mitigate the issue by using a ManagedBlocker, but it turns out that the use of ForkJoinPool by parallel streams is not part of the public contract:
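To see which threads actually do the work, here is a minimal sketch that records the names of the threads used by a parallel stream. The class and method names are made up for illustration. On current OpenJDK builds the output typically contains the calling thread plus ForkJoinPool.commonPool-worker-N threads, but that is observed behavior, not something the specification guarantees.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.IntStream;

class CommonPoolDemo {

    // Runs a parallel stream and records the name of every thread that processed an element.
    static Set<String> threadsUsed() {
        Set<String> names = ConcurrentHashMap.newKeySet();
        IntStream.range(0, 1_000)
            .parallel()
            .forEach(i -> names.add(Thread.currentThread().getName()));
        return names;
    }

    public static void main(String[] args) {
        // The exact set of names depends on the JDK and on the number of available cores.
        System.out.println(threadsUsed());
    }
}
```

Any long-running lambda placed in such a stream occupies those same shared workers.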
Note, however, that this technique (...) is an implementation "trick" and is not guaranteed to work. Indeed, the threads or thread pool that is used for execution of parallel streams is unspecified.
source: Stuart Marks @ Stack Overflow
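For reference, a ManagedBlocker-based mitigation could look roughly like the sketch below. The blocking() helper and slowLookup() are hypothetical names introduced here; only ForkJoinPool.managedBlock and the ManagedBlocker interface come from the JDK. As the quote above warns, whether this helps a parallel stream depends on unspecified implementation details.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ForkJoinPool;
import java.util.function.Supplier;
import java.util.stream.Collectors;

class ManagedBlockerSketch {

    // Wraps a blocking call so that a ForkJoinPool may compensate with a spare worker while it blocks.
    static <T> T blocking(Supplier<T> call) {
        class Blocker implements ForkJoinPool.ManagedBlocker {
            T result;
            boolean done;

            @Override
            public boolean block() {
                result = call.get();
                done = true;
                return true; // no further blocking is necessary
            }

            @Override
            public boolean isReleasable() {
                return done;
            }
        }
        Blocker blocker = new Blocker();
        try {
            ForkJoinPool.managedBlock(blocker);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return blocker.result;
    }

    // Hypothetical slow operation standing in for a blocking call such as I/O.
    static int slowLookup(int i) {
        try {
            Thread.sleep(100);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return i * 2;
    }

    public static void main(String[] args) {
        List<Integer> result = Arrays.asList(1, 2, 3).parallelStream()
            .map(i -> blocking(() -> slowLookup(i)))
            .collect(Collectors.toList());
        System.out.println(result); // [2, 4, 6]
    }
}
```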
But what to do if we want to process collections using long-running and/or blocking tasks? That's why I created parallel-collectors.
Simply put, parallel-collectors is a zero-dependency toolkit that eases parallel collection processing in Java using the Stream API and CompletableFuture. It lets you choose your own parallelization strategy, maximum parallelism, executor, and much more.
All of the above and more is provided via noninvasive custom implementations of the Collector interface:
list.stream()
  .collect(parallel(i -> foo(i), toList(), executor, parallelism))
  .thenRun(() -> System.out.println("Finished!"));
To start using it, add the parallel-collectors dependency to your build, and double-check the latest version before doing so.
The library leverages the combined power of Stream API and CompletableFuture to provide missing functionality for standard Parallel Streams.
- non-invasive - the functionality is provided using custom implementations of public APIs
- lightweight - if your goal is to apply straightforward parallel processing, it's the sweet spot between Parallel Streams and heavy-armor technologies like RxJava/Project Reactor
- configurable - choose your own Executor, parallelism, batching/completion strategies
- non-blocking - results can be returned as CompletableFutures in order to not block the calling thread
- short-circuiting - if one of the subtasks throws an exception, remaining subtasks get interrupted and/or skipped
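To illustrate what short-circuiting means in terms of plain CompletableFuture, here is a JDK-only sketch. It is not the library's implementation, and mapFailFast is a name invented for this example. The first failure completes the overall result and cancels the remaining futures, so tasks that have not started yet are skipped.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Function;
import java.util.stream.Collectors;

class ShortCircuitSketch {

    static <T, R> CompletableFuture<List<R>> mapFailFast(
            List<T> input, Function<T, R> mapper, Executor executor) {
        List<CompletableFuture<R>> futures = input.stream()
            .map(item -> CompletableFuture.supplyAsync(() -> mapper.apply(item), executor))
            .collect(Collectors.toList());

        CompletableFuture<List<R>> result = CompletableFuture
            .allOf(futures.toArray(new CompletableFuture[0]))
            .thenApply(ignored -> futures.stream()
                .map(CompletableFuture::join)
                .collect(Collectors.toList()));

        for (CompletableFuture<R> future : futures) {
            future.whenComplete((value, error) -> {
                // Only the first failure wins; it fails the result and cancels everything else.
                if (error != null && result.completeExceptionally(error)) {
                    // Cancelled tasks that have not started are skipped.
                    // CompletableFuture.cancel does not interrupt tasks that are already running.
                    futures.forEach(other -> other.cancel(true));
                }
            });
        }
        return result;
    }

    public static void main(String[] args) {
        ExecutorService executor = Executors.newFixedThreadPool(2);
        try {
            mapFailFast(Arrays.asList(1, 2, 3, 4, 5), i -> {
                if (i == 2) {
                    throw new IllegalStateException("boom on " + i);
                }
                return i;
            }, executor).join();
        } catch (CompletionException e) {
            System.out.println("Failed fast: " + e.getCause().getMessage());
        } finally {
            executor.shutdown();
        }
    }
}
```

Note the limitation spelled out in the comments: this sketch can only skip pending work, whereas interrupting running tasks would need access to the underlying threads.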
Parallel Collectors are unopinionated by design so it's up to their users to use them responsibly, which involves things like:
- proper configuration of a provided Executor and its lifecycle management
- choosing the appropriate parallelism level
- making sure that the tool is applied in the right context
Make sure to read API documentation before using these in production.
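As one possible way to handle the executor lifecycle, the sketch below creates a dedicated fixed-size pool, hands it to the work, and always shuts it down afterwards. The helper names and the five-second timeout are assumptions made for this example, and the right parallelism depends on whether the tasks are CPU-bound or mostly waiting on I/O.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

class ExecutorLifecycleSketch {

    // Runs the given work with a dedicated pool and releases the pool's threads afterwards.
    static <T> T withDedicatedPool(int parallelism, Function<ExecutorService, T> work) {
        ExecutorService executor = Executors.newFixedThreadPool(parallelism);
        try {
            return work.apply(executor);
        } finally {
            shutdownGracefully(executor);
        }
    }

    static void shutdownGracefully(ExecutorService executor) {
        executor.shutdown(); // stop accepting new tasks, let queued ones finish
        try {
            if (!executor.awaitTermination(5, TimeUnit.SECONDS)) {
                executor.shutdownNow(); // interrupt whatever is still running
            }
        } catch (InterruptedException e) {
            executor.shutdownNow();
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        int answer = withDedicatedPool(4,
            pool -> CompletableFuture.supplyAsync(() -> 21 * 2, pool).join());
        System.out.println(answer); // 42
    }
}
```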
The com.pivovarit.collectors.ParallelCollectors class serves as a single entry point to the whole library, mimicking the java.util.stream.Collectors class.
So, a classic straightforward parallel stream usage:
List<Integer> result = list.parallelStream()
  .map(i -> foo(i)) // runs in parallel
  .collect(toList());
Ends up being replaced by:
private static ExecutorService e = ...
CompletableFuture<List<Integer>> result = list.stream()
.collect(parallel(i -> foo(i), toList(), e, 4));
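For intuition, the same shape of result can be produced with nothing but the JDK: submit each element to a chosen executor and combine the futures. This is a rough sketch of the general idea, not the library's implementation. mapAsync is a hypothetical helper, and unlike the collector above it limits parallelism only through the size of the thread pool.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Function;
import java.util.stream.Collectors;

class AsyncMapSketch {

    // Maps every element on the given executor and returns a future of the ordered results.
    static <T, R> CompletableFuture<List<R>> mapAsync(
            List<T> input, Function<T, R> mapper, Executor executor) {
        List<CompletableFuture<R>> futures = input.stream()
            .map(item -> CompletableFuture.supplyAsync(() -> mapper.apply(item), executor))
            .collect(Collectors.toList());

        // allOf completes once every future is done, so the joins below never block.
        return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
            .thenApply(ignored -> futures.stream()
                .map(CompletableFuture::join)
                .collect(Collectors.toList()));
    }

    public static void main(String[] args) {
        ExecutorService executor = Executors.newFixedThreadPool(4);
        try {
            CompletableFuture<List<Integer>> result =
                mapAsync(Arrays.asList(1, 2, 3), i -> i * 10, executor);
            System.out.println(result.join()); // [10, 20, 30]
        } finally {
            executor.shutdown();
        }
    }
}
```

The calling thread is free to do other work until it decides to join the returned future.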
The library features four types of collectors:
- ParallelCollectors.parallel() - process the stream, collect it using a provided Collector, and receive the result as a CompletableFuture
- ParallelCollectors.parallelToStream() - process the stream and stream the results in completion order
- ParallelCollectors.parallelToOrderedStream() - process the stream and stream the results in the original order
- ParallelCollectors.toFuture() - collect a stream of CompletableFutures using a custom Collector
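The difference between completion order and original order can be illustrated with JDK classes alone. The sketch below is not how the library implements it, and slowIdentity is a hypothetical task whose duration equals its input in milliseconds. An ExecutorCompletionService hands results back as they finish, while iterating over the submitted futures preserves the input order.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class OrderingSketch {

    // Hypothetical task: a larger input takes longer to "process".
    static int slowIdentity(int millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return millis;
    }

    // Results arrive in the order in which the tasks finish.
    static List<Integer> inCompletionOrder(List<Integer> input, ExecutorService executor) {
        CompletionService<Integer> completion = new ExecutorCompletionService<>(executor);
        for (Integer item : input) {
            completion.submit(() -> slowIdentity(item));
        }
        List<Integer> results = new ArrayList<>();
        try {
            for (int i = 0; i < input.size(); i++) {
                results.add(completion.take().get());
            }
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        }
        return results;
    }

    // Results arrive in the order of the input, regardless of which task finishes first.
    static List<Integer> inOriginalOrder(List<Integer> input, ExecutorService executor) {
        List<Future<Integer>> futures = new ArrayList<>();
        for (Integer item : input) {
            futures.add(executor.submit(() -> slowIdentity(item)));
        }
        List<Integer> results = new ArrayList<>();
        try {
            for (Future<Integer> future : futures) {
                results.add(future.get());
            }
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        }
        return results;
    }

    public static void main(String[] args) {
        ExecutorService executor = Executors.newFixedThreadPool(3);
        try {
            List<Integer> input = Arrays.asList(300, 200, 100);
            // Fast tasks tend to come first here, but the exact order depends on scheduling.
            System.out.println(inCompletionOrder(input, executor));
            System.out.println(inOriginalOrder(input, executor)); // [300, 200, 100]
        } finally {
            executor.shutdown();
        }
    }
}
```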
By default, all upstream elements are processed separately. If you want to process them in batches instead, use the ParallelCollectors.Batching namespace, which contains drop-in alternatives (less stress on the executor in exchange for the lack of work-stealing).
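The trade-off behind batching can be sketched with the JDK alone: instead of one task per element, the input is split into a fixed number of chunks and each chunk becomes a single task. The partition and mapInBatches helpers are invented for this illustration and are not the library's API. Fewer tasks reach the executor, but a slow chunk cannot hand its remaining elements over to an idle thread.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Function;
import java.util.stream.Collectors;

class BatchingSketch {

    // Splits the input into at most `batches` contiguous chunks of near-equal size.
    static <T> List<List<T>> partition(List<T> input, int batches) {
        List<List<T>> result = new ArrayList<>();
        int size = input.size();
        int chunk = (size + batches - 1) / batches; // ceiling division
        for (int from = 0; from < size; from += chunk) {
            result.add(input.subList(from, Math.min(size, from + chunk)));
        }
        return result;
    }

    // Submits one task per chunk rather than one task per element.
    static <T, R> CompletableFuture<List<R>> mapInBatches(
            List<T> input, Function<T, R> mapper, int batches, Executor executor) {
        List<CompletableFuture<List<R>>> futures = new ArrayList<>();
        for (List<T> batch : partition(input, batches)) {
            futures.add(CompletableFuture.supplyAsync(
                () -> batch.stream().map(mapper).collect(Collectors.toList()), executor));
        }
        return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
            .thenApply(ignored -> {
                List<R> merged = new ArrayList<>();
                for (CompletableFuture<List<R>> future : futures) {
                    merged.addAll(future.join()); // already completed, does not block
                }
                return merged;
            });
    }

    public static void main(String[] args) {
        ExecutorService executor = Executors.newFixedThreadPool(2);
        try {
            List<Integer> doubled =
                mapInBatches(Arrays.asList(1, 2, 3, 4, 5), i -> i * 2, 2, executor).join();
            System.out.println(doubled); // [2, 4, 6, 8, 10]
        } finally {
            executor.shutdown();
        }
    }
}
```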