Spark访问Es常见问题

Elasticsearch BES

  • 邀测版存储计算分离套餐介绍
  • 功能发布记录
  • 产品描述
    • 基本概念
    • 关键特性
    • 产品介绍
    • 产品性能
      • 4核16GB512GB通用型的3个数据节点实例基准性能指标
      • 8核16GB512GB计算型的3个数据节点实例基准性能测试
      • 16核32GB512GB计算型的3个数据节点实例基准性能指标
      • 4核8GB512GB计算型的3个数据节点实例基准性能测试
      • 8核32GB512GB通用型的3个数据节点实例基准性能指标
      • 16核64GB512GB通用型的3个数据节点实例基准性能指标
      • 概述
  • Python-SDK
    • 实例
    • 异常
    • 日志
    • 节点配置类型
    • 版本更新记录
    • 快速入门
    • 集群
    • BesClient
    • 安装SDK工具包
    • 概述
  • Kibana
    • Kibana使用指南
  • Logstash
    • Logstash使用指南
    • 实例管理
      • 实例详情信息
      • 实例扩缩容
      • 管道管理
      • 创建实例
      • 实例列表
      • 删除实例
  • 开发指南
    • Elasticsearch Restful API
    • 通过其他客户端访问Elasticsearch
    • Elasticsearch Java客户端
      • Low Level REST Client
      • Rest Client操作向量索引示例
      • Java REST Client
      • High Level REST Client
  • Java-SDK
    • 实例
    • 异常
    • 日志
    • 节点配置类型
    • 版本更新记录
    • 快速入门
    • 集群
    • BesClient
    • 安装SDK工具包
    • 概述
  • 向量检索特性
    • 资源规划
    • 关键概念
    • 应用场景
    • 快速入门
    • 算法介绍
    • 能力简介
    • 向量数据库
    • 操作指南
      • 写入和查询数据
      • 训练模型
      • 向量索引缓存管理
      • 创建索引
      • 基于Reciprocal Rank Fusion的融合查询
      • 参数优化
      • 7.4版本兼容说明
    • 最佳实践
      • 调优建议
      • BES RAG 最佳实践:基于LangChain+BES的私域知识的QA问答系统
    • 迁移方案
      • ES dense vector 切换为BES方案
  • ELK
    • 基于ELK构建日志分析系统
  • 增强特性
    • 慢查询隔离
    • 百度NLP中文分词词典动态更新
    • 百度NLP中文分词插件
    • 基于段文件的主从复制
    • 基于BOS的冷热数据分离
  • 快速入门
    • 访问Elasticsearch服务
    • 导入数据并搜索
    • 创建集群
    • 集群资源评估
  • 典型实践
    • 使用BSC将BOS中的数据导入Es
    • 使用BSC将Kafka中的数据导入Es
    • 基于CCR实现多集群跨地域高可用
    • 基于节点磁盘介质的冷热数据分离
  • 服务等级协议SLA
    • 数据迁移服务协议
    • BES服务等级协议SLA(V1.0)
  • 常见问题
    • Spark访问Es常见问题
    • Elasticsearch系统常见问题
    • 常见问题总览
  • 产品定价
    • 预付费
    • 计费说明
    • 配置变更费用说明
    • 后付费
  • API文档
    • 概述
    • 自动续费相关接口
      • 续费列表
      • 查看自动续费
      • 查看自动续费规则列表
      • 续费操作
      • 更新自动续费规则
      • 删除自动续费规则
      • 创建自动续费规则
    • 插件配置接口
      • 上传自定义插件
      • 卸载系统默认插件
      • 上传nlp词典
      • 安装系统默认插件
      • 获取默认和自定义插件列表
      • 卸载自定义插件
      • 安装自定义插件
      • 删除自定义插件
      • 查看nlp词典
    • 日志管理相关接口
      • 查看日志导出任务记录
      • 创建日志导出任务
      • 日志设置
      • 日志查询
    • 标签接口
      • 查询标签列表
      • 批量新增标签
      • 单个集群更新标签
    • 智能巡检接口
      • 查看近7天已完成的巡检任务列表
      • 近七天巡检概况
      • 查看手动巡检任务的配置
      • 集群巡检授权
      • 列举所有可选巡检项
      • 查询是否可以提交巡检任务
      • 最新一次巡检概况
      • 查看集群是否开启自动巡检
      • 修改手动巡检任务的配置
      • 开启或关闭自动巡检
      • 提交手动巡检任务
      • 查询今日已执行完成的手动巡检次数
      • 查看某巡检任务的执行状态和结果
    • 集群相关接口
      • 获取操作历史信息
      • 删除集群
      • 新增节点类型
      • 开启和关闭https
      • 查看集群详情信息
      • 重置密码
      • EIP绑定
      • EIP解绑
      • 是否开启Grafana监控
      • 获取数据量观测数据
      • 查看集群列表
      • 上传NLP分词词典
      • 创建集群
      • 智能评估
      • 集群blb信息
      • 用户可用代金券列表
      • 停止集群
      • 扩容集群
      • 启动集群
      • 重启集群
    • 实例相关接口
      • 批量停止实例
      • 启动实例
      • 查看缩容节点列表
      • 数据迁移
      • 数据迁移系统建议
      • 批量启动实例
      • 数据迁移回滚
      • 停止实例
      • 数据迁移节点列表
      • 删除实例
    • 配置修改相关接口
      • 获取同义词文件列表
      • 查看集群配置
      • 删除同义词配置文件
      • 上传同义词配置文件
      • 配置修改
    • 定时调度
      • 更新定时调度任务
      • 删除定时调度任务
      • 创建和更新定时调度任务
      • 查看定时调度任务
  • API3.0
    • 集群相关接口
      • 删除集群
  • Elasticsearch
    • 用户手册
      • YML参数配置
      • 权限管理
      • 智能巡检
      • 数据迁移
      • 基于BOS的快照与恢复
      • 集群列表
      • 账号使用说明
      • 多可用区部署
      • 定时调度
      • 数据量观测
      • 日志查询
        • 日志查询(旧)
        • 日志查询(新)
        • 日志导出
        • 查询语法
      • 集群配置
        • 配置同义词
      • 插件管理
        • 上传与安装自定义插件
        • Elasticsearch插件列表
        • 系统默认插件
          • analysis-pinyin拼音分词插件
          • IK中文分词插件与动态更新词典
          • compression-zstd插件
          • CCR插件
          • 限流插件
          • 动态同义词插件
          • 简繁体转换插件
          • ingest attachment插件
          • SQL插件
      • 配置变更
        • 节点数据迁移
        • 集群变配说明与建议
        • 集群扩缩容
      • 版本升级
        • 升级版本
        • 内核版本说明
        • 升级检查
      • 集群监控报警
        • 监控查看及指标说明
        • 配置报警
      • 集群管理
        • 集群重启
        • 变更HTTPS协议用户指南
        • 节点启停
        • 查看集群信息
        • 集群创建
        • 公网绑定
        • 修改集群名称
        • 跨可用区迁移集群
        • 集群删除
        • 集群列表
        • 自动续费
        • 密码重置
        • ES服务区域代码
      • 账户管理
        • 多用户访问控制
所有文档
menu
没有找到结果,请重新输入

Elasticsearch BES

  • 邀测版存储计算分离套餐介绍
  • 功能发布记录
  • 产品描述
    • 基本概念
    • 关键特性
    • 产品介绍
    • 产品性能
      • 4核16GB512GB通用型的3个数据节点实例基准性能指标
      • 8核16GB512GB计算型的3个数据节点实例基准性能测试
      • 16核32GB512GB计算型的3个数据节点实例基准性能指标
      • 4核8GB512GB计算型的3个数据节点实例基准性能测试
      • 8核32GB512GB通用型的3个数据节点实例基准性能指标
      • 16核64GB512GB通用型的3个数据节点实例基准性能指标
      • 概述
  • Python-SDK
    • 实例
    • 异常
    • 日志
    • 节点配置类型
    • 版本更新记录
    • 快速入门
    • 集群
    • BesClient
    • 安装SDK工具包
    • 概述
  • Kibana
    • Kibana使用指南
  • Logstash
    • Logstash使用指南
    • 实例管理
      • 实例详情信息
      • 实例扩缩容
      • 管道管理
      • 创建实例
      • 实例列表
      • 删除实例
  • 开发指南
    • Elasticsearch Restful API
    • 通过其他客户端访问Elasticsearch
    • Elasticsearch Java客户端
      • Low Level REST Client
      • Rest Client操作向量索引示例
      • Java REST Client
      • High Level REST Client
  • Java-SDK
    • 实例
    • 异常
    • 日志
    • 节点配置类型
    • 版本更新记录
    • 快速入门
    • 集群
    • BesClient
    • 安装SDK工具包
    • 概述
  • 向量检索特性
    • 资源规划
    • 关键概念
    • 应用场景
    • 快速入门
    • 算法介绍
    • 能力简介
    • 向量数据库
    • 操作指南
      • 写入和查询数据
      • 训练模型
      • 向量索引缓存管理
      • 创建索引
      • 基于Reciprocal Rank Fusion的融合查询
      • 参数优化
      • 7.4版本兼容说明
    • 最佳实践
      • 调优建议
      • BES RAG 最佳实践:基于LangChain+BES的私域知识的QA问答系统
    • 迁移方案
      • ES dense vector 切换为BES方案
  • ELK
    • 基于ELK构建日志分析系统
  • 增强特性
    • 慢查询隔离
    • 百度NLP中文分词词典动态更新
    • 百度NLP中文分词插件
    • 基于段文件的主从复制
    • 基于BOS的冷热数据分离
  • 快速入门
    • 访问Elasticsearch服务
    • 导入数据并搜索
    • 创建集群
    • 集群资源评估
  • 典型实践
    • 使用BSC将BOS中的数据导入Es
    • 使用BSC将Kafka中的数据导入Es
    • 基于CCR实现多集群跨地域高可用
    • 基于节点磁盘介质的冷热数据分离
  • 服务等级协议SLA
    • 数据迁移服务协议
    • BES服务等级协议SLA(V1.0)
  • 常见问题
    • Spark访问Es常见问题
    • Elasticsearch系统常见问题
    • 常见问题总览
  • 产品定价
    • 预付费
    • 计费说明
    • 配置变更费用说明
    • 后付费
  • API文档
    • 概述
    • 自动续费相关接口
      • 续费列表
      • 查看自动续费
      • 查看自动续费规则列表
      • 续费操作
      • 更新自动续费规则
      • 删除自动续费规则
      • 创建自动续费规则
    • 插件配置接口
      • 上传自定义插件
      • 卸载系统默认插件
      • 上传nlp词典
      • 安装系统默认插件
      • 获取默认和自定义插件列表
      • 卸载自定义插件
      • 安装自定义插件
      • 删除自定义插件
      • 查看nlp词典
    • 日志管理相关接口
      • 查看日志导出任务记录
      • 创建日志导出任务
      • 日志设置
      • 日志查询
    • 标签接口
      • 查询标签列表
      • 批量新增标签
      • 单个集群更新标签
    • 智能巡检接口
      • 查看近7天已完成的巡检任务列表
      • 近七天巡检概况
      • 查看手动巡检任务的配置
      • 集群巡检授权
      • 列举所有可选巡检项
      • 查询是否可以提交巡检任务
      • 最新一次巡检概况
      • 查看集群是否开启自动巡检
      • 修改手动巡检任务的配置
      • 开启或关闭自动巡检
      • 提交手动巡检任务
      • 查询今日已执行完成的手动巡检次数
      • 查看某巡检任务的执行状态和结果
    • 集群相关接口
      • 获取操作历史信息
      • 删除集群
      • 新增节点类型
      • 开启和关闭https
      • 查看集群详情信息
      • 重置密码
      • EIP绑定
      • EIP解绑
      • 是否开启Grafana监控
      • 获取数据量观测数据
      • 查看集群列表
      • 上传NLP分词词典
      • 创建集群
      • 智能评估
      • 集群blb信息
      • 用户可用代金券列表
      • 停止集群
      • 扩容集群
      • 启动集群
      • 重启集群
    • 实例相关接口
      • 批量停止实例
      • 启动实例
      • 查看缩容节点列表
      • 数据迁移
      • 数据迁移系统建议
      • 批量启动实例
      • 数据迁移回滚
      • 停止实例
      • 数据迁移节点列表
      • 删除实例
    • 配置修改相关接口
      • 获取同义词文件列表
      • 查看集群配置
      • 删除同义词配置文件
      • 上传同义词配置文件
      • 配置修改
    • 定时调度
      • 更新定时调度任务
      • 删除定时调度任务
      • 创建和更新定时调度任务
      • 查看定时调度任务
  • API3.0
    • 集群相关接口
      • 删除集群
  • Elasticsearch
    • 用户手册
      • YML参数配置
      • 权限管理
      • 智能巡检
      • 数据迁移
      • 基于BOS的快照与恢复
      • 集群列表
      • 账号使用说明
      • 多可用区部署
      • 定时调度
      • 数据量观测
      • 日志查询
        • 日志查询(旧)
        • 日志查询(新)
        • 日志导出
        • 查询语法
      • 集群配置
        • 配置同义词
      • 插件管理
        • 上传与安装自定义插件
        • Elasticsearch插件列表
        • 系统默认插件
          • analysis-pinyin拼音分词插件
          • IK中文分词插件与动态更新词典
          • compression-zstd插件
          • CCR插件
          • 限流插件
          • 动态同义词插件
          • 简繁体转换插件
          • ingest attachment插件
          • SQL插件
      • 配置变更
        • 节点数据迁移
        • 集群变配说明与建议
        • 集群扩缩容
      • 版本升级
        • 升级版本
        • 内核版本说明
        • 升级检查
      • 集群监控报警
        • 监控查看及指标说明
        • 配置报警
      • 集群管理
        • 集群重启
        • 变更HTTPS协议用户指南
        • 节点启停
        • 查看集群信息
        • 集群创建
        • 公网绑定
        • 修改集群名称
        • 跨可用区迁移集群
        • 集群删除
        • 集群列表
        • 自动续费
        • 密码重置
        • ES服务区域代码
      • 账户管理
        • 多用户访问控制
  • 文档中心
  • arrow
  • ElasticsearchBES
  • arrow
  • 常见问题
  • arrow
  • Spark访问Es常见问题
本页目录
  • 说明
  • 版本号检测异常
  • 数据节点发现
  • 用户权限问题
  • 配置Bulk导入有错误的处理方式
  • 如何拿到除_source外的其他文档元数据
  • 导入的时候指定id、version的方法
  • spark默认写ES的时候refresh的会在每次bulk结束的时候调用
  • 控制每次Bulk写入的量
  • 读相关设置
  • 控制需要写入的文档字段
  • 执行upsert操作

Spark访问Es常见问题

更新时间:2025-08-20

说明

本文档主要介绍了通过elasticsearch-hadoop中的Spark访问ES时常见配置项意义。本文中的es-spark是elasticsearch-hadoop中和Spark相关联的包,用户通过自己的Spark集群读写ES集群,elasticsearch-hadoop基本上兼容了目前ES所有的版本

版本号检测异常

es-spark 运行时通常会自动检测ES集群的版本号,获取的版本号主要是用来对不同集群的版本做API的兼容处理

一般情况下用户不用关注ES版本号,但是在云上有时候自动检测集群的版本号会发生一些莫名其妙的检测不到的错误,可以通过配置解决:

配置项:

Plain Text
1es.internal.es.version:"6.5.3"

在一些较新版本的es-spark包中同样需要配置:

Plain Text
1es.internal.es.cluster.name:"Your Cluter Name"

实现原理:

配置完以后,es-spark不会请求 / 目录,解析version,会直接使用用户配置的version:

Plain Text
1INTERNAL_ES_VERSION = "es.internal.es.version"
2INTERNAL_ES_CLUSTER_NAME = "es.internal.es.cluster.name"
3
4public static EsMajorVersion discoverEsVersion(Settings settings, Log log) {
5      return discoverClusterInfo(settings, log).getMajorVersion();
6}
7
8// 不同版本的elasticsearch-hadoop可能会有差异
9public static ClusterInfo discoverClusterInfo(Settings settings, Log log) {
10        ClusterName remoteClusterName = null;
11        EsMajorVersion remoteVersion = null;
12        // 尝试从配置中获取集群名字
13        String clusterName = settings.getProperty(InternalConfigurationOptions.INTERNAL_ES_CLUSTER_NAME);
14        // 尝试从配置中获取集群UUID
15        String clusterUUID = settings.getProperty(InternalConfigurationOptions.INTERNAL_ES_CLUSTER_UUID);
16        // 尝试从配置中获取ES version
17        String version = settings.getProperty(InternalConfigurationOptions.INTERNAL_ES_VERSION);
18        // 如果集群名字和版本号没有从配置文件中拿到,则发起网络请求(请求根目录)
19        if (StringUtils.hasText(clusterName) && StringUtils.hasText(version)) {
20            if (log.isDebugEnabled()) {
21                log.debug(String.format("Elasticsearch cluster [NAME:%s][UUID:%s][VERSION:%s] already present in configuration; skipping discovery",
22                        clusterName, clusterUUID, version));
23            }
24            remoteClusterName = new ClusterName(clusterName, clusterUUID);
25            remoteVersion = EsMajorVersion.parse(version);
26            return new ClusterInfo(remoteClusterName, remoteVersion);
27        }
28      ....
29}

