
Spring, Reactor, and ElasticSearch: From Callbacks to Reactive Streams


Let's bring the power of Spring and Project Reactor to ElasticSearch (or vice versa) with this guide to transitioning your callbacks to something better.


Spring 5 (and Boot 2, when it arrives in a couple of weeks) is a revolution. Not the "annotations over XML" or "Java classes over annotations" type of revolution. It's truly a revolutionary framework that enables writing a brand new class of applications.

In recent years, I became a little bit intimidated by this framework. Spring Cloud is a framework that simplifies the usage of Spring Boot, which is a framework that simplifies the usage of Spring, which is a framework that simplifies enterprise development.

start.spring.io (also known as "start... dot spring... dot I... O") lists 120 different modules (!) that you can add to your service. Spring, these days, has become an enormous umbrella project, and I can imagine why some people (still!) prefer Java EE (or whatever it's called these days).

But Spring 5 brings the reactive revolution. It's no longer only a wrapper around a blocking servlet API and various web frameworks. Spring 5, on top of Project Reactor, allows writing high-performance, extremely fast, and scalable servers, avoiding the servlet stack altogether.

Damn, there is no Jetty or even servlet API on the CLASSPATH! At the heart of Spring 5 WebFlux, we will find Netty, a low-level framework for writing asynchronous clients and servers. Finally, Spring becomes a first-class citizen in the family of reactive frameworks.

Java developers can implement fast services without leaving their comfort zone for Akka HTTP (https://doc.akka.io/docs/akka-http/current/) or Play Framework (https://www.playframework.com/). Spring 5 is a fully reactive, modern tool for building highly scalable and resilient applications. Nevertheless, the underlying principles like controllers, beans, and dependency injection are all the same. Moreover, the upgrade path is smooth, and we can gradually add features rather than learning a brand new, alien framework.

Enough talking, let's write some code.

In this article, we will write a simple headless application that indexes documents in ElasticSearch in large volume. We will aim for thousands of concurrent connections with just a handful of threads, even when the server becomes slow.
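To make the "many connections, few threads" goal concrete, here is a toy simulation using only the JDK. It is a sketch, not the article's code: a single-threaded timer stands in for a slow server, and the class and method names (FewThreadsManyRequests, threadsUsed) are made up for illustration. No HTTP call is involved.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class FewThreadsManyRequests {

    /** Starts the given number of simulated slow calls and returns how many threads completed them. */
    static int threadsUsed(int requests) {
        // Hypothetical stand-in for a non-blocking client: one timer thread for everything.
        ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
        Set<String> threadNames = ConcurrentHashMap.newKeySet();
        List<CompletableFuture<Void>> inFlight = new ArrayList<>();
        try {
            for (int i = 0; i < requests; i++) {
                CompletableFuture<Void> response = new CompletableFuture<>();
                // Simulated 100 ms latency; no thread is parked while the "request" is pending.
                timer.schedule(() -> {
                    threadNames.add(Thread.currentThread().getName());
                    response.complete(null);
                }, 100, TimeUnit.MILLISECONDS);
                inFlight.add(response);
            }
            // Only the caller waits here; all requests are in flight at the same time.
            CompletableFuture.allOf(inFlight.toArray(new CompletableFuture[0])).join();
            return threadNames.size();
        } finally {
            timer.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("threads used: " + threadsUsed(2000));
    }
}
```

All 2000 pending operations are completed by the one timer thread, which is the property a non-blocking client gives us: waiting for a slow server costs memory per request, not a thread per request.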

However, unlike e.g. Spring Data MongoDB, Spring Data ElasticSearch does not natively support non-blocking repositories. Well, the latter doesn't even seem to be maintained anymore, with the current version being 3 years old. Many articles target Spring 5 + MongoDB with its repositories returning non-blocking streams (Flux or Flowable from RxJava). This one will be a little bit more advanced.

The ElasticSearch 6 Java API uses a RESTful interface and is implemented using a non-blocking HTTP client. Unfortunately, it uses callbacks rather than something sane like CompletableFuture. So let's build the client adapter ourselves.
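For comparison, this is roughly what bridging a callback API to CompletableFuture looks like with the JDK alone. It is a minimal sketch under assumptions: the Callback interface and fetchAsync() are hypothetical stand-ins with the usual onResponse/onFailure shape, not part of the ElasticSearch client.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class CallbackToFuture {

    /** Hypothetical two-method callback, shaped like a typical async listener. */
    interface Callback<T> {
        void onResponse(T response);
        void onFailure(Exception e);
    }

    /** Hypothetical callback-based call; it stands in for any asynchronous client method. */
    static void fetchAsync(String id, ExecutorService pool, Callback<String> callback) {
        pool.execute(() -> {
            if (id.isEmpty()) {
                callback.onFailure(new IllegalArgumentException("empty id"));
            } else {
                callback.onResponse("doc-" + id);
            }
        });
    }

    /** Adapter: the callback completes a future, so callers never see the listener. */
    static CompletableFuture<String> fetch(String id, ExecutorService pool) {
        CompletableFuture<String> future = new CompletableFuture<>();
        fetchAsync(id, pool, new Callback<String>() {
            @Override
            public void onResponse(String response) {
                future.complete(response);
            }

            @Override
            public void onFailure(Exception e) {
                future.completeExceptionally(e);
            }
        });
        return future;
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            System.out.println(fetch("42", pool).join());
            System.out.println(fetch("", pool)
                    .exceptionally(e -> "failed: " + e.getMessage())
                    .join());
        } finally {
            pool.shutdown();
        }
    }
}
```

The adapter pattern is the same whatever the target type is: success and failure callbacks are each mapped to exactly one completion signal.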

ElasticSearch Client Using Fluxes and Monos

Source code for this article is available at github.com/nurkiewicz/elastic-flux, on the reactive-elastic-search branch.

We would like to build an ElasticSearch Java client that supports Project Reactor by returning Flux or Mono. Of course, we get the greatest benefit if the underlying stream is fully asynchronous and does not consume threads. Luckily, the Java API is just like that. First, let's set up ElasticSearch's client as a Spring bean:

import org.apache.http.HttpHost;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;
 
@Bean
RestHighLevelClient restHighLevelClient() {
    return new RestHighLevelClient(
            RestClient
                    .builder(new HttpHost("localhost", 9200))
                    .setRequestConfigCallback(config -> config
                            .setConnectTimeout(5_000)
                            .setConnectionRequestTimeout(5_000)
                            .setSocketTimeout(5_000)
                    )
                    .setMaxRetryTimeoutMillis(5_000));
}


In real life, we would obviously parametrize most of this stuff. We will be indexing simple JSON documents. For the time being, their content is not important:

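import lombok.Value;
 
// Lombok's @Value generates the constructor and getters (getUsername(), getJson())
@Value
class Doc {
    private final String username;
    private final String json;
}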


The code we will write wraps RestHighLevelClient and makes it even more high level by returning Mono<IndexResponse>. Mono is pretty much like CompletableFuture, with two differences:

  • It's lazy — as long as you don't subscribe, no computation is started
  • Unlike CompletableFuture, Mono can complete normally without emitting any value
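The first difference is easiest to see from the CompletableFuture side. The sketch below uses only the JDK, so it does not touch Mono itself: a Supplier wrapping a future is used as a loose analogy for lazy behaviour, and the names (EagerVersusLazy, countRuns) are invented for this example.

```java
import java.util.Arrays;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

public class EagerVersusLazy {

    /** Returns {eager runs, deferred runs before triggering, deferred runs after two triggers}. */
    static int[] countRuns() {
        AtomicInteger eagerRuns = new AtomicInteger();
        AtomicInteger lazyRuns = new AtomicInteger();

        // Eager: the work is submitted as soon as supplyAsync() is called.
        CompletableFuture<Integer> eager =
                CompletableFuture.supplyAsync(eagerRuns::incrementAndGet);
        eager.join(); // join() only waits; the work was already scheduled

        // Deferred: defining the supplier runs nothing, loosely like an unsubscribed Mono.
        Supplier<CompletableFuture<Integer>> deferred =
                () -> CompletableFuture.supplyAsync(lazyRuns::incrementAndGet);
        int beforeTrigger = lazyRuns.get();

        // Each call to get() starts the work again, loosely like each new subscription.
        deferred.get().join();
        deferred.get().join();

        return new int[] {eagerRuns.get(), beforeTrigger, lazyRuns.get()};
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(countRuns())); // prints [1, 0, 2]
    }
}
```

A CompletableFuture represents work that is already under way, while a lazy type describes work that starts only when someone asks for the result, and starts again for each new request.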

The second difference was always a bit misleading to me. In RxJava 2.x, there are two distinct types: Single (always completes with a value or an error) and Maybe (like Mono). Too bad Reactor doesn't make this distinction. Never mind. What does the adapter layer look like? The plain Elastic API looks as follows:

client.indexAsync(indexRequest, new ActionListener<IndexResponse>() {
    @Override
    public void onResponse(IndexResponse indexResponse) {
        //got response
    }
 
    @Override
    public void onFailure(Exception e) {
        //got error
    }
});


You can see where this is going: callback hell. Rather than exposing a custom ActionListener as an argument to this logic, let's wrap it in Mono:

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.xcontent.XContentType;
 
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoSink;
 
private Mono<IndexResponse> indexDoc(Doc doc) {
    return Mono.create(sink -> {
        IndexRequest indexRequest = new IndexRequest("people", "person", doc.getUsername());
        indexRequest.source(doc.getJson(), XContentType.JSON);
        client.indexAsync(indexRequest, new ActionListener<IndexResponse>() {
            @Override
            public void onResponse(IndexResponse indexResponse) {
                sink.success(indexResponse);
            }
 
            @Override
            public void onFailure(Exception e) {
                sink.error(e);
            }
        });
    });
}


We must create an IndexRequest wrapping a JSON document and send it via the RESTful API. But that's not the point. We are using the Mono.create() method. It has some drawbacks, but more on that later. Mono is lazy, so merely calling indexDoc() doesn't suffice: at that point, no HTTP request has been made to ElasticSearch.

However, every time someone subscribes to this one-element source, the logic inside create() will be executed. The crucial lines are sink.success() and sink.error(). They propagate results from ElasticSearch (coming from the background, asynchronous thread) into the stream. How do we use such a method in practice? It's very simple!

Doc doc = //...
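indexDoc(doc)
        .subscribe(
                indexResponse -> log.info("Got response"),
                // without an error consumer, a failed request is not handled here
                error -> log.error("Indexing failed", error)
        );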


Of course, the true power of reactive stream processing comes from composing multiple streams. But we made our first steps: transforming a callback-based asynchronous API into a generic stream. If you are (un)lucky enough to use MongoDB, it has built-in support for reactive types like Mono or Flux right in the repositories. The same goes for Cassandra and Redis. In the next article, we will learn how to generate some fake data and index it concurrently.


Topics:
java ,spring ,reactor ,elasticsearch ,tutorial ,callbacks

Published at DZone with permission of
