Tracking Changes in MongoDB With Scala and Akka
Leverage MongoDB's change streams feature to capture every change occurring in real time.
Need for Real-Time Consistent Data
Many different databases are used at Adform, each tailored to specific requirements, but what these use cases have in common is the necessity of a consistent interchange of data between the data stores. It's a tedious task to keep the origin of that data and its copies consistent manually, not to mention that with a sufficiently large number of copies, the origin may no longer be the source of truth. The need for each system to have its own copy of the data is also dictated by loose coupling and performance; it wouldn't be practical to be constantly impacted by every change made in the source system. The answer here is an event-based architecture, which keeps every change consistent and gives us the possibility of restoring the sequence of changes related to particular entities. For those reasons, the decision was made to use the publisher/subscriber model. MongoDB's change streams saved the day, finally letting us say farewell to much more complex oplog tailing.
As of version 3.6, MongoDB offers a change data capture implementation named change streams. It allows us to follow every modification made to an entire database or a chosen set of collections. Previous versions already offered a solution to this problem by means of the oplog (operation log) mechanism, but tailing it directly had serious drawbacks, especially the huge traffic caused by iterating over all changes to all collections and the lack of a reliable API for resuming tracking after an interruption. Change streams solve these issues by hiding the oplog's nooks and crannies behind a refined API that interoperates with Reactive Streams implementations.
Simple Demo Application
I prepared a small application written in Scala and Akka presenting change streams in action.
The Dockerfile I prepared for this demo customizes the MongoDB image with a single-node replica set setup. For change streams to work, a replica set consisting of at least one node must be configured. Even though a single-node set isn't feasible for production, it works well for simplicity's sake here.
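The exact file ships with the linked sources; a minimal Dockerfile along these lines does the trick (the base image version and the replica set name rs0 are assumptions, not the repo's exact values):

```dockerfile
# Base image version is an assumption; any change-streams-capable MongoDB (3.6+) works
FROM mongo:4.2

# Start mongod with replica set support enabled (the set name "rs0" is arbitrary)
CMD ["mongod", "--replSet", "rs0", "--bind_ip_all"]
```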
The replica set is not initiated by default — it has to be done explicitly.
Build Docker Image
Let’s build the image using some user friendly tag, e.g. single_rs.
Run MongoDB in Container
The 27017 port is MongoDB’s default.
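With the image in place, we can start the container and forward the default port to the host (the container name mongo_rs is an arbitrary choice):

```shell
# -d runs the container in the background; -p publishes MongoDB's default port
docker run -d -p 27017:27017 --name mongo_rs single_rs
```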
Replica Set Verification
We can easily look inside our container using the docker exec command:
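For example (assuming the container was named mongo_rs):

```shell
# Open an interactive shell inside the running container
docker exec -it mongo_rs bash
```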
Then we can get into MongoDB shell by just typing:
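Inside the container, the client is on the path:

```shell
# Start the MongoDB shell against the local instance
mongo
```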
Check replica set’s status:
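If the replica set hasn't been initiated yet (as noted above, this must be done explicitly), do that first; both commands run in the mongo shell:

```javascript
// Initiate the single-node replica set (only needed once)
rs.initiate()
// Inspect the replica set's current state
rs.status()
```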
You should see a similar output to the following:
The replica set consists of only one node, hence the members array contains only one object and the infoMessage field informs us that there is no secondary node to sync from.
Test Database Setup
While we have access to our instance's shell, we can also create the test database and test collection we will subscribe to.
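For instance (the database and collection names are arbitrary; just keep them in sync with the application's configuration):

```javascript
// Switch to (and implicitly create) the test database
use test
// Explicitly create the collection we will subscribe to
db.createCollection("testCollection")
```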
It’s a very simple application that subscribes to the given MongoDB instance and prints incoming records. I leave you with the link to the source code at the bottom of this page.
We need only one dependency: the Alpakka MongoDB connector, which is a Reactive Streams implementation.
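In build.sbt this amounts to a single line (the version shown is an assumption; pick the current Alpakka release):

```scala
// Pulls in akka-stream and the reactive MongoDB driver transitively
libraryDependencies += "com.lightbend.akka" %% "akka-stream-alpakka-mongodb" % "2.0.2"
```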
Rather self-explanatory: it's just the required sbt version.
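project/build.properties contains a single entry (the version number is an assumption):

```
sbt.version=1.3.13
```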
Main class: a simplistic chunk of code that points to our MongoDB container and listens for incoming events, printing them to standard output as they arrive.
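The original source is linked at the bottom of the page; a sketch of such a main class, using the Alpakka 2.x API and the database/collection names assumed in the setup above, could look like this:

```scala
import akka.actor.ActorSystem
import akka.stream.alpakka.mongodb.scaladsl.MongoSource
import akka.stream.scaladsl.Sink
import com.mongodb.reactivestreams.client.MongoClients
import org.bson.Document

object Main extends App {
  implicit val system: ActorSystem = ActorSystem("change-streams-demo")

  // Connect to the MongoDB instance running in our container
  private val client = MongoClients.create("mongodb://localhost:27017")
  private val collection = client
    .getDatabase("test")            // database name is an assumption
    .getCollection("testCollection") // collection name is an assumption

  // collection.watch() returns a reactive-streams Publisher of change events;
  // MongoSource wraps it as an Akka Streams Source of ChangeStreamDocument
  MongoSource(collection.watch())
    .runWith(Sink.foreach(println))
}
```

Sink.foreach(println) simply prints every change event; swapping in a different sink is all it takes to forward events elsewhere.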
Run the Application
You should see initial logs like these:
Let’s add some records
We see that the two inserted records (operationType of insert, with the document placed in the fullDocument field) were consumed. It's worth noticing that with every record we also get a resume token; it can be persisted and used to restart the stream after any interruption (e.g. an application crash).
Execute Some More Operations
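For example, an update, a delete, and finally a drop of the collection (the drop is what will invalidate the stream):

```javascript
// Produces an "update" event
db.testCollection.updateOne({ name: "first" }, { $set: { name: "renamed" } })
// Produces a "delete" event: only documentKey is included, no full document
db.testCollection.deleteOne({ name: "second" })
// Produces a "drop" event followed by an "invalidate" event that ends the stream
db.testCollection.drop()
```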
Let’s give a look to the produced output:
We got four different change events. Three of them correspond, in the same order, to the operations executed in MongoDB in the previous step. The last one is special: it signals the stream's closure. It's important to notice that delete events come without the full document, but we are still able to identify what was removed by using the documentKey property. After an event indicating a drop of the database/collection or a change of its name, we get an invalidate event, which ends the stream; we then need to recover from it manually.
- Always prefer change streams to oplog tailing for the friendly API and features like resuming the stream, pushing filtering predicates directly to MongoDB (the aggregation pipeline), and total ordering of incoming records
- The supported Reactive Streams implementations give you features like backpressure out of the box
- Change streams work only with replica sets enabled
- The Alpakka library offers more sinks than the Sink.foreach used for demo purposes; you can easily improve this sample application to sink to e.g. Kafka by passing a sink provided by the library
- Change streams don’t stream the initial data existing in the collection before creating subscriber’s session — it means that it has to be implemented separately
Opinions expressed by DZone contributors are their own.