DZone
Thanks for visiting DZone today,
Edit Profile
  • Manage Email Subscriptions
  • How to Post to DZone
  • Article Submission Guidelines
Sign Out View Profile
  • Post an Article
  • Manage My Drafts
Over 2 million developers have joined DZone.
Log In / Join
Refcards Trend Reports Events Over 2 million developers have joined DZone. Join Today! Thanks for visiting DZone today,
Edit Profile Manage Email Subscriptions Moderation Admin Console How to Post to DZone Article Submission Guidelines
View Profile
Sign Out
Refcards
Trend Reports
Events
Zones
Culture and Methodologies Agile Career Development Methodologies Team Management
Data Engineering AI/ML Big Data Data Databases IoT
Software Design and Architecture Cloud Architecture Containers Integration Microservices Performance Security
Coding Frameworks Java JavaScript Languages Tools
Testing, Deployment, and Maintenance Deployment DevOps and CI/CD Maintenance Monitoring and Observability Testing, Tools, and Frameworks
Culture and Methodologies
Agile Career Development Methodologies Team Management
Data Engineering
AI/ML Big Data Data Databases IoT
Software Design and Architecture
Cloud Architecture Containers Integration Microservices Performance Security
Coding
Frameworks Java JavaScript Languages Tools
Testing, Deployment, and Maintenance
Deployment DevOps and CI/CD Maintenance Monitoring and Observability Testing, Tools, and Frameworks
  1. DZone
  2. Software Design and Architecture
  3. Cloud Architecture
  4. Remote Actors - Discovering Akka

Remote Actors - Discovering Akka

Tomasz Nurkiewicz user avatar by
Tomasz Nurkiewicz
CORE ·
Nov. 18, 12 · Interview
Like (0)
Save
Tweet
Share
3.95K Views

Join the DZone community and get the full member experience.

Join For Free

 Assume our test application became a huge success and slowly a single server is not capable of handling growing traffic. We are faced with two choices: replacing our server with a better one (scaling up) or buying a second one and building a cluster (scaling out). We've chosen to build a cluster as it's easier to scale in the future. However we quickly discover that our application no longer fulfils the very first requirement:

  1. The client application should call the URL [...] at most from one thread - it's forbidden to concurrently fetch random numbers using several HTTP connections.
Obviously every node in the cluster is independent, having its own, separate instance of Akka, thus a separate copy of RandomOrgClient actor. In order to fix this issue we have few options:


  • having a global (cluster-wide!) lock (distributed monitor, semaphore, etc.) guarding multithreaded access. Ordinary synchronized is not enough.
  • ...or create a dedicated node in the cluster to communicate with random.org, used by all other nodes via some API
  • ...or create only one instance of RandomOrgClient on exactly one node and expose it via some API (RMI, JMS...) to remote clients

Do you remember how much time spent describing the different between Actor and ActorRef? Now this distinction will become obvious. In turns out our solution will be based on the last suggestion, however we don't have to bother about API, serialization, communication or transport layer. Even better, there is no such API in Akka to handle remote actors. It's enough to say in the configuration file: this particular actor is suppose to be created only on this node. All other nodes, instead of creating the same actor locally, will return a special proxy, which looks like a normal actor from the outside, while in reality it forwards all messages to remote actor on other node.
Let's say it again: we don't have to change anything in our code, it's enough to make some adjustments in the configuration file:

akka {
  actor {
  provider = "akka.remote.RemoteActorRefProvider"
  deployment {
      /buffer/client {
        remote = "akka://RandomOrgSystem@127.0.0.1:2552"
      }
    }
  }
  remote {
    transport = "akka.remote.netty.NettyRemoteTransport"
    log-sent-messages = on
    netty {
      hostname = "127.0.0.1"
    }
  }
}

That's it! Each node in the cluster is identified by the server address and port. Key part of the configuration is the declaration that /buffer/client is suppose to be created only 127.0.0.1:2552. Every other instance (working on a different server or port), instead of creating a new copy of the actor, will build a special transparent proxy, calling remote server.

If you don't remember the architecture of our solution, figure below demonstrates the message flow. As you can see each node has its own copy of RandomOrgBuffer (otherwise each access of the buffer would result in remote call, which defeats the purpose of the buffer altogether). However each node (except the middle one) has a remote reference to a RandomOrgClient actor (node in the middle accesses this actor locally):



