Over a million developers have joined DZone.
{{announcement.body}}
{{announcement.title}}

How to Throttle Messages Using Akka FSM

DZone's Guide to

How to Throttle Messages Using Akka FSM

See how Akka FSM can help your application recover from a crash by introducing message throttling so your thread pools don't get flooded.

· Java Zone ·
Free Resource

Get the Edge with a Professional Java IDE. 30-day free trial.

A few days ago, I was having an issue where the rate of incoming message requests to a process was too high. All requests were thus being processed in the future. By default, every message request was getting processed in parallel. But this was too much parallelism. It was flooding the thread pool with a lot of simultaneous work. A better way to do it is to limit how much you are doing in parallel.

After researching a lot, I read that Akka FSM might help to throttle the messages. So, I tried to go with it.

The goal is to implement a message throttler on the basis of size, a piece of code that ensures that messages are not sent out at a very high rate.

According to the documentation, an FSM can be described as a set of relations in the form:

State(S) x Event(E) -> Actions (A), State(S’)

If we are in state S and the event E occurs, we should perform the actions A and make a transition to the state S’.

Abstract Idea

  1. We will make a throttler machine with two states: Waiting and Active.
  2. There will be two types of messages, incoming messages and Flush (a message that will start processing messages)
  3. Start with the Waiting state by using an empty Queue and accumulate incoming message requests.
  4. A Flush message will decide when to change the state from waiting to active.
  5. While changing state from Waiting to Active, process requests in onTransition.
  6. onTransition is a partial function that takes, as a pair of states as its input — the current state and the next state.
  7. From active, go to the Waiting state again after dequeuing processed requests.

Implementation

sealed trait State
case object Waiting extends State
case object Active extends State

case class Msg(a: Int)
case object Flush

case class StateData(queue: immutable.Queue[Msg])

class SizeBasedThrottler extends FSM[State, StateData] {
    startWith(Waiting, StateData(Queue.empty))

    onTransition {
        case Waiting -> Active =>
        nextStateData match {
            case StateData(queue) =>
            for(x <- queue) yield println(s"$curTime processing ${x.a} ") Thread.sleep(2000L) // used just to depict as real time problem take time to process request } } when(Active) { case Event(msg: Msg, _) =>
            println(s"$curTime at Active $msg" )
            goto(Waiting) using StateData(Queue(msg))//StateData.single(msg)
    }

    when(Waiting, stateTimeout = 2 seconds){
        case Event(msg: Msg, StateData(oldQueue)) =>
        val newQueue = oldQueue :+ msg
        println(s"$curTime at Idle $newQueue")
        stay() using StateData(newQueue)

        case Event(Flush, StateData(queue)) => goto(Active) using StateData(queue)

        case Event(StateTimeout, StateData(queue)) => goto(Active) using StateData(queue)

    }

    initialize()
}


Send the Flush message as soon as the number of requests reaches to the configured value. The processing actually happens in the transition Waiting – > Active. Probably the trickiest point is to not forget is that when FSM is in the state Active, new messages will arrive and should be processed by adding to a queue (or rather starting a new queue with the data from that message). There might be a chance that the size of the request never reaches to the configured value. For that, we have used StateTimeOut. If the message does not come for state's timeout period, process all requests.

The aim of this blog is to demonstrate how Akka FSM can be used to throttle the messages and rescue an application from a crash.

For the complete example, the code is available on GitHub.

References

  1. Akka FSM
  2. Throttling Messages in Akka

Get the Java IDE that understands code & makes developing enjoyable. Level up your code with IntelliJ IDEA. Download the free trial.

Topics:
scala ,akka actors ,throttling ,finite state machine ,java ,tutorial ,akka fsm

Published at DZone with permission of

Opinions expressed by DZone contributors are their own.

{{ parent.title || parent.header.title}}

{{ parent.tldr }}

{{ parent.urlSource.name }}