Implementing MongoDB to Elasticsearch 7.X Data Pipeline

In this article, we will see how to implement a data pipeline from an application into a MongoDB database and from there into Elasticsearch.

By Swarnava Chakraborty · Nov. 16, 2020 · Tutorial

In this article, we will see how to implement a data pipeline from an application into a MongoDB database, and from there into Elasticsearch, keeping the same document ID, using Kafka Connect in a microservice architecture. Modern microservice architectures are asynchronous in nature and very loosely coupled. At the same time, the goal is minimal code (minimal maintenance and cost), no batch systems (real-time data), and solid performance with no fear of data loss. With all of these requirements in mind, Kafka and Kafka Connect are so far the best way to integrate different sources and sinks in one architecture with robust and reliable results.

We will deep dive into and implement such a solution using Debezium Kafka Connect to build a very robust pipeline of data from an application into MongoDB and from there into an Elasticsearch cluster.

We are using Debezium's Kafka and Kafka Connect images, MongoDB 4.0.4, and Elasticsearch 7.7.0.

(Architecture diagram: microservices data pipeline)

The step-by-step solution for the use case is given below.

1. Run Zookeeper and Kafka

docker run -d --name zookeeper -p 2181:2181 -p 2888:2888 -p 3888:3888 debezium/zookeeper:0.9

docker run -d --name kafka -p 9092:9092 --link zookeeper:zookeeper debezium/kafka:0.9
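Optionally, you can sanity-check that the broker is up and can reach ZooKeeper by listing topics from inside the Kafka container (a sketch, assuming the container names and link above; the list will be empty or show only internal topics at this point):

docker exec -it kafka bash
./bin/kafka-topics.sh --list --zookeeper zookeeper:2181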

2. Run Elasticsearch and MongoDB (With Replication)

docker run -d --name elastic -p 9200:9200 -p 9300:9300 -e "discovery.type=single-node" elasticsearch:7.7.0
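Elasticsearch can take a few seconds to become available. A quick sanity check, assuming the default port mapping above, is the cluster health endpoint, which should report a green or yellow status:

curl "http://localhost:9200/_cluster/health?pretty"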

To use MongoDB 4.X for the data pipeline, we first need to enable replication in MongoDB. The step-by-step solution for this is given below.

sudo su (for Windows, run as Administrator)

docker run --name mongodb -v /home/data/db:/data/db -p 27018:27018 -d mongo:4.0.4 --replSet replica0

docker exec -it mongodb bash

mongo

> rs.initiate({_id: "replica0", members:[{_id: 0, host: "172.17.0.5:27017"}]})


replica0:SECONDARY> use admin

switched to db admin

replica0:PRIMARY> db.createUser({user: "abc",pwd: "abc",roles: ["dbOwner"]})

Successfully added user: { "user" : "abc", "roles" : [ "dbOwner" ] }

replica0:PRIMARY> use mediastream

switched to db mediastream

Now, any operation on this mediastream database will qualify for the CDC approach. The topic name will be “mongodb.mediastream.<collection-name>”. This will also be the index name in Elasticsearch.
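If you want to double-check the replica set before wiring up the connectors, rs.status() from the mongo shell should report the single member as PRIMARY. A minimal sketch:

docker exec -it mongodb mongo
replica0:PRIMARY> rs.status().members[0].stateStr
"PRIMARY"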

3. Run an “Enriched Kafka Connect” That Will Integrate the Microservice Application With MongoDB, and Then MongoDB With Elasticsearch, Keeping the Document ID the Same.

Here I am using an enriched Kafka Connect image (based on the Debezium one) that is capable of handling this use case.

docker run -d --name connect -p 8083:8083 -e GROUP_ID=1 -e CONFIG_STORAGE_TOPIC=my_connect_configs -e OFFSET_STORAGE_TOPIC=my_connect_offsets -e STATUS_STORAGE_TOPIC=my_connect_statuses --link zookeeper:zookeeper --link elastic:elastic --link mongodb:mongodb --link kafka:kafka swarnavac/connect:1.0 
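Once this container is up, the Kafka Connect REST API should be reachable on port 8083. As a quick check (the root endpoint of the standard Connect REST API returns the worker's version information):

curl http://<HostIP>:8083/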

4. Implementing Source and Sink Connectors.

Once all of these components are ready, we need to wire them together through the Kafka Connect REST API, one by one. First, create a topic with the collection name in Kafka:

[kafka@2c0cf1ca7b0b ~]$ ./bin/kafka-topics.sh --create --zookeeper 172.17.0.2:2181 --replication-factor 1 --partitions 1 --topic test
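You can verify the topic from the same container shell with --describe (a sketch, using the same ZooKeeper address as above):

./bin/kafka-topics.sh --describe --zookeeper 172.17.0.2:2181 --topic test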

The first one is the MongoDB sink connector, which will sink data from the microservices into MongoDB via Kafka:

POST: http://<HostIP>:8083/connectors/

JSON

{
    "name": "mongo-sink",
    "config": {
        "name": "mongo-sink",
        "connector.class": "com.mongodb.kafka.connect.MongoSinkConnector",
        "tasks.max": "1",
        "topics": "test",
        "connection.uri": "mongodb://abc:abc@172.17.0.5:27017",
        "database": "mediastream",
        "collection": "test",
        "key.converter": "org.apache.kafka.connect.json.JsonConverter",
        "key.converter.schemas.enable": false,
        "value.converter": "org.apache.kafka.connect.json.JsonConverter",
        "value.converter.schemas.enable": false,
        "document.id.strategy": "com.mongodb.kafka.connect.sink.processor.id.strategy.ProvidedInValueStrategy",
        "writemodel.strategy": "com.mongodb.kafka.connect.sink.writemodel.strategy.UpdateOneTimestampsStrategy"
    }
}

Here, mongodb://abc:abc@172.17.0.5:27017 is the connection string: abc:abc is the username and password, and 172.17.0.5 is the host IP for MongoDB; both might be different in your case.
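For example, the configuration can be registered with curl. This is a sketch that assumes the JSON above is saved in a (hypothetical) file named mongo-sink.json; the same pattern applies to the two connectors below:

curl -i -X POST -H "Accept: application/json" -H "Content-Type: application/json" -d @mongo-sink.json http://<HostIP>:8083/connectors/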

The second one is the MongoDB source connector, which will publish data from MongoDB into Kafka:

POST: http://<HostIP>:8083/connectors/

JSON

{
    "name": "mongo-source",
    "config": {
        "name": "mongo-source",
        "connector.class": "io.debezium.connector.mongodb.MongoDbConnector",
        "mongodb.name": "mongodb",
        "mongodb.hosts": "replica0/172.17.0.5:27017",
        "mongodb.user": "abc",
        "mongodb.password": "abc",
        "tasks.max": "1",
        "database.history.kafka.bootstrap.servers": "kafka:9092",
        "database.history.kafka.topic": "schema-changes.mediastream",
        "internal.key.converter": "org.apache.kafka.connect.json.JsonConverter",
        "internal.value.converter": "org.apache.kafka.connect.json.JsonConverter",
        "value.converter": "org.apache.kafka.connect.json.JsonConverter",
        "transforms": "unwrap",
        "transforms.unwrap.type": "io.debezium.connector.mongodb.transforms.ExtractNewDocumentState"
    }
}

The last one is the Elasticsearch sink, which will write the data from Kafka into Elasticsearch. The ExtractField$Key transform pulls the id field out of the Debezium record key, and with key.ignore set to false the sink uses it as the Elasticsearch document ID, which is what keeps the document ID the same across MongoDB and Elasticsearch:

POST: http://<HostIP>:8083/connectors/

JSON

{
    "name": "elastic-sink",
    "config": {
        "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
        "tasks.max": "1",
        "topics": "mongodb.mediastream.test",
        "connection.url": "http://elastic:9200",
        "transforms": "unwrap,key",
        "transforms.unwrap.type": "io.debezium.transforms.UnwrapFromEnvelope",
        "transforms.key.type": "org.apache.kafka.connect.transforms.ExtractField$Key",
        "transforms.key.field": "id",
        "type.name": "_doc",
        "key.ignore": "false"
    }
}

We can list all the created connectors with:

GET http://<HostIP>:8083/connectors/

JSON

[
  "mongo-sink",
  "elastic-sink",
  "mongo-source"
]
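To confirm that a connector is not just registered but actually running, we can also query its status endpoint (part of the standard Kafka Connect REST API); the connector and its task should report a RUNNING state:

curl http://<HostIP>:8083/connectors/mongo-source/status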



We can also see the newly created index in Elasticsearch:

http://<HostIP>:9200/mongodb.mediastream.test/

JSON

{
  "mongodb.mediastream.test": {
    "aliases": {},
    "mappings": {},
    "settings": {
      "index": {
        "creation_date": "1604676478948",
        "number_of_shards": "1",
        "number_of_replicas": "1",
        "uuid": "TgWV0PaYSBe1P3IGrl0eaw",
        "version": { "created": "7070099" },
        "provided_name": "mongodb.mediastream.test"
      }
    }
  }
}

5. Testing the Implementation.

Creating a document in MongoDB in the test collection (the equivalent shell commands are sketched at the end of this section):

(screenshot: inserting a document from the mongo shell)

The same document is created in Elasticsearch:

http://<HostIP>:9200/mongodb.mediastream.test/_search?pretty=true

(screenshot: Elasticsearch search response)

Updating the same document in MongoDB:

(screenshot: updating the document from the mongo shell)

It is also updated in Elasticsearch:

(screenshot: updated document in the Elasticsearch search response)
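Since the original screenshots are not reproduced here, the test can be summarized with the following commands (a sketch; the field names and values are illustrative, not taken from the screenshots):

# In the mongo shell, insert a document into the test collection
replica0:PRIMARY> use mediastream
replica0:PRIMARY> db.test.insertOne({ "name": "sample", "status": "created" })

# Query the corresponding Elasticsearch index
curl "http://<HostIP>:9200/mongodb.mediastream.test/_search?pretty=true"

# Update the same document in the mongo shell
replica0:PRIMARY> db.test.updateOne({ "name": "sample" }, { $set: { "status": "updated" } })

# Query Elasticsearch again; the document with the same ID now reflects the update
curl "http://<HostIP>:9200/mongodb.mediastream.test/_search?pretty=true"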

That's all! Similarly, if you publish a record to the Kafka topic “test” (which should come from the microservice), it will flow via MongoSink -> MongoSource -> ElasticSink.
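As a quick way to simulate the microservice, you could publish such a record with the console producer shipped with Kafka. This is only a sketch: the broker address may differ in your setup, and because the sink uses ProvidedInValueStrategy, the value should carry the _id you want MongoDB (and therefore Elasticsearch) to use:

docker exec -it kafka bash
./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
> {"_id": "1", "name": "sample", "status": "created"}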


Opinions expressed by DZone contributors are their own.
