DZone
Thanks for visiting DZone today,
Edit Profile
  • Manage Email Subscriptions
  • How to Post to DZone
  • Article Submission Guidelines
Sign Out View Profile
  • Post an Article
  • Manage My Drafts
Over 2 million developers have joined DZone.
Log In / Join
Refcards Trend Reports
Events Video Library
Over 2 million developers have joined DZone. Join Today! Thanks for visiting DZone today,
Edit Profile Manage Email Subscriptions Moderation Admin Console How to Post to DZone Article Submission Guidelines
View Profile
Sign Out
Refcards
Trend Reports
Events
View Events Video Library
Zones
Culture and Methodologies Agile Career Development Methodologies Team Management
Data Engineering AI/ML Big Data Data Databases IoT
Software Design and Architecture Cloud Architecture Containers Integration Microservices Performance Security
Coding Frameworks Java JavaScript Languages Tools
Testing, Deployment, and Maintenance Deployment DevOps and CI/CD Maintenance Monitoring and Observability Testing, Tools, and Frameworks
Culture and Methodologies
Agile Career Development Methodologies Team Management
Data Engineering
AI/ML Big Data Data Databases IoT
Software Design and Architecture
Cloud Architecture Containers Integration Microservices Performance Security
Coding
Frameworks Java JavaScript Languages Tools
Testing, Deployment, and Maintenance
Deployment DevOps and CI/CD Maintenance Monitoring and Observability Testing, Tools, and Frameworks

Integrating PostgreSQL Databases with ANF: Join this workshop to learn how to create a PostgreSQL server using Instaclustr’s managed service

Mobile Database Essentials: Assess data needs, storage requirements, and more when leveraging databases for cloud and edge applications.

Monitoring and Observability for LLMs: Datadog and Google Cloud discuss how to achieve optimal AI model performance.

Automated Testing: The latest on architecture, TDD, and the benefits of AI and low-code tools.

Related

  • Vert.x With Zookeeper Service Discovery
  • Event-Driven Microservices With Vert.x and Kubernetes (Part 2 of 3)
  • Service Discovery From Within the Kubernetes Cluster
  • Microservices With JHipster

Trending

  • Hugging Face Is the New GitHub for LLMs
  • Decoding Business Source Licensing: A New Software Licensing Model
  • TypeScript: Useful Features
  • How TIBCO Is Evolving Its Platform To Embrace Developers and Simplify Cloud Integration
  1. DZone
  2. Software Design and Architecture
  3. Microservices
  4. Reactive Microservices and Service Discovery with Vert.x

Reactive Microservices and Service Discovery with Vert.x

Vert.x can lend a hand with helping your microservices find each other. See how to get it set up and what it can do for your software.

Sercan Karaoglu user avatar by
Sercan Karaoglu
·
Jul. 26, 16 · Tutorial
Like (30)
Save
Tweet
Share
22.35K Views

Join the DZone community and get the full member experience.

Join For Free

Remember the Unix philosophy "Do one thing and do it well?" That is the philosophy of microservices. In software development, it is a common practice that when the same functionality is seen in the different parts of the application, it is abstracted away as another component.

In Vert.x, we do that by encapsulating repeated functionality in a Verticle. But the issue here is because Vert.x is asynchronous by nature and uses callbacks to achieve this, we may need to write a lot of boilerplate code to reach those microservices. As you already know, nested callbacks can be terrifying. So, to address these problems, Vert.x integrates all its API with RxJava because it is already reactive. It can do it so by just simply code generating, which allows us to use Observable instead of callbacks,and also introduces Service Proxy to simplify the process of calling async remote services by generating boilerplate codes for us.

It all sounds easy right? But when it comes time to make all these components work together flawlessly, we have a little problem to solve. We need to make sure that services are there, up and running, before we call them, which adds another level of complexity to our codebase. Vert.x seems like it solved this issue by its 3.3.0 version by bringing a promising Service Discovery API.

Service Discovery can be considered a must-have component in the modern microservice architecture, where services may scale up and down anytime. In such a dynamic environment, there must be a way for services to find each other and operate independently of where they are, which is known as location transparency.

Tools like Consul, etcd, and ZooKeeper have been used as to respond to this need. Whichever one you choose, there is always some development effort to be taken. Companies that have products that depend on microservice architecture usually develop in-house solutions to make this process easier. Thanks to their collaborative approach, tools like eureka and nerve are available to the community through GitHub.

As an another alternative to these tools, Vert.x has its solution for Service Discovery while still having components like Kubernetes Registry Importer, Consul Registry Importer, Redis Registry Importer, and Docker Links Registry Importer for integrating other Service Discovery solutions. And as a cluster manager interface, Vert.x uses Hazelcast by default implementation. For those who would like to read more about cluster initialization with Hazelcast's Zookeeper Discovery Plugin, we published this article, so please have a quick look at it. 


Image title


To see this flow in action, let's imagine we have FileReader Service Proxy and Stub, and we want to reach it from another microservice. The first point is that we don't know whether the FileReader service is there up and running.

One thing we can do would be to ask Service Discovery if that service is available, but, if it is not, then we need to have a callback function that lets us know about the status, whether it is UP or DOWN, so that when it becomes UP, we can request for it. At first glance, this looks like a lot of code, and it is; however, with Rxified Vert.x, it eventually becomes very simple and lean.


