Java 7: Fork and join and the jam jar
Join the DZone community and get the full member experience.
Join For FreeAnother Java 7 blog, this time it's about the new concurrency utilities.
It's about plain fork and join in particular. Everything is explained
with a straight code example. Compared to Project Coin concurrency is an
inherently complex topic. The code example is a little more complex.
Let's get started.
The Fork and Join Executor Service
Fork and join employs an efficient task scheduling algorithm that
ensures optimized resource usage (memory and cpu) on multi core
machines. That algorithm is known as "work stealing".
Idle threads in a fork join pool attempt to find and execute subtasks
created by other active threads. This is very efficient 'cause larger
units get divided into smaller units of work that get distributed
accross all active threads (and CPU's). Here is an analogy to explain
the strength of fork and join algorithms: if you have a jam jar and you
fill it with ping-pong balls, there is a lot air left in the glass.
Think of the air as unused CPU resource. If you fill your jam jar with
peas (or sand) there is less air in the glass. Fork and join is like
filling the jam jar with peas. There is also more volume in your glass
using peas, 'cause there is less air (less waste). Fork and join
algorithms always ensure an optimal (smaller) number of active threads
then work sharing algorithms. This is for the same "peas reason". Think
of the jam jar being your thread pool and the peas are your tasks. With
fork and join you can host more tasks (and complete volume) with the
same amount of threads (in the same jam jar).
Image 1: Fork and join in the jam jar |
Here is a plain fork and join code example:
import java.util.ArrayList; import java.util.Arrays; import java.util.Date; import java.util.List; import java.util.Map; import java.util.concurrent.ForkJoinPool; import java.util.concurrent.RecursiveTask; public class ForkJoinTaskExample_Plain extends RecursiveTask<List<Map<String, Double>>> { private List<Proposal> proposals; private static final long serialVersionUID = -2703342063482619328L; public ForkJoinTaskExample_Plain(List<Proposal> proposals) { super(); this.proposals = proposals; } @Override protected List<Map<String, Double>> compute() { if (proposals.size() == 1) { // task is small enough to compute linear in this thread return Arrays.asList(computeDirectly(proposals.get(0))); } // task is to large for one thread to execute efficiently, split the task // make sure splitting of tasks makes sense! tasks must not be too small ... int split = proposals.size() / 2; ForkJoinTaskExample_Plain f1 = new ForkJoinTaskExample_Plain(proposals.subList(0, split)); f1.fork(); // generate task for some other thread that can execute on some other CPU ForkJoinTaskExample_Plain f2 = new ForkJoinTaskExample_Plain(proposals.subList(split, proposals.size())); List<Map<String, Double>> newList = new ArrayList<>(); newList.addAll(f2.compute()); // compute this sub task in the current thread newList.addAll(f1.join()); // join the results of the other sub task return newList; } private Map<String, Double> computeDirectly(Proposal proposal) { return new PricingEngine().calculatePrices(proposal); } public static void main(String[] args) { // Calculate four proposals ForkJoinTaskExample_Plain task = new ForkJoinTaskExample_Plain(Arrays.asList(new Proposal("Niklas", "Schlimm", "7909", "AAL", true, true, true), new Proposal("Andreas", "Fritz", "0005", "432", true, true, true), new Proposal("Christian", "Toennessen", "0583", "442", true, true, true), new Proposal("Frank", "Hinkel", "4026", "AAA", true, true, true))); ForkJoinPool pool = new ForkJoinPool(); System.out.println(new Date()); System.out.println(pool.invoke(task)); System.out.println(new Date()); } }
Fork and join tasks always have a similar typical fork and join control flow. In my example I do want to calculate the prices for a list of car insurance offers. Let's go through the example.
Line 10: Fork and join tasks extend RecursiveTask or RecursiveAction. Tasks do return a result, actions doesn't. The result of my example is a List of Maps which contain the prices for the car insurance covers. One map of prices for each proposal.
Line 12: The task will calculate prices for proposals.
Line 22: Fork and join tasks implement the compute method. Again, the compute method returns a list of maps that contain prices. If there are four proposals in the input list, then there will be four maps of prices.
Line 24-26: Is the task stack (list of proposals) small enough to compute directly? If yes, then compute in this thread, which means call the pricing engine to calculate the proposal. If no, continue: split the work and call task recursively.
Line 31: Determine where to split the list.
Line 33: Create a new task for the first part of the split list.
Line 34: Fork that task: allow some other thread to perform that smaller subtask. That thread will call compute recursively on that subtask instance.
Line 35: Create a new task for the second part of the split list.
Line 36: Prepare the composed result list of the two devided subtask (you need to compose the results of the two subtwasks into a single result of the parent task)
Line 37: Compute the second subtask in this current thread and add the result to the result list.
Line 38: In the meantime the first subtask f1 was computed by some other thread. Join the result of the first subtask into the composed result list.
Line 39: Return the composed result.
You need to start the fork and join task.
Line 49: Create the main fork and join task with the initial list of proposals.
Line 53: Create a fork and join thread pool.
Line 55: Submit the main task to the fork and join pool.
That's it. You can look into the complete code here. You'll need the PricingEngine.java and the Proposal.java.
From http://niklasschlimm.blogspot.com/2011/12/java-7-fork-and-join-and-jar-jam.html
Opinions expressed by DZone contributors are their own.
Comments