Broker Load

数据仓库 PALO

  • 功能发布记录
  • 操作手册1
    • LDAP认证
    • 时区
    • 使用S3-SDK访问对象存储
    • 权限管理
    • 物化视图
    • 变量
    • 资源管理
    • 数据更新与删除
      • 标记删除
      • Sequence-Column
      • 数据更新
      • 数据删除
    • 备份与恢复
      • 备份与恢复
    • 数据导出1
      • SELECT INTO OUTFILE
      • MySQL Dump
      • 数据导出概述
      • Export
    • 数据导出
      • 全量数据导出
      • 导出查询结果集
      • 导出总览
      • 导出数据到外部表
    • 查询加速1
      • 查询缓存
      • 物化视图
        • 同步物化视图
        • 物化视图概览
        • 异步物化视图
          • 异步物化视图常见问题
          • 最佳实践
          • 异步物化视图概述
          • 创建、查询与维护异步物化视图
    • 数据导入
      • JSON格式数据导入说明
      • 导入本地数据
      • 导入BOS中的数据
      • 导入事务和原子性
      • 通过外部表同步数据
      • 使用JDBC同步数据
      • 列的映射、转换与过滤
      • 订阅Kafka日志
      • 严格模式
      • 导入总览
    • 数据更新与删除1
      • 事务
      • 数据更新
        • 主键模型的导入更新
        • 主键模型的 Update 更新
        • 数据更新概述
        • 主键模型的更新并发控制
        • 聚合模型的导入更新
      • 数据删除
        • 删除操作概述
        • Truncate 操作
        • 表原子替换
        • Delete 操作
        • 基于导入的批量删除
        • 临时分区
    • 数据导入1
      • 高并发导入优化(Group Commit)
      • 导入概览
      • 异常数据处理
      • 导入高可用性
      • 导入时实现数据转换
      • 数据源
        • Kafka
        • S3 兼容存储
        • 从其他 TP 系统迁移数据
        • HDFS
        • 从其他 AP 系统迁移数据
        • Flink
        • 本地文件
      • 导入方式
        • Broker Load
        • MySQL Load
        • Insert Into Values
        • Stream Load
        • Insert Into Select
        • Routine Load
      • 文件格式
        • CSV
        • JSON
        • Parquet
        • ORC
      • 复杂数据类型
        • MAP
        • Variant
        • JSON
        • STRUCT
        • Bitmap
        • HLL
        • ARRAY
  • 开发指南
    • 迁移ClickHouse数据
    • Doris集群间数据迁移
    • 数据更新与删除
      • 事务
      • 数据更新
        • 主键模型的导入更新
        • 主键模型的 Update 更新
        • 数据更新概述
        • 主键模型的更新并发控制
        • 聚合模型的导入更新
      • 数据删除
        • 删除操作概述
        • Truncate 操作
        • 表原子替换
        • Delete 操作
        • 基于导入的批量删除
        • 临时分区
    • 查询加速
      • 查询缓存
      • Colocation Join
      • 高并发点查
      • Hint
        • Hint 概述
        • Leading Hint
        • Distribute Hint
      • 物化视图
        • 同步物化视图
        • 物化视图概览
        • 异步物化视图
          • 异步物化视图常见问题
          • 最佳实践
          • 异步物化视图概述
          • 创建、查询与维护异步物化视图
      • 高效去重
        • BITMAP 精准去重
        • HLL 近似去重
      • 优化技术原理
        • TOPN 查询优化
        • 统计信息
        • Pipeline 执行引擎
        • 查询优化器介绍
        • Runtime Filter
      • 查询调优概述
        • 调优概述
        • 诊断工具
        • 分析工具
        • 调优流程
      • 查询优化实践
        • 常见调优参数
        • 计划调优
          • 使用 Hint 控制代价改写
          • 使用异步物化视图透明改写
          • 使用 Leading Hint 控制 Join 顺序
          • 优化表 Schema 设计
          • 使用分区裁剪优化扫表
          • 优化索引设计和使用
          • 使用 Hint 调整 Join Shuffle 方式
          • DML 计划调优
          • 使用 Colocate Group 优化 Join
          • 使用同步物化视图透明改写
          • 使用 SQL Cache 加速查询
        • 执行调优
          • 数据倾斜处理
          • RuntimeFilter 的等待时间调整
          • 并行度调优
    • 数据查询
      • 连接(JOIN)
      • 子查询
      • 复杂类型查询
      • 列转行 (Lateral View)
      • MySQL 兼容性
      • 聚合多维分析
      • 分析函数(窗口函数)
      • 公用表表达式(CTE)
      • 自定义函数
        • 别名函数
        • Java UDF, UDAF, UDTF
    • 数据导出
      • SELECT INTO OUTFILE
      • MySQL Dump
      • 最佳实践
      • 数据导出概述
      • Export
    • 数据导入
      • 高并发导入优化(Group Commit)
      • 异常数据处理
      • 导入高可用性
      • 导入时实现数据转换
      • 导入最佳实践
      • 数据源
        • Kafka
        • Snowflake
        • S3 兼容存储
        • Google Cloud Storage
        • 从其他 TP 系统迁移数据
        • Azure Storage
        • 腾讯云 COS
        • MinIO
        • HDFS
        • 阿里云 OSS
        • 华为云 OBS
        • 从其他 AP 系统迁移数据
        • Flink
        • Redshift
        • Amazon S3
        • 本地文件
        • BigQuery
      • 导入方式
        • Broker Load
        • MySQL Load
        • Insert Into Values
        • Stream Load
        • Insert Into Select
        • Routine Load
      • 文件格式
        • CSV
        • JSON
        • Parquet
        • ORC
      • 复杂数据类型
        • MAP
        • Variant
        • JSON
        • STRUCT
        • Bitmap
        • HLL
        • ARRAY
    • BI工具接入
      • Sugar
      • Navicat
      • Tableau
      • DBeaver
      • 永洪BI
      • FineBI(帆软)
    • 数据库连接
      • 通过 MySQL 协议连接
      • 基于 Arrow Flight SQL 的高速数据传输链路
    • 湖仓一体
      • 分析 S3或HDFS 上的文件
      • 湖仓一体概述
      • SQL 方言兼容
      • 弹性计算节点
      • 云服务认证接入
      • 元数据缓存
      • 外表统计信息
      • 数据缓存
      • 数据库分析
        • MySQL
        • JDBC Catalog
        • Oracle
        • OceanBase
        • SAP HANA
        • 阿里云 MaxCompute
        • ClickHouse
        • PostgreSQL
        • IBM Db2
        • SQL Server
        • Elasticsearch
      • 湖仓一体最佳实践
        • 使用 PALO 和 Paimon
        • 使用 PALO 和 Iceberg
        • 使用 PALO 和 Hudi
        • 使用 PALO 和 LakeSoul
      • 数据湖构建
        • Iceberg
        • Hive
      • 数据湖分析
        • Hudi Catalog
        • 阿里云 DLF
        • Iceberg Catalog
        • Paimon Catalog
        • Hive Catalog
    • 数据表设计
      • 行业混存
      • 数据压缩
      • Schema 变更
      • 数据类型
      • 自增列
      • 概览
      • 数据库建表最佳实践
      • 冷热数据分层
        • SSD 和 HDD 层级存储
        • 远程存储
        • 冷热数据分层概述
      • 表索引
        • 倒排索引
        • 前缀索引与排序键
        • N-Gram 索引
        • BloomFilter 索引
        • 索引概述
      • 数据划分
        • 数据分桶
        • 数据分布概念
        • 动态分区
        • 自动分区
        • 手动分区
        • 常见文档
      • 数据模型
        • 使用注意
        • 模型概述
        • 主键模型
        • 明细模型
        • 聚合模型
  • 版本发布历史
    • 百度数据仓库 Palo 2.0 版本全新发布
  • SQL手册
    • 字面常量
    • 别名
    • SQL-手册
    • 数据类型
    • SQL语句
    • 注释
    • 内置函数
    • 白名单管理
    • SQL操作符
    • 内置函数
      • 聚合函数
      • 位操作函数
      • 字符串函数
      • 条件函数
      • 数学函数
      • JSON解析函数
      • 类型转换函数
      • 格式转换函数
      • 通用函数
      • 时间和日期函数
      • BITMAP函数
      • 窗口函数
      • 哈希函数
      • HLL函数
    • 语法帮助
      • DML
        • INSERT
        • ROUTINE-LOAD
        • RESTORE
        • SELECT-INTO-OUTFILE
        • ALTER-ROUTINE-LOAD
        • BROKER-LOAD
        • BACKUP
        • EXPORT
        • STREAM-LOAD
      • DDL
        • CREATE-FILE
        • DROP-RESOURCE
        • CREATE-RESOURCE
        • CREATE-MATERIALIZED-VIEW
        • DROP-RESROUCE
        • CREATE-TABLE
        • DROP-REPOSITORY
        • CREATE-REPOSITORY
        • CREATE-ODBC-TABLE
      • 信息查看语句
        • SHOW-BACKUP
        • SHOW-ALTER-TABLE-MATERIALIZED-VIEW
        • SHOW-SNAPSHOT
        • SHOW-ROUTINE-LOAD
        • SHOW-CREATE-ROUTINE-LOAD
        • SHOW-ROLES
        • SHOW-GRANTS
        • SHOW-EXPORT
        • SHOW-ROUTINE-LOAD-TASK
        • SHOW-REPOSITORIES
        • SHOW-LOAD
        • SHOW-RESOURCES
        • SHOW-RESTORE
        • SHOW-PROPERTY
        • SHOW-FILE
      • 辅助命令
        • PAUSE-ROUTINE-LOAD
        • STOP-ROUTINE-LOAD
        • ALTER-ROUTINE-LOAD
        • CANCEL-LOAD
        • RESUME-ROUTINE-LOAD
      • 账户管理
        • SET-PROPERTY
        • REVOKE
        • GRANT
        • CREATE-ROLE
        • DROP-ROLE
        • CREATE-USER
        • DROP-USER
        • SET-PASSWORD
  • 快速入门
    • 快速上手
    • 存算分离
    • 存算一体
  • 典型实践
    • 如何开启Debug日志
    • 导入分析
    • 查询分析
  • 操作手册
    • 权限和子用户
    • 存算一体
      • 连接集群
      • 查询分析
      • 监控告警
        • 监控指标
        • 告警配置
      • 备份恢复
        • 通过管理页面备份与恢复
        • 备份与恢复
      • 权限管理
        • 集群权限
        • 控制台权限
      • 集群管理
        • 集群创建
        • 停止与删除
        • 重置管理员密码
        • 集群扩缩容
        • 集群详情
    • 存算分离
      • 连接集群
      • 计算组管理
        • 重启计算组
        • 创建计算组
      • 监控告警
        • 监控指标
        • 告警配置
      • 权限管理
        • 集群权限
        • 控制台权限
      • 集群管理
        • 停止与删除
        • 创建集群
        • 重置管理员密码
        • 集群详情
  • 服务等级协议SLA
    • 服务等级协议(SLA)v1.0
  • 产品概述
    • 系统架构
    • 产品特点
    • 产品介绍
  • 视频专区
    • 操作指南
    • 产品简介
  • 产品定价
    • 预付费
    • 计费说明
    • 后付费
