
How to Integrate MongoDB With Reappt: Part I

MongoDB forms part of the NoSQL vanguard and is big in more than one sense, which is part of the attraction. So how hard would it be to reflect a MongoDB collection into the topic tree?


MongoDB forms part of the NoSQL vanguard and is big in more than one sense, which is part of the attraction. It is both very popular, finding employment inside eBay, LinkedIn, and Craigslist, and it excels at storing and organizing very large volumes of data.

That it stores its content as (what look and feel like) JSON documents makes it interesting from a Reappt perspective. “How hard ...”, I asked myself, “...would it be to reflect a MongoDB collection into the topic tree?” Not so hard, as it turns out.

The Intent

MongoDB divides its storage into databases, which in turn are divided into collections, much as a conventional RDBMS uses tables. Our intent is to subscribe to a single collection and all changes to it, creating a discrete JSON topic for each document and updating each topic in sync with the original document.
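
For instance, a document with _id 57a1... (a made-up ID here) in someDB.someCollection would surface as a JSON topic at <topicRoot>/someDB/someCollection/57a1..., with every later change to the document reflected in that topic.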

Configuring MongoDB for Replication

Our first obstacle: MongoDB doesn’t really do subscription in a pub/sub sense. So instead we make use of the replication feature, which is really intended for wiring together MongoDB replica sets. Plenty of third-party applications make use of it, however, including Meteor, who include a more directly useful publish/subscribe feature in their product.

It’s an optional feature, so it has to be deliberately configured. Typically MongoDB is run as a daemon, but for testing purposes we configure and run ours in the foreground.

% mongo --nodb
MongoDB shell version: 3.2.6
> replicaSet = new ReplSetTest({"nodes" : 1, "nodeOptions": {"dbpath": "/tmp/mongodb"} });
> replicaSet.startSet()
> replicaSet.initiate();


As well as starting your MongoDB server this will also output a lot of diagnostic noise, so start another shell for MongoDB interaction.

% mongo localhost:20000
MongoDB shell version: 3.2.6
connecting to: localhost:20000/test
testReplSet:PRIMARY> use someDB
switched to db someDB
testReplSet:PRIMARY> for (i=0; i<100; i++) { db.someCollection.insert({count: i}) }
WriteResult({ "nInserted" : 1 })


The prompt testReplSet:PRIMARY confirms that we’re logged into the primary (and indeed only) member of the replica set. We then create 100 documents in the collection someDB.someCollection. This gives us our test subject.

Building the Adapter

All sources for the adapter are available on GitHub, but here we shall lavish some explanation on the significant parts.

Placing the Connectivity

/**
 * Command line bootstrap method.
 */
public static void main(String[] argv) throws Exception {

    final CommandlineArgs args = new CommandlineArgs();
    final JCommander jc = new JCommander(args);
    try {
        jc.parse(argv);
    }
    catch (ParameterException ex) {
        jc.usage();
        System.exit(1);
    }

    // Connect to MongoDB and resolve the collection to be adapted
    final MongoClient mongoClient = new MongoClient(args.getMongoHost());
    final MongoCollection<Document> collection = getCollection(mongoClient,
        args.getMongoDatabase(),
        args.getMongoCollection());

    // oplog.rs lives in the "local" database and exists only when
    // replication is configured; it receives all storage mutation events
    final MongoCollection<Document> oplog = getCollection(mongoClient,
        "local",
        "oplog.rs");

    // Open a session against the Reappt server
    final Session session = Diffusion.sessions()
        .principal("admin")
        .password("password")
        .noReconnection()
        .open("ws://localhost:8080");

    // Root of the topic branch that will mirror the collection
    final String topicRoot = args.getTopic() + "/" +
        args.getMongoDatabase() + "/" +
        args.getMongoCollection();

    Adapter.build(session, collection, oplog, topicRoot).run();
    mongoClient.close();
}

private static MongoCollection<Document> getCollection(
    final MongoClient mongoClient, String databaseName,
    String collectionName) {
    final MongoDatabase database = mongoClient.getDatabase(databaseName);
    return database.getCollection(collectionName);
}


We chose JCommander because we’ve used it before, and while http://mvnrepository.com/ shows nearly five times as many projects using Apache Commons CLI, JCommander’s annotation-based approach is quicker and cleaner.
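
As an illustration only (the flag names, defaults, and getters here are my assumptions, not the repository’s actual code), a JCommander argument class takes roughly this shape:

// Hypothetical sketch of CommandlineArgs; the real class may differ
public class CommandlineArgs {

    @Parameter(names = "-mongoHost", description = "MongoDB host")
    private String mongoHost = "localhost";

    @Parameter(names = "-mongoDatabase", description = "MongoDB database")
    private String mongoDatabase;

    @Parameter(names = "-mongoCollection", description = "MongoDB collection")
    private String mongoCollection;

    @Parameter(names = "-topic", description = "Root of the target topic branch")
    private String topic;

    public String getMongoHost() { return mongoHost; }
    public String getMongoDatabase() { return mongoDatabase; }
    public String getMongoCollection() { return mongoCollection; }
    public String getTopic() { return topic; }
}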

Having connected to MongoDB using Mongo’s own Java driver, we then arrange access to two collections: firstly the collection given on the command line, and secondly oplog.rs in the database local, which exists only when replication is configured and is used as a sink for all storage mutation events. The OpLog is a “capped” collection, making it a little different from regular collections:

  • Documents can only be appended to a capped collection (though an identically sized document can be used to overwrite an existing document).
  • Capped collections are akin to circular buffers. They are bounded, and once all allocated storage is used, older storage is then reused.
  • Similarly to the way tail(1) works with files, a capped collection can entertain queries from ‘tailable’ cursors, such that when the cursor reaches the end of the collection it waits while more documents are appended to the collection, as sketched below.
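
A minimal sketch of opening such a cursor with the MongoDB Java driver (not the adapter’s exact code; oplog and filter are as described elsewhere in this article):

// Sketch: tail a capped collection such as local/oplog.rs.
// Uses com.mongodb.CursorType from the Java driver.
final MongoCursor<Document> tail = oplog.find(filter)
    .cursorType(CursorType.TailableAwait) // block awaiting new documents
    .noCursorTimeout(true)                // keep the idle cursor alive
    .iterator();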

Building the Adapter

Adapter’s static build method gathers two key features. Firstly it obtains TopicUpdateControl, which is used to update topic values, and from which it obtains an exclusive ValueUpdater for a branch of the topic tree. Finally we obtain TopicControl in order that we may create and remove topics.

Now we build a MongoNamespace, encapsulating the database and collection used to listen for command events (specifically, commands to drop the collection), before constructing and returning an Adapter object. Using a builder rather than a more complex constructor conveys many benefits (as covered in Chapter 2 of Effective Java by Josh Bloch), but my motivations here are that the constructor’s intent is clear, and that it is simpler to build an immutable (and therefore thread-safe) Adapter object.
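
The repository holds the definitive code, but a minimal sketch of the shape just described might read as follows (the constructor signature, the $cmd namespace, and the non-exclusive updater here are my assumptions):

/**
 * Sketch of Adapter.build(); names and signatures are assumptions.
 */
static Adapter build(Session session, MongoCollection<Document> collection,
        MongoCollection<Document> oplog, String topicRoot) {

    // Feature for creating and removing topics
    final TopicControl topicControl = session.feature(TopicControl.class);

    // Feature for updating topic values; the real adapter takes an
    // exclusive ValueUpdater for its branch of the topic tree
    final TopicUpdateControl updateControl =
        session.feature(TopicUpdateControl.class);
    final ValueUpdater<JSON> valueUpdater =
        updateControl.updater().valueUpdater(JSON.class);

    // Command events (e.g. "drop") appear in the OpLog under the
    // database's $cmd namespace
    final MongoNamespace commandNamespace = new MongoNamespace(
        collection.getNamespace().getDatabaseName(), "$cmd");

    return new Adapter(session, collection, oplog, topicControl,
        valueUpdater, commandNamespace, topicRoot);
}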

Turning the Handle

/**
 * Enumerate all documents in the collection, then relay changes.
 */
private void run() {
    final long timeNow = System.currentTimeMillis();
    long topicCount = 0;
    LOG.info("Transcribing topics from {} to {}", collection.getNamespace(),
        topicRoot);
    try (
        final MongoCursor<Document> cursor = collection.find().iterator()) {
        while (cursor.hasNext()) {
            final Document update = cursor.next();
            LOG.debug("received: {}", update.toJson());

            transcribeDocument(update);
            topicCount++;
        }
    }
    LOG.info("Transcribed {} topics", topicCount);
    relayChanges(timeNow);
}

/**
 * Create a new topic from a document, or update the existing one.
 */
private void transcribeDocument(Document update) {
    final ObjectId id = update.getObjectId("_id");
    final String topicPath = topicPaths.get(id);

    if (topicPath == null) {
        final String newTopicPath = topicRoot + "/" + id.toString();
        LOG.info("Creating {}", newTopicPath);
        topicPaths.put(id, newTopicPath);
        topicControl.addTopic(newTopicPath,
            TopicType.JSON,
            toBytes(update.toJson()),
            new AddCallback.Default());
    }
    else {
        LOG.info("Updating {}", topicPath);
        valueUpdater.update(topicPath, toBytes(update.toJson()),
            new UpdateCallback.Default());
    }
}

private static JSON toBytes(String json) {
  return Diffusion.dataTypes().json().fromJsonString(json);
}


The real work can begin now that both the connectivity and state are in place. In method run() we do two things:

  1. Query the MongoDB collection for all documents, transcribing each.
  2. Call relayChanges().

We use Java 8’s try-with-resources feature so that the cursor opened in the try statement is closed as soon as the thread drops out of that scope.

Method transcribeDocument() has the task of building a new JSON topic from the content of the MongoDB document. If the topic has already been created (and therefore exists in topicPaths), we update the topic instead. Either branch makes use of method toBytes(String), which parses a JSON string down to a form suitable for the ValueUpdater.
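
For example (illustrative only), toBytes turns a document’s JSON string form into a Diffusion JSON value:

// Example: parse a document's JSON form into a value suitable
// for the ValueUpdater
final JSON value = toBytes("{ \"count\": 1 }");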

Relaying the Changes

Method relayChanges(long) does what it says on the tin: it relays changes from the MongoDB instance to the topic tree. Before listening to the OpLog for events, it composes a filter to use. In this case we wish only to see events later than ‘now’ that are document insert, update, or delete events, or commands to drop the collection.

Interestingly, BsonTimestamp resolves time only to the second and nothing smaller; it uses an incrementing counter to disambiguate events within the same second.
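
A sketch of such a filter, assuming the Java driver’s Filters helpers (the op codes follow OpLog convention, but this is not the repository’s exact code):

// Sketch: match OpLog entries later than 'now' that are document
// inserts, updates, or deletes, or commands such as "drop".
final BsonTimestamp now =
    new BsonTimestamp((int) (System.currentTimeMillis() / 1000), 0);
final Bson filter = Filters.and(
    Filters.gt("ts", now),
    Filters.or(
        Filters.in("op", "i", "u", "d"), // document mutations
        Filters.eq("op", "c")));         // commands, e.g. drop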

We make the call to oplog.find(filter), build a TailableAwait cursor, and then read incoming events as they happen, dispatching each to processInsert(Document), processUpdate(Document), processDelete(Document), or processCommand(Document), depending on the value of property op (presumably short for operation).
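
In outline, the dispatch looks something like this (a sketch, not the repository’s exact code):

// Sketch: dispatch each OpLog event on its "op" field; 'cursor' is
// the TailableAwait cursor just described.
// "i" = insert, "u" = update, "d" = delete, "c" = command.
while (cursor.hasNext()) {
    final Document event = cursor.next();
    switch (event.getString("op")) {
    case "i": processInsert(event); break;
    case "u": processUpdate(event); break;
    case "d": processDelete(event); break;
    case "c": processCommand(event); break;
    default: break; // e.g. "n" no-op entries are ignored
    }
}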

Methods processUpdate(Document), processDelete(Document), and processCommand(Document) check that the event relates to the adapted collection. A more focussed filter here would both perform better and eliminate the need for this step.

Method processInsert(Document) mostly reuses transcribeDocument() to update a topic with the new content embedded in the event.

Method processDelete(Document) uses the MongoDB document ID to look up the affected topic, and then removes it.
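
A sketch of that lookup (the OpLog field names follow MongoDB convention, but the exact TopicControl removal call is my assumption):

// Sketch of processDelete: a delete event's "o" field carries the
// _id of the removed document.
private void processDelete(Document event) {
    final ObjectId id = event.get("o", Document.class).getObjectId("_id");
    final String topicPath = topicPaths.remove(id);
    if (topicPath != null) {
        topicControl.remove(topicPath, new RemovalCallback.Default());
    }
}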

Method processUpdate(Document) is more interesting, as the event contains instructions to change the document, but not the changed document itself. A more fully realised adapter implementation would interpret and react to the changes; however, in this case we fetch the changed document from the collection using its ObjectId, and pass that to transcribeDocument() to update the existing topic. This imposes the cost of an extra round trip, for the sake of readability.
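
A sketch of that round trip (the "o2" field name follows OpLog convention; this is not the repository’s exact code):

// Sketch of processUpdate: an update event's "o2" field identifies
// the changed document, which we fetch afresh and transcribe.
private void processUpdate(Document event) {
    final ObjectId id = event.get("o2", Document.class).getObjectId("_id");
    final Document changed = collection.find(Filters.eq("_id", id)).first();
    if (changed != null) {
        transcribeDocument(changed);
    }
}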

Summing Up

Above we have shown how little code is actually required to get started. As this is written, the GitHub repository holds only 426 lines of Java. In the next post we shall show how the adapter can be put to use.

Published at DZone with permission of Martin Cowie, DZone MVB.