Groovy concurrency in action: agents with Groovy++

by Alex Tkachman · May. 03, 10 · Interview

Several weeks ago we talked about lock-free message passing algorithms with Groovy++ and provided an implementation of actors similar to the ones in Scala or Erlang, but faster. Today we will use the same technique to implement a similar but different concept from Clojure called agents. You can find the source code and more examples in the Groovy++ distro.

So what is an agent? Probably the best source is the Clojure documentation, which is very inspiring reading even if you hate the Lisp-like no-syntax used by Clojure as much as I do. To make a long story short, I will provide a very non-scientific definition:

An agent is a reference to an immutable object. The reference itself can be changed only by asynchronous actions, and at most one action per agent is executing at any point in time.

Agents may have optional validators for a proposed new value and (also optional) listeners for successful updates.

For Clojure experts, it is important to note that Groovy++ agents are not yet integrated with any STM (Software Transactional Memory).

Let us start developing our class:

    @Typed final class Agent<T> extends ExecutingChannel {
        private volatile T ref

        final T get () { ref }

        .........

We derive our class from ExecutingChannel (a variation of what we developed in the previous article), which gives us the "at most one action at any moment in time" semantics for free. We also define a volatile field for the reference to the internal state, and a getter for this reference.
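To picture what "at most one action at any moment in time" means, here is a minimal Java sketch of a channel that runs queued tasks one by one on a shared Executor. It is only an illustration: SerialChannel is a hypothetical name, not the Groovy++ ExecutingChannel, and it uses a plain lock for brevity, whereas the channel from the previous article is lock-free.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.Executor;

// Hypothetical stand-in for ExecutingChannel (assumption, not the Groovy++ class):
// tasks may run on any thread of the shared executor, but never two at once.
final class SerialChannel implements Executor {
    private final Queue<Runnable> tasks = new ArrayDeque<>();
    private final Executor executor;
    private Runnable active; // task currently handed to the executor; guarded by the lock

    SerialChannel(Executor executor) {
        this.executor = executor;
    }

    @Override
    public synchronized void execute(Runnable task) {
        // Wrap the task so that finishing it hands the next queued task to the executor.
        tasks.add(() -> {
            try {
                task.run();
            } finally {
                scheduleNext();
            }
        });
        if (active == null) {
            scheduleNext();
        }
    }

    private synchronized void scheduleNext() {
        active = tasks.poll();
        if (active != null) {
            executor.execute(active);
        }
    }
}
```

Because the next task is submitted only after the previous one has finished, tasks sent to one channel never overlap, even when the underlying pool has many threads. Several channels can share the same pool and still make progress independently.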

Several important things to notice:

  1. Access to the referenced object is available immediately, without any additional synchronization
  2. We pay for that by storing the reference in a volatile field
  3. Exactly because of 1), the referenced object must be immutable or treated as immutable
  4. Groovy++ provides several persistent data structures specifically well-suited for such situations: FList, FQueue, FHashMap and FVector (some more are coming)

Now let us take care of validators and listeners. For both we define:

  • an abstract static inner class (there is a not-yet-fixed bug in Groovy core that does not allow inner interfaces)
  • a volatile field holding a persistent list of that type (we want to be able to add and remove listeners and validators asynchronously)
  • add/remove methods to handle membership in the list

    // Decides whether a proposed new value may be stored in the agent
    abstract static class Validator<T> {
        abstract boolean validate (Agent<T> agent, T newValue)
    }

    // Notified after the agent's value has been updated successfully
    abstract static class Listener<T> {
        abstract void onUpdate (Agent<T> agent)
    }

    private volatile FList<Validator<T>> validators = FList.emptyList
    private volatile FList<Listener<T>> listeners = FList.emptyList

    Listener addListener (Listener listener) {
        listeners.apply{ it + listener }
        listener
    }

    void removeListener (Listener listener) {
        listeners.apply{ it - listener }
    }

    Validator addValidator (Validator validator) {
        validators.apply{ it + validator }
        validator
    }

    void removeValidator (Validator validator) {
        validators.apply{ it - validator }
    }

Note how elegantly we are able to apply a new value to a volatile field. Instead of for(;;) {..compareAndSet...} we just use apply {...}
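For readers who have not written such a retry loop by hand, here is roughly what apply {...} saves us from, sketched in plain Java. The class ListenerRegistry and its methods are hypothetical names used only for illustration, and an unmodifiable copied list stands in for the persistent FList, so this is a sketch of the idea rather than of the Groovy++ implementation.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical registry (assumption): keeps an immutable list behind an atomic reference.
final class ListenerRegistry<L> {
    private final AtomicReference<List<L>> listeners =
            new AtomicReference<>(Collections.emptyList());

    // The explicit for(;;) { ... compareAndSet ... } loop.
    void add(L listener) {
        for (;;) {
            List<L> current = listeners.get();
            List<L> updated = new ArrayList<>(current);
            updated.add(listener);
            // Succeeds only if nobody replaced the list in the meantime; otherwise retry.
            if (listeners.compareAndSet(current, Collections.unmodifiableList(updated))) {
                return;
            }
        }
    }

    // The same retry loop, hidden inside AtomicReference.updateAndGet.
    void remove(L listener) {
        listeners.updateAndGet(current -> {
            List<L> updated = new ArrayList<>(current);
            updated.remove(listener);
            return Collections.unmodifiableList(updated);
        });
    }

    List<L> snapshot() {
        return listeners.get();
    }
}
```

The update function may run more than once under contention, so it must be free of side effects. The same holds for the closure passed to apply.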

A careful reader may ask why we need special types for validators and listeners instead of using the standard Function1<Agent<T>,?> for a listener and Function2<Agent<T>,T,Boolean> for a validator. To tell the truth, my initial version did exactly that, but then I found that boxing the validator result is very unnatural and decided to introduce a dedicated one-method type (the listener has its own class for symmetry).

Another interesting question is why addListener and addValidator need a return value. It is an interesting story. Let us consider the following code fragment:

def validator = agent.addValidator {...<validator body>... }

What is going on here is the implicit creation of an anonymous inner class extending Validator. This instance is then passed as the argument to the addValidator method. If we need to keep a reference to this validator (for example, to remove it later), we need access to it. This is the reason for the return value of addValidator.

Now we are ready to implement the main method of class Agent. For convenience we will call it 'call' (sorry for the tautology).

In Groovy you can almost always omit the method name if the method is called 'call'.

Here is the code:

    void call (Mutation<T> mutation, Runnable whenDone = null) {
        def that = this
        schedule {
            def oldRef = ref
            def newRef = mutation(oldRef)
            // nothing to do if the mutation returned the very same object
            if (newRef !== oldRef) {
                // accept the new value only if no validator rejects it
                if(!validators.any{ !it.validate(that, newRef) }) {
                    ref = newRef
                    listeners*.onUpdate(that)
                }
            }

            whenDone?.run ()
        }
    }

It probably requires a little explanation.

First of all, Mutation<T> is the same as Function1<T,T>: a function taking an argument of type T and returning a value of the same type.

Secondly, ExecutingChannel.schedule is a method that sends a special message to the channel. You can think of this message as a Runnable to execute. Have a look at the Groovy++ sources for details.

Last but not least, we have an additional optional parameter representing an operation to execute when the mutation, validation and listener actions are completed. To give one example of why it can be useful, we will develop one additional method.
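For readers more at home in Java, the whole update cycle (mutate, validate, store, notify, then run the completion callback) can be sketched as follows. This is an analogue under stated assumptions and not the Groovy++ API: MiniAgent and send are hypothetical names, a single-threaded executor stands in for ExecutingChannel, and CopyOnWriteArrayList replaces the volatile FList fields.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.BiPredicate;
import java.util.function.Consumer;
import java.util.function.UnaryOperator;

// Hypothetical Java analogue of Agent (assumption, for illustration only).
final class MiniAgent<T> {
    private volatile T ref;

    // Assumption: one daemon thread per agent serializes the actions.
    private final ExecutorService channel = Executors.newSingleThreadExecutor(r -> {
        Thread t = new Thread(r, "mini-agent");
        t.setDaemon(true);
        return t;
    });

    private final List<BiPredicate<MiniAgent<T>, T>> validators = new CopyOnWriteArrayList<>();
    private final List<Consumer<MiniAgent<T>>> listeners = new CopyOnWriteArrayList<>();

    MiniAgent(T initial) {
        ref = initial;
    }

    // Reads need no locking: the value is immutable and the field is volatile.
    T get() {
        return ref;
    }

    void addValidator(BiPredicate<MiniAgent<T>, T> validator) {
        validators.add(validator);
    }

    void addListener(Consumer<MiniAgent<T>> listener) {
        listeners.add(listener);
    }

    // Counterpart of call(mutation, whenDone); whenDone may be null.
    void send(UnaryOperator<T> mutation, Runnable whenDone) {
        channel.execute(() -> {
            T oldRef = ref;
            T newRef = mutation.apply(oldRef);
            // Store the new value only if it is a different object and every validator accepts it.
            if (newRef != oldRef && validators.stream().allMatch(v -> v.test(this, newRef))) {
                ref = newRef;
                listeners.forEach(l -> l.accept(this));
            }
            if (whenDone != null) {
                whenDone.run();
            }
        });
    }
}
```

As in the Groovy++ version, the completion callback runs whether or not the new value was accepted, so a caller can rely on it to learn that its action has been processed.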

Imagine that for some reason we want to initiate a mutation of the agent's value and wait until it has completed. Here is a simple (and, more importantly, deadlock-free) way to do it:

    void await (Mutation<T> mutation) {
        CountDownLatch cdl = [1]
        call(mutation) {
            cdl.countDown ()
        }
        cdl.await()
    }

What we do in the code above is introduce an internal CountDownLatch and release it when the update action is done.
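The same latch trick works for any callback-style operation, not only for agents. Here is a small Java sketch of it in isolation. Blocking and AsyncAction are hypothetical names introduced just for this example, and turning the InterruptedException into an unchecked exception is my own choice, not something the Groovy++ code does.

```java
import java.util.concurrent.CountDownLatch;

// Hypothetical helper (assumption): turns "start now, call me back when done"
// into a blocking call.
final class Blocking {
    // Any operation that starts some work and invokes whenDone at the end.
    interface AsyncAction {
        void start(Runnable whenDone);
    }

    static void await(AsyncAction action) {
        CountDownLatch latch = new CountDownLatch(1);
        // The completion callback releases the latch.
        action.start(latch::countDown);
        try {
            latch.await();
        } catch (InterruptedException e) {
            // Restore the interrupt flag and report the failure to the caller.
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        }
    }
}
```

The latch is created per call, so concurrent callers never share state and each one is woken up only by its own action.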

Now that we are done with the design of Agent, let us try to use it.

To create an agent we do the following:

    Agent<FVector<Integer>> sharedVector = [FVector.emptyVector]
    sharedVector.executor = pool

In the code above we set the executor property (java.util.concurrent.Executor) explicitly. In many situations this is not necessary, as the Groovy++ standard library has a notion of a current executor, which is available in threads managed by the library.

To update/fill the internal vector we can do something like this:

    for(i in 0..<100) {
        Mutation<FVector<Integer>> action = {
            it.length < 100*100 ? it + it.length : it
        }
        sharedVector(action) {
            if (sharedVector.get().length < 100*100)
                sharedVector(action, this)
        }
    }

Note the trick we use to make our update "recursive". It is another interesting use of the completion callback whenDone. When an update is completed, we check whether we want to continue and, if so, schedule the same action with the same completion callback.

Before reading the next paragraph, I suggest that the reader think about what is wrong with the code above.

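Unfortunately, the check in the completion callback is not enough on its own. It can happen that one thread decides to schedule an update for our agent because the internal vector contains fewer elements than we want, while several updates have already been scheduled, so that in total more updates run than are needed. Here the guard inside the mutation turns those extra updates into no-ops, but a mutation without such a guard would exceed the desired number of elements. A validator is a useful tool in this situation, because it enforces the limit no matter what the mutation does: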

    def validator = sharedVector.addValidator { agent, newValue ->
        newValue.length <= 100*100
    }
    ..............
    sharedVector.removeValidator(validator)

We can also work with an agent in a quasi-synchronous way. Let us say we want to start 10 threads, each performing 1000 random swaps on the internal vector. We can do it as in the code below.

    CountDownLatch shuffleCdl = [10]
    for(i in 0..<10) {
        pool.execute {
            def r = new Random ()
            for(j in 0..<1000) {
                def i1 = r.nextInt (100*100)
                def i2 = r.nextInt (100*100)
                // swap two random elements and wait until the swap is applied
                sharedVector.await {
                    def v1 = it[i1]
                    def v2 = it[i2]
                    it.set(i2, v1).set(i1, v2)
                }
            }
            shuffleCdl.countDown()
        }
    }
    shuffleCdl.await ()

An agent is a simple but convenient shared data structure that does not require any special synchronization. I hope this article made it clearer for you. Thank you for reading, and till next time.
