Callback Hell and Reactive Patterns

Learn more about a Reactive Streams-based approach and how to avoid Callback Hell.

By Biju Kunjummen · Jun. 11, 19 · Presentation

One of the ways I came to appreciate a Reactive Streams-based approach is by seeing how it simplifies a non-blocking IO call.

This post is a quick walkthrough of the kind of code involved in making a synchronous remote call. It then shows how layering in non-blocking IO, though highly efficient in its use of resources (especially threads), introduces the complication known as callback hell, and how a Reactive Streams-based approach simplifies the programming model.

Target Service

Since I will be writing a client call, my target service, which represents the details of a city, has two endpoints. The first returns a list of city IDs when called with the URI "/cityids"; a sample result looks like this:

[
    1,
    2,
    3,
    4,
    5,
    6,
    7
]


The other endpoint returns the details of a city given its ID. For example, a call with an ID of 1 ("/cities/1") returns:

{
    "country": "USA",
    "id": 1,
    "name": "Portland",
    "pop": 1600000
}


The client's responsibility is to get the list of city IDs and then, for each ID, get the details of that city and put the results together into a list of cities.

Synchronous Call

I am using Spring Framework's RestTemplate to make the remote call. A Kotlin function to get the list of cityIds looks like this:

private fun getCityIds(): List<String> {
    val cityIdsEntity: ResponseEntity<List<String>> = restTemplate
            .exchange("http://localhost:$localServerPort/cityids",
                    HttpMethod.GET,
                    null,
                    object : ParameterizedTypeReference<List<String>>() {})
    return cityIdsEntity.body!!
}


And to get the details of a city:

private fun getCityForId(id: String): City {
    return restTemplate.getForObject("http://localhost:$localServerPort/cities/$id", City::class.java)!!
}


Given these two functions, it is easy to compose them so that a list of cities is returned:

val cityIds: List<String> = getCityIds()
val cities: List<City> = cityIds
        .stream()
        .map<City> { cityId -> getCityForId(cityId) }
        .collect(Collectors.toList())

cities.forEach { city -> LOGGER.info(city.toString()) }


The code is very easy to understand; however, there are eight blocking calls involved:

1. One call to get the list of seven city IDs

2. Seven calls to get the details of each of the seven cities

Each of these calls blocks the calling thread until its response arrives.
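
To make the call count concrete, here is a minimal sketch that swaps the remote service for an in-memory stub and counts invocations. The names StubCityService, cityIds, cityFor, and loadCities are hypothetical and not part of the original code; only the shape of the composition mirrors the synchronous client above.

```kotlin
import java.util.concurrent.atomic.AtomicInteger

data class City(val id: Long, val name: String, val country: String, val pop: Long)

// Hypothetical in-memory stand-in for the remote service; it counts every "remote" call.
class StubCityService {
    val calls = AtomicInteger(0)

    fun cityIds(): List<Long> {
        calls.incrementAndGet()
        return (1L..7L).toList()
    }

    fun cityFor(id: Long): City {
        calls.incrementAndGet()
        return City(id, "City-$id", "USA", 1_000_000L * id)
    }
}

// Same composition as the synchronous client: one call for the IDs, then one call per ID.
fun loadCities(service: StubCityService): List<City> =
    service.cityIds().map { id -> service.cityFor(id) }

fun main() {
    val service = StubCityService()
    val cities = loadCities(service)
    println("Loaded ${cities.size} cities using ${service.calls.get()} calls")
}
```

With seven IDs, the stub records one call for the list plus seven for the details, eight in total.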

Using Non-Blocking IO With a Callback

I will be using a library called AsyncHttpClient to make a non-blocking IO call.

AsyncHttpClient returns a ListenableFuture type when a remote call is made.

val responseListenableFuture: ListenableFuture<Response> = asyncHttpClient
                .prepareGet("http://localhost:$localServerPort/cityids")
                .execute()


A callback can be attached to a ListenableFuture to act on the response when available.

responseListenableFuture.addListener(Runnable {
    val response: Response = responseListenableFuture.get()
    val responseBody: String = response.responseBody
    val cityIds: List<Long> = objectMapper.readValue<List<Long>>(responseBody,
            object : TypeReference<List<Long>>() {})
    // ....
}, executor)


Given the list of cityIds, I want to get the details of the city, so from the response, I need to make more remote calls and attach a callback for each of the calls to get the details of the city along these lines:

val responseListenableFuture: ListenableFuture<Response> = asyncHttpClient
        .prepareGet("http://localhost:$localServerPort/cityids")
        .execute()

responseListenableFuture.addListener(Runnable {
    val response: Response = responseListenableFuture.get()
    val responseBody: String = response.responseBody
    val cityIds: List<Long> = objectMapper.readValue<List<Long>>(responseBody,
            object : TypeReference<List<Long>>() {})

    cityIds.stream().map { cityId ->
        val cityListenableFuture = asyncHttpClient
                .prepareGet("http://localhost:$localServerPort/cities/$cityId")
                .execute()

        cityListenableFuture.addListener(Runnable {
            val cityDescResp = cityListenableFuture.get()
            val cityDesc = cityDescResp.responseBody
            val city = objectMapper.readValue(cityDesc, City::class.java)
            LOGGER.info("Got city: $city")
        }, executor)
    }.collect(Collectors.toList())
}, executor)


This is a gnarly piece of code; there is a set of callbacks within a callback, which is very difficult to reason about and make sense of, hence the name "callback hell."
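
The nesting gets worse once the results have to be gathered rather than just logged. The following is a minimal sketch of that bookkeeping using only the JDK; fetchCityIds and fetchCityName are hypothetical callback-style stand-ins for the remote calls, not AsyncHttpClient APIs.

```kotlin
import java.util.concurrent.CopyOnWriteArrayList
import java.util.concurrent.CountDownLatch
import java.util.concurrent.Executor
import java.util.concurrent.Executors
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical callback-style calls; each invokes its callback on the given executor.
fun fetchCityIds(executor: Executor, callback: (List<Long>) -> Unit) {
    executor.execute { callback(listOf(1L, 2L, 3L)) }
}

fun fetchCityName(executor: Executor, id: Long, callback: (String) -> Unit) {
    executor.execute { callback("city-$id") }
}

// Collecting the results means tracking completion by hand.
fun collectCityNames(): List<String> {
    val executor = Executors.newFixedThreadPool(4)
    val names = CopyOnWriteArrayList<String>()
    val done = CountDownLatch(1)
    try {
        fetchCityIds(executor) { ids ->
            // Count outstanding detail calls so the last callback can signal completion.
            val remaining = AtomicInteger(ids.size)
            ids.forEach { id ->
                fetchCityName(executor, id) { name ->
                    names.add(name)
                    if (remaining.decrementAndGet() == 0) done.countDown()
                }
            }
        }
        // Wait (with a timeout) only so the demo can return a value.
        done.await(5, TimeUnit.SECONDS)
    } finally {
        executor.shutdown()
    }
    return names.sorted()
}
```

The thread-safe list, the counter, and the latch are all coordination code that has nothing to do with cities, and error handling is not even covered in this sketch.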

Using Non-Blocking IO With Java CompletableFuture

This code can be improved a little by returning Java's CompletableFuture instead of the ListenableFuture. CompletableFuture provides operators, such as thenApply and thenCompose, that transform the eventual result into a different type.

As an example, consider the function to get the list of city ids:

private fun getCityIds(): CompletableFuture<List<Long>> {
    return asyncHttpClient
            .prepareGet("http://localhost:$localServerPort/cityids")
            .execute()
            .toCompletableFuture()
            .thenApply { response ->
                val s = response.responseBody
                val l: List<Long> = objectMapper.readValue(s, object : TypeReference<List<Long>>() {})
                l
            }
}


Here, I am using the thenApply operator to transform CompletableFuture<Response> to CompletableFuture<List<Long>>.
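
The same transformation can be tried without any HTTP client. This is a minimal sketch using only the JDK, in which fetchCityIdsBody is a hypothetical stand-in for the remote call and the body is parsed by hand rather than with Jackson.

```kotlin
import java.util.concurrent.CompletableFuture

// Hypothetical stand-in for the remote call: completes with a raw JSON-like body.
fun fetchCityIdsBody(): CompletableFuture<String> =
    CompletableFuture.supplyAsync { "[1, 2, 3]" }

// thenApply maps the eventual String to a List<Long> without blocking the caller.
fun fetchCityIds(): CompletableFuture<List<Long>> =
    fetchCityIdsBody().thenApply { body ->
        body.trim().removeSurrounding("[", "]")
            .split(",")
            .map { it.trim().toLong() }
    }

fun main() {
    // join() blocks here only so the demo can print the result.
    println(fetchCityIds().join())
}
```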

And similarly, to get the details of a city:

private fun getCityDetail(cityId: Long): CompletableFuture<City> {
    return asyncHttpClient.prepareGet("http://localhost:$localServerPort/cities/$cityId")
            .execute()
            .toCompletableFuture()
            .thenApply { response ->
                val s = response.responseBody
                LOGGER.info("Got {}", s)
                val city = objectMapper.readValue(s, City::class.java)
                city
            }
}


This is an improvement over the callback-based approach. However, CompletableFuture lacks a convenient operator for this specific case, where all the city details need to be combined into a single list:

val cityIdsFuture: CompletableFuture<List<Long>> = getCityIds()
val citiesCompletableFuture: CompletableFuture<List<City>> =
        cityIdsFuture
                .thenCompose { l ->
                    val citiesCompletable: List<CompletableFuture<City>> =
                            l.stream()
                                    .map { cityId ->
                                        getCityDetail(cityId)
                                    }.collect(toList())

                    val citiesCompletableFutureOfList: CompletableFuture<List<City>> =
                            CompletableFuture.allOf(*citiesCompletable.toTypedArray())
                                    .thenApply { _: Void? ->
                                        citiesCompletable
                                                .stream()
                                                .map { it.join() }
                                                .collect(toList())
                                    }
                    citiesCompletableFutureOfList
                }


I have used an operator called CompletableFuture.allOf, which returns a CompletableFuture<Void> and has to be coerced to return the desired type of CompletableFuture<List<City>>.
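
One way to contain this coercion is to isolate it in a small helper. The sketch below uses only the JDK and works on city names instead of City objects; allCityNames, cityName, and cityNames are hypothetical names introduced for illustration.

```kotlin
import java.util.concurrent.CompletableFuture

// Hypothetical helper: turns a list of futures into a future of a list.
fun allCityNames(futures: List<CompletableFuture<String>>): CompletableFuture<List<String>> =
    CompletableFuture.allOf(*futures.toTypedArray())
        // allOf completes only after every input has completed, so join() returns immediately here.
        .thenApply { futures.map { it.join() } }

// Hypothetical stand-in for the per-city remote call.
fun cityName(id: Long): CompletableFuture<String> =
    CompletableFuture.supplyAsync { "city-$id" }

// thenCompose chains the second stage onto the first without nesting futures.
fun cityNames(ids: CompletableFuture<List<Long>>): CompletableFuture<List<String>> =
    ids.thenCompose { list -> allCityNames(list.map { cityName(it) }) }

fun main() {
    val ids = CompletableFuture.completedFuture(listOf(1L, 2L, 3L))
    // join() blocks here only so the demo can print the result.
    println(cityNames(ids).join())
}
```

The result list follows the order of the input futures, because the helper reads them back in list order once all of them have completed.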

Using Project Reactor

Project Reactor is an implementation of the Reactive Streams specification. It has two specialized types: a Mono, which represents a stream of 0 or 1 items, and a Flux, which represents a stream of 0 to n items.

Project Reactor provides a very rich set of operators that allow the stream of data to be transformed in a variety of ways. Consider first the function to return a list of City ids:

private fun getCityIds(): Flux<Long> {
    return webClient.get()
            .uri("/cityids")
            .exchange()
            .flatMapMany { response ->
                LOGGER.info("Received cities..")
                response.bodyToFlux<Long>()
            }
}


I am using Spring's excellent WebClient library to make the remote call and get a Project Reactor Mono<ClientResponse> type of response, which can be transformed to a Flux<Long> type using the flatMapMany operator.

Along the same lines, to get the details of a city given its ID:

private fun getCityDetail(cityId: Long?): Mono<City> {
    return webClient.get()
            .uri("/cities/{id}", cityId!!)
            .exchange()
            .flatMap { response ->
                val city: Mono<City> = response.bodyToMono()
                LOGGER.info("Received city..")
                city
            }
}


Here, a Project Reactor Mono<ClientResponse> type is being transformed to a Mono<City> type using the flatMap operator.

And the code to get the city IDs and then the cities from them:

val cityIdsFlux: Flux<Long> = getCityIds()
val citiesFlux: Flux<City> = cityIdsFlux
        .flatMap { this.getCityDetail(it) }

return citiesFlux


This is very expressive. Contrast the mess of the callback-based approach with the simplicity of the Reactive Streams-based approach.
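
The composition can also be tried without a running service. The following is a minimal sketch that assumes reactor-core is on the classpath; cityIds, cityName, and cityNames are hypothetical in-memory stand-ins for the WebClient-based functions above.

```kotlin
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono

// Hypothetical stand-ins for the two remote calls.
fun cityIds(): Flux<Long> = Flux.just(1L, 2L, 3L)

fun cityName(id: Long): Mono<String> = Mono.fromCallable { "city-$id" }

// flatMap subscribes to one inner Mono per ID and merges the results into a single Flux.
fun cityNames(): Flux<String> = cityIds().flatMap { id -> cityName(id) }

fun main() {
    // block() is used only so the demo can print; a reactive caller would subscribe instead.
    val names = cityNames().collectList().block()
    println(names)
}
```

Note that flatMap merges inner results as they arrive, so with truly asynchronous calls the output order may differ from the order of the IDs. Reactor's concatMap and flatMapSequential operators preserve the source order if that matters.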

Conclusion

In my mind, this is one of the biggest reasons to use a Reactive Streams-based approach, and Project Reactor in particular, for scenarios that cross asynchronous boundaries, such as the remote calls in this example. It cleans up the mess of nested callbacks and provides a natural way of transforming types using a rich set of operators.

My repository with a working version of all the examples I have used here is available on GitHub.

Published at DZone with permission of Biju Kunjummen, DZone MVB. See the original article here.
