Asynchronous Programming in Vertx: Callbacks, Futures, and Coroutines
Here's everything you need to know about the three paradigms of asynchronous programming in Vertx with Kotlin.
Join the DZone community and get the full member experience.
Join For FreeI want to show three paradigms of asynchronous programming: callbacks, futures, and coroutines. I will provide a simple web application example using Kotlin and the Vertx framework.
Let's assume we are writing an application that receives a string in an HTTP request, searches by this string for a URL in the DB, fetches the URL content, and then sends it back to the client.
Vertx was created as an asynchronous framework for high-load applications, using Netty, new I/O, and event bus.
As it is common in Vertx, one Verticle
(something like Actor, if you know Akka) receives a request, sends the received string to an event bus and then to another verticle — BusinessVerticle
, which is responsible for the fetching itself.
object Main {
@JvmStatic
fun main(args: Array<String>) {
val vertx = Vertx.vertx()
vertx.deployVerticle(HttpVerticle())
vertx.deployVerticle(BusinessVerticle())
}
}
@JvmStatic
fun main(args: Array<String>) {
val vertx = Vertx.vertx()
vertx.deployVerticle(HttpVerticle())
vertx.deployVerticle(BusinessVerticle())
}
}
class HttpVerticle : AbstractVerticle() {
@Throws(Exception::class)
override fun start(startFuture: Future<Void>) {
val router = createRouter()
vertx.createHttpServer()
.requestHandler(router)
.listen(8080) { result ->
if (result.succeeded()) {
startFuture.complete()
} else {
startFuture.fail(result.cause())
}
}
}
private fun createRouter(): Router = Router.router(vertx).apply {
get("/").handler(handlerRoot)
}
private val handlerRoot = Handler<RoutingContext> { rc ->
vertx.eventBus().send("my.addr", rc.request().getParam("id") ?: "")
{ resp: AsyncResult<Message<String>> ->
if (resp.succeeded()) {
rc.response().end(resp.result().body())
} else {
rc.fail(500)
}
}
}
}
In the standard Vertx API, you perform all asynchronous flows via callbacks, so the initial implementation of BusinessVerticle
looks like this:
class BusinessVerticle : AbstractVerticle() {
private lateinit var dbclient: JDBCClient
private lateinit var webclient: WebClient
override fun start() {
vertx.eventBus().consumer<String>("my.addr") { message ->
handleMessage(message)
}
dbclient = JDBCClient.createShared(
vertx, JsonObject()
.put("url", "jdbc:postgresql://localhost:5432/payroll")
.put("driver_class", "org.postgresql.Driver")
.put("user", "vala")
.put("password", "vala")
.put("max_pool_size", 30)
)
val options = WebClientOptions()
.setUserAgent("My-App/1.2.3")
options.isKeepAlive = false
webclient = WebClient.create(vertx, options)
}
private fun handleMessage(message: Message<String>) {
dbclient.getConnection { res ->
if (res.succeeded()) {
val connection = res.result()
connection.query("SELECT url FROM payee_company where name='${message.body()}'") { res2 ->
if (res2.succeeded()) {
try {
val url = res2.result().rows[0].getString("url").removePrefix("http://")
webclient
.get(url,"/")
.send { ar ->
if (ar.succeeded()) {
val response = ar.result()
message.reply(response.bodyAsString())
} else {
message.fail(500, ar.cause().message)
}
}
} catch (e: Exception) {
message.fail(500, e.message)
}
} else {
message.fail(500, res2.cause().message)
}
}
} else {
message.fail(500, res.cause().message)
}
}
}
}
And this looks bad. Callbacks and error handling are done in several places.
Let's improve the situation by the extraction of each callback to a separate method:
class BusinessVerticle: AbstractVerticle() {
private lateinit var dbclient: JDBCClient
private lateinit var webclient: WebClient
override fun start() {
vertx.eventBus().consumer<String>("my.addr") { message ->
handleMessage(message)
}
dbclient = JDBCClient.createShared(
vertx, JsonObject()
.put("url", "jdbc:postgresql://localhost:5432/payroll")
.put("driver_class", "org.postgresql.Driver")
.put("user", "vala")
.put("password", "vala")
.put("max_pool_size", 30)
)
val options = WebClientOptions()
.setUserAgent("My-App/1.2.3")
options.isKeepAlive = false
webclient = WebClient.create(vertx, options)
}
private fun handleMessage(message: Message<String>) {
dbclient.getConnection { res ->
handleConnectionCallback(res, message)
}
}
private fun handleConnectionCallback(
res: AsyncResult<SQLConnection>,
message: Message<String>
) {
if (res.succeeded()) {
val connection = res.result()
connection.query("SELECT url FROM payee_company where name='${message.body()}'") { res2 ->
handleQueryCallBack(res2, message)
}
} else {
message.fail(500, res.cause().message)
}
}
private fun handleQueryCallBack(
res2: AsyncResult<ResultSet>,
message: Message<String>
) {
if (res2.succeeded()) {
try {
val url = res2.result().rows[0].getString("url").removePrefix("http://")
webclient
.get(url, "/")
.send { ar ->
handleHttpCallback(ar, message)
}
} catch (e: Exception) {
message.fail(500, e.message)
}
} else {
message.fail(500, res2.cause().message)
}
}
private fun handleHttpCallback(
ar: AsyncResult<HttpResponse<Buffer>>,
message: Message<String>
) {
if (ar.succeeded()) {
// Obtain response
val response = ar.result()
message.reply(response.bodyAsString())
} else {
message.fail(500, ar.cause().message)
}
}
}
Better, but still bad.
It's just not readable code. The message
object, which should pass to all methods, creates a separate error handling in each callback.
Let's rewrite it usingFuture
s. The great advantage of using Future is the ability to chain them using Future.compose()
.
First, let's translate standard Vertx methods, which receive callbacks to methods and returns Future
. We will use "extension methods" — a Kotlin feature that allows us to add a method to an existing class.
fun JDBCClient.getConnectionF(): Future<SQLConnection> {
val f = Future.future<SQLConnection>()
getConnection { res ->
if (res.succeeded()) {
val connection = res.result()
f.complete(connection)
} else {
f.fail(res.cause())
}
}
return f
}
fun SQLConnection.queryF(query:String): Future<ResultSet> {
val f = Future.future<ResultSet>()
query(query) { res ->
if (res.succeeded()) {
val resultSet = res.result()
f.complete(resultSet)
} else {
f.fail(res.cause())
}
}
return f
}
fun <T,M> HttpRequest<T>.sendF(): Future<HttpResponse<M>> {
val f = Future.future<HttpResponse<M>>()
send() { res ->
if (res.succeeded()) {
val response = res.result()
f.complete(response)
} else {
f.fail(res.cause())
}
}
return f
}
And then, the BusinessVerticle.handleMessage
method will be transformed to:
private fun handleMessage(message: Message<String>) {
val content = getContent(message)
content.setHandler{res->
if (res.succeeded()) {
// Obtain response
val response = res.result()
message.reply(response)
} else {
message.fail(500, res.cause().message)
}
}
}
private fun getContent(message: Message<String>): Future<String> {
val connection = dbclient.getConnectionF()
val resultSet = connection.compose { it.queryF("SELECT url FROM payee_company where name='${message.body()}'") }
val url = resultSet.map { it.rows[0].getString("url").removePrefix("http://") }
val httpResponse = url.compose { webclient.get(it, "/").sendF() }
val content = httpResponse.map { it.bodyAsString() }
return content
}
Looks good.
Simple, readable code. And we also have error handling in one place. If required, we can add different error handling for different exceptions and/or extract it to a separate method.
But what if we need, under some condition, to stop the chain of Future.compose()
?
For example, if there are no records in the DB, we want to return a response "No records" with HTTP code 200 instead of an error and code 500?
The only way to do it is to throw a special exception and make special handling for such an exception:
class NoContentException(message:String):Exception(message)
private fun getContent(message: Message<String>): Future<String> {
val connection = dbclient.getConnectionF()
val resultSet = connection.compose { it.queryF("SELECT url FROM payee_company where name='${message.body()}'") }
val url = resultSet.map {
if (it.numRows<1)
throw NoContentException("No records")
it.rows[0].getString("url").removePrefix("http://")
}
val httpResponse = url.compose { webclient.get(it, "/").sendF() }
val content = httpResponse.map { it.bodyAsString() }
return content
}
private fun handleMessage(message: Message<String>) {
val content = getContent(message)
content.setHandler{res->
if (res.succeeded()) {
// Obtain response
val response = res.result()
message.reply(response)
} else {
if (res.cause() is NoContentException)
message.reply(res.cause().message)
else
message.fail(500, res.cause().message)
}
}
}
It works well, but it does not look so good — we are using an exception for flow control, and if there are a lot of "special cases" in the flow, the code will be much less readable.
So, let's try to do the same thing with Kotlin coroutines. There are a lot of articles about Kotlin coroutines, so I'm not going to explain here what they are and how they work.
In the last versions of Vertx, you can include automatically generated coroutine-friendly versions of all callback methods.
You should add libraries: 'vertx-lang-kotlin-coroutines'
and 'vertx-lang-kotlin'
. You will get: JDBCClient.getConnectionAwait()
, SQLConnection.queryAwait()
, and others.
If we use those methods, our message handling method will become simpler:
private suspend fun handleMessage(message: Message<String>) {
try {
val content = getContent(message)
message.reply(content)
} catch(e:Exception){
message.fail(500, e.message)
}
}
private suspend fun getContent(message: Message<String>): String {
val connection = dbclient.getConnectionAwait()
val resultSet = connection.queryAwait("SELECT url FROM payee_company where name='${message.body()}'")
val url = resultSet.rows[0].getString("url").removePrefix("http://")
val httpResponse = webclient.get(url, "/").sendAwait()
val content = httpResponse.bodyAsString()
return content
}
Additionally, you should change the event bus message subscribe code:
vertx.eventBus().consumer<String>("my.addr") { message ->
GlobalScope.launch(vertx.dispatcher()) { handleMessage(message)}
}
What happens here?
All those await
methods call code in an asynchronous way. Then, they wait for a result, and while they wait, a thread is switching and running another coroutine.
If we will look to implementation those await
methods, we will see something very similar to our homemade implementation for Future
s:
suspend fun SQLClient.getConnectionAwait(): SQLConnection {
return awaitResult {
this.getConnection(it)
}
}
suspend fun <T> awaitResult(block: (h: Handler<AsyncResult<T>>) -> Unit): T {
val asyncResult = awaitEvent(block)
if (asyncResult.succeeded()) return asyncResult.result()
else throw asyncResult.cause()
}
suspend fun <T> awaitEvent(block: (h: Handler<T>) -> Unit): T {
return suspendCancellableCoroutine { cont: CancellableContinuation<T> ->
try {
block.invoke(Handler { t ->
cont.resume(t)
})
} catch (e: Exception) {
cont.resumeWithException(e)
}
}
}
But here, we get a normal code — String as a return type (and not Future
<
String
>
) and try
/catch
instead of the ugly callback with AsyncResult
.
If we need to stop flow in the middle, we can do it in a natural way, without exceptions:
private suspend fun getContent(message: Message<String>): String {
val connection = dbclient.getConnectionAwait()
val resultSet = connection.queryAwait("SELECT url FROM payee_company where name='${message.body()}'")
if (resultSet.numRows<1)
return "No records"
val url = resultSet.rows[0].getString("url").removePrefix("http://")
val httpResponse = webclient.get(url, "/").sendAwait()
val content = httpResponse.bodyAsString()
return content
}
IMHO, it is just beautiful!
Opinions expressed by DZone contributors are their own.
Comments