PubSub with Redis and Akka Actors

by Debasish Ghosh · Apr. 20, 10 · News
Redis (the version on the trunk) now offers publish/subscribe messaging. This is a big addition to the data-structure-oriented services it has offered so far, and it opens up the possibility of using Redis as a messaging engine of a different kind. Senders and receivers are completely decoupled from each other: senders do not address messages to specific receivers. Publishers publish messages on named channels, and subscribers to those channels receive them and can act on them. As Salvatore notes in his weekly updates on Redis, the feature grew out of many requests from users who wanted a general notification mechanism to trap changes in the key space. Redis already offers BLPOP (blocking list pop) for similar use cases, but that is not sufficient for a general notification scheme. Salvatore explains this in more detail in his blog post.
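To make the decoupling concrete, here is a minimal in-memory sketch of channel-based publish/subscribe. It is not the Redis API or the client discussed in this post: `Broker`, `subscribe` and `publish` are hypothetical names used only for illustration. The publisher never refers to a subscriber, only to a channel name.

```scala
import scala.collection.mutable

// Hypothetical in-memory stand-in for a pub/sub server; NOT the Redis API.
final class Broker {
  // channel name -> handlers registered for that channel
  private val handlers = mutable.Map.empty[String, Vector[String => Unit]]

  // Register a handler that is invoked for every message on `channel`.
  def subscribe(channel: String)(handler: String => Unit): Unit =
    handlers(channel) = handlers.getOrElse(channel, Vector.empty) :+ handler

  // Deliver `message` to every handler on `channel`.
  // Returns the number of handlers that received it.
  def publish(channel: String, message: String): Int = {
    val receivers = handlers.getOrElse(channel, Vector.empty)
    receivers.foreach(h => h(message))
    receivers.size
  }
}

object BrokerDemo {
  def main(args: Array[String]): Unit = {
    val broker = new Broker
    broker.subscribe("a")(m => println("got on a: " + m))
    broker.publish("a", "hello") // one handler is registered on "a"
    broker.publish("c", "hi")    // nobody is subscribed to "c"
  }
}
```

A message published on a channel with no subscribers is simply dropped, which is the main difference from a queue-style primitive such as BLPOP.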

I have been working on a Scala client, which I forked from Alejandro Crosa's repository. I implemented pubsub very recently and have also integrated it with Akka actors. The full implementation of the pubsub client in Scala is in my GitHub repository. If you would like to play around with the Akka actor based implementation, have a look at the Akka repository.

You define your publishers and subscribers as actors and exchange messages over channels. You can define your own callbacks as to what you would like to do when you receive a particular message. Let's have a look at a sample implementation at the client level .. I will assume that you want to implement your own pub/sub application on top of the Akka actor based pubsub substrate that uses the redis service underneath.

Implementing the publisher interface is easy .. here is how you can bootstrap your own publishing service ..
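object Pub {
  println("starting publishing service ..")

  // the publisher actor wraps a Redis client connected to the local server
  val p = new Publisher(new RedisClient("localhost", 6379))
  p.start

  // hand the message over to the publisher actor
  def publish(channel: String, message: String) = {
    p ! Publish(channel, message)
  }
}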
The publish method just sends a Publish message to the Publisher. Publisher is an actor defined in Akka as follows:
class Publisher(client: RedisClient) extends Actor {
  def receive = {
    case Publish(channel, message) =>
      // forward the message to Redis, then acknowledge to the sender
      client.publish(channel, message)
      reply(true)
  }
}
The subscriber implementation is a bit more complex since you need to register your callback as to what you would like to do when you receive a specific type of message. This depends on your use case and it's your responsibility to provide a proper callback function downstream.

Here is a sample implementation for the subscriber. We need two methods, one to subscribe to channels and one to unsubscribe from them. Remember that in Redis a connection that has subscribed cannot publish, so our Sub cannot do a Pub.
object Sub {
  println("starting subscription service ..")

  // keep a handle on the client so that the callback can refer to it as r
  val r = new RedisClient("localhost", 6379)
  val s = new Subscriber(r)
  s.start
  s ! Register(callback)

  def sub(channels: String*) = {
    s ! Subscribe(channels.toArray)
  }

  def unsub(channels: String*) = {
    s ! Unsubscribe(channels.toArray)
  }

  // invoked for every pubsub message that the subscriber receives
  def callback(pubsub: PubSubMessage) = pubsub match {
    //..
  }
}
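The messages exchanged with the actors (`Publish`, `Register`, `Subscribe`, `Unsubscribe`) are used above without being shown. The following is a sketch of what such a protocol could look like, with shapes inferred only from the call sites in this post; the actual definitions in Akka may differ, and `Protocol`, `Command` and `describe` are hypothetical names.

```scala
// Hypothetical message shapes, inferred from how Pub and Sub use them.
// The real definitions live in the Akka sources and may differ.
object Protocol {
  sealed trait PubSubMessage // what the registered callback receives

  sealed trait Command
  final case class Publish(channel: String, message: String) extends Command
  final case class Register(callback: PubSubMessage => Any) extends Command
  final case class Subscribe(channels: Array[String]) extends Command
  final case class Unsubscribe(channels: Array[String]) extends Command

  // Describes what a receive block would do with each command.
  def describe(cmd: Command): String = cmd match {
    case Publish(ch, msg) => "publish '" + msg + "' on " + ch
    case Register(_)      => "register callback"
    case Subscribe(chs)   => "subscribe to " + chs.mkString(", ")
    case Unsubscribe(chs) => "unsubscribe from " + chs.mkString(", ")
  }
}
```

Modelling each request as a case class is what lets `receive` dispatch with a plain pattern match, as the Publisher does.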
I have not yet specified the implementation of the callback. What should it look like?

The callback will be invoked when the subscriber receives a specific type of message. According to the Redis specification, the types of messages which a subscriber can receive are:

a. subscribe
b. unsubscribe
c. message

Refer to the Redis documentation for details of these message formats. In our case, we model them as case classes as part of the core Redis client implementation ..
sealed trait PubSubMessage
case class S(channel: String, noSubscribed: Int) extends PubSubMessage
case class U(channel: String, noSubscribed: Int) extends PubSubMessage
case class M(origChannel: String, message: String) extends PubSubMessage
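Each of these Redis messages is a three-element reply: the kind, the channel, and then either the current subscription count or the payload. A minimal sketch of the mapping onto the case classes follows. It assumes the reply has already been decoded into a list of strings, and `PubSubReply.parse` is a hypothetical helper, not part of the client.

```scala
import scala.util.Try

object PubSubReply {
  sealed trait PubSubMessage
  case class S(channel: String, noSubscribed: Int) extends PubSubMessage
  case class U(channel: String, noSubscribed: Int) extends PubSubMessage
  case class M(origChannel: String, message: String) extends PubSubMessage

  // Hypothetical helper: `reply` is assumed to be the already-decoded
  // three-element reply, e.g. List("message", "a", "hello").
  def parse(reply: List[String]): Option[PubSubMessage] = reply match {
    case List("subscribe", channel, count) =>
      Try(count.toInt).toOption.map(n => S(channel, n))
    case List("unsubscribe", channel, count) =>
      Try(count.toInt).toOption.map(n => U(channel, n))
    case List("message", channel, payload) =>
      Some(M(channel, payload))
    case _ =>
      None // unknown kind, wrong arity, or a non-numeric count
  }
}
```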
Our callback needs to take an appropriate action on receipt of each of these message types. The following is one such implementation. It is customized for a specific application that recognizes a few message formats and gives them application-dependent semantics ..
// r is the RedisClient instance that was passed to the Subscriber
def callback(pubsub: PubSubMessage) = pubsub match {
  case S(channel, no) => println("subscribed to " + channel + " and count = " + no)
  case U(channel, no) => println("unsubscribed from " + channel + " and count = " + no)
  case M(channel, msg) =>
    msg match {
      // exit will unsubscribe from all channels and stop subscription service
      case "exit" =>
        println("unsubscribe all ..")
        r.unsubscribe

      // message "+x" will subscribe to channel x
      case x if x startsWith "+" =>
        val s: Seq[Char] = x
        s match {
          case Seq('+', rest @ _*) => r.subscribe(rest.toString){ m => }
        }

      // message "-x" will unsubscribe from channel x
      case x if x startsWith "-" =>
        val s: Seq[Char] = x
        s match {
          case Seq('-', rest @ _*) => r.unsubscribe(rest.toString)
        }

      // any other message is just printed
      case x =>
        println("received message on channel " + channel + " as : " + x)
    }
}
Note that the above implementation specializes some of the messages to give them additional semantics. For example, a message "+t" is interpreted as a request to subscribe to the channel "t". Similarly, "exit" unsubscribes me from all channels.
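One way to keep such conventions testable is to separate interpreting the payload from acting on it. The sketch below does only the interpretation step; `Control`, `Action` and `interpret` are hypothetical names and not part of the sample project.

```scala
object Control {
  // What the subscriber should do in response to a payload.
  sealed trait Action
  case object UnsubscribeAll extends Action
  final case class SubscribeTo(channel: String) extends Action
  final case class UnsubscribeFrom(channel: String) extends Action
  final case class Deliver(text: String) extends Action

  // Pure function: no Redis calls, only the message convention.
  def interpret(msg: String): Action = msg match {
    case "exit"                 => UnsubscribeAll
    case x if x.startsWith("+") => SubscribeTo(x.drop(1))     // "+t" -> channel "t"
    case x if x.startsWith("-") => UnsubscribeFrom(x.drop(1)) // "-t" -> channel "t"
    case x                      => Deliver(x)
  }
}
```

The callback would then match on the resulting `Action` and make the corresponding client calls, so the string handling can be checked without a running Redis server.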

How to run this application?

I will assume that you have the Akka master with you. Also you need to have a version of Redis running that implements pubsub. You can start the subscription service using the above implementation and then use any other Redis client to publish messages. Here's a sample recipe for a run ..


Prerequisite: Need Redis Server running (the version that supports pubsub)
1. Download redis from http://github.com/antirez/redis
2. build using "make"
3. Run server as ./redis-server


For running this sample application :-

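Starting the Subscription service

1. Open a shell and set AKKA_HOME
2. cd $AKKA_HOME
3. sbt console
4. scala> import sample.pubsub._
5. scala> Sub.sub("a", "b") // subscribes the first shell to channels "a" and "b"

Starting the Publishing service

1. Open up another shell similarly as the above and set AKKA_HOME
2. cd $AKKA_HOME
3. sbt console
4. scala> import sample.pubsub._
5. scala> Pub.publish("a", "hello") // the first shell should get the message
6. scala> Pub.publish("c", "hi") // the first shell should NOT get this message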


Another publishing client using redis-cli

Open a shell in the directory where you installed Redis and issue a publish command with redis-cli
./redis-cli publish a "hi there" # the first shell should get the message

Have fun with the message formats

1. Go back to the first shell
2. Sub.unsub("a") // should unsubscribe the first shell from channel "a"
3. Study the callback function defined above. It supports many other message formats.
4. In the second shell window do the following:

scala> Pub.publish("b", "+c") // will subscribe the first window to channel "c"
scala> Pub.publish("b", "+d") // will subscribe the first window to channel "d"
scala> Pub.publish("b", "-c") // will unsubscribe the first window from channel "c"
scala> Pub.publish("b", "exit") // will unsubscribe the first window from all channels
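The effect of these four publishes on the first shell's subscriptions can be traced with a small sketch. It assumes the first shell started out subscribed to "a" and "b" and has already run Sub.unsub("a"), so only "b" remains. `SessionTrace` and `step` are hypothetical names that model the callback's rules on a plain set of channel names, without talking to Redis.

```scala
object SessionTrace {
  // Applies one payload to the set of subscribed channels,
  // following the "+x" / "-x" / "exit" rules of the callback.
  def step(channels: Set[String], payload: String): Set[String] = payload match {
    case "exit"                 => Set.empty[String]
    case p if p.startsWith("+") => channels + p.drop(1)
    case p if p.startsWith("-") => channels - p.drop(1)
    case _                      => channels
  }

  def main(args: Array[String]): Unit = {
    val start = Set("b") // assumption: "a" was already unsubscribed
    val payloads = List("+c", "+d", "-c", "exit")
    // scanLeft keeps every intermediate state, starting with `start`
    payloads.scanLeft(start)(step).foreach { s =>
      println(s.toList.sorted.mkString("{", ", ", "}"))
    }
  }
}
```

The states in order are {b}, {b, c}, {b, c, d}, {b, d} and finally the empty set after "exit".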

The full implementation of the above is available as a sample project in Akka master. In case you are not using Akka, I also have a version of the above implemented using Scala actors in the scala-redis distribution.

Have fun!





Published at DZone with permission of Debasish Ghosh, DZone MVB. See the original article here.
