Concurrency: Java Futures and Kotlin Coroutines
When it comes to concurrency, check out Java and Kotlin's chosen champions: Futures and Streaming for Java and the experimental coroutines for Kotlin.
Join the DZone community and get the full member experience.
Join For FreeA long time ago, one had to manually start new threads to run code concurrently in Java. Not only was this hard to write, it also was easy to introduce bugs that were hard to find. Testing, reading, and maintaining such code was no walk in the park, either. Since that time, and with a little incentive coming from multi-core machines, the Java API has evolved to make developing concurrent code easier. Meanwhile, alternative JVM languages also have their opinion about helping developers write such code. In this post, I’ll compare how it’s implemented in Java
and Kotlin.
To keep the article focused, I deliberately left out performance to write about code readability.
About the Use Case
The use case is not very original. We need to call different web services. The naïve solution would be to call them sequentially, one after the other, and collect the result of each of them. In that
case, the overall call time would be the sum of the call time of each service. An easy improvement is to call them in parallel and wait for the last one to finish. Thus, performance improves from
linear to constant — or for the more mathematically inclined, from o(n) to o(1).
To simulate the calling of a web service with a delay, let’s use the following code (in Kotlin, because this is so much less verbose):
class DummyService(private val name: String) {
private val random = SecureRandom()
val content: ContentDuration
get() {
val duration = random.nextInt(5000)
Thread.sleep(duration.toLong())
return ContentDuration(name, duration)
}
}
data class ContentDuration(val content: String, val duration:
Int)
The Java Future API
Java offers a whole class hierarchy to handle concurrent calls. It’s based on the following classes:
Callable: A Callable is a “task that returns a result.” From another viewpoint, it’s similar to a function that takes no parameter and returns this result.
Future: A Future is “the result of an asynchronous computation.” Also, “the result can only be retrieved using method get when the computation has completed, blocking if necessary until it is
ready.” In other words, it represents a wrapper around a value, where this value is the outcome of a calculation.
Executor Service: An ExecutorService
“provides methods to manage termination and methods that can produce a Future for tracking progress of one or more asynchronous tasks.” It is the entry point into concurrent handling code in Java. Implementations of this interface, as well as more specialized ones, can be obtained through static methods in the Executors class.
This is summarized in the diagram below:
Calling our services using the concurrent package is a two-step process.
Creating a Collection of Callables
First, there needs to be a collection of Callable to pass to the executor service. This is how it might go:
Form a stream of service names.
For each service name, create a new dummy service initialized with the string.
For every service, return the service’s getContent() method reference as a Callable. This works because the method signature matches Callable.call() and Callable is a functional interface.
This is the preparation phase. It translates into the following code:
List<Callable<ContentDuration>> callables = Stream.
of(“Service A”, “Service B”, “Service C”)
.map(DummyService::new)
.map(service -> (Callable<ContentDuration>)
service::getContent)
.collect(Collectors.toList());
Processing the Callables
Once the list has been prepared, it’s time for the ExecutorService to process it, AKA the “real work.”
Create a new executor service — any will do.
Pass the list of Callable to the executor service. and stream the resulting list of Future.
For every future, either return the result or handle the exception.
The following snippet is a possible implementation:
ExecutorService executor = Executors.newWorkStealingPool();
List < ContentDuration > results = executor.
invokeAll(callables).stream()
.map(future - > {
try {
return future.get();
} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException(e);
}
}).collect(Collectors.toList());
The Future API, But in Kotlin
Let’s face it: While Java makes it possible to write concurrent code, reading and maintaining it is not that easy, mainly due to:
Going back and forth between collections and streams.
Handling checked exceptions in lambdas.
Casting explicitly.
var callables: List < Callable < ContentDuration >> =
arrayOf(“Service A”, “Service B”, “Service C”)
.map {
DummyService(it)
}
.map {
Callable < ContentDuration > {
it.content
}
}
val executor = Executors.newWorkStealingPool()
val results = executor.invokeAll(callables).map {
it.get()
}
Kotlin Coroutines
With version 1.1 of Kotlin comes a new experimental feature called coroutines. From the Kotlin documentation:
“Basically, coroutines are computations that can be suspended without blocking a thread. Blocking threads is often expensive, especially under high load […]. Coroutine suspension is almost free, on the other hand. No context switch or any other involvement of the OS is required.”
The leading design principle behind coroutines is that they must feel like sequential code but run like concurrent code. They are based on the diagram here. Nothing beats the code itself, though. Let’s implement the same as above, but with coroutines in Kotlin instead of Java futures.
As a pre-step, let’s just extend the service to ease further processing by adding a new computed property wrapped around content of type Deferred:
val DummyService.asyncContent: Deferred<ContentDuration>
get() = async(CommonPool) { content }
This is standard Kotlin extension property code, but notice the CommonPool parameter. This is the magic that makes the code run concurrently. It’s a companion object (i.e. a singleton) that uses a multi-fallback algorithm to get an ExecutorService instance.
Now, onto the code flow proper:
Coroutines are handled inside a block. Declare a variable list
outside the block to be assigned inside it.Open the synchronization block.
Create the array of service names.
For each name, create a service and return it.
For each service, get its async content (declared above) and
return it.For each deferred, get the result and return it.
// Variable must be initialized or the compiler complains
// And the variable cannot be used afterwards
var results: List<ContentDuration>? = null
runBlocking {
results = arrayOf(“Service A”, “Service B”, “Service C”)
.map { DummyService(it) }
.map { it.asyncContent }
.map { it.await() }
}
Takeaways
The Future API is not so much a problem than the Java language itself is. As soon as the code is translated into Kotlin, the readability significantly improves. Yet having to create a collection to pass to the executor service breaks the nice functional pipeline. For coroutines, the only compromise is to move from a var to a val to get the final results (or to add the results to a mutable list).
Also, remember that coroutines are still experimental. Despite all of that, the code does look sequential — and is thus more readable and behaves in parallel. The complete source code for this post can be found on GitHub in Maven format.
Opinions expressed by DZone contributors are their own.
Comments