
RxJava + Java8 + Java EE 7 + Arquillian = Bliss


Microservices are an architectural style in which each service is implemented as an independent system. Each service can have its own persistence system (although this is not mandatory), deployment process, programming language, and so on.
Because a system is composed of more than one service, each service communicates with other services, typically using a lightweight protocol like HTTP and following a RESTful approach. You can read more about microservices here: http://martinfowler.com/articles/microservices.html
Let's see a really simple example. Suppose we have a book shop where users can browse a catalog. When they find a book they want to know more about, they click on its ISBN, and a new screen opens with detailed information about the book and the comments readers have written about it.
This system may be composed of two services:
  • One service to get book details. They could be retrieved from any legacy system, such as an RDBMS.
  • One service to get all the comments written about a book. In this case the information could be stored in a document-oriented database.
The problem here is that for each user request we need to open two connections, one for each service. Of course we want to run both calls in parallel to improve performance. And here lies one problem: how can we deal with these asynchronous requests? The first idea is to use the Future class. For two services this may be good enough, but if you require four or five services the code becomes more and more complex. You may also need to get data from one service and use it in another, or adapt the result of one service to be the input of another. So there is a cost in managing threads and synchronization.
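To make that cost concrete, here is a minimal sketch of the Future approach using only the JDK. The two fetch methods are hypothetical stand-ins for the remote services (they just build strings), and the class and method names are ours, not part of any library.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class FutureFanOut {

    // Hypothetical stand-ins for the two remote services.
    static String fetchBookInfo(String isbn) {
        return "book:" + isbn;
    }

    static String fetchComments(String isbn) {
        return "comments:" + isbn;
    }

    // Starts both calls in parallel, then blocks until each one has answered.
    static String fetchBookAndComments(String isbn) {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            Future<String> book = pool.submit(() -> fetchBookInfo(isbn));
            Future<String> comments = pool.submit(() -> fetchComments(isbn));
            // The calling thread is parked inside get() until the results arrive.
            return book.get(2, TimeUnit.SECONDS) + " + " + comments.get(2, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException | TimeoutException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(fetchBookAndComments("1111"));
    }
}
```

With two services this is still readable, but each new service adds another Future, another blocking get() and more exception handling.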
It would be awesome to have a way to deal with this problem cleanly and easily. And this is exactly what RxJava does. RxJava is a Java VM implementation of Reactive Extensions: a library for composing asynchronous and event-based programs by using observable sequences.
With RxJava, instead of pulling data from a structure, data is pushed to subscribers: the observable fires events, and a subscriber listens to them and acts accordingly. You can find more information at https://github.com/Netflix/RxJava.
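The difference between pulling and pushing can be sketched in plain Java. Note that TinySource below is a hypothetical helper written only for this illustration; it is far simpler than rx.Observable and is not how RxJava is implemented.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.function.Consumer;

public class PushVsPull {

    // Pull: the consumer drives the loop and asks for each element.
    static List<String> pull(List<String> source) {
        List<String> seen = new ArrayList<>();
        Iterator<String> it = source.iterator();
        while (it.hasNext()) {
            seen.add(it.next());
        }
        return seen;
    }

    // Push: the producer drives the loop and hands each element to a callback.
    static class TinySource {
        private final List<String> items;

        TinySource(List<String> items) {
            this.items = items;
        }

        void subscribe(Consumer<String> onNext, Runnable onCompleted) {
            for (String item : items) {
                onNext.accept(item);
            }
            onCompleted.run();
        }
    }

    // Collects everything the source pushes, plus a marker for the completion event.
    static List<String> push(List<String> source) {
        List<String> seen = new ArrayList<>();
        new TinySource(source).subscribe(seen::add, () -> seen.add("<completed>"));
        return seen;
    }

    public static void main(String[] args) {
        List<String> comments = Arrays.asList("Good Book", "Awesome");
        System.out.println(pull(comments));
        System.out.println(push(comments));
    }
}
```

In the push version the consumer never asks for data; it only says what to do when an item or the completion event arrives.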


So what we are going to implement is the example described above, using RxJava, Java EE 7, Java 8 and Arquillian for testing.
This post assumes you know how to write REST services using the Java EE specification.
So let's start with the two services:
@Singleton
@Path("bookinfo")
public class BookInfoService {
    @GET
    @Path("{isbn}")
    @Produces(MediaType.APPLICATION_JSON)
    @Consumes(MediaType.APPLICATION_JSON)
    public JsonObject findBookByISBN(@PathParam("isbn") String isbn) {
        return Json.createObjectBuilder()
            .add("author", "George R.R. Martin")
            .add("isbn", "1111")
            .add("title", "A Game Of Thrones").build();
    }
}
@Singleton
@Path("comments")
public class CommentsService {
    @GET
    @Path("{isbn}")
    @Produces(MediaType.APPLICATION_JSON)
    public JsonArray bookComments(@PathParam("isbn") String isbn) {
        return Json.createArrayBuilder().add("Good Book").add("Awesome").build();
    }
}
@ApplicationPath("rest")
public class ApplicationResource extends Application {
}
And finally it is time to create a third facade service, which receives the request from the client, sends a request to both services in parallel and finally zips both responses. Zipping is the process of combining sets of items emitted together via a specified function; the combined result is then sent back to the client (not to be confused with compression :)).
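If the idea of zipping is new to you, the following sketch shows it on two plain lists. It is only an analogy written with the JDK (the zip method here is ours, not RxJava's): items are paired by position and each pair is combined with a function.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.BiFunction;

public class ZipSketch {

    // Pairs the i-th item of each input and combines the pair with the given function.
    // Stops at the shorter input, because an item without a partner cannot be combined.
    static <A, B, R> List<R> zip(List<A> left, List<B> right, BiFunction<A, B, R> combiner) {
        int size = Math.min(left.size(), right.size());
        List<R> result = new ArrayList<>(size);
        for (int i = 0; i < size; i++) {
            result.add(combiner.apply(left.get(i), right.get(i)));
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> books = Arrays.asList("A Game Of Thrones");
        List<Integer> commentCounts = Arrays.asList(2);
        // prints [A Game Of Thrones (2 comments)]
        System.out.println(zip(books, commentCounts,
                (book, count) -> book + " (" + count + " comments)"));
    }
}
```

In our facade each side emits exactly one item (the book and the array of comments), so the zipped result is a single combined object.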


@Singleton
@Path("book")
public class BookService {
    private static final String BOOKSERVICE = "http://localhost:8080/bookservice";
    private static final String COMMENTSERVICE = "http://localhost:8080/bookcomments";
    @Resource(name = "DefaultManagedExecutorService")
    ManagedExecutorService executor;
    Client bookServiceClient;
    WebTarget bookServiceTarget;
    Client commentServiceClient;
    WebTarget commentServiceTarget;
    @PostConstruct
    void initializeRestClients() {
        bookServiceClient = ClientBuilder.newClient();
        bookServiceTarget = bookServiceClient.target(BOOKSERVICE + "/rest/bookinfo");
        commentServiceClient = ClientBuilder.newClient();
        commentServiceTarget = commentServiceClient.target(COMMENTSERVICE + "/rest/comments");
    }
    @GET
    @Path("{isbn}")
    @Produces(MediaType.APPLICATION_JSON)
    public void bookAndComment(@Suspended final AsyncResponse asyncResponse, @PathParam("isbn") String isbn) {
    //RxJava code shown below
    }
}
Basically we create a new service. In this case the URLs of both services we are going to connect to are hardcoded. This is done for academic purposes; in production-like code you would inject them from a producer class, a properties file or whatever system you use for this purpose. Then we create a javax.ws.rs.client.WebTarget for consuming each RESTful web service.
After that we need to implement the bookAndComment method using the RxJava API.
The main class used in RxJava is rx.Observable. As its name suggests, this class is an observable, and it is responsible for firing events that push objects to subscribers. By default events are synchronous, and it is the developer's responsibility to make them asynchronous.
So we need one asynchronous observable instance for each service:
public Observable<JsonObject> getBookInfo(final String isbn) {
    return Observable.create((Observable.OnSubscribe<JsonObject>) subscriber -> {
        Runnable r = () -> {
            try {
                subscriber.onNext(bookServiceTarget.path(isbn).request().get(JsonObject.class));
                subscriber.onCompleted();
            } catch (RuntimeException e) {
                // Propagate failures so the subscriber is not left waiting forever.
                subscriber.onError(e);
            }
        };
        executor.execute(r);
    });
}
Basically we create an Observable that will execute the specified function when a Subscriber subscribes to it. The function is created using a lambda expression to avoid creating nested inner classes. In this case we are returning a JsonObject as a result of calling the bookinfo service. The result is passed to the onNext method so subscribers can receive it. Because we want to execute this logic asynchronously, the code is wrapped inside a Runnable block.

It is also required to call the onCompleted method when all the logic is done.

Notice that to make the observable asynchronous, apart from creating a Runnable, we use an Executor to run the logic in a separate thread. One of the great additions in Java EE 7 is a managed way to create threads inside a container. In this case we use the ManagedExecutorService provided by the container to spawn a task asynchronously in a thread different from the current one.
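Outside a container you can observe the same effect with a plain ExecutorService. The sketch below is an assumption-laden stand-in (a JDK single-thread executor instead of the container's ManagedExecutorService, and a Consumer instead of an RxJava Subscriber) that simply checks that the callback runs on a thread other than the caller's.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

public class AsyncEmit {

    // Hands the work to the executor; the callback is invoked from the executor's thread.
    static void emitAsync(ExecutorService executor, Consumer<String> onNext) {
        executor.execute(() -> onNext.accept("book info"));
    }

    // Returns true when the callback ran on a thread other than the caller's.
    static boolean callbackRunsOnAnotherThread() {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        AtomicReference<Thread> callbackThread = new AtomicReference<>();
        CountDownLatch done = new CountDownLatch(1);
        try {
            emitAsync(executor, item -> {
                callbackThread.set(Thread.currentThread());
                done.countDown();
            });
            // The caller could do other work here; we simply wait for the callback.
            if (!done.await(2, TimeUnit.SECONDS)) {
                throw new IllegalStateException("callback was not invoked in time");
            }
            return callbackThread.get() != Thread.currentThread();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(callbackRunsOnAnotherThread()); // prints true
    }
}
```

Inside a Java EE 7 server you would keep using the injected ManagedExecutorService, so that the threads are managed by the container rather than created by hand.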
public Observable<JsonArray> getComments(final String isbn) {
    return Observable.create((Observable.OnSubscribe<JsonArray>) subscriber -> {
        Runnable r = () -> {
            try {
                subscriber.onNext(commentServiceTarget.path(isbn).request().get(JsonArray.class));
                subscriber.onCompleted();
            } catch (RuntimeException e) {
                // Propagate failures so the subscriber is not left waiting forever.
                subscriber.onError(e);
            }
        };
        executor.execute(r);
    });
}
This is similar to the previous method, but instead of getting book info we get an array of comments.

Then we need to create an observable in charge of zipping both responses once both of them are available. This is done with the zip method of the Observable class, which receives two Observables and applies a function to combine their results. In this case the function is a lambda expression that creates a new JSON object containing both responses.

@GET
@Path("{isbn}")
@Produces(MediaType.APPLICATION_JSON)
public void bookAndComment(@Suspended final AsyncResponse asyncResponse, @PathParam("isbn") String isbn) {
    //Calling previous defined functions
    Observable<JsonObject> bookInfo = getBookInfo(isbn);
    Observable<JsonArray> comments = getComments(isbn);
    Observable.zip(bookInfo, comments, (JsonObject book, JsonArray bookcomments) ->
                    Json.createObjectBuilder().add("book", book).add("comments", bookcomments).build()
                  )
                  .subscribe(new Subscriber<JsonObject>() {
                        @Override
                        public void onCompleted() {
                        }
                        @Override
                        public void onError(Throwable e) {
                            asyncResponse.resume(e);
                        }
                        @Override
                        public void onNext(JsonObject jsonObject) {
                            asyncResponse.resume(jsonObject);
                        }
                    });
}
Let's take a look at the previous service. We are using one of the new additions in Java EE 7, JAX-RS 2.0 asynchronous REST endpoints, by means of the @Suspended annotation. Basically what we are doing is freeing server resources and generating the response when it is available using the resume method.

And finally a test. We are using WildFly 8.1 as the Java EE 7 server, together with Arquillian. Because each service may be deployed on a different server, we are going to deploy each service in a different war, but inside the same server.

So in this case we are going to deploy three war files, which is really easy to do with Arquillian.

@RunWith(Arquillian.class)
public class BookTest {
    @Deployment(testable = false, name = "bookservice")
    public static WebArchive createDeploymentBookInfoService() {
        return ShrinkWrap.create(WebArchive.class, "bookservice.war").addClasses(BookInfoService.class, ApplicationResource.class);
    }
    @Deployment(testable = false, name = "bookcomments")
    public static WebArchive createDeploymentCommentsService() {
        return ShrinkWrap.create(WebArchive.class, "bookcomments.war").addClasses(CommentsService.class, ApplicationResource.class);
    }
    @Deployment(testable = false, name = "book")
    public static WebArchive createDeploymentBookService() {
        WebArchive webArchive = ShrinkWrap.create(WebArchive.class, "book.war").addClasses(BookService.class, ApplicationResource.class)
                .addAsLibraries(Maven.resolver().loadPomFromFile("pom.xml").resolve("com.netflix.rxjava:rxjava-core").withTransitivity().as(JavaArchive.class));
        return webArchive;
    }
    @ArquillianResource
    URL base;
    @Test
    @OperateOnDeployment("book")
    public void should_return_book() throws MalformedURLException {
        Client client = ClientBuilder.newClient();
        JsonObject book = client.target(URI.create(new URL(base, "rest/").toExternalForm())).path("book/1111").request().get(JsonObject.class);
        //assertions
    }
}
In this case the client requests all the information about a book. On the server side, the zip method waits until the book and the comments, retrieved in parallel, are both available, then combines both responses into a single object that is sent back to the client.
This is a very simple example of RxJava. In fact we have only seen how to use the zip method, but RxJava provides many more methods that are just as useful, like take(), map(), merge(), ... (https://github.com/Netflix/RxJava/wiki/Alphabetical-List-of-Observable-Operators)

Moreover, in this example we have only connected to two services and retrieved information in parallel, and you may wonder why not use the Future class. It is totally fine to use Future and callbacks in this example, but in real life your logic probably won't be as easy as zipping two services. Maybe you will have more services, or maybe you will need to get information from one service and then open a new connection for each result. As you can see, you may start with two Future instances but finish with a bunch of Future.get() calls, timeouts, ... It is in these situations where RxJava really simplifies the development of the application.
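As a rough illustration of the "one call per result" scenario, the following JDK-only sketch first asks one service for comment ids and then opens one call per id. fetchCommentIds and fetchComment are stand-ins invented for this example; no real service is contacted.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class DependentFutures {

    // Hypothetical services: the first returns ids, the second resolves one id.
    static List<Integer> fetchCommentIds(String isbn) {
        return Arrays.asList(1, 2, 3);
    }

    static String fetchComment(int id) {
        return "comment-" + id;
    }

    static List<String> fetchAllComments(String isbn) {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        try {
            // Step 1: block until the ids arrive; nothing else can start before that.
            Future<List<Integer>> idsFuture = pool.submit(() -> fetchCommentIds(isbn));
            List<Integer> ids = idsFuture.get(2, TimeUnit.SECONDS);

            // Step 2: one Future per id.
            List<Future<String>> pending = new ArrayList<>();
            for (int id : ids) {
                pending.add(pool.submit(() -> fetchComment(id)));
            }

            // Step 3: one blocking get(), with its own timeout, per Future.
            List<String> comments = new ArrayList<>();
            for (Future<String> future : pending) {
                comments.add(future.get(2, TimeUnit.SECONDS));
            }
            return comments;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException | TimeoutException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(fetchAllComments("1111"));
    }
}
```

Even in this tiny case the bookkeeping (lists of Futures, timeouts, checked exceptions) takes more lines than the logic itself.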

Furthermore, we have seen how to use some of the new additions in Java EE 7, like how to develop an asynchronous RESTful service with JAX-RS.

In this post we have learnt how to deal with the interconnection between services and how to make them scalable and less resource-consuming. But we have not talked about what happens when one of these services fails. What happens to the callers? Do we have a way to manage it? Is there a way to avoid spending resources when one of the services is not available? We will touch on this in the next post, which talks about fault tolerance.

We keep learning,
Alex.


Published at DZone with permission of Alex Soto, DZone MVB. See the original article here.

