Pipeline 执行引擎

数据仓库 PALO

  • 功能发布记录
  • 操作手册1
    • LDAP认证
    • 时区
    • 使用S3-SDK访问对象存储
    • 权限管理
    • 物化视图
    • 变量
    • 资源管理
    • 数据更新与删除
      • 标记删除
      • Sequence-Column
      • 数据更新
      • 数据删除
    • 备份与恢复
      • 备份与恢复
    • 数据导出1
      • SELECT INTO OUTFILE
      • MySQL Dump
      • 数据导出概述
      • Export
    • 数据导出
      • 全量数据导出
      • 导出查询结果集
      • 导出总览
      • 导出数据到外部表
    • 查询加速1
      • 查询缓存
      • 物化视图
        • 同步物化视图
        • 物化视图概览
        • 异步物化视图
          • 异步物化视图常见问题
          • 最佳实践
          • 异步物化视图概述
          • 创建、查询与维护异步物化视图
    • 数据导入
      • JSON格式数据导入说明
      • 导入本地数据
      • 导入BOS中的数据
      • 导入事务和原子性
      • 通过外部表同步数据
      • 使用JDBC同步数据
      • 列的映射、转换与过滤
      • 订阅Kafka日志
      • 严格模式
      • 导入总览
    • 数据更新与删除1
      • 事务
      • 数据更新
        • 主键模型的导入更新
        • 主键模型的 Update 更新
        • 数据更新概述
        • 主键模型的更新并发控制
        • 聚合模型的导入更新
      • 数据删除
        • 删除操作概述
        • Truncate 操作
        • 表原子替换
        • Delete 操作
        • 基于导入的批量删除
        • 临时分区
    • 数据导入1
      • 高并发导入优化(Group Commit)
      • 导入概览
      • 异常数据处理
      • 导入高可用性
      • 导入时实现数据转换
      • 数据源
        • Kafka
        • S3 兼容存储
        • 从其他 TP 系统迁移数据
        • HDFS
        • 从其他 AP 系统迁移数据
        • Flink
        • 本地文件
      • 导入方式
        • Broker Load
        • MySQL Load
        • Insert Into Values
        • Stream Load
        • Insert Into Select
        • Routine Load
      • 文件格式
        • CSV
        • JSON
        • Parquet
        • ORC
      • 复杂数据类型
        • MAP
        • Variant
        • JSON
        • STRUCT
        • Bitmap
        • HLL
        • ARRAY
  • 开发指南
    • 迁移ClickHouse数据
    • Doris集群间数据迁移
    • 数据更新与删除
      • 事务
      • 数据更新
        • 主键模型的导入更新
        • 主键模型的 Update 更新
        • 数据更新概述
        • 主键模型的更新并发控制
        • 聚合模型的导入更新
      • 数据删除
        • 删除操作概述
        • Truncate 操作
        • 表原子替换
        • Delete 操作
        • 基于导入的批量删除
        • 临时分区
    • 查询加速
      • 查询缓存
      • Colocation Join
      • 高并发点查
      • Hint
        • Hint 概述
        • Leading Hint
        • Distribute Hint
      • 物化视图
        • 同步物化视图
        • 物化视图概览
        • 异步物化视图
          • 异步物化视图常见问题
          • 最佳实践
          • 异步物化视图概述
          • 创建、查询与维护异步物化视图
      • 高效去重
        • BITMAP 精准去重
        • HLL 近似去重
      • 优化技术原理
        • TOPN 查询优化
        • 统计信息
        • Pipeline 执行引擎
        • 查询优化器介绍
        • Runtime Filter
      • 查询调优概述
        • 调优概述
        • 诊断工具
        • 分析工具
        • 调优流程
      • 查询优化实践
        • 常见调优参数
        • 计划调优
          • 使用 Hint 控制代价改写
          • 使用异步物化视图透明改写
          • 使用 Leading Hint 控制 Join 顺序
          • 优化表 Schema 设计
          • 使用分区裁剪优化扫表
          • 优化索引设计和使用
          • 使用 Hint 调整 Join Shuffle 方式
          • DML 计划调优
          • 使用 Colocate Group 优化 Join
          • 使用同步物化视图透明改写
          • 使用 SQL Cache 加速查询
        • 执行调优
          • 数据倾斜处理
          • RuntimeFilter 的等待时间调整
          • 并行度调优
    • 数据查询
      • 连接(JOIN)
      • 子查询
      • 复杂类型查询
      • 列转行 (Lateral View)
      • MySQL 兼容性
      • 聚合多维分析
      • 分析函数(窗口函数)
      • 公用表表达式(CTE)
      • 自定义函数
        • 别名函数
        • Java UDF, UDAF, UDTF
    • 数据导出
      • SELECT INTO OUTFILE
      • MySQL Dump
      • 最佳实践
      • 数据导出概述
      • Export
    • 数据导入
      • 高并发导入优化(Group Commit)
      • 异常数据处理
      • 导入高可用性
      • 导入时实现数据转换
      • 导入最佳实践
      • 数据源
        • Kafka
        • Snowflake
        • S3 兼容存储
        • Google Cloud Storage
        • 从其他 TP 系统迁移数据
        • Azure Storage
        • 腾讯云 COS
        • MinIO
        • HDFS
        • 阿里云 OSS
        • 华为云 OBS
        • 从其他 AP 系统迁移数据
        • Flink
        • Redshift
        • Amazon S3
        • 本地文件
        • BigQuery
      • 导入方式
        • Broker Load
        • MySQL Load
        • Insert Into Values
        • Stream Load
        • Insert Into Select
        • Routine Load
      • 文件格式
        • CSV
        • JSON
        • Parquet
        • ORC
      • 复杂数据类型
        • MAP
        • Variant
        • JSON
        • STRUCT
        • Bitmap
        • HLL
        • ARRAY
    • BI工具接入
      • Sugar
      • Navicat
      • Tableau
      • DBeaver
      • 永洪BI
      • FineBI(帆软)
    • 数据库连接
      • 通过 MySQL 协议连接
      • 基于 Arrow Flight SQL 的高速数据传输链路
    • 湖仓一体
      • 分析 S3或HDFS 上的文件
      • 湖仓一体概述
      • SQL 方言兼容
      • 弹性计算节点
      • 云服务认证接入
      • 元数据缓存
      • 外表统计信息
      • 数据缓存
      • 数据库分析
        • MySQL
        • JDBC Catalog
        • Oracle
        • OceanBase
        • SAP HANA
        • 阿里云 MaxCompute
        • ClickHouse
        • PostgreSQL
        • IBM Db2
        • SQL Server
        • Elasticsearch
      • 湖仓一体最佳实践
        • 使用 PALO 和 Paimon
        • 使用 PALO 和 Iceberg
        • 使用 PALO 和 Hudi
        • 使用 PALO 和 LakeSoul
      • 数据湖构建
        • Iceberg
        • Hive
      • 数据湖分析
        • Hudi Catalog
        • 阿里云 DLF
        • Iceberg Catalog
        • Paimon Catalog
        • Hive Catalog
    • 数据表设计
      • 行业混存
      • 数据压缩
      • Schema 变更
      • 数据类型
      • 自增列
      • 概览
      • 数据库建表最佳实践
      • 冷热数据分层
        • SSD 和 HDD 层级存储
        • 远程存储
        • 冷热数据分层概述
      • 表索引
        • 倒排索引
        • 前缀索引与排序键
        • N-Gram 索引
        • BloomFilter 索引
        • 索引概述
      • 数据划分
        • 数据分桶
        • 数据分布概念
        • 动态分区
        • 自动分区
        • 手动分区
        • 常见文档
      • 数据模型
        • 使用注意
        • 模型概述
        • 主键模型
        • 明细模型
        • 聚合模型
  • 版本发布历史
    • 百度数据仓库 Palo 2.0 版本全新发布
  • SQL手册
    • 字面常量
    • 别名
    • SQL-手册
    • 数据类型
    • SQL语句
    • 注释
    • 内置函数
    • 白名单管理
    • SQL操作符
    • 内置函数
      • 聚合函数
      • 位操作函数
      • 字符串函数
      • 条件函数
      • 数学函数
      • JSON解析函数
      • 类型转换函数
      • 格式转换函数
      • 通用函数
      • 时间和日期函数
      • BITMAP函数
      • 窗口函数
      • 哈希函数
      • HLL函数
    • 语法帮助
      • DML
        • INSERT
        • ROUTINE-LOAD
        • RESTORE
        • SELECT-INTO-OUTFILE
        • ALTER-ROUTINE-LOAD
        • BROKER-LOAD
        • BACKUP
        • EXPORT
        • STREAM-LOAD
      • DDL
        • CREATE-FILE
        • DROP-RESOURCE
        • CREATE-RESOURCE
        • CREATE-MATERIALIZED-VIEW
        • DROP-RESROUCE
        • CREATE-TABLE
        • DROP-REPOSITORY
        • CREATE-REPOSITORY
        • CREATE-ODBC-TABLE
      • 信息查看语句
        • SHOW-BACKUP
        • SHOW-ALTER-TABLE-MATERIALIZED-VIEW
        • SHOW-SNAPSHOT
        • SHOW-ROUTINE-LOAD
        • SHOW-CREATE-ROUTINE-LOAD
        • SHOW-ROLES
        • SHOW-GRANTS
        • SHOW-EXPORT
        • SHOW-ROUTINE-LOAD-TASK
        • SHOW-REPOSITORIES
        • SHOW-LOAD
        • SHOW-RESOURCES
        • SHOW-RESTORE
        • SHOW-PROPERTY
        • SHOW-FILE
      • 辅助命令
        • PAUSE-ROUTINE-LOAD
        • STOP-ROUTINE-LOAD
        • ALTER-ROUTINE-LOAD
        • CANCEL-LOAD
        • RESUME-ROUTINE-LOAD
      • 账户管理
        • SET-PROPERTY
        • REVOKE
        • GRANT
        • CREATE-ROLE
        • DROP-ROLE
        • CREATE-USER
        • DROP-USER
        • SET-PASSWORD
  • 快速入门
    • 快速上手
    • 存算分离
    • 存算一体
  • 典型实践
    • 如何开启Debug日志
    • 导入分析
    • 查询分析
  • 操作手册
    • 权限和子用户
    • 存算一体
      • 连接集群
      • 查询分析
      • 监控告警
        • 监控指标
        • 告警配置
      • 备份恢复
        • 通过管理页面备份与恢复
        • 备份与恢复
      • 权限管理
        • 集群权限
        • 控制台权限
      • 集群管理
        • 集群创建
        • 停止与删除
        • 重置管理员密码
        • 集群扩缩容
        • 集群详情
    • 存算分离
      • 连接集群
      • 计算组管理
        • 重启计算组
        • 创建计算组
      • 监控告警
        • 监控指标
        • 告警配置
      • 权限管理
        • 集群权限
        • 控制台权限
      • 集群管理
        • 停止与删除
        • 创建集群
        • 重置管理员密码
        • 集群详情
  • 服务等级协议SLA
    • 服务等级协议(SLA)v1.0
  • 产品概述
    • 系统架构
    • 产品特点
    • 产品介绍
  • 视频专区
    • 操作指南
    • 产品简介
  • 产品定价
    • 预付费
    • 计费说明
    • 后付费
所有文档
menu
没有找到结果,请重新输入

数据仓库 PALO

  • 功能发布记录
  • 操作手册1
    • LDAP认证
    • 时区
    • 使用S3-SDK访问对象存储
    • 权限管理
    • 物化视图
    • 变量
    • 资源管理
    • 数据更新与删除
      • 标记删除
      • Sequence-Column
      • 数据更新
      • 数据删除
    • 备份与恢复
      • 备份与恢复
    • 数据导出1
      • SELECT INTO OUTFILE
      • MySQL Dump
      • 数据导出概述
      • Export
    • 数据导出
      • 全量数据导出
      • 导出查询结果集
      • 导出总览
      • 导出数据到外部表
    • 查询加速1
      • 查询缓存
      • 物化视图
        • 同步物化视图
        • 物化视图概览
        • 异步物化视图
          • 异步物化视图常见问题
          • 最佳实践
          • 异步物化视图概述
          • 创建、查询与维护异步物化视图
    • 数据导入
      • JSON格式数据导入说明
      • 导入本地数据
      • 导入BOS中的数据
      • 导入事务和原子性
      • 通过外部表同步数据
      • 使用JDBC同步数据
      • 列的映射、转换与过滤
      • 订阅Kafka日志
      • 严格模式
      • 导入总览
    • 数据更新与删除1
      • 事务
      • 数据更新
        • 主键模型的导入更新
        • 主键模型的 Update 更新
        • 数据更新概述
        • 主键模型的更新并发控制
        • 聚合模型的导入更新
      • 数据删除
        • 删除操作概述
        • Truncate 操作
        • 表原子替换
        • Delete 操作
        • 基于导入的批量删除
        • 临时分区
    • 数据导入1
      • 高并发导入优化(Group Commit)
      • 导入概览
      • 异常数据处理
      • 导入高可用性
      • 导入时实现数据转换
      • 数据源
        • Kafka
        • S3 兼容存储
        • 从其他 TP 系统迁移数据
        • HDFS
        • 从其他 AP 系统迁移数据
        • Flink
        • 本地文件
      • 导入方式
        • Broker Load
        • MySQL Load
        • Insert Into Values
        • Stream Load
        • Insert Into Select
        • Routine Load
      • 文件格式
        • CSV
        • JSON
        • Parquet
        • ORC
      • 复杂数据类型
        • MAP
        • Variant
        • JSON
        • STRUCT
        • Bitmap
        • HLL
        • ARRAY
  • 开发指南
    • 迁移ClickHouse数据
    • Doris集群间数据迁移
    • 数据更新与删除
      • 事务
      • 数据更新
        • 主键模型的导入更新
        • 主键模型的 Update 更新
        • 数据更新概述
        • 主键模型的更新并发控制
        • 聚合模型的导入更新
      • 数据删除
        • 删除操作概述
        • Truncate 操作
        • 表原子替换
        • Delete 操作
        • 基于导入的批量删除
        • 临时分区
    • 查询加速
      • 查询缓存
      • Colocation Join
      • 高并发点查
      • Hint
        • Hint 概述
        • Leading Hint
        • Distribute Hint
      • 物化视图
        • 同步物化视图
        • 物化视图概览
        • 异步物化视图
          • 异步物化视图常见问题
          • 最佳实践
          • 异步物化视图概述
          • 创建、查询与维护异步物化视图
      • 高效去重
        • BITMAP 精准去重
        • HLL 近似去重
      • 优化技术原理
        • TOPN 查询优化
        • 统计信息
        • Pipeline 执行引擎
        • 查询优化器介绍
        • Runtime Filter
      • 查询调优概述
        • 调优概述
        • 诊断工具
        • 分析工具
        • 调优流程
      • 查询优化实践
        • 常见调优参数
        • 计划调优
          • 使用 Hint 控制代价改写
          • 使用异步物化视图透明改写
          • 使用 Leading Hint 控制 Join 顺序
          • 优化表 Schema 设计
          • 使用分区裁剪优化扫表
          • 优化索引设计和使用
          • 使用 Hint 调整 Join Shuffle 方式
          • DML 计划调优
          • 使用 Colocate Group 优化 Join
          • 使用同步物化视图透明改写
          • 使用 SQL Cache 加速查询
        • 执行调优
          • 数据倾斜处理
          • RuntimeFilter 的等待时间调整
          • 并行度调优
    • 数据查询
      • 连接(JOIN)
      • 子查询
      • 复杂类型查询
      • 列转行 (Lateral View)
      • MySQL 兼容性
      • 聚合多维分析
      • 分析函数(窗口函数)
      • 公用表表达式(CTE)
      • 自定义函数
        • 别名函数
        • Java UDF, UDAF, UDTF
    • 数据导出
      • SELECT INTO OUTFILE
      • MySQL Dump
      • 最佳实践
      • 数据导出概述
      • Export
    • 数据导入
      • 高并发导入优化(Group Commit)
      • 异常数据处理
      • 导入高可用性
      • 导入时实现数据转换
      • 导入最佳实践
      • 数据源
        • Kafka
        • Snowflake
        • S3 兼容存储
        • Google Cloud Storage
        • 从其他 TP 系统迁移数据
        • Azure Storage
        • 腾讯云 COS
        • MinIO
        • HDFS
        • 阿里云 OSS
        • 华为云 OBS
        • 从其他 AP 系统迁移数据
        • Flink
        • Redshift
        • Amazon S3
        • 本地文件
        • BigQuery
      • 导入方式
        • Broker Load
        • MySQL Load
        • Insert Into Values
        • Stream Load
        • Insert Into Select
        • Routine Load
      • 文件格式
        • CSV
        • JSON
        • Parquet
        • ORC
      • 复杂数据类型
        • MAP
        • Variant
        • JSON
        • STRUCT
        • Bitmap
        • HLL
        • ARRAY
    • BI工具接入
      • Sugar
      • Navicat
      • Tableau
      • DBeaver
      • 永洪BI
      • FineBI(帆软)
    • 数据库连接
      • 通过 MySQL 协议连接
      • 基于 Arrow Flight SQL 的高速数据传输链路
    • 湖仓一体
      • 分析 S3或HDFS 上的文件
      • 湖仓一体概述
      • SQL 方言兼容
      • 弹性计算节点
      • 云服务认证接入
      • 元数据缓存
      • 外表统计信息
      • 数据缓存
      • 数据库分析
        • MySQL
        • JDBC Catalog
        • Oracle
        • OceanBase
        • SAP HANA
        • 阿里云 MaxCompute
        • ClickHouse
        • PostgreSQL
        • IBM Db2
        • SQL Server
        • Elasticsearch
      • 湖仓一体最佳实践
        • 使用 PALO 和 Paimon
        • 使用 PALO 和 Iceberg
        • 使用 PALO 和 Hudi
        • 使用 PALO 和 LakeSoul
      • 数据湖构建
        • Iceberg
        • Hive
      • 数据湖分析
        • Hudi Catalog
        • 阿里云 DLF
        • Iceberg Catalog
        • Paimon Catalog
        • Hive Catalog
    • 数据表设计
      • 行业混存
      • 数据压缩
      • Schema 变更
      • 数据类型
      • 自增列
      • 概览
      • 数据库建表最佳实践
      • 冷热数据分层
        • SSD 和 HDD 层级存储
        • 远程存储
        • 冷热数据分层概述
      • 表索引
        • 倒排索引
        • 前缀索引与排序键
        • N-Gram 索引
        • BloomFilter 索引
        • 索引概述
      • 数据划分
        • 数据分桶
        • 数据分布概念
        • 动态分区
        • 自动分区
        • 手动分区
        • 常见文档
      • 数据模型
        • 使用注意
        • 模型概述
        • 主键模型
        • 明细模型
        • 聚合模型
  • 版本发布历史
    • 百度数据仓库 Palo 2.0 版本全新发布
  • SQL手册
    • 字面常量
    • 别名
    • SQL-手册
    • 数据类型
    • SQL语句
    • 注释
    • 内置函数
    • 白名单管理
    • SQL操作符
    • 内置函数
      • 聚合函数
      • 位操作函数
      • 字符串函数
      • 条件函数
      • 数学函数
      • JSON解析函数
      • 类型转换函数
      • 格式转换函数
      • 通用函数
      • 时间和日期函数
      • BITMAP函数
      • 窗口函数
      • 哈希函数
      • HLL函数
    • 语法帮助
      • DML
        • INSERT
        • ROUTINE-LOAD
        • RESTORE
        • SELECT-INTO-OUTFILE
        • ALTER-ROUTINE-LOAD
        • BROKER-LOAD
        • BACKUP
        • EXPORT
        • STREAM-LOAD
      • DDL
        • CREATE-FILE
        • DROP-RESOURCE
        • CREATE-RESOURCE
        • CREATE-MATERIALIZED-VIEW
        • DROP-RESROUCE
        • CREATE-TABLE
        • DROP-REPOSITORY
        • CREATE-REPOSITORY
        • CREATE-ODBC-TABLE
      • 信息查看语句
        • SHOW-BACKUP
        • SHOW-ALTER-TABLE-MATERIALIZED-VIEW
        • SHOW-SNAPSHOT
        • SHOW-ROUTINE-LOAD
        • SHOW-CREATE-ROUTINE-LOAD
        • SHOW-ROLES
        • SHOW-GRANTS
        • SHOW-EXPORT
        • SHOW-ROUTINE-LOAD-TASK
        • SHOW-REPOSITORIES
        • SHOW-LOAD
        • SHOW-RESOURCES
        • SHOW-RESTORE
        • SHOW-PROPERTY
        • SHOW-FILE
      • 辅助命令
        • PAUSE-ROUTINE-LOAD
        • STOP-ROUTINE-LOAD
        • ALTER-ROUTINE-LOAD
        • CANCEL-LOAD
        • RESUME-ROUTINE-LOAD
      • 账户管理
        • SET-PROPERTY
        • REVOKE
        • GRANT
        • CREATE-ROLE
        • DROP-ROLE
        • CREATE-USER
        • DROP-USER
        • SET-PASSWORD
  • 快速入门
    • 快速上手
    • 存算分离
    • 存算一体
  • 典型实践
    • 如何开启Debug日志
    • 导入分析
    • 查询分析
  • 操作手册
    • 权限和子用户
    • 存算一体
      • 连接集群
      • 查询分析
      • 监控告警
        • 监控指标
        • 告警配置
      • 备份恢复
        • 通过管理页面备份与恢复
        • 备份与恢复
      • 权限管理
        • 集群权限
        • 控制台权限
      • 集群管理
        • 集群创建
        • 停止与删除
        • 重置管理员密码
        • 集群扩缩容
        • 集群详情
    • 存算分离
      • 连接集群
      • 计算组管理
        • 重启计算组
        • 创建计算组
      • 监控告警
        • 监控指标
        • 告警配置
      • 权限管理
        • 集群权限
        • 控制台权限
      • 集群管理
        • 停止与删除
        • 创建集群
        • 重置管理员密码
        • 集群详情
  • 服务等级协议SLA
    • 服务等级协议(SLA)v1.0
  • 产品概述
    • 系统架构
    • 产品特点
    • 产品介绍
  • 视频专区
    • 操作指南
    • 产品简介
  • 产品定价
    • 预付费
    • 计费说明
    • 后付费
  • 文档中心
  • arrow
  • 数据仓库PALO
  • arrow
  • 开发指南
  • arrow
  • 查询加速
  • arrow
  • 优化技术原理
  • arrow
  • Pipeline 执行引擎
