
Async Akka-Persistence-HBase Journal


Since I had a bit of time during the weekend, I decided to do something useful for Akka this time. I've recently been working mostly with HBase and Scalding, so picking up the new Akka Persistence APIs and implementing an HBase plugin for them seemed like a great idea for an afternoon or two.

Akka Persistence is essentially the eventsourced library – it was recently announced that “eventsourced becomes akka-persistence“. As the original name implies, it's a set of tools on top of Akka to help create event-sourcing based apps. One of the most typical elements of such apps is “some actor that gets messages, and can play them back in case of failure or if we have to restart the system”. This part of the APIs is described really well in the new docs about akka-persistence. The ideas we'll need to focus on for the sake of this blog post are Processors, Persistent Messages and Journals.

Quick side-note: The APIs are still marked as experimental, so they might change. But from what I've seen so far they already seem really nice, so I don't expect too many changes over time.

To introduce them in a few words: Processors are stateful actors; messages sent to them (Persistent Messages – messages wrapped in the Persistent() envelope) contain both the payload (the “actual message”) as well as a sequence number. The sequence number is important for the later replay of messages. A Persistent Message gets persisted in a Journal before it reaches a Processor's receive method – this guarantees that we store all messages an actor gets, even if the system blows up “while it is processing that message”. Thanks to storing messages in a persistent Journal, we're able to spin up a new actor with the same processor identifier, and before it resumes getting new messages, the system will “replay” the previous messages onto it. After the replay (or “recovery”) finishes, it will continue receiving new messages as usual.
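The replay mechanics can be illustrated without Akka at all. The sketch below is plain Scala (it does not use the akka-persistence API – all names are mine), modelling a journal as an ordered sequence of (sequenceNr, payload) entries, and showing how a restarted “processor” rebuilds its state by folding the journaled messages back in, in order:

```scala
// Conceptual sketch only: models the journal/replay idea, not the akka-persistence API.
final case class JournalEntry(sequenceNr: Long, payload: String)

// A "processor" whose state is simply the count of messages seen
// and the highest sequence number reached.
final case class CounterState(highestSeqNr: Long, count: Int) {
  def receive(e: JournalEntry): CounterState =
    CounterState(e.sequenceNr, count + 1)
}

// Recovery: feed the journaled entries, in sequence-number order, into a fresh state.
def recover(journal: Seq[JournalEntry]): CounterState =
  journal.sortBy(_.sequenceNr).foldLeft(CounterState(0L, 0))(_ receive _)

val journal = Vector(
  JournalEntry(1, "created"),
  JournalEntry(2, "renamed"),
  JournalEntry(3, "archived")
)

// After recovery the processor is exactly where it left off.
val recovered = recover(journal)
```

With akka-persistence this replay happens for you automatically, driven by the journal plugin, before the processor starts receiving live messages.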

That’s the general idea – there’s more, like for example “snapshots” (which shorten the recovery time), but for today let’s not talk about them (and I didn’t yet get to implementing them in akka-persistence-hbase).

The default Journal used by Akka is LevelDB, a very fast local database developed by Google (for Chrome, I believe?) some time ago. As great as it is, you'll probably want a distributed store to back your Akka app. Thus ktoso/akka-persistence-hbase (that's mine), or krasserm/akka-persistence-cassandra (developed by Martin, who's working on Akka/Eventsourced) if you're in the Cassandra camp.

Using these “storage plugins” you’re able to back your Persistent Journal with whichever datastore you prefer. Implementing these is API-wise quite simple, as there are just a few methods you need to implement by extending AsyncWriteJournal:

def writeAsync(persistentBatch: immutable.Seq[PersistentRepr]): Future[Unit]
def deleteAsync(processorId: String, fromSequenceNr: Long, toSequenceNr: Long, permanent: Boolean): Future[Unit]
def confirmAsync(processorId: String, sequenceNr: Long, channelId: String): Future[Unit]
// and "replay":
def replayAsync(processorId: String, fromSequenceNr: Long, toSequenceNr: Long)
              (replayCallback: (PersistentRepr) => Unit): Future[Long]
Where the first three are the “basics” and the 4th gives us the “replay” functionality. It's worth pointing out that these APIs are designed with fully asynchronous implementations in mind (such as both linked examples). That is, even when going to the database – you never block. As working with Promises and Futures is so elegant in Scala, that's no problem Scala-wise, although you have to select a proper async database driver. Luckily there is such an implementation for HBase – although it's not the “official Java API” (it works at the RPC level directly) – it's called OpenTSDB/asynchbase.
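The bridging trick between such a driver and Scala's Futures is the usual Promise-based one: register a callback on the driver's deferred result and complete a Promise from it. Since asynchbase's Deferred is a Java class not shown here, the sketch below uses a minimal stand-in (FakeDeferred, my own name) purely to demonstrate the shape of the conversion:

```scala
import scala.concurrent.{Future, Promise}

// Minimal stand-in for an async driver's deferred result type, used only to
// show the bridging pattern. asynchbase's real class is
// com.stumbleupon.async.Deferred[T], with a Java callback API.
final class FakeDeferred[T] {
  private var onDone: Either[Throwable, T] => Unit = _ => ()
  def onComplete(f: Either[Throwable, T] => Unit): Unit = onDone = f
  def fire(result: Either[Throwable, T]): Unit = onDone(result)
}

// The conversion: complete a Promise from the deferred's callback.
def deferredToFuture[T](d: FakeDeferred[T]): Future[T] = {
  val p = Promise[T]()
  d.onComplete {
    case Right(value) => p.success(value)
    case Left(error)  => p.failure(error)
  }
  p.future
}

val d = new FakeDeferred[String]
val f = deferredToFuture(d)
d.fire(Right("put-ok")) // the driver completes the operation
```

Wrapping that conversion in an implicit is what lets a journal implementation return the driver's result directly where a Future[_] is expected.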

For example, this is how you can implement the writeAsync method:

  override def writeAsync(persistentBatch: Seq[PersistentRepr]): Future[Unit] = {
    log.debug(s"Write async for ${persistentBatch.size} persistent messages")
    val futures = persistentBatch map { p =>
      import p._
      executePut(
        RowKey(processorId, sequenceNr).toBytes,
        Array(ProcessorId,          SequenceNr,          Marker,                  Message),
        Array(toBytes(processorId), toBytes(sequenceNr), toBytes(AcceptedMarker), persistentToBytes(p))
      )
    }
    Future.sequence(futures) map (_ => ())
  }

  protected def executePut(key: Array[Byte], qualifiers: Array[Array[Byte]], values: Array[Array[Byte]]): Future[Unit] = {
    val request = new PutRequest(TableBytes, key, Family, qualifiers, values)
    client.put(request) // implicitly converted from Deferred => Future[Unit]
  }

If you’re curious, or… stuck with a database that does not have an async driver, you can implement the same functionality by extending SyncWriteJournal, which has the same methods, but with both “async” and “Future[_]” stripped from the method signatures… ;-) At first I also implemented a HBaseSyncWriteJournal, if you’re curious you can check it out on github.

Once you get that, the rest is only performance optimization and hot-region avoidance (here's a nice explanation of the problem – “region-server hotspotting“), which I've currently implemented using “region salting”: I prefix row keys with n % partitionCount, where partitionCount >= regionCount.
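As a rough sketch of what such salting looks like (the names and key layout here are illustrative, not the plugin's actual RowKey format):

```scala
// Sketch of row-key "salting": prefix the key with sequenceNr % partitionCount
// so that strictly sequential writes spread across regions instead of
// hammering the region that owns the "latest" key range.
val partitionCount = 16

def saltedRowKey(processorId: String, sequenceNr: Long): String = {
  val partition = sequenceNr % partitionCount
  // Zero-pad so keys sort lexicographically within a partition.
  f"$partition%03d-$processorId-$sequenceNr%020d"
}

// Consecutive sequence numbers land in different partitions:
val k1 = saltedRowKey("proc-1", 41)
val k2 = saltedRowKey("proc-1", 42)
```

The trade-off is that a replay for one processorId can no longer be a single sequential scan – it has to scan each partition and merge the results back into sequence-number order.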

All in all, the current 0.2 release includes the fully async HBase journal implementation, and I'll improve the replay performance soon too – you're very welcome to take a look and give it a spin (it's on Maven Central)!

PS: I think I will have to trim this blog post, gotten a bit too-many-words-too-little-code…

