高并发导入优化(Group Commit)

数据仓库 PALO

  • 功能发布记录
  • 操作手册1
    • LDAP认证
    • 时区
    • 使用S3-SDK访问对象存储
    • 权限管理
    • 物化视图
    • 变量
    • 资源管理
    • 数据更新与删除
      • 标记删除
      • Sequence-Column
      • 数据更新
      • 数据删除
    • 备份与恢复
      • 备份与恢复
    • 数据导出1
      • SELECT INTO OUTFILE
      • MySQL Dump
      • 数据导出概述
      • Export
    • 数据导出
      • 全量数据导出
      • 导出查询结果集
      • 导出总览
      • 导出数据到外部表
    • 查询加速1
      • 查询缓存
      • 物化视图
        • 同步物化视图
        • 物化视图概览
        • 异步物化视图
          • 异步物化视图常见问题
          • 最佳实践
          • 异步物化视图概述
          • 创建、查询与维护异步物化视图
    • 数据导入
      • JSON格式数据导入说明
      • 导入本地数据
      • 导入BOS中的数据
      • 导入事务和原子性
      • 通过外部表同步数据
      • 使用JDBC同步数据
      • 列的映射、转换与过滤
      • 订阅Kafka日志
      • 严格模式
      • 导入总览
    • 数据更新与删除1
      • 事务
      • 数据更新
        • 主键模型的导入更新
        • 主键模型的 Update 更新
        • 数据更新概述
        • 主键模型的更新并发控制
        • 聚合模型的导入更新
      • 数据删除
        • 删除操作概述
        • Truncate 操作
        • 表原子替换
        • Delete 操作
        • 基于导入的批量删除
        • 临时分区
    • 数据导入1
      • 高并发导入优化(Group Commit)
      • 导入概览
      • 异常数据处理
      • 导入高可用性
      • 导入时实现数据转换
      • 数据源
        • Kafka
        • S3 兼容存储
        • 从其他 TP 系统迁移数据
        • HDFS
        • 从其他 AP 系统迁移数据
        • Flink
        • 本地文件
      • 导入方式
        • Broker Load
        • MySQL Load
        • Insert Into Values
        • Stream Load
        • Insert Into Select
        • Routine Load
      • 文件格式
        • CSV
        • JSON
        • Parquet
        • ORC
      • 复杂数据类型
        • MAP
        • Variant
        • JSON
        • STRUCT
        • Bitmap
        • HLL
        • ARRAY
  • 开发指南
    • 迁移ClickHouse数据
    • Doris集群间数据迁移
    • 数据更新与删除
      • 事务
      • 数据更新
        • 主键模型的导入更新
        • 主键模型的 Update 更新
        • 数据更新概述
        • 主键模型的更新并发控制
        • 聚合模型的导入更新
      • 数据删除
        • 删除操作概述
        • Truncate 操作
        • 表原子替换
        • Delete 操作
        • 基于导入的批量删除
        • 临时分区
    • 查询加速
      • 查询缓存
      • Colocation Join
      • 高并发点查
      • Hint
        • Hint 概述
        • Leading Hint
        • Distribute Hint
      • 物化视图
        • 同步物化视图
        • 物化视图概览
        • 异步物化视图
          • 异步物化视图常见问题
          • 最佳实践
          • 异步物化视图概述
          • 创建、查询与维护异步物化视图
      • 高效去重
        • BITMAP 精准去重
        • HLL 近似去重
      • 优化技术原理
        • TOPN 查询优化
        • 统计信息
        • Pipeline 执行引擎
        • 查询优化器介绍
        • Runtime Filter
      • 查询调优概述
        • 调优概述
        • 诊断工具
        • 分析工具
        • 调优流程
      • 查询优化实践
        • 常见调优参数
        • 计划调优
          • 使用 Hint 控制代价改写
          • 使用异步物化视图透明改写
          • 使用 Leading Hint 控制 Join 顺序
          • 优化表 Schema 设计
          • 使用分区裁剪优化扫表
          • 优化索引设计和使用
          • 使用 Hint 调整 Join Shuffle 方式
          • DML 计划调优
          • 使用 Colocate Group 优化 Join
          • 使用同步物化视图透明改写
          • 使用 SQL Cache 加速查询
        • 执行调优
          • 数据倾斜处理
          • RuntimeFilter 的等待时间调整
          • 并行度调优
    • 数据查询
      • 连接(JOIN)
      • 子查询
      • 复杂类型查询
      • 列转行 (Lateral View)
      • MySQL 兼容性
      • 聚合多维分析
      • 分析函数(窗口函数)
      • 公用表表达式(CTE)
      • 自定义函数
        • 别名函数
        • Java UDF, UDAF, UDTF
    • 数据导出
      • SELECT INTO OUTFILE
      • MySQL Dump
      • 最佳实践
      • 数据导出概述
      • Export
    • 数据导入
      • 高并发导入优化(Group Commit)
      • 异常数据处理
      • 导入高可用性
      • 导入时实现数据转换
      • 导入最佳实践
      • 数据源
        • Kafka
        • Snowflake
        • S3 兼容存储
        • Google Cloud Storage
        • 从其他 TP 系统迁移数据
        • Azure Storage
        • 腾讯云 COS
        • MinIO
        • HDFS
        • 阿里云 OSS
        • 华为云 OBS
        • 从其他 AP 系统迁移数据
        • Flink
        • Redshift
        • Amazon S3
        • 本地文件
        • BigQuery
      • 导入方式
        • Broker Load
        • MySQL Load
        • Insert Into Values
        • Stream Load
        • Insert Into Select
        • Routine Load
      • 文件格式
        • CSV
        • JSON
        • Parquet
        • ORC
      • 复杂数据类型
        • MAP
        • Variant
        • JSON
        • STRUCT
        • Bitmap
        • HLL
        • ARRAY
    • BI工具接入
      • Sugar
      • Navicat
      • Tableau
      • DBeaver
      • 永洪BI
      • FineBI(帆软)
    • 数据库连接
      • 通过 MySQL 协议连接
      • 基于 Arrow Flight SQL 的高速数据传输链路
    • 湖仓一体
      • 分析 S3或HDFS 上的文件
      • 湖仓一体概述
      • SQL 方言兼容
      • 弹性计算节点
      • 云服务认证接入
      • 元数据缓存
      • 外表统计信息
      • 数据缓存
      • 数据库分析
        • MySQL
        • JDBC Catalog
        • Oracle
        • OceanBase
        • SAP HANA
        • 阿里云 MaxCompute
        • ClickHouse
        • PostgreSQL
        • IBM Db2
        • SQL Server
        • Elasticsearch
      • 湖仓一体最佳实践
        • 使用 PALO 和 Paimon
        • 使用 PALO 和 Iceberg
        • 使用 PALO 和 Hudi
        • 使用 PALO 和 LakeSoul
      • 数据湖构建
        • Iceberg
        • Hive
      • 数据湖分析
        • Hudi Catalog
        • 阿里云 DLF
        • Iceberg Catalog
        • Paimon Catalog
        • Hive Catalog
    • 数据表设计
      • 行业混存
      • 数据压缩
      • Schema 变更
      • 数据类型
      • 自增列
      • 概览
      • 数据库建表最佳实践
      • 冷热数据分层
        • SSD 和 HDD 层级存储
        • 远程存储
        • 冷热数据分层概述
      • 表索引
        • 倒排索引
        • 前缀索引与排序键
        • N-Gram 索引
        • BloomFilter 索引
        • 索引概述
      • 数据划分
        • 数据分桶
        • 数据分布概念
        • 动态分区
        • 自动分区
        • 手动分区
        • 常见文档
      • 数据模型
        • 使用注意
        • 模型概述
        • 主键模型
        • 明细模型
        • 聚合模型
  • 版本发布历史
    • 百度数据仓库 Palo 2.0 版本全新发布
  • SQL手册
    • 字面常量
    • 别名
    • SQL-手册
    • 数据类型
    • SQL语句
    • 注释
    • 内置函数
    • 白名单管理
    • SQL操作符
    • 内置函数
      • 聚合函数
      • 位操作函数
      • 字符串函数
      • 条件函数
      • 数学函数
      • JSON解析函数
      • 类型转换函数
      • 格式转换函数
      • 通用函数
      • 时间和日期函数
      • BITMAP函数
      • 窗口函数
      • 哈希函数
      • HLL函数
    • 语法帮助
      • DML
        • INSERT
        • ROUTINE-LOAD
        • RESTORE
        • SELECT-INTO-OUTFILE
        • ALTER-ROUTINE-LOAD
        • BROKER-LOAD
        • BACKUP
        • EXPORT
        • STREAM-LOAD
      • DDL
        • CREATE-FILE
        • DROP-RESOURCE
        • CREATE-RESOURCE
        • CREATE-MATERIALIZED-VIEW
        • DROP-RESROUCE
        • CREATE-TABLE
        • DROP-REPOSITORY
        • CREATE-REPOSITORY
        • CREATE-ODBC-TABLE
      • 信息查看语句
        • SHOW-BACKUP
        • SHOW-ALTER-TABLE-MATERIALIZED-VIEW
        • SHOW-SNAPSHOT
        • SHOW-ROUTINE-LOAD
        • SHOW-CREATE-ROUTINE-LOAD
        • SHOW-ROLES
        • SHOW-GRANTS
        • SHOW-EXPORT
        • SHOW-ROUTINE-LOAD-TASK
        • SHOW-REPOSITORIES
        • SHOW-LOAD
        • SHOW-RESOURCES
        • SHOW-RESTORE
        • SHOW-PROPERTY
        • SHOW-FILE
      • 辅助命令
        • PAUSE-ROUTINE-LOAD
        • STOP-ROUTINE-LOAD
        • ALTER-ROUTINE-LOAD
        • CANCEL-LOAD
        • RESUME-ROUTINE-LOAD
      • 账户管理
        • SET-PROPERTY
        • REVOKE
        • GRANT
        • CREATE-ROLE
        • DROP-ROLE
        • CREATE-USER
        • DROP-USER
        • SET-PASSWORD
  • 快速入门
    • 快速上手
    • 存算分离
    • 存算一体
  • 典型实践
    • 如何开启Debug日志
    • 导入分析
    • 查询分析
  • 操作手册
    • 权限和子用户
    • 存算一体
      • 连接集群
      • 查询分析
      • 监控告警
        • 监控指标
        • 告警配置
      • 备份恢复
        • 通过管理页面备份与恢复
        • 备份与恢复
      • 权限管理
        • 集群权限
        • 控制台权限
      • 集群管理
        • 集群创建
        • 停止与删除
        • 重置管理员密码
        • 集群扩缩容
        • 集群详情
    • 存算分离
      • 连接集群
      • 计算组管理
        • 重启计算组
        • 创建计算组
      • 监控告警
        • 监控指标
        • 告警配置
      • 权限管理
        • 集群权限
        • 控制台权限
      • 集群管理
        • 停止与删除
        • 创建集群
        • 重置管理员密码
        • 集群详情
  • 服务等级协议SLA
    • 服务等级协议(SLA)v1.0
  • 产品概述
    • 系统架构
    • 产品特点
    • 产品介绍
  • 视频专区
    • 操作指南
    • 产品简介
  • 产品定价
    • 预付费
    • 计费说明
    • 后付费
所有文档
menu
没有找到结果,请重新输入

数据仓库 PALO

  • 功能发布记录
  • 操作手册1
    • LDAP认证
    • 时区
    • 使用S3-SDK访问对象存储
    • 权限管理
    • 物化视图
    • 变量
    • 资源管理
    • 数据更新与删除
      • 标记删除
      • Sequence-Column
      • 数据更新
      • 数据删除
    • 备份与恢复
      • 备份与恢复
    • 数据导出1
      • SELECT INTO OUTFILE
      • MySQL Dump
      • 数据导出概述
      • Export
    • 数据导出
      • 全量数据导出
      • 导出查询结果集
      • 导出总览
      • 导出数据到外部表
    • 查询加速1
      • 查询缓存
      • 物化视图
        • 同步物化视图
        • 物化视图概览
        • 异步物化视图
          • 异步物化视图常见问题
          • 最佳实践
          • 异步物化视图概述
          • 创建、查询与维护异步物化视图
    • 数据导入
      • JSON格式数据导入说明
      • 导入本地数据
      • 导入BOS中的数据
      • 导入事务和原子性
      • 通过外部表同步数据
      • 使用JDBC同步数据
      • 列的映射、转换与过滤
      • 订阅Kafka日志
      • 严格模式
      • 导入总览
    • 数据更新与删除1
      • 事务
      • 数据更新
        • 主键模型的导入更新
        • 主键模型的 Update 更新
        • 数据更新概述
        • 主键模型的更新并发控制
        • 聚合模型的导入更新
      • 数据删除
        • 删除操作概述
        • Truncate 操作
        • 表原子替换
        • Delete 操作
        • 基于导入的批量删除
        • 临时分区
    • 数据导入1
      • 高并发导入优化(Group Commit)
      • 导入概览
      • 异常数据处理
      • 导入高可用性
      • 导入时实现数据转换
      • 数据源
        • Kafka
        • S3 兼容存储
        • 从其他 TP 系统迁移数据
        • HDFS
        • 从其他 AP 系统迁移数据
        • Flink
        • 本地文件
      • 导入方式
        • Broker Load
        • MySQL Load
        • Insert Into Values
        • Stream Load
        • Insert Into Select
        • Routine Load
      • 文件格式
        • CSV
        • JSON
        • Parquet
        • ORC
      • 复杂数据类型
        • MAP
        • Variant
        • JSON
        • STRUCT
        • Bitmap
        • HLL
        • ARRAY
  • 开发指南
    • 迁移ClickHouse数据
    • Doris集群间数据迁移
    • 数据更新与删除
      • 事务
      • 数据更新
        • 主键模型的导入更新
        • 主键模型的 Update 更新
        • 数据更新概述
        • 主键模型的更新并发控制
        • 聚合模型的导入更新
      • 数据删除
        • 删除操作概述
        • Truncate 操作
        • 表原子替换
        • Delete 操作
        • 基于导入的批量删除
        • 临时分区
    • 查询加速
      • 查询缓存
      • Colocation Join
      • 高并发点查
      • Hint
        • Hint 概述
        • Leading Hint
        • Distribute Hint
      • 物化视图
        • 同步物化视图
        • 物化视图概览
        • 异步物化视图
          • 异步物化视图常见问题
          • 最佳实践
          • 异步物化视图概述
          • 创建、查询与维护异步物化视图
      • 高效去重
        • BITMAP 精准去重
        • HLL 近似去重
      • 优化技术原理
        • TOPN 查询优化
        • 统计信息
        • Pipeline 执行引擎
        • 查询优化器介绍
        • Runtime Filter
      • 查询调优概述
        • 调优概述
        • 诊断工具
        • 分析工具
        • 调优流程
      • 查询优化实践
        • 常见调优参数
        • 计划调优
          • 使用 Hint 控制代价改写
          • 使用异步物化视图透明改写
          • 使用 Leading Hint 控制 Join 顺序
          • 优化表 Schema 设计
          • 使用分区裁剪优化扫表
          • 优化索引设计和使用
          • 使用 Hint 调整 Join Shuffle 方式
          • DML 计划调优
          • 使用 Colocate Group 优化 Join
          • 使用同步物化视图透明改写
          • 使用 SQL Cache 加速查询
        • 执行调优
          • 数据倾斜处理
          • RuntimeFilter 的等待时间调整
          • 并行度调优
    • 数据查询
      • 连接(JOIN)
      • 子查询
      • 复杂类型查询
      • 列转行 (Lateral View)
      • MySQL 兼容性
      • 聚合多维分析
      • 分析函数(窗口函数)
      • 公用表表达式(CTE)
      • 自定义函数
        • 别名函数
        • Java UDF, UDAF, UDTF
    • 数据导出
      • SELECT INTO OUTFILE
      • MySQL Dump
      • 最佳实践
      • 数据导出概述
      • Export
    • 数据导入
      • 高并发导入优化(Group Commit)
      • 异常数据处理
      • 导入高可用性
      • 导入时实现数据转换
      • 导入最佳实践
      • 数据源
        • Kafka
        • Snowflake
        • S3 兼容存储
        • Google Cloud Storage
        • 从其他 TP 系统迁移数据
        • Azure Storage
        • 腾讯云 COS
        • MinIO
        • HDFS
        • 阿里云 OSS
        • 华为云 OBS
        • 从其他 AP 系统迁移数据
        • Flink
        • Redshift
        • Amazon S3
        • 本地文件
        • BigQuery
      • 导入方式
        • Broker Load
        • MySQL Load
        • Insert Into Values
        • Stream Load
        • Insert Into Select
        • Routine Load
      • 文件格式
        • CSV
        • JSON
        • Parquet
        • ORC
      • 复杂数据类型
        • MAP
        • Variant
        • JSON
        • STRUCT
        • Bitmap
        • HLL
        • ARRAY
    • BI工具接入
      • Sugar
      • Navicat
      • Tableau
      • DBeaver
      • 永洪BI
      • FineBI(帆软)
    • 数据库连接
      • 通过 MySQL 协议连接
      • 基于 Arrow Flight SQL 的高速数据传输链路
    • 湖仓一体
      • 分析 S3或HDFS 上的文件
      • 湖仓一体概述
      • SQL 方言兼容
      • 弹性计算节点
      • 云服务认证接入
      • 元数据缓存
      • 外表统计信息
      • 数据缓存
      • 数据库分析
        • MySQL
        • JDBC Catalog
        • Oracle
        • OceanBase
        • SAP HANA
        • 阿里云 MaxCompute
        • ClickHouse
        • PostgreSQL
        • IBM Db2
        • SQL Server
        • Elasticsearch
      • 湖仓一体最佳实践
        • 使用 PALO 和 Paimon
        • 使用 PALO 和 Iceberg
        • 使用 PALO 和 Hudi
        • 使用 PALO 和 LakeSoul
      • 数据湖构建
        • Iceberg
        • Hive
      • 数据湖分析
        • Hudi Catalog
        • 阿里云 DLF
        • Iceberg Catalog
        • Paimon Catalog
        • Hive Catalog
    • 数据表设计
      • 行业混存
      • 数据压缩
      • Schema 变更
      • 数据类型
      • 自增列
      • 概览
      • 数据库建表最佳实践
      • 冷热数据分层
        • SSD 和 HDD 层级存储
        • 远程存储
        • 冷热数据分层概述
      • 表索引
        • 倒排索引
        • 前缀索引与排序键
        • N-Gram 索引
        • BloomFilter 索引
        • 索引概述
      • 数据划分
        • 数据分桶
        • 数据分布概念
        • 动态分区
        • 自动分区
        • 手动分区
        • 常见文档
      • 数据模型
        • 使用注意
        • 模型概述
        • 主键模型
        • 明细模型
        • 聚合模型
  • 版本发布历史
    • 百度数据仓库 Palo 2.0 版本全新发布
  • SQL手册
    • 字面常量
    • 别名
    • SQL-手册
    • 数据类型
    • SQL语句
    • 注释
    • 内置函数
    • 白名单管理
    • SQL操作符
    • 内置函数
      • 聚合函数
      • 位操作函数
      • 字符串函数
      • 条件函数
      • 数学函数
      • JSON解析函数
      • 类型转换函数
      • 格式转换函数
      • 通用函数
      • 时间和日期函数
      • BITMAP函数
      • 窗口函数
      • 哈希函数
      • HLL函数
    • 语法帮助
      • DML
        • INSERT
        • ROUTINE-LOAD
        • RESTORE
        • SELECT-INTO-OUTFILE
        • ALTER-ROUTINE-LOAD
        • BROKER-LOAD
        • BACKUP
        • EXPORT
        • STREAM-LOAD
      • DDL
        • CREATE-FILE
        • DROP-RESOURCE
        • CREATE-RESOURCE
        • CREATE-MATERIALIZED-VIEW
        • DROP-RESROUCE
        • CREATE-TABLE
        • DROP-REPOSITORY
        • CREATE-REPOSITORY
        • CREATE-ODBC-TABLE
      • 信息查看语句
        • SHOW-BACKUP
        • SHOW-ALTER-TABLE-MATERIALIZED-VIEW
        • SHOW-SNAPSHOT
        • SHOW-ROUTINE-LOAD
        • SHOW-CREATE-ROUTINE-LOAD
        • SHOW-ROLES
        • SHOW-GRANTS
        • SHOW-EXPORT
        • SHOW-ROUTINE-LOAD-TASK
        • SHOW-REPOSITORIES
        • SHOW-LOAD
        • SHOW-RESOURCES
        • SHOW-RESTORE
        • SHOW-PROPERTY
        • SHOW-FILE
      • 辅助命令
        • PAUSE-ROUTINE-LOAD
        • STOP-ROUTINE-LOAD
        • ALTER-ROUTINE-LOAD
        • CANCEL-LOAD
        • RESUME-ROUTINE-LOAD
      • 账户管理
        • SET-PROPERTY
        • REVOKE
        • GRANT
        • CREATE-ROLE
        • DROP-ROLE
        • CREATE-USER
        • DROP-USER
        • SET-PASSWORD
  • 快速入门
    • 快速上手
    • 存算分离
    • 存算一体
  • 典型实践
    • 如何开启Debug日志
    • 导入分析
    • 查询分析
  • 操作手册
    • 权限和子用户
    • 存算一体
      • 连接集群
      • 查询分析
      • 监控告警
        • 监控指标
        • 告警配置
      • 备份恢复
        • 通过管理页面备份与恢复
        • 备份与恢复
      • 权限管理
        • 集群权限
        • 控制台权限
      • 集群管理
        • 集群创建
        • 停止与删除
        • 重置管理员密码
        • 集群扩缩容
        • 集群详情
    • 存算分离
      • 连接集群
      • 计算组管理
        • 重启计算组
        • 创建计算组
      • 监控告警
        • 监控指标
        • 告警配置
      • 权限管理
        • 集群权限
        • 控制台权限
      • 集群管理
        • 停止与删除
        • 创建集群
        • 重置管理员密码
        • 集群详情
  • 服务等级协议SLA
    • 服务等级协议(SLA)v1.0
  • 产品概述
    • 系统架构
    • 产品特点
    • 产品介绍
  • 视频专区
    • 操作指南
    • 产品简介
  • 产品定价
    • 预付费
    • 计费说明
    • 后付费
  • 文档中心
  • arrow
  • 数据仓库PALO
  • arrow
  • 开发指南
  • arrow
  • 数据导入
  • arrow
  • 高并发导入优化(Group Commit)
本页目录
  • Group Commit 模式
  • Group Commit 使用方式
  • 使用 JDBC
  • 使用 Golang 进行 Group Commit
  • INSERT INTO VALUES
  • Stream Load
  • 自动提交条件
  • 修改提交间隔
  • 修改提交数据量
  • 相关系统配置
  • BE 配置
  • 使用限制
  • 性能
  • Stream Load 日志场景测试
  • JDBC
  • Insert into sync 模式小批量数据
  • Insert into sync 模式大批量数据

高并发导入优化(Group Commit)

更新时间:2025-08-21

在高频小批量写入场景下,传统的导入方式存在以下问题:

  • 每个导入都会创建一个独立的事务,都需要经过 FE 解析 SQL 和生成执行计划,影响整体性能
  • 每个导入都会生成一个新的版本,导致版本数快速增长,增加了后台 compaction 的压力

为了解决这些问题,PALO 引入了 Group Commit 机制。Group Commit 不是一种新的导入方式,而是对现有导入方式的优化扩展,主要针对:

  • INSERT INTO tbl VALUES(...) 语句
  • Stream Load 导入

通过将多个小批量导入在后台合并成一个大的事务提交,显著提升了高并发小批量写入的性能。同时,Group Commit 与 PreparedStatement 结合使用可以获得更高的性能提升。

Group Commit 模式

Group Commit 写入有三种模式,分别是:

  • 关闭模式(off_mode)

    不开启 Group Commit。

  • 同步模式(sync_mode)

    PALO 根据负载和表的 group_commit_interval属性将多个导入在一个事务提交,事务提交后导入返回。这适用于高并发写入场景,且在导入完成后要求数据立即可见。

  • 异步模式(async_mode)

    PALO 首先将数据写入 WAL (Write Ahead Log),然后导入立即返回。PALO 会根据负载和表的group_commit_interval属性异步提交数据,提交之后数据可见。为了防止 WAL 占用较大的磁盘空间,单次导入数据量较大时,会自动切换为sync_mode。这适用于写入延迟敏感以及高频写入的场景。

Group Commit 使用方式

假如表的结构为:

SQL
1CREATE TABLE `dt` (
2    `id` int(11) NOT NULL,
3    `name` varchar(50) NULL,
4    `score` int(11) NULL
5) ENGINE=OLAP
6DUPLICATE KEY(`id`)
7DISTRIBUTED BY HASH(`id`) BUCKETS 1
8PROPERTIES (
9    "replication_num" = "1"
10);

使用 JDBC

当用户使用 JDBC insert into values方式写入时,为了减少 SQL 解析和生成规划的开销,我们在 FE 端支持了 MySQL 协议的 PreparedStatement 特性。当使用 PreparedStatement 时,SQL 和其导入规划将被缓存到 Session 级别的内存缓存中,后续的导入直接使用缓存对象,降低了 FE 的 CPU 压力。下面是在 JDBC 中使用 PreparedStatement 的例子:

