
Work With Parallel Database Streams Using Custom Thread Pools


In this follow-up post, you will learn how to lay out parallel database streams over your own custom thread pools.


In my previous post, I wrote about processing database content in parallel using parallel streams and Speedment. Parallel streams can, under many circumstances, be significantly faster than the usual sequential database streams.

The Thread Pool

By default, parallel streams are executed on the common ForkJoinPool, where they may compete with other tasks. In this post, we will learn how to execute parallel database streams on our own custom ForkJoinPool, which gives us much better control of the execution environment.
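To make the default behavior concrete, here is a minimal plain-Java sketch (no Speedment involved) that records which threads a parallel stream uses. The class name CommonPoolDemo and the method threadNamesUsed() are made up for this illustration. On a typical multi-core machine, the names should be the calling thread plus a number of common pool workers.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ForkJoinPool;
import java.util.stream.IntStream;

// Hypothetical demo class, not part of Speedment or the original application
public class CommonPoolDemo {

    // Runs a parallel stream and records the name of every thread that processed an element
    static Set<String> threadNamesUsed() {
        Set<String> names = ConcurrentHashMap.newKeySet();
        IntStream.range(0, 10_000)
            .parallel()
            .forEach(i -> names.add(Thread.currentThread().getName()));
        return names;
    }

    public static void main(String[] args) {
        // The common pool is shared by everything in the JVM that does not specify its own pool
        System.out.println("Common pool parallelism: "
            + ForkJoinPool.commonPool().getParallelism());
        threadNamesUsed().forEach(System.out::println);
    }
}
```

Because every parallel stream in the JVM shares these workers by default, a long-running stream can delay unrelated tasks that also rely on the common pool.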

Speedment is an open-source Stream ORM Java toolkit and runtime that wraps an existing database and its tables into Java 8 streams. We point the Speedment tool at an existing database, and it generates POJO classes that correspond to the tables we have selected. One distinct feature of Speedment is that it supports parallel database streams and can use different parallel strategies to further optimize performance.

Getting Started With Speedment

Head over to open-source Speedment on GitHub and learn how to get started with a Speedment project. Connecting the tool to an existing database is really easy. Read my previous post for more information on what the database table and the PrimeUtil class look like for the examples below.

Executing on the Default ForkJoinPool

Here is the application I talked about in my previous post. It scans a database table in parallel for undetermined prime number candidates, determines whether each one is a prime, and updates the table accordingly. This is how it looks:

Manager<PrimeCandidate> candidatesHigh = app.configure(PrimeCandidateManager.class)
  .withParallelStrategy(ParallelStrategy.computeIntensityHigh())
  .build();

candidatesHigh.stream()
  .parallel() // Use a parallel stream
  .filter(PrimeCandidate.PRIME.isNull()) // Only consider undetermined prime candidates
  .map(pc -> pc.setPrime(PrimeUtil.isPrime(pc.getValue()))) // Record whether the value is a prime
  .forEach(candidatesHigh.updater()); // Apply the Manager's updater


First, we create a stream over all candidates (using the parallel strategy ParallelStrategy.computeIntensityHigh()) and keep only those where the 'prime' column is null, using stream().filter(PrimeCandidate.PRIME.isNull()). Then, for each such prime candidate pc, we set the 'prime' column to true if pc.getValue() is a prime and to false if it is not. Interestingly, the pc.setPrime() method returns the entity pc itself, which makes it easy to chain further stream operations. On the last line, we update the database with the result of our check by applying the candidatesHigh.updater() function.
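The reason a setter that returns its own entity fits so well inside map() can be shown without a database. The sketch below is an illustration only: Candidate is a hypothetical stand-in for the generated PrimeCandidate entity, and isPrime() is a simple trial-division check that may differ from the real PrimeUtil described in the previous post.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.LongStream;

// Hypothetical demo class, not generated by Speedment
public class FluentSetterDemo {

    // Stand-in for the generated PrimeCandidate entity
    static final class Candidate {
        private final long value;
        private Boolean prime; // null means "not yet determined"

        Candidate(long value) { this.value = value; }

        long getValue() { return value; }
        Boolean getPrime() { return prime; }

        // Returns this, so the call can be used directly as a map() function
        Candidate setPrime(boolean prime) {
            this.prime = prime;
            return this;
        }
    }

    // Assumed trial-division check; the real PrimeUtil may be implemented differently
    static boolean isPrime(long n) {
        if (n < 2) return false;
        for (long d = 2; d * d <= n; d++) {
            if (n % d == 0) return false;
        }
        return true;
    }

    static List<Long> primesUpTo(long max) {
        return LongStream.rangeClosed(1, max)
            .mapToObj(Candidate::new)
            .filter(c -> c.getPrime() == null)           // only undetermined candidates
            .map(c -> c.setPrime(isPrime(c.getValue()))) // the setter hands the entity onward
            .filter(Candidate::getPrime)                 // keep the ones marked as prime
            .map(Candidate::getValue)
            .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(primesUpTo(20)); // prints [2, 3, 5, 7, 11, 13, 17, 19]
    }
}
```

Had setPrime() returned void, the map() step would need a block lambda with an explicit return statement, or a peek() call instead.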

Again, make sure to check out my previous post for the details and benefits of parallel strategies. In short, Java's default parallel strategy works well for low computational demands because it places a large number of initial work items on each thread. Speedment's parallel strategies work much better for medium to high computational demands, where a small number of work items are laid out on each participating thread.

The stream will determine prime numbers fully in parallel, and the execution threads will come from the common ForkJoinPool, as can be seen in this picture (my laptop has 4 CPU cores and 8 CPU threads):

Use a Custom Executor Service

As we learned at the beginning of this post, parallel streams are executed by the common ForkJoinPool by default. Sometimes, however, we want to use our own executor, perhaps because we are afraid of flooding the common ForkJoinPool so that other tasks cannot run properly. Defining our own executor can easily be done for Speedment (and other stream libraries) like this:

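// A dedicated pool with three worker threads, separate from the common ForkJoinPool
final ForkJoinPool forkJoinPool = new ForkJoinPool(3);

// Submitting the stream as a task makes it run on the pool's own worker threads
forkJoinPool.submit(() -> candidatesHigh.stream()
    .parallel()
    .filter(PrimeCandidate.PRIME.isNull())
    .map(pc -> pc.setPrime(PrimeUtil.isPrime(pc.getValue())))
    .forEach(candidatesHigh.updater())
);

try {
  forkJoinPool.shutdown(); // Accept no new tasks; the submitted one still runs
  forkJoinPool.awaitTermination(1, TimeUnit.HOURS);
} catch (InterruptedException ie) {
  Thread.currentThread().interrupt(); // Restore the interrupt flag
  ie.printStackTrace();
}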


The application code is unmodified, but it is now wrapped in a custom ForkJoinPool that we can control ourselves. In the example above, we set up a thread pool with just three worker threads. These worker threads are not shared with the threads in the common ForkJoinPool.
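The same wrapping can be tried out in plain Java, without a database. The following is a minimal sketch under a few assumptions: CustomPoolDemo and threadNamesUsed() are names invented for this illustration, and the technique relies on how the JDK's stream implementation forks its tasks rather than on a documented guarantee, so it is worth verifying on the JDK version you run.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ForkJoinPool;
import java.util.stream.IntStream;

// Hypothetical demo class, not part of Speedment or the original application
public class CustomPoolDemo {

    // Runs a parallel stream inside a dedicated pool and records the worker thread names
    static Set<String> threadNamesUsed(int parallelism) {
        Set<String> names = ConcurrentHashMap.newKeySet();
        ForkJoinPool pool = new ForkJoinPool(parallelism);
        try {
            pool.submit(() ->
                IntStream.range(0, 10_000)
                    .parallel()
                    .forEach(i -> names.add(Thread.currentThread().getName()))
            ).join(); // block until the whole stream has completed
        } finally {
            pool.shutdown(); // release the worker threads
        }
        return names;
    }

    public static void main(String[] args) {
        // Names of the pool's own workers; no common pool workers should appear
        threadNamesUsed(3).forEach(System.out::println);
    }
}
```

Calling join() on the returned task is an alternative to awaitTermination() when we want to wait for one particular stream rather than for the whole pool to drain.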

Here is what the threads look like when using the custom executor service:

This way, we can control both the thread pool itself and precisely how work items are laid out in that pool using a parallel strategy!

Keep up the heat in your pools!



Published at DZone with permission of Per-Åke Minborg, DZone MVB. See the original article here.

Opinions expressed by DZone contributors are their own.
