
Part 3: Transforming MongoDB CDC Event Messages

This post is part 3 of a series on event-driven systems with Memphis.dev. We added a transformer service to deserialize MongoDB CDC records into JSON.

By RJ Nowling · Aug. 31, 23 · Tutorial

In our last blog post, we introduced a reference implementation for streaming change data capture (CDC) events from a MongoDB database using Debezium Server and Memphis.dev. At the end of the post, we noted that MongoDB records are serialized as strings in Debezium CDC messages like so:

JSON
{
    "schema" : ...,
    "payload" : {
        "before" : null,
        "after" : "{\"_id\": {\"$oid\": \"645fe9eaf4790c34c8fcc2ed\"},\"creation_timestamp\": {\"$date\": 1684007402978},\"due_date\": {\"$date\": 1684266602978},\"description\": \"buy milk\",\"completed\": false}",
        ...
    }
}


We want to use the Schemaverse functionality of Memphis.dev to check messages against an expected schema. Messages that don’t match the schema are routed to a dead letter station so that they don’t impact downstream consumers. If this all sounds like ancient Greek, don’t worry! We’ll explain the details in our next blog post.

To use functionality like Schemaverse, we need to deserialize the MongoDB records as JSON documents. This blog post describes a modification to our MongoDB CDC pipeline that adds a transformer service to deserialize the MongoDB records to JSON documents.
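
To see why the serialized record gets in the way, consider the following minimal sketch. It parses a message shaped like the one above with Python's json module. The sample message is a shortened, hypothetical stand-in rather than actual Debezium output.

```python
import json

# Abbreviated, hypothetical CDC message: the "after" field holds a JSON
# document that has itself been serialized to a string.
record = {"description": "buy milk", "completed": False}
raw_message = json.dumps({"payload": {"before": None, "after": json.dumps(record)}})

event = json.loads(raw_message)
after = event["payload"]["after"]

# One round of parsing leaves the record as an opaque string...
print(type(after).__name__)

# ...so its fields are only reachable after a second round of parsing.
after_doc = json.loads(after)
print(after_doc["description"])
```

A schema check applied to the outer message would only see a string in the "after" field, which is why the record has to be expanded before its fields can be validated.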

Overview of the Solution

The previous solution consisted of six components:

1. Todo Item Generator: Inserts a randomly generated to-do item in the MongoDB collection every 0.5 seconds. Each to-do item contains a description, creation timestamp, optional due date, and completion status.

2. MongoDB: Configured with a single database containing a single collection (todo_items).

3. Debezium Server: Instance of Debezium Server configured with MongoDB source and HTTP Client sink connectors.

4. Memphis.dev REST Gateway: Uses the out-of-the-box configuration.

5. Memphis.dev: Configured with a single station (todo-cdc-events) and a single user (todocdcservice).

6. Printing Consumer: A script that uses the Memphis.dev Python SDK to consume messages and print them to the console.

In this iteration, we are adding two additional components:

1. Transformer Service: Consumes messages from the todo-cdc-events station, deserializes the MongoDB records, and pushes them to the cleaned-todo-cdc-events station.

2. Cleaned Printing Consumer: A second instance of the printing consumer that prints messages pushed to the cleaned-todo-cdc-events station.

The updated architecture looks like this:
Data Flow Diagram
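
The flow between the two stations can be approximated in memory. The sketch below is only an illustration of the architecture: the two asyncio.Queue objects are hypothetical stand-ins for the todo-cdc-events and cleaned-todo-cdc-events stations, and transform is a simplified placeholder for the real transformer logic described later.

```python
import asyncio
import json


def transform(raw: bytes) -> bytes:
    """Simplified placeholder: expand the serialized "after" record."""
    event = json.loads(raw)
    after = event["payload"]["after"]
    if after is not None:
        event["payload"]["after"] = json.loads(after)
    return json.dumps(event).encode("utf-8")


async def transformer_service(raw_station, cleaned_station):
    """Move every message from the raw queue to the cleaned queue."""
    while True:
        msg = await raw_station.get()
        await cleaned_station.put(transform(msg))
        raw_station.task_done()


async def run_pipeline(messages):
    # Hypothetical in-memory stand-ins for the two stations.
    raw_station = asyncio.Queue()
    cleaned_station = asyncio.Queue()
    worker = asyncio.create_task(transformer_service(raw_station, cleaned_station))
    for msg in messages:
        await raw_station.put(msg)
    await raw_station.join()  # wait until the transformer has handled everything
    worker.cancel()
    await asyncio.gather(worker, return_exceptions=True)
    return [cleaned_station.get_nowait() for _ in range(cleaned_station.qsize())]
```

Calling asyncio.run(run_pipeline([...])) with a list of raw messages returns the cleaned messages in the order they were submitted.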

A Deep Dive Into the Transformer Service

Skeleton of the Message Transformer Service

The transformer service uses the Memphis.dev Python SDK. Let’s walk through the implementation. The main() coroutine of our transformer first connects to the Memphis.dev broker. The connection details (host, username, and password), along with the input and output station names, are read from environment variables, in accordance with the recommendations of the Twelve-Factor App methodology.

Python
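import asyncio
import json
import os

from memphis import Memphis, MemphisError, MemphisConnectError, MemphisHeaderError

async def main():
    try:
        print("Waiting on messages...")
        memphis = Memphis()
        # HOST_KEY, USERNAME_KEY, and PASSWORD_KEY hold environment variable names
        await memphis.connect(host=os.environ[HOST_KEY],
                              username=os.environ[USERNAME_KEY],
                              password=os.environ[PASSWORD_KEY])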
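
The key constants used above are not shown in the snippet. A minimal sketch of how such configuration might be gathered follows. The environment variable names and the load_settings helper are assumptions for illustration and are not taken from the reference implementation.

```python
import os

# Hypothetical environment variable names; the actual names used by the
# reference implementation may differ.
HOST_KEY = "MEMPHIS_HOST"
USERNAME_KEY = "MEMPHIS_USERNAME"
PASSWORD_KEY = "MEMPHIS_PASSWORD"
INPUT_STATION_KEY = "MEMPHIS_INPUT_STATION"
OUTPUT_STATION_KEY = "MEMPHIS_OUTPUT_STATION"

REQUIRED_KEYS = (HOST_KEY, USERNAME_KEY, PASSWORD_KEY,
                 INPUT_STATION_KEY, OUTPUT_STATION_KEY)


def load_settings(environ=os.environ):
    """Return the required settings, failing fast if any are missing."""
    missing = [key for key in REQUIRED_KEYS if key not in environ]
    if missing:
        raise RuntimeError("Missing environment variables: " + ", ".join(missing))
    return {key: environ[key] for key in REQUIRED_KEYS}
```

Checking for all of the variables up front reports every missing setting at once, instead of raising a KeyError on whichever lookup happens to come first.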


Once a connection is established, we create consumer and producer objects. In Memphis.dev, consumers and producers have names. These names appear in the Memphis.dev UI, providing visibility into system operations.

Python
        print("Creating consumer")
        consumer = await memphis.consumer(station_name=os.environ[INPUT_STATION_KEY],
                                          consumer_name="transformer",
                                          consumer_group="")

        print("Creating producer")
        producer = await memphis.producer(station_name=os.environ[OUTPUT_STATION_KEY],
                                          producer_name="transformer")


The consumer API uses the callback design pattern. When messages are pulled from the broker, the provided function is called with the list of messages, along with an error (if one occurred) and a context.

Python
        print("Creating handler")
        msg_handler = create_handler(producer)

        print("Setting handler")
        consumer.consume(msg_handler)


After setting up the callback, the main() coroutine waits on an asyncio event that is never set. This keeps the coroutine, and with it the event loop, alive, so the transformer service idles until messages are available to pull from the broker.

Python
        # Keep your main thread alive so the consumer will keep receiving data.
        await asyncio.Event().wait()
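
The effect of waiting on an event can be seen without a broker. In this sketch, a background task stands in for the consumer, and the event is eventually set only so that the example terminates. The fake_consumer function and its arguments are hypothetical.

```python
import asyncio


async def fake_consumer(received, stop):
    """Stand-in for the consumer's background work: 'receives' three messages."""
    for i in range(3):
        await asyncio.sleep(0)  # yield control, as a network wait would
        received.append(i)
    stop.set()  # only so this example ends; the service above never sets its event


async def main():
    received = []
    stop = asyncio.Event()
    task = asyncio.create_task(fake_consumer(received, stop))
    # main() parks here, but the event loop keeps running other tasks.
    await stop.wait()
    await task
    return received


result = asyncio.run(main())
print(result)  # [0, 1, 2]
```

While main() is suspended on the event, the loop remains free to run the background task, which is the property the transformer relies on.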


Creating the Message Handler Function

The create_handler function takes a producer object and returns a callback function. Since the consumer calls the callback with a fixed set of arguments that does not include the producer, we use the closure pattern to implicitly pass the producer to the msg_handler function when we create it.

The msg_handler function is passed three arguments when called: a list of messages, an error (if one occurred), and a context consisting of a dictionary. Our handler loops over the messages, calls the transform function on each, sends the messages to the second station using the producer, and acknowledges that the message has been processed. In Memphis.dev, messages are not marked off as delivered until the consumer acknowledges them. This prevents messages from being dropped if an error occurs during processing.

Python
def create_handler(producer):
    async def msg_handler(msgs, error, context):
        try:
            for msg in msgs:
                transformed_msg = deserialize_mongodb_cdc_event(msg.get_data())
                await producer.produce(message=transformed_msg)
                await msg.ack()
        except (MemphisError, MemphisConnectError, MemphisHeaderError) as e:
            print(e)
            return

    return msg_handler
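
The closure and acknowledgment behavior can be exercised without a running broker. The sketch below swaps in hypothetical FakeProducer and FakeMessage classes that mimic only the methods used by the handler (produce, get_data, and ack). It also takes the transform function as a parameter, which is a variation on the code above.

```python
import asyncio


class FakeProducer:
    """Hypothetical in-memory stand-in for a Memphis.dev producer."""

    def __init__(self):
        self.sent = []

    async def produce(self, message):
        self.sent.append(message)


class FakeMessage:
    """Hypothetical stand-in for a consumed message."""

    def __init__(self, data):
        self._data = data
        self.acked = False

    def get_data(self):
        return self._data

    async def ack(self):
        self.acked = True


def create_handler(producer, transform):
    # msg_handler closes over producer and transform, so the caller only
    # needs to supply the three callback arguments.
    async def msg_handler(msgs, error, context):
        for msg in msgs:
            await producer.produce(message=transform(msg.get_data()))
            await msg.ack()  # acknowledge only after a successful produce

    return msg_handler


async def demo():
    producer = FakeProducer()
    handler = create_handler(producer, transform=bytes.upper)
    msgs = [FakeMessage(b"a"), FakeMessage(b"b")]
    await handler(msgs, None, {})
    return producer.sent, [m.acked for m in msgs]


sent, acked = asyncio.run(demo())
print(sent, acked)  # [b'A', b'B'] [True, True]
```

Because ack() is awaited only after produce() returns, a message whose transformation or delivery raises an exception is left unacknowledged.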


The Message Transformer Function

Now, we get to the meat of the service: the message transformer function. Message payloads (returned by the get_data() method) are stored as bytearray objects. We use the Python json library to deserialize the messages into a hierarchy of Python collections (list and dict) and primitive types (int, float, str, bool, and None).

Python
def deserialize_mongodb_cdc_event(input_msg):
    obj = json.loads(input_msg)


We expect the object to have a payload property with an object as its value. That object in turn has two properties (“before” and “after”), each of which is either None or a string containing a serialized JSON object. We use the json library again to deserialize the strings and replace them with the resulting objects.

Python
    if "payload" in obj:
        payload = obj["payload"]

        if "before" in payload:
            before_payload = payload["before"]
            if before_payload is not None:
                payload["before"] = json.loads(before_payload)

        if "after" in payload:
            after_payload = payload["after"]
            if after_payload is not None:
                payload["after"] = json.loads(after_payload)


Lastly, we reserialize the entire JSON record and convert it back into a bytearray for transmission to the broker. 

Python
    output_s = json.dumps(obj)
    output_msg = bytearray(output_s, "utf-8")
    return output_msg
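
Putting the three fragments together, the function can be tried on its own. The following is a condensed variant that loops over the two keys instead of repeating the check, followed by a usage example. The input message is a hypothetical, abbreviated one built for illustration.

```python
import json


def deserialize_mongodb_cdc_event(input_msg):
    """Parse a CDC message and expand its serialized before/after records."""
    obj = json.loads(input_msg)

    payload = obj.get("payload")
    if isinstance(payload, dict):
        for key in ("before", "after"):
            value = payload.get(key)
            if value is not None:
                payload[key] = json.loads(value)

    return bytearray(json.dumps(obj), "utf-8")


# Hypothetical, abbreviated input message for illustration.
record = {"description": "buy milk", "completed": False}
raw = bytearray(
    json.dumps({"payload": {"before": None, "after": json.dumps(record)}}),
    "utf-8",
)

cleaned = json.loads(deserialize_mongodb_cdc_event(raw))
print(cleaned["payload"]["after"]["description"])  # buy milk
```

Note that json.loads accepts bytes and bytearray input directly, so the payload does not need to be decoded to a str first.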


Hooray! Our objects now look like so:

JSON
{
    "schema" : ...,
    "payload" : {
        "before" : null,
        "after" : {
            "_id": { "$oid": "645fe9eaf4790c34c8fcc2ed" },
            "creation_timestamp": { "$date": 1684007402978 },
            "due_date": { "$date" : 1684266602978 },
            "description": "buy milk",
            "completed": false
        },
        ...
    }
}


Running the Transformer Service

If you followed the seven steps in the previous blog post, you only need to run three additional steps to start the transformer service and verify that it's working:

Step 8: Start the Transformer Service

Shell
$ docker compose up -d cdc-transformer
[+] Running 3/3
 ⠿ Container mongodb-debezium-cdc-example-memphis-metadata-1  Hea...                                                             0.5s
 ⠿ Container mongodb-debezium-cdc-example-memphis-1           Healthy                                                            1.0s
 ⠿ Container cdc-transformer                                  Started                                                            1.3s


Step 9: Start the Second Printing Consumer

Shell
$ docker compose up -d cleaned-printing-consumer
[+] Running 3/3
 ⠿ Container mongodb-debezium-cdc-example-memphis-metadata-1  Hea...                                                             0.5s
 ⠿ Container mongodb-debezium-cdc-example-memphis-1           Healthy                                                            1.0s
 ⠿ Container cleaned-printing-consumer                        Started                                                            1.3s


Step 10: Check the Memphis UI

When the transformer starts producing messages to Memphis.dev, a second station named “cleaned-todo-cdc-events” will be created. You should see this new station on the Station Overview page in the Memphis.dev UI like so:


The details page for the “cleaned-todo-cdc-events” station should show the transformer attached as a producer, the printing consumer, and the transformed messages:


Congratulations! We’re now ready to tackle validating messages using Schemaverse in our next blog post. Stay tuned!

In case you missed parts 1 and 2:

  • Part 1: Integrating Debezium Server and Memphis.dev for Streaming Change Data Capture (CDC) Events.
  • Part 2: Change Data Capture (CDC) for MongoDB with Debezium and Memphis.dev.

Published at DZone with permission of RJ Nowling. See the original article here.