1. 设置 JDBC URL 并在 Server 端开启 Prepared Statement

Plain Text
1url = jdbc:mysql://127.0.0.1:9030/db?useServerPrepStmts=true&useLocalSessionState=true&rewriteBatchedStatements=true&cachePrepStmts=true&prepStmtCacheSqlLimit=99999&prepStmtCacheSize=500

2. 配置 group_commit session 变量,有如下两种方式:

  • 通过 JDBC url 设置,增加sessionVariables=group_commit=async_mode
Plain Text
1url = jdbc:mysql://127.0.0.1:9030/db?useServerPrepStmts=true&useLocalSessionState=true&rewriteBatchedStatements=true&cachePrepStmts=true&prepStmtCacheSqlLimit=99999&prepStmtCacheSize=500&sessionVariables=group_commit=async_mode
  • 通过执行 SQL 设置
Plain Text
1try (Statement statement = conn.createStatement()) {
2    statement.execute("SET group_commit = async_mode;");
3}

3. 使用 PreparedStatement

Java
1private static final String JDBC_DRIVER = "com.mysql.jdbc.Driver";
2private static final String URL_PATTERN = "jdbc:mysql://%s:%d/%s?useServerPrepStmts=true&useLocalSessionState=true&rewriteBatchedStatements=true&cachePrepStmts=true&prepStmtCacheSqlLimit=99999&prepStmtCacheSize=50&sessionVariables=group_commit=async_mode";
3private static final String HOST = "127.0.0.1";
4private static final int PORT = 9087;
5private static final String DB = "db";
6private static final String TBL = "dt";
7private static final String USER = "root";
8private static final String PASSWD = "";
9private static final int INSERT_BATCH_SIZE = 10;
10
11private static void groupCommitInsertBatch() throws Exception {
12    Class.forName(JDBC_DRIVER);
13    // add rewriteBatchedStatements=true and cachePrepStmts=true in JDBC url
14    // set session variables by sessionVariables=group_commit=async_mode in JDBC url
15    try (Connection conn = DriverManager.getConnection(
16            String.format(URL_PATTERN, HOST, PORT, DB), USER, PASSWD)) {
17
18        String query = "insert into " + TBL + " values(?, ?, ?)";
19        try (PreparedStatement stmt = conn.prepareStatement(query)) {
20            for (int j = 0; j < 5; j++) {
21                // 10 rows per insert
22                for (int i = 0; i < INSERT_BATCH_SIZE; i++) {
23                    stmt.setInt(1, i);
24                    stmt.setString(2, "name" + i);
25                    stmt.setInt(3, i + 10);
26                    stmt.addBatch();
27                }
28                int[] result = stmt.executeBatch();
29            }
30        }
31    } catch (Exception e) {
32        e.printStackTrace();
33    }
34}

注意:由于高频的 insert into 语句会打印大量的 audit log,对最终性能有一定影响,默认关闭了打印 prepared 语句的 audit log。可以通过设置 session variable 的方式控制是否打印 prepared 语句的 audit log。

SQL
1# 配置 session 变量开启打印parpared语句的audit log, 默认为false即关闭打印parpared语句的audit log。
2set enable_prepared_stmt_audit_log=true;

使用 Golang 进行 Group Commit

Golang 的 prepared 语句支持有限,所以我们可以通过手动客户端攒批的方式提高 Group Commit 的性能,以下为一个示例程序。

Golang
1package main
2
3import (
4	"database/sql"
5	"fmt"
6	"math/rand"
7	"strings"
8	"sync"
9	"sync/atomic"
10	"time"
11
12	_ "github.com/go-sql-driver/mysql"
13)
14
15const (
16	host     = "127.0.0.1"
17	port     = 9038
18	db       = "test"
19	user     = "root"
20	password = ""
21	table    = "async_lineitem"
22)
23
24var (
25	threadCount = 20
26	batchSize   = 100
27)
28
29var totalInsertedRows int64
30var rowsInsertedLastSecond int64
31
32func main() {
33	dbDSN := fmt.Sprintf("%s:%s@tcp(%s:%d)/%s?parseTime=true", user, password, host, port, db)
34	db, err := sql.Open("mysql", dbDSN)
35	if err != nil {
36		fmt.Printf("Error opening database: %s\n", err)
37		return
38	}
39	defer db.Close()
40
41	var wg sync.WaitGroup
42	for i := 0; i < threadCount; i++ {
43		wg.Add(1)
44		go func() {
45			defer wg.Done()
46			groupCommitInsertBatch(db)
47		}()
48	}
49
50	go logInsertStatistics()
51
52	wg.Wait()
53}
54
55func groupCommitInsertBatch(db *sql.DB) {
56	for {
57		valueStrings := make([]string, 0, batchSize)
58		valueArgs := make([]interface{}, 0, batchSize*16)
59		for i := 0; i < batchSize; i++ {
60		    valueStrings = append(valueStrings, "(?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)")
61			valueArgs = append(valueArgs, rand.Intn(1000))
62			valueArgs = append(valueArgs, rand.Intn(1000))
63			valueArgs = append(valueArgs, rand.Intn(1000))
64			valueArgs = append(valueArgs, rand.Intn(1000))
65			valueArgs = append(valueArgs, sql.NullFloat64{Float64: 1.0, Valid: true})
66			valueArgs = append(valueArgs, sql.NullFloat64{Float64: 1.0, Valid: true})
67			valueArgs = append(valueArgs, sql.NullFloat64{Float64: 1.0, Valid: true})
68			valueArgs = append(valueArgs, sql.NullFloat64{Float64: 1.0, Valid: true})
69			valueArgs = append(valueArgs, "N")
70			valueArgs = append(valueArgs, "O")
71			valueArgs = append(valueArgs, time.Now())
72			valueArgs = append(valueArgs, time.Now())
73			valueArgs = append(valueArgs, time.Now())
74			valueArgs = append(valueArgs, "DELIVER IN PERSON")
75			valueArgs = append(valueArgs, "SHIP")
76			valueArgs = append(valueArgs, "N/A")
77		}
78		stmt := fmt.Sprintf("INSERT INTO %s VALUES %s",
79			table, strings.Join(valueStrings, ","))
80		_, err := db.Exec(stmt, valueArgs...)
81		if err != nil {
82			fmt.Printf("Error executing batch: %s\n", err)
83			return
84		}
85		atomic.AddInt64(&rowsInsertedLastSecond, int64(batchSize))
86		atomic.AddInt64(&totalInsertedRows, int64(batchSize))
87	}
88}
89
90func logInsertStatistics() {
91	for {
92		time.Sleep(1 * time.Second)
93		fmt.Printf("Total inserted rows: %d\n", totalInsertedRows)
94		fmt.Printf("Rows inserted in the last second: %d\n", rowsInsertedLastSecond)
95		rowsInsertedLastSecond = 0
96	}
97}

