{{announcement.body}}
{{announcement.title}}

Reactive Elasticsearch With Quarkus

DZone 's Guide to

Reactive Elasticsearch With Quarkus

In this article, we discuss how to connect Quarkus with Elasticsearch with Project Reactor to begin binding Quarkus to Elasticsearch's Java REST Client.

· Big Data Zone ·
Free Resource

I’ve implemented one service with Quarkus as a main framework and Elasticsearch as a data store. During the implementation, I came up with the idea to write an article on how to bind Quarkus in a reactive manner with Elasticsearch's Java High Level REST Client.

I started to make notes about the article and separated the common Elasticsearch related code in the common library (otaibe-commons-quarkus-elasticsearch module) stored in Github. Then, it took me a few hours to assemble a simple example project (also in Github) in a way as in the Quarkus Guides page. For the moment, an Elasticsearch guide is missing there.

Let's continue with a more detailed explanation of how to connect Quarkus with Elasticsearch.

Creating a Quarkus project

Shell




xxxxxxxxxx
1


1
mvn io.quarkus:quarkus-maven-plugin:1.0.1.Final:create \
2
    -DprojectGroupId=org.otaibe.quarkus.elasticsearch.example \
3
    -DprojectArtifactId=otaibe-quarkus-elasticsearch-example \
4
    -DclassName="org.otaibe.quarkus.elasticsearch.example.web.controller.FruitResource" \
5
    -Dpath="/fruits" \
6
    -Dextensions="resteasy-jackson,elasticsearch-rest-client"



You amy also like: Build a Java REST API With Quarkus.

Maven Settings

As you can see an elasticsearch-rest-client is present in Quarkus; however, this is an Elasticsearch Java Low Level REST Client. If we want to use Elasticsearch Java High Level REST Client, we simply have to add it as a dependency in the pom.xml file:

XML




xxxxxxxxxx
1


1
<dependency>
2
    <groupId>org.elasticsearch.client</groupId>
3
    <artifactId>elasticsearch-rest-high-level-client</artifactId>
4
    <version>7.4.0</version>
5
</dependency>



Please, make sure that the version of Elasticsearch Java Low Level REST Client matches the Elasticsearch Java High Level REST Client.

Since we are using the Elasticsearch in a reactive way,  I prefer to use a Project Reactor. We have to add the BOM in the dependency management section:

XML
 




xxxxxxxxxx
1


 
1
<dependency>
2
    <groupId>io.projectreactor</groupId>
3
    <artifactId>reactor-bom</artifactId>
4
    <version>Dysprosium-SR2</version>
5
    <type>pom</type>
6
    <scope>import</scope>
7
</dependency>



We also have to add reactor-core as a dependency:

XML
 




xxxxxxxxxx
1


1
<dependency>
2
    <groupId>io.projectreactor</groupId>
3
    <artifactId>reactor-core</artifactId>
4
</dependency>



I’ve separated the common code in a library, so we should add this library to our example project. For this purpose, we will use Jitpack. It is an awesome service. You just have to point the right way to your Github project, and it will build an artifact for it. Here is the way how I am using it:

XML




xxxxxxxxxx
1
15


1
<dependency>
2
    <groupId>com.github.tpenakov.otaibe-commons-quarkus</groupId>
3
    <artifactId>otaibe-commons-quarkus-core</artifactId>
4
    <version>elasticsearch-example.02</version>
5
</dependency>
6
<dependency>
7
    <groupId>com.github.tpenakov.otaibe-commons-quarkus</groupId>
8
    <artifactId>otaibe-commons-quarkus-elasticsearch</artifactId>
9
    <version>elasticsearch-example.02</version>
10
</dependency>
11
<dependency>
12
    <groupId>com.github.tpenakov.otaibe-commons-quarkus</groupId>
13
    <artifactId>otaibe-commons-quarkus-rest</artifactId>
14
    <version>elasticsearch-example.02</version>
15
</dependency>



Start Elasticsearch Through Docker

Also, we should have Elastisearch started. The easiest way to do this is to run it through Docker:

Shell




xxxxxxxxxx
1


1
docker run -it --rm=true --name elasticsearch_quarkus_test \
2
    -p 11027:9200 -p 11028:9300 \
3
    -e "discovery.type=single-node" \
4
    docker.elastic.co/elasticsearch/elasticsearch:7.4.0



Connecting to Elasticsearch

