Basic Aggregation in MongoDB 2.1 with Python
Continuing on in my series on MongoDB and Python, this article will explore the new aggregation framework introduced in MongoDB 2.1. If you're just getting started with MongoDB, you might want to read the previous articles in the series first:
- Getting Started with MongoDB and Python
- Moving Along With PyMongo
- GridFS: The MongoDB Filesystem
- Aggregation in MongoDB (Part 1)
And now that you're all caught up, let's jump right in....
Why a new framework?
If you've been following along with this article series, you've been introduced to MongoDB's mapreduce command, which up until MongoDB 2.1 has been the go-to aggregation tool for MongoDB. (There's also the group() command, but it's really no more than a less-capable and un-shardable version of mapreduce(), so we'll ignore it here.) So if you already have mapreduce() in your toolbox, why would you ever want something else?
Mapreduce is hard; let's go shopping
The first motivation behind the new framework is that, while mapreduce() is a flexible and powerful abstraction for aggregation, it's really overkill in many situations, as it requires you to re-frame your problem into a form that's amenable to calculation using mapreduce(). For instance, when I want to calculate the mean value of a property in a series of documents, trying to break that down into appropriate map, reduce, and finalize steps imposes some extra cognitive overhead that we'd like to avoid. So the new aggregation framework is (IMO) simpler.
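To make that overhead concrete, here is a rough pure-Python sketch (no MongoDB involved) of how a mean has to be decomposed into map, reduce, and finalize steps. The function names `map_doc`, `reduce_values`, and `finalize`, as well as the sample documents, are illustrative assumptions and not MongoDB's API.

```python
from collections import defaultdict
from functools import reduce

# Hypothetical sample documents; only the 'x' and 'y' fields matter here.
docs = [
    {'x': 1.0, 'y': 0},
    {'x': 3.0, 'y': 0},
    {'x': 10.0, 'y': 1},
]

def map_doc(doc):
    # Emit a (key, value) pair. The value carries a partial sum and a count,
    # because averaging partial averages would give the wrong answer.
    return doc['y'], {'total': doc['x'], 'count': 1}

def reduce_values(a, b):
    # Combine two partial results; the output has the same shape as the inputs.
    return {'total': a['total'] + b['total'], 'count': a['count'] + b['count']}

def finalize(value):
    # Only at the very end can the mean itself be computed.
    return value['total'] / value['count']

# Group the emitted values by key, then reduce and finalize each group.
emitted = defaultdict(list)
for doc in docs:
    key, value = map_doc(doc)
    emitted[key].append(value)

means = {key: finalize(reduce(reduce_values, values))
         for key, values in emitted.items()}
print(means)
```

The aggregation framework expresses the same computation as a single $group stage with an $avg accumulator.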
The Javascript global interpreter lock is evil
The MapReduce algorithm, the basis of MongoDB's mapreduce() command, is a great approach to solving Embarrassingly Parallel problems. Each invocation of map, reduce, and finalize is completely independent of the others (though the map/reduce/finalize phases are order-dependent), so we should be able to dispatch these jobs to run in parallel without any problems.
Unfortunately, due to MongoDB's use of the SpiderMonkey Javascript engine, each mongod process is restricted to running a single Javascript thread at a time. So in order to get any parallelism with a MongoDB mapreduce(), you must run it on a sharded cluster, and on a cluster with N shards, you're limited to N-way parallelism.
The new aggregation framework, on the other hand, is implemented in C++ and has its own BSON-based language, so you're not hitting the Javascript interpreter at all.
Mapreduce can't actually do everything
If you read my previous article on MongoDB aggregation, you might have noticed that the output format of a mapreduce() command is pretty restrictive, always being of the form

    {
        _id: ...,   /* some (possibly compound) key */
        value: ...  /* some (possibly compound) value */
    }
But what if you need the results in some other format? With mapreduce, you're stuck.
Another thing that mapreduce() doesn't do well is multiple computation phases. Of course, you can always run mapreduce() multiple times, but it would be nice if there were a way to do it all in one command.
The new aggregation framework, on the other hand, has phased computation at its heart, with the pipeline the fundamental structure of aggregation.
A series of tubes
So now to the rescue comes MongoDB's new aggregation framework. To understand the new framework, the first thing you need to know is that it's all based around the aggregation pipeline. Conceptually, MongoDB sends each document in a collection into the first operator on the pipeline, which may modify it and send it along to the next operation, similar to a UNIX pipeline.
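As a loose analogy only, the pipeline idea can be sketched with plain Python generators, where each stage consumes documents from the previous one. The helper names `match` and `project` and the sample collection are assumptions made for illustration; the real stages run inside the server.

```python
from itertools import islice

# Hypothetical in-memory "collection".
collection = [
    {'_id': 1, 'x': 0.2, 'y': 3},
    {'_id': 2, 'x': 0.9, 'y': 7},
    {'_id': 3, 'x': 0.5, 'y': 3},
    {'_id': 4, 'x': 0.7, 'y': 3},
]

def match(docs, predicate):
    # Pass along only the documents that satisfy the predicate.
    return (d for d in docs if predicate(d))

def project(docs, fields):
    # Re-shape each document, keeping only the named fields.
    return ({f: d[f] for f in fields if f in d} for d in docs)

# Chain the stages: each one feeds the next, like a UNIX pipe.
stage1 = match(collection, lambda d: d['y'] == 3)
stage2 = islice(stage1, 2)          # plays the role of a limit of 2
stage3 = project(stage2, ['x'])

piped = list(stage3)
print(piped)
```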
Another way you can look at the new aggregation framework is that it's a 'super-find'. The aggregate command can do anything the find() method can do, and quite a bit more. For instance, the following query:
    q = db.my_collection.find({...some query...}, {...some fields...})
    q = q.sort(...some sort...)
    q = q.skip(...some skip...)
    q = q.limit(...some limit...)
would be implemented by the following aggregate pipeline:
    pipeline = [
        { '$match': ...some query... },
        { '$sort': ...some sort... },
        { '$skip': ...some skip... },
        { '$limit': ...some limit... },
        { '$project': ...some fields... },
    ]
    q = db.command('aggregate', 'my_collection', pipeline=pipeline)
When you're using aggregate, one thing to keep in mind is that order matters. Because of this, you want to place the most selective parts of your pipeline first, in order to reduce the number of documents successive pipeline stages have to process.
Above, the pipeline (conceptually) does the following:
- For every document in 'my_collection', pass along only those documents that $match the given query
- $sort all the documents given up to this point by the sort key, and then send the sorted documents along
- $skip over the given number of documents, not sending them along to the successive stages of the pipeline, and then send the rest down the pipe.
- Only send the given $limit number of documents to the next stages of the pipeline
- $project the documents into a new format, sending along each re-shaped document
At the end, we have something that duplicates the output of the query above. Now, if we had decided to $sort, then $match, the server could have quite a few more documents to $sort.
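The effect of stage order can be illustrated with a small pure-Python experiment. This is only a sketch with randomly generated documents and a hypothetical `is_match` predicate; it counts how many documents reach the sort in each ordering and does not measure anything about the server itself.

```python
import random

random.seed(0)
docs = [{'x': random.random(), 'y': random.randint(0, 10)} for _ in range(1000)]

def is_match(doc):
    # Hypothetical selective condition, standing in for a $match query.
    return doc['y'] == 3

# $match first: only the matching documents reach the sort.
matched = [d for d in docs if is_match(d)]
match_then_sort = sorted(matched, key=lambda d: d['x'])

# $sort first: every document in the collection has to be sorted.
sort_then_match = [d for d in sorted(docs, key=lambda d: d['x']) if is_match(d)]

print(len(matched), 'documents sorted when the match comes first')
print(len(docs), 'documents sorted when the sort comes first')
```

Both orderings produce the same final list; the difference is how much work the sort has to do.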
Actually aggregating
Although it's nice that MongoDB has grown a 'super-find' operation, our aggregation pipelines can do much more. Suppose we want to compute an average of a number of x values, grouped by y values. First, we'll insert some random data for testing (I'm running my MongoDB server on port 27018, rather than the default 27017):
    >>> import pymongo
    >>> import random
    >>> conn = pymongo.Connection(port=27018)
    >>> db = conn.agg
    >>> db.data.insert([
    ...     dict(x=random.random(), y=random.randint(0,10))
    ...     for i in range(50)])
    [ObjectId(...)...]
Basic grouping and document shaping
Now, to compute the average, we can use the $group pipeline operator:
    >>> pipeline = [
    ...     {'$group': {
    ...         '_id': '$y',
    ...         'mean': {'$avg': '$x'} } } ]
    >>> db.command('aggregate', 'data', pipeline=pipeline)
    {u'ok': 1.0,
     u'result': [
        {u'_id': 8, u'mean': 0.45158769756086975},
        {u'_id': 7, u'mean': 0.10767920380630946},
        {u'_id': 9, u'mean': 0.537524071579183},
        {u'_id': 5, u'mean': 0.4712010849527597},
        {u'_id': 10, u'mean': 0.6450144108731366},
        {u'_id': 3, u'mean': 0.5829812441520267},
        {u'_id': 2, u'mean': 0.5139874597597852},
        {u'_id': 1, u'mean': 0.35474322219101523},
        {u'_id': 4, u'mean': 0.6813855492820068},
        {u'_id': 0, u'mean': 0.47123430216880813}]}
The first thing to note above is the use of $x and $y. This is the aggregation framework's syntax for referring to fields in a document. This is necessary because we can do some simple operations on documents as well. For instance, suppose we wanted to double the x values. For this, we would use the $project operator:
    >>> pipeline = [
    ...     { '$project': {
    ...         '_id': 0,
    ...         'x': 1,
    ...         'y': 1,
    ...         'double_x': {
    ...             '$multiply': [ '$x', 2 ]
    ...         } } } ]
    >>> db.command('aggregate', 'data', pipeline=pipeline)
    {u'ok': 1.0,
     u'result': [
        {u'y': 8, u'x': 0.0731577856277077, u'double_x': 0.1463155712554154},
        {u'y': 8, u'x': 0.6628554147686313, u'double_x': 1.3257108295372626},
        ... ] }
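Since every response shown above wraps the documents in a dict with 'ok' and 'result' keys, a small helper can unwrap it. The following is a minimal sketch that assumes exactly that response shape; `run_pipeline` is a hypothetical helper, and `FakeDatabase` is a made-up stand-in used here so the example runs without a server.

```python
def run_pipeline(db, collection_name, pipeline):
    """Run an aggregation pipeline and return just the list of result documents."""
    response = db.command('aggregate', collection_name, pipeline=pipeline)
    if not response.get('ok'):
        raise RuntimeError('aggregate failed: %r' % (response,))
    return response['result']


class FakeDatabase(object):
    """Hypothetical stand-in for a pymongo database, for demonstration only."""

    def command(self, name, value, **kwargs):
        # Return a canned response shaped like the output shown in this article.
        return {'ok': 1.0, 'result': [{'_id': 8, 'mean': 0.45}]}


pipeline = [{'$group': {'_id': '$y', 'mean': {'$avg': '$x'}}}]
rows = run_pipeline(FakeDatabase(), 'data', pipeline)
print(rows)
```

With a real connection, you would pass the `db` object from the session above in place of `FakeDatabase()`.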
Unwinding arrays
Another nice thing we can do is to treat elements of an embedded array in a document as documents themselves. Suppose we had a number of comments embedded in a blog post, for instance:
    >>> db.blog.articles.insert( [
    ...     { 'title': 'First Post',
    ...       'comments': [
    ...         {'author': 'Someone'},
    ...         {'author': 'Someone Else'},
    ...         {'author': 'Someone'},
    ...         {'author': 'Someone'} ] },
    ...     { 'title': 'Second Post',
    ...       'comments': [
    ...         {'author': 'Another commenter'},
    ...         {'author': 'Someone Else'},
    ...         {'author': 'Someone'} ] } ] )
    [ObjectId(...), ObjectId(...)]
Now, suppose we want to find out how many times each commenter has posted. To do this, we'll start by using the $unwind operator:
    >>> pipeline = [ {'$unwind': '$comments'} ]
    >>> db.command('aggregate', 'blog.articles', pipeline=pipeline)
    {u'ok': 1.0,
     u'result': [
        {u'_id': ObjectId(...),
         u'comments': {u'author': u'Someone'},
         u'title': u'First Post'},
        {u'_id': ObjectId('...'),
         u'comments': {u'author': u'Someone Else'},
         u'title': u'First Post'},
        ... ] }
Note here how the documents coming out of $unwind have the same structure as the input, except for the unwound array, which has been replaced by a single element. Once we have the unwound documents, we can add some other operators to calculate the number of posts per commenter:
    >>> pipeline = [
    ...     {'$unwind': '$comments'},
    ...     {'$group': {
    ...         '_id': '$comments.author',
    ...         'comments': { '$sum': 1 } } } ]
    >>> db.command('aggregate', 'blog.articles', pipeline=pipeline)
    {u'ok': 1.0,
     u'result': [
        {u'_id': u'Another commenter', u'comments': 1},
        {u'_id': u'Someone Else', u'comments': 2},
        {u'_id': u'Someone', u'comments': 4}]}
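For intuition, the $unwind and $group stages above can be mimicked in plain Python on the same two blog posts. This is only a sketch of the semantics as described in this article; the `unwind` function is a hypothetical helper, not part of PyMongo.

```python
from collections import Counter

articles = [
    {'title': 'First Post',
     'comments': [{'author': 'Someone'}, {'author': 'Someone Else'},
                  {'author': 'Someone'}, {'author': 'Someone'}]},
    {'title': 'Second Post',
     'comments': [{'author': 'Another commenter'}, {'author': 'Someone Else'},
                  {'author': 'Someone'}]},
]

def unwind(docs, field):
    # Yield one copy of each document per array element, with the array
    # replaced by that single element.
    for doc in docs:
        for element in doc.get(field, []):
            unwound = dict(doc)
            unwound[field] = element
            yield unwound

unwound_docs = list(unwind(articles, 'comments'))

# Grouping by author and summing 1 per document is just a count per author.
counts = Counter(d['comments']['author'] for d in unwound_docs)
print(len(unwound_docs))
print(dict(counts))
```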
One thing to note is that we can use $group as a sort of reverse-$unwind by using the aggregation function $push. Returning to our synthetic (x,y) example, we can "wind up" all the x values for a given y:
    >>> pipeline = [
    ...     {'$group': {
    ...         '_id': '$y',
    ...         'xs': {'$push': '$x'} } },
    ...     {'$project': {
    ...         '_id': 0,
    ...         'y': '$_id',
    ...         'xs': 1 } },
    ...     {'$sort': {'y': 1} } ]
    >>> db.command('aggregate', 'data', pipeline=pipeline)
    {u'ok': 1.0,
     u'result': [
        {u'y': 0,
         u'xs': [
            0.5480646819579362,
            0.06989250292970206,
            0.1483275969049508,
            0.37886136252339875,
            0.944175551482085,
            0.6528854869305418,
            0.6699875222192363,
            0.038172503680066416,
            0.29335286756814327,
            0.8896985443297418,
            0.5501587033310872] },
        ... ] }
Here, we first $grouped to compute the aggregate we are interested in, then applied a $project to rename the fields a bit, finishing up with a $sort to present the results in sorted order.
The new aggregation can't do everything (yet)
So now that you've seen the amazing new aggregation framework, you might be ready to drop everything else and use it. Unfortunately, there are a few things you need to be aware of that can bite you.
There is no $out operator
Sadly, the planned $out operator didn't make it into the 2.1 release of the aggregation framework. What this means is that you can't define a pipeline and have it put its results into another collection; you need to be ready to process the results when the command returns. Because of this, aggregation results are limited to 16MB due to the document size limit in MongoDB. So if you need to compute millions of groups, you're going to be stuck with one of the following options:
- Breaking up your pipeline with $match or $skip/$limit so each pipeline's results are under 16MB
- Falling back to mapreduce
- Pulling the data out of MongoDB and doing the aggregations in some external system (this is the approach used in Zarkov, for instance), rather than having all the computation happen on the server.
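The first option, breaking up a pipeline with $match, might look roughly like the sketch below. The helper `partitioned_pipelines` and the boundary values are assumptions for illustration. Note that the field you partition on should be the group key (or determine it), so that no group straddles two partitions.

```python
def partitioned_pipelines(base_pipeline, field, boundaries):
    """Yield one pipeline per [low, high) range of `field`.

    Each pipeline is base_pipeline with a range $match prepended, so every
    run aggregates (and returns) only a slice of the collection.
    """
    for low, high in zip(boundaries, boundaries[1:]):
        match = {'$match': {field: {'$gte': low, '$lt': high}}}
        yield [match] + list(base_pipeline)


# Group by y as in the earlier example, splitting y into three ranges.
base = [{'$group': {'_id': '$y', 'mean': {'$avg': '$x'}}}]
pipelines = list(partitioned_pipelines(base, 'y', [0, 4, 8, 11]))

for p in pipelines:
    print(p[0])
```

Each resulting pipeline could then be passed to db.command('aggregate', ...) in turn, and the partial results concatenated on the client.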
There is no explain()
If you're interested in how your queries scale in MongoDB (and you should be), you've probably become quite accustomed to the explain() command. Unfortunately, there is no corresponding command for the new aggregation framework. This means you have to be careful when constructing your pipelines. The rules of thumb I use are:
- Put your $match operator first. This operator can use an index (yay!), and you can actually test that by using the same query with find().explain().
- If possible, put your $sort immediately after the $match. This makes the first two stages behave just like a find().sort(), which again is explain()able.
- Be careful with the stateful operators. Most pipeline operators are stateless, so their memory usage does not grow with the number of documents passing through the pipeline. But $group and $sort are not, since they accumulate results in RAM. So be careful where you put them.
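The first two rules of thumb above are mechanical enough to check in code. The following is a hypothetical helper, `check_pipeline`, sketched as an advisory lint; its warnings are suggestions only, since a late $sort over a small grouped result (as in the "wind up" example) can be perfectly reasonable.

```python
def check_pipeline(pipeline):
    """Return a list of advisory warnings for a pipeline (a list of stage dicts)."""
    # Each stage is a single-key dict; the key is the operator name.
    operators = [next(iter(stage)) for stage in pipeline]
    warnings = []
    if not operators or operators[0] != '$match':
        warnings.append('no leading $match: every document enters the pipeline')
    if '$sort' in operators and operators.index('$sort') != 1:
        warnings.append('$sort is not immediately after the leading $match')
    return warnings


good = [{'$match': {'y': 3}}, {'$sort': {'x': 1}}, {'$limit': 5}]
bad = [{'$sort': {'x': 1}}, {'$match': {'y': 3}}]

print(check_pipeline(good))
print(check_pipeline(bad))
```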
There are a number of other features in the new aggregation framework that I haven't covered here. For more details, visit the official docs. I'd be interested in hearing what you think about the new aggregation framework, particularly if anyone is using it in production (or pre-production). Let me know in the comments below!
Published at DZone with permission of Rick Copeland. See the original article here.