Better Abstractions With core.async
Better Abstractions With core.async
Join the DZone community and get the full member experience.Join For Free
Java-based (JDBC) data connectivity to SaaS, NoSQL, and Big Data. Download Now.
core.async is a Clojure library implementing communication sequential processes, an approach that allows code to be structured as producers and consumers of messages passed through channels. CSPs are an approach to dealing with concurrent activity in a program, and exist as a strong alternative to the kind of callback oriented programming present in NodeJS (for example).
I’ve been a big fan of Clojure’s core.async library since I first heard about it, and have been eagerly using it in a number of ways. I see core.async as fulfilling two roles: enabler and simplifier.
Enablers allow something to be accomplished that otherwise could not be, at least without a large amount of work and uncertainty. Most of the technologies we use are enablers, from the very language, to the libraries and frameworks. For example, after using Clojure for a while, you will find that expressing algorithms using lazy evaluation is clear and simple and desirable. Clojure enables lazy evaluation, by having all that support built into the core language and library. You wouldn’t want to build all that yourself.
Likewise, core.async gives your application many desirable features; breaking your processes into the sequential building blocks, connected by channels, is a terrific way to build servers with great throughput and responsiveness. That’s the majority of what core.async is about, and it paints a very desirable picture of your server humming along, always keeping a small pool of threads hot and busy.
I’ve been working on a bit of code to allocate payment codes (for a project at Aviso). The codes are associated with an invoice that needs to be paid; the payment code acts as a temporary alias that can be texted to a user’s phone.
In order to allocate these easily in a cluster of servers, with minimal coordination, the range of possible numbers is broken up into 64K blocks of 4K codes each (that adds up to 28 bits, which covers most of the range of what can be expressed in six base 26 digits). The database is used to allow servers to allocate blocks, and then any individual server can allocate the stream of 4096 codes within a block without again touching the database.
Ok, so here’s the issue; my code gets to the point where it needs one of these payment codes, so there’s a function that gets called to provide the next payment code. My first pass looked something like this:
At its core is a an atom that is used to track the state; the id of the payment code block, and the most recently allocated index. I don’t show generate-payment-code but it has to do a few things, to allocate a new block when needed, then advance to the next payment code index, then generate the six character string … and return the new value and the new state (needed to generate the next payment code). And, of course, the code must incorporate a spin loop just in case multiple threads allocate codes at the exact same instant.
This is ugly in a number of ways; not least of which is that, as a function, aconsumer of payment codes needs to provide specific arguments (the database, the state-holding atom) needed by the generator of payment codes. That feels complected, and fragile.
So, as I was figuring out what the code inside generate-payment-code would look like I had that delightful spark of insight: If I could look at this problem differently, not as a function to invoke, but as a channel to take values from, I might end up with a better design, easier to implement.
A core.async channel is a perfect way to isolate the generation of payment codes from the consumption of them. But that leaves the question of how many payment codes should be generated. As it turns out, the answer is … all of them! Let’s take a look:
The loop is endless; it allocates a block from the database, and then generates all the keys for that block; the onto-chan function reads from a seq and puts each successive value into the channel. At first glance, it looks like it will immediately generate every possible payment code and stuff them all into the feed, filling up all available memory. Fortunately, that’s not the case.
Visualize what this code does: at startup it allocates the first block id and generates a lazy sequence of all the payment codes for that block. Immediately, the feed channel is filled with the first 10 codes from the lazy sequence. Since the feed channel is now full, the go loop inside onto-chan parks.
Eventually, some other thread takes a value from the feed; once that happens, the process created by onto-chan un-parks, takes one more value from the lazy sequence, and puts that into the feed channel.
Much later, enough payment codes have been taken out of the feed channel that the 4096th payment code from the block can be pushed into the feed channel. At that point, the create-payment-code-feed process, parked at line 16, un-parks … and the process begins again, with a new block immediately allocated.
Perhaps the most confusing part is what happens when the application shuts down … what happens to the two processes (the one directly inside create-payment-code-feed, and the nested one inside into-chan)?
This whole approach is very appealing … it lets you express stateful and asynchronous operations using the same approach that would normally apply to the stateless and synchronous operations elsewhere in your code. It teases apart logic that would normally be combined due to short-term convenience or necessity.
Just as when I was first learning object oriented programming, and then later first learning functional programming, I suspect I will continue to learn better and more elegant ways to make use of channels. It’s something to look forward to.
Published at DZone with permission of Howard Lewis Ship , DZone MVB. See the original article here.
Opinions expressed by DZone contributors are their own.