
Parallel Streams Done Right


See how to regain control of parallelism and thread pool choice when using the Stream API.


Downsides of Parallel Streams

Parallel collection processing using the Stream API can be as easy as:

```java
List<Integer> result = list.parallelStream()
  .map(toSomething())
  .collect(Collectors.toList());
```


Unfortunately, we can't choose an execution facility, define per-instance parallelism, or avoid blocking the calling thread.

What's more, it turns out that the hardwired thread pool is the static common ForkJoinPool, which is the default execution facility not only for parallel streams but also for the ...Async methods of CompletableFuture (such as supplyAsync() and runAsync()) when no Executor is passed, and potentially even for Project Loom's virtual threads (fibers). Polluting such a shared thread pool with long-running operations could be disastrous for performance.
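To see this for yourself, here is a small sketch that observes current JDK behavior rather than any documented guarantee for parallel streams (CommonPoolDemo and its methods are illustrative names). It counts parallel-stream elements processed outside the calling thread and the common pool, and checks whether CompletableFuture.supplyAsync() without an Executor lands on a common-pool worker. Note that the CompletableFuture documentation itself says it falls back to a new thread per task when the common pool's parallelism is below two.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinWorkerThread;
import java.util.concurrent.atomic.LongAdder;
import java.util.stream.IntStream;

// Illustrative names; observes current JDK behavior, not a documented guarantee for streams.
final class CommonPoolDemo {

    // True if the current thread is a worker of the static common ForkJoinPool.
    static boolean inCommonPool() {
        Thread t = Thread.currentThread();
        return t instanceof ForkJoinWorkerThread
                && ((ForkJoinWorkerThread) t).getPool() == ForkJoinPool.commonPool();
    }

    // Counts elements that a parallel stream processed on neither the caller nor a common-pool worker.
    static long elementsOutsideCommonPool() {
        Thread caller = Thread.currentThread();
        LongAdder outside = new LongAdder();
        IntStream.range(0, 10_000).parallel().forEach(i -> {
            if (Thread.currentThread() != caller && !inCommonPool()) {
                outside.increment();
            }
        });
        return outside.sum();
    }

    // True if supplyAsync(...) without an explicit Executor ran on a common-pool worker.
    static boolean supplyAsyncUsesCommonPool() {
        return CompletableFuture.supplyAsync(CommonPoolDemo::inCommonPool).join();
    }

    public static void main(String[] args) {
        System.out.println("common pool parallelism: " + ForkJoinPool.getCommonPoolParallelism());
        System.out.println("elements outside caller/common pool: " + elementsOutsideCommonPool());
        // CompletableFuture falls back to a new thread per task if common-pool parallelism is below 2
        System.out.println("supplyAsync ran on common pool: " + supplyAsyncUsesCommonPool());
    }
}
```

Any blocking call inside those lambdas ties up a worker that every other parallel stream and default-executor CompletableFuture in the JVM is competing for.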

We could try to mitigate the issue by using a ManagedBlocker, but it turns out that running on ForkJoinPool is not part of the public contract of parallel streams:

Note, however, that this technique (...) is an implementation "trick" and is not guaranteed to work. Indeed, the threads or thread pool that is used for execution of parallel streams is unspecified.

source: Stuart Marks @ Stack Overflow
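For reference, ForkJoinPool.ManagedBlocker and ForkJoinPool.managedBlock() are real JDK APIs: wrapping a blocking call in a ManagedBlocker lets a ForkJoinPool activate a spare thread while the worker is blocked. The sketch below uses hypothetical names (ManagedBlocking, BlockingCall, managed, slowSquare). As the quote warns, it only helps if the parallel stream really runs in a ForkJoinPool; on any other thread, managedBlock() just runs the blocker directly.

```java
import java.util.List;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

final class ManagedBlocking {

    // Hypothetical adapter turning a blocking Supplier into a ForkJoinPool.ManagedBlocker.
    static final class BlockingCall<T> implements ForkJoinPool.ManagedBlocker {
        private final Supplier<T> call;
        private T result;
        private boolean done;

        BlockingCall(Supplier<T> call) {
            this.call = call;
        }

        @Override
        public boolean block() {
            result = call.get(); // the potentially blocking operation
            done = true;
            return true;         // no further blocking is necessary
        }

        @Override
        public boolean isReleasable() {
            return done;
        }
    }

    // Runs the call via managedBlock so a ForkJoinPool may compensate for the blocked worker.
    static <T> T managed(Supplier<T> call) {
        BlockingCall<T> blocker = new BlockingCall<>(call);
        try {
            ForkJoinPool.managedBlock(blocker);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while blocking", e);
        }
        return blocker.result;
    }

    // Hypothetical blocking operation standing in for I/O.
    static int slowSquare(int i) {
        try {
            TimeUnit.MILLISECONDS.sleep(50);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return i * i;
    }

    public static void main(String[] args) {
        List<Integer> squares = IntStream.range(0, 8).boxed()
                .parallel()
                .map(i -> managed(() -> slowSquare(i)))
                .collect(Collectors.toList());
        System.out.println(squares); // [0, 1, 4, 9, 16, 25, 36, 49]
    }
}
```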

So what can we do if we want to process collections using long-running and/or blocking tasks? That's why I created parallel-collectors.

Parallel Collectors

Simply put, parallel-collectors is a zero-dependency toolkit that eases parallel collection processing in Java using the Stream API and CompletableFuture, allowing you to choose your own parallelization strategy, maximum parallelism, executor, and much more.

All of the above and more is provided via non-invasive custom implementations of the Collector interface:

```java
list.stream()
  .collect(parallel(i -> foo(i), toList(), executor, parallelism))
  // ..
  .orTimeout(1000, MILLISECONDS)
  .thenAcceptAsync(System.out::println, otherExecutor)
  .thenRun(() -> System.out.println("Finished!"));
```
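To give a feel for how a custom Collector can hand work off to an Executor without blocking the caller, here is a deliberately naive, JDK-only sketch. It is not how parallel-collectors is implemented; NaiveParallelCollector and naiveParallel are hypothetical names. The sketch buffers the whole stream, caps concurrency by starting at most parallelism workers that drain a shared queue of indices, and omits short-circuiting and the other safeguards the library provides.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicReferenceArray;
import java.util.function.Function;
import java.util.stream.Collector;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;

final class NaiveParallelCollector {

    // Hypothetical, simplified Collector: NOT the parallel-collectors implementation.
    static <T, R, RR> Collector<T, ?, CompletableFuture<RR>> naiveParallel(
            Function<? super T, ? extends R> mapper,
            Collector<R, ?, RR> downstream,
            Executor executor,
            int parallelism) {
        if (parallelism < 1) {
            throw new IllegalArgumentException("parallelism must be at least 1");
        }
        return Collector.<T, List<T>, CompletableFuture<RR>>of(
                ArrayList::new, // buffer upstream elements
                List::add,
                (left, right) -> { left.addAll(right); return left; },
                items -> {
                    // results[i] holds the mapped value of items.get(i), preserving encounter order
                    AtomicReferenceArray<R> results = new AtomicReferenceArray<>(items.size());
                    ConcurrentLinkedQueue<Integer> pending = new ConcurrentLinkedQueue<>();
                    for (int i = 0; i < items.size(); i++) {
                        pending.add(i);
                    }
                    // At most `parallelism` workers run on the executor; each drains the shared queue
                    CompletableFuture<?>[] workers = IntStream.range(0, Math.min(parallelism, items.size()))
                            .mapToObj(w -> CompletableFuture.runAsync(() -> {
                                Integer index;
                                while ((index = pending.poll()) != null) {
                                    results.set(index, mapper.apply(items.get(index)));
                                }
                            }, executor))
                            .toArray(CompletableFuture[]::new);
                    // Non-blocking: the caller gets a future that completes once all workers finish
                    return CompletableFuture.allOf(workers).thenApply(ignored ->
                            IntStream.range(0, results.length())
                                    .mapToObj(results::get)
                                    .collect(downstream));
                });
    }

    public static void main(String[] args) {
        ExecutorService executor = Executors.newFixedThreadPool(4);
        try {
            List<Integer> result = Stream.of(1, 2, 3, 4, 5)
                    .collect(naiveParallel(i -> i * 10, Collectors.toList(), executor, 2))
                    .join();
            System.out.println(result); // [10, 20, 30, 40, 50]
        } finally {
            executor.shutdown();
        }
    }
}
```

Because the finisher only returns a CompletableFuture, the calling thread stays free to chain further stages, much like orTimeout() and thenAcceptAsync() in the snippet above.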


To start using it, add the com.pivovarit:parallel-collectors dependency to your build.

Double-check the latest version here.

Features

The library leverages the combined power of the Stream API and CompletableFuture to provide functionality that standard parallel streams lack.

Parallel Collectors are:

  • non-invasive - the functionality is provided using custom implementations of public APIs
  • lightweight - if your goal is to apply straightforward parallel processing, it's the sweet spot between Parallel Streams and heavy-armor technologies like RxJava/Project Reactor
  • configurable - choose your own Executor, parallelism, batching/completion strategies
  • non-blocking - results can be returned as CompletableFutures so that the calling thread is not blocked
  • short-circuiting - if one of the subtasks throws an exception, remaining subtasks get interrupted and/or skipped
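The short-circuiting idea can be illustrated with plain JDK tools. This is a blocking sketch, not the library's mechanism, and ShortCircuit and mapAllOrFail are hypothetical names: an ExecutorCompletionService reports subtasks as they finish and, on the first failure, Future.cancel(true) is called on every subtask, which interrupts running ones (if they respond to interruption) and prevents queued ones from ever starting.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

final class ShortCircuit {

    // Hypothetical helper: maps every item on the executor, but on the first failure
    // cancels all remaining subtasks and rethrows.
    static <T, R> List<R> mapAllOrFail(List<T> items, Function<? super T, ? extends R> mapper,
                                       ExecutorService executor) throws InterruptedException, ExecutionException {
        CompletionService<R> completion = new ExecutorCompletionService<>(executor);
        List<Future<R>> futures = new ArrayList<>();
        for (T item : items) {
            futures.add(completion.submit(() -> mapper.apply(item)));
        }
        try {
            for (int i = 0; i < futures.size(); i++) {
                completion.take().get(); // throws ExecutionException for the first failed subtask
            }
        } catch (ExecutionException | InterruptedException e) {
            for (Future<R> f : futures) {
                f.cancel(true); // interrupts running subtasks, queued ones never start
            }
            throw e;
        }
        List<R> results = new ArrayList<>(futures.size());
        for (Future<R> f : futures) {
            results.add(f.get()); // all done at this point; keeps the original order
        }
        return results;
    }

    public static void main(String[] args) throws InterruptedException {
        ExecutorService executor = Executors.newFixedThreadPool(2);
        AtomicInteger finished = new AtomicInteger();
        try {
            mapAllOrFail(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8), i -> {
                if (i == 2) {
                    throw new IllegalStateException("boom on " + i);
                }
                try {
                    Thread.sleep(200);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw new CancellationException("interrupted");
                }
                finished.incrementAndGet();
                return i;
            }, executor);
        } catch (ExecutionException e) {
            System.out.println("failed fast: " + e.getCause().getMessage()
                    + ", subtasks finished: " + finished.get());
        } finally {
            executor.shutdownNow();
        }
    }
}
```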

Parallel Collectors are unopinionated by design, so it's up to their users to use them responsibly, which involves things like:

  • proper configuration of a provided Executor and its lifecycle management
  • choosing the appropriate parallelism level
  • making sure that the tool is applied in the right context

Make sure to read the API documentation before using them in production.
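As one example of what configuring an Executor and managing its lifecycle could look like, here is a setup using only java.util.concurrent: a fixed-size ThreadPoolExecutor with a bounded queue and named daemon threads, plus an orderly shutdown. The sizes, the thread names, and the helper names (ExecutorSetup, boundedPool, shutdownGracefully) are illustrative assumptions, not recommendations from the library.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

final class ExecutorSetup {

    // Hypothetical helper: fixed-size pool, bounded queue, named daemon threads.
    static ThreadPoolExecutor boundedPool(int threads, int queueCapacity) {
        AtomicInteger counter = new AtomicInteger();
        ThreadFactory factory = r -> {
            Thread t = new Thread(r, "parallel-worker-" + counter.incrementAndGet());
            t.setDaemon(true); // a forgotten pool won't keep the JVM alive
            return t;
        };
        return new ThreadPoolExecutor(threads, threads, 0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<>(queueCapacity), factory,
                new ThreadPoolExecutor.AbortPolicy()); // reject instead of buffering without limit
    }

    // Hypothetical helper: stop accepting work, wait, then interrupt stragglers.
    static boolean shutdownGracefully(ExecutorService executor, long timeout, TimeUnit unit) {
        executor.shutdown();
        try {
            if (!executor.awaitTermination(timeout, unit)) {
                executor.shutdownNow();
                return executor.awaitTermination(timeout, unit);
            }
            return true;
        } catch (InterruptedException e) {
            executor.shutdownNow();
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) throws Exception {
        ThreadPoolExecutor pool = boundedPool(4, 100);
        try {
            Future<String> name = pool.submit(() -> Thread.currentThread().getName());
            System.out.println(name.get()); // parallel-worker-1
        } finally {
            System.out.println("terminated: " + shutdownGracefully(pool, 5, TimeUnit.SECONDS));
        }
    }
}
```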

API Overview

The com.pivovarit.collectors.ParallelCollectors class serves as a single entry point to the whole library, mimicking the java.util.stream.Collectors class.

So, a classic straightforward parallel stream usage:

```java
List<Integer> result = list.stream()
  .parallel()
  .map(i -> foo(i))
  .collect(toList());
```


Ends up being replaced by:

```java
private static ExecutorService e = ...

// ...

CompletableFuture<List<Integer>> result = list.stream()
  .collect(parallel(i -> foo(i), toList(), e, 4));
```


The library features four types of collectors:

  1. ParallelCollectors.parallel() - process the stream, collect the results using a provided Collector, and receive them as a CompletableFuture
  2. ParallelCollectors.parallelToStream() - process the stream and stream the results in completion order
  3. ParallelCollectors.parallelToOrderedStream() - process the stream and stream the results in the original order
  4. ParallelCollectors.toFuture() - collect a stream of CompletableFutures using a custom Collector
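The difference between completion order and original order can be sketched with plain JDK types. This only illustrates the ordering semantics; ResultOrdering, inCompletionOrder and inOriginalOrder are hypothetical names, and unlike the library, both helpers submit every task up front with no parallelism cap beyond the executor's own size.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;

final class ResultOrdering {

    // Hypothetical: yields each result as soon as its subtask finishes.
    static <T, R> Stream<R> inCompletionOrder(List<T> items, Function<? super T, ? extends R> mapper,
                                              Executor executor) {
        CompletionService<R> completion = new ExecutorCompletionService<>(executor);
        items.forEach(item -> completion.submit(() -> mapper.apply(item)));
        return IntStream.range(0, items.size()).mapToObj(i -> takeNext(completion));
    }

    // Hypothetical: waits for subtasks in the order the elements were submitted.
    static <T, R> Stream<R> inOriginalOrder(List<T> items, Function<? super T, ? extends R> mapper,
                                            Executor executor) {
        List<CompletableFuture<R>> futures = items.stream()
                .map(item -> CompletableFuture.<R>supplyAsync(() -> mapper.apply(item), executor))
                .collect(Collectors.toList());
        return futures.stream().map(CompletableFuture::join);
    }

    private static <R> R takeNext(CompletionService<R> completion) {
        try {
            return completion.take().get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new CancellationException("interrupted while waiting for a result");
        } catch (ExecutionException e) {
            throw new CompletionException(e.getCause());
        }
    }

    public static void main(String[] args) {
        ExecutorService executor = Executors.newFixedThreadPool(3);
        Function<Integer, Integer> slow = i -> {
            try {
                Thread.sleep(100L * i); // larger values take longer
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return i;
        };
        try {
            List<Integer> input = Arrays.asList(3, 1, 2);
            // printed in whichever order the subtasks finished
            System.out.println(inCompletionOrder(input, slow, executor).collect(Collectors.toList()));
            System.out.println(inOriginalOrder(input, slow, executor).collect(Collectors.toList())); // [3, 1, 2]
        } finally {
            executor.shutdown();
        }
    }
}
```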

By default, all upstream elements are processed individually. If you want to process them in batches instead, use the ParallelCollectors.Batching namespace, which contains drop-in alternatives (less stress on the executor in exchange for the lack of work-stealing).
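To illustrate that trade-off, here is a JDK-only sketch (BatchingSketch and mapInBatches are hypothetical names, and this is not how ParallelCollectors.Batching is implemented): the input is split into at most parallelism contiguous batches, and each batch becomes a single task, so the executor sees only a handful of submissions. The flip side is visible too: if one batch happens to contain all the slow elements, idle threads cannot take over any of its remaining work.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

final class BatchingSketch {

    // Hypothetical helper: at most `parallelism` contiguous batches, ONE task per batch.
    static <T, R> CompletableFuture<List<R>> mapInBatches(List<T> items, Function<? super T, ? extends R> mapper,
                                                          Executor executor, int parallelism) {
        if (parallelism < 1) {
            throw new IllegalArgumentException("parallelism must be at least 1");
        }
        int batchSize = Math.max(1, (items.size() + parallelism - 1) / parallelism); // ceiling division
        List<CompletableFuture<List<R>>> batches = new ArrayList<>();
        for (int from = 0; from < items.size(); from += batchSize) {
            List<T> batch = items.subList(from, Math.min(from + batchSize, items.size()));
            batches.add(CompletableFuture.supplyAsync(() -> {
                List<R> out = new ArrayList<>(batch.size());
                for (T item : batch) {
                    out.add(mapper.apply(item)); // processed sequentially within the batch
                }
                return out;
            }, executor));
        }
        // Concatenate batch results; contiguous batches keep the original order
        return CompletableFuture.allOf(batches.toArray(new CompletableFuture<?>[0]))
                .thenApply(ignored -> batches.stream()
                        .flatMap(b -> b.join().stream())
                        .collect(Collectors.toList()));
    }

    public static void main(String[] args) {
        ExecutorService executor = Executors.newFixedThreadPool(4);
        try {
            List<Integer> input = IntStream.rangeClosed(1, 10).boxed().collect(Collectors.toList());
            // 10 elements, parallelism 4 -> batches of 3, 3, 3 and 1
            System.out.println(mapInBatches(input, i -> i * i, executor, 4).join());
            // [1, 4, 9, 16, 25, 36, 49, 64, 81, 100]
        } finally {
            executor.shutdown();
        }
    }
}
```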

More examples can be found in the README on GitHub.


Opinions expressed by DZone contributors are their own.