Let's start with connecting our service to Elasticsearch — the implementation in the example project is simple — so it will listen to the Quarkus startup and shutdown events and init or terminate the connections:

Java
 




xxxxxxxxxx
1
30


1
package org.otaibe.quarkus.elasticsearch.example.service;
2
 
          
3
import io.quarkus.runtime.ShutdownEvent;
4
import io.quarkus.runtime.StartupEvent;
5
import lombok.Getter;
6
import lombok.Setter;
7
import lombok.extern.slf4j.Slf4j;
8
import org.otaibe.commons.quarkus.elasticsearch.client.service.AbstractElasticsearchService;
9
 
          
10
import javax.enterprise.context.ApplicationScoped;
11
import javax.enterprise.event.Observes;
12
 
          
13
@ApplicationScoped
14
@Getter
15
@Setter
16
@Slf4j
17
public class ElasticsearchService extends AbstractElasticsearchService {
18
 
          
19
    public void init(@Observes StartupEvent event) {
20
        log.info("init started");
21
        super.init();
22
        log.info("init completed");
23
    }
24
 
          
25
    public void shutdown(@Observes ShutdownEvent event) {
26
        log.info("shutdown started");
27
        super.shutdown();
28
        log.info("shutdown completed");
29
    }
30
}



The actual job of connecting to the Elasticsearch is done in the AbstractElasticsearchService:

Java
 




xxxxxxxxxx
1
31


 
1
public abstract class AbstractElasticsearchService {
2
    @ConfigProperty(name = "service.elastic-search.hosts")
3
    String[] hosts;
4
    @ConfigProperty(name = "service.elastic-search.num-threads", defaultValue = "10")
5
    Optional<Integer> numThreads;
6
 
          
7
    private RestHighLevelClient restClient;
8
    private Sniffer sniffer;
9
 
          
10
    @PostConstruct
11
    public void init() {
12
        log.info("init started");
13
        List<HttpHost> httpHosts = Arrays.stream(hosts)
14
                .map(s -> StringUtils.split(s, ':'))
15
                .map(strings -> new HttpHost(strings[0], Integer.valueOf(strings[1])))
16
                .collect(Collectors.toList());
17
        RestClientBuilder builder = RestClient.builder(httpHosts.toArray(new HttpHost[httpHosts.size()]));
18
        getNumThreads().ifPresent(integer ->
19
                builder.setHttpClientConfigCallback(httpClientBuilder -> httpClientBuilder.setDefaultIOReactorConfig(
20
                        IOReactorConfig
21
                                .custom()
22
                                .setIoThreadCount(integer)
23
                                .build())
24
                ));
25
 
          
26
        restClient = new RestHighLevelClient(builder);
27
        sniffer = Sniffer.builder(getRestClient().getLowLevelClient()).build();
28
        log.info("init completed");
29
    }
30
 
          
31
}



As you can see, the connection here is done in the way suggested in the Elasticsearch documentation. My implementation, it depends on two config properties:

Properties files




xxxxxxxxxx
1


 
1
service.elastic-search.hosts=localhost:11027



This is the Elasticsearch connection string after starting it from Docker.

The second optional property is:

Properties files




xxxxxxxxxx
1


1
service.elastic-search.num-threads



This is the number of threads needed for the Client.

Creating POJO

Now, let’s create our domain object (Fruit):

Java




xxxxxxxxxx
1
29


1
package org.otaibe.quarkus.elasticsearch.example.domain;
2
 
          
3
import com.fasterxml.jackson.annotation.JsonProperty;
4
import lombok.AllArgsConstructor;
5
import lombok.Data;
6
import lombok.NoArgsConstructor;
7
 
          
8
@Data
9
@NoArgsConstructor
10
@AllArgsConstructor(staticName = "of")
11
public class Fruit {
12
 
          
13
    public static final String ID = "id";
14
    public static final String EXT_REF_ID = "ext_ref_id";
15
    public static final String NAME = "name";
16
    public static final String DESCRIPTION = "description";
17
    public static final String VERSION = "version";
18
 
          
19
    @JsonProperty(ID)
20
    public String id;
21
    @JsonProperty(EXT_REF_ID)
22
    public String extRefId;
23
    @JsonProperty(NAME)
24
    public String name;
25
    @JsonProperty(DESCRIPTION)
26
    public String description;
27
    @JsonProperty(VERSION)
28
    public Long version;
29
}



Creating and Implementing DAO

Creating the Index

