
How to Implement Robust and Scalable Transactions Across Documents with MongoDB

The Transactional Issue

There are some good reasons why, historically, databases have provided support for transactions across pieces of data. The typical scenario is that the application needs to modify several independent pieces of data, and it would likely get into a bad state if only some of those changes actually made it to the datastore. Hence the long-revered concept of ACID:

  • Atomicity: all changes are made, or none
  • Consistency: data remains in a consistent state
  • Isolation: other clients cannot see partial changes
  • Durability: once the transaction is acknowledged back to the client, the data is in a safe spot (typically in a journal on disk)

With the introduction of NoSQL databases, support for ACID transactions across documents was typically thrown away. Many key/value stores still offer ACID, but it only applies to a single entry. The main reason for dropping it is that it just does not scale! If documents are spread over several servers, transactions become extremely difficult to implement and resource intensive. Imagine a transaction spread across dozens of servers, some of them distant, some of them unreliable: how difficult and slow it would be!

MongoDB supports ACID at a single document level. More precisely, you get “ACI” by default and get the “D” if you turn on the “j” WriteConcern option. Mongo has a rich query language that spans across documents, and consequently people are longing for multi-document transactions to port over their SQL code. One natural workaround is to leverage the power of documents: instead of many rows and relationships you can embed things together into a single larger document. Denormalization brings you back the transactions!
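For instance, rather than storing a blog post and its comments in separate collections, the comments can be embedded in the post so that a single atomic update covers everything. A minimal sketch (the post collection and its fields are purely illustrative):

// One document, one atomic update: append a comment and bump a counter together.
db.post.update(
    { _id: postId },
    { $push: { comments: { user: "John", text: "Nice post!" } }, $inc: { nComments: 1 } }
)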

This technique actually solves a large number of transactional issues for one-to-one and some one-to-many relationships. It also tends to make your application simpler and the datastore faster, so it’s a win-win! But for other cases where data must be split, how can you deal with it?

Reducing the ACID

It really boils down to this for most applications:

  • Atomicity: really you just want ALL changes to be made.
  • Consistency: it is fine if the system is inconsistent for a short time, as long as it is eventually consistent
  • Isolation: lack of isolation exposes temporary inconsistency which is not ideal, but most users are used to it in the world of online services (e.g. customer support: “it takes a few seconds to propagate”)
  • Durability: it is important and supported

The problem really boils down to having robust and scalable eventual consistency!

Solution 1: Synchronization Field

This use case is the simplest and most common: there are fields that need to be kept “in sync” between documents. For example, say you have a user document with username “John”, and documents representing comments that John has posted. If the user is allowed to change username, the change needs to be propagated throughout all documents, even if there is an application or database failure in the middle of the process.

To achieve this, an easy way is to use a new field (e.g. “syncing”) in the main document (in this case it’s the user doc). Set the “syncing” field to a Date timestamp within the update of the user document:

db.user.update({ _id: userId }, { $set: { syncing: currentTime }, /* rest of updates ... */ })

Now the application can go ahead with modifying all comment documents. When done, the flag should be unset:

db.user.update({ _id: userId }, { $unset: { syncing: 1 } })

Now imagine there is a failure during the process: some comments are left with the old username. Thankfully the flag is still set, so the application has a way to know that the process should be retried. For this, you need a background thread that checks for dangling documents whose “syncing” timestamp is older than a conservative threshold (e.g. 1 hour). Finding those documents can be made very efficient by using an index on the “syncing” field. This index should be “sparse” so that only the few documents where the flag is set are actually indexed, keeping it very small.

db.user.ensureIndex({ syncing: 1 }, { sparse: true })
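A background sweep for dangling documents could then look something like the sketch below, assuming the comments live in a comment collection that denormalizes the username:

// Retry any sync that started more than an hour ago.
var oneHourAgo = new Date(Date.now() - 60 * 60 * 1000);
db.user.find({ syncing: { $lt: oneHourAgo } }).forEach(function (user) {
    // Re-apply the propagation (idempotent), then clear the flag.
    db.comment.update({ userId: user._id }, { $set: { username: user.username } }, { multi: true });
    db.user.update({ _id: user._id }, { $unset: { syncing: 1 } });
});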

As a result, your system typically keeps things in sync in a short period of time, or up to 1 hour in the case of system failure. If timing is not important, you could even have the application fix the documents in a lazy fashion if the “syncing” flag is detected upon reading.

Solution 2: Job Queue

The principle above works well if the application does not need much context and just reapplies a generic process (e.g. copying a value). Some transactions need to make specific changes that would be difficult to identify later on. For example, say the user document contains a list of friends:

{ _id: userId, friends: [ userId1, userId2, ... ]}

Now users A and B decide to become friends: you need to add B to A’s list and vice-versa. It is fine if it does not happen exactly at the same time (as long as it doesn’t vex one of them :)). A solution for this, and most transaction problems, is to use a job queue, also stored in MongoDB. A job document can look like:

{ _id: jobId, ts: timeStamp, state: "TODO", type: "ADD_FRIEND", details: { users: [ userA, userB ]} }
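Enqueuing the work is then a plain insert of such a document (the variable names are illustrative):

db.job.insert({ ts: new Date(), state: "TODO", type: "ADD_FRIEND", details: { users: [ userA, userB ] } })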

Either the original thread can insert the job and go forward with the changes, or multiple “worker” threads can be dedicated to picking up jobs. The worker fetches the oldest unprocessed job using findAndModify(), which is fully atomic. In the same operation it marks the job as being processed and also records the worker name and current time for tracking. An index on { state: 1, ts: 1 } makes those calls very fast.

db.job.findAndModify({ query: { state: "TODO" }, sort: { ts: 1 }, update: { $set: { state: "PROCESSING", worker: { name: "worker1", ts: startTime } } } })
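The supporting { state: 1, ts: 1 } index can be created up front:

db.job.ensureIndex({ state: 1, ts: 1 })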

Then the worker makes changes to both user documents in a way that is idempotent. It is important that those changes can be reapplied many times with the same effect! Here we will just use $addToSet for that purpose. A more generic alternative is to add a test on the query side to check whether the change has already been made or not.

db.user.update({ _id: userA }, { $addToSet: { friends: userB } })
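And symmetrically for the other side of the friendship, which is just as safe to reapply:

db.user.update({ _id: userB }, { $addToSet: { friends: userA } })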

The last step is to either delete the job or mark it as done. It may be good to keep jobs around for a while as a safety measure. The only downside is that the previous index gets larger over time, though you could also make use of a nifty sparse index on a special field { undone: 1 } instead (and change the queries accordingly).

db.job.update({ _id: jobId }, { $set: { state: "DONE" } })

If the process dies at any point in time, the job is still in the queue but marked as processing. After a period of inactivity a background thread can mark the job as needing processing again, and the job just starts again from the beginning.
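That background check could be as simple as the following sketch, here using a 30-minute inactivity threshold (the exact value is up to the application):

// Requeue jobs that have been stuck in PROCESSING for too long.
var cutoff = new Date(Date.now() - 30 * 60 * 1000);
db.job.update(
    { state: "PROCESSING", "worker.ts": { $lt: cutoff } },
    { $set: { state: "TODO" }, $unset: { worker: 1 } },
    { multi: true }
);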

Solution 3: Two Phase Commit

The Two Phase Commit is a well-known solution that is in use in many distributed systems. MongoDB actually makes this solution easier to implement since you can stick all the data needed to perform it into documents, thanks to the flexible schema. I did a write-up on this topic a few years ago, which can be found in the MongoDB Cookbook (“Perform Two Phase Commits”) as well as the MongoDB Manual (“Perform Two Phase Commits”). Happy reading!

Solution 4: Log Reconciliation

One common solution that most financial systems use is the idea of log reconciliation. The idea is to write transactions as a simple log, thus avoiding complexity and potential failures. Then the state of an account is inferred by aggregating all the changes since the last known good state. In the extreme case, you could wipe the account and recreate it from scratch by reapplying all changes since day 1… scary, but it just works :). The account document just needs a “cached” balance to improve read speed, as well as the sequence id it was calculated at:

{ _id: accountId, cache: { balance: 10000, seqId: 115 } }

Now for the transaction, a typical financial system writes an entry for the transaction and one “account change” entry for each of the accounts involved. This method requires further write guarantees, which can be achieved with the “job queue” solution, with the job being the transaction itself until all account changes are written. But with MongoDB we can instead write a single document that includes both the transaction and the account changes. Such a document would be inserted into the tx collection like:

{
  _id: ObjectId,
  ts: timestamp,
  proc: "UNCOMMITTED",
  state: "VALID",
  changes: [
    { account: 1234, type: "withdraw", value: -100, seqId: 801, cachedBal: null },
    { account: 2345, type: "deposit",  value: 100,  seqId: 203, cachedBal: null }
  ]
}

Fields of interest:

  • proc: the transaction starts in “UNCOMMITTED” state and becomes “COMMITTED” when all previous transactions involving those accounts are also in “COMMITTED” state. It signals that this transaction can be used as an “anchor” for balance calculation.
  • state: can be one of “VALID”, “CANCELLED”, etc. If not “VALID”, the transaction is ignored for balance calculations even if “COMMITTED”.
  • seqId: a unique sequence id for that account. It gives a deterministic ordering to the account changes.
  • cachedBal: the cached balance for the account. If a transaction is in “COMMITTED” state, then the cached balance (if set) is a reliable number.
  • Note that we make use of a unique index on { changes.account: 1, changes.seqId: 1 }, created as shown below. It speeds up reconciliation and also ensures that there are no duplicate seqIds for an account.
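The index definition itself is straightforward:

db.tx.ensureIndex({ "changes.account": 1, "changes.seqId": 1 }, { unique: true })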

The key part is to make sure the cached balance gets safely calculated/invalidated even if the ordering of transactions is not guaranteed and the transaction state may change. For that purpose we use a sequence id per account, which ensures deterministic ordering of the account changes and is necessary to avoid complex locks. Before writing the transaction, the application first infers the next seqId for each of the accounts by doing simple queries:

db.tx.find({ "changes.account": 1234 }, { "changes.$.seqId": 1 }).sort({ "changes.seqId": -1 }).limit(1)

Then each seqId is incremented locally and finally written as part of the transaction. In case another thread obtained the same seqId concurrently, the unique index ensures that the write fails, and the process just retries optimistically until it goes through. An alternative is to keep the current seqId in the account collection and obtain the next one for sure with findAndModify(), which is typically slower unless you have a lot of contention on the accounts. Note that it is possible for a seqId to get skipped if somehow the transaction never gets written, but that is not a problem as long as there are no duplicates.
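A minimal sketch of that optimistic loop, assuming a hypothetical nextSeqIdFor(account) helper that runs the find() shown above and adds one:

function insertTx(changes) {
    while (true) {
        // Tentatively assign the next seqId to each account change.
        changes.forEach(function (c) { c.seqId = nextSeqIdFor(c.account); c.cachedBal = null; });
        var res = db.tx.insert({ ts: new Date(), proc: "UNCOMMITTED", state: "VALID", changes: changes });
        if (!res.hasWriteError()) return;                  // written, we are done
        if (res.getWriteError().code !== 11000) throw res; // unexpected error
        // Duplicate key (11000): another thread grabbed one of the seqIds, retry.
    }
}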

Now we’ve laid out the groundwork of the reconciliation! A background process can ensure that all uncommitted transactions get worked on. A transaction is not marked as committed unless all transactions with lower sequence numbers for those accounts have been committed. Once a transaction is marked committed, it becomes completely immutable. Now for the good part: let’s obtain the account balance! First we can obtain the last known good balance, which can be found quickly by walking down the index:

db.tx.find({ "changes.account": 1234, proc: "COMMITTED" }, { "changes.$": 1 }).sort({ "changes.seqId": -1 }).limit(1)

From there we can obtain all pending changes easily by getting the transactions with a greater seqId:

db.tx.find({ "changes.account": 1234, "changes.seqId": { $gt: lastGoodSeqId } }, { "changes.$": 1 }).sort({ "changes.seqId": 1 })

We can use those results to display pending charges. If we just want a quick idea of where the pending balance stands, we can ask MongoDB to aggregate the changes and give us the total:

db.tx.aggregate([
  { $match: { "changes.account": 1234, "changes.seqId": { $gt: lastGoodSeqId }, state: "VALID" } },
  { $unwind: "$changes" },
  { $match: { "changes.account": 1234, "changes.seqId": { $gt: lastGoodSeqId } } },
  { $group: { _id: "total", total: { $sum: "$changes.value" } } }
])

For the system to remain fast and the calculations small, it is important that background workers keep making sure that transactions reach the committed state and that balances get cached. Ideally a transaction would never get reverted; instead, a new inverse transaction would be submitted. Still, it is doable to implement transaction cancellation as long as all subsequent transaction states and caches get reset properly.
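As a simplified illustration of that worker (the commit-eligibility check is application logic and not shown here), both marking a transaction committed and caching a freshly computed balance are plain single-document updates:

// Once every lower-seqId change for the involved accounts is COMMITTED:
db.tx.update({ _id: txId, proc: "UNCOMMITTED" }, { $set: { proc: "COMMITTED" } })

// Cache the computed balance on the matching account change.
db.tx.update({ _id: txId, "changes.account": 1234 }, { $set: { "changes.$.cachedBal": newBalance } })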

Solution 5: Versioning

Sometimes a change is too complex to represent as a simple JSON description, and it can trickle down to many documents with complex relationships (e.g. a tree structure). It would be too disruptive if the changes appeared only partially (e.g. a broken tree), hence isolation is needed in this case. One way to achieve this is to insert new documents with a higher version number instead of updating the existing ones. New version numbers can easily and safely be obtained by using the same technique as the sequence id of log reconciliation. Typically there is a unique index on { itemId: 1, version: 1 }.

The application inserts the documents starting with the children and finishes with the “master” document (e.g. the root of the tree). When accessing the data, the application checks the version number on the master document and, from there, ignores any document that has a higher version number. Unfinished transactions can be left as-is and ignored, or cleaned up if you want to be neat :)
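As a rough sketch (the item collection and the rootId linking field are hypothetical), the unique index and the read-side filtering could look like:

db.item.ensureIndex({ itemId: 1, version: 1 }, { unique: true })

// Read path: fetch the latest version of the master (e.g. the tree root) first...
var master = db.item.find({ itemId: rootItemId }).sort({ version: -1 }).limit(1).next()

// ...then ignore any related document inserted with a higher version number.
db.item.find({ rootId: rootItemId, version: { $lte: master.version } })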

Conclusion

In conclusion, we’ve seen several solutions for implementing robust and scalable transactions across documents:

  • Syncing Flag: best for just copying data over from a master document
  • Job Queue: very general purpose, solves 95% of cases. Most systems need to have at least one job queue around anyway!
  • Two Phase Commit: this technique ensures that each entity always has all the information needed to get to a consistent state
  • Log Reconciliation: the most robust technique, ideal for financial systems
  • Versioning: provides isolation and supports complex structures

Further, it has been mentioned many times that MongoDB will eventually support some kind of truly atomic and isolated transactions across documents. It actually already does as part of sharding, but it’s purely internal for now… It will only be possible to do so across documents that reside on the exact same shard, since otherwise it would put us back in the non-scalable SQL world! Thanks for reading this long post. Stay tuned!
