Callback Hell and Reactive Patterns

Learn more about a Reactive Streams-based approach and how to avoid Callback Hell.

One of the ways I have come to appreciate the usefulness of a Reactive Streams-based approach is by seeing how it simplifies a non-blocking IO call.

This post is a quick walkthrough of the kind of code involved in making a synchronous remote call. Then, we will see how layering in non-blocking IO, though highly efficient in its use of resources (especially threads), introduces complications known as callback hell, and how a Reactive Streams-based approach simplifies the programming model.

Target Service

Since I will be writing a client call, my target service, which represents the details of a city, has two endpoints. One returns a list of city IDs when called with the URI "/cityids", and a sample result looks like this:

[
    1,
    2,
    3,
    4,
    5,
    6,
    7
]


The other returns the details of a city given its ID. For example, calling it with an ID of 1, "/cities/1", returns:

{
    "country": "USA",
    "id": 1,
    "name": "Portland",
    "pop": 1600000
}


The client's responsibility is to get the list of city IDs and then, for each city ID, get the details of the city and put the results together into a list of cities.
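The snippets in the rest of this post refer to a City type that is not shown here. A minimal sketch, assuming it simply mirrors the sample JSON above (the property names and types are an assumption based on that payload, not the class from the repository), could look like this:

```kotlin
// Hypothetical model mirroring the sample JSON; the actual class used in
// the working examples may differ.
data class City(
    val id: Long,
    val name: String,
    val country: String,
    val pop: Long
)

fun main() {
    val portland = City(id = 1, name = "Portland", country = "USA", pop = 1_600_000)
    println(portland)
}
```

A data class is a convenient choice here because it provides toString, equals, and hashCode, which keeps logging and comparisons in the later examples simple.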

Synchronous Call

I am using Spring Framework's RestTemplate to make the remote call. A Kotlin function to get the list of cityIds looks like this:

private fun getCityIds(): List<String> {
    val cityIdsEntity: ResponseEntity<List<String>> = restTemplate
            .exchange("http://localhost:$localServerPort/cityids",
                    HttpMethod.GET,
                    null,
                    object : ParameterizedTypeReference<List<String>>() {})
    return cityIdsEntity.body!!
}


And to get the details of a city:

private fun getCityForId(id: String): City {
    return restTemplate.getForObject("http://localhost:$localServerPort/cities/$id", City::class.java)!!
}


Given these two functions, it is easy to compose them so that a list of cities is returned:

val cityIds: List<String> = getCityIds()
val cities: List<City> = cityIds
        .stream()
        .map<City> { cityId -> getCityForId(cityId) }
        .collect(Collectors.toList())

cities.forEach { city -> LOGGER.info(city.toString()) }


The code is very easy to understand; however, there are eight blocking calls involved:

1. One call to get the list of seven city IDs

2. Seven calls to get the details of each of the seven cities

Each of these calls holds a thread blocked while it waits for the response.
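To get a feel for the cost of blocking, here is a rough sketch that replaces the remote service with a hypothetical stand-in, fetchBlocking, where Thread.sleep simulates network latency. The function names and the 20 ms latency are assumptions made for illustration only:

```kotlin
import kotlin.system.measureTimeMillis

// Hypothetical stand-in for a blocking remote call: the sleep simulates
// network latency during which the calling thread can do nothing else.
fun fetchBlocking(path: String, latencyMillis: Long = 20): String {
    Thread.sleep(latencyMillis)
    return "response for $path"
}

// One call for the ids plus one call per id, executed one after another.
fun fetchAllSequentially(cityIds: List<Long>): List<String> {
    val responses = mutableListOf(fetchBlocking("/cityids"))
    cityIds.forEach { id -> responses += fetchBlocking("/cities/$id") }
    return responses
}

fun main() {
    val ids = (1L..7L).toList()
    var responses: List<String> = emptyList()
    val elapsed = measureTimeMillis { responses = fetchAllSequentially(ids) }
    println("${responses.size} blocking calls took about $elapsed ms")
}
```

Because every call waits for the previous one, the simulated latencies add up, and a thread sits idle for the whole duration.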

Using Non-Blocking IO With Callback

I will be using a library called AsyncHttpClient to make a non-blocking IO call.

AsyncHttpClient returns a ListenableFuture type when a remote call is made.

val responseListenableFuture: ListenableFuture<Response> = asyncHttpClient
                .prepareGet("http://localhost:$localServerPort/cityids")
                .execute()


A callback can be attached to a ListenableFuture to act on the response when available.

responseListenableFuture.addListener(Runnable {
    val response: Response = responseListenableFuture.get()
    val responseBody: String = response.responseBody
    val cityIds: List<Long> = objectMapper.readValue<List<Long>>(responseBody,
            object : TypeReference<List<Long>>() {})
    ....
}, executor)


Given the list of cityIds, I want to get the details of each city. So, from within the response callback, I need to make more remote calls and attach a callback to each of them to get the details of the city, along these lines:

val responseListenableFuture: ListenableFuture<Response> = asyncHttpClient
        .prepareGet("http://localhost:$localServerPort/cityids")
        .execute()

responseListenableFuture.addListener(Runnable {
    val response: Response = responseListenableFuture.get()
    val responseBody: String = response.responseBody
    val cityIds: List<Long> = objectMapper.readValue<List<Long>>(responseBody,
            object : TypeReference<List<Long>>() {})

    cityIds.stream().map { cityId ->
        val cityListenableFuture = asyncHttpClient
                .prepareGet("http://localhost:$localServerPort/cities/$cityId")
                .execute()

        cityListenableFuture.addListener(Runnable {
            val cityDescResp = cityListenableFuture.get()
            val cityDesc = cityDescResp.responseBody
            val city = objectMapper.readValue(cityDesc, City::class.java)
            LOGGER.info("Got city: $city")
        }, executor)
    }.collect(Collectors.toList())
}, executor)


This is a gnarly piece of code; there is a set of callbacks within a callback, which is very difficult to reason about and make sense of. This is why it is referred to as "Callback Hell."

Using Non-Blocking IO With Java CompletableFuture

This code can be improved a little by returning Java's CompletableFuture instead of the ListenableFuture. CompletableFuture provides operators that allow the result type to be transformed.

As an example, consider the function to get the list of city ids:

private fun getCityIds(): CompletableFuture<List<Long>> {
    return asyncHttpClient
            .prepareGet("http://localhost:$localServerPort/cityids")
            .execute()
            .toCompletableFuture()
            .thenApply { response ->
                val s = response.responseBody
                val l: List<Long> = objectMapper.readValue(s, object : TypeReference<List<Long>>() {})
                l
            }
}


Here, I am using the thenApply operator to transform CompletableFuture<Response> to CompletableFuture<List<Long>>.

And similarly, to get the details of a city:

private fun getCityDetail(cityId: Long): CompletableFuture<City> {
    return asyncHttpClient.prepareGet("http://localhost:$localServerPort/cities/$cityId")
            .execute()
            .toCompletableFuture()
            .thenApply { response ->
                val s = response.responseBody
                LOGGER.info("Got {}", s)
                val city = objectMapper.readValue(s, City::class.java)
                city
            }
}
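The thenApply transformation used in both functions can be tried in isolation with only the JDK. The sketch below is an illustration under stated assumptions: fetchBody is a hypothetical stand-in for the HTTP call, and parseIds is a naive parser standing in for Jackson's ObjectMapper.

```kotlin
import java.util.concurrent.CompletableFuture

// Hypothetical stand-in for the HTTP call: completes with a raw body.
fun fetchBody(): CompletableFuture<String> =
    CompletableFuture.supplyAsync { "[1,2,3]" }

// Naive parser for illustration only; the post uses Jackson's ObjectMapper.
fun parseIds(body: String): List<Long> =
    body.trim().removeSurrounding("[", "]")
        .split(",")
        .filter { it.isNotBlank() }
        .map { it.trim().toLong() }

// thenApply maps the eventual value without blocking the calling thread.
fun fetchIds(): CompletableFuture<List<Long>> =
    fetchBody().thenApply { body -> parseIds(body) }

fun main() {
    // join() blocks; it is used here only so the demo can print the result.
    println(fetchIds().join())
}
```

The function passed to thenApply runs only once the body is available, so the caller gets back a CompletableFuture<List<Long>> immediately and can keep composing on it.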


This is an improvement over the callback-based approach. However, CompletableFuture lacks operators for some common needs; in this specific instance, there is no direct way to put all the city details together into a single list:

val cityIdsFuture: CompletableFuture<List<Long>> = getCityIds()
val citiesCompletableFuture: CompletableFuture<List<City>> =
        cityIdsFuture
                .thenCompose { l ->
                    val citiesCompletable: List<CompletableFuture<City>> =
                            l.stream()
                                    .map { cityId ->
                                        getCityDetail(cityId)
                                    }.collect(toList())

                    val citiesCompletableFutureOfList: CompletableFuture<List<City>> =
                            CompletableFuture.allOf(*citiesCompletable.toTypedArray())
                                    .thenApply { _: Void? ->
                                        citiesCompletable
                                                .stream()
                                                .map { it.join() }
                                                .collect(toList())
                                    }
                    citiesCompletableFutureOfList
                }


I have used CompletableFuture.allOf, which returns a CompletableFuture<Void> that only signals completion, so the results have to be collected from the individual futures to produce the desired CompletableFuture<List<City>>.
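One way to tame this, sketched here as a suggestion rather than as code from the repository, is to pull the allOf coercion into a small generic helper. The names allAsList and cityName are hypothetical; cityName stands in for getCityDetail so that the example runs with only the JDK.

```kotlin
import java.util.concurrent.CompletableFuture

// Turns a list of futures into one future holding the list of results.
// allOf only signals completion (CompletableFuture<Void>), so the values are
// read back with join(), which does not block here because thenApply runs
// only after every future has completed.
fun <T> allAsList(futures: List<CompletableFuture<T>>): CompletableFuture<List<T>> =
    CompletableFuture.allOf(*futures.toTypedArray())
        .thenApply { _ -> futures.map { f -> f.join() } }

// Hypothetical stand-in for getCityDetail.
fun cityName(id: Long): CompletableFuture<String> =
    CompletableFuture.supplyAsync { "city-$id" }

fun main() {
    val names: CompletableFuture<List<String>> =
        CompletableFuture.supplyAsync { listOf(1L, 2L, 3L) }
            .thenCompose { ids -> allAsList(ids.map { id -> cityName(id) }) }
    // join() blocks; it is used here only so the demo can print the result.
    println(names.join())
}
```

The helper keeps the result order aligned with the input list. If any of the futures fails, the combined future completes exceptionally instead of producing a partial list.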

Using Project Reactor

Project Reactor is an implementation of the Reactive Streams specification. It has two specialized types: Mono, a stream of 0 or 1 item, and Flux, a stream of 0 to n items.

Project Reactor provides a very rich set of operators that allow the stream of data to be transformed in a variety of ways. Consider first the function to return a list of City ids:

private fun getCityIds(): Flux<Long> {
    return webClient.get()
            .uri("/cityids")
            .exchange()
            .flatMapMany { response ->
                LOGGER.info("Received cities..")
                response.bodyToFlux<Long>()
            }
}


I am using Spring's excellent WebClient library to make the remote call and get a Project Reactor Mono<ClientResponse> response, which can be transformed into a Flux<Long> using the flatMapMany operator.

Along the same lines, to get the details of a city given a city ID:

private fun getCityDetail(cityId: Long): Mono<City> {
    return webClient.get()
            .uri("/cities/{id}", cityId)
            .exchange()
            .flatMap { response ->
                val city: Mono<City> = response.bodyToMono()
                LOGGER.info("Received city..")
                city
            }
}


Here, a Project Reactor Mono<ClientResponse> type is being transformed to a Mono<City> type using the flatMap operator.

And the code to get the city IDs and then the cities from them:

val cityIdsFlux: Flux<Long> = getCityIds()
val citiesFlux: Flux<City> = cityIdsFlux
        .flatMap { this.getCityDetail(it) }

return citiesFlux


This is very expressive. Contrast the mess of the callback-based approach with the simplicity of the Reactive Streams-based approach.

Conclusion

In my mind, this is one of the biggest reasons to use a Reactive Streams-based approach, and Project Reactor in particular, for scenarios that involve crossing asynchronous boundaries, such as making remote calls in this instance. It cleans up the mess of nested callbacks and provides a natural way of transforming types using a rich set of operators.

My repository with a working version of all the examples I have used here is available on GitHub.


Published at DZone with permission of