//When service become UP, it publishes this event thru the eventbus 
//over announce address topic

def listenServiceDeparture = bus.consumer(options.getAnnounceAddress())
    .bodyStream() //Takes the body of Message<JsonObject>
    .toObservable() //converts it to Observable<JsonObject>
    .filter(requestedService) //filter if it is UP and its name is what we want
    .flatMap(requestService) // If all goes well, then request that service

//If there is already a service available, we can immediately use it,
//otherwise listen its arrival
def fileLinesObservable=
    EventBusService.getProxyObservable(serviceDiscovery, config().getJsonObject("requestedServiceRecord")) //get me the service proxy with given configuration
        .onErrorResumeNext(Observable.empty()) // If there is no such a service then continue with empty
        .switchIfEmpty(listenServiceDeparture) // If we came here with empty observable then listen for service arrival
        .flatMap(readFile) // Here we have service interface so consume it
        .flatMap({ Observable.from(it) }) // readFile returns Observable<List<String>>, so here we flatten it up
        .onBackpressureBuffer(10000) // if backpressure occurs then buffer.

Func1 readFile = { io.filereader.FileReaderService fileReaderService ->
def rxFileReader = new FileReaderService(fileReaderService);
rxFileReader.readObservable("data/app.log", [read: true, write: false] as JsonObject)
    .doOnCompleted({ -> ServiceDiscovery.releaseServiceObject(serviceDiscovery, fileReaderService) });
}


Imagine how this code would look like if we were to write it in an imperative way instead of declarative. However, as seen above, it is very readable as it is written in native English. For now, it is great with one little exception — when the service gracefully shuts down, it doesn't automatically publish a DOWN message because the Service Discovery API wants us to publish a DOWN message manually. And Verticle's stop method is not automatically called before shutdown. To solve this, first, we need to make sure that stop method will be called, then, in Verticle's stop method, we need to unpublish the service so that a DOWN message can be sent. Below is the solution, and the full code is here.

VertxConfigLauncher.java


private static Observable<Vertx> createVertx(VertxOptions vertxOptions) {
    if (vertxOptions.isClustered()) {
    HazelcastClusterManager clusterManager = getClusterManager();
    vertxOptions.setClusterManager(clusterManager);
    return Vertx.clusteredVertxObservable(vertxOptions).map(vertx -> {
        clusterManager.getHazelcastInstance().getLifecycleService().addLifecycleListener(state -> {
            if (state.getState() == LifecycleEvent.LifecycleState.SHUTTING_DOWN) {
                ICountDownLatch latch = clusterManager.getHazelcastInstance().getCountDownLatch("shutdown.latch");
                latch.trySetCount(1);
                beforeLeaveUndeploy(vertx, latch);
            }
        });
    return vertx;
    });
        } else {
            return Observable.just(Vertx.vertx(vertxOptions)).map(vertx -> {
            Runtime.getRuntime().addShutdownHook(new Thread(() -> beforeLeaveUndeploy(vertx, new CountDownLatch(1))));
            return vertx;
        });
    }
}

private static void beforeLeaveUndeploy(Vertx vertx, ICountDownLatch latch) {
    Observable.from(vertx.deploymentIDs().stream().map(vertx::undeployObservable).toArray())
        .doOnCompleted(latch::countDown)
        .subscribe();
         try {
             latch.await(30000, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
        e.printStackTrace();
    }
}


ExampleServiceVerticle.java


@Override public void stop(Future<Void> stopFuture) throws Exception {
    discovery.unpublish(registrationId, r -> {
        if (r.succeeded()) {
            LOGGER.info("Unregistering Service Record With ID {}", registrationId);
            ProxyHelper.unregisterService(this.serviceProxy);
            stopFuture.complete(r.result());
        } else {
            LOGGER.error("ERROR unregistering service");
        }
    });
}


Bottom Line

Service Discovery is a solution for addressing the challenge of orchestration. With Reactive Extensions, it becomes easier to discover services in a non-blocking manner while keeping the codebase readable and simple. Because Vert.x integrates naturally with RxJava, it makes this kind of event-driven asynchronous application development very easy. On the other hand, it is good to note that when we have a lot of microservices, we may consider using Service Proxy and Code-Gen API that generates boilerplate codes for Java, Groovy, RxJava, etc., and lets us work with reactive interfaces.

microservice Service discovery Vert.x Discovery (law)

Opinions expressed by DZone contributors are their own.

Related

  • Vert.x With Zookeeper Service Discovery
  • Event-Driven Microservices With Vert.x and Kubernetes (Part 2 of 3)
  • Service Discovery From Within the Kubernetes Cluster
  • Microservices With JHipster

Comments

Partner Resources

X

ABOUT US

  • About DZone
  • Send feedback
  • Careers
  • Sitemap

ADVERTISE

  • Advertise with DZone

CONTRIBUTE ON DZONE

  • Article Submission Guidelines
  • Become a Contributor
  • Visit the Writers' Zone

LEGAL

  • Terms of Service
  • Privacy Policy

CONTACT US

  • 3343 Perimeter Hill Drive
  • Suite 100
  • Nashville, TN 37211
  • support@dzone.com

Let's be friends: