DZone
Thanks for visiting DZone today,
Edit Profile
  • Manage Email Subscriptions
  • How to Post to DZone
  • Article Submission Guidelines
Sign Out View Profile
  • Post an Article
  • Manage My Drafts
Over 2 million developers have joined DZone.
Log In / Join
Refcards Trend Reports
Events Video Library
Refcards
Trend Reports

Events

View Events Video Library

Related

  • DNS Propagation Doesn't Have to Take 24 Hours
  • Mastering System Design: A Comprehensive Guide to System Scaling for Millions, Part 2
  • Scalable JWT Token Revocation in Spring Boot
  • Building a High-Throughput Distributed Sequence Generator Using the Hi-Lo Algorithm

Trending

  • Jakarta EE 12: Entering the Data Age of Enterprise Java
  • Persistent Memory for AI Agents Using LangChain's Deep Agents
  • Liquid Glass, Material 3, and a Lot of Plumbing
  • Good Data, Bad Metric: A Mutation Testing Pattern for Analytics Engineering
  1. DZone
  2. Data Engineering
  3. Databases
  4. Clean Up Your Outbox Tables With Programmatic TTL

Clean Up Your Outbox Tables With Programmatic TTL

The goal of this post is to show how you can programmatically remove records from an Outbox table that have been flushed to its sink (e.g., Kafka).

By 
Chris Casano user avatar
Chris Casano
·
Sep. 01, 21 · Code Snippet
Likes (2)
Comment
Save
Tweet
Share
4.9K Views

Join the DZone community and get the full member experience.

Join For Free

For those familiar with the Outbox Pattern, CockroachDB provides some unique capabilities for handling these types of architectural patterns. One common method is to use Changefeeds in CockroachDB to send an acknowledgment message back to the originating service that the database transaction was committed. Changefeeds are great in this scenario in that they can be emitted on a record mutation on the table (except Import), connect to a message bus like Kafka, and emit the payload in a mildly low latent (~100ms) fashion. However, one circumstance of this pattern is having historical records build up in the Outbox table. Fortunately, we have a rather nifty solution that can clean up these Outbox tables.

So the goal in this post is to show how you can programmatically remove records from an Outbox table that have been flushed to its sink (i.e Kafka). The idea here is to create a clean-up job that removes records where the MVCC timestamp of an Outbox record is adequately past the high watermark of a Changefeed.

If you want to cut the chase, proceed directly to Step 3 to find the TTL statement to execute. Steps 1 and 2 show how you can find the details in the CockroachDB catalog.

Lastly, the steps below use S3 as a sink instead of Kafka but it can easily be repurposed to use Kafka by changing the sink target in the Changefeed.

High-Level Steps

  • Get the list of outbox tables that need TTL
  • Find the active Changefeed for those tables
  • For each table, delete rows where the mvcc_internal timestamp of the row is < the high watermark of the Changefeed

You can run this test using CockroachCloud or CockroachDB. If using CockroachDB, I would recommend trying this using cockroach demo.

Shell
 
cockroach demo --nodes 3 --empty


Step 0 - Create a Schema To Test With

For this schema, we'll create 1 decoy table and 3 outbox tables. We'll also insert some initial records, create a Changefeed on only 2 outbox tables and then add more data after the Changefeeds are created. We want to show how the SQL TTL script only picks up Outbox tables with Changefeeds. Notice how one Changefeed includes two tables: outbox_t2 and outbox_t3. We'll make sure to pick up both of the tables.

SQL
 
set cluster setting  kv.rangefeed.enabled = true;

create table test_t0 (i int primary key);

create table outbox_t1 (i int primary key);
create table outbox_t2 (i int primary key);
create table outbox_t3 (i int primary key);

insert into test_t0 values (unique_rowid());
insert into outbox_t1 values (unique_rowid());
insert into outbox_t2 values (unique_rowid());
insert into outbox_t3 values (unique_rowid());

CREATE CHANGEFEED FOR TABLE test_t0 INTO 'experimental-s3://chrisc-test/changefeed/ttl/test?AUTH=implicit' WITH updated, resolved = '1m';

CREATE CHANGEFEED FOR TABLE outbox_t1 INTO 'experimental-s3://chrisc-test/changefeed/ttl/outbox1?AUTH=implicit' WITH updated, resolved = '1m';

CREATE CHANGEFEED FOR TABLE outbox_t2, outbox_t3 INTO 'experimental-s3://chrisc-test/changefeed/ttl/outbox2?AUTH=implicit' WITH updated, resolved = '1m';

insert into test_t0 values (unique_rowid());
insert into outbox_t1 values (unique_rowid());
insert into outbox_t2 values (unique_rowid());
insert into outbox_t3 values (unique_rowid());


Let's also verify the Changefeeds are sending data to our sink:

SQL
 
aws s3 ls s3://chrisc-test/changefeed/ttl/ --recursive | grep ndjson


The output should look like this:

Shell
 
2021-08-19 21:46:33        222 changefeed/ttl/outbox1/2021-08-20/202108200145322858970000000000000-fad43b3554ca0a0b-1-5-00000000-outbox_t1-1.ndjson
2021-08-19 22:05:35        180 changefeed/ttl/outbox1/2021-08-20/202108200204308217750000000000001-fad43b3554ca0a0b-1-5-00000001-outbox_t1-1.ndjson
2021-08-19 21:46:33        222 changefeed/ttl/outbox2/2021-08-20/202108200145325129600000000000000-212aeaca8652f3bc-1-8-00000000-outbox_t2-1.ndjson
2021-08-19 21:46:33        222 changefeed/ttl/outbox2/2021-08-20/202108200145325129600000000000000-212aeaca8652f3bc-1-8-00000001-outbox_t3-1.ndjson
2021-08-19 22:05:35        180 changefeed/ttl/outbox2/2021-08-20/202108200204310221340000000000001-212aeaca8652f3bc-1-8-00000002-outbox_t2-1.ndjson
2021-08-19 22:05:35        180 changefeed/ttl/outbox2/2021-08-20/202108200204310221340000000000001-212aeaca8652f3bc-1-8-00000003-outbox_t3-1.ndjson
2021-08-19 21:46:33        222 changefeed/ttl/test/2021-08-20/202108200145319247430000000000000-64e78d6565e58782-1-2-00000000-test_t0-1.ndjson


Step 1 - Get the List of Outbox Tables That Need TTL

This is a simple selection that queries the internal catalog of CockroachDB to find tables that have an 'outbox' prefix.

SQL
 
select table_catalog, table_name
from information_schema.tables
where table_name like 'outbox%';


And the output should be:

Shell
 
table_catalog | table_name
----------------+-------------
defaultdb     | outbox_t1
defaultdb     | outbox_t2
defaultdb     | outbox_t3
(3 rows)


Pretty simple.

Step 2 - Find the Active Changefeeds for the Outbox Tables

Again, let's query CockroachDB's internal catalog to see which tables that are prefixed with 'outbox' also have a running Changefeed.

SQL
 
select
j.job_id,
n."parentID",
n2.name as "database",
j.id,
n.name as "table",
j.high_water_timestamp
from system.namespace n
inner join
(
  select job_id, unnest(descriptor_ids) as id, high_water_timestamp
  from crdb_internal.jobs
  where "job_type" = 'CHANGEFEED'
    and "status" = 'running'
) j
on j.id = n.id
inner join
system.namespace n2
on n."parentID" = n2.id
where n."parentID" != 0
  and n.name like 'outbox%'
;


The output should be something like this:

Shell
 
job_id             | parentID | database  | id |   table   |      high_water_timestamp
---------------------+----------+-----------+----+-----------+---------------------------------
686009654600433665 |       50 | defaultdb | 53 | outbox_t1 | 1629424169484287000.0000000000
686009655343546369 |       50 | defaultdb | 55 | outbox_t3 | 1629424169684500000.0000000000
686009655343546369 |       50 | defaultdb | 54 | outbox_t2 | 1629424169684500000.0000000000
(3 rows)

Time: 10ms total (execution 10ms / network 0ms)


Step 3 - Create Those TTL-ish Delete Statements

And this is really the only statement you need to run. This will create the delete statements of records to delete in the outbox table. The records that will be deleted will compare the high water timestamp of the change feed to the mvcc timestamp of the record. The high water timestamp of the Changefeed is the checkpoint that indicates records that have been sent to their sink up to a particular timestamp. This is one of my favorite queries...

SQL
 
