
Emitting Protocol Buffers Using CockroachDB CDC Queries

Follow this demonstration of several recent features that make it possible to serialize CockroachDB rows to proto and emit them via CDC Queries.

By Artem Ervits · Sep. 01, 23 · Tutorial

Previous Articles on CockroachDB CDC

  • Using CockroachDB CDC with Apache Pulsar
  • Using CockroachDB CDC with Azure Event Hubs
  • Using CockroachDB CDC with Confluent Cloud Kafka and Schema Registry
  • SaaS Galore: Integrating CockroachDB With Confluent Kafka, Fivetran, and Snowflake
  • CockroachDB CDC Using Minio as Cloud Storage Sink - Part 3
  • CockroachDB CDC Using Hadoop Ozone S3 Gateway as Cloud Storage Sink

Motivation

Protocol Buffers is a language-neutral, platform-neutral, extensible mechanism for serializing structured data, and a common choice for platforms that need to pass messages between systems. CockroachDB is a distributed SQL database built on a transactional and strongly-consistent key-value store. It scales horizontally; survives disk, machine, rack, and even datacenter failures with minimal latency disruption and no manual intervention; supports strongly consistent ACID transactions; and provides a familiar SQL API for structuring, manipulating, and querying data.

There is no official support for Protocol Buffers in CockroachDB changefeeds, even though we use Protocol Buffers extensively in the codebase. A recent customer conversation led to this experiment, in which I use several recent features to demonstrate serializing CockroachDB rows to proto and emitting them via CDC Queries. This is also our first look at CDC Queries, a new, flexible way to express CockroachDB changefeed streams.

This tutorial assumes you have an enterprise license. Because the capability demonstrated here is not part of any Cockroach Labs offering, you have to follow the steps exactly as described and build from the right source code to make it work.

High-Level Steps

  • Build CockroachDB with the Protocol Buffers function
  • Deploy a CockroachDB cluster with enterprise changefeeds
  • Deploy a Kafka Consumer
  • Verify
  • Conclusion

Step-by-Step Instructions

Build CockroachDB With the Protocol Buffers Function

Before I show you how to get this working, I'd like to express my gratitude to Yevgeniy Miretskiy, who works on the CDC team, for building this capability and for his mentorship in getting it working. The source code for the feature is available in the following commit. For brevity, I will skip the steps to set up a build environment.

Check out the pull request:

gh pr checkout 89955


Run the preliminary steps to build Cockroach from the source.

./dev doctor


Finally, build the code.

bazel build pkg/cmd/cockroach-short


Navigate to the directory with the built package.

cd _bazel/bin/pkg/cmd/cockroach


Deploy a CockroachDB Cluster With Enterprise Changefeeds

Start an instance of CockroachDB using the built package:

./cockroach start-single-node --insecure --background


To enable CDC, we need to execute the following commands:

SET CLUSTER SETTING cluster.organization = '<organization name>';

SET CLUSTER SETTING enterprise.license = '<secret>';

SET CLUSTER SETTING kv.rangefeed.enabled = true;


Generate sample data:

CREATE TABLE office_dogs (
     id INT PRIMARY KEY,
     name STRING);

INSERT INTO office_dogs VALUES
   (1, 'Petee'),
   (2, 'Carl');

UPDATE office_dogs SET name = 'Petee H' WHERE id = 1;
SELECT * FROM office_dogs;
  id |  name
-----+----------
   1 | Petee H
   2 | Carl


The function we are going to use to convert rows to Protocol Buffers is crdb_internal.row_to_proto(). With the given pull request, this function is readily available for querying.

SELECT crdb_internal.row_to_proto(office_dogs) FROM office_dogs;
                          crdb_internal.row_to_proto
------------------------------------------------------------------------------
  \x0a110a046e616d6512091a07506574656520480a0f0a026964120911000000000000f03f
  \x0a0e0a046e616d6512061a044361726c0a0f0a0269641209110000000000000040


It takes the row and serializes it as proto. We can decode a row back to human-readable form using the following query: skip the \x prefix and copy the rest of the output into the decode function:

SELECT crdb_internal.pb_to_json('google.protobuf.Struct',
decode('0a110a046e616d6512091a07506574656520480a0f0a026964120911000000000000f03f', 'hex')) AS proto;
             proto
--------------------------------
  {"id": 1, "name": "Petee H"}
SELECT crdb_internal.pb_to_json('google.protobuf.Struct',
decode('0a0e0a046e616d6512061a044361726c0a0f0a0269641209110000000000000040', 'hex')) AS proto;
            proto
-----------------------------
  {"id": 2, "name": "Carl"}


We can use this function in the CDC query, but first, let's set up a webhook sink for a quick demonstration of changefeed queries.

git clone https://github.com/cockroachlabs/cdc-webhook-sink-test-server.git
cd cdc-webhook-sink-test-server
cd go-https-server
chmod +x server.sh
./server.sh
......+.........+.....+....+...+.....+...+.+.....+++++++++++++++++++++++++++++++++++++++++++++*.+...........+.+......+.....+...+....+...+......+...........+...+......+++++++++++++++++++++++++++++++++++++++++++++*......+.....+.............+...+..+.+.....................+......+..+.+++++
-----
2023/08/30 09:39:05 starting server on port 3000


With all of the basics in place, we can create a changefeed.

CREATE CHANGEFEED INTO 'webhook-https://localhost:3000?insecure_tls_skip_verify=true' WITH updated AS SELECT crdb_internal.row_to_proto(office_dogs) AS proto FROM office_dogs;
        job_id
----------------------
  895654238351589377


Verify

Looking at the terminal where the webhook sink is running:

2023/08/30 11:11:25 {"payload":[{"__crdb__": {"updated": "1693408285757033000.0000000000"}, "proto": "\\x0a0e0a046e616d6512061a044361726c0a0f0a0269641209110000000000000040"}],"length":1}
2023/08/30 11:11:25 {"payload":[{"__crdb__": {"updated": "1693408285757033000.0000000000"}, "proto": "\\x0a110a046e616d6512091a07506574656520480a0f0a026964120911000000000000f03f"}],"length":1}


Let's update a record in the office_dogs table:

UPDATE office_dogs SET name = 'Tarzan' WHERE id = 1; 
2023/08/30 11:12:58 {"payload":[{"__crdb__": {"updated": "1693408377084928000.0000000000"}, "proto": "\\x0a100a046e616d6512081a065461727a616e0a0f0a026964120911000000000000f03f"}],"length":1}


If we use the decode function to inspect the payload:

SELECT crdb_internal.pb_to_json('google.protobuf.Struct', decode('0a100a046e616d6512081a065461727a616e0a0f0a026964120911000000000000f03f', 'hex')) AS proto;
             proto
-------------------------------
  {"id": 1, "name": "Tarzan"}


I should mention that the emitted messages are dynamically typed (google.protobuf.Struct) rather than strongly typed. If your use case requires strongly typed messages, that's a conversation for another time.
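
For comparison, a strongly typed approach would mean defining a schema, compiling it with protoc, and emitting that message type instead of Struct. A hypothetical schema for this table might look like the following; this is purely an illustration, as the changefeed above does not emit it:

```protobuf
// Hypothetical strongly typed schema for the office_dogs table.
// The changefeed in this article emits google.protobuf.Struct instead,
// which carries field names and values but no schema.
syntax = "proto3";

message OfficeDog {
  int64 id = 1;
  string name = 2;
}
```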

Conclusion

This is how you can leverage CockroachDB CDC Queries with built-in functions. This function is not currently available in the product, but it could be, given sufficient demand. Hopefully, you've found this article useful. Please reach out to our team if you need this capability, and we will consider it in the future.


Published at DZone with permission of Artem Ervits. See the original article here.

Opinions expressed by DZone contributors are their own.
