Tour of Snowflake Ingestion Using CockroachDB and Redpanda Connect

This is an exploratory article delving into various ways to ingest real-time data into Snowflake and the levels of difficulty and effort required.

By Artem Ervits · Aug. 15, 24 · Tutorial

Previous Articles on Snowflake

  • Integrating Snowflake with Trino

Previous Articles on CockroachDB CDC

  • Using CockroachDB CDC with Apache Pulsar
  • Using CockroachDB CDC with Azure Event Hubs
  • SaaS Galore: Integrating CockroachDB with Confluent Kafka, FiveTran, and Snowflake
  • Using CockroachDB CDC with Confluent Cloud Kafka and Schema Registry
  • CockroachDB CDC using Minio as cloud storage sink
  • CockroachDB CDC using Hadoop Ozone S3 Gateway as cloud storage sink

Motivation

I work with financial services clients, and it's common to encounter a need for streaming changes in an operational data store into a data warehouse or a data lake. A former colleague recently reached out for advice on the fastest and most efficient way to load trade data into Snowflake. I've come up with at least three methods, which I will explore in a follow-up series of articles. However, I've decided to first explore Redpanda Connect, a solution that has recently caught my attention. This is by no means a conclusive guide on how changefeed data must be loaded into Snowflake; here we're merely exploring the possibilities, and we'll discuss the pros and cons in later articles.

CockroachDB changefeeds are an enterprise feature and require a license. In this tutorial, I'm using a free-to-start version of CockroachDB Serverless, which has enterprise changefeeds enabled.

High-Level Steps

  • Deploy a CockroachDB cluster with enterprise changefeeds
  • Deploy Redpanda Connect
  • Deploy Snowflake
  • Verify
  • Conclusion

Step-By-Step Instructions

Deploy a CockroachDB Cluster With Enterprise Changefeeds

Start an instance of CockroachDB or use the managed service.

To enable CDC, we need to execute the following commands:

SET CLUSTER SETTING cluster.organization = '<organization name>';

SET CLUSTER SETTING enterprise.license = '<secret>';

SET CLUSTER SETTING kv.rangefeed.enabled = true;


I am using CockroachDB Serverless, so the above steps are not necessary. You can confirm whether changefeeds are enabled with the following command:

SHOW CLUSTER SETTING kv.rangefeed.enabled;


If the value is false, change it to true.

Generate sample data:

CREATE TABLE office_dogs (
     id INT PRIMARY KEY,
     name STRING);

INSERT INTO office_dogs VALUES
   (1, 'Petee'),
   (2, 'Carl');

UPDATE office_dogs SET name = 'Petee H' WHERE id = 1;


We've populated the table and then updated a record. Let's add more data to make it interesting:

INSERT INTO office_dogs SELECT generate_series(3, 10000), md5(random()::string);

SELECT * FROM office_dogs LIMIT 5;
id,name
1,Petee H
2,Carl
3,6e19280ae649efffa7a58584c7f46032
4,5e4e897f008bb752c8edfa64a3aed356
5,abc0d898318d27f23a43060f89d62e34

SELECT COUNT(*) FROM office_dogs;
count
10000


Deploy Redpanda Connect

I'm running Redpanda Connect in a Docker Compose file.

docker compose -f compose-redpanda.yaml up -d


The contents of the file are:

services:

  redpanda:
    container_name: redpanda-connect
    hostname: redpanda-connect
    image: docker.redpanda.com/redpandadata/connect
    volumes:
      - ./redpanda/connect.yaml:/connect.yaml
      - /Users/aervits/.ssh/rsa_key.pem:/rsa_key.pem


I will be using the connect.yaml file as the foundation to connect all the components in this article. For more detailed information, you can refer to the documentation provided by Redpanda.

The most basic configuration looks like so:

input:
  stdin: {}

pipeline:
  processors: []

output:
  stdout: {}


Since I'm using the CockroachDB input, mine looks like this:

input:
  # CockroachDB Input
  label: ""
  cockroachdb_changefeed:
    dsn: postgresql://<user>:<password>@<cockroachdb-cluster>:<port>/<database>?sslmode=verify-full
    tls:
      skip_cert_verify: true
      #enable_renegotiation: false
      #root_cas: ""
      #root_cas_file: ""
      client_certs: []
    tables: [table_for_cdc] # No default (required)
    cursor_cache: "" # No default (optional)
    auto_replay_nacks: true

pipeline:
  processors: []

output:
  stdout: {}


Leave the pipeline and output as default.

For reference, I'm including the repo with my source code, where you can look up the values.

If you have been following along, you may have noticed that I haven't started a changefeed job in CockroachDB. The cockroachdb_changefeed input subscribes directly to the table, which you can observe by examining the logs with docker logs redpanda-connect --follow. As the connect.yaml file specifies, the output is sent to stdout:

{"primary_key":"[9998]","row":"{\"after\": {\"id\": 9998, \"name\": \"0794a9d1c99e8e47ee4515be6e0d736f\"}}","table":"office_dogs"}
{"primary_key":"[9999]","row":"{\"after\": {\"id\": 9999, \"name\": \"c85a6b38154f7e3085d467d567141d45\"}}","table":"office_dogs"}
{"primary_key":"[10000]","row":"{\"after\": {\"id\": 10000, \"name\": \"aae9e0849fff8f47e0371a4c06fb255b\"}}","table":"office_dogs"}


The next step is to configure Snowflake. We are not going to look at the available processors today.

Deploy Snowflake

I'm using a Snowflake trial account. You get a generous credit, which should be sufficient to complete this tutorial.

We need to create a database and a table where we will output the changefeed data.

CREATE OR REPLACE DATABASE FROM_COCKROACH;
CREATE OR REPLACE TABLE OFFICE_DOGS (RECORD variant);


We also need to create a user with key-pair authentication, as we're going to use the Snowpipe service. First, generate an encrypted private key:

openssl genrsa 2048 | openssl pkcs8 -topk8 -v2 des3 -inform PEM -out rsa_key.p8


We must use an encrypted key, as Redpanda Connect doesn't support unencrypted keys.

Generate a public key:

openssl rsa -in rsa_key.p8 -pubout -out rsa_key.pub 


Lastly, generate a PEM file from the private key:

openssl pkcs8 -in rsa_key.p8 -out rsa_key.pem


In Snowflake, alter the user to use the key pair generated in the previous step.

ALTER USER username SET rsa_public_key='MIIB...';
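
To confirm the key is registered, you can optionally inspect the user's properties; the RSA_PUBLIC_KEY_FP property should show a non-null fingerprint:

DESC USER username;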


We can now populate the connect.yaml file with the required information for the snowflake_put output. This output type is for commercial use and requires a license, but since we're using it for demo purposes, we can proceed.

output:
  # Snowflake Output
  label: ""
  snowflake_put:
    account: <snowflake-account>
    user: <user>
    private_key_file: rsa_key.pem
    role: ACCOUNTADMIN
    database: <database>
    warehouse: <warehouse>
    schema: <schema>
    stage: "@%implicit_table_stage_name"
    path: "path"
    upload_parallel_threads: 4
    compression: NONE
    batching:
      count: 10
      period: 3s
      processors:
        - archive:
            format: json_array
    max_in_flight: 1


If we restart the compose environment and tail the logs, we can see the following:

level=info msg="Running main config from specified file" @service=benthos benthos_version=v4.32.1 path=/connect.yaml
level=info msg="Listening for HTTP requests at: http://0.0.0.0:4195" @service=benthos
level=info msg="Launching a Redpanda Connect instance, use CTRL+C to close" @service=benthos
level=info msg="Output type snowflake_put is now active" @service=benthos label="" path=root.output
level=info msg="Input type cockroachdb_changefeed is now active" @service=benthos label="" path=root.input


Let's look at the implicit table stage to see whether anything has changed.

list @%office_dogs;
| name                                           | size | md5                              | last_modified                 |
|------------------------------------------------+------+----------------------------------+-------------------------------|
| dogs/f2f3cf47-d6bc-46f4-88f2-c82519b67481.json | 1312 | 30f709e4962bae9d10b48565d22e9f32 | Wed, 14 Aug 2024 18:58:43 GMT |
| dogs/f6adbf39-3955-4848-93c3-06f873a88078.json | 1312 | 28be7a619ef1e139599077e977ea130b | Wed, 14 Aug 2024 18:58:13 GMT |
| dogs/f8705606-eb07-400a-9ffe-da6834fa1a30.json | 1296 | 5afbdce0e8929fc38a2eb5e0f12b96d6 | Wed, 14 Aug 2024 18:57:29 GMT |
| dogs/f9e5c01a-7dda-4e76-840d-13b8a1e4946a.json | 1296 | 5480c01f1578f67afe2761c7619e9123 | Wed, 14 Aug 2024 18:57:32 GMT |
| dogs/fad4efe7-3f3f-48bc-bdb4-9f0310abcf4d.json | 1312 | 5942c6e2dbaef5ee257d4a9b8e68827d | Wed, 14 Aug 2024 18:58:04 GMT |


The files are ready to be copied into a table. Let's create a pipe:

CREATE OR REPLACE PIPE FROM_COCKROACH.PUBLIC.cockroach_pipe
  AUTO_INGEST = FALSE
  AS COPY INTO FROM_COCKROACH.PUBLIC.OFFICE_DOGS
     FROM (SELECT * FROM @%office_dogs)
     FILE_FORMAT = (TYPE = JSON COMPRESSION = AUTO STRIP_OUTER_ARRAY = TRUE);
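
Optionally, you can verify the pipe before refreshing it; Snowflake's SYSTEM$PIPE_STATUS function returns its execution state as JSON:

SELECT SYSTEM$PIPE_STATUS('FROM_COCKROACH.PUBLIC.COCKROACH_PIPE');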


The last remaining step is to refresh the pipe.

ALTER PIPE cockroach_pipe REFRESH;
+------------------------------------------------+--------+
| File                                           | Status |
|------------------------------------------------+--------|
| dogs/ff0871b1-6f49-43a4-a929-958d07f74046.json | SENT   |
| dogs/ff131d8d-3781-4cf6-8700-edd50dbb87de.json | SENT   |
| dogs/ff216da1-4f9d-4b37-9776-bcd559dd4a6f.json | SENT   |
| dogs/ff221430-4c3a-46be-bbc2-d335cc6cc9e3.json | SENT   |
| dogs/ffbd7d45-5084-4e36-8907-61874ac652b4.json | SENT   |
| dogs/fffb5fa6-23cc-4450-934a-29ccf01c67b9.json | SENT   |
+------------------------------------------------+--------+
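
Optionally, Snowflake's COPY_HISTORY table function shows what the pipe actually loaded into the table; here we look back one hour:

SELECT FILE_NAME, ROW_COUNT, STATUS
  FROM TABLE(INFORMATION_SCHEMA.COPY_HISTORY(
       TABLE_NAME => 'OFFICE_DOGS',
       START_TIME => DATEADD(HOUR, -1, CURRENT_TIMESTAMP())));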


Let's query the table in Snowflake:

SELECT * FROM OFFICE_DOGS LIMIT 5;
| {                                                                                       |
|   "primary_key": "[5241]",                                                              |
|   "row": "{\"after\": {\"id\": 5241, \"name\": \"5e0360a0d10d849afbbfa319a50bccf2\"}}", |
|   "table": "office_dogs"                                                                |
| }                                                                                       |
| {                                                                                       |
|   "primary_key": "[5242]",                                                              |
|   "row": "{\"after\": {\"id\": 5242, \"name\": \"62be250249afe74bfbc5dd356e7b0ad9\"}}", |
|   "table": "office_dogs"                                                                |
| }                                                                                       |
| {                                                                                       |
|   "primary_key": "[5243]",                                                              |
|   "row": "{\"after\": {\"id\": 5243, \"name\": \"7f286800a8a03e74938d09fdba52f869\"}}", |
|   "table": "office_dogs"                                                                |
| }                                                                                       |
| {                                                                                       |
|   "primary_key": "[5244]",                                                              |
|   "row": "{\"after\": {\"id\": 5244, \"name\": \"16a330b8f09bcd314f9760ffe26d0ae2\"}}", |
|   "table": "office_dogs"                                                                |
| }


We expect 10,000 rows:

SELECT COUNT(*) FROM OFFICE_DOGS;
+----------+                                                                    
| COUNT(*) |
|----------|
|    10000 |
+----------+


The data is in JSON format. Let's create a view to flatten it out. Because the row field arrives as a JSON string inside the record variant, we first parse it with PARSE_JSON and then traverse to after.id and after.name.

CREATE VIEW v_office_dogs AS
    SELECT PARSE_JSON(record:row):after:id::INTEGER AS id,
           PARSE_JSON(record:row):after:name::STRING AS name FROM OFFICE_DOGS;


Query the view:

SELECT * FROM v_office_dogs WHERE id < 6;
+----+----------------------------------+                                       
| ID | NAME                             |
|----+----------------------------------|
|  1 | Petee H                          |
|  2 | Carl                             |
|  3 | 6e19280ae649efffa7a58584c7f46032 |
|  4 | 5e4e897f008bb752c8edfa64a3aed356 |
|  5 | abc0d898318d27f23a43060f89d62e34 |
+----+----------------------------------+


Verify

Let's make things a bit more interesting and delete data in CockroachDB.

DELETE FROM office_dogs WHERE name = 'Carl';
DELETE FROM office_dogs WHERE id = 1;
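
On the CockroachDB side, the two deletes leave 9,998 rows:

SELECT COUNT(*) FROM office_dogs;
count
9998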


In Snowflake, let's refresh the pipe as of a few minutes ago:

ALTER PIPE cockroach_pipe REFRESH MODIFIED_AFTER='2024-08-14T12:10:00-07:00';


Notice there are a couple of files.

+------------------------------------------------+--------+                     
| File                                           | Status |
|------------------------------------------------+--------|
| dogs/2a4ee400-6b37-4513-97cb-097764a340bc.json | SENT   |
| dogs/8f5b5b69-8a00-4dbf-979a-60c3814d96b4.json | SENT   |
+------------------------------------------------+--------+


I must caution that if you run the REFRESH manually, you may cause duplicates in your Snowflake data. We will look at better approaches in a future article.
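
As a stopgap, here is a minimal sketch of how duplicates could be collapsed on the Snowflake side, assuming the duplicates are exact copies of the same changefeed record; the view name v_office_dogs_dedup is hypothetical:

-- Hypothetical sketch: collapse exact duplicate changefeed records.
CREATE OR REPLACE VIEW v_office_dogs_dedup AS
SELECT DISTINCT
       PARSE_JSON(record:row):after:id::INTEGER AS id,
       PARSE_JSON(record:row):after:name::STRING AS name
FROM OFFICE_DOGS;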

Let's look at the row count:

SELECT COUNT(*) FROM OFFICE_DOGS;
+----------+
| COUNT(*) |
|----------|
|    10002 |
+----------+


The deletes didn't propagate to Snowflake the way we'd like: the changefeed delivered the delete events, but they were ingested as two additional records rather than removing the originals, so the table no longer mirrors the state in CockroachDB. We'd need to incorporate additional merge logic to achieve that; a task for another time.
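
To give a flavor of that logic, here is a small sketch for identifying the delete events, assuming the default changefeed envelope where a delete carries an "after": null payload:

-- Sketch: list delete tombstones (records whose "after" payload is JSON null).
SELECT record:primary_key::STRING AS pk
FROM OFFICE_DOGS
WHERE IS_NULL_VALUE(PARSE_JSON(record:row):after);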

Lastly, I'd like to note that running Redpanda Connect via Docker Compose is optional. You can instead run the Docker container directly with the following command:

docker run --rm -it -v ./redpanda/connect.yaml:/connect.yaml -v ./snowflake/rsa_key.pem:/rsa_key.pem docker.redpanda.com/redpandadata/connect run

[Figure: Architecture flow using CockroachDB, Redpanda, and Snowflake]

Conclusion

Today, we explored Redpanda Connect as a means to deliver streaming changefeeds into Snowflake. We've only just begun to delve into this topic, and future articles will build upon the foundations laid today.

Published at DZone with permission of Artem Ervits.

Opinions expressed by DZone contributors are their own.
