Parallel Streams Done Right
See how to regain control of parallelism and thread pool choice when using Stream API.
Downsides of Parallel Streams
Parallel collection processing using Stream API can be as easy as:
List<Integer> result = list.parallelStream()
  .map(toSomething())
  .collect(Collectors.toList());
Unfortunately, we can't choose an execution facility, define per-instance parallelism, or avoid blocking the calling thread.
What's more, it turns out that the hardwired thread pool is the static common ForkJoinPool, which is the default execution facility not only for parallel streams but also for many CompletableFuture methods, and potentially even Project Loom's virtual threads (fibers). Polluting such a shared pool with long-running operations could be disastrous for performance.
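To see how little room for configuration there is, here's a quick sketch that inspects the shared pool; its only knob is a JVM-wide system property (the class name below is illustrative):

import java.util.concurrent.ForkJoinPool;

class CommonPoolPeek {
    public static void main(String[] args) {
        // The shared pool behind parallel streams and the no-executor
        // variants of CompletableFuture's *Async methods
        ForkJoinPool commonPool = ForkJoinPool.commonPool();

        // Defaults to availableProcessors() - 1 and can only be changed globally,
        // via -Djava.util.concurrent.ForkJoinPool.common.parallelism=N
        System.out.println("Common pool parallelism: " + commonPool.getParallelism());
    }
}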
We could try to mitigate the issue by using a ManagedBlocker, but it turns out that the thread pool used by parallel streams is not part of the public contract:
Note, however, that this technique (...) is an implementation "trick" and is not guaranteed to work. Indeed, the threads or thread pool that is used for execution of parallel streams is unspecified.
source: Stuart Marks @ Stack Overflow
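For the record, wrapping a blocking call in a ManagedBlocker looks roughly like the sketch below; the Supplier-based BlockingValue class is purely illustrative, and even if the common pool compensates with extra workers while block() runs, the quote above means parallel streams give no guarantee about which pool is used at all:

import java.util.concurrent.ForkJoinPool;
import java.util.function.Supplier;

class BlockingValue<T> implements ForkJoinPool.ManagedBlocker {

    private final Supplier<T> blockingCall; // e.g., a slow HTTP request
    private volatile T result;

    BlockingValue(Supplier<T> blockingCall) {
        this.blockingCall = blockingCall;
    }

    @Override
    public boolean block() {
        result = blockingCall.get();
        return true; // no further blocking needed
    }

    @Override
    public boolean isReleasable() {
        return result != null;
    }

    T get() throws InterruptedException {
        // Hints the pool that the current thread is about to block,
        // allowing it to spawn a compensating worker
        ForkJoinPool.managedBlock(this);
        return result;
    }
}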
But what if we want to process collections using long-running and/or blocking tasks? That's why I created parallel-collectors.
Parallel Collectors
Simply put, parallel-collectors is a zero-dependency toolkit that eases parallel collection processing in Java using Stream API and CompletableFuture, while letting you choose your own parallelization strategy, maximum parallelism, executor, and much more.
All of the above and more is provided via non-invasive custom implementations of the Collector interface:
import static com.pivovarit.collectors.ParallelCollectors.parallel;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.stream.Collectors.toList;

list.stream()
  .collect(parallel(i -> foo(i), toList(), executor, parallelism))
  // ... further CompletableFuture chaining
  .orTimeout(1000, MILLISECONDS)
  .thenAcceptAsync(System.out::println, otherExecutor)
  .thenRun(() -> System.out.println("Finished!"));
To start using it, add the following dependency:
<dependency>
    <groupId>com.pivovarit</groupId>
    <artifactId>parallel-collectors</artifactId>
    <version>2.3.3</version>
</dependency>
Double-check the latest version on Maven Central.
Features
The library leverages the combined power of Stream API and CompletableFuture to provide the functionality missing from standard Parallel Streams. Parallel Collectors are:
- non-invasive - the functionality is provided using custom implementations of public APIs
- lightweight - if your goal is to apply straightforward parallel processing, it's the sweet spot between Parallel Streams and heavy-armor technologies like RxJava/Project Reactor
- configurable - choose your own Executor, parallelism, batching/completion strategies
- non-blocking - results can be returned as CompletableFutures in order to not block the calling thread
- short-circuiting - if one of the subtasks throws an exception, remaining subtasks get interrupted and/or skipped
Parallel Collectors are unopinionated by design, so it's up to their users to use them responsibly, which involves things like:
- proper configuration of the provided Executor and its lifecycle management (see the sketch after this list)
- choosing the appropriate parallelism level
- making sure that the tool is applied in the right context
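For instance, a minimal sketch of the first two points might look like this; the UserService class, its fixed pool of 10 threads, and the fetch method are illustrative, not part of the library:

import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import static com.pivovarit.collectors.ParallelCollectors.parallel;
import static java.util.stream.Collectors.toList;

class UserService {

    // A dedicated, bounded pool for blocking work - sized to the
    // downstream resource (e.g., an HTTP client), not to CPU cores
    private final ExecutorService executor = Executors.newFixedThreadPool(10);

    CompletableFuture<List<String>> fetchAll(List<Integer> ids) {
        return ids.stream()
          .collect(parallel(id -> fetch(id), toList(), executor, 10));
    }

    void shutdown() {
        executor.shutdown(); // release the threads when the service goes away
    }

    private String fetch(Integer id) {
        return "user-" + id; // placeholder for a long-running/blocking call
    }
}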
Make sure to read the API documentation before using these in production.
API Overview
The com.pivovarit.collectors.ParallelCollectors class serves as a single entry point to the whole library, mimicking the java.util.stream.Collectors class.
So, a classic straightforward parallel stream usage:
List<Integer> result = list.stream()
  .parallel()
  .map(i -> foo(i))
  .collect(toList());
Ends up being replaced by:
private static ExecutorService e = ...

// ...

CompletableFuture<List<Integer>> result = list.stream()
  .collect(parallel(i -> foo(i), toList(), e, 4));
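If blocking at that point is acceptable, the future can still be unwrapped explicitly:

// Blocks the calling thread until all subtasks finish - fine in tests or at the
// outermost edge of an application, but it forfeits the non-blocking benefit
List<Integer> integers = result.join();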
The library features four types of collectors:
- ParallelCollectors.parallel() - process the stream, collect it using a provided Collector, and receive the result as a CompletableFuture
- ParallelCollectors.parallelToStream() - process the stream and stream the results in completion order
- ParallelCollectors.parallelToOrderedStream() - process the stream and stream the results in the original order
- ParallelCollectors.toFuture() - collect a stream of CompletableFutures using a custom Collector
By default, all upstream elements are processed separately. If you want to process them in batches instead, use the ParallelCollectors.Batching namespace, which contains drop-in alternatives (less stress on the executor in exchange for the lack of work-stealing).
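As a quick illustration (assuming the stream-returning variants accept the same mapper/executor/parallelism arguments as parallel() - double-check the Javadoc), the results can be consumed as a regular Stream:

import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import static com.pivovarit.collectors.ParallelCollectors.parallelToOrderedStream;

class OrderedStreamExample {
    public static void main(String[] args) {
        ExecutorService executor = Executors.newFixedThreadPool(4);

        // Results are exposed as a Stream in the original encounter order,
        // so they can be consumed without waiting for the whole collection
        List.of(1, 2, 3, 4, 5).stream()
          .collect(parallelToOrderedStream(i -> "item-" + i, executor, 4))
          .forEach(System.out::println);

        executor.shutdown();
    }
}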