连接(JOIN)

数据仓库 PALO

  • 功能发布记录
  • 操作手册1
    • LDAP认证
    • 时区
    • 使用S3-SDK访问对象存储
    • 权限管理
    • 物化视图
    • 变量
    • 资源管理
    • 数据更新与删除
      • 标记删除
      • Sequence-Column
      • 数据更新
      • 数据删除
    • 备份与恢复
      • 备份与恢复
    • 数据导出1
      • SELECT INTO OUTFILE
      • MySQL Dump
      • 数据导出概述
      • Export
    • 数据导出
      • 全量数据导出
      • 导出查询结果集
      • 导出总览
      • 导出数据到外部表
    • 查询加速1
      • 查询缓存
      • 物化视图
        • 同步物化视图
        • 物化视图概览
        • 异步物化视图
          • 异步物化视图常见问题
          • 最佳实践
          • 异步物化视图概述
          • 创建、查询与维护异步物化视图
    • 数据导入
      • JSON格式数据导入说明
      • 导入本地数据
      • 导入BOS中的数据
      • 导入事务和原子性
      • 通过外部表同步数据
      • 使用JDBC同步数据
      • 列的映射、转换与过滤
      • 订阅Kafka日志
      • 严格模式
      • 导入总览
    • 数据更新与删除1
      • 事务
      • 数据更新
        • 主键模型的导入更新
        • 主键模型的 Update 更新
        • 数据更新概述
        • 主键模型的更新并发控制
        • 聚合模型的导入更新
      • 数据删除
        • 删除操作概述
        • Truncate 操作
        • 表原子替换
        • Delete 操作
        • 基于导入的批量删除
        • 临时分区
    • 数据导入1
      • 高并发导入优化(Group Commit)
      • 导入概览
      • 异常数据处理
      • 导入高可用性
      • 导入时实现数据转换
      • 数据源
        • Kafka
        • S3 兼容存储
        • 从其他 TP 系统迁移数据
        • HDFS
        • 从其他 AP 系统迁移数据
        • Flink
        • 本地文件
      • 导入方式
        • Broker Load
        • MySQL Load
        • Insert Into Values
        • Stream Load
        • Insert Into Select
        • Routine Load
      • 文件格式
        • CSV
        • JSON
        • Parquet
        • ORC
      • 复杂数据类型
        • MAP
        • Variant
        • JSON
        • STRUCT
        • Bitmap
        • HLL
        • ARRAY
  • 开发指南
    • 迁移ClickHouse数据
    • Doris集群间数据迁移
    • 数据更新与删除
      • 事务
      • 数据更新
        • 主键模型的导入更新
        • 主键模型的 Update 更新
        • 数据更新概述
        • 主键模型的更新并发控制
        • 聚合模型的导入更新
      • 数据删除
        • 删除操作概述
        • Truncate 操作
        • 表原子替换
        • Delete 操作
        • 基于导入的批量删除
        • 临时分区
    • 查询加速
      • 查询缓存
      • Colocation Join
      • 高并发点查
      • Hint
        • Hint 概述
        • Leading Hint
        • Distribute Hint
      • 物化视图
        • 同步物化视图
        • 物化视图概览
        • 异步物化视图
          • 异步物化视图常见问题
          • 最佳实践
          • 异步物化视图概述
          • 创建、查询与维护异步物化视图
      • 高效去重
        • BITMAP 精准去重
        • HLL 近似去重
      • 优化技术原理
        • TOPN 查询优化
        • 统计信息
        • Pipeline 执行引擎
        • 查询优化器介绍
        • Runtime Filter
      • 查询调优概述
        • 调优概述
        • 诊断工具
        • 分析工具
        • 调优流程
      • 查询优化实践
        • 常见调优参数
        • 计划调优
          • 使用 Hint 控制代价改写
          • 使用异步物化视图透明改写
          • 使用 Leading Hint 控制 Join 顺序
          • 优化表 Schema 设计
          • 使用分区裁剪优化扫表
          • 优化索引设计和使用
          • 使用 Hint 调整 Join Shuffle 方式
          • DML 计划调优
          • 使用 Colocate Group 优化 Join
          • 使用同步物化视图透明改写
          • 使用 SQL Cache 加速查询
        • 执行调优
          • 数据倾斜处理
          • RuntimeFilter 的等待时间调整
          • 并行度调优
    • 数据查询
      • 连接(JOIN)
      • 子查询
      • 复杂类型查询
      • 列转行 (Lateral View)
      • MySQL 兼容性
      • 聚合多维分析
      • 分析函数(窗口函数)
      • 公用表表达式(CTE)
      • 自定义函数
        • 别名函数
        • Java UDF, UDAF, UDTF
    • 数据导出
      • SELECT INTO OUTFILE
      • MySQL Dump
      • 最佳实践
      • 数据导出概述
      • Export
    • 数据导入
      • 高并发导入优化(Group Commit)
      • 异常数据处理
      • 导入高可用性
      • 导入时实现数据转换
      • 导入最佳实践
      • 数据源
        • Kafka
        • Snowflake
        • S3 兼容存储
        • Google Cloud Storage
        • 从其他 TP 系统迁移数据
        • Azure Storage
        • 腾讯云 COS
        • MinIO
        • HDFS
        • 阿里云 OSS
        • 华为云 OBS
        • 从其他 AP 系统迁移数据
        • Flink
        • Redshift
        • Amazon S3
        • 本地文件
        • BigQuery
      • 导入方式
        • Broker Load
        • MySQL Load
        • Insert Into Values
        • Stream Load
        • Insert Into Select
        • Routine Load
      • 文件格式
        • CSV
        • JSON
        • Parquet
        • ORC
      • 复杂数据类型
        • MAP
        • Variant
        • JSON
        • STRUCT
        • Bitmap
        • HLL
        • ARRAY
    • BI工具接入
      • Sugar
      • Navicat
      • Tableau
      • DBeaver
      • 永洪BI
      • FineBI(帆软)
    • 数据库连接
      • 通过 MySQL 协议连接
      • 基于 Arrow Flight SQL 的高速数据传输链路
    • 湖仓一体
      • 分析 S3或HDFS 上的文件
      • 湖仓一体概述
      • SQL 方言兼容
      • 弹性计算节点
      • 云服务认证接入
      • 元数据缓存
      • 外表统计信息
      • 数据缓存
      • 数据库分析
        • MySQL
        • JDBC Catalog
        • Oracle
        • OceanBase
        • SAP HANA
        • 阿里云 MaxCompute
        • ClickHouse
        • PostgreSQL
        • IBM Db2
        • SQL Server
        • Elasticsearch
      • 湖仓一体最佳实践
        • 使用 PALO 和 Paimon
        • 使用 PALO 和 Iceberg
        • 使用 PALO 和 Hudi
        • 使用 PALO 和 LakeSoul
      • 数据湖构建
        • Iceberg
        • Hive
      • 数据湖分析
        • Hudi Catalog
        • 阿里云 DLF
        • Iceberg Catalog
        • Paimon Catalog
        • Hive Catalog
    • 数据表设计
      • 行业混存
      • 数据压缩
      • Schema 变更
      • 数据类型
      • 自增列
      • 概览
      • 数据库建表最佳实践
      • 冷热数据分层
        • SSD 和 HDD 层级存储
        • 远程存储
        • 冷热数据分层概述
      • 表索引
        • 倒排索引
        • 前缀索引与排序键
        • N-Gram 索引
        • BloomFilter 索引
        • 索引概述
      • 数据划分
        • 数据分桶
        • 数据分布概念
        • 动态分区
        • 自动分区
        • 手动分区
        • 常见文档
      • 数据模型
        • 使用注意
        • 模型概述
        • 主键模型
        • 明细模型
        • 聚合模型
  • 版本发布历史
    • 百度数据仓库 Palo 2.0 版本全新发布
  • SQL手册
    • 字面常量
    • 别名
    • SQL-手册
    • 数据类型
    • SQL语句
    • 注释
    • 内置函数
    • 白名单管理
    • SQL操作符
    • 内置函数
      • 聚合函数
      • 位操作函数
      • 字符串函数
      • 条件函数
      • 数学函数
      • JSON解析函数
      • 类型转换函数
      • 格式转换函数
      • 通用函数
      • 时间和日期函数
      • BITMAP函数
      • 窗口函数
      • 哈希函数
      • HLL函数
    • 语法帮助
      • DML
        • INSERT
        • ROUTINE-LOAD
        • RESTORE
        • SELECT-INTO-OUTFILE
        • ALTER-ROUTINE-LOAD
        • BROKER-LOAD
        • BACKUP
        • EXPORT
        • STREAM-LOAD
      • DDL
        • CREATE-FILE
        • DROP-RESOURCE
        • CREATE-RESOURCE
        • CREATE-MATERIALIZED-VIEW
        • DROP-RESROUCE
        • CREATE-TABLE
        • DROP-REPOSITORY
        • CREATE-REPOSITORY
        • CREATE-ODBC-TABLE
      • 信息查看语句
        • SHOW-BACKUP
        • SHOW-ALTER-TABLE-MATERIALIZED-VIEW
        • SHOW-SNAPSHOT
        • SHOW-ROUTINE-LOAD
        • SHOW-CREATE-ROUTINE-LOAD
        • SHOW-ROLES
        • SHOW-GRANTS
        • SHOW-EXPORT
        • SHOW-ROUTINE-LOAD-TASK
        • SHOW-REPOSITORIES
        • SHOW-LOAD
        • SHOW-RESOURCES
        • SHOW-RESTORE
        • SHOW-PROPERTY
        • SHOW-FILE
      • 辅助命令
        • PAUSE-ROUTINE-LOAD
        • STOP-ROUTINE-LOAD
        • ALTER-ROUTINE-LOAD
        • CANCEL-LOAD
        • RESUME-ROUTINE-LOAD
      • 账户管理
        • SET-PROPERTY
        • REVOKE
        • GRANT
        • CREATE-ROLE
        • DROP-ROLE
        • CREATE-USER
        • DROP-USER
        • SET-PASSWORD
  • 快速入门
    • 快速上手
    • 存算分离
    • 存算一体
  • 典型实践
    • 如何开启Debug日志
    • 导入分析
    • 查询分析
  • 操作手册
    • 权限和子用户
    • 存算一体
      • 连接集群
      • 查询分析
      • 监控告警
        • 监控指标
        • 告警配置
      • 备份恢复
        • 通过管理页面备份与恢复
        • 备份与恢复
      • 权限管理
        • 集群权限
        • 控制台权限
      • 集群管理
        • 集群创建
        • 停止与删除
        • 重置管理员密码
        • 集群扩缩容
        • 集群详情
    • 存算分离
      • 连接集群
      • 计算组管理
        • 重启计算组
        • 创建计算组
      • 监控告警
        • 监控指标
        • 告警配置
      • 权限管理
        • 集群权限
        • 控制台权限
      • 集群管理
        • 停止与删除
        • 创建集群
        • 重置管理员密码
        • 集群详情
  • 服务等级协议SLA
    • 服务等级协议(SLA)v1.0
  • 产品概述
    • 系统架构
    • 产品特点
    • 产品介绍
  • 视频专区
    • 操作指南
    • 产品简介
  • 产品定价
    • 预付费
    • 计费说明
    • 后付费
