Elasticsearch Distributed Consistency Principles Analysis, Part 2



We wrap up this two-part series by looking at the Raft algorithm, ZooKeeper implementations, error detection, and a whole lot more.


Welcome back! If you missed Part 1, be sure to check it out first.

Error Detection

1. MasterFaultDetection and NodesFaultDetection

Fault detection can be described as a heartbeat-like mechanism. There are two types: in one, the master periodically checks the other nodes in the cluster; in the other, each of the other nodes periodically checks the cluster's current master. Detection is performed with periodic ping requests.

According to the ES documentation:

There are two fault detection processes running. The first is by the master, to ping all the other nodes in the cluster and verify that they are alive. And on the other end, each node pings to master to verify if its still alive or an election process needs to be initiated.

If the master detects that a node is not connected, the removeNode operation is performed to remove the node from the cluster_state, and a new cluster_state is published. When a new cluster_state is applied to each module, a number of recovery operations are performed, for example, to select a new primaryShard or replica, or to perform data replication.

If a node detects that the master is unreachable, it clears any pending cluster_state that has not yet been committed and initiates a rejoin to the cluster (a new master election is triggered if the election conditions are met).
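To make the heartbeat idea concrete, here is a minimal sketch of a detector that declares a target failed after a number of consecutive missed pings and reports it exactly once. The class `FaultDetector`, its methods, and the retry threshold are hypothetical; Zen discovery tunes comparable behavior through settings such as `discovery.zen.fd.ping_interval`, `discovery.zen.fd.ping_timeout`, and `discovery.zen.fd.ping_retries`, but the code below is not taken from ES.

```java
import java.util.function.Consumer;

// Hypothetical sketch, not Elasticsearch code: declares a target failed after a
// fixed number of consecutive missed pings, then reports it exactly once.
class FaultDetector {
    private final int maxConsecutiveFailures; // assumed retry threshold
    private final Consumer<String> onFailure;  // e.g. removeNode on the master, rejoin on a node
    private int consecutiveFailures = 0;
    private boolean failed = false;

    FaultDetector(int maxConsecutiveFailures, Consumer<String> onFailure) {
        if (maxConsecutiveFailures < 1) {
            throw new IllegalArgumentException("maxConsecutiveFailures must be >= 1");
        }
        this.maxConsecutiveFailures = maxConsecutiveFailures;
        this.onFailure = onFailure;
    }

    // Called once per periodic ping with the outcome of that ping.
    void recordPing(String target, boolean success) {
        if (failed) {
            return; // failure already reported; further pings are ignored
        }
        if (success) {
            consecutiveFailures = 0; // a single successful ping resets the count
            return;
        }
        consecutiveFailures++;
        if (consecutiveFailures >= maxConsecutiveFailures) {
            failed = true;
            onFailure.accept(target);
        }
    }

    boolean hasFailed() {
        return failed;
    }
}
```

On the master, one such detector per node would trigger something like removeNode; on every other node, a single detector aimed at the master would trigger a rejoin. A real implementation also needs ping timeouts and a scheduler, which are left out here.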

2. Rejoin

There is also a third case: the master finds that it no longer meets the quorum condition (at least minimumMasterNodes master-eligible nodes) and must voluntarily step down, performing a rejoin, to avoid split brain. So how does the master find out that it needs to rejoin?

As mentioned above, removeNode is executed when a node is unreachable. While executing removeNode, the master checks whether the remaining nodes still meet the quorum condition; if they don't, it performs a rejoin.

    if (electMasterService.hasEnoughMasterNodes(remainingNodesClusterState.nodes()) == false) {
        // too few master-eligible nodes remain: step down and rejoin
        final int masterNodes = electMasterService.countMasterNodes(remainingNodesClusterState.nodes());
        rejoin.accept(LoggerMessageFormat.format("not enough master nodes (has [{}], but needed [{}])",
                                                 masterNodes, electMasterService.minimumMasterNodes()));
        return resultBuilder.build(currentState);
    } else {
        // quorum still holds: drop the dead nodes and reallocate their shards
        return resultBuilder.build(allocationService.deassociateDeadNodes(remainingNodesClusterState, true, describeTasks(tasks)));
    }
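The quorum test itself is simple. The sketch below reuses the names `countMasterNodes` and `hasEnoughMasterNodes` from the excerpt for readability, but the `QuorumCheck` and `Node` types and the `mustRejoinAfterRemoval` helper are hypothetical simplifications that may differ from the real ElectMasterService in edge cases.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the quorum test used during removeNode; simplified,
// not the real ElectMasterService.
class QuorumCheck {
    // Minimal stand-in for a cluster node.
    static final class Node {
        final String id;
        final boolean masterEligible;

        Node(String id, boolean masterEligible) {
            this.id = id;
            this.masterEligible = masterEligible;
        }
    }

    static int countMasterNodes(List<Node> nodes) {
        int count = 0;
        for (Node n : nodes) {
            if (n.masterEligible) {
                count++;
            }
        }
        return count;
    }

    static boolean hasEnoughMasterNodes(List<Node> nodes, int minimumMasterNodes) {
        return countMasterNodes(nodes) >= minimumMasterNodes;
    }

    // True if, after dropping deadNodeId, the master must step down and rejoin.
    static boolean mustRejoinAfterRemoval(List<Node> nodes, String deadNodeId, int minimumMasterNodes) {
        List<Node> remaining = new ArrayList<>();
        for (Node n : nodes) {
            if (!n.id.equals(deadNodeId)) {
                remaining.add(n);
            }
        }
        return !hasEnoughMasterNodes(remaining, minimumMasterNodes);
    }
}
```

With three master-eligible nodes and a quorum of 2, losing one node is tolerated; if only two master-eligible nodes exist and one of them disappears, the master must rejoin.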

Publishing a new cluster_state is split into a send phase and a commit phase. The commit can only proceed after a quorum of nodes has acknowledged the send phase. If the send phase does not receive a quorum of successful responses, either a new master may exist or a quorum of nodes is unreachable; in both cases, the master needs to perform a rejoin.

        try {
            publishClusterState.publish(clusterChangedEvent, electMaster.minimumMasterNodes(), ackListener);
        } catch (FailedToCommitClusterStateException t) {
            // cluster service logs a WARN message
            logger.debug("failed to publish cluster state version [{}] (not enough nodes acknowledged, min master nodes [{}])",
                newState.version(), electMaster.minimumMasterNodes());

            synchronized (stateMutex) {
                // fail and drop any pending cluster states, then step down and rejoin
                pendingStatesQueue.failAllStatesAndClear(
                    new ElasticsearchException("failed to publish cluster state"));
                rejoin("zen-disco-failed-to-publish");
            }
            throw t;
        }
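The send/commit split can be sketched as follows. `TwoPhasePublisher`, its exception type, and the `sendSucceeds` predicate (which stands in for network calls) are hypothetical; the sketch also assumes that the publishing master counts itself toward the quorum, so it needs `minimumMasterNodes - 1` acknowledgements from its peers.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

// Hypothetical sketch, not Elasticsearch code: a two-phase publish that only
// commits when enough master-eligible nodes acknowledged the send phase.
class TwoPhasePublisher {
    static class FailedToCommitException extends RuntimeException {
        FailedToCommitException(String message) {
            super(message);
        }
    }

    // masterEligibleTargets: master-eligible nodes other than the publishing master.
    // sendSucceeds: simulates the send phase (true = that node acknowledged).
    // Returns the nodes that are told to commit (apply) the new state.
    static List<String> publish(List<String> masterEligibleTargets,
                                Predicate<String> sendSucceeds,
                                int minimumMasterNodes) {
        List<String> acked = new ArrayList<>();
        for (String node : masterEligibleTargets) {
            if (sendSucceeds.test(node)) {
                acked.add(node);
            }
        }
        // Assumption: the publishing master counts itself toward the quorum.
        int votes = acked.size() + 1;
        if (votes < minimumMasterNodes) {
            // No quorum: nothing is committed and the caller is expected to rejoin.
            throw new FailedToCommitException(
                "only " + votes + " of " + minimumMasterNodes + " required acknowledgements");
        }
        // Commit phase: in this sketch, only nodes that acknowledged are asked to commit.
        return acked;
    }
}
```

With three master-eligible nodes and minimumMasterNodes set to 2, one acknowledgement from either peer is enough to commit. If both peers are unreachable, the publish fails, and the master should step down, which is what the catch block above does.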

During its periodic pings to other nodes, the master may discover that another node is also acting as master. In this case, it compares its own cluster_state version with that of the other master: if the other master's version is newer, this node performs a rejoin; otherwise, it tells the other master to rejoin.

        if (otherClusterStateVersion > localClusterState.version()) {
            rejoin("zen-disco-discovered another master with a new cluster_state [" + otherMaster + "][" + reason + "]");
        } else {
            // TODO: do this outside mutex
            logger.warn("discovered [{}] which is also master but with an older cluster_state, telling [{}] to rejoin the cluster ([{}])", otherMaster, otherMaster, reason);
            try {
                // make sure we're connected to this node (connect to node does nothing if we're already connected)
                // since the network connections are asymmetric, it may be that we received a state but have disconnected from the node
                // in the past (after a master failure, for example)
                transportService.connectToNode(otherMaster);
                transportService.sendRequest(otherMaster, DISCOVERY_REJOIN_ACTION_NAME, new RejoinClusterRequest(localClusterState.nodes().getLocalNodeId()), new EmptyTransportResponseHandler(ThreadPool.Names.SAME) {

                    @Override
                    public void handleException(TransportException exp) {
                        logger.warn((Supplier<?>) () -> new ParameterizedMessage("failed to send rejoin request to [{}]", otherMaster), exp);
                    }
                });
            } catch (Exception e) {
                logger.warn((Supplier<?>) () -> new ParameterizedMessage("failed to send rejoin request to [{}]", otherMaster), e);
            }
        }
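The decision in the excerpt boils down to a strict comparison of cluster_state versions. The hypothetical `MasterConflict` class below isolates that rule and, in `whoRejoins`, lets each of two masters apply it independently from its own point of view; it is an illustration, not ES code.

```java
import java.util.Set;
import java.util.TreeSet;

// Hypothetical sketch, not Elasticsearch code: the version rule from the excerpt.
class MasterConflict {
    enum Action { LOCAL_REJOINS, ASK_OTHER_TO_REJOIN }

    // Only a strictly newer remote cluster_state makes the local master step down.
    static Action resolve(long localVersion, long otherVersion) {
        return otherVersion > localVersion ? Action.LOCAL_REJOINS : Action.ASK_OTHER_TO_REJOIN;
    }

    // Each master evaluates the rule on its own; returns the set of nodes asked to rejoin.
    static Set<String> whoRejoins(String a, long aVersion, String b, long bVersion) {
        Set<String> rejoining = new TreeSet<>();
        rejoining.add(resolve(aVersion, bVersion) == Action.LOCAL_REJOINS ? a : b); // a's view
        rejoining.add(resolve(bVersion, aVersion) == Action.LOCAL_REJOINS ? b : a); // b's view
        return rejoining;
    }
}
```

Under this rule, the master with the older state ends up rejoining from both points of view. With equal versions, each master tells the other to rejoin, so both are asked to step down.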

Cluster Scaling

With mechanisms such as node discovery, master election, and fault detection described above, we can now look at how to scale the cluster.

1. Scale Up DataNode

Suppose an ES cluster is running short of storage or computing resources and needs more capacity. Here we focus on adding a DataNode, which is configured as follows:

    node.master: false
    node.data: true

We also need other settings, such as the cluster name and node name. So that the node can join the cluster, we set discovery.zen.ping.unicast.hosts to the addresses of the cluster's master-eligible nodes.

    cluster.name: es-cluster
    node.name: node_Z
    discovery.zen.ping.unicast.hosts: ["x.x.x.x", "x.x.x.y", "x.x.x.z"]

We then start the node, and it joins the cluster automatically. Shards are then rebalanced, either automatically or manually through the reroute API.
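Once the new node joins, the cluster moves shards onto it. The toy sketch below only illustrates the idea of rebalancing by shard count: it repeatedly moves one shard from the most-loaded node to the least-loaded node until the counts differ by at most one. ES's real allocator uses weighted heuristics (for example `cluster.routing.allocation.balance.shard` and `cluster.routing.allocation.balance.index`) together with allocation deciders, so `ToyRebalancer` is hypothetical and deliberately simplified.

```java
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.Map;

// Hypothetical toy rebalancer, not Elasticsearch's allocator: balances by shard count only.
class ToyRebalancer {
    // Mutates shardsByNode in place and returns a log of the moves performed.
    static List<String> rebalance(Map<String, Deque<String>> shardsByNode) {
        List<String> moves = new ArrayList<>();
        while (true) {
            String most = null;
            String least = null;
            for (Map.Entry<String, Deque<String>> e : shardsByNode.entrySet()) {
                int size = e.getValue().size();
                if (most == null || size > shardsByNode.get(most).size()) most = e.getKey();
                if (least == null || size < shardsByNode.get(least).size()) least = e.getKey();
            }
            // Stop when there are no nodes or the cluster is as balanced as it can get.
            if (most == null || shardsByNode.get(most).size() - shardsByNode.get(least).size() <= 1) {
                return moves;
            }
            String shard = shardsByNode.get(most).pollLast();
            shardsByNode.get(least).addLast(shard);
            moves.add(shard + ": " + most + " -> " + least);
        }
    }
}
```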

2. Scale Down DataNode

Suppose an ES cluster has more machines than it needs and should be scaled down. How do we do this safely, without losing data or compromising availability?

First, we select the nodes that need to be scaled down. This section relates to scaling down DataNode. Scaling down MasterNode is a more complex process, which will be described later.

Next, we migrate the shards on these nodes to other nodes. We set an allocation rule that prevents shards from being allocated to the machines selected for removal, and then let the cluster rebalance.

PUT _cluster/settings
{
  "transient" : {
    "cluster.routing.allocation.exclude._ip" : ""
  }
}

After all the data on these nodes has been migrated, they can be safely taken offline.

For more detailed steps, see the official Elasticsearch documentation.
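The effect of an IP exclude rule can be sketched as a drain plan: every shard on an excluded node is moved to the least-loaded node that the rule still allows, and a node can be released once it holds no shards. `DrainPlanner` and its methods are hypothetical, and the sketch ignores constraints that the real allocator enforces, such as never placing two copies of the same shard on one node.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

// Hypothetical sketch, not Elasticsearch code: drain nodes matched by an IP exclude rule.
class DrainPlanner {
    // shardsByNodeIp: current assignment keyed by node IP; excludedIps: IPs in the exclude rule.
    // Returns a new assignment in which no shard remains on an excluded node.
    static Map<String, List<String>> drain(Map<String, List<String>> shardsByNodeIp, Set<String> excludedIps) {
        Map<String, List<String>> result = new TreeMap<>();
        List<String> toMove = new ArrayList<>();
        for (Map.Entry<String, List<String>> e : shardsByNodeIp.entrySet()) {
            if (excludedIps.contains(e.getKey())) {
                toMove.addAll(e.getValue());
                result.put(e.getKey(), new ArrayList<>()); // excluded node keeps nothing
            } else {
                result.put(e.getKey(), new ArrayList<>(e.getValue()));
            }
        }
        for (String shard : toMove) {
            String target = null;
            for (Map.Entry<String, List<String>> e : result.entrySet()) {
                if (excludedIps.contains(e.getKey())) continue; // the rule forbids this node
                if (target == null || e.getValue().size() < result.get(target).size()) target = e.getKey();
            }
            if (target == null) throw new IllegalStateException("no eligible node left for " + shard);
            result.get(target).add(shard);
        }
        return result;
    }

    // A node can be released once it holds no shards.
    static boolean canRelease(Map<String, List<String>> assignment, String ip) {
        return assignment.getOrDefault(ip, List.of()).isEmpty();
    }
}
```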


3. Scale Up MasterNode

When adding a MasterNode (master-eligible node), we must account for the ES quorum strategy used to avoid split brain, as mentioned above. We therefore need to configure a quorum number:

    discovery.zen.minimum_master_nodes: 2

With three master-eligible nodes, the quorum can be set to 2; when scaling up to four master-eligible nodes, it should be increased to 3. In general, the quorum should be a majority of the N master-eligible nodes, that is, floor(N / 2) + 1.
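The majority rule can be written as a one-line function. `MasterQuorum` is a hypothetical helper; `preventsSplitBrain` expresses why a majority is required: two disjoint groups of nodes cannot both reach a quorum larger than half of the master-eligible nodes.

```java
// Hypothetical helper illustrating the majority quorum for master-eligible nodes.
class MasterQuorum {
    static int minimumMasterNodes(int masterEligibleNodes) {
        if (masterEligibleNodes < 1) {
            throw new IllegalArgumentException("need at least one master-eligible node");
        }
        return masterEligibleNodes / 2 + 1; // integer division floors for non-negative values
    }

    // True when two disjoint partitions can never both reach the quorum.
    static boolean preventsSplitBrain(int masterEligibleNodes, int quorum) {
        return 2 * quorum > masterEligibleNodes;
    }
}
```

For three nodes this yields 2 and for four nodes 3, matching the values above. A quorum of 2 with four nodes would allow two isolated pairs to each elect a master.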

First, we change discovery.zen.minimum_master_nodes to 3, and only then add the new master-eligible node. The setting can be changed with the following API:

curl -XPUT localhost:9200/_cluster/settings -H 'Content-Type: application/json' -d '{
    "persistent" : {
        "discovery.zen.minimum_master_nodes" : 3
    }
}'

Once this API request is sent to the master of the current cluster, the new value takes effect immediately. The master stores this setting in the cluster meta, and from then on the cluster uses this value.

However, this approach can leave the value in the configuration file inconsistent with the value in the cluster meta, which may cause unusual problems. For example, after a full cluster restart, a master must be elected before the cluster meta can be recovered, so at that point only the configuration file value is available. Once the cluster meta is recovered, elections use the cluster meta value, and some correctness-related edge cases may arise.
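The inconsistency can be modeled as a resolution function. The sketch below assumes the documented ES precedence for cluster settings (transient over persistent over the configuration file) and assumes that, before the cluster meta is recovered, only the file value is available. `EffectiveSetting` and `resolve` are hypothetical names.

```java
import java.util.OptionalInt;

// Hypothetical sketch of how the effective minimum_master_nodes value can differ
// before and after the cluster meta is recovered.
class EffectiveSetting {
    static int resolve(boolean clusterMetaRecovered, int configFileValue,
                       OptionalInt persistentValue, OptionalInt transientValue) {
        if (!clusterMetaRecovered) {
            // Right after a full restart the cluster meta is not available yet,
            // so only the value from the configuration file can be used.
            return configFileValue;
        }
        // After recovery: transient overrides persistent, which overrides the file.
        if (transientValue.isPresent()) {
            return transientValue.getAsInt();
        }
        if (persistentValue.isPresent()) {
            return persistentValue.getAsInt();
        }
        return configFileValue;
    }
}
```

With `discovery.zen.minimum_master_nodes: 2` still in the file and a persistent value of 3, the first election after a full restart uses 2 while later decisions use 3; with four master-eligible nodes, that first election runs with a non-majority quorum. Keeping the file in sync with the value set through the API closes this gap.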

In summary, any operations or configurations regarding the master node must be carefully thought out, as master configuration errors can lead to split brain, bad data writes, data loss, and other unwanted situations.

4. Scale Down MasterNode

Scaling down a MasterNode reverses the scale-up procedure: first remove the node, then reduce the quorum. We do not go into further detail here.
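The required ordering for both scale-up and scale-down can be checked mechanically. The hypothetical `ScalingPlan` below treats each intermediate state as a pair (master-eligible node count, configured quorum) and flags any state where the quorum is not a strict majority (split brain becomes possible) or exceeds the node count (no master can be elected, assuming all listed nodes are up).

```java
import java.util.List;

// Hypothetical checker for a sequence of master-eligible scaling steps.
class ScalingPlan {
    // State after one step: master-eligible node count and configured quorum.
    static final class Step {
        final int masterNodes;
        final int quorum;

        Step(int masterNodes, int quorum) {
            this.masterNodes = masterNodes;
            this.quorum = quorum;
        }
    }

    // Safe if the quorum is a strict majority and is reachable with all nodes up.
    static boolean isSafe(Step s) {
        return 2 * s.quorum > s.masterNodes && s.quorum <= s.masterNodes;
    }

    static boolean allStepsSafe(List<Step> plan) {
        for (Step s : plan) {
            if (!isSafe(s)) {
                return false;
            }
        }
        return true;
    }
}
```

Raising the quorum to 3 before adding the fourth node keeps every step safe, whereas adding the node first passes through (4, 2), where two isolated pairs could each elect a master. Scale-down mirrors this: removing the node first passes through (3, 3), which is safe but tolerates no further loss of a master-eligible node until the quorum is lowered to 2.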

Comparison of Implementation Methods

1. Comparison With ZooKeeper

This series has described how an ES cluster implements several major node-related functions:

  1. Node discovery
  2. Master election
  3. Error detection
  4. Cluster scaling

Let us imagine what would change if we used ZooKeeper to implement this functionality.

About ZooKeeper

Let us begin with a brief introduction to ZooKeeper. If you are already familiar with it, you can skip this part.

The ZooKeeper distributed service framework was originally a subproject of Apache Hadoop and is now a top-level Apache project. It is mainly used to solve common data management problems in distributed applications, such as unified naming, state synchronization, cluster management, and management of distributed application configuration items.

In short, ZooKeeper is used to manage the nodes, configurations, and states in a distributed system and to synchronize configuration and state among individual nodes. Many distributed systems rely on ZooKeeper or similar components.

ZooKeeper manages data in the form of a directory tree; each node in the tree is called a znode, and each znode consists of three parts:

  1. stat: state information describing the znode's version, permissions, and other metadata.
  2. data: the data associated with the znode.
  3. children: the child nodes under the znode.

One of the fields in stat is ephemeralOwner: if it is set (non-zero), the znode is ephemeral (temporary) and is deleted automatically when the session that created it ends. Ephemeral znodes can help an application implement master election and error detection.

ZooKeeper provides a watch mechanism for listening to events such as child nodes being added to or removed from a znode, a znode being created or deleted, and a znode's data being updated.

Implementing the ES Functionality Above Using ZooKeeper

  1. Node discovery: Configure the ZooKeeper server address in the configuration file of each node. When a node starts, it tries to register an ephemeral znode in a ZooKeeper directory. The master of the current cluster watches this directory for child-node add/remove events, and whenever a new node is discovered, it adds the node to the cluster.
  2. Master election: When a master-eligible node starts, it tries to create an ephemeral znode named master at a fixed location. If the creation succeeds, the node becomes master; if it fails, the node watches this znode for changes. When the master fails, its znode is deleted automatically because it is ephemeral, and the other master-eligible nodes then try to create it again. In effect, ZooKeeper turns master election into a race to grab the master znode.
  3. Error detection: Because both the node znodes and the master znode are ephemeral, when a node fails, its ZooKeeper session ends and its znode is deleted automatically. The master of the cluster only needs to watch for znode change events. If the master fails, the other master-eligible nodes receive the master znode deletion event and try to become the new master.
  4. Cluster scaling: The minimum_master_nodes configuration no longer matters when scaling the cluster, which makes scaling easier.
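To illustrate the master-election idea without a running ensemble, the sketch below models just two ZooKeeper semantics in memory: ephemeral znodes that disappear when their owning session ends, and one-shot watches that fire on deletion. `ToyZk`, `ZkMasterCandidate`, and the `/es/master` path are hypothetical and are not the ZooKeeper client API; with the real Java client, the corresponding calls would be `ZooKeeper.create(path, data, acl, CreateMode.EPHEMERAL)` and `ZooKeeper.exists(path, watcher)`.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Toy in-memory model of ephemeral znodes and deletion watches (NOT the ZooKeeper API).
class ToyZk {
    private final Map<String, Long> ephemeralOwner = new HashMap<>(); // path -> owning session id
    private final Map<String, List<Consumer<String>>> deleteWatches = new HashMap<>();

    // Creates an ephemeral znode; returns false if the path already exists.
    boolean createEphemeral(String path, long sessionId) {
        return ephemeralOwner.putIfAbsent(path, sessionId) == null;
    }

    // One-shot watch that fires when the znode at path is deleted.
    void watchDelete(String path, Consumer<String> watcher) {
        deleteWatches.computeIfAbsent(path, p -> new ArrayList<>()).add(watcher);
    }

    // Ending a session deletes all of its ephemeral znodes and fires their watches.
    void closeSession(long sessionId) {
        List<String> owned = new ArrayList<>();
        for (Map.Entry<String, Long> e : ephemeralOwner.entrySet()) {
            if (e.getValue() == sessionId) {
                owned.add(e.getKey());
            }
        }
        for (String path : owned) {
            ephemeralOwner.remove(path);
            List<Consumer<String>> watchers = deleteWatches.remove(path);
            if (watchers != null) {
                watchers.forEach(w -> w.accept(path));
            }
        }
    }

    Long ownerOf(String path) {
        return ephemeralOwner.get(path);
    }
}

// Master-eligible node racing for the (hypothetical) /es/master znode.
class ZkMasterCandidate {
    static final String MASTER_PATH = "/es/master";
    final long sessionId;
    final ToyZk zk;

    ZkMasterCandidate(long sessionId, ToyZk zk) {
        this.sessionId = sessionId;
        this.zk = zk;
    }

    // Try to become master; if the znode is taken, watch it and retry when it is deleted.
    boolean tryBecomeMaster() {
        if (zk.createEphemeral(MASTER_PATH, sessionId)) {
            return true;
        }
        zk.watchDelete(MASTER_PATH, path -> tryBecomeMaster());
        return false;
    }
}
```

The toy is single-threaded, so it sidesteps the race a real client must handle between a failed create and setting the watch. Node discovery would follow the same pattern, with each node creating an ephemeral child znode and the master watching the parent's children.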

Advantages and Disadvantages of Using ZooKeeper

ZooKeeper handles some complex distributed consistency issues, simplifying ES operations substantially and helping guarantee data integrity. This is also the common approach in most distributed systems. Although the ES Zen Discovery module has undergone many bug fixes, critical bugs remain, and it is difficult to operate and maintain.

So why doesn't ES use ZooKeeper? Perhaps the official developers believe that adding a ZooKeeper dependency means relying on one more component, adding complexity to cluster deployment and forcing users to manage one more service during regular operation and maintenance.

Are there other algorithms that ES could implement itself? Of course: Raft, for example.

2. Comparison With Raft

The Raft algorithm is a very popular distributed consensus algorithm. It is easier to implement than Paxos, and it has been used in a wide variety of distributed systems. Instead of describing the details of this algorithm here, we focus on the master election algorithm to compare the similarities and differences between Raft and the current ES election algorithm:

Similarities:

  1. Quorum principle: Only the node that gets more than half of the votes can become master.
  2. The elected leader must have the latest committed data: in Raft, nodes with newer data do not vote for nodes with older data, and because election requires a majority of votes, the leader-elect must have the latest committed data. In ES, nodes with newer cluster_state versions are sorted first, which serves the same purpose.
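Both "prefer the newest data" rules can be sketched side by side. The Raft check is the standard "at least as up-to-date" comparison on the term and index of the last log entry. The ES-style ordering follows this article's description (newer cluster_state version first, then lower NodeId) and is a simplification, not the real ElectMasterService. `UpToDateRules`, `Candidate`, and the string comparison of NodeIds are hypothetical.

```java
import java.util.Collections;
import java.util.Comparator;
import java.util.List;

// Hypothetical sketch comparing Raft's vote check with an ES-style candidate ordering.
class UpToDateRules {
    // Raft: a voter grants its vote only if the candidate's log is at least as up-to-date.
    static boolean raftCandidateIsUpToDate(long candLastTerm, long candLastIndex,
                                           long voterLastTerm, long voterLastIndex) {
        if (candLastTerm != voterLastTerm) {
            return candLastTerm > voterLastTerm; // later last term wins
        }
        return candLastIndex >= voterLastIndex; // same term: longer (or equal) log wins
    }

    static final class Candidate {
        final String nodeId;
        final long clusterStateVersion;

        Candidate(String nodeId, long clusterStateVersion) {
            this.nodeId = nodeId;
            this.clusterStateVersion = clusterStateVersion;
        }
    }

    // ES-style (as described above): newest cluster_state version first, then lowest NodeId.
    static Candidate esPreferred(List<Candidate> candidates) {
        Comparator<Candidate> order = Comparator
                .comparingLong((Candidate c) -> c.clusterStateVersion)
                .reversed()
                .thenComparing((Candidate c) -> c.nodeId);
        return Collections.min(candidates, order);
    }
}
```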

Differences:

  1. Proof of correctness: Raft is an algorithm whose correctness has been proved. The correctness of the ES algorithm is unproven, and any issues will only be found in practice, at which point bugs can be fixed. This is the major difference.
  2. Election term: Raft introduces the concept of a term (an election cycle). The term is incremented by one for each election round, and within a given term each participant can cast only one vote. ES has no term concept during election, so it cannot guarantee that each node votes only once per round.
  3. Election tendency: In Raft, any node that has the latest committed data has a chance to be elected leader. In ES, nodes with the same version are sorted by NodeId, and the node with the lowest NodeId always takes priority.
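The term rule is what lets Raft guarantee one vote per node per round. The hypothetical `RaftVoter` below shows only that bookkeeping on a single voter; the log up-to-date check is omitted to keep the focus on terms.

```java
// Hypothetical sketch of Raft's one-vote-per-term bookkeeping on a single voter.
class RaftVoter {
    private long currentTerm = 0;
    private String votedFor = null; // candidate voted for in currentTerm, or null

    // Returns true if the vote is granted (log up-to-date check omitted).
    synchronized boolean requestVote(long candidateTerm, String candidateId) {
        if (candidateTerm < currentTerm) {
            return false; // stale candidate from an older term
        }
        if (candidateTerm > currentTerm) {
            currentTerm = candidateTerm; // new election round: forget the previous vote
            votedFor = null;
        }
        if (votedFor == null || votedFor.equals(candidateId)) {
            votedFor = candidateId;
            return true;
        }
        return false; // already voted for a different candidate in this term
    }

    synchronized long currentTerm() {
        return currentTerm;
    }
}
```

A repeated request from the same candidate in the same term is granted again, which keeps retries harmless, while a second candidate in that term is refused.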


In terms of correctness, Raft is definitely a better choice; however, after fixing several bugs, the ES election algorithm is becoming more like Raft. Of course, Raft was not around during early ES development. If ES continues in this direction, it might eventually become as capable as a Raft implementation.


This series introduced how the Elasticsearch cluster implements its composition, node discovery, master election, fault detection, and scaling, among other aspects. Unlike more general articles, it analyzes the principles and issues behind the cluster and compares them with other implementation methods.


Published at DZone with permission of Leona Zhang. See the original article here.

