DZone
Thanks for visiting DZone today,
Edit Profile
  • Manage Email Subscriptions
  • How to Post to DZone
  • Article Submission Guidelines
Sign Out View Profile
  • Post an Article
  • Manage My Drafts
Over 2 million developers have joined DZone.
Log In / Join
Refcards Trend Reports Events Over 2 million developers have joined DZone. Join Today! Thanks for visiting DZone today,
Edit Profile Manage Email Subscriptions Moderation Admin Console How to Post to DZone Article Submission Guidelines
View Profile
Sign Out
Refcards
Trend Reports
Events
Zones
Culture and Methodologies Agile Career Development Methodologies Team Management
Data Engineering AI/ML Big Data Data Databases IoT
Software Design and Architecture Cloud Architecture Containers Integration Microservices Performance Security
Coding Frameworks Java JavaScript Languages Tools
Testing, Deployment, and Maintenance Deployment DevOps and CI/CD Maintenance Monitoring and Observability Testing, Tools, and Frameworks
Culture and Methodologies
Agile Career Development Methodologies Team Management
Data Engineering
AI/ML Big Data Data Databases IoT
Software Design and Architecture
Cloud Architecture Containers Integration Microservices Performance Security
Coding
Frameworks Java JavaScript Languages Tools
Testing, Deployment, and Maintenance
Deployment DevOps and CI/CD Maintenance Monitoring and Observability Testing, Tools, and Frameworks
  1. DZone
  2. Data Engineering
  3. Databases
  4. Making the Reactive Queue Durable with Akka Persistence

Making the Reactive Queue Durable with Akka Persistence

Adam Warski user avatar by
Adam Warski
·
Jul. 17, 14 · Interview
Like (0)
Save
Tweet
Share
6.12K Views

Join the DZone community and get the full member experience.

Join For Free

some time ago i wrote how to implement a reactive message queue with akka streams . the queue supports streaming send and receive operations with back-pressure, but has one downside: all messages are stored in-memory, and hence in case of a restart are lost.

but this can be easily solved with the experimental akka-persistence module, which just got an update in akka 2.3.4 .

queue actor refresher

to make the queue durable, we only need to change the queue actor; the reactive/streaming parts remain intact. just as a reminder, the reactive queue consists of:

  • a single queue actor , which holds an internal priority queue of messages to be delivered. the queue actor accepts actor-messages to send, receive and delete queue-messages
  • a broker , which creates the queue actor, listens for connections from senders and receivers, and creates the reactive streams when a connection is established
  • a sender , which sends messages to the queue (for testing, one message each second). multiple senders can be started. messages are sent only if they can be accepted (back-pressure from the broker)
  • a receiver , which receives messages from queue, as they become available and as they can be processed (back-pressure from the receiver)

reactmq actors

going persistent (remaining reactive)

the changes needed are quite minimal.

first of all, the queueactor needs to extend persistentactor , and define two methods:

  • receivecommand , which defines the “normal” behaviour when actor-messages (commands) arrive
  • receiverecover , which is used during recovery only, and where replayed events are sent

but in order to recover, we first need to persist some events! this should of course be done when handling the message queue operations.

for example, when sending a message, a messageadded event is persisted using persistasync :

def handlequeuemsg: receive = {
  case sendmessage(content) =>
    val msg = sendmessage(content)
    persistasync(msg.tomessageadded) { msgadded =>
      sender() ! sentmessage(msgadded.id)
      tryreply()
    }
 
   // ...
}

persistasync is one way of persisting events using akka-persistence. the other, persist (which is also the default one), buffers subsequent commands (actor-messages) until the event is persisted; this is a bit slower, but also easier to reason about and remain consistent. however in case of the message queue such behaviour isn’t necessary. the only guarantee that we need is that the message send is acknowledged only after the event is persisted; and that’s why the reply is sent in the after-persist event handler. you can read more about persistasync in the docs .

similarly, events are persisted for other commands (actor-messages, see queueactorreceive ). both for deletes and receives we are using persistasync , as the queue aims to provide an at-least-once delivery guarantee.

the final component is the recovery handler, which is defined in queueactorrecover (and then used in queueactor ). recovery is quite simple: the events correspond to adding a new message, updating the “next delivery” timestamp or deleting.

the internal representation uses both a priority queue and a by-id map for efficiency, so when the events are handled during recovert we only build the map, and use the recoverycompleted special event to build the queue as well. the special event is sent by akka-persistence automatically.

and that’s all! if you now run the broker, send some messages, stop the broker, start it again, you’ll see that the messages are recovered, and indeed, they get received if a receiver is run.

the code isn’t production-ready of course. the event log is going to constantly grow, so it would certainly make sense to make use of snapshots , plus delete old events/snapshots to make the storage size small and recovery fast.

replication

now that the queue is durable, we can also have a replicated persistent queue almost for free: we simply need to use a different journal plugin ! the default one relies on leveldb and writes data to the local disk. other implementations are available: for cassandra , hbase , and mongo .

making a simple switch of the persistence backend we can have our messages replicated across a cluster.

summary

with the help of two experimental akka modules, reactive streams and persistence , we have been able to implement a durable, reactive queue with a quite minimal amount of code. and that’s just the beginning, as the two technologies are only starting to mature!

if you’d like to modify/fork the code, it is available on github .


Akka (toolkit) Persistence (computer science) Event Message queue

Published at DZone with permission of Adam Warski, DZone MVB. See the original article here.

Opinions expressed by DZone contributors are their own.

Popular on DZone

  • How To Create and Edit Excel XLSX Documents in Java
  • Top Five Tools for AI-based Test Automation
  • Stream Processing vs. Batch Processing: What to Know
  • PostgreSQL: Bulk Loading Data With Node.js and Sequelize

Comments

Partner Resources

X

ABOUT US

  • About DZone
  • Send feedback
  • Careers
  • Sitemap

ADVERTISE

  • Advertise with DZone

CONTRIBUTE ON DZONE

  • Article Submission Guidelines
  • Become a Contributor
  • Visit the Writers' Zone

LEGAL

  • Terms of Service
  • Privacy Policy

CONTACT US

  • 600 Park Offices Drive
  • Suite 300
  • Durham, NC 27709
  • support@dzone.com
  • +1 (919) 678-0300

Let's be friends: