Asynchronous Microservices With Vert.x

Read about the advantages of Vert.x and see how to use it for asynchronous microservices and service discovery with this tutorial.

I must admit that as soon as I saw the Vert.x documentation, I liked the concept. That may be because I had previously used a very similar framework, Node.js, to create simple and lightweight applications exposing REST APIs. It is a really fine framework, but it has one big disadvantage for me: its JavaScript runtime. It is worth mentioning that Vert.x is polyglot: it supports the most popular JVM-based languages such as Java, Scala, Groovy, and Kotlin, and even JavaScript. That is not its only advantage. It's lightweight, fast, and modular. I was pleasantly surprised when I added the main Vert.x dependencies to my pom.xml and only a few other dependencies were downloaded, unlike what often happens with the Spring Boot framework.

Well, I will not elaborate on the advantages and key concepts of this toolkit. You can read more about them in other articles. The most important thing for us is that with Vert.x we can create high-performance, asynchronous microservices based on the Netty framework. In addition, we can use standard microservices mechanisms such as service discovery, a configuration server, or circuit breaking.

The sample application source code is available on GitHub. It consists of two modules, account-vertx-service and customer-vertx-service. Customer service retrieves data from the Consul registry and invokes the account service API. The architecture of the sample solution is visible in the figure below.

Building Services

To be able to create HTTP services exposing REST APIs, we need to include the following dependency in pom.xml.

<dependency>
    <groupId>io.vertx</groupId>
    <artifactId>vertx-web</artifactId>
    <version>${vertx.version}</version>
</dependency>

Here's the fragment from the account service where I defined all the API methods. The first step (1) was to declare a Router, which is one of the core concepts of Vert.x-Web. A router takes an HTTP request, finds the first matching route for that request, and passes the request to that route. The next steps (2) and (3) add some handlers, for example BodyHandler, which allows you to retrieve request bodies and has been added to the POST method. Then we can define the API methods (4), (5), (6), (7), (8). Finally, (9) we start the HTTP server on the port retrieved from the configuration.

Router router = Router.router(vertx); // (1)
router.route("/account/*").handler(ResponseContentTypeHandler.create()); // (2)
router.route(HttpMethod.POST, "/account").handler(BodyHandler.create()); // (3)
router.get("/account/:id").produces("application/json").handler(rc -> { // (4)
    repository.findById(rc.request().getParam("id"), res -> {
        Account account = res.result();
        LOGGER.info("Found: {}", account);
        rc.response().end(account.toString());
    });
});
router.get("/account/customer/:customer").produces("application/json").handler(rc -> { // (5)
    repository.findByCustomer(rc.request().getParam("customer"), res -> {
        List<Account> accounts = res.result();
        LOGGER.info("Found: {}", accounts);
        rc.response().end(Json.encodePrettily(accounts));
    });
});
router.get("/account").produces("application/json").handler(rc -> { // (6)
    repository.findAll(res -> {
        List<Account> accounts = res.result();
        LOGGER.info("Found all: {}", accounts);
        rc.response().end(Json.encodePrettily(accounts));
    });
});
router.post("/account").produces("application/json").handler(rc -> { // (7)
    Account a = Json.decodeValue(rc.getBodyAsString(), Account.class);
    repository.save(a, res -> {
        Account account = res.result();
        LOGGER.info("Created: {}", account);
        rc.response().end(account.toString());
    });
});
router.delete("/account/:id").handler(rc -> { // (8)
    repository.remove(rc.request().getParam("id"), res -> {
        LOGGER.info("Removed: {}", rc.request().getParam("id"));
        // end() is required here; without it the response is never sent to the client
        rc.response().setStatusCode(200).end();
    });
});
...
vertx.createHttpServer().requestHandler(router::accept).listen(conf.result().getInteger("port")); // (9)
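
To make the "first matching route" rule more concrete, here is a minimal plain-Java sketch of the idea. It is not how Vert.x-Web is implemented: MiniRouter, Route, match, and params are hypothetical names, and the sketch only understands exact path segments and :name parameters.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified model of first-match routing; not how Vert.x-Web is implemented.
class MiniRouter {

    // One registered route: HTTP method, path pattern such as "/account/:id", and a label.
    static final class Route {
        final String method;
        final String pattern;
        final String name;

        Route(String method, String pattern, String name) {
            this.method = method;
            this.pattern = pattern;
            this.name = name;
        }
    }

    private final List<Route> routes = new ArrayList<>();

    // Routes are kept in registration order, which decides who wins when several could match.
    MiniRouter add(String method, String pattern, String name) {
        routes.add(new Route(method, pattern, name));
        return this;
    }

    // Returns the label of the first route that matches, or null when none does.
    String match(String method, String path) {
        for (Route route : routes) {
            if (route.method.equals(method) && params(route.pattern, path) != null) {
                return route.name;
            }
        }
        return null;
    }

    // Extracts ":name" path parameters, or returns null if the path does not fit the pattern.
    static Map<String, String> params(String pattern, String path) {
        String[] expected = pattern.split("/");
        String[] actual = path.split("/");
        if (expected.length != actual.length) {
            return null;
        }
        Map<String, String> values = new LinkedHashMap<>();
        for (int i = 0; i < expected.length; i++) {
            if (expected[i].startsWith(":")) {
                values.put(expected[i].substring(1), actual[i]);
            } else if (!expected[i].equals(actual[i])) {
                return null;
            }
        }
        return values;
    }
}
```

With the routes registered in the same order as in the account service, a request for /account/customer/7 falls through /account/:id (the segment counts differ) and lands on /account/customer/:customer, while a request for /account/customer is captured by /account/:id with id set to "customer". Registration order therefore matters whenever patterns overlap.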

All API methods use a repository object to communicate with the data source. In this case, I decided to use Mongo. Vert.x has a module for interacting with that database, which we need to include as a new dependency.

<dependency>
    <groupId>io.vertx</groupId>
    <artifactId>vertx-mongo-client</artifactId>
    <version>${vertx.version}</version>
</dependency>

The Mongo client, like all other Vert.x modules, works asynchronously. That's why we need to use a Handler of AsyncResult to pass the result back from the repository object. To be able to pass a custom object such as Account in an AsyncResult through the service proxy, we have to annotate it with @DataObject and give it a constructor that takes a JsonObject and a toJson method.

public AccountRepositoryImpl(final MongoClient client) {
    this.client = client;
}

@Override
public AccountRepository save(Account account, Handler<AsyncResult<Account>> resultHandler) {
    JsonObject json = JsonObject.mapFrom(account);
    client.save(Account.DB_TABLE, json, res -> {
        if (res.succeeded()) {
            LOGGER.info("Account created: {}", res.result());
            account.setId(res.result());
            resultHandler.handle(Future.succeededFuture(account));
        } else {
            LOGGER.error("Account not created", res.cause());
            resultHandler.handle(Future.failedFuture(res.cause()));
        }
    });
    return this;
}

@Override
public AccountRepository findAll(Handler<AsyncResult<List<Account>>> resultHandler) {
    client.find(Account.DB_TABLE, new JsonObject(), res -> {
        if (res.succeeded()) {
            List<Account> accounts = res.result().stream()
                .map(it -> new Account(it.getString("_id"), it.getString("number"), it.getInteger("balance"), it.getString("customerId")))
                .collect(Collectors.toList());
            resultHandler.handle(Future.succeededFuture(accounts));
        } else {
            LOGGER.error("Account not found", res.cause());
            resultHandler.handle(Future.failedFuture(res.cause()));
        }
    });
    return this;
}
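
The callback style used by the repository can be shown in isolation. The following is a minimal plain-Java sketch, not Vert.x code: Outcome is a hypothetical stand-in for AsyncResult, and findById and describe are made-up helpers. The point it illustrates is that the caller should check whether the operation succeeded before reading the result, because a failed outcome carries a cause and no value.

```java
import java.util.Objects;
import java.util.function.Consumer;

// Plain-Java illustration of the callback style; Outcome is a hypothetical stand-in
// for Vert.x's AsyncResult, not the real type.
class CallbackSketch {

    // Holds either a value or the cause of a failure, never both.
    static final class Outcome<T> {
        private final T value;
        private final Throwable cause;

        private Outcome(T value, Throwable cause) {
            this.value = value;
            this.cause = cause;
        }

        static <T> Outcome<T> success(T value) {
            return new Outcome<>(value, null);
        }

        static <T> Outcome<T> failure(Throwable cause) {
            return new Outcome<>(null, Objects.requireNonNull(cause));
        }

        boolean succeeded() {
            return cause == null;
        }

        T result() {
            return value;
        }

        Throwable cause() {
            return cause;
        }
    }

    // Hypothetical lookup: reports its outcome to the handler instead of returning it.
    // It calls the handler synchronously here; a real asynchronous client would call it later.
    static void findById(String id, Consumer<Outcome<String>> handler) {
        if (id == null || id.isEmpty()) {
            handler.accept(Outcome.failure(new IllegalArgumentException("id is required")));
        } else {
            handler.accept(Outcome.success("account-" + id));
        }
    }

    // Checks the outcome before touching the value, then maps it to a status line.
    static String describe(Outcome<String> outcome) {
        if (outcome.succeeded()) {
            return "200 " + outcome.result();
        }
        return "500 " + outcome.cause().getMessage();
    }
}
```

The same check-then-read order applies to any handler that receives an AsyncResult: reading the result of a failed operation yields null, which is an easy way to end up with a NullPointerException in a request handler.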

Here's the Account model class.

@DataObject
public class Account {

    public static final String DB_TABLE = "account";

    private String id;
    private String number;
    private int balance;
    private String customerId;

    public Account() {

    }

    public Account(String id, String number, int balance, String customerId) {
        this.id = id;
        this.number = number;
        this.balance = balance;
        this.customerId = customerId;
    }

    public Account(JsonObject json) {
        this.id = json.getString("id");
        this.number = json.getString("number");
        this.balance = json.getInteger("balance");
        this.customerId = json.getString("customerId");
    }

    public String getId() {
        return id;
    }

    public void setId(String id) {
        this.id = id;
    }

    public String getNumber() {
        return number;
    }

    public void setNumber(String number) {
        this.number = number;
    }

    public int getBalance() {
        return balance;
    }

    public void setBalance(int balance) {
        this.balance = balance;
    }

    public String getCustomerId() {
        return customerId;
    }

    public void setCustomerId(String customerId) {
        this.customerId = customerId;
    }

    public JsonObject toJson() {
        return JsonObject.mapFrom(this);
    }

    @Override
    public String toString() {
        return Json.encodePrettily(this);
    }

}

Verticles

A few words about running an application written in Vert.x are in order. It is based on verticles, which are chunks of code that are deployed and run by Vert.x. A Vert.x instance maintains N event loop threads, where N is by default twice the number of available cores. To create a verticle, we extend the abstract class AbstractVerticle.

public class AccountServer extends AbstractVerticle {

    @Override
    public void start() throws Exception {
        ...
    }
}
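
As a rough analogy for what an event loop means for a verticle's handlers, here is a small plain-Java sketch. It is not Vert.x code: EventLoopSketch and runHandlers are hypothetical names, and a single-threaded executor stands in for one event loop. It shows that handlers queued on the same loop run one after another on the same thread, which is why a handler should never block.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Plain-Java analogy for one event loop; this is not Vert.x code.
class EventLoopSketch {

    // Runs the given number of handlers on a single-threaded executor and returns
    // the name of the thread that ran each of them, in execution order.
    static List<String> runHandlers(int count) {
        ExecutorService eventLoop = Executors.newSingleThreadExecutor();
        List<String> threads = new CopyOnWriteArrayList<>();
        for (int i = 0; i < count; i++) {
            eventLoop.execute(() -> threads.add(Thread.currentThread().getName()));
        }
        eventLoop.shutdown(); // stop accepting work; handlers already queued still run
        try {
            if (!eventLoop.awaitTermination(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("handlers did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
            throw new IllegalStateException("interrupted while waiting for handlers", e);
        }
        return threads;
    }
}
```

Every entry in the returned list is the same thread name. If one of the handlers slept or waited on a slow call, all the handlers queued behind it would be delayed by the same amount.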

I created two verticles per microservice: one for the HTTP server and one for communication with Mongo. Here's the main application method where I deploy the verticles.

public static void main(String[] args) throws Exception {
    Vertx vertx = Vertx.vertx();
    vertx.deployVerticle(new MongoVerticle());
    vertx.deployVerticle(new AccountServer());
}

Now we need a reference, inside the AccountServer verticle, to the service running in MongoVerticle. To get one, we have to generate proxy classes using the vertx-codegen module.

<dependency>
    <groupId>io.vertx</groupId>
    <artifactId>vertx-service-proxy</artifactId>
    <version>${vertx.version}</version>
</dependency>
<dependency>
    <groupId>io.vertx</groupId>
    <artifactId>vertx-codegen</artifactId>
    <version>${vertx.version}</version>
    <scope>provided</scope>
</dependency>

First, annotate the repository interface with @ProxyGen and all public methods with @Fluent.

@ProxyGen
public interface AccountRepository {

    @Fluent
    AccountRepository save(Account account, Handler<AsyncResult<Account>> resultHandler);

    @Fluent
    AccountRepository findAll(Handler<AsyncResult<List<Account>>> resultHandler);

    @Fluent
    AccountRepository findById(String id, Handler<AsyncResult<Account>> resultHandler);

    @Fluent
    AccountRepository findByCustomer(String customerId, Handler<AsyncResult<List<Account>>> resultHandler);

    @Fluent
    AccountRepository remove(String id, Handler<AsyncResult<Void>> resultHandler);

    static AccountRepository createProxy(Vertx vertx, String address) {
        return new AccountRepositoryVertxEBProxy(vertx, address);
    }

    static AccountRepository create(MongoClient client) {
        return new AccountRepositoryImpl(client);
    }

}

The generator needs additional configuration inside the pom.xml file. After running the command mvn clean install on the parent project, all generated classes should be available under the src/main/generated directory of every microservice module.

<plugin>
    <groupId>org.apache.maven.plugins</groupId>
    <artifactId>maven-compiler-plugin</artifactId>
    <version>3.6.2</version>
    <configuration>
        <encoding>${project.build.sourceEncoding}</encoding>
        <source>${java.version}</source>
        <target>${java.version}</target>
        <useIncrementalCompilation>false</useIncrementalCompilation>
        <annotationProcessors>
            <annotationProcessor>io.vertx.codegen.CodeGenProcessor</annotationProcessor>
        </annotationProcessors>
        <generatedSourcesDirectory>${project.basedir}/src/main/generated</generatedSourcesDirectory>
        <compilerArgs>
            <arg>-AoutputDirectory=${project.basedir}/src/main</arg>
        </compilerArgs>
    </configuration>
</plugin>

Now we are able to obtain an AccountRepository reference by calling createProxy with the account-service address.

AccountRepository repository = AccountRepository.createProxy(vertx, "account-service");
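
What such a proxy does can be approximated with a toy model. The sketch below is plain Java with hypothetical names throughout (ProxySketch, Bus, NumberService, NumberServiceProxy, bind), and it is synchronous, whereas the generated Vert.x proxy is asynchronous and talks to the real event bus. The idea it shows is that the client side turns each method call into a message sent to an address, and the service side has to bind an implementation to that same address.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.BiFunction;

// Toy model of a service proxy over a message bus; all names are hypothetical, and the
// real generated proxy is asynchronous and uses the Vert.x event bus.
class ProxySketch {

    // Stand-in for the service interface (AccountRepository in the article).
    interface NumberService {
        String findNumber(String id);
    }

    // In-memory bus: one consumer per address, receiving an action name and an argument.
    static final class Bus {
        private final Map<String, BiFunction<String, String, String>> consumers = new HashMap<>();

        void consumer(String address, BiFunction<String, String, String> handler) {
            consumers.put(address, handler);
        }

        String send(String address, String action, String argument) {
            BiFunction<String, String, String> handler = consumers.get(address);
            if (handler == null) {
                throw new IllegalStateException("No consumer registered at " + address);
            }
            return handler.apply(action, argument);
        }
    }

    // Client side: each method call becomes a message sent to the service address.
    static final class NumberServiceProxy implements NumberService {
        private final Bus bus;
        private final String address;

        NumberServiceProxy(Bus bus, String address) {
            this.bus = bus;
            this.address = address;
        }

        @Override
        public String findNumber(String id) {
            return bus.send(address, "findNumber", id);
        }
    }

    // Service side: binds the real implementation to the address and dispatches by action.
    static void bind(Bus bus, String address, NumberService impl) {
        bus.consumer(address, (action, argument) -> {
            if ("findNumber".equals(action)) {
                return impl.findNumber(argument);
            }
            throw new IllegalArgumentException("Unknown action: " + action);
        });
    }
}
```

In this model, a proxy created for an address that nobody has bound fails as soon as it is called. The practical consequence for the sample is that the address passed to createProxy must match the one used when the repository is registered on the Mongo side.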

Service Discovery

To use Vert.x service discovery, we have to add the following dependencies to pom.xml. The first contains the built-in Vert.x discovery mechanisms, which are not very useful if we want to invoke microservices running on different hosts. Fortunately, some additional bridges are also available, for example the Consul bridge.

<dependency>
    <groupId>io.vertx</groupId>
    <artifactId>vertx-service-discovery</artifactId>
    <version>${vertx.version}</version>
</dependency>
<dependency>
    <groupId>io.vertx</groupId>
    <artifactId>vertx-service-discovery-bridge-consul</artifactId>
    <version>${vertx.version}</version>
</dependency>

Great, we only have to declare the service discovery and register a service importer. Now we can retrieve the registered services from Consul, but I assume we would also like to register our own service. Unfortunately, problems start here... As the toolkit authors say, "It (Vert.x) does not export to Consul and does not support service modification." Maybe somebody can explain to me why this library cannot also export data to Consul - I just do not understand it. I had the same problem with Apache Camel some months ago, and I will use the same solution I developed at that time. Fortunately, Consul has a simple HTTP API for service registration and deregistration. To use it in our application, we need to include the Vert.x HTTP client in our dependencies.

<dependency>
    <groupId>io.vertx</groupId>
    <artifactId>vertx-web-client</artifactId>
    <version>${vertx.version}</version>
</dependency>

Then, using the declared WebClient while starting the application, we can register the service by sending a PUT request to the Consul agent API.

WebClient client = WebClient.create(vertx);
...
JsonObject json = new JsonObject().put("ID", "account-service-1").put("Name", "account-service")
    .put("Address", "127.0.0.1").put("Port", 2222).put("Tags", new JsonArray().add("http-endpoint"));
client.put(discoveryConfig.getInteger("port"), discoveryConfig.getString("host"), "/v1/agent/service/register").sendJsonObject(json, res -> {
    if (res.succeeded()) {
        LOGGER.info("Consul registration status: {}", res.result().statusCode());
    } else {
        LOGGER.error("Consul registration failed", res.cause()); // res.result() is null on failure
    }
});
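
Registration has a counterpart: when the service shuts down, it can remove itself from the agent with a PUT to /v1/agent/service/deregister/ followed by the service ID. The sketch below is not part of the sample application. It assumes Java 11 or later and uses the JDK HTTP client rather than the Vert.x WebClient, and ConsulDeregistration, request, and deregister are hypothetical names.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

// Sketch using the JDK HTTP client (Java 11+) instead of the Vert.x WebClient.
class ConsulDeregistration {

    // Builds the PUT request that removes one service instance from a Consul agent.
    static HttpRequest request(String host, int port, String serviceId) {
        URI uri = URI.create("http://" + host + ":" + port + "/v1/agent/service/deregister/" + serviceId);
        return HttpRequest.newBuilder(uri)
            .PUT(HttpRequest.BodyPublishers.noBody())
            .build();
    }

    // Sends the request (blocking) and returns the HTTP status code reported by the agent.
    static int deregister(String host, int port, String serviceId) throws Exception {
        HttpResponse<Void> response = HttpClient.newHttpClient()
            .send(request(host, port, serviceId), HttpResponse.BodyHandlers.discarding());
        return response.statusCode();
    }
}
```

The service ID is the value sent in the ID field at registration time, account-service-1 in this example. Because deregister blocks the calling thread, it should not be called from an event loop thread; in a verticle, the same request sent through the WebClient would be the more natural choice.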

Once the account-service has registered itself on the discovery server, we can invoke it from another microservice - in this case, from customer-service. We only have to create a ServiceDiscovery object and register the Consul service importer.

ServiceDiscovery discovery = ServiceDiscovery.create(vertx);
...
discovery.registerServiceImporter(new ConsulServiceImporter(), new JsonObject()
    .put("host", discoveryConfig.getString("host"))
    .put("port", discoveryConfig.getInteger("port"))
    .put("scan-period", 2000));

Here's the AccountClient fragment, which is responsible for invoking GET /account/customer/{customerId} from account-service. It obtains the service reference from the discovery object and casts it to the WebClient instance. I don't know if you have noticed that, apart from the standard fields such as ID, Name or Port, I also set the Tags field to the value of the type of service that we register. In this case, it will be an http-endpoint. Whenever Vert.x reads data from Consul, it will be able to automatically assign a service reference to a WebClient object.

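public AccountClient findCustomerAccounts(String customerId, Handler<AsyncResult<List<Account>>> resultHandler) {
    discovery.getRecord(r -> r.getName().equals("account-service"), res -> {
        // Report a failed lookup, or a lookup that matched no record, to the caller
        if (res.failed() || res.result() == null) {
            resultHandler.handle(Future.failedFuture("account-service not found in service discovery"));
            return;
        }
        LOGGER.info("Result: {}", res.result().getType());
        ServiceReference ref = discovery.getReference(res.result());
        WebClient client = ref.getAs(WebClient.class);
        client.get("/account/customer/" + customerId).send(res2 -> {
            ref.release(); // hand the reference back once the call has completed
            if (res2.failed()) {
                resultHandler.handle(Future.failedFuture(res2.cause()));
                return;
            }
            LOGGER.info("Response: {}", res2.result().bodyAsString());
            List<Account> accounts = res2.result().bodyAsJsonArray().stream()
                .map(it -> Json.decodeValue(it.toString(), Account.class))
                .collect(Collectors.toList());
            resultHandler.handle(Future.succeededFuture(accounts));
        });
    });
    return this;
}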

Config

Configuration management within the application is handled by the Vert.x Config module.

<dependency>
    <groupId>io.vertx</groupId>
    <artifactId>vertx-config</artifactId>
    <version>${vertx.version}</version>
</dependency>

There are many configuration stores that can serve as the location of the configuration data.

I selected the simplest one - file. It can easily be changed just by setting another type on the ConfigStoreOptions object. ConfigRetriever is responsible for loading configuration data from the store. It reads the configuration as a JsonObject.

ConfigStoreOptions file = new ConfigStoreOptions().setType("file").setConfig(new JsonObject().put("path", "application.json"));
ConfigRetriever retriever = ConfigRetriever.create(vertx, new ConfigRetrieverOptions().addStore(file));
retriever.getConfig(conf -> {
    JsonObject discoveryConfig = conf.result().getJsonObject("discovery");
    vertx.createHttpServer().requestHandler(router::accept).listen(conf.result().getInteger("port"));
    JsonObject json = new JsonObject().put("ID", "account-service-1").put("Name", "account-service")
        .put("Address", "127.0.0.1").put("Port", 2222).put("Tags", new JsonArray().add("http-endpoint"));
    client.put(discoveryConfig.getInteger("port"), discoveryConfig.getString("host"), "/v1/agent/service/register").sendJsonObject(json, res -> {
        if (res.succeeded()) {
            LOGGER.info("Consul registration status: {}", res.result().statusCode());
        } else {
            LOGGER.error("Consul registration failed", res.cause()); // res.result() is null on failure
        }
    });
});

The configuration file application.json is available under src/main/resources and contains the application port and the service discovery and datasource addresses.

{
    "port" : 2222,
    "discovery" : {
        "host" : "192.168.99.100",
        "port" : 8500
    },
    "datasource" : {
        "host" : "192.168.99.100",
        "port" : 27017,
        "db_name" : "test"
    }
}

Final Thoughts

The Vert.x authors prefer to describe their solution as a toolkit rather than a framework. They don't tell you what the correct way to write an application is; they only give you a lot of useful bricks to help you build your app. With Vert.x, you can create fast and lightweight APIs based on non-blocking, asynchronous I/O. It gives you a lot of possibilities, as you can see in the Config module, where you can even use Spring Cloud Config Server as a configuration store. But it is not free from drawbacks, as I showed in the example of service registration with Consul. Vert.x also allows us to create reactive microservices with RxJava, which seems to be an interesting option that I hope to describe in the future.

Topics:
microservices, integration, vert.x, asynchronous

Published at DZone with permission of

Opinions expressed by DZone contributors are their own.
