
Scala-Powered Streaming Migrations in MongoDB on Millions of Records

On my project at work we have production systems that use MongoDB as their primary database. Quite a few of the collections there can fairly be called "huge", and we sometimes have to migrate data from one schema (let's call it a "schema" for simplicity, even though MongoDB is schemaless, ok?) to another, or compute statistics over an entire collection.

So the first thing you'd try is a typical foreach over a Rogue query, like this:

Person where(_.age > 18) foreach { p => /*...*/ }

… and compute whatever stats you need for each of the elements. It turns out there are quite a few reasons this performs badly, and on a huge collection it won't even get close to processing everything. Some of the problems:

  • the cursor WILL time out if used like this
  • every document gets deserialized into a full Person object, which costs time and memory

(PS: we could do even worse than the example above: don't try to load a huge collection into memory… ;-)) Moving on to the solution, there are a few things we have to do here. First, stop the cursor from timing out, which can be achieved by setting the appropriate option on the cursor:

cursor.addOption(Bytes.QUERYOPTION_NOTIMEOUT)

As you've already noticed, we're down to the low-level API here. For now let's say I'm OK with that (we'll bring Rogue back into play a bit later). Putting this in context, you'd have to write:

def withCursor[T](cursor: => DBCursor)(f: DBCursor => T) = {
  val c = cursor // evaluate the by-name parameter once
  try { f(c) } finally { c.close() }
}

// somewhere...
MongoDB.useCollection(meta.mongoIdentifier, meta.collectionName) { coll =>
  withCursor(coll.find()) { cursor =>
    cursor.addOption(Bytes.QUERYOPTION_NOTIMEOUT)
    // ... iterate over the cursor here ...
  }
}

Ok, here you can use the cursor and iterate over the whole collection. That's quite a bit of code, but we've got to the point where the cursor is usable and won't time out. The next step is to package this up in a nice function and allow passing in a Rogue query object. I'll paste the end result of those steps so you can analyze it yourself:

  def stream[T <: MongoRecord[T]]
            (meta: MongoMetaRecord[T], query: Option[DBObject] = None)
            (callback: T => Unit) {
    MongoDB.useCollection(meta.mongoIdentifier, meta.collectionName) { coll =>
      withCursor(query.map(coll.find(_)).getOrElse(coll.find())) { cursor =>
        cursor.addOption(Bytes.QUERYOPTION_NOTIMEOUT)

        while(cursor.hasNext) {
          val next = cursor.next()
          val obj = meta.fromDBObject(next)

          callback(obj)
        }
      }
    }
  }

// can be used like this:
val meta = Person
stream(meta) { migrateIt(_) }

// or with the query supplied:
import scalaz.Scalaz._
import com.foursquare.rogue.Rogue._
val query = meta where(_.age > 18)
stream(meta, query.asDBObject.some) { migrateIt }

There is a minor annoyance with writing query.asDBObject.some (which is Scalaz’s equivalent of Some(query.asDBObject))… Let’s fix this with a simple delegating function:

def stream[T <: MongoRecord[T]]
          (meta: MongoMetaRecord[T],
           query: BaseQuery[_, _, _, _, _, _, _])
          (callback: T => Unit) {
  stream(meta, query.asDBObject.some)(callback)
}

// so we can call it like:
import com.foursquare.rogue.Rogue._
stream(meta, meta.where(_.age > 18)) { migrateIt }

But I'm still not happy with this. As already mentioned, we're dealing with huge collections, so the processing may well take up to a day. Let's see what we can shave off here… Oh yeah: let's not select fields we don't use. Let's revise our code into a streamSelect version of the above, which will only fetch the fields we're interested in from Mongo:

  /** So we don't have to manually extract field names */
  def select[T](on: T, fields: Function1[T, BaseField]*): Seq[String] =
    for(f <- fields) yield f(on).name

  def streamSelect[T <: MongoRecord[T]]
                  (meta: MongoMetaRecord[T], 
                   select: Seq[String] = Nil, 
                   query: DBObject)(callback: List[Any] => Unit) {
    MongoDB.useCollection(meta.mongoIdentifier, meta.collectionName) { coll =>
      // build the projection by hand; BasicDBObject's constructor wants a java.util.Map
      val selectFields = new BasicDBObject()
      select foreach { selectFields.put(_, 1) }

      withCursor(coll.find(query, selectFields)) { cursor =>
        cursor.addOption(Bytes.QUERYOPTION_NOTIMEOUT)

        while(cursor.hasNext) {
          val next = cursor.next()

          // note: l.toList needs scala.collection.JavaConversions._ in scope
          val fieldValues = select.toList map { field =>
            next.get(field) match {
              case l: BasicDBList => l.toList
              case other          => other
            }
          }

          callback(fieldValues)
        }
      }
    }
  }


// which can be used as:
val m = Person
val above18 = m where (_.age > 18)
streamSelect(m, select[Person](m, _.name, _.age, _.height), query = above18.asDBObject) {
  case (name: String) :: (age: Int) :: (h: Double) :: Nil =>
   // ...
}

// instead of using the above helper you can pass the field names manually:
select[Person](m, _.name, _.age, _.height) == List(m.name.name, m.age.name, m.height.name)

Here we only select the fields we really need, which has proven to be a big performance boost. It's quite readable, though I wasn't able to get rid of the [Person] type parameter in the select helper. We're using such streams wherever we know there's "a lot of stuff to process", or in so-called "preloads", where we compute a set of values from an entire collection for later re-use.
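As a side note on that [Person] annotation: in Scala 2 the type parameter can't be inferred here because a method's type parameters and its arguments in a single parameter list are resolved together, so the _.name style lambdas don't yet know their parameter type. Here's a minimal, self-contained sketch with invented stand-in types (not the real lift-mongodb-record API) showing the issue, and how a curried variant would sidestep it:

```scala
// Toy stand-ins for the field/record types, invented purely for illustration.
trait BaseField { def name: String }
final case class Field(name: String) extends BaseField

object Person {
  val name   = Field("name")
  val age    = Field("age")
  val height = Field("height")
}

// Single parameter list, like in the article: in Scala 2 the call
// select(Person, _.name) fails with "missing parameter type", because T
// isn't solved before the lambdas are typechecked, hence the explicit [T].
def select[T](on: T, fields: Function1[T, BaseField]*): Seq[String] =
  for (f <- fields) yield f(on).name

val explicit = select[Person.type](Person, _.name, _.age, _.height)

// Currying sidesteps it: T is fixed by the first list, so the second
// list's lambdas already know their parameter type.
def selectC[T](on: T)(fields: (T => BaseField)*): Seq[String] =
  fields.map(f => f(on).name)

val inferred = selectC(Person)(_.name, _.age, _.height)
```

Both variants yield the same field names; the curried shape just trades the article's call style for better inference.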

You may have noticed that all this has not been very type-safe (the callback isn't). So, you might ask, did we implement a type-safe version of our streams? In fact we did, though it's tuple-based, so we had to implement the same thing multiple times, once per tuple arity. I'll paste just the usage of the type-safe version here (and if you're interested I can do a follow-up blog post about it):

val m = Person
streamTypesafe(m)(m.age, m.name, m.height) { (age, name, height) =>
  // yes, age: Int, name: String, and height: Double! :-)
}
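For the curious, here's one possible shape such a tuple-based implementation could take. This is a speculative sketch over an in-memory "collection" with invented TypedField types, not our actual implementation; it just shows how each arity pairs N typed fields with an N-ary typed callback:

```scala
// Invented typed-field abstraction: each field knows its name and how to
// read a correctly-typed value out of a document (here, a plain Map).
trait TypedField[A] { def name: String; def read(doc: Map[String, Any]): A }
final case class IntField(name: String)    extends TypedField[Int]    { def read(d: Map[String, Any]) = d(name).asInstanceOf[Int] }
final case class StringField(name: String) extends TypedField[String] { def read(d: Map[String, Any]) = d(name).asInstanceOf[String] }
final case class DoubleField(name: String) extends TypedField[Double] { def read(d: Map[String, Any]) = d(name).asInstanceOf[Double] }

// The 3-arity overload; a real implementation repeats this per arity.
def streamTypesafe[A, B, C](docs: Iterator[Map[String, Any]])
                           (fa: TypedField[A], fb: TypedField[B], fc: TypedField[C])
                           (callback: (A, B, C) => Unit): Unit =
  docs.foreach(d => callback(fa.read(d), fb.read(d), fc.read(d)))

// usage over an in-memory stand-in for the collection:
val docs = Iterator(
  Map("age" -> 21, "name" -> "Ann", "height" -> 170.0),
  Map("age" -> 30, "name" -> "Bob", "height" -> 182.5))

streamTypesafe(docs)(IntField("age"), StringField("name"), DoubleField("height")) {
  (age, name, height) => // age: Int, name: String, height: Double, compiler-checked
    println(s"$name is $age and $height cm tall")
}
```

The real version would of course pull documents from a no-timeout cursor rather than an Iterator, but the typing trick is the same.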

What we've gained using these streams is the ability to easily write all the migrations we need, and we're still flexible enough to, for example, remove fields from a collection (with some processing, so a plain Mongo query wouldn't be enough). In streamSelect we're free to have multiple case statements, so even if the collection is not homogeneous, we can match null on some of the fields and still proceed over the entire collection if needed.
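To sketch that last point with hypothetical field values: a callback with several cases can handle records where a field was never set and therefore comes back as null:

```scala
// Rows shaped like streamSelect would hand them to the callback
// (hypothetical data); a record that never had a field set yields null.
def describe(row: List[Any]): String = row match {
  case (name: String) :: (age: Int) :: (height: Double) :: Nil =>
    s"$name, $age, $height cm"
  case (name: String) :: (age: Int) :: null :: Nil =>
    s"$name, $age, height unknown"
  case other =>
    s"skipping malformed record: $other"
}

println(describe(List("Ann", 21, 170.0)))  // Ann, 21, 170.0 cm
println(describe(List("Bob", 30, null)))   // Bob, 30, height unknown
```

The final catch-all case lets the migration keep going over the entire collection instead of blowing up on the first odd record.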

Anyway, I hope you found this bunch of code snippets interesting or useful; we certainly do in our day-to-day coding :-)





Published at DZone with permission of Konrad Malawski, DZone MVB. See the original article here.

