
Exploring Real-Time Data Ingestion Into Snowflake Using CockroachDB, Redpanda, and Kafka Connect

Explore Kafka Connect as a solution for streaming changefeeds into Snowflake, with greater control over how messages are delivered.

By Artem Ervits · Sep. 10, 24 · Tutorial

Previous Articles on Snowflake

  • Tour of Snowflake ingestion using CockroachDB and Redpanda Connect
  • Integrating Snowflake with Trino

Previous Articles on CockroachDB CDC

  • Emitting Protocol Buffers with CockroachDB CDC Queries
  • Using CockroachDB CDC with Apache Pulsar
  • Using CockroachDB CDC with Azure Event Hubs
  • SaaS Galore: Integrating CockroachDB with Confluent Kafka, FiveTran, and Snowflake
  • Using CockroachDB CDC with Confluent Cloud Kafka and Schema Registry
  • CockroachDB CDC using Minio as cloud storage sink
  • CockroachDB CDC using Hadoop Ozone S3 Gateway as cloud storage sink

Motivation

This article builds upon the previous discussion in "Tour of Snowflake ingestion using CockroachDB and Redpanda Connect," where we investigated the process of streaming changefeeds from CockroachDB to Snowflake using Redpanda Connect and Snowpipe in batch mode. Here, we will shift our focus to Kafka Connect and demonstrate how both batch and streaming modes can be utilized for data ingestion into Snowflake.

Overview

  • Deploy a CockroachDB cluster with enterprise changefeeds
  • Deploy Snowflake
  • Deploy Kafka Connect
  • Verify
  • Conclusion

Detailed Instructions

Deploy a CockroachDB Cluster With Enterprise Changefeeds

Start by either launching a CockroachDB instance or utilizing a managed service.
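
If you only need a local sandbox for this walkthrough, a single-node, insecure cluster is sufficient. The commands below are an illustrative sketch using the official Docker image; a CockroachDB Cloud cluster works just as well, and the container name and host ports are assumptions rather than requirements.

# Illustrative only: run a throwaway single-node, insecure CockroachDB cluster.
docker run -d --name cockroach -p 26257:26257 -p 8081:8080 \
  cockroachdb/cockroach:latest start-single-node --insecure

# Open a SQL shell against it to run the statements that follow.
docker exec -it cockroach ./cockroach sql --insecure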

  • To enable CDC, execute the following commands:
SET CLUSTER SETTING cluster.organization = '<organization name>';
SET CLUSTER SETTING enterprise.license = '<secret>';
SET CLUSTER SETTING kv.rangefeed.enabled = true;


  • Verify that changefeeds are enabled:
SHOW CLUSTER SETTING kv.rangefeed.enabled;


If the value is false, update it to true.

  • Create a source table:
CREATE TABLE cockroachdb (
     id INT PRIMARY KEY,
     value STRING DEFAULT md5(random()::text),
     created_at TIMESTAMPTZ DEFAULT now(),
     updated_at TIMESTAMPTZ DEFAULT NULL);


  • Insert random data:
INSERT INTO cockroachdb (id) SELECT generate_series(1, 10000);


  • Update a row:
UPDATE cockroachdb SET value = 'UPDATED', updated_at = now() WHERE id = 1;


  • Create a changefeed job pointing to a local instance of Redpanda:
CREATE CHANGEFEED FOR TABLE cockroachdb INTO 'kafka://redpanda:29092';
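
Before moving on, it is worth confirming the changefeed job is actually running; SHOW CHANGEFEED JOBS is the standard way to check:

-- The job should report a "running" status; note the job_id if you need to pause or cancel it later.
SHOW CHANGEFEED JOBS;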


  • Inspect the data:
SELECT * FROM cockroachdb LIMIT 5;
  id |              value               |          created_at           |          updated_at
-----+----------------------------------+-------------------------------+--------------------------------
   1 | UPDATED                          | 2024-09-09 13:17:57.837984+00 | 2024-09-09 13:17:57.917108+00
   2 | 27a41183599c44251506e2971ba78426 | 2024-09-09 13:17:57.837984+00 | NULL
   3 | 3bf8bc26a750a15691ec4d7ddbb7f5e5 | 2024-09-09 13:17:57.837984+00 | NULL
   4 | b8c5786e8651ddfb3a68eabeadb52f2e | 2024-09-09 13:17:57.837984+00 | NULL
   5 | 3a24df165773639ce89d0d877e7103b7 | 2024-09-09 13:17:57.837984+00 | NULL
(5 rows)


The next step is to set up the Snowflake Kafka connector.

Deploy Snowflake

  • Create a database and schema for outputting changefeed data:
USE ROLE SYSADMIN;
CREATE OR REPLACE DATABASE KAFKADB;
CREATE OR REPLACE SCHEMA kafka_schema;


Follow the Snowflake documentation to configure the Kafka connector.

  • Create the necessary tables:
create or replace table kafkatb_batch(
    RECORD_METADATA VARIANT,
    RECORD_CONTENT VARIANT
);

create or replace table kafkatb_streaming(
    RECORD_METADATA VARIANT,
    RECORD_CONTENT VARIANT
);


  • Set up roles and permissions:
-- Use a role that can create and manage roles and privileges.
USE ROLE securityadmin;

-- Create a Snowflake role with the privileges to work with the connector.
CREATE OR REPLACE ROLE kafka_connector_role_1;

-- Grant privileges on the database.
GRANT USAGE ON DATABASE kafkadb TO ROLE kafka_connector_role_1;

-- Grant privileges on the schema.
GRANT USAGE ON SCHEMA KAFKADB.kafka_schema TO ROLE kafka_connector_role_1;
GRANT CREATE TABLE ON SCHEMA KAFKADB.kafka_schema TO ROLE kafka_connector_role_1;
GRANT CREATE STAGE ON SCHEMA KAFKADB.kafka_schema TO ROLE kafka_connector_role_1;
GRANT CREATE PIPE ON SCHEMA KAFKADB.kafka_schema TO ROLE kafka_connector_role_1;

-- Only required if the Kafka connector will load data into an existing table.
GRANT OWNERSHIP ON TABLE KAFKADB.KAFKA_SCHEMA.kafkatb_batch TO ROLE kafka_connector_role_1;
GRANT OWNERSHIP ON TABLE KAFKADB.KAFKA_SCHEMA.kafkatb_streaming TO ROLE kafka_connector_role_1;

-- Grant the custom role to an existing user.
GRANT ROLE kafka_connector_role_1 TO USER username;

-- Set the custom role as the default role for the user.
-- If you encounter an 'Insufficient privileges' error, verify the role that has the OWNERSHIP privilege on the user.
ALTER USER username SET DEFAULT_ROLE = kafka_connector_role_1;


Ensure you follow the documentation for setting up key pair authentication for the Snowflake Kafka connector.
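
As a rough sketch of that key-pair setup (file names and the user name are placeholders; treat the Snowflake documentation as authoritative), you generate an RSA key pair with OpenSSL and attach the public key to the user the connector authenticates as. The contents of the private key file, collapsed to a single line without the PEM header and footer, become the snowflake.private.key value in the connector configuration later in this article.

# Generate an unencrypted PKCS#8 private key and derive its public key (illustrative file names).
openssl genrsa 2048 | openssl pkcs8 -topk8 -inform PEM -out rsa_key.p8 -nocrypt
openssl rsa -in rsa_key.p8 -pubout -out rsa_key.pub

-- In Snowflake, attach the public key (contents of rsa_key.pub without header/footer) to the user.
ALTER USER username SET RSA_PUBLIC_KEY='<public-key-contents>';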

Deploy Kafka Connect

  • Run Redpanda using Docker Compose.
docker compose -f compose-redpandadata.yaml up -d
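
The compose file itself is not reproduced in this article. As a rough idea of what it contains, a minimal single-broker Redpanda plus Console setup looks something like the sketch below; the image tags, ports, and the externally advertised listener on redpanda:29092 are assumptions chosen to line up with the changefeed and bootstrap addresses used elsewhere in this walkthrough, not the author's original file.

# compose-redpandadata.yaml -- minimal sketch, not the original file.
services:
  redpanda:
    image: docker.redpanda.com/redpandadata/redpanda:latest
    command:
      - redpanda
      - start
      - --smp=1
      - --overprovisioned
      - --kafka-addr=internal://0.0.0.0:9092,external://0.0.0.0:29092
      - --advertise-kafka-addr=internal://redpanda:9092,external://redpanda:29092
    ports:
      - "29092:29092"
  console:
    image: docker.redpanda.com/redpandadata/console:latest
    environment:
      KAFKA_BROKERS: redpanda:9092
    ports:
      - "8080:8080"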


  • Once up, navigate to the Redpanda Console.

[Image: Redpanda Console]

  • Click into the cockroachdb topic:

[Image: the cockroachdb topic in the Redpanda Console]

  • Install the Snowflake Kafka connector:
confluent-hub install --no-prompt snowflakeinc/snowflake-kafka-connector:latest


  • Use the following configuration for Kafka Connect in distributed mode, saved as connect-distributed.properties:
bootstrap.servers=172.18.0.3:29092
group.id=connect-cluster
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter.schemas.enable=true
offset.storage.topic=connect-offsets
offset.storage.replication.factor=1
config.storage.topic=connect-configs
config.storage.replication.factor=1
status.storage.topic=connect-status
status.storage.replication.factor=1
offset.flush.interval.ms=10000
plugin.path=/usr/share/confluent-hub-components,/usr/local/share/kafka/plugins,/usr/share/filestream-connectors


  • Deploy Kafka Connect in distributed mode:
./kafka-connect/bin/connect-distributed.sh connect-distributed.properties
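
Before registering any connectors, it helps to confirm the worker is reachable and that the Snowflake plugin was picked up from plugin.path. Both checks use standard Kafka Connect REST endpoints (the localhost host name assumes you are on the worker machine):

# Worker version info -- confirms the REST API on port 8083 is up.
curl http://localhost:8083/

# The Snowflake sink should appear in the list of installed plugins.
curl http://localhost:8083/connector-plugins | grep -i snowflake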


  • Register the Snowflake connector with the following configuration, saved as snowflake-sink-batch.json:
{
    "name":"snowflake-sink-batch",
    "config":{
      "connector.class":"com.snowflake.kafka.connector.SnowflakeSinkConnector",
      "tasks.max":"8",
      "topics":"cockroachdb",
      "snowflake.topic2table.map": "cockroachdb:kafkatb_batch",
      "buffer.count.records":"10000",
      "buffer.flush.time":"60",
      "buffer.size.bytes":"5000000",
      "snowflake.url.name":"account-name:443",
      "snowflake.user.name":"username",
      "snowflake.private.key":"private-key",
      "snowflake.private.key.passphrase":"",
      "snowflake.database.name":"kafkadb",
      "snowflake.schema.name":"kafka_schema",
      "snowflake.role.name":"kafka_connector_role_1",
      "key.converter":"org.apache.kafka.connect.storage.StringConverter",
      "value.converter":"com.snowflake.kafka.connector.records.SnowflakeJsonConverter"
    }
  }


  • Publish the connector configuration:
curl -d @"snowflake-sink-batch.json" -H "Content-Type: application/json" -X POST http://kafka-connect:8083/connectors
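
Beyond the UI, the Kafka Connect REST API can confirm that the connector and all of its tasks reached the RUNNING state:

# Expect "state":"RUNNING" for the connector and each task.
curl http://kafka-connect:8083/connectors/snowflake-sink-batch/status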


  • Verify the connector in the Kafka Connect UI and in the Kafka Connect section of the Redpanda Console.


If you click on the snowflake-sink-batch sink, you can see additional information.

[Image: snowflake-sink-batch sink details]

The comprehensive steps needed to set this up are outlined in the Snowflake Kafka connector documentation.

Data will now flow into Snowflake in batch mode, with updates occurring every 60 seconds as determined by the buffer.flush.time parameter.
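
Because batch mode ingests through Snowpipe behind the scenes, another sanity check (a suggestion rather than a required step; the connector auto-generates the names of its internal objects) is to list the stages and pipes it created in the target schema:

-- Internal stage and pipes provisioned by the connector; their names are auto-generated.
SHOW STAGES IN SCHEMA KAFKADB.KAFKA_SCHEMA;
SHOW PIPES IN SCHEMA KAFKADB.KAFKA_SCHEMA;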

You can now query the data in Snowflake:

select * from kafkatb_batch limit 5;


If everything is configured correctly, the data from CockroachDB should be available in Snowflake in real time or in batches, depending on your configuration.

  • record_metadata:
{
  "CreateTime": 1725887877966,
  "key": "[3]",
  "offset": 30007,
  "partition": 0,
  "topic": "cockroachdb"
}


  • record_content:
{
  "after": {
    "created_at": "2024-09-09T13:17:57.837984Z",
    "id": 1,
    "updated_at": "2024-09-09T13:17:57.917108Z",
    "value": "UPDATED"
  }
}


  • The next step is to configure the connector in streaming mode. First, stop the current connector with the following command:
curl -X DELETE http://localhost:8083/connectors/snowflake-sink-batch
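
Listing the registered connectors afterward should confirm the batch sink is gone before the streaming variant is registered:

# snowflake-sink-batch should no longer appear in this list.
curl http://localhost:8083/connectors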


  • The updated connector configuration will appear as follows:
{
    "name":"snowflake-sink-streaming",
    "config":{
      "connector.class":"com.snowflake.kafka.connector.SnowflakeSinkConnector",
      "tasks.max":"8",
      "topics":"cockroachdb",
      "snowflake.topic2table.map": "cockroachdb:kafkatb_streaming",
      "buffer.count.records":"10000",
      "buffer.flush.time":"10",
      "buffer.size.bytes":"5000000",
      "snowflake.url.name":"<snowflake-account>:443",
      "snowflake.user.name":"username",
      "snowflake.private.key":"private-key",
      "snowflake.private.key.passphrase":"",
      "snowflake.database.name":"kafkadb",
      "snowflake.schema.name":"kafka_schema",
      "snowflake.role.name":"kafka_connector_role_1",
      "key.converter":"org.apache.kafka.connect.storage.StringConverter",
      "value.converter":"org.apache.kafka.connect.json.JsonConverter",
      "value.converter.schemas.enable":"false",
      "snowflake.ingestion.method": "SNOWPIPE_STREAMING",
      "errors.log.enable":"true",
      "schemas.enable":"false"

    }
  }


Take note of the snowflake.ingestion.method parameter: SNOWPIPE_STREAMING removes the need to wait 60 seconds for data to be pushed to Snowflake, allowing us to reduce buffer.flush.time to 10 seconds.

  • To deploy the connector, use the following command:
curl -d @"snowflake-sink-streaming.json" -H "Content-Type: application/json" -X POST http://kafka-connect:8083/connectors


Shortly after deployment, the data will be available in the Snowflake table.

The previous examples demonstrated how data was ingested into predefined Snowflake tables. The following method will automatically infer the schema from the Kafka messages:

  {
    "name":"snowflake-sink-streaming-schematized",
    "config":{
      "connector.class":"com.snowflake.kafka.connector.SnowflakeSinkConnector",
      "tasks.max":"8",
      "topics":"cockroachdb",
      "snowflake.topic2table.map": "cockroachdb:kafkatb_streaming_schematized",
      "buffer.count.records":"10000",
      "buffer.flush.time":"10",
      "buffer.size.bytes":"5000000",
      "snowflake.url.name":"<snowflake-account>:443",
      "snowflake.user.name":"username",
      "snowflake.private.key":"private-key",
      "snowflake.private.key.passphrase":"",
      "snowflake.database.name":"kafkadb",
      "snowflake.schema.name":"kafka_schema",
      "snowflake.role.name":"kafka_connector_role_1",
      "key.converter":"org.apache.kafka.connect.storage.StringConverter",
      "value.converter":"org.apache.kafka.connect.json.JsonConverter",
      "value.converter.schemas.enable":"false",
      "snowflake.ingestion.method": "SNOWPIPE_STREAMING",
      "errors.log.enable":"true",
      "schemas.enable":"false",
      "snowflake.enable.schematization": "TRUE"
    }
  }


  • Save this as snowflake-sink-streaming-schematized.json and deploy it using:
curl -d @"snowflake-sink-streaming-schematized.json" -H "Content-Type: application/json" -X POST http://kafka-connect:8083/connectors


  • Upon deployment, a new table will be created in Snowflake with the following schema:
create or replace TABLE KAFKADB.KAFKA_SCHEMA.KAFKATB_STREAMING_SCHEMATIZED (
    RECORD_METADATA VARIANT COMMENT 'created by automatic table creation from Snowflake Kafka Connector',
    AFTER VARIANT COMMENT 'column created by schema evolution from Snowflake Kafka Connector'
);
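
If you want to watch schema evolution happen, describing the table shows the columns the connector has added so far; for this changefeed envelope that is a single AFTER column alongside RECORD_METADATA:

-- Lists RECORD_METADATA plus any columns added by schema evolution (here, AFTER).
DESCRIBE TABLE KAFKADB.KAFKA_SCHEMA.KAFKATB_STREAMING_SCHEMATIZED;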


  • To inspect the table, use the following query:
SELECT after AS record FROM kafkatb_streaming_schematized LIMIT 5;


Sample result:

{
  "created_at": "2024-09-09T16:39:34.993226Z",
  "id": 18712,
  "updated_at": null,
  "value": "0d6bd8a4a790aab95c97a084d17bd820"
}


Verify

  • We can flatten the data for easier manipulation using the following statements:
USE ROLE securityadmin;
GRANT CREATE VIEW ON SCHEMA KAFKADB.kafka_schema TO ROLE kafka_connector_role_1;

USE ROLE kafka_connector_role_1;
USE DATABASE KAFKADB;
USE SCHEMA KAFKA_SCHEMA;
CREATE VIEW v_kafkatb_batch_flattened AS
    SELECT PARSE_JSON(record_content:after):id AS ID,
        PARSE_JSON(record_content:after):value AS VALUE,
        PARSE_JSON(record_content:after):created_at AS CREATED_AT,
        PARSE_JSON(record_content:after):updated_at AS UPDATED_AT
    FROM kafkatb_batch;

SELECT * FROM v_kafkatb_batch_flattened limit 1;
ID    VALUE        CREATED_AT                      UPDATED_AT
1   "UPDATED"    "2024-09-09T13:17:57.837984Z"    "2024-09-09T13:17:57.917108Z"


  • Alternatively, for the schematized table, the view creation statement would be:
CREATE VIEW v_kafkatb_streaming_schematized_flattened AS
    SELECT PARSE_JSON(after):id AS ID,
        PARSE_JSON(after):value AS VALUE,
        PARSE_JSON(after):created_at AS CREATED_AT,
        PARSE_JSON(after):updated_at AS UPDATED_AT
    FROM kafkatb_streaming_schematized;


  • To verify the data flow, make an update in CockroachDB and check for the changes in Snowflake:
UPDATE cockroachdb
  SET value = 'UPDATED', updated_at = now()
  WHERE id = 20000;


  • In Snowflake, execute the following query to confirm the update:
SELECT * FROM v_kafkatb_streaming_schematized_flattened where VALUE = 'UPDATED';


Sample result:

ID    VALUE        CREATED_AT                      UPDATED_AT
20000    "UPDATED"    "2024-09-09T18:15:13.460078Z"    "2024-09-09T18:16:56.550778Z"
19999    "UPDATED"    "2024-09-09T18:15:13.460078Z"    "2024-09-09T18:15:27.365272Z"


The architectural diagram is below:

[Architectural diagram: CockroachDB changefeed → Redpanda → Kafka Connect (Snowflake sink) → Snowflake]

Conclusion

In this process, we explored Kafka Connect as a solution to stream changefeeds into Snowflake. This approach provides greater control over how messages are delivered to Snowflake, leveraging the Snowflake Kafka Connector with Snowpipe Streaming for real-time, reliable data ingestion.
