Reactive Programming with Akka and Scala
During our lab, we wanted to implement an application with Akka and Scala because we are evaluating high-performance, scalable software architectures on the JVM. In this blog post, we describe how to set up an Akka app and show a few simple demo applications.
Bootstrapping an Akka/Scala app
The basic setup of the application is simple. We use sbt as the build tool. Therefore we need to create a build.sbt and add the required Akka artifacts as dependencies:
    name := "the akka lab"

    version := "0.1"

    scalaVersion := "2.10.4"

    libraryDependencies ++= Seq(
      "com.typesafe.akka" %% "akka-actor" % "2.2.3",
      "com.typesafe.akka" %% "akka-testkit" % "2.2.3",
      "org.scalatest" %% "scalatest" % "2.0" % "test"
    )
You can easily import the project into IntelliJ or use sbt plugins to generate project files for your preferred IDE.
Simple message passing
After importing the project, we can implement our first ActorSystem. Its structure is shown below:
We want to create a single ActorSystem called routing that has a Receiver actor called single next to a RoundRobinRouter called router with 10 children of type Receiver. We just need to instantiate the ActorSystem and create both children, single and router. The RoundRobinRouter creates its children by itself:
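    import akka.actor.{ActorSystem, Props}
    import akka.routing.RoundRobinRouter
    import akka.util.Timeout
    import scala.concurrent.duration._

    val duration = 3.seconds
    implicit val timeout = Timeout(duration)

    val sys = ActorSystem("routing")
    // a single Receiver that sleeps for 2 seconds in its slow state
    val single = sys.actorOf(Props[Receiver](new Receiver(2.seconds.toMillis)), "single")
    // a round-robin router that creates 10 Receiver children by itself
    val router = sys.actorOf(Props[Receiver].withRouter(RoundRobinRouter(nrOfInstances = 10)), "router")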
The Receiver receives messages of type Message(String) and prints its message parameter. After receiving a message, we toggle the state of our Receiver by using Akka’s become mechanism. So here is the definition of our Receiver actor:
    import akka.actor.{Actor, ActorLogging}

    class Receiver(timeout: Long) extends Actor with ActorLogging {
      import demo.routingstrategies.Receiver._

      def this() = this(1000)

      override def receive = fastReceive

      // prints the message immediately, then switches to the slow state
      def fastReceive: Receive = {
        case Message(m) => {
          log.info(m)
          context.become(slowReceive)
        }
      }

      // blocks for `timeout` milliseconds before printing, then switches back
      def slowReceive: Receive = {
        case Message(m) => {
          Thread.sleep(timeout)
          log.info(s"slow: $m")
          context.become(fastReceive)
        }
      }
    }
As mentioned before, the actor simply prints the message. After it has received a message, it toggles its state from fastReceive to slowReceive and vice versa to simulate a more complex and time-consuming operation. Now that our system is complete, we can start sending messages to single and router:
    // sending a message by using the actor's path
    sys.actorSelection("user/single") ! Message("hello you, by path! [fast]") // fast really?

    // sending a message by using an ActorRef
    single ! Message("hello you! [slow]") // slow really?
    single ! Message("hello you! [fast]") // fast really?

    // sending a message by using the router
    router ! Message("hello anybody! [fast]") // route message to next Receiver actor
    router ! Broadcast(Message("hello world! [1xslow, 9xfast]")) // route message to all Receiver actors
Right here we ran into our first problem. As you can see, we expected that Akka preserves the order of the messages, and that is true as long as you don’t mix sending messages by ActorRef and by ActorSelection. In that case, the only guarantee is that all messages sent to an ActorRef have a defined order and all messages sent via an ActorSelection have a defined order, too. Between these two mechanisms of addressing messages, however, there is no guaranteed order.
The last thing we want to try is to shut down the ActorSystem after all messages have been processed. Because we’re in a multi-threaded environment, we cannot simply shut down the system at the end of the main method. We could call system.shutdown() and then use system.awaitTermination() to wait until all currently active operations are finished, but we don’t know whether all messages have been processed. For this reason, Akka provides the gracefulStop mechanism: using it means that a special message, the PoisonPill, is enqueued in the actor’s mailbox. All messages before the PoisonPill are processed normally. When the PoisonPill is processed, the actor terminates and sends a Terminated message. After we’ve picked up all Terminated messages, we can shut down the system safely:
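    import akka.actor.PoisonPill
    import akka.pattern.gracefulStop
    import akka.routing.Broadcast
    import sys.dispatcher // execution context for the futures

    for {
      routerTerminated <- gracefulStop(router, duration, Broadcast(PoisonPill))
      singleTerminated <- gracefulStop(single, duration)
    } {
      sys.shutdown()
    }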
PingPong: remote messages
To try remoting in Akka, we decided to play actor ping-pong. The basic actor code is quite simple (simplified version):
    import akka.actor.{Actor, ActorLogging}

    object PingPongActor {
      case class Ping(message: String)
      case class Pong(message: String)
    }

    class PingPongActor extends Actor with ActorLogging {
      import demo.PingPongActor.{Pong, Ping}

      def receive = {
        case Ping(m) => {
          log.info(s"ping: $m")
          sender ! Pong(m)
        }
        case Pong(m) => {
          log.info(s"pong: $m")
          sender ! Ping(m)
        }
      }
    }
Based on an Akka remote hello-world example, we wrote a “client” and a “server” application and configured them using Typesafe Config. One of the actors just needs to kick off the game, and then both ping-pong happily ever after. As the message protocol is very simple, the application is well suited to measuring Akka message latencies. Hence, we attached a timestamp to each message using System#nanoTime(). However, as stated in the Javadoc of System#nanoTime(), it is only suited for time measurements within a single JVM. So, instead of measuring only the latency from one actor to the other, we decided to measure the roundtrip latency, which allows us to use System#nanoTime() safely. To measure it, both messages are extended by a timestamp property and receive is changed accordingly:
    def receive = {
      case Ping(m, ts) => {
        log.info(s"ping: $m")
        // just forward the timestamp
        sender ! Pong(m, ts)
      }
      case Pong(m, ts) => {
        val roundTripTime = System.nanoTime() - ts
        log.info(s"pong: $m with round trip time $roundTripTime ns.")
        // start the next round with a fresh timestamp
        sender ! Ping(m, System.nanoTime())
      }
    }
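The roundtrip idea can also be illustrated without any actors. The following is a minimal sketch in plain Scala, not the lab's code: the object RoundTripSketch, the helpers serverReply and roundTripNanos, and the field names of Ping and Pong are made up for illustration. The point it tries to show is that the timestamp is taken and compared on the same JVM, while the other side only echoes it back.

```scala
object RoundTripSketch {
  // Assumed message shapes: a text plus the sender's timestamp.
  case class Ping(message: String, timestamp: Long)
  case class Pong(message: String, timestamp: Long)

  // Hypothetical stand-in for the remote actor: it echoes the timestamp untouched.
  def serverReply(ping: Ping): Pong = Pong(ping.message, ping.timestamp)

  // Runs on the client: both System.nanoTime() calls happen in the same JVM,
  // so their difference is a meaningful elapsed time.
  def roundTripNanos(message: String): Long = {
    val ping = Ping(message, System.nanoTime())
    val pong = serverReply(ping)
    System.nanoTime() - pong.timestamp
  }
}
```

Because the echoing side never interprets the timestamp, clock differences between the two machines do not matter for this measurement.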
Our takeaways for this example:
- Actor distribution is easily possible, but it is not immediately obvious how actors are distributed (i.e. we had to write a client and a server application in our case).
- Time measurement in a distributed system requires some thought, but we got away with a very simple solution by measuring roundtrip latencies.
Aside: Typesafe Config
We found Typesafe Config noteworthy because it has an easy syntax, its Scala/Java API is easy to use, and it is Akka’s configuration mechanism. Typesafe Config has a JSON-like syntax called HOCON that allows different data types, e.g. numbers, strings, arrays or nested “objects”. It also has built-in support for placeholder replacement. You can use it to override Akka defaults to tune your Akka application without changing a single line of code, or to provide custom configuration for your own application. Here’s a structural excerpt from our application config:
    # overriding akka defaults
    akka {
      ...
    }

    # server-side akka overrides
    server {
      akka {
        ...
      }
    }

    # client-side akka overrides
    client {
      ...
    }
In the server application, we load the configuration as follows:
    import com.typesafe.config.ConfigFactory

    // load akka defaults, ignore others
    val akkaConf = ConfigFactory.load("application-remoting.conf").withOnlyPath("akka")
    // load server defaults
    val serverConf = ConfigFactory.load("application-remoting.conf").getConfig("server")
    // merge server and akka config
    val conf = serverConf.withFallback(akkaConf)
First, we load the default configuration into akkaConf and afterwards the dedicated server configuration into serverConf. Finally, we merge them into a single configuration called conf. When we read a property from conf, we get the one from the ‘akka’ block in the server section if it is present, or the one from the root ‘akka’ block if not. In the same way, Akka reads defaults from reference.conf and overrides them with properties from application.conf if such a file is present in the application’s classpath. If you want to know Akka’s default configuration, you can take a look at reference.conf or at the Akka documentation.
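To make the fallback behaviour concrete, here is a small sketch that parses an inline HOCON string instead of loading application-remoting.conf. The object name ConfigFallbackSketch and the two settings are chosen only for illustration and are not taken from the lab's configuration; the sketch needs the Typesafe Config library on the classpath.

```scala
import com.typesafe.config.{Config, ConfigFactory}

object ConfigFallbackSketch {
  // Made-up settings that mirror the structure of the excerpt above.
  val raw: Config = ConfigFactory.parseString(
    """
    akka {
      loglevel = "INFO"
      actor.provider = "akka.remote.RemoteActorRefProvider"
    }
    server {
      akka {
        loglevel = "DEBUG"
      }
    }
    """)

  // Keep only the root 'akka' block as the set of defaults.
  val akkaConf: Config = raw.withOnlyPath("akka")
  // Paths in serverConf are relative to 'server', so its 'akka' block lines up with the root one.
  val serverConf: Config = raw.getConfig("server")
  // Values from serverConf win; anything missing falls back to akkaConf.
  val conf: Config = serverConf.withFallback(akkaConf)
}
```

In this sketch, reading akka.loglevel from conf yields the server override, while akka.actor.provider is only defined in the root block and therefore comes from the fallback.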
The trading app
Finally, we wanted to try a more involved example which needs more domain modeling and a more sophisticated messaging protocol. The example is inspired by the Akka trading performance example, but we deviated from it in multiple aspects. In its current state, the trading application has some quirks, lacks a lot of features, and currently even lets money evaporate in certain circumstances. That’s not nice, but we were able to prototype some concepts, and it is a starting point for further experiments and enhancements, which we discuss later in more detail.
The domain
The purpose of the application is to simulate market participants who want to buy securities. Each participant can place orders: buyers place a bid, sellers place an ask. Bids and asks are matched in an orderbook (one per security) and a trade is made. The algorithm is based on Akka’s Orderbook.scala. It basically tries to match the highest bids with the lowest asks as long as possible. If an order cannot be fulfilled entirely, it is split. All participants’ goods are tracked in accounts: securities are kept in a depot, cash is kept in a deposit. Each account is charged as soon as an order is placed to avoid overcommitment. Upon fulfillment of an order, the goods are credited.
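The matching rule described above can be sketched in a few lines of plain Scala. This is not the lab's or Akka's implementation: the names MatchingSketch, Order, Trade and matchOrders are hypothetical, prices are plain Long values, and the sketch assumes that a trade executes at the ask price, which the text does not specify.

```scala
object MatchingSketch {
  case class Order(participant: String, price: Long, volume: Int)
  case class Trade(buyer: String, seller: String, price: Long, volume: Int)

  // Matches the highest bids with the lowest asks for as long as bid price >= ask price.
  // Returns the trades plus the bids and asks that remain open.
  def matchOrders(bids: List[Order], asks: List[Order]): (List[Trade], List[Order], List[Order]) = {
    @annotation.tailrec
    def loop(bidsLeft: List[Order], asksLeft: List[Order], trades: List[Trade]): (List[Trade], List[Order], List[Order]) =
      (bidsLeft, asksLeft) match {
        case (bid :: restBids, ask :: restAsks) if bid.price >= ask.price =>
          val volume = math.min(bid.volume, ask.volume)
          // Assumption for this sketch: the trade executes at the ask price.
          val trade = Trade(bid.participant, ask.participant, ask.price, volume)
          // A partly filled order is split: its remainder stays at the head of the book.
          val openBids = if (bid.volume > volume) bid.copy(volume = bid.volume - volume) :: restBids else restBids
          val openAsks = if (ask.volume > volume) ask.copy(volume = ask.volume - volume) :: restAsks else restAsks
          loop(openBids, openAsks, trade :: trades)
        case _ => (trades.reverse, bidsLeft, asksLeft)
      }
    // Highest bid first, lowest ask first.
    loop(bids.sortBy(b => -b.price), asks.sortBy(_.price), Nil)
  }
}
```

For example, a bid of 10 units at 105 against asks of 4 units at 101 and 8 units at 103 produces two trades (4 and 6 units) and leaves 2 units of the second ask open.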
Modeling an Akka app
The application consists of two actors which are coupled by a custom router:
- MarketParticipant: A market participant periodically places orders. It randomly decides whether to place a bid or an ask, and also randomly decides on the offered price, which is based on the current market price of the security including a random spread.
- Orderbook: There is one Orderbook actor for each security within the system to match trades. It takes orders and periodically matches them. Afterwards, it notifies the involved MarketParticipants of the successful trade.
- OrderRouter: We decided to couple MarketParticipants and Orderbooks via a custom router. During startup, the router creates the Orderbook actors. When an order arrives, it decides which Orderbook is responsible and forwards the order.
The diagram below shows the message flow of a trade through the system. The market participants place a bid and an ask through the OrderRouter, which forwards the messages to the corresponding Orderbook for this security. It matches the orders and, on success, replies to both parties with BidResponse and AskResponse. They can in turn adjust their account balances accordingly.
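The routing step of this flow can be sketched with plain Scala classes standing in for the actors. Everything here is hypothetical: the message classes Bid and Ask with their fields, and the stubs OrderbookStub and OrderRouterStub, are illustrations of the idea "one book per security, the router picks the responsible one", not the lab's actual protocol or router.

```scala
object RoutingSketch {
  // Hypothetical message protocol; the field names are assumptions.
  sealed trait Order { def security: String }
  case class Bid(security: String, price: Long, volume: Int) extends Order
  case class Ask(security: String, price: Long, volume: Int) extends Order

  // Stand-in for an Orderbook actor: it only records the orders it was given.
  class OrderbookStub(val security: String) {
    private var received = Vector.empty[Order]
    def place(order: Order): Unit = received = received :+ order
    def orders: Vector[Order] = received
  }

  // Stand-in for the OrderRouter: creates one book per security at startup
  // and forwards each order to the responsible book.
  class OrderRouterStub(securities: Seq[String]) {
    private val books: Map[String, OrderbookStub] =
      securities.map(s => s -> new OrderbookStub(s)).toMap

    // Returns false when no book is responsible for the order's security.
    def route(order: Order): Boolean = books.get(order.security) match {
      case Some(book) =>
        book.place(order)
        true
      case None => false
    }

    def book(security: String): Option[OrderbookStub] = books.get(security)
  }
}
```

In an actor-based version, place would become a message send and the Boolean result would typically be replaced by a reply or a dead-letter, but the lookup by security stays the same.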
Implementation
The simulation exists in two flavours: a single-node implementation, which is bootstrapped in TradingSimulationApp, and a distributed implementation, which consists of RemoteClientApp, which simulates the market, and RemoteServerApp, which simulates the order books. To configure various aspects of the application, such as the securities or the number of market participants, we used Typesafe Config. Wiring of the specific implementation is achieved with the cake pattern.
Open issues
We were able to try a lot of features of Akka, such as become/unbecome, stashing, custom routers and remoting. However, the domain allows the example application to be expanded in many different directions, which we describe briefly below.
Domain
Regarding the domain, we see the following areas of improvement:
- Losing money: The application holds money back in case an order is split, or even lets it evaporate if the buying price differs from the bidding price. This hasn’t been much of an issue for our short-running simulation, but it clearly is a showstopper for a real application. This issue can be solved in different ways. For example, we could cancel orders after a specific amount of time if they cannot be fulfilled, or just reserve money instead of really charging the deposit.
- Acknowledgements: Acknowledgements of order receipts would allow for easier state tracking by market participants.
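The "reserve instead of charge" idea from the first point could look roughly like the following sketch. It is one possible design under stated assumptions, not something the lab implemented: the Deposit class, its methods reserve, settle and cancel, and the use of Long amounts are all hypothetical.

```scala
object DepositSketch {
  // Immutable cash account: 'reserved' is money set aside for open orders.
  case class Deposit(balance: Long, reserved: Long = 0L) {
    def available: Long = balance - reserved

    // Reserve money when a bid is placed; None if it would overcommit the account.
    def reserve(amount: Long): Option[Deposit] =
      if (amount >= 0 && amount <= available) Some(copy(reserved = reserved + amount)) else None

    // On fulfillment, release the reservation and charge only the price actually paid.
    def settle(reservedAmount: Long, paid: Long): Deposit = {
      require(reservedAmount <= reserved && paid <= reservedAmount)
      copy(balance = balance - paid, reserved = reserved - reservedAmount)
    }

    // On cancellation or timeout, release the reservation without charging anything.
    def cancel(reservedAmount: Long): Deposit = {
      require(reservedAmount <= reserved)
      copy(reserved = reserved - reservedAmount)
    }
  }
}
```

With this shape, a difference between the bidding price and the buying price simply flows back into the available balance on settle, and a cancelled order restores the account to its previous state.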
Technological
- Replication and fault tolerance: Currently, if an Orderbook actor fails, all open trades and the market valuation are lost. Using a dedicated supervisor node and a replication mechanism for each Orderbook would make the application far more reliable.
- Monitoring: The demo could include a monitoring mechanism to visualize different business metrics such as current market prices, number of open orders, or aggregated revenue, and also technical metrics such as messages delivered, message latency and throughput of the system.
- Performance: The system is not tuned for performance at all. Based on the monitoring and different scenarios, we could look into bottlenecks and tweak the system using the vast configuration options of Akka.
Final thoughts
Although our three lab days were very productive, we barely scratched the surface of what’s possible with Akka. As you may have guessed from reading this blog post, we struggled with some aspects, but that’s a good sign: after all, we only learn something new by struggling first. We had a lot of fun in this lab and we’re looking forward to the next one to explore further aspects of Akka.
The sources of the demo can be found in the comSysto GitHub repo.
Published at DZone with permission of Comsysto Gmbh, DZone MVB. See the original article here.