Callback Hell and Reactive Patterns
Learn more about a Reactive Streams-based approach and how to avoid Callback Hell.
One way I have come to appreciate the usefulness of a Reactive Streams-based approach is by seeing how it simplifies a non-blocking IO call.
This post is a quick walkthrough of the kind of code involved in making a synchronous remote call. It then shows how layering in non-blocking IO, though highly efficient in its use of resources (especially threads), introduces the complication known as callback hell, and how a Reactive Streams-based approach simplifies the programming model.
Target Service
Since I will be writing a client call, my target service, which represents the details of a city, has two endpoints. One returns a list of city ids when called with the URI "/cityids", and a sample result looks like this:
[
  1,
  2,
  3,
  4,
  5,
  6,
  7
]
The other endpoint returns the details of a city given its id. For example, a call with an id of 1, "/cities/1", returns:
{
  "country": "USA",
  "id": 1,
  "name": "Portland",
  "pop": 1600000
}
The client's responsibility is to get the list of city ids and then, for each city id, get the details of that city and put them together into a list of cities.
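The snippets in this post refer to a City type without showing it. A minimal sketch of what it could look like, assuming the fields simply mirror the sample JSON above (the actual class in the repository may differ):

```kotlin
// Hypothetical model for the "/cities/{id}" payload; field names follow the sample JSON.
data class City(
    val id: Long,
    val name: String,
    val country: String,
    val pop: Long
)

fun main() {
    val portland = City(id = 1, name = "Portland", country = "USA", pop = 1_600_000)
    println(portland) // prints City(id=1, name=Portland, country=USA, pop=1600000)
}
```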
Synchronous Call
I am using Spring Framework's RestTemplate to make the remote call. A Kotlin function to get the list of cityIds looks like this:
private fun getCityIds(): List<String> {
    val cityIdsEntity: ResponseEntity<List<String>> = restTemplate
        .exchange("http://localhost:$localServerPort/cityids",
            HttpMethod.GET,
            null,
            object : ParameterizedTypeReference<List<String>>() {})
    return cityIdsEntity.body!!
}
And to get the details of a city:
private fun getCityForId(id: String): City {
    return restTemplate.getForObject("http://localhost:$localServerPort/cities/$id", City::class.java)!!
}
Given these two functions, it is easy to compose them so that a list of cities is returned:
val cityIds: List<String> = getCityIds()
val cities: List<City> = cityIds
    .stream()
    .map<City> { cityId -> getCityForId(cityId) }
    .collect(Collectors.toList())
cities.forEach { city -> LOGGER.info(city.toString()) }
The code is very easy to understand; however, there are eight blocking calls involved:
1. One call to get the list of seven city ids
2. Seven calls to get the details of each of the seven cities
Each of these calls blocks the thread it runs on until the response arrives. In the code above that is the calling thread, which makes the calls one after another.
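The cost of blocking can be seen without any HTTP at all. The following is a rough sketch in which Thread.sleep stands in for network latency; fakeGetCityIds and fakeGetCityForId are made-up names for illustration and are not part of the original code.

```kotlin
// Simulated remote calls: Thread.sleep stands in for network latency (an assumption).
fun fakeGetCityIds(): List<Long> {
    Thread.sleep(50)
    return (1L..7L).toList()
}

fun fakeGetCityForId(id: Long): String {
    Thread.sleep(50)
    // Record which thread did the waiting.
    return "city-$id on ${Thread.currentThread().name}"
}

fun main() {
    val start = System.nanoTime()
    val cities = fakeGetCityIds().map { fakeGetCityForId(it) }
    val elapsedMs = (System.nanoTime() - start) / 1_000_000
    cities.forEach(::println)
    // Eight sleeps of 50 ms run back to back, so expect roughly 400 ms or more.
    println("8 blocking calls took $elapsedMs ms")
}
```

Every line printed should name the same thread, since the calling thread does all of the waiting itself.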
Using Non-Blocking IO With Callbacks
I will be using a library called AsyncHttpClient to make a non-blocking IO call. AsyncHttpClient returns a ListenableFuture type when a remote call is made.
val responseListenableFuture: ListenableFuture<Response> = asyncHttpClient
    .prepareGet("http://localhost:$localServerPort/cityids")
    .execute()
A callback can be attached to a ListenableFuture to act on the response when available.
responseListenableFuture.addListener(Runnable {
    val response: Response = responseListenableFuture.get()
    val responseBody: String = response.responseBody
    val cityIds: List<Long> = objectMapper.readValue<List<Long>>(responseBody,
        object : TypeReference<List<Long>>() {})
    ....
}, executor)
Given the list of cityIds, I want to get the details of the city, so from the response, I need to make more remote calls and attach a callback for each of the calls to get the details of the city along these lines:
val responseListenableFuture: ListenableFuture<Response> = asyncHttpClient
    .prepareGet("http://localhost:$localServerPort/cityids")
    .execute()
responseListenableFuture.addListener(Runnable {
    val response: Response = responseListenableFuture.get()
    val responseBody: String = response.responseBody
    val cityIds: List<Long> = objectMapper.readValue<List<Long>>(responseBody,
        object : TypeReference<List<Long>>() {})
    cityIds.stream().map { cityId ->
        val cityListenableFuture = asyncHttpClient
            .prepareGet("http://localhost:$localServerPort/cities/$cityId")
            .execute()
        cityListenableFuture.addListener(Runnable {
            val cityDescResp = cityListenableFuture.get()
            val cityDesc = cityDescResp.responseBody
            val city = objectMapper.readValue(cityDesc, City::class.java)
            LOGGER.info("Got city: $city")
        }, executor)
    }.collect(Collectors.toList())
}, executor)
This is a gnarly piece of code: there is a set of callbacks within a callback, which is very difficult to reason about and make sense of. This is why it is referred to as "callback hell."
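The nested version above only logs each city. Actually gathering the results into one list with plain callbacks needs shared state and manual bookkeeping, which is part of what makes this style painful. Here is a sketch of that bookkeeping using only the JDK; FakeAsyncClient and its methods are hypothetical stand-ins and not the AsyncHttpClient API.

```kotlin
import java.util.concurrent.ConcurrentLinkedQueue
import java.util.concurrent.CountDownLatch
import java.util.concurrent.Executors
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical callback-style client: each "call" runs on a pool thread and
// hands its result to the supplied callback.
class FakeAsyncClient {
    private val pool = Executors.newFixedThreadPool(4)

    fun fetchCityIds(onDone: (List<Long>) -> Unit) {
        pool.execute { onDone(listOf(1L, 2L, 3L)) }
    }

    fun fetchCity(id: Long, onDone: (String) -> Unit) {
        pool.execute { onDone("city-$id") }
    }

    fun shutdown() = pool.shutdown()
}

fun collectCities(client: FakeAsyncClient): List<String> {
    val results = ConcurrentLinkedQueue<String>() // shared state written from callbacks
    val allDone = CountDownLatch(1)
    client.fetchCityIds { ids ->
        val pending = AtomicInteger(ids.size)     // manual count of outstanding calls
        ids.forEach { id ->
            client.fetchCity(id) { city ->
                results.add(city)
                if (pending.decrementAndGet() == 0) allDone.countDown()
            }
        }
    }
    // Blocks the caller only so the demo can return a value; gives up after 5 seconds.
    allDone.await(5, TimeUnit.SECONDS)
    return results.sorted()                       // callbacks finish in no fixed order
}

fun main() {
    val client = FakeAsyncClient()
    println(collectCities(client)) // prints [city-1, city-2, city-3]
    client.shutdown()
}
```

Note that the sketch ignores failures entirely; adding error handling to each callback would make it longer still.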
Using Non-Blocking IO With Java CompletableFuture
This code can be improved a little by returning Java's CompletableFuture instead of the ListenableFuture. CompletableFuture provides operators that allow the eventual result to be transformed into another type and returned.
As an example, consider the function to get the list of city ids:
private fun getCityIds(): CompletableFuture<List<Long>> {
    return asyncHttpClient
        .prepareGet("http://localhost:$localServerPort/cityids")
        .execute()
        .toCompletableFuture()
        .thenApply { response ->
            val s = response.responseBody
            val l: List<Long> = objectMapper.readValue(s, object : TypeReference<List<Long>>() {})
            l
        }
}
Here, I am using the thenApply operator to transform CompletableFuture<Response> to CompletableFuture<List<Long>>.
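To see thenApply in isolation, here is a small sketch that uses only the JDK. The fetchRawIds function and its comma-separated body are invented for illustration and stand in for the HTTP call and JSON parsing.

```kotlin
import java.util.concurrent.CompletableFuture

// Stand-in for the HTTP call: completes asynchronously with a raw body (hypothetical data).
fun fetchRawIds(): CompletableFuture<String> =
    CompletableFuture.supplyAsync { "1,2,3" }

// thenApply maps the eventual String to a List<Long> without blocking the caller.
fun fetchIds(): CompletableFuture<List<Long>> =
    fetchRawIds().thenApply { body -> body.split(",").map { it.trim().toLong() } }

fun main() {
    // join() blocks; it is used here only to print the result of the demo.
    println(fetchIds().join()) // prints [1, 2, 3]
}
```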
And similarly, to get the details of a city:
private fun getCityDetail(cityId: Long): CompletableFuture<City> {
    return asyncHttpClient.prepareGet("http://localhost:$localServerPort/cities/$cityId")
        .execute()
        .toCompletableFuture()
        .thenApply { response ->
            val s = response.responseBody
            LOGGER.info("Got {}", s)
            val city = objectMapper.readValue(s, City::class.java)
            city
        }
}
This is an improvement over the callback-based approach. However, CompletableFuture lacks a convenient operator for some cases, such as this one, where all the city details need to be put together:
val cityIdsFuture: CompletableFuture<List<Long>> = getCityIds()
val citiesCompletableFuture: CompletableFuture<List<City>> =
    cityIdsFuture
        .thenCompose { l ->
            val citiesCompletable: List<CompletableFuture<City>> =
                l.stream()
                    .map { cityId ->
                        getCityDetail(cityId)
                    }.collect(toList())
            val citiesCompletableFutureOfList: CompletableFuture<List<City>> =
                CompletableFuture.allOf(*citiesCompletable.toTypedArray())
                    .thenApply { _: Void? ->
                        citiesCompletable
                            .stream()
                            .map { it.join() }
                            .collect(toList())
                    }
            citiesCompletableFutureOfList
        }
I have used an operator called CompletableFuture.allOf, which returns a CompletableFuture<Void> and has to be coerced to return the desired type of CompletableFuture<List<City>>.
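One way to contain this coercion is to pull it into a small helper. The sequence function below is a hypothetical name and a sketch, not something CompletableFuture provides; it wraps the same allOf-then-join pattern.

```kotlin
import java.util.concurrent.CompletableFuture

// Hypothetical helper: turns a list of futures into a future of a list,
// keeping the order of the input list.
fun <T> sequence(futures: List<CompletableFuture<T>>): CompletableFuture<List<T>> =
    CompletableFuture.allOf(*futures.toTypedArray())
        // allOf only signals completion, so the values are read back from the
        // original futures; join() does not block here because all are complete.
        .thenApply { _ -> futures.map { f -> f.join() } }

fun main() {
    val futures = listOf(1L, 2L, 3L).map { id ->
        CompletableFuture.supplyAsync { "city-$id" } // stand-in for getCityDetail
    }
    println(sequence(futures).join()) // prints [city-1, city-2, city-3]
}
```

If any of the input futures fails, the future returned by allOf completes exceptionally and so does the combined future.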
Using Project Reactor
Project Reactor is an implementation of the Reactive Streams specification. It has two specialized types to return a stream of 0/1 item and a stream of 0/n items — the former is a Mono, the latter a Flux.
Project Reactor provides a very rich set of operators that allow the stream of data to be transformed in a variety of ways. Consider first the function to return a list of City ids:
private fun getCityIds(): Flux<Long> {
    return webClient.get()
        .uri("/cityids")
        .exchange()
        .flatMapMany { response ->
            LOGGER.info("Received cities..")
            response.bodyToFlux<Long>()
        }
}
I am using Spring's excellent WebClient library to make the remote call and get a Project Reactor Mono<ClientResponse> type of response, which can be transformed to a Flux<Long> type using the flatMapMany operator.
Along the same lines, to get the details of a city given a city id:
private fun getCityDetail(cityId: Long?): Mono<City> {
    return webClient.get()
        .uri("/cities/{id}", cityId!!)
        .exchange()
        .flatMap { response ->
            val city: Mono<City> = response.bodyToMono()
            LOGGER.info("Received city..")
            city
        }
}
Here, a Project Reactor Mono<ClientResponse> type is being transformed to a Mono<City> type using the flatMap operator.
And the code to get the city ids and then the cities from them:
val cityIdsFlux: Flux<Long> = getCityIds()
val citiesFlux: Flux<City> = cityIdsFlux
    .flatMap { this.getCityDetail(it) }
return citiesFlux
This is very expressive — contrast the mess of a callback-based approach and the simplicity of the Reactive Streams-based approach.
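The same composition can be tried without a running service. The sketch below assumes reactor-core is on the classpath and replaces the WebClient calls with stubs; stubCityIds and stubCityDetail are made-up names. It also swaps flatMap for flatMapSequential, because flatMap does not guarantee that results arrive in source order, and adds collectList to gather the stream into a single list.

```kotlin
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono

// Stubs standing in for the WebClient calls (hypothetical data, no network involved).
fun stubCityIds(): Flux<Long> = Flux.just(1L, 2L, 3L)

fun stubCityDetail(id: Long): Mono<String> = Mono.fromCallable { "city-$id" }

fun main() {
    val cities: List<String>? = stubCityIds()
        .flatMapSequential { id -> stubCityDetail(id) } // like flatMap, but keeps source order
        .collectList()                                  // Flux<String> -> Mono<List<String>>
        .block()                                        // blocks; only for this demo
    println(cities) // prints [city-1, city-2, city-3]
}
```

In application code the Flux or Mono would normally be returned to the caller rather than blocked on.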
Conclusion
In my mind, this is one of the biggest reasons to use a Reactive Streams-based approach, and Project Reactor in particular, for scenarios that involve crossing asynchronous boundaries, such as making remote calls in this instance. It cleans up the mess of nested callbacks and provides a natural way of transforming types using a rich set of operators.
My repository with a working version of all the examples I have used here is available on GitHub.
Published at DZone with permission of Biju Kunjummen, DZone MVB. See the original article here.