Over a million developers have joined DZone.
{{announcement.body}}
{{announcement.title}}

Elasticsearch Distributed Consistency Principles Analysis and Data, Part 1

DZone 's Guide to

Elasticsearch Distributed Consistency Principles Analysis and Data, Part 1

We take a dive into the principles and the code behind Elasticsearch clusters and data write processes contained in ES.

· Big Data Zone ·
Free Resource

The previous two articles (here and here) described the composition of the ES clusters, master election algorithms, master update meta-processes, and analyzed the consistency issues of the election and meta update. This article analyzes data flow in ES, including its write process, PacificA algorithm model, SequenceNumber, Checkpoint, and compares the similarities and differences between ES implementation and the standard PacificA algorithm. We will be covering:

  1. Current issues.
  2. Data write processes.
  3. PacificA algorithm.
  4. SequenceNumber, Checkpoint, and failure recovery.
  5. Comparing ES and PacificA,
  6. Summary.

Current Issues

Anyone who has ever used ES knows that each ES index is divided into multiple shards. Shards are distributed on different nodes to enable distributed storage and queries and support large-scale datasets. Each shard has multiple copies, one of which is the primary node, and the others are replica nodes. Data is written to the primary node first, then synchronized with replica nodes from the primary node. When reading data, to improve read capability, both primary nodes and replica nodes accept read requests.

With this model, we can see that ES has some of the following characteristics:

  1. High data reliability: The data has multiple copies.
  2. High service availability: If the primary node crashes, a new primary node can be chosen from the replica nodes to continue offering services.
  3. Extended read capability: The primary node and replica nodes can take read requests.
  4. Failure recovery capability: If the primary node or replica nodes crash, there are not enough copies. New copies can be generated by copying the data from the new primary node.

Some questions may come to mind, for example:

  1. How is data copied from Primary node to Replica nodes?
  2. Does it need to write to all copies to be successful?
  3. Do Primary node crashes cause data loss?
  4. Is the latest data always read when reading from Replica nodes?
  5. Do I need to copy all Shard data when performing failure recovery?

As you can see, although we can easily understand the general principles of ES data consistency, many details remain unclear. This article focuses on the ES write process, the consistency algorithm used, SequenceId and Checkpoint design, and other aspects to describe how ES works and address the questions above. It is important to note that the analysis in this article is based on ES version 6.2. Much of the content does not apply to previous ES versions, such as version 2. X version.

Data Write Process

First, let us take a look at the data write process.

From the Replication Perspective: Primary -> Replica

From the macro perspective, the ES write process involves writing data to the primary node first, then concurrently writing it to replica nodes and finally returning it to the client. The process is as follows:

Check the Active Shard count.

String activeShardCountFailure = checkActiveShardCount();

Write to the primary.

primaryResult = primary.perform(request);

Concurrently initiate write requests to all replicates.

performOnReplicas(replicaRequest, globalCheckpoint, replicationGroup.getRoutingTable());

After all replicates are returned or fail, they are returned to the client.

private void decPendingAndFinishIfNeeded() {
  assert pendingActions.get() > 0 : "pending action count goes below 0 for request [" + request + "]";
  if (pendingActions.decrementAndGet() == 0) {
      finish();
  }
}