本页目录
  • 物理计划
  • Pipeline 执行
  • Pipeline
  • PipelineTask
  • Operator
  • Scan 并行化
  • Local Shuffle

Pipeline 执行引擎

更新时间:2025-08-21

PALO 的并行执行模型是一种 Pipeline 执行模型,主要参考了Hyper论文中 Pipeline 的实现方式,Pipeline 执行模型能够充分释放多核 CPU 的计算能力,并对 PALO 的查询线程的数目进行限制,从而解决 PALO 的执行线程膨胀的问题。 PALO 3.0 之后,Pipeline 执行模型彻底替换了原有的火山模型,基于 Pipeline 执行模型,PALO 实现了 Query、DDL、DML 语句的并行处理。

物理计划

为了更好的理解 Pipeline 执行模型,首先需要介绍一下物理查询计划中两个重要的概念:PlanFragment 和 PlanNode。我们使用下面这条 SQL 作为例子:

Plain Text
1SELECT k1, SUM(v1) FROM A,B WHERE A.k2 = B.k2 GROUP BY k1 ORDER BY SUM(v1);

FE 首先会把它翻译成下面这种逻辑计划,计划中每个节点就是一个 PlanNode,每种 Node 的具体含义,可以参考查看物理计划的介绍。

由于 PALO 是一个 MPP 的架构,每个查询都会尽可能的让所有的 BE 都参与进来并行执行,来降低查询的延时。所以还需要将上述逻辑计划拆分为一个物理计划,拆分物理计划基本上就是在逻辑计划中插入了 DataSink 和 ExchangeNode,通过这两个 Node 完成了数据在多个 BE 之间的 Shuffle。拆分完成后,每个 PlanFragment 相当于包含了一部分 PlanNode,可以作为一个独立的任务发送给 BE,每个 BE 完成了 PlanFragment 内包含的 PlanNode 的计算后,通过 DataSink 和 ExchangeNode 这两个算子把数据 shuffle 到其他 BE 上来进行接下来的计算。

所以 PALO 的规划分为 3 层: PLAN:执行计划,一个 SQL 会被执行规划器翻译成一个执行计划,之后执行计划会提供给执行引擎执行。

FRAGMENT:由于 PALO 是一个分布式执行引擎。一个完整的执行计划会被切分为多个单机的执行片段。一个 FRAGMENT 代表一个完整的单机执行片段。多个 FRAGMENT 组合在一起,构成一个完整的 PLAN。

PLAN NODE:算子,是执行计划的最小单位。一个 FRAGMENT 由多个算子构成。每一个算子负责一个实际的执行逻辑,比如聚合,连接等

Pipeline 执行

PlanFragment 是 FE 发往 BE 执行任务的最小单位。BE 可能会收到同一个 Query 的多个不同的 PlanFragment,每个 PlanFragment 都会被单独的处理。在收到 PlanFragment 之后,BE 会把 PlanFragment 拆分为多个 Pipeline,进而启动多个 PipelineTask 来实现并行执行,提升查询效率。

Pipeline

一个 Pipeline 有一个 SourceOperator 和 一个 SinkOperator 以及中间的多个其他 Operator 组成。SourceOperator 代表从外部读取数据,可以是一个表(OlapTable),也可以是一个 Buffer(Exchange)。SinkOperator 表示数据的输出,输出可以是通过网络 shuffle 到别的节点,比如 DataStreamSinkOperator,也可以是输出到 HashTable,比如 Agg 算子,JoinBuildHashTable 等。

多个 Pipeline 之间实际是有依赖关系的,以 JoinNode 为例,实际被拆分到了 2 个 Pipeline 里。其中 Pipeline-0 是读取 Exchange 的数据,来构建 HashTable;Pipeline-1 是从表里读取数据,来进行 Probe。这 2 个 Pipeline 之间是有关联关系的,只有 Pipeline-0 运行完毕之后才能执行 Pipeline-1。这两者之间的依赖关系,称为 Dependency。当 Pipeline-0 运行完毕后,会调用 Dependency 的 set_ready 方法通知 Pipeline-1 可执行。

PipelineTask

Pipeline 实际还是一个逻辑概念,他并不是一个可执行的实体。在有了 Pipeline 之后,需要进一步的把 Pipeline 实例化为多个 PipelineTask。将需要读取的数据分配给不同的 PipelineTask 最终实现并行处理。同一个 Pipeline 的多个 PipelineTask 之间的 Operator 完全相同,他们的区别在于 Operator 的状态不一样,比如读取的数据不一样,构建出的 HashTable 不一样,这些不一样的状态,我们称之为 LocalState。 每个 PipelineTask 最终都会被提交到一个线程池中作为独立的任务执行。在 Dependency 这种触发机制下,可以更好的利用多核 CPU,实现充分的并行。

Operator

在大多数时候,Pipeline 中的每个 Operator 都对应了一个 PlanNode,但是有一些特殊的算子除外:

  • JoinNode,被拆分为 JoinBuildOperator 和 JoinProbeOperator
  • AggNode 被拆分为 AggSinkOperator 和 AggSourceOperator
  • SortNode 被拆分为 SortSinkOperator 和 SortSourceOperator 基本原理是,对于一些 breaking 算子(指需要把所有的数据都收集齐之后才能运算的算子),把灌入数据的部分拆分为 Sink,然后把从这个算子里获取数据的部分称为 Source。

Scan 并行化

扫描数据是一个非常重的 IO 操作,它需要从本地磁盘读取大量的数据(如果是数据湖的场景,就需要从 HDFS 或者 S3 中读取,延时更长),需要比较多的时间。所以我们在 ScanOperator 中引入了并行扫描的技术,ScanOperator 会动态的生成多个 Scanner,每个 Scanner 扫描 100 万 -200 万 行左右的数据,每个 Scanner 在做数据扫描时,完成相应的数据解压、过滤等计算任务,然后把数据发送给一个 DataQueue,供 ScanOperator 读取。

通过并行扫描的技术可以有效的避免由于分桶不合理或者数据倾斜导致某些 ScanOperator 执行时间特别久,把整个查询的延时都拖慢的问题。