Let's create FruitDaoImpl. It is a high-level class built to fill in the AbstractElasticsearchReactiveDaoImplementation and implement the required business logic. The other important part here is to create an index for the Fruit class:

Java
 




xxxxxxxxxx
1
15


 
1
@Override
2
protected Mono<Boolean> createIndex() {
3
    CreateIndexRequest request = new CreateIndexRequest(getTableName());
4
    Map<String, Object> mapping = new HashMap();
5
    Map<String, Object> propsMapping = new HashMap<>();
6
    propsMapping.put(Fruit.ID, getKeywordTextAnalizer());
7
    propsMapping.put(Fruit.EXT_REF_ID, getKeywordTextAnalizer());
8
    propsMapping.put(Fruit.NAME, getTextAnalizer(ENGLISH));
9
    propsMapping.put(Fruit.DESCRIPTION, getTextAnalizer(ENGLISH));
10
    propsMapping.put(Fruit.VERSION, getLongFieldType());
11
    mapping.put(PROPERTIES, propsMapping);
12
    request.mapping(mapping);
13
 
          
14
    return createIndex(request);
15
}



The real create index call to the Elasticsearch is implemented in the parent class (AbstractElasticsearchReactiveDaoImplementation):

Java




xxxxxxxxxx
1
17


 
1
protected Mono<Boolean> createIndex(CreateIndexRequest request) {
2
    return Flux.<Boolean>create(fluxSink -> getRestClient().indices().createAsync(request, RequestOptions.DEFAULT, new ActionListener<CreateIndexResponse>() {
3
        @Override
4
        public void onResponse(CreateIndexResponse createIndexResponse) {
5
            log.info("CreateIndexResponse: {}", createIndexResponse);
6
            fluxSink.next(createIndexResponse.isAcknowledged());
7
            fluxSink.complete();
8
        }
9
 
          
10
        @Override
11
        public void onFailure(Exception e) {
12
            log.error("unable to create index", e);
13
            fluxSink.error(new RuntimeException(e));
14
        }
15
    }))
16
            .next();
17
}



Playing With the DAO

Most of the CRUD operations are implemented in the AbstractElasticsearchReactiveDaoImplementation.

It has  save,  update,  findById, and  deleteById  public methods. It also has findByExactMatch and findByMatch protected methods. The FindBy* methods are very helpful in the descendant classes when the business logic needs tobe filled in.

The business find methods are implemented in the FruitDaoImpl class:

Java




xxxxxxxxxx
1
18


 
1
public Flux<Fruit> findByExternalRefId(String value) {
2
    return findByMatch(Fruit.EXT_REF_ID, value);
3
}
4
 
          
5
public Flux<Fruit> findByName(String value) {
6
    return findByMatch(Fruit.NAME, value);
7
}
8
 
          
9
public Flux<Fruit> findByDescription(String value) {
10
    return findByMatch(Fruit.NAME, value);
11
}
12
 
          
13
public Flux<Fruit> findByNameOrDescription(String value) {
14
    Map<String, Object> query = new HashMap<>();
15
    query.put(Fruit.NAME, value);
16
    query.put(Fruit.DESCRIPTION, value);
17
    return findByMatch(query);
18
}



Encapsulating DAO in the Service Class

FruitDaoImpl is encapsulated in the FruitService:

Java




xxxxxxxxxx
1
46


 
1
@ApplicationScoped
2
@Getter
3
@Setter
4
@Slf4j
5
public class FruitService {
6
 
          
7
    @Inject
8
    FruitDaoImpl dao;
9
 
          
10
    public Mono<Fruit> save(Fruit entity) {
11
        return getDao().save(entity);
12
    }
13
 
          
14
    public Mono<Fruit> findById(Fruit entity) {
15
        return getDao().findById(entity);
16
    }
17
 
          
18
    public Mono<Fruit> findById(String id) {
19
        return Mono.just(Fruit.of(id, null, null, null, null))
20
                .flatMap(entity -> findById(entity))
21
                ;
22
    }
23
 
          
24
    public Flux<Fruit> findByExternalRefId(String value) {
25
        return getDao().findByExternalRefId(value);
26
    }
27
 
          
28
    public Flux<Fruit> findByName(String value) {
29
        return getDao().findByName(value);
30
    }
31
 
          
32
    public Flux<Fruit> findByDescription(String value) {
33
        return getDao().findByDescription(value);
34
    }
35
 
          
36
    public Flux<Fruit> findByNameOrDescription(String value) {
37
        return getDao().findByNameOrDescription(value);
38
    }
39
 
          
40
    public Mono<Boolean> delete(Fruit entity) {
41
        return Mono.just(entity.getId())
42
                .filter(s -> StringUtils.isNotBlank(s))
43
                .flatMap(s -> getDao().deleteById(entity))
44
                .defaultIfEmpty(false);
45
    }
46
}