所有文档
menu
没有找到结果,请重新输入

数据仓库 PALO

  • 功能发布记录
  • 操作手册1
    • LDAP认证
    • 时区
    • 使用S3-SDK访问对象存储
    • 权限管理
    • 物化视图
    • 变量
    • 资源管理
    • 数据更新与删除
      • 标记删除
      • Sequence-Column
      • 数据更新
      • 数据删除
    • 备份与恢复
      • 备份与恢复
    • 数据导出1
      • SELECT INTO OUTFILE
      • MySQL Dump
      • 数据导出概述
      • Export
    • 数据导出
      • 全量数据导出
      • 导出查询结果集
      • 导出总览
      • 导出数据到外部表
    • 查询加速1
      • 查询缓存
      • 物化视图
        • 同步物化视图
        • 物化视图概览
        • 异步物化视图
          • 异步物化视图常见问题
          • 最佳实践
          • 异步物化视图概述
          • 创建、查询与维护异步物化视图
    • 数据导入
      • JSON格式数据导入说明
      • 导入本地数据
      • 导入BOS中的数据
      • 导入事务和原子性
      • 通过外部表同步数据
      • 使用JDBC同步数据
      • 列的映射、转换与过滤
      • 订阅Kafka日志
      • 严格模式
      • 导入总览
    • 数据更新与删除1
      • 事务
      • 数据更新
        • 主键模型的导入更新
        • 主键模型的 Update 更新
        • 数据更新概述
        • 主键模型的更新并发控制
        • 聚合模型的导入更新
      • 数据删除
        • 删除操作概述
        • Truncate 操作
        • 表原子替换
        • Delete 操作
        • 基于导入的批量删除
        • 临时分区
    • 数据导入1
      • 高并发导入优化(Group Commit)
      • 导入概览
      • 异常数据处理
      • 导入高可用性
      • 导入时实现数据转换
      • 数据源
        • Kafka
        • S3 兼容存储
        • 从其他 TP 系统迁移数据
        • HDFS
        • 从其他 AP 系统迁移数据
        • Flink
        • 本地文件
      • 导入方式
        • Broker Load
        • MySQL Load
        • Insert Into Values
        • Stream Load
        • Insert Into Select
        • Routine Load
      • 文件格式
        • CSV
        • JSON
        • Parquet
        • ORC
      • 复杂数据类型
        • MAP
        • Variant
        • JSON
        • STRUCT
        • Bitmap
        • HLL
        • ARRAY
  • 开发指南
    • 迁移ClickHouse数据
    • Doris集群间数据迁移
    • 数据更新与删除
      • 事务
      • 数据更新
        • 主键模型的导入更新
        • 主键模型的 Update 更新
        • 数据更新概述
        • 主键模型的更新并发控制
        • 聚合模型的导入更新
      • 数据删除
        • 删除操作概述
        • Truncate 操作
        • 表原子替换
        • Delete 操作
        • 基于导入的批量删除
        • 临时分区
    • 查询加速
      • 查询缓存
      • Colocation Join
      • 高并发点查
      • Hint
        • Hint 概述
        • Leading Hint
        • Distribute Hint
      • 物化视图
        • 同步物化视图
        • 物化视图概览
        • 异步物化视图
          • 异步物化视图常见问题
          • 最佳实践
          • 异步物化视图概述
          • 创建、查询与维护异步物化视图
      • 高效去重
        • BITMAP 精准去重
        • HLL 近似去重
      • 优化技术原理
        • TOPN 查询优化
        • 统计信息
        • Pipeline 执行引擎
        • 查询优化器介绍
        • Runtime Filter
      • 查询调优概述
        • 调优概述
        • 诊断工具
        • 分析工具
        • 调优流程
      • 查询优化实践
        • 常见调优参数
        • 计划调优
          • 使用 Hint 控制代价改写
          • 使用异步物化视图透明改写
          • 使用 Leading Hint 控制 Join 顺序
          • 优化表 Schema 设计
          • 使用分区裁剪优化扫表
          • 优化索引设计和使用
          • 使用 Hint 调整 Join Shuffle 方式
          • DML 计划调优
          • 使用 Colocate Group 优化 Join
          • 使用同步物化视图透明改写
          • 使用 SQL Cache 加速查询
        • 执行调优
          • 数据倾斜处理
          • RuntimeFilter 的等待时间调整
          • 并行度调优
    • 数据查询
      • 连接(JOIN)
      • 子查询
      • 复杂类型查询
      • 列转行 (Lateral View)
      • MySQL 兼容性
      • 聚合多维分析
      • 分析函数(窗口函数)
      • 公用表表达式(CTE)
      • 自定义函数
        • 别名函数
        • Java UDF, UDAF, UDTF
    • 数据导出
      • SELECT INTO OUTFILE
      • MySQL Dump
      • 最佳实践
      • 数据导出概述
      • Export
    • 数据导入
      • 高并发导入优化(Group Commit)
      • 异常数据处理
      • 导入高可用性
      • 导入时实现数据转换
      • 导入最佳实践
      • 数据源
        • Kafka
        • Snowflake
        • S3 兼容存储
        • Google Cloud Storage
        • 从其他 TP 系统迁移数据
        • Azure Storage
        • 腾讯云 COS
        • MinIO
        • HDFS
        • 阿里云 OSS
        • 华为云 OBS
        • 从其他 AP 系统迁移数据
        • Flink
        • Redshift
        • Amazon S3
        • 本地文件
        • BigQuery
      • 导入方式
        • Broker Load
        • MySQL Load
        • Insert Into Values
        • Stream Load
        • Insert Into Select
        • Routine Load
      • 文件格式
        • CSV
        • JSON
        • Parquet
        • ORC
      • 复杂数据类型
        • MAP
        • Variant
        • JSON
        • STRUCT
        • Bitmap
        • HLL
        • ARRAY
    • BI工具接入
      • Sugar
      • Navicat
      • Tableau
      • DBeaver
      • 永洪BI
      • FineBI(帆软)
    • 数据库连接
      • 通过 MySQL 协议连接
      • 基于 Arrow Flight SQL 的高速数据传输链路
    • 湖仓一体
      • 分析 S3或HDFS 上的文件
      • 湖仓一体概述
      • SQL 方言兼容
      • 弹性计算节点
      • 云服务认证接入
      • 元数据缓存
      • 外表统计信息
      • 数据缓存
      • 数据库分析
        • MySQL
        • JDBC Catalog
        • Oracle
        • OceanBase
        • SAP HANA
        • 阿里云 MaxCompute
        • ClickHouse
        • PostgreSQL
        • IBM Db2
        • SQL Server
        • Elasticsearch
      • 湖仓一体最佳实践
        • 使用 PALO 和 Paimon
        • 使用 PALO 和 Iceberg
        • 使用 PALO 和 Hudi
        • 使用 PALO 和 LakeSoul
      • 数据湖构建
        • Iceberg
        • Hive
      • 数据湖分析
        • Hudi Catalog
        • 阿里云 DLF
        • Iceberg Catalog
        • Paimon Catalog
        • Hive Catalog
    • 数据表设计
      • 行业混存
      • 数据压缩
      • Schema 变更
      • 数据类型
      • 自增列
      • 概览
      • 数据库建表最佳实践
      • 冷热数据分层
        • SSD 和 HDD 层级存储
        • 远程存储
        • 冷热数据分层概述
      • 表索引
        • 倒排索引
        • 前缀索引与排序键
        • N-Gram 索引
        • BloomFilter 索引
        • 索引概述
      • 数据划分
        • 数据分桶
        • 数据分布概念
        • 动态分区
        • 自动分区
        • 手动分区
        • 常见文档
      • 数据模型
        • 使用注意
        • 模型概述
        • 主键模型
        • 明细模型
        • 聚合模型
  • 版本发布历史
    • 百度数据仓库 Palo 2.0 版本全新发布
  • SQL手册
    • 字面常量
    • 别名
    • SQL-手册
    • 数据类型
    • SQL语句
    • 注释
    • 内置函数
    • 白名单管理
    • SQL操作符
    • 内置函数
      • 聚合函数
      • 位操作函数
      • 字符串函数
      • 条件函数
      • 数学函数
      • JSON解析函数
      • 类型转换函数
      • 格式转换函数
      • 通用函数
      • 时间和日期函数
      • BITMAP函数
      • 窗口函数
      • 哈希函数
      • HLL函数
    • 语法帮助
      • DML
        • INSERT
        • ROUTINE-LOAD
        • RESTORE
        • SELECT-INTO-OUTFILE
        • ALTER-ROUTINE-LOAD
        • BROKER-LOAD
        • BACKUP
        • EXPORT
        • STREAM-LOAD
      • DDL
        • CREATE-FILE
        • DROP-RESOURCE
        • CREATE-RESOURCE
        • CREATE-MATERIALIZED-VIEW
        • DROP-RESROUCE
        • CREATE-TABLE
        • DROP-REPOSITORY
        • CREATE-REPOSITORY
        • CREATE-ODBC-TABLE
      • 信息查看语句
        • SHOW-BACKUP
        • SHOW-ALTER-TABLE-MATERIALIZED-VIEW
        • SHOW-SNAPSHOT
        • SHOW-ROUTINE-LOAD
        • SHOW-CREATE-ROUTINE-LOAD
        • SHOW-ROLES
        • SHOW-GRANTS
        • SHOW-EXPORT
        • SHOW-ROUTINE-LOAD-TASK
        • SHOW-REPOSITORIES
        • SHOW-LOAD
        • SHOW-RESOURCES
        • SHOW-RESTORE
        • SHOW-PROPERTY
        • SHOW-FILE
      • 辅助命令
        • PAUSE-ROUTINE-LOAD
        • STOP-ROUTINE-LOAD
        • ALTER-ROUTINE-LOAD
        • CANCEL-LOAD
        • RESUME-ROUTINE-LOAD
      • 账户管理
        • SET-PROPERTY
        • REVOKE
        • GRANT
        • CREATE-ROLE
        • DROP-ROLE
        • CREATE-USER
        • DROP-USER
        • SET-PASSWORD
  • 快速入门
    • 快速上手
    • 存算分离
    • 存算一体
  • 典型实践
    • 如何开启Debug日志
    • 导入分析
    • 查询分析
  • 操作手册
    • 权限和子用户
    • 存算一体
      • 连接集群
      • 查询分析
      • 监控告警
        • 监控指标
        • 告警配置
      • 备份恢复
        • 通过管理页面备份与恢复
        • 备份与恢复
      • 权限管理
        • 集群权限
        • 控制台权限
      • 集群管理
        • 集群创建
        • 停止与删除
        • 重置管理员密码
        • 集群扩缩容
        • 集群详情
    • 存算分离
      • 连接集群
      • 计算组管理
        • 重启计算组
        • 创建计算组
      • 监控告警
        • 监控指标
        • 告警配置
      • 权限管理
        • 集群权限
        • 控制台权限
      • 集群管理
        • 停止与删除
        • 创建集群
        • 重置管理员密码
        • 集群详情
  • 服务等级协议SLA
    • 服务等级协议(SLA)v1.0
  • 产品概述
    • 系统架构
    • 产品特点
    • 产品介绍
  • 视频专区
    • 操作指南
    • 产品简介
  • 产品定价
    • 预付费
    • 计费说明
    • 后付费
  • 文档中心
  • arrow
  • 数据仓库PALO
  • arrow
  • 开发指南
  • arrow
  • 数据查询
  • arrow
  • 连接(JOIN)
本页目录
  • 什么是 JOIN
  • PALO 支持的 JOIN 类型
  • PALO 中的 JOIN 物理实现
  • PALO Hash Join 的实现方式
  • Broadcast Join
  • Partition Shuffle Join
  • Bucket Shuffle Join
  • Colocate Join
  • 对比 Bucket Shuffle Join 与 Colocate Join
  • Bucket Shuffle Join 示例
  • Colocate Join 示例
  • 四种 Shuffle 方式对比
  • 常见问题
  • Bucket Shuffle Join 的限制
  • Colocate Join 的限制

连接(JOIN)

更新时间:2025-08-21

什么是 JOIN

在关系型数据库中,数据被分布在多个表中,这些表之间通过特定关系相互关联。SQL JOIN 操作允许我们根据这些关联条件将不同的表合并成一个更完整的结果集。

PALO 支持的 JOIN 类型

  • INNER JOIN(内连接):对左表每一行和右表所有行进行 JOIN 条件比较,返回两个表中满足 JOIN 条件的匹配行。
  • LEFT JOIN(左连接):在 INNER JOIN 的结果集基础上。如果左表的行在右表中没有匹配,则返回左表的所有行,同时右表对应的列显示为 NULL。
  • RIGHT JOIN(右连接):与 LEFT JOIN 相反,如果右表的行在左表中没有匹配,则返回右表的所有行,同时左表对应的列显示为 NULL。
  • FULL JOIN(全连接):在 INNER JOIN 的结果集基础上。返回两个表中所有的行,如果某行在另一侧表中没有匹配,则另一侧表的相应列显示为 NULL。
  • CROSS JOIN(交叉连接):没有 JOIN 条件,返回两个表的笛卡尔积,即左表的每一行与右表的每一行都进行组合。
  • LEFT SEMI JOIN(左半连接):对左表每一行和右表所有行进行 JOIN 条件比较,如果存在匹配,就返回左表的对应行。
  • RIGHT SEMI JOIN(右半连接):与 LEFT SEMI JOIN 相反,对右表每一行和左表所有行进行 JOIN 条件比较,如果存在匹配,就返回右表的对应行。
  • LEFT ANTI JOIN(左反半连接):对左表每一行和右表所有行进行 JOIN 条件比较,如果没有匹配,则返回左表的对应行。
  • RIGHT ANTI JOIN(右反半连接):与 LEFT ANTI JOIN 相反,对右表每一行和左表所有行进行 JOIN 条件比较,如果没有匹配,则返回这些行。
  • NULL AWARE LEFT ANTI JOIN (对 NULL 值特殊处理的左反半连接):与 LEFT ANTI JOIN 类似,但忽略左表中匹配列为 NULL 的行。

PALO 中的 JOIN 物理实现

PALO 支持两种 JOIN 的物理实现方式:Hash Join 和 Nest Loop Join。

  • Hash Join: 在右表上根据等值 JOIN 列构建一个哈希表,左表的数据以流式方式通过该哈希表进行 JOIN 计算。这种方法的局限性在于它仅适用于等值 JOIN 条件的情况。
  • Nest Loop Join: 通过两层循环,以左表驱动,对左表的每一行逐一遍历右表的每一行,进行 join 条件判断。适用于所有 JOIN 场景,包括处理 Hash Join 无法胜任的情况,比如涉及大于或小于比较条件的查询,或是需要执行笛卡尔积运算的场景。但相比 Hash Join,Nest Loop Join 在性能上可能会有所不及。

PALO Hash Join 的实现方式

作为分布式 MPP 数据库,PALO 在 Hash Join 过程中需要进行数据的 Shuffle,进行拆分调度,以确保 JOIN 结果的正确性。以下是几种数据 Shuffle 方式:

Broadcast Join

如图所示,Broadcast Join 的过程涉及将右表的所有数据发送到所有参与 Join 计算的节点,包括左表数据的扫描节点,而左表数据则保持不动。这一过程中,每个节点都会接收到右表的完整数据副本(总量为 T(R) 的数据),以确保所有节点都具备执行 Join 操作所需的数据。

该方法适用于多种通用场景,但不适用于 RIGHT OUTER, RIGHT ANTI, 和 RIGHT SEMI 类型的 Hash Join。其网络开销为 Join 的节点数 N 乘以右表数据量 T(R)。

Partition Shuffle Join

此方式通过 JOIN 条件计算 Hash 值并进行分桶。具体来说,左右表的数据会根据 JOIN 条件计算得到的 Hash 值进行分区,然后这些分区数据被发送到相应的分区节点上。

该方法的网络开销主要包括两个部分:传输左表数据 T(S) 所需的开销和传输右表数据 T(R) 所需的开销。该方法的仅支持 Hash Join 操作,因为它依赖于 JOIN 条件来执行数据的分桶操作。

Bucket Shuffle Join

当 JOIN 条件包含左表的分桶列时,保持左表数据不动,将右表数据分发到左表节点进行 JOIN,减少网络开销。

当参与 Join 操作的某一侧表的数据已经按照 Join 条件列进行了 Hash 分布时,我们可以选择保持这一侧的数据位置不变,而将另一侧的数据依据相同的 Join 条件列,相同的 Hash 分布计算进行数据分发。(这里提到的“表”不仅限于物理存储的表,还可以是 SQL 查询中任意算子的输出结果,并且可以灵活选择保持左表或右表的数据位置不变,而只移动并分发另一侧的表。)

以 PALO 的物理表为例,由于其表数据本身就是通过 Hash 计算进行分桶存储,因此可以直接利用这一特性来优化 Join 操作的数据 Shuffle 过程。假设我们有两张表需要进行 Join,且 Join 列是左表的分桶列,那么在这种情况下,我们无需移动左表的数据,只需根据左表的分桶信息将右表的数据分发到相应的位置,即可完成 Join 计算(如图所示)。

此过程的网络开销主要来自于右表数据的移动,即 T(R)。

Colocate Join

与 Bucket Shuffle Join 相似,如果参与 Join 的两侧的表,刚好是按照 Join 条件列进行计算的 Hash 分布,那么可以跳过 Shuffle 过程,直接在本地进行 Join 计算。以下通过物理表进行简单说明:

当 PALO 在建表时指定为 DISTRIBUTED BY HASH,那么在数据导入时,系统会根据 Hash 分布键进行数据分发。如果两张表的 Hash 分布键恰好与 Join 条件列一致,那么可以认为这两张表的数据已经按照 Join 的需求进行了预分布,即无需额外的 Shuffle 操作。因此,在实际查询时,可以直接在这两张表上执行 Join 计算。

注意: 对于直接 Scan 数据后执行 Join 的场景,建表时需要满足一定的条件。

对比 Bucket Shuffle Join 与 Colocate Join

上文我们提到过,对于 Bucket Shuffle Join 和 Colocate Join 只要参与 Join 操作的两侧的表分布满足特定条件,就可以执行相应的 join 操作(这里的表指的是更广义的表,即 SQL 查询中任意算子的输出都可以视为一张“表”)。

接下来,我们将分别通过 t1 和 t2 两张表以及相关的 SQL 示例,来更详细地介绍广义上的 Bucket Shuffle Join 和 Colocate Join。首先,给出这两张表的建表语句如下:

SQL
1create table t1
2(
3    c1 bigint, 
4    c2 bigint
5)
6DISTRIBUTED BY HASH(c1) BUCKETS 3
7PROPERTIES ("replication_num" = "1");
8
9create table t2
10(
11    c1 bigint, 
12    c2 bigint
13)
14DISTRIBUTED BY HASH(c1) BUCKETS 3
15PROPERTIES ("replication_num" = "1");

Bucket Shuffle Join 示例

在下面的例子中,t1 和 t2 表都经过了 GROUP BY 算子处理,并输出了新的表(此时 tx 表按照 c1 进行 hash 分布,而 ty 表则按照 c2 进行 Hash 分布)。随后的 JOIN 条件是 tx.c1 = ty.c2,这恰好满足了 Bucket Shuffle Join 的条件。

SQL
1explain select *
2from 
3    (
4        -- t1 表按照 c1 做了 hash 分布,经过 group by 算子后,仍然保持按照 c1 做的 hash 分布
5        select c1 as c1, sum(c2) as c2
6        from t1
7        group by c1 
8    ) tx
9join 
10    (
11        -- t2 表按照 c1 做了 hash 分布,经过 group by 算子后,数据分布变成按照 c2 进行的 hash 分布
12        select c2 as c2, sum(c1) as c1
13        from t2
14        group by c2 
15    ) ty
16on tx.c1 = ty.c2;

从下面的 Explain 执行计划中,我们可以观察到,7 号 Hash Join 节点的左侧子节点是 6 号聚合节点,而右侧子节点是 4 号 Exchange 节点。这表示左侧子节点聚合后的数据位置保持不变,而右侧子节点的数据则会根据 Bucket Shuffle 的方式被分发到左侧子节点所在的节点上,以便进行后续的 Hash Join 操作。

