
Clean Up Your Outbox Tables With Programmatic TTL

The goal of this post is to show how you can programmatically remove records from an Outbox table that have been flushed to its sink (e.g., Kafka).

By Chris Casano · Sep. 01, 21 · Code Snippet

For those familiar with the Outbox Pattern, CockroachDB provides some unique capabilities for handling this type of architectural pattern. One common method is to use Changefeeds in CockroachDB to send an acknowledgment message back to the originating service that the database transaction was committed. Changefeeds work well in this scenario: they fire on any record mutation on the table (with the exception of IMPORT), connect to a message bus like Kafka, and emit the payload with relatively low latency (~100 ms). However, one consequence of this pattern is that historical records build up in the Outbox table. Fortunately, we have a rather nifty solution that can clean up these Outbox tables.

So the goal of this post is to show how you can programmatically remove records from an Outbox table that have already been flushed to its sink (e.g., Kafka). The idea is to create a clean-up job that deletes Outbox records whose MVCC timestamps fall below the high-water timestamp of the Changefeed, i.e., records the Changefeed has already emitted.
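
To make that concrete, the clean-up statement for a single outbox table ends up looking like the sketch below; the table name and timestamp value are illustrative, and Steps 1 through 3 show how to derive the real values from the catalog.

SQL
 
delete from outbox_t1
where crdb_internal_mvcc_timestamp < 1629424169484287000.0000000000; -- the Changefeed's high-water timestamp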

If you want to cut to the chase, proceed directly to Step 3 to find the TTL statement to execute. Steps 1 and 2 show how you can find the underlying details in the CockroachDB catalog.

Lastly, the steps below use S3 as the sink instead of Kafka, but they can easily be repurposed for Kafka by changing the sink target in the Changefeed, as sketched below.
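
For reference, this is roughly what the outbox_t1 Changefeed from Step 0 would look like pointed at a Kafka sink; the broker address is a placeholder, not part of the original setup.

SQL
 
CREATE CHANGEFEED FOR TABLE outbox_t1 INTO 'kafka://localhost:9092' WITH updated, resolved = '1m';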

High-Level Steps

  • Get the list of outbox tables that need TTL
  • Find the active Changefeed for those tables
  • For each table, delete rows where the crdb_internal_mvcc_timestamp of the row is less than the high-water timestamp of the Changefeed

You can run this test using CockroachCloud or CockroachDB. If using CockroachDB, I'd recommend trying it with cockroach demo:

Shell
 
cockroach demo --nodes 3 --empty


Step 0 - Create a Schema To Test With

For this schema, we'll create one decoy table and three outbox tables. We'll insert some initial records, create Changefeeds (one on the decoy table, one on outbox_t1, and one covering both outbox_t2 and outbox_t3), and then add more data after the Changefeeds are created. We want to show that the SQL TTL script only picks up tables that are prefixed with 'outbox' and have a running Changefeed. Notice how one Changefeed includes two tables, outbox_t2 and outbox_t3; we'll make sure to pick up both of them.

SQL
 
-- Changefeeds require rangefeeds to be enabled
set cluster setting kv.rangefeed.enabled = true;

-- one decoy table that should not be picked up by the TTL script
create table test_t0 (i int primary key);

-- three outbox tables
create table outbox_t1 (i int primary key);
create table outbox_t2 (i int primary key);
create table outbox_t3 (i int primary key);

-- seed each table before any Changefeed exists
insert into test_t0 values (unique_rowid());
insert into outbox_t1 values (unique_rowid());
insert into outbox_t2 values (unique_rowid());
insert into outbox_t3 values (unique_rowid());

-- one Changefeed on the decoy table, one on outbox_t1, and one covering outbox_t2 and outbox_t3
CREATE CHANGEFEED FOR TABLE test_t0 INTO 'experimental-s3://chrisc-test/changefeed/ttl/test?AUTH=implicit' WITH updated, resolved = '1m';

CREATE CHANGEFEED FOR TABLE outbox_t1 INTO 'experimental-s3://chrisc-test/changefeed/ttl/outbox1?AUTH=implicit' WITH updated, resolved = '1m';

CREATE CHANGEFEED FOR TABLE outbox_t2, outbox_t3 INTO 'experimental-s3://chrisc-test/changefeed/ttl/outbox2?AUTH=implicit' WITH updated, resolved = '1m';

-- add more rows after the Changefeeds are running
insert into test_t0 values (unique_rowid());
insert into outbox_t1 values (unique_rowid());
insert into outbox_t2 values (unique_rowid());
insert into outbox_t3 values (unique_rowid());


Let's also verify the Changefeeds are sending data to our sink:

Shell
 
aws s3 ls s3://chrisc-test/changefeed/ttl/ --recursive | grep ndjson


The output should look like this:

Shell
 
2021-08-19 21:46:33        222 changefeed/ttl/outbox1/2021-08-20/202108200145322858970000000000000-fad43b3554ca0a0b-1-5-00000000-outbox_t1-1.ndjson
2021-08-19 22:05:35        180 changefeed/ttl/outbox1/2021-08-20/202108200204308217750000000000001-fad43b3554ca0a0b-1-5-00000001-outbox_t1-1.ndjson
2021-08-19 21:46:33        222 changefeed/ttl/outbox2/2021-08-20/202108200145325129600000000000000-212aeaca8652f3bc-1-8-00000000-outbox_t2-1.ndjson
2021-08-19 21:46:33        222 changefeed/ttl/outbox2/2021-08-20/202108200145325129600000000000000-212aeaca8652f3bc-1-8-00000001-outbox_t3-1.ndjson
2021-08-19 22:05:35        180 changefeed/ttl/outbox2/2021-08-20/202108200204310221340000000000001-212aeaca8652f3bc-1-8-00000002-outbox_t2-1.ndjson
2021-08-19 22:05:35        180 changefeed/ttl/outbox2/2021-08-20/202108200204310221340000000000001-212aeaca8652f3bc-1-8-00000003-outbox_t3-1.ndjson
2021-08-19 21:46:33        222 changefeed/ttl/test/2021-08-20/202108200145319247430000000000000-64e78d6565e58782-1-2-00000000-test_t0-1.ndjson


Step 1 - Get the List of Outbox Tables That Need TTL

This is a simple SELECT against CockroachDB's internal catalog that finds tables whose names start with 'outbox'.

SQL
 
select table_catalog, table_name
from information_schema.tables
where table_name like 'outbox%';


And the output should be:

Shell
 
table_catalog | table_name
----------------+-------------
defaultdb     | outbox_t1
defaultdb     | outbox_t2
defaultdb     | outbox_t3
(3 rows)


Pretty simple.

Step 2 - Find the Active Changefeeds for the Outbox Tables

Again, let's query CockroachDB's internal catalog, this time to see which of the tables prefixed with 'outbox' also have a running Changefeed.

SQL
 
select
  j.job_id,
  n."parentID",
  n2.name as "database",
  j.id,
  n.name as "table",
  j.high_water_timestamp
from system.namespace n
inner join
(
  select job_id, unnest(descriptor_ids) as id, high_water_timestamp
  from crdb_internal.jobs
  where "job_type" = 'CHANGEFEED'
    and "status" = 'running'
) j
on j.id = n.id
inner join
system.namespace n2
on n."parentID" = n2.id
where n."parentID" != 0
  and n.name like 'outbox%'
;


The output should be something like this:

Shell
 
job_id             | parentID | database  | id |   table   |      high_water_timestamp
---------------------+----------+-----------+----+-----------+---------------------------------
686009654600433665 |       50 | defaultdb | 53 | outbox_t1 | 1629424169484287000.0000000000
686009655343546369 |       50 | defaultdb | 55 | outbox_t3 | 1629424169684500000.0000000000
686009655343546369 |       50 | defaultdb | 54 | outbox_t2 | 1629424169684500000.0000000000
(3 rows)

Time: 10ms total (execution 10ms / network 0ms)


Step 3 - Create Those TTL-ish Delete Statements

And this is really the only statement you need to run. It generates a DELETE statement for each outbox table, targeting rows whose MVCC timestamp is older than the Changefeed's high-water timestamp. The high-water timestamp is the Changefeed's checkpoint: it indicates that all records up to that timestamp have been sent to the sink. This is one of my favorite queries...

SQL
 
select
  'delete from ' || n2.name || '.' || n.name
    || ' where crdb_internal_mvcc_timestamp < ' || j.high_water_timestamp::STRING || ';' as "SQL"
from system.namespace n
inner join
(
  select job_id, unnest(descriptor_ids) as id, high_water_timestamp
  from crdb_internal.jobs
  where "job_type" = 'CHANGEFEED'
    and "status" = 'running'
) j
on j.id = n.id
inner join
system.namespace n2
on n."parentID" = n2.id
where n."parentID" != 0
  and n.name like 'outbox%'
;


The output contains the delete statements for you to run:

Shell
 
SQL
-------------------------------------------------------------------------------------------------------
delete from defaultdb.outbox_t1 where crdb_internal_mvcc_timestamp < 1629424830259619000.0000000000
delete from defaultdb.outbox_t3 where crdb_internal_mvcc_timestamp < 1629424830459942000.0000000000
delete from defaultdb.outbox_t2 where crdb_internal_mvcc_timestamp < 1629424830459942000.0000000000
(3 rows)


Step 4 - Run the Delete Statements

This is the last step, where we remove the records that have already been emitted to our sink.

Shell
 
> delete from defaultdb.outbox_t1 where crdb_internal_mvcc_timestamp < 1629425070821775000.0000000000;
delete from defaultdb.outbox_t3 where crdb_internal_mvcc_timestamp < 1629425071022134000.0000000000;
delete from defaultdb.outbox_t2 where crdb_internal_mvcc_timestamp < 1629425071022134000.0000000000;
DELETE 2

Time: 4ms total (execution 4ms / network 0ms)

DELETE 2

Time: 2ms total (execution 2ms / network 0ms)

DELETE 2

Time: 3ms total (execution 3ms / network 0ms)
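
As a quick sanity check (not part of the original walkthrough), you can count what is left in each outbox table; only rows written after the Changefeed's high-water timestamp should remain.

SQL
 
select 'outbox_t1' as tbl, count(*) from outbox_t1
union all
select 'outbox_t2', count(*) from outbox_t2
union all
select 'outbox_t3', count(*) from outbox_t3;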


Step 5 - Clean Up

Lastly, let's clean up our files that we placed in S3.

Shell
 
aws s3 rm s3://chrisc-test/changefeed/ttl/ --recursive

Conclusion

You can easily create a program that runs the query from Step 3 and then executes the generated DELETE statements. Below is a quick-and-dirty example of how to do this in Python.

Python
 
import psycopg2

conn = psycopg2.connect(database="defaultdb", user="demo", password="demo61304", host="localhost", port=26257)
conn.set_session(autocommit=True)
cur = conn.cursor()

# The Step 3 query: generate one DELETE statement per outbox table with a running Changefeed
sql = """ select 'delete from ' || n2.name || '.' || n.name || ' where crdb_internal_mvcc_timestamp < ' || j.high_water_timestamp::STRING || ';' as "SQL" from system.namespace n inner join ( select job_id, unnest(descriptor_ids) as id, high_water_timestamp from crdb_internal.jobs where "job_type" = 'CHANGEFEED' and "status" = 'running' ) j on j.id = n.id inner join system.namespace n2 on n."parentID" = n2.id where n."parentID" != 0 and n.name like 'outbox%'; """
cur.execute(sql)

# Each row is a one-element tuple holding a generated DELETE statement; execute each one
for (delete_stmt,) in cur.fetchall():
    cur.execute(delete_stmt)
    print(cur.statusmessage)

cur.close()
conn.close()


I hope you enjoyed this creative way of removing historical records from your Outbox tables!




Published at DZone with permission of Chris Casano.
