
Kafka Connect: Strategies To Handle Updates and Deletes

Get tips on managing updates and deletes using Kafka Connect, which allows you to set up a continuous flow of data from one data source to a target database.

By Gabriel Solano · Aug. 26, 21 · Tutorial

Kafka Connect is a great tool that allows you to easily set up a continuous flow of data from one data source to a target database. It's very simple to configure, and quite useful when you have legacy systems that serve business data that you want, for one reason or another, in a different place. My typical use case is moving data from Oracle tables to MongoDB collections consumed by microservices. This allows better scalability, since we don't have to heavily hit the source tables with production queries.

One of the things not easily explained when you open your Kafka Connect manual is how to handle operations that modify data that was already moved; in other words, updates and deletes. I thought this was a limitation of the typical JDBC/MongoDB pair of connectors we were using. At some point I explored the Debezium connectors, which promise to capture these types of events and replicate them in the target database. The POC wasn't successful for us with OracleDB: we had limited access to these databases, and the level of configuration required by these connectors made them far from a straightforward solution.

As we kept playing with connectors, we found there are ways to handle these scenarios. I will explain two strategies. The first one, which is the most desirable one, requires a specific design in our source database. The second one is an alternative solution if that design is not present and cannot be changed for any reason.

The Basic Example

Let's imagine that we have an old system that handles promotions. To simplify our example, let's say we have this basic table with three columns. We need to continuously move this data from the SQL database to a document-based one like MongoDB. 

Basic three-column table

Basic Concepts

First, we need a quick description of the two types of Kafka connectors that can be used: incremental and bulk. Strictly speaking, the JDBC source connector has four modes: bulk, timestamp, incrementing, and timestamp+incrementing. I'm grouping the last three as incremental because they share the same basic concept: you only want to move the new data detected at the source.

Bulk connectors always move the entire dataset, so which type fits depends on the use case of the data we're moving. Ideally, incremental connectors are the best solution, as it's easier to manage small chunks of new data in terms of resource usage and readiness of the data. The question here is: how does Kafka Connect, using plain SQL queries, know when new data has been inserted in the source?

The source connector config can use one of these two properties (or both): incrementing.column.name and timestamp.column.name. The incrementing property uses an incrementing column, like an auto-generated ID, to detect when a new row is inserted. The timestamp property uses a DateTime column to detect new changes. Kafka Connect keeps track of an offset and appends it to the SQL query used to grab our data from the source.

For example, if our table is called "promotions", the query-related properties of our source connector would look as follows:

JSON
 
"query": "SELECT TITLE, DISCOUNT, PRODUCT_CATEGORY FROM PROMOTIONS",
"timestamp.column.name": "LAST_UPDATE_DATE"

Internally, Kafka Connect modifies the query to something like this:

SQL
 
SELECT  * FROM ( SELECT TITLE, DISCOUNT, PRODUCT_CATEGORY FROM PROMOTIONS) 
WHERE LAST_UPDATE_DATE > {OFFSET_DATE}
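
For context, a complete source connector configuration might look roughly like the sketch below. The connector name, connection details, and topic prefix are placeholders made up for illustration; check the JDBC source connector documentation for the exact properties supported by your version.

JSON
 
{
  "name": "promotions-source",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
    "connection.url": "jdbc:oracle:thin:@//db-host:1521/SERVICE",
    "connection.user": "app_user",
    "connection.password": "app_password",
    "mode": "timestamp",
    "query": "SELECT TITLE, DISCOUNT, PRODUCT_CATEGORY FROM PROMOTIONS",
    "timestamp.column.name": "LAST_UPDATE_DATE",
    "topic.prefix": "promotions",
    "poll.interval.ms": "5000"
  }
}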

On the sink connector side, which is the connector that saves the data in the target database, we need to set up a strategy to make proper upserts based on an ID. You can read more about that in the documentation of the sink connector you're using. In the case of the MongoDB connector, the typical setting I use is:

JSON
 
"document.id.strategy": "com.mongodb.kafka.connect.sink.processor.id.strategy.ProvidedInValueStrategy",

This indicates that the _id for our documents will come from the source data. In that case, our source query should include an _id column:

JSON
 
"query": "SELECT  PROMO_ID as \"_id\", TITLE, DISCOUNT, PRODUCT_CATEGORY FROM PROMOTIONS"

At this point, we have the basic configuration to detect new insertions. Every time a new promotion is added with a new timestamp, the source connector will grab it and move it to the desired destination. But with this exact same configuration, we can also accomplish our larger goal of detecting updates and deletions. What we need is a proper design of our data source.

Modifying the Timestamp Column on Every Update

If we want to make sure our updates are handled and reflected in the target database, we need to make sure that every update made to the source table(s) also updates the timestamp column value. That can be accomplished by having the application that writes to the table pass the current timestamp as a parameter of the update operation, or by creating a trigger that listens to the update event. And since the sink connector handles upserts based on the ID, the update is also reflected in the target document.
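
As a rough sketch of the trigger approach, assuming an Oracle source and the hypothetical PROMOTIONS table from our example (trigger and column names are illustrative), it could look like this:

SQL
 
-- Keep the timestamp column current on every update so the source connector picks up the change
CREATE OR REPLACE TRIGGER promotions_touch_ts
BEFORE UPDATE ON promotions
FOR EACH ROW
BEGIN
  :NEW.last_update_date := SYSDATE;
END;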

Soft Deletions

To be able to handle deletions, we need the previous step along with what is considered good practice in database design: soft deletions. This is the practice of not physically deleting (hard deleting) a record whenever it needs to be removed, but simply marking it with a special flag that indicates the record is no longer valid/active. This has its own benefits in terms of recoverability and auditing. It means, of course, that our applications or stored procedures need to know about this design and filter out the inactive records when querying the data.

If it's hard to update the application that deletes the records to make soft deletions instead (in case the data source was not designed to consider that), we can also use a trigger to capture the hard deletion and do a soft one instead. 

What we need to do for our Kafka Connect purposes is to also change the timestamp column value whenever the record is flagged as inactive. In this example, we set the HOT SUMMER promotion to inactive by setting the ACTIVE column to 0. LAST_UPDATE_DATE is also updated with the most recent date, which makes the source connector grab the record.
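
For illustration, assuming the same hypothetical table and columns, the soft delete could be issued with a statement like this:

SQL
 
-- Flag the promotion as inactive and touch the timestamp so the source connector detects the change
UPDATE promotions
   SET active = 0,
       last_update_date = SYSDATE
 WHERE title = 'HOT SUMMER';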

Other sample table

When the data is moved, for example to MongoDB, consumers would also need to filter based on this ACTIVE field:

JavaScript
 
db.getCollection('promotions').find({active: 1})

Versioned Bulks

The last option can be used when we have to deal with an unchangeable design that doesn't allow modifying the source schema to add the timestamp column or the active flag. It is what I call versioned bulks. As I explained before, the bulk connector moves the entire dataset every time it's invoked. In most scenarios I have encountered, incremental updates are preferable, but in this case we can take advantage of the bulk option.

Since we need to keep track of what has been newly inserted, updated, or deleted, we can move the entire dataset every time, adding an extra column that identifies a snapshot of our data. We can use the timestamp of when the data was queried as that identifier. Since these timestamps are naturally ordered, if we want the latest state, we can easily filter by the last, or second-to-last (I will explain why this may be better), snapshot once the data is moved to the target place.

The query in Oracle would look like this:

JSON
 
"query": "SELECT PROMO_ID as \"_id\", TITLE, DISCOUNT, PRODUCT_CATEGORY, TO_CHAR(SYSDATE, 'yyyymmddhh24miss') AS SNAPSHOT FROM PROMOTIONS"

This approach requires a few configurations that are critical for good performance when using the final dataset. As you can imagine, indexes are a big deal here, most importantly on the new snapshot column. Another important consideration is the space consumed: depending on the number of records in every snapshot, we might need to delete old versions. We could use a scheduled task for that, or configure a TTL as we can do with MongoDB indexes.
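
For example, assuming the documents land in a promotions collection with a snapshot field as produced by the query above, the supporting index could be created like this. Note that MongoDB TTL indexes only expire documents based on date-typed fields, so automatically expiring old snapshots would require storing the snapshot as a BSON date (or cleaning up with a scheduled job instead).

JavaScript
 
// Index to make filtering by snapshot efficient
db.getCollection('promotions').createIndex({ snapshot: 1 })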

When consuming the data, we first need to grab the latest snapshot. I mentioned that the second-to-last might be better; the reason is that the latest one can still be in progress. In other words, data could be moving while you're querying it. If your queries against the target database are aggregations of any sort, you could get incomplete results. So with the latest snapshot, we have no certainty that it's in a ready-to-consume state; if we grab the second-to-last one, we know the snapshot is complete.
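
As a rough sketch of that idea in the MongoDB shell (field and collection names follow the earlier examples), a consumer could pick the second-to-last snapshot and query only that version:

JavaScript
 
// List the distinct snapshot values; the yyyymmddhh24miss format sorts correctly as text
var snapshots = db.getCollection('promotions').distinct('snapshot').sort();
// The second-to-last snapshot is the newest one guaranteed to be fully loaded
var stable = snapshots[snapshots.length - 2];
db.getCollection('promotions').find({ snapshot: stable });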

In the next example, two versions of the data are moved. Version 2021073012000 has three documents. The newer version 2021080112000 has two documents, and one document has an updated version of the discount. As you can see, every version is a snapshot in time of the data source.

Snapshot in time of the data source

This approach is a little tricky and should not be our first option, but it's an option I wanted to put out there in case it's useful for someone.


Opinions expressed by DZone contributors are their own.
