The Real-time Data Transfer Magic of Doris Kafka Connector's "Data Package": Part 1

One Man Stands Guard, and Ten Thousand Cannot Pass! Learn all about real-time data import, transformation, and error handling with Doris Kafka Connector.

By Michael Hayden · Sep. 12, 25 · Analysis

Apache Doris provides multi-dimensional data ingestion capabilities. In addition to the built-in Routine Load and Flink's support for reading from Kafka and writing to Doris, the Doris Kafka Connector [1], as an extended component of the Kafka Connect ecosystem, not only supports importing Kafka data into Doris but also relies on the vast Kafka Connect ecosystem to achieve the following features [2]:

Rich Format Support

  • Natively parses complex formats such as Avro/Protobuf.
  • Automatically registers and converts schemas.
  • Processes binary data streams efficiently.

Heterogeneous Integration of Multiple Data Sources

  • Relational databases: MySQL, Oracle, SQL Server, DB2, Informix, etc.
  • NoSQL databases: MongoDB, Cassandra, etc.
  • Message queue systems: ActiveMQ, IBM MQ, RabbitMQ, etc.
  • Cloud data warehouses: Snowflake, Google BigQuery, Amazon Redshift, etc.

[Figure: Heterogeneous integration of multiple data sources]

CDC Incremental Expansion

  • Supports parsing the data format generated by Debezium to achieve change data capture.
  • Can serve as a supplement to the CDC function of the Doris Flink Connector (Currently, Flink CDC does not support capturing databases such as Informix and Spanner).

This article will briefly introduce the main concepts and application practices of the Doris Kafka Connector.

Introduction to Kafka Connect: The Unsung Hero of Data Flow

Kafka Connect is a core open-source component of the Apache Kafka ecosystem, designed as a standardized data integration platform. It enables efficient data flow between heterogeneous systems through a unified abstract interface.

As a scalable distributed architecture, Kafka Connect achieves technical decoupling through a pluggable connector architecture. Users do not need to code the underlying logic of the data pipeline; they can complete end-to-end data synchronization through declarative configuration (as shown below, configuration alone is enough to import data into Kafka and export it to Doris). This low-code integration mode significantly lowers the technical threshold, allowing data analysts and business architects to focus on data value mining rather than getting bogged down in the implementation details of the transmission mechanism. Its loose coupling is reflected in three aspects:

  1. The upstream and downstream systems are buffered and decoupled through Kafka.
  2. The standardized interface design ensures that the expansion of the data source and the target end do not affect each other, effectively guaranteeing the maintainability of system evolution.
  3. Kafka Connect can also perform lightweight transformations on the data as it passes through, avoiding intrusion into the business logic of the source system.

[Figure: Kafka Connect]

Core Concepts of Kafka Connect: The True King of "Connections"

Kafka Connect is typically composed of the following parts:

Connectors: The Super Data Movers

There are two types: source connectors and sink connectors. Source connectors ingest data from external systems such as databases into Kafka topics, while sink connectors export data from Kafka topics to other systems. The current Doris Kafka Connector is a sink connector (SinkConnector), so it only supports importing data from Kafka topics into Doris. In addition, hundreds of connectors are currently available on Confluent Hub, and users can combine them to build data pipelines between any systems centered around Kafka.

Tasks

Coordinated by connectors, they are responsible for the actual data replication work. They allow a single job to be broken down into multiple tasks, providing built-in parallel support and scalable data replication capabilities.

Workers

The processes that execute connectors and tasks are divided into two modes: Standalone workers and Distributed workers.

  • Standalone mode: A single process is responsible for executing all connectors and tasks, which is suitable for the testing and development stages.
  • Distributed mode: In the distributed mode, Connect can provide scalability and automatic fault tolerance. By using the same group.id, you can start multiple worker processes, which will automatically coordinate and arrange the execution of connectors and tasks to ensure that these operations can be carried out efficiently across all available worker processes. When a new worker process joins, a worker process is shut down, or a worker process fails unexpectedly, the remaining worker processes will automatically detect this change and quickly coordinate to reassign the connectors and tasks to the updated set of available worker processes, ensuring the stable operation of the entire system and the continuity of data processing. The following figure shows the distributed mode architecture of Kafka Connect [3]. Multiple worker processes run in parallel, and each worker contains different connector instances (such as Conn 1 and Conn 2). Each connector instance is split into multiple tasks, and each task is responsible for processing specific data partitions, thereby achieving horizontal expansion and task parallelism and demonstrating high scalability and load balancing characteristics.

[Figure: Workers]
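As a rough illustration of the task parallelism described above, the following sketch spreads topic partitions across a connector's tasks in round-robin order. It is only a mental model: in a real cluster the assignment is decided by the Kafka consumer group protocol, and the function name `assign_partitions` is hypothetical.

```python
from collections import defaultdict


def assign_partitions(num_partitions: int, tasks_max: int) -> dict[int, list[int]]:
    """Spread partition ids over task ids in round-robin order (illustration only)."""
    if tasks_max < 1:
        raise ValueError("tasks_max must be at least 1")
    assignment: dict[int, list[int]] = defaultdict(list)
    for partition in range(num_partitions):
        assignment[partition % tasks_max].append(partition)
    return dict(assignment)


if __name__ == "__main__":
    # Six partitions shared by a connector configured with tasks.max=2.
    print(assign_partitions(6, 2))
```

Note that a task without partitions simply does not appear in the result, which mirrors the fact that raising tasks.max beyond the partition count adds no parallelism for a sink connector.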

Converters: The "Transformers" of Data

They are used to convert data formats between the systems that Connect sends or receives data from. Commonly used Kafka Connect converters include JsonConverter, StringConverter, and ByteArrayConverter. 

In addition, when data is serialized and deserialized with Apache Avro, Google Protobuf, or JSON Schema, a schema registry must be deployed to manage the schema information and versions (as shown in the following figure [4], the Schema Registry is used to manage schemas). Commonly used ones include the open-source Apicurio Registry and Confluent Schema Registry.

[Figure: Converters]
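To make the role of a converter more concrete, the sketch below mimics, in plain Python, the two JSON shapes that JsonConverter works with: a bare JSON value when schemas.enable is false, and a schema/payload envelope when it is true. The helper `unwrap_value` is hypothetical and is not part of Kafka Connect; it only illustrates the difference.

```python
import json


def unwrap_value(raw: bytes, schemas_enable: bool):
    """Return the record value roughly the way a JSON converter would read it.

    With schemas_enable=True the message must be an envelope of the form
    {"schema": ..., "payload": ...}; otherwise the whole document is the value.
    """
    doc = json.loads(raw)
    if not schemas_enable:
        return doc
    if not isinstance(doc, dict) or set(doc) != {"schema", "payload"}:
        raise ValueError("expected a JSON envelope with 'schema' and 'payload' fields")
    return doc["payload"]


if __name__ == "__main__":
    plain = b'{"order_id": "ORDER_0001", "amount": 576.99}'
    print(unwrap_value(plain, schemas_enable=False))
```

This is why plain JSON messages such as the order records used later in this article are read with value.converter.schemas.enable set to false in the connector configuration.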

Transforms: The "Magicians" of Data

They can perform simple modifications and conversions on a single message, and multiple transforms can be configured in a chain within the connector. Common transforms include Filter, ReplaceField, etc. [5].

Dead Letter Queue: The "Rest Area" for Erroneous Data

In a data flow processing system, there may be situations where messages cannot be correctly processed due to various reasons (such as format errors or content not meeting requirements). A Dead-letter Queue (DLQ) is a special type of message queue that temporarily stores messages that the software system cannot process due to errors. It is only applicable to sink connectors, and the working process is shown in the following figure.

[Figure: Dead letter queue]

The errors that can usually be handled by the Kafka Connect dead letter queue are shown in the following table. The dead letter queue provides flexibility by allowing users to set retry strategies, timeouts, and other parameters according to the type of failure, ensuring more precise control over the message flow. For related content, please refer to Errors and Dead Letter Queues and Sink Connector Config.

| Connector Lifecycle Stage | Description | Handled? |
| --- | --- | --- |
| start | When the connector is first started, it will perform the required initialization operations, such as connecting to the data store. | No |
| poll (for source connector) | Reads records from the source data store. | No |
| convert | Reads/writes data to/from a Kafka topic and serializes or deserializes JSON/Avro, etc. | Yes |
| transform | Applies any configured single-message transformations. | Yes |
| put (for sink connector) | Writes records to the target data store. | No |


Installation and Deployment of Kafka Connect: Let's Build Your Data Connection Bridge Together

This article assumes that Apache Kafka has already been installed. If not, you can refer to the Kafka official documentation for the relevant operations. Kafka Connect supports two startup modes: Standalone and Distributed.

Standalone Mode: I Can Handle It Alone

In the Standalone mode, all configurations are stored in the config/connect-standalone.properties file. In essence, this file is like your battle manual, and you can start operations directly after configuring it. The commonly used configuration items include:

Plain Text
 
# Broker address
bootstrap.servers=<ip:port>

# Offset storage file location
offset.storage.file.filename=./offset_data/offset.connect

# It is recommended to increase the max.poll.interval.ms time of Kafka to more than 30 minutes. The default is 5 minutes.
# To avoid the timeout of data consumption during Stream Load import and the consumer being kicked out of the consumer group.
max.poll.interval.ms=1800000
consumer.max.poll.interval.ms=1800000
offset.flush.interval.ms=10000

# Enable schema support for keys and values
# Used for the structural verification of JSON data
key.converter.schemas.enable=true
value.converter.schemas.enable=true

# Plugin path
plugin.path=./plugins

# Serialization configuration for keys and values
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter


After the configuration is completed, starting Kafka Connect is as simple as executing one command:

Plain Text
 
$KAFKA_HOME/bin/connect-standalone.sh $KAFKA_HOME/config/connect-standalone.properties


Distributed Mode: The Power of Teamwork

In the distributed mode, the worker nodes of Kafka Connect are "fearless". They store all the state information (including offsets, configurations, and statuses) in Kafka topics instead of locally. That is to say, even if a node fails, other nodes can recover from the topics in Kafka and continue to work stably! Isn't that cool?

So, if you want to deploy in a large-scale, high-availability environment, the Distributed mode is definitely your best choice. Next, let's see how to implement the deployment.

In practical applications, the Distributed mode is more suitable for large-scale, high-availability production environments. Therefore, in this article, the practice of Doris Kafka Connect mainly focuses on deploying in the Distributed mode.

Deployment Environment: Our Superpower Inventory

| Component | Version | Deployment Node | Description |
| --- | --- | --- | --- |
| kafka_2.12-3.7 | 3.7.2 | 10.16.10.6 | Kafka Broker, used for message delivery and storage. |
| OpenJdk | 11 | 10.16.10.6, 172.21.16.12 | Java runtime environment, used to run Kafka and Kafka Connect. |
| Kafka Connect Worker | 3.7.2 | 10.16.10.6, 172.21.16.12 | Distributed worker nodes of Kafka Connect, used for data integration. |
| Doris Kafka Connect | 24.0.0 | 10.16.10.6, 172.21.16.12 | Connector used to synchronize data from Kafka to Doris. |
| Confluent Kafka Connect Avro Converter | 7.8.0 | 10.16.10.6, 172.21.16.12 | Converter used to process data in Avro format. |
| Kafka Connect File | 3.7.2 | 10.16.10.6, 172.21.16.12 | Connector used to read data from files or write data to files. |
| Confluent Kafka Connect Datagen | 0.6.6 | 10.16.10.6, 172.21.16.12 | Connector used to generate test data. |
| Apache Doris | 2.1.8 | 172.21.16.12 | |


Deployment Steps: Build Your Kafka Connect Empire

1. Create a personalized configuration.

Configure the connect-distributed.properties file on each deployment node (such as 10.16.10.6 and 172.21.16.12). The example below is for 10.16.10.6; the other nodes are configured the same way, except for the listeners configuration item.

Plain Text
 
# Comments are kept on their own lines: in a Java properties file,
# a "#" that follows a value becomes part of that value.

# Basic configuration
# Kafka broker address
bootstrap.servers=<ip:port>
# Worker group ID
group.id=connect-cluster
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter

# Storage topic configuration
# Configuration storage topic
config.storage.topic=connect-configs
# Offset storage topic
offset.storage.topic=connect-offsets
# Status storage topic
status.storage.topic=connect-status
# Replication factors of the configuration, offset, and status topics
config.storage.replication.factor=1
offset.storage.replication.factor=1
status.storage.replication.factor=1

# Worker thread configuration
# Offset flush interval
offset.flush.interval.ms=10000
# Plugin path
plugin.path=./plugins

# REST API configuration
# REST API listening address
listeners=http://10.16.10.6:8083
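Because only the listeners entry differs between nodes, the per-node files can be generated from one template. The following is a minimal sketch under that assumption; `render_worker_properties` is a hypothetical helper, the `<ip:port>` placeholder is kept as written in the configuration, and the two addresses are the ones from the deployment table.

```python
SHARED_SETTINGS = {
    "bootstrap.servers": "<ip:port>",  # placeholder kept from the article
    "group.id": "connect-cluster",
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "config.storage.topic": "connect-configs",
    "offset.storage.topic": "connect-offsets",
    "status.storage.topic": "connect-status",
    "config.storage.replication.factor": "1",
    "offset.storage.replication.factor": "1",
    "status.storage.replication.factor": "1",
    "offset.flush.interval.ms": "10000",
    "plugin.path": "./plugins",
}


def render_worker_properties(node_ip: str, rest_port: int = 8083) -> str:
    """Return the text of connect-distributed.properties for one worker node."""
    settings = dict(SHARED_SETTINGS)
    # The REST listener is the only entry that changes from node to node.
    settings["listeners"] = f"http://{node_ip}:{rest_port}"
    return "\n".join(f"{key}={value}" for key, value in settings.items()) + "\n"


if __name__ == "__main__":
    for ip in ("10.16.10.6", "172.21.16.12"):
        print(f"# --- {ip} ---")
        print(render_worker_properties(ip))
```

Keeping group.id and the three storage topics identical on every node is what lets the workers join the same Connect cluster.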

2. Deploy the plugin.

Download the required Kafka Connect plugins and place them in the plugin.path directory specified in the configuration. This is like putting the superhero's equipment into your battle suit pocket, ready to be used at any time.

  • Download and extract confluentinc-kafka-connect-datagen and other plugins.
  • Place the doris-kafka-connector-24.0.0.jar in the plugins directory.
  • In addition, if the lib directory of Kafka already contains the JAR files related to connect-file, you can directly copy these files from the lib directory to the plugins directory to ensure that Kafka Connect can correctly load these plugins.

[Figure: Deploy the plugin]

Since Kafka Connect can use various converters, transforms, and connectors, to avoid dependency conflicts between different connectors, Kafka Connect provides the following class isolation mechanisms [7]:

[IMPORTANT]
  1. Subdirectory containing JAR and its dependencies: Place the plugin and its dependent JAR files in a subdirectory under the plugin.path configuration path. For example, the two directories of confluentinc-kafka-connect.
  2. Uber-JAR containing the plugin and its dependencies: Package the plugin and all its dependencies into a single JAR file. For example, doris-kafka-connector-24.0.0.jar.
  3. Directory containing the plugin and its dependent class files: Place the class files of the plugin and its dependencies directly in the directory structure that matches the Java package structure.

3. Start Kafka Connect Distributed.

After completing the above operations, execute the following commands on both 172.21.16.12 and 10.16.10.6 to start Kafka Connect:

Plain Text
 
[dev@VM-16-12-centos kafka_2.12-3.7.2]$ bin/connect-distributed.sh -daemon config/connect-distributed.properties

[dev@VM-10-6-centos kafka_2.12-3.7.2]$ bin/connect-distributed.sh -daemon config/connect-distributed.properties

4. Verify Kafka Connect.

After starting Kafka Connect, you can view information about it through the REST API. For more operations, refer to the Kafka Connect REST API documentation. The following shows how to obtain information about Kafka Connect through the REST API.

  • Check the Kafka Connect version to ensure that you don't fall behind in version updates!

[Figure: Check the Kafka Connect version]

  • Check the plugins loaded by the Kafka Connect Cluster to keep it in full combat mode!

[Figure: Check the plugins loaded by the Kafka Connect Cluster]
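As a hedged sketch of these two checks: in the Kafka Connect REST API, GET / returns the worker version and GET /connector-plugins lists the installed connector plugins. The helper names and the use of Python's urllib are illustrative choices; the worker address is the one set in listeners earlier.

```python
import json
from urllib.request import urlopen

ENDPOINTS = {
    "version": "/",                   # worker version, commit, and Kafka cluster id
    "plugins": "/connector-plugins",  # connector plugins found on plugin.path
}


def endpoint_url(base_url: str, check: str) -> str:
    """Build the REST URL for one of the two verification checks."""
    return base_url.rstrip("/") + ENDPOINTS[check]


def fetch_json(url: str, timeout: float = 5.0):
    """Issue a GET request and decode the JSON body."""
    with urlopen(url, timeout=timeout) as response:
        return json.load(response)


if __name__ == "__main__":
    base = "http://10.16.10.6:8083"
    print(fetch_json(endpoint_url(base, "version")))
    for plugin in fetch_json(endpoint_url(base, "plugins")):
        print(plugin["class"], plugin.get("version"))
```

If the Doris sink is installed correctly, org.apache.doris.kafka.connector.DorisSinkConnector should appear among the listed plugin classes.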

The Wonderful Adventure of Data Ingestion With Doris Kafka Connect 

The Guide for JSON Beginners: The Simple and Straightforward Import Method  (Common JSON Data)

Do you want to play with data streams using Kafka and Doris? Let's embark on a relaxed and enjoyable "import journey"! Today, we will import some common JSON data from the orders Topic in Kafka through the Doris Kafka Connector. Since you are a beginner, get ready, and let's start right away!

1. What Does the JSON Data in Kafka Look Like?

Suppose you already have a Kafka topic (such as orders), and the message format inside is as follows:

Plain Text
 
{"order_id": "ORDER_0001", "product_id": "PROD_627", "amount": 576.99, "timestamp": "2025-02-07T09:00:36", "order_source": "Online Order"}
{"order_id": "ORDER_0002", "product_id": "PROD_445", "amount": 579.99, "timestamp": "2025-02-07T09:00:36", "order_source": "Offline Order"}
{"order_id": "ORDER_0003", "product_id": "PROD_718", "amount": 264.99, "timestamp": "2025-02-07T09:00:36", "order_source": "Offline Order"}
{"order_id": "ORDER_0004", "product_id": "PROD_276", "amount": 552.99, "timestamp": "2025-02-07T09:00:36", "order_source": "Offline Order"}


It looks like common JSON data, right? That's exactly it! Now we want to bring them into Doris.

2. Create a Table

Let's start with a table creation statement to prepare for receiving data:

SQL
 
CREATE TABLE `orders` (
  `order_id` varchar(20) NULL,
  `product_id` varchar(20) NULL,
  `amount` decimal(19,2) NULL,
  `timestamp` datetime NULL
) ENGINE=OLAP
UNIQUE KEY(`order_id`)
DISTRIBUTED BY HASH(`order_id`) BUCKETS 3
PROPERTIES (
"replication_allocation" = "tag.location.default: 1"
);


The table is created! Now our Doris is ready to receive the JSON data from Kafka.

3. Start the Import

Now, we have entered the key step of the import. Through Kafka Connect, we can easily import the messages in Kafka into Doris. Here are the specific commands to achieve the import operation:

[Figure: Start the import]
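As an illustration of what such a request can look like, the sketch below assembles a connector definition for the orders topic and posts it to the Kafka Connect REST API (POST /connectors). The connector name `kf_orders_sink_doris` is hypothetical, and the Doris address, ports, credentials, and database are assumptions copied from the other connector examples in this article; adjust them to your environment.

```python
import json
from urllib.request import Request, urlopen


def build_orders_connector(name: str = "kf_orders_sink_doris") -> dict:
    """Assemble the JSON body for POST /connectors (values are assumptions)."""
    return {
        "name": name,
        "config": {
            "connector.class": "org.apache.doris.kafka.connector.DorisSinkConnector",
            "tasks.max": "2",
            "topics": "orders",
            "doris.topic2table.map": "orders:orders",
            "buffer.count.records": "10000",
            "buffer.flush.time": "120",
            "buffer.size.bytes": "5000000",
            "doris.urls": "172.21.16.12",
            "doris.user": "root",
            "doris.password": "",
            "doris.http.port": "38230",
            "doris.query.port": "39230",
            "doris.database": "testdb",
            "key.converter": "org.apache.kafka.connect.storage.StringConverter",
            "value.converter": "org.apache.kafka.connect.json.JsonConverter",
            # The sample messages are plain JSON without a schema envelope.
            "value.converter.schemas.enable": "false",
        },
    }


def create_connector(base_url: str, definition: dict):
    """POST the definition to a Connect worker and return the decoded reply."""
    request = Request(
        base_url.rstrip("/") + "/connectors",
        data=json.dumps(definition).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )
    with urlopen(request, timeout=10) as response:
        return json.load(response)


if __name__ == "__main__":
    print(create_connector("http://10.16.10.6:8083", build_orders_connector()))
```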

Parameter Analysis

Here are the parameters you need to know and their explanations to ensure that you understand each item clearly:

| Parameter Name | Default Value | Required | Explanation |
| --- | --- | --- | --- |
| name | - | Yes | The application name of Kafka Connect, which must be unique. |
| connector.class | - | Yes | The type of the connector. Use the Doris Kafka Sink Connector: org.apache.doris.kafka.connector.DorisSinkConnector |
| topics | - | Yes | The list of Kafka Topics to subscribe to. Multiple Topics are separated by commas: topic1,topic2 |
| doris.urls | - | Yes | The connection addresses of Doris FE (Front End). Multiple addresses are separated by commas: 10.20.30.1,10.20.30.2 |
| doris.http.port | - | Yes | The port for the Doris HTTP protocol. The default value is 8030. |
| doris.query.port | - | Yes | The port for the Doris MySQL protocol. The default value is 9030. |
| doris.user | - | Yes | The username used to connect to Doris. |
| doris.password | - | Yes | The password used to connect to Doris. |
| doris.database | - | Yes | The Doris database to write to. If there are multiple databases, the topic2table.map needs to configure the specific database name. |
| doris.topic2table.map | - | No | The mapping relationship between Kafka Topics and Doris tables. The mapping format for multiple Topics and tables: topic1:db1.tbl1 |
| buffer.count.records | 10000 | No | The number of records buffered in memory for each Kafka partition. When this number is reached, the records will be flushed to Doris. The default is 10000. |
| buffer.flush.time | 120 | No | The buffer flush time interval, in seconds. The default value is 120 seconds. |
| buffer.size.bytes | 5000000 (5MB) | No | The maximum size of the buffer for each Kafka partition, in bytes. The default value is 5MB. |
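The three buffer.* parameters act as independent flush triggers. The sketch below models that behaviour for a single partition: a flush happens as soon as the record count, the buffered bytes, or the elapsed time reaches its limit. It illustrates the documented thresholds only and is not the connector's actual implementation; the class `PartitionBuffer` is hypothetical.

```python
import time


class PartitionBuffer:
    """Toy model of the per-partition buffer thresholds (not connector code)."""

    def __init__(self, count_records=10000, flush_time_s=120, size_bytes=5_000_000,
                 clock=time.monotonic):
        self.count_records = count_records
        self.flush_time_s = flush_time_s
        self.size_bytes = size_bytes
        self.clock = clock
        self.records = []
        self.buffered_bytes = 0
        self.last_flush = clock()

    def should_flush(self) -> bool:
        """True when any one of the three thresholds has been reached."""
        return (
            len(self.records) >= self.count_records
            or self.buffered_bytes >= self.size_bytes
            or self.clock() - self.last_flush >= self.flush_time_s
        )

    def add(self, record: bytes):
        """Buffer one record; return the flushed batch if a threshold was hit."""
        self.records.append(record)
        self.buffered_bytes += len(record)
        if self.should_flush():
            return self.flush()
        return None

    def flush(self):
        """Hand back the buffered batch and reset the counters."""
        batch, self.records = self.records, []
        self.buffered_bytes = 0
        self.last_flush = self.clock()
        return batch
```

In this model, lowering buffer.flush.time (for example to the 11 seconds used in the later examples) makes data visible in Doris sooner at the cost of smaller batches.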


4. Verify the Import Status

You can verify the task status of Kafka Connect through the following command to see if everything is going smoothly:

[Figure: Verify the import status]

Through this command, you can confirm the running status of the task on the two nodes 10.16.10.6 and 172.21.16.12, ensuring that the data is flowing smoothly into Doris.
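A status check of this kind corresponds to GET /connectors/{name}/status in the Kafka Connect REST API. The sketch below summarizes such a response per worker; the sample document and the connector name are made up for illustration and only follow the general shape of a status response.

```python
def summarize_status(status: dict) -> dict:
    """Map each worker_id to the states of the connector and tasks it runs."""
    summary: dict[str, list[str]] = {}
    connector = status["connector"]
    summary.setdefault(connector["worker_id"], []).append(
        f"connector:{connector['state']}"
    )
    for task in status.get("tasks", []):
        summary.setdefault(task["worker_id"], []).append(
            f"task-{task['id']}:{task['state']}"
        )
    return summary


def all_running(status: dict) -> bool:
    """True only if the connector and every task report RUNNING."""
    states = [status["connector"]["state"]]
    states += [task["state"] for task in status.get("tasks", [])]
    return all(state == "RUNNING" for state in states)


# Made-up response in the shape returned by GET /connectors/<name>/status.
SAMPLE_STATUS = {
    "name": "kf_orders_sink_doris",
    "connector": {"state": "RUNNING", "worker_id": "10.16.10.6:8083"},
    "tasks": [
        {"id": 0, "state": "RUNNING", "worker_id": "10.16.10.6:8083"},
        {"id": 1, "state": "RUNNING", "worker_id": "172.21.16.12:8083"},
    ],
    "type": "sink",
}

if __name__ == "__main__":
    print(all_running(SAMPLE_STATUS))
    print(summarize_status(SAMPLE_STATUS))
```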

5. Check the Results in Doris

Finally, query the data in Doris to see if it has arrived as expected:

[Figure: Check results]

Look, the data has been successfully imported into Doris! Don't you feel like you've transformed from a Kafka novice to a data stream expert in an instant?

The Metamorphosis of Data: The Transformation Spell of the ETL Magician (Using Transform)

Xiaodong is a data development engineer at a large e-commerce platform. Recently, he received a challenging task: to conduct sentiment analysis and user behavior pattern mining based on users' evaluations of products. The problem is that these evaluation data come in various forms - there are texts, images, and even videos! With the continuous emergence of new features, the traditional predefined storage table format is no longer sufficient.

Fortunately, Xiaodong has recently come across the variant data type in Doris. It can store complex data structures of various data types (such as integers, strings, booleans, etc.) without the need to define all columns in the table in advance. It's simply a magic weapon for solving such problems!

1. The Table Structure in Doris: The Variant Type in Action

SQL
 
CREATE TABLE `orders_variant` (
  `order_id` BIGINT NOT NULL AUTO_INCREMENT,
  `order_info` variant
) ENGINE=OLAP
UNIQUE KEY(`order_id`)
DISTRIBUTED BY HASH(`order_id`) BUCKETS 3
PROPERTIES (
  "replication_allocation" = "tag.location.default: 1"
);


However, when he saw the data in the following format, Xiaodong frowned again:

Plain Text
 
{"order_id": "ORDER_0001", "product_id": "PROD_491", "amount": 119.99, "timestamp": "2025-02-07T14:00:16", "address": "Address_5168"}
{"order_id": "ORDER_0002", "product_id": "PROD_109", "amount": 365.99, "timestamp": "2025-02-07T14:00:16", "address": "Address_8310", "userid": "User_8677", "email": "[email protected]"}
{"order_id": "ORDER_0003", "product_id": "PROD_417", "amount": 275.99, "timestamp": "2025-02-07T14:00:16", "userid": "User_1350"}
{"order_id": "ORDER_0004", "product_id": "PROD_612", "amount": 220.99, "timestamp": "2025-02-07T14:00:16", "address": "Address_1056", "email": "[email protected]"}
{"order_id": "ORDER_0005", "product_id": "PROD_300", "amount": 709.99, "timestamp": "2025-02-07T14:00:16", "address": "Address_6146"}


The problem is: Xiaodong hopes to import the data into Doris without changing the original data format, but he also wants to recombine these JSON strings into a single attribute (such as order_info). In this way, he can make full use of the Variant data type in Doris to ensure that data processing is both efficient and flexible.

Just as Xiaodong was frowning, his good friend Xiaoliang walked over with a smile and said, "Don't worry, leave this to me. The HoistField transform in Kafka Connect can handle it!"

Xiaodong's eyes lit up. It turned out that Xiaoliang was referring to a very practical single message transform in Kafka Connect. It can wrap the originally flat data in a single field, making it easier for subsequent data processing.

2. Use the HoistField Transform to Easily Reorganize the Data

Following Xiaoliang's suggestion, Xiaodong used the HoistField transform in Kafka Connect. Before the data is imported into Doris, it will package each field of each record into an order_info attribute. Here are the changes in the data before and after:

Data before processing:

Plain Text
 
{
  "order_id": "ORDER_0001",
  "product_id": "PROD_491",
  "amount": 119.99,
  "timestamp": "2025-02-07T14:00:16",
  "address": "Address_5168"
}


Data after processing:

Plain Text
 
{
  "order_info": {
    "order_id": "ORDER_0001",
    "product_id": "PROD_491",
    "amount": 119.99,
    "timestamp": "2025-02-07T14:00:16",
    "address": "Address_5168"
  }
}
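The effect of the transform can be reproduced in a few lines. The sketch below is a plain-Python imitation of what HoistField$Value does to a schemaless record value, wrapping it under a single field; `hoist_value` is a hypothetical helper, not the Kafka Connect implementation.

```python
import json


def hoist_value(value, field: str) -> dict:
    """Wrap the whole record value under `field`, as HoistField$Value does."""
    return {field: value}


if __name__ == "__main__":
    before = {
        "order_id": "ORDER_0001",
        "product_id": "PROD_491",
        "amount": 119.99,
        "timestamp": "2025-02-07T14:00:16",
        "address": "Address_5168",
    }
    print(json.dumps(hoist_value(before, "order_info"), indent=2))
```

Because the wrapper does not care which keys the record contains, records with extra fields such as userid or email end up in the same order_info column.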


3. Configure the Kafka Connect REST API

Xiaodong configured Kafka Connect to import the processed data into Doris. The configuration is as follows:

Plain Text
 
curl -i http://10.16.10.6:8083/connectors -H "Content-Type: application/json" -X POST -d '{
"name":"kf_orders_variant_sink_doris",
"config":{
"connector.class":"org.apache.doris.kafka.connector.DorisSinkConnector",
"tasks.max":"2",
"topics":"orders_variant",
"doris.topic2table.map": "orders_variant:orders_variant",
"buffer.count.records":"10000",
"buffer.flush.time":"11",
"buffer.size.bytes":"5000000",
"doris.urls":"172.21.16.12",
"doris.user":"root",
"doris.password":"",
"doris.http.port":"38230",
"doris.query.port":"39230",
"doris.database":"testdb",
"transforms": "HoistField",
"transforms.HoistField.type": "org.apache.kafka.connect.transforms.HoistField$Value",
"transforms.HoistField.field": "order_info",
"key.converter":"org.apache.kafka.connect.storage.StringConverter",
"value.converter":"org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": "false"
}
}'


The result after importing into Doris: Everything went smoothly, and Xiaodong smiled as he looked at the data.

Finally, after the magical processing of the Doris Kafka Connector, the data was successfully imported into Doris. The data was stored as columns and dynamic sub-columns according to the orders_variant JSON keys and their corresponding values, and the format perfectly met the requirements. Here are the results after the import:

[Figure: Results after the import]


After finishing everything, Xiaodong couldn't wait to learn more about the Kafka Connect transform operators from Xiaoliang. For more information, you can refer to Single Message Transforms.

The Error Recycling Bin: Build a Loving Hut for Lost Data (Dead Letter Queue)

The Doris Kafka Connector supports multiple data serialization formats. However, when the serialization data format of the upstream does not match that of the downstream, the task will stop immediately, and you have to handle it manually and restart.

However, don't panic! We can deal with this gracefully through the Kafka Connect Dead Letter Queue (DLQ). When a data serialization error occurs, the DLQ Topic can store the relevant error information, making it convenient for subsequent troubleshooting. At the same time, with the help of the Topic Header, you can quickly locate the cause of the error. Next, let's happily experience the practical application of the dead letter queue.

1. Example of Error Data

Suppose you expect to import JSON data from the error_orders_json topic into Doris, but dirty data accidentally ends up in the topic. For example, the following command inserts the invalid JSON record "invalid-json" into this topic.

Plain Text
 
echo "invalid-json" | ./bin/kafka-console-producer.sh \
  --bootstrap-server 10.16.10.6:29092 \
  --topic error_orders_json


2. Configure the Dead Letter Queue via the Kafka Connect REST API

To prevent the task from terminating due to a single piece of error data, we can add dead letter queue parameters to the Kafka Connector configuration:

Plain Text
 
curl -i http://10.16.10.6:8083/connectors -H "Content-Type: application/json" -X POST -d '{
  "name":"kf-error-json-sink-doris",
  "config":{
    "connector.class":"org.apache.doris.kafka.connector.DorisSinkConnector",
    "tasks.max":"2",
    "topics":"error_orders_json",
    "doris.topic2table.map": "error_orders_json:orders_dlq",
    "buffer.count.records":"10000",
    "buffer.flush.time":"11",
    "buffer.size.bytes":"5000000",
    "doris.urls":"172.21.16.12",
    "doris.user":"root",
    "doris.password":"",
    "doris.http.port":"38230",
    "doris.query.port":"39230",
    "doris.database":"testdb",
    "key.converter":"org.apache.kafka.connect.storage.StringConverter",
    "value.converter":"org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "false",

    "errors.tolerance":"all",
    "errors.deadletterqueue


Explanation of parameters:

| Parameter | Explanation |
| --- | --- |
| errors.tolerance | Sets the fault tolerance level for message processing. "all" allows skipping error messages, while "none" stops the processing immediately when an error is encountered. |
| errors.deadletterqueue.topic.name | Specifies the name of the Dead Letter Queue (DLQ) Topic. Failed messages will be sent to this Topic. |
| errors.deadletterqueue.context.headers.enable | Determines whether to include context information such as the original Topic, partition, offset, and error information in the dead letter messages. |
| errors.deadletterqueue.topic.replication.factor | Sets the replication factor of the DLQ Topic, which affects the high availability of the data. |
| errors.log.enable | Decides whether to enable error logging, which is convenient for subsequent troubleshooting. |
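As a hedged sketch of how the parameters in the table fit together, the snippet below builds the error-handling part of a sink connector configuration. The topic name orders_dlq is the one consumed in the next step; the replication factor of 1 and the two "true" flags are assumptions chosen for a small test setup, not values taken from the original command. The function `dead_letter_queue_settings` is hypothetical.

```python
def dead_letter_queue_settings(topic: str = "orders_dlq",
                               replication_factor: int = 1) -> dict:
    """Return the error-handling entries to merge into a sink connector config."""
    if replication_factor < 1:
        raise ValueError("replication_factor must be at least 1")
    return {
        # Keep the task running and route bad records to the DLQ topic.
        "errors.tolerance": "all",
        "errors.deadletterqueue.topic.name": topic,
        # Attach the original topic, partition, offset, and error details as headers.
        "errors.deadletterqueue.context.headers.enable": "true",
        "errors.deadletterqueue.topic.replication.factor": str(replication_factor),
        # Also write each failure to the Connect worker log.
        "errors.log.enable": "true",
    }


if __name__ == "__main__":
    base_config = {"topics": "error_orders_json"}
    base_config.update(dead_letter_queue_settings())
    for key, value in base_config.items():
        print(f"{key}={value}")
```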


3. How to Consume Error Messages in the Dead Letter Queue

Error messages will be stored in the orders_dlq Topic. We can use the following command to view detailed error information:

Plain Text
 
./bin/kafka-console-consumer.sh \
--bootstrap-server 10.16.10.6:29092 \
--topic orders_dlq --from-beginning \
--property print.key=true \
--property print.value=true \
--property print.headers=true


In the orders_dlq, we can find that the message invalid-json-1 was dropped into the dead letter queue due to a format error:

[Figure: The message invalid-json-1 was dropped into the dead letter queue due to a format error]
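When context headers are enabled, Kafka Connect adds them to each dead-lettered record under names that start with __connect.errors. The sketch below turns such headers into a one-line summary; the header values in the example are invented, and `describe_failure` is a hypothetical helper.

```python
PREFIX = "__connect.errors."


def describe_failure(headers: dict) -> str:
    """Summarize where a dead-lettered record came from and why it failed."""
    def get(name: str) -> str:
        # Missing headers are shown as "?" instead of raising an error.
        return str(headers.get(PREFIX + name, "?"))

    return (
        f"{get('topic')}[{get('partition')}]@{get('offset')} "
        f"failed in stage {get('stage')}: {get('exception.message')}"
    )


if __name__ == "__main__":
    # Invented header values for illustration.
    sample = {
        "__connect.errors.topic": "error_orders_json",
        "__connect.errors.partition": "0",
        "__connect.errors.offset": "3",
        "__connect.errors.stage": "VALUE_CONVERTER",
        "__connect.errors.exception.message": "Converting byte[] to Kafka Connect data failed",
    }
    print(describe_failure(sample))
```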


The dead letter queue is like a "protection station for lost children" in the data pipeline, providing a place for error data so that it won't affect the stability of the overall system. By reasonably configuring Kafka Connect, we can centrally manage the error data and take remedial measures at an appropriate time. We hope this small feature can help you and make data processing smoother!

Certainly, the application scenarios of the dead letter queue are not limited to these. There are more "frustrating yet comical" data incidents waiting for you to discover [8]:

  • The metamorphosis of data: Schema changes, format mismatches, and the data directly "mutates" and cannot be written in!
  • The target system throws a tantrum: Network glitches, full storage, and the data can't get in at the door!
  • Dirty data causes trouble: Business logic errors, malicious data playing tricks, leaving the system caught off guard!

Want to make the data flow more stable? Set up the dead letter queue!

Summary of the Article

This article provides a comprehensive analysis of the Doris Kafka Connector's basic composition, working principle, and deployment method. Through three practical scenarios - importing common JSON data, utilizing the Transform operator, and utilizing the dead letter queue - it helps users quickly understand its core value in data flow.

It should be emphasized that this article only covers some application scenarios of the Doris Kafka Connector. As an important component of the Apache Doris ecosystem, it not only connects Kafka data streams efficiently but also, compared with the Routine Load import in Apache Doris (up to Apache Doris 2.1.x and 3.0.x), supports more diverse data formats (such as Avro, Protobuf, JsonSchema, and ByteArray). At the same time, the CDC function can also serve as a powerful supplement to the Doris Flink Connector, further enhancing Doris's capabilities in the field of real-time data processing. Therefore, we will launch a series of articles to comprehensively demonstrate its powerful applications through more practical cases.

Preview of the Next Issue

In the next issue, we will explore how to use Doris Kafka Connect to import relational database data in real-time, and support multiple data formats such as Avro, Protobuf, ByteArray, as well as the data import form of one stream for multiple tables. Stay tuned! We will deeply analyze the technical details and practical experience of data flow to help you master the core points of data import in Apache Doris.

Message of the Article

Thank this era,
It allows us to witness the rapid development of intelligence.
In the AI era,
Data drives innovation and connects everything.
Embrace the future,
In the wave of emerging technologies, be the one who cannot be defined.

References

  1. Doris Kafka Connector
  2. Introduction to Kafka Connect
  3. Distributed workers
  4. Apicurio Registry
  5. Kafka Connect Single Message Transform
  6. Table
  7. KIP-146 - Classloading Isolation in Connect
  8. What is Kafka Dead Letter Queue?