如果启用集群名、版本号自动检测功能,需要保证分配给es-spark的用户有访问根目录/的GET权限

Plain Text
1GET /

数据节点发现

配置项:

Plain Text
1es.nodes.wan.only: false  默认为false 
2es.nodes.discovery: true   默认为true 

百度云ES集群前面有一个BLB负载均衡,配置es.nodes的时候写这个BLB的地址即可,需要保证es-spark可以访问BLB的地址

  • es.nodes.wan.only: false,es.nodes.discovery: true: Spark会通过访问es.nodes中指定的host(可以为多个) 得到ES集群所有开启HTTP服务节点的ip和port,后续对数据的访问会直接访问分片数据所在的节点上(需要保证ES集群所有节点都能够被Spark集群访问到)
  • es.nodes.wan.only: true,es.nodes.discovery: false或不设置:Spark发送给ES的所有请求都需要通过这个节点进行转发,效率相对比较低

具体代码逻辑:

Plain Text
1ES_NODES_DISCOVERY = "es.nodes.discovery"
2ES_NODES_WAN_ONLY = "es.nodes.wan.only"
3ES_NODES_WAN_ONLY_DEFAULT = "false"
4
5InitializationUtils#discoverNodesIfNeeded
6    public static List<NodeInfo> discoverNodesIfNeeded(Settings settings, Log log) {
7        if (settings.getNodesDiscovery()) { // 需要读取配置项
8            RestClient bootstrap = new RestClient(settings);
9
10            try {
11                List<NodeInfo> discoveredNodes = bootstrap.getHttpNodes(false);
12                if (log.isDebugEnabled()) {
13                    log.debug(String.format("Nodes discovery enabled - found %s", discoveredNodes));
14                }
15
16                SettingsUtils.addDiscoveredNodes(settings, discoveredNodes);
17                return discoveredNodes;
18            } finally {
19                bootstrap.close();
20            }
21        }
22
23        return null;
24    }
25 
26public boolean getNodesDiscovery() {
27        // by default, if not set, return a value compatible with the WAN setting
28        // otherwise return the user value.
29        // this helps validate the configuration
30        return Booleans.parseBoolean(getProperty(ES_NODES_DISCOVERY), !getNodesWANOnly()); //默认值是!getNodesWANOnly() 
31    }
32    
33public boolean getNodesWANOnly() {
34        return Booleans.parseBoolean(getProperty(ES_NODES_WAN_ONLY, ES_NODES_WAN_ONLY_DEFAULT));
35    }

用户权限问题

如果启动节点发现功能,需要保证es-spark使用的用户有访问

Plain Text
1GET /_nodes/http
2GET /{index}/_search_shards

的权限,否则会导致整个Job失败

配置Bulk导入有错误的处理方式

Spark 写ES的时候发生错误且经过几次尝试后会中断job,默认的情况下会直接中断当前Job,导致整个任务失败,如果只是想把导入失败的文档打印到日志中,可以通过如下配置解决:

配置错误发生错误的处理机制

Plain Text
1es.write.rest.error.handlers = log 
2es.write.rest.error.handler.log.logger.name: es_error_handler

