Simplify Migrating From Kafka to Pulsar With Kafka Connect Support

In this article, we will discuss streaming sources that push data into Pulsar from another system and sinks that send data from Pulsar to another destination.

By Enrico Olivelli · Jan. 05, 22 · Opinion

Large-scale implementations of any system, such as the event-streaming platform Apache Kafka, often involve customizations along with tools and plugins developed in-house. When it's time to transition from one system to another, the task can become complicated, drawn-out, and error-prone. Often the benefits of an alternative system (which can include significant cost savings and other efficiencies) are outweighed by the risks and costs of migration. As a result, an organization can end up locked into a suboptimal situation, footing a bigger bill than necessary and missing out on modern features that help move the business forward faster.

These risks and costs can be mitigated by making the transition iterative: breaking the vendor lock-in in small, manageable steps and avoiding the "big bang" switch, which often delays delivery and drives up the cost of running two systems in parallel for A/B testing.

Let's take a quick look at the existing ecosystem that helps with the transition from Kafka to Apache Pulsar, dive into the new addition to that ecosystem in Pulsar 2.8, and look at important changes in the Pulsar IO API and the Pulsar Schema API that automate and simplify schema handling in Sinks.

Throughout this article, we use "sources" for connectors that push data into Pulsar from another system and "sinks" for connectors that send data from Pulsar to another destination.
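To make the convention concrete, here is a minimal, self-contained Java sketch. `SimpleSource`, `SimpleSink`, and the in-memory queue standing in for a Pulsar topic are hypothetical simplifications, not Pulsar's real `org.apache.pulsar.io.core` interfaces; the sketch only shows the direction in which each side moves data.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical stand-in: a source reads from an external system and pushes into the topic.
interface SimpleSource {
    String read(); // returns null when the external system has no more data
}

// Hypothetical stand-in: a sink receives records from the topic and writes them elsewhere.
interface SimpleSink {
    void write(String record);
}

class DirectionDemo {
    // Moves everything from the source into the topic, then drains the topic into the sink.
    static void pump(SimpleSource source, Deque<String> topic, SimpleSink sink) {
        String value;
        while ((value = source.read()) != null) {
            topic.addLast(value); // external system -> Pulsar topic (source direction)
        }
        while (!topic.isEmpty()) {
            sink.write(topic.pollFirst()); // Pulsar topic -> destination (sink direction)
        }
    }

    public static void main(String[] args) {
        Deque<String> external = new ArrayDeque<>(List.of("a", "b", "c"));
        List<String> destination = new ArrayList<>();
        pump(external::pollFirst, new ArrayDeque<>(), destination::add);
        System.out.println(destination); // [a, b, c]
    }
}
```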

The Current State of the Pulsar-Kafka Ecosystem

Built-In Connectors to Kafka

Built-in connectors simplify pulling/pushing data between Pulsar and Kafka topics. 

This is useful if you want to leave existing systems running on Kafka while building new functionality on Pulsar.

More details are available in the Pulsar documentation:

  • Source (ingest data to Pulsar from Kafka): https://pulsar.apache.org/docs/en/io-kafka-source/ 
  • Sink (send data from Pulsar to Kafka): https://pulsar.apache.org/docs/en/io-kafka-sink/

Kafka on Pulsar

Kafka on Pulsar (KoP) is the recommended way to use the native Kafka client with Pulsar. 

KoP is a protocol handler. This means that it interprets the Kafka protocol at the network level and translates it into Pulsar requests. There are three key advantages to this approach:

  1. KoP works with all Kafka clients.
  2. KoP uses the well-defined interface between Kafka client and server.
  3. Client code does not need to change at all.
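The protocol-handler idea can be sketched as a translation layer. In this hypothetical Java sketch, `KafkaProduceRequest` stands for a produce request that has already been decoded from the Kafka wire format, and `PulsarPublisher` stands for a Pulsar publish call; the real KoP decodes the binary protocol and handles many more request types. Mapping Kafka partition N to a Pulsar topic named `<topic>-partition-N` in a `public/default` namespace is an assumption chosen to mirror Pulsar's partitioned-topic naming.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified view of a Kafka produce request after the wire bytes are decoded.
record KafkaProduceRequest(String topic, int partition, byte[] key, byte[] value) {}

// Hypothetical stand-in for a Pulsar publish call.
interface PulsarPublisher {
    void publish(String pulsarTopic, byte[] key, byte[] value);
}

// Sketch of the protocol-handler idea: the client speaks Kafka, the handler speaks Pulsar.
class KafkaToPulsarTranslator {
    private final String tenant;
    private final String namespace;
    private final PulsarPublisher publisher;

    KafkaToPulsarTranslator(String tenant, String namespace, PulsarPublisher publisher) {
        this.tenant = tenant;
        this.namespace = namespace;
        this.publisher = publisher;
    }

    // Assumed mapping: Kafka partition N -> Pulsar partition topic "<topic>-partition-N".
    String pulsarTopicFor(KafkaProduceRequest req) {
        return "persistent://" + tenant + "/" + namespace + "/"
                + req.topic() + "-partition-" + req.partition();
    }

    void handle(KafkaProduceRequest req) {
        publisher.publish(pulsarTopicFor(req), req.key(), req.value());
    }
}

class KopDemo {
    public static void main(String[] args) {
        List<String> published = new ArrayList<>();
        KafkaToPulsarTranslator handler = new KafkaToPulsarTranslator(
                "public", "default", (topic, key, value) -> published.add(topic));
        handler.handle(new KafkaProduceRequest("orders", 2, null, "hello".getBytes()));
        System.out.println(published); // [persistent://public/default/orders-partition-2]
    }
}
```

Because the translation happens on the broker side, the Kafka client only ever exchanges Kafka-protocol messages, which is why client code does not need to change.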

Kafka Connect Adaptor

Most people use Kafka (and Pulsar) via connectors to other systems, rather than writing low-level client code by hand. Pulsar has native connectors available for the most popular systems, but as of this writing, there are many more connectors for Kafka that do not yet exist for Pulsar, including private connectors created in-house for use at a single company.

The Kafka Connect Adaptor (KCA) bridges this gap. KCA is a Pulsar Source and Sink that runs a Kafka Connect Sink or Source. The Kafka Connect Adaptor Sink is new in Pulsar 2.8.

Currently, the documentation is scarce, but using KCA is simple. We will look at examples of using both the KCA Sink and Source below.
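Conceptually, KCA is an adapter. The following Java sketch shows one plausible shape of that idea under simplifying assumptions: `ConnectSinkTask` is a hypothetical, trimmed-down stand-in for Kafka Connect's `SinkTask`, `PulsarRecord` is a hypothetical stand-in for a Pulsar IO record with an ack callback, and the batching rule is illustrative rather than the adaptor's actual implementation.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified Kafka Connect sink task (the real class is
// org.apache.kafka.connect.sink.SinkTask, with put(Collection<SinkRecord>) and flush(...)).
interface ConnectSinkTask {
    void put(List<String> records);
    void flush();
}

// Hypothetical, simplified Pulsar record: a value plus an acknowledgment callback.
record PulsarRecord(String value, Runnable ack) {}

// Sketch of the adaptor idea: a Pulsar-style sink that delegates to a Kafka Connect-style task.
class ConnectSinkAdaptor {
    private final ConnectSinkTask task;
    private final int batchSize;
    private final List<PulsarRecord> pending = new ArrayList<>();

    ConnectSinkAdaptor(ConnectSinkTask task, int batchSize) {
        this.task = task;
        this.batchSize = batchSize;
    }

    // Pulsar side: called once per message read from the input topic.
    void write(PulsarRecord record) {
        pending.add(record);
        if (pending.size() >= batchSize) {
            flushBatch();
        }
    }

    // Hand the batch to the Kafka Connect task and ack only after it has been flushed.
    void flushBatch() {
        if (pending.isEmpty()) {
            return;
        }
        List<String> values = new ArrayList<>();
        for (PulsarRecord r : pending) {
            values.add(r.value());
        }
        task.put(values);
        task.flush();
        pending.forEach(r -> r.ack().run());
        pending.clear();
    }
}

class AdaptorDemo {
    public static void main(String[] args) {
        List<String> written = new ArrayList<>();
        int[] acked = {0};
        ConnectSinkTask task = new ConnectSinkTask() {
            public void put(List<String> records) { written.addAll(records); }
            public void flush() { }
        };
        ConnectSinkAdaptor sink = new ConnectSinkAdaptor(task, 2);
        sink.write(new PulsarRecord("a", () -> acked[0]++));
        sink.write(new PulsarRecord("b", () -> acked[0]++));
        sink.write(new PulsarRecord("c", () -> acked[0]++));
        System.out.println(written + " acked=" + acked[0]); // [a, b] acked=2
    }
}
```

Acknowledging the Pulsar messages only after the Kafka Connect task has flushed means that a crash before the flush leaves those messages unacknowledged, so Pulsar can redeliver them.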

Using Kafka Connect Adaptor Sink

Using Kafka Connect Adaptor Sink is fairly straightforward. All you need to do is package the Kafka Connect connector, create the configuration, and use it as a regular Pulsar Sink.

Step 1: Package

Use the Kafka Connect Adaptor NAR module, https://github.com/apache/pulsar/tree/master/pulsar-io/kafka-connect-adaptor-nar, as a starting point (for simplicity, I'll edit it directly), and add your Kafka Connect sink connector to the list of dependencies in pom.xml. Here's what this looks like with the Kinesis Kafka connector sink:

```diff
diff --git a/pulsar-io/kafka-connect-adaptor-nar/pom.xml b/pulsar-io/kafka-connect-adaptor-nar/pom.xml
index ea9bedbd056..c7fa9a1ebca 100644
--- a/pulsar-io/kafka-connect-adaptor-nar/pom.xml
+++ b/pulsar-io/kafka-connect-adaptor-nar/pom.xml
@@ -36,6 +36,11 @@
       <artifactId>pulsar-io-kafka-connect-adaptor</artifactId>
       <version>${project.version}</version>
     </dependency>
+    <dependency>
+      <groupId>com.amazonaws</groupId>
+      <artifactId>amazon-kinesis-kafka-connector</artifactId>
+      <version>0.0.9-SNAPSHOT</version>
+    </dependency>
   </dependencies>
```


Build the NAR:

```shell
$ mvn -f pulsar-io/kafka-connect-adaptor-nar/pom.xml clean package -DskipTests
```


Step 2: Configuration

The Sink expects "processingGuarantees" to be "EFFECTIVELY_ONCE". Its configs point to the Pulsar instance and to the topic where processed offsets are stored, name the topic to read data from, and carry the configuration to pass to the Kafka Connect Sink.

For example:

```yaml
processingGuarantees: "EFFECTIVELY_ONCE"
configs:
  "topic": "my-topic"
  "offsetStorageTopic": "kafka-connect-sink-offset-kinesis"
  "pulsarServiceUrl": "pulsar://localhost:6650/"
  "kafkaConnectorSinkClass": "com.amazon.kinesis.kafka.AmazonKinesisSinkConnector"
  # The following properties are passed directly to the Kafka Connect Sink and are defined by it
  "kafkaConnectorConfigProperties":
    "name": "test-kinesis-sink"
    "connector.class": "com.amazon.kinesis.kafka.AmazonKinesisSinkConnector"
    "tasks.max": "1"
    "topics": "my-topic"
    "kinesisEndpoint": "kinesis.us-east-1.amazonaws.com"
    "region": "us-east-1"
    "streamName": "test-kinesis"
    "singleKinesisProducerPerPartition": "true"
    "pauseConsumption": "true"
    "maxConnections": "1"
```
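Before deploying, it can help to sanity-check the configuration against the requirements listed above. This Java sketch is hypothetical: it is not the adaptor's own validation logic, it assumes the YAML has already been parsed into nested `Map`s, and its list of required keys simply encodes the keys used in this article's example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical pre-deployment check for the sink configuration shown above.
// It is NOT the adaptor's own validation; it only encodes what this article states.
class KcaSinkConfigCheck {
    static final List<String> REQUIRED_CONFIGS = List.of(
            "topic", "offsetStorageTopic", "pulsarServiceUrl",
            "kafkaConnectorSinkClass", "kafkaConnectorConfigProperties");

    // Returns a list of human-readable problems; an empty list means the config looks complete.
    static List<String> problems(Map<String, Object> sinkConfig) {
        List<String> problems = new ArrayList<>();
        if (!"EFFECTIVELY_ONCE".equals(sinkConfig.get("processingGuarantees"))) {
            problems.add("processingGuarantees must be EFFECTIVELY_ONCE");
        }
        Object configs = sinkConfig.get("configs");
        if (!(configs instanceof Map<?, ?> configMap)) {
            problems.add("missing 'configs' section");
            return problems;
        }
        for (String key : REQUIRED_CONFIGS) {
            if (!configMap.containsKey(key)) {
                problems.add("missing configs." + key);
            }
        }
        return problems;
    }

    public static void main(String[] args) {
        Map<String, Object> config = Map.of(
                "processingGuarantees", "EFFECTIVELY_ONCE",
                "configs", Map.of(
                        "topic", "my-topic",
                        "offsetStorageTopic", "kafka-connect-sink-offset-kinesis",
                        "pulsarServiceUrl", "pulsar://localhost:6650/",
                        "kafkaConnectorSinkClass", "com.amazon.kinesis.kafka.AmazonKinesisSinkConnector",
                        "kafkaConnectorConfigProperties", Map.of("tasks.max", "1")));
        System.out.println(problems(config)); // []
    }
}
```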


Step 3: Profit!

Follow Pulsar's regular steps for using a packaged connector: https://pulsar.apache.org/docs/en/io-use/

Using Kafka Connect Adaptor Source

KCA Source has been available since Pulsar 2.3.0. In the simplest case, using it is similar to using the KCA Sink: add the dependency and build, provide the configuration, and run.

Currently, KCA Source only supports Sources that return data in Apache Avro or JSON formats. 

For detailed examples of using the Source Adaptor, see Pulsar's Debezium Connector.

Under the Hood: Building a Better Developer Experience for Pulsar IO

Apache Pulsar 2.8 offers many improvements to the Java Pulsar Schema API and to the Pulsar IO API that help fill the gaps between Kafka Connect and Pulsar IO. These improvements were foundational for the Kafka Connect Adaptor Sink, and they make Pulsar IO Sinks easier to develop in general.

A Kafka Connect user must explicitly set the deserializer in the Sink (or Kafka Consumer) configuration, even if the code is not tied to a particular schema. The updated Pulsar Schema API makes this automatic and removes the need for explicit configuration.

Let's take a deeper look at the Pulsar IO API improvements below; for more technical details, please refer to PIP-85.

Runtime Handling of the Schema

We contributed support for writing schema-aware Pulsar IO Sinks that do not depend on a particular schema at build time. Previously, in Pulsar 2.7, you had to declare the schema type in your sink:

```java
// Pulsar 2.7: the value type (String, for example) is fixed at build time.
// open() and close() are omitted for brevity.
class MySink implements Sink<String> {
    public void write(Record<String> record) {
    }
}
```


To support both "String" and "GenericRecord" (JSON and Avro structures), you had to create two classes, and the user deploying the Sink had to pass the "--classname" argument to select the correct implementation for the given topic.

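In Pulsar 2.8, you can simply use this syntax:

```java
// Pulsar 2.8: one class handles every schema type (open() and close() omitted).
class MySink implements Sink<GenericObject> {
    public void write(Record<GenericObject> record) {}
}
```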


This sink works with every schema type and with topics that have no schema. It also supports schema evolution and the KeyValue schema type.
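To illustrate why a single class is enough, here is a self-contained Java sketch of runtime dispatch. `SchemaKind` and `GenericValue` are hypothetical, simplified mirrors of Pulsar's `SchemaType` and `GenericObject` (which exposes `getSchemaType()` and `getNativeObject()`); structured JSON and Avro records are modeled as plain `Map`s for brevity.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified mirror of Pulsar's SchemaType.
enum SchemaKind { STRING, JSON, AVRO, BYTES }

// Hypothetical, simplified mirror of GenericObject: the schema type and the
// underlying Java object are only known at runtime.
record GenericValue(SchemaKind kind, Object nativeObject) {}

// One sink class handles every schema type; no per-type subclass and no --classname choice.
class AnySchemaSink {
    final List<String> output = new ArrayList<>();

    void write(GenericValue value) {
        switch (value.kind()) {
            case STRING -> output.add("string:" + value.nativeObject());
            case JSON, AVRO -> {
                // Structured records are modeled here as a field-name -> value map.
                Map<?, ?> fields = (Map<?, ?>) value.nativeObject();
                output.add("record:" + fields.size() + " fields");
            }
            case BYTES -> output.add("bytes:" + ((byte[]) value.nativeObject()).length);
        }
    }
}

class AnySchemaDemo {
    public static void main(String[] args) {
        AnySchemaSink sink = new AnySchemaSink();
        sink.write(new GenericValue(SchemaKind.STRING, "hello"));
        sink.write(new GenericValue(SchemaKind.AVRO, Map.of("id", 1, "name", "x")));
        sink.write(new GenericValue(SchemaKind.BYTES, "abc".getBytes(StandardCharsets.UTF_8)));
        System.out.println(sink.output); // [string:hello, record:2 fields, bytes:3]
    }
}
```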

Seamless Support of KeyValue Messages

The second gap between Kafka Connect and Pulsar IO was the lack of seamless support for KeyValue messages.

For many versions, Pulsar has offered the KeyValue schema type, which lets you set separate schemas for the key and the value. With a Sink<GenericObject>, you can handle the KeyValue schema as well, writing your code only once and keeping it simpler.
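Here is a sketch of the same idea for KeyValue messages, assuming a hypothetical `KeyValuePair` record as a stand-in for Pulsar's `KeyValue<K, V>` class: one write path checks at runtime whether the value is a key/value pair and handles both parts, so no separate sink class is needed.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for Pulsar's KeyValue<K, V>.
record KeyValuePair<K, V>(K key, V value) {}

// A single write path that handles both plain values and key/value messages.
class KeyValueAwareSink {
    final List<String> rows = new ArrayList<>();

    void write(Object value) {
        if (value instanceof KeyValuePair<?, ?> kv) {
            // KeyValue message: key and value (each with its own schema) are handled separately.
            rows.add("key=" + kv.key() + ", value=" + kv.value());
        } else {
            rows.add("value=" + value);
        }
    }
}

class KeyValueDemo {
    public static void main(String[] args) {
        KeyValueAwareSink sink = new KeyValueAwareSink();
        sink.write(new KeyValuePair<>("user-1", 42));
        sink.write("plain");
        System.out.println(sink.rows); // [key=user-1, value=42, value=plain]
    }
}
```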

Accessing Message and Schema Details for Messages Consumed With Schema.AUTO_CONSUME

Pulsar uses a special AUTO_CONSUME schema to validate and deserialize messages using schemas received from the broker. Currently, it supports Avro, JSON, and ProtobufNativeSchema schemas. More details are available in the documentation: https://pulsar.apache.org/docs/en/schema-understand/#auto_consume

Before Pulsar 2.8, AUTO_CONSUME allowed you to decode the message according to the version of the schema attached to the message but did not allow access to the exact schema definition. Pulsar 2.8 enhances the API by providing access to this information:

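```java
// `message` was received by a consumer created with Schema.AUTO_CONSUME();
// the casts below assume the topic's schema is Avro.
Schema<?> schema = message.getReaderSchema().get();
GenericRecord consumedRecord = message.getValue();

org.apache.avro.Schema avroSchema = (org.apache.avro.Schema) schema.getNativeSchema().get();
org.apache.avro.generic.GenericRecord nativeRecord = (org.apache.avro.generic.GenericRecord) consumedRecord.getNativeObject();
```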


The Message.getReaderSchema() method returns the actual schema used to decode the message, even with the special AUTO_CONSUME schema. That schema automatically downloads new schema versions as the topic evolves.

The Schema.getNativeSchema() and GenericRecord.getNativeObject() methods provide access to the underlying schema implementation and to the Java model of the message. In particular, on an Avro topic you can access the Avro schema and the underlying Avro GenericRecord instance.
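The mechanism behind AUTO_CONSUME and getReaderSchema() can be illustrated with a toy model. In this hypothetical Java sketch, each `RawMessage` carries the version of the schema it was written with, a `LongFunction` stands in for fetching that schema from the broker, and fetched versions are cached. The comma-separated "schema" and payload format is invented purely for the example and is not how Avro or JSON encoding works.

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.LongFunction;

// Hypothetical, simplified message: payload plus the schema version it was written with.
record RawMessage(long schemaVersion, String payload) {}

// Hypothetical result: decoded fields plus the schema actually used (cf. getReaderSchema()).
record DecodedMessage(String readerSchema, Map<String, String> fields) {}

// Sketch of the AUTO_CONSUME idea: fetch each schema version on first use, cache it,
// decode with it, and expose which schema was used.
class AutoConsumeDecoder {
    private final LongFunction<String> fetchSchemaFromBroker; // stand-in for a broker lookup
    private final Map<Long, String> cache = new HashMap<>();
    int fetches = 0;

    AutoConsumeDecoder(LongFunction<String> fetchSchemaFromBroker) {
        this.fetchSchemaFromBroker = fetchSchemaFromBroker;
    }

    DecodedMessage decode(RawMessage msg) {
        String schema = cache.computeIfAbsent(msg.schemaVersion(), v -> {
            fetches++; // only runs for versions not seen before
            return fetchSchemaFromBroker.apply(v);
        });
        // Toy format: schema = comma-separated field names, payload = comma-separated values.
        String[] names = schema.split(",");
        String[] values = msg.payload().split(",", -1);
        Map<String, String> fields = new LinkedHashMap<>();
        for (int i = 0; i < names.length; i++) {
            fields.put(names[i], i < values.length ? values[i] : null);
        }
        return new DecodedMessage(schema, fields);
    }
}

class AutoConsumeDemo {
    public static void main(String[] args) {
        Map<Long, String> brokerSchemas = Map.of(1L, "id,name", 2L, "id,name,email");
        AutoConsumeDecoder decoder = new AutoConsumeDecoder(brokerSchemas::get);
        System.out.println(decoder.decode(new RawMessage(1, "7,Ann")).fields()); // {id=7, name=Ann}
        System.out.println(decoder.decode(new RawMessage(2, "8,Bob,b@example.com")).readerSchema()); // id,name,email
        decoder.decode(new RawMessage(1, "9,Cy"));
        System.out.println(decoder.fetches); // 2
    }
}
```

A message written with a newer schema version triggers one extra fetch, later messages with an already-seen version are decoded from the cache, and the returned `readerSchema` tells the caller exactly which definition was applied.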

Summing Up

The new Kafka Connect Adaptor completes the Pulsar-Kafka compatibility ecosystem. This ecosystem currently allows an iterative transition from Kafka to Pulsar and supports the use of native Kafka clients with Pulsar, the use of Kafka Connect connectors on Pulsar, and data transfer between the two systems.

With all these features available, we hope the focus shifts from worrying about the complexity of onboarding Pulsar alongside existing Kafka implementations to finding new ways the business can benefit from the power of Pulsar.


Published at DZone with permission of Enrico Olivelli. See the original article here.