SQL
1+------------------------------------------------------------+
2| Explain String(Nereids Planner)                            |
3+------------------------------------------------------------+
4| PLAN FRAGMENT 0                                            |
5|   OUTPUT EXPRS:                                            |
6|     c1[#18]                                                |
7|     c2[#19]                                                |
8|     c2[#20]                                                |
9|     c1[#21]                                                |
10|   PARTITION: HASH_PARTITIONED: c1[#8]                      |
11|                                                            |
12|   HAS_COLO_PLAN_NODE: true                                 |
13|                                                            |
14|   VRESULT SINK                                             |
15|      MYSQL_PROTOCAL                                        |
16|                                                            |
17|   7:VHASH JOIN(364)                                        |
18|   |  join op: INNER JOIN(BUCKET_SHUFFLE)[]                 |
19|   |  equal join conjunct: (c1[#12] = c2[#6])               |
20|   |  cardinality=10                                        |
21|   |  vec output tuple id: 8                                |
22|   |  output tuple id: 8                                    |
23|   |  vIntermediate tuple ids: 7                            |
24|   |  hash output slot ids: 6 7 12 13                       |
25|   |  final projections: c1[#14], c2[#15], c2[#16], c1[#17] |
26|   |  final project output tuple id: 8                      |
27|   |  distribute expr lists: c1[#12]                        |
28|   |  distribute expr lists: c2[#6]                         |
29|   |                                                        |
30|   |----4:VEXCHANGE                                         |
31|   |       offset: 0                                        |
32|   |       distribute expr lists: c2[#6]                    |
33|   |                                                        |
34|   6:VAGGREGATE (update finalize)(342)                      |
35|   |  output: sum(c2[#9])[#11]                              |
36|   |  group by: c1[#8]                                      |
37|   |  sortByGroupKey:false                                  |
38|   |  cardinality=10                                        |
39|   |  final projections: c1[#10], c2[#11]                   |
40|   |  final project output tuple id: 6                      |
41|   |  distribute expr lists: c1[#8]                         |
42|   |                                                        |
43|   5:VOlapScanNode(339)                                     |
44|      TABLE: tt.t1(t1), PREAGGREGATION: ON                  |
45|      partitions=1/1 (t1)                                   |
46|      tablets=1/1, tabletList=491188                        |
47|      cardinality=21, avgRowSize=0.0, numNodes=1            |
48|      pushAggOp=NONE                                        |
49|                                                            |
50| PLAN FRAGMENT 1                                            |
51|                                                            |
52|   PARTITION: HASH_PARTITIONED: c2[#2]                      |
53|                                                            |
54|   HAS_COLO_PLAN_NODE: true                                 |
55|                                                            |
56|   STREAM DATA SINK                                         |
57|     EXCHANGE ID: 04                                        |
58|     BUCKET_SHFFULE_HASH_PARTITIONED: c2[#6]                |
59|                                                            |
60|   3:VAGGREGATE (merge finalize)(355)                       |
61|   |  output: sum(partial_sum(c1)[#3])[#5]                  |
62|   |  group by: c2[#2]                                      |
63|   |  sortByGroupKey:false                                  |
64|   |  cardinality=5                                         |
65|   |  final projections: c2[#4], c1[#5]                     |
66|   |  final project output tuple id: 3                      |
67|   |  distribute expr lists: c2[#2]                         |
68|   |                                                        |
69|   2:VEXCHANGE                                              |
70|      offset: 0                                             |
71|      distribute expr lists:                                |
72|                                                            |
73| PLAN FRAGMENT 2                                            |
74|                                                            |
75|   PARTITION: HASH_PARTITIONED: c1[#0]                      |
76|                                                            |
77|   HAS_COLO_PLAN_NODE: false                                |
78|                                                            |
79|   STREAM DATA SINK                                         |
80|     EXCHANGE ID: 02                                        |
81|     HASH_PARTITIONED: c2[#2]                               |
82|                                                            |
83|   1:VAGGREGATE (update serialize)(349)                     |
84|   |  STREAMING                                             |
85|   |  output: partial_sum(c1[#0])[#3]                       |
86|   |  group by: c2[#1]                                      |
87|   |  sortByGroupKey:false                                  |
88|   |  cardinality=5                                         |
89|   |  distribute expr lists: c1[#0]                         |
90|   |                                                        |
91|   0:VOlapScanNode(346)                                     |
92|      TABLE: tt.t2(t2), PREAGGREGATION: ON                  |
93|      partitions=1/1 (t2)                                   |
94|      tablets=1/1, tabletList=491198                        |
95|      cardinality=10, avgRowSize=0.0, numNodes=1            |
96|      pushAggOp=NONE                                        |
97|                                                            |
98|                                                            |
99| Statistics                                                 |
100|  planed with unknown column statistics                     |
101+------------------------------------------------------------+
10297 rows in set (0.01 sec)

Colocate Join 示例

在下面的例子中,t1 和 t2 表都通过 GROUP BY 算子进行了处理,并输出了新的表(此时 tx 和 ty 均按照 c2 进行了 Hash 分布)。随后的 JOIN 条件是 tx.c2 = ty.c2,这恰好满足了 Colocate Join 的条件。

SQL
1explain select *
2from 
3    (
4        -- t1 表按照 c1 做了 hash 分布,经过 group by 算子后,数据分布变成按照 c2 进行的 hash 分布
5        select c2 as c2, sum(c1) as c1
6        from t1
7        group by c2 
8    ) tx
9join 
10    (
11        -- t2 表按照 c1 做了 hash 分布,经过 group by 算子后,数据分布变成按照 c2 进行的 hash 分布
12        select c2 as c2, sum(c1) as c1
13        from t2
14        group by c2 
15    ) ty
16on tx.c2 = ty.c2;

从下面的 Explain 执行计划结果中可以看出,8 号 Hash Join 节点的左侧子节点是 7 号聚合节点,右侧子节点是 3 号聚合节点,并且没有出现 Exchange 节点。这表明左侧和右侧子节点聚合后的数据都保持在其原始位置不动,无需进行数据移动,可以直接在本地进行后续的 Hash Join 操作。

SQL
1+------------------------------------------------------------+
2| Explain String(Nereids Planner)                            |
3+------------------------------------------------------------+
4| PLAN FRAGMENT 0                                            |
5|   OUTPUT EXPRS:                                            |
6|     c2[#20]                                                |
7|     c1[#21]                                                |
8|     c2[#22]                                                |
9|     c1[#23]                                                |
10|   PARTITION: HASH_PARTITIONED: c2[#10]                     |
11|                                                            |
12|   HAS_COLO_PLAN_NODE: true                                 |
13|                                                            |
14|   VRESULT SINK                                             |
15|      MYSQL_PROTOCAL                                        |
16|                                                            |
17|   8:VHASH JOIN(373)                                        |
18|   |  join op: INNER JOIN(PARTITIONED)[]                    |
19|   |  equal join conjunct: (c2[#14] = c2[#6])               |
20|   |  cardinality=10                                        |
21|   |  vec output tuple id: 9                                |
22|   |  output tuple id: 9                                    |
23|   |  vIntermediate tuple ids: 8                            |
24|   |  hash output slot ids: 6 7 14 15                       |
25|   |  final projections: c2[#16], c1[#17], c2[#18], c1[#19] |
26|   |  final project output tuple id: 9                      |
27|   |  distribute expr lists: c2[#14]                        |
28|   |  distribute expr lists: c2[#6]                         |
29|   |                                                        |
30|   |----3:VAGGREGATE (merge finalize)(367)                  |
31|   |    |  output: sum(partial_sum(c1)[#3])[#5]             |
32|   |    |  group by: c2[#2]                                 |
33|   |    |  sortByGroupKey:false                             |
34|   |    |  cardinality=5                                    |
35|   |    |  final projections: c2[#4], c1[#5]                |
36|   |    |  final project output tuple id: 3                 |
37|   |    |  distribute expr lists: c2[#2]                    |
38|   |    |                                                   |
39|   |    2:VEXCHANGE                                         |
40|   |       offset: 0                                        |
41|   |       distribute expr lists:                           |
42|   |                                                        |
43|   7:VAGGREGATE (merge finalize)(354)                       |
44|   |  output: sum(partial_sum(c1)[#11])[#13]                |
45|   |  group by: c2[#10]                                     |
46|   |  sortByGroupKey:false                                  |
47|   |  cardinality=10                                        |
48|   |  final projections: c2[#12], c1[#13]                   |
49|   |  final project output tuple id: 7                      |
50|   |  distribute expr lists: c2[#10]                        |
51|   |                                                        |
52|   6:VEXCHANGE                                              |
53|      offset: 0                                             |
54|      distribute expr lists:                                |
55|                                                            |
56| PLAN FRAGMENT 1                                            |
57|                                                            |
58|   PARTITION: HASH_PARTITIONED: c1[#8]                      |
59|                                                            |
60|   HAS_COLO_PLAN_NODE: false                                |
61|                                                            |
62|   STREAM DATA SINK                                         |
63|     EXCHANGE ID: 06                                        |
64|     HASH_PARTITIONED: c2[#10]                              |
65|                                                            |
66|   5:VAGGREGATE (update serialize)(348)                     |
67|   |  STREAMING                                             |
68|   |  output: partial_sum(c1[#8])[#11]                      |
69|   |  group by: c2[#9]                                      |
70|   |  sortByGroupKey:false                                  |
71|   |  cardinality=10                                        |
72|   |  distribute expr lists: c1[#8]                         |
73|   |                                                        |
74|   4:VOlapScanNode(345)                                     |
75|      TABLE: tt.t1(t1), PREAGGREGATION: ON                  |
76|      partitions=1/1 (t1)                                   |
77|      tablets=1/1, tabletList=491188                        |
78|      cardinality=21, avgRowSize=0.0, numNodes=1            |
79|      pushAggOp=NONE                                        |
80|                                                            |
81| PLAN FRAGMENT 2                                            |
82|                                                            |
83|   PARTITION: HASH_PARTITIONED: c1[#0]                      |
84|                                                            |
85|   HAS_COLO_PLAN_NODE: false                                |
86|                                                            |
87|   STREAM DATA SINK                                         |
88|     EXCHANGE ID: 02                                        |
89|     HASH_PARTITIONED: c2[#2]                               |
90|                                                            |
91|   1:VAGGREGATE (update serialize)(361)                     |
92|   |  STREAMING                                             |
93|   |  output: partial_sum(c1[#0])[#3]                       |
94|   |  group by: c2[#1]                                      |
95|   |  sortByGroupKey:false                                  |
96|   |  cardinality=5                                         |
97|   |  distribute expr lists: c1[#0]                         |
98|   |                                                        |
99|   0:VOlapScanNode(358)                                     |
100|      TABLE: tt.t2(t2), PREAGGREGATION: ON                  |
101|      partitions=1/1 (t2)                                   |
102|      tablets=1/1, tabletList=491198                        |
103|      cardinality=10, avgRowSize=0.0, numNodes=1            |
104|      pushAggOp=NONE                                        |
105|                                                            |
106|                                                            |
107| Statistics                                                 |
108|  planed with unknown column statistics                     |
109+------------------------------------------------------------+
110105 rows in set (0.06 sec)

四种 Shuffle 方式对比

Shuffle 方式 网络开销 物理算子 适用场景
Broadcast N * T(R) Hash Join /Nest Loop Join 通用
Shuffle T(S) + T(R) Hash Join 通用
Bucket Shuffle T(R) Hash Join JOIN 条件含左表分桶列,左表单分区
Colocate 0 Hash Join JOIN 条件含左表分桶列,且两表属同一 Colocate Group

备注:

N:参与 Join 计算的 Instance 个数

T(关系) : 关系的 Tuple 数目

上述四种 Shuffle 方式的灵活性依次递减,它们对数据分布的要求也愈发严格。在多数场景下,随着对数据分布要求的提高,Join 计算的性能往往也会逐步提升。值得注意的是,如果表的 Bucket 数量较少,Bucket Shuffle 或 Colocate Join 可能会因为并行度较低而导致性能下降,甚至可能慢于 Shuffle Join。这是因为 Shuffle 操作能更有效地均衡数据的分布,从而在后续处理中提供更高的并行度。

常见问题

Bucket Shuffle Join 和 Colocate Join 在应用时对数据分布和 JOIN 条件具有一定限制条件。下面,我们将详细阐述这两种 JOIN 方式各自的具体限制。

Bucket Shuffle Join 的限制

在直接扫描两张物理表以进行 Bucket Shuffle Join 时,需要满足以下条件:

  1. 等值 Join 条件:Bucket Shuffle Join 仅适用于 Join 条件为等值的场景,因为它依赖于 Hash 计算来确定数据分布。
  2. 包含分桶列的等值条件:等值 Join 条件中须包含两张表的分桶列,当左表的分桶列作为等值 Join 条件时,更有可能被规划为 Bucket Shuffle Join。
  3. 表类型限制:Bucket Shuffle Join 仅适用于 PALO 原生的 OLAP 表。对于 ODBC、MySQL、ES 等外部表,当它们作为左表时,Bucket Shuffle Join 无法生效。
  4. 单分区要求:对于分区表,由于每个分区的数据分布可能不同,Bucket Shuffle Join 仅在左表为单分区时保证有效。因此在执行 SQL 时,应尽可能使用 WHERE 条件来启用分区裁剪策略。

Colocate Join 的限制

在直接扫描两张物理表时,Colocate Join 相较于 Bucket Shuffle Join 具有更严格的限制条件,除了满足 Bucket Shuffle Join 的所有条件外,还需满足以下要求:

  1. 分桶列的类型和分桶数量必须一致,以确保数据分布的一致性。
  2. 需要显式指定 Colocation Group,只有处于相同 Colocation Group 的表才能进行 Colocate Join。
  3. 在进行副本修复或副本均衡等操作时,Colocation Group 可能处于 Unstable 状态,此时 Colocate Join 将退化为普通的 Join 操作。

上一篇
查询加速
下一篇
子查询