Last time we talked about the implementation of functional queues with Groovy++. Today we will use these queues to implement several algorithms for processing asynchronous messages. You can find the source code and more examples in the Groovy++ distro.

What we want to do is implement a simplified actor: an object that sequentially processes messages arriving asynchronously. There are two types of actors:

- A thread-bound actor has a dedicated message-processing thread. The thread blocks when no messages are available.
- A pooled actor is executed on some thread pool. The beauty of a pooled actor is that it does not consume any resources at all when there are no messages to process.

We will try to use the same approach, based on our functional queues, to implement both. Let us start with the interface definition for a message channel.

```groovy
@Trait abstract class MessageChannel<T> {
    abstract MessageChannel<T> post (T message)

    // Allows the shorthand: channel << message
    MessageChannel<T> leftShift (T msg) {
        post msg
    }
}
```

The @Trait annotation is the Groovy++ way to define an interface with default implementations of some methods. Each class implementing such an interface inherits the default implementation unless the class or one of its superclasses implements the method itself. We use a Groovy++ trait here not because our sample requires it but because the sample is real code from the Groovy++ runtime.

Note that a message channel has nothing to do with concurrency: we can implement the post method in whatever way we like. Just for fun, here is a Multiplexor class, which immediately redistributes every incoming message to all subscribed channels.

```groovy
class Multiplexor<M> implements MessageChannel<M> {
    private volatile FList<MessageChannel<M>> listeners = FList.emptyList

    Multiplexor<M> subscribe(MessageChannel<M> channel) {
        // Retry until our updated list replaces the one we read
        for (;;) {
            def l = listeners
            if (listeners.compareAndSet(l, l + channel))
                return this
        }
    }

    Multiplexor<M> subscribe(MessageChannel<M> ... channels) {
        for (c in channels) {
            subscribe(c)
        }
        this
    }

    Multiplexor<M> unsubscribe(MessageChannel<M> channel) {
        for (;;) {
            def l = listeners
            if (listeners.compareAndSet(l, l - channel))
                return this
        }
    }

    final Multiplexor<M> post(M message) {
        listeners.each { channel ->
            channel << message
        }
        this
    }

    static Multiplexor<M> of (MessageChannel<M> ... channels) {
        new Multiplexor().subscribe(channels)
    }
}
```

You may notice that our functional lists made it almost trivial to let subscribers subscribe and unsubscribe asynchronously.

OK, back to our main story. Let us implement a channel with an asynchronous queue that processes no more than one message at any given moment. The idea is the following:

- We use a functional queue to add messages.
- When we add a message to the queue, we signal the subclass (whatever that means for the subclass's algorithm).
- We introduce a special state of the queue for use by subclasses, meaning that the queue is already empty but the last message has not been processed yet. This is probably the most non-trivial part of our algorithms.

Here is the implementation.

```groovy
abstract class QueuedChannel<M> implements MessageChannel<M> {
    protected volatile FQueue<M> queue = FQueue.emptyQueue

    // Marker state: nothing is queued, but the last message is still being processed
    protected static final FQueue busyEmptyQueue = FQueue.emptyQueue + null

    MessageChannel<M> post(M message) {
        for (;;) {
            def oldQueue = queue
            // Never append to the marker itself; start again from a truly empty queue
            def newQueue = (oldQueue === busyEmptyQueue ? FQueue.emptyQueue : oldQueue) + message
            if (queue.compareAndSet(oldQueue, newQueue)) {
                signalPost(oldQueue, newQueue)
                return this
            }
        }
    }

    protected abstract void signalPost (FQueue<M> oldQueue, FQueue<M> newQueue)

    abstract void onMessage(M message)
}
```

Now we are ready to create our first real actor, backed by an Executor and scheduled for execution once per message. We call it "fair" because it does not try to take as many resources as possible but gives all its colleagues a chance to work. Here is an explanation of the algorithm:

- Our channel implements Runnable. That might not be perfect from an OOP perspective, but it saves us an additional object creation.
- When a message is added to an empty queue, we schedule the actor for execution.
- If the queue is still non-empty after a message has been processed, we schedule the actor again.
- Special care is taken when we process the last message in the queue: we have to make sure that, until we are done, new messages do not schedule a new execution of the actor.

```groovy
abstract static class FairExecutingChannel<M> extends QueuedChannel<M> implements Runnable {
    Executor executor

    void run () {
        for (;;) {
            def q = queue
            def removed = q.removeFirst()
            if (q.size() == 1) {
                // Last message: switch to the busy marker before processing it
                if (queue.compareAndSet(q, busyEmptyQueue)) {
                    onMessage removed.first
                    // If messages arrived in the meantime, the CAS fails and we reschedule
                    if (!queue.compareAndSet(busyEmptyQueue, FQueue.emptyQueue)) {
                        executor.execute this
                    }
                    break
                }
            }
            else {
                if (queue.compareAndSet(q, removed.second)) {
                    onMessage removed.first
                    executor.execute this
                    break
                }
            }
        }
    }

    protected void signalPost(FQueue<M> oldQueue, FQueue<M> newQueue) {
        // Schedule only on the transition from a truly empty queue to one message
        if (oldQueue !== busyEmptyQueue && newQueue.size() == 1)
            executor.execute this
    }
}
```

The fair algorithm above has one downside: if message processing is really fast, we waste a lot of cycles by being scheduled for each and every message. That leads us to the idea of a "non-fair" algorithm, which processes all available messages each time the Runnable is executed. For large numbers of small messages it runs 2-3 times faster. Here is the implementation, which is even simpler.

```groovy
@Typed abstract class NonfairExecutingChannel<M> extends FairExecutingChannel<M> {
    void run () {
        for (;;) {
            def q = queue
            // Take the whole queue at once and leave the busy marker behind
            if (queue.compareAndSet(q, busyEmptyQueue)) {
                for(m in q) {
                    if (m)
                        onMessage m
                }
                if(!queue.compareAndSet(busyEmptyQueue, FQueue.emptyQueue)) {
                    executor.execute this
                }
                break
            }
        }
    }
}
```

It is interesting to notice that we can develop variations of the algorithms above. For example, we can process as many messages as we can in a given time frame (let's say 250 ms), or a given number of messages per run. Functional data structures give us a lot of flexibility.
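As a rough illustration of the "given number of messages per run" variation, here is a minimal sketch in plain Java rather than Groovy++, since it relies only on `java.util.concurrent`. It is not code from the Groovy++ runtime. The names `BatchingChannel`, `State`, and `maxBatch` are hypothetical, an immutable snapshot list (copied on every update) stands in for `FQueue`, and a `busy` flag plays the role that `busyEmptyQueue` plays in the article.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

/** Hypothetical pooled actor that handles at most maxBatch messages per run. */
final class BatchingChannel<M> implements Runnable {
    /** Snapshot that is never mutated after publication (stand-in for FQueue). */
    private static final class State<T> {
        final List<T> pending;
        final boolean busy; // true while a run is scheduled or in progress
        State(List<T> pending, boolean busy) { this.pending = pending; this.busy = busy; }
    }

    private final AtomicReference<State<M>> state =
            new AtomicReference<>(new State<M>(new ArrayList<M>(), false));
    private final Executor executor;
    private final int maxBatch;
    private final Consumer<M> onMessage;

    BatchingChannel(Executor executor, int maxBatch, Consumer<M> onMessage) {
        this.executor = executor;
        this.maxBatch = maxBatch;
        this.onMessage = onMessage;
    }

    BatchingChannel<M> post(M message) {
        for (;;) {
            State<M> old = state.get();
            List<M> appended = new ArrayList<>(old.pending); // copy, then append
            appended.add(message);
            if (state.compareAndSet(old, new State<>(appended, true))) {
                // Only the post that flips idle -> busy schedules a run
                if (!old.busy) executor.execute(this);
                return this;
            }
        }
    }

    @Override
    public void run() {
        List<M> batch;
        for (;;) { // atomically claim up to maxBatch messages
            State<M> old = state.get();
            int n = Math.min(maxBatch, old.pending.size());
            batch = old.pending.subList(0, n);
            List<M> rest = new ArrayList<>(old.pending.subList(n, old.pending.size()));
            if (state.compareAndSet(old, new State<>(rest, true))) break;
        }
        batch.forEach(onMessage);
        for (;;) { // either reschedule or go idle
            State<M> old = state.get();
            if (!old.pending.isEmpty()) {
                executor.execute(this); // more work: stay busy and run again
                return;
            }
            if (state.compareAndSet(old, new State<>(old.pending, false))) return;
        }
    }
}
```

A channel created as `new BatchingChannel<Integer>(pool, 2, handler)` would hand at most two messages to `handler` per scheduled run. A time-boxed variant could follow the same shape by claiming one message at a time and checking `System.nanoTime()` against a deadline before claiming the next.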
To complete the picture, we should also implement the thread-backed variation of our approach. We leave that as an exercise for the reader.

Thank you for reading. I hope it was interesting. Till next time.