所有文档
menu
没有找到结果,请重新输入

数据仓库 PALO

  • 功能发布记录
  • 操作手册1
    • LDAP认证
    • 时区
    • 使用S3-SDK访问对象存储
    • 权限管理
    • 物化视图
    • 变量
    • 资源管理
    • 数据更新与删除
      • 标记删除
      • Sequence-Column
      • 数据更新
      • 数据删除
    • 备份与恢复
      • 备份与恢复
    • 数据导出1
      • SELECT INTO OUTFILE
      • MySQL Dump
      • 数据导出概述
      • Export
    • 数据导出
      • 全量数据导出
      • 导出查询结果集
      • 导出总览
      • 导出数据到外部表
    • 查询加速1
      • 查询缓存
      • 物化视图
        • 同步物化视图
        • 物化视图概览
        • 异步物化视图
          • 异步物化视图常见问题
          • 最佳实践
          • 异步物化视图概述
          • 创建、查询与维护异步物化视图
    • 数据导入
      • JSON格式数据导入说明
      • 导入本地数据
      • 导入BOS中的数据
      • 导入事务和原子性
      • 通过外部表同步数据
      • 使用JDBC同步数据
      • 列的映射、转换与过滤
      • 订阅Kafka日志
      • 严格模式
      • 导入总览
    • 数据更新与删除1
      • 事务
      • 数据更新
        • 主键模型的导入更新
        • 主键模型的 Update 更新
        • 数据更新概述
        • 主键模型的更新并发控制
        • 聚合模型的导入更新
      • 数据删除
        • 删除操作概述
        • Truncate 操作
        • 表原子替换
        • Delete 操作
        • 基于导入的批量删除
        • 临时分区
    • 数据导入1
      • 高并发导入优化(Group Commit)
      • 导入概览
      • 异常数据处理
      • 导入高可用性
      • 导入时实现数据转换
      • 数据源
        • Kafka
        • S3 兼容存储
        • 从其他 TP 系统迁移数据
        • HDFS
        • 从其他 AP 系统迁移数据
        • Flink
        • 本地文件
      • 导入方式
        • Broker Load
        • MySQL Load
        • Insert Into Values
        • Stream Load
        • Insert Into Select
        • Routine Load
      • 文件格式
        • CSV
        • JSON
        • Parquet
        • ORC
      • 复杂数据类型
        • MAP
        • Variant
        • JSON
        • STRUCT
        • Bitmap
        • HLL
        • ARRAY
  • 开发指南
    • 迁移ClickHouse数据
    • Doris集群间数据迁移
    • 数据更新与删除
      • 事务
      • 数据更新
        • 主键模型的导入更新
        • 主键模型的 Update 更新
        • 数据更新概述
        • 主键模型的更新并发控制
        • 聚合模型的导入更新
      • 数据删除
        • 删除操作概述
        • Truncate 操作
        • 表原子替换
        • Delete 操作
        • 基于导入的批量删除
        • 临时分区
    • 查询加速
      • 查询缓存
      • Colocation Join
      • 高并发点查
      • Hint
        • Hint 概述
        • Leading Hint
        • Distribute Hint
      • 物化视图
        • 同步物化视图
        • 物化视图概览
        • 异步物化视图
          • 异步物化视图常见问题
          • 最佳实践
          • 异步物化视图概述
          • 创建、查询与维护异步物化视图
      • 高效去重
        • BITMAP 精准去重
        • HLL 近似去重
      • 优化技术原理
        • TOPN 查询优化
        • 统计信息
        • Pipeline 执行引擎
        • 查询优化器介绍
        • Runtime Filter
      • 查询调优概述
        • 调优概述
        • 诊断工具
        • 分析工具
        • 调优流程
      • 查询优化实践
        • 常见调优参数
        • 计划调优
          • 使用 Hint 控制代价改写
          • 使用异步物化视图透明改写
          • 使用 Leading Hint 控制 Join 顺序
          • 优化表 Schema 设计
          • 使用分区裁剪优化扫表
          • 优化索引设计和使用
          • 使用 Hint 调整 Join Shuffle 方式
          • DML 计划调优
          • 使用 Colocate Group 优化 Join
          • 使用同步物化视图透明改写
          • 使用 SQL Cache 加速查询
        • 执行调优
          • 数据倾斜处理
          • RuntimeFilter 的等待时间调整
          • 并行度调优
    • 数据查询
      • 连接(JOIN)
      • 子查询
      • 复杂类型查询
      • 列转行 (Lateral View)
      • MySQL 兼容性
      • 聚合多维分析
      • 分析函数(窗口函数)
      • 公用表表达式(CTE)
      • 自定义函数
        • 别名函数
        • Java UDF, UDAF, UDTF
    • 数据导出
      • SELECT INTO OUTFILE
      • MySQL Dump
      • 最佳实践
      • 数据导出概述
      • Export
    • 数据导入
      • 高并发导入优化(Group Commit)
      • 异常数据处理
      • 导入高可用性
      • 导入时实现数据转换
      • 导入最佳实践
      • 数据源
        • Kafka
        • Snowflake
        • S3 兼容存储
        • Google Cloud Storage
        • 从其他 TP 系统迁移数据
        • Azure Storage
        • 腾讯云 COS
        • MinIO
        • HDFS
        • 阿里云 OSS
        • 华为云 OBS
        • 从其他 AP 系统迁移数据
        • Flink
        • Redshift
        • Amazon S3
        • 本地文件
        • BigQuery
      • 导入方式
        • Broker Load
        • MySQL Load
        • Insert Into Values
        • Stream Load
        • Insert Into Select
        • Routine Load
      • 文件格式
        • CSV
        • JSON
        • Parquet
        • ORC
      • 复杂数据类型
        • MAP
        • Variant
        • JSON
        • STRUCT
        • Bitmap
        • HLL
        • ARRAY
    • BI工具接入
      • Sugar
      • Navicat
      • Tableau
      • DBeaver
      • 永洪BI
      • FineBI(帆软)
    • 数据库连接
      • 通过 MySQL 协议连接
      • 基于 Arrow Flight SQL 的高速数据传输链路
    • 湖仓一体
      • 分析 S3或HDFS 上的文件
      • 湖仓一体概述
      • SQL 方言兼容
      • 弹性计算节点
      • 云服务认证接入
      • 元数据缓存
      • 外表统计信息
      • 数据缓存
      • 数据库分析
        • MySQL
        • JDBC Catalog
        • Oracle
        • OceanBase
        • SAP HANA
        • 阿里云 MaxCompute
        • ClickHouse
        • PostgreSQL
        • IBM Db2
        • SQL Server
        • Elasticsearch
      • 湖仓一体最佳实践
        • 使用 PALO 和 Paimon
        • 使用 PALO 和 Iceberg
        • 使用 PALO 和 Hudi
        • 使用 PALO 和 LakeSoul
      • 数据湖构建
        • Iceberg
        • Hive
      • 数据湖分析
        • Hudi Catalog
        • 阿里云 DLF
        • Iceberg Catalog
        • Paimon Catalog
        • Hive Catalog
    • 数据表设计
      • 行业混存
      • 数据压缩
      • Schema 变更
      • 数据类型
      • 自增列
      • 概览
      • 数据库建表最佳实践
      • 冷热数据分层
        • SSD 和 HDD 层级存储
        • 远程存储
        • 冷热数据分层概述
      • 表索引
        • 倒排索引
        • 前缀索引与排序键
        • N-Gram 索引
        • BloomFilter 索引
        • 索引概述
      • 数据划分
        • 数据分桶
        • 数据分布概念
        • 动态分区
        • 自动分区
        • 手动分区
        • 常见文档
      • 数据模型
        • 使用注意
        • 模型概述
        • 主键模型
        • 明细模型
        • 聚合模型
  • 版本发布历史
    • 百度数据仓库 Palo 2.0 版本全新发布
  • SQL手册
    • 字面常量
    • 别名
    • SQL-手册
    • 数据类型
    • SQL语句
    • 注释
    • 内置函数
    • 白名单管理
    • SQL操作符
    • 内置函数
      • 聚合函数
      • 位操作函数
      • 字符串函数
      • 条件函数
      • 数学函数
      • JSON解析函数
      • 类型转换函数
      • 格式转换函数
      • 通用函数
      • 时间和日期函数
      • BITMAP函数
      • 窗口函数
      • 哈希函数
      • HLL函数
    • 语法帮助
      • DML
        • INSERT
        • ROUTINE-LOAD
        • RESTORE
        • SELECT-INTO-OUTFILE
        • ALTER-ROUTINE-LOAD
        • BROKER-LOAD
        • BACKUP
        • EXPORT
        • STREAM-LOAD
      • DDL
        • CREATE-FILE
        • DROP-RESOURCE
        • CREATE-RESOURCE
        • CREATE-MATERIALIZED-VIEW
        • DROP-RESROUCE
        • CREATE-TABLE
        • DROP-REPOSITORY
        • CREATE-REPOSITORY
        • CREATE-ODBC-TABLE
      • 信息查看语句
        • SHOW-BACKUP
        • SHOW-ALTER-TABLE-MATERIALIZED-VIEW
        • SHOW-SNAPSHOT
        • SHOW-ROUTINE-LOAD
        • SHOW-CREATE-ROUTINE-LOAD
        • SHOW-ROLES
        • SHOW-GRANTS
        • SHOW-EXPORT
        • SHOW-ROUTINE-LOAD-TASK
        • SHOW-REPOSITORIES
        • SHOW-LOAD
        • SHOW-RESOURCES
        • SHOW-RESTORE
        • SHOW-PROPERTY
        • SHOW-FILE
      • 辅助命令
        • PAUSE-ROUTINE-LOAD
        • STOP-ROUTINE-LOAD
        • ALTER-ROUTINE-LOAD
        • CANCEL-LOAD
        • RESUME-ROUTINE-LOAD
      • 账户管理
        • SET-PROPERTY
        • REVOKE
        • GRANT
        • CREATE-ROLE
        • DROP-ROLE
        • CREATE-USER
        • DROP-USER
        • SET-PASSWORD
  • 快速入门
    • 快速上手
    • 存算分离
    • 存算一体
  • 典型实践
    • 如何开启Debug日志
    • 导入分析
    • 查询分析
  • 操作手册
    • 权限和子用户
    • 存算一体
      • 连接集群
      • 查询分析
      • 监控告警
        • 监控指标
        • 告警配置
      • 备份恢复
        • 通过管理页面备份与恢复
        • 备份与恢复
      • 权限管理
        • 集群权限
        • 控制台权限
      • 集群管理
        • 集群创建
        • 停止与删除
        • 重置管理员密码
        • 集群扩缩容
        • 集群详情
    • 存算分离
      • 连接集群
      • 计算组管理
        • 重启计算组
        • 创建计算组
      • 监控告警
        • 监控指标
        • 告警配置
      • 权限管理
        • 集群权限
        • 控制台权限
      • 集群管理
        • 停止与删除
        • 创建集群
        • 重置管理员密码
        • 集群详情
  • 服务等级协议SLA
    • 服务等级协议(SLA)v1.0
  • 产品概述
    • 系统架构
    • 产品特点
    • 产品介绍
  • 视频专区
    • 操作指南
    • 产品简介
  • 产品定价
    • 预付费
    • 计费说明
    • 后付费
  • 文档中心
  • arrow
  • 数据仓库PALO
  • arrow
  • 操作手册1
  • arrow
  • 数据导入1
  • arrow
  • 导入方式
  • arrow
  • Broker Load
