Over a million developers have joined DZone.
{{announcement.body}}
{{announcement.title}}

A Distributed Cache in Less Than 100 Lines of Code With Akka

DZone's Guide to

A Distributed Cache in Less Than 100 Lines of Code With Akka

Learn how you can resolve issues in distributed systems like Akka Cluster with Conflict-Free Replicated Data Types, or CRDTs.

Free Resource

Learn how modern cloud architectures use of microservices has many advantages and enables developers to deliver business software in a CI/CD way.

An in-memory cache is a crucial component in a modern software architecture. It can improve the throughput of a system. As a result, the user experience is increased while the operating cost is decreased due to efficient resource usages. The well-known open-source in-memory caches are Redis and Memcache. The latter does not support replication, and it is not a distributed caching system either. However, if you are already using Akka as your backend service, why bother having another system for caching when Akka could be the right tool for you.

Akka Cluster uses the Gossip protocol for communicating across members in a cluster. Akka Cluster is an extension of the Akka toolkit that extends the Akka remote functionality. You have to provide a set of seed nodes that act as contact points for new nodes. There is no election process for a leader node since it can be determined by any node when there is a gossip convergence.

The caching system building on top of Akka cluster can take advantage of the Gossip-based dissemination for data replication. The distributed nature may lead to data conflict when multiple nodes receive different values for the same key. This problem can be solved using Conflict-Free Replicated Data Types (CRDTs). If you’re interested, the paper is freely available online. With this algorithm, the concurrent updates can be performed in different nodes without any coordination. Why do we care? Without the magic merge, we cannot guarantee the eventual consistency. This post explains CRDTs and illustrates how it works.

Let’s Get Started

Luckily, we don’t have to implement CRDTs. It is just another Akka extension that you can add to your dependency list.

"com.typesafe.akka" %% "akka-distributed-data" % akkaVersion

The Actor model is a message passing concurrency. Thus, we have to define a protocol for communicating with Actor. Case class is a popular approach in Scala to define such protocol (actually case class is everywhere). In caching, we need at least 3 operations: get, put, and delete. These operations will be defined as case classes for the sake of type-safety. The akka-distributed-data library also has its own protocol, but we will hide that protocol under a user-defined protocol which is our protocol. The reason is that we don’t want to expose unnecessary complicate definitions to users. After all, we’re building a specific application on top of akka-distributed-data library.

class DistributedCacheReplicator[V <: Serializable](name: String) extends Actor with ActorLogging {
  val replicator = DistributedData(context.system).replicator
  implicit val cluster = Cluster(context.system)
  override def receive: Receive = {
    ...
  }
}

First, we have to create an Actor definition. It’s just a regular class that extends Actor class. Distributed Data is built on top of Akka Cluster. So, you have to create a cluster object and make it as an implicit val. The reason why the cluster value needs to be implicit is that it is required by distributed operations such as -and + defined in LWWMap. LWWMap is a specialized ORMap(Observed Remove Map in CRDT)with LWWRegister (Last Writer Wins Register in CRDT) values. Beside a cluster declaration, we have to create a replicator which is an instance of DistributedData. The rest of the implementation is merely defining a protocol handler in the receive function.

Of course, you need to add the following configurations to get the Akka cluster working. The configuration is pretty standard — I just copied it from the Akka documentation website.

akka {
  loglevel = "DEBUG"
  actor {
    provider = "cluster"
  }
  remote {
    log-remote-lifecycle-events = off
    netty.tcp {
      hostname = "127.0.0.1"
      port = 0
    }
  }

  cluster {
    seed-nodes = [
      "akka.tcp://DCacheSystem@127.0.0.1:2018",
      "akka.tcp://DCacheSystem@127.0.0.1:2019"]

    auto-down-unreachable-after = 10s
  }
}

The key configurations are akka.actor.provider and akka.cluster.seed-nodes. By default, the provider is local which means the actor system does not communicate with other actor systems. We have to set it to cluster in this case. There is another option which is remote. If we set it to remote, we’re responsible for maintaining the quorum ourself. Seed nodes are just the contact points for newcomers. When a new member joins a cluster, it needs to join through one of the seed nodes. Akka cluster is not a master-slave, so it doesn’t have a single point of failure. Put it simply, all seed nodes can go down and the cluster still operates normally except that a new node cannot join the cluster until one of the seed nodes goes up again. Seed nodes are like other nodes in a cluster. They are not special, but why do we need to define them in the configuration? That’s because a new node needs to know someone who can introduce it to the rest of the group, and the way to locate that person is by IP address, port, protocol, and system name.

Implementing the Actor’s Receiver

Since the distributed data can be used in many use cases, it is a good idea to have a prefix for a hash key. The hash key needs to be an instance of LWWMapKey, and we can create a utility function for instantiating a key as follows.

def bucketKey(entryKey: String): LWWMapKey[String, V] = {
  LWWMapKey(s"cache-$name:[$entryKey]")
}

