Over a million developers have joined DZone.
{{announcement.body}}
{{announcement.title}}

Asynchronous Programming in Vertx: Callbacks, Futures, and Coroutines

DZone 's Guide to

Asynchronous Programming in Vertx: Callbacks, Futures, and Coroutines

Here's everything you need to know about the three paradigms of asynchronous programming in Vertx with Kotlin.

· Java Zone ·
Free Resource

I want to show three paradigms of asynchronous programming: callbacks, futures, and coroutines. I will provide a simple web application example using Kotlin and the Vertx framework.

Let's assume we are writing an application that receives a string in an HTTP request, searches by this string for a URL in the DB, fetches the URL content, and then sends it back to the client.

Vertx was created as an asynchronous framework for high-load applications, using Netty, new I/O, and event bus.

As it is common in Vertx, one Verticle (something like Actor, if you know Akka) receives a request, sends the received string to an event bus and then to another verticle — BusinessVerticle, which is responsible for the fetching itself.

object Main {
    @JvmStatic
    fun main(args: Array<String>) {
        val vertx =  Vertx.vertx()
        vertx.deployVerticle(HttpVerticle())
        vertx.deployVerticle(BusinessVerticle())
    }
}


@JvmStatic
fun main(args: Array<String>) {
        val vertx =  Vertx.vertx()
        vertx.deployVerticle(HttpVerticle())
        vertx.deployVerticle(BusinessVerticle())
    }
}

class HttpVerticle : AbstractVerticle() {

    @Throws(Exception::class)
    override fun start(startFuture: Future<Void>) {
        val router = createRouter()

        vertx.createHttpServer()
            .requestHandler(router)
            .listen(8080) { result ->
                if (result.succeeded()) {
                    startFuture.complete()
                } else {
                    startFuture.fail(result.cause())
                }
            }
    }

    private fun createRouter(): Router = Router.router(vertx).apply {
        get("/").handler(handlerRoot)
    }


    private val handlerRoot = Handler<RoutingContext> { rc ->
        vertx.eventBus().send("my.addr", rc.request().getParam("id") ?: "") 
               { resp: AsyncResult<Message<String>> ->
                    if (resp.succeeded()) {
                        rc.response().end(resp.result().body())
                    } else {
                       rc.fail(500)
                   }
        }
    }

}


In the standard Vertx API, you perform all asynchronous flows via callbacks, so the initial implementation of BusinessVerticle looks like this:

class BusinessVerticle : AbstractVerticle() {


    private lateinit var dbclient: JDBCClient
    private lateinit var webclient: WebClient

    override fun start() {
        vertx.eventBus().consumer<String>("my.addr") { message ->
            handleMessage(message)
        }
        dbclient = JDBCClient.createShared(
            vertx, JsonObject()
                .put("url", "jdbc:postgresql://localhost:5432/payroll")
                .put("driver_class", "org.postgresql.Driver")
                .put("user", "vala")
                .put("password", "vala")
                .put("max_pool_size", 30)
        )

        val options = WebClientOptions()
            .setUserAgent("My-App/1.2.3")

        options.isKeepAlive = false
        webclient = WebClient.create(vertx, options)
    }

    private fun handleMessage(message: Message<String>) {
        dbclient.getConnection { res ->
            if (res.succeeded()) {

                val connection = res.result()

                connection.query("SELECT url FROM payee_company where name='${message.body()}'") { res2 ->
                    if (res2.succeeded()) {
                        try {
                            val url = res2.result().rows[0].getString("url").removePrefix("http://")
                            webclient
                                .get(url,"/")
                                .send { ar ->
                                    if (ar.succeeded()) {
                                        val response = ar.result()
                                        message.reply(response.bodyAsString())
                                    } else {
                                        message.fail(500, ar.cause().message)
                                    }
                                }

                        } catch (e: Exception) {
                            message.fail(500, e.message)
                        }
                    } else {
                        message.fail(500, res2.cause().message)
                    }
                }
            } else {
                message.fail(500, res.cause().message)
            }
        }
    }

}


And this looks bad. Callbacks and error handling are done in several places.

Let's improve the situation by the extraction of each callback to a separate method:

class BusinessVerticle: AbstractVerticle() {


    private lateinit var dbclient: JDBCClient
    private lateinit var webclient: WebClient

