Introducing Distributed Execution and MapReduce Framework
Join the DZone community and get the full member experience.
Join For FreeIn case you did not pay attention to
the area of large scale distributed computing – there is a revolution
going on! It is becoming increasingly evident that the software
ecosystems built around so called Big Data are at the forefront of cloud
computing innovation. Unfortunately, there has been more debate
around determining how big Big Data actually is rather than defining
common set of requirements for the large scale Big Data computational
platforms.
Stephen O'Grady of RedMonk summarized
this phenomena succinctly: “Big Data, like NoSQL, has become a
liability in most contexts. Setting aside the lack of a consistent
definition, the term is of little utility because it is
single-dimensional. Larger dataset sizes present unique computational
challenges. But the structure, workload, accessibility and even location
of the data may prove equally challenging.”
Zack Urlocker, an advisor and board member to several startup companies in the area of SaaS was equally vocal in his criticism
regarding complexity of the existing systems : “You pretty much gotta
be near genius level to build systems on top of Cassandra, Hadoop and
the like today. These are powerful tools, but very low-level, equivalent
to programming client server applications in assembly language. When it
works its [sic] great, but the effort is significant and it’s probably beyond the scope of mainstream IT organizations.”
This
is exactly where we are positioning Infinispan's roadmap as we are
announcing initial steps into the area of distributed execution and
MapReduce framework built on top of Infinispan. Infinispan's distributed
data grid is a most natural fit for such a platform. We have
already built an infrastructure for essentially unlimited linear
in-memory data scaling. However, having such a data grid without an
ability to execute large scale computation on it is like having a
Ferrari without a drivers licence. Listening to the criticism regarding
the lack of direction in Big Data field and complexity of the existing
distributed execution frameworks our focus was primarily on simplicity
without sacrificing power and a rich feature set such a framework should
have.
Simple distributed execution model
The main interfaces for simple distributed task execution are DistributedCallable and DistributedExecutorService.
DistributedCallable is essentially a version of the existing Callable
from java.util.concurrent package except that DistributedCallable can
be executed in remote JVM and receive input from Infinispan cache.
Tasks' main algorithm is essentially unchanged, only the input source
is changed.
Exisiting Callable implementation most likely gets its input in a form of some Java object/primitive while DistributedCallable gets its input from Infinispan cache. Therefore, users who have already implemented Callable interface to describe their task units would simply extend DistributedCallable and use keys from Infinispan execution environment as input for the task. Implentation of DistributedCallable can in fact continue to support implementation of an already existing Callable while simultaneously be ready for distribited execution by extending DistributedCallable.
Exisiting Callable implementation most likely gets its input in a form of some Java object/primitive while DistributedCallable gets its input from Infinispan cache. Therefore, users who have already implemented Callable interface to describe their task units would simply extend DistributedCallable and use keys from Infinispan execution environment as input for the task. Implentation of DistributedCallable can in fact continue to support implementation of an already existing Callable while simultaneously be ready for distribited execution by extending DistributedCallable.
public interface DistributedCallable extends Callable { /** * Invoked by execution environment after DistributedCallable * has been migrated for execution to * a specific Infinispan node. * * @param cache * cache whose keys are used as input data for * this DistributedCallable task * @param inputKeys * keys used as input for this DistributedCallable task */ public void setEnvironment(Cache cache, Set inputKeys); }
DistributedExecutorService is an simple
extension of a familiar ExecutorService from java.util.concurrent
package. However, the advantages of DistributedExecutorService are not
to be overlooked. For the existing Callable tasks users would submit to
ExecutorService there is an option to submit them for an execution on
Infinispan cluster. Infinispan execution environment would migrate this
task to an execution node, run the task and return the results to the
calling node. Of course, not all Callable task would benefit from this
feature. Excellent candidates are long running and computationally
intensive tasks.
The second
advantage of the DistributedExecutorService is that it allows a quick
and simple implementation of tasks that take input from Infinispan
cache nodes, execute certain computation and return results to the
caller. Users would specify which keys to use as input for specified
DistributedCallable and submit that callable for execution on Infinispan
cluster. Infinispan runtime would locate the appriate keys, migrate
DistributedCallable to target execution node(s) and finally return a
list of results for each executed Callable. Of course, users can omit
specifying input keys in which case Infinispan would execute
DistributedCallable on all keys for a specified cache.
MapReduce model
Infinispan's own MapReduce model
is an adaptation of Google's original MapReduce. There are four main
components in each map reduce task: Mapper, Reducer, Collator and
MapReduceTask.
Implementation
of a Mapper class is a component of a MapReduceTask invoked once for
each input entry K,V. Every Mapper instance migrated to an Infinispan
node, given a cache entry K,V input pair transforms that input pair into
a result T. Intermediate result T is further reduced using a Reducer.
public interface Mapper { /** * Invoked once for each input cache entry * K,V transforms that input into a result T. * * @param key * the kay * @param value * the value * @return result T */ T map(K key, V value); }
Reducer, as its name implies, reduces a list of results
T from map phase of MapReduceTask. Infinispan distributed execution
environment creates one instance of Reducer per execution node.
public interface Reducer { /** * Reduces a result T from map phase and return R. * Assume that on Infinispan node N, an instance * of Mapper was mapped and invoked on k many * key/value pairs. Each T(i) in the list of all * T's returned from map phase executed on * Infinispan node N is passed to reducer along * with previsouly computed R(i-1). Finally the last * invocation of reducer on T(k), R is returned to a * distributed task that originated map/reduce * request. * * @param mapResult * result T of map phase * @param previouslyReduced * previously accumulated reduced result * @return result R * */ R reduce(T mapResult, R previouslyReduced); }
Collator coordinates
results from Reducers executed on Infinispan cluster and assembles a
final result returned to an invoker of MapReduceTask.
public interface Collator { /** * Collates all results added so far and * returns result R to invoker of distributed task. * * @return final result of distributed task computation */ R collate(); /** * Invoked by runtime every time reduced result * R is received from executed Reducer on remote * nodes. * * @param remoteNode * address of the node where reduce phase occurred * @param remoteResult * the result R of reduce phase */ void reducedResultReceived(Address remoteNode, R remoteResult); }
Finally, MapReduceTask
is a distributed task uniting Mapper, Reducer and Collator into a
cohesive large scale computation to be transparently parallelized across
Infinispan cluster nodes. Users of MapReduceTask need to provide a
cache whose data is used as input for this task. Infinispan execution
environment will instantiate and migrate instances of provided mappers
and reducers seamlessly across Infinispan nodes. Unless otherwise
specified using onKeys method input keys filter all available key value
pairs of a specified cache will be used as input data for this task.
MapReduceTask
implements a slightly different execution model from the original
MapReduce proposed by Google. Here is the pseudocode of the
MapReduceTask.
mapped = list() for entry in cache.entries: t = mapper.map(entry.key, entry.value) mapped.add(t) r = null for t in mapped: r = reducer.reduce(t, r) return r to Infinispan node that invoked the task On Infinispan node invoking this task: reduced_results = invoke map reduce task on all nodes, retrieve map{address:result} for r in reduced_results.entries: remote_address = r.key remote_reduced_result = r.value collator.add(remote_address, remote_reduced_result) return collator.collate()
Examples
In
order to get a better feel for MapReduce framework lets have a look at
the example related to Infinispan's grid file system. How would we
calculate total size of all files in the system using MapReduce
framework? Easy! Have a look at GridFileSizeExample.
public class GridFileSizeExample { public static void main(String arg[]) throws Exception { Cache cache = null; MapReduceTask task = new MapReduceTask(cache); Long result = task.mappedWith(new Mapper() { @Override public Long map(String key, GridFile.Metadata value) { return (long) value.getLength(); } }).reducedWith(new Reducer() { @Override public Long reduce(Long mapResult, Long previouslyReduced) { return previouslyReduced == null ? mapResult : mapResult + previouslyReduced; } }).collate(new Collator(){ private Long result = 0L; @Override public Long collate() { return result; } @Override public void reducedResultReceived(Address remoteNode, Long remoteResult) { result += remoteResult; }}); System.out.println("Total filesystem size is " + result + " bytes"); } }
Execution (computing)
MapReduce
Big data
Framework
Infinispan
Task (computing)
Opinions expressed by DZone contributors are their own.
Comments