The procedure above is an execute function of the ReplicationOperation class, and the complete code is as follows:

    public void execute() throws Exception {
        final String activeShardCountFailure = checkActiveShardCount();
        final ShardRouting primaryRouting = primary.routingEntry();
        final ShardId primaryId = primaryRouting.shardId();
        if (activeShardCountFailure ! = null) {
            finishAsFailed(new UnavailableShardsException(primaryId,
                "{} Timeout: [{}], request: [{}]", activeShardCountFailure, request.timeout(), request));
            return;
        }

        totalShards.incrementAndGet();
        pendingActions.incrementAndGet(); // increase by 1 until we finish all primary coordination
        primaryResult = primary.perform(request);
        primary.updateLocalCheckpointForShard(primaryRouting.allocationId().getId(), primary.localCheckpoint());
        final ReplicaRequest replicaRequest = primaryResult.replicaRequest();
        if (replicaRequest ! = null) {
            if (logger.isTraceEnabled()) {
                logger.trace("[{}] op [{}] completed on primary for request [{}]", primaryId, opType, request);
            }

            // We must obtain the replication group after successfully indexing into the primary to follow recovery semantics.
            // We must make sure that every operation indexed into the primary after recovery start is also replicated
            // to the recovery target. If we used an old replication group, we may miss a recovery that has started since then.
            // We also must make sure to obtain the global checkpoint before the replication group to ensure that the global checkpoint
            // is valid for this replication group. If we sampled in the reverse direction, the global checkpoint might be based on a subset
            // of the sampled replication group and advanced further than what the given replication group would allow.
            // This would mean that some shards could learn about a global checkpoint that would be higher than its local checkpoint.
            final long globalCheckpoint = primary.globalCheckpoint();
            final ReplicationGroup replicationGroup = primary.getReplicationGroup();
            markUnavailableShardsAsStale(replicaRequest, replicationGroup.getInSyncAllocationIds(), replicationGroup.getRoutingTable());
            performOnReplicas(replicaRequest, globalCheckpoint, replicationGroup.getRoutingTable());
        }

        successfulShards.incrementAndGet();  // mark primary as successful
        decPendingAndFinishIfNeeded();
    }

Next, we analyze some questions about this process:

1. Why must the active shard count be checked in the first step?

There is a parameter called wait_for_active_shards in ES. It is an index setting and can be attached to the request. This parameter indicates the minimum numbers of active copies that the shard should have before each write operation. Assume that we have an index in which each shard has three replica nodes, totaling four copies (plus primary node). If wait_for_active_shards is configured to 3, a maximum of one replica node is allowed to crash; if two replica nodes crash, the number active copies is less than three and, at that point, the write operation is not allowed.

This parameter is set to 1 by default, which means that the write operation is allowed if the primary node exists, meaning this parameter is not use at this point. If it is set to a number greater than 1, it can have a protective effect, ensuring that the written data has higher reliability. However, this parameter only carries out the check before the write operation, which cannot guarantee that the data is written successfully to the copies; thus, the minimum number of copies to which the data is written is not strictly guaranteed.

2. After writing to the Primary node finishes, why is it not returned until all replica nodes respond (or the connection fails)?

In earlier versions of ES, asynchronous replication was allowed between the primary node and replica nodes, which meant that the primary node returned once writing was successful. But, in this mode, if the primary node crashes, there is a risk of data loss, and it is difficult to guarantee that the data read from replica nodes is up to date. So, ES stopped using asynchronous mode. Now, the primary node is not returned to the client until the replica nodes are returned.

Because the primary node is not returned to the client until all replica nodes are returned, the latency is affected by the slowest replica node, which is a clear disadvantage of the current ES architecture. Originally, we thought that the result would be returned once writing to wait_for_active_shards copies was successful, but, later, after reading the source code, we realized that the result was not returned until all replica nodes were returned.

If writing to a replica node fails, ES executes retry logic; however, the number of nodes that need to be successfully written to is not explicitly specified. The returned result includes the number of shards in which the data write was successful or failed:

{
    "_shards" : {
        "total" : 2,
        "failed" : 0,
        "successful" : 2
    }
}

3. If writing to a replica node continuously fails, do user lookups see legacy data?

In other words, assuming writing to a replica node continuously fails, the data in the replica node could be much older than that in the primary node. We know that, in ES, replicas can also handle read requests, so does the user read the legacy data in this replica node?

The answer is that, if writing to a replica node fails, the primary node reports the issue to the master, and the master then updates the InSyncAllocations configuration of the index in meta and removes the replica node. After that, it no longer handles read requests. Users can still read the data on this replica node before the meta update reaches every node, but this does not happen after the meta update completes. This solution is not strict. Considering that ES is a near real-time system, after data is written, a refresh is required for it to be visible. So, in general, it should be acceptable that legacy data can be read for a short time.