设置后当出现bulk写错误的时候,Job不会中断,会以log的形式输出到日志中,日志前缀为es_error_handler

如何拿到除_source外的其他文档元数据

正常情况下,我们通过调用ES的_search API返回的每条数据如下:

Plain Text
1         {
2            "_index": "d_index",
3            "_type": "doc",
4            "_id": "51rrB2sBaX4YjyPY-2EG",
5            "_score": 1,
6            "_source": {
7               "A": "field A value",
8               "B": "field B value"
9            }
10         }

但是用Spark去读ES的时候,默认是不读除_source字段意外的其他字段,如_id _version, 在一些场景下,业务可能需要拿到_id,可以通过如下配置:

Plain Text
1es.read.metadata:true  //这个配置默认是false
2es.read.metadata.field: "_id" // 配置需要读取的元数据字段
3es.read.metadata.version: 默认false 读取es的版本号

文档的元数据字段信息会放在一个_metadata的字段里面

导入的时候指定id、version的方法

做数据迁移的时候,比如从一个低版本的ES集群迁移到高版的ES集群,我们可以用es-spark边度边写,如果需要指定_id, _rouring, _version 这些信息,可以设置:

Plain Text
1es.mapping.id:”_meta._id”  指定json中id的路径
2es.mapping.routing:”_meta._rouring”  指定json中id的路径
3es.mapping.version:”_meta._version”  指定json中id的路径

_meta.xxx是所需的字段在Json文档中的路径

spark默认写ES的时候refresh的会在每次bulk结束的时候调用

我们建议设置为false,有ES内部index的 refresh_interval来控制refresh,否则ES集群会有大量的线程在refesh会带来很大的CPU和磁盘压力

Plain Text
1es.batch.write.refresh: false 默认是true

控制每次Bulk写入的量

Plain Text
1es.batch.size.bytes:1mb 每次bulk写入文档的大小,默认1mb
2es.batch.size.entries:1000 每次bulk写入的文档数,默认是1000

用户可根据ES集群套餐合理设置

读相关设置

Plain Text
1es.scroll.size: 50, 默认是50 这个值相对来说比较小,可以适当增大至1000-10000
2es.input.use.sliced.partitions:  true  为了提高并发,es会进行scroll-slice进行切分
3es.input.max.docs.per.partition: 100000 根据这个值进行切分slice

es-spark 读取 ES集群数据的时候,会按照每个分片的总数进行切分做scroll-slice处理:

Plain Text
1int numPartitions = (int) Math.max(1, numDocs / maxDocsPerPartition); 

numDocs为单个分片的文档总数,如果文档有5千万,这时候会切分成500个sclice,会对后端的线上ES集群造成巨大的CPU压力,所以一般建议关闭scroll-slice,以避免影响在线业务

建议的参数:

Plain Text
1es.scroll.size: 2000 //尽量根据文档的大小来选择
2es.input.use.sliced.partitions: false 

控制需要写入的文档字段

有时候业务导入数据的时候,希望有一些字段不被写如ES,可以设置:

Plain Text
1es.mapping.exclude:默认是none
2// 多个字段用逗号进行分割,直接 . * 表达
3es.mapping.exclude = *.description
4es.mapping.include = u*, foo.*

执行upsert操作

示例:

Plain Text
1String update_params = "parmas:update_time";
2String update_script = "ctx._source.update_time = params.update_time";
3// 设置sparkConfig
4SparkConf sparkConf = new SparkConf()
5        .setAppName("YourAppName”)
6        .set("es.net.http.auth.user", user)
7        .set("es.net.http.auth.pass", pwd)
8        .set("es.nodes", nodes)
9        .set("es.port", port)
10        .set("es.batch.size.entries", "50")
11        .set("es.http.timeout","5m")
12        .set("es.read.metadata.field", "_id")
13        .set("es.write.operation","upsert")
14        .set("es.update.script.params", update_params)
15        .set("es.update.script.inline", update_script)
16        .set("es.nodes.wan.only", "true");

注:

Plain Text
1es.update.script.params 为执行更新需要的参数列表
2es.update.script.inline 为执行update使用script脚本

上一篇
服务等级协议SLA
下一篇
Elasticsearch系统常见问题