ROUTINE-LOAD

数据仓库 PALO

  • 功能发布记录
  • 操作手册1
    • LDAP认证
    • 时区
    • 使用S3-SDK访问对象存储
    • 权限管理
    • 物化视图
    • 变量
    • 资源管理
    • 数据更新与删除
      • 标记删除
      • Sequence-Column
      • 数据更新
      • 数据删除
    • 备份与恢复
      • 备份与恢复
    • 数据导出1
      • SELECT INTO OUTFILE
      • MySQL Dump
      • 数据导出概述
      • Export
    • 数据导出
      • 全量数据导出
      • 导出查询结果集
      • 导出总览
      • 导出数据到外部表
    • 查询加速1
      • 查询缓存
      • 物化视图
        • 同步物化视图
        • 物化视图概览
        • 异步物化视图
          • 异步物化视图常见问题
          • 最佳实践
          • 异步物化视图概述
          • 创建、查询与维护异步物化视图
    • 数据导入
      • JSON格式数据导入说明
      • 导入本地数据
      • 导入BOS中的数据
      • 导入事务和原子性
      • 通过外部表同步数据
      • 使用JDBC同步数据
      • 列的映射、转换与过滤
      • 订阅Kafka日志
      • 严格模式
      • 导入总览
    • 数据更新与删除1
      • 事务
      • 数据更新
        • 主键模型的导入更新
        • 主键模型的 Update 更新
        • 数据更新概述
        • 主键模型的更新并发控制
        • 聚合模型的导入更新
      • 数据删除
        • 删除操作概述
        • Truncate 操作
        • 表原子替换
        • Delete 操作
        • 基于导入的批量删除
        • 临时分区
    • 数据导入1
      • 高并发导入优化(Group Commit)
      • 导入概览
      • 异常数据处理
      • 导入高可用性
      • 导入时实现数据转换
      • 数据源
        • Kafka
        • S3 兼容存储
        • 从其他 TP 系统迁移数据
        • HDFS
        • 从其他 AP 系统迁移数据
        • Flink
        • 本地文件
      • 导入方式
        • Broker Load
        • MySQL Load
        • Insert Into Values
        • Stream Load
        • Insert Into Select
        • Routine Load
      • 文件格式
        • CSV
        • JSON
        • Parquet
        • ORC
      • 复杂数据类型
        • MAP
        • Variant
        • JSON
        • STRUCT
        • Bitmap
        • HLL
        • ARRAY
  • 开发指南
    • 迁移ClickHouse数据
    • Doris集群间数据迁移
    • 数据更新与删除
      • 事务
      • 数据更新
        • 主键模型的导入更新
        • 主键模型的 Update 更新
        • 数据更新概述
        • 主键模型的更新并发控制
        • 聚合模型的导入更新
      • 数据删除
        • 删除操作概述
        • Truncate 操作
        • 表原子替换
        • Delete 操作
        • 基于导入的批量删除
        • 临时分区
    • 查询加速
      • 查询缓存
      • Colocation Join
      • 高并发点查
      • Hint
        • Hint 概述
        • Leading Hint
        • Distribute Hint
      • 物化视图
        • 同步物化视图
        • 物化视图概览
        • 异步物化视图
          • 异步物化视图常见问题
          • 最佳实践
          • 异步物化视图概述
          • 创建、查询与维护异步物化视图
      • 高效去重
        • BITMAP 精准去重
        • HLL 近似去重
      • 优化技术原理
        • TOPN 查询优化
        • 统计信息
        • Pipeline 执行引擎
        • 查询优化器介绍
        • Runtime Filter
      • 查询调优概述
        • 调优概述
        • 诊断工具
        • 分析工具
        • 调优流程
      • 查询优化实践
        • 常见调优参数
        • 计划调优
          • 使用 Hint 控制代价改写
          • 使用异步物化视图透明改写
          • 使用 Leading Hint 控制 Join 顺序
          • 优化表 Schema 设计
          • 使用分区裁剪优化扫表
          • 优化索引设计和使用
          • 使用 Hint 调整 Join Shuffle 方式
          • DML 计划调优
          • 使用 Colocate Group 优化 Join
          • 使用同步物化视图透明改写
          • 使用 SQL Cache 加速查询
        • 执行调优
          • 数据倾斜处理
          • RuntimeFilter 的等待时间调整
          • 并行度调优
    • 数据查询
      • 连接(JOIN)
      • 子查询
      • 复杂类型查询
      • 列转行 (Lateral View)
      • MySQL 兼容性
      • 聚合多维分析
      • 分析函数(窗口函数)
      • 公用表表达式(CTE)
      • 自定义函数
        • 别名函数
        • Java UDF, UDAF, UDTF
    • 数据导出
      • SELECT INTO OUTFILE
      • MySQL Dump
      • 最佳实践
      • 数据导出概述
      • Export
    • 数据导入
      • 高并发导入优化(Group Commit)
      • 异常数据处理
      • 导入高可用性
      • 导入时实现数据转换
      • 导入最佳实践
      • 数据源
        • Kafka
        • Snowflake
        • S3 兼容存储
        • Google Cloud Storage
        • 从其他 TP 系统迁移数据
        • Azure Storage
        • 腾讯云 COS
        • MinIO
        • HDFS
        • 阿里云 OSS
        • 华为云 OBS
        • 从其他 AP 系统迁移数据
        • Flink
        • Redshift
        • Amazon S3
        • 本地文件
        • BigQuery
      • 导入方式
        • Broker Load
        • MySQL Load
        • Insert Into Values
        • Stream Load
        • Insert Into Select
        • Routine Load
      • 文件格式
        • CSV
        • JSON
        • Parquet
        • ORC
      • 复杂数据类型
        • MAP
        • Variant
        • JSON
        • STRUCT
        • Bitmap
        • HLL
        • ARRAY
    • BI工具接入
      • Sugar
      • Navicat
      • Tableau
      • DBeaver
      • 永洪BI
      • FineBI(帆软)
    • 数据库连接
      • 通过 MySQL 协议连接
      • 基于 Arrow Flight SQL 的高速数据传输链路
    • 湖仓一体
      • 分析 S3或HDFS 上的文件
      • 湖仓一体概述
      • SQL 方言兼容
      • 弹性计算节点
      • 云服务认证接入
      • 元数据缓存
      • 外表统计信息
      • 数据缓存
      • 数据库分析
        • MySQL
        • JDBC Catalog
        • Oracle
        • OceanBase
        • SAP HANA
        • 阿里云 MaxCompute
        • ClickHouse
        • PostgreSQL
        • IBM Db2
        • SQL Server
        • Elasticsearch
      • 湖仓一体最佳实践
        • 使用 PALO 和 Paimon
        • 使用 PALO 和 Iceberg
        • 使用 PALO 和 Hudi
        • 使用 PALO 和 LakeSoul
      • 数据湖构建
        • Iceberg
        • Hive
      • 数据湖分析
        • Hudi Catalog
        • 阿里云 DLF
        • Iceberg Catalog
        • Paimon Catalog
        • Hive Catalog
    • 数据表设计
      • 行业混存
      • 数据压缩
      • Schema 变更
      • 数据类型
      • 自增列
      • 概览
      • 数据库建表最佳实践
      • 冷热数据分层
        • SSD 和 HDD 层级存储
        • 远程存储
        • 冷热数据分层概述
      • 表索引
        • 倒排索引
        • 前缀索引与排序键
        • N-Gram 索引
        • BloomFilter 索引
        • 索引概述
      • 数据划分
        • 数据分桶
        • 数据分布概念
        • 动态分区
        • 自动分区
        • 手动分区
        • 常见文档
      • 数据模型
        • 使用注意
        • 模型概述
        • 主键模型
        • 明细模型
        • 聚合模型
  • 版本发布历史
    • 百度数据仓库 Palo 2.0 版本全新发布
  • SQL手册
    • 字面常量
    • 别名
    • SQL-手册
    • 数据类型
    • SQL语句
    • 注释
    • 内置函数
    • 白名单管理
    • SQL操作符
    • 内置函数
      • 聚合函数
      • 位操作函数
      • 字符串函数
      • 条件函数
      • 数学函数
      • JSON解析函数
      • 类型转换函数
      • 格式转换函数
      • 通用函数
      • 时间和日期函数
      • BITMAP函数
      • 窗口函数
      • 哈希函数
      • HLL函数
    • 语法帮助
      • DML
        • INSERT
        • ROUTINE-LOAD
        • RESTORE
        • SELECT-INTO-OUTFILE
        • ALTER-ROUTINE-LOAD
        • BROKER-LOAD
        • BACKUP
        • EXPORT
        • STREAM-LOAD
      • DDL
        • CREATE-FILE
        • DROP-RESOURCE
        • CREATE-RESOURCE
        • CREATE-MATERIALIZED-VIEW
        • DROP-RESROUCE
        • CREATE-TABLE
        • DROP-REPOSITORY
        • CREATE-REPOSITORY
        • CREATE-ODBC-TABLE
      • 信息查看语句
        • SHOW-BACKUP
        • SHOW-ALTER-TABLE-MATERIALIZED-VIEW
        • SHOW-SNAPSHOT
        • SHOW-ROUTINE-LOAD
        • SHOW-CREATE-ROUTINE-LOAD
        • SHOW-ROLES
        • SHOW-GRANTS
        • SHOW-EXPORT
        • SHOW-ROUTINE-LOAD-TASK
        • SHOW-REPOSITORIES
        • SHOW-LOAD
        • SHOW-RESOURCES
        • SHOW-RESTORE
        • SHOW-PROPERTY
        • SHOW-FILE
      • 辅助命令
        • PAUSE-ROUTINE-LOAD
        • STOP-ROUTINE-LOAD
        • ALTER-ROUTINE-LOAD
        • CANCEL-LOAD
        • RESUME-ROUTINE-LOAD
      • 账户管理
        • SET-PROPERTY
        • REVOKE
        • GRANT
        • CREATE-ROLE
        • DROP-ROLE
        • CREATE-USER
        • DROP-USER
        • SET-PASSWORD
  • 快速入门
    • 快速上手
    • 存算分离
    • 存算一体
  • 典型实践
    • 如何开启Debug日志
    • 导入分析
    • 查询分析
  • 操作手册
    • 权限和子用户
    • 存算一体
      • 连接集群
      • 查询分析
      • 监控告警
        • 监控指标
        • 告警配置
      • 备份恢复
        • 通过管理页面备份与恢复
        • 备份与恢复
      • 权限管理
        • 集群权限
        • 控制台权限
      • 集群管理
        • 集群创建
        • 停止与删除
        • 重置管理员密码
        • 集群扩缩容
        • 集群详情
    • 存算分离
      • 连接集群
      • 计算组管理
        • 重启计算组
        • 创建计算组
      • 监控告警
        • 监控指标
        • 告警配置
      • 权限管理
        • 集群权限
        • 控制台权限
      • 集群管理
        • 停止与删除
        • 创建集群
        • 重置管理员密码
        • 集群详情
  • 服务等级协议SLA
    • 服务等级协议(SLA)v1.0
  • 产品概述
    • 系统架构
    • 产品特点
    • 产品介绍
  • 视频专区
    • 操作指南
    • 产品简介
  • 产品定价
    • 预付费
    • 计费说明
    • 后付费
所有文档
menu
没有找到结果,请重新输入

数据仓库 PALO

  • 功能发布记录
  • 操作手册1
    • LDAP认证
    • 时区
    • 使用S3-SDK访问对象存储
    • 权限管理
    • 物化视图
    • 变量
    • 资源管理
    • 数据更新与删除
      • 标记删除
      • Sequence-Column
      • 数据更新
      • 数据删除
    • 备份与恢复
      • 备份与恢复
    • 数据导出1
      • SELECT INTO OUTFILE
      • MySQL Dump
      • 数据导出概述
      • Export
    • 数据导出
      • 全量数据导出
      • 导出查询结果集
      • 导出总览
      • 导出数据到外部表
    • 查询加速1
      • 查询缓存
      • 物化视图
        • 同步物化视图
        • 物化视图概览
        • 异步物化视图
          • 异步物化视图常见问题
          • 最佳实践
          • 异步物化视图概述
          • 创建、查询与维护异步物化视图
    • 数据导入
      • JSON格式数据导入说明
      • 导入本地数据
      • 导入BOS中的数据
      • 导入事务和原子性
      • 通过外部表同步数据
      • 使用JDBC同步数据
      • 列的映射、转换与过滤
      • 订阅Kafka日志
      • 严格模式
      • 导入总览
    • 数据更新与删除1
      • 事务
      • 数据更新
        • 主键模型的导入更新
        • 主键模型的 Update 更新
        • 数据更新概述
        • 主键模型的更新并发控制
        • 聚合模型的导入更新
      • 数据删除
        • 删除操作概述
        • Truncate 操作
        • 表原子替换
        • Delete 操作
        • 基于导入的批量删除
        • 临时分区
    • 数据导入1
      • 高并发导入优化(Group Commit)
      • 导入概览
      • 异常数据处理
      • 导入高可用性
      • 导入时实现数据转换
      • 数据源
        • Kafka
        • S3 兼容存储
        • 从其他 TP 系统迁移数据
        • HDFS
        • 从其他 AP 系统迁移数据
        • Flink
        • 本地文件
      • 导入方式
        • Broker Load
        • MySQL Load
        • Insert Into Values
        • Stream Load
        • Insert Into Select
        • Routine Load
      • 文件格式
        • CSV
        • JSON
        • Parquet
        • ORC
      • 复杂数据类型
        • MAP
        • Variant
        • JSON
        • STRUCT
        • Bitmap
        • HLL
        • ARRAY
  • 开发指南
    • 迁移ClickHouse数据
    • Doris集群间数据迁移
    • 数据更新与删除
      • 事务
      • 数据更新
        • 主键模型的导入更新
        • 主键模型的 Update 更新
        • 数据更新概述
        • 主键模型的更新并发控制
        • 聚合模型的导入更新
      • 数据删除
        • 删除操作概述
        • Truncate 操作
        • 表原子替换
        • Delete 操作
        • 基于导入的批量删除
        • 临时分区
    • 查询加速
      • 查询缓存
      • Colocation Join
      • 高并发点查
      • Hint
        • Hint 概述
        • Leading Hint
        • Distribute Hint
      • 物化视图
        • 同步物化视图
        • 物化视图概览
        • 异步物化视图
          • 异步物化视图常见问题
          • 最佳实践
          • 异步物化视图概述
          • 创建、查询与维护异步物化视图
      • 高效去重
        • BITMAP 精准去重
        • HLL 近似去重
      • 优化技术原理
        • TOPN 查询优化
        • 统计信息
        • Pipeline 执行引擎
        • 查询优化器介绍
        • Runtime Filter
      • 查询调优概述
        • 调优概述
        • 诊断工具
        • 分析工具
        • 调优流程
      • 查询优化实践
        • 常见调优参数
        • 计划调优
          • 使用 Hint 控制代价改写
          • 使用异步物化视图透明改写
          • 使用 Leading Hint 控制 Join 顺序
          • 优化表 Schema 设计
          • 使用分区裁剪优化扫表
          • 优化索引设计和使用
          • 使用 Hint 调整 Join Shuffle 方式
          • DML 计划调优
          • 使用 Colocate Group 优化 Join
          • 使用同步物化视图透明改写
          • 使用 SQL Cache 加速查询
        • 执行调优
          • 数据倾斜处理
          • RuntimeFilter 的等待时间调整
          • 并行度调优
    • 数据查询
      • 连接(JOIN)
      • 子查询
      • 复杂类型查询
      • 列转行 (Lateral View)
      • MySQL 兼容性
      • 聚合多维分析
      • 分析函数(窗口函数)
      • 公用表表达式(CTE)
      • 自定义函数
        • 别名函数
        • Java UDF, UDAF, UDTF
    • 数据导出
      • SELECT INTO OUTFILE
      • MySQL Dump
      • 最佳实践
      • 数据导出概述
      • Export
    • 数据导入
      • 高并发导入优化(Group Commit)
      • 异常数据处理
      • 导入高可用性
      • 导入时实现数据转换
      • 导入最佳实践
      • 数据源
        • Kafka
        • Snowflake
        • S3 兼容存储
        • Google Cloud Storage
        • 从其他 TP 系统迁移数据
        • Azure Storage
        • 腾讯云 COS
        • MinIO
        • HDFS
        • 阿里云 OSS
        • 华为云 OBS
        • 从其他 AP 系统迁移数据
        • Flink
        • Redshift
        • Amazon S3
        • 本地文件
        • BigQuery
      • 导入方式
        • Broker Load
        • MySQL Load
        • Insert Into Values
        • Stream Load
        • Insert Into Select
        • Routine Load
      • 文件格式
        • CSV
        • JSON
        • Parquet
        • ORC
      • 复杂数据类型
        • MAP
        • Variant
        • JSON
        • STRUCT
        • Bitmap
        • HLL
        • ARRAY
    • BI工具接入
      • Sugar
      • Navicat
      • Tableau
      • DBeaver
      • 永洪BI
      • FineBI(帆软)
    • 数据库连接
      • 通过 MySQL 协议连接
      • 基于 Arrow Flight SQL 的高速数据传输链路
    • 湖仓一体
      • 分析 S3或HDFS 上的文件
      • 湖仓一体概述
      • SQL 方言兼容
      • 弹性计算节点
      • 云服务认证接入
      • 元数据缓存
      • 外表统计信息
      • 数据缓存
      • 数据库分析
        • MySQL
        • JDBC Catalog
        • Oracle
        • OceanBase
        • SAP HANA
        • 阿里云 MaxCompute
        • ClickHouse
        • PostgreSQL
        • IBM Db2
        • SQL Server
        • Elasticsearch
      • 湖仓一体最佳实践
        • 使用 PALO 和 Paimon
        • 使用 PALO 和 Iceberg
        • 使用 PALO 和 Hudi
        • 使用 PALO 和 LakeSoul
      • 数据湖构建
        • Iceberg
        • Hive
      • 数据湖分析
        • Hudi Catalog
        • 阿里云 DLF
        • Iceberg Catalog
        • Paimon Catalog
        • Hive Catalog
    • 数据表设计
      • 行业混存
      • 数据压缩
      • Schema 变更
      • 数据类型
      • 自增列
      • 概览
      • 数据库建表最佳实践
      • 冷热数据分层
        • SSD 和 HDD 层级存储
        • 远程存储
        • 冷热数据分层概述
      • 表索引
        • 倒排索引
        • 前缀索引与排序键
        • N-Gram 索引
        • BloomFilter 索引
        • 索引概述
      • 数据划分
        • 数据分桶
        • 数据分布概念
        • 动态分区
        • 自动分区
        • 手动分区
        • 常见文档
      • 数据模型
        • 使用注意
        • 模型概述
        • 主键模型
        • 明细模型
        • 聚合模型
  • 版本发布历史
    • 百度数据仓库 Palo 2.0 版本全新发布
  • SQL手册
    • 字面常量
    • 别名
    • SQL-手册
    • 数据类型
    • SQL语句
    • 注释
    • 内置函数
    • 白名单管理
    • SQL操作符
    • 内置函数
      • 聚合函数
      • 位操作函数
      • 字符串函数
      • 条件函数
      • 数学函数
      • JSON解析函数
      • 类型转换函数
      • 格式转换函数
      • 通用函数
      • 时间和日期函数
      • BITMAP函数
      • 窗口函数
      • 哈希函数
      • HLL函数
    • 语法帮助
      • DML
        • INSERT
        • ROUTINE-LOAD
        • RESTORE
        • SELECT-INTO-OUTFILE
        • ALTER-ROUTINE-LOAD
        • BROKER-LOAD
        • BACKUP
        • EXPORT
        • STREAM-LOAD
      • DDL
        • CREATE-FILE
        • DROP-RESOURCE
        • CREATE-RESOURCE
        • CREATE-MATERIALIZED-VIEW
        • DROP-RESROUCE
        • CREATE-TABLE
        • DROP-REPOSITORY
        • CREATE-REPOSITORY
        • CREATE-ODBC-TABLE
      • 信息查看语句
        • SHOW-BACKUP
        • SHOW-ALTER-TABLE-MATERIALIZED-VIEW
        • SHOW-SNAPSHOT
        • SHOW-ROUTINE-LOAD
        • SHOW-CREATE-ROUTINE-LOAD
        • SHOW-ROLES
        • SHOW-GRANTS
        • SHOW-EXPORT
        • SHOW-ROUTINE-LOAD-TASK
        • SHOW-REPOSITORIES
        • SHOW-LOAD
        • SHOW-RESOURCES
        • SHOW-RESTORE
        • SHOW-PROPERTY
        • SHOW-FILE
      • 辅助命令
        • PAUSE-ROUTINE-LOAD
        • STOP-ROUTINE-LOAD
        • ALTER-ROUTINE-LOAD
        • CANCEL-LOAD
        • RESUME-ROUTINE-LOAD
      • 账户管理
        • SET-PROPERTY
        • REVOKE
        • GRANT
        • CREATE-ROLE
        • DROP-ROLE
        • CREATE-USER
        • DROP-USER
        • SET-PASSWORD
  • 快速入门
    • 快速上手
    • 存算分离
    • 存算一体
  • 典型实践
    • 如何开启Debug日志
    • 导入分析
    • 查询分析
  • 操作手册
    • 权限和子用户
    • 存算一体
      • 连接集群
      • 查询分析
      • 监控告警
        • 监控指标
        • 告警配置
      • 备份恢复
        • 通过管理页面备份与恢复
        • 备份与恢复
      • 权限管理
        • 集群权限
        • 控制台权限
      • 集群管理
        • 集群创建
        • 停止与删除
        • 重置管理员密码
        • 集群扩缩容
        • 集群详情
    • 存算分离
      • 连接集群
      • 计算组管理
        • 重启计算组
        • 创建计算组
      • 监控告警
        • 监控指标
        • 告警配置
      • 权限管理
        • 集群权限
        • 控制台权限
      • 集群管理
        • 停止与删除
        • 创建集群
        • 重置管理员密码
        • 集群详情
  • 服务等级协议SLA
    • 服务等级协议(SLA)v1.0
  • 产品概述
    • 系统架构
    • 产品特点
    • 产品介绍
  • 视频专区
    • 操作指南
    • 产品简介
  • 产品定价
    • 预付费
    • 计费说明
    • 后付费
  • 文档中心
  • arrow
  • 数据仓库PALO
  • arrow
  • SQL手册
  • arrow
  • 语法帮助
  • arrow
  • DML
  • arrow
  • ROUTINE-LOAD
本页目录
  • ROUTINE LOAD
  • Description
  • Example
  • Keywords
  • 典型实践

ROUTINE-LOAD

更新时间:2025-08-21

ROUTINE LOAD

Description

例行导入(Routine Load)功能,支持用户提交一个常驻的导入任务,通过不断的从指定的数据源读取数据,将数据导入到 PALO 中。

目前仅支持通过无认证或者 SSL 认证方式,从 Kakfa 导入 CSV 或 Json 格式的数据。

语法:

SQL
1CREATE ROUTINE LOAD [db.]job_name ON tbl_name
2[merge_type]
3[load_properties]
4[job_properties]
5FROM data_source [data_source_properties]
  • [db.]job_name

    导入作业的名称,在同一个 database 内,相同名称只能有一个 job 在运行。

  • tbl_name

    指定需要导入的表的名称。

  • merge_type

    数据合并类型。默认为 APPEND,表示导入的数据都是普通的追加写操作。MERGE 和 DELETE 类型仅适用于 Unique Key 模型表。其中 MERGE 类型需要配合 [DELETE ON] 语句使用,以标注 Delete Flag 列。而 DELETE 类型则表示导入的所有数据皆为删除数据。

  • load_properties

    用于描述导入数据。组成如下:

    Plain Text
    1[column_separator],
    2[columns_mapping],
    3[preceding_filter],
    4[where_predicates],
    5[partitions],
    6[DELETE ON],
    7[ORDER BY]
    • column_separator

      指定列分隔符,默认为 \t

      COLUMNS TERMINATED BY ","

    • columns_mapping

      用于指定文件列和表中列的映射关系,以及各种列转换等。关于这部分详细介绍,可以参阅 [列的映射,转换与过滤] 文档。

      (k1, k2, tmpk1, k3 = tmpk1 + 1)

    • preceding_filter

      过滤原始数据。关于这部分详细介绍,可以参阅 [列的映射,转换与过滤] 文档。

    • where_predicates

      根据条件对导入的数据进行过滤。关于这部分详细介绍,可以参阅 [列的映射,转换与过滤] 文档。

      WHERE k1 > 100 and k2 = 1000

    • partitions

      指定导入目的表的哪些 partition 中。如果不指定,则会自动导入到对应的 partition 中。

      PARTITION(p1, p2, p3)

    • DELETE ON

      需配合 MEREGE 导入模式一起使用,仅针对 Unique Key 模型的表。用于指定导入数据中表示 Delete Flag 的列和计算关系。

      DELETE ON v3 >100

    • ORDER BY

      仅针对 Unique Key 模型的表。用于指定导入数据中表示 Sequence Col 的列。主要用于导入时保证数据顺序。

  • job_properties

    用于指定例行导入作业的通用参数。

    Plain Text
    1PROPERTIES (
    2    "key1" = "val1",
    3    "key2" = "val2"
    4)

    目前我们支持以下参数:

    1. desired_concurrent_number

      期望的并发度。一个例行导入作业会被分成多个子任务执行。这个参数指定一个作业最多有多少任务可以同时执行。必须大于0。默认为3。

      这个并发度并不是实际的并发度,实际的并发度,会通过集群的节点数、负载情况,以及数据源的情况综合考虑。

      "desired_concurrent_number" = "3"

    2. max_batch_interval/max_batch_rows/max_batch_size

      这三个参数分别表示:

      1. 每个子任务最大执行时间,单位是秒。范围为 5 到 60。默认为10。
      2. 每个子任务最多读取的行数。必须大于等于200000。默认是200000。
      3. 每个子任务最多读取的字节数。单位是字节,范围是 100MB 到 1GB。默认是 100MB。

      这三个参数,用于控制一个子任务的执行时间和处理量。当任意一个达到阈值,则任务结束。

      Plain Text
      1"max_batch_interval" = "20",
      2"max_batch_rows" = "300000",
      3"max_batch_size" = "209715200"
    3. max_error_number

      采样窗口内,允许的最大错误行数。必须大于等于0。默认是 0,即不允许有错误行。

      采样窗口为 max_batch_rows * 10。即如果在采样窗口内,错误行数大于 max_error_number,则会导致例行作业被暂停,需要人工介入检查数据质量问题。

      被 where 条件过滤掉的行不算错误行。

    4. strict_mode

      是否开启严格模式,默认为关闭。如果开启后,非空原始数据的列类型变换如果结果为 NULL,则会被过滤。指定方式为:

      "strict_mode" = "true"

    5. timezone

      指定导入作业所使用的时区。默认为使用 Session 的 timezone 参数。该参数会影响所有导入涉及的和时区有关的函数结果。

    6. format

      指定导入数据格式,默认是csv,支持json格式。

    7. jsonpaths

      当导入数据格式为 json 时,可以通过 jsonpaths 指定抽取 Json 数据中的字段。

      -H "jsonpaths: [\"$.k2\", \"$.k1\"]"

    8. strip_outer_array

      当导入数据格式为 json 时,strip_outer_array 为 true 表示 Json 数据以数组的形式展现,数据中的每一个元素将被视为一行数据。默认值是 false。

      -H "strip_outer_array: true"

    9. json_root

      当导入数据格式为 json 时,可以通过 json_root 指定 Json 数据的根节点。PALO 将通过 json_root 抽取根节点的元素进行解析。默认为空。

      -H "json_root: $.RECORDS"

  • FROM data_source [data_source_properties]

    数据源的类型。当前支持:

    Plain Text
    1FROM KAFKA
    2(
    3    "key1" = "val1",
    4    "key2" = "val2"
    5)

    data_source_properties 支持如下数据源属性:

    1. kafka_broker_list

      Kafka 的 broker 连接信息。格式为 ip:host。多个broker之间以逗号分隔。

      "kafka_broker_list" = "broker1:9092,broker2:9092"

    2. kafka_topic

      指定要订阅的 Kafka 的 topic。

      "kafka_topic" = "my_topic"

    3. kafka_partitions/kafka_offsets

      指定需要订阅的 kafka partition,以及对应的每个 partition 的起始 offset。如果指定时间,则会从大于等于该时间的最近一个 offset 处开始消费。

      offset 可以指定从大于等于 0 的具体 offset,或者:

      • OFFSET_BEGINNING: 从有数据的位置开始订阅。
      • OFFSET_END: 从末尾开始订阅。
      • 时间格式,如:"2021-05-22 11:00:00"

      如果没有指定,则默认从 OFFSET_END 开始订阅 topic 下的所有 partition。

      Plain Text
      1"kafka_partitions" = "0,1,2,3",
      2"kafka_offsets" = "101,0,OFFSET_BEGINNING,OFFSET_END"
      Plain Text
      1"kafka_partitions" = "0,1,2,3",
      2"kafka_offsets" = "2021-05-22 11:00:00,2021-05-22 11:00:00,2021-05-22 11:00:00"

      注意,时间格式不能和 OFFSET 格式混用。

    4. property

      指定自定义kafka参数。功能等同于kafka shell中 "--property" 参数。

      当参数的 value 为一个文件时,需要在 value 前加上关键词:"FILE:"。

      关于如何创建文件,请参阅 CREATE FILE 命令文档。

      更多支持的自定义参数,请参阅 librdkafka 的官方 CONFIGURATION 文档中,client 端的配置项。如:

      Plain Text
      1"property.client.id" = "12345",
      2"property.ssl.ca.location" = "FILE:ca.pem"
      1. 使用 SSL 连接 Kafka 时,需要指定以下参数:

        Plain Text
        1"property.security.protocol" = "ssl",
        2"property.ssl.ca.location" = "FILE:ca.pem",
        3"property.ssl.certificate.location" = "FILE:client.pem",
        4"property.ssl.key.location" = "FILE:client.key",
        5"property.ssl.key.password" = "abcdefg"

        其中:

        property.security.protocol 和 property.ssl.ca.location 为必须,用于指明连接方式为 SSL,以及 CA 证书的位置。

        如果 Kafka server 端开启了 client 认证,则还需设置:

        Plain Text
        1"property.ssl.certificate.location"
        2"property.ssl.key.location"
        3"property.ssl.key.password"

        分别用于指定 client 的 public key,private key 以及 private key 的密码。

      2. 指定kafka partition的默认起始offset

        如果没有指定 kafka_partitions/kafka_offsets,默认消费所有分区。

        此时可以指定 kafka_default_offsets 指定起始 offset。默认为 OFFSET_END,即从末尾开始订阅。

        示例:

        Plain Text
        1"property.kafka_default_offsets" = "OFFSET_BEGINNING"

Example

  1. 为 example_db 的 example_tbl 创建一个名为 test1 的 Kafka 例行导入任务。指定列分隔符和 group.id 和 client.id,并且自动默认消费所有分区,且从有数据的位置(OFFSET_BEGINNING)开始订阅

    SQL
    1CREATE ROUTINE LOAD example_db.test1 ON example_tbl
    2COLUMNS TERMINATED BY ",",
    3COLUMNS(k1, k2, k3, v1, v2, v3 = k1 * 100)
    4PROPERTIES
    5(
    6    "desired_concurrent_number"="3",
    7    "max_batch_interval" = "20",
    8    "max_batch_rows" = "300000",
    9    "max_batch_size" = "209715200",
    10    "strict_mode" = "false"
    11)
    12FROM KAFKA
    13(
    14    "kafka_broker_list" = "broker1:9092,broker2:9092,broker3:9092",
    15    "kafka_topic" = "my_topic",
    16    "property.group.id" = "xxx",
    17    "property.client.id" = "xxx",
    18    "property.kafka_default_offsets" = "OFFSET_BEGINNING"
    19);
  2. 为 example_db 的 example_tbl 创建一个名为 test1 的 Kafka 例行导入任务。导入任务为严格模式。

    SQL
    1CREATE ROUTINE LOAD example_db.test1 ON example_tbl
    2COLUMNS(k1, k2, k3, v1, v2, v3 = k1 * 100),
    3PRECEDING FILTER k1 = 1,
    4WHERE k1 > 100 and k2 like "%doris%"
    5PROPERTIES
    6(
    7    "desired_concurrent_number"="3",
    8    "max_batch_interval" = "20",
    9    "max_batch_rows" = "300000",
    10    "max_batch_size" = "209715200",
    11    "strict_mode" = "false"
    12)
    13FROM KAFKA
    14(
    15    "kafka_broker_list" = "broker1:9092,broker2:9092,broker3:9092",
    16    "kafka_topic" = "my_topic",
    17    "kafka_partitions" = "0,1,2,3",
    18    "kafka_offsets" = "101,0,0,200"
    19);
  3. 通过 SSL 认证方式,从 Kafka 集群导入数据。同时设置 client.id 参数。导入任务为非严格模式,时区为 Africa/Abidjan

    SQL
    1CREATE ROUTINE LOAD example_db.test1 ON example_tbl
    2COLUMNS(k1, k2, k3, v1, v2, v3 = k1 * 100),
    3WHERE k1 > 100 and k2 like "%doris%"
    4PROPERTIES
    5(
    6    "desired_concurrent_number"="3",
    7    "max_batch_interval" = "20",
    8    "max_batch_rows" = "300000",
    9    "max_batch_size" = "209715200",
    10    "strict_mode" = "false",
    11    "timezone" = "Africa/Abidjan"
    12)
    13FROM KAFKA
    14(
    15    "kafka_broker_list" = "broker1:9092,broker2:9092,broker3:9092",
    16    "kafka_topic" = "my_topic",
    17    "property.security.protocol" = "ssl",
    18    "property.ssl.ca.location" = "FILE:ca.pem",
    19    "property.ssl.certificate.location" = "FILE:client.pem",
    20    "property.ssl.key.location" = "FILE:client.key",
    21    "property.ssl.key.password" = "abcdefg",
    22    "property.client.id" = "my_client_id"
    23);
  4. 导入 Json 格式数据。默认使用 Json 中的字段名作为列名映射。指定导入 0,1,2 三个分区,起始 offset 都为 0

    SQL
    1CREATE ROUTINE LOAD example_db.test_json_label_1 ON table1
    2COLUMNS(category,price,author)
    3PROPERTIES
    4(
    5    "desired_concurrent_number"="3",
    6    "max_batch_interval" = "20",
    7    "max_batch_rows" = "300000",
    8    "max_batch_size" = "209715200",
    9    "strict_mode" = "false",
    10    "format" = "json"
    11)
    12FROM KAFKA
    13(
    14    "kafka_broker_list" = "broker1:9092,broker2:9092,broker3:9092",
    15    "kafka_topic" = "my_topic",
    16    "kafka_partitions" = "0,1,2",
    17    "kafka_offsets" = "0,0,0"
    18);
  5. 导入 Json 数据,并通过 Jsonpaths 抽取字段,并指定 Json 文档根节点

    SQL
    1CREATE ROUTINE LOAD example_db.test1 ON example_tbl
    2COLUMNS(category, author, price, timestamp, dt=from_unixtime(timestamp, '%Y%m%d'))
    3PROPERTIES
    4(
    5    "desired_concurrent_number"="3",
    6    "max_batch_interval" = "20",
    7    "max_batch_rows" = "300000",
    8    "max_batch_size" = "209715200",
    9    "strict_mode" = "false",
    10    "format" = "json",
    11    "jsonpaths" = "[\"$.category\",\"$.author\",\"$.price\",\"$.timestamp\"]",
    12    "json_root" = "$.RECORDS"
    13    "strip_outer_array" = "true"
    14)
    15FROM KAFKA
    16(
    17    "kafka_broker_list" = "broker1:9092,broker2:9092,broker3:9092",
    18    "kafka_topic" = "my_topic",
    19    "kafka_partitions" = "0,1,2",
    20    "kafka_offsets" = "0,0,0"
    21);
  6. 为 example_db 的 example_tbl 创建一个名为 test1 的 Kafka 例行导入任务。并且使用条件过滤。

    SQL
    1CREATE ROUTINE LOAD example_db.test1 ON example_tbl
    2WITH MERGE
    3COLUMNS(k1, k2, k3, v1, v2, v3),
    4WHERE k1 > 100 and k2 like "%doris%",
    5DELETE ON v3 >100
    6PROPERTIES
    7(
    8    "desired_concurrent_number"="3",
    9    "max_batch_interval" = "20",
    10    "max_batch_rows" = "300000",
    11    "max_batch_size" = "209715200",
    12    "strict_mode" = "false"
    13)
    14FROM KAFKA
    15(
    16    "kafka_broker_list" = "broker1:9092,broker2:9092,broker3:9092",
    17    "kafka_topic" = "my_topic",
    18    "kafka_partitions" = "0,1,2,3",
    19    "kafka_offsets" = "101,0,0,200"
    20);
  7. 导入数据到含有 sequence 列的 Unique Key 模型表中

    SQL
    1CREATE ROUTINE LOAD example_db.test_job ON example_tbl
    2COLUMNS TERMINATED BY ",",
    3COLUMNS(k1,k2,source_sequence,v1,v2),
    4ORDER BY source_sequence
    5PROPERTIES
    6(
    7    "desired_concurrent_number"="3",
    8    "max_batch_interval" = "30",
    9    "max_batch_rows" = "300000",
    10    "max_batch_size" = "209715200"
    11) FROM KAFKA
    12(
    13    "kafka_broker_list" = "broker1:9092,broker2:9092,broker3:9092",
    14    "kafka_topic" = "my_topic",
    15    "kafka_partitions" = "0,1,2,3",
    16    "kafka_offsets" = "101,0,0,200"
    17);
  8. 从指定的时间点开始消费

    SQL
    1CREATE ROUTINE LOAD example_db.test_job ON example_tbl
    2PROPERTIES
    3(
    4    "desired_concurrent_number"="3",
    5    "max_batch_interval" = "30",
    6    "max_batch_rows" = "300000",
    7    "max_batch_size" = "209715200"
    8) FROM KAFKA
    9(
    10    "kafka_broker_list" = "broker1:9092,broker2:9092",
    11    "kafka_topic" = "my_topic",
    12    "kafka_default_offset" = "2021-05-21 10:00:00"
    13);

Keywords

Plain Text
1CREATE, ROUTINE, LOAD

典型实践

  1. 关于指定消费的 Partition 和 Offset

    PALO 支持指定 Partition 和 Offset 开始消费,还支持了指定时间点进行消费的功能。这里说明下对应参数的配置关系。

    有三个相关参数:

    • kafka_partitions:指定待消费的 partition 列表,如:"0, 1, 2, 3"。
    • kafka_offsets:指定每个分区的起始offset,必须和 kafka_partitions 列表个数对应。如:"1000, 1000, 2000, 2000"
    • property.kafka_default_offset:指定分区默认的起始offset。

    在创建导入作业时,这三个参数可以有以下组合:

    组合 kafka_partitions kafka_offsets property.kafka_default_offset 行为
    1 No No No 系统会自动查找topic对应的所有分区并从 OFFSET_END 开始消费
    2 No No Yes 系统会自动查找topic对应的所有分区并从 default offset 指定的位置开始消费
    3 Yes No No 系统会从指定分区的 OFFSET_END 开始消费
    4 Yes Yes No 系统会从指定分区的指定offset 处开始消费
    5 Yes No Yes 系统会从指定分区,default offset 指定的位置开始消费

上一篇
INSERT
下一篇
RESTORE