ReplicationOperation.java, OnFailure function for failure to write to Replica nodes:

            public void onFailure(Exception replicaException) {
                logger.trace(
                    (org.apache.logging.log4j.util.Supplier<? >) () -> new ParameterizedMessage(
                        "[{}] failure while performing [{}] on replica {}, request [{}]",
                        shard.shardId(),
                        opType,
                        shard,
                        replicaRequest),
                    replicaException);
                if (TransportActions.isShardNotAvailableException(replicaException)) {
                    decPendingAndFinishIfNeeded();
                } else {
                    RestStatus restStatus = ExceptionsHelper.status(replicaException);
                    shardReplicaFailures.add(new ReplicationResponse.ShardInfo.Failure(
                        shard.shardId(), shard.currentNodeId(), replicaException, restStatus, false));
                    String message = String.format(Locale.ROOT, "failed to perform %s on replica %s", opType, shard);
                    replicasProxy.failShardIfNeeded(shard, message,
                            replicaException, ReplicationOperation.this::decPendingAndFinishIfNeeded,
                            ReplicationOperation.this::onPrimaryDemoted, throwable -> decPendingAndFinishIfNeeded());
                }
            }

call failShardIfNeeded:

        public void failShardIfNeeded(ShardRouting replica, String message, Exception exception,
                                      Runnable onSuccess, Consumer<Exception> onPrimaryDemoted, Consumer<Exception> onIgnoredFailure) {

            logger.warn((org.apache.logging.log4j.util.Supplier<? >)
                    () -> new ParameterizedMessage("[{}] {}", replica.shardId(), message), exception);
            shardStateAction.remoteShardFailed(replica.shardId(), replica.allocationId().getId(), primaryTerm, message, exception,
                    createListener(onSuccess, onPrimaryDemoted, onIgnoredFailure));
        }

shardStateAction.remoteShardFailed sends the request to the Master, executes the ShardFailed logic of the Replica, and removes the Shard from InSyncAllocation.

    public void shardFailed(ShardRouting failedShard, UnassignedInfo unassignedInfo) {
        if (failedShard.active() && unassignedInfo.getReason() ! = UnassignedInfo.Reason.NODE_LEFT) {
            removeAllocationId(failedShard);

            if (failedShard.primary()) {
                Updates updates = changes(failedShard.shardId());
                if (updates.firstFailedPrimary == null) {
                    // more than one primary can be failed (because of batching, primary can be failed, replica promoted and then failed...)
                    updates.firstFailedPrimary = failedShard;
                }
            }
        }

        if (failedShard.active() && failedShard.primary()) {
            increasePrimaryTerm(failedShard.shardId());
        }
    }

Maintaining InSyncAllocation in ES uses the PacificA algorithm, which is detailed in the next article.

From the Perspective of the Primary

From the perspective of primary, a write request is written to Lucene before it is written to translog.

1. Why Is a Translog Write Required?

Translog is similar to commitlog in a database, or binlog. Once translog write is successful and flushed, the data is flushed directly to the disk, which guarantees data security, so that segment can be flushed to the disk later. Because translog is written using append, write performance is better than using random write.

In addition, because the translog records every data change and the order in which the data changes, it can be used for data recovery. Data recovery consists of two parts: first, after the node reboots, the segment data that has not been flushed to the disk before reboot is recovered from translog; second, it is used for data synchronization between the primary node and the new replica node, which is the process by which the replica tries to keep up with the primary data.

2. Why Is Lucene Write Required Before Translog Write?

Lucene write writes the data to memory. After the write operation is finished, the data can be read immediately on refresh; translog write flushes data to the disk for data persistence and recovery. Normally, in distributed systems, commitLog is written for data persistence first, then this change is applied to the memory. So, why does ES work in exactly the opposite way? It is likely that the main reason is that, when writing to Lucene, Lucene runs various data checks, and the Lucene write operation may fail. If translog is written first, you may have to deal with the issue of Lucene write continuously failing while the translog write operation is successful. So, ES adopted the process of writing to Lucene first.

That's all for Part 1! In Part 2, we'll cover PacificA algorithms, SequenceNumber, Checkpoint, and Failure Discovery. Tune back in tomorrow to check it out! 

Topics:
big data ,elasticsearch ,elasticsearch tutorial ,data write processes

Published at DZone with permission of

Opinions expressed by DZone contributors are their own.

{{ parent.title || parent.header.title}}

{{ parent.tldr }}

{{ parent.urlSource.name }}