Written by Luke Lovett.
It’s been almost a year since the last feature release of the MongoDB Connector for Hadoop. We’re very pleased to announce the release of 1.4, which contains a few excellent improvements and many bugfixes. In this article, I’d like to focus on one improvement in particular that really improves the performance of the connector: support for Hadoop’s speculative execution.
What is Speculative Execution?
When processing data in a distributed system like Hadoop, it’s possible for some of that processing workload to be bottlenecked by one or two nodes that are especially slow. There could be many reasons why the nodes are slow such as software misconfiguration or hardware failure. Perhaps the network around these nodes is especially saturated. Whatever the cause, the Hadoop job cannot complete until these slow nodes catch up on their work.
Speculative execution allows Hadoop to reschedule some tasks at the end of a job redundantly. This means that several nodes may end up with exactly the same segment of data to process, even though only one of these nodes needs to complete that work successfully in order for the job to finish. It becomes a race to the finish: if one or two of the nodes in the Hadoop cluster are slower than the rest, then any other faster node, given the same task, may complete it first. In this case, the tasks scheduled on the slower nodes are cancelled, and the job completes earlier than it would if it had to wait on the slower nodes.
This seems like a clever optimization so far, but it’s one that caused some serious problems when writing to MongoDB from the Hadoop connector. Let’s explore what was causing this problem a little deeper...
MongoOutputFormat creates a MongoRecordWriter, which sends data as soon as it receives it straight to MongoDB using the MongoDB Java Driver. Recall that allowing speculative execution on a Hadoop cluster allows the ResourceManager to schedule tasks redundantly. If all these redundant tasks are running at once, and it is very likely that they are, then each one of them is writing to MongoDB, regardless of whether it ultimately will complete.
In cases where the job emits documents that have an _id field already, this can result in DuplicateKeyErrors. One of the redundant tasks finished the race first, but the losers will still try to insert documents with IDs that already exist because they were inserted by the winner! If the job emits documents that don’t have an _id, then the Java Driver adds them automatically. If no duplicate IDs are generated by the driver, we dodge the DuplicateKeyErrors, but now we have duplicate documents in our database with different IDs! Either way, this is not desirable.
Previously, we recommended that users turn off speculative execution. This avoids this nasty behavior but shuts off a useful feature. I took a look at otherHadoopconnectors, and they all face similar problems and make the same recommendation. The problem seemed endemic to writing to a live-running system.
Is there any case where Hadoop speculative execution can avoid duplicating records in the output? The answer is yes, and when writing output to a file, the solution is pretty easy. Each task creates a scratch directory where it can write temporary files. As each task begins to generate output, this output gets written to a temporary file. Another piece of the puzzle is a class known as an OutputCommitter. The OutputCommitter defines methods that are called when a job or a task is about to start, has been aborted, or completed. Usually, each OutputFormat defines a type of OutputCommitter to use. If you were using FileOutputFormat, for example, you’re also using the FileOutputCommitter.
The FileOutputCommitter just deletes all the temporary directories immediately for tasks that were aborted. In the case of our slow nodes, their tasks were rescheduled on other faster nodes and finished before the slow nodes did, so now the slow nodes are cleaned up. The tasks that finished on the fast nodes have their temporary files collected together into a single directory that represents the output for the entire job. Since the output comes only from tasks that completed successfully, there are no duplicate records from the output.
We took a similar approach supporting speculative execution for writing to MongoDB. MongoRecordWriter, instead of writing directly to MongoDB, writes instead to a temporary directory. Each insert or update operation has a special serialized format that gets written. When a task is aborted, these files are removed. When a task completes, the MongoOutputCommitter reads the file and performs each operation.
This is sufficient for allowing the Hadoop connector to play nice with speculative execution. At this point, however, we can go a step further to allow another optimization.
For almost a year now, MongoDB drivers have supported a bulk operations API. MongoDB server versions 2.6 and later support bulk operations, which tend to complete much faster than the same operations sent serially. The Hadoop connector has never taken advantage of the bulk API. However, now that each Task produces a frozen batch of operations already in the form of a temporary file, it’s fairly straightforward to use the bulk API to send these operations to MongoDB.
Using the bulk API in a Hadoop job, which can process and produce terabytes or even petabytes of documents, makes an enormous positive impact on performance. We did our best to measure exactly what kind of benefit this provides. We wrote an “identity” MapReduce job (i.e., a job whose output is identical to the input and has no processing in the middle). The input for the job was a large BSON file, akin to what might be produced by the “mongodump” program.
We compared the performance of the “identity” job before and after the MongoOutputCommitter and bulk write changes on a 5-node Hadoop cluster running CDH4. The input to the job was the “enron emails” data set, which comprises 501,513 documents, each one about 4k in size. Before the MongoOutputCommitter and bulk write changes, the Hadoop job took 147 minutes to complete. Of course, some of this measurement represents time spent moving splits between nodes in the Hadoop cluster, but much of that time was Hadoop connector overhead since there was no processing required in this Job. After the bulk write changes, the same job took 6 minutes! If we assume that most of the remaining 6 minutes of execution is connector overhead as well (moving data to MongoDB still has to take some amount of time), then that’s almost a 96% improvement!
Not only have we fixed a bug, but we’ve made a tremendous improvement on the performance of the connector in a pretty common use case (i.e., using MongoDB as a sink for Hadoop jobs). We’re hoping that this improvement and others in 1.4 make our users very happy, and that our users continue to be a great supportive community around this project. To take advantage of the improvement discussed here and many others, download version 1.4.0 of the MongoDB Hadoop Connector by adding the following to your pom.xml:
<dependency> <groupId>org.mongodb.mongo-hadoop</groupId> <artifactId>mongo-hadoop-core</artifactId> <version>1.4.0</version> </dependency>
Or if you use Gradle, try this:
You can also visit the project’s home on Github and download the jars directly from the “releases” page.
Finally, you can read all the release notes here.
The JVM Drivers Team
Want more MongoDB? Learn how Apache Spark and MongoDB work together to turn analytics into real-time.