DZone
Thanks for visiting DZone today,
Edit Profile
  • Manage Email Subscriptions
  • How to Post to DZone
  • Article Submission Guidelines
Sign Out View Profile
  • Post an Article
  • Manage My Drafts
Over 2 million developers have joined DZone.
Log In / Join
Refcards Trend Reports Events Over 2 million developers have joined DZone. Join Today! Thanks for visiting DZone today,
Edit Profile Manage Email Subscriptions Moderation Admin Console How to Post to DZone Article Submission Guidelines
View Profile
Sign Out
Refcards
Trend Reports
Events
Zones
Culture and Methodologies Agile Career Development Methodologies Team Management
Data Engineering AI/ML Big Data Data Databases IoT
Software Design and Architecture Cloud Architecture Containers Integration Microservices Performance Security
Coding Frameworks Java JavaScript Languages Tools
Testing, Deployment, and Maintenance Deployment DevOps and CI/CD Maintenance Monitoring and Observability Testing, Tools, and Frameworks
Partner Zones AWS Cloud
by AWS Developer Relations
Culture and Methodologies
Agile Career Development Methodologies Team Management
Data Engineering
AI/ML Big Data Data Databases IoT
Software Design and Architecture
Cloud Architecture Containers Integration Microservices Performance Security
Coding
Frameworks Java JavaScript Languages Tools
Testing, Deployment, and Maintenance
Deployment DevOps and CI/CD Maintenance Monitoring and Observability Testing, Tools, and Frameworks
Partner Zones
AWS Cloud
by AWS Developer Relations
  1. DZone
  2. Data Engineering
  3. AI/ML
  4. Lock free message passing algorithms with Groovy++

Lock free message passing algorithms with Groovy++

Alex Tkachman user avatar by
Alex Tkachman
·
Mar. 03, 10 · Interview
Like (0)
Save
Tweet
Share
14.99K Views

Join the DZone community and get the full member experience.

Join For Free

Last time we talked about implementation of functional queues with Groovy++ Today we will use these queues to implement several algorithms for processing of asynchronious messages. You can find source code and more examples in the Groovy++ distro

What we want to do is to implement simplified actor, the object which sequentially process asynchroniously coming messages. There are two types of actors

  • thread bound actor, which is the one having dedicated message processing thread. The thread is blocked if no messages are available
  • pooled actor, which is executed on some thread pool. The beauty of pooled actor is that it does not consume any resources at all if there is no messages to process

We will try to use the same approach based on our functional queues to implement both.

Let us start with interface definition for message channel

@Trait abstract class MessageChannel<T> {
abstract MessageChannel<T> post (T message)

MessageChannel<T> leftShift (T msg) {
post msg
}
}

@Trait annotation is Groovy++ way to define interface with default implementationof some methods. Each class implementing such interface will inherit default implementation if the method is not implemented by the class or superclass

We use Groovy++ trait here not because it is necessary for our sample but because the sample is real code from Groovy++ runtime.

Note that message channel has nothing to do with concurrency - we can implement method post in whatever way we like. Just for fun we can do following Multiplexor class, which immidiately redistribute all incoming messages to all subscribed channels.

class Multiplexor<M> implements MessageChannel<M> {
private volatile FList<MessageChannel<M>> listeners = FList.emptyList

Multiplexor<M> subscribe(MessageChannel<M> channel) {
for (;;) {
def l = listeners
if (listeners.compareAndSet(l, l + channel))
return this
}
}

Multiplexor<M> subscribe(MessageChannel<M> ... channels) {
for (c in channels) {
subscribe(c)
}
this
}

Multiplexor<M> unsubscribe(MessageChannel<M> channel) {
for (;;) {
def l = listeners
if (listeners.compareAndSet(l, l - channel))
return this
}
}

final Multiplexor<M> post(M message) {
listeners.each { channel ->
channel << message
}
this
}

static Multiplexor<M> of (MessageChannel<M> ... channels) {
new Multiplexor().subscribe(channels)
}
}

You may notice that it was almost trivial to allow subscribers to subscribe and unsubscribe asynchroniously using our functional lists

OK, back to our main story. Let us implement channel with asynchronious queue, which process no more than one message at any given moment.

