
Why Scala actors are slower compared to Jetlang and/or Groovy++ messaging

After this article was published, Hossam Karim suggested several improvements that significantly improved the performance of the Scala benchmark. That forced me to remove "15-20 times slower" from the initial title of the article and to include the updated code below.

Asynchronous message passing is an old and great idea. Instead of synchronizing objects and dealing with deadlocks, we try to isolate objects and let them exchange messages. Erlang proved it to be an extremely powerful tool, applicable to a wide range of highly scalable, mission-critical applications.

In the JVM landscape, big interest in this approach started with the introduction of Scala actors. My personal (maybe wrong) impression is that the main interest in Scala grew precisely because of the actors library.

But do Scala actors perform well? When you exchange millions of messages internally, you need to be confident that it happens as fast as possible.

In Erlang, message passing and process scheduling are built into the virtual machine. On the JVM, switching contexts between threads and blocking on queues are expensive operations. How fast message passing can be on the JVM was the question we tried to answer while designing the Groovy++ message-passing architecture.

We started with a benchmark, which shocked us. The goal of this article is to describe this benchmark and show three different implementations: one for Scala, one for the beautiful Jetlang library (http://code.google.com/p/jetlang/) using Groovy++, and one for our own prototype of Groovy++ messaging.

While the prototype implementation we have in the Groovy++ standard library seems to be noticeably faster than the one in Jetlang (20-40%), that does not necessarily mean a lot. Both implementations are very similar in spirit, and most probably some ideas from our implementation could speed up Jetlang. Conversely, supporting must-have Jetlang features that we have not implemented yet could slow ours down.

The point is that Scala actors are at least 10 times slower. On some variations of the benchmark we saw them being 15-20 times slower.

So what does the benchmark do?

We want to measure the average speed of sending and receiving messages, so we chose a variation of the well-known thread ring benchmark.

  • We have 10000 actors indexed 0..9999
  • When an actor receives a message, it forwards the message to the next actor in the row
  • We start by sending the string "Hi" to each of the first 500 actors in the row

So a bit less than 5M messages are sent and received: the message injected at actor i is handled by the 10000-i actors from i onward. We measure how long it takes to construct all these objects and to send and receive the messages.
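
The message count can be checked with a few lines of arithmetic. The sketch below is plain Java and is not part of the benchmark; the class and method names are made up for illustration, and only the numbers 10000 and 500 come from the listings in this article. It counts how many times a handler runs and how many extra countDown() calls the seeding loop has to make so that a latch sized 10000*500 reaches zero.

```java
// Illustrative arithmetic only; names here are hypothetical.
public class RingMessageCount {

    // Handler invocations: the message injected at index i is handled by
    // actors i..actors-1, that is (actors - i) times.
    static long handled(int actors, int seeds) {
        long total = 0;
        for (int i = 0; i < seeds; i++) {
            total += actors - i;
        }
        return total;
    }

    // Extra countDown() calls made by the seeding loop: i calls for seed i.
    static long compensation(int seeds) {
        return (long) seeds * (seeds - 1) / 2;
    }

    public static void main(String[] args) {
        int actors = 10000;
        int seeds = 500;
        System.out.println("handled      = " + handled(actors, seeds));
        System.out.println("compensation = " + compensation(seeds));
        System.out.println("latch size   = " + (long) actors * seeds);
    }
}
```

With these inputs the handlers run 4,875,250 times and the seeding loop adds 124,750 countdowns, which together give the latch size of 5,000,000 used in the listings.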

Here is the code using Jetlang. Formally speaking, we should implement it in pure Java, but since we know that code produced by Groovy++ should perform as fast as code produced by javac, we chose a more expressive and less verbose language. It was very nice that Jetlang primitives mapped easily to Groovy++ syntax.

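import java.util.concurrent.CountDownLatch
import java.util.concurrent.Executors
import java.util.concurrent.TimeUnit
import org.jetlang.channels.MemoryChannel
import org.jetlang.fibers.PoolFiberFactory

def start = System.currentTimeMillis()

// One pool thread per core; every fiber runs on this shared pool
def pool = Executors.newFixedThreadPool(Runtime.runtime.availableProcessors())
def fiberFactory = new PoolFiberFactory(pool)

def channels = new MemoryChannel[10000]
// Equivalent to new CountDownLatch(channels.length*500)
CountDownLatch cdl = [channels.length*500]
for (i in 0..<channels.length) {
    def fiber = fiberFactory.create()
    def channel = new MemoryChannel()
    // Forward each message to the next channel; the last one only counts down
    channel.subscribe(fiber) {
        if (i < channels.length-1)
            channels[i+1].publish(it)
        cdl.countDown()
    }
    channels[i] = channel
    fiber.start()
}
// Seed the first 500 channels. A message entering at index i skips i actors,
// so those are counted down here to let the latch reach zero
for (i in 0..<500) {
    channels[i].publish("Hi")
    for (j in 0..<i)
        cdl.countDown()
}

assert(cdl.await(100, TimeUnit.SECONDS))
pool.shutdown()
println(System.currentTimeMillis() - start)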

Groovy++ message passing is very similar to Jetlang's. The main difference is that we chose (at least so far) not to separate fibers (message consumers) from message channels (where messages are published). This lets us use some simplified data structures internally and may be the reason why the Groovy++ implementation is a bit faster than Jetlang's. We might change this later.

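import java.util.concurrent.CountDownLatch
import java.util.concurrent.TimeUnit
// ChannelExecutor and ExecutingChannel come from the Groovy++ standard library prototype

def start = System.currentTimeMillis()
def pool = new ChannelExecutor(Runtime.runtime.availableProcessors())
def channels = new ExecutingChannel[10000]
// Equivalent to new CountDownLatch(channels.length*500)
CountDownLatch cdl = [channels.length*500]
for (i in 0..<channels.length) {
    // The closure is the message handler; the channel is its own consumer
    ExecutingChannel channel = {
        if (i < channels.length-1)
            channels[i+1] << it
        cdl.countDown()
    }
    channel.executor = pool
    channels[i] = channel
}

// Seed the first 500 channels and count down the actors each message skips
for (i in 0..<500) {
    channels[i] << "Hi"
    for (j in 0..<i)
        cdl.countDown()
}

assert(cdl.await(100, TimeUnit.SECONDS))
assert(pool.shutdownNow().empty)
pool.awaitTermination(0L, TimeUnit.SECONDS)
println(System.currentTimeMillis() - start)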
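
To make the "channel that is also its own consumer" idea more concrete, here is a rough sketch in plain Java. It is not the Groovy++ ExecutingChannel and not Jetlang code: SimpleChannel, SimpleChannelDemo and runRing are hypothetical names, and the scheduling scheme (a lock-free queue plus a counter of pending messages) is just one common way to do it, chosen as an assumption for illustration. The channel schedules itself on the pool only when its mailbox goes from empty to non-empty, so messages for one channel are always handled one at a time.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

// Hypothetical sketch; none of these names exist in Groovy++ or Jetlang.
public class SimpleChannelDemo {

    static final class SimpleChannel<T> implements Runnable {
        private final Queue<T> mailbox = new ConcurrentLinkedQueue<>();
        private final AtomicInteger pending = new AtomicInteger();
        private final Executor executor;
        private final Consumer<T> handler;

        SimpleChannel(Executor executor, Consumer<T> handler) {
            this.executor = executor;
            this.handler = handler;
        }

        void publish(T message) {
            mailbox.add(message);
            // Only the publisher that moves the counter from 0 to 1 schedules a drain
            if (pending.getAndIncrement() == 0) {
                executor.execute(this);
            }
        }

        @Override
        public void run() {
            // Drain until the counter drops back to zero; one message per count
            do {
                handler.accept(mailbox.poll());
            } while (pending.decrementAndGet() != 0);
        }
    }

    // Runs a small ring like the benchmark; returns true if every message was handled in time.
    static boolean runRing(int actors, int seeds) {
        ExecutorService pool =
                Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
        @SuppressWarnings("unchecked")
        SimpleChannel<String>[] channels = new SimpleChannel[actors];
        CountDownLatch latch = new CountDownLatch(actors * seeds);
        for (int i = 0; i < actors; i++) {
            final int index = i; // fixed copy of the loop counter for the lambda
            channels[i] = new SimpleChannel<>(pool, message -> {
                if (index < actors - 1) {
                    channels[index + 1].publish(message);
                }
                latch.countDown();
            });
        }
        for (int i = 0; i < seeds; i++) {
            channels[i].publish("Hi");
            for (int j = 0; j < i; j++) {
                latch.countDown(); // actors this message never visits
            }
        }
        try {
            return latch.await(100, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(runRing(1000, 50));
    }
}
```

The sketch ignores things a real library has to handle, such as exceptions thrown by a handler or batching several messages per task, so it says nothing about the performance of either library.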

And here is the code with Scala actors.

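import java.util.concurrent.{CountDownLatch, TimeUnit}
import scala.actors.{Actor, Scheduler}
import scala.actors.Actor._

object FiberRingScala {
  def main(args: Array[String]) {
    val start = System.currentTimeMillis()
    val channels = new Array[Actor](10000)
    val cdl = new CountDownLatch(channels.length * 500)

    var i: Int = 0
    while (i < channels.length) {
      // Copy the counter into a val: the actor body runs later on another thread,
      // and a closure over `var i` would see the main thread's later updates
      val index = i
      val channel = actor {
        loop {
          react {
            case x: Any =>
              if (index < channels.length - 1)
                channels(index + 1) ! x
              cdl.countDown()
          }
        }
      }
      channels(i) = channel
      i += 1
    }
    // Seed the first 500 actors and count down the actors each message skips
    i = 0
    while (i < 500) {
      channels(i) ! "Hi"
      var j: Int = 0
      while (j < i) {
        cdl.countDown()
        j = j + 1
      }
      i += 1
    }

    cdl.await(1000, TimeUnit.SECONDS)
    Scheduler.shutdown

    println(System.currentTimeMillis() - start)
  }
}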

Seems to be very similar to the code with Jetlang and Groovy++ messaging, right?

Benchmarking results (milliseconds, smaller is better)

2155 - Jetlang

1682 - Groovy++

47911 - Scala

Something is very wrong with Scala performance. It seems to be 22 times slower than Jetlang (47911 / 2155 ≈ 22). We believe the reason is that Scala tries to emulate Erlang behaviour instead of using a messaging model that fits the JVM naturally.

Here is an optimization invented by Vaclav Pech (creator and lead of the brilliant GPars library), which speeds up the Scala code twofold, to 22484 ms. That is still 13 times slower than Groovy++ messaging. The very fact that such an optimization helps, and helps so much, makes us think that something is wrong in the Scala approach to actors.

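import java.util.concurrent.{CountDownLatch, TimeUnit}
import scala.actors.{Actor, Scheduler}
import scala.actors.Actor._

object FiberRingScala {
  def main(args: Array[String]) {
    val start = System.currentTimeMillis()
    val channels = new Array[Actor](10000)
    val cdl = new CountDownLatch(channels.length * 500)

    var i: Int = 0
    while (i < channels.length) {
      // Copy the counter into a val: the actor body runs later on another thread,
      // and reading `var i` there would race with the loop below
      val index = i
      val channel = actor {
        handle(index, channels, cdl)
      }
      channels(i) = channel
      i += 1
    }
    // Seed the first 500 actors and count down the actors each message skips
    i = 0
    while (i < 500) {
      channels(i) ! "Hi"
      var j: Int = 0
      while (j < i) {
        cdl.countDown()
        j = j + 1
      }
      i += 1
    }

    cdl.await(1000, TimeUnit.SECONDS)
    Scheduler.shutdown

    println(System.currentTimeMillis() - start)
  }

  // Handles one message, then re-arms itself by calling react again
  def handle(i: Int, channels: Array[Actor], cdl: CountDownLatch): Unit = {
    react {
      case x: Any =>
        if (i < channels.length - 1)
          channels(i + 1) ! x
        cdl.countDown()
        handle(i, channels, cdl)
    }
  }
}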

What has changed compared to the previous version? Almost nothing really: we replaced loop/react with a plain react whose reaction method calls react again. Not very intuitive but understandable (if you are the creator of a message-passing library, as Vaclav or yours truly). It seems we saved one flow control exception (just a guess). The point is that in Erlang you deal with lightweight processes. On the JVM you get callback objects almost for free, but the illusion of a lightweight process becomes extremely expensive.

Don't ask me what a flow control exception is. I like the idea as an interesting animal in the zoo of concurrency tricks, but I don't want it to become too popular.
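
For readers who want a peek at that animal anyway, here is a rough, single-threaded sketch in plain Java of what the term usually means: an exception thrown not to report an error but to unwind the stack after a continuation has been stored. This is not the scala.actors implementation; MiniActor, Suspend and the other names are hypothetical and exist only for this illustration. The one assumption about Scala behind it is that react never returns normally, so something has to unwind the caller's stack.

```java
import java.util.function.Consumer;

// Hypothetical illustration of a control-flow exception; not scala.actors code.
public class ControlFlowDemo {

    // Used only to unwind the stack: no message, no cause, no stack trace.
    static final class Suspend extends RuntimeException {
        static final Suspend INSTANCE = new Suspend();

        private Suspend() {
            super(null, null, false, false);
        }
    }

    static final class MiniActor {
        private Consumer<String> continuation;
        int handled = 0;

        // Never returns normally: stores the handler, then unwinds the caller.
        void react(Consumer<String> handler) {
            continuation = handler;
            throw Suspend.INSTANCE;
        }

        // Runs a piece of actor code until it suspends itself via react.
        void run(Runnable body) {
            try {
                body.run();
            } catch (Suspend expected) {
                // Normal path: the body parked itself waiting for a message
            }
        }

        // Hands a message to the stored handler, if there is one.
        void deliver(String message) {
            Consumer<String> handler = continuation;
            continuation = null;
            if (handler != null) {
                run(() -> handler.accept(message));
            }
        }
    }

    // Same shape as the recursive handle(...) in the Scala listing:
    // process one message, then call react again to wait for the next.
    static void handle(MiniActor actor) {
        actor.react(message -> {
            actor.handled++;
            handle(actor);
        });
    }

    static int demo(int messages) {
        MiniActor actor = new MiniActor();
        actor.run(() -> handle(actor));
        for (int i = 0; i < messages; i++) {
            actor.deliver("Hi");
        }
        return actor.handled;
    }

    public static void main(String[] args) {
        System.out.println(demo(3));
    }
}
```

In this sketch every handled message costs one throw and one catch, which gives a feeling for why the number of such exceptions per message could matter.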

That's it for today. You can find the source code and libraries used in this article at http://code.google.com/p/groovypptest/source/browse/#svn/trunk/JetLangSample/src/org/mbte/groovypp/samples/jetlang

Please let us know if you have an idea how to speed up any of the samples above.

I hope it was interesting. Till next time.

UPDATE: The code below, suggested by Hossam Karim, executed in 5221 ms on my setup.

I will probably post a separate article on why this happens. My first impression is that, as usual, a smart scheduler helps a lot. It might also be that Reactor has a much smarter implementation.

 

package org.mbte.groovypp.samples.jetlang

import java.util.concurrent.{TimeUnit, CountDownLatch}
import scala.actors.Actor._
import scala.actors.scheduler.ForkJoinScheduler
import scala.actors.Reactor
import scala.actors.Scheduler

class FiberRingActor
    (i: Int, channels: Array[Reactor], cdl: CountDownLatch)
    extends Reactor {
  def act = FiberRingScala2.handle(i, channels, cdl)
  override def scheduler = FiberRingScala2.fjs
}

object FiberRingScala2 {
  val fjs = new ForkJoinScheduler(2, 2, false)
  fjs.start()

  def main(args: Array[String]) {
    val start = System.currentTimeMillis()
    val channels = new Array[Reactor](10000)
    val cdl = new CountDownLatch(channels.length * 500)

    var i: Int = 0
    while (i < channels.length) {
      /*
      val channel = actor {
        handle(i, channels, cdl)
      }
      */
      val channel =
        new FiberRingActor(i, channels, cdl)
      channel.start
      channels(i) = channel
      i += 1
    }
    i = 0
    while (i < 500) {
      channels(i) ! "Hi"
      var j: Int = 0
      while (j < i) {
        cdl.countDown()
        j = j + 1
      }
      i += 1
    }

    cdl.await(1000, TimeUnit.SECONDS)
    Scheduler.shutdown

    println(System.currentTimeMillis() - start)
  }

  def handle(i: Int, channels: Array[Reactor], cdl: CountDownLatch): Unit = {
    react {
      case x: Any =>
        if (i < channels.length - 1)
          channels(i + 1) ! x
        cdl.countDown()
        handle(i, channels, cdl)
    }
  }
}