DZone
Thanks for visiting DZone today,
Edit Profile
  • Manage Email Subscriptions
  • How to Post to DZone
  • Article Submission Guidelines
Sign Out View Profile
  • Post an Article
  • Manage My Drafts
Over 2 million developers have joined DZone.
Log In / Join
Refcards Trend Reports Events Over 2 million developers have joined DZone. Join Today! Thanks for visiting DZone today,
Edit Profile Manage Email Subscriptions Moderation Admin Console How to Post to DZone Article Submission Guidelines
View Profile
Sign Out
Refcards
Trend Reports
Events
Zones
Culture and Methodologies Agile Career Development Methodologies Team Management
Data Engineering AI/ML Big Data Data Databases IoT
Software Design and Architecture Cloud Architecture Containers Integration Microservices Performance Security
Coding Frameworks Java JavaScript Languages Tools
Testing, Deployment, and Maintenance Deployment DevOps and CI/CD Maintenance Monitoring and Observability Testing, Tools, and Frameworks
Culture and Methodologies
Agile Career Development Methodologies Team Management
Data Engineering
AI/ML Big Data Data Databases IoT
Software Design and Architecture
Cloud Architecture Containers Integration Microservices Performance Security
Coding
Frameworks Java JavaScript Languages Tools
Testing, Deployment, and Maintenance
Deployment DevOps and CI/CD Maintenance Monitoring and Observability Testing, Tools, and Frameworks
  1. DZone
  2. Coding
  3. Java
  4. RxJava + Java8 + Java EE 7 + Arquillian = Bliss

RxJava + Java8 + Java EE 7 + Arquillian = Bliss

Alex Soto user avatar by
Alex Soto
·
Jul. 10, 14 · Interview
Like (1)
Save
Tweet
Share
10.98K Views

Join the DZone community and get the full member experience.

Join For Free
Microservices are an architectural style where each service is implemented as an independent system. They can use their own persistence system (although it is not mandatory), deployment, language, ...
Because a system is composed by more than one service, each service will communicate with other services, typically using a lightweight protocol like HTTP and following a Restful Web approach. You can read more about microservices here:http://martinfowler.com/articles/microservices.html
Let's see a really simple example. Suppose we have a booking shop where users can navigate through a catalog and when they find a book which they want to see more information, they click on the isbn, and then a new screen is opened with detailed information of the book and comments about it written by readers.
This system may be composed by two services:
  • One service to get book details. They could be retrieved from any legacy system like a RDBMS.
  • One service to get all comments written in a book and in this case that information could be stored in a document base database.
The problem here is that for each request that a user does we need to open two connections, one for each service. Of course we need a way do that jobs in parallel to improve the performance. And here lies one problem, how we can deal with this asynchronous requests? The first idea is to use Future class. For two services may be good but if you require four or five services the code will become more and more complex, or for example you may need to get data from one service and using it in another services or adapt the result of one service to be the input of another one. So there is a cost of management of threads and synchronization.
It will be awesome to have some way to deal with this problem in a clean and easy way. And this is exactly what RxJava does. RxJava is a Java VM implementation of Reactive Extensions: a library for composing asynchronous and event-based programs by using observable sequences.
With RxJava instead of pulling data from an structure, data is pushed to it which reacts with an event that are listened by a subscriber and acts accordantly. You can find more information in https://github.com/Netflix/RxJava.


So in this case what we are going to implement is the example described here usingRxJava, Java EE 7, Java 8 and Arquillian for testing.
This post assumes you know how to write Rest services using Java EE specification.
So let's start with two services:
@Singleton
@Path("bookinfo")
public class BookInfoService {
    @GET
    @Path("{isbn}")
    @Produces(MediaType.APPLICATION_JSON)
    @Consumes(MediaType.APPLICATION_JSON)
    public JsonObject findBookByISBN(@PathParam("isbn") String isbn) {
        return Json.createObjectBuilder()
            .add("author", "George R.R. Martin")
            .add("isbn", "1111")
            .add("title", "A Game Of Thrones").build();
    }
}
@Singleton
@Path("comments")
public class CommentsService {
    @GET
    @Path("{isbn}")
    @Produces(MediaType.APPLICATION_JSON)
    public JsonArray bookComments(@PathParam("isbn") String isbn) {
        return Json.createArrayBuilder().add("Good Book").add("Awesome").build();
    }
}
@ApplicationPath("rest")
public class ApplicationResource extends Application {
}
And finally it is time to create a third facade service which receives communication from the client, sends to both services in parallel a request and finally zip both responses. zip is the process of combining sets of items emitted together via a specified function and sent it back to client (not to be confused with compression :)).