   override fun start() {
        vertx.eventBus().consumer<String>("my.addr") { message ->
            handleMessage(message)
        }
        dbclient = JDBCClient.createShared(
            vertx, JsonObject()
                .put("url", "jdbc:postgresql://localhost:5432/payroll")
                .put("driver_class", "org.postgresql.Driver")
                .put("user", "vala")
                .put("password", "vala")
                .put("max_pool_size", 30)
        )

        val options = WebClientOptions()
            .setUserAgent("My-App/1.2.3")

        options.isKeepAlive = false
        webclient = WebClient.create(vertx, options)
    }

    private fun handleMessage(message: Message<String>) {
        dbclient.getConnection { res ->
            handleConnectionCallback(res, message)
        }
    }

    private fun handleConnectionCallback(
        res: AsyncResult<SQLConnection>,
        message: Message<String>
    ) {
        if (res.succeeded()) {

            val connection = res.result()

            connection.query("SELECT url FROM payee_company where name='${message.body()}'") { res2 ->
                handleQueryCallBack(res2, message)
            }
        } else {
            message.fail(500, res.cause().message)
        }
    }

    private fun handleQueryCallBack(
        res2: AsyncResult<ResultSet>,
        message: Message<String>
    ) {
        if (res2.succeeded()) {
            try {
                val url = res2.result().rows[0].getString("url").removePrefix("http://")
                webclient
                    .get(url, "/")
                    .send { ar ->
                        handleHttpCallback(ar, message)
                    }

            } catch (e: Exception) {
                message.fail(500, e.message)
            }
        } else {
            message.fail(500, res2.cause().message)
        }
    }

    private fun handleHttpCallback(
        ar: AsyncResult<HttpResponse<Buffer>>,
        message: Message<String>
    ) {
        if (ar.succeeded()) {
            // Obtain response
            val response = ar.result()
            message.reply(response.bodyAsString())
        } else {
            message.fail(500, ar.cause().message)
        }
    }

}


Better, but still bad.

It's just not readable code. The message object, which should pass to all methods, creates a separate error handling in each callback.

Let's rewrite it usingFutures. The great advantage of using Future is the ability to chain them using  Future.compose().

First, let's translate standard Vertx methods, which receive callbacks to methods and returns Future. We will use "extension methods" — a Kotlin feature that allows us to add a method to an existing class.

fun JDBCClient.getConnectionF(): Future<SQLConnection> {
    val f = Future.future<SQLConnection>()
    getConnection { res ->
        if (res.succeeded()) {
            val connection = res.result()
            f.complete(connection)
        } else {
            f.fail(res.cause())
        }
    }
    return f
}

fun SQLConnection.queryF(query:String): Future<ResultSet> {
    val f = Future.future<ResultSet>()
    query(query) { res ->
        if (res.succeeded()) {
            val resultSet = res.result()
            f.complete(resultSet)
        } else {
            f.fail(res.cause())
        }
    }
    return f
}

fun <T,M> HttpRequest<T>.sendF(): Future<HttpResponse<M>> {
    val f = Future.future<HttpResponse<M>>()
    send() { res ->
        if (res.succeeded()) {
            val response = res.result()
            f.complete(response)
        } else {
            f.fail(res.cause())
        }
    }
    return f
}


And then, the BusinessVerticle.handleMessage method will be transformed to:

  private fun handleMessage(message: Message<String>) {
        val content = getContent(message)

        content.setHandler{res->
            if (res.succeeded()) {
                // Obtain response
                val response = res.result()
                message.reply(response)
            } else {
                message.fail(500, res.cause().message)
            }
        }

    }

    private fun getContent(message: Message<String>): Future<String> {
        val connection = dbclient.getConnectionF()
        val resultSet = connection.compose { it.queryF("SELECT url FROM payee_company where name='${message.body()}'") }
        val url = resultSet.map { it.rows[0].getString("url").removePrefix("http://") }
        val httpResponse = url.compose { webclient.get(it, "/").sendF() }
        val content = httpResponse.map { it.bodyAsString() }
        return content
    }


Looks good.
Simple, readable code. And we also have error handling in one place. If required, we can add different error handling for different exceptions and/or extract it to a separate method.

But what if we need, under some condition, to stop the chain of  Future.compose()?
For example, if there are no records in the DB, we want to return a response "No records" with HTTP code 200 instead of an error and code 500?

