Accessing Data - The Reactive Way (Part 4 of Introduction to Vert.x)

JDBC and Vert.x are a match made in heaven. Come find out why (and how) in Part 4 of this Vert.x series.

This is the fourth post of my "Introduction to Eclipse Vert.x" series. In this article, we are going to see how we can use JDBC in an Eclipse Vert.x application using the asynchronous API provided by the vertx-jdbc-client. But before diving into JDBC and other SQL subtleties, we are going to talk about Vert.x Futures.

In "The Introduction to Vert.x" Series

Let's start by refreshing our memory about the previous articles:

  1. The first post described how to build a Vert.x application with Maven and execute unit tests.
  2. The second post reviewed how this application became configurable.
  3. The third post introduced vertx-web, and a collection management application was developed. This application exposes a REST API used by an HTML/JavaScript frontend.

In this fourth post, we will fix the major flaw of our application: the in-memory back-end. The current application uses an in-memory Map to store the products (articles). This is not very useful, as we lose the content every time we restart the application. Let's use a database. In this post, we are going to use PostgreSQL, but you can use any database providing a JDBC driver. For instance, our tests are going to use HSQL. Interactions with the database are asynchronous and made using the vertx-jdbc-client. But before diving into these JDBC and SQL details, let's introduce the Vert.x Future class and explain how it's going to make asynchronous coordination much simpler.

The code of this post is available on the GitHub repo, in the post-4 directory.

Asynchronous API

One of the Eclipse Vert.x characteristics is its asynchronous and non-blocking nature. With an asynchronous API, you don't wait for a result; instead, you are notified when the result is ready, that is, when the operation has completed. To illustrate this, let's take a very simple example.


public void retrieve(Handler<String> resultHandler) {
    fileSystem.read(fileName, res -> {
        resultHandler.handle(res);
    });
}

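Just to avoid misconceptions, asynchronous APIs are not about threads. As we can see in the retrieve example, there are no threads involved, and most Vert.x applications use a very small number of threads while being asynchronous and non-blocking. Also, it's important to notice that the method is non-blocking: the retrieve method may return before the resultHandler is called.

An asynchronous operation can also fail. In Vert.x, the handler therefore usually receives an AsyncResult, which carries either the result or the failure, and the caller checks which one it received: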


public void retrieve(
  Handler<AsyncResult<String>> resultHandler) {
    vertx.fileSystem().readFile("fileName", ar -> {
      if (ar.failed()) {
        resultHandler.handle(
          Future.failedFuture(ar.cause()));
      } else {
        resultHandler.handle(
          Future.succeededFuture(ar.result().toString()));
      }
    });
}

retrieve(ar -> {
  if (ar.failed()) {
    // Handle the failure, the exception is 
    // retrieved using ar.cause()
    Throwable cause = ar.cause();
    // ...
   } else {
    // Made it, the result is in ar.result()
    String content = ar.result();
    // ...
   }
});

So, to summarize, an asynchronous method is a method forwarding its result or failure as a notification, generally calling a callback expecting the result.
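To make the "notified later, no extra thread" idea concrete, here is a minimal sketch of a toy single-threaded event loop in plain Java. It is not how Vert.x is implemented, and every name in it (ToyEventLoop, retrieve, run) is hypothetical. The callback receives either a result or a failure, and retrieve returns before the callback is invoked.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.function.BiConsumer;

// Toy single-threaded event loop; all names are hypothetical, not Vert.x API.
class ToyEventLoop {
    private final Queue<Runnable> pending = new ArrayDeque<>();
    final List<String> log = new ArrayList<>();

    // Asynchronous method: it only registers the work and returns immediately.
    // The callback receives (result, failure); exactly one of them is non-null.
    void retrieve(String name, BiConsumer<String, Throwable> callback) {
        pending.add(() -> {
            if (name.isEmpty()) {
                callback.accept(null, new IllegalArgumentException("empty name"));
            } else {
                callback.accept("content of " + name, null);
            }
        });
        log.add("retrieve returned");
    }

    // Runs the queued tasks on the calling thread until none are left.
    void run() {
        Runnable task;
        while ((task = pending.poll()) != null) {
            task.run();
        }
    }

    static List<String> demo() {
        ToyEventLoop loop = new ToyEventLoop();
        loop.retrieve("file.txt",
            (result, failure) -> loop.log.add("got: " + result));
        loop.retrieve("",
            (result, failure) -> loop.log.add("failed: " + failure.getMessage()));
        loop.run();
        return loop.log;
    }

    public static void main(String[] args) {
        demo().forEach(System.out::println);
    }
}
```

Both calls to retrieve return before any callback runs; the callbacks are only invoked once the loop processes its queue, all on the same thread.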

The Asynchronous Coordination Dilemma

Once you have a set of asynchronous methods, you generally want to orchestrate them:

  1. Sequentially, so calling one method once another has completed.
  2. Concurrently, so calling several actions at the same time and being notified when all (or one) of them have completed.

For the first case, we would do something like:


retrieve(ar -> {
  if (ar.failed()) {
    // do something to recover
   } else {
    String r = ar.result();
    // call another async method
    anotherAsyncMethod(r, ar2 -> {
      if (ar2.failed()) {
        //...
      } else {
        // ...
      }
    });
   }
});

You can quickly spot the issue... things start getting messy. Nested callbacks reduce code readability, and this was with just two. Imagine dealing with more than that, as we will see later in this post.

For the second type of composition, you can also imagine the difficulty. In each result handler, you need to check whether or not the others have completed or failed and then react accordingly. This leads to convoluted code.
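As a rough illustration of that bookkeeping, here is a sketch of a hand-written "wait for both" coordinator using plain callbacks. The ManualJoin class and its method names are hypothetical, and failure handling is left out on purpose; adding it would require yet more state and branches.

```java
import java.util.function.Consumer;

// Hand-written coordination of two callbacks; hypothetical names, plain Java.
class ManualJoin {
    private String first;
    private Integer second;
    private boolean done;
    private final Consumer<String> onBoth;

    ManualJoin(Consumer<String> onBoth) {
        this.onBoth = onBoth;
    }

    // Each result handler stores its value, then checks the other one.
    void firstCompleted(String value) {
        first = value;
        checkBoth();
    }

    void secondCompleted(Integer value) {
        second = value;
        checkBoth();
    }

    private void checkBoth() {
        if (!done && first != null && second != null) {
            done = true;
            onBoth.accept(first + ":" + second);
        }
    }

    static String demo() {
        StringBuilder out = new StringBuilder();
        ManualJoin join = new ManualJoin(s -> out.append(s));
        join.secondCompleted(42); // only one result so far, nothing is reported
        join.firstCompleted("a"); // both results are in, the final callback fires
        return out.toString();
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```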

Future and CompositeFuture (Async Coordination Made Easy)

To reduce the code complexity, Vert.x provides a class named Future. A Future is an object that encapsulates the result of an action that may, or may not, have occurred yet. Unlike the regular Java Future, the Vert.x Future is non-blocking, and a Handler is called when the Future is completed or failed. The Future class implements AsyncResult, as it represents a result computed asynchronously.

A note about Java Future: The regular Java Future is blocking. Calling get blocks the caller thread until the result is received (or a timeout is reached). Vert.x Futures instead have a result method that returns null if the result has not been received yet. They also expect a handler to be attached to them, which is called when the result is received.
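The same "read without blocking, or attach a handler" behaviour can be tried with the JDK alone. The sketch below uses java.util.concurrent.CompletableFuture as a stand-in, so it does not show the Vert.x API: getNow(null) plays the role of a non-blocking accessor and whenComplete plays the role of the handler.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// JDK-only stand-in for the non-blocking behaviour described above.
class NonBlockingFutureSketch {
    static List<String> demo() {
        List<String> events = new ArrayList<>();
        CompletableFuture<Integer> future = new CompletableFuture<>();

        // Non-blocking read: returns the fallback (null) as nothing is there yet.
        events.add("before: " + future.getNow(null));

        // The handler is called once a result or a failure is available.
        future.whenComplete((result, failure) -> events.add("handler: " + result));

        future.complete(1);
        events.add("after: " + future.getNow(null));
        return events;
    }

    public static void main(String[] args) {
        demo().forEach(System.out::println);
    }
}
```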

Creating a Future object is done using the Future.future() factory method:


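Future<Integer> future = Future.future();
// A Future is resolved only once: either complete it...
future.complete(1); // Completes the Future with a result
// ...or fail it (doing both on the same Future throws an IllegalStateException)
future.fail(exception); // Fails the Future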

// To be notified when the future has been completed 
// or failed
future.setHandler(ar -> {
  // Handler called with the result or the failure, 
  // ar is an AsyncResult
});

Let's revisit our retrieve method. Instead of taking a callback as a parameter, we can return a Future object:


public Future<String> retrieve() {
    Future<String> future = Future.future();
    vertx.fileSystem().readFile("fileName", ar -> {
        if (ar.failed()) {
            future.fail(ar.cause());
        } else {
            future.complete(ar.result().toString());
        }
    });
    return future;
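}

The same method can be written more concisely by letting the Future handle the AsyncResult directly:

public Future<String> retrieve() {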
  Future<String> future = Future.future();
  vertx.fileSystem().readFile("fileName", 
    ar -> future.handle(ar.map(Buffer::toString)));
  return future;
}

We are going to cover this API in a few minutes. But first, let's look at the caller side, where things do not change much. The handler is attached to the returned Future.


retrieve().setHandler(ar -> {
  if (ar.failed()) {
    // Handle the failure, the exception is 
    // retrieved using ar.cause()
    Throwable cause = ar.cause();
    // ...
   } else {
    // Made it, the result is in ar.result()
    String r = ar.result();
    // ...
   }
});

Where things become much easier is when you need to compose asynchronous actions. Sequential composition is handled using the compose method:


retrieve()
  .compose(this::anotherAsyncMethod)
  .setHandler(ar -> {
    // ar.result is the final result
    // if any stage fails, ar.cause is 
    // the thrown exception
  });

Future.compose takes as a parameter a function consuming the result of the previous Future and returning another Future. This way you can chain many asynchronous actions.
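If you want to experiment with this chaining without a Vert.x project, the JDK offers a comparable operation. The sketch below uses CompletableFuture.thenCompose as an analogue of compose; it is not the Vert.x API, and retrieve and doubleIt are hypothetical stand-ins. Note how a failure in the first stage skips the second one and reaches the final handler.

```java
import java.util.concurrent.CompletableFuture;

// JDK analogue of sequential composition; names are hypothetical.
class SequentialCompositionSketch {
    // First asynchronous stage: succeeds with "21" or fails.
    static CompletableFuture<String> retrieve(boolean fail) {
        CompletableFuture<String> future = new CompletableFuture<>();
        if (fail) {
            future.completeExceptionally(new IllegalStateException("read failed"));
        } else {
            future.complete("21");
        }
        return future;
    }

    // Second asynchronous stage: consumes the result of the first one.
    static CompletableFuture<Integer> doubleIt(String text) {
        CompletableFuture<Integer> future = new CompletableFuture<>();
        future.complete(Integer.parseInt(text) * 2);
        return future;
    }

    // Dependent stages wrap the original exception, so unwrap it if needed.
    static String rootMessage(Throwable failure) {
        Throwable cause = failure.getCause() != null ? failure.getCause() : failure;
        return cause.getMessage();
    }

    static String run(boolean fail) {
        StringBuilder outcome = new StringBuilder();
        retrieve(fail)
            .thenCompose(SequentialCompositionSketch::doubleIt)
            .whenComplete((result, failure) -> {
                if (failure != null) {
                    outcome.append("failed: ").append(rootMessage(failure));
                } else {
                    outcome.append("result: ").append(result);
                }
            });
        return outcome.toString();
    }

    public static void main(String[] args) {
        System.out.println(run(false));
        System.out.println(run(true));
    }
}
```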

What about concurrent composition? Let's imagine you want to invoke two unrelated operations and be notified when both have completed:


Future<String> future1 = retrieve();
Future<Integer> future2 = anotherAsyncMethod();
CompositeFuture.all(future1, future2)
  .setHandler(ar -> {
    // called when either all futures have completed
    // successfully (success), 
    // or one failed (failure)
});

Using Future and CompositeFuture makes the code much more readable and maintainable. Vert.x also supports RxJava to manage asynchronous composition; this will be covered in another post.
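For comparison, here is a JDK-only sketch of the "notify me when both are done" idea using CompletableFuture.allOf. It is an analogue, not the Vert.x API, and it behaves slightly differently: allOf waits for every future to finish before reporting, even when one of them has already failed.

```java
import java.util.concurrent.CompletableFuture;

// JDK analogue of concurrent composition; not the Vert.x CompositeFuture.
class ConcurrentCompositionSketch {
    static String demo(boolean failSecond) {
        CompletableFuture<String> first = new CompletableFuture<>();
        CompletableFuture<Integer> second = new CompletableFuture<>();
        StringBuilder outcome = new StringBuilder();

        // A single handler, called once both futures are resolved.
        CompletableFuture.allOf(first, second).whenComplete((ignored, failure) -> {
            if (failure != null) {
                outcome.append("failed");
            } else {
                outcome.append(first.join()).append(":").append(second.join());
            }
        });

        first.complete("a"); // the handler has not run yet
        if (failSecond) {
            second.completeExceptionally(new RuntimeException("boom"));
        } else {
            second.complete(1);
        }
        return outcome.toString();
    }

    public static void main(String[] args) {
        System.out.println(demo(false));
        System.out.println(demo(true));
    }
}
```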

JDBC Yes, but Asynchronous

So, now that we have seen some basics about asynchronous APIs and Futures, let's have a look at the vertx-jdbc-client. This Vert.x module lets us interact with a database through a JDBC driver. These interactions are asynchronous, so where you used to write:


String sql = "SELECT * FROM Products";
ResultSet rs = stmt.executeQuery(sql);

When you use the vertx-jdbc-client, it becomes:


connection.query("SELECT * FROM Products", result -> {
        // do something with the result
});

This model avoids waiting for the result. You are notified when the result has been retrieved from the database.

A Note on JDBC: JDBC is a blocking API by default. To interact with the database, Vert.x delegates to a worker thread. While it's asynchronous, it's not totally non-blocking. However, the Vert.x ecosystem also provides truly non-blocking clients for MySQL and PostgreSQL.
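The idea of delegating a blocking call to a worker thread can be sketched with the JDK alone. This is only an illustration of the principle, not the Vert.x implementation; blockingQuery and executeBlocking are hypothetical names, and the join in demo is there only to make the example observable.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Supplier;

// Illustration of offloading a blocking call; names are hypothetical.
class WorkerOffloadSketch {
    // Stand-in for a blocking JDBC call; reports the thread it ran on.
    static String blockingQuery() {
        return "rows fetched on " + Thread.currentThread().getName();
    }

    // Runs the blocking call on the worker pool, so the caller is not blocked.
    static <T> CompletableFuture<T> executeBlocking(Supplier<T> blockingCall,
                                                    ExecutorService workers) {
        return CompletableFuture.supplyAsync(blockingCall, workers);
    }

    static String demo() {
        ExecutorService workers =
            Executors.newSingleThreadExecutor(r -> new Thread(r, "worker-0"));
        try {
            // join() blocks, which a real event-loop thread should never do;
            // it is used here only to read the result in the demo.
            return executeBlocking(WorkerOffloadSketch::blockingQuery, workers).join();
        } finally {
            workers.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```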

Let's now modify our application to use a database to store our products (articles).

Some Maven Dependencies

The first thing we need to do is to declare two new Maven dependencies in our pom.xml file:


<dependency>
  <groupId>io.vertx</groupId>
  <artifactId>vertx-jdbc-client</artifactId>
  <version>${vertx.version}</version>
</dependency>
<dependency>
  <groupId>org.postgresql</groupId>
  <artifactId>postgresql</artifactId>
  <version>9.4.1212</version>
</dependency>

The first dependency provides the vertx-jdbc-client, while the second one provides the PostgreSQL JDBC driver. If you want to use another database, change this dependency. You will also need to change the JDBC URL and JDBC driver class name in the code.

Initializing the JDBC Client

Now that we have added these dependencies, it's time to create our JDBC client. But it needs to be configured. Edit the src/main/conf/my-application-conf.json to match the following content:


{
  "HTTP_PORT": 8082,

  "url": "jdbc:postgresql://localhost:5432/my_read_list",
  "driver_class": "org.postgresql.Driver",
  "user": "user",
  "password": "password"
}

Now that the configuration is written, we need to create an instance of JDBC client. In the MyFirstVerticle class, declare a new field JDBCClient jdbc;, and update the end of the start method to become:


ConfigRetriever retriever = ConfigRetriever.create(vertx);
retriever.getConfig(
  config -> {
    if (config.failed()) {
      fut.fail(config.cause());
    } else {
      // Create the JDBC client
      jdbc = JDBCClient.createShared(vertx, config.result(), 
        "My-Reading-List");
      vertx
        .createHttpServer()
        .requestHandler(router::accept)
        .listen(
          // Retrieve the port from the configuration,
          // default to 8080.
          config.result().getInteger("HTTP_PORT", 8080),
          result -> {
            if (result.succeeded()) {
              fut.complete();
            } else {
              fut.fail(result.cause());
            }
        });
    }
  }
);

OK, the client is now configured, but we still need a connection to the database. We get one with the jdbc.getConnection method, which passes its result (the connection) to a Handler<AsyncResult<SQLConnection>>. This handler is notified when the connection with the database is established or if something bad happens during the process. While we could use the method directly, let's extract the retrieval of a connection into a separate method that returns a Future:


private Future<SQLConnection> connect() {
  Future<SQLConnection> future = Future.future();      // 1
  jdbc.getConnection(ar ->                             // 2
      future.handle(ar.map(connection ->               // 3
        connection.setOptions(
          new SQLOptions().setAutoGeneratedKeys(true)) // 4
      )
    )
  );
  return future;                                       // 5
}

Let's have a closer look at this method. First, we create a Future object (1) that we return at the end of the method (5). This Future is completed or failed depending on whether or not we successfully retrieve a connection to the database, which is requested in (2). The function we pass to getConnection receives an AsyncResult. In (3), ar.map transforms the connection carried by a successful result, here by enabling the retrieval of auto-generated keys (4), and leaves a failure untouched. Future has a method (handle) that directly completes or fails it based on an AsyncResult. Calling handle is equivalent to:


if (ar.failed()) {
  future.fail(ar.cause());
} else {
  future.complete(ar.result());
}

Just... shorter.
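To see why mapping a result before handing it over is convenient, here is a simplified, hypothetical Outcome type written in plain Java. It is not the Vert.x AsyncResult, only a sketch of the behaviour relied on above: map transforms a successful result and lets a failure pass through unchanged.

```java
import java.util.function.Function;

// Hypothetical, simplified stand-in for an AsyncResult-like type.
final class Outcome<T> {
    private final T result;
    private final Throwable cause;

    private Outcome(T result, Throwable cause) {
        this.result = result;
        this.cause = cause;
    }

    static <T> Outcome<T> success(T result) {
        return new Outcome<>(result, null);
    }

    static <T> Outcome<T> failure(Throwable cause) {
        return new Outcome<>(null, cause);
    }

    boolean failed() {
        return cause != null;
    }

    T result() {
        return result;
    }

    Throwable cause() {
        return cause;
    }

    // Transforms a successful result; a failure is propagated as is.
    <R> Outcome<R> map(Function<T, R> mapper) {
        if (failed()) {
            return Outcome.failure(cause);
        }
        return Outcome.success(mapper.apply(result));
    }

    public static void main(String[] args) {
        System.out.println(Outcome.success("42").map(Integer::parseInt).result());
        Outcome<String> broken = Outcome.failure(new RuntimeException("no connection"));
        System.out.println(broken.map(String::length).cause().getMessage());
    }
}
```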

We Need Articles

Now that we have a JDBC client and a way to retrieve a connection to the database, it's time to insert articles. But because we use a relational database, we first need to create the table. Create the following method:


private Future<SQLConnection> createTableIfNeeded(SQLConnection connection) {
    Future<SQLConnection> future = Future.future();
    vertx.fileSystem().readFile("tables.sql", ar -> {
        if (ar.failed()) {
            future.fail(ar.cause());
        } else {
            connection.execute(ar.result().toString(),
                ar2 -> future.handle(ar2.map(connection))
            );
        }
    });
    return future;
}

So, we need the tables.sql file. Create the src/main/resources/tables.sql file with the following content:


CREATE TABLE IF NOT EXISTS Articles (id SERIAL PRIMARY KEY,
    title VARCHAR(200) NOT NULL,
    url VARCHAR(200) NOT NULL)

Ok, so now we have a connection to the database and the table. Let's insert articles, but only if the database is empty. For this, create the createSomeDataIfNone and insert methods:


private Future<SQLConnection> createSomeDataIfNone(SQLConnection connection) {
  Future<SQLConnection> future = Future.future();
  connection.query("SELECT * FROM Articles", select -> {
    if (select.failed()) {
      future.fail(select.cause());
    } else {
      if (select.result().getResults().isEmpty()) {
        Article article1 = new Article("Fallacies of distributed computing",
            "https://en.wikipedia.org/wiki/Fallacies_of_distributed_computing");
        Article article2 = new Article("Reactive Manifesto",
            "https://www.reactivemanifesto.org/");
        Future<Article> insertion1 = insert(connection, article1, false);
        Future<Article> insertion2 = insert(connection, article2, false);
        CompositeFuture.all(insertion1, insertion2)
            .setHandler(r -> future.handle(r.map(connection)));
      } else {
        // Boring... nothing to do.
        future.complete(connection);
      }
    }
  });
  return future;
}

private Future<Article> insert(SQLConnection connection, Article article,
  boolean closeConnection) {
  Future<Article> future = Future.future();
  String sql = "INSERT INTO Articles (title, url) VALUES (?, ?)";
  connection.updateWithParams(sql,
    new JsonArray().add(article.getTitle()).add(article.getUrl()),
    ar -> {
      if (closeConnection) {
          connection.close();
      }
      future.handle(
          ar.map(res -> new Article(res.getKeys().getLong(0), 
              article.getTitle(), article.getUrl()))
      );
    }
  );
  return future;
}

Putting These Pieces Together

It's time to assemble these pieces and see how it works. The start method needs to be updated to execute the following actions:

  1. Retrieve the configuration (already done).
  2. When the configuration is retrieved, create the JDBC client (already done).
  3. Retrieve a connection to the database.
  4. With this connection, create the table if it does not exist.
  5. With the same connection, check whether the database contains articles, if not, insert some data.
  6. Close the connection.
  7. Start the HTTP server as we are ready to serve.
  8. Report the success or failure of the boot process to fut.

Wow... that's a lot of actions. Fortunately, we have implemented almost all the required methods in a way that lets us use Future composition. In the start method, replace the end of the code with:


// Start sequence:
// 1 - Retrieve the configuration
//  |- 2 - Create the JDBC client
//  |- 3 - Connect to the database (retrieve a connection)
//          |- 4 - Create table if needed
//               |- 5 - Add some data if needed
//                      |- 6 - Close connection when done
//          |- 7 - Start HTTP server
//  |- 8 - we are done!
ConfigRetriever.getConfigAsFuture(retriever)
  .compose(config -> {
    jdbc = JDBCClient.createShared(vertx, config, 
      "My-Reading-List");
    return connect()
      .compose(connection -> {
        Future<Void> future = Future.future();
        createTableIfNeeded(connection)
        .compose(this::createSomeDataIfNone)
        .setHandler(x -> {
            connection.close();
            future.handle(x.mapEmpty());
         });
         return future;
      })
      .compose(v -> createHttpServer(config, router));
    }).setHandler(fut);

Don't worry about the createHttpServer method; we will cover it shortly. The code starts by retrieving the configuration and creating the JDBCClient. Then, we retrieve a database connection and initialize our database. Notice that the connection is closed in all cases (even failures). When the database is set up, we start the HTTP server. Finally, when everything is done, we report the result (success or failure) to fut, telling Vert.x whether or not we are ready to work.

Note about closing connections: Don't forget to close the SQL connection when you are done. The connection will be given back to the connection pool and be recycled.
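The "close in all cases" rule can be expressed as a small reusable pattern. The sketch below is JDK-only and uses CompletableFuture instead of the Vert.x Future; FakeConnection, withConnection, and query are hypothetical names introduced for the illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

// JDK-only sketch of releasing a connection on success and on failure.
class CloseAlwaysSketch {
    // Hypothetical stand-in for a pooled connection.
    static class FakeConnection {
        boolean closed;

        void close() {
            closed = true;
        }
    }

    // Runs the action, then closes the connection whatever the outcome is.
    static <T> CompletableFuture<T> withConnection(
            FakeConnection connection,
            Function<FakeConnection, CompletableFuture<T>> action) {
        return action.apply(connection)
            .whenComplete((result, failure) -> connection.close());
    }

    // Hypothetical asynchronous action that succeeds or fails on demand.
    static CompletableFuture<String> query(FakeConnection connection, boolean fail) {
        CompletableFuture<String> future = new CompletableFuture<>();
        if (fail) {
            future.completeExceptionally(new IllegalStateException("query failed"));
        } else {
            future.complete("ok");
        }
        return future;
    }

    static boolean closedAfter(boolean fail) {
        FakeConnection connection = new FakeConnection();
        withConnection(connection, c -> query(c, fail));
        return connection.closed;
    }

    public static void main(String[] args) {
        System.out.println(closedAfter(false));
        System.out.println(closedAfter(true));
    }
}
```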

The createHttpServer method is quite simple and follows the same pattern:


private Future<Void> createHttpServer(JsonObject config, Router router) {
    Future<Void> future = Future.future();
    vertx
        .createHttpServer()
        .requestHandler(router::accept)
        .listen(
            config.getInteger("HTTP_PORT", 8080),
            res -> future.handle(res.mapEmpty())
        );
    return future;
}

Notice the mapEmpty call. The method returns a Future<Void>, as we don't care about the HTTP server itself. To create an AsyncResult<Void> from an AsyncResult<HttpServer>, use the mapEmpty method, which discards the encapsulated result.

Implementing the REST API On Top of JDBC

So, at this point, we have everything set up, but our API is still relying on our in-memory back-end. It's time to re-implement our REST API on top of JDBC. But first, we need some utility methods focusing on the interaction with the database. These methods have been extracted to ease the understanding.

First, let's add the query method:


private Future<List<Article>> query(SQLConnection connection) {
    Future<List<Article>> future = Future.future();
    connection.query("SELECT * FROM articles", result -> {
            connection.close();
            future.handle(
                result.map(rs -> 
                  rs.getRows().stream()
                    .map(Article::new)
                    .collect(Collectors.toList()))
            );
        }
    );
    return future;
}

This method again uses the same pattern: it creates a Future object and returns it. The future is completed or failed when the underlying action completes or fails. Here the action is a database query. The method executes the query and, upon success, creates a new Article for each row. Also, notice that we close the connection regardless of the success or failure of the query. It's important to release the connection so it can be recycled.
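The row-to-object step can be tried in isolation. In the sketch below, rows are plain Maps instead of Vert.x JsonObjects, and the Article class is a hypothetical minimal version written for the example; the real class lives in the project repository.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Plain-Java sketch of mapping result rows to objects; names are hypothetical.
class RowMappingSketch {
    // Hypothetical minimal Article built from a row.
    static class Article {
        final long id;
        final String title;
        final String url;

        Article(Map<String, Object> row) {
            this.id = ((Number) row.get("id")).longValue();
            this.title = (String) row.get("title");
            this.url = (String) row.get("url");
        }
    }

    // One Article per row, in the order the rows were returned.
    static List<Article> toArticles(List<Map<String, Object>> rows) {
        return rows.stream()
            .map(Article::new)
            .collect(Collectors.toList());
    }

    // Helper building a row the way a query result might expose it.
    static Map<String, Object> row(long id, String title, String url) {
        Map<String, Object> row = new HashMap<>();
        row.put("id", id);
        row.put("title", title);
        row.put("url", url);
        return row;
    }
}
```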

In the same vein, let's implement queryOne:


private Future<Article> queryOne(SQLConnection connection, 
  String id) {
  Future<Article> future = Future.future();
  String sql = "SELECT * FROM articles WHERE id = ?";
  connection.queryWithParams(sql, 
    new JsonArray().add(Integer.valueOf(id)),
    result -> {
        connection.close();
        future.handle(
          result.map(rs -> {
            List<JsonObject> rows = rs.getRows();
            if (rows.size() == 0) {
              throw new NoSuchElementException(
                "No article with id " + id);
            } else {
              JsonObject row = rows.get(0);
              return new Article(row);
            }
          })
      );
  });
  return future;
}

We have covered the queries; now we need methods to update and delete. Here they are:


private Future<Void> update(SQLConnection connection, 
    String id, Article article) {
  Future<Void> future = Future.future();
  String sql = "UPDATE articles SET title = ?, url = ? WHERE id = ?";
  connection.updateWithParams(sql, 
    new JsonArray()
      .add(article.getTitle())
      .add(article.getUrl())
      .add(Integer.valueOf(id)),
    ar -> {
      connection.close();
      if (ar.failed()) {
        future.fail(ar.cause());
      } else {
        UpdateResult ur = ar.result();
        if (ur.getUpdated() == 0) {
           future.fail(new NoSuchElementException(
           "No article with id " + id));
        } else {
           future.complete();
        }
     }
  });
  return future;
}

private Future<Void> delete(SQLConnection connection, 
  String id) {
  Future<Void> future = Future.future();
  String sql = "DELETE FROM Articles WHERE id = ?";
    connection.updateWithParams(sql,
      new JsonArray().add(Integer.valueOf(id)),
      ar -> {
        connection.close();
        if (ar.failed()) {
            future.fail(ar.cause());
        } else {
            if (ar.result().getUpdated() == 0) {
              future.fail(
               new NoSuchElementException(
                 "No article with id " + id));
            } else {
              future.complete();
            }
        }
  });
  return future;
}

They are very similar and follow the same pattern (again!).

That's great, but it does not implement our REST API. So, let's focus on this now. Just to refresh our memory, here are the methods we need to update:

  • getAll returns all the articles.
  • addOne inserts a new article. Article details are given in the request body.
  • deleteOne deletes a specific article. The id is given as a path parameter.
  • getOne provides the JSON representation of a specific article. The id is given as a path parameter.
  • updateOne updates a specific article. The id is given as a path parameter. The new details are in the request body.

Because we have extracted the database interactions into their own methods, implementing these methods is straightforward. For instance, the getAll method is:


private void getAll(RoutingContext rc) {
    connect()
        .compose(this::query)
        .setHandler(ok(rc));
}

Following the same pattern, the other methods are implemented as follows:


private void addOne(RoutingContext rc) {
  Article article = rc.getBodyAsJson().mapTo(Article.class);
  connect()
    .compose(connection -> insert(connection, article, true))
    .setHandler(created(rc));
}


private void deleteOne(RoutingContext rc) {
  String id = rc.pathParam("id");
  connect()
    .compose(connection -> delete(connection, id))
    .setHandler(noContent(rc));
}


private void getOne(RoutingContext rc) {
  String id = rc.pathParam("id");
  connect()
    .compose(connection -> queryOne(connection, id))
    .setHandler(ok(rc));
}

private void updateOne(RoutingContext rc) {
  String id = rc.request().getParam("id");
  Article article = rc.getBodyAsJson().mapTo(Article.class);
  connect()
    .compose(connection ->  update(connection, id, article))
    .setHandler(noContent(rc));
}
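The ok, created, and noContent helpers used above are not shown in this post. As a rough idea of what such helpers need to decide, here is a plain-Java sketch of the status selection. The mapping itself is an assumption (404 for the NoSuchElementException raised by queryOne, update, and delete, 500 for any other failure), and statusFor is a hypothetical name.

```java
import java.util.NoSuchElementException;

// Assumed status selection for the response helpers; not taken from the post.
class StatusMappingSketch {
    // successStatus would be 200 for ok, 201 for created, 204 for noContent.
    static int statusFor(Throwable failure, int successStatus) {
        if (failure == null) {
            return successStatus;
        }
        if (failure instanceof NoSuchElementException) {
            return 404; // the requested article does not exist
        }
        return 500; // any other failure is reported as a server error
    }

    public static void main(String[] args) {
        System.out.println(statusFor(null, 201));
        System.out.println(statusFor(new NoSuchElementException("No article with id 7"), 200));
        System.out.println(statusFor(new IllegalStateException("connection lost"), 204));
    }
}
```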

Test, Test, and Test Again

If we run the application tests right now, they fail. First, we need to update the configuration to pass the JDBC URL and related details. But wait... we also need a database. We don't necessarily want to use PostgreSQL in our unit tests. Let's use HSQL, an in-memory database. To do that, we first need to add the following dependency to the pom.xml:


<dependency>
    <groupId>org.hsqldb</groupId>
    <artifactId>hsqldb</artifactId>
    <version>2.4.0</version>
    <scope>test</scope>
</dependency>

But wait, if you have already used JDBC or databases in general, you know that each database uses a different dialect (that's the power of standards). Here, we can't use the same table creation statement because HSQL does not understand the PostgreSQL dialect. So create the src/test/resources/tables.sql file with the following content:


CREATE TABLE IF NOT EXISTS Articles (id INTEGER IDENTITY,
    title VARCHAR(200),
    url VARCHAR(200))

It's the equivalent statement in the HSQL dialect. How does that work? When Vert.x reads a file, it also checks the classpath (and src/test/resources is included in the test classpath). When running the tests, this file supersedes the initial file we created.

We also need to update the configuration passed to the verticle in the tests:


DeploymentOptions options = new DeploymentOptions()
    .setConfig(new JsonObject()
        .put("HTTP_PORT", port)
        .put("url", "jdbc:hsqldb:mem:test?shutdown=true")
        .put("driver_class", "org.hsqldb.jdbcDriver")
);

In addition to the HTTP_PORT, we also pass the JDBC URL and the class of the JDBC driver.

Now, you should be able to run the tests with: mvn clean test.

Showtime

This time we want to use a PostgreSQL instance. I'm going to use Docker but use your favorite approach. With Docker, I start my instance as follows:


docker run --name some-postgres -e POSTGRES_USER=user \
    -e POSTGRES_PASSWORD=password \
    -e POSTGRES_DB=my_read_list \
    -p 5432:5432 -d postgres

Let's now run our application:


mvn compile vertx:run

Open your browser to http://localhost:8082/assets/index.html, and you should see the application using the database. This time the products are stored in a database persisted on the file system. So, if we stop and restart the application, the data is restored.

If you want to package the application, run mvn clean package. Then run the application using:


java -jar target/my-first-app-1.0-SNAPSHOT.jar \
    -conf src/main/conf/my-application-conf.json

Conclusion

This fourth post in our series has covered two topics. First, we have introduced asynchronous composition and how Future helps to manage sequential and concurrent composition. With Future, you follow a common pattern in your implementation, which is quite straightforward once you get it. Secondly, we have seen how JDBC can be used to implement our API. Because we use Future, using asynchronous JDBC is quite simple.

You may have been surprised by the asynchronous development model, but once you start using it, it's hard to go back. Asynchronous and event-driven architecture represents how the world around us works. Embracing it gives you superpowers.

In the next post, we will see how RxJava 2 can be used instead of Future. Don't forget that the code is available in this GitHub repository.

Stay tuned, and happy coding!

Topics:
vert.x ,eclipse ,java ,jdbc ,tutorial ,code ,reactive ,asynchronous api ,microservices

Published at DZone with permission of
