Book Review: Designing Data-Intensive Applications (Part 2)

Go in-depth into a book review about designing data-intensive applications.


This is part 2 of a three-part review. You can find part one here.

5. Replication

This is the first chapter in the Distributed Data section. Replication means that the same data is stored on multiple machines. Some reasons for replication are: to keep working even if some parts of the system fail (increased availability), to keep data geographically close to the users (reduce latency), and to increase read throughput by serving the same data from many machines.

Three different models are covered: single-leader, multi-leader, and leaderless. Replication can be either synchronous or asynchronous. If it is synchronous, the write is not acknowledged to the user until the leader and all followers have confirmed it. This can block all writes if any one of the followers is slow or has failed. Therefore, asynchronous replication is more common.

There are many tricky aspects of replication. The first is when setting up a new follower. Since data is constantly changing in the leader, typically you take a consistent snapshot of the leader's database and copy it to the new follower. Then you need to process the backlog of changes that happened during the copying process.

If a follower fails, it needs to catch up once it recovers. This means it needs to keep track of which transactions from the leader it had already processed before failing. If the leader fails, a new leader needs to be selected. This is called failover. Many things can go wrong here. If asynchronous replication is used, the new leader may not have received all the writes. And if the former leader rejoins the cluster after a new leader has been selected, what should happen to the writes it accepted that were never replicated?

Performing the replication itself is also tricky. Simply repeating the SQL statements on each follower causes problems if nondeterministic functions such as NOW() or RAND() are used. There are many other edge cases as well, so this method is generally not used. Instead, the database's internal write-ahead log is used: an append-only sequence of bytes containing all the writes to the database.
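
To make the nondeterminism concrete, here is a minimal Python sketch (not from the book; the names and data layout are illustrative) of why re-executing a statement that calls something like RAND() leaves the leader and a follower with different data, while shipping the resulting value keeps them in sync:

```python
import random

# Hypothetical nondeterministic "statement", like SQL's RAND().
def apply_statement(db):
    db["lucky_number"] = random.randint(1, 100)

# Statement-based replication: each replica re-executes the statement,
# so the leader and the follower can end up with different values.
leader, follower = {}, {}
apply_statement(leader)
apply_statement(follower)
print(leader, follower)          # the values will usually differ

# Log-based replication: ship the resulting value, not the statement.
follower["lucky_number"] = leader["lucky_number"]
print(leader == follower)        # True
```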

Replication Lag

When you use asynchronous replication, you have eventual consistency. Even if the replication lag is usually small, a whole range of factors (for example, network problems or high load) can cause the lag to grow to several seconds or even minutes. Several problems can occur when there is replication lag. If you submit data and then reload the page, you may not see the data you just wrote, because the read on reload was served by a different replica than the one that received the write. There are several possible solutions that guarantee you read your own writes. This can be even trickier with cross-device reads (updating from a laptop, then reading from a phone).
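
One common way to get read-your-own-writes is to serve a user's reads from the leader for a short time after that user has written. A minimal sketch of that idea, assuming an in-memory map of last-write times and an illustrative 10-second threshold (all names are mine):

```python
import time

READ_YOUR_WRITES_WINDOW = 10.0   # seconds; illustrative threshold
last_write_at = {}               # user_id -> time of the user's last write

def record_write(user_id):
    last_write_at[user_id] = time.time()

def choose_replica(user_id, leader, followers):
    # A recently writing user's follower may still be lagging, so read
    # from the leader to guarantee the user sees their own write.
    if time.time() - last_write_at.get(user_id, 0.0) < READ_YOUR_WRITES_WINDOW:
        return leader
    return followers[hash(user_id) % len(followers)]

record_write("alice")
print(choose_replica("alice", "leader", ["follower-1", "follower-2"]))  # leader
```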

Another anomaly is when it appears as if time moves backwards: the first read returns a comment recently made by user X, but when the page is refreshed, the read goes to another (lagging) replica where the comment by user X does not yet exist. Monotonic reads is a guarantee that avoids this situation, for example by routing all of a user's reads to the same replica. If eventual consistency is too weak, distributed transactions can be used.

Multi-Leader Replication

In multi-datacenter operations, it can make sense to use multi-leader replication. There are several advantages: performance (writes don't all have to go through a single leader), tolerance of datacenter outages, and tolerance of network problems (temporary problems don't prevent writes from going through).

However, with more than one leader, there is the risk of write conflicts. There are several ways to handle these: last write wins (based on a timestamp or unique id), merging the values together automatically, or keeping the conflicting values and letting the user pick which one to keep.
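
As a rough illustration of last write wins, here is a sketch (the data layout and field names are assumptions) that resolves a conflict by keeping the version with the highest timestamp; the discarded versions are exactly the data LWW can silently lose:

```python
# Conflicting versions of the same key, each tagged with a write timestamp.
conflicting_writes = [
    {"value": "alice@old.example", "timestamp": 1700000001.2},
    {"value": "alice@new.example", "timestamp": 1700000003.7},
]

# Last write wins: keep the version with the highest timestamp and silently
# discard the rest; that silent discarding is why LWW can lose data.
winner = max(conflicting_writes, key=lambda w: w["timestamp"])
print(winner["value"])  # alice@new.example
```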

Leaderless Replication

Dynamo, Cassandra, Riak, and Voldemort all use leaderless replication. To make sure no writes are lost and no reads return stale values, quorum reads and writes are used. If there are n replicas, every write must be confirmed by w nodes to be successful, and we must query at least r nodes for each read. As long as w + r > n, we expect to get up-to-date values when reading. In the simplest case, n = 3, w = 2, and r = 2. However, there can still be edge cases; for example, if a write is concurrent with a read, the write may only be present in some of the replicas, and it is unclear whether the new or the old value should be returned for the read.
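
A small sketch of the quorum condition and a quorum read, assuming each replica tags its value with a version number (the structure is illustrative):

```python
def quorum_ok(n, w, r):
    # Reads and writes must overlap in at least one replica.
    return w + r > n

def read_with_quorum(responses):
    # Take the value with the newest version among the replicas that answered.
    return max(responses, key=lambda resp: resp["version"])["value"]

# n = 3 replicas, one of which is stale; w = 2, r = 2.
assert quorum_ok(n=3, w=2, r=2)
responses_from_two_replicas = [
    {"version": 7, "value": "new"},
    {"version": 6, "value": "old"},   # lagging node
]
print(read_with_quorum(responses_from_two_replicas))  # new
```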

If a node has been down and has stale values, it can be detected when reading a value from it. This can initiate a repair. There can also be an anti-entropy process running in the background that looks for inconsistencies and initiates repair.

The chapter ends with a good example of how version vectors can be used to keep track of causal dependencies between concurrent events.
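
A minimal sketch of the core comparison behind version vectors: one event happened before another if its vector is less than or equal to the other's in every entry; if neither dominates, the events are concurrent (the node names are made up):

```python
def happened_before(a, b):
    # a -> b if every counter in a is <= the corresponding counter in b,
    # and the vectors are not identical.
    keys = set(a) | set(b)
    return a != b and all(a.get(k, 0) <= b.get(k, 0) for k in keys)

def concurrent(a, b):
    return not happened_before(a, b) and not happened_before(b, a)

v1 = {"node_a": 2, "node_b": 0}
v2 = {"node_a": 2, "node_b": 1}   # includes v1's history
v3 = {"node_a": 1, "node_b": 3}   # neither v2 nor v3 includes the other

print(happened_before(v1, v2))  # True: v1 causally precedes v2
print(concurrent(v2, v3))       # True: the writes conflict and must be merged
```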

6. Partitioning

The reason for breaking the data up into partitions (also known as sharding) is scalability. Note that you can have partitioning and replication at the same time: each partition can be replicated to several nodes. Each piece of data belongs to exactly one partition. The partitioning can be done by key range or by hashing the key. The advantage of partitioning may be lost if some partitions are hit much more than others (skewed workloads). Secondary indexes get tricky with partitioned databases; one approach to handling them is scatter/gather, where the query is sent to all partitions and the results are combined.

As the data grows and changes, there may be a need to rebalance: moving load from one node to another. You should not move more data than is necessary. For example, simply assigning keys with hash mod N will move far too much data when the number of nodes changes. A better approach is to use many more partitions than there are nodes and only move entire partitions between nodes. If rebalancing happens automatically, there is a risk of cascading failures, so it may be better to only do it manually. The routing of requests to the correct partition is often handled by a separate coordination service, such as ZooKeeper.
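
A quick sketch of why hash mod N rebalances badly: adding one node changes the assignment of most keys, whereas with a fixed, larger number of partitions a key's partition never changes and only whole partitions move between nodes (the numbers are illustrative):

```python
keys = [f"user:{i}" for i in range(10_000)]

def node_for(key, num_nodes):
    return hash(key) % num_nodes

# hash mod N: going from 4 to 5 nodes reassigns most keys.
moved = sum(node_for(k, 4) != node_for(k, 5) for k in keys)
print(f"{moved / len(keys):.0%} of keys move")   # roughly 80%

# With many fixed partitions, a key's partition never changes; rebalancing
# only reassigns whole partitions to nodes, moving far less data.
NUM_PARTITIONS = 1024
def partition_for(key):
    return hash(key) % NUM_PARTITIONS
```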

7. Transactions

This chapter covers transactions on a single machine. A transaction groups writes and reads into one logical unit. The transaction as a whole either succeeds (commit) or fails (abort, rollback). This frees the application developer from having to handle a lot of different potential problems: the connectivity to the database being lost, the database or application crashing before all data is written, several clients overwriting each other's data, and so on.

ACID stands for Atomicity, Consistency, Isolation, and Durability. Atomicity is not about concurrency (isolation is). Instead, it means that either all writes succeeded, or none. There is no partially written data. Consistency means that invariants must always be true before and after a transaction. For example, that credits and debits across all accounts are always balanced. Some invariants, such as foreign key constraints and uniqueness constraints, can be checked by the database. But in general, only the application can define what is valid or invalid data — the database only stores the data.

Isolation means that concurrently executing transactions don't interfere with each other. Much of the rest of the chapter deals with this. Finally, durability means that data that has been written successfully will not be lost. However, even if data is written to disk, there are many ways it can be lost: disks can get corrupted, firmware bugs can stop your reads, and the machine can die, preventing you from getting to the data even if it is fine on the disk.

Isolation Levels

When multiple clients concurrently read and update data, there is an astonishing number of pitfalls. Databases therefore offer several isolation levels to prevent these problems.

The most basic level is read committed. It means that when reading, you will only see committed data, not data in the process of being written but not yet committed. When writing to the database, you will only overwrite data that has been committed. This is also known as no dirty reads and no dirty writes.

Snapshot isolation deals with each transaction seeing a consistent view of the database. Suppose you have two accounts with $500 in each, and in one transaction you read the balance of the first account and then the balance of the second; they should sum to $1,000. However, between the first and second read, a concurrent transaction could move $100 from the second account to the first. The first read would then return $500 while the second returns $400 (since $100 has already been subtracted there), for a total of only $900. If we repeated the same reads, we would get $600 for the first and $400 for the second, summing to $1,000 as expected. Avoiding the anomaly where the sum changes is called snapshot isolation, also known as repeatable read.

The problem is that we want what we see in the database at a given point in time to be consistent. We don't want to see the case where the total is only $900 in the example above. This is important, for example, when taking a backup: it may take hours to make the backup, and the data keeps changing, but we want what we store in the backup to be consistent. The same goes for long-running queries; we want them to be executed on a consistent snapshot. A common solution is multi-version concurrency control (MVCC): the database keeps several different committed versions of an object, because various in-progress transactions may need to see the state of the database at different points in time.
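
A toy sketch of the MVCC idea, assuming each committed version is tagged with the id of the transaction that wrote it: a transaction reads the latest version committed at or before its own snapshot, so the $1,000 invariant from the account example holds at every snapshot (the layout is illustrative, not any particular database's format):

```python
# Each object keeps a list of (committing_txid, value) versions, oldest first.
# Transaction 13 moved $100 from account_2 to account_1.
versions = {
    "account_1": [(10, 500), (13, 600)],
    "account_2": [(10, 500), (13, 400)],
}

def read(key, snapshot_txid):
    # A transaction sees the latest version committed at or before its snapshot.
    visible = [value for txid, value in versions[key] if txid <= snapshot_txid]
    return visible[-1]

# A long-running query that took its snapshot at txid 12 keeps seeing the old,
# consistent state; a later snapshot sees the new, equally consistent state.
print(read("account_1", 12) + read("account_2", 12))  # 1000
print(read("account_1", 15) + read("account_2", 15))  # 1000
```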

When two transactions both write data, there is a risk of lost updates. For example, if two users concurrently read a counter, increment it, and write back the result, the final counter value may only reflect one of the increments, not both. If a transaction reads some information, bases a decision on it, and writes the result, there can be write skew: by the time the result is written, the premise it was based on is no longer true. A typical example is a meeting room booking system that tries to avoid double bookings, where two transactions each check that a room is free and then both insert a booking.
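
A bare-bones illustration of a lost update, with the two clients' read-modify-write steps interleaved by hand; real databases prevent this with atomic operations, explicit locking, or automatic lost-update detection:

```python
# A lost update: both clients read the same counter value, increment it,
# and write it back, so one increment disappears.
counter = 0

client_a_read = counter          # A reads 0
client_b_read = counter          # B reads 0, concurrently

counter = client_a_read + 1      # A writes back 1
counter = client_b_read + 1      # B writes back 1, overwriting A's update

print(counter)  # 1, even though two increments were performed
```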

Serializable Transactions

A sure way of avoiding these problems is to execute all transactions serially instead. However, performance often suffers too much. In some circumstances you can actually execute all transactions serially: the whole dataset must fit in memory, stored procedures must be used to avoid network round trips during the transaction, and the throughput must be low enough to be handled by a single CPU.

For about 30 years, the only widely used algorithm for serializability was two-phase locking (2PL). Its extensive locking means that throughput suffers, and you can also easily get deadlocks. A newer algorithm called serializable snapshot isolation (SSI) can also provide serializability, but with better performance, because it uses optimistic concurrency control as opposed to the pessimistic concurrency control used by 2PL.

8. The Trouble With Distributed Systems

This is my other favorite chapter in the book (together with chapter 3, Storage and Retrieval). Even though I have worked with distributed systems and concurrency issues for a long time, this chapter was a real eye-opener with respect to all the ways things can go wrong.

Unreliable networks. The connections between nodes can fail in various ways. In addition to the normal failure modes, some unusual ones are listed: a software upgrade of a switch causes all network packets to be delayed for more than a minute, sharks bite and damage undersea cables, or all inbound packets are dropped while outbound packets are sent successfully. Even in a controlled environment, such as a single datacenter, network problems are common; one study found around 12 network faults per month. The only practical way to detect network problems is timeouts.

Unreliable clocks. Time-of-day clocks return the current date and time according to some calendar. They are usually synchronized with NTP. Because the local clock on the machine can drift, it may get ahead of the NTP time, and a reset can make it appear to jump back in time. Monotonic clocks are more suitable for measuring elapsed time — they are guaranteed to always move forward.
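
In Python, for example, the distinction shows up as time.time() (wall-clock time, which can jump when NTP adjusts it) versus time.monotonic() (which only moves forward). A small sketch of measuring a duration with the monotonic clock:

```python
import time

# time.time() is the wall clock: NTP adjustments can make it jump, so it is
# a poor choice for measuring durations or implementing timeouts.
# time.monotonic() only ever moves forward, which is what you want here.
start = time.monotonic()
time.sleep(0.1)                  # the operation being timed
elapsed = time.monotonic() - start
print(f"elapsed: {elapsed:.3f} s")
```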

Even if NTP is used to synchronize different machines, there are many possible problems. Since there are network delays from the NTP server, there is a limit to how accurate the clocks can be. Leap seconds have also caused many large outages. These and other problems mean that a day may not have exactly 86,400 seconds, clocks can move backwards, and time on one node may be quite different from time on another node. Furthermore, incorrect clocks easily go unnoticed.

Relying on timestamps for ordering events, such as last write wins in, for example, Cassandra, can lead to unexpected results. One solution, used by Google's Spanner, is to have confidence intervals for timestamps and to make sure the intervals don't overlap when ordering events.

Process pauses. Another problem related to time is process pauses. Code that checks the current time and then takes some action may have been paused for many seconds before the action is taken. There are many ways this can happen: garbage collection, synchronous disk access that is not obvious from the code (for example, a Java classloader lazily loading a class file), the operating system swapping to disk (paging), etc.

To handle these various problems in distributed systems, the truth is defined by the majority (more on this in the next chapter). The problems described here can lead to some very tricky situations. It would be even worse if participating nodes deliberately tried to cause problems; these are called Byzantine faults, but they are not covered in this book. By having control over all the servers involved, such problems can be avoided.

The chapter ends by defining safety and liveness. Safety means that nothing bad happens (for example, wrong data is not written to the database), and liveness means that something good eventually happens (for example, after a leader node fails, eventually a new leader is elected).

9. Consistency and Consensus

Distributed consistency is mostly about coordinating the state of replicas in the face of delays and faults.

Linearizability

Linearizability means that replicated data can appear as though there is only one single copy of the data, and all operations look like they act atomically on the data (you don't see a value flipping between the new and old value). It makes the database behave like a variable in a single-threaded program. The problem is that it is slow, especially when there are large network delays.

The CAP theorem is sometimes presented as Consistency, Availability, Partition tolerance: pick 2 out of 3. But network partitioning is a fault, so you don't have a choice about it; it will happen. Kleppmann thinks the theorem is better put as: either Consistent or Available when Partitioned. He also notes that the CAP theorem only talks about partitioning and doesn't say anything about other faults, like network delays or dead nodes. So the CAP theorem is not that useful.

Ordering Guarantees

A total order allows any two elements to be compared, so you can always say which one is greater. A linearizable system has a total order. If two events happen concurrently, you can't say which happened first. This leads to the weaker consistency model of causality. It defines a partial order: some operations are ordered with respect to each other, but some are incomparable (concurrent).

Lamport timestamps provide a total ordering consistent with causality. However, that is not enough for problems like ensuring that selected usernames are unique (if two users concurrently try to pick the same username, we only know afterwards which of the two got it). This leads to total order broadcast. It requires that no messages are lost: if a message is delivered to one node, it is delivered to all nodes. It also requires that messages are delivered to every node in the same order. Having total order broadcast enables correct database replication.
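
A minimal Lamport clock sketch: each node keeps a counter that it increments on every event and fast-forwards when it sees a larger counter from another node; breaking ties by node id gives a total order consistent with causality (the class and names are mine):

```python
class LamportClock:
    def __init__(self):
        self.counter = 0

    def local_event(self):
        self.counter += 1
        return self.counter

    def on_receive(self, remote_counter):
        # Fast-forward past the sender's counter, then count the receive event.
        self.counter = max(self.counter, remote_counter) + 1
        return self.counter

a, b = LamportClock(), LamportClock()
t1 = a.local_event()      # a's counter: 1
t2 = b.on_receive(t1)     # b's counter: 2, causally after a's event
# Ties are broken by node id, so (counter, node_id) pairs compare consistently.
print(sorted([(t1, "a"), (t2, "b")]))  # [(1, 'a'), (2, 'b')]
```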

Distributed Transactions and Consensus

For distributed transactions, two-phase commit (2PC) can be used. It requires a coordinator. First, the coordinator sends each node a prepare message. The nodes check that they will be able to perform the write, and if so, they answer yes. If all nodes answered yes, the next message is the commit, and each node must then perform the write.
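
A toy two-phase-commit coordinator, with no crash recovery or timeouts, just to show the two phases (all names are illustrative):

```python
class Participant:
    def prepare(self, txn):
        # Vote yes only if the write is durably staged and constraints hold.
        return True
    def commit(self, txn):
        pass
    def abort(self, txn):
        pass

def two_phase_commit(participants, txn):
    # Phase 1: prepare. Every participant must promise it can commit.
    votes = [p.prepare(txn) for p in participants]
    if all(votes):
        # Phase 2: commit. A node that voted yes must be able to commit.
        for p in participants:
            p.commit(txn)
        return "committed"
    for p in participants:
        p.abort(txn)
    return "aborted"

print(two_phase_commit([Participant(), Participant()], {"op": "transfer"}))  # committed
```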

With a single-leader database, all the decisions are taken by the leader, and you can therefore have linearizable operations, uniqueness constraints, a totally ordered replication log, and more (these properties are all reducible to consensus). If the leader fails, or network problems prevent you from reaching it, there are three options: wait for it to recover, fail over manually by letting a human pick a new leader, or use an algorithm to automatically choose a new leader. Tools like ZooKeeper help with this.

However, not all systems require consensus. Leaderless and multi-leader replication systems typically do without it; instead, they have to handle the conflicts that occur. This was the most difficult chapter for me to understand (I am not even sure I fully understand it), especially the explanation of how consensus is reducible to these other properties.

You can find part 3 here.
