Integrating Apache Doris and Hudi for Data Querying and Migration
Learn how to integrate Apache Doris with Apache Hudi for efficient federated querying, real-time analytics, and seamless data migration in big data environments.
In the field of big data analytics, real-time data availability, query performance, and flexibility are crucial. With the rise of the Lakehouse architecture as a new paradigm in big data, integrating Apache Doris, a high-performance real-time analytical database, with Apache Hudi, a data lake management framework focused on incremental data processing, provides powerful federated query and data migration capabilities.
This integration enables seamless collaboration while accelerating enterprise data architecture upgrades. In this article, we will explore how to integrate Apache Doris with Hudi to achieve efficient federated querying and data migration, along with best practices and optimization techniques.
Introduction
Apache Doris is a real-time analytical database based on the Massively Parallel Processing (MPP) architecture, known for its high query performance and ease of use. It is widely applied in scenarios such as reporting analytics, ad-hoc queries, unified data warehousing, and accelerating federated queries in data lakes.
Apache Hudi, on the other hand, is an open-source data management framework designed to simplify incremental data processing in data lakes while supporting transactional updates and efficient data management.
By integrating Doris with Hudi, users can leverage Doris's query capabilities to directly access Hudi data (federated query) or migrate Hudi data efficiently into Doris for more complex analysis. This article will provide an in-depth guide on achieving this integration, alongside practical insights and optimization strategies.
Key Features of Doris-Hudi Integration
Apache Doris has enhanced its ability to read Apache Hudi tables, integrating deeply with Hudi's real-time data management capabilities to provide efficient, flexible, and cost-effective data querying and analytics. The key features of Doris's Hudi support include:
- Copy on Write (COW) tables: Supports snapshot queries.
- Merge on Read (MOR) tables: Supports snapshot queries and read-optimized queries.
- Time travel: Allows users to access historical data states.
- Incremental read: Efficiently processes data changes.
The combination of Apache Doris’s high-performance query execution and Apache Hudi’s real-time data management features enables not only real-time analytics but also robust data auditing, historical data retrieval, and incremental processing. This integration has been validated in real-world business scenarios by multiple community users.
Use Cases
The integration between Doris and Hudi shines in various real-world applications:
1. Real-Time Data Analytics and Processing
In use cases such as financial transaction analytics, real-time ad-click stream analysis, and e-commerce user behavior analysis, data needs to be continuously updated and queried efficiently.
Hudi ensures real-time data consistency, while Doris’s high-performance querying handles large-scale analytical workloads with ease. The combination of both enables real-time, high-throughput data analytics.
2. Data Auditing and Historical Retrieval
Industries such as finance and healthcare, where data security and accuracy are paramount, require reliable historical data retrieval and auditing. Hudi’s Time Travel functionality allows users to view past states of data, and Doris’s efficient query engine ensures fast retrieval and analysis of data at specific points in time, making it ideal for forensic analysis and compliance auditing.
3. Incremental Data Processing
Big data analytics often involves handling large datasets with frequent updates. Hudi supports incremental data reading, enabling users to process only the changed data instead of performing full-table updates. Doris’s incremental query capabilities further enhance efficiency, significantly reducing resource consumption while improving data processing performance.
4. Federated Queries Across Multiple Data Sources
Enterprise data often resides in multiple databases and diverse data sources. Doris’s Multi-Catalog feature enables seamless mapping and synchronization across different data sources, allowing enterprises to integrate and analyze heterogeneous data efficiently without excessive data movement.
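For example, a single Doris query can join a Hudi table with a table from another catalog. The sketch below is purely illustrative: it assumes a JDBC catalog named mysql_catalog containing a sales.orders table, alongside the hudi_catalog and customer_cow table set up later in this article.
doris> SELECT o.order_id, c.c_name, o.amount
FROM mysql_catalog.sales.orders o
JOIN hudi_catalog.default.customer_cow c ON o.customer_id = c.c_custkey
WHERE o.order_date >= '2024-06-01';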
Integrating Apache Doris With Hudi
Doris leverages its multi-catalog capability to integrate with external data sources such as Apache Hudi. Multi-catalog automatically maps metadata from external data sources, enabling federated querying and data ingestion. Below are the integration steps:
Environment and Data Preparation
Environment Setup
| Component | Version |
|---|---|
| Apache Doris | 2.1.4 |
| Apache Hudi | 0.14 |
| Apache Spark | 3.4.2 |
| Apache Hive | 2.1.3 |
| MinIO | 2022-05-26T05-48-41Z |
Creating Hudi Tables Using Spark
-- Start Spark
./login-spark.sh
-- Use default database
spark-sql> use default;
-- Create Copy-on-Write (COW) table
spark-sql> CREATE TABLE customer_cow
USING hudi
TBLPROPERTIES (
type = 'cow',
primaryKey = 'c_custkey',
preCombineField = 'c_name'
)
PARTITIONED BY (c_nationkey)
AS SELECT * FROM customer;
-- Create Merge-on-Read (MOR) table
spark-sql> CREATE TABLE customer_mor
USING hudi
TBLPROPERTIES (
type = 'mor',
primaryKey = 'c_custkey',
preCombineField = 'c_name'
)
PARTITIONED BY (c_nationkey)
AS SELECT * FROM customer;
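As a quick sanity check, the row counts of the new tables can be compared with the source table from Spark. This assumes the demo environment pre-loads a customer table (for example, TPC-H sample data):
spark-sql> select count(*) from customer;
spark-sql> select count(*) from customer_cow;
spark-sql> select count(*) from customer_mor;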
Creating a Hudi Catalog in Doris
1. Execute the following SQL to create a Hudi catalog in Doris:
CREATE CATALOG hudi_catalog PROPERTIES (
"type"="hms",
"hive.metastore.uris"="thrift://hive-metastore:9083",
"s3.access_key"="minio",
"s3.secret_key"="minio123",
"s3.endpoint"="http://minio:9000",
"s3.region"="us-east-1",
"use_path_style"="true"
);
Once configured, Doris automatically scans Hudi’s metadata, allowing users to access Hudi databases and tables through hudi_catalog.
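To browse the mapped metadata, standard Doris catalog statements can be used:
doris> SWITCH hudi_catalog;
doris> SHOW DATABASES;
doris> USE default;
doris> SHOW TABLES;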
2. Refresh the catalog to synchronize Hudi tables.
doris> REFRESH CATALOG hudi_catalog;
After this initial refresh, any subsequent data modifications made in Hudi through Spark are visible in Doris without refreshing the catalog again.
3. Insert the data into Hudi COW and MOR tables.
The following Spark SQL commands insert a new row into both the Copy-on-Write (COW) and Merge-on-Read (MOR) tables:
spark-sql> insert into customer_cow values (100, "Customer#000000100", "jD2xZzi", "25-430-914-2194", 3471.59, "BUILDING", "cial ideas. final, furious requests", 25);
spark-sql> insert into customer_mor values (100, "Customer#000000100", "jD2xZzi", "25-430-914-2194", 3471.59, "BUILDING", "cial ideas. final, furious requests", 25);
4. Doris can directly query the newly inserted data:
doris> use hudi_catalog.default;
doris> select * from customer_cow where c_custkey = 100;
doris> select * from customer_mor where c_custkey = 100;
5. Inserting data with an existing c_custkey value updates the record:
spark-sql> insert into customer_cow values (32, "Customer#000000032_update", "jD2xZzi", "25-430-914-2194", 3471.59, "BUILDING", "cial ideas. final, furious requests", 15);
spark-sql> insert into customer_mor values (32, "Customer#000000032_update", "jD2xZzi", "25-430-914-2194", 3471.59, "BUILDING", "cial ideas. final, furious requests", 15);
6. After updating, Doris reflects the changes:
doris> select * from customer_cow where c_custkey = 32;
doris> select * from customer_mor where c_custkey = 32;
7. Incremental read and time travel.
Incremental Read
Hudi supports incremental reads, allowing users to fetch changes within a specific time range. Doris can query the changes made after inserting c_custkey=100:
doris> select * from customer_cow@incr('beginTime'='20240603015018572');
spark-sql> select * from hudi_table_changes('customer_cow', 'latest_state', '20240603015018572');
doris> select * from customer_mor@incr('beginTime'='20240603015058442');
spark-sql> select * from hudi_table_changes('customer_mor', 'latest_state', '20240603015058442');
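The @incr syntax also accepts an optional endTime parameter to bound the read to a window of commits. The end timestamp below is an illustrative placeholder, not a commit from this walkthrough:
doris> select * from customer_cow@incr('beginTime'='20240603015018572', 'endTime'='20240603020000000');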
Time Travel
Doris enables querying historical snapshots of Hudi data. First, use Spark to retrieve commit history:
spark-sql> call show_commits(table => 'customer_cow', limit => 10);
spark-sql> call show_commits(table => 'customer_mor', limit => 10);
Then, use Doris to query the data as of a specific snapshot (each Doris query is paired with its Spark SQL equivalent):
doris> select * from customer_cow for time as of '20240603015018572' where c_custkey = 32 or c_custkey = 100;
spark-sql> select * from customer_cow timestamp as of '20240603015018572' where c_custkey = 32 or c_custkey = 100;
doris> select * from customer_mor for time as of '20240603015058442' where c_custkey = 32 or c_custkey = 100;
spark-sql> select * from customer_mor timestamp as of '20240603015058442' where c_custkey = 32 or c_custkey = 100;
8. Hudi data consists of baseline data (merged Parquet base files) and incremental data (changes generated by INSERT, UPDATE, and DELETE operations). Doris queries baseline data with its native Parquet reader for optimal performance, while incremental data is accessed through JNI and the Hudi Java SDK.
Using EXPLAIN to Verify Optimization
For COW tables, all splits are baseline data, enabling optimal performance:
doris> explain select * from customer_cow where c_custkey = 32;
| 0:VHUDI_SCAN_NODE(68) |
| table: customer_cow |
| predicates: (c_custkey[#5] = 32) |
| inputSplitNum=101, totalFileSize=45338886, scanRanges=101 |
| partition=26/26 |
| cardinality=1, numNodes=1 |
| pushdown agg=NONE |
| hudiNativeReadSplits=101/101 |
For MOR tables, a majority of splits are baseline data, while some require JNI for incremental reads:
doris> explain select * from customer_mor where c_custkey = 32;
| 0:VHUDI_SCAN_NODE(68) |
| table: customer_mor |
| predicates: (c_custkey[#5] = 32) |
| inputSplitNum=101, totalFileSize=45340731, scanRanges=101 |
| partition=26/26 |
| cardinality=1, numNodes=1 |
| pushdown agg=NONE |
| hudiNativeReadSplits=100/101 |
Observing Baseline and Incremental Data Differences
Deleting a record via Spark and verifying Doris’s behavior:
spark-sql> delete from customer_cow where c_custkey = 64;
doris> explain select * from customer_cow where c_custkey = 64;
spark-sql> delete from customer_mor where c_custkey = 64;
doris> explain select * from customer_mor where c_custkey = 64;
By using partition filters, Doris accesses only relevant partitions, improving query efficiency.
-- The customer_xxx tables are partitioned by c_nationkey, so the partition column can be used to prune data
doris> explain select * from customer_mor where c_custkey = 64 and c_nationkey = 15;
| 0:VHUDI_SCAN_NODE(68) |
| table: customer_mor |
| predicates: (c_custkey[#5] = 64), (c_nationkey[#12] = 15) |
| inputSplitNum=4, totalFileSize=1798186, scanRanges=4 |
| partition=1/26 |
| cardinality=1, numNodes=1 |
| pushdown agg=NONE |
| hudiNativeReadSplits=3/4 |
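Migrating Hudi Data Into Doris
Beyond federated querying, the same catalog can be used to migrate Hudi data into Doris internal tables when lower query latency is required. The sketch below is a minimal illustration: the internal database name demo_db, the bucketing choice, and the single-replica property are assumptions, not values from this walkthrough.
-- One-time full migration via CTAS
doris> CREATE TABLE internal.demo_db.customer_from_hudi
DISTRIBUTED BY HASH(c_custkey) BUCKETS 10
PROPERTIES ("replication_num" = "1")
AS SELECT * FROM hudi_catalog.default.customer_cow;
-- Ongoing sync: append only the changes since a given Hudi commit
doris> INSERT INTO internal.demo_db.customer_from_hudi
SELECT * FROM hudi_catalog.default.customer_cow@incr('beginTime'='20240603015018572');
Note that appending incremental changes this way works best when the target Doris table uses the Unique Key model, so that updated rows replace earlier versions; CTAS creates a Duplicate Key table by default.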
Conclusion
Integrating Apache Doris with Apache Hudi offers a powerful combination for real-time analytics, federated queries, and incremental data processing. By leveraging Doris’s high-performance query engine and Hudi’s efficient data lake management, enterprises can streamline data workflows, reduce costs, and enhance analytical capabilities. This integration is a game-changer for organizations looking to unify Lakehouse architectures with real-time querying capabilities.
As we look forward to 2025, I am excited about the milestones ahead. The remarkable progress we’ve achieved is made possible by the incredible support of the Doris community, including its dedicated users and developers.