An Articulate Introduction to MongoDB

NOSQL has become a heated topic for large web-scale deployments, where scalability and semi-structured data have driven DB requirements towards NOSQL. Many NOSQL products have emerged over the last couple of years. In my past blogs, I have covered the underlying distributed system theory of NOSQL, as well as some specific products such as CouchDB and Cassandra/HBase.

Last Friday I was lucky enough to meet Jared Rosoff from 10gen at a technical conference and discuss the technical architecture of MongoDB. I found the information very useful and want to share it with more people.

One thing that impresses me about MongoDB is that it is extremely easy to use, and the underlying architecture is also very easy to understand.

Here are some simple admin steps to start and stop the MongoDB server:

# Install MongoDB
mkdir -p /data/db

# Start Mongod server
.../bin/mongod # data stored in /data/db

# Start the command shell
.../bin/mongo
> show dbs
> show collections

# Remove collection
> db.person.drop()

# Stop the Mongod server from shell
> use admin
> db.shutdownServer()

Major difference from RDBMS
MongoDB differs from an RDBMS in the following ways:

  • Unlike an RDBMS record, which is "flat" (a fixed number of simple-typed fields), the basic unit of MongoDB is the "document", which is "nested" and can contain multi-valued fields (arrays, hashes).
  • Unlike an RDBMS, where all records stored in a table must conform to the table schema, documents of any structure can be stored in the same collection.
  • There is no "join" operation in the query language. Overall, data is encouraged to be organized in a more denormalized manner, and more of the burden of ensuring data consistency is pushed to the application developers.
  • There is no concept of "transaction" in MongoDB. "Atomicity" is guaranteed only at the document level (no partial update of a document will occur).
  • There is no concept of "isolation": any data read by one client may have its value modified by another concurrent client.

By removing some of the features that a classical RDBMS provides, MongoDB can be more lightweight and more scalable when processing big data.

Query processing
MongoDB is a document-oriented DB. In this model, data is organized as JSON documents and stored in collections. A collection can be thought of as the equivalent of a table, and a document as the equivalent of a record, in the RDBMS world.

Here are some basic examples.

# create a doc and save into a collection
> p = {firstname:"Dave", lastname:"Ho"}
> db.person.save(p)
> db.person.insert({firstname:"Ricky", lastname:"Ho"})

# Show all docs within a collection
> db.person.find()

# Iterate result using cursor
> var c = db.person.find()
> p1 = c.next()
> p2 = c.next()

To specify search criteria, provide an example document containing the fields that need to match.

> p3 = db.person.findOne({lastname:"Ho"})

Notice that in such a query, the value portion needs to be determined before the query is made (in other words, it cannot be based on other attributes of the document). For example, if we have a collection of "Person" documents, it is not possible to express a query that returns the persons whose weight is larger than 10 times their height.
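To make the matching rule concrete, here is a minimal sketch in plain JavaScript (run with Node, not the MongoDB shell). The `matches` helper and the sample documents are hypothetical; the helper only models equality against constant values, which is why a criterion that compares two fields of the same document has nowhere to go.

```javascript
// Hypothetical in-memory model of query-by-example matching.
// Every field named in `criteria` must equal the same field in `doc`.
function matches(doc, criteria) {
  return Object.keys(criteria).every((field) => doc[field] === criteria[field]);
}

const people = [
  { firstname: "Dave", lastname: "Ho" },
  { firstname: "Ricky", lastname: "Ho" },
  { firstname: "Ann", lastname: "Lee" },
];

// The criteria values are fixed before the scan starts.
const hos = people.filter((p) => matches(p, { lastname: "Ho" }));
console.log(hos.map((p) => p.firstname)); // [ 'Dave', 'Ricky' ]
```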

# Return a subset of fields (ie: projection)
> db.person.find({lastname:"Ho"}, {firstname:true})

# Delete some records
> db.person.remove({firstname:"Ricky"})

To speed up queries, indexes can be used. In MongoDB, an index is stored as a BTree structure (so range queries are automatically supported). Since the document itself is a tree, an index can be specified as a path that drills into deeply nested levels of the document.

# To build an index for a collection
> db.person.ensureIndex({firstname:1})

# To show all existing indexes
> db.person.getIndexes()

# To remove an index
> db.person.dropIndex({firstname:1})

# An index can be built on a path of the doc.
> db.person.ensureIndex({"address.city":1})

# A composite key can be used to build an index
> db.person.ensureIndex({lastname:1, firstname:1})

An index can also be built on a multi-valued attribute such as an array. In this case, each element in the array will have a separate node in the BTree.
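The sketch below illustrates path indexes and array indexes in plain JavaScript. It is an assumption-laden model, not MongoDB's implementation: a sorted array of `[key, docId]` entries stands in for the BTree, and `getPath`, `buildIndex` and the sample documents are hypothetical.

```javascript
// Resolve a dotted path such as "address.city" inside a nested document.
function getPath(doc, path) {
  return path.split(".").reduce((node, key) => (node == null ? undefined : node[key]), doc);
}

// Build sorted [key, docId] entries. An array value contributes one entry per element.
function buildIndex(docs, path) {
  const entries = [];
  docs.forEach((doc) => {
    const value = getPath(doc, path);
    const keys = Array.isArray(value) ? value : [value];
    keys.forEach((key) => {
      if (key !== undefined) entries.push([key, doc._id]);
    });
  });
  return entries.sort((a, b) => (a[0] < b[0] ? -1 : a[0] > b[0] ? 1 : a[1] - b[1]));
}

const docs = [
  { _id: 1, address: { city: "San Jose" }, tags: ["db", "nosql"] },
  { _id: 2, address: { city: "Austin" }, tags: ["db"] },
];

const cityIndex = buildIndex(docs, "address.city");
const tagIndex = buildIndex(docs, "tags");
console.log(cityIndex); // [ [ 'Austin', 2 ], [ 'San Jose', 1 ] ]
console.log(tagIndex);  // [ [ 'db', 1 ], [ 'db', 2 ], [ 'nosql', 1 ] ]
```

Because the entries are kept in key order, a range query only has to walk a contiguous slice of the index.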

An index can be built in either offline foreground mode or online background mode. Foreground mode proceeds much faster, but the DB cannot be accessed while the index is being built. If the system is running as a replica set (described below), it is recommended to rotate each member DB offline and build the index in the foreground.

When there are multiple selection criteria in a query, MongoDB attempts to use the single best index to select a candidate set and then iterates through the candidates sequentially to evaluate the other criteria.

When multiple indexes are available for a collection and a query is handled for the first time, MongoDB creates multiple execution plans (one for each available index) and lets them take turns (each running for a certain number of ticks) until the fastest plan finishes. The result of the fastest plan is returned, and the system remembers the index that plan used. Subsequent queries use the remembered index until a certain number of updates have happened in the collection; the system then repeats the process to figure out which index is best at that time.
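The turn-taking between plans can be sketched with generators in plain JavaScript. This is only a toy model of the idea described above: `makePlan`, `racePlans`, the index names and the cost numbers are all made up for illustration.

```javascript
// Each candidate plan is modelled as a generator that yields once per unit of work
// and returns its result when done. `cost` is a made-up number of work units.
function* makePlan(indexName, cost, result) {
  for (let i = 0; i < cost; i++) yield;
  return { indexName, result };
}

// Give each plan `ticks` units of work in turn until one of them finishes.
function racePlans(plans, ticks) {
  if (plans.length === 0) throw new Error("no candidate plans");
  while (true) {
    for (const plan of plans) {
      for (let i = 0; i < ticks; i++) {
        const step = plan.next();
        if (step.done) return step.value;
      }
    }
  }
}

const plans = [
  makePlan("lastname_1", 50, ["doc7"]),
  makePlan("lastname_1_firstname_1", 7, ["doc7"]),
];
const winner = racePlans(plans, 5);

// The winning index would be remembered for subsequent queries of the same shape.
console.log(winner.indexName); // lastname_1_firstname_1
```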

Since only one index is used, it is important to look at the search and sorting criteria of the query and build additional composite indexes that match the query better. Maintaining an index is not without cost: indexes need to be updated when docs are created, deleted and updated, which adds overhead to those operations. To maintain an optimal balance, we need to periodically measure the effectiveness of each index (e.g. the read/write ratio) and delete the less efficient ones.

Storage Model
Written in C++, MongoDB uses memory-mapped files that directly map an on-disk data file to an in-memory byte array, where the data access logic is implemented using pointer arithmetic. Each document collection is stored in one namespace file (which contains metadata information) as well as multiple extent data files (whose sizes increase exponentially, doubling each time).

The data structure uses doubly-linked lists extensively. Each collection of data is organized as a linked list of extents, each of which represents a contiguous disk space. Each extent points to the head/tail of another linked list of docs. Each doc contains links to other documents as well as the actual data encoded in BSON format.

Data modification happens in place. If a modification increases the size of a record beyond its originally allocated space, the whole record is moved to a bigger region with some extra padding bytes. The padding bytes are used as a growth buffer so that future expansion doesn't necessarily require moving the data again. The amount of padding is dynamically adjusted per collection based on its modification statistics. The space occupied by the original doc is then freed up and tracked in a set of free lists of different sizes.

As we can imagine, holes are created over time as objects are created, deleted or modified. This fragmentation hurts performance because less data is read or written per disk I/O. Therefore, we need to run the "compact" command periodically, which copies the data to a contiguous space. The "compact" operation is, however, an exclusive operation and has to be done offline. Typically this is done in a replica set by rotating each member offline, one at a time, to perform the compaction.

Indexes are implemented as BTrees. Each BTree node contains a number of keys (within this node), as well as pointers to the left child BTree node of each key.

Data update and Transaction
To update an existing doc, we can do the following:


var p1 = db.person.findOne({lastname:"Ho"})
p1["address"] = "San Jose"
db.person.save(p1)

# Do the same in one command
db.person.update({lastname:"Ho"},
                {$set:{address:"San Jose"}})

Writes by default don't wait. There are various wait options with which the client can specify what conditions to wait for before the call returns (this is also achievable with a follow-up "getlasterror" call), such as whether the changes have been persisted to disk, or whether the changes have been propagated to a sufficient number of members in the replica set. MongoDB also provides a sophisticated way to assign tags to members of a replica set to reflect their physical topology, so that a customized write policy can be defined for each collection based on its reliability requirements.

In an RDBMS, "serializability" is a fundamental concept: the net effect of concurrently executing work units is equivalent to arranging those work units in some order of sequential execution (one at a time). Therefore, each client can act as if the DB were exclusively available to it. The underlying implementation of the DB server may use locks or multi-versioning to provide the isolation. However, this concept is not available in MongoDB (nor in many other NOSQL stores).

In MongoDB, every piece of data you read should be treated as a snapshot of the past, which means that by the time you look at it, it may have been changed in the DB. Therefore, if you are making a modification based on some previously read data, by the time you send the modification request, the condition on which your modification is based may have changed. If this is not acceptable for your application's consistency requirements, you may need to re-validate the condition at the time you request the modification (i.e. a "conditional modify" should be made).

Under this scheme, a "condition" is attached to the modification request so that the DB server can validate the condition before applying the modification. (Of course, the condition check and the modification must be atomic, so that no update can happen in between.) In MongoDB, this can be achieved with the "findAndModify" call.


var account = db.bank.findOne({id:1234})
var old_bal = account['balance']
var new_bal = old_bal + fund

# Pre-condition is specified in search criteria
var prev = db.bank.findAndModify({
                     query: {id:1234, balance:old_bal},
                     update: {$set: {balance: new_bal}}})

# Check if the prev command succeeded:
# findAndModify returns null when no doc matched the pre-condition
var success = (prev != null)

if (!success) {
    # retry from the beginning
}
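The read, check and retry cycle can be traced without a server. The following is a minimal in-memory sketch in plain JavaScript; `bank`, `conditionalModify`, `deposit` and the `beforeWrite` hook are hypothetical names, and the hook exists only to simulate a concurrent writer.

```javascript
// Hypothetical single-document store. Each function call is atomic here because
// JavaScript runs it to completion without interleaving.
const bank = new Map([[1234, { id: 1234, balance: 100 }]]);

// Apply `changes` only if the document still matches `expected`.
function conditionalModify(id, expected, changes) {
  const doc = bank.get(id);
  const ok = doc !== undefined && Object.keys(expected).every((k) => doc[k] === expected[k]);
  if (ok) Object.assign(doc, changes);
  return ok;
}

// Read, compute, and retry from the beginning if the pre-condition failed.
function deposit(id, fund, beforeWrite = () => {}) {
  for (let attempt = 1; ; attempt++) {
    const oldBal = bank.get(id).balance;
    beforeWrite(attempt); // test hook: lets another "client" sneak in a write
    if (conditionalModify(id, { balance: oldBal }, { balance: oldBal + fund })) {
      return attempt;
    }
  }
}

// Simulate a concurrent writer that changes the balance during the first attempt only.
const attempts = deposit(1234, 50, (attempt) => {
  if (attempt === 1) bank.get(1234).balance = 70;
});
console.log(attempts, bank.get(1234).balance); // 2 120
```

The first attempt fails because the balance it read (100) is no longer current, so the deposit is recomputed from the fresh value (70) instead of silently overwriting the other client's change.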

The concept of "transaction" is also missing in MongoDB. While MongoDB guarantees that each document is modified atomically (so no partial update will happen within a doc), if an update modifies multiple documents, there is no guarantee of atomicity across documents.

Therefore, it is the application developer's responsibility to implement atomicity for updates that span multiple documents. We describe a common design pattern to achieve that. This technique is not specific to MongoDB and is applicable to other NOSQL stores that can at least guarantee atomicity at the single record level.

The basic idea is to first create a separate document (called a transaction) that links together all the documents that you want to modify, and then create a reverse link from each document (to be modified) back to the transaction. By carefully designing the sequence of updates to the documents and the transaction, we can achieve atomicity when modifying multiple documents.

MongoDB's web site also describes a similar technique (based on the same concept, but with a slightly different implementation).
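As an illustration of the transaction-document idea, here is one possible ordering of the updates, sketched in plain JavaScript over in-memory maps. It is not necessarily the exact sequence intended above nor the one on MongoDB's site; the account data, the `pendingTxns` reverse link and the state names are assumptions made for this sketch.

```javascript
// Hypothetical in-memory collections. Every function below changes one document per step,
// mirroring a store that is atomic only at the single-document level.
const accounts = new Map([
  ["A", { balance: 100, pendingTxns: [] }],
  ["B", { balance: 20, pendingTxns: [] }],
]);
const txns = new Map();

// Step 1: record the intent in a transaction document that links both accounts.
function begin(id, from, to, amount) {
  txns.set(id, { from, to, amount, state: "pending" });
}

// Step 2: apply the change to one account and add the reverse link, unless already applied.
function applyTo(accountId, txnId, delta) {
  const acct = accounts.get(accountId);
  if (acct.pendingTxns.includes(txnId)) return; // already applied, safe to repeat
  acct.balance += delta;
  acct.pendingTxns.push(txnId);
}

// Step 3: mark the transaction applied, remove the reverse links, then mark it done.
function finish(txnId) {
  const t = txns.get(txnId);
  t.state = "applied";
  for (const accountId of [t.from, t.to]) {
    const acct = accounts.get(accountId);
    acct.pendingTxns = acct.pendingTxns.filter((x) => x !== txnId);
  }
  t.state = "done";
}

// Recovery: re-run an unfinished transaction. applyTo skips accounts that carry the link.
function resume(txnId) {
  const t = txns.get(txnId);
  if (t.state === "pending") {
    applyTo(t.from, txnId, -t.amount);
    applyTo(t.to, txnId, t.amount);
  }
  if (t.state !== "done") finish(txnId);
}

begin("t1", "A", "B", 30);
applyTo("A", "t1", -30); // imagine a crash here, before B is credited
resume("t1");            // recovery completes the transfer exactly once
console.log(accounts.get("A").balance, accounts.get("B").balance, txns.get("t1").state);
// 70 50 done
```

The reverse link is what makes recovery safe: it tells the recovering process which documents have already been changed on behalf of the transaction.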

Replication Model
High availability is achieved in MongoDB via replica sets, which provide data redundancy across multiple physical servers: a single primary DB and multiple secondary DBs. For data consistency, all modification requests (inserts / updates / deletes) go to the primary DB, where the modification is made and then asynchronously replicated to the secondary DBs.

Within the replica set, members are interconnected with each other to exchange heartbeat messages. A crashed server with missing heartbeats will be detected by the other members and removed from the replica set membership. When the dead secondary later recovers, it can rejoin the cluster by connecting to the primary to catch up on the updates made since it crashed. If the outage lasts so long that the change log on the primary no longer covers the whole outage period, the recovered secondary needs to reload the whole data set from the primary (as if it were a brand new server).

If the primary DB crashes, a leader election protocol is run among the remaining members to nominate the new primary, based on many factors such as node priority, node uptime, etc. After getting a majority of the votes, the new primary takes over. Notice that due to asynchronous replication, the newly elected primary DB doesn't necessarily have all the latest updates from the crashed DB.
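A heavily simplified sketch of the election outcome, in plain JavaScript: it only models the majority requirement and a ranking by two of the factors mentioned above, not the actual voting protocol. The member records and the `electPrimary` helper are hypothetical.

```javascript
// Hypothetical member records; `priority` and `uptime` stand in for the "many factors".
function electPrimary(members) {
  const alive = members.filter((m) => m.alive);
  const majority = Math.floor(members.length / 2) + 1;
  if (alive.length < majority) return null; // no majority reachable, so no primary
  // Prefer the higher priority, then the longer uptime.
  const ranked = alive.slice().sort((a, b) => b.priority - a.priority || b.uptime - a.uptime);
  return ranked[0].name;
}

const members = [
  { name: "db1", alive: false, priority: 2, uptime: 900 }, // the crashed primary
  { name: "db2", alive: true, priority: 1, uptime: 500 },
  { name: "db3", alive: true, priority: 1, uptime: 800 },
];
console.log(electPrimary(members)); // db3
```

With only one of three members alive, the function returns `null`: a minority partition cannot elect a primary.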

The client lib provides the API for the app to access the MongoDB server. At startup, the client lib connects to some member of the replica set (based on a seed list) and issues an "isMaster" command to gather the current picture of the set (who is the primary and who are the secondaries). After that, the client lib connects to the single primary (where it will send all DB modification requests) and to some number of secondaries (where it will send read-only queries). The client library periodically re-runs the "isMaster" command to detect whether any new members have joined the set. When an existing member of the set crashes, its connections to all existing clients are dropped, which forces the clients to resynchronize with the latest picture.

There is also a special kind of secondary DB called a slave delay, which guarantees that data is propagated with a certain time lag behind the master. This is used mainly to recover data after accidental deletion.

For a data modification request, the client sends the request to the primary DB. By default the request returns once it is written to the primary; an optional parameter can be specified to indicate that a certain number of secondaries need to receive the modification before the call returns, so the client can ensure that a majority of members have received the request. Of course, there is a tradeoff between latency and reliability.

For a query request, by default the client contacts the primary, which has the latest data. Optionally, the client can declare its willingness to read from any secondary and tolerate that the returned data may be outdated. This provides an opportunity to load balance read requests across all secondaries. Notice that in this case, a read that follows a write may not see the update.
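The stale-read effect is easy to reproduce in a toy model. The sketch below is plain JavaScript with one primary and one secondary; the `ReplicaSetModel` class is hypothetical, and replication is triggered by hand to stand in for the asynchronous lag.

```javascript
// Toy model of asynchronous replication between one primary and one secondary.
class ReplicaSetModel {
  constructor() {
    this.primary = new Map();
    this.secondary = new Map();
    this.pending = []; // writes applied on the primary but not yet on the secondary
  }
  write(key, value) {
    this.primary.set(key, value);
    this.pending.push([key, value]);
  }
  // Stands in for the background replication catching up.
  replicate() {
    for (const [key, value] of this.pending) this.secondary.set(key, value);
    this.pending = [];
  }
  read(key, { fromSecondary = false } = {}) {
    return (fromSecondary ? this.secondary : this.primary).get(key);
  }
}

const rs = new ReplicaSetModel();
rs.write("balance", 100);
rs.replicate();
rs.write("balance", 150); // not replicated yet

const fresh = rs.read("balance");                             // 150, from the primary
const stale = rs.read("balance", { fromSecondary: true });    // 100, the secondary lags
rs.replicate();
const caughtUp = rs.read("balance", { fromSecondary: true }); // 150 after catch-up
console.log(fresh, stale, caughtUp); // 150 100 150
```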

For read-mostly applications, reading from any secondary can be a big performance improvement. To select the fastest secondary DB member to query, the client driver periodically pings the members and favors issuing queries to the one with the lowest latency. Notice that a read request is issued to only one node; there is no quorum read or read from multiple nodes in MongoDB.
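Picking the lowest-latency member amounts to a minimum search over the latest ping results. A small sketch in plain JavaScript, where the host names, the latency figures and the `pickSecondary` helper are made up for illustration:

```javascript
// `pingMs` maps each secondary to its most recent ping time in milliseconds.
function pickSecondary(pingMs) {
  let best = null;
  for (const [host, latency] of Object.entries(pingMs)) {
    if (best === null || latency < pingMs[best]) best = host;
  }
  return best; // null when no secondary is known
}

const target = pickSecondary({ "db2:27017": 12, "db3:27017": 4, "db4:27017": 30 });
console.log(target); // db3:27017
```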

The main purpose of a replica set is to provide data redundancy and to load balance read requests. It doesn't provide load balancing for write requests, since all modifications still have to go to the single master.

Another benefit of a replica set is that members can be taken offline on a rotating basis to perform expensive operations such as compaction, indexing or backup, without impacting online clients that use the live members.

Sharding Model
To load balance write requests, we can use MongoDB shards. In a sharding setup, a collection can be partitioned (by a partition key) into chunks (each of which is a key range), and the chunks are distributed across multiple shards (each shard being a replica set). MongoDB sharding effectively provides an unlimited size for a data collection, which is important for any big data scenario.

To reiterate, in the sharding model, a single partition key is specified for each collection stored in the sharded cluster. The key space of the partition key is divided into contiguous key ranges called chunks, which are hosted by the corresponding shards.


# To define the partition key
db.runCommand({shardCollection: "testdb.person",
           key: {firstname:1, lastname:1}})

In a sharded setup, the client lib connects to a stateless routing server, "MongoS", which behaves as if it were a "MongoD". The routing server plays an important role in forwarding client requests to the appropriate shard server according to the request characteristics.

For an insert/delete/update request containing the partition key, the routing server uses the chunk/shard mapping information (obtained from the config server and cached locally) to forward the request to the primary server hosting the chunk whose key range covers the partition key of the modified doc. Given a particular partition key, the primary server containing that chunk can be determined unambiguously.
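The chunk lookup can be pictured as a binary search over the sorted chunk boundaries. This is a sketch in plain JavaScript with a single string key; the chunk table, the shard names and the `routeKey` helper are hypothetical, and each chunk is assumed to cover the range from its `min` up to the next chunk's `min`.

```javascript
// Chunk table sorted by lower bound, as the router might cache it.
const chunks = [
  { min: "", shard: "shard0" },
  { min: "h", shard: "shard1" },
  { min: "p", shard: "shard0" },
];

// Find the last chunk whose lower bound is <= key.
function routeKey(key) {
  let lo = 0;
  let hi = chunks.length - 1;
  while (lo < hi) {
    const mid = Math.ceil((lo + hi) / 2);
    if (chunks[mid].min <= key) lo = mid;
    else hi = mid - 1;
  }
  return chunks[lo].shard;
}

console.log(routeKey("dave"), routeKey("jared"), routeKey("ricky"));
// shard0 shard1 shard0
```

Since the ranges are contiguous and do not overlap, exactly one chunk matches any key, which is what makes the routing unambiguous.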

For a query request, the routing server examines whether the partition key is part of the selection criteria and, if so, "routes" the request only to the corresponding shard (primary or secondary). However, if the partition key is not part of the selection criteria, the routing server "scatters" the request to every shard (picking one member of each shard); each shard computes its local search, and the results are gathered at the routing server and returned to the client.

When the query requires the result to be sorted and the partition key is involved in the sort order, the routing server routes the request sequentially to multiple shards as the client iterates through the result. If the sort involves some other key that is not the partition key, the routing server scatters the request to all shards, each of which performs its local sort, and then merges the results at the routing server (basically a distributed merge-sort).
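The gather step of such a distributed merge-sort can be sketched as a k-way merge of lists that are already sorted locally. Plain JavaScript; `mergeSorted` and the sample shard results are hypothetical.

```javascript
// Merge several already-sorted lists into one sorted list.
// `compare(a, b)` returns a negative number when a should come first.
function mergeSorted(lists, compare) {
  const positions = lists.map(() => 0);
  const out = [];
  while (true) {
    let best = -1;
    for (let i = 0; i < lists.length; i++) {
      if (positions[i] >= lists[i].length) continue; // this shard is exhausted
      if (best === -1 || compare(lists[i][positions[i]], lists[best][positions[best]]) < 0) {
        best = i;
      }
    }
    if (best === -1) return out; // every shard is exhausted
    out.push(lists[best][positions[best]++]);
  }
}

// Each inner array is what one shard returned after its local sort.
const shardResults = [[1, 4, 9], [2, 3], [5]];
const merged = mergeSorted(shardResults, (a, b) => a - b);
console.log(merged); // [ 1, 2, 3, 4, 5, 9 ]
```

The router only ever compares the head of each shard's result, so it does not need to re-sort the combined data.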

As data is inserted into a chunk and the chunk gets close to its full capacity, we need to split the chunk. The routing server can detect this situation statistically, based on the number of requests it has forwarded as well as the number of other routing servers that exist. The routing server then contacts the primary server of the shard that contains the full chunk and requests a chunk split. The shard server computes the midpoint of the key range that evenly distributes the data, splits the chunk, and updates the configuration server with the split point. Notice that so far there is no data movement, as the data still resides on the same shard server.
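A minimal sketch of the split itself, in plain JavaScript: the split point is taken as the median key so that both halves hold roughly the same number of docs. The chunk shape (`min`, `max`, sorted `keys`) and the `splitChunk` helper are assumptions made for illustration.

```javascript
// Split a chunk covering [min, max) at its median key.
function splitChunk(chunk) {
  const keys = chunk.keys; // assumed sorted ascending
  const splitPoint = keys[Math.floor(keys.length / 2)];
  return [
    { min: chunk.min, max: splitPoint, keys: keys.filter((k) => k < splitPoint) },
    { min: splitPoint, max: chunk.max, keys: keys.filter((k) => k >= splitPoint) },
  ];
}

const [left, right] = splitChunk({ min: 0, max: 100, keys: [10, 20, 30, 40, 50, 60] });
console.log(left.max, left.keys.length, right.keys.length); // 40 3 3
```

Only the boundaries change here; both halves still live on the same shard until a later migration moves one of them.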

On the other hand, there is a separate "balancer" process (running in one of the routing servers) whose job is to make sure each shard carries approximately the same number of chunks. When an imbalance is detected, the balancer contacts the busy shard to trigger a chunk migration. The migration happens online: the origin contacts the destination to initiate a data transfer, and data starts to be copied from the origin to the destination. This process may take some time (depending on the data volume), during which updates can continue to happen at the origin. These changes are tracked at the origin, and when the copy finishes, the delta is transferred and applied to the destination.

After multiple rounds of applying deltas, the migration enters the final round, and the origin halts and withholds all requests coming from the routing server. After the last round of changes has been applied to the destination, the destination updates the configuration server with the new shard configuration and notifies the origin shard (which is still withholding the requests) to return a StaleConfigException to the routing server, which then re-reads the latest configuration from the config server and re-submits the previous requests. At some future point in time, the data at the origin is physically deleted.

It is possible that, under a high update rate, the changes being applied to the destination cannot catch up with the rate at which updates happen at the origin. When this situation is detected, the whole migration process is aborted. The routing server may pick a different chunk to migrate afterwards.
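The balancer's decision can be sketched as comparing chunk counts across shards. In the plain JavaScript below, `planMigration`, the shard names and the `threshold` parameter are hypothetical; the text above does not say how large an imbalance has to be before a migration is triggered.

```javascript
// Decide whether a chunk should move, given the number of chunks per shard.
function planMigration(chunkCounts, threshold) {
  const shards = Object.keys(chunkCounts);
  if (shards.length === 0) return null;
  let busiest = shards[0];
  let idlest = shards[0];
  for (const s of shards) {
    if (chunkCounts[s] > chunkCounts[busiest]) busiest = s;
    if (chunkCounts[s] < chunkCounts[idlest]) idlest = s;
  }
  // Balanced enough: leave everything where it is.
  if (chunkCounts[busiest] - chunkCounts[idlest] < threshold) return null;
  return { from: busiest, to: idlest };
}

const move = planMigration({ shard0: 10, shard1: 2, shard2: 5 }, 2);
console.log(move); // { from: 'shard0', to: 'shard1' }
```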

Map/Reduce Execution
MongoDB provides a Map/Reduce framework to perform parallel data processing. The concept is similar to Hadoop Map/Reduce, but with the following small differences:

  • It takes its input from the query result of a collection rather than from an HDFS directory.
  • The reduce output can be appended to an existing collection rather than written to an empty HDFS directory.

Map/Reduce works in the following way:

  1. The client defines a map function, a reduce function, a query that creates the input data, and a collection that stores the output result. It then sends the request to the MongoS routing server.
  2. MongoS scatters the request to all shards (one member of each shard).
  3. Each shard server executes the query and passes the query output to the map function, which executes the user-defined code and emits key/value pairs.
  4. The emitted pairs with the same key arrive at the same shard primary server, which executes the user-defined reduce function. The return value is then written to the output collection.

Here is a simple example that builds an inverted index mapping each topic to the documents about it:


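# Titles shown as "..." are missing in the source
db.book.insert({title:"...",
               about:["software", "db"]})

db.book.insert({title:"Java programming",
               about:["software", "program"]})

db.book.insert({title:"...",
               about:["db", "technology"]})

db.book.insert({title:"...",
               about:["db", "software"]})


m = function() {
   for (var i in this.about) {
       emit(this.about[i], this.title)
   }
}

r = function(k, vals) {
   return({topic:k, title:vals})
}

# The output option is missing in the source; inline output is assumed here
db.book.mapReduce(m, r, {query:{},
                         out:{inline:1}})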

Overall, I found MongoDB very powerful and easy to use. I look forward to using MongoDB with Node.js and will share my experience in future blogs.

Special thanks to Jared Rosoff, who provided me with a lot of details on how MongoDB is implemented.