Our idea is following:

  • we use functional queue to add messages
  • when we add message to the queue we signal subclass (whatever it means for subclassing algorithm)
  • we introduce special state of the queue to be used by subclasses, which means that the queue is already empty but last message is not processed yet. This is probably most non-trivial part of our algorithms

 Here is the implementation

abstract class QueuedChannel<M> implements MessageChannel<M> {

protected volatile FQueue<M> queue = FQueue.emptyQueue

protected static final FQueue busyEmptyQueue = FQueue.emptyQueue + null

MessageChannel<M> post(M message) {
for (;;) {
def oldQueue = queue
def newQueue = (oldQueue === busyEmptyQueue ? FQueue.emptyQueue : oldQueue) + message
if (queue.compareAndSet(oldQueue, newQueue)) {
signalPost(oldQueue, newQueue)
return this
}
}
}

protected abstract void signalPost (FQueue<M> oldQueue, FQueue<M> newQueue)

abstract void onMessage(M message)
}

Now we are ready to create our first real actor backed by Executor and scheduled for execution for each message. We call it "fair" because it does not try to take as much resources as possible but give chance to work for all it's collegues.

Here is explaination of the algorithm

  • our channel implements Runnable. That might be not perfect from OOP prospective bus save us additional object creation
  • when message added to empty queue we schedule actor for execution
  • if after processing of a message our queue still non-empty we schedule again
  • special care taken for the case when we process last message in the queue - we have to make sure that while we are not done new messages will not schedule new execution of the actor
abstract static class FairExecutingChannel<M> extends QueuedChannel<M> implements Runnable {
Executor executor

void run () {
for (;;) {
def q = queue
def removed = q.removeFirst()
if (q.size() == 1) {
if (queue.compareAndSet(q, busyEmptyQueue)) {
onMessage removed.first
if (!queue.compareAndSet(busyEmptyQueue, FQueue.emptyQueue)) {
executor.execute this
}
break
}
}
else {
if (queue.compareAndSet(q, removed.second)) {
onMessage removed.first
executor.execute this
break
}
}
}
}

protected void signalPost(FQueue<M> oldQueue, FQueue<M> newQueue) {
if (oldQueue !== busyEmptyQueue && newQueue.size() == 1)
executor.execute this
}
}

Fair algorithm above has one downside - if processing of messages is really fast we waste a lot of cycles by being executed for each and every message. That leads us to the idea of "non-fair" algorithm, which process all available messages when Runnable executed. For amounts of small messages it runs 2-3 times faster.

Here is the implementation, which is even simplier

@Typed abstract class NonfairExecutingChannel<M> extends FairExecutingChannel<M>  {
void run () {
for (;;) {
def q = queue
if (queue.compareAndSet(q, busyEmptyQueue)) {
for(m in q) {
if (m)
onMessage m
}
if(!queue.compareAndSet(busyEmptyQueue, FQueue.emptyQueue)) {
executor.execute this
}
break
}
}
}
}

Intersting to notice that we can develop some variations of algorithms above. For example we can process as many messages as we can in given timeframe (let say 250ms) or given number of messages in a run. Functional data structures gives us a lot of flexibility.

To have the picture complete we should also implement thread backed variation of our approach. We leave it as exercise for reader

Thank you for reading and hope it was interesting. Till next time. 

Algorithm Message passing

Opinions expressed by DZone contributors are their own.

Popular on DZone

  • Multi-Cloud Integration
  • gRPC on the Client Side
  • 11 Observability Tools You Should Know
  • Steel Threads Are a Technique That Will Make You a Better Engineer

Comments

Partner Resources

X

ABOUT US

  • About DZone
  • Send feedback
  • Careers
  • Sitemap

ADVERTISE

  • Advertise with DZone

CONTRIBUTE ON DZONE

  • Article Submission Guidelines
  • Become a Contributor
  • Visit the Writers' Zone

LEGAL

  • Terms of Service
  • Privacy Policy

CONTACT US

  • 600 Park Offices Drive
  • Suite 300
  • Durham, NC 27709
  • support@dzone.com
  • +1 (919) 678-0300

Let's be friends: