Asynchronous Programming in Vertx: Callbacks, Futures, and Coroutines

Here's everything you need to know about the three paradigms of asynchronous programming in Vertx with Kotlin.

By Pavel Bernshtam · May. 08, 19 · Presentation

I want to show three paradigms of asynchronous programming: callbacks, futures, and coroutines. I will provide a simple web application example using Kotlin and the Vertx framework.

Let's assume we are writing an application that receives a string in an HTTP request, searches by this string for a URL in the DB, fetches the URL content, and then sends it back to the client.

Vertx was created as an asynchronous framework for high-load applications. It is built on Netty and Java NIO and provides an event bus for communication between components.

As is common in Vertx, one verticle (something like an actor, if you know Akka), HttpVerticle, receives the HTTP request and sends the received string over the event bus to another verticle, BusinessVerticle, which is responsible for the fetching itself.

import io.vertx.core.Vertx

object Main {
    @JvmStatic
    fun main(args: Array<String>) {
        // One Vertx instance hosts both verticles; they talk over its event bus.
        val vertx = Vertx.vertx()
        vertx.deployVerticle(HttpVerticle())
        vertx.deployVerticle(BusinessVerticle())
    }
}


class HttpVerticle : AbstractVerticle() {

    @Throws(Exception::class)
    override fun start(startFuture: Future<Void>) {
        val router = createRouter()

        vertx.createHttpServer()
            .requestHandler(router)
            .listen(8080) { result ->
                if (result.succeeded()) {
                    startFuture.complete()
                } else {
                    startFuture.fail(result.cause())
                }
            }
    }

    private fun createRouter(): Router = Router.router(vertx).apply {
        get("/").handler(handlerRoot)
    }


    private val handlerRoot = Handler<RoutingContext> { rc ->
        vertx.eventBus().send("my.addr", rc.request().getParam("id") ?: "") { resp: AsyncResult<Message<String>> ->
            if (resp.succeeded()) {
                rc.response().end(resp.result().body())
            } else {
                rc.fail(500)
            }
        }
    }

}


In the standard Vertx API, you perform all asynchronous flows via callbacks, so the initial implementation of BusinessVerticle looks like this:

class BusinessVerticle : AbstractVerticle() {


    private lateinit var dbclient: JDBCClient
    private lateinit var webclient: WebClient

    override fun start() {
        vertx.eventBus().consumer<String>("my.addr") { message ->
            handleMessage(message)
        }
        dbclient = JDBCClient.createShared(
            vertx, JsonObject()
                .put("url", "jdbc:postgresql://localhost:5432/payroll")
                .put("driver_class", "org.postgresql.Driver")
                .put("user", "vala")
                .put("password", "vala")
                .put("max_pool_size", 30)
        )

        val options = WebClientOptions()
            .setUserAgent("My-App/1.2.3")

        options.isKeepAlive = false
        webclient = WebClient.create(vertx, options)
    }

    private fun handleMessage(message: Message<String>) {
        dbclient.getConnection { res ->
            if (res.succeeded()) {

                val connection = res.result()

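                // NOTE: interpolating request input into SQL is open to injection;
                // real code should use connection.queryWithParams with a JsonArray of parameters.
                connection.query("SELECT url FROM payee_company where name='${message.body()}'") { res2 ->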
                    if (res2.succeeded()) {
                        try {
                            val url = res2.result().rows[0].getString("url").removePrefix("http://")
                            webclient
                                .get(url,"/")
                                .send { ar ->
                                    if (ar.succeeded()) {
                                        val response = ar.result()
                                        message.reply(response.bodyAsString())
                                    } else {
                                        message.fail(500, ar.cause().message)
                                    }
                                }

                        } catch (e: Exception) {
                            message.fail(500, e.message)
                        }
                    } else {
                        message.fail(500, res2.cause().message)
                    }
                }
            } else {
                message.fail(500, res.cause().message)
            }
        }
    }

}


And this looks bad: the callbacks are nested three levels deep, and error handling is repeated in several places.

Let's improve the situation by extracting each callback into a separate method:

class BusinessVerticle: AbstractVerticle() {


    private lateinit var dbclient: JDBCClient
    private lateinit var webclient: WebClient

    override fun start() {
        vertx.eventBus().consumer<String>("my.addr") { message ->
            handleMessage(message)
        }
        dbclient = JDBCClient.createShared(
            vertx, JsonObject()
                .put("url", "jdbc:postgresql://localhost:5432/payroll")
                .put("driver_class", "org.postgresql.Driver")
                .put("user", "vala")
                .put("password", "vala")
                .put("max_pool_size", 30)
        )

        val options = WebClientOptions()
            .setUserAgent("My-App/1.2.3")

        options.isKeepAlive = false
        webclient = WebClient.create(vertx, options)
    }

    private fun handleMessage(message: Message<String>) {
        dbclient.getConnection { res ->
            handleConnectionCallback(res, message)
        }
    }

    private fun handleConnectionCallback(
        res: AsyncResult<SQLConnection>,
        message: Message<String>
    ) {
        if (res.succeeded()) {

            val connection = res.result()

            connection.query("SELECT url FROM payee_company where name='${message.body()}'") { res2 ->
                handleQueryCallBack(res2, message)
            }
        } else {
            message.fail(500, res.cause().message)
        }
    }

    private fun handleQueryCallBack(
        res2: AsyncResult<ResultSet>,
        message: Message<String>
    ) {
        if (res2.succeeded()) {
            try {
                val url = res2.result().rows[0].getString("url").removePrefix("http://")
                webclient
                    .get(url, "/")
                    .send { ar ->
                        handleHttpCallback(ar, message)
                    }

            } catch (e: Exception) {
                message.fail(500, e.message)
            }
        } else {
            message.fail(500, res2.cause().message)
        }
    }

    private fun handleHttpCallback(
        ar: AsyncResult<HttpResponse<Buffer>>,
        message: Message<String>
    ) {
        if (ar.succeeded()) {
            // Obtain response
            val response = ar.result()
            message.reply(response.bodyAsString())
        } else {
            message.fail(500, ar.cause().message)
        }
    }

}


Better, but still bad.

The code is still hard to read. The message object has to be passed to every method, and each callback still needs its own error handling.

Let's rewrite it using Futures. The great advantage of Future is the ability to chain them using Future.compose().

First, let's wrap the standard Vertx methods, which take callbacks, in methods that return a Future. We will use "extension methods", a Kotlin feature that allows us to add a method to an existing class.

fun JDBCClient.getConnectionF(): Future<SQLConnection> {
    val f = Future.future<SQLConnection>()
    getConnection { res ->
        if (res.succeeded()) {
            val connection = res.result()
            f.complete(connection)
        } else {
            f.fail(res.cause())
        }
    }
    return f
}

fun SQLConnection.queryF(query:String): Future<ResultSet> {
    val f = Future.future<ResultSet>()
    query(query) { res ->
        if (res.succeeded()) {
            val resultSet = res.result()
            f.complete(resultSet)
        } else {
            f.fail(res.cause())
        }
    }
    return f
}

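// The response body type is the request's own type parameter T,
// so a single type parameter is enough (and lets the call site infer it).
fun <T> HttpRequest<T>.sendF(): Future<HttpResponse<T>> {
    val f = Future.future<HttpResponse<T>>()
    send { res ->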
        if (res.succeeded()) {
            val response = res.result()
            f.complete(response)
        } else {
            f.fail(res.cause())
        }
    }
    return f
}
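
The three wrappers above all follow one pattern: create a future, call the callback-style method, and complete or fail the future inside the callback. As a rough, standard-library-only sketch of that pattern, the code below uses java.util.concurrent.CompletableFuture in place of the Vertx Future. Outcome, FakeDb, toFuture, and lookupUrlF are hypothetical names invented for this illustration and are not part of Vertx.

```kotlin
import java.util.concurrent.CompletableFuture

// Hypothetical stand-in for Vertx's AsyncResult: holds either a value or an error.
class Outcome<T>(val value: T?, val error: Throwable?) {
    fun succeeded(): Boolean = error == null
}

// Hypothetical callback-style API, playing the role of JDBCClient/SQLConnection.
class FakeDb {
    fun lookupUrl(name: String, callback: (Outcome<String>) -> Unit) {
        if (name == "acme") callback(Outcome("http://example.com", null))
        else callback(Outcome(null, NoSuchElementException("no row for $name")))
    }
}

// Generic adapter: runs a callback-style operation and exposes its outcome as a future.
fun <T> toFuture(operation: (callback: (Outcome<T>) -> Unit) -> Unit): CompletableFuture<T> {
    val future = CompletableFuture<T>()
    operation { outcome ->
        if (outcome.succeeded()) future.complete(outcome.value)
        else future.completeExceptionally(outcome.error)
    }
    return future
}

// Extension method in the same spirit as getConnectionF() and queryF().
fun FakeDb.lookupUrlF(name: String): CompletableFuture<String> =
    toFuture { callback -> lookupUrl(name, callback) }

fun main() {
    val db = FakeDb()
    println(db.lookupUrlF("acme").get())                      // http://example.com
    println(db.lookupUrlF("nobody").isCompletedExceptionally) // true
}
```

With a helper like toFuture, each new wrapper becomes a one-liner instead of a copy of the same if/else.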


Then the BusinessVerticle.handleMessage method becomes:

  private fun handleMessage(message: Message<String>) {
        val content = getContent(message)

        content.setHandler{res->
            if (res.succeeded()) {
                // Obtain response
                val response = res.result()
                message.reply(response)
            } else {
                message.fail(500, res.cause().message)
            }
        }

    }

    private fun getContent(message: Message<String>): Future<String> {
        val connection = dbclient.getConnectionF()
        val resultSet = connection.compose { it.queryF("SELECT url FROM payee_company where name='${message.body()}'") }
        val url = resultSet.map { it.rows[0].getString("url").removePrefix("http://") }
        val httpResponse = url.compose { webclient.get(it, "/").sendF() }
        val content = httpResponse.map { it.bodyAsString() }
        return content
    }


Looks good.
Simple, readable code. And we also have error handling in one place. If required, we can add different error handling for different exceptions and/or extract it to a separate method.
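
To see why one handler is enough, here is a small standard-library sketch of the same chaining idea. It assumes CompletableFuture as a stand-in for the Vertx Future, with thenApply playing the role of map and thenCompose the role of compose; findUrl, fetch, and describe are hypothetical helpers for this illustration only. A failure in the first step skips the remaining steps and surfaces in the single handler at the end.

```kotlin
import java.util.concurrent.CompletableFuture

// Hypothetical asynchronous steps; each returns an already completed or failed future.
fun findUrl(name: String): CompletableFuture<String> =
    if (name == "acme") CompletableFuture.completedFuture("http://example.com")
    else CompletableFuture<String>().also {
        it.completeExceptionally(IllegalStateException("db error"))
    }

fun fetch(host: String): CompletableFuture<String> =
    CompletableFuture.completedFuture("content of $host")

// The chain itself contains no error handling at all.
fun getContent(name: String): CompletableFuture<String> =
    findUrl(name)
        .thenApply { it.removePrefix("http://") }  // synchronous step, like Future.map
        .thenCompose { fetch(it) }                 // asynchronous step, like Future.compose

// The single place where success and failure are told apart.
fun describe(name: String): String =
    getContent(name)
        .handle { content, error ->
            // Dependent stages wrap the original failure, so unwrap its cause.
            if (error == null) "200 $content" else "500 ${(error.cause ?: error).message}"
        }
        .get()

fun main() {
    println(describe("acme"))     // 200 content of example.com
    println(describe("unknown"))  // 500 db error
}
```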

But what if we need, under some condition, to stop the Future.compose() chain early?
For example, if there are no records in the DB, we may want to reply "No records" with HTTP code 200 instead of an error with code 500.

The most direct way to do it is to throw a special exception and handle that exception separately:

class NoContentException(message:String):Exception(message)

 private fun getContent(message: Message<String>): Future<String> {
        val connection = dbclient.getConnectionF()
        val resultSet = connection.compose { it.queryF("SELECT url FROM payee_company where name='${message.body()}'") }
        val url = resultSet.map {
            if (it.numRows<1)
                throw NoContentException("No records")
            it.rows[0].getString("url").removePrefix("http://")
        }
        val httpResponse = url.compose { webclient.get(it, "/").sendF() }
        val content = httpResponse.map { it.bodyAsString() }
        return content
    }

    private fun handleMessage(message: Message<String>) {
        val content = getContent(message)

        content.setHandler{res->
            if (res.succeeded()) {
                // Obtain response
                val response = res.result()
                message.reply(response)
            } else {
                if (res.cause() is NoContentException)
                    message.reply(res.cause().message)
                else
                    message.fail(500, res.cause().message)
            }
        }
    }


It works, but it does not look so good: we are using an exception for flow control, and if there are a lot of "special cases" in the flow, the code will be much less readable.

So, let's try to do the same thing with Kotlin coroutines. There are a lot of articles about Kotlin coroutines, so I'm not going to explain here what they are and how they work.

In recent versions of Vertx, you can include automatically generated coroutine-friendly versions of all callback methods.

Add two libraries, 'vertx-lang-kotlin-coroutines' and 'vertx-lang-kotlin', and you get JDBCClient.getConnectionAwait(), SQLConnection.queryAwait(), and others.

If we use those methods, our message handling method will become simpler:

  private suspend fun handleMessage(message: Message<String>) {
        try {
            val content = getContent(message)
            message.reply(content)
        } catch(e:Exception){
            message.fail(500, e.message)
        }

    }

    private suspend fun getContent(message: Message<String>): String {
        val connection = dbclient.getConnectionAwait()
        val resultSet = connection.queryAwait("SELECT url FROM payee_company where name='${message.body()}'")
        val url =  resultSet.rows[0].getString("url").removePrefix("http://")
        val httpResponse = webclient.get(url, "/").sendAwait()
        val content = httpResponse.bodyAsString()
        return content
    }


Additionally, you need to change the code that subscribes to event bus messages, because a suspend function can only be called from a coroutine:

vertx.eventBus().consumer<String>("my.addr") { message ->
    GlobalScope.launch(vertx.dispatcher()) { handleMessage(message) }
}


What happens here?

Each of those await methods starts its operation asynchronously and then suspends the coroutine until the result arrives. While the coroutine is suspended, the thread is not blocked: it is free to run other coroutines.
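
The suspension can be observed with nothing but the Kotlin standard library. The sketch below is not how Vertx schedules coroutines; it only illustrates the order of events. awaitReply and demo are hypothetical names, and the "reply" is delivered by hand instead of by an event loop.

```kotlin
import kotlin.coroutines.Continuation
import kotlin.coroutines.EmptyCoroutineContext
import kotlin.coroutines.resume
import kotlin.coroutines.startCoroutine
import kotlin.coroutines.suspendCoroutine

fun demo(): List<String> {
    val log = mutableListOf<String>()
    var pending: Continuation<String>? = null  // continuation parked by the await call

    // Hypothetical await-style function: suspends until the parked continuation is resumed.
    suspend fun awaitReply(): String = suspendCoroutine { cont -> pending = cont }

    val body: suspend () -> Unit = {
        log += "coroutine: waiting for reply"
        val reply = awaitReply()               // the coroutine stops here, the thread does not
        log += "coroutine: got $reply"
    }

    // Start the coroutine on the current thread; it runs until its first suspension point.
    body.startCoroutine(Continuation<Unit>(EmptyCoroutineContext) { it.getOrThrow() })

    log += "thread: free for other work"       // runs while the coroutine is suspended
    pending!!.resume("hello")                  // the "reply" arrives, the coroutine continues
    return log
}

fun main() {
    demo().forEach(::println)
    // coroutine: waiting for reply
    // thread: free for other work
    // coroutine: got hello
}
```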

If we look at the implementation of those await methods, we will see something very similar to our homemade implementation for Futures:

suspend fun SQLClient.getConnectionAwait(): SQLConnection {
  return awaitResult {
    this.getConnection(it)
  }
}

suspend fun <T> awaitResult(block: (h: Handler<AsyncResult<T>>) -> Unit): T {
  val asyncResult = awaitEvent(block)
  if (asyncResult.succeeded()) return asyncResult.result()
  else throw asyncResult.cause()
}

suspend fun <T> awaitEvent(block: (h: Handler<T>) -> Unit): T {
  return suspendCancellableCoroutine { cont: CancellableContinuation<T> ->
    try {
      block.invoke(Handler { t ->
        cont.resume(t)
      })
    } catch (e: Exception) {
      cont.resumeWithException(e)
    }
  }
}


But here we get ordinary code: String as the return type (not Future<String>) and try/catch instead of the ugly callback with AsyncResult.
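
The same bridge can be sketched with the Kotlin standard library alone, assuming a hypothetical callback API (lookupUrl) in place of the Vertx clients. lookupUrlAwait mirrors the idea of awaitResult: a successful callback resumes the coroutine with a value, and a failed one resumes it with an exception, so the caller can use plain try/catch. runNow is a toy runner invented for this example; it only works because the callback here fires synchronously.

```kotlin
import kotlin.coroutines.Continuation
import kotlin.coroutines.EmptyCoroutineContext
import kotlin.coroutines.resume
import kotlin.coroutines.resumeWithException
import kotlin.coroutines.startCoroutine
import kotlin.coroutines.suspendCoroutine

// Hypothetical callback-style API: reports either a value or an error.
fun lookupUrl(name: String, callback: (String?, Throwable?) -> Unit) {
    if (name == "acme") callback("http://example.com", null)
    else callback(null, NoSuchElementException("no row for $name"))
}

// Bridge in the spirit of awaitResult: the callback outcome becomes a return value or an exception.
suspend fun lookupUrlAwait(name: String): String = suspendCoroutine { cont ->
    lookupUrl(name) { value, error ->
        if (error != null) cont.resumeWithException(error) else cont.resume(value!!)
    }
}

// Ordinary-looking code: a String return type and try/catch, no AsyncResult in sight.
suspend fun describe(name: String): String =
    try {
        "200 " + lookupUrlAwait(name)
    } catch (e: Exception) {
        "500 " + e.message
    }

// Toy runner for this example only; assumes the coroutine finishes before startCoroutine returns.
fun <T> runNow(block: suspend () -> T): T {
    var outcome: Result<T>? = null
    block.startCoroutine(Continuation<T>(EmptyCoroutineContext) { outcome = it })
    return outcome!!.getOrThrow()
}

fun main() {
    println(runNow { describe("acme") })    // 200 http://example.com
    println(runNow { describe("nobody") })  // 500 no row for nobody
}
```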

If we need to stop the flow in the middle, we can do it in a natural way, with a plain return and no special exception:

  private suspend fun getContent(message: Message<String>): String {
        val connection = dbclient.getConnectionAwait()
        val resultSet = connection.queryAwait("SELECT url FROM payee_company where name='${message.body()}'")
        if (resultSet.numRows<1)
            return "No records"
        val url =  resultSet.rows[0].getString("url").removePrefix("http://")
        val httpResponse = webclient.get(url, "/").sendAwait()
        val content = httpResponse.bodyAsString()
        return content
    }


IMHO, it is just beautiful!