本页目录
  • 使用限制
  • 基本原理
  • 快速上手
  • 前置检查
  • 创建导入作业
  • 查看导入作业
  • 取消导入作业
  • 参考手册
  • 导入命令
  • 导入配置参数
  • 常见问题
  • 常见报错
  • S3 Load URL 访问方式
  • S3 Load 临时密钥
  • HDFS 认证方式
  • HDFS HA 模式
  • 其他 Broker 导入
  • 导入示例
  • 导入 HDFS 上的 TXT 文件
  • HDFS 需要配置 NameNode HA 的情况
  • 从 HDFS 导入数据,使用通配符匹配两批文件,分别导入到两个表中
  • 使用通配符从 HDFS 导入一批数据
  • 导入 Parquet 格式数据,指定 FORMAT 为 parquet
  • 导入数据,并提取文件路径中的分区字段
  • 对导入数据进行过滤
  • 导入数据,提取文件路径中的时间分区字段
  • 使用 Merge 方式导入
  • 导入时指定 source_sequence 列,保证替换顺序
  • 从其他 Broker 导入

Broker Load

更新时间:2025-08-21

Broker Load 通过 MySQL API 发起,Doris 会根据 LOAD 语句中的信息,主动从数据源拉取数据。Broker Load 是一个异步导入方式,需要通过 SHOW LOAD 语句查看导入进度和导入结果。

Broker Load 适合源数据存储在远程存储系统,比如对象存储或 HDFS,且数据量比较大的场景。 从 HDFS 或者 S3 直接读取,也可以通过 湖仓一体/TVF 中的 HDFS TVF 或者 S3 TVF 进行导入。基于 TVF 的 Insert Into 当前为同步导入,Broker Load 是一个异步的导入方式。

在 Doris 早期版本中,S3 Load 和 HDFS Load 都是通过 WITH BROKER 连接到具体的 Broker 进程实现的。 随着版本的更新,S3 Load 和 HDFS Load 作为最常用的导入方式得到了优化,现在它们不再依赖额外的 Broker 进程,但仍然使用与 Broker Load 类似的语法。 由于历史原因以及语法上的相似,S3 Load、HDFS Load 和 Broker Load 这三种导入方式被统称为 Broker Load。

使用限制

支持的存储后端:

  • S3 协议
  • HDFS 协议
  • 其他协议(需要相应的 Broker 进程)

支持的数据类型:

  • CSV
  • JSON
  • PARQUET
  • ORC

支持的压缩类型:

  • PLAIN
  • GZ
  • LZO
  • BZ2
  • LZ4FRAME
  • DEFLATE
  • LZOP
  • LZ4BLOCK
  • SNAPPYBLOCK
  • ZLIB
  • ZSTD

基本原理

用户在提交导入任务后,FE 会生成对应的 Plan 并根据目前 BE 的个数和文件的大小,将 Plan 分给 多个 BE 执行,每个 BE 执行一部分导入数据。

BE 在执行的过程中会从 Broker 拉取数据,在对数据 transform 之后将数据导入系统。所有 BE 均完成导入,由 FE 最终决定导入是否成功。

--图片

从上图中可以看到,BE 会依赖 Broker 进程来读取相应远程存储系统的数据。之所以引入 Broker 进程,主要是用来针对不同的远程存储系统,用户可以按照 Broker 进程的标准开发其相应的 Broker 进程,Broker 进程可以使用 Java 程序开发,更好的兼容大数据生态中的各类存储系统。由于 broker 进程和 BE 进程的分离,也确保了两个进程的错误隔离,提升 BE 的稳定性。

当前 BE 内置了对 HDFS 和 S3 两个 Broker 的支持,所以如果从 HDFS 和 S3 中导入数据,则不需要额外启动 Broker 进程。如果有自己定制的 Broker 实现,则需要部署相应的 Broker 进程。

快速上手

本节演示了一个 S3 Load 的例子。具体的使用语法,请参考 SQL 手册中的 Broker Load。

前置检查

  1. Doris 表权限

Broker Load 需要对目标表的 INSERT 权限。如果没有 INSERT 权限,可以通过 GRANT 命令给用户授权。

  1. S3 认证和连接信息

这里以 AWS S3 为例,从其他对象存储系统导入也可以作为参考。

  • AK 和 SK:首先需要找到或者重新生成 AWS Access keys,可以在 AWS console 的 My Security Credentials 找到生成方式。
  • REGION 和 ENDPOINT:REGION 可以在创建桶的时候选择也可以在桶列表中查看到。每个 REGION 的 S3 ENDPOINT 可以通过如下页面查到 AWS 文档。

创建导入作业

  1. 创建 CSV 文件 brokerload_example.csv 文件存储在 S3 上,其内容如下:
Plain Text
11,Emily,25
22,Benjamin,35
33,Olivia,28
44,Alexander,60
55,Ava,17
66,William,69
77,Sophia,32
88,James,64
99,Emma,37
1010,Liam,64
  1. 创建导入 Doris 表

在 Doris 中创建被导入的表,具体语法如下:

Plain Text
1CREATE TABLE testdb.test_brokerload(
2    user_id            BIGINT       NOT NULL COMMENT "user id",
3    name               VARCHAR(20)           COMMENT "name",
4    age                INT                   COMMENT "age"
5)
6DUPLICATE KEY(user_id)
7DISTRIBUTED BY HASH(user_id) BUCKETS 10;
  1. 使用 Broker Load 从 S3 导入数据。其中 bucket 名称和 S3 认证信息要根据实际填写:
Plain Text
1    LOAD LABEL broker_load_2022_04_01
2    (
3        DATA INFILE("s3://your_bucket_name/brokerload_example.csv")
4        INTO TABLE test_brokerload
5        COLUMNS TERMINATED BY ","
6        FORMAT AS "CSV"
7        (user_id, name, age)
8    )
9    WITH S3
10    (
11        "provider" = "S3",
12        "AWS_ENDPOINT" = "s3.us-west-2.amazonaws.com",
13        "AWS_ACCESS_KEY" = "<your-ak>",
14        "AWS_SECRET_KEY"="<your-sk>",
15        "AWS_REGION" = "us-west-2",
16        "compress_type" = "PLAIN"
17    )
18    PROPERTIES
19    (
20        "timeout" = "3600"
21    );

其中 provider 字段需要根据实际的对象存储服务商填写。 Doris 支持的 provider 列表:

  • "S3" (亚马逊 AWS)
  • "AZURE" (微软 Azure)
  • "GCP" (谷歌 GCP)
  • "OSS" (阿里云)
  • "COS" (腾讯云)
  • "OBS" (华为云)
  • "BOS" (百度云)

如不在列表中 (例如 MinIO),可以尝试使用 "S3" (兼容 AWS 模式)

查看导入作业

Broker load 是一个异步的导入方式,具体导入结果可以通过 SHOW LOAD 命令查看

Plain Text
1mysql> show load order by createtime desc limit 1\G;
2*************************** 1. row ***************************
3         JobId: 41326624
4         Label: broker_load_2022_04_01
5         State: FINISHED
6      Progress: ETL:100%; LOAD:100%
7          Type: BROKER
8       EtlInfo: unselected.rows=0; dpp.abnorm.ALL=0; dpp.norm.ALL=27
9      TaskInfo: cluster:N/A; timeout(s):1200; max_filter_ratio:0.1
10      ErrorMsg: NULL
11    CreateTime: 2022-04-01 18:59:06
12  EtlStartTime: 2022-04-01 18:59:11
13 EtlFinishTime: 2022-04-01 18:59:11
14 LoadStartTime: 2022-04-01 18:59:11
15LoadFinishTime: 2022-04-01 18:59:11
16           URL: NULL
17    JobDetails: {"Unfinished backends":{"5072bde59b74b65-8d2c0ee5b029adc0":[]},"ScannedRows":27,"TaskNumber":1,"All backends":{"5072bde59b74b65-8d2c0ee5b029adc0":[36728051]},"FileNumber":1,"FileSize":5540}
181 row in set (0.01 sec)

取消导入作业

当 Broker load 作业状态不为 CANCELLED 或 FINISHED 时,可以被用户手动取消。取消时需要指定待取消导入任务的 Label。取消导入命令语法可执行 CANCEL LOAD 查看。

例如:取消数据库 demo 上,label 为 broker_load_2022_04_01 的导入作业

Plain Text
1CANCEL LOAD FROM demo WHERE LABEL = "broker_load_2022_04_01";

参考手册

导入命令

Plain Text
1LOAD LABEL load_label
2(
3data_desc1[, data_desc2, ...]
4)
5WITH [S3|HDFS|BROKER broker_name] 
6[broker_properties]
7[load_properties]
8[COMMENT "comments"];

其中 WITH 子句指定了如何访问存储系统,broker_properties 则是该访问方式的配置参数

  • S3: 使用 S3 协议的存储系统
  • HDFS: 使用 HDFS 协议的存储系统
  • BROKER broker_name: 其他协议的存储系统。可以通过 SHOW BROKER 查看目前可选的 broker_name 列表。更多信息见常见问题中的 "其他 Broker 导入"

导入配置参数

load properties

Property 名称 类型 默认值 说明
"timeout" Long 14400 导入的超时时间,单位秒。范围是 1 秒 ~ 259200 秒。
"max_filter_ratio" Float 0.0 最大容忍可过滤(数据不规范等原因)的数据比例,默认零容忍。取值范围是 0~1。当导入的错误率超过该值,则导入失败。数据不规范不包括通过 where 条件过滤掉的行。
"strict_mode" Boolean false 是否开启严格模式。
"partial_columns" Boolean false 是否使用部分列更新,只在表模型为 Unique Key 且采用 Merge on Write 时有效。
"timezone" String "Asia/Shanghai" 本次导入所使用的时区。该参数会影响所有导入涉及的和时区有关的函数结果。
"load_parallelism" Integer 8 每个 BE 上并发 instance 数量的上限。
"send_batch_parallelism" Integer 1 sink 节点发送数据的并发度,仅在关闭 memtable 前移时生效。
"load_to_single_tablet" Boolean "false" 是否每个分区只导入一个 tablet,默认值为 false。该参数只允许在对带有 random 分桶的 OLAP 表导数的时候设置。
"skip_lines" Integer "0" 跳过 CSV 文件的前几行。当设置 format 设置为 csv_with_names 或 csv_with_names_and_types 时,该参数会失效。
"trim_double_quotes" Boolean "false" 是否裁剪掉导入文件每个字段最外层的双引号。
"priority" "HIGH" 或 "NORMAL" 或 "LOW" "NORMAL" 导入任务的优先级。

fe.conf

下面几个配置属于 Broker load 的系统级别配置,也就是作用于所有 Broker load 导入任务的配置。主要通过修改 fe.conf来调整配置值。

Session Variable 类型 默认值 说明
min_bytes_per_broker_scanner Long 67108864 (64 MB) 一个 Broker Load 作业中单 BE 处理的数据量的最小值,单位:字节。
max_bytes_per_broker_scanner Long 536870912000 (500 GB) 一个 Broker Load 作业中单 BE 处理的数据量的最大值,单位:字节。通常一个导入作业支持的最大数据量为 max_bytes_per_broker_scanner * BE 节点数。如果需要导入更大数据量,则需要适当调整 max_bytes_per_broker_scanner 参数的大小。
max_broker_concurrency Integer 10 限制了一个作业的最大的导入并发数。
default_load_parallelism Integer 8 每个 BE 节点最大并发 instance 数
broker_load_default_timeout_second 14400 Broker Load 导入的默认超时时间,单位:秒。

常见问题

常见报错

1. 导入报错:Scan bytes per broker scanner exceed limit:xxx

请参照文档中最佳实践部分,修改 FE 配置项 max_bytes_per_broker_scanner 和 max_broker_concurrency

2. 导入报错:failed to send batch 或 TabletWriter add batch with unknown id

适当修改 query_timeout 和 streaming_load_rpc_max_alive_time_sec。

3. 导入报错:LOAD_RUN_FAIL; msg:Invalid Column Name:xxx

如果是 PARQUET 或者 ORC 格式的数据,则文件头的列名需要与 doris 表中的列名保持一致,如:

Plain Text
1(tmp_c1,tmp_c2)
2SET
3(
4    id=tmp_c2,
5    name=tmp_c1
6)

代表获取在 parquet 或 orc 中以 (tmp_c1, tmp_c2) 为列名的列,映射到 doris 表中的 (id, name) 列。如果没有设置 set, 则以 column 中的列作为映射。

注:如果使用某些 hive 版本直接生成的 orc 文件,orc 文件中的表头并非 hive meta 数据,而是(_col0, _col1, _col2, ...), 可能导致 Invalid Column Name 错误,那么则需要使用 set 进行映射

4. 导入报错:Failed to get S3 FileSystem for bucket is null/empty

bucket 信息填写不正确或者不存在。或者 bucket 的格式不受支持。使用 GCS 创建带_的桶名时,比如:s3://gs_bucket/load_tbl,S3 Client 访问 GCS 会报错,建议创建 bucket 路径时不使用_。

5. 导入超时

导入的 timeout 默认超时时间为 4 小时。如果超时,不推荐用户将导入最大超时时间直接改大来解决问题。单个导入时间如果超过默认的导入超时时间 4 小时,最好是通过切分待导入文件并且分多次导入来解决问题。因为超时时间设置过大,那么单次导入失败后重试的时间成本很高。

可以通过如下公式计算出 Doris 集群期望最大导入文件数据量:

Plain Text
1期望最大导入文件数据量 = 14400s * 10M/s * BE 个数
2比如:集群的 BE 个数为 10个
3期望最大导入文件数据量 = 14400s * 10M/s * 10 = 1440000M ≈ 1440G
4
5注意:一般用户的环境可能达不到 10M/s 的速度,所以建议超过 500G 的文件都进行文件切分,再导入。

S3 Load URL 访问方式

  • S3 SDK 默认使用 virtual-hosted-style 方式。但某些对象存储系统可能没开启或没支持 virtual-hosted-style 方式的访问,此时我们可以添加 use_path_style 参数来强制使用 path-style 方式:
Plain Text
1  WITH S3
2  (
3        "AWS_ENDPOINT" = "AWS_ENDPOINT",
4        "AWS_ACCESS_KEY" = "AWS_ACCESS_KEY",
5        "AWS_SECRET_KEY"="AWS_SECRET_KEY",
6        "AWS_REGION" = "AWS_REGION",
7        "use_path_style" = "true"
8  )

S3 Load 临时密钥

  • 支持使用临时秘钥 (TOKEN) 访问所有支持 S3 协议的对象存储,用法如下:
Plain Text
1  WITH S3
2  (
3      "AWS_ENDPOINT" = "AWS_ENDPOINT",
4      "AWS_ACCESS_KEY" = "AWS_TEMP_ACCESS_KEY",
5      "AWS_SECRET_KEY" = "AWS_TEMP_SECRET_KEY",
6      "AWS_TOKEN" = "AWS_TEMP_TOKEN",
7      "AWS_REGION" = "AWS_REGION"
8  )

HDFS 认证方式

  1. 简单认证

简单认证即 Hadoop 配置 hadoop.security.authentication 为 simple。

Plain Text
1(
2    "username" = "user",
3    "password" = ""
4);

username 配置为要访问的用户,密码置空即可。

  1. Kerberos 认证

该认证方式需提供以下信息:

  • hadoop.security.authentication:指定认证方式为 Kerberos。
  • hadoop.kerberos.principal:指定 Kerberos 的 principal。
  • hadoop.kerberos.keytab:指定 Kerberos 的 keytab 文件路径。该文件必须为 Broker 进程所在服务器上的文件的绝对路径。并且可以被 Broker 进程访问。
  • kerberos_keytab_content:指定 Kerberos 中 keytab 文件内容经过 base64 编码之后的内容。这个跟 kerberos_keytab 配置二选一即可。

示例如下:

Plain Text
1(
2    "hadoop.security.authentication" = "kerberos",
3    "hadoop.kerberos.principal" = "doris@YOUR.COM",
4    "hadoop.kerberos.keytab" = "/home/doris/my.keytab"
5)
6(
7    "hadoop.security.authentication" = "kerberos",
8    "hadoop.kerberos.principal" = "doris@YOUR.COM",
9    "kerberos_keytab_content" = "ASDOWHDLAWIDJHWLDKSALDJSDIWALD"
10)

采用 Kerberos 认证方式,需要 krb5.conf (opens new window) 文件,krb5.conf 文件包含 Kerberos 的配置信息,通常,应该将 krb5.conf 文件安装在目录/etc 中。可以通过设置环境变量 KRB5_CONFIG 覆盖默认位置。krb5.conf 文件的内容示例如下:

Plain Text
1[libdefaults]
2    default_realm = DORIS.HADOOP
3    default_tkt_enctypes = des3-hmac-sha1 des-cbc-crc
4    default_tgs_enctypes = des3-hmac-sha1 des-cbc-crc
5    dns_lookup_kdc = true
6    dns_lookup_realm = false
7
8[realms]
9    DORIS.HADOOP = {
10        kdc = kerberos-doris.hadoop.service:7005
11    }

HDFS HA 模式

这个配置用于访问以 HA 模式部署的 HDFS 集群。

  • dfs.nameservices:指定 HDFS 服务的名字,自定义,如:"dfs.nameservices" = "my_ha"。
  • dfs.ha.namenodes.xxx:自定义 namenode 的名字,多个名字以逗号分隔。其中 xxx 为 dfs.nameservices 中自定义的名字,如: "dfs.ha.namenodes.my_ha" = "my_nn"。
  • dfs.namenode.rpc-address.xxx.nn:指定 namenode 的 rpc 地址信息。其中 nn 表示 dfs.ha.namenodes.xxx 中配置的 namenode 的名字,如:"dfs.namenode.rpc-address.my_ha.my_nn" = "host:port"。
  • dfs.client.failover.proxy.provider.[nameservice ID]:指定 client 连接 namenode 的 provider,默认为:org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider。

示例如下:

Plain Text
1(
2    "fs.defaultFS" = "hdfs://my_ha",
3    "dfs.nameservices" = "my_ha",
4    "dfs.ha.namenodes.my_ha" = "my_namenode1, my_namenode2",
5    "dfs.namenode.rpc-address.my_ha.my_namenode1" = "nn1_host:rpc_port",
6    "dfs.namenode.rpc-address.my_ha.my_namenode2" = "nn2_host:rpc_port",
7    "dfs.client.failover.proxy.provider.my_ha" = "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider"
8)

HA 模式可以和前面两种认证方式组合,进行集群访问。如通过简单认证访问 HA HDFS:

Plain Text
1(
2    "username"="user",
3    "password"="passwd",
4    "fs.defaultFS" = "hdfs://my_ha",
5    "dfs.nameservices" = "my_ha",
6    "dfs.ha.namenodes.my_ha" = "my_namenode1, my_namenode2",
7    "dfs.namenode.rpc-address.my_ha.my_namenode1" = "nn1_host:rpc_port",
8    "dfs.namenode.rpc-address.my_ha.my_namenode2" = "nn2_host:rpc_port",
9    "dfs.client.failover.proxy.provider.my_ha" = "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider"
10)

其他 Broker 导入

其他远端存储系统的 Broker 是 Doris 集群中的可选进程,主要用于支持 Doris 对远端存储中文件和目录的读写。目前,Doris 提供了多种远端存储系统的 Broker 实现。 历史版本中,Doris 还支持过各种对象存储的 Broker,但现在更推荐使用 WITH S3方式来导入对象存储中的数据,而不再推荐使用WITH BROKER。

  • 腾讯云 CHDFS
  • 腾讯云 GFS
  • JuiceFS

Broker 通过提供一个 RPC 服务端口来提供服务,是一个无状态的 Java 进程,负责为远端存储的读写操作封装一些类 POSIX 的文件操作,如 open,pread,pwrite 等等。除此之外,Broker 不记录任何其他信息,所以包括远端存储的连接信息、文件信息、权限信息等等,都需要通过参数在 RPC 调用中传递给 Broker 进程,才能使得 Broker 能够正确读写文件。

Broker 仅作为一个数据通路,并不参与任何计算,因此仅需占用较少的内存。通常一个 Doris 系统中会部署一个或多个 Broker 进程。并且相同类型的 Broker 会组成一个组,并设定一个 名称(Broker name)。

这里主要介绍 Broker 在访问不同远端存储时需要的参数,如连接信息、权限认证信息等等。

Broker 信息

Broker 的信息包括 名称(Broker name)和 认证信息 两部分。通常的语法格式如下:

Plain Text
1WITH BROKER "broker_name" 
2(
3    "username" = "xxx",
4    "password" = "yyy",
5    "other_prop" = "prop_value",
6    ...
7);
  • 名称

通常用户需要通过操作命令中的 WITH BROKER "broker_name"子句来指定一个已经存在的 Broker Name。Broker Name 是用户在通过 ALTER SYSTEM ADD BROKER 命令添加 Broker 进程时指定的一个名称。一个名称通常对应一个或多个 Broker 进程。Doris 会根据名称选择可用的 Broker 进程。用户可以通过 SHOW BROKER 命令查看当前集群中已经存在的 Broker。

备注
Broker Name 只是一个用户自定义名称,不代表 Broker 的类型。

  • 认证信息

不同的 Broker 类型,以及不同的访问方式需要提供不同的认证信息。认证信息通常在 WITH BROKER "broker_name"之后的 Property Map 中以 Key-Value 的方式提供。

导入示例

导入 HDFS 上的 TXT 文件

Plain Text
1LOAD LABEL demo.label_20220402
2(
3    DATA INFILE("hdfs://host:port/tmp/test_hdfs.txt")
4    INTO TABLE `load_hdfs_file_test`
5    COLUMNS TERMINATED BY "\t"            
6    (id,age,name)
7) 
8with HDFS
9(
10  "fs.defaultFS"="hdfs://host:port",
11  "hadoop.username" = "user"
12)
13PROPERTIES
14(
15    "timeout"="1200",
16    "max_filter_ratio"="0.1"
17);

HDFS 需要配置 NameNode HA 的情况

Plain Text
1LOAD LABEL demo.label_20220402
2(
3    DATA INFILE("hdfs://hafs/tmp/test_hdfs.txt")
4    INTO TABLE `load_hdfs_file_test`
5    COLUMNS TERMINATED BY "\t"            
6    (id,age,name)
7) 
8with HDFS
9(
10    "hadoop.username" = "user",
11    "fs.defaultFS"="hdfs://hafs",
12    "dfs.nameservices" = "hafs",
13    "dfs.ha.namenodes.hafs" = "my_namenode1, my_namenode2",
14    "dfs.namenode.rpc-address.hafs.my_namenode1" = "nn1_host:rpc_port",
15    "dfs.namenode.rpc-address.hafs.my_namenode2" = "nn2_host:rpc_port",
16    "dfs.client.failover.proxy.provider.hafs" = "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider"
17)
18PROPERTIES
19(
20    "timeout"="1200",
21    "max_filter_ratio"="0.1"
22);

从 HDFS 导入数据,使用通配符匹配两批文件,分别导入到两个表中

Plain Text
1LOAD LABEL example_db.label2
2(
3    DATA INFILE("hdfs://host:port/input/file-10*")
4    INTO TABLE `my_table1`
5    PARTITION (p1)
6    COLUMNS TERMINATED BY ","
7    (k1, tmp_k2, tmp_k3)
8    SET (
9        k2 = tmp_k2 + 1,
10        k3 = tmp_k3 + 1
11    ),
12    DATA INFILE("hdfs://host:port/input/file-20*")
13    INTO TABLE `my_table2`
14    COLUMNS TERMINATED BY ","
15    (k1, k2, k3)
16)
17with HDFS
18(
19  "fs.defaultFS"="hdfs://host:port",
20  "hadoop.username" = "user"
21);

使用通配符匹配导入两批文件 file-10* 和 file-20*。分别导入到 my_table1 和 my_table2 两张表中。其中 my_table1 指定导入到分区 p1 中,并且将导入源文件中第二列和第三列的值 +1 后导入。

使用通配符从 HDFS 导入一批数据

Plain Text
1LOAD LABEL example_db.label3
2(
3    DATA INFILE("hdfs://host:port/user/doris/data/*/*")
4    INTO TABLE `my_table`
5    COLUMNS TERMINATED BY "\\x01"
6)
7with HDFS
8(
9  "fs.defaultFS"="hdfs://host:port",
10  "hadoop.username" = "user"
11);

指定分隔符为 Hive 经常用的默认分隔符\\x01,并使用通配符 * 指定 data目录下所有目录的所有文件。

导入 Parquet 格式数据,指定 FORMAT 为 parquet

Plain Text
1```SQL
2LOAD LABEL example_db.label4
3(
4    DATA INFILE("hdfs://host:port/input/file")
5    INTO TABLE `my_table`
6    FORMAT AS "parquet"
7    (k1, k2, k3)
8)
9with HDFS
10(
11  "fs.defaultFS"="hdfs://host:port",
12  "hadoop.username" = "user"
13);

默认是通过文件后缀判断。

导入数据,并提取文件路径中的分区字段

Plain Text
1LOAD LABEL example_db.label5
2(
3    DATA INFILE("hdfs://host:port/input/city=beijing/*/*")
4    INTO TABLE `my_table`
5    FORMAT AS "csv"
6    (k1, k2, k3)
7    COLUMNS FROM PATH AS (city, utc_date)
8)
9with HDFS
10(
11  "fs.defaultFS"="hdfs://host:port",
12  "hadoop.username" = "user"
13);

my_table 表中的列为 k1, k2, k3, city, utc_date。

其中 hdfs://hdfs_host:hdfs_port/user/doris/data/input/dir/city=beijing 目录下包括如下文件:

Plain Text
1hdfs://hdfs_host:hdfs_port/input/city=beijing/utc_date=2020-10-01/0000.csv
2hdfs://hdfs_host:hdfs_port/input/city=beijing/utc_date=2020-10-02/0000.csv
3hdfs://hdfs_host:hdfs_port/input/city=tianji/utc_date=2020-10-03/0000.csv
4hdfs://hdfs_host:hdfs_port/input/city=tianji/utc_date=2020-10-04/0000.csv

文件中只包含 k1, k2, k3 三列数据,city, utc_date 这两列数据会从文件路径中提取。

对导入数据进行过滤

Plain Text
1LOAD LABEL example_db.label6
2(
3    DATA INFILE("hdfs://host:port/input/file")
4    INTO TABLE `my_table`
5    (k1, k2, k3)
6    SET (
7        k2 = k2 + 1
8    )
9    PRECEDING FILTER k1 = 1
10    WHERE k1 > k2
11)
12with HDFS
13(
14  "fs.defaultFS"="hdfs://host:port",
15  "hadoop.username" = "user"
16);

只有原始数据中,k1 = 1,并且转换后,k1 > k2 的行才会被导入。

导入数据,提取文件路径中的时间分区字段

Plain Text
1LOAD LABEL example_db.label7
2(
3    DATA INFILE("hdfs://host:port/user/data/*/test.txt") 
4    INTO TABLE `tbl12`
5    COLUMNS TERMINATED BY ","
6    (k2,k3)
7    COLUMNS FROM PATH AS (data_time)
8    SET (
9        data_time=str_to_date(data_time, '%Y-%m-%d %H%%3A%i%%3A%s')
10    )
11)
12with HDFS
13(
14  "fs.defaultFS"="hdfs://host:port",
15  "hadoop.username" = "user"
16);

提示
时间包含 %3A。在 hdfs 路径中,不允许有 ':',所有 ':' 会由 %3A 替换。

路径下有如下文件:

Plain Text
1/user/data/data_time=2020-02-17 00%3A00%3A00/test.txt
2/user/data/data_time=2020-02-18 00%3A00%3A00/test.txt

表结构为:

Plain Text
1CREATE TABLE IF NOT EXISTS tbl12 (
2    data_time DATETIME,
3    k2        INT,
4    k3        INT
5) DISTRIBUTED BY HASH(data_time) BUCKETS 10
6PROPERTIES (
7    "replication_num" = "3"
8);

使用 Merge 方式导入

Plain Text
1LOAD LABEL example_db.label8
2(
3    MERGE DATA INFILE("hdfs://host:port/input/file")
4    INTO TABLE `my_table`
5    (k1, k2, k3, v2, v1)
6    DELETE ON v2 > 100
7)
8with HDFS
9(
10  "fs.defaultFS"="hdfs://host:port",
11  "hadoop.username"="user"
12)
13PROPERTIES
14(
15    "timeout" = "3600",
16    "max_filter_ratio" = "0.1"
17);

使用 Merge 方式导入。my_table 必须是一张 Unique Key 的表。当导入数据中的 v2 列的值大于 100 时,该行会被认为是一个删除行。导入任务的超时时间是 3600 秒,并且允许错误率在 10% 以内。

导入时指定 source_sequence 列,保证替换顺序

Plain Text
1LOAD LABEL example_db.label9
2(
3    DATA INFILE("hdfs://host:port/input/file")
4    INTO TABLE `my_table`
5    COLUMNS TERMINATED BY ","
6    (k1,k2,source_sequence,v1,v2)
7    ORDER BY source_sequence
8) 
9with HDFS
10(
11  "fs.defaultFS"="hdfs://host:port",
12  "hadoop.username"="user"
13);

my_table必须是 Unique Key 模型表,并且指定了 Sequence 列。数据会按照源数据中 source_sequence 列的值来保证顺序性。

  • 导入指定文件格式为 json,并指定 json_root、jsonpaths
Plain Text
1LOAD LABEL example_db.label10
2(
3    DATA INFILE("hdfs://host:port/input/file.json")
4    INTO TABLE `my_table`
5    FORMAT AS "json"
6    PROPERTIES(
7      "json_root" = "$.item",
8      "jsonpaths" = "[\"$.id\", \"$.city\", \"$.code\"]"
9    )       
10)
11with HDFS
12(
13  "fs.defaultFS"="hdfs://host:port",
14  "hadoop.username"="user"
15);

jsonpaths 也可以与 column list 及 SET (column_mapping)配合使用:

Plain Text
1LOAD LABEL example_db.label10
2(
3    DATA INFILE("hdfs://host:port/input/file.json")
4    INTO TABLE `my_table`
5    FORMAT AS "json"
6    (id, code, city)
7    SET (id = id * 10)
8    PROPERTIES(
9      "json_root" = "$.item",
10      "jsonpaths" = "[\"$.id\", \"$.city\", \"$.code\"]"
11    )       
12)
13with HDFS
14(
15  "fs.defaultFS"="hdfs://host:port",
16  "hadoop.username"="user"
17);

从其他 Broker 导入

  • 阿里云 OSS
Plain Text
1(
2    "fs.oss.accessKeyId" = "",
3    "fs.oss.accessKeySecret" = "",
4    "fs.oss.endpoint" = ""
5)
  • 百度云 BOS

当前使用 BOS 时需要下载相应的 SDK 包,具体配置与使用,可以参考 BOS HDFS 官方文档。在下载完成并解压后将 jar 包放到 broker 的 lib 目录下。

Plain Text
1(
2    "fs.bos.access.key" = "xx",
3    "fs.bos.secret.access.key" = "xx",
4    "fs.bos.endpoint" = "xx"
5)
  • 华为云 OBS
Plain Text
1(
2    "fs.obs.access.key" = "xx",
3    "fs.obs.secret.key" = "xx",
4    "fs.obs.endpoint" = "xx"
5)
  • JuiceFS
Plain Text
1(
2    "fs.defaultFS" = "jfs://xxx/",
3    "fs.jfs.impl" = "io.juicefs.JuiceFileSystem",
4    "fs.AbstractFileSystem.jfs.impl" = "io.juicefs.JuiceFS",
5    "juicefs.meta" = "xxx",
6    "juicefs.access-log" = "xxx"
7)
  • GCS

在使用 Broker 访问 GCS 时,Project ID 是必须的,其他参数可选,所有参数配置请参考 GCS Config

Plain Text
1(
2    "fs.gs.project.id" = "Your Project ID",
3    "fs.AbstractFileSystem.gs.impl" = "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS",
4    "fs.gs.impl" = "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem",
5)

上一篇
数据源
下一篇
MySQL Load