The only way to do it is to throw a special exception and make special handling for such an exception:

class NoContentException(message:String):Exception(message)

 private fun getContent(message: Message<String>): Future<String> {
        val connection = dbclient.getConnectionF()
        val resultSet = connection.compose { it.queryF("SELECT url FROM payee_company where name='${message.body()}'") }
        val url = resultSet.map {
            if (it.numRows<1)
                throw NoContentException("No records")
            it.rows[0].getString("url").removePrefix("http://")
        }
        val httpResponse = url.compose { webclient.get(it, "/").sendF() }
        val content = httpResponse.map { it.bodyAsString() }
        return content
    }

    private fun handleMessage(message: Message<String>) {
        val content = getContent(message)

        content.setHandler{res->
            if (res.succeeded()) {
                // Obtain response
                val response = res.result()
                message.reply(response)
            } else {
                if (res.cause() is NoContentException)
                    message.reply(res.cause().message)
                else
                    message.fail(500, res.cause().message)
            }
        }
    }


It works well, but it does not look so good — we are using an exception for flow control, and if there are a lot of "special cases" in the flow, the code will be much less readable.

So, let's try to do the same thing with Kotlin coroutines. There are a lot of articles about Kotlin coroutines, so I'm not going to explain here what they are and how they work.

In the last versions of Vertx, you can include automatically generated coroutine-friendly versions of all callback methods.

You should add libraries: 'vertx-lang-kotlin-coroutines' and 'vertx-lang-kotlin'. You will get:  JDBCClient.getConnectionAwait(),  SQLConnection.queryAwait(), and others.

If we use those methods, our message handling method will become simpler:

  private suspend fun handleMessage(message: Message<String>) {
        try {
            val content = getContent(message)
            message.reply(content)
        } catch(e:Exception){
            message.fail(500, e.message)
        }

    }

    private suspend fun getContent(message: Message<String>): String {
        val connection = dbclient.getConnectionAwait()
        val resultSet = connection.queryAwait("SELECT url FROM payee_company where name='${message.body()}'")
        val url =  resultSet.rows[0].getString("url").removePrefix("http://")
        val httpResponse = webclient.get(url, "/").sendAwait()
        val content = httpResponse.bodyAsString()
        return content
    }


Additionally, you should change the event bus message subscribe code:

vertx.eventBus().consumer<String>("my.addr") { message ->
           GlobalScope.launch(vertx.dispatcher()) {  handleMessage(message)}
        }


What happens here?

All those await methods call code in an asynchronous way. Then, they wait for a result, and while they wait, a thread is switching and running another coroutine.

If we will look to implementation those await methods, we will see something very similar to our homemade implementation for  Futures:

suspend fun SQLClient.getConnectionAwait(): SQLConnection {
  return awaitResult {
    this.getConnection(it)
  }
}

suspend fun <T> awaitResult(block: (h: Handler<AsyncResult<T>>) -> Unit): T {
  val asyncResult = awaitEvent(block)
  if (asyncResult.succeeded()) return asyncResult.result()
  else throw asyncResult.cause()
}

suspend fun <T> awaitEvent(block: (h: Handler<T>) -> Unit): T {
  return suspendCancellableCoroutine { cont: CancellableContinuation<T> ->
    try {
      block.invoke(Handler { t ->
        cont.resume(t)
      })
    } catch (e: Exception) {
      cont.resumeWithException(e)
    }
  }
}


But here, we get a normal code — String as a return type (and not Future<String> ) and try/catch instead of the ugly callback with AsyncResult.

If we need to stop flow in the middle, we can do it in a natural way, without exceptions:

  private suspend fun getContent(message: Message<String>): String {
        val connection = dbclient.getConnectionAwait()
        val resultSet = connection.queryAwait("SELECT url FROM payee_company where name='${message.body()}'")
        if (resultSet.numRows<1)
            return "No records"
        val url =  resultSet.rows[0].getString("url").removePrefix("http://")
        val httpResponse = webclient.get(url, "/").sendAwait()
        val content = httpResponse.bodyAsString()
        return content
    }


IMHO, it is just beautiful!

Topics:
kotlin ,coroutines ,kotlin coroutines ,java ,vert.x ,async flow

Opinions expressed by DZone contributors are their own.

{{ parent.title || parent.header.title}}

{{ parent.tldr }}

{{ parent.urlSource.name }}