
Spray Client and the new Akka 2.2.0 IO


It just so happens that the [@akkateam](https://twitter.com/akkateam) [released Akka 2.2](http://akka.io). Akka 2.2 includes, among other things, the new IO layer. In this post, I will show how to use version 1.2 of Spray Client with the new Akka IO by building a simple repository crawler, complete with tests. You will see just how simple and powerful the new Akka IO is, and how nicely Spray integrates with it.

Alongside Scala 2.10.2, we will be using the following libraries in our example:

libraryDependencies ++= Seq(
  "com.typesafe.akka" %% "akka-actor"   % "2.2.0",
  "com.typesafe.akka" %% "akka-testkit" % "2.2.0",
  "io.spray"          %  "spray-client" % "1.2-20130705",
  "org.specs2"        %% "specs2"       % "2.0"
)

As motivation: the new Akka IO and Spray Client can make both the network interfaces and the CPUs work hard.


Repository scanner

In a synchronous and blocking world, a repository crawler would make a GET request to the root URL of the repository and wait for the response. Then, for every “directory” link, it would append the link’s location to the root URL and make another GET request, …, until it reaches a “file” link, which represents the artifact. However, executing all this serially would take a very long time. Let’s see if we can speed up the process using Akka IO and Spray Client.
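To make the contrast concrete, here is a minimal sketch of that serial crawl in plain Scala. It assumes a hypothetical in-memory pages map standing in for the repository and a hypothetical blockingGet standing in for an HTTP GET; the link rules (a trailing / means directory, .jar means artifact) are a simplification of the scanner's rules.

```scala
object SerialCrawl {
  // Hypothetical in-memory repository standing in for HTTP: page URL -> links on that page.
  val pages: Map[String, List[String]] = Map(
    "/" -> List("com/"),
    "/com/" -> List("example/"),
    "/com/example/" -> List("lib/"),
    "/com/example/lib/" -> List("1.0/", "1.1/"),
    "/com/example/lib/1.0/" -> List("lib_1.0.jar"),
    "/com/example/lib/1.1/" -> List("lib_1.1.jar")
  )

  // Hypothetical blocking GET: the caller waits until the "response" is available.
  def blockingGet(url: String): List[String] = pages.getOrElse(url, Nil)

  // Serial, depth-first crawl: each request starts only after the previous one returned.
  def crawl(url: String): List[String] =
    blockingGet(url).flatMap { link =>
      if (link.endsWith("/")) crawl(url + link)        // "directory": descend
      else if (link.endsWith(".jar")) List(url + link) // "file": the artifact
      else Nil                                         // anything else: ignore
    }
}
```

Every call to blockingGet has to return before the next one can start, which is exactly why this approach becomes slow once each GET is a real network round trip.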

We will do the same thing, except that we will use non-blocking IO and run some of the requests in parallel in an actor. Our actor will receive the Scan message and start by making the initial GET request to some baseUrl. When it receives the response, it will make further GET requests to process each directory or file, and either descend further or store the discovered artifact. Let’s see what it looks like in code:

import akka.actor.{Actor, ActorRef}
import akka.util.Timeout
import spray.client.pipelining._
import spray.http._
import spray.httpx.unmarshalling._
import scala.util.{Failure, Success, Try}

object PlainRepositoryScanner {
  case object Scan
}

class PlainRepositoryScanner(baseUrl: String, dependencyStorage: ActorRef) 
  extends Actor {
  require(baseUrl.endsWith("/"), "The URL must end with /")
  require(baseUrl.startsWith("http://") || baseUrl.startsWith("https://"), 
    "The URL must start with http:// or https://")

  import scala.concurrent.duration._
  import context.dispatcher
  import PlainRepositoryScanner._

  def receive: Receive = {
    case Scan => descend(Nil)
    case elements: Elements => descend(elements)
  }

  private implicit val timeout = Timeout(10.seconds)
  private type Elements = List[String]

  // send the request, then unmarshal the response entity into Elements
  private val pipeline = sendReceive ~> unmarshal[Elements]
  private implicit object StringUnmarshaller extends Unmarshaller[Elements] {
    ...
  }

  def descend(elements: Elements): Unit = {
    def prepareUrl(elements: Elements): String = ...
    def processResponse: PartialFunction[Try[Elements], Unit] = ...

    val url = prepareUrl(elements)
    pipeline(Get(url)) onComplete processResponse
  }
}


After the initial require sanity checks, we define the receive function. When it receives the Scan message, we descend with empty current [path] elements; when it receives a list of elements (which the actor later sends to itself), we descend into those. To make the code clearer, I define a type alias Elements for List[String]. Once that’s out of the way, I construct the Spray Client pipeline: sendReceive ~> unmarshal[Elements]. When the request completes, the pipeline passes the response to the instance of the Unmarshaller for type Elements.
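The ~> operator reads as "and then": send the request and, once the response arrives, unmarshal it. Below is a minimal, self-contained sketch of that idea without Spray. Request, Response, the Pipe implicit class and the canned sendReceive are hypothetical stand-ins; Spray's real ~> is more general than this and is not reproduced here.

```scala
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

object PipelineSketch {
  // Hypothetical stand-ins for spray's HttpRequest and HttpResponse.
  final case class Request(url: String)
  final case class Response(body: String)

  // A minimal `~>`: run f, then feed its eventual result into g.
  implicit class Pipe[A, B](f: A => Future[B]) {
    def ~>[C](g: B => C): A => Future[C] = a => f(a).map(g)
  }

  // Hypothetical transport: answers every request with the same canned page.
  val sendReceive: Request => Future[Response] =
    _ => Future.successful(Response("akka/\nakka-actor/\nakka-actor_2.2.0.jar"))

  // Hypothetical unmarshaller: one element per line of the body.
  val unmarshalElements: Response => List[String] =
    response => response.body.split("\n").toList

  // Reads like the scanner's pipeline: send, then unmarshal.
  val pipeline: Request => Future[List[String]] = sendReceive ~> unmarshalElements
}
```

The point of composing functions this way is that the pipeline itself is just a value of type Request => Future[List[String]], so it can be defined once and applied to every URL the scanner visits.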

The heavy lifting happens in the descend method. Its overall structure is:

def descend(elements: Elements): Unit = {
  def prepareUrl(elements: Elements): String = ...
  def processResponse: PartialFunction[Try[Elements], Unit] = ...

  val url = prepareUrl(elements)
  pipeline(Get(url)) onComplete processResponse
}

The pipeline makes a GET request to the url constructed from the current elements; when the I/O operation completes, its outcome, a Try[Elements], is passed to the partial function that processResponse returns. In that partial function, we decide whether to descend further or whether we have found our artifact.
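Because processResponse may call descend again from inside the callback, nothing ever waits for a response: each directory simply fires off more requests. The sketch below mimics that shape with plain Futures instead of an actor. The in-memory pages map, the Future-based pipeline, the found queue (standing in for dependencyStorage) and the pending counter used to detect the end of the crawl are all hypothetical additions for illustration; the scanner shown in this post has no completion signal.

```scala
import java.util.concurrent.ConcurrentLinkedQueue
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.{Future, Promise}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.util.{Failure, Success, Try}

object CallbackCrawl {
  type Elements = List[String]

  // Hypothetical in-memory repository: page path -> links on that page.
  val pages: Map[String, Elements] = Map(
    "/" -> List("com/"),
    "/com/" -> List("example/"),
    "/com/example/" -> List("lib/"),
    "/com/example/lib/" -> List("1.0/", "1.1/"),
    "/com/example/lib/1.0/" -> List("lib_1.0.jar"),
    "/com/example/lib/1.1/" -> List("lib_1.1.jar")
  )

  // Hypothetical non-blocking GET in place of the Spray pipeline.
  def pipeline(url: String): Future[Elements] = Future(pages.getOrElse(url, Nil))

  val found = new ConcurrentLinkedQueue[String]() // stands in for dependencyStorage
  private val pending = new AtomicInteger(0)      // requests still in flight
  val finished = Promise[Unit]()                  // completed once pending drops to 0

  private def requestDone(): Unit =
    if (pending.decrementAndGet() == 0) finished.success(())

  def descend(elements: Elements): Unit = {
    val url = "/" + elements.mkString("")

    def processResponse: PartialFunction[Try[Elements], Unit] = {
      case Success(newElements) =>
        newElements.foreach {
          case e if e.endsWith("/")    => descend(elements :+ e) // fire and forget
          case e if e.endsWith(".jar") => found.add(url + e)     // "store" the artifact
          case _                       => ()                     // ignore anything else
        }
        requestDone()
      case Failure(_) =>
        requestDone()
    }

    pending.incrementAndGet()
    pipeline(url) onComplete processResponse
  }
}
```

processResponse calls descend for every child before decrementing pending, so the counter cannot reach zero while child requests are still outstanding.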

Implementing the descend function

Let’s get on with implementing the inner functions of the descend function, starting with the rough outline.

def descend(elements: Elements): Unit = {
  def prepareUrl(elements: Elements): String = baseUrl + elements.mkString("")

  def processResponse: PartialFunction[Try[Elements], Unit] = {
    case Success(newElements) =>
      // either descend or store artifact
    case Failure(exception) =>
      // report scan error
  }

  val url = prepareUrl(elements)
  pipeline(Get(url)) onComplete processResponse
}

So far, so good. We can make the HTTP requests and receive responses that are unmarshalled to instances of Elements. To complete our work, we need to decide, for each link, whether it leads further down, leads out of the current tree, or is an artifact.
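The decision boils down to classifying each link. As a stand-alone sketch (Link and classify are hypothetical names, not part of the scanner), here are the same rules, in the same order, as the guards in the actor's processResponse:

```scala
object LinkClassifier {
  sealed trait Link
  case object Skip extends Link      // "?..." query links that stay on the same page
  case object Directory extends Link // relative link ending in "/": descend into it
  case object Jar extends Link       // relative link ending in ".jar": an artifact
  case object Unknown extends Link   // absolute paths, external URLs, anything else

  private def isRelative(element: String): Boolean =
    !element.startsWith("/") && !element.startsWith("http")

  // Same tests, in the same order, as the guards in processResponse.
  def classify(element: String): Link = element match {
    case e if e.startsWith("?")                   => Skip
    case e if isRelative(e) && e.endsWith("/")    => Directory
    case e if isRelative(e) && e.endsWith(".jar") => Jar
    case _                                        => Unknown
  }
}
```

One consequence of these rules: a relative parent link such as ../ also ends in / and is classified as a directory, so crawling a real listing that contains one would need an extra guard.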

def descend(elements: Elements): Unit = {
  def prepareUrl(elements: Elements): String = 
    baseUrl + elements.mkString("")

  // "2.2.0/" becomes "2.2.0"
  def dropLastSlash(s: String): String = {
    val lastSlash = s.lastIndexOf('/')
    if (lastSlash != -1) s.substring(0, lastSlash) else s
  }

  def processResponse: PartialFunction[Try[Elements], Unit] = {
    case Success(newElements) =>
      newElements foreach {
        case element if element startsWith "?" =>
          // stay on the same page; skip
        case element if !element.startsWith("/") && 
                        !element.startsWith("http") && 
                        element.endsWith("/") =>
          // this is another 'directory' on the same 'page'
          self ! elements :+ element
        case element if !element.startsWith("/") &&
                        !element.startsWith("http") && 
                        element.endsWith(".jar") =>
          // this is a .jar; not a directory
          val (rawVersion::rawArtifactId::rawGroupId) = elements.reverse
          val groupId = rawGroupId.reverse.map(dropLastSlash) mkString "."
          val version = dropLastSlash(rawVersion)
          val artifactId = dropLastSlash(rawArtifactId)
          dependencyStorage ! ((baseUrl, groupId, artifactId, version))
        case x =>
          // some unknown form
      }
    case Failure(exception) =>
      // report scan error
  }

  val url = prepareUrl(elements)
  pipeline(Get(url)) onComplete processResponse
}

This is a rather crude version, but it demonstrates what we are doing quite clearly. The elements are the path elements of the page being processed: for the page that lists akka-actor_2.2.0.jar, they are List(com, typesafe, akka, akka-actor, 2.2.0). To turn them into a Maven-style artifact, we leave out the jar itself, reverse the path elements and pattern match:

val (rawVersion::rawArtifactId::rawGroupId) = elements.reverse

And thus, rawVersion is "2.2.0", rawArtifactId is "akka-actor", and rawGroupId is List(akka, typesafe, com) (each element still carries its trailing slash, which dropLastSlash removes). From this, we can easily construct the artifact. (For now, it is a tuple containing its elements.)
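The same decomposition can be checked in isolation. This sketch copies dropLastSlash from the actor and wraps the pattern match in a hypothetical toArtifact function that returns a (groupId, artifactId, version) tuple. Like the original, it throws a MatchError if there are fewer than two path elements.

```scala
object ArtifactFromPath {
  // Same as the scanner's dropLastSlash: "2.2.0/" becomes "2.2.0".
  def dropLastSlash(s: String): String = {
    val lastSlash = s.lastIndexOf('/')
    if (lastSlash != -1) s.substring(0, lastSlash) else s
  }

  // elements: path elements of the page that listed the jar, e.g. List("com/", ..., "2.2.0/").
  def toArtifact(elements: List[String]): (String, String, String) = {
    val rawVersion :: rawArtifactId :: rawGroupId = elements.reverse
    val groupId = rawGroupId.reverse.map(dropLastSlash).mkString(".")
    (groupId, dropLastSlash(rawArtifactId), dropLastSlash(rawVersion))
  }
}
```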

Implementing Unmarshaller[Elements]

To complete our actor, we must implement the instance of the Unmarshaller typeclass for the type Elements. In Scala-speak, this means completing the implicit object StringUnmarshaller. I shall be lazy and abuse Scala’s regular expressions to pick out the href targets of all HTML anchor (a) elements.

private implicit object StringUnmarshaller 
  extends Unmarshaller[Elements] {

  val AHrefRegex = """<a href="([^"]*)">[^<]*</a>""".r

  // collect the href of every anchor in the body; never fails
  def apply(entity: HttpEntity): Deserialized[Elements] = {
    val body = entity.asString
    val matches = AHrefRegex.findAllMatchIn(body)
    val hrefs = matches.map(_.group(1)).toList
    Right(hrefs)
  }
}

When it comes to testing, it is tempting to just take some live repository and start firing requests at it. Apart from being rude (how would you feel if I started hitting your repository with hundreds of requests?), we would not be able to make any sensible assertions. Instead, we will implement our own repository server, used only by the tests. The good news is that it won’t be that hard.

Let’s start with a simplified version of the spec, though. (I leave the complete implementation, which asserts that the discovered artifacts match the served ones, as an exercise for the reader. That’s how kind I am!)

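import java.util.concurrent.TimeUnit
import akka.actor.{ActorSystem, Props}
import akka.testkit.TestKit
import org.specs2.mutable.SpecificationLike
import scala.concurrent.duration.FiniteDuration

class PlainRepositoryScannerSpec 
  extends TestKit(ActorSystem())
  with SpecificationLike {

  import PlainRepositoryScanner._

  "Plain repository" >> {
    val count = 5000
    val port = 12345
    val scanner = system.actorOf(Props(
      new PlainRepositoryScanner(s"http://localhost:$port/", testActor)))

    "scan" in {
      val repository = Repository(Artifacts.generateArtifacts(3, count), port)
      scanner ! Scan
      // wait up to 30 seconds for all `count` discovered artifacts
      val discovered = receiveN(count, FiniteDuration(30, TimeUnit.SECONDS))
      repository.stop()
      discovered.size must_== count
    }
  }
}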


The test is pretty straightforward. We construct our PlainRepositoryScanner, giving it the URL of our test-only repository server and a reference to the actor that will receive the discovered artifacts. Because we want to examine the received messages, we pass in the testActor. In the body of the example, we start the test-only repository, which serves count artifacts in a hierarchy three levels deep and listens on the TCP port given by port. Then we send the Scan message to our scanner and give it up to 30 seconds to discover all count artifacts.

And that’s all there is to it!

Test repository

If you are desperate to find out how I’ve implemented the Repository, I give you its full source here:

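import akka.actor.{Actor, ActorSystem, Props}
import akka.io.IO
import akka.routing.RoundRobinRouter
import spray.can.Http
import spray.http._
import scala.collection.mutable

sealed trait Dependency {
  def groupId: GroupId
  def artifactId: ArtefactId
}

sealed trait VersionedDependency extends Dependency {
  def version: Version
}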
class Repository 
  private(system: ActorSystem, port: Int, 
          artifacts: List[VersionedDependency],
          elementsToBody: List[String] => String) {

  // receives (and ignores) the replies to Bind and Unbind
  val blackHoleActor = system.actorOf(Props(new Actor {
    def receive: Receive = Actor.emptyBehavior
  }))

  private class Service extends Actor {
    // com.typesafe:akka-actor:2.2.0 becomes
    // List(com/, typesafe/, akka-actor/, 2.2.0/, akka-actor_2.2.0.jar)
    def vdToUris(vd: VersionedDependency): List[String] = {
      val marker = '\ufffe'
      val path = vd.groupId.replace('.', marker) + 
        marker + vd.artifactId + 
        marker + vd.version.version
      val jar = vd.artifactId + "_" + vd.version.version + ".jar"
      path.split(marker).map(_ + "/").toList :+ jar
    }
    val paths = artifacts.map(vdToUris)

    def receive: Receive = {
      case _: Http.Connected =>
        sender ! Http.Register(self)
      case HttpRequest(HttpMethods.GET, Uri.Path(requestPath), _, _, _) =>
        val requestPathSegments = 
          if (requestPath == "/") Nil
          else requestPath.split("/").toList.map(_ + "/").tail
        // the distinct elements one level below the requested path
        val elements = paths.
          filter(_.startsWith(requestPathSegments)).
          map(_.slice(requestPathSegments.length, requestPathSegments.length + 1)).
          flatten.distinct
        val body = elementsToBody(elements)
        sender ! HttpResponse(entity = HttpEntity(body))
      case _ =>
    }
  }

  private val service = 
    system.actorOf(Props(new Service).
      withRouter(RoundRobinRouter(nrOfInstances = 50)))

  private val io = IO(Http)(system)
  io.tell(Http.Bind(service, "localhost", port = port), blackHoleActor)

  def stop(): Unit = {
    io.tell(Http.Unbind, blackHoleActor)
  }
}
object Repository {

  // one anchor per element, in the form the scanner's unmarshaller expects
  private def trivialElementsToBody(elements: List[String]): String = {
    val builder = new mutable.StringBuilder()

    elements.foreach(element =>
      builder ++= s"""<a href="$element">$element</a>\n"""
    )
    builder.toString()
  }

  def apply(artifacts: List[VersionedDependency], port: Int)
           (implicit system: ActorSystem): Repository = {
    new Repository(system, port, artifacts, trivialElementsToBody)
  }
}



Published at DZone with permission of
