Spring, Reactor, and ElasticSearch: Benchmarking With Fake Test Data

With your ElasticSearch-to-Reactor adapter set up, now it's time to benchmark your indexing code with some help from jFairy.

In the previous article, we created a simple adapter from ElasticSearch's API to Reactor's Mono, which looks like this:

import reactor.core.publisher.Mono;

private Mono<IndexResponse> indexDoc(Doc doc) {
    // body as defined in the previous article
}

Now we would like to run this method millions of times at a controlled concurrency level. In other words, we want to benchmark our indexing code and see how it behaves under load.

Fake Data With jFairy

First, we need some good-looking test data. For that purpose, we'll use the handy jFairy library. The document we'll index is a simple POJO:

class Doc {
    private final String username;
    private final String json;
    // constructor and getters omitted
}

The generation logic is wrapped inside a Java class:

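import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableMap;
import io.codearte.jfairy.Fairy;
import io.codearte.jfairy.producer.person.Address;
import io.codearte.jfairy.producer.person.Person;
import org.apache.commons.lang3.RandomUtils;

import java.util.Arrays;

class PersonGenerator {

    private final ObjectMapper objectMapper;
    private final Fairy fairy;

    // writeValueAsString() declares the checked JsonProcessingException
    private Doc generate() throws JsonProcessingException {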
        Person person = fairy.person();
        final String username = person.getUsername() + RandomUtils.nextInt(1_000_000, 9_000_000);
        final ImmutableMap<String, Object> map = ImmutableMap.<String, Object>builder()
                .put("address", toMap(person.getAddress()))
                .put("firstName", person.getFirstName())
                .put("middleName", person.getMiddleName())
                .put("lastName", person.getLastName())
                .put("email", person.getEmail())
                .put("companyEmail", person.getCompanyEmail())
                .put("username", username)
                .put("password", person.getPassword())
                .put("sex", person.getSex())
                .put("telephoneNumber", person.getTelephoneNumber())
                .put("dateOfBirth", person.getDateOfBirth())
                .put("company", person.getCompany())
                .put("nationalIdentityCardNumber", person.getNationalIdentityCardNumber())
                .put("nationalIdentificationNumber", person.getNationalIdentificationNumber())
                .put("passportNumber", person.getPassportNumber())
                .build();
        final String json = objectMapper.writeValueAsString(map);
        return new Doc(username, json);
    }

    private ImmutableMap<String, Object> toMap(Address address) {
        return ImmutableMap.<String, Object>builder()
                .put("street", address.getStreet())
                .put("streetNumber", address.getStreetNumber())
                .put("apartmentNumber", address.getApartmentNumber())
                .put("postalCode", address.getPostalCode())
                .put("city", address.getCity())
                .put("lines", Arrays.asList(address.getAddressLine1(), address.getAddressLine2()))
                .build();
    }
}


Quite a bit of boring code that actually does something cool. Every time we run it, it generates random but reasonable JSON like this (abridged):

{
    "address": {
        "street": "Ford Street",
        "city": "San Francisco",
        "lines": ["32 Ford Street", "San Francisco 63913"]
    },
    "company": {"name": "Woods LLC"}
}

Neat! Unfortunately, it's not documented whether jFairy is thread-safe, so just in case, I'm using ThreadLocal. OK, so we have one document, but we need millions! Using a for-loop is so old-fashioned. How about an infinite stream of random people instead?

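import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;

private final Scheduler scheduler = Schedulers.newParallel(PersonGenerator.class.getSimpleName());

// Defers the blocking generate() call and runs it on the parallel scheduler.
Mono<Doc> generateOne() {
    return Mono
            .fromCallable(this::generate)
            .subscribeOn(scheduler);
}

// Re-subscribes forever, producing an endless stream of documents.
Flux<Doc> infinite() {
    return generateOne().repeat();
}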

generateOne() wraps the blocking generate() method in a Mono<Doc>. Additionally, generate() is run on the parallel Scheduler.

Why? It turned out that jFairy wasn't fast enough on a single core (lots of random number generation, table lookups, etc.), so I had to parallelize data generation. That shouldn't normally be an issue, but when generating fake data is slower than your reactive application that touches an external server, that tells you something about the performance of Netty-based Spring WebFlux.
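Since generate() now runs on several threads, the ThreadLocal precaution mentioned earlier becomes relevant. Here is a minimal, JDK-only sketch of that pattern. UnsafeGenerator is a hypothetical stand-in for Fairy, and a plain fixed thread pool stands in for Reactor's parallel scheduler; none of these names come from the actual project.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class PerThreadGeneratorDemo {

    // Hypothetical stand-in for a generator whose thread safety is unknown.
    static class UnsafeGenerator {
        private int counter = 0; // unsynchronized mutable state

        String next() {
            counter++;
            return Thread.currentThread().getName() + "-" + counter;
        }
    }

    // Each thread lazily creates its own generator on first access,
    // so no generator instance is ever shared between threads.
    static final ThreadLocal<UnsafeGenerator> GENERATOR =
            ThreadLocal.withInitial(UnsafeGenerator::new);

    // Generates `count` values on `threads` worker threads and returns them all.
    static List<String> generate(int count, int threads) {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        try {
            List<Future<String>> futures = new ArrayList<>();
            for (int i = 0; i < count; i++) {
                futures.add(pool.submit(() -> GENERATOR.get().next()));
            }
            List<String> results = new ArrayList<>();
            for (Future<String> future : futures) {
                results.add(future.get());
            }
            return results;
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        List<String> values = generate(1_000, 4);
        System.out.println("Generated " + values.size() + " values");
    }
}
```

In the real generator, the same idea would presumably mean one Fairy instance per thread held in a ThreadLocal, though the exact wiring in the project may differ.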

Calling ElasticSearch Concurrently

All right, having an infinite stream of good-looking fake test data, we now want to index it in ElasticSearch.

void startIndexing() {
    index(1_000_000, 1_000);
}

private void index(int count, int maxConcurrency) {
    // personGenerator: the PersonGenerator shown earlier (field name assumed)
    personGenerator
            .infinite()
            .take(count)
            .flatMap(this::indexDocSwallowErrors, maxConcurrency)
            .window(Duration.ofSeconds(1))
            .flatMap(Flux::count)
            .subscribe(winSize -> log.debug("Got {} responses in last second", winSize));
}

private Mono<IndexResponse> indexDocSwallowErrors(Doc doc) {
    return indexDoc(doc)
            .doOnError(e -> log.error("Unable to index {}", doc, e))
            .onErrorResume(e -> Mono.empty());
}

When the application starts, it initiates indexing of 1 million documents. Notice how easy it is to tell Reactor (same for RxJava) that it should invoke up to one thousand concurrent requests to ElasticSearch. Once every second, we count how many responses we received:

Got 2925 responses in last second
Got 2415 responses in last second
Got 3336 responses in last second
Got 2199 responses in last second
Got 1861 responses in last second

Not bad! Especially when you consider that there are up to one thousand concurrent HTTP requests and our application peaked at barely 30 threads! All right, it's localhost <-> localhost, guilty! But how do we actually know all of that? Logging is fine, but it's the 21st century. We can do better! Monitoring will be the subject of the next installment.
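To make the maxConcurrency argument more concrete, here is a rough JDK-only sketch of the same idea: never more than a fixed number of requests in flight. It is not how Reactor implements flatMap(), since Reactor does not park a thread per request. The Semaphore, the thread pool, and the sleeping fake request are all assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;

class BoundedConcurrencyDemo {

    // Runs `count` simulated requests with at most `maxConcurrency` in flight
    // and returns the highest number of simultaneously running requests observed.
    static int run(int count, int maxConcurrency) {
        Semaphore permits = new Semaphore(maxConcurrency);
        AtomicInteger inFlight = new AtomicInteger();
        AtomicInteger peak = new AtomicInteger();
        ExecutorService pool = Executors.newCachedThreadPool();
        List<CompletableFuture<Void>> pending = new ArrayList<>();
        try {
            for (int i = 0; i < count; i++) {
                permits.acquireUninterruptibly(); // wait until a slot is free
                pending.add(CompletableFuture.runAsync(() -> {
                    int now = inFlight.incrementAndGet();
                    peak.accumulateAndGet(now, Math::max);
                    try {
                        Thread.sleep(2); // hypothetical stand-in for an indexing call
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    } finally {
                        inFlight.decrementAndGet();
                        permits.release(); // free the slot for the next request
                    }
                }, pool));
            }
            // Wait for every simulated request to finish before reporting.
            CompletableFuture.allOf(pending.toArray(new CompletableFuture[0])).join();
            return peak.get();
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("Peak in-flight requests: " + run(200, 8));
    }
}
```

The reported peak never exceeds the limit passed in, which is the guarantee the second argument of flatMap() gives us for calls to ElasticSearch.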

The source code is available at github.com/nurkiewicz/elastic-flux in the reactive-elastic-search branch.

