Akka vs. ZIO vs. Monix (Part 2): Communication


Can ZIO and Monix be used as an alternative to Akka? When it comes to state management and communication, the answer is yes!


In part 1, we explored how to implement a process that manages a non-trivial state using Akka, Akka Typed, Monix, and ZIO. However, as a popular saying by Carl Hewitt goes, “one actor is no actor, they come in systems.” With that in mind, let’s explore some examples that use multiple communicating actors and see if it’s still possible and practical to implement them using ZIO or Monix.

Crawler

Our first example will be an implementation of the popular Master-Worker Pattern, where we have a single master process distributing work to a number of worker processes. When a worker finishes a work unit, it sends the results to the master process, which gathers and includes them in the overall computation results.

More concretely, the task will be to create a web crawler. Starting from a given URL, it should traverse all the links, counting which hosts are most popular. HTTP requests should be executed in parallel. However, we don’t want to look suspicious in our crawling efforts, so we impose one additional restriction: at any given time, there can be at most one request in flight to any single host (requests to different hosts can still be executed in parallel).

As we are interested in the way a process is defined, and not in the actual crawling itself, we’ll use a stub HTTP service, along with stub functions, to extract interesting links from the site’s content:

type Host = String
case class Url(host: Host, path: String)

trait Http[F[_]] {
  def get(url: Url): F[String]
}

type LinkParser = String => List[Url]
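
As a rough illustration, here is a minimal sketch of what such stubs might look like (these are hypothetical values, not the fixtures from the repository): the Http[Future] implementation serves canned page bodies, and the link parser turns space-separated "host/path" tokens into Urls.

import scala.concurrent.Future

// Canned pages: each body simply lists the links it contains.
val pages: Map[Url, String] = Map(
  Url("a.com", "/")      -> "a.com/about b.com/",
  Url("a.com", "/about") -> "b.com/",
  Url("b.com", "/")      -> "a.com/"
)

// Serves the canned bodies; unknown URLs get an empty page.
val stubHttp: Http[Future] = new Http[Future] {
  def get(url: Url): Future[String] = Future.successful(pages.getOrElse(url, ""))
}

// Parses "host/path" tokens into Urls.
val stubParseLinks: LinkParser = body =>
  body.split(" ").toList.filter(_.contains("/")).map { s =>
    val slash = s.indexOf('/')
    Url(s.substring(0, slash), s.substring(slash))
  }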

Traditional Akka

Let’s start with a “traditional” Akka solution and then move to other implementations. Again, only the crucial snippets will be included, but the full source code is available on GitHub.

We’ll have to define two actors: Crawler and Worker. To construct a Crawler actor, we need an interface for executing HTTP requests (http: Http[Future], as we are in Akka-land and everything is Future-based), a way to parse links (parseLinks), and a Promise that will be completed with the final result once all the pages have been crawled. In doing this, we’re assuming that it’s a finite process and that parseLinks returns only “interesting” links, for example ones pointing to a fixed set of “interesting” hosts:

class Crawler(http: Http[Future], 
              parseLinks: String => List[Url], 
              result: Promise[Map[Host, Int]]) extends Actor {

  var referenceCount = Map[Host, Int]()
  var visitedLinks = Set[Url]()
  var inProgress = Set[Url]()
  var workers = Map[Host, ActorRef]()

  // ...
}


The internal state of the actor consists of:

  • referenceCount — the current host popularity
  • visitedLinks — which URLs have already been processed or are currently being processed, so that we don’t process them again
  • inProgress — the set of URLs that are currently being processed. Once this becomes empty (after the process has started), the crawling is done and the result promise can be completed with referenceCount
  • workers — each host will have a dedicated worker actor, which will ensure that at most one request is done for each host at any time

There are two messages that the crawler actor can receive:

sealed trait CrawlerMessage
/**
  * Start the crawling process for the given URL. Should be sent only once.
  */
case class Start(url: Url) extends CrawlerMessage
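/**
  * Sent by a worker once it has finished crawling the given URL, together with the
  * links found on that page.
  */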
case class CrawlResult(url: Url, links: List[Url]) extends CrawlerMessage


The Start message should be sent only once to kickstart the whole process. CrawlResult messages will be sent by worker actors once they have completed crawling the given URL and parsing the links.

Let’s start by looking at the crawlUrl method in the actor:

private def crawlUrl(url: Url): Unit = {
  if (!visitedLinks.contains(url)) {
    visitedLinks += url
    inProgress += url
    actorFor(url.host) ! Crawl(url)
  }
}

private def actorFor(host: Host): ActorRef = {
  workers.get(host) match {
    case None =>
      val workerActor = context.actorOf(Props(new Worker(http, parseLinks, self)))
      workers += host -> workerActor
      workerActor

    case Some(ar) => ar
  }
}


The method checks to see if the URL has already been visited; if not, the visitedLinks and inProgress structures are updated. We then create or look up a worker actor using actorFor and tell it to Crawl the given address.

Notice that when creating a new worker, we’re passing the self: ActorRef reference so that the worker can send messages back to the crawler.

As we mentioned before, the actor can receive two types of messages:

override def receive: Receive = {
  case Start(start) =>
    crawlUrl(start)

  case CrawlResult(url, links) =>
    inProgress -= url

    links.foreach { link =>
      crawlUrl(link)
      referenceCount = referenceCount.updated(link.host, 
                                              referenceCount.getOrElse(link.host, 0) + 1)
    }

    if (inProgress.isEmpty) {
      result.success(referenceCount)
      context.stop(self)
    }
}


The worker actors are expected to reply to the crawler actor with a CrawlResult message. Once this message is received, the inProgress and referenceCount structures are again updated, and all the linked URLs will be crawled. If, at the end, there’s nothing left being crawled, we are done!

The worker isn’t complicated either. It’s parametrised with a reference (ActorRef) to the master actor, which allows it to send messages back, as well as the Http[Future] interface and a way to parse links:

class Worker(http: Http[Future], 
             parseLinks: String => List[Url], 
             master: ActorRef) extends Actor with ActorLogging {

  var urlsPending: Vector[Url] = Vector.empty
  var getInProgress = false

  // ...
}


The internal state of the actor consists of a list of URLs that should be crawled (urlsPending), and a flag indicating if there’s a request in progress (getInProgress). This is needed to ensure that there’s at most one request to a given domain executing at any time.


There are also two messages that the worker will receive:

sealed trait WorkerMessage
case class Crawl(url: Url) extends WorkerMessage
case class HttpGetResult(url: Url, result: Try[String]) extends WorkerMessage


The first one is sent, as we’ve seen, by the crawler actor:

override def receive: Receive = {
  case Crawl(url) =>
    urlsPending = urlsPending :+ url
    startHttpGetIfPossible()

  // ...
}

private def startHttpGetIfPossible(): Unit = {
  urlsPending match {
    case url +: tail if !getInProgress =>
      getInProgress = true
      urlsPending = tail

      import context.dispatcher
      http.get(url).onComplete(r => self ! HttpGetResult(url, r))

    case _ =>
  }
}


Once we get a new URL to crawl, we add it to the list of pending requests (urlsPending). If possible — that is, if there are no requests in progress — in the startHttpGetIfPossible method we start executing a new HTTP request. Once this completes, we send an HttpGetResult message to ourselves (the worker actor). Note that this is an asynchronous operation, and you always have to be cautious not to access or mutate the actor’s state from within such callbacks.
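
To illustrate the pitfall with a hypothetical snippet (not code from the worker above): mutating the actor’s state directly inside the callback would run on the Future’s execution context, outside the actor, and could race with message processing. Sending an immutable message to self keeps all state changes inside receive:

// Unsafe (don't do this): the callback runs outside the actor,
// so this mutation can race with message handling.
http.get(url).onComplete { r =>
  getInProgress = false // mutating actor state from another thread
  self ! HttpGetResult(url, r)
}

// Safe: only send a message; the state is updated when the actor
// processes HttpGetResult in receive.
http.get(url).onComplete(r => self ! HttpGetResult(url, r))

Coming back to the worker, here is how the HttpGetResult messages are handled: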

override def receive: Receive = {
  case Crawl(url) =>
    // ...

  case HttpGetResult(url, Success(body)) =>
    getInProgress = false
    startHttpGetIfPossible()

    val links = parseLinks(body)
    master ! CrawlResult(url, links)

  case HttpGetResult(url, Failure(e)) =>
    getInProgress = false
    startHttpGetIfPossible()

    log.error(s"Cannot get contents of $url", e)
    master ! CrawlResult(url, Nil)
}


Once the worker actor receives the HttpGetResult message, it sends a notification to the master with the results (CrawlResult) and starts another request, if there’s one pending.

Overall, it’s not a complicated process, but there’s some communication happening — both between the master and the worker and the other way round. There are tests for the implementation (see AkkaCrawlerTest) that verify that we receive the correct answers.
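
For reference, wiring everything together might look roughly like this (a sketch with assumed http: Http[Future] and parseLinks values in scope; the actual setup lives in AkkaCrawlerTest):

import akka.actor.{ActorSystem, Props}
import scala.concurrent.{Await, Promise}
import scala.concurrent.duration._

val system = ActorSystem("crawler-demo")
val result = Promise[Map[Host, Int]]()

// Create the master actor and kick off the crawl from a starting URL.
val crawler = system.actorOf(Props(new Crawler(http, parseLinks, result)))
crawler ! Start(Url("a.com", "/"))

// The promise completes once there are no more requests in progress.
val counts: Map[Host, Int] = Await.result(result.future, 10.seconds)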

Akka Typed

With Akka Typed, instead of writing actors directly, we’ll be defining actor behaviors. The messages sent between the actors/behaviors will be exactly the same; however, we’ll additionally encapsulate the whole state in a case class:

case class CrawlerData(referenceCount: Map[Host, Int],
                       visitedLinks: Set[Url],
                       inProgress: Set[Url],
                       workers: Map[Host, ActorRef[WorkerMessage]])


The behaviors are parametrised with an interface for executing HTTP requests, a function to parse the links, and an actor to which the results should be sent once available (this used to be a Promise in the previous example, but an actor reference is more natural here):

class Crawler(http: Http[Future], 
              parseLinks: String => List[Url], 
              reportTo: ActorRef[Map[Host, Int]]) {
  def crawlerBehavior: Behavior[CrawlerMessage] = ???
}


The crawler behavior that we’ll define uses the actor’s context. Therefore, we’ll wrap the method that defines the message-processing behavior with a factory method that obtains the context. The context, like everything in Akka Typed, is parameterised with the type of messages that the actor handles. This is needed, for example, to obtain a well-typed self actor reference, which needs to know what kind of messages it accepts:

def crawlerBehavior: Behavior[CrawlerMessage] = 
  Behaviors.setup[CrawlerMessage] { ctx =>
    def receive(data: CrawlerData): Behavior[CrawlerMessage] = ???
    def crawlUrl(data: CrawlerData, url: Url): CrawlerData = ???
    def workerFor(data: CrawlerData, 
                  host: Host): (CrawlerData, ActorRef[WorkerMessage]) = ???
  }


Let’s start from the end, with the method for looking up a worker actor for a given host. Since this implementation is not using mutable state and instead returns modified actor behaviors that wrap the state (CrawlerData), all methods will:

  • as a parameter, take a CrawlerData instance
  • return the modified CrawlerData as part of the return type

def workerFor(data: CrawlerData, host: Host): (CrawlerData, ActorRef[WorkerMessage]) = {
  data.workers.get(host) match {
    case None =>
      val workerActor = ctx.spawn(workerBehavior(ctx.self), s"worker-$host")
      (data.copy(workers = data.workers + (host -> workerActor)), workerActor)

    case Some(ar) => (data, ar)
  }
}


If there’s no worker for the given domain yet, we’re spawning a new child actor, using the worker behavior and returning an updated actor state together with the created actor reference.

Since the type of the behavior we are passing to spawn is Behavior[WorkerMessage], the result of this method will be ActorRef[WorkerMessage].

The crawlUrl and receive methods are quite similar to the “traditional” Akka implementation, with the significant difference being that we need to thread the modified actor state through. Sometimes there are a couple of modifications in a row, which is why we get a chain of data, data2, data3 references:

def receive(data: CrawlerData): Behavior[CrawlerMessage] = Behaviors.receiveMessage {
  case Start(start) =>
    receive(crawlUrl(data, start))

  case CrawlResult(url, links) =>
    val data2 = data.copy(inProgress = data.inProgress - url)

    val data3 = links.foldLeft(data2) {
      case (d, link) =>
        val d2 = d.copy(referenceCount = d.referenceCount.updated(
          link.host, d.referenceCount.getOrElse(link.host, 0) + 1))
        crawlUrl(d2, link)
    }

    if (data3.inProgress.isEmpty) {
      reportTo ! data3.referenceCount
      Behavior.stopped
    } else {
      receive(data3)
    }
}

def crawlUrl(data: CrawlerData, url: Url): CrawlerData = {
  if (!data.visitedLinks.contains(url)) {
    val (data2, worker) = workerFor(data, url.host)
    worker ! Crawl(url)
    data2.copy(
      visitedLinks = data.visitedLinks + url,
      inProgress = data.inProgress + url
    )
  } else data
}


Communication in both Akka variants looks the same, as well. We use the ! (tell) method to send a message to a (typed) actor. Don’t be mistaken: here, everything is well-typed. You won’t be able to send a message of an incorrect type to an actor.
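
For instance (a hypothetical snippet), mixing up the two message protocols simply won’t compile:

val worker: ActorRef[WorkerMessage] = ??? // some worker reference
worker ! Crawl(Url("a.com", "/"))         // fine: Crawl is a WorkerMessage
// worker ! Start(Url("a.com", "/"))      // does not compile: Start is a CrawlerMessage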

The worker behavior also corresponds closely to what we’ve seen before. Again, since we’re not using mutable state, there’s no possibility of accidentally modifying it within callbacks:

def workerBehavior(master: ActorRef[CrawlResult]): Behavior[WorkerMessage] = 
  Behaviors.setup[WorkerMessage] { ctx =>

  def receive(urlsPending: Vector[Url], getInProgress: Boolean): Behavior[WorkerMessage] =
    Behaviors.receiveMessage {
      case Crawl(url) =>
        startHttpGetIfPossible(urlsPending :+ url, getInProgress)

      case HttpGetResult(url, Success(body)) =>
        val links = parseLinks(body)
        master ! CrawlResult(url, links)

        startHttpGetIfPossible(urlsPending, getInProgress = false)

      case HttpGetResult(url, Failure(e)) =>
        ctx.log.error(s"Cannot get contents of $url", e)
        master ! CrawlResult(url, Nil)

        startHttpGetIfPossible(urlsPending, getInProgress = false)
    }

  def startHttpGetIfPossible(urlsPending: Vector[Url], 
                             getInProgress: Boolean): Behavior[WorkerMessage] =
    urlsPending match {
      case url +: tail if !getInProgress =>
        import ctx.executionContext
        http.get(url).onComplete(r => ctx.self ! HttpGetResult(url, r))

        receive(tail, getInProgress = true)

      case _ =>
        receive(urlsPending, getInProgress)
    }

  receive(Vector.empty, getInProgress = false)
}
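
To run the whole thing, the crawler behavior has to be spawned somewhere and given an ActorRef[Map[Host, Int]] to report to. A minimal, hypothetical wiring (the repository contains the real setup) could use a guardian behavior as the reportTo target:

import akka.actor.typed.{ActorSystem, Behavior}
import akka.actor.typed.scaladsl.Behaviors

def guardian(start: Url): Behavior[Map[Host, Int]] =
  Behaviors.setup { ctx =>
    // http: Http[Future] and parseLinks are assumed to be in scope.
    val crawler = ctx.spawn(new Crawler(http, parseLinks, ctx.self).crawlerBehavior, "crawler")
    crawler ! Start(start)
    Behaviors.receiveMessage { referenceCount =>
      ctx.log.info(s"Reference counts: $referenceCount")
      Behaviors.stopped
    }
  }

val system = ActorSystem(guardian(Url("a.com", "/")), "crawl")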


ZIO

Once again, let’s leave the eager scala.concurrent.Future world and venture into the lazy land of IO. In the example from the previous article, we used an IOQueue to communicate with the process from the outside world. Here, we’ll be using multiple IOQueues.

The Crawler process will also use a CrawlerData case class for storing the current state, but, instead of a map from the domain to the worker’s ActorRef, it will contain an IOQueue:

case class CrawlerData(referenceCount: Map[Host, Int], 
                       visitedLinks: Set[Url], 
                       inProgress: Set[Url], 
                       workers: Map[Host, IOQueue[Url]])


Instead of actor classes, we’ll be defining methods that return IO instances: descriptions of how to compute the host popularity counts. The method will take an Http[IO] interface, but this time, when executing a request, we won’t get a Future[String]. Instead, as we’re in ZIO-world, we will get an IO[String], that is, a description of how to execute a GET request to the given address:

def crawl(crawlUrl: Url, 
          http: Http[IO[Throwable, ?]],
          parseLinks: String => List[Url]): IO[Nothing, Map[Host, Int]] = {

  def crawler(crawlerQueue: IOQueue[CrawlerMessage], 
              data: CrawlerData): IO[Nothing, Map[Host, Int]] = // ...

  def worker(workerQueue: IOQueue[Url], 
             crawlerQueue: IOQueue[CrawlerMessage]
            ): IO[Nothing, Fiber[Nothing, Unit]] = // ...

  // ...
}


In Akka Typed, we had to define two behaviors for the crawler and the worker. Here, we’ll be defining two process descriptions. The first one, the crawler, contains the same parts as in the previous implementation:

def crawler(crawlerQueue: IOQueue[CrawlerMessage], 
            data: CrawlerData): IO[Nothing, Map[Host, Int]] = {

  def handleMessage(msg: CrawlerMessage, 
                    data: CrawlerData): IO[Nothing, CrawlerData] = ???

  def crawlUrl(data: CrawlerData, 
               url: Url): IO[Nothing, CrawlerData] = ???

  def workerFor(data: CrawlerData, 
                url: Host): IO[Nothing, (CrawlerData, IOQueue[Url])] = ???

  ???
}


Let’s start from the bottom, with the description of how to obtain a worker for a given host. Even though we’ve traveled from Akka to Scalaz, we still need a way to ensure that there’s at most one request to a given host in progress at any given time. A separate asynchronous process that makes sure this is the case is a good fit:

def workerFor(data: CrawlerData, 
              host: Host): IO[Nothing, (CrawlerData, IOQueue[Url])] = {

  data.workers.get(host) match {
    case None =>
      for {
        workerQueue <- IOQueue.make[Nothing, Url](32)
        _ <- worker(workerQueue, crawlerQueue)
      } yield {
        (data.copy(workers = data.workers + (host -> workerQueue)), workerQueue)
      }
    case Some(queue) => IO.now((data, queue))
  }
}


Here, of course, we also don’t have any mutable state, so we need to take the CrawlerData as a parameter and return an updated copy. If there’s no worker for a given host yet, we first create a (bounded) queue that will be used to communicate with that worker and then create the worker process (we’ll get to the definition of worker soon). Finally, we store the queue in our data structure. Again, this is not very different from the Akka Typed implementation.

The crawlUrl method should look familiar as well:

def crawlUrl(data: CrawlerData, url: Url): IO[Nothing, CrawlerData] = {
  if (!data.visitedLinks.contains(url)) {
    workerFor(data, url.host).flatMap {
      case (data2, workerQueue) =>
        workerQueue.offer(url).map { _ =>
          data2.copy(
            visitedLinks = data.visitedLinks + url,
            inProgress = data.inProgress + url
          )
        }
    }
  } else IO.now(data)
}


The major difference is that sending a message to a worker is no longer a side-effecting operation, as it was before. Instead, we use the workerQueue.offer method, which returns a description of how to send a message to the queue. We need to combine this description with the overall description of how our code should run, or it will never be executed; hence the flatMap/map.
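
In other words (a tiny, hypothetical illustration), merely creating the description does nothing:

// Nothing is enqueued here: `send` is only a value describing the operation.
val send = workerQueue.offer(url)
// The message is sent only when this description is combined (via flatMap/map)
// into the program that ultimately gets interpreted.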

The handleMessage method corresponds to receive from the Akka Typed implementation and should return the modified crawler data after handling a single, given message:

def handleMessage(msg: CrawlerMessage, 
                  data: CrawlerData
                 ): IO[Nothing, CrawlerData] = msg match {
  case Start(url) =>
    crawlUrl(data, url)

  case CrawlResult(url, links) =>
    val data2 = data.copy(inProgress = data.inProgress - url)

    links.foldM(data2) {
      case (d, link) =>
        val d2 = d.copy(referenceCount = d.referenceCount.updated(
          link.host, d.referenceCount.getOrElse(link.host, 0) + 1))
        crawlUrl(d2, link)
    }
}


Before, when handling the CrawlResult message, we did a simple foldLeft on the resulting links, updating the data structure and running the side-effecting crawlUrl method. Here, we need to combine all the IOs returned by every crawlUrl invocation into one big description. That’s what the foldlM method does: def foldlM[G[_], B](z: B)(f: B => A => G[B])(implicit M: Monad[G]): G[B], giving us a final IO[Nothing, CrawlerData] that composes all the side effects into a single description.

But, that’s not the end! We have helper methods to handle the messages, but what about the main loop? Unlike in an actor, which we’ve summarized before as a pre-defined recipe for an asynchronous process that reads messages from its inbox in a loop, here we need to create the loop by hand:

crawlerQueue.take.flatMap { msg =>
  handleMessage(msg, data).flatMap { data2 =>
    if (data2.inProgress.isEmpty) {
      IO.now(data2.referenceCount)
    } else {
      crawler(crawlerQueue, data2)
    }
  }
}


The loop takes the form of recursive invocations of the main crawler method with the queue and the updated data. Unless, of course, there are no more requests in progress, in which case we simply return the result.

Having the crawler ready, let’s look at the worker process. It can, in fact, be simpler than in the Akka implementations. The key observation is that we are in full control over when we take a new message from the queue. An actor has the mailbox-read-loop baked in, so we cannot postpone receiving the next message until some condition is satisfied (it is possible to stash messages, but that requires additional logic). Here, however, we have that possibility.

The worker, after getting a request to crawl a URL from its queue, can simply execute the request and only take the next URL after the request completes:

def worker(workerQueue: IOQueue[Url], 
           crawlerQueue: IOQueue[CrawlerMessage]
          ): IO[Nothing, Fiber[Nothing, Unit]] = {

  def handleUrl(url: Url): IO[Nothing, Unit] = {
    http
      .get(url)
      .attempt[Nothing]
      .map {
        case Left(t) =>
          logger.error(s"Cannot get contents of $url", t)
          List.empty[Url]
        case Right(b) => parseLinks(b)
      }
      .flatMap(r => crawlerQueue.offer(CrawlResult(url, r))
                                .fork[Nothing].toUnit)
  }

  workerQueue
    .take[Nothing]
    .flatMap(handleUrl)
    .forever
    .fork
}


The worker process is an infinite loop (created with forever) that takes a message from the queue and handles it. It is also forked into a fiber so that it runs asynchronously in the background. The fiber instance is returned, but it’s never used by the crawler process.

There’s a very important detail here — notice that when we send the crawl result to the message queue, we fork the operation into a fiber (crawlerQueue.offer(…).fork). Why is that?

Recall that, unlike the mailboxes of actors, the IOQueue that we are using in ZIO is bounded, and when the queue is full, the offer operation blocks. That’s good on one side — it gives a bound on memory usage and also provides back-pressure. However, it can also lead to deadlocks.

In our example, imagine that one page contains a lot of links to a single host, with different paths. We’ll then be sending a lot of messages from the crawler process to a single worker process. If the number of links (URLs) is higher than the queue capacity, at some point the crawler will become blocked and won’t be able to send any more URLs, since the worker’s queue will be full. The worker will slowly work through the requests, replying with results and processing messages from its queue, but its queue can immediately fill up again with new Crawl messages.

If the total number of URLs sent from the crawler to one worker while handling a single message exceeds the combined capacities of the crawler and worker queues, at some point the crawler’s queue will fill up as well. As the crawler is still busy sending Crawl messages, it won’t get a chance to process the CrawlResult messages it receives, so the worker will block as well: hence, the deadlock.

However, if we send the replies in the background, in a forked fiber, the worker will be able to continue working through the Crawl requests. All of the spawned offer(CrawlResult(...)) fibers might block until the crawler finishes enqueueing its Crawl requests, but that’s not a problem.

That way, our memory usage is still bounded by the total size of the queues and we won’t get a deadlock; however, we do need to carefully design the way the processes interact to avoid such situations.

If the processes form a hierarchy — as here, where there’s a parent process (crawler) and a number of child processes (workers) — a good rule might be to directly send messages only from parent processes to child processes (down the hierarchy tree). Any replies going up the hierarchy tree should be sent in the background, using a forked fiber.

Finally, we need to bootstrap the whole process: create the queue used to communicate with the crawler, enqueue the initial message, and create the IO that describes the crawling process:

val crawl = for {
  crawlerQueue <- IOQueue.make[Nothing, CrawlerMessage](32)
  _ <- crawlerQueue.offer[Nothing](Start(crawlUrl))
  r <- crawler(crawlerQueue, CrawlerData(Map(), Set(), Set(), Map()))
} yield r

IO.supervise(crawl, new RuntimeException)


There’s one small, but important, feature here: the IO.supervise call that wraps the whole process. What this method does is instruct the interpreter that when the wrapped computation (crawl) completes, all fibers created by it should be interrupted (and terminated). And that’s exactly what we want: any forked worker fibers should be terminated once we have the final result, as they won’t ever be used.

This closely resembles a hierarchy of actors in Akka. Once a parent actor is stopped, all child actors are stopped as well. In ZIO, it’s not the default, but the option is there. When defining a computation which spawns multiple fibers, it’s very handy not to have to worry about the cleanup, but to delegate the task to supervise.

Monix

Finally, let’s move to Monix. As we noted in the previous installment of the series, the Monix and ZIO solutions are closely related. Here, the situation is the same, but there are two important differences.

First of all, we cannot use MVars (which behave like bounded queues of size 1) to communicate between the crawler and the workers. As putting a value into a full MVar is a blocking operation, it could very quickly lead to a deadlock (as described above).

That’s why we need a proper queue. Monix does have an unbounded async queue implementation, monix.execution.misc.AsyncQueue, but it’s Future-based, so we’ll create a thin Task-wrapper around it:

class MQueue[T](q: AsyncQueue[T]) {
  def take: Task[T] = {
    Task.deferFuture(q.poll())
  }
  def offer(t: T): Task[Unit] = {
    Task.eval(q.offer(t))
  }
}
object MQueue {
  def make[T]: MQueue[T] = new MQueue(AsyncQueue.empty)
}
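
As a quick usage sketch (hypothetical, not code from the repository): constructing the queue is a plain side effect here, while offer and take are Task descriptions that compose like any other Task:

import monix.eval.Task

val queue: MQueue[CrawlerMessage] = MQueue.make[CrawlerMessage]

// A description that enqueues the initial message and then waits for one to arrive.
val roundTrip: Task[CrawlerMessage] =
  queue.offer(Start(Url("a.com", "/"))).flatMap(_ => queue.take)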


The interface of our MQueue is the same as that of Scalaz’s IOQueue, but with an important difference: IOQueue is bounded and, when the queue is full, IOQueue.offer will (asynchronously) block. Here, we have an unbounded queue, which corresponds to the unbounded actor mailboxes in Akka. Therefore, we won’t have problems with deadlocks, but we also don’t get a bound on memory usage.

The second difference is that there’s no construct analogous to IO.supervise in Monix, so we have to manage fibers manually. This means that we store the fibers in the CrawlerData data structure, next to the worker queues:

case class WorkerData(queue: MQueue[Url], 
                      fiber: Fiber[Unit])
case class CrawlerData(referenceCount: Map[Host, Int], 
                       visitedLinks: Set[Url], 
                       inProgress: Set[Url], 
                       workers: Map[Host, WorkerData])


When a new worker process is created, we have to store the fiber on which it is running:

def workerFor(data: CrawlerData, url: Host): Task[(CrawlerData, MQueue[Url])] = {
  data.workers.get(url) match {
    case None =>
      val workerQueue = MQueue.make[Url]
      worker(workerQueue, crawlerQueue).map { workerFiber =>
        val workerData = WorkerData(workerQueue, workerFiber)
        val data2 = data.copy(workers = data.workers + (url -> workerData))
        (data2, workerQueue)
      }
    case Some(wd) => Task.now((data, wd.queue))
  }
}


And once the computation is done, all fibers need to be canceled. This manual fiber management complicates the Task construction slightly when we know that we are done with the crawling and want to return the result:

crawlerQueue.take.flatMap { msg =>
  handleMessage(msg, data).flatMap { data2 =>
    if (data2.inProgress.isEmpty) {
      data2.workers.values.map(_.fiber.cancel).toList.sequence_
        .map(_ => data2.referenceCount)
    } else {
      crawler(crawlerQueue, data2)
    }
  }
}


The data2.workers.values.map(_.fiber.cancel).toList.sequence_ creates a Task description that cancels all the fibers (Fiber.cancel: Task[Unit]) in sequence and then returns the final result.

Otherwise, the code is very similar to the ZIO implementation. Here’s the full source for you to browse.

Both the Scalaz and Monix implementations come with tests that simulate deep and wide chains of crawled links. This way we can verify that the solutions are not only correct, but also stack-safe.

Sockets Example

The repository also contains another example, called sockets. It shows how to deal with two common problems:

  1. interfacing with a legacy, blocking API. Here, we have a server socket (Socket) with a blocking and exception-throwing accept method and client sockets (ConnectedSocket) with blocking send/receive methods.
  2. broadcasting messages to a large number of clients. This is a common requirement, e.g. when dealing with WebSockets.

All examples use several processes:

  • the router process (Actor/Behavior/Task/IO) manages the server socket and broadcasts messages received from any connected client sockets to all other connected client sockets.
  • the socket process accepts new client connections, which results in new instances of a ConnectedSocket.
  • the client send/receive processes are created for each client ConnectedSocket and send a message or listen for new ones.

If, at any time, a SocketTerminatedException is thrown by a client socket send/receive operation, the client socket needs to be closed and removed from the router.

The code is constructed in the same way as before, with no significant new ideas being introduced. Still, it might be educational to explore the code on your own. As in the other examples, there’s also a test suite which might be useful for verifying that the code actually works.

Summary

In this part, we’ve built on the ideas presented in the introductory article, adding communication to our asynchronous processes. As in the last part, the overall structure of the code for all of the different implementations isn’t that different. There are significant differences in type safety and the exact semantics of the constructed objects, but the way communication is performed, via asynchronous message passing, is the same.

It’s quite easy to identify how concepts from actors can be mirrored to the ZIO/Monix worlds. In Akka, each actor is associated with a mailbox, which is a queue of incoming messages. In Monix/ZIO, if we need to model communication, we need to create a queue.

While in Akka we pass around (typed or untyped) ActorRefs, so that one actor can send messages to another actor, in ZIO we pass around (typed) IOQueues and in Monix MQueues or MVars, depending on the use case. Again, this is not that different.
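
The rough correspondence can be summed up as follows (a sketch, glossing over the exact types used in each implementation):

// Akka:        crawler: ActorRef                       send with: crawler ! msg
// Akka Typed:  crawler: ActorRef[CrawlerMessage]       send with: crawler ! msg (type-checked)
// ZIO:         crawlerQueue: IOQueue[CrawlerMessage]   send by composing: crawlerQueue.offer(msg)
// Monix:       crawlerQueue: MQueue[CrawlerMessage]    send by composing: crawlerQueue.offer(msg)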

This gives us another piece of the answer to the questions stated in the first part of the series: can ZIO/Monix offer an alternative to Akka/Akka Typed? As far as state management and communication are concerned, yes. Keep in mind, however, that we are looking at a small portion of what Akka is. While things like remoting, clustering, or persistence could be implemented using the ZIO/Monix approach, there are no libraries that implement these functionalities (at least not yet).

In the final part, we’ll look at failure management, supervision, and cancellation. Stay tuned!