The key prefix is cache and nameis referred to a cache name. Think about Redis strings which is a giant hashmap for the entire database and all elements have the same type. In this case, the key is a namespace for a sub-hashmap. Even though Redis supports other data types, we’re not going to complicate our requirements at the moment. Let’s stick to the goal and create an MVP (Minimum Viable Product) in less than 100 lines. 

Before implementing the receiver function, we have to define our cache protocols. Let’s create 3 case classes for internal use: cache context, cache element, and cache error. And, 3 more case classes for a user protocol: put, get, and delete operation as follows.

private[dcache] final case class CacheContext(key: String, replyTo: ActorRef)
private[dcache] final case class CacheElement[V <: java.io.Serializable](key: String, value: Option[V])
final class CacheOpException(msg: String, cause: Option[Throwable] = None) extends RuntimeException(msg, cause.orNull)

final case class PutToCache[V <: java.io.Serializable](key: String, value: java.io.Serializable, writeConsistency: WriteConsistency)
final case class GetFromCache(key: String, readConsistency: ReadConsistency)
final case class RemoveFromCache(key: String, writeConsistency: WriteConsistency)

WriteConsistency and ReadConsistency are the consistency level for writing and reading respectively. We can choose whether we want a strong consistency or eventually consistency per individual element, not an entire database. This flexibility gives us fine-grained control over the cache replication.

override def receive: Receive = {
  case PutToCache(key, value: V, writeConsistency) ⇒
    val key = bucketKey(key)
    val ctx = Some(CacheContext(key, sender()))
    replicator ! Update(key, LWWMap(), writeConsistency, ctx) (_ + (key → value))
  case GetFromCache(key, readConsistency) ⇒
    val key = bucketKey(key)
    val ctx = Some(CacheContext(key, sender()))
    replicator ! Get(key, readConsistency, ctx)
  case RemoveFromCache(key, writeConsistency) ⇒
    val key = bucketKey(key)
    val ctx = Some(CacheContext(key, sender()))
    replicator ! Update(key, LWWMap(), writeConsistency, ctx)(_ - key)
}

The actor model is all about message passing concurrency. Update and Get are case classes shipped with the akka-distributed-data library. We asynchronously send these case classes to the replicator, which is an ActorRef, to maximize the CPU utilization. To get the result back in asynchronous fashion, we have to pass the context containing the ActorRef (sender()) of the current actor as the third parameter of the case class. You may notice that the type parameter Vin the pattern matching for PutToCache will be eliminated at compile time due to Java type erasure. The compiler will print the warning message to let you know implicitly that the type check will be done at runtime and it might fail. We can use ClassTag to attach the type information to the bytecode by letting the Scala compiler rewrite the code for you. Actually we can prevent the runtime type error through a companion object by not exposing the opportunity to pass the wrong type at interface level; however, having a safety net provided by the compiler like ClassTag is always nice.

The results, as well as errors returning from the replicator, will also be handled by a message passing mechanism, as shown below.

case g @GetSuccess(_, Some(CacheContext(key, replyTo))) ⇒
  g.dataValue match {
    case data: LWWMap[_, _] ⇒ data.asInstanceOf[LWWMap[String, V]].get(key) match {
      case Some(value) ⇒
        replyTo ! CacheElement[V](key, Some(value))
      case None        ⇒
        replyTo ! CacheElement[V](key, None)
    }
  }
case GetFailure(_, Some(CacheContext(_, replyTo))) ⇒
  replyTo ! new CacheOpException("Get operation failed due to ReadConsistency")
case NotFound(_, Some(CacheContext(key, replyTo))) ⇒
  replyTo ! CacheElement(key, None)
case UpdateSuccess(_, Some(CacheContext(_, replyTo))) ⇒
  replyTo ! true
case UpdateTimeout(_, Some(CacheContext(_, replyTo))) ⇒
  replyTo ! new CacheOpException("Update operation timed out")
case ModifyFailure(_, msg, cause, Some(CacheContext(_, replyTo))) ⇒
  replyTo ! new CacheOpException(s"Update operation failed due to $msg", Some(cause))
case StoreFailure(_, Some(CacheContext(_, replyTo))) ⇒
  replyTo ! new CacheOpException("Durable stores error")

When the cache hit occurs, the replicator sends GetSuccess object to the clustered actor which contains a custom context. Again, this is the reason why we have self-ActorRef in the context. So that we can use that ActorRef as a channel to send a message back the caller. The message passing is used as an event callback in this case. The case classes represent events and partial functions (case statements) are callback functions. The event callback will be executed by a local node or the node that you send the message to. As a result, the caller will get only one copy of a message.

You may notice the ugly part here: case data: LWWMap[_, _] ⇒ data.asInstanceOf[LWWMap[String, V]]. Yes, it is a reflection. But, that’s not a problem. The problem is that this line matches with any types, and it throwsClassCastException at runtime when the assigning type is not compatible with the target type. However, we can also prevent this type of error at the interface level by not leaving an opportunity for a user to mess it up. If you’re paranoid about type-safety and really want the compiler works for you, you can use shapeless. It’s a magic library that takes care of the dirty job for you.

