Scala Actors 101 - Threadless and Scalable
Java Concurrency is still based on Shared Memory Model
All concurrency abstractions in Java are based on the shared mutable state paradigm and are prone to race conditions, blocking and contention, and deadlocks. Java 5 gave us better abstractions for coarse-grained concurrency; Fork/Join (FJ) will allow programmers to exploit finer-grained concurrency by parallelizing independent task components through recursive divide-and-conquer techniques. The individual subtasks (known as Fork/Join tasks) are mapped to a pool of worker threads that initiates and manages their execution using advanced queuing and scheduling techniques.
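The recursive divide-and-conquer idea can be sketched in Scala against the `java.util.concurrent` Fork/Join classes. This is only a minimal sketch, assuming the API in the form it later shipped in Java 7 (`ForkJoinPool`, `RecursiveTask`); the names `ForkJoinSum`, `SumTask` and `Threshold` are made up for illustration, and the threshold value is arbitrary.

```scala
import java.util.concurrent.{ForkJoinPool, RecursiveTask}

object ForkJoinSum {
  // Below this many elements a subtask sums sequentially instead of splitting.
  // The value is an arbitrary choice for illustration.
  val Threshold = 1000

  // A divide-and-conquer task over the half-open range [from, until)
  // of an array that is only ever read, never written.
  class SumTask(data: Array[Int], from: Int, until: Int) extends RecursiveTask[Long] {
    override def compute(): Long =
      if (until - from <= Threshold) {
        var total = 0L
        var i = from
        while (i < until) { total += data(i); i += 1 }
        total
      } else {
        val mid = from + (until - from) / 2
        val left = new SumTask(data, from, mid)
        val right = new SumTask(data, mid, until)
        left.fork()                   // hand the left half to the pool
        right.compute() + left.join() // do the right half here, then wait for the left
      }
  }

  def sum(data: Array[Int]): Long = {
    val pool = new ForkJoinPool()
    try pool.invoke(new SumTask(data, 0, data.length))
    finally pool.shutdown()
  }
}
```

Calling `ForkJoinSum.sum((1 to 100000).toArray)` splits the range until each piece is small enough to sum directly. The subtasks stay independent only because none of them mutates the shared array.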
Fine-grained parallelism is hard for the average programmer to exploit. Not every problem decomposes into parallel subtasks, and ensuring that no subtasks share mutable state while programming in an imperative language is a big ask. As Doug Lea himself admitted in an interview with InfoQ, hand-crafting an efficient fork/join solution for a given problem is not an easy task; hence efforts are under way to incorporate more extensions of the Fork/Join base abstractions in Java's core concurrency module.
As people have repeated many times, threads themselves are not the main problem; the shared memory model is. As a model of computation, threads are non-deterministic and hence not composable. And with all the subtleties of the Java memory model, when multiple threads can potentially enter a critical section concurrently, the programming model becomes hugely complicated. Fork/Join points us towards functional decomposition, which is most effective when the subtasks are side-effect-free and can each execute in their own thread of execution, independent of the others.
Scala Actors - Shared Nothing, based on Message Passing
An Actor is a mathematical model of concurrent computation that encapsulates data, code and its own thread of control, and communicates asynchronously by passing immutable messages. When the basic architecture is shared-nothing, each actor seems to act in its own process space. The success and scalability of an actor implementation depend a lot on the ability of the language to implement lightweight processes on top of the native threading model. Every actor has its own mailbox for storing messages, implemented as an asynchronous, race-free, non-blocking queue.
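To make the mailbox idea concrete, here is a hand-rolled sketch of an actor-like object: private state, a queue of immutable messages, and one dedicated thread that drains it. It is not how the Scala actors library is implemented; the names `MailboxSketch`, `Counter`, `Add`, `Get` and `Stop` are hypothetical, and for simplicity the mailbox is a blocking queue and the reply travels through a `scala.concurrent.Promise`.

```scala
import java.util.concurrent.LinkedBlockingQueue
import scala.concurrent.{Await, Promise}
import scala.concurrent.duration._

object MailboxSketch {
  // Immutable messages (hypothetical names for this sketch).
  sealed trait Msg
  final case class Add(n: Int) extends Msg
  final case class Get(reply: Promise[Int]) extends Msg // one-shot reply channel
  case object Stop extends Msg

  final class Counter {
    private val mailbox = new LinkedBlockingQueue[Msg]()
    private var total = 0 // touched only by the actor's own thread

    private val thread = new Thread(new Runnable {
      def run(): Unit = {
        var running = true
        while (running) mailbox.take() match {
          case Add(n)     => total += n
          case Get(reply) => reply.success(total)
          case Stop       => running = false
        }
      }
    })
    thread.setDaemon(true)
    thread.start()

    // Asynchronous send: enqueue the message and return immediately.
    def !(msg: Msg): Unit = mailbox.put(msg)
  }

  def demo(): Int = {
    val counter = new Counter
    (1 to 100).foreach(i => counter ! Add(i))
    val reply = Promise[Int]()
    counter ! Get(reply)
    val result = Await.result(reply.future, 5.seconds)
    counter ! Stop
    result
  }
}
```

Because the sender only ever enqueues messages and the state is read and written by a single thread, no lock around `total` is needed. The cost of this naive version is one thread per actor.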
Erlang has popularized concurrency-oriented programming by implementing an extremely lightweight process model on top of the operating system primitives. The actor model in Erlang is based on strong isolation semantics, self-contained processes with unforgeable process ids and asynchronous message passing. The strong isolation semantics are guaranteed by the underlying virtual machine, which enforces single-assignment semantics and a functional programming model. Hence every Erlang process is a serialized actor that can also be transparently distributed across nodes in a cluster.
Scala actors use Fork/Join as the underlying implementation and expose a concurrency model that encourages shared-nothing structures that interact through asynchronous message passing.
Scala implements the actor model on the JVM, so it has to abide by the rules of the underlying platform. It encourages shared-nothing process abstractions, but does not mandate them. However, for all practical purposes, actors in Scala need to be shared-nothing and functional, with all mutable state being private and all shared state being immutable.
Scala Actors are Lightweight
The JVM offers shared-memory threads with locks as its primary concurrency abstraction. But shared-memory threads are quite heavyweight and incur severe performance penalties from context-switching overhead. With an actor implementation based on a one-to-one mapping to JVM threads, a Scala actor would not be lightweight enough for us to spawn a million instances for a specific computation. Hence Scala actors have been designed as lightweight event objects, which get scheduled and executed on an underlying worker thread pool that is automatically resized when all threads block on long-running operations. In fact, Scala implements a unified model of actors: thread-based and event-based. Scala actors offer two suspension mechanisms: a full stack frame suspension (implemented as receive) and a suspension based on a continuation closure (implemented as react). For event-based actors, a wait on react is represented by a continuation closure, i.e. a closure that captures the rest of the actor's computation. When the suspended actor receives a message that matches one of the patterns specified in the actor, the continuation is executed by scheduling the task on one of the worker threads from the underlying thread pool. The paper "Actors that Unify Threads and Events" by Haller and Odersky discusses the details of the implementation.
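The event-based half of that model can be approximated in a few lines. The sketch below is not the library's implementation and makes several simplifying assumptions: the continuation is a fixed `PartialFunction` rather than a closure over the rest of the computation, unmatched messages are dropped instead of staying in the mailbox, and a plain fixed-size `ExecutorService` stands in for the Fork/Join scheduler. All names (`EventActorSketch`, `EventActor`, `Accumulate`, `Total`) are hypothetical. What it does show is the key property: while the mailbox is empty the actor holds no thread at all, and a worker is borrowed only when a message arrives.

```scala
import java.util.concurrent.{ConcurrentLinkedQueue, ExecutorService, Executors}
import java.util.concurrent.atomic.AtomicBoolean
import scala.concurrent.{Await, Promise}
import scala.concurrent.duration._

object EventActorSketch {
  // `behavior` plays the role of the continuation that waits for a matching message.
  final class EventActor(pool: ExecutorService)(behavior: PartialFunction[Any, Unit]) {
    private val mailbox   = new ConcurrentLinkedQueue[Any]()
    private val scheduled = new AtomicBoolean(false)

    // Asynchronous send: enqueue, then make sure a drain task is pending.
    def !(msg: Any): Unit = {
      mailbox.add(msg)
      trySchedule()
    }

    // Submit a drain task unless one is already pending or running,
    // so at most one worker thread runs this actor at any time.
    private def trySchedule(): Unit =
      if (scheduled.compareAndSet(false, true))
        pool.execute(new Runnable { def run(): Unit = drain() })

    private def drain(): Unit =
      try {
        var msg = mailbox.poll()
        while (msg != null) {
          // Simplification: messages that match no pattern are dropped.
          if (behavior.isDefinedAt(msg)) behavior(msg)
          msg = mailbox.poll()
        }
      } finally {
        scheduled.set(false)
        // A message may have arrived after the last poll; pick it up.
        if (!mailbox.isEmpty) trySchedule()
      }
  }

  final case class Accumulate(amount: Int)
  final case class Total(reply: Promise[Int])

  def demo(): Int = {
    val pool = Executors.newFixedThreadPool(2)
    try {
      var sum = 0 // actor-private state, captured by the handler closure
      val accumulator = new EventActor(pool)({
        case Accumulate(n) => sum += n
        case Total(reply)  => reply.success(sum)
      })
      (1 to 100).foreach(i => accumulator ! Accumulate(i))
      val reply = Promise[Int]()
      accumulator ! Total(reply)
      Await.result(reply.future, 5.seconds)
    } finally pool.shutdown()
  }
}
```

With this shape, many actors can share a small pool, because an idle actor is just an object with an empty queue.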
Functional, Immutable and Asynchronous
Scala actors combine the power of functional programming with Scala's amazingly powerful type system. The following snippet, taken from the Actor trait in the Scala source code, shows some of the important methods for message passing: message send (synchronous and asynchronous) and the continuation closure on which the actor waits for resumption after being suspended. Pattern matching is at the core of message processing in Scala actors. In Scala, a pattern-matching block can be treated as a PartialFunction, which is a subclass of Function1, the class of unary functions. Both receive and react take a PartialFunction as their argument, so pattern matching semantics apply to the set of messages that the actor can process.
trait Actor extends OutputChannel[Any] {
  /**
   * mailbox of the actor
   */
  private val mailbox = new MessageQueue

  def receive[R](f: PartialFunction[Any, R]): R = ..
  def react(f: PartialFunction[Any, Unit]): Nothing = ..

  /**
   * The behavior of an actor is specified by implementing this
   * abstract method.
   */
  def act(): Unit

  /**
   * Sends message to this actor (asynchronous).
   */
  def !(msg: Any) {
    send(msg, Actor.self)
  }

  /**
   * Sends message to this actor and awaits reply
   * (synchronous).
   */
  def !?(msg: Any): Any = {
    //..
  }

  /**
   * Sends message to this actor and immediately
   * returns a future representing the reply value.
   */
  def !!(msg: Any): Future[Any] = {
    //..
  }

  /**
   * continuation on which the suspended actor waits
   * for resumption on message matching.
   */
  private var continuation: PartialFunction[Any, Unit] = null
  //..
}
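Since receive and react both accept a PartialFunction, it helps to see what such a value can do outside of any actor. The following is a small standalone sketch using only the standard library; the message types `Accumulate` and `Reset` and the object name `PartialFunctionSketch` are made up for illustration.

```scala
object PartialFunctionSketch {
  // Hypothetical message types for illustration.
  final case class Accumulate(amount: Int)
  case object Reset

  // A pattern-matching block typed as a PartialFunction is defined
  // only for the cases it lists.
  val handler: PartialFunction[Any, String] = {
    case Accumulate(n) => "add " + n
    case Reset         => "reset"
  }

  // A catch-all handler, chained behind the first one with orElse.
  val fallback: PartialFunction[Any, String] = {
    case other => "unhandled: " + other
  }
  val withFallback: PartialFunction[Any, String] = handler orElse fallback

  // A PartialFunction is also a Function1, so it can be used wherever
  // a unary function is expected.
  val asFunction: Any => String = withFallback

  // lift turns a partial function into a total one that returns an Option.
  def describe(msg: Any): Option[String] = handler.lift(msg)
}
```

`isDefinedAt` is what lets a runtime check whether a handler matches a message before applying it, for example `PartialFunctionSketch.handler.isDefinedAt("hello")` is false because no case covers a String.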
Consider the following implementation of an asynchronous accumulator based on Scala actors. The accumulator processes the following messages:
- Accumulate, which adds the passed number to the running total
- Reset, which resets the current total
- Total, which reports the current total to the requesting actor
import scala.actors.Actor
import scala.actors.Actor._

// immutable messages: a case class where there is a payload,
// case objects where there is none
case class Accumulate(amount: Int)
case object Reset
case object Total

// actor definition
object Accumulator extends Actor {
  def act() {
    var sum = 0 // private to the actor, never shared
    loop {
      react {
        case Accumulate(n) => sum += n
        case Reset => sum = 0
        case Total => reply(sum); exit() // report the total, then stop
      }
    }
  }
}

// application
object Accumulators extends Application {
  Accumulator.start()
  for (i <- 1 to 100) {
    Accumulator ! Accumulate(i)
  }
  // !? blocks until the actor replies
  (Accumulator !? Total) match {
    case result: Int => println(result)
  }
}
In the above implementation, the event-based actors are executed by a pool of worker threads. Instead of employing coarse-grained parallelism and scheduling every actor onto its own thread, the runtime generates finer-grained tasks as messages arrive in the actor's mailbox. These small tasks, each an instance of a lightweight FJTask, are then scheduled and executed by the Fork/Join engine.
Each message needs to be immutable, so that the generated tasks can be distributed concurrently amongst the multiple cores of your machine, thereby making the actor model much more scalable than the shared memory implementation. This is where functional programming shines, allowing developers to design pure, side-effect-free, referentially transparent abstractions that can be run concurrently without any need for locks and mutexes.