Akka Notes: ActorSystem Configuration and Scheduling


As we saw in the previous posts, we can create an Actor using the actorOf method of the ActorSystem. There's actually much more you can do with the ActorSystem. In this write-up, we'll touch upon just the configuration and the scheduling bits.

Let's look at the subsets of methods available in the ActorSystem.

[Figure: Actor API]

1. CONFIGURATION MANAGEMENT

Remember the application.conf file we used for configuring our log level in the previous write-up? This configuration file is just like the .properties files in Java applications, and much more. We'll soon see how we can use this configuration file to customize our dispatchers, mailboxes etc. (I am not even close to doing justice to the power of Typesafe Config. Please go through some examples to really appreciate its awesomeness.)

So, when we create the ActorSystem using the ActorSystem object's apply method without specifying any configuration, it looks for application.conf, application.json and application.properties in the root of the classpath and loads them automatically.

So,

val system=ActorSystem("UniversityMessagingSystem") 

is the same as

val system=ActorSystem("UniversityMessagingSystem", ConfigFactory.load())

As evidence, check out the apply method in ActorSystem.scala:

def apply(name: String, config: Option[Config] = None, classLoader: Option[ClassLoader] = None, defaultExecutionContext: Option[ExecutionContext] = None): ActorSystem = {
  val cl = classLoader.getOrElse(findClassLoader())
  val appConfig = config.getOrElse(ConfigFactory.load(cl))
  new ActorSystemImpl(name, appConfig, cl, defaultExecutionContext).start()
}

A. OVERRIDING DEFAULT CONFIGURATION

If you are not keen on using application.conf (as in test cases), or would like to have your own custom configuration file (as when testing against different configurations or deploying to different environments), you are free to override it by passing in your own configuration instead of the one from the classpath.

ConfigFactory.parseString is one option:

val actorSystem=ActorSystem("UniversityMessageSystem", ConfigFactory.parseString("""akka.loggers = ["akka.testkit.TestEventListener"]""")) 

or, in your test case, simply:

class TeacherTestLogListener extends TestKit(ActorSystem("UniversityMessageSystem", ConfigFactory.parseString("""akka.loggers = ["akka.testkit.TestEventListener"]""")))
with WordSpecLike
with MustMatchers
with BeforeAndAfterAll {

There's also ConfigFactory.load, which loads a named configuration file from the classpath:

val system = ActorSystem("UniversityMessageSystem", ConfigFactory.load("uat-application.conf")) 

If you need access to your own config parameters at runtime, you can get them through the config API like so:

val system=ActorSystem("UniversityMessageSystem", ConfigFactory.parseString("""akka.loggers = ["akka.testkit.TestEventListener"]""")) 
println (system.settings.config.getValue("akka.loggers")) // Results in > SimpleConfigList(["akka.testkit.TestEventListener"])
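getValue hands back the raw ConfigValue wrapper. For your own settings, the typed getters on Config are usually handier. Here is a minimal sketch using plain Typesafe Config; the university.* keys are made up for this example and are not part of the project. With an ActorSystem, the same calls should work on system.settings.config.

```scala
import com.typesafe.config.{Config, ConfigFactory}

object CustomSettingsDemo {
  // The university.* keys are hypothetical, invented for this sketch
  val config: Config = ConfigFactory.parseString("""
    akka.loggers = ["akka.testkit.TestEventListener"]
    university {
      name = "Example University"
      max-students = 30
    }
  """)

  // Typed getters return plain values instead of ConfigValue wrappers
  val name: String = config.getString("university.name")
  val maxStudents: Int = config.getInt("university.max-students")
  val firstLogger: String = config.getStringList("akka.loggers").get(0)

  // hasPath lets us guard optional keys instead of catching an exception
  val hasMotto: Boolean = config.hasPath("university.motto")

  def main(args: Array[String]): Unit = {
    println(s"$name takes up to $maxStudents students")
    println(s"first logger: $firstLogger, motto configured: $hasMotto")
  }
}
```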

B. EXTENDING DEFAULT CONFIGURATION

Other than overriding, you could also extend the default configuration with your custom configuration using the withFallback method of the Config.

Let's say your application.conf looks like :

akka {
  loggers = ["akka.event.slf4j.Slf4jLogger"]
  loglevel = DEBUG
  arun = "hello"
}

and you decide to override the akka.loggers property like :

val config=ConfigFactory.parseString("""akka.loggers = ["akka.testkit.TestEventListener"]""")
val system=ActorSystem("UniversityMessageSystem", config.withFallback(ConfigFactory.load()))

You end up with a merged configuration of both :

println (system.settings.config.getValue("akka.arun")) //> ConfigString("hello")
println (system.settings.config.getValue("akka.loggers")) //> SimpleConfigList(["akka.testkit.TestEventListener"])

So, why did I tell this whole story about configuration? Because the ActorSystem is the one that loads and provides access to all the configuration information.


IMPORTANT NOTE :

Watch out for the order of the fallback here: which is the default and which is the overriding configuration. Remember, your custom configuration has to fall back to the default configuration. So,

config.withFallback(ConfigFactory.load()) 

would work

but

ConfigFactory.load().withFallback(config) 

would not give you the results that you need, because the values loaded from application.conf would then take precedence over your custom ones.
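To see the ordering in action without spinning up an ActorSystem, here is a small sketch with plain Typesafe Config. The defaults value is parsed from a string as a stand-in for what ConfigFactory.load() would read from application.conf; that substitution is an assumption made to keep the example self-contained.

```scala
import com.typesafe.config.{Config, ConfigFactory}

object FallbackOrderDemo {
  // Stand-in for ConfigFactory.load(), mirroring the application.conf above
  val defaults: Config = ConfigFactory.parseString("""
    akka {
      loggers = ["akka.event.slf4j.Slf4jLogger"]
      loglevel = DEBUG
      arun = "hello"
    }
  """)

  val overrides: Config =
    ConfigFactory.parseString("""akka.loggers = ["akka.testkit.TestEventListener"]""")

  // The receiver wins: overrides come first, defaults only fill the gaps
  val intended: Config = overrides.withFallback(defaults)

  // The receiver wins here too, so the defaults shadow our override
  val reversed: Config = defaults.withFallback(overrides)

  def main(args: Array[String]): Unit = {
    println(intended.getStringList("akka.loggers")) // [akka.testkit.TestEventListener]
    println(reversed.getStringList("akka.loggers")) // [akka.event.slf4j.Slf4jLogger]
    println(intended.getString("akka.arun"))        // hello
  }
}
```

In both cases the merged configuration contains every key. The order only decides who wins when a key exists on both sides.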


2. SCHEDULER

[Figure: scheduler method in ActorSystem]

As you can see from the API of ActorSystem, there is a powerful little method in ActorSystem called scheduler, which returns a Scheduler. The Scheduler has a variety of schedule methods with which we can do some fun stuff inside the Actor environment.

A. SCHEDULE SOMETHING TO EXECUTE ONCE

[Figure: Delayed Student Scheduler]

Taking our Student-Teacher example, assume our StudentActor wants to send a message to the teacher 5 seconds after receiving the InitSignal from our test case, and not immediately. Our code looks like:

// Needed for the `5 seconds` duration syntax
import scala.concurrent.duration._

class StudentDelayedActor (teacherActorRef:ActorRef) extends Actor with ActorLogging {

  def receive = {
    case InitSignal=> {
      // Brings the actor's dispatcher into scope as the implicit ExecutionContext
      import context.dispatcher
      // Send QuoteRequest to the teacher once, 5 seconds from now
      context.system.scheduler.scheduleOnce(5 seconds, teacherActorRef, QuoteRequest)
      //teacherActorRef!QuoteRequest
    }
    ...
    ...
  }
}

Testcase

Let's cook up a test case to verify this:

"A delayed student" must {

  "fire the QuoteRequest after 5 seconds when an InitSignal is sent to it" in {

    import me.rerun.akkanotes.messaging.protocols.StudentProtocol._

    val teacherRef = system.actorOf(Props[TeacherActor], "teacherActorDelayed")
    val studentRef = system.actorOf(Props(new StudentDelayedActor(teacherRef)), "studentDelayedActor")

    EventFilter.info (start="Printing from Student Actor", occurrences=1).intercept{
      studentRef!InitSignal
    }
  }

}

Increasing the timeout for EventFilter interception

Ouch. The test fails: the default timeout for the EventFilter to wait for the message to appear in the EventStream is 3 seconds, while our message shows up only after 5. Let's increase the timeout to 7 seconds to make our test case pass. The filter-leeway configuration property helps us achieve that.

class RequestResponseTest extends TestKit(ActorSystem("TestUniversityMessageSystem", ConfigFactory.parseString("""
  akka {
    loggers = ["akka.testkit.TestEventListener"]
    test {
      filter-leeway = 7s
    }
  }
  """)))
  with WordSpecLike
  with MustMatchers
  with BeforeAndAfterAll
  with ImplicitSender {
...
...

B. SCHEDULE SOMETHING TO EXECUTE REPEATEDLY

In order to execute something repeatedly, you use the schedule method of the Scheduler.

One of the frequently used overloads of the schedule method is the one which sends a message to an Actor on a regular basis. It accepts 4 parameters:

  1. How long the initial delay should be before the first execution begins
  2. Frequency of subsequent executions
  3. The target ActorRef that we are going to send a message to
  4. The Message

case InitSignal=> {
  import context.dispatcher
  context.system.scheduler.schedule(0 seconds, 5 seconds, teacherActorRef, QuoteRequest)
  //teacherActorRef!QuoteRequest
}
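One thing the snippet above leaves out: schedule returns a Cancellable, and a repeating timer keeps firing until somebody cancels it. Here is a rough sketch of one way to tie the timer to the actor's lifecycle. StudentPeriodicActor is a hypothetical name, and InitSignal and QuoteRequest are redeclared as stand-ins for the project's protocol messages so that the example compiles on its own. It also assumes an Akka version where schedule is still the method to use; in Akka 2.6 it is deprecated in favour of scheduleWithFixedDelay and scheduleAtFixedRate.

```scala
import akka.actor.{Actor, ActorRef, Cancellable}
import scala.concurrent.duration._

// Stand-ins for the project's protocol messages (assumption for this sketch)
case object InitSignal
case object QuoteRequest

// Hypothetical actor, not part of the original project
class StudentPeriodicActor(teacherActorRef: ActorRef) extends Actor {
  import context.dispatcher // implicit ExecutionContext for the scheduler

  // Handle to the running timer, if any
  private var ticker: Option[Cancellable] = None

  def receive = {
    case InitSignal =>
      // Cancel a previous timer so repeated InitSignals don't stack up
      ticker.foreach(_.cancel())
      ticker = Some(
        context.system.scheduler.schedule(0.seconds, 5.seconds, teacherActorRef, QuoteRequest))
  }

  // Stop the timer together with the actor
  override def postStop(): Unit = ticker.foreach(_.cancel())
}
```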

TRIVIA

The import context.dispatcher statement is very important here.

The schedule methods require a very important implicit parameter, an ExecutionContext. The reason becomes pretty obvious once we see the implementation of the schedule method:

final def schedule(
  initialDelay: FiniteDuration,
  interval: FiniteDuration,
  receiver: ActorRef,
  message: Any)(implicit executor: ExecutionContext,
                sender: ActorRef = Actor.noSender): Cancellable =
  schedule(initialDelay, interval, new Runnable {
    def run = {
      receiver ! message
      if (receiver.isTerminated)
        throw new SchedulerException("timer active for terminated actor")
    }
  })

The schedule method just wraps the tell in a Runnable, which is eventually executed by the ExecutionContext that we pass in.

To make an ExecutionContext available in scope as an implicit, we use the implicit dispatcher available on the context.

From ActorCell.scala (Context)

/**
* Returns the dispatcher (MessageDispatcher) that is used for this Actor.
* Importing this member will place an implicit ExecutionContext in scope.
*/
implicit def dispatcher: ExecutionContextExecutor
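Outside an Actor there is no context to import from, but the ActorSystem exposes an implicit dispatcher of its own. Here is a minimal sketch of scheduling from plain code, using the scheduleOnce overload that runs a block instead of sending a message. SchedulerOutsideActor and fireOnce are hypothetical names for this example, and terminate() assumes Akka 2.4 or later.

```scala
import akka.actor.ActorSystem
import scala.concurrent.{Await, Promise}
import scala.concurrent.duration._

object SchedulerOutsideActor {

  def fireOnce(): String = {
    val system = ActorSystem("UniversityMessageSystem")
    // The system's default dispatcher becomes the implicit ExecutionContext
    import system.dispatcher
    try {
      val fired = Promise[String]()
      // Overload that takes a block to run rather than a receiver and a message
      system.scheduler.scheduleOnce(100.millis) {
        fired.trySuccess("fired")
        ()
      }
      // Block only for the sake of the demo
      Await.result(fired.future, 3.seconds)
    } finally {
      system.terminate() // Akka 2.4+; on 2.3 this would be system.shutdown()
    }
  }

  def main(args: Array[String]): Unit = println(fireOnce()) // fired
}
```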

CODE

As always, the entire project can be downloaded from GitHub here.


Published at DZone with permission of Arun Manivannan, DZone MVB. See the original article here.