The machine in the middle (JVM 1) is executed on port 2552 and it's the only machine that communicates with random.org. All the others (JVM 2 and 3 working on 2553 and 2554 respectively) are communicating with this server indirectly via JVM 1. BTW we can change the TCP/IP port used by each node either by using configuration file or -Dakka.remote.netty.port=2553 Java property.

Before we announce premature success (again), we are faced with another problem. Or actually, we haven't really passed the original obstacle yet. Since RandomOrgClient is now accessed by multiple RandomBuffer actors (distributed across the cluster), it can still initiate multiple concurrent HTTP connections to random.org, on behalf of every node in the cluster. It's easy to imagine a situation where several RandomOrgBuffer instances are sending FetchFromRandomOrg message at the same time, beginning several concurrent HTTP connections. In order to avoid this situation we implement already known technique of queueing requests in actor if one request wasn't yet finished:
case class FetchFromRandomOrg(batchSize: Int)
 
case class RandomOrgServerResponse(randomNumbers: List[Int])
 
class RandomOrgClient extends Actor {
 
  val client = new AsyncHttpClient()
  val waitingForReply = new mutable.Queue[(ActorRef, Int)]
 
  override def postStop() {
    client.close()
  }
 
  def receive = LoggingReceive {
    case FetchFromRandomOrg(batchSize) =>
      waitingForReply += (sender -> batchSize)
      if (waitingForReply.tail.isEmpty) {
        sendHttpRequest(batchSize)
      }
    case response: RandomOrgServerResponse =>
      waitingForReply.dequeue()._1 ! response
      if (!waitingForReply.isEmpty) {
        sendHttpRequest(waitingForReply.front._2)
      }
  }
 
  private def sendHttpRequest(batchSize: Int) {
    val url = "https://www.random.org/integers/?num=" + batchSize + "&min=0&max=65535&col=1&base=10&format=plain&rnd=new"
    client.prepareGet(url).execute(new RandomOrgResponseHandler(self))
  }
}
 
private class RandomOrgResponseHandler(notifyActor: ActorRef) extends AsyncCompletionHandler[Unit]() {
  def onCompleted(response: Response) {
    val numbers = response.getResponseBody.lines.map(_.toInt).toList
    notifyActor ! RandomOrgServerResponse(numbers)
  }
}
This time pay attention to waitingForReply queue. When a request to fetch random numbers from remote web service comes in, either we initiate new connection (if the queue doesn't contain no-one except us). If there are other actors awaiting, we must politely put ourselves in the queue, remembering who requested how many numbers (waitingForReply += (sender -> batchSize)). When a reply arrives, we take the very first actor from the queue (who waits for the longest amount of time) and initiate another request on behalf of him.

Unsurprisingly there is no multithreading or synchronization code. However it's important not to break encapsulation by accessing its state outside of receive method. I made this mistake by reading waitingForReply queue inside onCompleted() method. Because this method is called asynchronously by HTTP client worker thread, potentially we can access our actors state from two threads at the same time (if it was handling some message in receive at the same time). That's the reason why I decided to extract HTTP reply callback into a separate class, not having implicit access to an actor. This is much safer as access to actor's state is guarded by the compiler itself.

This is the last part of our Discovering Akka series. Remember that the complete source code is available on GitHub.

remote Akka (toolkit) cluster

Published at DZone with permission of Tomasz Nurkiewicz, DZone MVB. See the original article here.

Opinions expressed by DZone contributors are their own.

Popular on DZone

  • Exploring the Benefits of Cloud Computing: From IaaS, PaaS, SaaS to Google Cloud, AWS, and Microsoft
  • The 12 Biggest Android App Development Trends in 2023
  • Handling Automatic ID Generation in PostgreSQL With Node.js and Sequelize
  • Top Authentication Trends to Watch Out for in 2023

Comments

Partner Resources

X

ABOUT US

  • About DZone
  • Send feedback
  • Careers
  • Sitemap

ADVERTISE

  • Advertise with DZone

CONTRIBUTE ON DZONE

  • Article Submission Guidelines
  • Become a Contributor
  • Visit the Writers' Zone

LEGAL

  • Terms of Service
  • Privacy Policy

CONTACT US

  • 600 Park Offices Drive
  • Suite 300
  • Durham, NC 27709
  • support@dzone.com
  • +1 (919) 678-0300

Let's be friends: