Async Akka-Persistence-HBase Journal
Since I had a bit of time during the weekend, I decided to do something useful for Akka this time. I’ve recently been working mostly with HBase and Scalding, so picking up the new Akka Persistence APIs and implementing an HBase plugin for them was a great idea for an afternoon or two.
Akka Persistence is basically the eventsourced library; it was recently announced that “eventsourced becomes akka-persistence“. As the original name implies, it’s a set of tools on top of Akka that help you build event sourcing based apps. One of the most typical elements of such apps is “some actor, that gets messages, and can play these back in case of failure or if we’d have to restart the system”. This part of the APIs is described really well in the new docs about akka-persistence. The ideas we’ll need to focus on for the sake of this blog post are Processors, Persistent Messages and Journals.
Quick side-note: The APIs are still marked as experimental, so they might change. But from what I’ve seen so far they already seem really nice, so I don’t expect too many changes over time.
To introduce them in a few words: Processors are Stateful Actors. Messages sent to them (Persistent Messages, so messages wrapped in the Persistent() envelope) contain both the payload (the “actual message”) and a Sequence Number. The Sequence Number is important for the later replay of messages. A Persistent Message gets persisted in a Journal before it reaches a Processor’s receive method, which guarantees that we store all messages an actor gets, even if the system blows up “while it is processing that message”. Thanks to storing messages in a Persistent Journal, we’re able to spin up a new actor with the same processor identifier, and before it resumes getting new messages, the system will “replay” the previous messages onto it. After the replay (or “recovery”) finishes, it will continue receiving new messages as usual.
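To make the persist-then-replay flow concrete, here is a minimal sketch in plain Scala. It deliberately does not use the akka-persistence API: `PersistentMsg`, `InMemoryJournal` and `RecordingProcessor` are hypothetical stand-ins invented for this illustration, and the journal lives in memory only.

```scala
import scala.collection.mutable

// Toy stand-ins, NOT the akka-persistence API: all names here are hypothetical.
final case class PersistentMsg(payload: String, sequenceNr: Long)

// An in-memory "journal": messages are appended per processor id.
final class InMemoryJournal {
  private val store = mutable.Map.empty[String, Vector[PersistentMsg]]

  // Assign the next sequence number (starting at 1) and store the message.
  def write(processorId: String, payload: String): PersistentMsg = {
    val existing = store.getOrElse(processorId, Vector.empty)
    val msg = PersistentMsg(payload, existing.size + 1L)
    store.update(processorId, existing :+ msg)
    msg
  }

  // Feed every stored message for this id to the callback, in sequence order.
  def replay(processorId: String)(callback: PersistentMsg => Unit): Unit =
    store.getOrElse(processorId, Vector.empty).foreach(callback)
}

// A stateful "processor": its state is simply the payloads it has handled.
final class RecordingProcessor(val processorId: String, journal: InMemoryJournal) {
  private var seen = Vector.empty[String]

  // Recovery: replay what the journal holds for this id before taking new messages.
  journal.replay(processorId)(handle)

  // Persist first, then hand the persisted message to the handler.
  def send(payload: String): Unit = handle(journal.write(processorId, payload))

  private def handle(msg: PersistentMsg): Unit = {
    seen = seen :+ msg.payload
  }

  def state: Vector[String] = seen
}
```

Creating a second `RecordingProcessor` with the same `processorId` against the same journal rebuilds the state of the first one, which is the recovery behaviour described above in miniature.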
That’s the general idea – there’s more, like for example “snapshots” (which shorten the recovery time), but for today let’s not talk about them (and I didn’t yet get to implementing them in akka-persistence-hbase).
The default Journal used by Akka is LevelDB, a very fast local database developed by Google (for Chrome I believe?) some time ago. As great as it is, you’ll probably want a distributed store to back your Akka app. That’s where ktoso/akka-persistence-hbase (that’s mine) or krasserm/akka-persistence-cassandra (developed by Martin, who’s working on Akka/Eventsourced) comes in, if you’re in the Cassandra fanbase.
Using these “storage plugins” you’re able to back your Persistent Journal with whichever datastore you prefer. Implementing these is API-wise quite simple, as there are just a few methods you need to implement by extending AsyncWriteJournal:
    def writeAsync(persistentBatch: immutable.Seq[PersistentRepr]): Future[Unit]

    def deleteAsync(processorId: String, fromSequenceNr: Long, toSequenceNr: Long, permanent: Boolean): Future[Unit]

    def confirmAsync(processorId: String, sequenceNr: Long, channelId: String): Future[Unit]

    // and "replay":
    def replayAsync(processorId: String, fromSequenceNr: Long, toSequenceNr: Long)
                   (replayCallback: (PersistentRepr) => Unit): Future[Long]
For example, this is how you can implement the writeAsync method:
    override def writeAsync(persistentBatch: Seq[PersistentRepr]): Future[Unit] = {
      log.debug(s"Write async for ${persistentBatch.size} persistent messages")

      // one Put per message; each put completes asynchronously
      val futures = persistentBatch map { p =>
        import p._
        executePut(
          RowKey(processorId, sequenceNr).toBytes,
          Array(ProcessorId, SequenceNr, Marker, Message),
          Array(toBytes(processorId), toBytes(sequenceNr), toBytes(AcceptedMarker), persistentToBytes(p))
        )
      }

      // Future.sequence yields Future[Seq[Unit]]; collapse it to the Future[Unit] the signature promises
      Future.sequence(futures).map(_ => ())
    }

    protected def executePut(key: Array[Byte], qualifiers: Array[Array[Byte]], values: Array[Array[Byte]]): Future[Unit] = {
      val request = new PutRequest(TableBytes, key, Family, qualifiers, values)
      client.put(request) // implicitly converted from Deferred => Future[Unit]
    }
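The “implicitly converted from Deferred => Future[Unit]” comment hides a small but important step: bridging a callback-style result into a Scala Future. The sketch below shows one common way to do that with a Promise. It is not the plugin’s actual conversion; `CallbackResult` is a hypothetical trait standing in for the driver’s deferred type, so that the example stays self-contained.

```scala
import scala.concurrent.{Future, Promise}

// Hypothetical callback-style handle, standing in for a driver's deferred result type.
trait CallbackResult[A] {
  def onSuccess(f: A => Unit): Unit
  def onFailure(f: Throwable => Unit): Unit
}

object CallbackToFuture {
  // Complete a Promise from the callbacks and expose its Future.
  // The result value is discarded because a journal write only cares about completion.
  def toUnitFuture[A](result: CallbackResult[A]): Future[Unit] = {
    val promise = Promise[Unit]()
    result.onSuccess(_ => promise.trySuccess(()))
    result.onFailure(error => promise.tryFailure(error))
    promise.future
  }
}
```

Using `trySuccess` and `tryFailure` rather than `success` and `failure` means a misbehaving callback source that fires twice does not throw; the first completion wins.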
If you’re curious, or… stuck with a database that does not have an async driver, you can implement the same functionality by extending SyncWriteJournal, which has the same methods, but with both “async” and “Future[_]” stripped from the method signatures… ;-) At first I also implemented an HBaseSyncWriteJournal; if you’re curious, you can check it out on GitHub.
Once you get that, the rest is only performance optimizations and hot-region avoidance (here a nice explanation of the problem – “region-server hotspotting“), which I’ve currently implemented using “region salting” (I prefix row keys with n % partitionCount, where partitionCount >= regionCount).
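As a rough illustration of the salting idea, the sketch below builds salted row keys as strings. It assumes that n is the message’s sequence number and uses a made-up key layout (`<salt>-<processorId>-<sequenceNr>`); the plugin’s real RowKey encoding may differ, and `SaltedRowKey` is a hypothetical helper.

```scala
object SaltedRowKey {
  // Assumption: the salt is derived from the sequence number.
  def salt(sequenceNr: Long, partitionCount: Int): Long = sequenceNr % partitionCount

  // Hypothetical layout: zero-padded salt, processor id, zero-padded sequence number.
  def rowKey(processorId: String, sequenceNr: Long, partitionCount: Int): String = {
    require(partitionCount > 0, "partitionCount must be positive")
    f"${salt(sequenceNr, partitionCount)}%03d-$processorId-$sequenceNr%020d"
  }

  // Messages of one processor are now spread over all partitions,
  // so a replay has to scan one key prefix per partition.
  def scanPrefixes(processorId: String, partitionCount: Int): Seq[String] =
    (0 until partitionCount).map(p => f"$p%03d-$processorId-")
}
```

The trade-off is visible in `scanPrefixes`: consecutive writes land in different key ranges, which spreads write load, but reading a processor’s messages back in order means scanning every partition and merging the results by sequence number.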
All in all, the current 0.2 release includes the fully async HBase journal implementation, and I’ll improve the replay performance soon too. You’re very welcome to take a look and give it a spin (it’s on Maven Central)!
PS: I think I will have to trim this blog post, it’s gotten a bit too-many-words-too-little-code…
Published at DZone with permission of Konrad Malawski, DZone MVB. See the original article here.