Spring Boot 2: Fluxes, From Elasticsearch to Controller

Jump into reactive, event-driven programming by seeing how the new tools in Spring Boot 2 can help expose reactive APIs via RESTful interfaces.

This is the final piece of the puzzle in our series on exposing reactive APIs via RESTful interfaces. Previously, we seeded Elasticsearch with some fake sample data. Now it's time to expose the indexing functionality through an API. Let's start with a simple adapter to our indexing engine:

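import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.RequiredArgsConstructor;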
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.xcontent.XContentType;
import org.springframework.stereotype.Component;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoSink;

@Component
@RequiredArgsConstructor
class ElasticAdapter {

    private final RestHighLevelClient client;
    private final ObjectMapper objectMapper;

    Mono<IndexResponse> index(Person doc) {
        return indexDoc(doc);
    }

    private Mono<IndexResponse> indexDoc(Person doc) {
        return Mono.create(sink -> {
            try {
                doIndex(doc, listenerToSink(sink));
            } catch (JsonProcessingException e) {
                sink.error(e);
            }
        });
    }

    private void doIndex(Person doc, ActionListener<IndexResponse> listener) throws JsonProcessingException {
        final IndexRequest indexRequest = new IndexRequest("people", "person", doc.getUsername());
        final String json = objectMapper.writeValueAsString(doc);
        indexRequest.source(json, XContentType.JSON);
        client.indexAsync(indexRequest, listener);
    }

    private <T> ActionListener<T> listenerToSink(MonoSink<T> sink) {
        return new ActionListener<T>() {
            @Override
            public void onResponse(T response) {
                sink.success(response);
            }

            @Override
            public void onFailure(Exception e) {
                sink.error(e);
            }
        };
    }

}
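
The listenerToSink() helper is the piece that bridges Elasticsearch's callback style to Reactor. As a minimal sketch of the same idea without either library on the classpath, the following uses the JDK's CompletableFuture in place of MonoSink. The Listener interface and fakeIndexAsync() are hypothetical stand-ins invented for this illustration, not Elasticsearch APIs.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical callback interface mirroring the two methods of ActionListener.
interface Listener<T> {
    void onResponse(T response);
    void onFailure(Exception e);
}

class ListenerBridge {

    // Adapts a future to a listener: whichever callback fires completes the future.
    static <T> Listener<T> listenerToFuture(CompletableFuture<T> future) {
        return new Listener<T>() {
            @Override
            public void onResponse(T response) {
                future.complete(response);
            }

            @Override
            public void onFailure(Exception e) {
                future.completeExceptionally(e);
            }
        };
    }

    // Hypothetical stand-in for client.indexAsync(): reports the outcome through the listener.
    static void fakeIndexAsync(String id, Listener<String> listener) {
        if (id.isEmpty()) {
            listener.onFailure(new IllegalArgumentException("id must not be empty"));
        } else {
            listener.onResponse("indexed:" + id);
        }
    }

    // Same shape as indexDoc(): start the call and hand back something the caller can chain on.
    static CompletableFuture<String> index(String id) {
        CompletableFuture<String> future = new CompletableFuture<>();
        fakeIndexAsync(id, listenerToFuture(future));
        return future;
    }

    public static void main(String[] args) {
        System.out.println(index("jdoe").join());                 // prints "indexed:jdoe"
        System.out.println(index("").isCompletedExceptionally()); // prints "true"
    }
}
```

One difference worth keeping in mind: Mono.create() is lazy and runs its callback on each subscription, whereas this CompletableFuture version starts the call as soon as index() is invoked.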


The index() method takes a strongly typed Person object and sends it to Elasticsearch. The doIndex() method makes the actual call, marshalling the Person to JSON. Because the Elasticsearch client reports its result through an ActionListener<IndexResponse> callback, indexDoc() wraps the call in Mono.create() and uses the listenerToSink() helper to turn that callback into a Mono<IndexResponse>. In the full version, index() also instruments the result. A sequence of compose() calls is an elegant way to apply a series of metrics:

return indexDoc(doc)
        .compose(this::countSuccFail)
        .compose(this::countConcurrent)
        .compose(this::measureTime)
        .doOnError(e -> log.error("Unable to index {}", doc, e));


These methods are defined as follows:

private final Timer indexTimer = Metrics.timer("es.timer");
private final LongAdder concurrent = Metrics.gauge("es.concurrent", new LongAdder());
private final Counter successes = Metrics.counter("es.index", "result", "success");
private final Counter failures = Metrics.counter("es.index", "result", "failure");

private Mono<IndexResponse> countSuccFail(Mono<IndexResponse> mono) {
    return mono
            .doOnError(e -> failures.increment())
            .doOnSuccess(response -> successes.increment());
}

private Mono<IndexResponse> countConcurrent(Mono<IndexResponse> mono) {
    return mono
            .doOnSubscribe(s -> concurrent.increment())
            .doOnTerminate(concurrent::decrement);
}

private Mono<IndexResponse> measureTime(Mono<IndexResponse> mono) {
    return Mono
            .fromCallable(System::currentTimeMillis)
            .flatMap(time ->
                    mono.doOnSuccess(response ->
                            indexTimer.record(System.currentTimeMillis() - time, TimeUnit.MILLISECONDS))
            );
}
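
To experiment with the decorator idea without Reactor or Micrometer, here is a rough JDK-only sketch. It assumes a deferred operation can be modelled as a Supplier of CompletableFuture (calling get() plays the role of subscribing), and it swaps the Micrometer meters for plain LongAdder fields. AsyncOp, its compose() method, and MetricsSketch are names made up for this example.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.LongAdder;
import java.util.function.Supplier;
import java.util.function.UnaryOperator;

// A deferred asynchronous call: nothing runs until get() is invoked,
// which loosely plays the role of subscribing to a Mono.
interface AsyncOp extends Supplier<CompletableFuture<String>> {

    // Mirrors the shape of compose(): hand the operation to a transformer, get a decorated one back.
    default AsyncOp compose(UnaryOperator<AsyncOp> transformer) {
        return transformer.apply(this);
    }
}

class MetricsSketch {

    // Plain counters standing in for the Micrometer Counter, gauge and Timer.
    final LongAdder successes = new LongAdder();
    final LongAdder failures = new LongAdder();
    final LongAdder concurrent = new LongAdder();
    final LongAdder timedCalls = new LongAdder();
    final LongAdder totalNanos = new LongAdder();

    AsyncOp countSuccFail(AsyncOp op) {
        return () -> op.get().whenComplete((result, error) -> {
            if (error == null) {
                successes.increment();
            } else {
                failures.increment();
            }
        });
    }

    AsyncOp countConcurrent(AsyncOp op) {
        return () -> {
            concurrent.increment(); // call started
            return op.get().whenComplete((result, error) -> concurrent.decrement());
        };
    }

    AsyncOp measureTime(AsyncOp op) {
        return () -> {
            long start = System.nanoTime();
            return op.get().whenComplete((result, error) -> {
                if (error == null) { // like the original, only successful calls are timed
                    timedCalls.increment();
                    totalNanos.add(System.nanoTime() - start);
                }
            });
        };
    }

    // Flat chain of decorators, in the same order as the compose() calls in index().
    AsyncOp instrument(AsyncOp op) {
        return op
                .compose(this::countSuccFail)
                .compose(this::countConcurrent)
                .compose(this::measureTime);
    }

    public static void main(String[] args) {
        MetricsSketch metrics = new MetricsSketch();
        AsyncOp indexDoc = () -> CompletableFuture.completedFuture("created");
        metrics.instrument(indexDoc).get().join();
        System.out.println(metrics.successes.sum() + " success, "
                + metrics.concurrent.sum() + " in flight");
    }
}
```

Each decorator takes an operation and returns an operation of the same type, which is what lets them be stacked in any order.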


We could technically apply these metrics without the compose() operator like this:

measureTime(
        countConcurrent(
                countSuccFail(
                        indexDoc(doc)
                )
        )
)


But having a flat sequence of Mono<T> -> Mono<T> transformers seems much easier to read. Anyway, this was the write side, so let's implement the read side.

Mono<Person> findByUserName(String userName) {
    return Mono
            .<GetResponse>create(sink ->
                    client.getAsync(new GetRequest("people", "person", userName), listenerToSink(sink))
            )
            .filter(GetResponse::isExists)
            .map(GetResponse::getSource)
            .map(map -> objectMapper.convertValue(map, Person.class));
}


The procedure is pretty much the same:

  • Make an Elasticsearch request
  • Adapt it to Mono<GetResponse>
  • Verify the result and unmarshal it from a Map to a Person object
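
The filter-then-map shape of these steps can be tried out with the JDK's Optional, which is empty for a missing document in much the same way the Mono completes empty. This is only a sketch under stated assumptions: FakeGetResponse is a hypothetical stand-in for GetResponse, Person is reduced to a single username field, and the Jackson conversion is replaced by a hand-written one.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

class ReadSideSketch {

    // Hypothetical stand-in for the parts of GetResponse that findByUserName() touches.
    static final class FakeGetResponse {
        private final Map<String, Object> source; // null when the document does not exist

        FakeGetResponse(Map<String, Object> source) {
            this.source = source;
        }

        boolean isExists() {
            return source != null;
        }

        Map<String, Object> getSource() {
            return source;
        }
    }

    // Minimal Person with only the field this sketch needs (an assumption, not the article's class).
    static final class Person {
        final String username;

        Person(String username) {
            this.username = username;
        }
    }

    // Same three steps as the Mono pipeline: drop missing documents, take the source map, convert it.
    static Optional<Person> toPerson(FakeGetResponse response) {
        return Optional.of(response)
                .filter(FakeGetResponse::isExists)
                .map(FakeGetResponse::getSource)
                .map(map -> new Person((String) map.get("username")));
    }

    public static void main(String[] args) {
        Map<String, Object> source = new HashMap<>();
        source.put("username", "jdoe");
        System.out.println(toPerson(new FakeGetResponse(source)).isPresent()); // prints "true"
        System.out.println(toPerson(new FakeGetResponse(null)).isPresent());   // prints "false"
    }
}
```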

Interestingly, Jackson's ObjectMapper can also convert from Maps, not only from JSON strings. With this layer in place, we can use it directly in our brand new controller:

import com.google.common.collect.ImmutableMap;
import lombok.RequiredArgsConstructor;
import org.elasticsearch.action.index.IndexResponse;
import org.springframework.http.*;
import org.springframework.web.bind.annotation.*;
import reactor.core.publisher.Mono;

import javax.validation.Valid;
import java.util.Map;

@RequiredArgsConstructor
@RestController
@RequestMapping("/person")
class PersonController {

    private static final Mono<ResponseEntity<Person>> NOT_FOUND = 
            Mono.just(ResponseEntity.notFound().build());

    private final ElasticAdapter elasticAdapter;

    @GetMapping("/{userName}")
    Mono<ResponseEntity<Person>> get(@PathVariable("userName") String userName) {
        return elasticAdapter
                .findByUserName(userName)
                .map(ResponseEntity::ok)
                .switchIfEmpty(NOT_FOUND);
    }

    @PutMapping
    Mono<ResponseEntity<Map<String, Object>>> put(@Valid @RequestBody Person person) {
        return elasticAdapter
                .index(person)
                .map(this::toMap)
                .map(m -> ResponseEntity.status(HttpStatus.CREATED).body(m));
    }

    private ImmutableMap<String, Object> toMap(IndexResponse response) {
        return ImmutableMap
                .<String, Object>builder()
                .put("id", response.getId())
                .put("index", response.getIndex())
                .put("type", response.getType())
                .put("version", response.getVersion())
                .put("result", response.getResult().getLowercase())
                .put("seqNo", response.getSeqNo())
                .put("primaryTerm", response.getPrimaryTerm())
                .build();
    }

}


The get() method tries to find a document in Elasticsearch by userName. Newcomers to RxJava or Reactor are often eager to call subscribe() or block*(). Interestingly, neither is needed in Spring WebFlux. You create a bunch of Monos or Fluxes, pass them through a series of transformations, and return them from your controller. It just works.
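
The "describe the pipeline and return it" style is not specific to Reactor. As a loose JDK-only analogy, assuming a CompletableFuture of Optional in place of Mono and a hypothetical Response class in place of ResponseEntity, the handler below maps a hit to 200 and a miss to 404 without ever blocking. Only the caller at the very edge (here main(), in WebFlux the framework) waits for the result.

```java
import java.util.Optional;
import java.util.concurrent.CompletableFuture;

class ControllerSketch {

    // Hypothetical stand-in for ResponseEntity: just a status code and an optional body.
    static final class Response {
        final int status;
        final String body;

        Response(int status, String body) {
            this.status = status;
            this.body = body;
        }
    }

    static final Response NOT_FOUND = new Response(404, null);

    // Hypothetical in-memory lookup; only the user "jdoe" exists.
    static Optional<String> lookup(String userName) {
        return "jdoe".equals(userName) ? Optional.of("John Doe") : Optional.empty();
    }

    // Stands in for elasticAdapter.findByUserName(): the lookup runs on another thread.
    static CompletableFuture<Optional<String>> findByUserName(String userName) {
        return CompletableFuture.supplyAsync(() -> lookup(userName));
    }

    // The handler only describes the transformation and returns the pipeline; it never blocks.
    static CompletableFuture<Response> get(String userName) {
        return findByUserName(userName)
                .thenApply(found -> found
                        .map(name -> new Response(200, name))
                        .orElse(NOT_FOUND));
    }

    public static void main(String[] args) {
        // join() appears only here, at the outermost edge of the program.
        System.out.println(get("jdoe").join().status);   // prints "200"
        System.out.println(get("nobody").join().status); // prints "404"
    }
}
```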

The put() method is equally simple. For debugging purposes, I convert the IndexResponse to a map in the toMap() method so that it is returned as JSON, but this isn't necessary. As you can see, building reactive applications in Spring WebFlux is quite simple. We no longer need any adapting layers or blocking code. Everything is fully asynchronous and event-driven. Moreover, in this setup (see the source code), there are no servlets or Jetty/Tomcat on the CLASSPATH!

Spring has built-in reactive support for some databases, such as MongoDB. In these blog posts, I gave you an overview of how to integrate Reactor with Spring and other databases that provide non-blocking APIs. You can easily adapt the code samples to other sources and data stores.

This is part of a longer series:

Topics:
java ,spring boot 2 ,flux ,elasticsearch ,reactive programming ,tutorial

Published at DZone with permission of
