New & cool in Spray 1.2.0 (part II)

In the previous post, I showed you the improvements to the chunked request handling. Unfortunately, I had to deal with uri.path.toString(), which I hope you too found rather unsatisfactory. Let me improve the API, and add some tests while I’m at it!

I would like to make the most of Spray routing; unfortunately, I cannot simply use the routing as it stands because of our funky chunked HTTP posts. Fortunately, Spray routing lets me combine the two approaches rather nicely. Let’s break the RecogService and StreamingRecogService from the previous post into:

trait BasicRecogService extends Directives { ... }
trait StreamingRecogService extends Directives { this: Actor => ... }
class RecogServiceActor(coordinator: ActorRef) 
  extends Actor with BasicRecogService with StreamingRecogService { ... }
class StreamingRecogServiceActor[A](
  coordinator: ActorRef, 
  sessionId: String, 
  message: (String, Array[Byte]) => A) 
  extends Actor { ... }

This gives me the overall structure. The BasicRecogService contains routing.Routes that handle the non-chunked requests; the StreamingRecogService contains routing.Routes that deal with the chunked requests. Finally, I’ve renamed the subtypes of Actor to end with the word Actor. The next thing to do is to migrate the functionality from

case HttpRequest(HttpMethods.POST, uri, _, entity, _) =>
  val client = sender
  uri.path.toString() match {
    case RootUri =>
      (coordinator ? Begin(1)).mapTo[String].onComplete {
        case Success(sessionId) => 
          client ! HttpResponse(entity = sessionId)
        case Failure(ex) => 
          client ! HttpResponse(entity = ex.getMessage, 
                                status = StatusCodes.InternalServerError)
      }
    case StaticUri(sessionId) =>
      coordinator ! SingleImage(sessionId, entity.data.toByteArray)
  }

I am going to use Spray’s routing DSL to declare two URIs; both accept POST requests, and each completes them in its own way. In Scala, this is the body of the BasicRecogService:

trait BasicRecogService extends Directives {
  import scala.concurrent.duration._
  import akka.pattern.ask
  import CoordinatorActor._
  import RecogService._

  implicit val timeout = akka.util.Timeout(2.seconds)

  def normalRoute(coordinator: ActorRef)(implicit ec: ExecutionContext) =
    path(Recog) {
      post {
        complete((coordinator ? Begin(1)).mapTo[String])
      }
    } ~
    path(Recog / Rest) { sessionId =>
      post {
        entity(as[Array[Byte]]) { entity =>
          coordinator ! SingleImage(sessionId, entity)
          complete("{}")
        }
      }
    }
}
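To make the fall-through behaviour of the ~ operator concrete, here is a minimal sketch in plain Scala. It is a toy model under loud assumptions, not Spray's implementation: the names ToyRouting, Request, pathPrefixRest and the Option-based Route type are all hypothetical stand-ins, and real Spray routes complete a RequestContext rather than returning a value.

```scala
// A toy model of route composition; NOT Spray's implementation.
object ToyRouting {
  final case class Request(method: String, path: String)

  // A route either handles the request (Some(response)) or rejects it (None).
  type Route = Request => Option[String]

  // Hypothetical stand-ins for Spray's post and path directives.
  def post(inner: Route): Route =
    req => if (req.method == "POST") inner(req) else None

  def path(expected: String)(inner: Route): Route =
    req => if (req.path == expected) inner(req) else None

  // Stand-in for Rest: hands whatever follows the prefix to the inner route.
  def pathPrefixRest(prefix: String)(inner: String => Route): Route =
    req =>
      if (req.path.startsWith(prefix)) inner(req.path.drop(prefix.length))(req)
      else None

  def complete(response: String): Route = _ => Some(response)

  // Stand-in for the ~ operator: try the first route, fall back to the second.
  implicit class RouteOps(val first: Route) extends AnyVal {
    def ~(second: Route): Route = req => first(req).orElse(second(req))
  }

  // Same shape as normalRoute: two alternatives joined with ~.
  val route: Route =
    path("/recog") {
      post { complete("new-session") }
    } ~
    pathPrefixRest("/recog/") { sessionId =>
      post { complete(s"image for $sessionId") }
    }
}
```

In this model, a POST to /recog is answered by the first alternative, a POST to /recog/abc is rejected by the first and picked up by the second, and a GET is rejected by both.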

Next up, I need similar handling, but for the chunked parts. I am translating the old code

case ChunkedRequestStart(
  HttpRequest(HttpMethods.POST, uri, _, entity, _)) =>
  val streamer = uri.path.toString() match {
    case MJPEGUri(sessionId) => 
      context.actorOf(Props(
        new StreamingRecogService(coordinator, sessionId, SingleImage)))
    case H264Uri(sessionId)  => 
      context.actorOf(Props(
        new StreamingRecogService(coordinator, sessionId, FrameChunk)))
  }
  sender ! RegisterChunkHandler(streamer)

The route is similar: on a POST to certain URIs, create the appropriate StreamingRecogServiceActor instance and have Spray send the remainder of the chunked post to the newly created actor.

trait StreamingRecogService extends Directives {
  this: Actor =>

  import CoordinatorActor._
  import RecogService._

  def chunkedRoute(coordinator: ActorRef) = {
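    // Returns a route that, when run, creates the chunk handler actor and
    // registers it with Spray for the remainder of the chunked request.
    def handleChunksWith(creator: => Actor): RequestContext => Unit = { _ =>
      val handler = context.actorOf(Props(creator))
      sender ! RegisterChunkHandler(handler)
    }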

    path(Recog / MJPEG / Rest) { sessionId =>
      post {
        handleChunksWith(
          new StreamingRecogServiceActor(coordinator, sessionId, SingleImage))
      }
    } ~
    path(Recog / H264 / Rest)  { sessionId =>
      post {
        handleChunksWith(
          new StreamingRecogServiceActor(coordinator, sessionId, FrameChunk))
      }
    }
  }

}
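Note that handleChunksWith takes its creator argument by name (creator: => Actor), so the new StreamingRecogServiceActor(...) expression is not evaluated at the call site; it is handed on to Props, which also takes it by name. The following plain-Scala sketch illustrates only that language feature. ByNameSketch, Handler and handleWith are hypothetical names with no Akka involved, so treat it as an illustration rather than what Akka does internally.

```scala
object ByNameSketch {
  // Counts how many times the (hypothetical) handler has been constructed.
  var constructed = 0

  final class Handler(val sessionId: String) {
    constructed += 1
  }

  // `creator` is by-name: the `new Handler(...)` expression passed in is not
  // evaluated here, only when the returned function is invoked.
  def handleWith(creator: => Handler): String => String = { chunk =>
    val handler = creator
    s"${handler.sessionId} <- $chunk"
  }
}
```

Calling ByNameSketch.handleWith(new ByNameSketch.Handler("abc")) constructs nothing; the Handler is only built once the returned function is applied to a chunk.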

I now have two traits that define Spray routes that can be applied to the plain and chunked requests. All I need to do now is to use them in the RecogServiceActor.

RecogServiceActor

Using the routes I created earlier is really simple: when the actor receives a plain HttpRequest, we let the normalRoute handle it; when the actor receives a ChunkedRequestStart, we let the chunkedRoute handle the request it wraps.

class RecogServiceActor(coordinator: ActorRef) 
  extends Actor with BasicRecogService with StreamingRecogService {
  import context.dispatcher
  val normal = normalRoute(coordinator)
  val chunked = chunkedRoute(coordinator)

  def receive: Receive =  {
    case _: Http.Connected => sender ! Http.Register(self)
    case request: HttpRequest => 
      normal(RequestContext(request, sender, request.uri.path).
             withDefaultSender(sender))
    case ChunkedRequestStart(request) => 
      chunked(RequestContext(request, sender, request.uri.path).
              withDefaultSender(sender))
  }

}

Notice that I pre-chew the routing.Routes by applying the normalRoute to coordinator (and implicitly the ExecutionContext, made available by import context.dispatcher), and by applying the chunkedRoute to the same coordinator. The good news is that the StreamingRecogServiceActor remains the same:

class StreamingRecogServiceActor[A](
  coordinator: ActorRef, 
  sessionId: String, 
  message: (String, Array[Byte]) => A) 
  extends Actor {

  def receive = {
    case MessageChunk(data, _) =>
      coordinator ! message(sessionId, data.toByteArray)
    case ChunkedMessageEnd(_, _) =>
      sender ! HttpResponse(entity = "{}")
      context.stop(self)
  }

}
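Going back to the pre-applied routes in RecogServiceActor: the pattern is ordinary Scala, a method with an explicit parameter list followed by an implicit one, applied once and stored in a val. Here is a minimal sketch of that shape, assuming hypothetical stand-ins (Ctx for the ExecutionContext, a String => String function for routing.Route); it does not use Spray or Akka.

```scala
object PreAppliedRoutes {
  // Hypothetical stand-ins for ExecutionContext and routing.Route.
  final case class Ctx(name: String)
  type Route = String => String

  // Counts how often the route is built, to show it happens only once.
  var built = 0

  // Mirrors the shape of normalRoute: an explicit parameter list followed
  // by an implicit one, returning a function.
  def normalRoute(coordinator: String)(implicit ctx: Ctx): Route = {
    built += 1
    request => s"$coordinator@${ctx.name} handled $request"
  }

  // Plays the role of `import context.dispatcher` in the actor.
  implicit val ctx: Ctx = Ctx("dispatcher")

  // Applied once; the implicit Ctx is resolved here, at the definition site.
  val normal: Route = normalRoute("coordinator")
}
```

Every request then goes through the same normal value, so the route is built once rather than once per message.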

Testing it

Let me finish by showing you how easy it is to test the routes. In the example test, I will verify that an HTTP POST to /recog responds with the session id it receives from the coordinator actor. The good news is that now that I have the routing.Route routes, I can use Spray’s Test Kit to simplify the testing.

class BasicRecogServiceSpec 
  extends Specification 
  with Specs2RouteTest 
  with BasicRecogService {

  class TestCoordinatorActor extends Actor {
    def receive: Receive = {
      case Begin(_) => sender ! "a10b2f45-87dd-4fe1-accf-3361763c1553"
    }
  }

  "Basic recog service" should {
    val coordinator = system.actorOf(Props(new TestCoordinatorActor))

    "return the session ID on post" in {
      Post("/recog") ~> normalRoute(coordinator) ~> check {
        responseAs[String] mustEqual "a10b2f45-87dd-4fe1-accf-3361763c1553"
      }
    }

  }

}

As you can see, I use the routing.Route returned by applying normalRoute to the coordinator ActorRef I created above, and then use Spray’s Test Kit to construct the test case: Post(uri) ~> route ~> check { responseAs[...] mustEqual }.

Code

As usual, the complete code is on GitHub, at https://github.com/eigengo/codemesh2013 for your cloning pleasure!

Published at DZone with permission of Jan Machacek, DZone MVB. See the original article here.