Testing the FruitService

The FruitServiceTests is written in order to test basic functionality. It is also used to ensure that the Fruit class fields are properly indexed and the full-text search is working as expected:

Java




xxxxxxxxxx
1
26


 
1
@Test
2
public void manageFruitTest() {
3
    Fruit apple = getTestUtils().createApple();
4
 
          
5
    Fruit apple1 = getFruitService().save(apple).block();
6
    Assertions.assertNotNull(apple1.getId());
7
    Assertions.assertTrue(apple1.getVersion() > 0);
8
    log.info("saved result: {}", getJsonUtils().toStringLazy(apple1));
9
 
          
10
    List<Fruit> fruitList = getFruitService().findByExternalRefId(TestUtils.EXT_REF_ID).collectList().block();
11
    Assertions.assertTrue(fruitList.size() > 0);
12
 
          
13
    List<Fruit> fruitList1 = getFruitService().findByNameOrDescription("bulgaria").collectList().block();
14
    Assertions.assertTrue(fruitList1.size() > 0);
15
 
          
16
    //Ensure that the full text search is working - it is 'Apples' in description
17
    List<Fruit> fruitList2 = getFruitService().findByDescription("apple").collectList().block();
18
    Assertions.assertTrue(fruitList2.size() > 0);
19
 
          
20
    //Ensure that the full text search is working - it is 'Apple' in name
21
    List<Fruit> fruitList3 = getFruitService().findByName("apples").collectList().block();
22
    Assertions.assertTrue(fruitList3.size() > 0);
23
 
          
24
    Boolean deleteAppleResult = getFruitService().getDao().deleteById(apple1).block();
25
    Assertions.assertTrue(deleteAppleResult);
26
}



Adding REST Endpoints

Because this is an example project, full CRUD functionality is not added as REST endpoints. Only the save and findById are added as REST endpoints. They are added in FruitResource. The methods there return CompletionStage<Response>, which ensures that there will be no blocked threads in our application.

Testing REST endpoints

FruitResourceTest is added in order to test the RESTendpoints:

Java




xxxxxxxxxx
1
97


 
1
package org.otaibe.quarkus.elasticsearch.example.web.controller;
2
 
          
3
import io.quarkus.test.junit.QuarkusTest;
4
import lombok.AccessLevel;
5
import lombok.Getter;
6
import lombok.extern.slf4j.Slf4j;
7
import org.apache.commons.lang3.StringUtils;
8
import org.eclipse.microprofile.config.inject.ConfigProperty;
9
import org.junit.jupiter.api.Assertions;
10
import org.junit.jupiter.api.Test;
11
import org.otaibe.commons.quarkus.core.utils.JsonUtils;
12
import org.otaibe.quarkus.elasticsearch.example.domain.Fruit;
13
import org.otaibe.quarkus.elasticsearch.example.service.FruitService;
14
import org.otaibe.quarkus.elasticsearch.example.utils.TestUtils;
15
 
          
16
import javax.inject.Inject;
17
import javax.ws.rs.core.HttpHeaders;
18
import javax.ws.rs.core.MediaType;
19
import javax.ws.rs.core.Response;
20
import javax.ws.rs.core.UriBuilder;
21
import java.net.URI;
22
import java.util.Optional;
23
 
          
24
import static io.restassured.RestAssured.given;
25
 
          
26
@QuarkusTest
27
@Getter(value = AccessLevel.PROTECTED)
28
@Slf4j
29
public class FruitResourceTest {
30
 
          
31
    @ConfigProperty(name = "service.http.host")
32
    Optional<URI> host;
33
 
          
34
    @Inject
35
    TestUtils testUtils;
36
    @Inject
37
    JsonUtils jsonUtils;
38
    @Inject
39
    FruitService service;
40
 
          
41
    @Test
42
    public void restEndpointsTest() {
43
        log.info("restEndpointsTest start");
44
        Fruit apple = getTestUtils().createApple();
45
 
          
46
        Fruit savedApple = given()
47
                .when()
48
                .header(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON)
49
                .body(apple)
50
                .post(getUri(FruitResource.ROOT_PATH))
51
                .then()
52
                .statusCode(200)
53
                .extract()
54
                .as(Fruit.class);
55
        String id = savedApple.getId();
56
        Assertions.assertTrue(StringUtils.isNotBlank(id));
57
 
          
58
        URI findByIdPath = UriBuilder.fromPath(FruitResource.ROOT_PATH)
59
                .path(id)
60
                .build();
61
 
          
62
        Fruit foundApple = given()
63
                .when().get(getUri(findByIdPath.getPath()).getPath())
64
                .then()
65
                .statusCode(200)
66
                .extract()
67
                .as(Fruit.class);
68
 
          
69
        Assertions.assertEquals(savedApple, foundApple);
70
 
          
71
        Boolean deleteResult = getService().delete(foundApple).block();
72
        Assertions.assertTrue(deleteResult);
73
 
          
74
        given()
75
                .when().get(findByIdPath.getPath())
76
                .then()
77
                .statusCode(Response.Status.NOT_FOUND.getStatusCode())
78
                ;
79
 
          
80
        log.info("restEndpointsTest end");
81
    }
82
 
          
83
    private URI getUri(String path) {
84
        return getUriBuilder(path)
85
                .build();
86
    }
87
 
          
88
    private UriBuilder getUriBuilder(String path) {
89
        return getHost()
90
                .map(uri -> UriBuilder.fromUri(uri))
91
                .map(uriBuilder -> uriBuilder.path(path))
92
                .orElse(UriBuilder
93
                        .fromPath(path)
94
                );
95
    }
96
 
          
97
}



Building a Native Executable

Before building a native executable, we have to register our Fruit domain object. The reason for this is that our FruitResource returns CompletionStage<Response>, and because of that, the actual return type is unknown for the application, so we have to register it explicitly for reflection. There are at least two methods to do that in Quarkus:

  1. Via @RegisterForReflection annotation.

  2. Via reflection-config.json.

I personally prefer the second method because the classes that you want to register might be in a third-party library, and it would be impossible to put the @RegisterForReflection there.

Now, the reflection-config.json looks like this:

JSON
 




xxxxxxxxxx
1
11


1
[
2
  {
3
    "name" : "org.otaibe.quarkus.elasticsearch.example.domain.Fruit",
4
    "allDeclaredConstructors" : true,
5
    "allPublicConstructors" : true,
6
    "allDeclaredMethods" : true,
7
    "allPublicMethods" : true,
8
    "allDeclaredFields" : true,
9
    "allPublicFields" : true
10
  }
11
]



The next step is to make Quarkus aware of the reflection-config.json file. You should add this line in to the native profile in your pom.xml file:

XML
 




xxxxxxxxxx
1


1
<quarkus.native.additional-build-args>-H:ReflectionConfigurationFiles=${project.basedir}/src/main/resources/reflection-config.json</quarkus.native.additional-build-args>



You can now build your native application:

Shell
 




xxxxxxxxxx
1


1
mvn clean package -Pnative



And start it:

Shell
 




xxxxxxxxxx
1


1
./target/otaibe-quarkus-elasticsearch-example-1.0-SNAPSHOT-runner



The service will be available on http://localhost:11025 because that is the port is explicitly specified in application.properties.

Properties files




xxxxxxxxxx
1


1
quarkus.http.port=11025



Testing the Native Build

The FruitResourceTest expects the following optional property:

Properties files
 




xxxxxxxxxx
1


1
service.http.host



If it is present, the test requests will hit the specified host. If you start the native executable:

Shell
 




xxxxxxxxxx
1


1
./target/otaibe-quarkus-elasticsearch-example-1.0-SNAPSHOT-runner



and execute the tests/build with the following code:

Shell
 




xxxxxxxxxx
1


1
mvn package -D%test.service.http.host=http://localhost:11025



the tests will run against the native build.

Conclusion

I was pleasantly surprised that Elasticsearch works out of the box with Quarkus and can be compiled to native code, which combined with reactive implementation via Project Reactor, will make the footprint of the application almost insignificant.

 

Further Reading

Topics:
java ,quarkus ,elasticseach ,database ,reactor core ,big data ,tutorial

Opinions expressed by DZone contributors are their own.

{{ parent.title || parent.header.title}}

{{ parent.tldr }}

{{ parent.urlSource.name }}