Migrating Data from Other TP Systems
Last updated: 2025-08-21
Multi-Catalog
Map the source tables as external tables through a Catalog, then use INSERT INTO or CREATE TABLE AS SELECT (CTAS) statements to complete the import.
Taking MySQL as an example:
SQL
CREATE CATALOG mysql_catalog PROPERTIES(
    'type' = 'jdbc',
    'user' = 'root',
    'password' = '123456',
    'jdbc_url' = 'jdbc:mysql://host:3306/mysql_db',
    'driver_url' = 'mysql-connector-java-8.0.25.jar',
    'driver_class' = 'com.mysql.cj.jdbc.Driver'
);

-- Import via INSERT
INSERT INTO internal.doris_db.tbl1
SELECT * FROM mysql_catalog.mysql_db.table1;

-- Import via CTAS
CREATE TABLE internal.doris_db.tbl1
PROPERTIES('replication_num' = '1')
AS
SELECT * FROM mysql_catalog.mysql_db.table1;
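Before running the import, it can be worth confirming that the catalog mapping works. A minimal sanity check, assuming the mysql_catalog and mysql_db names from the example above:
SQL
-- Switch to the new catalog and confirm the source tables are visible
SWITCH mysql_catalog;
SHOW DATABASES;
SHOW TABLES FROM mysql_db;

-- Preview a few rows through the external table before the full import
SELECT * FROM mysql_db.table1 LIMIT 10;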
Flink Doris Connector
Flink can be used for both offline and real-time synchronization from TP systems.
- For offline synchronization, Flink's JDBC Source and the Doris Sink can be used to import the data. A FlinkSQL example:
SQL
CREATE TABLE student_source (
    id INT,
    name STRING,
    age INT,
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://localhost:3306/mydatabase',
    'table-name' = 'students',
    'username' = 'username',
    'password' = 'password'
);

CREATE TABLE student_sink (
    id INT,
    name STRING,
    age INT
) WITH (
    'connector' = 'doris',
    'fenodes' = '127.0.0.1:8030',
    'table.identifier' = 'test.students',
    'username' = 'root',
    'password' = 'password',
    'sink.label-prefix' = 'doris_label'
);

INSERT INTO student_sink SELECT * FROM student_source;
- For real-time synchronization, Flink CDC can read both the full snapshot and incremental changes. A FlinkSQL example:
SQL
SET 'execution.checkpointing.interval' = '10s';

CREATE TABLE cdc_mysql_source (
    id INT,
    name VARCHAR,
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector' = 'mysql-cdc',
    'hostname' = '127.0.0.1',
    'port' = '3306',
    'username' = 'root',
    'password' = 'password',
    'database-name' = 'database',
    'table-name' = 'table'
);

-- insert/update/delete events are all synchronized
CREATE TABLE doris_sink (
    id INT,
    name STRING
) WITH (
    'connector' = 'doris',
    'fenodes' = '127.0.0.1:8030',
    'table.identifier' = 'database.table',
    'username' = 'root',
    'password' = '',
    'sink.properties.format' = 'json',
    'sink.properties.read_json_by_line' = 'true',
    'sink.enable-delete' = 'true',  -- synchronize delete events
    'sink.label-prefix' = 'doris_label'
);

INSERT INTO doris_sink SELECT id, name FROM cdc_mysql_source;
To synchronize an entire database or multiple tables from a TP system, the whole-database synchronization feature of the Flink Doris Connector can write the source database into Doris in a single job, for example:
Shell
<FLINK_HOME>/bin/flink run \
    -Dexecution.checkpointing.interval=10s \
    -Dparallelism.default=1 \
    -c org.apache.doris.flink.tools.cdc.CdcTools \
    lib/flink-doris-connector-1.16-24.0.1.jar \
    mysql-sync-database \
    --database test_db \
    --mysql-conf hostname=127.0.0.1 \
    --mysql-conf port=3306 \
    --mysql-conf username=root \
    --mysql-conf password=123456 \
    --mysql-conf database-name=mysql_db \
    --including-tables "tbl1|test.*" \
    --sink-conf fenodes=127.0.0.1:8030 \
    --sink-conf username=root \
    --sink-conf password=123456 \
    --sink-conf jdbc-url=jdbc:mysql://127.0.0.1:9030 \
    --sink-conf sink.label-prefix=label \
    --table-conf replication_num=1
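Once the job is running, you can check in Doris that the target tables were created and are receiving data. A minimal sanity check, assuming the test_db target database and the tbl1 table from the command above:
SQL
SHOW TABLES FROM test_db;
SELECT COUNT(*) FROM test_db.tbl1;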
Spark Connector
Data can be imported using Spark's JDBC Source together with the Doris Sink of the Spark Doris Connector.
Scala
// Read the source table through Spark's JDBC data source
val jdbcDF = spark.read
  .format("jdbc")
  .option("url", "jdbc:postgresql:dbserver")
  .option("dbtable", "schema.tablename")
  .option("user", "username")
  .option("password", "password")
  .load()

// Write the DataFrame into Doris through the Spark Doris Connector
jdbcDF.write.format("doris")
  .option("doris.table.identifier", "db.table")
  .option("doris.fenodes", "127.0.0.1:8030")
  .option("user", "root")
  .option("password", "")
  .save()
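After the write finishes, a quick row count against the source helps confirm the import. A minimal sanity check run in Doris, assuming the db.table identifier from the sink options above (backticks guard against table being parsed as a keyword):
SQL
SELECT COUNT(*) FROM db.`table`;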
DataX / Seatunnel / CloudCanal and Other Third-Party Tools
Beyond the approaches above, third-party synchronization tools can also be used. For more details, see:
- DataX
- Seatunnel
- CloudCanal
