Event-based actors in Groovy
After publishing my earlier post on Thread-bound actors in Groovy I received several requests to enhance GParallelizer with support for event-based actors. To briefly recap, actors provide a message-based concurrency model built from independent active objects that exchange messages and have no mutable shared state. Actors naturally avoid issues like deadlocks, livelocks or starvation, which are so typical of shared-memory concurrency.
Event-based actors, unlike the thread-bound actors described in the original post, share a common pool of threads. An actor only borrows a thread to handle a single incoming message and then gives it back to the pool so that the thread can serve other actors in turn. Actors thus become detached from the underlying threads, and a relatively small thread pool can serve a potentially unlimited number of actors. Virtually unlimited scalability in the number of actors is the main advantage of event-based actors over thread-bound ones, where each actor has its own exclusive background thread associated with it.
Implementing the capability to dynamically attach and detach actors and threads is a bit of a challenge. However, it turned out that life is much easier if you can stand on the shoulders of giants - the java.util.concurrent package and Groovy in my case. To give you a taste of what is now possible with event-based actors in GParallelizer, I've prepared a couple of examples:
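To make the thread-borrowing idea concrete, here is a minimal sketch built only on java.util.concurrent. It is not GParallelizer's implementation: the TinyActor class and all of its members are hypothetical names made up for this illustration. Each actor owns a mailbox and occupies a pool thread only while it handles one message, so a two-thread pool can serve a thousand actors.

```groovy
import java.util.concurrent.ConcurrentLinkedQueue
import java.util.concurrent.CountDownLatch
import java.util.concurrent.ExecutorService
import java.util.concurrent.Executors
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical illustration class, not part of GParallelizer.
class TinyActor {
    private final ExecutorService pool
    private final Closure handler
    private final Queue inbox = new ConcurrentLinkedQueue()
    // true while a task for this actor is queued or running on the pool
    private final AtomicBoolean scheduled = new AtomicBoolean(false)

    TinyActor(ExecutorService pool, Closure handler) {
        this.pool = pool
        this.handler = handler
    }

    void send(message) {
        inbox.add(message)
        schedule()
    }

    // Borrow a pool thread only if there is work and no task is pending yet.
    void schedule() {
        if (!inbox.isEmpty() && scheduled.compareAndSet(false, true)) {
            pool.execute { processOne() }
        }
    }

    // Handle exactly one message, then give the thread back to the pool.
    void processOne() {
        try {
            def message = inbox.poll()
            if (message != null) handler.call(message)
        } finally {
            scheduled.set(false)
            schedule()   // re-schedule if more messages arrived meanwhile
        }
    }
}

def pool = Executors.newFixedThreadPool(2)
int actorCount = 1000
def processed = new AtomicInteger()
def done = new CountDownLatch(actorCount)

def actors = (1..actorCount).collect {
    new TinyActor(pool, { msg ->
        processed.incrementAndGet()
        done.countDown()
    })
}
actors.each { it.send('ping') }

def allDone = done.await(10, TimeUnit.SECONDS)
pool.shutdown()
println "Processed ${processed.get()} messages on a 2-thread pool"
```

The scheduled flag guarantees that at most one task per actor is in flight, so each actor still processes its messages one at a time even though consecutive messages may be handled by different threads.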
import static org.gparallelizer.actors.pooledActors.PooledActors.*

final def decryptor = actor {
    loop {
        react {String message ->
            reply message.reverse()
        }
    }
}.start()

actor {
    decryptor.send 'suonorhcnysa si yvoorG'
    react {
        println 'Decrypted message: ' + it
    }
}.start()
As you can see, you create new actors with the actor() method, passing in the actor's body as a closure parameter. Inside the actor's body you can use loop() to iterate, react() to receive messages and reply() to send a message back to the actor that sent you the message currently being processed. With the start() method you schedule the actor to the underlying thread pool for processing.
Here's the important piece: when the decryptor actor doesn't find a message in its message queue, the react() method gives up the thread and returns it to the thread pool for other actors to pick up. Only after a new message arrives in the actor's message queue does the closure passed to react() get scheduled for processing with the pool. Event-based actors internally simulate continuations: an actor's work is split into sequentially run chunks, which get invoked once a message is available in the inbox. Each chunk of a single actor can be performed by a different thread from the thread pool.
Simple calculator
Another example of an event-driven actor - a calculator that receives two numeric messages, sums them up and sends the result to the console actor. I deliberately decided to set the pool size to 1 to show that even a one-thread pool can handle multiple actors.
import static org.gparallelizer.actors.pooledActors.PooledActors.*

//not necessary, just showing that a single-threaded pool can still handle multiple actors
getPool().initialize 1

final def console = actor {
    loop {
        react {
            println 'Result: ' + it
        }
    }
}.start()

final def calculator = actor {
    react {a ->
        react {b ->
            console.send(a + b)
        }
    }
}.start()

calculator.send 2
calculator.send 3
Notice that event-driven actors require special care regarding the react() method. Since event-driven actors need to split the code into independent chunks that can be assigned to different threads one after another, and continuations are not natively supported on the JVM, the chunks are created artificially behind the scenes with Runnable tasks and exceptions. As a result the react() and loop() methods never return normally, so any code placed after them is never executed, and actors' code must be structured accordingly. Scala programmers will have recognized by now where I took my inspiration from, I believe.
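The following sketch illustrates the general technique of building chunks from Runnable tasks and a control-flow exception. It is a simplified model written for this explanation, not GParallelizer's actual code: ChunkedActor, ChunkDone and every method on them are hypothetical names. Here react() stores the closure as the next chunk and throws, which unwinds the current chunk and frees the thread. The stored chunk is submitted to the pool only once a message is available.

```groovy
import java.util.concurrent.CountDownLatch
import java.util.concurrent.ExecutorService
import java.util.concurrent.Executors
import java.util.concurrent.TimeUnit

// Hypothetical marker exception, used only to unwind the current chunk.
class ChunkDone extends RuntimeException {}

// Hypothetical illustration class, not part of GParallelizer.
class ChunkedActor {
    private final ExecutorService pool
    private final LinkedList inbox = new LinkedList()
    private Closure pending          // chunk waiting for the next message
    private boolean busy = false     // true while a chunk is queued or running

    ChunkedActor(ExecutorService pool) { this.pool = pool }

    void start(Closure body) {
        synchronized (this) { busy = true }
        pool.execute { runChunk(body) }
    }

    // Remember the next chunk, then abandon the current one by throwing.
    void react(Closure nextChunk) {
        synchronized (this) { pending = nextChunk }
        throw new ChunkDone()
    }

    synchronized void send(message) {
        inbox.add(message)
        dispatch()
    }

    void runChunk(Closure chunk) {
        try {
            chunk.call()
        } catch (ChunkDone ignored) {
            // expected: react() ended this chunk
        } finally {
            synchronized (this) {
                busy = false
                dispatch()
            }
        }
    }

    // Must be called while holding the lock on this actor.
    void dispatch() {
        if (busy || pending == null || inbox.isEmpty()) return
        Closure chunk = pending
        def message = inbox.removeFirst()
        pending = null
        busy = true
        pool.execute { runChunk { chunk.call(message) } }
    }
}

def pool = Executors.newFixedThreadPool(1)
def events = Collections.synchronizedList([])
def finished = new CountDownLatch(1)

def adder = new ChunkedActor(pool)
adder.start {
    adder.react { a ->
        adder.react { b ->
            events << "sum=${a + b}".toString()
            finished.countDown()
        }
        events << 'unreachable'   // never runs: react() does not return normally
    }
}
adder.send 2
adder.send 3

def completed = finished.await(5, TimeUnit.SECONDS)
pool.shutdown()
```

The line after the inner react() call is never reached, which is why anything that should happen after a message arrives has to live inside the closure passed to react() rather than after it.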
The react() method allows timeouts to be specified using the TimeCategory DSL:
import static org.gparallelizer.actors.pooledActors.PooledActors.*

def me = actor {
    friend.send('Hi')
    react(10.seconds) {message ->
        //continue conversation
    }
}

me.metaClass.onTimeout = {->friend.send('I see, busy as usual. Never mind.')}
Notice that you can use Groovy meta-programming to define an actor's lifecycle notification methods (e.g. onTimeout()) dynamically.
Concurrent Merge Sort Example
For comparison with the example given for thread-bound actors, I'm including a more involved example that performs a concurrent merge sort of a list of integers using actors. As you can see, we came pretty close to Scala's model, although Scala's pattern matching for message handling is still something to long for.
Closure createMessageHandler(def parentActor) {
    return {
        react {List message ->
            assert message != null
            switch (message.size()) {
                case 0..1:
                    parentActor.send(message)
                    break
                case 2:
                    if (message[0] <= message[1]) parentActor.send(message)
                    else parentActor.send(message[-1..0])
                    break
                default:
                    def splitList = split(message)
                    def child1 = actor(createMessageHandler(delegate))
                    def child2 = actor(createMessageHandler(delegate))
                    child1.start().send(splitList[0])
                    child2.start().send(splitList[1])
                    react {message1 ->
                        react {message2 ->
                            parentActor.send merge(message1, message2)
                        }
                    }
            }
        }
    }
}

def console = actor {
    react { println "Sorted array:\t${it}" }
}.start()

def sorter = actor(createMessageHandler(console))
sorter.start().send([1, 5, 2, 4, 3, 8, 6, 7, 3, 9, 5, 3])
This example shows the main advantage of event-driven actors - no matter how big an array you want to sort, and thus how many actors you create along the way, it can all be processed with one thread.
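The merge sort listing calls split() and merge() without showing them. The versions below are a hypothetical sketch of what such helpers could look like, assuming split() halves a list into two sublists and merge() combines two already sorted lists. They are not taken from the original example.

```groovy
// Hypothetical helper: cut a list into two halves of (almost) equal size.
def split(List list) {
    int mid = list.size().intdiv(2)
    [list.subList(0, mid), list.subList(mid, list.size())]
}

// Hypothetical helper: merge two sorted lists into one sorted list.
def merge(List a, List b) {
    def result = []
    int i = 0, j = 0
    // Repeatedly take the smaller head element of the two lists.
    while (i < a.size() && j < b.size()) {
        result << (a[i] <= b[j] ? a[i++] : b[j++])
    }
    // At most one of the lists still has elements left; append them.
    result.addAll(a.subList(i, a.size()))
    result.addAll(b.subList(j, b.size()))
    result
}

def halves = split([1, 2, 3, 4, 5])
def merged = merge([1, 4, 6], [2, 3, 7])
```

Defined as script-level methods, these would be visible to the closure returned by createMessageHandler() when everything lives in the same script.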
There are certainly still some rough edges and missing pieces. Now I'd like to get some feedback to drive me further. Feel free to download GParallelizer and let me know what you think. All comments are appreciated.
Published at DZone with permission of Vaclav Pech. See the original article here.