INSERT INTO VALUES

  • 异步模式
SQL
1# 配置 session 变量开启 group commit (默认为 off_mode),开启异步模式
2mysql> set group_commit = async_mode;
3
4# 这里返回的 label 是 group_commit 开头的,可以区分出是否使用了 group commit
5mysql> insert into dt values(1, 'Bob', 90), (2, 'Alice', 99);
6Query OK, 2 rows affected (0.05 sec)
7{'label':'group_commit_a145ce07f1c972fc-bd2c54597052a9ad', 'status':'PREPARE', 'txnId':'181508'}
8
9# 可以看出这个 label, txn_id 和上一个相同,说明是攒到了同一个导入任务中
10mysql> insert into dt(id, name) values(3, 'John');
11Query OK, 1 row affected (0.01 sec)
12{'label':'group_commit_a145ce07f1c972fc-bd2c54597052a9ad', 'status':'PREPARE', 'txnId':'181508'}
13
14# 不能立刻查询到
15mysql> select * from dt;
16Empty set (0.01 sec)
17
18# 10 秒后可以查询到,可以通过表属性 group_commit_interval 控制数据可见延迟。
19mysql> select * from dt;
20+------+-------+-------+
21| id   | name  | score |
22+------+-------+-------+
23|    1 | Bob   |    90 |
24|    2 | Alice |    99 |
25|    3 | John  |  NULL |
26+------+-------+-------+
273 rows in set (0.02 sec)
  • 同步模式
SQL
1# 配置 session 变量开启 group commit (默认为 off_mode),开启同步模式
2mysql> set group_commit = sync_mode;
3
4# 这里返回的 label 是 group_commit 开头的,可以区分出是否谁用了 group commit,导入耗时至少是表属性 group_commit_interval。
5mysql> insert into dt values(4, 'Bob', 90), (5, 'Alice', 99);
6Query OK, 2 rows affected (10.06 sec)
7{'label':'group_commit_d84ab96c09b60587_ec455a33cb0e9e87', 'status':'PREPARE', 'txnId':'3007', 'query_id':'fc6b94085d704a94-a69bfc9a202e66e2'}
8
9# 数据可以立刻读出
10mysql> select * from dt;
11+------+-------+-------+
12| id   | name  | score |
13+------+-------+-------+
14|    1 | Bob   |    90 |
15|    2 | Alice |    99 |
16|    3 | John  |  NULL |
17|    4 | Bob   |    90 |
18|    5 | Alice |    99 |
19+------+-------+-------+
205 rows in set (0.03 sec)
  • 关闭模式
SQL
1mysql> set group_commit = off_mode;

Stream Load

假如data.csv的内容为:

SQL
16,Amy,60
27,Ross,98
  • 异步模式
SQL
1# 导入时在 header 中增加"group_commit:async_mode"配置
2
3curl --location-trusted -u {user}:{passwd} -T data.csv -H "group_commit:async_mode"  -H "column_separator:,"  http://{fe_host}:{http_port}/api/db/dt/_stream_load
4{
5    "TxnId": 7009,
6    "Label": "group_commit_c84d2099208436ab_96e33fda01eddba8",
7    "Comment": "",
8    "GroupCommit": true,
9    "Status": "Success",
10    "Message": "OK",
11    "NumberTotalRows": 2,
12    "NumberLoadedRows": 2,
13    "NumberFilteredRows": 0,
14    "NumberUnselectedRows": 0,
15    "LoadBytes": 19,
16    "LoadTimeMs": 35,
17    "StreamLoadPutTimeMs": 5,
18    "ReadDataTimeMs": 0,
19    "WriteDataTimeMs": 26
20}
21
22# 返回的 GroupCommit 为 true,说明进入了 group commit 的流程
23# 返回的 Label 是 group_commit 开头的,是真正消费数据的导入关联的 label
  • 同步模式
SQL
1# 导入时在 header 中增加"group_commit:sync_mode"配置
2
3curl --location-trusted -u {user}:{passwd} -T data.csv -H "group_commit:sync_mode"  -H "column_separator:,"  http://{fe_host}:{http_port}/api/db/dt/_stream_load
4{
5    "TxnId": 3009,
6    "Label": "group_commit_d941bf17f6efcc80_ccf4afdde9881293",
7    "Comment": "",
8    "GroupCommit": true,
9    "Status": "Success",
10    "Message": "OK",
11    "NumberTotalRows": 2,
12    "NumberLoadedRows": 2,
13    "NumberFilteredRows": 0,
14    "NumberUnselectedRows": 0,
15    "LoadBytes": 19,
16    "LoadTimeMs": 10044,
17    "StreamLoadPutTimeMs": 4,
18    "ReadDataTimeMs": 0,
19    "WriteDataTimeMs": 10038
20}
21
22# 返回的 GroupCommit 为 true,说明进入了 group commit 的流程
23# 返回的 Label 是 group_commit 开头的,是真正消费数据的导入关联的 label

自动提交条件

当满足时间间隔 (默认为 10 秒) 或数据量 (默认为 64 MB) 其中一个条件时,会自动提交数据。这两个参数需要配合使用,建议根据实际场景进行调优。

修改提交间隔

默认提交间隔为 10 秒,用户可以通过修改表的配置调整:

SQL
1# 修改提交间隔为 2 秒
2ALTER TABLE dt SET ("group_commit_interval_ms" = "2000");

参数调整建议:

  • 较短的间隔 (如 2 秒):

    • 优点:数据可见性延迟更低,适合对实时性要求较高的场景
    • 缺点:提交次数增多,版本数增长更快,后台 compaction 压力更大
  • 较长的间隔 (如 30 秒):

    • 优点:提交批次更大,版本数增长更慢,系统开销更小
    • 缺点:数据可见性延迟更高

建议根据业务对数据可见性延迟的容忍度来设置,如果系统压力大,可以适当增加间隔。

修改提交数据量

Group Commit 的默认提交数据量为 64 MB,用户可以通过修改表的配置调整:

SQL
1# 修改提交数据量为 128MB
2ALTER TABLE dt SET ("group_commit_data_bytes" = "134217728");

参数调整建议:

  • 较小的阈值 (如 32MB):

    • 优点:内存占用更少,适合资源受限的环境
    • 缺点:提交批次较小,吞吐量可能受限
  • 较大的阈值 (如 256MB):

    • 优点:批量提交效率更高,系统吞吐量更大
    • 缺点:占用更多内存

建议根据系统内存资源和数据可靠性要求来权衡。如果内存充足且追求更高吞吐,可以适当增加到 128MB 或更大。

相关系统配置

BE 配置

  1. group_commit_wal_path

    • 描述:group commit 存放 WAL 文件的目录
    • 默认值:默认在用户配置的storage_root_path的各个目录下创建一个名为wal的目录。配置示例:
    Plain Text
    1group_commit_wal_path=/data1/storage/wal;/data2/storage/wal;/data3/storage/wal

使用限制

  • Group Commit 限制条件

    • INSERT INTO VALUES 语句在以下情况下会退化为非 Group Commit 方式:

      • 使用事务写入 (Begin; INSERT INTO VALUES; COMMIT)
      • 指定 Label (INSERT INTO dt WITH LABEL {label} VALUES)
      • VALUES 中包含表达式 (INSERT INTO dt VALUES (1 + 100))
      • 列更新写入
      • 表不支持轻量级模式更改
    • Stream Load 在以下情况下会退化为非 Group Commit 方式:

      • 使用两阶段提交
      • 指定 Label (-H "label:my_label")
      • 列更新写入
      • 表不支持轻量级模式更改
  • Unique 模型

    • Group Commit 不保证提交顺序,建议使用 Sequence 列来保证数据一致性。
  • WAL 限制

    • async_mode 写入会将数据写入 WAL,成功后删除,失败时通过 WAL 恢复。
    • WAL 文件是单副本存储的,如果对应磁盘损坏或文件误删可能导致数据丢失。
    • 下线 BE 节点时,使用 DECOMMISSION 命令以防数据丢失。
    • async_mode 在以下情况下切换为 sync_mode:

      • 导入数据量过大(超过 WAL 单目录 80% 空间)
      • 不知道数据量的 chunked stream load
      • 磁盘可用空间不足
    • 重量级 Schema Change 时,拒绝 Group Commit 写入,客户端需重试。

性能

我们分别测试了使用Stream Load和JDBC在高并发小数据量场景下group commit(使用async mode) 的写入性能。

Stream Load 日志场景测试

机器配置

  • 1 台 FE:阿里云 8 核 CPU、16GB 内存、1 块 100GB ESSD PL1 云磁盘
  • 3 台 BE:阿里云 16 核 CPU、64GB 内存、1 块 1TB ESSD PL1 云磁盘
  • 1 台测试客户端:阿里云 16 核 CPU、64GB 内存、1 块 100GB ESSD PL1 云磁盘
  • 测试版本为 PALO-2.1.5

数据集

  • httplogs 数据集,总共 31GB、2.47 亿条

测试工具

  • doris-streamloader

测试方法

  • 对比 非 group_commit 和 group_commit的 async_mode 模式下,设置不同的单并发数据量和并发数,导入 247249096 行数据

测试结果

导入方式 单并发数据量 并发数 耗时 (秒) 导入速率 (行/秒) 导入吞吐 (MB/秒)
group_commit 10 KB 10 3306 74,787 9.8
group_commit 10 KB 30 3264 75,750 10.0
group_commit 100 KB 10 424 582,447 76.7
group_commit 100 KB 30 366 675,543 89.0
group_commit 500 KB 10 187 1,318,661 173.7
group_commit 500 KB 30 183 1,351,087 178.0
group_commit 1 MB 10 178 1,385,148 182.5
group_commit 1 MB 30 178 1,385,148 182.5
group_commit 10 MB 10 177 1,396,887 184.0
非 group_commit 1 MB 10 2824 87,536 11.5
非 group_commit 10 MB 10 450 549,442 68.9
非 group_commit 10 MB 30 177 1,396,887 184.0

在上面的group_commit测试中,BE 的 CPU 使用率在 10-40% 之间。

可以看出,group_commit 模式在小数据量并发导入的场景下,能有效的提升导入性能,同时减少版本数,降低系统合并数据的压力。

JDBC

机器配置

  • 1 台 FE:阿里云 8 核 CPU、16GB 内存、1 块 100GB ESSD PL1 云磁盘
  • 1 台 BE:阿里云 16 核 CPU、64GB 内存、1 块 500GB ESSD PL1 云磁盘
  • 1 台测试客户端:阿里云 16 核 CPU、64GB 内存、1 块 100GB ESSD PL1 云磁盘
  • 测试版本为 PALO-2.1.5
  • 关闭打印 parpared 语句的 audit log 以提高性能

数据集

  • tpch sf10 lineitem 表数据集,30 个文件,总共约 22 GB,1.8 亿行

测试工具

  • DataX

测试方法

  • 通过 txtfilereader 向 mysqlwriter 写入数据,配置不同并发数和单个 INSERT 的行数

测试结果

单个 insert 的行数 并发数 导入速率 (行/秒) 导入吞吐 (MB/秒)
100 10 107,172 11.47
100 20 140,317 14.79
100 30 142,882 15.28
在上面的测试中,FE 的 CPU 使用率在 60-70% 左右,BE 的 CPU 使用率在 10-20% 左右。

Insert into sync 模式小批量数据

机器配置

  • 1 台 FE:阿里云 16 核 CPU、64GB 内存、1 块 500GB ESSD PL1 云磁盘
  • 5 台 BE:阿里云 16 核 CPU、64GB 内存、1 块 1TB ESSD PL1 云磁盘。
  • 1 台测试客户端:阿里云 16 核 CPU、64GB 内存、1 块 100GB ESSD PL1 云磁盘
  • 测试版本为 PALO-2.1.5

数据集

  • tpch sf10 lineitem 表数据集。
  • 建表语句为
SQL
1CREATE TABLE IF NOT EXISTS lineitem (
2  L_ORDERKEY    INTEGER NOT NULL,
3  L_PARTKEY     INTEGER NOT NULL,
4  L_SUPPKEY     INTEGER NOT NULL,
5  L_LINENUMBER  INTEGER NOT NULL,
6  L_QUANTITY    DECIMAL(15,2) NOT NULL,
7  L_EXTENDEDPRICE  DECIMAL(15,2) NOT NULL,
8  L_DISCOUNT    DECIMAL(15,2) NOT NULL,
9  L_TAX         DECIMAL(15,2) NOT NULL,
10  L_RETURNFLAG  CHAR(1) NOT NULL,
11  L_LINESTATUS  CHAR(1) NOT NULL,
12  L_SHIPDATE    DATE NOT NULL,
13  L_COMMITDATE  DATE NOT NULL,
14  L_RECEIPTDATE DATE NOT NULL,
15  L_SHIPINSTRUCT CHAR(25) NOT NULL,
16  L_SHIPMODE     CHAR(10) NOT NULL,
17  L_COMMENT      VARCHAR(44) NOT NULL
18)
19DUPLICATE KEY(L_ORDERKEY, L_PARTKEY, L_SUPPKEY, L_LINENUMBER)
20DISTRIBUTED BY HASH(L_ORDERKEY) BUCKETS 32
21PROPERTIES (
22  "replication_num" = "3"
23);

测试工具

  • Jmeter

需要设置的 jmeter 参数如下:

  1. 设置测试前的 init 语句,set group_commit=async_mode以及set enable_nereids_planner=false。
  2. 开启 jdbc 的 prepared statement,完整的 url 为jdbc:mysql://127.0.0.1:9030?useServerPrepStmts=true&useLocalSessionState=true&rewriteBatchedStatements=true&cachePrepStmts=true&prepStmtCacheSqlLimit=99999&prepStmtCacheSize=50&sessionVariables=group_commit=async_mode,enable_nereids_planner=false。
  3. 设置导入类型为 prepared update statement。
  4. 设置导入语句。
  5. 设置每次需要导入的值,注意,导入的值与导入值的类型要一一匹配。

测试方法

  • 通过 Jmeter 向PALO写数据。每个并发每次通过 insert into 写入 1 行数据。

测试结果

  • 数据单位为行每秒。
  • 以下测试分为 30,100,500 并发。

30 并发 sync 模式 5 个 BE3 副本性能测试

Group commit interval 10ms 20ms 50ms 100ms
321.5 307.3 285.8 224.3

100 并发 sync 模式性能测试

Group commit interval 10ms 20ms 50ms 100ms
1175.2 1108.7 1016.3 704.5

500 并发 sync 模式性能测试

Group commit interval 10ms 20ms 50ms 100ms
3289.8 3686.7 3280.7 2609.2

Insert into sync 模式大批量数据

机器配置

  • 1 台 FE:阿里云 16 核 CPU、64GB 内存、1 块 500GB ESSD PL1 云磁盘
  • 5 台 BE:阿里云 16 核 CPU、64GB 内存、1 块 1TB ESSD PL1 云磁盘。注:测试中分别用了 1 台,3 台,5 台 BE 进行测试。
  • 1 台测试客户端:阿里云 16 核 CPU、64GB 内存、1 块 100GB ESSD PL1 云磁盘
  • 测试版本为 PALO-2.1.5

数据集

  • tpch sf10 lineitem 表数据集。
  • 建表语句为
SQL
1CREATE TABLE IF NOT EXISTS lineitem (
2  L_ORDERKEY    INTEGER NOT NULL,
3  L_PARTKEY     INTEGER NOT NULL,
4  L_SUPPKEY     INTEGER NOT NULL,
5  L_LINENUMBER  INTEGER NOT NULL,
6  L_QUANTITY    DECIMAL(15,2) NOT NULL,
7  L_EXTENDEDPRICE  DECIMAL(15,2) NOT NULL,
8  L_DISCOUNT    DECIMAL(15,2) NOT NULL,
9  L_TAX         DECIMAL(15,2) NOT NULL,
10  L_RETURNFLAG  CHAR(1) NOT NULL,
11  L_LINESTATUS  CHAR(1) NOT NULL,
12  L_SHIPDATE    DATE NOT NULL,
13  L_COMMITDATE  DATE NOT NULL,
14  L_RECEIPTDATE DATE NOT NULL,
15  L_SHIPINSTRUCT CHAR(25) NOT NULL,
16  L_SHIPMODE     CHAR(10) NOT NULL,
17  L_COMMENT      VARCHAR(44) NOT NULL
18)
19DUPLICATE KEY(L_ORDERKEY, L_PARTKEY, L_SUPPKEY, L_LINENUMBER)
20DISTRIBUTED BY HASH(L_ORDERKEY) BUCKETS 32
21PROPERTIES (
22  "replication_num" = "3"
23);

测试工具

  • Jmeter

测试方法

  • 通过 Jmeter 向PALO写数据。每个并发每次通过 insert into 写入 1000 行数据。

测试结果

  • 数据单位为行每秒。
  • 以下测试分为 30,100,500 并发。

30 并发 sync 模式性能测试

Group commit interval 10ms 20ms 50ms 100ms
92.2K 85.9K 84K 83.2K

100 并发 sync 模式性能测试

Group commit interval 10ms 20ms 50ms 100ms
70.4K 70.5K 73.2K 69.4K

500 并发 sync 模式性能测试

Group commit interval 10ms 20ms 50ms 100ms
46.3K 47.7K 47.4K 46.5K

上一篇
数据导出
下一篇
异常数据处理