Common Fork Join Pool and Streams

Parallel streams are a boon to concurrency, and Common Fork Join Pools are an important part of that interaction. This overview should shed some light on how they work.

By Dan Newton · Feb. 06, 17 · Tutorial

In my post Dipping into Java 8 Streams, a commenter suggested that I explain what the Common Fork Join Pool is and how it relates to parallel streams. Honestly, I had never heard of it, so I set out to find the answer on the Internet and write this post as a follow-up to that comment. Unfortunately, I didn't reach the level of understanding I had hoped for, so I am going to write up what I found from doing some research and from debugging some code myself. If you think anything is missing, leave a comment. This is, after all, a place to learn!

So let's start with something I am fairly sure about. When you use a parallel stream, it runs its work on multiple threads when appropriate. That's what you would expect, since it has the word parallel in its name. What the name doesn't tell you is that, by default, all the parallel streams you create share the threads of a single pool: the Common Fork Join Pool (returned by ForkJoinPool.commonPool()). This shouldn't be a problem if you only use a parallel stream every now and then. If you run several of them concurrently, however, they may be slower than expected because they compete for the same threads. Also note that, although it is called a parallel stream, it does not run in the background. The collection is processed on multiple threads, but the thread that started the stream (the main thread in the first example) takes part in the work and waits for the whole operation to finish before continuing.
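To see how big the shared pool is on your machine, you can ask the JDK directly. The sketch below is illustrative: the class and method names are made up for this example, and it relies only on the real static method ForkJoinPool.getCommonPoolParallelism(). By default the common pool's parallelism is one less than the number of available processors, with a minimum of 1, unless it is overridden with the java.util.concurrent.ForkJoinPool.common.parallelism system property. The last method also shows that forEach blocks: when it returns, every element has been processed.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.IntStream;

// Hypothetical helper class for this illustration
class CommonPoolInfo {

    // Parallelism of the shared pool that parallel streams use by default
    static int commonParallelism() {
        return ForkJoinPool.getCommonPoolParallelism();
    }

    // The JDK default when the system property is not set: processors - 1, but at least 1
    static int defaultParallelism() {
        return Math.max(1, Runtime.getRuntime().availableProcessors() - 1);
    }

    // forEach is a blocking terminal operation: the calling thread only continues once all elements are done
    static int countProcessed(int size) {
        AtomicInteger processed = new AtomicInteger();
        IntStream.range(0, size).parallel().forEach(i -> processed.incrementAndGet());
        return processed.get();
    }

    public static void main(String[] args) {
        System.out.println("Available processors: " + Runtime.getRuntime().availableProcessors());
        System.out.println("Common pool parallelism: " + commonParallelism());
        System.out.println("Default without the system property: " + defaultParallelism());
        System.out.println("Processed before forEach returned: " + countProcessed(1_000));
    }
}
```

On a machine with four available processors and no override, both parallelism values would be 3.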

Let's start with a single parallel stream to see how many threads it creates so we have a baseline to continue from.

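import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class CommonForkJoinPoolExample1 {

    public static void main(String[] args) {

        final List<Integer> numbers = getNumbers();

        // Each element is handled by whichever thread picks it up: main or a common pool worker
        numbers.parallelStream().forEach(n -> {
            try {
                Thread.sleep(5);
                System.out.println("Loop 1 : " + Thread.currentThread());
            } catch (InterruptedException e) {
                // Restore the interrupt flag instead of silently swallowing it
                Thread.currentThread().interrupt();
            }
        });
    }

    private static List<Integer> getNumbers() {
        List<Integer> numbers = new ArrayList<>(100);
        for (int i = 0; i < 100; i++) {
            numbers.add(i);
        }
        return Collections.unmodifiableList(numbers);
    }

}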

Thread [main]
Daemon Thread [ForkJoinPool.commonPool-worker-1]
Daemon Thread [ForkJoinPool.commonPool-worker-2]
Daemon Thread [ForkJoinPool.commonPool-worker-3]

If you add a breakpoint inside the forEach lambda and debug this code, you can see that the parallel stream runs on the main thread plus the threads of the Common Fork Join Pool.
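Rather than setting breakpoints, you can also record which threads processed the elements. Here is a minimal sketch (the class and method names are illustrative) that collects thread names into a concurrent set. When the stream is started from an ordinary thread, every name should be either that thread's name or a common pool worker, whose default names start with ForkJoinPool.commonPool-worker-. Which workers show up, and how many, varies from run to run.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.IntStream;

// Hypothetical helper class for this illustration
class ThreadsUsed {

    // Runs a parallel stream and returns the names of the threads that processed its elements
    static Set<String> threadNames(int size) {
        Set<String> names = ConcurrentHashMap.newKeySet(); // thread-safe set for concurrent adds
        IntStream.range(0, size).parallel().forEach(i -> {
            try {
                Thread.sleep(5); // slow each element down so other threads pick up work
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            names.add(Thread.currentThread().getName());
        });
        return names;
    }

    public static void main(String[] args) {
        // Typically prints "main" plus several ForkJoinPool.commonPool-worker-N names
        threadNames(100).forEach(System.out::println);
    }
}
```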

Now let's see what happens when two parallel streams run at once. Which threads are used?

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class CommonForkJoinPoolExample2 {

    public static void main(String[] args) throws InterruptedException {

        final List<Integer> numbers = getNumbers();

        // Two ordinary threads, each running its own parallel stream; both streams draw on the common pool
        Thread t1 = new Thread(() -> numbers.parallelStream().forEach(n -> {
            try {
                Thread.sleep(5);
                System.out.println("Loop 1 : " + Thread.currentThread());
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }));
        Thread t2 = new Thread(() -> numbers.parallelStream().forEach(n -> {
            try {
                Thread.sleep(5);
                System.out.println("Loop 2 : " + Thread.currentThread());
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }));

        t1.start();
        t2.start();
        // Wait for both streams to finish
        t1.join();
        t2.join();
    }

    private static List<Integer> getNumbers() {
        List<Integer> numbers = new ArrayList<>(100);
        for (int i = 0; i < 100; i++) {
            numbers.add(i);
        }
        return Collections.unmodifiableList(numbers);
    }

}

Thread [Thread-0]
Thread [Thread-1]
Daemon Thread [ForkJoinPool.commonPool-worker-1]
Daemon Thread [ForkJoinPool.commonPool-worker-2]
Daemon Thread [ForkJoinPool.commonPool-worker-3]

From this, you can see that the running threads are the two threads we created plus the common pool threads. Even though the two parallel streams run concurrently, the common pool threads give no indication of it. The only sign is that two ordinary threads are running as well. Both streams take their helpers from the same three common pool workers.
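One way to confirm that two concurrently running streams really draw on the same pool is the static method ForkJoinTask.getPool(). It returns the pool hosting the current thread, or null when the current thread is not a ForkJoinPool worker. The sketch below uses illustrative class and method names. It starts two streams from two plain threads, as in the example above, and records every non-null pool seen while processing elements. You would expect that set to contain nothing but ForkJoinPool.commonPool().

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.stream.IntStream;

// Hypothetical helper class for this illustration
class SharedPoolCheck {

    // Starts two parallel streams from two plain threads and returns every pool that hosted their work
    static Set<ForkJoinPool> poolsUsedByTwoStreams() {
        Set<ForkJoinPool> pools = ConcurrentHashMap.newKeySet();
        Runnable stream = () -> IntStream.range(0, 100).parallel().forEach(i -> {
            try {
                Thread.sleep(1); // slow each element down so the two streams overlap
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            ForkJoinPool pool = ForkJoinTask.getPool(); // null on the plain thread that started the stream
            if (pool != null) {
                pools.add(pool);
            }
        });
        Thread t1 = new Thread(stream);
        Thread t2 = new Thread(stream);
        t1.start();
        t2.start();
        try {
            t1.join();
            t2.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return pools;
    }

    public static void main(String[] args) {
        Set<ForkJoinPool> pools = poolsUsedByTwoStreams();
        boolean onlyCommon = pools.stream().allMatch(p -> p == ForkJoinPool.commonPool());
        System.out.println("Distinct pools seen: " + pools.size() + ", only the common pool: " + onlyCommon);
    }
}
```

In practice the set usually holds exactly one element, the common pool. It could be empty if no worker happened to pick up any work.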

So how do you make parallel streams use their own ForkJoinPools instead of sharing the common pool? You need to create your own ForkJoinPool object and run the stream code inside it.

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ForkJoinPool;

public class ForkJoinPoolExample {

    public static void main(String[] args) throws InterruptedException {

        List<Integer> numbers = buildIntRange();

        // A separate pool with a parallelism of 4 for the first stream
        ForkJoinPool forkJoinPool = new ForkJoinPool(4);
        // submit() hands the task to forkJoinPool and returns a ForkJoinTask, on which invoke() is then called
        Thread t1 = new Thread(() -> forkJoinPool.submit(() -> {
            numbers.parallelStream().forEach(n -> {
                try {
                    Thread.sleep(5);
                    System.out.println("Loop 1 : " + Thread.currentThread());
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }).invoke());

        // A second, independent pool for the second stream
        ForkJoinPool forkJoinPool2 = new ForkJoinPool(4);
        Thread t2 = new Thread(() -> forkJoinPool2.submit(() -> {
            numbers.parallelStream().forEach(n -> {
                try {
                    Thread.sleep(5);
                    System.out.println("Loop 2 : " + Thread.currentThread());
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }).invoke());

        t1.start();
        t2.start();
        t1.join();
        t2.join();

    }

    private static List<Integer> buildIntRange() {
        List<Integer> numbers = new ArrayList<>(100);
        for (int i = 0; i < 100; i++) {
            numbers.add(i);
        }
        return Collections.unmodifiableList(numbers);
    }

}

The argument to ForkJoinPool(4) is the pool's parallelism, which in this case normally means four worker threads. One way to run a parallel stream inside a particular ForkJoinPool is to wrap the stream in a Runnable and submit it to that pool. submit() schedules the task for execution and returns a ForkJoinTask, and this example then calls invoke() on that task. Let's look at the threads.

Thread [Thread-0]
Thread [Thread-1]
Daemon Thread [ForkJoinPool.commonPool-worker-1]
Daemon Thread [ForkJoinPool.commonPool-worker-2]
Daemon Thread [ForkJoinPool.commonPool-worker-3]
Daemon Thread [ForkJoinPool-1-worker-0]
Daemon Thread [ForkJoinPool-1-worker-1]
Daemon Thread [ForkJoinPool-1-worker-2]
Daemon Thread [ForkJoinPool-1-worker-3]
Daemon Thread [ForkJoinPool-2-worker-0]
Daemon Thread [ForkJoinPool-2-worker-1]
Daemon Thread [ForkJoinPool-2-worker-2]
Daemon Thread [ForkJoinPool-2-worker-3]

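The first thing you will see is that many more threads are running. The second thing you will probably notice is that two new pools, ForkJoinPool-1 and ForkJoinPool-2, now exist, each with four workers numbered 0 to 3. You may also notice that the common pool is still in use, even though the streams were supposed to run inside the custom pools. My first impression was that the common pool is always used and the custom pool is added on top of it. A more likely explanation is the call to invoke(). invoke() is meant for starting a task that has not been scheduled yet: it executes the task directly in the calling thread (Thread-0 or Thread-1) and then waits for it. Because submit() has already handed the same task to the custom pool, the stream can end up running both on a pool worker and on the calling thread. The copy running on the calling thread is outside any ForkJoinPool, so its parallel stream falls back to the common pool. Waiting for the submitted task with get() or join() instead of invoke() should keep the work inside the custom pool. If you know more about this, leave a comment!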
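Here is a minimal sketch of the two-pool example rewritten so that each thread waits for its submitted task with get() rather than calling invoke() on it. The class and method names are illustrative. The sketch uses ForkJoinTask.getPool() to check whether every element was processed inside the intended pool. It relies on the documented behavior of ForkJoinTask.fork(), which runs a forked task in the pool the current task is running in. Parallel streams forking their subtasks this way is a JDK implementation detail rather than something the Stream API promises, so treat this as a sketch under that assumption.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.IntStream;

// Hypothetical helper class for this illustration
class CustomPoolStreams {

    // Runs one parallel stream inside the given pool; returns true if every element was processed by that pool
    static boolean runInPool(ForkJoinPool pool) {
        AtomicBoolean outsidePool = new AtomicBoolean(false);
        ForkJoinTask<?> task = pool.submit(() ->
                IntStream.range(0, 100).parallel().forEach(i -> {
                    if (ForkJoinTask.getPool() != pool) {
                        outsidePool.set(true); // element handled by another pool or a plain thread
                    }
                }));
        try {
            task.get(); // wait for the submitted task instead of calling invoke() on it
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        }
        return !outsidePool.get();
    }

    // Mirrors the two-thread example: each thread runs its stream in its own pool of parallelism 4
    static boolean[] runTwoStreams() {
        ForkJoinPool pool1 = new ForkJoinPool(4);
        ForkJoinPool pool2 = new ForkJoinPool(4);
        boolean[] stayedInPool = new boolean[2];
        Thread t1 = new Thread(() -> stayedInPool[0] = runInPool(pool1));
        Thread t2 = new Thread(() -> stayedInPool[1] = runInPool(pool2));
        t1.start();
        t2.start();
        try {
            t1.join();
            t2.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            pool1.shutdown(); // release the worker threads once the streams are done
            pool2.shutdown();
        }
        return stayedInPool;
    }

    public static void main(String[] args) {
        boolean[] result = runTwoStreams();
        System.out.println("Stream 1 stayed in its pool: " + result[0]);
        System.out.println("Stream 2 stayed in its pool: " + result[1]);
    }
}
```

Shutting the pools down is optional here, because ForkJoinPool workers are daemon threads. It is still a good habit for pools you create yourself.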

Hopefully, in this post, I have at least partly answered the comment that was left on my Dipping into Java 8 Streams post. If I haven't, maybe someone will leave a comment telling me how silly I am (hopefully not in much harsher words) while also adding some useful information for everyone to see.


Published at DZone with permission of Dan Newton, DZone MVB. See the original article here.
