
Create a Reactive Websocket Server with akka-streams


In this article, we'll show you how to create a WebSocket server using akka-http.


In the previous article we looked at how backpressure works in akka-streams. Akka also uses akka-streams as the base for akka-http, with which you can easily create an HTTP server and client, where each request is processed by a stream materialized using actors. In its latest release, akka-http also supports WebSockets. In this article we'll show you how to create a WebSocket server using akka-http.

We'll show and explain the following subjects:

  • Respond to messages using a simple flow created with the flow API.
  • Respond to messages with a flow graph, created with the flow graph DSL.
  • Proactively push messages to the client by introducing an additional source to the flow.
  • Create a custom publisher from an Akka Actor.

While writing, this article became a bit longer than initially planned, so I'll write a follow-up showing that backpressure and rate control also work with WebSockets; watch for that one in a couple of weeks. The source files for this article can be found in the accompanying GitHub Gists.

So first, let's look at how to set up the basic skeleton of the application.

Getting started

Let's start by looking at the dependencies we need. For all the examples, we use the following simple sbt file:

name := "akka-http-websockets"

version := "1.0"

scalaVersion := "2.11.6"

libraryDependencies ++= Seq(
  "com.typesafe.akka" %% "akka-stream-experimental" % "1.0-RC2",
  "com.typesafe.akka" %% "akka-http-core-experimental" % "1.0-RC2",
  "com.typesafe.play" %% "play-json" % "2.3.4"
)
As you can see, we use the RC2 versions of akka-streams and akka-http, which were released at the end of April. Creating a WebSocket server with akka-http is very easy and works in pretty much the same way as a plain HTTP server:
object WSRequest {

  def unapply(req: HttpRequest): Option[HttpRequest] =
    if (req.header[UpgradeToWebsocket].isDefined) Some(req) else None

}

/**
 * Simple websocket server using akka-http and akka-streams.
 *
 * Note that about 600 messages get queued up in the send buffer (on mac, 146988 is default socket buffer)
 */
object WSServer extends App {

  // required actorsystem and flow materializer
  implicit val system = ActorSystem("websockets")
  implicit val fm = ActorFlowMaterializer()

  // setup the actors for the stats
  // router: will keep a list of connected actorpublisher, to inform them about new stats.
  // vmactor: will start sending messages to the router, which will pass them on to any
  // connected routee
  val router: ActorRef = system.actorOf(Props[RouterActor], "router")
  val vmactor: ActorRef = system.actorOf(Props(classOf[VMActor], router, 2 seconds, 20 milliseconds))

  // Bind to an HTTP port and handle incoming messages.
  // With the custom extractor we're always certain the header contains
  // the correct upgrade message.
  // We can pass in a socketoptions to tune the buffer behavior
  // e.g options =  List(Inet.SO.SendBufferSize(100))
  val binding = Http().bindAndHandleSync({

    case WSRequest(req@HttpRequest(GET, Uri.Path("/simple"), _, _, _)) => handleWith(req, Flows.reverseFlow)
    case WSRequest(req@HttpRequest(GET, Uri.Path("/echo"), _, _, _)) => handleWith(req, Flows.echoFlow)
    case WSRequest(req@HttpRequest(GET, Uri.Path("/graph"), _, _, _)) => handleWith(req, Flows.graphFlow)
    case WSRequest(req@HttpRequest(GET, Uri.Path("/graphWithSource"), _, _, _)) => handleWith(req, Flows.graphFlowWithExtraSource)
    case WSRequest(req@HttpRequest(GET, Uri.Path("/stats"), _, _, _)) => handleWith(req, Flows.graphFlowWithStats(router))
    case _: HttpRequest => HttpResponse(400, entity = "Invalid websocket request")

  }, interface = "localhost", port = 9001) 


  // binding is a future, we assume it's ready within a second or timeout
  try {
    Await.result(binding, 1 second)
    println("Server online at http://localhost:9001")
  } catch {
    case exc: TimeoutException =>
      println("Server took too long to start up, shutting down")
      system.shutdown()
  }

  /**
   * Simple helper function, that connects a flow to a specific websocket upgrade request
   */
  def handleWith(req: HttpRequest, flow: Flow[Message, Message, Unit]) = req.header[UpgradeToWebsocket].get.handleMessages(flow)

}
In this example we bind a set of handlers (more on these later) to localhost port 9001. Whenever a request comes in, we try to match it using pattern matching. To detect a WebSocket request we need to check whether a specific header contains an UpgradeToWebsocket value; we do this in the WSRequest extractor. So when we have a WebSocket request with a matching Uri.Path, we handle the request using the specified flow. If we can't match the request, we just return a 400. Note that the binding itself is a future, so we wait a second for the server to start, or assume something went wrong. In our pattern matching we match five different patterns, each with its own flow. In the next sections we'll look a bit closer at each of the flows to see how they work. Let's start simple with the "/echo" flow, which also allows us to introduce some test tools.

Echo flow

Before we look at the flow, let's take a closer look at what our handler functions require. The signature of "req.header[UpgradeToWebsocket].get.handleMessages" looks like this:

def handleMessages(handlerFlow: Flow[Message, Message, Any], subprotocol: Option[String] = None)(implicit mat: FlowMaterializer): HttpResponse

As you can see, this function requires a Flow with an open input that accepts a Message and an open output that produces a Message. Akka-streams will attach the created WebSocket as a Source and pass any messages sent by the client into this flow. Akka-streams will also use the same WebSocket as a Sink and pass the resulting messages from this flow to it. The result of this function is an HttpResponse that will be sent to the client.

Now let's look at the echo flow. For this flow we defined the following case:

    case WSRequest(req@HttpRequest(GET, Uri.Path("/echo"), _, _, _)) => handleWith(req, Flows.echoFlow)
So whenever we receive a WebSocket message on "/echo", we run it through Flows.echoFlow:

def echoFlow: Flow[Message, Message, Unit] =  Flow[Message]
Calling Flow[Message] like this returns an identity flow, which passes any message it receives on its input directly to its output. So in our case, any WebSocket message received is passed back to the client as-is. Now, let's see this in action. To test this we need a WebSocket client. We could, of course, write one ourselves, but for now we'll just use a Chrome plugin (Simple WebSocket Client). Connect this client to ws://localhost:9001/echo and send a message:

ws_1.png

The flow configured at this endpoint responds with the same text as was entered. We didn't really do anything with this message, so let's add some custom functionality to the flow and see what happens.

SimpleFlow

When we connect a client to "/simple" the Flows.reverseFlow is used to handle incoming websocket messages:

 case WSRequest(req@HttpRequest(GET, Uri.Path("/simple"), _, _, _)) => handleWith(req, Flows.reverseFlow)
This time we create a simple Flow using the standard streams API:
 def reverseFlow: Flow[Message, Message, Unit] = {
    Flow[Message].map {
      case TextMessage.Strict(txt) => TextMessage.Strict(txt.reverse)
      case _ => TextMessage.Strict("Not supported message type")
    }
  }
We once again create a flow from Flow[Message], and on the result we call map. In the provided map function we check whether we have a standard TextMessage (we're ignoring streamed and binary messages in this article), and if we do, we reverse the text and return a new TextMessage. The result is pretty much what you'd expect:

ws_2.png

Anything you enter here is returned to the client, but reversed.
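The transformation itself is ordinary Scala; stripped of the akka-streams types, the same pattern match can be sketched with a stand-in message hierarchy (Msg, Text, and Binary below are simplified assumptions, not akka-http's real types):

```scala
// Simplified stand-ins for akka-http's Message hierarchy (assumption)
sealed trait Msg
case class Text(txt: String) extends Msg
case object Binary extends Msg

// The same logic the map function in reverseFlow applies
def reverse(m: Msg): Msg = m match {
  case Text(txt) => Text(txt.reverse)
  case _         => Text("Not supported message type")
}
```

This makes it easy to see that the flow is just a per-message function; akka-streams supplies the plumbing around it.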

So far we've only created very simple flows, using the flow API directly. If you've already looked a bit closer at akka-streams, you probably know that there is an alternative way of creating flows: you can also use the graph DSL, as we'll show in the next example.

The Graph flow

With a graph flow it is very easy to create more complex message-processing graphs. In the following sample we'll show you how you can use a couple of standard flow constructs to easily process and filter incoming messages. This sample runs when we access the server on the following endpoint:

  case WSRequest(req@HttpRequest(GET, Uri.Path("/graph"), _, _, _)) => handleWith(req, Flows.graphFlow)
Let's first look at the source code:
  /**
   * Flow which uses a graph to process the incoming message.
   *
   *                           compute
   *  collect ~> broadcast ~>  compute ~> zip ~> map
   *                           compute
   *
   * We broadcast the message to three map functions, we
   * then zip them all up, and map them to the response
   * message which we return.
   *
   * @return
   */
  def graphFlow: Flow[Message, Message, Unit] = {
    Flow() { implicit b =>

      import FlowGraph.Implicits._

      val collect = b.add(Flow[Message].collect[String]({
        case TextMessage.Strict(txt) => txt
      }))

      // setup the components of the flow
      val compute1 = b.add(Flow[String].map(_ + ":1"))
      val compute2 = b.add(Flow[String].map(_ + ":2"))
      val compute3 = b.add(Flow[String].map(_ + ":3"))

      val broadcast = b.add(Broadcast[String](3))
      val zip = b.add(ZipWith[String,String,String,String]((s1, s2, s3) => s1 + s2 + s3))
      val mapToMessage = b.add(Flow[String].map[TextMessage](TextMessage.Strict))

      // now we build up the flow
                 broadcast ~> compute1 ~> zip.in0
      collect ~> broadcast ~> compute2 ~> zip.in1
                 broadcast ~> compute3 ~> zip.in2

      zip.out ~> mapToMessage

      (collect.inlet, mapToMessage.outlet)
    }
  }
In this graph we first filter out messages we don't want by using a collect step. This step only passes on an incoming message when there is a match in the provided partial function. Next we send the message to a broadcast step. The broadcast step allows you to duplicate a message and send it to multiple downstream steps; in our case we send it to three downstream compute steps. Each compute step is just a simple map function in which we concatenate a number to the message. With the zip node we create a single String from the three Strings created by the compute nodes. Finally, since our flow must produce a Message, we map the string to a message in the mapToMessage step. To complete this flow, we return a tuple with the entry and exit points of the flow. At this point, we can run the sample again:

ws_3.png
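Ignoring the streaming machinery, the broadcast/zip combination computes a pure function on each incoming text: every compute step appends its suffix, and zip concatenates the three results. A sketch of that function:

```scala
// What broadcast ~> compute1..3 ~> zip amounts to for a single message
def graphResult(txt: String): String =
  List(":1", ":2", ":3").map(suffix => txt + suffix).mkString
```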

At this point we've only responded to messages from the client, but haven't proactively pushed anything from the server. In the following sample, we'll introduce an additional source that can push messages to the client regardless of whether the client requested them.

Pushing messages to the client

One of the matched patterns uses the Flows.graphFlowWithExtraSource flow:

  case WSRequest(req@HttpRequest(GET, Uri.Path("/graphWithSource"), _, _, _)) => handleWith(req, Flows.graphFlowWithExtraSource)
This flow ignores incoming messages and just sends 2000 random strings to the connected client. The complete code for this flow is shown next:
  /**
   * When the flow is materialized we don't really just have to respond with a single
   * message. Any message that is produced from the flow gets sent to the client. This
   * means we can also attach an additional source to the flow and use that to push
   * messages to the client.
   *
   * So this flow looks like this:
   *
   *        in ~> filter ~> merge
   *           newSource ~> merge ~> map
   * This flow filters out the incoming messages, and the merge will only see messages
   * from our new flow. All these messages get sent to the connected websocket.
   *
   *
   * @return
   */
  def graphFlowWithExtraSource: Flow[Message, Message, Unit] = {
    Flow() { implicit b =>
      import FlowGraph.Implicits._

      // Graph elements we'll use
      val merge = b.add(Merge[Int](2))
      val filter = b.add(Flow[Int].filter(_ => false))

      // convert to int so we can connect to merge
      val mapMsgToInt = b.add(Flow[Message].map[Int] { msg => -1 })
      val mapIntToMsg = b.add(Flow[Int].map[Message]( x => TextMessage.Strict(":" + randomPrintableString(200) + ":" + x.toString)))
      val log = b.add(Flow[Int].map[Int](x => {println(x); x}))

      // source we want to use to send message to the connected websocket sink
      val rangeSource = b.add(Source(1 to 2000))

      // connect the graph
      mapMsgToInt ~> filter ~> merge // this part of the merge will never provide msgs
         rangeSource ~> log ~> merge ~> mapIntToMsg

      // expose ports
      (mapMsgToInt.inlet, mapIntToMsg.outlet)
    }
  }
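The randomPrintableString helper used in mapIntToMsg isn't shown in the listing; a plausible implementation (an assumption on my part, built on scala.util.Random) would be:

```scala
import scala.util.Random

// Hypothetical helper: a string of `length` random printable characters
def randomPrintableString(length: Int): String =
  List.fill(length)(Random.nextPrintableChar()).mkString
```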
In this graph we use a Merge step to connect the source we want to use to the rest of the flow. A merge step takes an element from an upstream source whenever one becomes available. To make sure we only take elements from our own source (the rangeSource), we add a filter that drops all messages received from the WebSocket client. Besides these steps we've got a couple of map steps we've already seen before, and that makes up our flow.
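The effect of the filter(_ => false) branch can be seen without any streams at all: one side of the merge never produces anything, so the merge reduces to the other side. A collection-based sketch (a real Merge interleaves nondeterministically, but an empty input contributes nothing, so concatenation models it here):

```scala
// The client side is filtered out entirely, so only the range survives
val clientSide  = List(-1, -1, -1).filter(_ => false)
val rangeSource = (1 to 5).toList
val merged      = clientSide ++ rangeSource
```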

When we connect to this flow, we should see 2000 messages being pushed to the client as fast as the client can process them:

ws_4.png

Which is exactly what happens. As soon as the connection is created, 2000 messages are pushed to the client. Any messages sent from the client are ignored, as you can see in the following screenshot:

ws_5.png

We've also added a small logging step to the flow. This will just print all the numbers from 1 to 2000, to give us an idea of how everything is running.
At this point we've only used the standard components provided by akka-streams. In the next section we're going to create a custom publisher that pushes VM information, such as memory usage, to a WebSocket client.

Pushing messages to the client with a custom publisher

We'll need to take a couple of steps before we can get this to work correctly, and this will involve creating a couple of actors:

  1. We'll need an actor that forms the source of our stream. For this we'll use an actor that, together with a scheduler, sends a VM stats message at a configured interval.
  2. In akka-streams you can't connect a new subscriber to a running publisher. To work around this, the actor from step 1 sends its messages to a router, which then broadcasts them to any actor that can inject them into a flow.
  3. Finally we need the actor that connects the messages to the akka flow. For this we create an actor for each WebSocket request, which acts as a publisher and passes messages received from the router into the flow.

Let's start with the first one.

The VMActor

The VMActor is a simple actor which, when started, sends a message at every interval to the provided ActorRef:

class VMActor(router: ActorRef, delay: FiniteDuration, interval: FiniteDuration) extends Actor {

  import scala.concurrent.ExecutionContext.Implicits.global

  context.system.scheduler.schedule(delay, interval) {
    val json = Json.obj( "stats" -> getStats.map(el => el._1 -> el._2))
    router ! Json.prettyPrint(json)
  }

  override def receive: Actor.Receive = {
    case _ => // just ignore any messages
  }

  def getStats: Map[String, Long] = {

    val baseStats = Map[String, Long](
      "count.procs" -> Runtime.getRuntime.availableProcessors(),
      "count.mem.free" -> Runtime.getRuntime.freeMemory(),
      "count.mem.maxMemory" -> Runtime.getRuntime.maxMemory(),
      "count.mem.totalMemory" -> Runtime.getRuntime.totalMemory()
    )

    val roots = File.listRoots()
    val totalSpaceMap = roots.map(root => s"count.fs.total.${root.getAbsolutePath}" -> root.getTotalSpace).toMap
    val freeSpaceMap = roots.map(root => s"count.fs.free.${root.getAbsolutePath}" -> root.getFreeSpace).toMap
    val usableSpaceMap = roots.map(root => s"count.fs.usable.${root.getAbsolutePath}" -> root.getUsableSpace).toMap

    baseStats ++ totalSpaceMap ++ freeSpaceMap ++ usableSpaceMap
  }
}
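The JDK part of getStats needs neither Akka nor Play; trimmed down to the runtime stats only, it is just a few calls on Runtime:

```scala
// Runtime stats only; the file-system roots from the full version are omitted
def getRuntimeStats: Map[String, Long] = {
  val rt = Runtime.getRuntime
  Map(
    "count.procs"           -> rt.availableProcessors().toLong,
    "count.mem.free"        -> rt.freeMemory(),
    "count.mem.maxMemory"   -> rt.maxMemory(),
    "count.mem.totalMemory" -> rt.totalMemory()
  )
}
```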
The code for this actor isn't that special. It's just a basic actor that collects some information and passes it on, in a map, to the provided ActorRef. (I know, I know, I should have made that a case class...) Now, let's look at the router.

The router

For the router I initially wanted to use the standard BroadcastGroup router, but that router is immutable and doesn't really allow dynamically adding new routees. So for this use case we create a very simple alternative router, which we set up like this:

  val router: ActorRef = system.actorOf(Props[RouterActor], "router")
  val vmactor: ActorRef = system.actorOf(Props(classOf[VMActor], router, 2 seconds, 20 milliseconds))
Here we create both actors and pass the ActorRef of the router into the vmactor. The implementation of this router looks like this:
class RouterActor extends Actor {
  var routees = Set[Routee]()

  def receive = {
    case ar: AddRoutee => routees = routees + ar.routee
    case rr: RemoveRoutee => routees = routees - rr.routee
    case msg => routees.foreach(_.send(msg, sender))
  }
}
The router uses the same routing case classes (AddRoutee, RemoveRoutee) that Akka's standard routers use. The vmactor will send an update 50 times per second, with an initial delay of 2 seconds. All that is left is to create an actor that registers itself with the router as a routee and can publish into the flow.
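The bookkeeping this router does can be sketched without actors at all: a mutable set of callbacks standing in for routees (plain functions here are a simplification; the real router sends to ActorRefs):

```scala
import scala.collection.mutable

// Functions stand in for Routees; `received` records what a routee saw
val received = mutable.ListBuffer[String]()
var routees  = Set[String => Unit]()

val routee: String => Unit = msg => received += msg

routees = routees + routee          // AddRoutee
routees.foreach(_.apply("stats-1")) // broadcast: the routee receives it
routees = routees - routee          // RemoveRoutee
routees.foreach(_.apply("stats-2")) // no routees left, nothing happens
```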

VMStatsPublisher

Akka-streams provides an ActorPublisher[T] trait which your actor must extend so that it can be used as a publisher inside a flow. Before we look at the implementation of this actor, let's first look at the flow that uses it:

 /**
   * Creates a flow which uses the provided source as additional input. This complete scenario
   * works like this:
   *  1. When the actor is created it registers itself with a router.
   *  2. the VMActor sends messages at an interval to the router.
   *  3. The router next sends the message to this source which injects it into the flow
   */
 def graphFlowWithStats(router: ActorRef): Flow[Message, Message, Unit] = {
    Flow() { implicit b =>
      import FlowGraph.Implicits._

      // create an actor source
      val source = Source.actorPublisher[String](Props(classOf[VMStatsPublisher],router))

      // Graph elements we'll use
      val merge = b.add(Merge[String](2))
      val filter = b.add(Flow[String].filter(_ => false))

      // convert the incoming message to a String so we can connect it to the merge
      val mapMsgToString = b.add(Flow[Message].map[String] { msg => "" })
      val mapStringToMsg = b.add(Flow[String].map[Message]( x => TextMessage.Strict(x)))

      val statsSource = b.add(source)

      // connect the graph
      mapMsgToString ~> filter ~> merge // this part of the merge will never provide msgs
                   statsSource ~> merge ~> mapStringToMsg

      // expose ports
      (mapMsgToString.inlet, mapStringToMsg.outlet)
    }
  }
If you've looked at the other flows, this one shouldn't come as a surprise. It looks a lot like the previous one, only this time we define the source like this:
      val source = Source.actorPublisher[String](Props(classOf[VMStatsPublisher],router))

This means that every time a WebSocket connection is made, a new VMStatsPublisher actor is created and the router's ActorRef is passed into its constructor. So, finally, let's look at this publisher. We'll first look at the complete code of this actor and then highlight a couple of small things in the discussion afterwards:
/**
 * for now a very simple actor, which keeps a separate buffer
 * for each subscriber. This could be rewritten to store the
 * vmstats in an actor somewhere centrally and pull them from there.
 *
 * Based on the standard publisher example from the akka docs.
 */
class VMStatsPublisher(router: ActorRef) extends ActorPublisher[String] {

  case class QueueUpdated()

  import akka.stream.actor.ActorPublisherMessage._
  import scala.collection.mutable

  val MaxBufferSize = 50
  val queue = mutable.Queue[String]()

  var queueUpdated = false;

  // on startup, register with the router
  override def preStart() {
    router ! AddRoutee(ActorRefRoutee(self))
  }

  // cleanly remove this actor from the router. To
  // make sure our custom router only keeps track of
  // alive actors.
  override def postStop(): Unit = {
    router ! RemoveRoutee(ActorRefRoutee(self))
  }

  def receive = {

    // receive new stats, add them to the queue, and quickly
    // exit.
    case stats: String  =>
      // remove the oldest one from the queue and add a new one
      if (queue.size == MaxBufferSize) queue.dequeue()
      queue += stats
      if (!queueUpdated) {
        queueUpdated = true
        self ! QueueUpdated
      }

    // we receive this message if there are new items in the
    // queue. If we have a demand for messages send the requested
    // demand.
    case QueueUpdated => deliver()

    // the connected subscriber requests n messages; we don't need
    // to explicitly track the amount, we use the totalDemand property for this
    case Request(amount) =>
      deliver()

    // subscriber stops, so we stop ourselves.
    case Cancel =>
      context.stop(self)
  }

  /**
   * Deliver the message to the subscriber. In the case of websockets over TCP, note
   * that even if we have a slow consumer, we won't notice that immediately. First the
   * buffers will fill up before we get feedback.
   */
  @tailrec final def deliver(): Unit = {
    if (totalDemand == 0) {
      println(s"No more demand for: $this")
    }

    if (queue.size == 0 && totalDemand != 0) {
      // we can respond to QueueUpdated messages again, since
      // we can't do anything until our queue contains items again.
      queueUpdated = false
    } else if (totalDemand > 0 && queue.size > 0) {
      onNext(queue.dequeue())
      deliver()
    }
  }
}
The inline comments should give you a fairly good idea of what is happening here, but let's look at a couple of items. First, let's look at how we register this actor with the router:
 // on startup, register with the router
  override def preStart() {
    router ! AddRoutee(ActorRefRoutee(self))
  }

  // cleanly remove this actor from the router. To
  // make sure our custom router only keeps track of
  // alive actors.
  override def postStop(): Unit = {
    router ! RemoveRoutee(ActorRefRoutee(self))
  }
In preStart we register, and we must also make sure to deregister before we're stopped. In the receive method we can receive three types of messages: stats, which we add to an internal queue; a request from downstream for more messages (the Request message); or a Cancel message when the subscriber closes in an orderly fashion. To deliver messages to our downstream subscriber we use the onNext call in the deliver function.
  @tailrec final def deliver(): Unit = {
    if (totalDemand == 0) {
      println(s"No more demand for: $this")
    }

    if (queue.size == 0 && totalDemand != 0) {
      // we can respond to QueueUpdated messages again, since
      // we can't do anything until our queue contains items again.
      queueUpdated = false
    } else if (totalDemand > 0 && queue.size > 0) {
      onNext(queue.dequeue())
      deliver()
    }
  }
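The queueing and demand bookkeeping can be exercised on its own. The sketch below mirrors the bounded queue and the deliver loop, with a plain counter standing in for akka's totalDemand and a list standing in for onNext:

```scala
import scala.collection.mutable

val MaxBufferSize = 3
val queue         = mutable.Queue[String]()
val delivered     = mutable.ListBuffer[String]() // stands in for onNext
var totalDemand   = 0L                           // managed by akka-streams in the real actor

def enqueue(stats: String): Unit = {
  if (queue.size == MaxBufferSize) queue.dequeue() // drop the oldest entry
  queue += stats
}

def deliver(): Unit =
  while (totalDemand > 0 && queue.nonEmpty) {
    delivered += queue.dequeue()
    totalDemand -= 1
  }

(1 to 5).foreach(i => enqueue(s"s$i")) // s1 and s2 fall off the bounded queue
totalDemand = 2
deliver()
```

After this run, the two oldest surviving entries have been delivered and one remains buffered until more demand arrives.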
As long as there is demand (the totalDemand property, which is managed by akka-streams) and we've got messages in our queue, we continue sending messages. This function also prints a console message when there is no more demand from the subscriber. When we connect to this flow using our client, we see the following:

ws_6.png

Cool, right? There are a couple of other topics to explore regarding WebSockets and akka-streams, most importantly backpressure. I'll write a separate article in the next couple of weeks to show that slow WebSocket clients trigger backpressure with akka-streams.



Published at DZone with permission of Jos Dirksen, DZone MVB. See the original article here.
