MongoDB Pub/Sub with Capped Collections
If you've been following this blog for any length of time, you know that my NoSQL database of choice is MongoDB. One thing that MongoDB isn't known for, however, is building a publish/subscribe system. Redis, on the other hand, is known for having a high-bandwidth, low-latency pub/sub protocol. One thing I've always wondered is whether I can build a similar system atop MongoDB's capped collections, and if so, what the performance would be. Read on to find out how it turned out...
Capped Collections
If you've not heard of capped collections before, they're a nice little feature of MongoDB that lets you have a high-performance circular queue. Capped collections have the following nice features:
- They "remember" the insertion order of their documents
- They store inserted documents in the insertion order on disk
- They remove the oldest documents in the collection automatically as new documents are inserted
However, you give up some things with capped collections:
- They have a fixed maximum size
- You cannot shard a capped collection
- Any updates to documents in a capped collection must not cause a document to grow (i.e., not all $set operations will work, and no $push or $pushAll will); a quick demonstration follows the creation example below
- You may not explicitly .remove() documents from a capped collection
To create a capped collection, just issue the following command (all the examples below are in Python, but you can use any driver you want, including the JavaScript mongo shell):
db.create_collection(
    'capped_collection',
    capped=True,
    size=size_in_bytes,      # required
    max=max_number_of_docs,  # optional
    autoIndexId=False)       # optional
In the example above, I've created a collection that takes up size_in_bytes bytes on disk, will contain no more than max_number_of_docs, and which does not create an index on the _id field as would normally happen.
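As a quick illustration of the no-growth rule from the list above, here is a minimal sketch. The collection name capped_demo is made up for this example, and the rejection shown reflects the server behavior this post was written against (servers of that era refuse updates that grow a document in a capped collection):

from pymongo import MongoClient
from pymongo.errors import OperationFailure

db = MongoClient().test

# A tiny capped collection just for this demonstration
db.create_collection('capped_demo', capped=True, size=4096)
db.capped_demo.insert({'x': 0})

try:
    # A $set that grows the document is not allowed in a capped collection
    db.capped_demo.update({'x': 0}, {'$set': {'padding': 'a' * 1024}})
except OperationFailure as exc:
    print('update rejected: %s' % exc)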
Above, I mentioned that the capped collection remembers the insertion order of its documents. If you issue a find() with no sort specified, or with a sort of ('$natural', 1), then MongoDB will sort your result in insertion order. (('$natural', -1) will likewise sort the result in reverse insertion order.) Since insertion order is the same as the on-disk ordering, these queries are extremely fast.
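In PyMongo, those natural-order queries look like this (a small sketch, assuming the same db handle used throughout this post):

import pymongo

# Insertion order (the same thing you get with no sort at all)
forward = db.capped_collection.find().sort([('$natural', pymongo.ASCENDING)])

# Reverse insertion order
backward = db.capped_collection.find().sort([('$natural', pymongo.DESCENDING)])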
To see this, let's create two collections, one capped and one uncapped, and fill
both with small documents:
size = 100000

# Create the collections
db.create_collection(
    'capped_collection',
    capped=True,
    size=2**20,
    autoIndexId=False)
db.create_collection(
    'uncapped_collection',
    autoIndexId=False)

# Insert small documents into both
for x in range(size):
    db.capped_collection.insert({'x': x}, manipulate=False)
    db.uncapped_collection.insert({'x': x}, manipulate=False)

# Go ahead and index the 'x' field in the uncapped collection
db.uncapped_collection.ensure_index('x')
Now we can see the performance gains by executing find() on each. For this, I'll use IPython, IPyMongo, and the magic %timeit function:
In [72] (test): %timeit x=list(db.capped_collection.find())
1000 loops, best of 3: 708 us per loop

In [73] (test): %timeit x=list(db.uncapped_collection.find().sort('x'))
1000 loops, best of 3: 912 us per loop
So we get a moderate speedup, which is nice, but not spectacular. What becomes really interesting with capped collections is that they support tailable cursors.
Tailable cursors
If you're querying a capped collection in insertion order, you can pass a special flag to find() that says that it should "follow the tail" of the collection if new documents are inserted, rather than returning the result of the query on the collection at the time the query was initiated. This behavior is similar to the behavior of the Unix tail -f command, hence its name.
To see this behavior, let's query our capped collection with a 'regular' cursor
as well as a 'tailable' cursor. First, the 'regular' cursor:
In [76] (test): cur = db.capped_collection.find()
In [77] (test): cur.next()
Out[77]: {u'x': 0}
In [78] (test): cur.next()
Out[78]: {u'x': 1}
In [79] (test): db.capped_collection.insert({'y': 1})
Out[79]: ObjectId('515f205cfb72f0385c3c2414')
In [80] (test): list(cur)
Out[80]: [{u'x': 2}, ... {u'x': 99}]
Notice above that the document we inserted, {'y': 1}, is not included in the result since it was inserted after we started iterating.
Now, let's try a tailable cursor:
In [81] (test): cur = db.capped_collection.find(tailable=True)
In [82] (test): cur.next()
Out[82]: {u'x': 1}
In [83] (test): cur.next()
Out[83]: {u'x': 2}
In [84] (test): db.capped_collection.insert({'y': 2})
Out[84]: ObjectId('515f20ddfb72f0385c3c2415')
In [85] (test): list(cur)
Out[85]:
[{u'x': 3},
 ...
 {u'x': 99},
 {u'_id': ObjectId('515f205cfb72f0385c3c2414'), u'y': 1},
 {u'_id': ObjectId('515f20ddfb72f0385c3c2415'), u'y': 2}]
Now we see that both the "y" document we created before and the one created during this cursor's iteration are included in the result.
Waiting on data
While tailable cursors are nice for picking up the inserts that happened while we were iterating over the cursor, one thing that a true pub/sub system needs is low latency. Polling the collection to see if messages have been inserted is a non-starter from a latency standpoint because you have to do one of two things:
- Poll continuously, using prodigious server resources
- Poll intermittently, increasing latency
Tailable cursors have another option you can use to "fix" the above problems: the await_data flag. This flag tells MongoDB to actually wait a second or two on an exhausted tailable cursor to see if more data is going to be inserted. In PyMongo, the way to set this flag is quite simple:
cur = db.capped_collection.find(
    tailable=True,
    await_data=True)
Building a pub/sub system
OK, now that we have a capped collection, with tailable cursors awaiting data, how can we make this into a pub/sub system? The basic approach is:
- We use a single capped collection of moderate size (let's say 32kB) for all messages
- Publishing a message consists of inserting a document into this collection with the following format: { 'k': topic, 'data': data } (a minimal publish helper is sketched after this list)
- Subscribing to the collection is a tailable query on the collection, using a regular expression to only get the messages we're interested in.
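Publishing in this scheme is therefore nothing more than an insert. Here's a minimal sketch (the helper name and topic below are placeholders of my own, not part of MongoTools):

def pub_simple(collection, topic, data=None):
    # Publishing is just inserting a message document into the capped collection
    collection.insert({'k': topic, 'data': data}, manipulate=False)

pub_simple(db.capped_collection, 'foo.bar', {'hello': 'world'})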
The actual query we use is similar to the following:
def get_cursor(collection, topic_re, await_data=True):
    options = { 'tailable': True }
    if await_data:
        options['await_data'] = True
    cur = collection.find(
        { 'k': topic_re },
        **options)
    cur = cur.hint([('$natural', 1)])  # ensure we don't use any indexes
    return cur
Once we have the get_cursor function, we can do something like the following to execute the query:
import re, time

while True:
    cur = get_cursor(
        db.capped_collection,
        re.compile('^foo'),
        await_data=True)
    for msg in cur:
        do_something(msg)
    time.sleep(0.1)
Of course, the system above has a couple of problems:
- We have to receive every message in the collection before we get to the 'end'
- We have to go back to the beginning if we ever exhaust the cursor (and its await_data delay)
The way we can avoid these problems is by adding a sequence number to each message.
Sequences
"But wait," I imagine you to say, "MongoDB doesn't have an autoincrement field
like MySQL! How can we generate sequences?"
The answer lies in the find_and_modify() command, coupled with the $inc operator in MongoDB.
To construct our sequence generator, we can use a dedicated "sequence" collection
that contains nothing but counters.
Each time we need a new sequence number, we perform a find_and_modify() with $inc and get the new number.
The code for this turns out to be very short:
class Sequence(object):

    def __init__(self, db, name='mongotools.sequence'):
        self._db = db
        self._name = name

    def cur(self, name):
        doc = self._db[self._name].find_one({'_id': name})
        if doc is None:
            return 0
        return doc['value']

    def next(self, sname, inc=1):
        doc = self._db[self._name].find_and_modify(
            query={'_id': sname},
            update={'$inc': { 'value': inc } },
            upsert=True,
            new=True)
        return doc['value']
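Using the class is straightforward; for example (the counter name here is arbitrary):

seq = Sequence(db)
print(seq.next('capped_collection'))  # 1
print(seq.next('capped_collection'))  # 2
print(seq.cur('capped_collection'))   # 2 -- reads the current value without incrementing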
Once we have the ability to generate sequences, we can now add a sequence number to our messages on publication:
def pub(collection, sequence, key, data=None):
    doc = dict(
        ts=sequence.next(collection.name),
        k=key,
        data=data)
    collection.insert(doc, manipulate=False)
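Reusing the seq instance from the Sequence example above, publishing a message now looks like this (the topic and payload are placeholders):

pub(db.capped_collection, seq, 'foo.bar', {'hello': 'world'})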
Our subscribing query, unfortunately, needs to get a bit more complicated:
def get_cursor(collection, topic_re, last_id=-1, await_data=True):
    options = { 'tailable': True }
    spec = {
        'ts': { '$gt': last_id },  # only new messages
        'k': topic_re }
    if await_data:
        options['await_data'] = True
    cur = collection.find(spec, **options)
    cur = cur.hint([('$natural', 1)])  # ensure we don't use any indexes
    return cur
And our dispatch loop likewise must keep track of the sequence number:
import re, time

last_id = -1
while True:
    cur = get_cursor(
        db.capped_collection,
        re.compile('^foo'),
        last_id=last_id,  # only ask for messages we haven't seen yet
        await_data=True)
    for msg in cur:
        last_id = msg['ts']
        do_something(msg)
    time.sleep(0.1)
We can actually improve upon this a tiny bit by finding the ts field of the last value in the collection and using it to initialize our last_id value:
last_id = -1
cur = db.capped_collection.find().sort([('$natural', -1)])
for msg in cur:
    last_id = msg['ts']
    break
...
So we've fixed the problem of processing messages multiple times, but we still have a slow scan of the whole capped collection on startup. Can we fix this? It turns out we can, but not without questionable "magic."
Now, for some questionable magic...
You may be wondering why I would use a strange name like ts to hold a sequence number. It turns out that there is a poorly documented option for cursors that we can abuse to substantially speed up the initial scan of the capped collection: the oplog_replay option.
As is apparent from the name of the option, it is mainly used to replay the
"oplog", that magic capped collection that makes MongoDB's replication internals
work so well.
The oplog uses a ts field to indicate the timestamp of a particular operation, and the oplog_replay option requires the use of a ts field in the query.
Now, since oplog_replay isn't really intended to be (ab)used by us mere mortals, it's not directly exposed in the PyMongo driver. However, we can manage to get to it via some trickery:
from pymongo.cursor import _QUERY_OPTIONS

def get_cursor(collection, topic_re, last_id=-1, await_data=True):
    options = { 'tailable': True }
    spec = {
        'ts': { '$gt': last_id },  # only new messages
        'k': topic_re }
    if await_data:
        options['await_data'] = True
    cur = collection.find(spec, **options)
    cur = cur.hint([('$natural', 1)])  # ensure we don't use any indexes
    if await_data:
        cur = cur.add_option(_QUERY_OPTIONS['oplog_replay'])
    return cur
(Yeah, I know it's bad to import an underscore-prefixed name from another module. But it's marginally better than simply saying oplog_replay_option=8, which is the other way to make this whole thing work....)
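Putting the pieces together, a toy end-to-end run might look like the following sketch. It simply reuses the pub, get_cursor, and Sequence definitions above; the topic, payloads, and print handler are placeholders:

import re

seq = Sequence(db)

# Publisher side: each message gets a topic key and a monotonically increasing ts
for i in range(3):
    pub(db.capped_collection, seq, 'foo.bar', {'n': i})

# Subscriber side: tail messages whose topic starts with 'foo'
last_id = -1
cur = get_cursor(db.capped_collection, re.compile('^foo'), last_id=last_id)
for msg in cur:
    last_id = msg['ts']
    print(msg)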
Performance
So now we have the skeleton of a pubsub system using capped collections.
If you'd like to use it yourself, all the code is available on Github in the
MongoTools project.
So how does it perform?
Well obviously the performance depends on the particular type of message passing
you're doing.
In the MongoTools project, there are a couple of Python example programs, latency_test_pub.py and latency_test_sub.py, in the mongotools/examples/pubsub directory that allow you to do your own benchmarking.
In my personal benchmarking, running everything locally with small messages, I'm able to get about 1100 messages per second with a latency of 2.5ms (with publishing options -n 1 -c 1 -s 0), or about 33,000 messages per second with a latency of 8ms (this is with -n 100 -c 1 -s 0).
For pure publishing bandwidth (the subscriber can't consume this many messages
per second), I seem to max out at around 75,000 messages (inserts) per second.
So what do you think? With the MongoTools pubsub module, is MongoDB a viable competitor to Redis as a low-latency, high-bandwidth pub/sub channel? Let me know in the comments below!
Published at DZone with permission of Rick Copeland.