@Singleton
@Path("book")
public class BookService {
    private static final String BOOKSERVICE = "http://localhost:8080/bookservice";
    private static final String COMMENTSERVICE = "http://localhost:8080/bookcomments";
    @Resource(name = "DefaultManagedExecutorService")
    ManagedExecutorService executor;
    Client bookServiceClient;
    WebTarget bookServiceTarget;
    Client commentServiceClient;
    WebTarget commentServiceTarget;
    @PostConstruct
    void initializeRestClients() {
        bookServiceClient = ClientBuilder.newClient();
        bookServiceTarget = bookServiceClient.target(BOOKSERVICE + "/rest/bookinfo");
        commentServiceClient = ClientBuilder.newClient();
        commentServiceTarget = commentServiceClient.target(COMMENTSERVICE + "/rest/comments");
    }
    @GET
    @Path("{isbn}")
    @Produces(MediaType.APPLICATION_JSON)
    public void bookAndComment(@Suspended final AsyncResponse asyncResponse, @PathParam("isbn") String isbn) {
    //RxJava code shown below
    }
}
Basically we create a new service. In this case URLs of both services we are going to connect are hardcoded. This is done for academic purpose but in production-like code you will inject it from a producer class or from properties file or any system you will use for this purpose. Then we create javax.ws.rs.client.WebTarget for consuming Restful Web Service.
After that we need to implement the bookAndComment method using RxJava API.
The main class used in RxJava is rx.Observable. This class is an observable as his name suggest and it is the responsible of firing events for pushing objects. By default events are synchronous and it is responsible of developer to make them asynchronous.
So we need one asynchronous observable instance for each service:
public Observable<JsonObject> getBookInfo(final String isbn) {
        return Observable.create((Observable.OnSubscribe<JsonObject>) subscriber -> {
            Runnable r = () -> {
                subscriber.onNext(bookServiceTarget.path(isbn).request().get(JsonObject.class));
                subscriber.onCompleted();
            };
            executor.execute(r);
        });
}
Basically we create an Observable that will execute the specified function when aSubscriber subscribes to it. The function is created using a lambda expression to avoid creating nested inner classes. In this case we are returning a JsonObject as a result of calling the bookinfo service. The result is passed to onNext method so subscribers can receive the result. Because we want to execute this logic asynchronously, the code is wrapped inside a Runnable block.

Also it is required to call the onCompleted method when all logic is done.

Notice that because we want to make observable asynchronous apart of creating aRunnable, we are using an Executor to run the logic in separate thread. One of the great additions in Java EE 7 is a managed way to create threads inside a container. In this case we are using ManagedExecutorService provided by container to span a task asynchronously in a different thread of the current one.
public Observable<JsonArray> getComments(final String isbn) {
        return Observable.create((Observable.OnSubscribe<JsonArray>) subscriber -> {
            Runnable r = () -> {
                subscriber.onNext(commentServiceTarget.path(isbn).request().get(JsonArray.class));
                subscriber.onCompleted();
            };
            executor.execute(r);
        });
}
Similar to previous but instead of getting book info we are getting an array of comments.

Then we need to create an observable in charge of zipping both responses when both of them are available. And this is done by using zip method on Observable class which receives two Observables and applies a function to combine the result of both of them. In this case a lambda expression that creates a new json object appending both responses.

@GET
@Path("{isbn}")
@Produces(MediaType.APPLICATION_JSON)
public void bookAndComment(@Suspended final AsyncResponse asyncResponse, @PathParam("isbn") String isbn) {
    //Calling previous defined functions
    Observable<JsonObject> bookInfo = getBookInfo(isbn);
    Observable<JsonArray> comments = getComments(isbn);
    Observable.zip(bookInfo, comments, (JsonObject book, JsonArray bookcomments) ->
                    Json.createObjectBuilder().add("book", book).add("comments", bookcomments).build()
                  )
                  .subscribe(new Subscriber<JsonObject>() {
                        @Override
                        public void onCompleted() {
                        }
                        @Override
                        public void onError(Throwable e) {
                            asyncResponse.resume(e);
                        }
                        @Override
                        public void onNext(JsonObject jsonObject) {
                            asyncResponse.resume(jsonObject);
                        }
                    });
}
Let's take a look of previous service. We are using one of the new additions in Java EEwhich is Jax-Rs 2.0 asynchronous REST endpoints by using @Suspended annotation. Basically what we are doing is freeing server resources and generating the response when it is available using the resume method.