Local Shuffle

在 Pipeline 执行模型中,Local Exchange 作为一个 Pipeline Breaker 出现,是在本地将数据重新分发至各个执行任务的技术。它把上游 Pipeline 输出的全部数据以某种方式(HASH / Round Robin)均匀分发到下游 Pipeline 的全部 Task 中。解决执行过程中的数据倾斜的问题,使执行模型不再受数据存储以及 plan 的限制。接下来我们举例来说明 Local Exchange 的工作逻辑。 我们用上述例子中的 Pipeline-1 为例子进一步阐述 Local Exchange 如何可以避免数据倾斜。

首先,通过在 Pipeline 1 中插入 Local Exchange,我们把 Pipeline 1 进一步拆分成 Pipeline 1-0 和 Pipeline 1-1。 此时,我们不妨假设当前并发等于 3(每个 Pipeline 有 3 个 task),每个 task 读取存储层的一个 bucket,而 3 个 bucket 中数据行数分别是 1,1,7。

HashJoin 和 Agg 算子需要处理的数据量从 (1, 1, 7) 变成了 (3, 3, 3),从而避免了数据倾斜。 在 PALO 中,Local Exchange 根据一系列规则来决定是否被规划,例如当查询耗时比较大的 Join、聚合、窗口函数等算子需要被执行时,我们就需要使用 Local Exchange 来尽可能避免数据倾斜。

上一篇
统计信息
下一篇
查询优化器介绍