Kafka

数据仓库 PALO

  • 功能发布记录
  • 操作手册1
    • LDAP认证
    • 时区
    • 使用S3-SDK访问对象存储
    • 权限管理
    • 物化视图
    • 变量
    • 资源管理
    • 数据更新与删除
      • 标记删除
      • Sequence-Column
      • 数据更新
      • 数据删除
    • 备份与恢复
      • 备份与恢复
    • 数据导出1
      • SELECT INTO OUTFILE
      • MySQL Dump
      • 数据导出概述
      • Export
    • 数据导出
      • 全量数据导出
      • 导出查询结果集
      • 导出总览
      • 导出数据到外部表
    • 查询加速1
      • 查询缓存
      • 物化视图
        • 同步物化视图
        • 物化视图概览
        • 异步物化视图
          • 异步物化视图常见问题
          • 最佳实践
          • 异步物化视图概述
          • 创建、查询与维护异步物化视图
    • 数据导入
      • JSON格式数据导入说明
      • 导入本地数据
      • 导入BOS中的数据
      • 导入事务和原子性
      • 通过外部表同步数据
      • 使用JDBC同步数据
      • 列的映射、转换与过滤
      • 订阅Kafka日志
      • 严格模式
      • 导入总览
    • 数据更新与删除1
      • 事务
      • 数据更新
        • 主键模型的导入更新
        • 主键模型的 Update 更新
        • 数据更新概述
        • 主键模型的更新并发控制
        • 聚合模型的导入更新
      • 数据删除
        • 删除操作概述
        • Truncate 操作
        • 表原子替换
        • Delete 操作
        • 基于导入的批量删除
        • 临时分区
    • 数据导入1
      • 高并发导入优化(Group Commit)
      • 导入概览
      • 异常数据处理
      • 导入高可用性
      • 导入时实现数据转换
      • 数据源
        • Kafka
        • S3 兼容存储
        • 从其他 TP 系统迁移数据
        • HDFS
        • 从其他 AP 系统迁移数据
        • Flink
        • 本地文件
      • 导入方式
        • Broker Load
        • MySQL Load
        • Insert Into Values
        • Stream Load
        • Insert Into Select
        • Routine Load
      • 文件格式
        • CSV
        • JSON
        • Parquet
        • ORC
      • 复杂数据类型
        • MAP
        • Variant
        • JSON
        • STRUCT
        • Bitmap
        • HLL
        • ARRAY
  • 开发指南
    • 迁移ClickHouse数据
    • Doris集群间数据迁移
    • 数据更新与删除
      • 事务
      • 数据更新
        • 主键模型的导入更新
        • 主键模型的 Update 更新
        • 数据更新概述
        • 主键模型的更新并发控制
        • 聚合模型的导入更新
      • 数据删除
        • 删除操作概述
        • Truncate 操作
        • 表原子替换
        • Delete 操作
        • 基于导入的批量删除
        • 临时分区
    • 查询加速
      • 查询缓存
      • Colocation Join
      • 高并发点查
      • Hint
        • Hint 概述
        • Leading Hint
        • Distribute Hint
      • 物化视图
        • 同步物化视图
        • 物化视图概览
        • 异步物化视图
          • 异步物化视图常见问题
          • 最佳实践
          • 异步物化视图概述
          • 创建、查询与维护异步物化视图
      • 高效去重
        • BITMAP 精准去重
        • HLL 近似去重
      • 优化技术原理
        • TOPN 查询优化
        • 统计信息
        • Pipeline 执行引擎
        • 查询优化器介绍
        • Runtime Filter
      • 查询调优概述
        • 调优概述
        • 诊断工具
        • 分析工具
        • 调优流程
      • 查询优化实践
        • 常见调优参数
        • 计划调优
          • 使用 Hint 控制代价改写
          • 使用异步物化视图透明改写
          • 使用 Leading Hint 控制 Join 顺序
          • 优化表 Schema 设计
          • 使用分区裁剪优化扫表
          • 优化索引设计和使用
          • 使用 Hint 调整 Join Shuffle 方式
          • DML 计划调优
          • 使用 Colocate Group 优化 Join
          • 使用同步物化视图透明改写
          • 使用 SQL Cache 加速查询
        • 执行调优
          • 数据倾斜处理
          • RuntimeFilter 的等待时间调整
          • 并行度调优
    • 数据查询
      • 连接(JOIN)
      • 子查询
      • 复杂类型查询
      • 列转行 (Lateral View)
      • MySQL 兼容性
      • 聚合多维分析
      • 分析函数(窗口函数)
      • 公用表表达式(CTE)
      • 自定义函数
        • 别名函数
        • Java UDF, UDAF, UDTF
    • 数据导出
      • SELECT INTO OUTFILE
      • MySQL Dump
      • 最佳实践
      • 数据导出概述
      • Export
    • 数据导入
      • 高并发导入优化(Group Commit)
      • 异常数据处理
      • 导入高可用性
      • 导入时实现数据转换
      • 导入最佳实践
      • 数据源
        • Kafka
        • Snowflake
        • S3 兼容存储
        • Google Cloud Storage
        • 从其他 TP 系统迁移数据
        • Azure Storage
        • 腾讯云 COS
        • MinIO
        • HDFS
        • 阿里云 OSS
        • 华为云 OBS
        • 从其他 AP 系统迁移数据
        • Flink
        • Redshift
        • Amazon S3
        • 本地文件
        • BigQuery
      • 导入方式
        • Broker Load
        • MySQL Load
        • Insert Into Values
        • Stream Load
        • Insert Into Select
        • Routine Load
      • 文件格式
        • CSV
        • JSON
        • Parquet
        • ORC
      • 复杂数据类型
        • MAP
        • Variant
        • JSON
        • STRUCT
        • Bitmap
        • HLL
        • ARRAY
    • BI工具接入
      • Sugar
      • Navicat
      • Tableau
      • DBeaver
      • 永洪BI
      • FineBI(帆软)
    • 数据库连接
      • 通过 MySQL 协议连接
      • 基于 Arrow Flight SQL 的高速数据传输链路
    • 湖仓一体
      • 分析 S3或HDFS 上的文件
      • 湖仓一体概述
      • SQL 方言兼容
      • 弹性计算节点
      • 云服务认证接入
      • 元数据缓存
      • 外表统计信息
      • 数据缓存
      • 数据库分析
        • MySQL
        • JDBC Catalog
        • Oracle
        • OceanBase
        • SAP HANA
        • 阿里云 MaxCompute
        • ClickHouse
        • PostgreSQL
        • IBM Db2
        • SQL Server
        • Elasticsearch
      • 湖仓一体最佳实践
        • 使用 PALO 和 Paimon
        • 使用 PALO 和 Iceberg
        • 使用 PALO 和 Hudi
        • 使用 PALO 和 LakeSoul
      • 数据湖构建
        • Iceberg
        • Hive
      • 数据湖分析
        • Hudi Catalog
        • 阿里云 DLF
        • Iceberg Catalog
        • Paimon Catalog
        • Hive Catalog
    • 数据表设计
      • 行业混存
      • 数据压缩
      • Schema 变更
      • 数据类型
      • 自增列
      • 概览
      • 数据库建表最佳实践
      • 冷热数据分层
        • SSD 和 HDD 层级存储
        • 远程存储
        • 冷热数据分层概述
      • 表索引
        • 倒排索引
        • 前缀索引与排序键
        • N-Gram 索引
        • BloomFilter 索引
        • 索引概述
      • 数据划分
        • 数据分桶
        • 数据分布概念
        • 动态分区
        • 自动分区
        • 手动分区
        • 常见文档
      • 数据模型
        • 使用注意
        • 模型概述
        • 主键模型
        • 明细模型
        • 聚合模型
  • 版本发布历史
    • 百度数据仓库 Palo 2.0 版本全新发布
  • SQL手册
    • 字面常量
    • 别名
    • SQL-手册
    • 数据类型
    • SQL语句
    • 注释
    • 内置函数
    • 白名单管理
    • SQL操作符
    • 内置函数
      • 聚合函数
      • 位操作函数
      • 字符串函数
      • 条件函数
      • 数学函数
      • JSON解析函数
      • 类型转换函数
      • 格式转换函数
      • 通用函数
      • 时间和日期函数
      • BITMAP函数
      • 窗口函数
      • 哈希函数
      • HLL函数
    • 语法帮助
      • DML
        • INSERT
        • ROUTINE-LOAD
        • RESTORE
        • SELECT-INTO-OUTFILE
        • ALTER-ROUTINE-LOAD
        • BROKER-LOAD
        • BACKUP
        • EXPORT
        • STREAM-LOAD
      • DDL
        • CREATE-FILE
        • DROP-RESOURCE
        • CREATE-RESOURCE
        • CREATE-MATERIALIZED-VIEW
        • DROP-RESROUCE
        • CREATE-TABLE
        • DROP-REPOSITORY
        • CREATE-REPOSITORY
        • CREATE-ODBC-TABLE
      • 信息查看语句
        • SHOW-BACKUP
        • SHOW-ALTER-TABLE-MATERIALIZED-VIEW
        • SHOW-SNAPSHOT
        • SHOW-ROUTINE-LOAD
        • SHOW-CREATE-ROUTINE-LOAD
        • SHOW-ROLES
        • SHOW-GRANTS
        • SHOW-EXPORT
        • SHOW-ROUTINE-LOAD-TASK
        • SHOW-REPOSITORIES
        • SHOW-LOAD
        • SHOW-RESOURCES
        • SHOW-RESTORE
        • SHOW-PROPERTY
        • SHOW-FILE
      • 辅助命令
        • PAUSE-ROUTINE-LOAD
        • STOP-ROUTINE-LOAD
        • ALTER-ROUTINE-LOAD
        • CANCEL-LOAD
        • RESUME-ROUTINE-LOAD
      • 账户管理
        • SET-PROPERTY
        • REVOKE
        • GRANT
        • CREATE-ROLE
        • DROP-ROLE
        • CREATE-USER
        • DROP-USER
        • SET-PASSWORD
  • 快速入门
    • 快速上手
    • 存算分离
    • 存算一体
  • 典型实践
    • 如何开启Debug日志
    • 导入分析
    • 查询分析
  • 操作手册
    • 权限和子用户
    • 存算一体
      • 连接集群
      • 查询分析
      • 监控告警
        • 监控指标
        • 告警配置
      • 备份恢复
        • 通过管理页面备份与恢复
        • 备份与恢复
      • 权限管理
        • 集群权限
        • 控制台权限
      • 集群管理
        • 集群创建
        • 停止与删除
        • 重置管理员密码
        • 集群扩缩容
        • 集群详情
    • 存算分离
      • 连接集群
      • 计算组管理
        • 重启计算组
        • 创建计算组
      • 监控告警
        • 监控指标
        • 告警配置
      • 权限管理
        • 集群权限
        • 控制台权限
      • 集群管理
        • 停止与删除
        • 创建集群
        • 重置管理员密码
        • 集群详情
  • 服务等级协议SLA
    • 服务等级协议(SLA)v1.0
  • 产品概述
    • 系统架构
    • 产品特点
    • 产品介绍
  • 视频专区
    • 操作指南
    • 产品简介
  • 产品定价
    • 预付费
    • 计费说明
    • 后付费
所有文档
menu
没有找到结果,请重新输入

数据仓库 PALO

  • 功能发布记录
  • 操作手册1
    • LDAP认证
    • 时区
    • 使用S3-SDK访问对象存储
    • 权限管理
    • 物化视图
    • 变量
    • 资源管理
    • 数据更新与删除
      • 标记删除
      • Sequence-Column
      • 数据更新
      • 数据删除
    • 备份与恢复
      • 备份与恢复
    • 数据导出1
      • SELECT INTO OUTFILE
      • MySQL Dump
      • 数据导出概述
      • Export
    • 数据导出
      • 全量数据导出
      • 导出查询结果集
      • 导出总览
      • 导出数据到外部表
    • 查询加速1
      • 查询缓存
      • 物化视图
        • 同步物化视图
        • 物化视图概览
        • 异步物化视图
          • 异步物化视图常见问题
          • 最佳实践
          • 异步物化视图概述
          • 创建、查询与维护异步物化视图
    • 数据导入
      • JSON格式数据导入说明
      • 导入本地数据
      • 导入BOS中的数据
      • 导入事务和原子性
      • 通过外部表同步数据
      • 使用JDBC同步数据
      • 列的映射、转换与过滤
      • 订阅Kafka日志
      • 严格模式
      • 导入总览
    • 数据更新与删除1
      • 事务
      • 数据更新
        • 主键模型的导入更新
        • 主键模型的 Update 更新
        • 数据更新概述
        • 主键模型的更新并发控制
        • 聚合模型的导入更新
      • 数据删除
        • 删除操作概述
        • Truncate 操作
        • 表原子替换
        • Delete 操作
        • 基于导入的批量删除
        • 临时分区
    • 数据导入1
      • 高并发导入优化(Group Commit)
      • 导入概览
      • 异常数据处理
      • 导入高可用性
      • 导入时实现数据转换
      • 数据源
        • Kafka
        • S3 兼容存储
        • 从其他 TP 系统迁移数据
        • HDFS
        • 从其他 AP 系统迁移数据
        • Flink
        • 本地文件
      • 导入方式
        • Broker Load
        • MySQL Load
        • Insert Into Values
        • Stream Load
        • Insert Into Select
        • Routine Load
      • 文件格式
        • CSV
        • JSON
        • Parquet
        • ORC
      • 复杂数据类型
        • MAP
        • Variant
        • JSON
        • STRUCT
        • Bitmap
        • HLL
        • ARRAY
  • 开发指南
    • 迁移ClickHouse数据
    • Doris集群间数据迁移
    • 数据更新与删除
      • 事务
      • 数据更新
        • 主键模型的导入更新
        • 主键模型的 Update 更新
        • 数据更新概述
        • 主键模型的更新并发控制
        • 聚合模型的导入更新
      • 数据删除
        • 删除操作概述
        • Truncate 操作
        • 表原子替换
        • Delete 操作
        • 基于导入的批量删除
        • 临时分区
    • 查询加速
      • 查询缓存
      • Colocation Join
      • 高并发点查
      • Hint
        • Hint 概述
        • Leading Hint
        • Distribute Hint
      • 物化视图
        • 同步物化视图
        • 物化视图概览
        • 异步物化视图
          • 异步物化视图常见问题
          • 最佳实践
          • 异步物化视图概述
          • 创建、查询与维护异步物化视图
      • 高效去重
        • BITMAP 精准去重
        • HLL 近似去重
      • 优化技术原理
        • TOPN 查询优化
        • 统计信息
        • Pipeline 执行引擎
        • 查询优化器介绍
        • Runtime Filter
      • 查询调优概述
        • 调优概述
        • 诊断工具
        • 分析工具
        • 调优流程
      • 查询优化实践
        • 常见调优参数
        • 计划调优
          • 使用 Hint 控制代价改写
          • 使用异步物化视图透明改写
          • 使用 Leading Hint 控制 Join 顺序
          • 优化表 Schema 设计
          • 使用分区裁剪优化扫表
          • 优化索引设计和使用
          • 使用 Hint 调整 Join Shuffle 方式
          • DML 计划调优
          • 使用 Colocate Group 优化 Join
          • 使用同步物化视图透明改写
          • 使用 SQL Cache 加速查询
        • 执行调优
          • 数据倾斜处理
          • RuntimeFilter 的等待时间调整
          • 并行度调优
    • 数据查询
      • 连接(JOIN)
      • 子查询
      • 复杂类型查询
      • 列转行 (Lateral View)
      • MySQL 兼容性
      • 聚合多维分析
      • 分析函数(窗口函数)
      • 公用表表达式(CTE)
      • 自定义函数
        • 别名函数
        • Java UDF, UDAF, UDTF
    • 数据导出
      • SELECT INTO OUTFILE
      • MySQL Dump
      • 最佳实践
      • 数据导出概述
      • Export
    • 数据导入
      • 高并发导入优化(Group Commit)
      • 异常数据处理
      • 导入高可用性
      • 导入时实现数据转换
      • 导入最佳实践
      • 数据源
        • Kafka
        • Snowflake
        • S3 兼容存储
        • Google Cloud Storage
        • 从其他 TP 系统迁移数据
        • Azure Storage
        • 腾讯云 COS
        • MinIO
        • HDFS
        • 阿里云 OSS
        • 华为云 OBS
        • 从其他 AP 系统迁移数据
        • Flink
        • Redshift
        • Amazon S3
        • 本地文件
        • BigQuery
      • 导入方式
        • Broker Load
        • MySQL Load
        • Insert Into Values
        • Stream Load
        • Insert Into Select
        • Routine Load
      • 文件格式
        • CSV
        • JSON
        • Parquet
        • ORC
      • 复杂数据类型
        • MAP
        • Variant
        • JSON
        • STRUCT
        • Bitmap
        • HLL
        • ARRAY
    • BI工具接入
      • Sugar
      • Navicat
      • Tableau
      • DBeaver
      • 永洪BI
      • FineBI(帆软)
    • 数据库连接
      • 通过 MySQL 协议连接
      • 基于 Arrow Flight SQL 的高速数据传输链路
    • 湖仓一体
      • 分析 S3或HDFS 上的文件
      • 湖仓一体概述
      • SQL 方言兼容
      • 弹性计算节点
      • 云服务认证接入
      • 元数据缓存
      • 外表统计信息
      • 数据缓存
      • 数据库分析
        • MySQL
        • JDBC Catalog
        • Oracle
        • OceanBase
        • SAP HANA
        • 阿里云 MaxCompute
        • ClickHouse
        • PostgreSQL
        • IBM Db2
        • SQL Server
        • Elasticsearch
      • 湖仓一体最佳实践
        • 使用 PALO 和 Paimon
        • 使用 PALO 和 Iceberg
        • 使用 PALO 和 Hudi
        • 使用 PALO 和 LakeSoul
      • 数据湖构建
        • Iceberg
        • Hive
      • 数据湖分析
        • Hudi Catalog
        • 阿里云 DLF
        • Iceberg Catalog
        • Paimon Catalog
        • Hive Catalog
    • 数据表设计
      • 行业混存
      • 数据压缩
      • Schema 变更
      • 数据类型
      • 自增列
      • 概览
      • 数据库建表最佳实践
      • 冷热数据分层
        • SSD 和 HDD 层级存储
        • 远程存储
        • 冷热数据分层概述
      • 表索引
        • 倒排索引
        • 前缀索引与排序键
        • N-Gram 索引
        • BloomFilter 索引
        • 索引概述
      • 数据划分
        • 数据分桶
        • 数据分布概念
        • 动态分区
        • 自动分区
        • 手动分区
        • 常见文档
      • 数据模型
        • 使用注意
        • 模型概述
        • 主键模型
        • 明细模型
        • 聚合模型
  • 版本发布历史
    • 百度数据仓库 Palo 2.0 版本全新发布
  • SQL手册
    • 字面常量
    • 别名
    • SQL-手册
    • 数据类型
    • SQL语句
    • 注释
    • 内置函数
    • 白名单管理
    • SQL操作符
    • 内置函数
      • 聚合函数
      • 位操作函数
      • 字符串函数
      • 条件函数
      • 数学函数
      • JSON解析函数
      • 类型转换函数
      • 格式转换函数
      • 通用函数
      • 时间和日期函数
      • BITMAP函数
      • 窗口函数
      • 哈希函数
      • HLL函数
    • 语法帮助
      • DML
        • INSERT
        • ROUTINE-LOAD
        • RESTORE
        • SELECT-INTO-OUTFILE
        • ALTER-ROUTINE-LOAD
        • BROKER-LOAD
        • BACKUP
        • EXPORT
        • STREAM-LOAD
      • DDL
        • CREATE-FILE
        • DROP-RESOURCE
        • CREATE-RESOURCE
        • CREATE-MATERIALIZED-VIEW
        • DROP-RESROUCE
        • CREATE-TABLE
        • DROP-REPOSITORY
        • CREATE-REPOSITORY
        • CREATE-ODBC-TABLE
      • 信息查看语句
        • SHOW-BACKUP
        • SHOW-ALTER-TABLE-MATERIALIZED-VIEW
        • SHOW-SNAPSHOT
        • SHOW-ROUTINE-LOAD
        • SHOW-CREATE-ROUTINE-LOAD
        • SHOW-ROLES
        • SHOW-GRANTS
        • SHOW-EXPORT
        • SHOW-ROUTINE-LOAD-TASK
        • SHOW-REPOSITORIES
        • SHOW-LOAD
        • SHOW-RESOURCES
        • SHOW-RESTORE
        • SHOW-PROPERTY
        • SHOW-FILE
      • 辅助命令
        • PAUSE-ROUTINE-LOAD
        • STOP-ROUTINE-LOAD
        • ALTER-ROUTINE-LOAD
        • CANCEL-LOAD
        • RESUME-ROUTINE-LOAD
      • 账户管理
        • SET-PROPERTY
        • REVOKE
        • GRANT
        • CREATE-ROLE
        • DROP-ROLE
        • CREATE-USER
        • DROP-USER
        • SET-PASSWORD
  • 快速入门
    • 快速上手
    • 存算分离
    • 存算一体
  • 典型实践
    • 如何开启Debug日志
    • 导入分析
    • 查询分析
  • 操作手册
    • 权限和子用户
    • 存算一体
      • 连接集群
      • 查询分析
      • 监控告警
        • 监控指标
        • 告警配置
      • 备份恢复
        • 通过管理页面备份与恢复
        • 备份与恢复
      • 权限管理
        • 集群权限
        • 控制台权限
      • 集群管理
        • 集群创建
        • 停止与删除
        • 重置管理员密码
        • 集群扩缩容
        • 集群详情
    • 存算分离
      • 连接集群
      • 计算组管理
        • 重启计算组
        • 创建计算组
      • 监控告警
        • 监控指标
        • 告警配置
      • 权限管理
        • 集群权限
        • 控制台权限
      • 集群管理
        • 停止与删除
        • 创建集群
        • 重置管理员密码
        • 集群详情
  • 服务等级协议SLA
    • 服务等级协议(SLA)v1.0
  • 产品概述
    • 系统架构
    • 产品特点
    • 产品介绍
  • 视频专区
    • 操作指南
    • 产品简介
  • 产品定价
    • 预付费
    • 计费说明
    • 后付费
  • 文档中心
  • arrow
  • 数据仓库PALO
  • arrow
  • 操作手册1
  • arrow
  • 数据导入1
  • arrow
  • 数据源
  • arrow
  • Kafka
本页目录
  • 使用 Routine Load 消费 Kafka 数据
  • 使用限制
  • 操作示例
  • 使用 Doris Kafka Connector 消费 Kafka 数据
  • 以 Distributed 模式启动
  • 消费普通数据
  • 消费 Debezium 组件采集的数据
  • 消费 AVRO 序列化格式数据
  • 消费 Protobuf 序列化格式数据

Kafka

更新时间:2025-08-21

Doris 提供以下方式从 Kafka 导入数据:

  • 使用 Routine Load 消费 Kafka 数据

Doris 通过 Routine Load 持续消费 Kafka Topic 中的数据。提交 Routine Load 作业后,Doris 会实时生成导入任务,消费 Kafka 集群中指定 Topic 的消息。Routine Load 支持 CSV 和 JSON 格式,具备 Exactly-Once 语义,确保数据不丢失且不重复。更多文档请参考 Routine Load。

  • Doris Kafka Connector 消费 Kafka 数据

Doris Kafka Connector 是将 Kafka 数据流导入 Doris 数据库的工具。用户可通过 Kafka Connect 插件轻松导入多种序列化格式(如 JSON、Avro、Protobuf),并支持解析 Debezium 组件的数据格式。更多文档请参考 Doris Kafka Connector。

在大多数情况下,可以直接选择 Routine Load 进行数据导入,无需集成外部组件即可消费 Kafka 数据。当需要加载 Avro、Protobuf 格式的数据,或通过 Debezium 采集的上游数据库数据时,可以使用 Doris Kafka Connector。

使用 Routine Load 消费 Kafka 数据

使用限制

支持的消息格式为 CSV 和 JSON。CSV 每个消息为一行,且行尾不包含换行符;

默认支持 Kafka 0.10.0.0 及以上版本。若需使用旧版本(如 0.9.0,0.8.2,0.8.1,0.8.0),需修改 BE 配置,将 kafka_broker_version_fallback 设置为兼容的旧版本,或在创建 Routine Load 时设置 property.broker.version.fallback。使用旧版本可能导致部分新特性无法使用,如根据时间设置 Kafka 分区的 offset。

操作示例

在 Doris 中通过 CREATE ROUTINE LOAD 命令创建常驻 Routine Load 导入任务,分为单表导入和多表导入。详细语法请参考 CREATE ROUTINE LOAD。

单表导入

第 1 步:准备数据

在 Kafka 中,样本数据如下:

Plain Text
1kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test-routine-load-csv --from-beginning
21,Emily,25

第 2 步:在库中创建表

在 Doris 中创建被导入的表,具体语法如下:

Plain Text
1CREATE TABLE testdb.test_routineload_tbl(
2    user_id            BIGINT       NOT NULL COMMENT "user id",
3    name               VARCHAR(20)           COMMENT "name",
4    age                INT                   COMMENT "age"
5)
6DUPLICATE KEY(user_id)
7DISTRIBUTED BY HASH(user_id) BUCKETS 10;

第 3 步:创建 Routine Load job 导入数据至单表

在 Doris 中,使用 CREATE ROUTINE LOAD 命令创建导入作业:

Plain Text
1CREATE ROUTINE LOAD testdb.example_routine_load_csv ON test_routineload_tbl
2COLUMNS TERMINATED BY ",",
3COLUMNS(user_id, name, age)
4FROM KAFKA(
5    "kafka_broker_list" = "192.168.88.62:9092",
6    "kafka_topic" = "test-routine-load-csv",
7    "property.kafka_default_offsets" = "OFFSET_BEGINNING"
8);

第 4 步:检查导入数据

Plain Text
1mysql> select * from test_routineload_tbl;
2+-----------+----------------+------+
3| user_id   | name           | age  |
4+-----------+----------------+------+
5|  1        | Emily          | 25   |
6+-----------+----------------+------+
71 rows in set (0.01 sec)

多表导入

对于需要同时导入多张表的场景,Kafka 中的数据必须包含表名信息,格式为:table_name|data。例如,导入 CSV 数据时,格式应为:table_name|val1,val2,val3。请注意,表名必须与 Doris 中的表名完全一致,否则导入将失败,并且不支持后面介绍的 column_mapping 配置。

第 1 步:准备数据

在 Kafka 中,样本数据如下:

Plain Text
1kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test-multi-table-load --from-beginning
2test_multi_table_load1|1,Emily,25
3test_multi_table_load2|2,Benjamin,35

第 2 步:在库中创建表

在 Doris 中创建被导入的表,具体语法如下:

表 1:

Plain Text
1CREATE TABLE test_multi_table_load1(
2    user_id            BIGINT       NOT NULL COMMENT "用户 ID",
3    name               VARCHAR(20)           COMMENT "用户姓名",
4    age                INT                   COMMENT "用户年龄"
5)
6DUPLICATE KEY(user_id)
7DISTRIBUTED BY HASH(user_id) BUCKETS 10;

表 2:

Plain Text
1CREATE TABLE test_multi_table_load2(
2    user_id            BIGINT       NOT NULL COMMENT "用户 ID",
3    name               VARCHAR(20)           COMMENT "用户姓名",
4    age                INT                   COMMENT "用户年龄"
5)
6DUPLICATE KEY(user_id)
7DISTRIBUTED BY HASH(user_id) BUCKETS 10;

第 3 步:创建 Routine Load job 导入数据至多表

在 Doris 中,使用 CREATE ROUTINE LOAD 命令创建导入作业:

Plain Text
1CREATE ROUTINE LOAD example_multi_table_load
2COLUMNS TERMINATED BY ","
3FROM KAFKA(
4    "kafka_broker_list" = "192.168.88.62:9092",
5    "kafka_topic" = "test-multi-table-load",
6    "property.kafka_default_offsets" = "OFFSET_BEGINNING"
7);

第 4 步:检查导入数据

Plain Text
1mysql> select * from test_multi_table_load1;
2+------+----------------+------+
3| id   | name           | age  |
4+------+----------------+------+
5|  1   | Emily          | 25   |
6+------+----------------+------+
71 rows in set (0.01 sec)
8
9mysql> select * from test_multi_table_load2;
10+------+----------------+------+
11| id   | name           | age  |
12+------+----------------+------+
13|  2   | Benjamin       | 35   |
14+------+----------------+------+
151 rows in set (0.01 sec)

配置安全认证

有关带有认证的 Kafka 配置方法,请参见 Kafka 安全认证。

使用 Doris Kafka Connector 消费 Kafka 数据

Doris Kafka Connector 是将 Kafka 数据流导入 Doris 数据库的工具。用户可通过 Kafka Connect 插件轻松导入多种序列化格式(如 JSON、Avro、Protobuf),并支持解析 Debezium 组件的数据格式。

以 Distributed 模式启动

Distributed 模式为 Kafka Connect 提供可扩展性和自动容错功能。在此模式下,可以使用相同的 group.id 启动多个工作进程,它们会协调在所有可用工作进程中安排连接器和任务的执行。

  1. 在 $KAFKA_HOME 下创建 plugins 目录,将下载好的 doris-kafka-connector jar 包放入其中。
  2. 配置config/connect-distributed.properties:
Plain Text
1# 修改 kafka server 地址
2bootstrap.servers=127.0.0.1:9092
3
4# 修改 group.id,同一集群的需要一致
5group.id=connect-cluster
6
7# 修改为创建的 plugins 目录
8# 注意:此处请填写 Kafka 的直接路径。例如:plugin.path=/opt/kafka/plugins
9plugin.path=$KAFKA_HOME/plugins
10
11# 建议将 Kafka 的 max.poll.interval.ms 时间调大到 30 分钟以上,默认 5 分钟
12# 避免 Stream Load 导入数据消费超时,消费者被踢出消费群组
13max.poll.interval.ms=1800000
14consumer.max.poll.interval.ms=1800000
  1. 启动:
Plain Text
1$KAFKA_HOME/bin/connect-distributed.sh -daemon $KAFKA_HOME/config/connect-distributed.properties
  1. 消费 Kafka 数据:
Plain Text
1curl -i http://127.0.0.1:8083/connectors -H "Content-Type: application/json" -X POST -d '{
2  "name":"test-doris-sink-cluster",
3  "config":{
4    "connector.class":"org.apache.doris.kafka.connector.DorisSinkConnector",
5    "topics":"topic_test",
6    "doris.topic2table.map": "topic_test:test_kafka_tbl",
7    "buffer.count.records":"10000",
8    "buffer.flush.time":"120",
9    "buffer.size.bytes":"5000000",
10    "doris.urls":"10.10.10.1",
11    "doris.user":"root",
12    "doris.password":"",
13    "doris.http.port":"8030",
14    "doris.query.port":"9030",
15    "doris.database":"test_db",
16    "key.converter":"org.apache.kafka.connect.storage.StringConverter",
17    "value.converter":"org.apache.kafka.connect.storage.StringConverter"
18  }
19}'

操作 Kafka Connect

Plain Text
1# 查看 connector 状态
2curl -i http://127.0.0.1:8083/connectors/test-doris-sink-cluster/status -X GET
3# 删除当前 connector
4curl -i http://127.0.0.1:8083/connectors/test-doris-sink-cluster -X DELETE
5# 暂停当前 connector
6curl -i http://127.0.0.1:8083/connectors/test-doris-sink-cluster/pause -X PUT
7# 重启当前 connector
8curl -i http://127.0.0.1:8083/connectors/test-doris-sink-cluster/resume -X PUT
9# 重启 connector 内的 tasks
10curl -i http://127.0.0.1:8083/connectors/test-doris-sink-cluster/tasks/0/restart -X POST

关于 Distributed 模式的介绍请参见 Distributed Workers。

消费普通数据

  1. 导入数据样本:

在 Kafka 中,样本数据如下:

Plain Text
1kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test-data-topic --from-beginning
2{"user_id":1,"name":"Emily","age":25}
3{"user_id":2,"name":"Benjamin","age":35}
4{"user_id":3,"name":"Olivia","age":28}
5{"user_id":4,"name":"Alexander","age":60}
6{"user_id":5,"name":"Ava","age":17}
7{"user_id":6,"name":"William","age":69}
8{"user_id":7,"name":"Sophia","age":32}
9{"user_id":8,"name":"James","age":64}
10{"user_id":9,"name":"Emma","age":37}
11{"user_id":10,"name":"Liam","age":64}
  1. 创建需要导入的表:

在 Doris 中创建被导入的表,具体语法如下:

Plain Text
1CREATE TABLE test_db.test_kafka_connector_tbl(
2    user_id            BIGINT       NOT NULL COMMENT "user id",
3    name               VARCHAR(20)           COMMENT "name",
4    age                INT                   COMMENT "age"
5)
6DUPLICATE KEY(user_id)
7DISTRIBUTED BY HASH(user_id) BUCKETS 12;
  1. 创建导入任务:

在部署 Kafka Connect 的机器上,通过 curl 命令提交如下导入任务:

Plain Text
1curl -i http://127.0.0.1:8083/connectors -H "Content-Type: application/json" -X POST -d '{
2  "name":"test-doris-sink-cluster",
3  "config":{
4    "connector.class":"org.apache.doris.kafka.connector.DorisSinkConnector",
5    "tasks.max":"10",
6    "topics":"test-data-topic",
7    "doris.topic2table.map": "test-data-topic:test_kafka_connector_tbl",
8    "buffer.count.records":"10000",
9    "buffer.flush.time":"120",
10    "buffer.size.bytes":"5000000",
11    "doris.urls":"10.10.10.1",
12    "doris.user":"root",
13    "doris.password":"",
14    "doris.http.port":"8030",
15    "doris.query.port":"9030",
16    "doris.database":"test_db",
17    "key.converter":"org.apache.kafka.connect.storage.StringConverter",
18    "value.converter":"org.apache.kafka.connect.storage.StringConverter"
19  }
20}'

消费 Debezium 组件采集的数据

  1. MySQL 数据库中有如下表:
Plain Text
1CREATE TABLE test.test_user (
2  user_id int NOT NULL ,
3  name varchar(20),
4  age int,
5  PRIMARY KEY (user_id)
6) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;
7
8insert into test.test_user values(1,'zhangsan',20);
9insert into test.test_user values(2,'lisi',21);
10insert into test.test_user values(3,'wangwu',22);
  1. 在 Doris 创建被导入的表:
Plain Text
1CREATE TABLE test_db.test_user(
2    user_id            BIGINT       NOT NULL COMMENT "user id",
3    name               VARCHAR(20)           COMMENT "name",
4    age                INT                   COMMENT "age"
5)
6UNIQUE KEY(user_id)
7DISTRIBUTED BY HASH(user_id) BUCKETS 12;
  1. 部署 Debezium connector for MySQL 组件,参考:Debezium connector for MySQL。
  2. 创建 doris-kafka-connector 导入任务:

假设通过 Debezium 采集到的 MySQL 表数据在 mysql_debezium.test.test_user Topic 中:

Plain Text
1curl -i http://127.0.0.1:8083/connectors -H "Content-Type: application/json" -X POST -d '{
2  "name":"test-debezium-doris-sink",
3  "config":{
4    "connector.class":"org.apache.doris.kafka.connector.DorisSinkConnector",
5    "tasks.max":"10",
6    "topics":"mysql_debezium.test.test_user",
7    "doris.topic2table.map": "mysql_debezium.test.test_user:test_user",
8    "buffer.count.records":"10000",
9    "buffer.flush.time":"120",
10    "buffer.size.bytes":"5000000",
11    "doris.urls":"10.10.10.1",
12    "doris.user":"root",
13    "doris.password":"",
14    "doris.http.port":"8030",
15    "doris.query.port":"9030",
16    "doris.database":"test_db",
17    "converter.mode":"debezium_ingestion",
18    "enable.delete":"true",
19    "key.converter":"org.apache.kafka.connect.json.JsonConverter",
20    "value.converter":"org.apache.kafka.connect.json.JsonConverter"
21  }
22}'

消费 AVRO 序列化格式数据

Plain Text
1curl -i http://127.0.0.1:8083/connectors -H "Content-Type: application/json" -X POST -d '{ 
2  "name":"doris-avro-test", 
3  "config":{ 
4    "connector.class":"org.apache.doris.kafka.connector.DorisSinkConnector", 
5    "topics":"avro_topic", 
6    "tasks.max":"10",
7    "doris.topic2table.map": "avro_topic:avro_tab", 
8    "buffer.count.records":"100000", 
9    "buffer.flush.time":"120", 
10    "buffer.size.bytes":"10000000", 
11    "doris.urls":"10.10.10.1", 
12    "doris.user":"root", 
13    "doris.password":"", 
14    "doris.http.port":"8030", 
15    "doris.query.port":"9030", 
16    "doris.database":"test", 
17    "load.model":"stream_load",
18    "key.converter":"io.confluent.connect.avro.AvroConverter",
19    "key.converter.schema.registry.url":"http://127.0.0.1:8081",
20    "value.converter":"io.confluent.connect.avro.AvroConverter",
21    "value.converter.schema.registry.url":"http://127.0.0.1:8081"
22  } 
23}'

消费 Protobuf 序列化格式数据

Plain Text
1curl -i http://127.0.0.1:8083/connectors -H "Content-Type: application/json" -X POST -d '{ 
2  "name":"doris-protobuf-test", 
3  "config":{ 
4    "connector.class":"org.apache.doris.kafka.connector.DorisSinkConnector", 
5    "topics":"proto_topic", 
6    "tasks.max":"10",
7    "doris.topic2table.map": "proto_topic:proto_tab", 
8    "buffer.count.records":"100000", 
9    "buffer.flush.time":"120", 
10    "buffer.size.bytes":"10000000", 
11    "doris.urls":"10.10.10.1", 
12    "doris.user":"root", 
13    "doris.password":"", 
14    "doris.http.port":"8030", 
15    "doris.query.port":"9030", 
16    "doris.database":"test", 
17    "load.model":"stream_load",
18    "key.converter":"io.confluent.connect.protobuf.ProtobufConverter",
19    "key.converter.schema.registry.url":"http://127.0.0.1:8081",
20    "value.converter":"io.confluent.connect.protobuf.ProtobufConverter",
21    "value.converter.schema.registry.url":"http://127.0.0.1:8081"
22  } 
23}'

上一篇
导入时实现数据转换
下一篇
S3 兼容存储