
Using CockroachDB CDC With Apache Pulsar

As of this writing, CockroachDB does not have a direct integration with Apache Pulsar. Here, we integrate CockroachDB CDC with Pulsar via the existing Kafka sink.

By Artem Ervits · Apr. 12, 23 · Tutorial

Previous Articles on CockroachDB CDC

  • Using CockroachDB CDC with Azure Event Hubs
  • Using CockroachDB CDC with Confluent Cloud Kafka and Schema Registry
  • SaaS Galore: Integrating CockroachDB with Confluent Kafka, Fivetran, and Snowflake
  • CockroachDB CDC using Minio as a cloud storage sink
  • CockroachDB CDC using Hadoop Ozone S3 Gateway as a cloud storage sink

Motivation

Apache Pulsar is a cloud-native distributed messaging and streaming platform. In my customer conversations, it most often comes up when compared to Apache Kafka. I have a customer who needs Pulsar sink support, as they rely on Pulsar's multi-region capabilities. CockroachDB does not have a native Pulsar sink; however, the Pulsar project supports the Kafka protocol through Kafka-on-Pulsar (KoP), and that's the core of today's article.

This tutorial assumes you have an enterprise license; alternatively, you can leverage our managed offerings, where enterprise changefeeds are enabled by default. I am going to demonstrate the steps using a Docker environment.

High-Level Steps

  • Deploy Apache Pulsar
  • Deploy a CockroachDB cluster with enterprise changefeeds
  • Deploy a Kafka Consumer
  • Verify
  • Conclusion

Step-By-Step Instructions

Deploy Apache Pulsar

Since I'm using Docker, I'm relying on the KoP Docker Compose environment provided by StreamNative, the company that spearheads the development of Apache Pulsar.

I've used the service definition from the KoP example almost as-is, aside from a few differences:

pulsar:
    container_name: pulsar
    hostname: pulsar
    image: streamnative/sn-pulsar:2.11.0.5
    command: >
      bash -c "bin/apply-config-from-env.py conf/standalone.conf &&
      exec bin/pulsar standalone -nss -nfw" # disable stream storage and functions worker
    environment:
      allowAutoTopicCreationType: partitioned
      brokerDeleteInactiveTopicsEnabled: "false"
      PULSAR_PREFIX_messagingProtocols: kafka
      PULSAR_PREFIX_kafkaListeners: PLAINTEXT://pulsar:9092
      PULSAR_PREFIX_brokerEntryMetadataInterceptors: org.apache.pulsar.common.intercept.AppendIndexMetadataInterceptor
      PULSAR_PREFIX_webServicePort: "8088"
    ports:
      - 6650:6650
      - 8088:8088
      - 9092:9092


I removed PULSAR_PREFIX_kafkaAdvertisedListeners: PLAINTEXT://127.0.0.1:19092 as I don't need it, and I changed the exposed port from - 19092:9092 to - 9092:9092. My PULSAR_PREFIX_kafkaListeners address points to a Docker container with the hostname pulsar; I need to access the address from other containers, so I can't rely on localhost. I'm also using a more recent version of the image than the one in their docs.

Deploy a CockroachDB Cluster With Enterprise Changefeeds

I am using a 3-node cluster in Docker. If you've followed my previous articles, you should be familiar with it.

I am using Flyway to set up the schema and seed the tables. The actual schema and data are taken from the changefeed examples in our docs. The only difference is that I'm using a database called example.
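For reference, here is a minimal sketch of what those migrations might contain, assuming the office_dogs and employees tables from the changefeed examples (column names inferred from the changefeed output later in this article):

CREATE DATABASE IF NOT EXISTS example;
USE example;

CREATE TABLE office_dogs (
    id INT PRIMARY KEY,
    name STRING
);

-- No explicit primary key here: CockroachDB generates a hidden rowid
-- column, which is why rowid appears in the changefeed messages below.
CREATE TABLE employees (
    dog_id INT REFERENCES office_dogs (id),
    employee_name STRING
);

INSERT INTO office_dogs VALUES (1, 'Petee H'), (2, 'Carl');
INSERT INTO employees (dog_id, employee_name) VALUES (1, 'Lauren'), (2, 'Spencer');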

To enable CDC, we need to execute the following commands:

SET CLUSTER SETTING cluster.organization = '<organization name>';

SET CLUSTER SETTING enterprise.license = '<secret>';

SET CLUSTER SETTING kv.rangefeed.enabled = true;


Again, if you don't have an enterprise license, you won't be able to complete this tutorial. Feel free to use our Dedicated or Serverless instances if you want to follow along.
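As a quick sanity check before moving on, you can read the rangefeed setting back:

SHOW CLUSTER SETTING kv.rangefeed.enabled;

It should return true.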

Finally, after the tables and the data are in place, we can create a changefeed on these tables.

CREATE CHANGEFEED FOR TABLE office_dogs, employees INTO 'kafka://pulsar:9092';


Here, I am using the Kafka port and the address of the Pulsar cluster, in my case the hostname pulsar.

        job_id
----------------------
  855538618543276035
(1 row)

NOTICE: changefeed will emit to topic office_dogs
NOTICE: changefeed will emit to topic employees

Time: 50ms total (execution 49ms / network 1ms)


Everything seems to work, and the changefeed does not error out.
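If you'd like to inspect the job itself, recent CockroachDB versions can list changefeed jobs directly from the SQL shell (on older versions, filter SHOW JOBS instead):

SHOW CHANGEFEED JOBS;

The sink_uri column should show kafka://pulsar:9092, and the status should be running.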

Deploy a Kafka Consumer

To validate that data is being written to Pulsar, we need to stand up a Kafka client. I've created an image that downloads and installs Kafka. Once the entire Docker Compose environment is running, we can access the client and run the console consumer to verify.

/opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server pulsar:9092 --topic office_dogs --from-beginning
{"after": {"id": 1, "name": "Petee H"}}
{"after": {"id": 2, "name": "Carl"}}


If we want to validate that new data is flowing, let's insert another record into CockroachDB:

INSERT INTO office_dogs VALUES (3, 'Test');


The consumer will print a new row:

{"after": {"id": 3, "name": "Test"}}


Since we've created two topics, let's now look at the employees topic. Notice the rowid field in the output below: the employees table has no explicit primary key, so CockroachDB generates a hidden rowid column and includes it in the changefeed messages.

/opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server pulsar:9092 --topic employees --from-beginning
{"after": {"dog_id": 1, "employee_name": "Lauren", "rowid": 855539880336523267}}
{"after": {"dog_id": 2, "employee_name": "Spencer", "rowid": 855539880336654339}}


Similarly, let's update a record and see the changes propagate to Pulsar.

UPDATE employees SET employee_name = 'Spencer Kimball' WHERE dog_id = 2;
{"after": {"dog_id": 2, "employee_name": "Spencer Kimball", "rowid": 855539880336654339}}


Verify

We've confirmed we can produce messages to Pulsar topics using the Kafka protocol via KoP, and we've confirmed we can consume them using the Kafka console consumer. We can also use the native Pulsar tooling to confirm the data is consumable from Pulsar. I installed the Pulsar Python client (pip install pulsar-client) on the Kafka client machine and created a Python script with the following code:

import pulsar

# Connect to the Pulsar broker over the native binary protocol.
client = pulsar.Client('pulsar://pulsar:6650')

# Subscribe to the topic the changefeed emits to.
consumer = client.subscribe('employees',
                            subscription_name='my-sub')

# Consume messages indefinitely, acknowledging each one so the
# subscription's backlog doesn't grow.
try:
    while True:
        msg = consumer.receive()
        print("Received message: '%s'" % msg.data())
        consumer.acknowledge(msg)
finally:
    client.close()


I execute the script:

root@kafka-client:/opt/kafka# python3 consume_messages.py 
2023-04-11 14:17:21.761 INFO  [281473255101472] Client:87 | Subscribing on Topic :employees
2023-04-11 14:17:21.762 INFO  [281473255101472] ClientConnection:190 | [<none> -> pulsar://pulsar:6650] Create ClientConnection, timeout=10000
2023-04-11 14:17:21.762 INFO  [281473255101472] ConnectionPool:97 | Created connection for pulsar://pulsar:6650
2023-04-11 14:17:21.763 INFO  [281473230237984] ClientConnection:388 | [172.28.0.3:33826 -> 172.28.0.6:6650] Connected to broker
2023-04-11 14:17:21.771 INFO  [281473230237984] HandlerBase:72 | [persistent://public/default/employees-partition-0, my-sub, 0] Getting connection from pool
2023-04-11 14:17:21.776 INFO  [281473230237984] ClientConnection:190 | [<none> -> pulsar://pulsar:6650] Create ClientConnection, timeout=10000
2023-04-11 14:17:21.776 INFO  [281473230237984] ConnectionPool:97 | Created connection for pulsar://localhost:6650
2023-04-11 14:17:21.776 INFO  [281473230237984] ClientConnection:390 | [172.28.0.3:33832 -> 172.28.0.6:6650] Connected to broker through proxy. Logical broker: pulsar://localhost:6650
2023-04-11 14:17:21.786 INFO  [281473230237984] ConsumerImpl:238 | [persistent://public/default/employees-partition-0, my-sub, 0] Created consumer on broker [172.28.0.3:33832 -> 172.28.0.6:6650] 
2023-04-11 14:17:21.786 INFO  [281473230237984] MultiTopicsConsumerImpl:274 | Successfully Subscribed to a single partition of topic in TopicsConsumer. Partitions need to create : 0
2023-04-11 14:17:21.786 INFO  [281473230237984] MultiTopicsConsumerImpl:137 | Successfully Subscribed to Topics


Let's insert a record into the employees table, then update it:

INSERT INTO employees (dog_id, employee_name) VALUES (3, 'Test');
UPDATE employees SET employee_name = 'Artem' WHERE dog_id = 3;


The Pulsar client output is as follows:

Received message: 'b'{"after": {"dog_id": 3, "employee_name": "Test", "rowid": 855745376561364994}}''
Received message: 'b'{"after": {"dog_id": 3, "employee_name": "Artem", "rowid": 855745376561364994}}''
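As an aside, deletes flow through the same pipeline. With the default wrapped envelope, deleting the row should emit a message whose after field is null (a sketch of the expected behavior, not captured in this run):

DELETE FROM employees WHERE dog_id = 3;

Received message: 'b'{"after": null}''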


Conclusion

This is how you can leverage an existing CockroachDB capability with services like Apache Pulsar that lack a native sink. Hopefully, you've found this article useful and can start leveraging the existing Kafka sink with other Kafka-compatible message brokers.
