Tracking Changes in MongoDB With Scala and Akka

Leverage the change streams feature in MongoDB to receive every change as it occurs, in real time.

By Hubert Skowronek · Updated Oct. 16, 2020 · Tutorial

Need for Real-Time Consistent Data

Many different databases are used at Adform, each tailored to specific requirements, but what these use cases have in common is the need for a consistent interchange of data between the data stores. Keeping the origin of that data and its copies consistent manually is a tedious task, and after enough replication the origin may no longer be the source of truth. The need for each system to hold its own copy of the data is also dictated by loose coupling and performance: it wouldn’t be practical to be constantly impacted by every change made in the source system. The answer here is an event-based architecture, which keeps every change consistent and lets us restore the sequence of changes related to particular entities. For those reasons, the decision was made to use the publisher/subscriber model. MongoDB’s change streams saved the day, finally letting us say farewell to the much more complex oplog tailing.

Change Streams

As of version 3.6, MongoDB offers a change data capture implementation named change streams. It allows us to follow every modification made to an entire database or a chosen set of collections. Previous versions already offered a solution to this problem by means of the oplog (operation log) mechanism, but tailing it directly had serious drawbacks, especially the huge traffic caused by iterating over all changes to all collections and the lack of a reliable API for resuming tracking after an interruption. Change streams solve these issues by hiding the oplog’s nooks and crannies behind a refined API that is interoperable with Reactive Streams implementations.

Simple Demo Application

I prepared a small application, written in Scala with Akka, that shows change streams in action.

Dockerfile

The Dockerfile I prepared for this demo customizes the MongoDB image with a single-node replica set setup. For change streams to work, a replica set consisting of at least one node must be configured. Even though a single node isn’t feasible for production, it works well here for simplicity’s sake.

Dockerfile

FROM mongo:4.4.1-bionic
RUN echo "rs.initiate();" > /docker-entrypoint-initdb.d/replica-init.js
CMD [ "--replSet", "single_node_replica_set" ]


The replica set is not initiated by default — it has to be done explicitly.

Build Docker Image

Let’s build the image using some user-friendly tag, e.g. single_rs.

Shell

$ docker build . -t mongo:single_rs


Run MongoDB in Container

Shell

$ docker run -p 27017:27017 -d --name mongo mongo:single_rs


The 27017 port is MongoDB’s default.

Replica Set Verification

We can easily look inside our container using the docker exec command:

Shell

$ docker exec -it mongo bash


Then we can get into the MongoDB shell by just typing:

# mongo

Check the replica set’s status:

> rs.status()

You should see output similar to the following:

JSON

...
"members" : [
  {
   "_id" : 0,
   "name" : "127.0.0.1:27017",
   "health" : 1,
   "state" : 1,
   "stateStr" : "PRIMARY",
   "uptime" : 118,
   "optime" : {
    "ts" : Timestamp(1601225360, 1),
    "t" : NumberLong(2)
   },
   "optimeDate" : ISODate("2020-09-27T16:49:20Z"),
   "syncSourceHost" : "",
   "syncSourceId" : -1,
   "infoMessage" : "Could not find member to sync from",
   "electionTime" : Timestamp(1601225250, 1),
   "electionDate" : ISODate("2020-09-27T16:47:30Z"),
   "configVersion" : 1,
   "configTerm" : 2,
   "self" : true,
   "lastHeartbeatMessage" : ""
  }
 ],
...


The replica set consists of only one node, hence the members array has only one object and the infoMessage field informs us that there is no secondary node to sync from.

Test Database Setup

While we have access to the instance’s shell, we can also create the test database and the test collection we will subscribe to.

Shell

> use test_db
> db.createCollection("test_collection")


Application Code

It’s a very simple application that subscribes to the given MongoDB instance and prints incoming records. A link to the full source code is at the bottom of this page.

Directory Structure

Shell

.
├── build.sbt
├── Dockerfile
├── project
│   └── build.properties
└── src
    └── main
        └── scala
            └── com
                └── adform
                    └── demo
                        └── Main.scala


build.sbt

Scala

lazy val root = (project in file("."))
  .settings(
    name := "MongoDB Change Tracking Demo",
    scalaVersion := "2.12.8",
    version := "1.0"
  )

libraryDependencies ++= Seq(
  "com.lightbend.akka" %% "akka-stream-alpakka-mongodb" % "2.0.2"
)


We need only one dependency: the Alpakka MongoDB connector, which is built on top of the MongoDB Reactive Streams driver (pulled in transitively, which is why Main.scala below can import com.mongodb.reactivestreams directly).

build.properties

Properties

sbt.version=1.3.4


Rather self-explanatory: it’s just the required sbt version.

Main.scala

Scala

package com.adform.demo

import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.alpakka.mongodb.scaladsl.MongoSource
import akka.stream.scaladsl.Sink
import com.mongodb.reactivestreams.client.MongoClients
import com.mongodb.{ConnectionString, MongoClientSettings}

object Main {

  def main(args: Array[String]): Unit = {

    // Point the client at the single-node replica set running in our container
    val settings = MongoClientSettings
      .builder()
      .applyConnectionString(new ConnectionString("mongodb://localhost:27017"))
      .build()
    val db = MongoClients.create(settings).getDatabase("test_db")
    val collection = db.getCollection("test_collection")

    // watch() opens a change stream and returns a Reactive Streams Publisher
    val publisher = collection.watch()

    // Wrap the publisher in an Akka Streams Source
    val source = MongoSource(publisher)

    implicit val system: ActorSystem = ActorSystem()
    implicit val mat: ActorMaterializer = ActorMaterializer()

    // Print every incoming change event to standard output
    source.runWith(Sink.foreach(println))

  }

}


The main class is a simplistic chunk of code that points to our MongoDB container and listens for incoming events, printing them to standard output as they arrive.
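If you need to react differently to each kind of event rather than just print it, one option is to branch on the operation type carried by every ChangeStreamDocument. A small, hypothetical variation of the final line of Main.scala (not part of the original demo):

Scala

import com.mongodb.client.model.changestream.OperationType

// Branch on the kind of change instead of printing raw events
source.runWith(Sink.foreach { event =>
  event.getOperationType match {
    case OperationType.INSERT => println(s"inserted: ${event.getFullDocument}")
    case OperationType.DELETE => println(s"deleted key: ${event.getDocumentKey}")
    case other                => println(s"other event: $other")
  }
})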

Run the Application

Shell

$ sbt run


You should see initial logs like these:

Shell

Sep 28, 2020 11:30:22 PM com.mongodb.diagnostics.logging.JULLogger log
INFO: Cluster created with settings {hosts=[localhost:27017], mode=SINGLE, requiredClusterType=UNKNOWN, serverSelectionTimeout='30000 ms', maxWaitQueueSize=500}

Sep 28, 2020 11:30:22 PM com.mongodb.diagnostics.logging.JULLogger log
INFO: Opened connection [connectionId{localValue:1, serverValue:8}] to localhost:27017

Sep 28, 2020 11:30:22 PM com.mongodb.diagnostics.logging.JULLogger log
INFO: Monitor thread successfully connected to server with description ServerDescription{address=localhost:27017, type=REPLICA_SET_PRIMARY, state=CONNECTED, ok=true, version=ServerVersion{versionList=[4, 4, 1]}, minWireVersion=0, maxWireVersion=9, maxDocumentSize=16777216, logicalSessionTimeoutMinutes=30, roundTripTimeNanos=8637134, setName='single_node_replica_set', canonicalAddress=127.0.0.1:27017, hosts=[127.0.0.1:27017], passives=[], arbiters=[], primary='127.0.0.1:27017', tagSet=TagSet{[]}, electionId=7fffffff0000000000000002, setVersion=1, lastWriteDate=Mon Sep 28 23:30:18 CEST 2020, lastUpdateTimeNanos=44003362611716}

Sep 28, 2020 11:30:24 PM com.mongodb.diagnostics.logging.JULLogger log
INFO: Opened connection [connectionId{localValue:2, serverValue:9}] to localhost:27017


Let’s Add Some Records

Shell

> db.test_collection.insertMany([{_id: 1, name: "test_record_1"}, {_id: 2, name: "test_record_2"}])


Application’s Output

Shell

ChangeStreamDocument{ operationType=OperationType{value='insert'}, resumeToken={"_data": "825F7255FC000000012B022C0100296E5A1004B3CC867FB8934F57A5CD65B672BF625F461E5F6964002B020004", "_typeBits": {"$binary": "QA==", "$type": "00"}}, namespace=test_db.test_collection, destinationNamespace=null, fullDocument=Document{{_id=1.0, name=test_record_1}}, documentKey={"_id": 1.0}, clusterTime=Timestamp{value=6877654121768288257, seconds=1601328636, inc=1}, updateDescription=null, txnNumber=null, lsid=null}

ChangeStreamDocument{ operationType=OperationType{value='insert'}, resumeToken={"_data": "825F7255FC000000022B022C0100296E5A1004B3CC867FB8934F57A5CD65B672BF625F461E5F6964002B040004", "_typeBits": {"$binary": "QA==", "$type": "00"}}, namespace=test_db.test_collection, destinationNamespace=null, fullDocument=Document{{_id=2.0, name=test_record_2}}, documentKey={"_id": 2.0}, clusterTime=Timestamp{value=6877654121768288258, seconds=1601328636, inc=2}, updateDescription=null, txnNumber=null, lsid=null}


We can see that the two inserted records (operationType insert, with the document in the fullDocument field) were consumed. It’s worth noticing that every record comes with a resume token; it can be persisted and used to restart the stream after any interruption (e.g. an application crash).
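As a rough sketch of how that could look (persisting the token is up to you; lastToken is an illustrative name, not part of the demo), the driver’s resumeAfter method accepts a previously saved token:

Scala

import org.bson.BsonDocument

// Remember the resume token of the last processed event; a real
// application would persist it, e.g. to a file or another collection
var lastToken: BsonDocument = null
source.runWith(Sink.foreach { event =>
  println(event)
  lastToken = event.getResumeToken
})

// After a restart, continue right after the last processed event
// instead of seeing only changes made from now on
val resumedSource = MongoSource(collection.watch().resumeAfter(lastToken))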

Execute Some More Operations

Shell

> db.test_collection.remove({_id: 1})
> db.test_collection.update({_id: 2}, {"name": "updated name"})
> db.test_collection.drop()


Let’s take a look at the produced output:

Shell

ChangeStreamDocument{ operationType=OperationType{value='delete'}, resumeToken={"_data": "825F73A158000000022B022C0100296E5A1004B3CC867FB8934F57A5CD65B672BF625F461E5F6964002B020004", "_typeBits": {"$binary": "QA==", "$type": "00"}}, namespace=test_db.test_collection, destinationNamespace=null, fullDocument=null, documentKey={"_id": 1.0}, clusterTime=Timestamp{value=6878018455254073346, seconds=1601413464, inc=2}, updateDescription=null, txnNumber=null, lsid=null}

ChangeStreamDocument{ operationType=OperationType{value='replace'}, resumeToken={"_data": "825F73A199000000012B022C0100296E5A1004B3CC867FB8934F57A5CD65B672BF625F461E5F6964002B040004", "_typeBits": {"$binary": "QA==", "$type": "00"}}, namespace=test_db.test_collection, destinationNamespace=null, fullDocument=Document{{_id=2.0, name=updated name}}, documentKey={"_id": 2.0}, clusterTime=Timestamp{value=6878018734426947585, seconds=1601413529, inc=1}, updateDescription=null, txnNumber=null, lsid=null}

ChangeStreamDocument{ operationType=OperationType{value='drop'}, resumeToken={"_data": "825F73A1C3000000012B022C0100296E5A1004B3CC867FB8934F57A5CD65B672BF625F04"}, namespace=test_db.test_collection, destinationNamespace=null, fullDocument=null, documentKey=null, clusterTime=Timestamp{value=6878018914815574017, seconds=1601413571, inc=1}, updateDescription=null, txnNumber=null, lsid=null}

ChangeStreamDocument{ operationType=OperationType{value='invalidate'}, resumeToken={"_data": "825F73A1C3000000012B022C0100296F5A1004B3CC867FB8934F57A5CD65B672BF625F04"}, namespace=null, destinationNamespace=null, fullDocument=null, documentKey=null, clusterTime=Timestamp{value=6878018914815574017, seconds=1601413571, inc=1}, updateDescription=null, txnNumber=null, lsid=null}


We got four different change events. The first three correspond, in the same order, to the operations executed in MongoDB in the previous step; note that the update shows up as a replace event because the shell command replaced the whole document instead of using an update operator such as $set. The last event is a special one indicating the stream’s closure. It’s important to notice that delete events come without the full document, but we are still able to identify what was removed by using the documentKey property. After an event indicating that the database or collection was dropped or renamed, we get an invalidate event, which ends the stream, and we need to recover from it manually.
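MongoDB 4.2 additionally introduced the startAfter option which, unlike resumeAfter, accepts the token of an invalidate event, so a fresh stream can be opened after the previous one was invalidated. A minimal sketch, assuming invalidateToken holds the token captured from that last event:

Scala

// Hypothetical recovery after an invalidate event: resumeAfter would
// reject this token, but startAfter opens a new change stream from it
val recovered = MongoSource(collection.watch().startAfter(invalidateToken))
recovered.runWith(Sink.foreach(println))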

Takeaways

  • Always prefer change streams to oplog tailing for their friendly API and features like resuming the stream, pushing filtering predicates directly to MongoDB via the aggregation pipeline (see the sketch after this list), and total ordering of incoming records
  • Supported Reactive Streams implementations give you features like back pressure out of the box
  • Change streams work only with replica sets enabled
  • The Alpakka library offers more sinks than the Sink.foreach used here for demo purposes; you can easily improve the sample application to sink to e.g. Kafka by passing a sink provided by the library
  • Change streams don’t stream the initial data that existed in the collection before the subscriber’s session was created, so that part has to be implemented separately
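For the filtering point above, here is a hedged sketch (not part of the original demo) using the watch overload that accepts an aggregation pipeline; the $match stage below is illustrative and would let only insert and delete events leave the server:

Scala

import java.util.Arrays
import com.mongodb.client.model.{Aggregates, Filters}

// The $match stage is evaluated inside MongoDB, so filtered-out
// events never reach the application at all
val pipeline = Arrays.asList(
  Aggregates.`match`(Filters.in("operationType", "insert", "delete"))
)
val filteredSource = MongoSource(collection.watch(pipeline))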

Source Code

https://github.com/hubert-skowronek/mongodb-change-streams-demo
