Testing Leader Election (in Raft) using Akka, without Thread.sleep() ;-)
I’m currently living in London and miss the Kraków SCKRK Reading Club meetings a lot (they’re basically the best thing since sliced bread!). Even though I still attend them remotely nowadays, I miss having proper discussions on science papers. The same feeling struck some friends (Andrzej from GeeCON, to name one), so we decided to start a similar initiative in London. It’s called Paper Cup, and it’s a “reading club”, which means that we select Computer Science whitepapers to read, read them at home, and then discuss them fiercely during a meeting :-)
For this week’s meeting we decided to read Raft (the “In Search of an Understandable Consensus Algorithm” paper), which is “very trendy” right now (and relatively fresh; the paper is from Oct 2013!). To sum it up quickly: it’s an algorithm trying to solve the same problem as Paxos (achieving consensus in distributed systems), while being understandable at the same time. We did read Paxos more than a few times (Lamport’s “Paxos Made Simple” paper, for example), but it was always a challenge to get everything right with it. Raft’s authors, on the other hand, really wanted the algorithm to be understandable, while still being “good enough” performance-wise.
As the algorithm is really nice, and described in terms of a simple State Machine, I figured I’d implement it using Akka (and FSM) for the next meeting, while another friend is implementing it in Clojure. To be fair, it seems I’m not the first person to have figured out that Akka’s FSM, and Actors in general, make this super fun to implement: there are already some repos doing exactly that (well, some of them being just an “initial commit” ;-)). In this post I won’t be focusing on the algorithm itself. Instead, I’d like to share a technique that is useful when you implement a thing like this and want to write a “big test” that asserts, from a high-level point of view, that the system performs as you’d expect.
The test case we’ll talk about is very simple:
- Raft should elect a leader right away after the nodes start
- Raft should re-elect a leader if we kill the current one
As I already said, we won’t be diving into the algorithm here – if you’re curious you can just read the whitepaper (and maybe even drop by for PaperCup? :-)).
Let’s jump right into the tests:
```scala
it should "elect initial Leader" in {
  // given
  info("Before election: ")
  infoMemberStates()

  // when
  awaitElectedLeader()
  info("After election: ")
  infoMemberStates()

  // then
  members.count(_.stateName == Leader) should equal (1)
  members.count(_.stateName == Candidate) should equal (0)
  members.count(_.stateName == Follower) should equal (4)
}

it should "elect replacement Leader if current Leader dies" in {
  // given
  infoMemberStates()

  // when
  val leaderToStop = leader.get
  leaderToStop.stop()
  info(s"Stopped leader: ${simpleName(leaderToStop)}")

  // then
  awaitElectedLeader()
  info("New leader elected: ")
  infoMemberStates()

  members.count(_.stateName == Leader) should equal (1)
  members.count(_.stateName == Candidate) should equal (0)
  members.count(_.stateName == Follower) should equal (3)
}
```
Each member is running an instance of Raft, and one of them will (by election) become the Leader after some time. One way to test this would be to plant a TestProbe among the “members to notify”, or to use the probe as an intermediary between members (A -> Probe -> B). But that’s a lot of work, to be honest. The solution I’m proposing here is also used by akka-persistence (well, and by my akka-persistence-hbase plugin for it too). Let’s call it “Event Stream Subscription Awaiting”. Now that we have a fancy name for it, let’s continue with implementing it.
Let’s start from the end of the story. I want to block until some event occurs in the system (the awaitElectedLeader() method does this). Where can I fish for those events? It turns out Akka has a built-in event bus, the EventStream, ready to publish messages into (and it’s available on every ActorSystem, without any additional setup). Let’s first implement our helper methods for awaiting an ElectedAsLeader message:
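```scala
import scala.concurrent.duration._ // needed for 3.seconds

implicit val probe = TestProbe() // somewhere in the test class

// Ask the EventStream to deliver every ElectedAsLeader event to the probe.
def subscribeElectedLeader()(implicit probe: TestProbe): Unit =
  system.eventStream.subscribe(probe.ref, classOf[ElectedAsLeader])

// Block until such an event arrives; fail the test after 3 seconds.
def awaitElectedLeader()(implicit probe: TestProbe): Unit =
  probe.expectMsgClass(max = 3.seconds, classOf[ElectedAsLeader])
```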
Here (in a test class extending TestKit) we can get access to the EventStream and subscribe to specific types of messages on it. We’ll be using a TestProbe() to receive these events, because it allows us to use the expectMsg* methods, which is exactly what we need. So in awaitElectedLeader() we just wait, using the probe, until the message saying “some Leader has appeared in the cluster” comes in. So far this is not so different from what I described before and called “a lot of work”. Now we’ll get to the actual trick, though. The eventStream you see here is defined on the ActorSystem and, as we know, the ActorSystem is also accessible from within an Actor itself.
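To see the subscription mechanism in isolation, here is a minimal sketch, assuming a reasonably recent classic Akka version with akka-testkit on the classpath. The `ElectedAsLeader` case class with its `name` field and the `EventStreamAwaitSketch` object are stand-ins invented for this example, not the types from the Raft implementation. The event is also published by hand here, whereas in the real test the actors are the ones publishing.

```scala
import akka.actor.ActorSystem
import akka.testkit.TestProbe
import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical stand-in for the real Raft message.
final case class ElectedAsLeader(name: String)

object EventStreamAwaitSketch {
  def publishAndAwait(): ElectedAsLeader = {
    implicit val system: ActorSystem = ActorSystem("event-stream-sketch")
    try {
      val probe = TestProbe()
      // Only events of this class (or its subclasses) reach the probe.
      system.eventStream.subscribe(probe.ref, classOf[ElectedAsLeader])
      // In the real test the Raft actors publish this; here we do it by hand.
      system.eventStream.publish(ElectedAsLeader("member-1"))
      // Blocks until a matching message arrives, or fails after 3 seconds.
      probe.expectMsgClass(3.seconds, classOf[ElectedAsLeader])
    } finally {
      Await.result(system.terminate(), 10.seconds)
    }
  }
}
```

The point of the sketch is that the probe never needs a reference to whoever publishes the event; it only needs the ActorSystem they share.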
In akka-persistence, for example, Actors publish events about the progress of the replay, confirmations, etc. That is only needed for testing, so you have to enable a flag (persistence.publish-confirmations, for example) to make it publish events to the eventStream. That’s a great idea that makes testing very simple, and it would certainly be possible to do in my Raft implementation (which will probably end up like this anyway). For now, however, let’s think about how we could extend the receive of an Actor to automatically send incoming messages to the EventStream as well.
It’s very simple actually, if you know about Actor’s aroundReceive method. Just like its name implies (sounds very much like AOP by the way, doesn’t it? ;-)), it allows you to wrap the receive call of an actor with your own code. Our implementation will simply send all messages to the eventStream:
```scala
/**
 * Use for testing.
 *
 * Forwards all messages received to the system's EventStream.
 * Use this to deterministically `awaitElectedLeader` etc.
 */
// Extends Actor (rather than using a self-type) so that `override`
// has a member to override.
trait EventStreamAllMessages extends Actor {

  override def aroundReceive(receive: Actor.Receive, msg: Any): Unit = {
    // Publish first, then let the actor handle the message as usual.
    context.system.eventStream.publish(msg.asInstanceOf[AnyRef])
    receive.applyOrElse(msg, unhandled)
  }
}
```
Easy! Now, we don’t want to change our production code (or maybe it’s not our code at all), so during Actor creation we can mix this trait into our RaftActor on the spot (during the test setup, which I briefly mentioned but didn’t show explicitly):
```scala
system.actorOf(Props(new RaftActor with EventStreamAllMessages))
```
And this Actor will now automatically send all messages to the EventStream. On the event stream we subscribe to ElectedAsLeader events using our probe, and the expectMsg part we already talked about. So… that’s it! A very simple method to test actors “from the outside”. Now you can look back at the first code snippet in this post to see how this all fits together.
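Put together, the whole technique can be sketched end to end as follows. This assumes a recent classic Akka with akka-testkit available. `BecomeLeader`, `QuietMember` and `MixinSketch` are hypothetical names made up for the example, and the actor is deliberately trivial rather than a real Raft member. The trait here extends Actor so that `override` has a member to override.

```scala
import akka.actor.{Actor, ActorSystem, Props}
import akka.testkit.TestProbe
import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical message and actor, standing in for the real Raft ones.
case object BecomeLeader

class QuietMember extends Actor {
  // Handles the message but never tells anyone about it.
  def receive: Receive = { case BecomeLeader => () }
}

// Publishes every incoming message to the EventStream, then handles it.
trait EventStreamAllMessages extends Actor {
  override def aroundReceive(receive: Actor.Receive, msg: Any): Unit = {
    context.system.eventStream.publish(msg.asInstanceOf[AnyRef])
    receive.applyOrElse(msg, unhandled)
  }
}

object MixinSketch {
  def observeFromOutside(): Any = {
    implicit val system: ActorSystem = ActorSystem("mixin-sketch")
    try {
      val probe = TestProbe()
      system.eventStream.subscribe(probe.ref, BecomeLeader.getClass)

      // The "production" actor is untouched; the trait is mixed in here only.
      val member =
        system.actorOf(Props(new QuietMember with EventStreamAllMessages))
      member ! BecomeLeader

      // The probe sees the message although the actor never sent it anywhere.
      probe.expectMsg(3.seconds, BecomeLeader)
    } finally {
      Await.result(system.terminate(), 10.seconds)
    }
  }
}
```

Because the mixin happens at the `Props` call site, the test decides which actors are observable, and the production class stays free of test hooks.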
All in all, if you’re building anything using Actors, or maybe even a library someone might want to test, it’s a great idea to provide a setting that makes your Actors send events to the EventStream, because then it’s easy to track what’s happening deep down in the system, even from the outside. I’m pretty sure I will include such “enableable” events in the libraries I’m currently building, in order to be nice to the libs’ users. ;-)
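Such an “enableable” event could look roughly like the sketch below, again assuming a recent classic Akka with akka-testkit. The config key `raft.publish-testing-events` is invented for this example (it is not an existing Akka or akka-persistence setting), as are the `Member` actor, the `WinElection` message and the `FlagSketch` object.

```scala
import akka.actor.{Actor, ActorSystem, Props}
import akka.testkit.TestProbe
import com.typesafe.config.ConfigFactory
import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical messages, standing in for the real Raft ones.
final case class ElectedAsLeader(name: String)
case object WinElection

class Member extends Actor {
  // Hypothetical setting; read once when the actor is created.
  private val publishEvents =
    context.system.settings.config.getBoolean("raft.publish-testing-events")

  def receive: Receive = {
    case WinElection =>
      // Only publish when the flag is enabled, e.g. in tests.
      if (publishEvents)
        context.system.eventStream.publish(ElectedAsLeader(self.path.name))
  }
}

object FlagSketch {
  def run(): ElectedAsLeader = {
    // Tests switch the flag on; production config would leave it off.
    val config = ConfigFactory
      .parseString("raft.publish-testing-events = true")
      .withFallback(ConfigFactory.load())
    implicit val system: ActorSystem = ActorSystem("flag-sketch", config)
    try {
      val probe = TestProbe()
      system.eventStream.subscribe(probe.ref, classOf[ElectedAsLeader])
      val member = system.actorOf(Props(new Member), "member-1")
      member ! WinElection
      probe.expectMsgClass(3.seconds, classOf[ElectedAsLeader])
    } finally {
      Await.result(system.terminate(), 10.seconds)
    }
  }
}
```

Compared with the mixin, this publishes only the events the library author chose to expose, at the price of a small amount of test-related code living in production.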
Published at DZone with permission of Konrad Malawski, DZone MVB. See the original article here.