
Controlling the Parallelism Level of Java Parallel Streams

If you want more control over your streams' level of parallelism, calling parallel() alone isn't enough. Here's how to take programmatic control.

With the recent Java 9 release, we got many new goodies to play with that can improve our solutions, once we grasp those new features. The release of Java 9 is also a good time to check whether we have really grasped Java 8’s features.

In this post, I’d like to bust the most common misconception about Java parallel streams. It’s often said that you cannot control a parallel stream’s parallelism level programmatically, that parallel streams always run on the shared ForkJoinPool.commonPool(), and that there’s nothing you can do about it.

This is true if you make your stream parallel just by adding a parallel() call to the call chain. That might be sufficient in some cases, e.g. if you perform only lightweight operations on the stream, but if you need more control over your stream’s parallel execution, you need to do a bit more than just call parallel().
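To see this in action, a small sketch can record which threads process a stream made parallel with a plain parallel() call. The class and method names (CommonPoolDemo, threadsUsedByPlainParallel) are hypothetical, and the exact thread names are a JDK implementation detail rather than a documented contract; typically you will see the calling thread plus workers named ForkJoinPool.commonPool-worker-N.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ForkJoinPool;
import java.util.stream.IntStream;

class CommonPoolDemo {
    // Runs a stream made parallel with a plain parallel() call and records
    // the name of every thread that processed an element.
    static Set<String> threadsUsedByPlainParallel() {
        Set<String> names = ConcurrentHashMap.newKeySet();
        IntStream.range(0, 10_000)
                .parallel()
                .forEach(i -> names.add(Thread.currentThread().getName()));
        return names;
    }

    public static void main(String[] args) {
        // Typically the calling thread ("main") plus common pool workers.
        System.out.println(threadsUsedByPlainParallel());
        System.out.println("common pool parallelism: "
                + ForkJoinPool.getCommonPoolParallelism());
    }
}
```

The caller participates in the work, which is why its own thread name can show up next to the common pool workers.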

Instead of diving into theory and technicalities, let’s jump straight to a self-documenting example.

Here is a parallel stream processed on the shared ForkJoinPool.commonPool():

Set<FormattedMessage> formatMessages(Set<RawMessage> messages) {
    return messages.stream()
            .parallel()
            .map(MessageFormatter::format)
            .collect(toSet());
}
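The snippet above depends on RawMessage, FormattedMessage and MessageFormatter, which the article doesn’t show. A self-contained sketch with hypothetical stand-ins for those types makes it runnable; the trim-and-upper-case "format" rule is invented purely for illustration.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Locale;
import java.util.Set;

import static java.util.stream.Collectors.toSet;

// Hypothetical stand-ins for the article's message types.
final class RawMessage {
    final String text;
    RawMessage(String text) { this.text = text; }
}

final class FormattedMessage {
    final String text;
    FormattedMessage(String text) { this.text = text; }
    @Override public boolean equals(Object o) {
        return o instanceof FormattedMessage && ((FormattedMessage) o).text.equals(text);
    }
    @Override public int hashCode() { return text.hashCode(); }
    @Override public String toString() { return text; }
}

final class MessageFormatter {
    // Hypothetical formatting rule: trim and upper-case the raw text.
    static FormattedMessage format(RawMessage raw) {
        return new FormattedMessage(raw.text.trim().toUpperCase(Locale.ROOT));
    }
}

class CommonPoolFormatting {
    // Same shape as the snippet above: runs on ForkJoinPool.commonPool().
    static Set<FormattedMessage> formatMessages(Set<RawMessage> messages) {
        return messages.stream()
                .parallel()
                .map(MessageFormatter::format)
                .collect(toSet());
    }

    public static void main(String[] args) {
        Set<RawMessage> raw = new HashSet<>(Arrays.asList(
                new RawMessage(" hello "), new RawMessage("world")));
        // Prints HELLO and WORLD in unspecified order.
        System.out.println(formatMessages(raw));
    }
}
```

FormattedMessage overrides equals and hashCode so that collecting into a Set compares messages by content.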

Let’s move the parallel processing to a pool that we can control and don’t have to share:

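import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;

import static java.util.stream.Collectors.toSet;

private static final int PARALLELISM_LEVEL = 8;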

Set<FormattedMessage> formatMessages(Set<RawMessage> messages) {
    ForkJoinPool forkJoinPool = new ForkJoinPool(PARALLELISM_LEVEL);
    try {
        return forkJoinPool.submit(() -> formatMessagesInParallel(messages))
                .get();
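    } catch (InterruptedException e) {
        Thread.currentThread().interrupt(); // restore the interrupt status
        throw new IllegalStateException("Interrupted while formatting messages", e);
    } catch (ExecutionException e) {
        // handle exceptions; rethrowing is needed because the method must return a value
        throw new IllegalStateException("Formatting messages failed", e.getCause());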
    } finally {
        forkJoinPool.shutdown();
    }
}

private Set<FormattedMessage> formatMessagesInParallel(Set<RawMessage> messages) {
    return messages.stream()
            .parallel()
            .map(MessageFormatter::format)
            .collect(toSet());
}
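One way to check that the work really moves off the common pool is to record thread names inside the submitted task. This is a sketch with hypothetical names (CustomPoolDemo, threadsUsedInCustomPool); it uses join() instead of get() so that failures surface as unchecked exceptions, and it relies on fork/join implementation behavior rather than on a documented Stream API guarantee.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ForkJoinPool;
import java.util.stream.IntStream;

class CustomPoolDemo {
    // Runs a parallel stream from inside a task submitted to a dedicated pool
    // and returns the names of the threads that processed its elements.
    static Set<String> threadsUsedInCustomPool(int parallelism) {
        ForkJoinPool pool = new ForkJoinPool(parallelism);
        try {
            // join() rethrows task failures unchecked, unlike get().
            return pool.submit(() -> {
                Set<String> names = ConcurrentHashMap.newKeySet();
                IntStream.range(0, 10_000)
                        .parallel()
                        .forEach(i -> names.add(Thread.currentThread().getName()));
                return names;
            }).join();
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        // Expect names like "ForkJoinPool-1-worker-1" and no commonPool workers.
        System.out.println(threadsUsedInCustomPool(4));
    }
}
```

Creating and shutting down a pool on every call, as both this sketch and formatMessages do, adds thread start-up cost; for frequent calls, a long-lived pool that is reused and shut down once may be a better fit.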

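In this example, we’re interested only in the ForkJoinPool’s parallelism level, though we can also supply a ForkJoinWorkerThreadFactory and an UncaughtExceptionHandler if needed. The approach works because the stream’s internal fork/join tasks are forked from a worker thread of our pool, so they are queued on that pool instead of the common pool; note that this is behavior of the fork/join implementation rather than something the Stream API documents.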
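The four-argument ForkJoinPool constructor takes the parallelism level, a ForkJoinWorkerThreadFactory, an UncaughtExceptionHandler and an asyncMode flag. A sketch, assuming you want recognizable worker names and stderr logging; the helper names and the "formatter" prefix are hypothetical. Note that the handler only sees exceptions that escape a worker thread; exceptions thrown by submitted tasks are captured in the returned ForkJoinTask instead.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinWorkerThread;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.IntStream;

class NamedPoolDemo {
    // Hypothetical helper: a pool whose workers are named "<prefix>-<n>".
    static ForkJoinPool namedPool(int parallelism, String prefix) {
        AtomicInteger counter = new AtomicInteger();
        ForkJoinPool.ForkJoinWorkerThreadFactory factory = pool -> {
            // Let the default factory build the worker, then rename it.
            ForkJoinWorkerThread worker =
                    ForkJoinPool.defaultForkJoinWorkerThreadFactory.newThread(pool);
            worker.setName(prefix + "-" + counter.getAndIncrement());
            return worker;
        };
        // Logs exceptions that escape a worker thread's run loop.
        Thread.UncaughtExceptionHandler handler = (thread, error) ->
                System.err.println("Uncaught in " + thread.getName() + ": " + error);
        // asyncMode = false matches what new ForkJoinPool(parallelism) uses.
        return new ForkJoinPool(parallelism, factory, handler, false);
    }

    // Runs a parallel stream inside the pool and records the worker names.
    static Set<String> threadNames(ForkJoinPool pool) {
        return pool.submit(() -> {
            Set<String> names = ConcurrentHashMap.newKeySet();
            IntStream.range(0, 10_000)
                    .parallel()
                    .forEach(i -> names.add(Thread.currentThread().getName()));
            return names;
        }).join();
    }

    public static void main(String[] args) {
        ForkJoinPool pool = namedPool(4, "formatter");
        try {
            System.out.println(threadNames(pool)); // names such as formatter-0
        } finally {
            pool.shutdown();
        }
    }
}
```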

Under the hood, the ForkJoinPool scheduler takes care of everything, including work stealing: idle worker threads take queued tasks from busy ones, which keeps all threads occupied and improves parallel-processing efficiency. That said, manual processing with a ThreadPoolExecutor might be more efficient in some cases, e.g. when the workload can be split evenly across the worker threads up front.
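For comparison, a minimal sketch of that manual alternative: split the input into contiguous chunks and hand one chunk to each thread of a fixed-size pool (Executors.newFixedThreadPool returns a ThreadPoolExecutor). The helper name mapInChunks is hypothetical, and the sketch assumes each element costs roughly the same to process.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

class ChunkedExecutorDemo {
    // Hypothetical helper: splits the input into at most `workers` contiguous,
    // near-equal chunks and maps each chunk as one task on a fixed pool.
    static <T, R> List<R> mapInChunks(List<T> input, Function<T, R> fn, int workers) {
        ExecutorService executor = Executors.newFixedThreadPool(workers);
        try {
            int chunkSize = (input.size() + workers - 1) / workers; // ceiling division
            List<Callable<List<R>>> tasks = new ArrayList<>();
            for (int from = 0; from < input.size(); from += chunkSize) {
                List<T> chunk = input.subList(from, Math.min(from + chunkSize, input.size()));
                tasks.add(() -> {
                    List<R> out = new ArrayList<>(chunk.size());
                    for (T item : chunk) {
                        out.add(fn.apply(item));
                    }
                    return out;
                });
            }
            List<R> result = new ArrayList<>(input.size());
            // invokeAll waits for every task and returns futures in task order.
            for (Future<List<R>> future : executor.invokeAll(tasks)) {
                result.addAll(future.get());
            }
            return result;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        List<Integer> numbers = IntStream.rangeClosed(1, 10).boxed().collect(Collectors.toList());
        // prints [1, 4, 9, 16, 25, 36, 49, 64, 81, 100]
        System.out.println(mapInChunks(numbers, x -> x * x, 3));
    }
}
```

Static chunking avoids per-task scheduling overhead, but if one chunk turns out much slower than the others, its thread becomes the bottleneck; that is exactly the situation where ForkJoinPool’s work stealing helps.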

Published at DZone with permission of Kamil Szymański, DZone MVB. See the original article here.

Opinions expressed by DZone contributors are their own.