This is summarized in the following class diagram:

Calling our services using the concurrent package is a two-step process.

Creating a Collection of Callables

First, there needs to be a collection of Callable to pass to the executor service. This is how it might go:

  1. From a stream of service names
  2. For each service name, create a new dummy service initialized with the string
  3. For every service, return the service’s getContent() method reference as a Callable. This works because the method signature matches Callable.call() and Callable is a functional interface.

This is the preparation phase. It translates into the following code:

List<Callable<ContentDuration>> callables = Stream.of("Service A", "Service B", "Service C")
      .map(DummyService::new)
      .map(service -> (Callable<ContentDuration>) service::getContent)
      .collect(Collectors.toList());
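
Step 3 relies on a method reference being convertible to a Callable. As a minimal sketch of that conversion in isolation, the example below uses a hypothetical Greeter class (a stand-in for illustration, not the post’s DummyService) whose greet() method takes no argument and returns a value, which is the shape Callable.call() expects. In the stream version above, the explicit cast is what tells the compiler which functional interface the method reference should target.

```java
import java.util.concurrent.Callable;

public class MethodRefAsCallable {

    // Hypothetical stand-in for a service: one no-argument method returning a value.
    public static class Greeter {
        private final String name;

        public Greeter(String name) {
            this.name = name;
        }

        public String greet() {
            return "Hello from " + name;
        }
    }

    // Converts the bound method reference into a Callable.
    // greet() takes no argument and returns a String, so it fits Callable<String>.call().
    // The return type provides the target type, so no cast is needed here.
    public static Callable<String> asCallable(Greeter greeter) {
        return greeter::greet;
    }

    public static void main(String[] args) throws Exception {
        Callable<String> callable = asCallable(new Greeter("Service A"));
        // Nothing runs until call() is invoked, here or by an executor.
        System.out.println(callable.call());
    }
}
```

Creating the Callable does not execute anything: the work only happens once call() is invoked, which is what the executor service does in the next step.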

Processing the Callables

Once the list has been prepared, it’s time for the ExecutorService to process it, aka the “real work”.

  1. Create a new executor service — any will do
  2. Pass the list of Callable to the executor service, and stream the resulting list of Future
  3. For every future, either return the result or handle the exception

The following snippet is a possible implementation:

ExecutorService executor = Executors.newWorkStealingPool();
// invokeAll() declares InterruptedException: the enclosing method must handle or declare it
List<ContentDuration> results = executor.invokeAll(callables).stream()
    .map(future -> {
        try {
            return future.get();
        } catch (InterruptedException | ExecutionException e) {
            throw new RuntimeException(e);
        }
    })
    .collect(Collectors.toList());
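
To try both steps end to end, here is a self-contained sketch under a few assumptions: the fetch() method is a hypothetical stand-in for a slow service call and returns a plain String rather than the post’s ContentDuration, and the unwrap() helper is one possible way of keeping the checked exceptions of Future.get() out of the lambda. It also shuts the executor down once the results are in, which the snippet above leaves out.

```java
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class InvokeAllSketch {

    // Hypothetical stand-in for a remote call: sleeps briefly, then returns a string.
    public static String fetch(String name) throws InterruptedException {
        Thread.sleep(100);
        return "content of " + name;
    }

    public static List<String> fetchAll(String... names) throws InterruptedException {
        // Step 1: prepare one Callable per service name.
        List<Callable<String>> callables = Stream.of(names)
                .map(name -> (Callable<String>) () -> fetch(name))
                .collect(Collectors.toList());

        // Step 2: hand the whole list to the executor service.
        ExecutorService executor = Executors.newWorkStealingPool();
        try {
            // invokeAll() blocks until every task has completed, and returns
            // the futures in the same order as the submitted callables.
            return executor.invokeAll(callables).stream()
                    .map(InvokeAllSketch::unwrap)
                    .collect(Collectors.toList());
        } finally {
            executor.shutdown();
        }
    }

    // Keeps the checked exceptions of Future.get() out of the stream pipeline.
    private static String unwrap(Future<String> future) {
        try {
            return future.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // restore the interrupt flag
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            // The task itself failed; the original exception is the cause.
            throw new IllegalStateException(e.getCause());
        }
    }

    public static void main(String[] args) throws InterruptedException {
        fetchAll("Service A", "Service B", "Service C").forEach(System.out::println);
    }
}
```

Because invokeAll() preserves the order of the submitted tasks, the results line up with the service names even though the calls themselves run concurrently.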

The Future API, but in Kotlin

Let’s face it, while Java makes it possible to write concurrent code, reading and maintaining it is not that easy, mainly due to:

  • Going back and forth between collections and streams
  • Handling checked exceptions in lambdas
  • Casting explicitly

Just porting the above code to Kotlin removes those limitations and makes it more straightforward:

val callables: List<Callable<ContentDuration>> = arrayOf("Service A", "Service B", "Service C")
    .map { DummyService(it) }
    .map { Callable<ContentDuration> { it.content } }

val executor = Executors.newWorkStealingPool()
val results = executor.invokeAll(callables).map { it.get() }

Kotlin Coroutines

With version 1.1 of Kotlin comes a new experimental feature called coroutines.

Basically, coroutines are computations that can be suspended without blocking a thread. Blocking threads is often expensive, especially under high load […]. Coroutine suspension is almost free, on the other hand. No context switch or any other involvement of the OS is required.

The leading design principle behind coroutines is that they must feel like sequential code but run like concurrent code. They are based on the following class diagram:

Nothing beats the code itself though. Let’s implement the same as above, but with coroutines in Kotlin instead of Java futures.

As a pre-step, let’s just extend the service to ease further processing by adding a new computed property wrapped around content, of type Deferred:

val DummyService.asyncContent: Deferred<ContentDuration>
    get() = async(CommonPool) { content }


This is standard Kotlin extension property code, but notice the CommonPool parameter. This is the magic that makes the code run concurrently. It’s a singleton object that uses a multi-fallback algorithm to get an ExecutorService instance.

Now, onto the code flow proper:

  1. Coroutines are handled inside a block. Declare a variable list outside the block to be assigned inside it.
  2. Open the synchronization block.
  3. Create the array of service names.
  4. For each name, create a service and return it.
  5. For each service, get its async content (declared above) and return it.
  6. For each deferred, get the result and return it.

// Variable must be initialized or the compiler complains
// And the variable cannot be used afterwards
var results: List<ContentDuration>? = null
runBlocking {
    results = arrayOf("Service A", "Service B", "Service C")
        .map { DummyService(it) }
        .map { it.asyncContent }
        .map { it.await() }
}

Takeaways

The Future API is not so much the problem as the Java language itself. As soon as the code is translated into Kotlin, readability improves a lot. Yet, having to create a collection to pass to the executor service breaks the nice functional pipeline.

For coroutines, the only compromise is to move from a val to a var to get the final results (or to add the results to a mutable list). Also, remember that coroutines are still experimental. Despite all of that, the code does look sequential, and is thus more readable, while behaving in parallel.

The complete source code for this post can be found on GitHub in Maven format.