Taking Type-Safety to Another Level

Just in case you don’t mind having a large chunk of unreadable code that has nothing to do with your business logic, here is an example of handling the type safely using Typeable and TypeCasefrom shapeless. We don’t count the following code toward the 100 lines, though. I’m sure there will be an easier way to do this. Please share it in a comment below.

def makeLWWMapTypeable[A: Typeable, B: Typeable] =
  new Typeable[LWWMap[A, B]] {
    private val typA = Typeable[A]
    private val typB = Typeable[B]

    def cast(t: Any): Option[LWWMap[A, B]] = {
      t match {
        case o: LWWMap[_, _] ⇒
          o.entries.headOption match {
            case Some((a, b)) ⇒
              for {
                _ <- typA.cast(a)
                _ <- typB.cast(b)
              } yield o.asInstanceOf[LWWMap[A, B]]
            case _ ⇒ None
          }
        case _ ⇒ None
      }
    }

    def describe = s"LWWMap[${typA.describe}, ${typB.describe}]"
  }

implicit val typeableLWWMap = makeLWWMapTypeable[String, V]

val typedLWWMap = TypeCase[LWWMap[String, V]]

replace the line
case data: LWWMap[_, _] ⇒ data.asInstanceOf[LWWMap[String, V]].get(key)
with
case typedLWWMap(data) ⇒ data.get(key)

We also have to pass Typeable[V] as an implicit parameter because the actor class does not know the type of V and it needs the one who creates an actor instance with the concrete type to pass down the type information. Note that you also have to create one more custom Typeable if V is also a custom type.

Testing

The most fun and exciting part of programming is testing to see that our code is working. You can build an actual cluster that each member resides in a different node, but it’s not trivial to set it up even with docker. For the sake of testing, we can just run multiple VMs on a local machine. You can do so right within your favorite IDE.

object TestCache extends App {

  implicit class FutureAwait[T](f: Future[T]) {
    def await(): T = Await.result(f, Duration.Inf)
  }

  val port = StdIn.readLine("Port:").toInt

  val portOverride = s"akka.remote.netty.tcp.port=$port"
  val config = ConfigFactory.parseString(portOverride).
    withFallback(ConfigFactory.load())

  implicit val system = ActorSystem("DCacheSystem", config)

  val cache = DistributedCache[String]("test")

  while(true) {
    val input = StdIn.readLine("put {key} {value} | get {key}: ")
    val parts = input.split("\\W+")
    if (input.startsWith("put")) {
      parts match {
        case Array("put", key, value) ⇒
          cache.put(key, value).await()
          println("Updated successfully!")
        case _ ⇒ println("Invalid syntax!")
      }
    } else {
      parts match {
        case Array("get", key) ⇒
          val result = cache.get(key, ReadAll(1.minute)).await()
          println(s"Result $key = $result")
        case _ ⇒ println("Invalid syntax!")
      }
    }
  }
}

Let’s hit that green "run" button 3 times to get 3 different nodes running on different ports. Remember that we have assigned ports 2018 and 2019 as seed nodes in the Akka configuration. We have to run at least one seed node to make the cluster joinable. Otherwise, you will have 3 one-member clusters.

                                            Figure 1. The seed node running on port 2018.

Once the app starts, it asks for the port number. From Figure 1, we entered “2018” as one of the seed node’s port we have defined. We tried to get a value out of an empty cache by entering get notexist and it returned None as a result. Then, we put a new element into the cache with name as a key.

                                        Figure 2. Another seed node running on port 2019.

From Figure 2, we started another seed node on port 2019. The replicator should replicate data across members in the cluster. So, we expected to get Alice as a result back when we queried using name as a key. Then we updated the value of the name element to Bob. The updated value should be replicated to all nodes.

                                    Figure 3. A regular node running on a random port.

We have another node running on a random available port as shown in Figure 3. We queried the cache by using name as a key again. This time we got Bobinstead of Alice.

Because I tested the distribute cache manually, there is no way I can run into a temporary stale data caused by eventual consistency. You got the idea, the replication process takes a fraction of a second to keep all nodes up-to-date when all nodes are running in the same machine. I’ll leave it to some other time to benchmark this solution against other popular distributed cache systems such as Ehcache, Hazelcast, and Redis. Also note that Akka is a toolkit, a building block, and it is not built or optimized for the cache use case. It takes much more than 100 lines to build a good cache that has features the cache should have such as TTL, sharding, scripting, among others which are very difficult to implement correctly and efficiently. We just scratch the surface of the cache area here, but it can be enough, depending on the requirements.

Discover how to deploy pre-built sample microservices OR create simple microservices from scratch.

Topics:
scala ,akka ,distributed system ,performance ,cache ,clusters ,tutorial

Published at DZone with permission of

Opinions expressed by DZone contributors are their own.

{{ parent.title || parent.header.title}}

{{ parent.tldr }}

{{ parent.urlSource.name }}