How to Speed Up MongoDB MapReduce by 20x

Analytics is becoming an increasingly important topic with MongoDB, since it is used in more and more large, critical projects. People are tired of using separate software to do analytics (Hadoop being pretty involved), and such setups typically require a massive, potentially costly transfer of data.

MongoDB offers two ways to analyze data in place: MapReduce and the Aggregation Framework. MR is extremely flexible and easy to pick up. It works well with sharding and allows for very large output. MR was heavily improved in MongoDB v2.4 by the JavaScript engine swap from SpiderMonkey to V8. The chief complaint about it is that it is quite slow, especially compared to the Aggregation Framework (which runs natively in C++). Let's see if we can squeeze some juice out of it.

The Exercise

Let's insert 10 million documents, each containing a single random integer between 0 and 999,999 (1 million possible values). This means that, on average, 10 documents share the same value.

> for (var i = 0; i < 10000000; ++i){ db.uniques.insert({ dim0: Math.floor(Math.random()*1000000) });}
> db.uniques.findOne()
{ "_id" : ObjectId("51d3c386acd412e22c188dec"), "dim0" : 570859 }
> db.uniques.ensureIndex({dim0: 1})
> db.uniques.stats()
{
        "ns" : "test.uniques",
        "count" : 10000000,
        "size" : 360000052,
        "avgObjSize" : 36.0000052,
        "storageSize" : 582864896,
        "numExtents" : 18,
        "nindexes" : 2,
        "lastExtentSize" : 153874432,
        "paddingFactor" : 1,
        "systemFlags" : 1,
        "userFlags" : 0,
        "totalIndexSize" : 576040080,
        "indexSizes" : {
                "_id_" : 324456384,
                "dim0_1" : 251583696
        },
        "ok" : 1
}
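Because the values are drawn at random, not every one of the 1 million possible values will actually occur. As a quick sanity check (a worked estimate added here, not part of the original exercise), the expected number of distinct values after n uniform draws from k possible values is k * (1 - (1 - 1/k)^n). For n = 10 million and k = 1 million this comes out to roughly 999,955, in line with the 999,961 output documents reported by the MR job below. The helper name expectedDistinct is hypothetical.

```javascript
// Expected number of distinct values after n uniform random draws from k
// possible values: k * (1 - (1 - 1/k)^n). Math.log1p keeps (1 - 1/k)^n
// accurate when 1/k is tiny.
function expectedDistinct(n, k) {
  return k * (1 - Math.exp(n * Math.log1p(-1 / k)));
}

const n = 10000000; // documents inserted
const k = 1000000;  // possible values: Math.floor(Math.random() * 1000000)

console.log("average docs per value:", n / k);
console.log("expected distinct values:", Math.round(expectedDistinct(n, k)));
```

The small gap between the estimate and the observed 999,961 is ordinary random variation.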

From here we want to count how many documents share each unique value. This can be done easily with the following MR job:

> db.runCommand(
{ mapreduce: "uniques", 
map: function () { emit(this.dim0, 1); }, 
reduce: function (key, values) { return Array.sum(values); }, 
out: "mrout" })
{
        "result" : "mrout",
        "timeMillis" : 1161960,
        "counts" : {
                "input" : 10000000,
                "emit" : 10000000,
                "reduce" : 1059138,
                "output" : 999961
        },
        "ok" : 1
}

As the output shows, the job takes about 1,160 seconds, or roughly 1200s (tested on an EC2 M3 instance). There are 10 million maps, about 1 million reduces, and 999,961 documents in the output. The result looks like this:

> db.mrout.find()
{ "_id" : 1, "value" : 10 }
{ "_id" : 2, "value" : 5 }
{ "_id" : 3, "value" : 6 }
{ "_id" : 4, "value" : 10 }
{ "_id" : 5, "value" : 9 }
{ "_id" : 6, "value" : 12 }
{ "_id" : 7, "value" : 5 }
{ "_id" : 8, "value" : 16 }
{ "_id" : 9, "value" : 10 }
{ "_id" : 10, "value" : 13 }
...
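To make the semantics of the job concrete, here is a small in-memory model of what the map and reduce functions do, written in plain JavaScript so it runs outside MongoDB (for example in Node.js). It is only a sketch: runMapReduce and sum are hypothetical helpers, and sum stands in for the shell's Array.sum. It also checks why reduce must be safe to re-apply: MongoDB may call reduce several times for the same key, feeding earlier reduce results back in as values. The job above reports more reduce calls (1,059,138) than output keys (999,961), which is consistent with some keys being reduced more than once.

```javascript
// Hypothetical stand-in for the mongo shell's Array.sum(), which Node lacks.
function sum(values) {
  return values.reduce((acc, v) => acc + v, 0);
}

// Minimal in-memory model of a MapReduce job. Unlike the shell, where emit()
// is a global, emit is passed to `map` as an argument; `this` is still bound
// to the current document.
function runMapReduce(docs, map, reduce) {
  const groups = new Map(); // key -> array of emitted values
  const emit = (key, value) => {
    if (!groups.has(key)) groups.set(key, []);
    groups.get(key).push(value);
  };
  for (const doc of docs) map.call(doc, emit);

  const out = [];
  for (const [key, values] of groups) {
    // MongoDB does not call reduce for a key with a single emitted value.
    const value = values.length === 1 ? values[0] : reduce(key, values);
    out.push({ _id: key, value });
  }
  return out.sort((a, b) => a._id - b._id);
}

const mapFn = function (emit) { emit(this.dim0, 1); };
const reduceFn = function (key, values) { return sum(values); };

const docs = [{ dim0: 1 }, { dim0: 2 }, { dim0: 1 }, { dim0: 3 }, { dim0: 1 }];
const result = runMapReduce(docs, mapFn, reduceFn);
// result: [{ _id: 1, value: 3 }, { _id: 2, value: 1 }, { _id: 3, value: 1 }]
console.log(result);

// Re-reduce: feeding a partial result back in must give the same total.
const partial = reduceFn(1, [1, 1]);
console.log(reduceFn(1, [partial, 1]) === reduceFn(1, [1, 1, 1]));
```

Summing is associative and returns the same type it receives, which is exactly what makes this reduce function safe to re-apply.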

Using Sorting

I've outlined in a previous post how beneficial using a sort can be for MR; it is a very poorly understood feature. In this case, processing the input unsorted means that the MR engine gets the values in random order and has no opportunity to reduce in RAM at all. Instead, it has to write all the documents back to disk in a temporary collection, then read them back in order and reduce them. Let's see if using a sort helps:

> db.runCommand(
{ mapreduce: "uniques", 
map: function () { emit(this.dim0, 1); }, 
reduce: function (key, values) { return Array.sum(values); }, 
out: "mrout", 
sort: {dim0: 1} })
{
        "result" : "mrout",
        "timeMillis" : 192589,
        "counts" : {
                "input" : 10000000,
                "emit" : 10000000,
                "reduce" : 1000372,
                "output" : 999961
        },
        "ok" : 1
}

That's a big help indeed! We're down to 192s, which is already a 6x improvement. The number of reduces is about the same, but now they are done in RAM before the results are written to disk.
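A simplified way to see why sorting matters (a model for intuition, not MongoDB's actual internals): a key's reduced value is only final once its last emitted value has been seen. Until then its partial result has to be kept somewhere, in RAM or in a temporary collection. The hypothetical helper peakOpenKeys measures how many keys are in that unfinished state at the worst point of a stream of emitted keys. With sorted input it never exceeds one; with an interleaved order it reaches the total number of distinct keys, and a random order typically lands close to that. With 1 million keys, that is why the unsorted job ends up writing partial results to a temporary collection.

```javascript
// Simplified model, not MongoDB internals: a key is "open" from its first
// emitted value until its last one. An open key's partial result cannot be
// finalized yet, so it must be held in RAM or spilled to a temporary collection.
function peakOpenKeys(keys) {
  const last = new Map(); // key -> index of its final occurrence
  keys.forEach((k, i) => last.set(k, i));

  const open = new Set();
  let peak = 0;
  keys.forEach((k, i) => {
    open.add(k);
    peak = Math.max(peak, open.size);
    if (last.get(k) === i) open.delete(k); // k's last value has arrived
  });
  return peak;
}

// 1,000 distinct values with 10 emits each: a scaled-down version of the exercise.
const sorted = [];
const interleaved = [];
for (let v = 0; v < 1000; ++v) for (let c = 0; c < 10; ++c) sorted.push(v);
for (let c = 0; c < 10; ++c) for (let v = 0; v < 1000; ++v) interleaved.push(v);

console.log("sorted:", peakOpenKeys(sorted));           // 1
console.log("interleaved:", peakOpenKeys(interleaved)); // 1000
```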

Using Multiple Threads

MongoDB does not multithread a single MR job; it only runs multiple jobs in parallel. But with multi-core CPUs it could be very advantageous to parallelize the work within a single server, Hadoop style. What we really need is to subdivide the input into several chunks and spin up one MR job per chunk. The data set may have an obvious way to be split, but otherwise the undocumented splitVector command lets you find split points very quickly:

> db.runCommand({splitVector: "test.uniques", keyPattern: {dim0: 1}, maxChunkSizeBytes: 32000000})
{
    "timeMillis" : 6006,
	"splitKeys" : [
		{
			"dim0" : 18171
		},
		{
			"dim0" : 36378
		},
		{
			"dim0" : 54528
		},
		{
			"dim0" : 72717
		},
…
		{
			"dim0" : 963598
		},
		{
			"dim0" : 981805
		}
	],
	"ok" : 1
}
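splitVector's own algorithm is not reproduced here; the real command walks the index on keyPattern (dim0_1 in this case) and applies its own chunk-size rules. As a rough illustration only, the hypothetical helper approxSplitPoints picks split points from an already sorted list of values by taking one roughly every docsPerChunk documents and skipping repeats. Because the MR queries use $gte / $lt on dim0, a split at value v sends every document with dim0 == v to the range that starts at v, so no value is ever divided between two jobs.

```javascript
// Illustration only: choose split points from values that are already sorted,
// one roughly every `docsPerChunk` documents. Repeated values are skipped so
// no two split points are equal (which would create an empty range).
function approxSplitPoints(sortedValues, docsPerChunk) {
  const splits = [];
  for (let i = docsPerChunk; i < sortedValues.length; i += docsPerChunk) {
    const v = sortedValues[i];
    if (splits.length === 0 || splits[splits.length - 1] !== v) {
      splits.push(v);
    }
  }
  return splits;
}

// 10 documents per value for values 0..99, i.e. 1,000 sorted documents.
const values = [];
for (let v = 0; v < 100; ++v) for (let c = 0; c < 10; ++c) values.push(v);
console.log(approxSplitPoints(values, 250)); // [ 25, 50, 75 ]
```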

This command takes only about 6s to find the split points over 10 million documents; that's fast! So now we just need a way to run multiple MR jobs. From an application server it would be pretty easy, using multiple threads and a $gte / $lt query in each MR command. From the shell, one can use the ScopedThread object, which works as follows:

> var t = new ScopedThread(mapred, 963598, 981805)
> t.start()
> t.join()

So now we can put together some quick JS code that spawns four threads (one per core), waits for them, and displays the results:

> var res = db.runCommand({splitVector: "test.uniques", keyPattern: {dim0: 1}, maxChunkSizeBytes: 32 *1024 * 1024 })
> var keys = res.splitKeys
> keys.length
39
> var mapred = function(min, max) { 
return db.runCommand({ mapreduce: "uniques", 
map: function () { emit(this.dim0, 1); }, 
reduce: function (key, values) { return Array.sum(values); }, 
out: "mrout" + min, 
sort: {dim0: 1}, 
query: { dim0: { $gte: min, $lt: max } } }) }
> var numThreads = 4
> var inc = Math.floor(keys.length / numThreads) + 1
> threads = []; for (var i = 0; i < numThreads; ++i) {
    // the first range starts at 0 and the last one runs up to MaxKey
    var min = (i == 0) ? 0 : keys[i * inc].dim0;
    var max = (i * inc + inc >= keys.length) ? MaxKey : keys[i * inc + inc].dim0;
    print("min:" + min + " max:" + max);
    var t = new ScopedThread(mapred, min, max);
    threads.push(t);
    t.start();
  }
min:0 max:274736
min:274736 max:524997
min:524997 max:775025
min:775025 max:{ "$maxKey" : 1 }
connecting to: test
connecting to: test
connecting to: test
connecting to: test
> for (var i in threads) { var t = threads[i]; t.join(); printjson(t.returnData()); }
{ 
        "result" : "mrout0",
        "timeMillis" : 205790,
        "counts" : {
                "input" : 2750002,
                "emit" : 2750002,
                "reduce" : 274828,
                "output" : 274723
        },
        "ok" : 1
}
{ 
        "result" : "mrout274736",
        "timeMillis" : 189868,
        "counts" : {
                "input" : 2500013,
                "emit" : 2500013,
                "reduce" : 250364,
                "output" : 250255
        },
        "ok" : 1
} 
{
        "result" : "mrout524997",
        "timeMillis" : 191449,
        "counts" : {
                "input" : 2500014,
                "emit" : 2500014,
                "reduce" : 250120,
                "output" : 250019
        },
        "ok" : 1
}
{
        "result" : "mrout775025",
        "timeMillis" : 184945,
        "counts" : {
                "input" : 2249971,
                "emit" : 2249971,
                "reduce" : 225057,
                "output" : 224964
        },
        "ok" : 1
}

The first thread does a bit more work than the others, but each thread still takes about 190s, which means this is no faster than a single thread! That is curious, since 'top' shows all cores working to some extent.
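The range bookkeeping in the shell loop above can be pulled into a small function and tested on its own. The hypothetical buildRanges below groups split keys into numThreads contiguous [min, max) ranges using the same inc arithmetic as that loop; the first range starts at 0 (the smallest possible dim0), and Infinity stands in for the shell's MaxKey, which only exists inside MongoDB. One guard is added that the shell loop lacks: with very few split keys, it stops instead of reading past the end of the array.

```javascript
// Groups sorted split keys into at most `numThreads` contiguous ranges
// [min, max), using the same arithmetic as the shell loop in this article.
// `lowest` is the smallest possible key value (0 for dim0); Infinity stands
// in for MaxKey.
function buildRanges(splitKeys, numThreads, lowest = 0) {
  const inc = Math.floor(splitKeys.length / numThreads) + 1;
  const ranges = [];
  for (let i = 0; i < numThreads; ++i) {
    if (i > 0 && i * inc >= splitKeys.length) break; // not enough keys left
    const min = i === 0 ? lowest : splitKeys[i * inc];
    const max = i * inc + inc >= splitKeys.length ? Infinity : splitKeys[i * inc + inc];
    ranges.push({ min, max, query: { dim0: { $gte: min, $lt: max } } });
  }
  return ranges;
}

// 39 split keys, as returned by splitVector above (the values are made up).
const splitKeys = Array.from({ length: 39 }, (_, j) => (j + 1) * 1000);
const ranges = buildRanges(splitKeys, 4);

// Number of splitVector chunks covered by each range.
const chunksPerRange = ranges.map(
  (r) => splitKeys.filter((key) => key > r.min && key < r.max).length + 1
);
console.log(chunksPerRange); // [ 11, 10, 10, 9 ]
```

This also accounts for the uneven load: 39 split keys mean 40 chunks of roughly 250,000 documents, and inc = 10 gives the four threads 11, 10, 10 and 9 chunks, matching the input counts of about 2.75M, 2.5M, 2.5M and 2.25M reported above.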

Using Multiple Databases

The issue is that there is too much lock contention between the threads. MR is not very altruistic when locking (it yields every 1000 reads), and since MR jobs do a lot of writing too, the threads end up waiting on each other. Since MongoDB has a separate lock for each database, let's try using a different output db for each thread:

> var mapred = function(min, max) { 
return db.runCommand({ mapreduce: "uniques", 
map: function () { emit(this.dim0, 1); }, 
reduce: function (key, values) { return Array.sum(values); }, 
out: { replace: "mrout" + min, db: "mrdb" + min }, 
sort: {dim0: 1}, 
query: { dim0: { $gte: min, $lt: max } } }) }
> threads = []; for (var i = 0; i < numThreads; ++i) {
    var min = (i == 0) ? 0 : keys[i * inc].dim0;
    var max = (i * inc + inc >= keys.length) ? MaxKey : keys[i * inc + inc].dim0;
    print("min:" + min + " max:" + max);
    var t = new ScopedThread(mapred, min, max);
    threads.push(t);
    t.start();
  }
min:0 max:274736
min:274736 max:524997
min:524997 max:775025
min:775025 max:{ "$maxKey" : 1 }
connecting to: test
connecting to: test
connecting to: test
connecting to: test
> for (var i in threads) { var t = threads[i]; t.join(); printjson(t.returnData()); }
...
{ 
        "result" : {
                "db" : "mrdb274736",
                "collection" : "mrout274736"
        },
        "timeMillis" : 105821,
        "counts" : {
                "input" : 2500013,
                "emit" : 2500013,
                "reduce" : 250364,
                "output" : 250255
        },
        "ok" : 1
}
...

That's more like it! We are now down to about 100s, roughly a 2x improvement over a single thread. Not as good as hoped, but still good. Here I have four cores, so I only get 2x; an 8-core CPU should give you about 4x, and so on.

Using the Pure JavaScript Mode

There is something very interesting that comes up when splitting the input data between threads: each thread now has only about 250,000 unique keys to output, as opposed to 1 million. This means that we can make use of the "pure JS mode", which is turned on with jsMode: true. When it is on, MongoDB does not translate objects back and forth between JS and BSON during processing; instead, it reduces all objects from an internal JS dictionary, which is limited to 500,000 keys. Let's see if that helps:

> var mapred = function(min, max) { 
return db.runCommand({ mapreduce: "uniques", 
map: function () { emit(this.dim0, 1); }, 
reduce: function (key, values) { return Array.sum(values); }, 
out: { replace: "mrout" + min, db: "mrdb" + min }, 
sort: {dim0: 1}, 
query: { dim0: { $gte: min, $lt: max } }, 
jsMode: true }) }
> threads = []; for (var i = 0; i < numThreads; ++i) {
    var min = (i == 0) ? 0 : keys[i * inc].dim0;
    var max = (i * inc + inc >= keys.length) ? MaxKey : keys[i * inc + inc].dim0;
    print("min:" + min + " max:" + max);
    var t = new ScopedThread(mapred, min, max);
    threads.push(t);
    t.start();
  }
min:0 max:274736
min:274736 max:524997
min:524997 max:775025
min:775025 max:{ "$maxKey" : 1 }
connecting to: test
connecting to: test
connecting to: test
connecting to: test
> for (var i in threads) { var t = threads[i]; t.join(); printjson(t.returnData()); }
...
{ 
        "result" : {
                "db" : "mrdb274736",
                "collection" : "mrout274736"
        },
        "timeMillis" : 70507,
        "counts" : {
                "input" : 2500013,
                "emit" : 2500013,
                "reduce" : 250156,
                "output" : 250255
        },
        "ok" : 1
}
...

We are now down to 70s, getting there! jsMode can really help, especially when objects have many fields. Here there is a single number field, and it still helped by about 30%.
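Whether jsMode is an option comes down to how many distinct keys each job has to hold, compared with the 500,000-key limit mentioned above. The back-of-the-envelope helpers below are hypothetical and assume the distinct keys are spread evenly across jobs, as they roughly are here. With the job's 999,961 output keys, a single job is over the limit, while four jobs of about 250,000 keys each fit comfortably.

```javascript
// jsMode dictionary limit quoted in the article.
const JS_MODE_KEY_LIMIT = 500000;

// Rough check: does each of `numJobs` equally sized jobs stay under the
// jsMode key limit? Assumes distinct keys are spread evenly across jobs.
function jsModeFits(totalDistinctKeys, numJobs, limit = JS_MODE_KEY_LIMIT) {
  const keysPerJob = Math.ceil(totalDistinctKeys / numJobs);
  return { keysPerJob, fits: keysPerJob <= limit };
}

// Smallest number of evenly split jobs that keeps each one under the limit.
function minJobsForJsMode(totalDistinctKeys, limit = JS_MODE_KEY_LIMIT) {
  return Math.ceil(totalDistinctKeys / limit);
}

console.log(jsModeFits(999961, 1)); // single job
console.log(jsModeFits(999961, 4)); // one job per thread
console.log(minJobsForJsMode(999961));
```

Real data is rarely split this evenly, so in practice it is worth leaving headroom below the limit.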

Improvement in MongoDB v2.6

Very early in v2.6 development, we got rid of a piece of code that set an optional "args" parameter on every JS function call. This was neither standard nor used, but it had been kept for legacy reasons (see SERVER-4654). Let's pull MongoDB from the master Git repository, compile it, and run the test again:

...
{ 
        "result" : {
                "db" : "mrdb274736",
                "collection" : "mrout274736"
        },
        "timeMillis" : 62785,
        "counts" : {
                "input" : 2500013,
                "emit" : 2500013,
                "reduce" : 250156,
                "output" : 250255
        },
        "ok" : 1
}
...

There is definitely an improvement there: we are down to about 63s, roughly 11% faster. This change also reduced the overall heap consumption of the JS engine.

Conclusion

Looking back, we started at about 1200s and ended at about 60s for the same MR job, roughly a 20x improvement! This improvement should be available to most use cases, even if some of the tricks are not ideal (e.g. using multiple output dbs / collections). Nevertheless, this can give people ideas on how to speed up their MR jobs, and hopefully some of these features will be made easier to use in the future. There is a ticket to make the 'splitVector' command more available, and another to improve running multiple MR jobs on the same database. Cheers!
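For reference, the speedups can be recomputed directly from the timeMillis values printed in this article. For the threaded runs only some thread results are shown, so the sketch uses the thread writing mrout274736 (the one shown in every run) as an approximation of the job's wall-clock time rather than a measurement of it. The overall gain works out to about 18.5x, which the title rounds to 20x. The speedups helper is hypothetical.

```javascript
// timeMillis values reported in this article. For the threaded runs, the
// figure is the thread writing mrout274736, used as an approximation.
const runs = [
  { label: "single job, unsorted", ms: 1161960 },
  { label: "single job, sorted", ms: 192589 },
  { label: "4 threads, one output db", ms: 189868 },
  { label: "4 threads, one output db each", ms: 105821 },
  { label: "4 threads, separate dbs + jsMode", ms: 70507 },
  { label: "same, on the v2.6 master build", ms: 62785 },
];

// Speedup of each run relative to the first (baseline) run.
function speedups(runs) {
  const baseline = runs[0].ms;
  return runs.map((r) => ({
    label: r.label,
    seconds: Math.round(r.ms / 1000),
    speedup: baseline / r.ms,
  }));
}

for (const r of speedups(runs)) {
  console.log(`${r.label}: ${r.seconds}s, ${r.speedup.toFixed(1)}x`);
}
```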

Published at DZone with permission of Antoine Girbal, DZone MVB.