select
'delete from ' || n2.name || '.' || n.name || ' where crdb_internal_mvcc_timestamp < ' || j.high_water_timestamp::STRING || ';' as "SQL"
from system.namespace n
inner join
(
  select job_id, unnest(descriptor_ids) as id, high_water_timestamp
  from crdb_internal.jobs
  where "job_type" = 'CHANGEFEED'
    and "status" = 'running'
) j
on j.id = n.id
inner join
system.namespace n2
on n."parentID" = n2.id
where n."parentID" != 0
  and n.name like 'outbox%'
;


The output here will create the delete statements for you to run:

Shell
 
SQL
-------------------------------------------------------------------------------------------------------
delete from defaultdb.outbox_t1 where crdb_internal_mvcc_timestamp < 1629424830259619000.0000000000
delete from defaultdb.outbox_t3 where crdb_internal_mvcc_timestamp < 1629424830459942000.0000000000
delete from defaultdb.outbox_t2 where crdb_internal_mvcc_timestamp < 1629424830459942000.0000000000
(3 rows)


Step 4 - Run the Delete Statements

This is the last step where we remove the records that have already been emitted to our sink.

Shell
 
> delete from defaultdb.outbox_t1 where crdb_internal_mvcc_timestamp < 1629425070821775000.0000000000;
delete from defaultdb.outbox_t3 where crdb_internal_mvcc_timestamp < 1629425071022134000.0000000000;
delete from defaultdb.outbox_t2 where crdb_internal_mvcc_timestamp < 1629425071022134000.0000000000;
DELETE 2

Time: 4ms total (execution 4ms / network 0ms)

DELETE 2

Time: 2ms total (execution 2ms / network 0ms)

DELETE 2

Time: 3ms total (execution 3ms / network 0ms)


Step 5 - Clean Up

Lastly, let's clean up our files that we placed in S3.

aws s3 rm s3://chrisc-test/changefeed/ttl/ --recursive

Conclusion

You can easily create a program to use the query in Step 3, execute it, then take the SQL output and execute the generated statements. Below is a quick and dirty example of how you can do this in Python.

Python
 
import psycopg2
conn = psycopg2.connect(database="defaultdb", user="demo", password="demo61304" ,host="localhost", port=26257)
conn.set_session(autocommit=True)
cur = conn.cursor()
sql=""" select 'delete from ' || n2.name || '.' || n.name || ' where crdb_internal_mvcc_timestamp < ' || j.high_water_timestamp::STRING || ';' as "SQL" from system.namespace n inner join ( select job_id, unnest(descriptor_ids) as id, high_water_timestamp from crdb_internal.jobs where "job_type" = 'CHANGEFEED' and "status" = 'running' ) j on j.id = n.id inner join system.namespace n2 on n."parentID" = n2.id where n."parentID" != 0 and n.name like 'outbox%'; """
cur.execute(sql)
ttl=cur.fetchall()
ttlsql='; '.join(map(str,ttl)).replace("'","").replace("(","").replace(";,)","")
cur.execute(ttlsql)
cur.statusmessage
cur.close()
conn.close()


I hope you enjoyed this creative way of removing historical records from your Outbox tables!



Database Time to live

Published at DZone with permission of Chris Casano. See the original article here.

Opinions expressed by DZone contributors are their own.

Related

  • DNS Propagation Doesn't Have to Take 24 Hours
  • Mastering System Design: A Comprehensive Guide to System Scaling for Millions, Part 2
  • Scalable JWT Token Revocation in Spring Boot
  • Building a High-Throughput Distributed Sequence Generator Using the Hi-Lo Algorithm

Partner Resources

×

Comments

The likes didn't load as expected. Please refresh the page and try again.

  • RSS
  • X
  • Facebook

ABOUT US

  • About DZone
  • Support and feedback
  • Community research

ADVERTISE

  • Advertise with DZone

CONTRIBUTE ON DZONE

  • Article Submission Guidelines
  • Become a Contributor
  • Core Program
  • Visit the Writers' Zone

LEGAL

  • Terms of Service
  • Privacy Policy

CONTACT US

  • 3343 Perimeter Hill Drive
  • Suite 215
  • Nashville, TN 37211
  • [email protected]

Let's be friends:

  • RSS
  • X
  • Facebook