Synchronous Replication in Tarantool (Part 2)
Tarantool offers synchronous replication and a Raft-based algorithm for choosing a leader. In this series, learn about the algorithm's design and implementation.
Read part 1 of this article here: Synchronous Replication in Tarantool (Part 1).
Picking up where Part 1 left off, we continue the long journey through the algorithm's design and implementation. All sections are essential and add up to a single story.
3. Raft: Replication and Leader Election
To understand the technical details of replication and leader election, you need to understand the Raft protocol, at least the part concerning synchronous replication. You can find the full description of Raft in the original article titled In Search of an Understandable Consensus Algorithm. This section describes both parts of Raft for those who don't want to read the article. If you are familiar with Raft, you can skip this section.
The algorithm is divided into two independent parts: replication and leader election.
In Raft, the leader is the node that accepts all requests. In a way, it is similar to the master in Tarantool.
The first part of Raft is responsible for synchronous replication with a known leader. Raft describes what the transaction log looks like, how transactions are identified and distributed, and what level of reliability is guaranteed under what conditions.
The second part of Raft deals with detecting leader failure and selecting a new leader.
In the canonical version of Raft, all nodes in a replica set have an assigned role: leader, replica (follower), or candidate.
- The leader is a node that accepts all read/write requests.
- Replicas receive transactions from the leader and commit them. Replicas also redirect all client requests to the leader (including read requests).
- A replica becomes a candidate when the leader stops responding. Then a new leader must be elected.
Under normal conditions, a replica set contains only one leader, while all the other nodes are replicas.
Leader Election
The lifecycle of a replica set is divided into terms: numbered time intervals between leader elections. Term numbers never decrease and are stored on each node independently. In each term, the nodes must elect a single leader. If no node becomes the leader, a new term starts, and a new election is held.
The replicas decide to move to the next term when the leader is not responding for a long time. They become candidates, initiate the next term (increment the term's number), and start voting. Each candidate votes for itself and sends a vote request to the other replicas.
After receiving a vote request, a node acts depending on its state. If its term is less than or equal to the candidate's and it hasn't yet voted in that term, the node votes for the candidate.
The candidate counts the votes. If the majority of nodes vote for it, the candidate becomes the leader. If no candidate gets a majority, a new election starts after a random delay. Each node waits for its own randomized period, which reduces the probability that the nodes begin an election simultaneously, each voting for itself so that none of them wins. Once a leader has been elected, it can commit new transactions.
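To make the voting rule concrete, here is a minimal Lua sketch. The data structures and function names are hypothetical and simplified (canonical Raft, for instance, also checks how up to date the candidate's log is); this is not Tarantool internals.

```lua
-- Illustrative sketch of the voting rule described above (simplified).
local function handle_vote_request(node, request)
    if request.term < node.term then
        -- The candidate is behind: refuse the vote.
        return {term = node.term, vote_granted = false}
    end
    if request.term > node.term then
        -- A newer term resets our previous vote.
        node.term = request.term
        node.voted_for = nil
    end
    if node.voted_for == nil or node.voted_for == request.candidate_id then
        -- Only one vote per term: grant it and remember the choice.
        node.voted_for = request.candidate_id
        return {term = node.term, vote_granted = true}
    end
    return {term = node.term, vote_granted = false}
end
```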
Synchronous Replication
Raft describes the process of replication as a procedure called `AppendEntries`. The leader calls this procedure on the replicas when a transaction or a bundle of transactions arrives. In Raft, it is something like a function that includes all the logic of applying changes to the database and responding to the leader. If the changes don't gather confirmations from a majority of nodes, the leader keeps resending `AppendEntries` until a quorum is reached.
As soon as there is a quorum, the awaiting transactions are committed, and another entry is recorded in the log. The leader doesn't wait for other replicas to respond. It doesn't send any synchronous messages to confirm the commit, either. Otherwise, it would be an infinite loop of quorums and commits.
The transaction and the confirming message are delivered asynchronously to the replicas that aren't in the quorum.
Logging and replicating new transactions doesn't require that all earlier transactions are committed. Because of that, the transactions don't block each other. This is one of the reasons why transactions in Raft can be sent in batches and reach quorum in batches. However, it makes the log structure more complicated.
The Raft log is a sequence of "key = value" entries. Each entry contains a data change and the corresponding metadata: the log index and the term during which the entry was logged on the leader.
The leader log supports two "cursors": the end of the log and the last committed transaction. The logs of the replicas are prefixes in the leader log. When the leader receives a confirmation from a replica, it logs the commit and increments the index of the last finished transaction.
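To make this structure concrete, here is a sketch of such a log and its two cursors in Lua. The field names are illustrative; this is not Tarantool's actual WAL format.

```lua
-- Illustrative structure of a Raft log (not Tarantool's on-disk format).
local log = {
    -- Each entry carries the command plus its metadata.
    {index = 1, term = 1, command = {key = 'a', value = 10}},
    {index = 2, term = 1, command = {key = 'b', value = 20}},
    {index = 3, term = 2, command = {key = 'a', value = 30}},
}
-- The two "cursors" maintained by the leader:
local last_index   = #log  -- end of the log
local commit_index = 2     -- last entry that reached a quorum and was committed
```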
Raft guarantees the following:
- If log entries on two different nodes have the same index and term, they also have the same command, "key = value".
- If log entries on two different nodes have the same index and term, then their logs are identical up to and including that entry.
The first holds because new data changes are generated only by the single leader of each term, which distributes the same commands with the same terms to all replicas. The index always increases, and log entries never change order.
The second holds because of a check built into `AppendEntries`. A request sent by the leader includes not only the new changes but also the term and index of the log entry immediately preceding them in the leader's log. Every replica that receives `AppendEntries` checks whether the term and index of the leader's preceding entry match the last entry in its local log. If they do, the new changes can be applied, and transactions strictly maintain their order. Otherwise, the replica is not synchronized: it is missing a piece of the leader's log and might even contain transactions from other nodes. According to Raft, a non-synchronized replica must remove the head of its log so that the rest becomes a prefix of the leader's log, and then download the correct head of the log from the leader.
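A minimal sketch of that consistency check, again with illustrative names rather than Tarantool or Raft reference code:

```lua
-- Illustrative sketch of the AppendEntries consistency check (simplified).
local function append_entries(log, request)
    -- The request carries the index and term of the entry that precedes
    -- the new ones in the leader's log.
    local prev = log[request.prev_index]
    if request.prev_index > 0 and (prev == nil or prev.term ~= request.prev_term) then
        -- Not synchronized: reject, so the leader retries with an earlier prefix.
        return false
    end
    -- Drop any conflicting entries after the match point, then append the new ones.
    while #log > request.prev_index do
        table.remove(log)
    end
    for _, entry in ipairs(request.entries) do
        table.insert(log, entry)
    end
    return true
end
```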
Note that in practice, removing the head of the log may not be possible, as data is stored not only in logs. It can also be stored in a file such as a B-tree in SQLite, or be part of an LSM tree, as in Tarantool's vinyl engine. If the data is moved to storage immediately, removing the log head won't undo changes that are still waiting for the leader's commit. In that case, the log needs at least an undo mechanism: it should be possible to figure out from each entry how to make a reverse entry that rolls the change back. However, an undo log can take a lot of space. Tarantool uses a redo log, which can be replayed from the beginning but not rolled back from the end.
It might be unclear how replicas can get de-synchronized with the leader. It happens when nodes go inactive for some time, up to a few terms. Let's take a look at a couple of examples.
A replica might be missing a piece of the log or even a whole term. For example, a replica was inactive for a term while the leader was active and making changes. When the replica comes back online, its log has fallen behind the leader's log.
The replica's log could also be longer and even contain newer terms than the leader's log. However, the leader's current term would still be greater than any term in that log, even if the leader hasn't made any updates yet; otherwise, the node wouldn't have been elected leader. For example: the replica was the leader during term 3 and made two entries that failed to achieve a quorum. In term 4, the new leader recorded two more entries, which also failed to reach a quorum but were replicated to the former leader of term 3. Then, in term 5, the current leader was elected.
In Raft, the leader is always right, which is why replicas with incomplete or incorrect logs must remove part of their logs so that what remains is a prefix of the leader's log. This is completely valid and doesn't lead to data loss, since it only applies to changes that haven't reached a quorum, haven't been committed, and haven't been reported to the user as successful. If a proposed change did achieve a quorum, the next leader will be elected from the nodes of that quorum. However, this holds only if more than half of the cluster is active; ensuring it is yet another task of the leader election module.
You have just read a brief introduction to Raft with a focus on synchronous replication. The algorithm is relatively simple compared to its analogs (like Paxos). The information above should be enough to understand this article.
4. Asynchronous Replication
Until recently, only asynchronous replication was available in Tarantool. Since synchronous replication is built on top of asynchronous replication, you first need to understand the latter to dive into the topic.
Tarantool has three main execution threads and one thread for each connected replica:
- TX: transaction processing thread
- IProto: network thread
- WAL: write-ahead log thread
- Relay: replication thread
Transaction Processing Thread (TX)
TX is the main Tarantool thread. All user requests are processed here, which is why Tarantool is often referred to as a single-threaded application.
The thread works in a cooperative multitasking mode using coroutines, which are lightweight threads written in C and Assembler. They are called fibers in Tarantool.
A Tarantool application can have thousands of fibers, while there is always only one thread as far as the operating system is concerned. Therefore, although Tarantool code is concurrent, there are no mutexes, condition variables, spinlocks, or any of the other primitives associated with thread synchronization. On the plus side, this leaves more time to work with the data instead of figuring out locks. It also simplifies development, both for the Tarantool team and for our users.
Users can create their own fibers. Network requests are automatically processed in separate fibers, and each request can create new fibers. Tarantool also uses fibers for service tasks, including replication.
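For example, a user fiber can be created with the standard `fiber` module; the body of the fiber below is just for illustration.

```lua
-- Creating a user fiber in the TX thread with the standard fiber API.
local fiber = require('fiber')

fiber.create(function()
    for i = 1, 3 do
        -- fiber.sleep() yields control, letting other fibers run
        -- in the same OS thread.
        fiber.sleep(1)
        print('tick', i)
    end
end)
```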
Network Thread (IProto)
The purpose of this thread is to read and write data to and from the network and to decode messages in Tarantool's IProto protocol. It relieves the TX thread of the rather complex task of managing network input/output. This thread is not exposed to users: they cannot interact with it directly.
However, our community has asked for the ability to create user threads and run their own servers in them, for example, via the HTTPS protocol. We are already working in this direction.
Write-Ahead Log Thread (WAL)
This thread is designed for logging transactions in the write-ahead log (WAL). Transactions are logged in the WAL before they are applied to the database structures and become visible to all users. That is why it's called a write-ahead log.
If a node fails, it can read the log after restarting and redo the saved transactions. If a transaction were applied first and only logged afterwards, it could not be restored if the node restarted between these two actions.
The Tarantool log is a redo log. It can be replayed from the beginning with all its transactions reapplied. That's what happens when a node is restarted. However, it is only possible to replay the log from the beginning to the end. You can't move in the opposite direction to roll transactions back. To save space, transactions in a redo log do not contain the information necessary to roll them back.
Part of Tarantool's strength is executing everything in large batches, and this is especially true for logging. When many fibers in the main thread commit transactions at the same time, the transactions are united into one large batch, which is sent to the WAL thread and saved to disk in a single write operation.
While the WAL thread is writing, the TX thread accepts new transactions and prepares the next batch. This allows Tarantool to use fewer system calls. Users have no access to this thread. In asynchronous replication, logging is the only requirement for committing a transaction.
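The WAL thread's behavior is controlled through `box.cfg` options such as `wal_mode`, `wal_dir`, and `wal_max_size`; the values below are only examples.

```lua
-- WAL-related box.cfg options (example values).
box.cfg{
    wal_mode = 'write',   -- 'write': log via write(); 'fsync' also waits for fsync;
                          -- 'none' disables the WAL entirely
    wal_dir = './wal',                 -- directory for the .xlog files
    wal_max_size = 256 * 1024 * 1024,  -- start a new log file after roughly 256 MB
}
```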
Replication Thread (Relay)
In addition to the three main threads, Tarantool creates replication threads. They are called relay threads, and they only exist if replication is enabled.
One relay thread is created for each connected replica. The relay thread receives a request from the replica to retrieve all transactions starting from a particular moment. It keeps executing this request within the replication lifecycle, constantly tracking new transactions in the log and sending them to the replica. This is how replication to another node works.
For replication not to but from another node, Tarantool creates a fiber in the TX thread called the applier. The relay thread of the source instance connects to this fiber. That is, a relay and an applier are the two ends of the same connection, where data flows in one direction: from the relay to the applier. Metadata, such as delivery confirmations, travels both ways.
Let's say there are two nodes: Node 1 with the configuration `box.cfg{listen = 3313, replication = {'localhost:3314'}}` and Node 2 with the configuration `box.cfg{listen = 3314}`. Both nodes have TX, WAL, and IProto threads. Node 1 will also have an applier fiber in its TX thread that downloads transactions from Node 2, and Node 2 will have a relay thread that sends transactions to the applier on Node 1.
Relays are implemented as separate threads since they have the heavy duty of reading from the disk and sending log entries to the network. Reading from disk is the longest operation here.
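On a running pair of instances like this, both ends of the connection can be observed through `box.info.replication` (a standard field; the exact set of subfields varies somewhat between Tarantool versions).

```lua
-- Inspect replication state from the Lua console.
for id, peer in pairs(box.info.replication) do
    if peer.upstream then
        -- This instance has an applier fiber pulling from the peer.
        print(id, 'upstream', peer.upstream.status, 'lag:', peer.upstream.lag)
    end
    if peer.downstream then
        -- This instance has a relay thread pushing to the peer.
        print(id, 'downstream', peer.downstream.status)
    end
end
```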
Identifying Transactions
To sort transactions for replication, filter their duplicates in replica sets with redundant connections, and agree on who sends what, where, and when, transactions are identified in a specific way.
Transactions have two IDs in the log: replica ID and LSN. The first is the unique ID of the node that created the transaction. The second, log sequence number (LSN), is the ID of the corresponding log entry. This number is constantly increasing within one replica ID. Two LSNs from different replica IDs cannot be compared in a meaningful way.
This double identification is used in a master-master replication, where there can be a lot of instances generating transactions. The source node's ID is used to distinguish these transactions, and the LSN to sort them. With replica IDs, there is no need to generate unique and ordered LSNs for the whole replica set.
There can be up to 31 replicas, with IDs from 1 to 31. That is, the Tarantool log is effectively 31 logs serialized into one. If you collect the latest transactions from all replica IDs on one node, you get an array of at most 31 values, where the index of an element is the replica ID and the value is the last applied LSN from that node. Such an array is called a vclock (vector clock). A vclock is an accurate snapshot of the state of the entire cluster. By exchanging vclocks, instances inform each other of how far behind they are, determine what changes should be sent to those that are behind, and filter out duplicates.
There is also a 32nd component of the vclock, with index 0, which is responsible for local transactions and isn't related to replication.
Replicated transactions are applied on replicas exactly as on the source node, with the same replica ID and LSN, so the same component of the replica's vclock is incremented as on the source. Thus, when a replica reconnects and sends its full vclock, the transaction's author can determine whether it needs to send the same transactions again.
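All of these identifiers are visible on a live instance through standard `box.info` fields:

```lua
-- In the Tarantool console:
box.info.id      -- this instance's replica ID, from 1 to 31
box.info.lsn     -- the last LSN generated by this instance
box.info.vclock  -- the whole vclock, e.g. {1: 5, 2: 6, 3: 9} in console output
```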
Here is an example of updating and exchanging vclock on three nodes. Let's say the nodes have replica IDs 1, 2, and 3, respectively. Their LSNs initially equal 0.
Node 1: [0, 0, 0]
Node 2: [0, 0, 0]
Node 3: [0, 0, 0]
Node 1 makes 5 transactions and increases its LSN by 5.
Node 1: [5, 0, 0]
Node 2: [0, 0, 0]
Node 3: [0, 0, 0]
Now, these transactions are replicated to Nodes 2 and 3. Node 1 sends them through two relay threads. The transactions contain `{replica ID = 1}` and are therefore applied to the first part of vclocks on other nodes.
Node 1: [5, 0, 0]
Node 2: [5, 0, 0]
Node 3: [5, 0, 0]
Now Node 2 makes 6 transactions, and Node 3 makes 9 transactions. Then, before replication, vclock looks like this:
Node 1: [5, 0, 0]
Node 2: [5, 6, 0]
Node 3: [5, 0, 9]
After the replication, it looks like this:
Node 1: [5, 6, 9]
Node 2: [5, 6, 9]
Node 3: [5, 6, 9]
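The way each vclock component converges can be sketched as an element-wise maximum. This is an illustration of the end result only; Tarantool advances the vclock internally, one applied transaction at a time.

```lua
-- Illustrative sketch: a vclock component never moves backwards,
-- it is advanced to the largest LSN seen for each replica ID.
local function vclock_follow(own, remote)
    for replica_id, lsn in pairs(remote) do
        own[replica_id] = math.max(own[replica_id] or 0, lsn)
    end
end

local node1 = {5, 0, 0}          -- [replica ID] = LSN, as in the example above
vclock_follow(node1, {5, 6, 9})  -- after applying everything from Nodes 2 and 3
-- node1 is now {5, 6, 9}, matching the final state in the example
```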
General Schema
The asynchronous replication process in this architecture looks like the following:
- A transaction is created in the TX thread, the user initiates its commit, and its fiber falls asleep.
- The transaction is sent to the WAL thread for logging. After it is logged, a positive response is returned to the TX.
- The TX thread wakes up the user's fiber. The user sees the successful commit.
- The relay thread wakes up, reads this transaction from the log, and sends it to a replica via the network.
- On the replica, the applier fiber receives the transaction and commits it.
- The transaction is sent to the replica's WAL thread and written to its log. A positive response is returned to the replica's TX thread.
- The applier fiber sends a response with its new vclock to confirm that the transaction was successfully committed.
By the last step, the user might have already left. If Tarantool is shut down before the transaction is sent to the replicas (after step 3 but before step 4) and never restarts, this transaction is lost. Of course, if the node comes back online, it will keep trying to send the transaction to the replicas. But if the server's disk has burned out, nothing can be done.
With such an architecture, synchronicity can be achieved if a transaction isn't considered successful until it has been sent to the required number of replicas.
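As a preview of how this idea surfaces in the configuration (assuming Tarantool 2.6 or later; the details come later in this series), a sketch:

```lua
-- Synchronous replication knobs (example values; space name is illustrative).
box.cfg{
    replication_synchro_quorum = 2,   -- how many instances must confirm a transaction
    replication_synchro_timeout = 5,  -- how long to wait for the quorum, in seconds
}
-- Only spaces created with is_sync = true use synchronous commit.
box.schema.space.create('bank', {is_sync = true})
```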
Read the next part of this article here: Synchronous replication in Tarantool (Part 3).