Reactive Microservices and Service Discovery with Vert.x
Vert.x can lend a hand with helping your microservices find each other. See how to get it set up and what it can do for your software.
Remember the Unix philosophy, "Do one thing and do it well"? That is also the philosophy of microservices. In software development, it is common practice to extract functionality that appears in several parts of an application into a separate component.
In Vert.x, we do that by encapsulating the repeated functionality in a Verticle. The catch is that Vert.x is asynchronous by nature and relies on callbacks, so reaching those microservices can take a lot of boilerplate code, and, as you probably know, nested callbacks can be terrifying. To address this, Vert.x offers RxJava versions of its APIs, produced by code generation, which let us use Observable instead of callbacks. It also introduces Service Proxy, which simplifies calling asynchronous remote services by generating the boilerplate code for us.
It all sounds easy, right? But when it comes time to make all these components work together flawlessly, there is one more problem to solve: we need to make sure that services are up and running before we call them, which adds another level of complexity to our codebase. Vert.x addressed this in version 3.3.0 by introducing a promising Service Discovery API.
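To make the callback problem concrete, here is a minimal sketch in plain Java. It does not use Vert.x or RxJava: `CompletableFuture` stands in for an Rx `Observable`, and the class and the `findUser`/`findOrders` services are hypothetical names invented for illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;

// Hypothetical stand-ins for two asynchronous services; these are not Vert.x APIs.
final class CallbackVsComposition {

    // Callback style: the result is handed to a consumer instead of being returned.
    static void findUser(String id, Consumer<String> callback) {
        callback.accept("user-" + id);
    }

    static void findOrders(String user, Consumer<String> callback) {
        callback.accept("orders-of-" + user);
    }

    // Nested callbacks: every extra step adds another level of nesting.
    static void nested(String id, Consumer<String> done) {
        findUser(id, user ->
            findOrders(user, orders ->
                done.accept(orders.toUpperCase())));
    }

    // Composed style: each step returns a future, so steps chain in a flat pipeline.
    static CompletableFuture<String> findUserAsync(String id) {
        return CompletableFuture.supplyAsync(() -> "user-" + id);
    }

    static CompletableFuture<String> findOrdersAsync(String user) {
        return CompletableFuture.supplyAsync(() -> "orders-of-" + user);
    }

    static CompletableFuture<String> composed(String id) {
        return findUserAsync(id)
            .thenCompose(CallbackVsComposition::findOrdersAsync)
            .thenApply(String::toUpperCase);
    }
}
```

Both variants produce the same result, but the composed one stays flat as more steps are added, which is the property the Rx-based APIs are meant to provide.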
Service Discovery can be considered a must-have component in the modern microservice architecture, where services may scale up and down anytime. In such a dynamic environment, there must be a way for services to find each other and operate independently of where they are, which is known as location transparency.
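Location transparency can be illustrated with a very small in-memory registry. This is only a sketch under simplifying assumptions (a single process, no health checks, no clustering); `RegistrySketch` and its methods are hypothetical and are not part of the Vert.x Service Discovery API.

```java
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;

// Hypothetical in-memory registry: callers look services up by name, never by address.
final class RegistrySketch {
    private final Map<String, List<String>> locations = new ConcurrentHashMap<>();

    // Called by a service instance when it comes up.
    void publish(String name, String address) {
        locations.computeIfAbsent(name, k -> new CopyOnWriteArrayList<>()).add(address);
    }

    // Called by a service instance before it goes away.
    void unpublish(String name, String address) {
        List<String> known = locations.get(name);
        if (known != null) {
            known.remove(address);
        }
    }

    // Returns the first known location of the named service, if any instance is registered.
    Optional<String> lookup(String name) {
        return locations.getOrDefault(name, Collections.emptyList()).stream().findFirst();
    }
}
```

A caller only ever asks for `"file-reader"`; which instance answers, and where it runs, can change as instances are published and unpublished.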
Tools like Consul, etcd, and ZooKeeper have been used to meet this need. Whichever one you choose, some development effort is always required. Companies whose products depend on a microservice architecture often develop in-house solutions to make this process easier, and thanks to their collaborative approach, tools like Eureka and Nerve are available to the community through GitHub.
As another alternative to these tools, Vert.x has its own Service Discovery solution, and it also ships components such as the Kubernetes Registry Importer, Consul Registry Importer, Redis Registry Importer, and Docker Links Registry Importer for integrating with other Service Discovery solutions. As its cluster manager, Vert.x uses Hazelcast by default. For those who would like to read more about cluster initialization with Hazelcast's ZooKeeper Discovery Plugin, we published this article, so please have a quick look at it.
To see this flow in action, let's imagine we have a FileReader Service Proxy and Stub, and we want to reach it from another microservice. The first problem is that we don't know whether the FileReader service is up and running.
One thing we can do is ask Service Discovery whether the service is available. If it is not, we need a callback that tells us about its status, UP or DOWN, so that when it becomes UP, we can request it. At first glance, this looks like a lot of code, and it is; however, with Rxified Vert.x, it becomes very simple and lean.
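The "use the service if it is already there, otherwise wait until it is announced as UP" idea can be sketched in plain Java first. This is an illustration only: `LookupOrAwait`, `request`, and `announceUp` are hypothetical names, `CompletableFuture` stands in for an Rx `Observable`, and DOWN handling is deliberately left out.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch: resolve a service now if it is known, otherwise when it is announced.
final class LookupOrAwait {
    private final Map<String, String> available = new ConcurrentHashMap<>();
    private final Map<String, CompletableFuture<String>> waiting = new ConcurrentHashMap<>();

    // Returns a future that is already complete if the service is known,
    // or that completes later when an UP announcement arrives.
    CompletableFuture<String> request(String name) {
        // Register interest first, then check, so an announcement cannot slip in between.
        CompletableFuture<String> pending =
            waiting.computeIfAbsent(name, k -> new CompletableFuture<>());
        String address = available.get(name);
        if (address != null) {
            pending.complete(address);
        }
        return pending;
    }

    // Simulates the UP event that a discovery mechanism would announce.
    void announceUp(String name, String address) {
        available.put(name, address);
        CompletableFuture<String> pending = waiting.get(name);
        if (pending != null) {
            pending.complete(address);
        }
    }
}
```

The Rx-based listing that follows expresses the same fallback declaratively.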
// When a service becomes UP, Service Discovery publishes an event on the event bus
// at the announce address
def listenServiceArrival = bus.consumer(options.getAnnounceAddress())
    .bodyStream() // takes the body of Message<JsonObject>
    .toObservable() // converts it to Observable<JsonObject>
    .filter(requestedService) // keep the event only if the status is UP and the name is the one we want
    .flatMap(requestService) // if all goes well, request that service

// If the service is already available, we can use it immediately;
// otherwise, we listen for its arrival
def fileLinesObservable =
    EventBusService.getProxyObservable(serviceDiscovery, config().getJsonObject("requestedServiceRecord")) // get the service proxy for the given configuration
        .onErrorResumeNext(Observable.empty()) // if there is no such service, continue with an empty Observable
        .switchIfEmpty(listenServiceArrival) // if we got here with an empty Observable, listen for the service's arrival
        .flatMap(readFile) // here we have the service interface, so consume it
        .flatMap({ Observable.from(it) }) // readFile returns Observable<List<String>>, so flatten it into single lines
        .onBackpressureBuffer(10000) // if backpressure occurs, buffer up to 10000 items

Func1 readFile = { io.filereader.FileReaderService fileReaderService ->
    def rxFileReader = new FileReaderService(fileReaderService)
    rxFileReader.readObservable("data/app.log", [read: true, write: false] as JsonObject)
        // release the service object once the read has completed
        .doOnCompleted({ -> ServiceDiscovery.releaseServiceObject(serviceDiscovery, fileReaderService) })
}
Imagine how this code would look if we wrote it imperatively instead of declaratively. As written above, it reads almost like plain English. It works well, with one small exception: when the service shuts down gracefully, no DOWN message is published automatically, because the Service Discovery API expects us to unpublish the service ourselves, and a Verticle's stop method is not called automatically before shutdown. To solve this, we first need to make sure that the stop method is called, and then, in the Verticle's stop method, unpublish the service so that a DOWN message is sent. Below is the solution, and the full code is here.
VertxConfigLauncher.java
private static Observable<Vertx> createVertx(VertxOptions vertxOptions) {
    if (vertxOptions.isClustered()) {
        HazelcastClusterManager clusterManager = getClusterManager();
        vertxOptions.setClusterManager(clusterManager);
        return Vertx.clusteredVertxObservable(vertxOptions).map(vertx -> {
            // Clustered mode: undeploy the verticles when Hazelcast starts shutting down
            clusterManager.getHazelcastInstance().getLifecycleService().addLifecycleListener(state -> {
                if (state.getState() == LifecycleEvent.LifecycleState.SHUTTING_DOWN) {
                    ICountDownLatch latch = clusterManager.getHazelcastInstance().getCountDownLatch("shutdown.latch");
                    latch.trySetCount(1);
                    beforeLeaveUndeploy(vertx, latch);
                }
            });
            return vertx;
        });
    } else {
        return Observable.just(Vertx.vertx(vertxOptions)).map(vertx -> {
            // Non-clustered mode: undeploy the verticles from a JVM shutdown hook
            Runtime.getRuntime().addShutdownHook(new Thread(() -> beforeLeaveUndeploy(vertx, new CountDownLatch(1))));
            return vertx;
        });
    }
}

// Undeploys every deployment so that each Verticle's stop method runs,
// then waits on the latch for up to 30 seconds
private static void beforeLeaveUndeploy(Vertx vertx, ICountDownLatch latch) {
    Observable.from(vertx.deploymentIDs().stream().map(vertx::undeployObservable).toArray())
        .doOnCompleted(latch::countDown)
        .subscribe();
    try {
        latch.await(30000, TimeUnit.MILLISECONDS);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
}
ExampleServiceVerticle.java
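@Override
public void stop(Future<Void> stopFuture) throws Exception {
    // Unpublish the record so that Service Discovery announces the service as DOWN
    discovery.unpublish(registrationId, r -> {
        if (r.succeeded()) {
            LOGGER.info("Unregistered Service Record With ID {}", registrationId);
            ProxyHelper.unregisterService(this.serviceProxy);
            stopFuture.complete(r.result());
        } else {
            LOGGER.error("ERROR unregistering service", r.cause());
            // Fail the future so that the undeployment does not wait forever
            stopFuture.fail(r.cause());
        }
    });
}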
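The shutdown handling relies on waiting, with a timeout, until the asynchronous undeployments have finished. A minimal plain-Java sketch of that waiting pattern is shown here; it is not the article's Vert.x code. `ShutdownSketch`, `undeployAll`, and the `undeploy` function are hypothetical, and `CompletableFuture` stands in for the Rx `Observable` returned by Vert.x.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

// Hypothetical sketch: block shutdown until all asynchronous undeployments are done.
final class ShutdownSketch {

    // Starts one undeployment per id and waits until all of them have completed
    // (successfully or not) or the timeout elapses.
    // Returns true if everything finished in time, false otherwise.
    static boolean undeployAll(List<String> ids,
                               Function<String, CompletableFuture<Void>> undeploy,
                               long timeoutMillis) {
        CountDownLatch latch = new CountDownLatch(1);
        CompletableFuture<?>[] pending =
            ids.stream().map(undeploy).toArray(CompletableFuture[]::new);
        // Release the latch only after every undeployment has completed.
        CompletableFuture.allOf(pending).whenComplete((ok, err) -> latch.countDown());
        try {
            return latch.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt status
            return false;
        }
    }
}
```

Such a method could be called from a JVM shutdown hook, for example `Runtime.getRuntime().addShutdownHook(new Thread(() -> ShutdownSketch.undeployAll(ids, undeploy, 30000)))`, where `ids` and `undeploy` are supplied by the application.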
Bottom Line
Service Discovery is one answer to the challenge of orchestrating microservices. With Reactive Extensions, it becomes easier to discover services in a non-blocking manner while keeping the codebase readable and simple. Because Vert.x integrates naturally with RxJava, this kind of event-driven, asynchronous application development becomes much easier. It is also worth noting that when we have a lot of microservices, we may consider using Service Proxy and the Code-Gen API, which generate boilerplate code for Java, Groovy, RxJava, etc., and let us work with reactive interfaces.