Groovy concurrency in action: agents with Groovy++
Join the DZone community and get the full member experience.
Join For FreeSeveral weeks ago we've talked about lock free message passing algorithms with Groovy++ and provided implementation of actors similar to the ones in Scala or Erlang but faster. Today we will use same technique to implement similar but different concept from Clojure called agents. You can find source code and more examples in the Groovy++ distro
So what is agent? Probably the best source is Clojure documentation, which is very inspiring reading even if you hate Lisp-like no-syntax used by Clojure as much as I do. To make long story short, I will provide very non-scientific definition
Agent is reference to immutable object and mutation of reference itself only allowed by asynchronious actions, which are executed at most one action for each agent at any point in time.
Agents might have optional validators for suggested new value and (also optional) listeners for successful updates
For Clojure's experts it is important to note that Groovy++ agents are not integrated yet with any STM(Software Transactional Memory)
Let us start with developing our class
@Typed final class Agent<T> extends ExecutingChannel {
private volatile T ref
final T get () { ref }
.........
We derive our class from ExecutingChannel (variation of what we developed in the previous article), which will give us "at most one action at any moment of time" semantic for free. And define volatile field for reference to internal state and getter for this reference.
Several important things to notice
- Access to referenced object is available immidiately without any additional synchronization
- For that we pay by storing a reference in volatile field
- Exactly because of 1) the referenced object must be immutable of treated as immutable
- Groovy++ provides several persistent data structures secifically well-suited for such situations - FList, FQueue, FHashMap & FVector (some more are coming)
Now let us take care for validators and listeners. For both we define
- abstract static inner class (there is not fixed yet bug in Groovy core, which does not allow inner interfaces)
- volatile field for persistent list of that type (we want to be able to add remove listeners and validators asynchroniously)
- add/remove method to handle membership in the list
abstract static class Validator<T> {
abstract boolean validate (Agent<T> agent, T newValue)
}
private volatile FList<Listener<T>> listeners = FList.emptyList
abstract static class Listener<T> {
abstract void onUpdate (Agent<T> agent)
}
private volatile FList<Validator<T>> validators = FList.emptyList
Listener addListener (Listener listener) {
listeners.apply{ it + listener }
listener
}
void removeListener (Listener listener) {
listeners.apply{ it - listener }
}
Validator addValidator (Validator validator) {
validators.apply{ it + validator }
validator
}
void removeValidator (Validator validator) {
validators.apply{ it - validator }
}
Note how elegantly we able to apply new value to volatile field. Instead of for(;;) {..compareAndSet...} we just use apply {...}
Careful reader can ask why do we need special types for validator/listener and don't use standard Function1<Agent<T>,?> for listener and Function2<Agent<T>,T,Boolean> for validator instead. Truly speaking, my initial version used exactly that but then I found that boxing of validator result is very unnatural and decided to intruduce one method interface (listener has special class for simmetry).
Another interesting question is why do we need return value of addListener/addValidator. It is interesting story. Let us consider following code fragment
def validator = agent.addValidator {...<validator body>... }
What's going on here is implicit creation of anaonimous inner class extending Validator. Then this instance passed as argument to addValidator method. If we need to save reference for this validator (for example for removing later) we need to have access to it. This is reason for return value of addValidator method.
Now we are ready to implement the main method of class Agent. For convinience we will call it 'call' (sorry for tautology)
In Groovy you almost always can omit method name if the method is called 'call'
Here is the code
void call (Mutation<T> mutation, Runnable whenDone = null) {
def that = this
schedule {
def oldRef = ref
def newRef = mutation(oldRef)
if (newRef !== oldRef) {
if(!validators.any{ !it.validate(that, newRef) }) {
ref = newRef
listeners*.onUpdate(that)
}
}
whenDone?.run ()
}
}
Probably it requires a little bit of explainations
First of all Mutation<T> is the same as Function1<T,T>, which is function taking argument of type T and returning the value of the same type.
Secondly, ExecutorChannel.schedule is method, which send special message to the channel. You can think about this message as if it was Runnable to execute. Have a look at Groovy++ sources for details.
Last but not least is that we have additional optional parameter representing operation to execute when mutation/validation/listener actions are completed. To give one example why it can be useful we will develop one additional method.
Imagine that by some reasons we want to initiate mutation of agent value and wait before it completed. Here is simple (and more important deadlock free) way to do it.
void await (Mutation<T> mutation) {
CountDownLatch cdl = [1]
call(mutation) {
cdl.countDown ()
}
cdl.await()
}
What we do in the code above is introduce internal CountDownLatch and release it when update action is done.
Now, when we are done with design of Agent, let us try to use it.
To create agent we do the following
Agent<FVector<Integer>> sharedVector = [FVector.emptyVector]
sharedVector.executor = pool
In code above we set executor property (java.util.concurrent.Executor) explicitly. In many situation it is not necessary as Groovy++ standard library has notation of current executor, which is available in threads managed by the library
To update/fill internal vector we can do something like that
for(i in 0..<100) {
Mutation<FVector<Integer>> action = {
it.length < 100*100 ? it + it.length : it
}
sharedVector(action) {
if (sharedVector.get().length < 100*100)
sharedVector(action, this)
}
}
Note the trick we use to make our update "recursive". It is another interetsing use of completion callback whenDone. When update is completed we check if we want to continue and if so shedule the same action with the same completion callback.
Before reading next paragraph I suggest to the reader to think what is wrong with code above.
Unfortunately the code above is not absolutely correct. It might happen that one thread decided and scheduled update for our agent because internal vector contains less elements than we want but several updates was scheduled already and in total we exceed desired number of elements. Validator is useful tool in this situation
def validator = sharedVector.addValidator { agent, newValue ->
newValue.length <= 100*100
}
..............
sharedVector.removeValidator(validator)
We can also work with agent in quasi-synchronious way. Let say we want to start 10 threads to shuffle the internal vector 1000 times. We can do it like in the code below.
CountDownLatch shuffleCdl = [10]
for(i in 0..<10) {
pool.execute {
def r = new Random ()
for(j in 0..<1000) {
def i1 = r.nextInt (100*100)
def i2 = r.nextInt (100*100)
sharedVector.await {
def v1 = it[i1]
def v2 = it[i2]
it.set(i2, v1).set(i1, v2)
}
}
shuffleCdl.countDown()
}
}
shuffleCdl.await ()
Agent is simple but convinient shared data structure, which does not require any special synchronization. I hope this article made it more clear for you. Thank you for reading and till next time.
Opinions expressed by DZone contributors are their own.
Trending
-
DZone's Article Submission Guidelines
-
How to LINQ Between Java and SQL With JPAStreamer
-
Avoiding Pitfalls With Java Optional: Common Mistakes and How To Fix Them [Video]
-
Extending Java APIs: Add Missing Features Without the Hassle
Comments