Stretching Async/Await With Lambdas
Are you doing the right use of async/await without constraining scalability? Check out some use cases providing limited concurrency and the right fixes.
Join the DZone community and get the full member experience.
Join For FreeUsing a collection pipeline [1] such as map(…->…).reduce(…->…)
rather than a for(…){……}
statement block can be much more than a fashion decision and enhance overall throughput if it involves asynchronous operations. We will show a simple use case of requesting a list of urls and summing the response bodies’ lengths and how the use or absence of lambdas may result in a concurrent or sequential computing [5].
Notice that we are not claiming for parallelism [6], such as Stream.parallel()
feature, but instead concurrency that may be applicable to "factor the workload into independent, coarse-grained tasks — such as user requests — with the aim of increasing throughput by processing multiple requests" [7]).
We will analyze this use case in 4 different programming languages (Java, Javascript, C# and Kotlin) and how the subtle differences in their asynchronous idioms turn similar usages in completely different results.
Introduction
I totally agree with the general accepted idea that lambdas are not functional programming[3]. So I will not focus in the formalisms that state each programming paradigm but instead on the ability of writing code that comprises the best relationship between readability, efficiency and scalability.
Thus, for many developers writing numbers.filter(::isPrime).map(::toString).forEach(::doSomething)
as opposed to for(var nr in numbers){ if(isPrime((nr)) doSomething(nr)}
is just a stylistic decision. Some may agree the former has better readability and others claim about the efficiency of the latter approach. In truth, the use of a collection pipeline[1] (just like the former example) may incur in additional performance overheads. Yet, many others claim those differences are hardly noticeable when measuring real life performance, and we should rather focus on which approach can take advantage of functional composition providing more flexibility and extensibility.
Here, we bring another consideration: are those computations asynchronous? If they are asynchronous, then we should worry about the execution progress. Each alternative may result in a concurrent or sequential computing, which in turn impacts the overall throughput.
If order does not matter, and we are dealing with large sets with millions of elements, then computing it concurrently may result in differences of several orders of magnitude in total execution time. As clearly denoted by Bryan Goetz "concurrency was mostly about asynchrony--allowing an activity to relinquish the CPU while waiting for an I/O to complete." [7].
To evaluate the impact of using different programming idioms to deal with concurrency, we have taken the use case presented in the question "Java Equivalent of C# async/await?" [10], which obtains the length of the body response to an HTTP request.
Now, consider for example that we want to sum the bodies’ lengths resulting from the HTTP get of a sequence of URLs. Using the collection pipeline approach we may achieve the desired behavior with the following Javascript sample (in a simplified version):
xxxxxxxxxx
const sum = urls.map(url => fetch(url)).reduce((prv, cur) => prv.then(p => cur.then(c => p.length + c.length)))
Here, the fetch
operation returns a Promise
[4] (the Javascript equivalent to Java CompletableFuture
) with a method then
(providing both behaviors of thenApply
and thenCompose
of CompletableFuture
). We may still simplify the previous pipeline taking advantage of async
/await
idiom to unwrap the result of each Promise
, rather than using chained then
calls:
xxxxxxxxxx
const sum = urls.map(url => fetch(url)).reduce(async (prv, cur) => await prv.length + await cur.length)
Both versions of the pipeline executes all fetch
operations concurrently and after all requests have been dispatched it will accumulate the body lengths trough a reduce
operation.
Yet, now if we translate this pipeline to the equivalent for
loop we may get an unexpected behavior. The execution of the following program performs the fetch
operations sequentially rather than concurrently.
xxxxxxxxxx
let sum = 0;
for (const url of urls) {
const body = await fetch(url);
sum += body.length
}
If we are processing 1,000 URLs sequentially, and each request has a latency of 20 ms on average, then we will take around 20 seconds to sum all bodies lengths. Whereas requesting those URLs concurrently, as we did with the collection pipeline version may take around 20 ms.
The reason for observing different behaviors between the use of a collection pipeline idiom and a for
loop is because those programs are not really equivalent. In fact, we are using the collection pipeline API from JavaScript arrays that is processing elements Eagerly. So, each intermediate operation (such as map
, filter
, etc) traverses all elements of the entire collection before applying the next operation. To achieve equivalent executions we should have written latter version as the following:
xxxxxxxxxx
const resps = []
for (const url of urls) {
resps.push(fetch(url))
}
let sum = 0;
for (const r in resps) {
const body = await r
sum += body.length
}
Bellow I will explain how different programming idioms may impact the application progress and also with different programming languages including Java, Javascript, C# and Kotlin, each one founding a different abstraction model to deal with asynchronous programing. Java provides the CompletableFuture
API, Javascript and C# provide the async
/await
idiom, and Kotlin introduces suspend
functions.
Regardless, the asynchronous model idiosyncrasies of each programming language, these 4 languages build their asynchronous abstractions on top of the concept of Promise[4], which represents the result of an asynchronous computation.
This is not entirely true for Kotlin because a native Kotlin operation can be implemented as a native suspending function avoiding the use of promises. Yet, in most interoperability scenarios between Kotlin and Java we will see that the most inner asynchronous operation returns a CompletableFuture
. This also happens in our use case where we are using the HttpClient
(included in java 11) in both Java and Kotlin programs to fetch the urls.
Briefly, we will observe different effects depending on the use of each asynchronous abstraction combined with use or absence of lambdas to implement a function that sums the HTTP responses’ bodies lengths -- fetchAndSum(urls)
. The following table summarizes the experimental observations roadmap that we will present along this article:
fetchAndSum(urls: Array) |
CompletableFuture |
async /await |
suspend |
---|---|---|---|
collection pipeline and lambdas | Concurrent | Concurrent | Sequential |
single loop with async /await |
NA | Sequential | Sequential |
Notice this table only resumes our observations on the specific use case of fetchAndSum(urls)
and is almost only a table-of-contents for this article. You should NOT look at this table as establishing any corollary resulting from the use of different asynchronous models combined with lambdas.
CompletableFuture
Before implementing fetchAndSum(urls)
using asynchronous operations such those provided from java 11 HttpClient
we will first address a synchronous implementation. To that end we will use the former java.net.URL
to perform HTTP get requests.
Thus, in the following implementation of fetchAndSumBlocking(urls)
we can identify four operations: 1) fetch the url; 2) read the HTTP response body, 3) get the body's length and 4) accumulate the length on sum. Operations 1 and 2 perform I/O whereas operations 3 and 4 involves read, add and set computations.
xxxxxxxxxx
static int fetchAndSumBlocking(String...urls) throws MalformedURLException, IOException {
int sum = 0;
for(var url : urls) {
InputStream resp = new URL(url).openStream(); // 1 - Fetch the url
String body = new BufferedReader(new InputStreamReader(resp)) // 2 - Read the body
.lines()
.collect(joining("\n"));
int length = body.length(); // 3 – Get body’s length
sum += length; // 4 - Sum lengths
}
return sum;
}
In the former example, all operations happen synchronously. When we call a function such as openStream()
it returns only when that action has finished and it can return its result. This means that the iteration loop only advances to the next step upon completion of the 4th operation sum += length
. Thus, the action performed by every operation happen one after another, where each one consumes the result produced by the previous operation
On the other hand, when we call an asynchronous operation it returns before the enclosing action has completed. So, rather than returning the result of the operation, it returns a promise denoting the asynchronous action that may complete at some point and produce a result. When the action finishes, the promise is informed and gets access to the result (for example, the response from an HTTP get request). To interact with a promise we may use its then()
method, which registers callbacks to receive its eventual result. Considering p
a promise that produces a result r
, formerly we may use p.then(r -> ...)
to register a callback r -> ...
that receives the result r
from p
. Here we make two simplifications:
- Regardless different methods with prefix
then
(e.g.thenApply
,thenCompose
,thenAccept
, etc) we will simply denote them asthen()
. - We will not deal with errors and exceptional completion.
We can also use the method then()
to register a callback that receives the result from two promises. In this case, considering p1
and p2
two promises that produce two results r1
and r2
, we may use p1.then(p2, (r1, r2) -> ...)
to register a callback (r1, r2) -> ...
that receives the result of both promises p1
and p2
(this is the role of the method thenCombine
of CompletableFuture
). Note that the callback is just invoked when both promises are fulfilled.
So, using an asynchronous API to perform an HTTP get request may result in a promise of the response rather than the concrete response itself. To perform asynchronous HTTP requests in Java we may use the java 11 HttpClient
, which is also available for Kotlin. These methods return an instance of CompletableFuture
which implements the concept of promise [4].
For simplification, consider in the next examples that we have accessible the following static members:
xxxxxxxxxx
static HttpClient httpClient = HttpClient.newHttpClient();
static HttpRequest.Builder requestBuilder = HttpRequest.newBuilder();
static HttpRequest request(String url) {
return requestBuilder.uri(URI.create(url)).build();
}
Taking advantage of the HttpClient
and the CompletableFuture
API we may write the asynchronous implementation of the fetchAndSum(urls)
in the following way:
xxxxxxxxxx
CompletableFuture<Integer> fetchAndSum(String...urls) {
var sum = CompletableFuture.completedFuture(0);
for(var url : urls) {
var prev = sum;
sum = httpClient
.sendAsync(request(url), BodyHandlers.ofString()) // 1 - Fetch the url
.thenApply(HttpResponse::body) // 2 - Read the body
.thenApply(String::length) // 3 – Get body’s length
.thenCombine(sum, (length, val) -> { // 4 - Sum lengths
return val + length;
});
}
return sum;
}
Since all requests are asynchronous, the iteration loop proceeds to the next request before the completion of the previous one and thus executing concurrently. Each request performed through the sendAsync()
results in a new CompletableFuture
that proceeds with the continuation ...::body
when the response is complete. Finally, we use thenCombine
to combine (aka zip) the previous result of sum
with the new one of body.length()
.
The fetchAndSum
can be rewritten in a collection pipeline according to:
xxxxxxxxxx
CompletableFuture<Integer> fetchAndSum(String...urls) {
var sum = CompletableFuture.completedFuture(0);
return Stream
.of(urls)
.peek(url -> out.printf("FETCHING from %s\n", url))
.map(url -> httpClient
.sendAsync(request(url), BodyHandlers.ofString()) // 1 - Fetch the url
.thenApply(HttpResponse::body) // 2 - Read the body
.thenApply(String::length) // 3 – Get body’s length
.whenComplete((l, err) -> out.printf("=======> from %s\n", url)))
.reduce(sum, (prev, curr) -> prev
.thenCombine(curr, (p, c) -> p + c)); // 4 - Sum lengths
}
The sequential and concurrent result of each approach is clearly observable if we insert an out.printf("FETCHING from %s\n", url);
before the fetch statement and an out.printf("=======> from %s\n", url);
before adding length
to sum
.
Running both approaches with an array of ["https://stackoverflow.com/","https://github.com/",
"http://dzone.com/"]
we may get the following output:
fetchAndSumBlocking(urls): |
fetchAndSum(urls): |
|
|
You may find the source code for this example here: https://github.com/javasync/async-await-idioms/tree/master/java
async
/await
idiom
Today many programing languages such as Javascript, Python or C#, provide the async
/await
keywords that let you hide the explicit use of promises and allows you to write "pseudo-synchronous" code without lambdas. In general the async
/await
"allows an asynchronous, non-blocking function to be structured in a way similar to an ordinary synchronous function" [8]. Regarding Javascript and as explained by Marijn Haverbeke in its notable book Eloquent JavaScript: "An async
function is a function that implicitly returns a promise and that can, in its body, await
other promises in a way that looks synchronous." [9].
Thus we can easily translate the Java implementation of fetchAndSumBlocking(urls)
to an async
Javascript function fetchAndSum(urls)
that takes advantage of the await
without incurring in a blocking use. We may start by the use of the asynchronous fetch()
function to perform an HTTP get request and henceforward we may use the await
to get the fulfilled value of the resulting promise. This is equivalent to the actions performed by the lambdas passed to the then()
methods in the previous section, but here without lambdas use, as depicted in the following implementation of fetchAndSum(urls)
.
xxxxxxxxxx
async function fetchAndSum(urls) {
let sum = 0
for (const url of urls) {
const res = await fetch(url) // 1 - Fetch the url
const body = await res.text() // 2 - Read the body
const length = body.length // 3 – Get body’s length
sum += length // 4 - Sum lengths
}
return sum
}
Every time an operation returns a promise we use the await
to get its result. This implementation discards the explicit use of promises and lambdas, being structured in a way similar to a synchronous function but keeping the non-blocking nature of an asynchronous function. The asynchronous behavior can be easily observed running the following program:
xxxxxxxxxx
const urls = [ "https://stackoverflow.com/", "https://github.com/", "http://dzone.com/"]
fetchAndSum(urls).then(sum => console.log(`Sum = ${sum}`))
console.log("fetchAndSum(urls) dispatched!")
Which may produce the output:
xxxxxxxxxx
fetchAndSum(urls) dispatched!
Sum = 338742
We can see that the result of the last console.log("fetchAndSum(urls) dispatched!")
statement is produced before the completion of the fetchAndSum(urls)
, which stands the non-blocking behavior of this function, otherwise we would observe the messages in a reversed order. At first glance this is a nice result from the async
/await
use. Yet, there is a limitation: urls are NOT being fetched concurrently!
On the first await
statement the function will immediately proceed to return a Promise
to its caller. After that, the execution of the fetchAndSum
will resume only when the first promise is resolved and then it will continue with the next statement.
To achieve a concurrent execution we must postpone the first await
statement after the last fetch
operation to the given urls
array. In this way all fetch operations may proceed concurrently when the method fetchAndSum
return on the first await
. To implement this behavior we may first collect the resulting promises from all fetch operations in an intermediate array and later await
for the responses as presented in the next implementation of fetchAndSumConcur
.
xxxxxxxxxx
async function fetchAndSumConcur(urls) {
let sum = 0
let promises = []
for (const url of urls) {
promises.push(fetch(url)) // 1 - Fetch the url
}
for (let i = 0; i < urls.length; i++) {
const resp = await promises[i]
const body = await resp.text() // 2 - Read the body
const length = body.length // 3 – Get body’s length
sum += length // 4 - Sum lengths
}
return sum
}
We can easily observe the differences between these functions if we insert a console.log(
before the fetch statement and a FETCHING from ${url}
)console.log(
before adding =======> from ${urls[i]}
) length
to sum
. Running both functions with an array of ["https://stackoverflow.com/","https://github.com/",
"http://dzone.com/"]
may produce the following outputs:
fetchAndSum(urls): |
fetchAndSumConcur(urls): |
|
|
Looking at the block for (const url of urls) promises.push(fetch(url))
we realize this is equivalent to the use of the map()
function over the urls
array, such as urls.map(url => fetch(url))
turning it less verbose. This is the starting point for the use of a collection pipeline rather than two for
blocks. Thus we may also replace the second for
block with another map
which transforms each response in its corresponding length. Finally we may collect the sum
with a reduce()
operation achieving the following implementation of fetchAndSumλ
.
xxxxxxxxxx
async function fetchAndSumλ(urls) {
return urls
.map(url => fetch(url)) // 1 - Fetch the url
.map(async (promise, i) => {
const resp = await promise
const body = await resp.text() // 2 - Read the body
return body.length // 3 – Get body’s length
})
.reduce(async (l1, l2) => {
return await l1 + await l2 // 4 - Sum lengths
})
}
Note that the second lambda is an async
function whereas the former is not. The async
is required when we need to use the await
to get the eventual value of a promise, which does not happen in the former lambda. The first map()
only dispatches the HTTP get requests without specifying what to do with their responses. Later in the second map()
we proceed tackling each response with an await
and in turn it returns a new promise with the body's length of each response. Finally, since the previous map()
results in an array of promises, the next reduce()
should also use an async
function to accumulate the lengths.
We can further coalesce both map
operations in a single map
and achieve the following implementation of fetchAndSumλ
that is still concurrent and has a quite similar layout to the former fetchAndSum
. It is important to note that fetchAndSumλ
proceed the collection pipeline to completion before it returns. In this case the await
instructions are relative to the inner async
lambdas and not to the outer fetchAndSumλ
.
For each iteration of map()
we invoke an inner async
function that returns immediately on each await
. Returning from the most inner async
function let the outer function resume to the next iteration of the map()
and proceed to another fetch
and so on, resulting in concurrent requests.
xxxxxxxxxx
async function fetchAndSumλ(urls) {
return urls
.map(async (url, i) => {
const resp = await fetch(url) // 1 - Fetch the url
const body = await resp.text() // 2 - Read the body
return body.length // 3 – Get body’s length
})
.reduce(async (l1, l2) => {
return await l1 + await l2 // 4 - Sum lengths
})
}
fetchAndSum
and fetchAndSumλ
have the same number of lines and analogous sequence of operations. Both take advantage of the async
/await
idiom but the later fetchAndSumλ
still uses lambdas in a collection pipeline. Although, at first glance they look as two equivalent options to solve the same problem, now we know they achieve different kind of progress, which may have different throughput if we are processing a large sequence of urls.
We can observe the same behaviors if we replicate both implementations in C#, which also provides the async
/await
features. The following listings shows the corresponding implementations in C# that are very close to the Javascript ones. The main difference is on naming where map(…).reduce(…)
corresponds to Select(…).Aggregate(….)
. The return type Task
is the implementation of the promise concept in .Net and equivalent to Javascript Promise
.
x
static async Task<int> FetchAndSum(string[] urls) {
int sum = 0;
using(HttpClient httpClient = new HttpClient()) {
foreach(var url in urls) {
var body = await httpClient.GetStringAsync(url);
sum += body.Length;
}
}
return sum;
}
static async Task<int> FetchAndSumλ(string[] urls) {
using(HttpClient httpClient = new HttpClient()) {
return await urls
.Select(async url => {
var body = await httpClient.GetStringAsync(url);
return body.Length;
})
.Aggregate(async (l1, l2) => await l1 + await l2);
}
}
Both implementations show the same results seen before with the JavaScript implementations. By the same reasons, the former FetchAndSum(urls)
performs sequentially whereas FetchAndSumλ(urls)
performs concurrently. We can easily observe this behavior by running these functions with the same urls
array.
You may try both implementations from the source provided in:
- https://github.com/javasync/async-await-idioms/tree/master/js
- https://github.com/javasync/async-await-idioms/tree/master/dotnet
Suspend
Functions
Kotlin provides asynchronous functions by mean of coroutines. This results not only in a binary decision between the use of a for
block rather than a collection pipeline, but also about the boundaries of coroutines scopes. Moreover, as we will see below the inlining feature of Kotlin may also impact on the resulting behavior.
So, starting with the most close translation of fetchAndSum(urls)
and fetchAndSumλ(urls)
to Kotlin and the use the of its counterpart suspend
functions in place of the async
. Then, we will observe the first major difference from the results seen in Java, Javascript and C#. Now, both implementations perform sequentially. For those less familiar with kotlin the main particularities in the following listing is related with the use of lambdas which are denoted between {...}
and the .await()
, which is an extension method rather than a keyword:
xxxxxxxxxx
suspend fun fetchAndSum(urls: Array<String>): Int {
var sum = 0
for (url in urls) {
val resp = httpClient
.sendAsync(request(url), BodyHandlers.ofString())
.await()
sum = sum + resp.body().length
}
return sum
}
suspend fun fetchAndSumλ(urls: Array<String>): Int {
return urls
.map { url ->
val resp = httpClient
.sendAsync(request(url), HttpResponse.BodyHandlers.ofString())
.await()
resp.body().length
}
.reduce { l1, l2 -> l1 + l2 }
}
Both implementations of fetchAndSum(urls)
and fetchAndSumλ(urls)
result in a similar behavior because methods of collections, such as Array
, are inlined on compilation. Thus, bytecodes resulting from the translation of kotlin source code for both implementations is equivalent. This means that there is not any function resulting from the translation of the lambda {url -> …}
and the .await()
call it is relative to the outer function fetchAndSumλ
, exactly as happens in the former fetchAndSum(urls)
But if we change the urls
type to Sequence
instead of Array
we get a different effect. Since sequences represent lazily evaluated collections, then their utility methods cannot be inlined and the lambda {url -> …}
must be translated to an anonymous function. Yet, another problem arises from this modification. Since the map()
receives a regular function (not a suspend
) we cannot use the .await()
inside that lambda.
To that end we need to use a coroutine builder that will run the given lambda in a new coroutine and returns a promise of its execution. In this case, we will use the async()
builder that returns an instance of Deferred
, which is the kotlin implementation of the concept of promise. The same modifications appear in the lambda passed to reduce()
as depicted in the following listing:
xxxxxxxxxx
suspend fun CoroutineScope.fetchAndSumλ(urls: Sequence<String>): Int {
return urls
.map{url -> async {
val resp = httpClient
.sendAsync(request(url), HttpResponse.BodyHandlers.ofString())
.await()
resp.body().length
}}
.reduce{ l1, l2 -> async {
l1.await() + l2.await()
}}
.await();
}
Finally, the async()
builder is an extension method of CoroutineScope
that requires a specific target (also known as a receiver in kotlin) to be properly invoked, such as target.async{…}
. If we also declare our fetchAndSumλ
as an extension method of CoroutineScope
then we can suppress the explicit use of the receiver on the async()
call which will be inferred from the outer function.
Now running the later version of fetchAndSumλ(urls)
we already achieve a concurrent execution similar to that one observed in JavaScript and C#.
You my find the source code of these samples here: https://github.com/javasync/async-await-idioms/tree/master/kotlin
Conclusion
Former techniques for writing asynchronous programs can be intrincated, making them difficult to write, debug, and maintain. For example, managing lambdas in a callback idiom can be tedious and easily lead to the widely known "callback hell".
Fortunately today many programming languages offer alternative techniques that let you create an asynchronous method almost as easily as you create a synchronous one. The compiler does the difficult work that the developer used to do, and your application retains a logical structure that resembles synchronous code.
Yet, you cannot ignore what is happening behind the scenes, otherwise you can be caught off guard about the progress achieved by your application.
One of the purposes of asynchronous programming for non-blocking IO operations, such as network access. By using asynchronous techniques we can let an application to continue with other tasks until the potentially blocking task finishes. However to take advantage of this behavior we have to write other non-blocking tasks in such a way that they not depend on the completion of the asynchronous tasks. Otherwise we may enforce a sequential execution, which prevent the concurrent progress of different tasks.
In this article we showed in different programming languages, how a simple decision such as using a for
statement rather than a collection pipeline may impact the overall progress of an application. Deliberately we start solving our problem using a for
statement and avoiding the use of lambdas. With this approach we showed how we have fallen in a scenario where some decoupled operations (i.e. fetch url
) were chained in a sequential order constraining the overall progress of the function fetchAndSum
.
Concluding, avoiding callbacks is not the same as avoiding lambdas and using different asynchronous idioms is not only a decision between programming paradigms. Take the most of your programming language constructions and do not ignore what is happening under the hood.
The source code of these examples is provided here: https://github.com/javasync/async-await-idioms
Acknowledgments
Thanks to Pedro Félix (https://twitter.com/pmhsfelix) for the revision and precious feedback to enhance this article.
References
[1] Collection Pipeline, 25 June 2015, Martin Fowler
[2] Continuation-passing style, wikipedia
[3] Lambdas are not functional programming, 12 Nov 2018, John McClean
[4] Futures and promises, wikipedia
[5] Concurrent computing, wikipedia
[6] Parallel computing, wikipedia
[7] From concurrent to parallel, Brian Goetz, 18 July 2016,
[8] Async/await, wikipedia
[9] Eloquent JavaScript, 3 edition, Marijn Haverbeke, December 2018
[10] Java Equivalent of C# async/await?
Opinions expressed by DZone contributors are their own.
Comments