And finally a test. We are using Wildfly 8.1 as Java EE 7 server and Arquillian. Because each service may be deployed in different server, we are going to deploy each service in different war but inside same server.

So in this case we are going to deploy three war files which is totally easy to do it inArquillian.

@RunWith(Arquillian.class)
public class BookTest {
    @Deployment(testable = false, name = "bookservice")
    public static WebArchive createDeploymentBookInfoService() {
        return ShrinkWrap.create(WebArchive.class, "bookservice.war").addClasses(BookInfoService.class, ApplicationResource.class);
    }
    @Deployment(testable = false, name = "bookcomments")
    public static WebArchive createDeploymentCommentsService() {
        return ShrinkWrap.create(WebArchive.class, "bookcomments.war").addClasses(CommentsService.class, ApplicationResource.class);
    }
    @Deployment(testable = false, name = "book")
    public static WebArchive createDeploymentBookService() {
        WebArchive webArchive = ShrinkWrap.create(WebArchive.class, "book.war").addClasses(BookService.class, ApplicationResource.class)
                .addAsLibraries(Maven.resolver().loadPomFromFile("pom.xml").resolve("com.netflix.rxjava:rxjava-core").withTransitivity().as(JavaArchive.class));
        return webArchive;
    }
    @ArquillianResource
    URL base;
    @Test
    @OperateOnDeployment("book")
    public void should_return_book() throws MalformedURLException {
        Client client = ClientBuilder.newClient();
        JsonObject book = client.target(URI.create(new URL(base, "rest/").toExternalForm())).path("book/1111").request().get(JsonObject.class);
        //assertions
    }
}
In this case client will request all information from a book. In server part zip method will wait until book and comments are retrieved in parallel and then will combine both responses to a single object and sent back to client.
This is a very simple example of RxJava. In fact in this case we have only seen how to usezip method, but there are many more methods provided by RxJava that are so useful as well like take(), map(), merge(), ... (https://github.com/Netflix/RxJava/wiki/Alphabetical-List-of-Observable-Operators)

Moreover in this example we have seen only an example of connecting to two services and retrieving information in parallel, and you may wonder why not to use Future class. It is totally fine to use Future and Callbacks in this example but probably in your real life your logic won't be as easy as zipping two services. Maybe you will have more services, maybe you will need to get information from one service and then for each result open a new connection. As you can see you may start with two Future instances but finishing with a bunch of Future.get() methods, timeouts, ... So it is in these situations where RxJava really simplify the development of the application.

Furthermore we have seen how to use some of the new additions of Java EE 7 like how to develop an asynchronous Restful service with Jax-Rs.

In this post we have learnt how to deal with the interconnection between services andhow to make them scalable and less resource consume. But we have not talked about what's happening when one of these services fails. What's happening with the callers? Do we have a way to manage it? Is there a way to not spent resources when one of the service is not available? We will touch this in next post talking about fault tolerance.

We keep learning,
Alex.
Java EE microservice Web Service Java (programming language)

Published at DZone with permission of Alex Soto, DZone MVB. See the original article here.

Opinions expressed by DZone contributors are their own.

Popular on DZone

  • Best Practices to Succeed at Continuous AWS Security Monitoring
  • Image Classification With DCNNs
  • Tech Layoffs [Comic]
  • Last Chance To Take the DZone 2023 DevOps Survey and Win $250! [Closes on 1/25 at 8 AM]

Comments

Partner Resources

X

ABOUT US

  • About DZone
  • Send feedback
  • Careers
  • Sitemap

ADVERTISE

  • Advertise with DZone

CONTRIBUTE ON DZONE

  • Article Submission Guidelines
  • Become a Contributor
  • Visit the Writers' Zone

LEGAL

  • Terms of Service
  • Privacy Policy

CONTACT US

  • 600 Park Offices Drive
  • Suite 300
  • Durham, NC 27709
  • support@dzone.com
  • +1 (919) 678-0300

Let's be friends: