Reactive Elasticsearch With Quarkus
I’ve implemented one service with Quarkus as the main framework and Elasticsearch as a data store. During the implementation, I came up with the idea to write an article on how to bind Quarkus in a reactive manner with the Elasticsearch Java High Level REST Client.
I started to make notes for the article and separated the common Elasticsearch-related code into a common library (the otaibe-commons-quarkus-elasticsearch module) stored on GitHub. Then, it took me a few hours to assemble a simple example project (also on GitHub) in the style of the Quarkus Guides page. For the moment, an Elasticsearch guide is missing there.
Let's continue with a more detailed explanation of how to connect Quarkus with Elasticsearch.
Creating a Quarkus Project
mvn io.quarkus:quarkus-maven-plugin:1.0.1.Final:create \
-DprojectGroupId=org.otaibe.quarkus.elasticsearch.example \
-DprojectArtifactId=otaibe-quarkus-elasticsearch-example \
-DclassName="org.otaibe.quarkus.elasticsearch.example.web.controller.FruitResource" \
-Dpath="/fruits" \
-Dextensions="resteasy-jackson,elasticsearch-rest-client"
You may also like: Build a Java REST API With Quarkus.
Maven Settings
As you can see, an elasticsearch-rest-client extension is present in Quarkus; however, this is the Elasticsearch Java Low Level REST Client. If we want to use the Elasticsearch Java High Level REST Client, we simply have to add it as a dependency in the pom.xml file:
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>elasticsearch-rest-high-level-client</artifactId>
<version>7.4.0</version>
</dependency>
Please make sure that the version of the Elasticsearch Java Low Level REST Client matches the version of the Elasticsearch Java High Level REST Client.
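If the two drift apart (for example, if your Quarkus BOM manages a different low-level client version), you can pin the low-level client explicitly. A minimal sketch for the versions used here:
<dependency>
    <groupId>org.elasticsearch.client</groupId>
    <artifactId>elasticsearch-rest-client</artifactId>
    <version>7.4.0</version>
</dependency>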
Since we are using Elasticsearch in a reactive way, I prefer to use Project Reactor. We have to add its BOM in the dependency management section:
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-bom</artifactId>
<version>Dysprosium-SR2</version>
<type>pom</type>
<scope>import</scope>
</dependency>
We also have to add reactor-core as a dependency:
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-core</artifactId>
</dependency>
I’ve separated the common code into a library, so we should add this library to our example project. For this purpose, we will use JitPack. It is an awesome service: you just point it at your GitHub project, and it will build an artifact for it. Here is how I am using it:
<dependency>
<groupId>com.github.tpenakov.otaibe-commons-quarkus</groupId>
<artifactId>otaibe-commons-quarkus-core</artifactId>
<version>elasticsearch-example.02</version>
</dependency>
<dependency>
<groupId>com.github.tpenakov.otaibe-commons-quarkus</groupId>
<artifactId>otaibe-commons-quarkus-elasticsearch</artifactId>
<version>elasticsearch-example.02</version>
</dependency>
<dependency>
<groupId>com.github.tpenakov.otaibe-commons-quarkus</groupId>
<artifactId>otaibe-commons-quarkus-rest</artifactId>
<version>elasticsearch-example.02</version>
</dependency>
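For these dependencies to resolve, the pom.xml also needs the JitPack repository declared. This is standard JitPack usage rather than something specific to the example:
<repositories>
    <repository>
        <id>jitpack.io</id>
        <url>https://jitpack.io</url>
    </repository>
</repositories>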
Start Elasticsearch Through Docker
We should also have Elasticsearch started. The easiest way to do this is to run it through Docker:
docker run -it --rm=true --name elasticsearch_quarkus_test \
-p 11027:9200 -p 11028:9300 \
-e "discovery.type=single-node" \
docker.elastic.co/elasticsearch/elasticsearch:7.4.0
Connecting to Elasticsearch
Let's start by connecting our service to Elasticsearch. The implementation in the example project is simple: it listens to the Quarkus startup and shutdown events and initializes or terminates the connections accordingly:
package org.otaibe.quarkus.elasticsearch.example.service;

import io.quarkus.runtime.ShutdownEvent;
import io.quarkus.runtime.StartupEvent;
import lombok.Getter;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import org.otaibe.commons.quarkus.elasticsearch.client.service.AbstractElasticsearchService;

import javax.enterprise.context.ApplicationScoped;
import javax.enterprise.event.Observes;

@ApplicationScoped
@Getter
@Setter
@Slf4j
public class ElasticsearchService extends AbstractElasticsearchService {

    public void init(@Observes StartupEvent event) {
        log.info("init started");
        super.init();
        log.info("init completed");
    }

    public void shutdown(@Observes ShutdownEvent event) {
        log.info("shutdown started");
        super.shutdown();
        log.info("shutdown completed");
    }
}
The actual job of connecting to Elasticsearch is done in the AbstractElasticsearchService:
@Getter
@Setter
@Slf4j
public abstract class AbstractElasticsearchService {

    @ConfigProperty(name = "service.elastic-search.hosts")
    String[] hosts;
    @ConfigProperty(name = "service.elastic-search.num-threads", defaultValue = "10")
    Optional<Integer> numThreads;

    private RestHighLevelClient restClient;
    private Sniffer sniffer;

    public void init() {
        log.info("init started");
        List<HttpHost> httpHosts = Arrays.stream(hosts)
                .map(s -> StringUtils.split(s, ':'))
                .map(strings -> new HttpHost(strings[0], Integer.valueOf(strings[1])))
                .collect(Collectors.toList());
        RestClientBuilder builder = RestClient.builder(httpHosts.toArray(new HttpHost[httpHosts.size()]));
        getNumThreads().ifPresent(integer ->
                builder.setHttpClientConfigCallback(httpClientBuilder -> httpClientBuilder.setDefaultIOReactorConfig(
                        IOReactorConfig
                                .custom()
                                .setIoThreadCount(integer)
                                .build())
                ));
        restClient = new RestHighLevelClient(builder);
        sniffer = Sniffer.builder(getRestClient().getLowLevelClient()).build();
        log.info("init completed");
    }
}
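The shutdown() counterpart called from ElasticsearchService is not shown in the article. Purely as a minimal sketch (assuming the Lombok-generated getters above, and that closing the Sniffer before the client is sufficient), it could look like this:
public void shutdown() {
    log.info("shutdown started");
    try {
        // stop sniffing before closing the underlying client
        getSniffer().close();
        getRestClient().close();
    } catch (IOException e) {
        log.error("unable to close the Elasticsearch client", e);
    }
    log.info("shutdown completed");
}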
The connection itself is established in the way suggested in the Elasticsearch documentation. My implementation depends on two config properties:
service.elastic-search.hosts=localhost:11027
This is the Elasticsearch connection string after starting it from Docker.
The second optional property is:
service.elastic-search.num-threads
This is the number of I/O threads used by the client.
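Both properties go into application.properties. For the Docker setup above, a minimal configuration might look like this (the num-threads value is only an illustration):
service.elastic-search.hosts=localhost:11027
service.elastic-search.num-threads=10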
Creating the POJO
Now, let’s create our domain object (Fruit):
package org.otaibe.quarkus.elasticsearch.example.domain;

import com.fasterxml.jackson.annotation.JsonProperty;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

@Data
@NoArgsConstructor
@AllArgsConstructor(staticName = "of")
public class Fruit {
    public static final String ID = "id";
    public static final String EXT_REF_ID = "ext_ref_id";
    public static final String NAME = "name";
    public static final String DESCRIPTION = "description";
    public static final String VERSION = "version";

    @JsonProperty(ID)
    public String id;
    @JsonProperty(EXT_REF_ID)
    public String extRefId;
    @JsonProperty(NAME)
    public String name;
    @JsonProperty(DESCRIPTION)
    public String description;
    @JsonProperty(VERSION)
    public Long version;
}
Creating and Implementing DAO
Creating the Index
Let's create FruitDaoImpl. It is a high-level class that extends AbstractElasticsearchReactiveDaoImplementation and fills in the required business logic. The other important part here is to create an index for the Fruit class:
protected Mono<Boolean> createIndex() {
    CreateIndexRequest request = new CreateIndexRequest(getTableName());
    Map<String, Object> mapping = new HashMap<>();
    Map<String, Object> propsMapping = new HashMap<>();
    propsMapping.put(Fruit.ID, getKeywordTextAnalizer());
    propsMapping.put(Fruit.EXT_REF_ID, getKeywordTextAnalizer());
    propsMapping.put(Fruit.NAME, getTextAnalizer(ENGLISH));
    propsMapping.put(Fruit.DESCRIPTION, getTextAnalizer(ENGLISH));
    propsMapping.put(Fruit.VERSION, getLongFieldType());
    mapping.put(PROPERTIES, propsMapping);
    request.mapping(mapping);
    return createIndex(request);
}
The actual create index call to Elasticsearch is implemented in the parent class (AbstractElasticsearchReactiveDaoImplementation):
protected Mono<Boolean> createIndex(CreateIndexRequest request) {
    return Flux.<Boolean>create(fluxSink -> getRestClient().indices()
            .createAsync(request, RequestOptions.DEFAULT, new ActionListener<CreateIndexResponse>() {
                @Override
                public void onResponse(CreateIndexResponse createIndexResponse) {
                    log.info("CreateIndexResponse: {}", createIndexResponse);
                    fluxSink.next(createIndexResponse.isAcknowledged());
                    fluxSink.complete();
                }

                @Override
                public void onFailure(Exception e) {
                    log.error("unable to create index", e);
                    fluxSink.error(new RuntimeException(e));
                }
            }))
            .next();
}
Playing With the DAO
Most of the CRUD operations are implemented in the AbstractElasticsearchReactiveDaoImplementation. It has save, update, findById, and deleteById public methods. It also has findByExactMatch and findByMatch protected methods. The findBy* methods are very helpful in the descendant classes when the business logic needs to be filled in.
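The single-field findByMatch itself lives in the common library and is not reproduced in this article. Purely as a rough, hypothetical sketch of how such a method can wrap the asynchronous search API into a Flux (getObjectMapper(), getEntityClass(), and the generic parameter T are assumptions here, not necessarily the library's actual names), it could look roughly like this:
protected Flux<T> findByMatch(String field, String value) {
    SearchSourceBuilder source = new SearchSourceBuilder()
            .query(QueryBuilders.matchQuery(field, value));
    SearchRequest request = new SearchRequest(getTableName()).source(source);
    return Flux.<SearchResponse>create(sink -> getRestClient()
            .searchAsync(request, RequestOptions.DEFAULT, new ActionListener<SearchResponse>() {
                @Override
                public void onResponse(SearchResponse response) {
                    sink.next(response);
                    sink.complete();
                }

                @Override
                public void onFailure(Exception e) {
                    sink.error(new RuntimeException(e));
                }
            }))
            // flatten the hits and map each _source back to the entity type
            .flatMapIterable(response -> Arrays.asList(response.getHits().getHits()))
            .map(hit -> getObjectMapper().convertValue(hit.getSourceAsMap(), getEntityClass()));
}
The overloads used below (exact match, or a map of field/value pairs) would follow the same pattern with a different query builder.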
The business find methods are implemented in the FruitDaoImpl class:
public Flux<Fruit> findByExternalRefId(String value) {
    return findByMatch(Fruit.EXT_REF_ID, value);
}

public Flux<Fruit> findByName(String value) {
    return findByMatch(Fruit.NAME, value);
}

public Flux<Fruit> findByDescription(String value) {
    return findByMatch(Fruit.DESCRIPTION, value);
}

public Flux<Fruit> findByNameOrDescription(String value) {
    Map<String, Object> query = new HashMap<>();
    query.put(Fruit.NAME, value);
    query.put(Fruit.DESCRIPTION, value);
    return findByMatch(query);
}
Encapsulating DAO in the Service Class
FruitDaoImpl is encapsulated in the FruitService:
@ApplicationScoped
@Getter
@Setter
public class FruitService {

    @Inject
    FruitDaoImpl dao;

    public Mono<Fruit> save(Fruit entity) {
        return getDao().save(entity);
    }

    public Mono<Fruit> findById(Fruit entity) {
        return getDao().findById(entity);
    }

    public Mono<Fruit> findById(String id) {
        return Mono.just(Fruit.of(id, null, null, null, null))
                .flatMap(entity -> findById(entity));
    }

    public Flux<Fruit> findByExternalRefId(String value) {
        return getDao().findByExternalRefId(value);
    }

    public Flux<Fruit> findByName(String value) {
        return getDao().findByName(value);
    }

    public Flux<Fruit> findByDescription(String value) {
        return getDao().findByDescription(value);
    }

    public Flux<Fruit> findByNameOrDescription(String value) {
        return getDao().findByNameOrDescription(value);
    }

    public Mono<Boolean> delete(Fruit entity) {
        return Mono.just(entity.getId())
                .filter(s -> StringUtils.isNotBlank(s))
                .flatMap(s -> getDao().deleteById(entity))
                .defaultIfEmpty(false);
    }
}
Testing the FruitService
The FruitServiceTests is written in order to test basic functionality. It is also used to ensure that the Fruit class fields are properly indexed and the full-text search is working as expected:
@Test
public void manageFruitTest() {
    Fruit apple = getTestUtils().createApple();

    Fruit apple1 = getFruitService().save(apple).block();
    Assertions.assertNotNull(apple1.getId());
    Assertions.assertTrue(apple1.getVersion() > 0);
    log.info("saved result: {}", getJsonUtils().toStringLazy(apple1));

    List<Fruit> fruitList = getFruitService().findByExternalRefId(TestUtils.EXT_REF_ID).collectList().block();
    Assertions.assertTrue(fruitList.size() > 0);

    List<Fruit> fruitList1 = getFruitService().findByNameOrDescription("bulgaria").collectList().block();
    Assertions.assertTrue(fruitList1.size() > 0);

    //ensure that the full text search is working - it is 'Apples' in the description
    List<Fruit> fruitList2 = getFruitService().findByDescription("apple").collectList().block();
    Assertions.assertTrue(fruitList2.size() > 0);

    //ensure that the full text search is working - it is 'Apple' in the name
    List<Fruit> fruitList3 = getFruitService().findByName("apples").collectList().block();
    Assertions.assertTrue(fruitList3.size() > 0);

    Boolean deleteAppleResult = getFruitService().getDao().deleteById(apple1).block();
    Assertions.assertTrue(deleteAppleResult);
}
Adding REST Endpoints
Because this is an example project, full CRUD functionality is not exposed as REST endpoints. Only save and findById are added as REST endpoints, in FruitResource. The methods there return CompletionStage<Response>, which ensures that there will be no blocked threads in our application.
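FruitResource itself is in the example project on GitHub. Purely as an illustration of the pattern (method bodies and error handling here are assumptions, not the project's exact code), a Reactor Mono can be exposed as CompletionStage<Response> like this:
@Path(FruitResource.ROOT_PATH)
@Produces(MediaType.APPLICATION_JSON)
@Consumes(MediaType.APPLICATION_JSON)
public class FruitResource {
    public static final String ROOT_PATH = "/fruits";

    @Inject
    FruitService service;

    @POST
    public CompletionStage<Response> save(Fruit fruit) {
        // Mono.toFuture() returns a CompletableFuture, which is a CompletionStage
        return service.save(fruit)
                .map(saved -> Response.ok(saved).build())
                .toFuture();
    }

    @GET
    @Path("/{id}")
    public CompletionStage<Response> findById(@PathParam("id") String id) {
        return service.findById(id)
                .map(found -> Response.ok(found).build())
                .defaultIfEmpty(Response.status(Response.Status.NOT_FOUND).build())
                .toFuture();
    }
}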
Testing the REST Endpoints
FruitResourceTest is added in order to test the REST endpoints:
package org.otaibe.quarkus.elasticsearch.example.web.controller;

import io.quarkus.test.junit.QuarkusTest;
import lombok.AccessLevel;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.eclipse.microprofile.config.inject.ConfigProperty;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.otaibe.commons.quarkus.core.utils.JsonUtils;
import org.otaibe.quarkus.elasticsearch.example.domain.Fruit;
import org.otaibe.quarkus.elasticsearch.example.service.FruitService;
import org.otaibe.quarkus.elasticsearch.example.utils.TestUtils;

import javax.inject.Inject;
import javax.ws.rs.core.HttpHeaders;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.UriBuilder;
import java.net.URI;
import java.util.Optional;

import static io.restassured.RestAssured.given;

@QuarkusTest
@Slf4j
@Getter(value = AccessLevel.PROTECTED)
public class FruitResourceTest {

    @ConfigProperty(name = "service.http.host")
    Optional<URI> host;

    @Inject
    TestUtils testUtils;
    @Inject
    JsonUtils jsonUtils;
    @Inject
    FruitService service;

    @Test
    public void restEndpointsTest() {
        log.info("restEndpointsTest start");

        Fruit apple = getTestUtils().createApple();

        Fruit savedApple = given()
                .when()
                .header(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON)
                .body(apple)
                .post(getUri(FruitResource.ROOT_PATH))
                .then()
                .statusCode(200)
                .extract()
                .as(Fruit.class);

        String id = savedApple.getId();
        Assertions.assertTrue(StringUtils.isNotBlank(id));

        URI findByIdPath = UriBuilder.fromPath(FruitResource.ROOT_PATH)
                .path(id)
                .build();

        Fruit foundApple = given()
                .when().get(getUri(findByIdPath.getPath()).getPath())
                .then()
                .statusCode(200)
                .extract()
                .as(Fruit.class);
        Assertions.assertEquals(savedApple, foundApple);

        Boolean deleteResult = getService().delete(foundApple).block();
        Assertions.assertTrue(deleteResult);

        given()
                .when().get(findByIdPath.getPath())
                .then()
                .statusCode(Response.Status.NOT_FOUND.getStatusCode());

        log.info("restEndpointsTest end");
    }

    private URI getUri(String path) {
        return getUriBuilder(path)
                .build();
    }

    private UriBuilder getUriBuilder(String path) {
        return getHost()
                .map(uri -> UriBuilder.fromUri(uri))
                .map(uriBuilder -> uriBuilder.path(path))
                .orElse(UriBuilder
                        .fromPath(path)
                );
    }
}
Building a Native Executable
Before building a native executable, we have to register our Fruit domain object for reflection. The reason is that our FruitResource returns CompletionStage<Response>, so the actual return type is unknown to the application at build time, and we have to register it explicitly. There are at least two ways to do that in Quarkus:
Via the @RegisterForReflection annotation.
Via a reflection-config.json file passed to the native image build.
I personally prefer the second method because the classes that you want to register might be in a third-party library, and it would be impossible to put @RegisterForReflection there.
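For reference, the annotation-based approach would simply mean annotating the class itself (not what the example project does):
import io.quarkus.runtime.annotations.RegisterForReflection;

@RegisterForReflection
public class Fruit {
    // fields as shown above
}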
Now, the reflection-config.json looks like this:
[
{
"name" : "org.otaibe.quarkus.elasticsearch.example.domain.Fruit",
"allDeclaredConstructors" : true,
"allPublicConstructors" : true,
"allDeclaredMethods" : true,
"allPublicMethods" : true,
"allDeclaredFields" : true,
"allPublicFields" : true
}
]
The next step is to make Quarkus aware of the reflection-config.json file. You should add this line to the native profile in your pom.xml file:
<quarkus.native.additional-build-args>-H:ReflectionConfigurationFiles=${project.basedir}/src/main/resources/reflection-config.json</quarkus.native.additional-build-args>
You can now build your native application:
mvn clean package -Pnative
And start it:
./target/otaibe-quarkus-elasticsearch-example-1.0-SNAPSHOT-runner
The service will be available on http://localhost:11025 because that port is explicitly specified in application.properties:
quarkus.http.port=11025
Testing the Native Build
The FruitResourceTest expects the following optional property:
service.http.host
If it is present, the test requests will hit the specified host. If you start the native executable:
./target/otaibe-quarkus-elasticsearch-example-1.0-SNAPSHOT-runner
and execute the test build with the following command:
mvn package -D%test.service.http.host=http://localhost:11025
the tests will run against the native build.
Conclusion
I was pleasantly surprised that Elasticsearch works out of the box with Quarkus and can be compiled to native code, which, combined with a reactive implementation via Project Reactor, makes the application's footprint almost insignificant.