
Java 7: Fork and join decomposable input pattern


In my previous post I introduced the fork and join framework of Java 7. This post presents a small framework on top of raw fork and join. It implements the decomposable input pattern (dip), which grew out of my own laziness: after using fork and join a couple of times, I realized that I was writing the same code for every slightly different use case. So I decided to write a little piece of software that I can reuse, and the decomposable input pattern framework was born.

You can download the binary here.
The API-documentation is hosted here.
And the sources are also available here.

So what's different when you use this framework? I'd say the difference is that the dip framework follows good OO design principles, like the open-closed principle, which says: "A module should be open for extension but closed for modification." In other words, I have separated the concerns in a fork and join scenario to make the whole thing more flexible and easier to change.

In my last post I presented a code snippet that illustrated how to use plain fork and join to calculate car insurance offers. Let's see how this can be done using my dip framework.

The input to the proposal calculation is - well - a list of proposals :-) In the dip framework you wrap the input of a ForkJoinTask into a subclass of DecomposableInput. The name originates from the fact that input to ForkJoinTask is decomposable. Here is the snippet:

import java.util.ArrayList;
import java.util.List;

import com.schlimm.forkjoindip.DecomposableInput;

public class ListOfProposals extends DecomposableInput<List<Proposal>> {

	public ListOfProposals(List<Proposal> proposals) {
		super(proposals);
	}

	@Override
	public boolean computeDirectly() {
		return rawInput.size()==1;
	}

	@Override
	public List<DecomposableInput<List<Proposal>>> decompose() {
		int split = rawInput.size() / 2;
		List<DecomposableInput<List<Proposal>>> decomposedListOfProposals = new ArrayList<>();
		decomposedListOfProposals.add(new ListOfProposals(rawInput.subList(0, split)));
		decomposedListOfProposals.add(new ListOfProposals(rawInput.subList(split, rawInput.size())));
		return decomposedListOfProposals;
	}

}

 


The class wraps the raw input to the ForkJoinTask and provides a decompose() method that defines how that input is split. It also provides a computeDirectly() method that decides whether the input is already small enough for direct computation or needs further decomposition.
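To make the interplay of computeDirectly() and decompose() more tangible, here is a minimal, self-contained sketch. It does not use the dip library: SketchInput is a hypothetical stand-in whose shape is only inferred from ListOfProposals above, and IntegerListInput and DecomposeDemo are illustration names. One deliberate difference: the sketch tests size() <= 1, because with == 1 an empty list would, as far as I can tell, be split into two empty halves again and again.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for com.schlimm.forkjoindip.DecomposableInput.
// Its shape is inferred from ListOfProposals; the real class may differ.
abstract class SketchInput<T> {
    protected final T rawInput;

    SketchInput(T rawInput) {
        this.rawInput = rawInput;
    }

    T getRawInput() {
        return rawInput;
    }

    // true if this input is small enough to be computed without splitting
    abstract boolean computeDirectly();

    // splits this input into smaller inputs of the same kind
    abstract List<SketchInput<T>> decompose();
}

class IntegerListInput extends SketchInput<List<Integer>> {
    IntegerListInput(List<Integer> values) {
        super(values);
    }

    @Override
    boolean computeDirectly() {
        return rawInput.size() <= 1; // also stops on an empty list
    }

    @Override
    List<SketchInput<List<Integer>>> decompose() {
        int split = rawInput.size() / 2;
        List<SketchInput<List<Integer>>> parts = new ArrayList<>();
        parts.add(new IntegerListInput(rawInput.subList(0, split)));
        parts.add(new IntegerListInput(rawInput.subList(split, rawInput.size())));
        return parts;
    }
}

class DecomposeDemo {
    // Walks the decomposition tree sequentially and collects the leaf inputs.
    static List<List<Integer>> leaves(SketchInput<List<Integer>> input) {
        List<List<Integer>> result = new ArrayList<>();
        if (input.computeDirectly()) {
            result.add(input.getRawInput());
            return result;
        }
        for (SketchInput<List<Integer>> part : input.decompose()) {
            result.addAll(leaves(part));
        }
        return result;
    }

    public static void main(String[] args) {
        // prints [[1], [2], [3], [4], [5]]
        System.out.println(leaves(new IntegerListInput(Arrays.asList(1, 2, 3, 4, 5))));
    }
}
```

The walk here is sequential on purpose; it only shows which pieces a fork and join task would end up computing directly.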

The output of proposal calculation is a list of maps of prices. If you have four input proposals, you'll get a list of four maps with various prices. In the dip framework, you wrap the output into a subclass of ComposableResult.
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import com.schlimm.forkjoindip.ComposableResult;

public class ListOfPrices extends ComposableResult<List<Map<String, Double>>> {

	public ListOfPrices(List<Map<String, Double>> firstPiece) {
		super(firstPiece);
	}

	@Override
	public ComposableResult<List<Map<String, Double>>> compose(ComposableResult<List<Map<String, Double>>> result) {
		List<Map<String, Double>> listOfPrices = new ArrayList<>();
		listOfPrices.addAll(result.getRawResult());
		listOfPrices.addAll(rawResult);
		ListOfPrices prices = new ListOfPrices(listOfPrices);
		return prices;
	}
	
	@Override
	public String toString() {
		return getRawResult().toString();
	}

}


The class implements the compose method that can compose an atomic result of a computation into the existing raw result. It returns a ComposableResult instance that holds the new composition.
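One detail of compose() is easy to overlook: as written in ListOfPrices, the elements of the argument come before the elements of the receiver, so the order of the final list depends on which result compose() is called on. The following sketch illustrates that with plain strings. SketchResult is a hypothetical stand-in for ComposableResult, and StringListResult and ComposeDemo are illustration names; how the dip framework itself chains the calls is not shown in this post.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for com.schlimm.forkjoindip.ComposableResult.
// Its shape is inferred from ListOfPrices; the real class may differ.
abstract class SketchResult<T> {
    protected final T rawResult;

    SketchResult(T rawResult) {
        this.rawResult = rawResult;
    }

    T getRawResult() {
        return rawResult;
    }

    // returns a new result holding this result combined with the other one
    abstract SketchResult<T> compose(SketchResult<T> other);
}

class StringListResult extends SketchResult<List<String>> {
    StringListResult(List<String> firstPiece) {
        super(firstPiece);
    }

    @Override
    SketchResult<List<String>> compose(SketchResult<List<String>> other) {
        List<String> merged = new ArrayList<>();
        merged.addAll(other.getRawResult()); // argument first, as in ListOfPrices
        merged.addAll(rawResult);            // then the receiver's own elements
        return new StringListResult(merged); // neither input list is modified
    }
}

class ComposeDemo {
    // Folds the pieces into one result by always composing onto the accumulator.
    static List<String> composeAll(List<List<String>> pieces) {
        SketchResult<List<String>> accumulated = new StringListResult(new ArrayList<String>());
        for (List<String> piece : pieces) {
            accumulated = accumulated.compose(new StringListResult(piece));
        }
        return accumulated.getRawResult();
    }

    public static void main(String[] args) {
        List<List<String>> pieces = new ArrayList<>();
        pieces.add(Arrays.asList("a"));
        pieces.add(Arrays.asList("b"));
        pieces.add(Arrays.asList("c"));
        System.out.println(composeAll(pieces)); // prints [c, b, a]
    }
}
```

If the prices have to line up with the order of the input proposals, it is worth checking the order in which partial results are composed, or swapping the two addAll calls.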

I agree it's a little abstract. Not only is concurrency inherently complex, I am also putting another abstraction on top of it. But once you've used the framework you'll realize its strength. So stay tuned, we're almost finished :-)

Now you have an input and an output, and the last thing you need is a computation object. In my example that's the pricing engine. To connect the pricing engine to the dip framework, you'll need to implement a subclass of ComputationActivityBridge.
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import com.schlimm.forkjoindip.ComposableResult;
import com.schlimm.forkjoindip.ComputationActivityBridge;
import com.schlimm.forkjoindip.DecomposableInput;

public class PricingEngineBridge extends ComputationActivityBridge<List<Proposal>, List<Map<String, Double>>> {

	private PricingEngine engine = new PricingEngine();
	
	@Override
	public ComposableResult<List<Map<String, Double>>> compute(DecomposableInput<List<Proposal>> input) {
		Map<String, Double> result = engine.calculatePrices(input.getRawInput().get(0));
		List<Map<String, Double>> priceList = new ArrayList<>();
		priceList.add(result);
		return new ListOfPrices(priceList);
	}


}


The PricingEngineBridge implements the compute method that calls the pricing engine. It translates the DecomposableInput into an input that the pricing engine accepts. And it creates an instance of ComposableResult that contains the output of the pricing engine.
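The bridge relies on an implicit contract: input.getRawInput().get(0) only works because computeDirectly() returns true for lists of exactly one proposal. The sketch below shows the same adapter idea with that contract made explicit. Everything in it is hypothetical and simplified: TariffEngine stands in for a pricing engine, SketchBridge for the bridge role (the real ComputationActivityBridge works on DecomposableInput and ComposableResult rather than on raw lists), and the prices are made up.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical pre-existing engine that knows nothing about fork and join.
class TariffEngine {
    Map<String, Double> quote(String customer) {
        Map<String, Double> prices = new HashMap<>();
        prices.put("basic", 100.0);
        prices.put("premium", 100.0 + 10.0 * customer.length()); // made-up formula
        return prices;
    }
}

// Hypothetical, simplified stand-in for the bridge role.
abstract class SketchBridge<I, O> {
    abstract O compute(I input);
}

// Adapter: translates framework-side types to and from the engine's types.
class TariffBridge extends SketchBridge<List<String>, List<Map<String, Double>>> {
    private final TariffEngine engine = new TariffEngine();

    @Override
    List<Map<String, Double>> compute(List<String> input) {
        // Fail loudly if the decomposition did not go down to single elements.
        if (input.size() != 1) {
            throw new IllegalArgumentException(
                    "expected exactly one element, got " + input.size());
        }
        List<Map<String, Double>> prices = new ArrayList<>();
        prices.add(engine.quote(input.get(0)));
        return prices;
    }
}

class BridgeDemo {
    public static void main(String[] args) {
        System.out.println(new TariffBridge().compute(Arrays.asList("Niklas")));
    }
}
```

The guard costs one line and turns a mismatch between the decomposition threshold and the bridge into a clear error instead of a silently ignored rest of the list.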

The last thing to do is to get things started.
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ForkJoinPool;

import com.schlimm.forkjoindip.ComposableResult;
import com.schlimm.forkjoindip.GenericRecursiveTask;

public class ForkJoinTaskExample_Generic {

	@SuppressWarnings("unchecked")
	public static void main(String[] args) {
		List<Proposal> proposalsList = new ArrayList<>();
		proposalsList.add(new Proposal("Niklas", "Schlimm", "7909","AAL", true, true, true));
		proposalsList.add(new Proposal("Andreas", "Fritz", "0005", "432", true, true, true));
		proposalsList.add(new Proposal("Christian", "Toennessen", "0583", "442", true, true, true));
		proposalsList.add(new Proposal("Frank","Hinkel", "4026", "AAA", true, true, true));
		ListOfProposals proposals = new ListOfProposals(proposalsList);
		GenericRecursiveTask task = new GenericRecursiveTask(proposals, new PricingEngineBridge());
		ForkJoinPool pool = new ForkJoinPool();
		System.out.println(new Date());
		ComposableResult<List<Map<String, Double>>> result = pool.invoke(task);
		System.out.println(result);
		System.out.println(new Date());
	}
}


The example creates an instance of GenericRecursiveTask and passes the ListOfProposals as well as the PricingEngineBridge as input. When you pass that task to the ForkJoinPool, you receive an instance of ListOfPrices as output.
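The source of GenericRecursiveTask is not shown in this post, so the following is only a sketch of how such a generic task could be built on java.util.concurrent.RecursiveTask. All types are hypothetical stand-ins (Splittable, Combinable and Computation for the three dip roles, SketchRecursiveTask for the task), and the use case is a deliberately trivial sum of squares, so the order of composition does not matter.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

// Hypothetical stand-ins for the three dip roles; not the real dip API.
interface Splittable<I> {
    I raw();
    boolean computeDirectly();
    List<Splittable<I>> decompose();
}

interface Combinable<O> {
    O raw();
    Combinable<O> compose(Combinable<O> other);
}

interface Computation<I, O> {
    Combinable<O> compute(Splittable<I> input);
}

// Generic task: knows only the three roles, nothing about the use case.
class SketchRecursiveTask<I, O> extends RecursiveTask<Combinable<O>> {
    private static final long serialVersionUID = 1L;
    private final Splittable<I> input;
    private final Computation<I, O> computation;

    SketchRecursiveTask(Splittable<I> input, Computation<I, O> computation) {
        this.input = input;
        this.computation = computation;
    }

    @Override
    protected Combinable<O> compute() {
        if (input.computeDirectly()) {
            return computation.compute(input); // small enough: do the work
        }
        List<SketchRecursiveTask<I, O>> subtasks = new ArrayList<>();
        for (Splittable<I> part : input.decompose()) {
            subtasks.add(new SketchRecursiveTask<>(part, computation));
        }
        invokeAll(subtasks); // runs all subtasks and waits for them
        Combinable<O> combined = null; // stays null if decompose() returned nothing
        for (SketchRecursiveTask<I, O> subtask : subtasks) {
            Combinable<O> partial = subtask.join();
            combined = (combined == null) ? partial : combined.compose(partial);
        }
        return combined;
    }
}

class IntList implements Splittable<List<Integer>> {
    private final List<Integer> values;
    IntList(List<Integer> values) { this.values = values; }
    public List<Integer> raw() { return values; }
    public boolean computeDirectly() { return values.size() <= 1; }
    public List<Splittable<List<Integer>>> decompose() {
        int split = values.size() / 2;
        List<Splittable<List<Integer>>> parts = new ArrayList<>();
        parts.add(new IntList(values.subList(0, split)));
        parts.add(new IntList(values.subList(split, values.size())));
        return parts;
    }
}

class Sum implements Combinable<Long> {
    private final long value;
    Sum(long value) { this.value = value; }
    public Long raw() { return value; }
    public Combinable<Long> compose(Combinable<Long> other) {
        return new Sum(value + other.raw());
    }
}

class SquareComputation implements Computation<List<Integer>, Long> {
    public Combinable<Long> compute(Splittable<List<Integer>> input) {
        List<Integer> raw = input.raw();
        long v = raw.isEmpty() ? 0 : raw.get(0); // empty input contributes 0
        return new Sum(v * v);
    }
}

class SketchDemo {
    static long sumOfSquares(List<Integer> values) {
        ForkJoinPool pool = new ForkJoinPool();
        try {
            SketchRecursiveTask<List<Integer>, Long> task =
                    new SketchRecursiveTask<>(new IntList(values), new SquareComputation());
            return pool.invoke(task).raw();
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(sumOfSquares(Arrays.asList(1, 2, 3, 4))); // prints 30
    }
}
```

Note that SketchRecursiveTask never mentions integers or sums. Swapping IntList, Sum and SquareComputation for other implementations changes the use case without touching the task.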

What's the advantage when you use the dip-framework? For instance:

- you could pass arbitrary processing input to GenericRecursiveTask by implementing a subclass of DecomposableInput
- you could implement your own custom RecursiveTask the same way I have implemented GenericRecursiveTask and pass the proposals and the PricingEngineBridge to that task
- you could implement a custom ForkAndJoinProcessor and use that by subclassing GenericRecursiveTask: that way you can control the creation of subtasks and their distribution across threads
- you could exchange the processing activity (here: PricingEngineBridge) by implementing a custom ComputationActivityBridge and try alternative pricing engines or do something completely different than calculating prices ...

I think I have made my point: the whole thing is now closed for modification, but open for extension.

The complete example code is here in my git repository.

Let me know if you like it. Looking forward to critical and enjoyable comments.

Cheers, Niklas

 

From http://niklasschlimm.blogspot.com/2011/12/java-7-fork-and-join-decomposable-input.html
