Elasticsearch Fault Tolerance: Reindexing Need and Strategies - Part II

In the last part, I discussed the limitations of ElasticSearch and the scenarios in which reindexing is absolutely essential. As part of the fault tolerance strategy, I explained that a fault could be a corrupt index or a schema change. In this post, I will explain the action that overcomes such faults: the reindexing strategy.

The general notion of ElasticSearch reindexing involves creating a new index and then copying existing data into it, either from the old index or from the primary data store (such as a relational or key-value store). My end-to-end automated reindexing strategy is broader than that general notion. It includes the following steps:

  1. Configure read and write aliases for index
  2. Capture faults applicable to application
  3. Categorize faults
  4. Execute fault tolerance strategy
  5. Perform reindexing
  6. Delete old index

We added fault tolerance for ElasticSearch based on the above design this year and have never looked back. The reindexing process was designed with scrupulous attention to detail. Let’s take a look at the steps one by one.

Step I – Configure Read and Write Aliases for Index

We started using ElasticSearch in 2014, when ES 1.1.2, based on Apache Lucene 4, was released. First-time users of ElasticSearch (or any other technology) tend to make common mistakes, and like most others, we did not consider read and write aliases for indexes a necessity. An alias is a symbolic link that points to one or more indices. In simple terms, the index is the physical entity and the alias is the logical one.

Most ElasticSearch APIs that take an index name as a parameter also accept an alias name, so alias_name and index_name can be used interchangeably. The alias mechanism provides a lot of flexibility: it can group multiple indices, create views on subsets of the documents in an index, and switch transparently from one index to another. We are interested in the last feature. Switching between two indexes lets us reindex data in the background, and it is the basis of the automated reindexing strategy: reindexing can be done without restarting the ElasticSearch cluster or the client application.

An alias can be added to an index as follows:

# create an index named old_index
curl -XPUT 'http://localhost:9200/old_index/' -H 'Content-Type: application/json' -d '{
    "settings" : {
        "index" : {
            "number_of_replicas" : 2
        }
    }
}'

# create aliases read_alias and write_alias pointing to old_index
curl -XPOST 'http://localhost:9200/_aliases' -H 'Content-Type: application/json' -d '
{
    "actions" : [
        { "add" : { "index" : "old_index", "alias" : "read_alias" } },
        { "add" : { "index" : "old_index", "alias" : "write_alias" } }
    ]
}'

# check aliases
curl 'http://localhost:9200/_aliases?pretty'

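Java API to create an alias:

import org.elasticsearch.action.admin.indices.alias.IndicesAliasesResponse;

// Adds aliasName to indexName through the transport client's admin API and blocks until the cluster responds
public IndicesAliasesResponse addAliastoIndex(String aliasName, String indexName) {
    return esClientMediator.getESClient()
            .getClient()
            .admin()
            .indices()
            .prepareAliases()
            .addAlias(indexName, aliasName)
            .execute()
            .actionGet();
}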

Initially, read_alias and write_alias both point to the same physical index, i.e. old_index.
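Switching an alias from one index to another is what the later steps rely on. A minimal sketch, assuming you build the `_aliases` request body by hand: Elasticsearch applies all actions of a single `_aliases` request atomically, so removing the alias from the old index and adding it to the new one in the same request avoids a moment where the alias points nowhere. `AliasActions` and its methods are hypothetical names for illustration only.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper that builds request bodies for POST /_aliases. */
final class AliasActions {
    private final List<String> actions = new ArrayList<>();

    /** Queues an "add" action pointing alias at index. */
    AliasActions add(String index, String alias) {
        actions.add(action("add", index, alias));
        return this;
    }

    /** Queues a "remove" action detaching alias from index. */
    AliasActions remove(String index, String alias) {
        actions.add(action("remove", index, alias));
        return this;
    }

    /** Moves alias from fromIndex to toIndex; both actions travel in one request, applied atomically. */
    static String swap(String alias, String fromIndex, String toIndex) {
        return new AliasActions().remove(fromIndex, alias).add(toIndex, alias).toJson();
    }

    String toJson() {
        return "{ \"actions\" : [ " + String.join(", ", actions) + " ] }";
    }

    // Index and alias names are assumed to be plain identifiers, so no JSON escaping is done here.
    private static String action(String type, String index, String alias) {
        return "{ \"" + type + "\" : { \"index\" : \"" + index + "\", \"alias\" : \"" + alias + "\" } }";
    }
}
```

`AliasActions.swap("read_alias", "old_index", "new_index")` produces a body that can be sent with the same `curl -XPOST 'http://localhost:9200/_aliases'` call shown earlier. With the Java transport client, the equivalent is chaining `removeAlias(oldIndex, alias)` and `addAlias(newIndex, alias)` on one `prepareAliases()` builder before `execute()`.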

Step II – Capture Faults Applicable to Application

This is the second step of the solution, in which we identify faults. Faults can be a corrupt index, a schema change, or a field parsing failure. Here I am talking only about faults that require reindexing; ElasticSearch faults also include unavailable nodes or shard failures, whose fault tolerance action is different and does not involve reindexing. ElasticSearch faults can be captured in the following ways:

1. One simple way is to check the ElasticSearch response and then test it for a fault.

2. Otherwise, capture faults as part of exception handling. For example, if there is a parsing issue due to a schema mismatch, ElasticSearch throws an exception. The application can handle that exception and trigger the fault tolerance action.

3. The best way is to use ElasticSearch’s ActionListener feature, which receives the result of an asynchronously executed request:

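import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.index.IndexResponse;

// Callback for asynchronous index requests, e.g. client.prepareIndex(...).execute(new ESIndexActionListener())
public class ESIndexActionListener implements ActionListener<IndexResponse> {

    @Override
    public void onResponse(IndexResponse response) {
        // success: the document was indexed
    }

    // ES 1.x/2.x signature; from ES 5.0 the parameter type is Exception
    @Override
    public void onFailure(Throwable e) {
        // classify the failure and publish a fault event that triggers the fault tolerance action
    }
}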

Step III – Categorize Faults

The main principle of the fault tolerance strategy is the categorization of faults. It’s essential to group faults by their action, because in most cases the fault tolerance action is identical for a whole group of faults.

For example, in our case it’s best to group the following faults together:

  1. Schema change from less restrictive to more restrictive data type
  2. Corrupt index
  3. Shard failure/Index missing

For all of the above faults, the fault tolerance action is to reindex from the primary data store.

The following group, on the other hand, shares a different action: reindexing from the existing index.

  1. Schema change from more restrictive to less restrictive data type
  2. Change in primary shards

The benefit is that you only have to implement a single event and a corresponding event handler per group. This improves the design of the fault tolerance mechanism and, needless to say, the maintainability of the application.
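The grouping above can be captured directly in code. A minimal sketch, assuming faults are modeled as a Java enum: each fault carries the action its group shares, so a new fault cannot compile without being assigned to a group, and the handler registry only needs one entry per action. `Fault`, `RecoveryAction` and `FaultGroups` are hypothetical names.

```java
import java.util.EnumMap;
import java.util.EnumSet;
import java.util.Map;
import java.util.Set;

/** Hypothetical action types: one event and one handler per group of faults. */
enum RecoveryAction { REINDEX_FROM_PRIMARY_DATA_STORE, REINDEX_FROM_EXISTING_INDEX }

/** The faults discussed above, each tagged with the action its group shares. */
enum Fault {
    SCHEMA_CHANGE_TO_MORE_RESTRICTIVE_TYPE(RecoveryAction.REINDEX_FROM_PRIMARY_DATA_STORE),
    CORRUPT_INDEX(RecoveryAction.REINDEX_FROM_PRIMARY_DATA_STORE),
    SHARD_FAILURE_OR_INDEX_MISSING(RecoveryAction.REINDEX_FROM_PRIMARY_DATA_STORE),
    SCHEMA_CHANGE_TO_LESS_RESTRICTIVE_TYPE(RecoveryAction.REINDEX_FROM_EXISTING_INDEX),
    PRIMARY_SHARD_COUNT_CHANGE(RecoveryAction.REINDEX_FROM_EXISTING_INDEX);

    private final RecoveryAction action;

    Fault(RecoveryAction action) {
        this.action = action;
    }

    RecoveryAction action() {
        return action;
    }
}

final class FaultGroups {
    /** Groups faults by shared action, e.g. to register exactly one handler per group. */
    static Map<RecoveryAction, Set<Fault>> byAction() {
        Map<RecoveryAction, Set<Fault>> groups = new EnumMap<>(RecoveryAction.class);
        for (Fault fault : Fault.values()) {
            groups.computeIfAbsent(fault.action(), a -> EnumSet.noneOf(Fault.class)).add(fault);
        }
        return groups;
    }
}
```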

Step IV – Execute Fault Tolerance Strategy

You will find many designs for a fault tolerance strategy, with different variations. I like the combination of event-driven architecture and the chain of responsibility pattern. After a fault is identified, the fault tolerance action could be triggered directly, but it’s better to decouple the fault tolerance logic from the code that identifies the fault. Event-driven architecture is one way to do that. I prefer Google Guava's EventBus for this; it's flexible, as it supports both synchronous and asynchronous events.

Asynchronous events can be handled in a different thread, or they can be distributed to other nodes using a messaging service such as ActiveMQ, IBM WebSphere MQ, or RabbitMQ. EventBus allows publish-subscribe-style communication between components without requiring the components to explicitly register with one another. In our case, the code that detects a fault publishes an event, and the subscriber is the code that performs the fault tolerance action.

Receivers receive the events, and the event handler logic can then pass each event to a workflow manager such as Apache Commons Chain (an implementation of the chain of responsibility pattern). You can design a chain of workflows in which either a single workflow handles an event, or a group of workflows handles it, with each workflow doing some kind of enrichment. To illustrate, take the corrupt index and schema change faults. Imagine that the application identifies a fault and now has to create a new index and trigger indexing from the primary data store. This can be achieved as follows:

  • A new event, #reindexing_from_data_store, is created with the following event parameters:
    • old_index name
    • old_index_read_alias, old_index_write_alias
    • JNDI name or bean name of the data store
    • fault name
  • The event handler subscribed to this event captures it.
  • The event handler loads a workflow to process the event. The workflow consists of a chain of flows. Each flow is context aware and knows whether or not it is responsible for serving the event. The chain that handles the corrupt index fault consists of the following flows:
    • Create new index based on the configuration settings such as number of shards, number of replicas, etc. (you can use old index settings).
    • Change read alias to point to new index
    • Change write alias to point to new index
    • Update global state to mark processing of reindexing (so that other nodes do not trigger similar type of event)
    • Trigger reindexing from data store
    • Update global state to mark completion of reindexing.
    • Delete old index
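The decoupling between fault detection and fault handling can be sketched without any library. This is a minimal stand-in, not Guava's API: publishers post an event object, subscribers register for its class, and neither side references the other. The event carries the parameters listed above; `ReindexFromDataStoreEvent` and `SimpleEventBus` are hypothetical names.

```java
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

/** Payload of the #reindexing_from_data_store event; field names are illustrative. */
final class ReindexFromDataStoreEvent {
    final String oldIndex;
    final String readAlias;
    final String writeAlias;
    final String dataStore; // JNDI name or bean name of the primary data store
    final String faultName;

    ReindexFromDataStoreEvent(String oldIndex, String readAlias, String writeAlias,
                              String dataStore, String faultName) {
        this.oldIndex = oldIndex;
        this.readAlias = readAlias;
        this.writeAlias = writeAlias;
        this.dataStore = dataStore;
        this.faultName = faultName;
    }
}

/** Minimal synchronous publish-subscribe bus: publishers and subscribers only share event types. */
final class SimpleEventBus {
    private final Map<Class<?>, List<Consumer<Object>>> subscribers = new ConcurrentHashMap<>();

    /** Registers a handler for events of exactly the given class. */
    <T> void subscribe(Class<T> type, Consumer<? super T> handler) {
        subscribers.computeIfAbsent(type, t -> new CopyOnWriteArrayList<>())
                   .add(event -> handler.accept(type.cast(event)));
    }

    /** Delivers the event on the caller's thread; events without subscribers are dropped. */
    void post(Object event) {
        for (Consumer<Object> handler : subscribers.getOrDefault(event.getClass(), Collections.emptyList())) {
            handler.accept(event);
        }
    }
}
```

With Guava, the same shape comes from `EventBus.register(Object)`, `post(Object)` and methods annotated with `@Subscribe`; Guava also delivers events to subscribers of supertypes, and `AsyncEventBus` dispatches on an `Executor` instead of the posting thread.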

For the other fault, a schema change from a less restrictive to a more restrictive data type, the workflow consists of the following chain of flows:

  • Create new index based on the configuration settings such as number of shards, number of replicas etc. (you can use old index settings).
  • Change write alias to point to new index
  • Trigger reindexing from data store
  • Update global state to mark processing of reindexing (so that other nodes do not trigger similar type of event)
  • Update global state to mark completion of reindexing
  • Change read alias to point to new index
  • Delete old index

The two workflows are almost identical except for the order of the flows. Thanks to the chain of responsibility pattern, it’s easy to build different chains by reusing the same flows under the hood. Decouple every step of the fault tolerance action into a separate flow (slice) so that the flows can be combined in different orders to form a workflow. These flows can also be extended to provide additional functionality.
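Reusing the same flows in a different order can be sketched as plain Java. A simplified sketch, assuming each flow is a small object and a workflow is just an ordered list of them: the two lists below mirror the corrupt index and schema change chains listed above. The `handles` hook is where a flow would decline an event it is not responsible for; here every flow accepts. `ReindexFlow`, `ReindexContext` and `Flows` are hypothetical, and the flows only log what a real implementation would do.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** State shared along the chain; a real context would also carry the event parameters. */
final class ReindexContext {
    final String oldIndex;
    final String newIndex;
    final List<String> log = new ArrayList<>();

    ReindexContext(String oldIndex, String newIndex) {
        this.oldIndex = oldIndex;
        this.newIndex = newIndex;
    }
}

/** One reusable slice of a fault tolerance action. */
interface ReindexFlow {
    void execute(ReindexContext ctx);

    /** Chain-of-responsibility hook: a flow may decline a context it is not responsible for. */
    default boolean handles(ReindexContext ctx) {
        return true;
    }
}

final class Flows {
    // These flows only record what they would do; real ones would call Elasticsearch or the data store.
    static final ReindexFlow CREATE_NEW_INDEX = ctx -> ctx.log.add("create " + ctx.newIndex);
    static final ReindexFlow SWITCH_READ_ALIAS = ctx -> ctx.log.add("read_alias -> " + ctx.newIndex);
    static final ReindexFlow SWITCH_WRITE_ALIAS = ctx -> ctx.log.add("write_alias -> " + ctx.newIndex);
    static final ReindexFlow MARK_IN_PROGRESS = ctx -> ctx.log.add("state: in progress");
    static final ReindexFlow REINDEX_FROM_STORE = ctx -> ctx.log.add("reindex from data store");
    static final ReindexFlow MARK_DONE = ctx -> ctx.log.add("state: done");
    static final ReindexFlow DELETE_OLD_INDEX = ctx -> ctx.log.add("delete " + ctx.oldIndex);

    /** Corrupt index: both aliases move to the new index before reindexing starts. */
    static final List<ReindexFlow> CORRUPT_INDEX = Arrays.asList(
            CREATE_NEW_INDEX, SWITCH_READ_ALIAS, SWITCH_WRITE_ALIAS,
            MARK_IN_PROGRESS, REINDEX_FROM_STORE, MARK_DONE, DELETE_OLD_INDEX);

    /** Schema change: reads stay on the old index until reindexing has completed. */
    static final List<ReindexFlow> SCHEMA_CHANGE = Arrays.asList(
            CREATE_NEW_INDEX, SWITCH_WRITE_ALIAS, REINDEX_FROM_STORE,
            MARK_IN_PROGRESS, MARK_DONE, SWITCH_READ_ALIAS, DELETE_OLD_INDEX);

    /** Runs the flows in order, skipping any flow that declines the context. */
    static void run(List<ReindexFlow> chain, ReindexContext ctx) {
        for (ReindexFlow flow : chain) {
            if (flow.handles(ctx)) {
                flow.execute(ctx);
            }
        }
    }
}
```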

Step V – Perform Reindexing

This is the most critical point of our discussion. Depending on the scenario, reindexing is done either from the old index or from the primary data store. Depending on the volume of data, it may take hours or even days to complete. So, what do you do while reindexing is in progress? Most products have a maintenance window in which the application is upgraded to a new version or other activities take place, but most applications also need to run 24/7, and any downtime is a business loss. For high availability, you should design the reindexing strategy in advance: during reindexing, the ElasticSearch cluster must stay fully operational, with no downtime. Here, read and write aliases play a vital role, and we will also see which reindexing API to use depending on the requirement.

With read and write aliases, the following strategies are available:

1. Write to both indexes and read from old index 

We use this approach in our product, as it guarantees strong consistency to a certain extent. I have already listed the scenarios in which reindexing is required: maybe you want to change a mapping, or the number of shards assigned to an index. You still have the old index holding your data, while the new index is still being populated and does not yet hold all of it. By writing to both indexes, you ensure that both are up to date, and by reading from the old index, you guarantee that your queries run against the entire data set.

The only problem is with the fields you have changed. For example, if you have changed the data type of a field from string to double and you perform aggregations on it, the old index will not allow that. Apart from this issue, your results will always be reliable and reflect new data. Note that there is some performance overhead, as you are writing every document to both indexes. I would recommend this approach for financial or legal application domains in which strong consistency is desired.
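This strategy has to live in the application rather than in the aliases: Elasticsearch rejects a single-document index request sent to an alias that points to more than one index (newer releases accept it only when one of the indices is flagged with `is_write_index`). A minimal sketch of the routing, assuming a hypothetical `DocumentStore` abstraction over the client; `DualWriteRouter` and `InMemoryStore` are also hypothetical, and the in-memory store only stands in for Elasticsearch so the routing can be exercised.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

/** Hypothetical minimal client abstraction; a real one would wrap the Elasticsearch client. */
interface DocumentStore {
    void index(String index, String id, String json);
    Optional<String> get(String index, String id);
}

/** Write to both indexes, read from the old index only. */
final class DualWriteRouter {
    private final DocumentStore store;
    private final String oldIndex;
    private final String newIndex;

    DualWriteRouter(DocumentStore store, String oldIndex, String newIndex) {
        this.store = store;
        this.oldIndex = oldIndex;
        this.newIndex = newIndex;
    }

    void index(String id, String json) {
        // Old index first: it is the one serving reads while reindexing runs.
        store.index(oldIndex, id, json);
        store.index(newIndex, id, json);
    }

    Optional<String> get(String id) {
        return store.get(oldIndex, id);
    }
}

/** In-memory stand-in used only to exercise the router. */
final class InMemoryStore implements DocumentStore {
    private final Map<String, Map<String, String>> indexes = new HashMap<>();

    @Override
    public void index(String index, String id, String json) {
        indexes.computeIfAbsent(index, i -> new HashMap<>()).put(id, json);
    }

    @Override
    public Optional<String> get(String index, String id) {
        return Optional.ofNullable(indexes.getOrDefault(index, new HashMap<>()).get(id));
    }
}
```

If the second write fails, the two indexes diverge; a production version would need to retry it or record the document id for a later repair pass.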



2. Read from old index and write to new index

In this approach, all new data is written to the new index while searches run against the old index. New data is therefore not available for search or aggregations until reindexing finishes, so your results might not always reflect the current state of the system. This is acceptable for some application domains, such as analytics. The approach relies on eventual consistency: search results and aggregations start reflecting the accurate state once the reindexing task is finished.

3. Write to new index and read from both indexes

With ElasticSearch, you can read from several indexes or types and combine the results. The caveats are duplicate data and pagination. If you rely on pagination, this approach is not for you, as it takes a lot of hassle to get working. Why? Let’s first see how duplicates can be removed from the result set:

POST http://localhost:9200/old_index,new_index/_search?pretty=true
{
  "size": 0,
  "aggs": {
    "dedup": {
      "terms": {
        "field": "name"
      },
      "aggs": {
        "dedup_docs": {
          "top_hits": {
            "size": 1
          }
        }
      }
    }
  }
}

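Note that we have to use aggregations; if you want ElasticSearch to remove duplicates, this is the mechanism. Also note that a terms aggregation returns only its top 10 buckets by default, so its size has to be raised to cover every distinct value. At the time of writing, pagination was not supported for queries involving aggregations. Hence, this approach is cumbersome to design and implement.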
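The pagination problem is easier to see with a client-side version of the same deduplication. This is a swapped-in technique rather than the aggregation above: it merges hits fetched from both indexes and keeps one hit per document id, preferring the copy from the new index. `Hit` and `HitDeduplicator` are hypothetical names. Because a duplicate can in general sit anywhere in either index, page N can only be cut after every earlier hit from both indexes has been fetched and deduplicated.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** A search hit reduced to what deduplication needs; fields are illustrative. */
final class Hit {
    final String index;
    final String id;
    final String source;

    Hit(String index, String id, String source) {
        this.index = index;
        this.id = id;
        this.source = source;
    }
}

final class HitDeduplicator {
    /**
     * Keeps one hit per document id. When an id appears in both indexes, the copy
     * from preferredIndex wins; result order follows the first appearance of each id.
     */
    static List<Hit> dedup(List<Hit> hits, String preferredIndex) {
        Map<String, Hit> byId = new LinkedHashMap<>();
        for (Hit hit : hits) {
            Hit existing = byId.get(hit.id);
            if (existing == null || hit.index.equals(preferredIndex)) {
                byId.put(hit.id, hit); // replacing a key keeps its original position
            }
        }
        return new ArrayList<>(byId.values());
    }

    /** Cuts a page from the deduplicated list; every hit before it had to be fetched first. */
    static List<Hit> page(List<Hit> deduped, int from, int size) {
        int start = Math.min(from, deduped.size());
        int end = Math.min(start + size, deduped.size());
        return deduped.subList(start, end);
    }
}
```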

Reindexing Approaches

There are two ways in which reindexing can be done:

  1. Using cursor (scroll) and bulk APIs
  2. Reindex API

Scroll API

The Scroll API of ElasticSearch can be used to retrieve large numbers of results (or even all results) from a single search request, in much the same way as you would use a cursor on a traditional database. The reindexing logic fetches data from the old index and then inserts the fetched data into the new index using a bulk processor.

import java.util.concurrent.TimeUnit;
import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.search.Scroll;
import org.elasticsearch.search.SearchHit;

public ReindexingResponse reindex(ReIndexingRequest reIndexingRequest) {
    long startTime = System.currentTimeMillis();
    ReindexingResponse response = new ReindexingResponse();
    response.setStartTime(startTime);
    // Bulk processor that batches index requests for the new index
    BulkProcessor bulkProcessor = BulkProcessor.builder(esClient.getClient(), new BulkProcessorLoggerLister())
            .setConcurrentRequests(100)
            .build();
    try {
        // Open a scroll over the old index; SCAN is the ES 1.x/2.x scan-and-scroll search type
        SearchResponse searchResponse = esClient.getClient()
                .prepareSearch(reIndexingRequest.getOldIndex())
                .setQuery(reIndexingRequest.getBody())
                .setSearchType(SearchType.SCAN)
                .setScroll(new Scroll(reIndexingRequest.getKeepAlive()))
                .setSize(reIndexingRequest.getSize())
                .execute()
                .actionGet();
        while (true) {
            // Queue every hit of the current page for the new index, keeping its type and id
            for (SearchHit hit : searchResponse.getHits()) {
                bulkProcessor.add(new IndexRequest(reIndexingRequest.getNewIndex(), hit.type(), hit.id())
                        .source(hit.source()));
            }
            // Fetch the next page and renew the scroll with the same keep-alive
            searchResponse = esClient.getClient()
                    .prepareSearchScroll(searchResponse.getScrollId())
                    .setScroll(new Scroll(reIndexingRequest.getKeepAlive()))
                    .execute()
                    .actionGet();
            if (searchResponse.getHits().getHits().length == 0) {
                break; // no more hits: the scroll is exhausted
            }
        }
    } catch (Throwable failure) {
        response.setFailure(failure);
        response.setHasFailures(true);
    } finally {
        // Flush queued requests and wait for in-flight bulks, on success and on failure alike
        try {
            bulkProcessor.awaitClose(10, TimeUnit.MINUTES);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        long endTime = System.currentTimeMillis();
        response.setEndTime(endTime);
        System.out.println("Total time taken (ms): " + (endTime - startTime));
    }
    return response;
}

Let’s look at the pros and cons of this approach.

Pros:

  1. You have full control over the reindexing process: you can control the batch size, the number of concurrent requests, and thread priority. Because the application itself inserts data into the new index in batches, you can even set the priority of the thread performing the reindexing, or stop reindexing midway.
  2. You can monitor the progress of reindexing in the same thread that executes it.

Cons:

  1. It’s an expensive process, as it involves heavy network traffic.
  2. If the JVM executing the reindexing operation crashes, reindexing stops midway. You will have to write logic to detect that the process stopped and resume it on another JVM (node) of your application cluster. In short, you have to maintain global state for the reindexing operation.
  3. As data moves back and forth between the ElasticSearch nodes and your application node, it is a time-consuming process.
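The global state mentioned in the second con can be as small as a checkpoint per reindexing job. A minimal sketch, assuming documents are copied in a stable order (document id here) and the last copied id is saved after every batch in a store that all application nodes can read; `CheckpointStore`, `InMemoryCheckpointStore` and `ResumableCopier` are hypothetical, and plain maps stand in for both indexes. A scroll context expires after its keep-alive, so this sketch resumes from a stored position rather than a stored scroll id.

```java
import java.util.Map;
import java.util.NavigableMap;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical shared store for reindexing progress, visible to every node of the application cluster. */
interface CheckpointStore {
    Optional<String> lastCopiedId(String job);
    void save(String job, String lastCopiedId);
}

final class InMemoryCheckpointStore implements CheckpointStore {
    private final Map<String, String> checkpoints = new ConcurrentHashMap<>();

    @Override
    public Optional<String> lastCopiedId(String job) {
        return Optional.ofNullable(checkpoints.get(job));
    }

    @Override
    public void save(String job, String lastCopiedId) {
        checkpoints.put(job, lastCopiedId);
    }
}

final class ResumableCopier {
    /**
     * Copies documents in id order, batchSize at a time, saving a checkpoint after each batch.
     * maxBatches limits this run, which lets the example simulate a JVM stopping part-way.
     * Returns the number of documents copied in this run.
     */
    static int copy(String job, NavigableMap<String, String> source, Map<String, String> target,
                    CheckpointStore checkpoints, int batchSize, int maxBatches) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        int copied = 0;
        for (int batch = 0; batch < maxBatches; batch++) {
            Optional<String> last = checkpoints.lastCopiedId(job);
            NavigableMap<String, String> remaining = last.isPresent() ? source.tailMap(last.get(), false) : source;
            if (remaining.isEmpty()) {
                break; // everything has been copied
            }
            String lastInBatch = null;
            int n = 0;
            for (Map.Entry<String, String> e : remaining.entrySet()) {
                if (n == batchSize) {
                    break;
                }
                target.put(e.getKey(), e.getValue()); // indexing by id, so a re-copy overwrites harmlessly
                lastInBatch = e.getKey();
                n++;
            }
            checkpoints.save(job, lastInBatch);
            copied += n;
        }
        return copied;
    }
}
```

Because each document is written by id, a crash between copying a batch and saving its checkpoint only causes that batch to be copied again on resume.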

Reindex API

ElasticSearch has provided the reindex API since version 2.3.0. The client just invokes the reindex API, and that’s it. Once triggered, the entire reindexing process happens on the ElasticSearch cluster, which takes complete ownership of the task and manages it end to end. One would imagine that the reindex API is a lot quicker than the scroll approach. You may be surprised to learn that the opposite is true: if you start reindexing with both APIs, the reindex API takes more time than scroll, but for good reasons. ElasticSearch prioritizes other operations, such as search and indexing, over reindexing, which is why it is slower than scroll. Invoking the reindexing operation is very simple:

POST /_reindex
{
  "source": {
    "index": "twitter"
  },
  "dest": {
    "index": "new_twitter"
  }
}

In Java, it can be invoked in two ways.

Using REST as follows:

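import java.io.IOException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.entity.ContentType;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;

public ReindexResponse reindexUsingRest(ReIndexingRequest reIndexingRequest) throws IOException {
    // Build the same body as the POST /_reindex example above
    Source source = new Source();
    source.setIndex(reIndexingRequest.getOldIndex());
    Dest destination = new Dest();
    destination.setIndex(reIndexingRequest.getNewIndex());
    ReIndexRequest request = new ReIndexRequest();
    request.setSource(source);
    request.setDest(destination);
    ObjectMapper mapper = new ObjectMapper();
    // _reindex is a cluster-level endpoint, not an index-level one
    HttpPost post = new HttpPost("http://localhost:9200/_reindex");
    post.setEntity(new StringEntity(mapper.writeValueAsString(request), ContentType.APPLICATION_JSON));
    try (CloseableHttpClient httpClient = HttpClients.createDefault();
         CloseableHttpResponse httpResponse = httpClient.execute(post)) {
        // ReindexResponse is assumed to be the application's own POJO, mapped from the JSON reply
        return mapper.readValue(httpResponse.getEntity().getContent(), ReindexResponse.class);
    }
}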

Using the Java API directly:

Maven dependency:

<dependency>
   <groupId>org.elasticsearch.module</groupId>
   <artifactId>reindex</artifactId>
   <version>2.3.2</version>
</dependency>


// Requires the reindex module; a 2.x TransportClient also needs ReindexPlugin registered
public ReindexResponse reindex(ReIndexingRequest reIndexingRequest) {
    ReindexRequestBuilder builder = ReindexAction.INSTANCE
            .newRequestBuilder(esClient.getClient())
            .source(reIndexingRequest.getOldIndex())
            .destination(reIndexingRequest.getNewIndex())
            .timeout(reIndexingRequest.getKeepAlive());
    // size() on the builder caps the total number of documents copied; the batch size belongs on the source search
    builder.source().setSize(reIndexingRequest.getSize());
    // get() blocks until the cluster-side reindex task finishes or fails
    return builder.get();
}

Let’s look at the pros and cons of this approach.

Pros:

  1. The process is managed by ElasticSearch itself, on the server side.
  2. It offers other features such as version control, conflict management, document metadata modification during reindexing, batch size, etc.
  3. The status of a reindex task can be monitored using "GET /_tasks?actions=*reindex&pretty".
  4. So you can start the reindexing process and then use an event to check the status of the operation.
  5. The reindexing process can be cancelled.
  6. It does not depend on an application JVM staying alive, so the client-crash problem of the scroll-based approach does not arise.

Cons:

  1. At the time of writing, it is still experimental: the API may change in ways that are not backwards compatible.

Depending on your read_alias/write_alias strategy, you will have to manage aliases during and after the reindexing process.

Step VI – Delete Old Index

To delete the old index, you can specify an index name, an alias, or a wildcard expression.

Using curl command as follows:

$ curl -XDELETE 'http://localhost:9200/old_index/'

Using the Java API:

// delete() returns a future; actionGet() blocks until the cluster responds
DeleteIndexResponse deleteResponse = client.admin().indices().delete(new DeleteIndexRequest(index)).actionGet();
boolean deleted = deleteResponse.isAcknowledged();

Conclusion

In Part I, I explained the scenarios in which a reindexing operation is required. In this part, I discussed how it can be performed as part of a fault tolerance strategy, and finally, I described different reindexing approaches using read/write aliases and the scroll/reindex APIs. Many combinations are possible, and selecting a strategy is a trade-off; choose the one that best suits your application requirements. You can find the reindexing project on GitHub: https://github.com/nikhilbhide/elasticsearch-reindexing-zerodowntime.