Static Groovy, GridGain and GPars for distributed concurrent programming

DZone 's Guide to

Static Groovy, GridGain and GPars for distributed concurrent programming

· Java Zone ·
Free Resource

This article continues story started in my previous articles:

 I am going to show how expressiveness and performance of Groovy can be combined with existing tools to build highly scalable (both multi-core level and cluster level) applications.

Two wonderful tools we will use are

 If you are developer and not familiar with these tools I highly recommend to give a try.

 As usually we will deal with some sample application. It is easy to imagine that GridGain will be used to manage distributed part of our story and GPars will deal with parallel one.

Sample Application

  • We have several machines working together
  • Master machine (grid node) will handle user input and distribute task to other nodes
  • Slave machines will execute commands received from the master

Let us start with code of main () method for master node. It is very interesting that we don't need any special code for slave nodes - GridGain magic allow us to use default grid node code in many situation. What is even more interesting is the fact that during development we even don't need to deploy code to slaves (thanks to GridGain peer class loading)

  static void main (String [] args) {
def config = new GridConfigurationAdapter(
gridGainHome: "/Applications/gridgain-2.1.1",
gridLogger: new GridJavaLogger ()
new MasterActor(GridFactory.start(config)).start ()

What we did here is configured and started grid and created and started main actor on master node. Please note how convinient is Groovy map syntax for defining properties on object construction.

What is Actors?

Actors allow for a messaging-based concurrency model, built from independent active objects that exchange messages and have no mutable shared state. Actors can naturally avoid issues like deadlocks, livelocks or starvation, so typical for shared memory. 

We will use specific type of actors provided by GPars - DynamicDispatchActor. The DynamicDispatchActor class is a pooled actor allowing for an alternative structure of the message handling code. In general DynamicDispatchActor repeatedly scans for messages and dispatches arrived messages to one of the onMessage(message) methods defined on the actor.

We are going to have to types of actors MasterActor and SlaveActor, which will be runned of master and slave nodes respectively. It will be convient for our purposes if the have common super class

   static class RemoteNodeDiscovery { GridDiscoveryEventType type; GridNode node }

static class GridAwareActor extends DynamicDispatchActor {
Grid grid

GridAwareActor (Grid grid) {
setActorGroup Actors.defaultPooledActorGroup
this.grid = grid

protected void afterStart() {
grid.addMessageListener { nodeId, msg ->
if (active)
this << msg
grid.removeMessageListener this
grid.addDiscoveryListener { type, node ->
if (active)
this << new RemoteNodeDiscovery(type: type, node: node)
grid.removeDiscoveryListener this

protected void afterStop (Object ignore) {

What's going on here? When actor starts it register two listeners for events coming from grid and forward these events to the actor for processing. After actor stopped these  listeners automatically unregister to avoid leaking of resources.

Again, Groovy saves us a lot of boilerplate code here and makes codemuch cleaner. Additionally to that static compilation allows us to avoid dynamic dispatch and leave listener code as soon as possible (always best strategy with asynchronious processing)

Interesting that GPars helps us to handle events coming from grid, which we wrap in to RemoteNodeDiscovery,  and messages, which actors send to each other, in absolutely the same manner. Very convinient!

Now we are ready to implement SlaveActor - the one, which will be started on slave nodes by the command from master one.

   static abstract class ExecuteCommand implements Runnable, Serializable {}

static class SlaveActor extends GridAwareActor {
UUID masterId

SlaveActor (Grid grid, UUID masterId) {
this.masterId = masterId

void onMessage (RemoteNodeDiscovery msg) {
if (msg.node.id == masterId && (msg.type == GridDiscoveryEventType.LEFT || msg.type == GridDiscoveryEventType.FAILED)) {
stop ()

void onMessage(ExecuteCommand command) {
command.run ()

So, SlaveActor is almost trivial. It processes just two types of events - discovery events from the grid (to be able to stop itself when master disappear) and commands sent by master (command is normal java.lang.Runnable and we use special class ExecuteCommand by methological reason and because we need java.lang.Serializable for message passing between grid nodes)

Now, we need to implement MainActor. The implementation is a bit lengthy, so I will present it by parts. We will start with declaration of fields

   static class MasterActor extends GridAwareActor {
Set<UUID> readyNodes = []

Thread stdInReader

MasterActor(Grid grid) {
// .................

We will keep track of slave nodes, which reported that they are ready to do work and we will have separate thread to read from standard input.

Note how elegantly compare to Java we initialize readyNode fields. We discuss that already in previous articles but I really like that :)

 Now we can create the thread, which will read standard input and send messages to our actor to process. The message in this case will be normal java.lang.String

       private Thread createStdinReaderThread() {
run: {
def reader = new LineNumberReader(new InputStreamReader(System.in))
while (!Thread.currentThread().isInterrupted()) {
this << reader.readLine()

daemon: true

This part I also like very much. All power of Groovy and type inference of static Groovy together. See how elegantly we combine in to one Groovy map expression subclassing of java.lang.Thread and call to setDaemon(true)

Now we are ready to handle start and stop of our actor. Truly speaking nothing really interesting happens here and I provide code just for convinience of reader.

       void afterStart() {
grid.remoteNodes.each { remoteNode ->
MasterActor.this << new RemoteNodeDiscovery ( type: GridDiscoveryEventType.JOINED, node:remoteNode )
stdInReader = GridTest.createStdinReaderThread(this)
stdInReader.start ()

void afterStop (List ignore) {

OK, we did long way and ready for most interesting part of our story - handling messages. Our actor should handle three types of messages

  • java.lang.String - sent by input reader thread
  • RemoteNodeReady - sent by slave actor, when it was started. We will see a bit later how it happens
  • RemoteNodeDiscovery - sent by listener we register before

Let us start one by one

Handling of input strings is most boring. It is not command to exit we send message to all slave nodes to execute command, which will print this message.

       void onMessage (String msg) {
switch (msg) {
case "exit":
stop ();

readyNodes.each { remoteNodeId ->
grid.sendMessage (grid.getNode(remoteNodeId), { println msg } as ExecuteCommand )
In fact, it is not as boring as I say if you remember what happens after grid.sendMessage
  • GridGain send message to slave node
  • The message is serialized code to execute (the boring this is that in our case it is just println but imagine opportunities)
  • Listener registered by slave actor receive the message and send it to the actor
  • Actor execute the command

Now we can deal with really boring part. When we received RemoteNodeReady message we need to add id of slave node, which sent the message to the list of nodes available to our service (it means node connected, code loaded, actor started and reported).

       void onMessage (RemoteNodeReady msg) {
readyNodes << msg.nodeId
println "Node ${msg.nodeId} is connected and ready"

And finally most interesting part - handling of discovery events.

       void onMessage (RemoteNodeDiscovery msg) {
if (msg.type != GridDiscoveryEventType.METRICS_UPDATED)
println "${msg.type} ${msg.node}"

if (msg.type == GridDiscoveryEventType.JOINED)
grid.executeOnGridNode (msg.node, GridTest.createCodeToStartOnSlaveNodes(grid.localNode.id))

if (msg.type == GridDiscoveryEventType.LEFT || msg.type == GridDiscoveryEventType.FAILED) {
readyNodes.remove msg.node.id

 When slave node left our grid or failed execution we just remove it from set of available node. It is easy part. The interesting part is handling of joined node. What we need to do is to instruct GridGain to start execution of slave actor on the node, which just joined. Here is how do we do it.

    static void executeOnGridNode (Grid grid, GridNode node, RemoteJob operation) {
GridTask task = [
map: { subgrid, arg -> [(operation) : node] },
result: { result, received -> GridJobResultPolicy.REDUCE },
reduce: { results -> results [0] }

grid.execute (task, null)

 That probably require some explaination. GridGain implements map/reduce pattern. To execute some code on a grid we need to implement GridTask interface. For our purposes only method map is interesting. It should return Map<GridJob,GridNode>. GridJob is what to execute and GridNode - where to execute. In our case it is espesially easy as we have only one job and know where exactly we want to execute it. Groovy type inference helps us to write very compact code above.

There are two reasons why method executeOnGridNode is static.

  1. as any static method it can be used as so called extension  method, so we can write grid.executeOnGridNode(...) as if it was method Grid interface provided by GridGain
  2. If it was instance method then the instance of GridTask would keep reference to the instance of outer class, which is huge problem for correct serialization. In the future static compiler will contain proper handling of the case, when such reference is not really used

Now the very last piece of code. What is RemoteJob and how do we create code to execute on the slave node. In fact it is just technicality. Because we need instance of Grid to be available to slave actor we follow some ritual, which GridGain require from us (declare subclass with field, where instance of Grid will be injected). Then Groovy type inference do the job.

   static abstract class RemoteJob implements GridJob {
@GridInstanceResource Grid grid

public void cancel() {}

static RemoteJob createCodeToStartOnSlaveNodes (UUID mainNodeId) {
new SlaveActor(grid, mainNodeId).start()
grid.sendMessage(grid.getNode(mainNodeId), new RemoteNodeReady(nodeId: grid.localNode.id))
println "slave actor started"

 Again method createCodeToStartOnSlaveNodes is static because of not yet implemented functionality discussed above.

Voila! We are done with very simple application, which combines quite complicated concepts of distributed and concurrent computing. Static Groovy was our main tool to simplify code and wire things together.


Opinions expressed by DZone contributors are their own.

{{ parent.title || parent.header.title}}

{{ parent.tldr }}

{{ parent.urlSource.name }}