数据迁移

Elasticsearch BES

  • 邀测版存储计算分离套餐介绍
  • 功能发布记录
  • 产品描述
    • 基本概念
    • 关键特性
    • 产品介绍
    • 产品性能
      • 4核16GB512GB通用型的3个数据节点实例基准性能指标
      • 8核16GB512GB计算型的3个数据节点实例基准性能测试
      • 16核32GB512GB计算型的3个数据节点实例基准性能指标
      • 4核8GB512GB计算型的3个数据节点实例基准性能测试
      • 8核32GB512GB通用型的3个数据节点实例基准性能指标
      • 16核64GB512GB通用型的3个数据节点实例基准性能指标
      • 概述
  • Python-SDK
    • 实例
    • 异常
    • 日志
    • 节点配置类型
    • 版本更新记录
    • 快速入门
    • 集群
    • BesClient
    • 安装SDK工具包
    • 概述
  • Kibana
    • Kibana使用指南
  • Logstash
    • Logstash使用指南
    • 实例管理
      • 实例详情信息
      • 实例扩缩容
      • 管道管理
      • 创建实例
      • 实例列表
      • 删除实例
  • 开发指南
    • Elasticsearch Restful API
    • 通过其他客户端访问Elasticsearch
    • Elasticsearch Java客户端
      • Low Level REST Client
      • Rest Client操作向量索引示例
      • Java REST Client
      • High Level REST Client
  • Java-SDK
    • 实例
    • 异常
    • 日志
    • 节点配置类型
    • 版本更新记录
    • 快速入门
    • 集群
    • BesClient
    • 安装SDK工具包
    • 概述
  • 向量检索特性
    • 资源规划
    • 关键概念
    • 应用场景
    • 快速入门
    • 算法介绍
    • 能力简介
    • 向量数据库
    • 操作指南
      • 写入和查询数据
      • 训练模型
      • 向量索引缓存管理
      • 创建索引
      • 基于Reciprocal Rank Fusion的融合查询
      • 参数优化
      • 7.4版本兼容说明
    • 最佳实践
      • 调优建议
      • BES RAG 最佳实践:基于LangChain+BES的私域知识的QA问答系统
    • 迁移方案
      • ES dense vector 切换为BES方案
  • ELK
    • 基于ELK构建日志分析系统
  • 增强特性
    • 慢查询隔离
    • 百度NLP中文分词词典动态更新
    • 百度NLP中文分词插件
    • 基于段文件的主从复制
    • 基于BOS的冷热数据分离
  • 快速入门
    • 访问Elasticsearch服务
    • 导入数据并搜索
    • 创建集群
    • 集群资源评估
  • 典型实践
    • 使用BSC将BOS中的数据导入Es
    • 使用BSC将Kafka中的数据导入Es
    • 基于CCR实现多集群跨地域高可用
    • 基于节点磁盘介质的冷热数据分离
  • 服务等级协议SLA
    • 数据迁移服务协议
    • BES服务等级协议SLA(V1.0)
  • 常见问题
    • Spark访问Es常见问题
    • Elasticsearch系统常见问题
    • 常见问题总览
  • 产品定价
    • 预付费
    • 计费说明
    • 配置变更费用说明
    • 后付费
  • API文档
    • 概述
    • 自动续费相关接口
      • 续费列表
      • 查看自动续费
      • 查看自动续费规则列表
      • 续费操作
      • 更新自动续费规则
      • 删除自动续费规则
      • 创建自动续费规则
    • 插件配置接口
      • 上传自定义插件
      • 卸载系统默认插件
      • 上传nlp词典
      • 安装系统默认插件
      • 获取默认和自定义插件列表
      • 卸载自定义插件
      • 安装自定义插件
      • 删除自定义插件
      • 查看nlp词典
    • 日志管理相关接口
      • 查看日志导出任务记录
      • 创建日志导出任务
      • 日志设置
      • 日志查询
    • 标签接口
      • 查询标签列表
      • 批量新增标签
      • 单个集群更新标签
    • 智能巡检接口
      • 查看近7天已完成的巡检任务列表
      • 近七天巡检概况
      • 查看手动巡检任务的配置
      • 集群巡检授权
      • 列举所有可选巡检项
      • 查询是否可以提交巡检任务
      • 最新一次巡检概况
      • 查看集群是否开启自动巡检
      • 修改手动巡检任务的配置
      • 开启或关闭自动巡检
      • 提交手动巡检任务
      • 查询今日已执行完成的手动巡检次数
      • 查看某巡检任务的执行状态和结果
    • 集群相关接口
      • 获取操作历史信息
      • 删除集群
      • 新增节点类型
      • 开启和关闭https
      • 查看集群详情信息
      • 重置密码
      • EIP绑定
      • EIP解绑
      • 是否开启Grafana监控
      • 获取数据量观测数据
      • 查看集群列表
      • 上传NLP分词词典
      • 创建集群
      • 智能评估
      • 集群blb信息
      • 用户可用代金券列表
      • 停止集群
      • 扩容集群
      • 启动集群
      • 重启集群
    • 实例相关接口
      • 批量停止实例
      • 启动实例
      • 查看缩容节点列表
      • 数据迁移
      • 数据迁移系统建议
      • 批量启动实例
      • 数据迁移回滚
      • 停止实例
      • 数据迁移节点列表
      • 删除实例
    • 配置修改相关接口
      • 获取同义词文件列表
      • 查看集群配置
      • 删除同义词配置文件
      • 上传同义词配置文件
      • 配置修改
    • 定时调度
      • 更新定时调度任务
      • 删除定时调度任务
      • 创建和更新定时调度任务
      • 查看定时调度任务
  • API3.0
    • 集群相关接口
      • 删除集群
  • Elasticsearch
    • 用户手册
      • YML参数配置
      • 权限管理
      • 智能巡检
      • 数据迁移
      • 基于BOS的快照与恢复
      • 集群列表
      • 账号使用说明
      • 多可用区部署
      • 定时调度
      • 数据量观测
      • 日志查询
        • 日志查询(旧)
        • 日志查询(新)
        • 日志导出
        • 查询语法
      • 集群配置
        • 配置同义词
      • 插件管理
        • 上传与安装自定义插件
        • Elasticsearch插件列表
        • 系统默认插件
          • analysis-pinyin拼音分词插件
          • IK中文分词插件与动态更新词典
          • compression-zstd插件
          • CCR插件
          • 限流插件
          • 动态同义词插件
          • 简繁体转换插件
          • ingest attachment插件
          • SQL插件
      • 配置变更
        • 节点数据迁移
        • 集群变配说明与建议
        • 集群扩缩容
      • 版本升级
        • 升级版本
        • 内核版本说明
        • 升级检查
      • 集群监控报警
        • 监控查看及指标说明
        • 配置报警
      • 集群管理
        • 集群重启
        • 变更HTTPS协议用户指南
        • 节点启停
        • 查看集群信息
        • 集群创建
        • 公网绑定
        • 修改集群名称
        • 跨可用区迁移集群
        • 集群删除
        • 集群列表
        • 自动续费
        • 密码重置
        • ES服务区域代码
      • 账户管理
        • 多用户访问控制
所有文档
menu
没有找到结果,请重新输入

Elasticsearch BES

  • 邀测版存储计算分离套餐介绍
  • 功能发布记录
  • 产品描述
    • 基本概念
    • 关键特性
    • 产品介绍
    • 产品性能
      • 4核16GB512GB通用型的3个数据节点实例基准性能指标
      • 8核16GB512GB计算型的3个数据节点实例基准性能测试
      • 16核32GB512GB计算型的3个数据节点实例基准性能指标
      • 4核8GB512GB计算型的3个数据节点实例基准性能测试
      • 8核32GB512GB通用型的3个数据节点实例基准性能指标
      • 16核64GB512GB通用型的3个数据节点实例基准性能指标
      • 概述
  • Python-SDK
    • 实例
    • 异常
    • 日志
    • 节点配置类型
    • 版本更新记录
    • 快速入门
    • 集群
    • BesClient
    • 安装SDK工具包
    • 概述
  • Kibana
    • Kibana使用指南
  • Logstash
    • Logstash使用指南
    • 实例管理
      • 实例详情信息
      • 实例扩缩容
      • 管道管理
      • 创建实例
      • 实例列表
      • 删除实例
  • 开发指南
    • Elasticsearch Restful API
    • 通过其他客户端访问Elasticsearch
    • Elasticsearch Java客户端
      • Low Level REST Client
      • Rest Client操作向量索引示例
      • Java REST Client
      • High Level REST Client
  • Java-SDK
    • 实例
    • 异常
    • 日志
    • 节点配置类型
    • 版本更新记录
    • 快速入门
    • 集群
    • BesClient
    • 安装SDK工具包
    • 概述
  • 向量检索特性
    • 资源规划
    • 关键概念
    • 应用场景
    • 快速入门
    • 算法介绍
    • 能力简介
    • 向量数据库
    • 操作指南
      • 写入和查询数据
      • 训练模型
      • 向量索引缓存管理
      • 创建索引
      • 基于Reciprocal Rank Fusion的融合查询
      • 参数优化
      • 7.4版本兼容说明
    • 最佳实践
      • 调优建议
      • BES RAG 最佳实践:基于LangChain+BES的私域知识的QA问答系统
    • 迁移方案
      • ES dense vector 切换为BES方案
  • ELK
    • 基于ELK构建日志分析系统
  • 增强特性
    • 慢查询隔离
    • 百度NLP中文分词词典动态更新
    • 百度NLP中文分词插件
    • 基于段文件的主从复制
    • 基于BOS的冷热数据分离
  • 快速入门
    • 访问Elasticsearch服务
    • 导入数据并搜索
    • 创建集群
    • 集群资源评估
  • 典型实践
    • 使用BSC将BOS中的数据导入Es
    • 使用BSC将Kafka中的数据导入Es
    • 基于CCR实现多集群跨地域高可用
    • 基于节点磁盘介质的冷热数据分离
  • 服务等级协议SLA
    • 数据迁移服务协议
    • BES服务等级协议SLA(V1.0)
  • 常见问题
    • Spark访问Es常见问题
    • Elasticsearch系统常见问题
    • 常见问题总览
  • 产品定价
    • 预付费
    • 计费说明
    • 配置变更费用说明
    • 后付费
  • API文档
    • 概述
    • 自动续费相关接口
      • 续费列表
      • 查看自动续费
      • 查看自动续费规则列表
      • 续费操作
      • 更新自动续费规则
      • 删除自动续费规则
      • 创建自动续费规则
    • 插件配置接口
      • 上传自定义插件
      • 卸载系统默认插件
      • 上传nlp词典
      • 安装系统默认插件
      • 获取默认和自定义插件列表
      • 卸载自定义插件
      • 安装自定义插件
      • 删除自定义插件
      • 查看nlp词典
    • 日志管理相关接口
      • 查看日志导出任务记录
      • 创建日志导出任务
      • 日志设置
      • 日志查询
    • 标签接口
      • 查询标签列表
      • 批量新增标签
      • 单个集群更新标签
    • 智能巡检接口
      • 查看近7天已完成的巡检任务列表
      • 近七天巡检概况
      • 查看手动巡检任务的配置
      • 集群巡检授权
      • 列举所有可选巡检项
      • 查询是否可以提交巡检任务
      • 最新一次巡检概况
      • 查看集群是否开启自动巡检
      • 修改手动巡检任务的配置
      • 开启或关闭自动巡检
      • 提交手动巡检任务
      • 查询今日已执行完成的手动巡检次数
      • 查看某巡检任务的执行状态和结果
    • 集群相关接口
      • 获取操作历史信息
      • 删除集群
      • 新增节点类型
      • 开启和关闭https
      • 查看集群详情信息
      • 重置密码
      • EIP绑定
      • EIP解绑
      • 是否开启Grafana监控
      • 获取数据量观测数据
      • 查看集群列表
      • 上传NLP分词词典
      • 创建集群
      • 智能评估
      • 集群blb信息
      • 用户可用代金券列表
      • 停止集群
      • 扩容集群
      • 启动集群
      • 重启集群
    • 实例相关接口
      • 批量停止实例
      • 启动实例
      • 查看缩容节点列表
      • 数据迁移
      • 数据迁移系统建议
      • 批量启动实例
      • 数据迁移回滚
      • 停止实例
      • 数据迁移节点列表
      • 删除实例
    • 配置修改相关接口
      • 获取同义词文件列表
      • 查看集群配置
      • 删除同义词配置文件
      • 上传同义词配置文件
      • 配置修改
    • 定时调度
      • 更新定时调度任务
      • 删除定时调度任务
      • 创建和更新定时调度任务
      • 查看定时调度任务
  • API3.0
    • 集群相关接口
      • 删除集群
  • Elasticsearch
    • 用户手册
      • YML参数配置
      • 权限管理
      • 智能巡检
      • 数据迁移
      • 基于BOS的快照与恢复
      • 集群列表
      • 账号使用说明
      • 多可用区部署
      • 定时调度
      • 数据量观测
      • 日志查询
        • 日志查询(旧)
        • 日志查询(新)
        • 日志导出
        • 查询语法
      • 集群配置
        • 配置同义词
      • 插件管理
        • 上传与安装自定义插件
        • Elasticsearch插件列表
        • 系统默认插件
          • analysis-pinyin拼音分词插件
          • IK中文分词插件与动态更新词典
          • compression-zstd插件
          • CCR插件
          • 限流插件
          • 动态同义词插件
          • 简繁体转换插件
          • ingest attachment插件
          • SQL插件
      • 配置变更
        • 节点数据迁移
        • 集群变配说明与建议
        • 集群扩缩容
      • 版本升级
        • 升级版本
        • 内核版本说明
        • 升级检查
      • 集群监控报警
        • 监控查看及指标说明
        • 配置报警
      • 集群管理
        • 集群重启
        • 变更HTTPS协议用户指南
        • 节点启停
        • 查看集群信息
        • 集群创建
        • 公网绑定
        • 修改集群名称
        • 跨可用区迁移集群
        • 集群删除
        • 集群列表
        • 自动续费
        • 密码重置
        • ES服务区域代码
      • 账户管理
        • 多用户访问控制
  • 文档中心
  • arrow
  • ElasticsearchBES
  • arrow
  • Elasticsearch
  • arrow
  • 用户手册
  • arrow
  • 数据迁移
本页目录
  • BOS快照迁移
  • 适用场景
  • 使用方式
  • 注意事项
  • 在线reindex
  • 适用场景
  • 使用方式
  • 注意事项
  • Logstash
  • 适用场景
  • 使用方式
  • 注意事项
  • 通过Spark迁移数据
  • 使用方式
  • 注意事项
  • HDFS快照迁移
  • 使用方式
  • 注意事项

数据迁移

更新时间:2025-08-20

用户在接入百度智能云Elasticsearch服务时,如需要百度智能云Elasticsearch服务间进行数据迁移或自建Elasticsearch服务数据迁移至百度智能云 Elasticsearch,可以根据自己的业务需求选择合适的迁移方案。本文介绍各迁移方案适用的场景,帮助您根据业务选择合适的场景进行迁移。

  • BOS快照迁移
  • 在线reindex
  • Logstash
  • 通过Spark迁移数据
  • HDFS快照迁移

BOS快照迁移

snapshot api 是Elasticsearch用于对数据进行备份和恢复的一组 api 接口,可以通过 snapshot api 进行跨集群的数据迁移,原理就是从源 Elasticsearch 集群创建数据快照,然后在目标 Elasticsearch 集群中进行恢复。

百度智能云Elasticsearch基于百度存储BOS(Baidu Object Storage),实现了snapshot api,能够帮助您在本集群内和多集群间进行快照备份与恢复。

具体使用方式参照基于BOS的快照与恢复。

适用场景

  • 源端数据量较大(GB、TB、PB级别)的场景
  • 恢复数据过程中需要关闭目的集群index
  • 源Elasticsearch集群和目的集群Elasticsearch版本符合快照备份与恢复场景的匹配
Snapshot version → new Cluster version 2.x 5.x 6.x 7.x 8.x
1.x → icon-yes.png
2.x → icon-yes.png icon-yes.png
5.x → icon-yes.png icon-yes.png
6.x → icon-yes.png icon-yes.png
7.x → icon-yes.png icon-yes.png

使用方式

  1. 两个集群分别创建基于BOS的仓库

源Elasticsearch集群需要添加BOS信息来创建仓库。创建命令如下:

Plain Text
1POST /_snapshot/es_repo
2{
3    "type": "bos",
4    "settings": {
5        "access_key": "your access_key",
6        "secret_key": "your secret_key",
7        "endpoint": "s3.bj.bcebos.com",
8        "bucket": "es-repo",
9        "base_path": "test_base_path"
10    }
11}

相关参数意义:

参数 作用
type 仓库的类型,在这里您应该填bos
access_key 百度智能云的access_key,可以在百度智能云的console中看到
secret_key 百度智能云的secret_key,可以在百度智能云的console中看到
endpoint BOS对应各个region的服务域名
bucket BOS的bucket,务必保证对应的用户身份有读写bucket权限
base_path 仓库的起始位置,默认为根目录

BOS对应各个region的服务域名:

区域 访问Endpoint
北京 s3.bj.bcebos.com
保定 s3.bd.bcebos.com
苏州 s3.su.bcebos.com
广州 s3.gz.bcebos.com
香港 s3.hkg.bcebos.com
金融云武汉专区 s3.fwh.bcebos.com

在目的Elasticsearch集群创建仓库时,除了添加BOS信息外,还需要添加readonly参数,以保证只有一个集群可以对仓库进行操作。创建命令如下:

Plain Text
1POST /_snapshot/es_repo
2{
3    "type": "bos",
4    "settings": {
5        "access_key": "your access_key",
6        "secret_key": "your secret_key",
7        "endpoint": "s3.bj.bcebos.com",
8        "bucket": "es-repo",
9        "base_path": "test_base_path",
10        "readonly": true
11    }
12}
  1. 在源Elasticsearch集群创建数据的快照
Plain Text
1PUT /_snapshot/es_repo/snapshot_2020_01_01?wait_for_completion=true
2{
3  "indices": "index_1,index_2",
4  "ignore_unavailable": true,
5  "include_global_state": false
6}
  1. 在目的Elasticsearch集群恢复数据快照
Plain Text
1POST /_snapshot/es_repo/snapshot_2020_01_01/_restore
2{
3  "indices": "index_1,index_2",
4  "include_global_state": false
5}

具体参数详解参照基于BOS的快照备份恢复。

注意事项

  • 需要开通BOS服务并且创建bucket。
  • 并且源集群和目的集群能够访问BOS网络。
  • 源集群进行快照时,同时只能有一个快照运行,且有删除快照的操作时,不能进行快照。
  • 目的集群创建仓库时,需要添加readonly参数,以保证只有一个集群可以对仓库进行写操作。
  • 目的集群恢复数据时,恢复的index可以不存在,如果存在必须是closed状态。
  • 对源集群和目的集群的版本匹配有严格要求,具体要求参见上文表格。
  • 使用BOS跨region迁移数据时,可能会产生外网流量费用。
  • 对索引做快照的时候,对索引的变更操作不会在快照中出现,用户应该避免这种情况。如果必须变更数据,需要在迁移完后在新的集群进行回放变更操作。同理,其他迁移方式也要注意这点。

在线reindex

reindex from a remote cluster api是Elasticsearch提供的一个 api接口,可以把数据从源Elasticsearch集群导入到当前Elasticsearch集群,实现数据的迁移。原理是从源 Elasticsearch 集群中查询数据,然后写入到目标 Elasticsearch 集群中。

适用场景

  • 在线迁移数据
  • 对迁移速度要求不高
  • 可以对源端集群数据进行查询

使用方式

  • 在目的集群的elasticsearch.yml配置reindex白名单
Plain Text
1reindex.remote.whitelist: {IP}:{port}
  • 在目的集群创建index(如果不需要特殊处理分片数和mapping等,可以省略这一步)
  • 做reindex操作
Plain Text
1POST _reindex
2{
3  "source": {
4    "remote": {
5      "host": "http://{oldhost}:{oldport}",
6      "username": "{username}",
7      "password": "{password}"
8    },
9    "index": "source_index",
10    "query": {
11      "match_all": {}
12    }
13  },
14  "dest": {
15    "index": "dest_index"
16  }
17}

详细使用方式参照reindex from a remote cluster api。

注意事项

  • 目的集群需要连通源集群的网络。
  • 需要重启目的集群配置reindex白名单。
Plain Text
1白名单修改可以在百度云页面【配置修改】进行修改,
2注意:
31、修改配置后,需要重启集群才生效。
42、配置格式为yml格式。注意,在【冒号】后【配置值】前要有一个“空格”。

image.png

  • 迁移数据时,最好不要对索引有变更操作;如果必须变更数据,需要在迁移完后在新的集群进行回放变更操作。

Logstash

适用场景

  • 在线迁移数据
  • 可以对源端集群数据进行查询
  • 请注意 Logstash 版本的版本是否与Elastic相匹配,建议与 Elasticsearch 版本保持一致。版本兼容性说明请参见Elasticsearch系列产品兼容性。

使用方式

  • 安装部署Logstash与Java8以上的jdk环境。
Plain Text
1wget https://artifacts.elastic.co/downloads/logstash/logstash-7.4.2.tar.gz
2tar xvf logstash-7.4.2.tar.gz 
  • 根据数据源类型自定义配置文件*.conf,配置文件内容如下:
Plain Text
1input {
2    elasticsearch {
3        hosts => ["http://{oldhost}:{oldport}"]
4        index => "*"
5        docinfo => true
6    }
7}
8output {
9    elasticsearch {
10        hosts => ["http://{newhost}:{newport}"]
11        index => "%{[@metadata][_index]}"
12    }
13}
  • 执行Logstash。
Plain Text
1 nohup ./bin/logstash -f ~/*.conf 2>&1 >/dev/null &

上述配置文件将源Elasticsearch集群的所有索引同步到目标集群中,也可以设置只同步指定的索引,Logstash 的更多功能可查阅 Logstash 官方文档。

注意事项

  • 需要准备Logstash环境并连通Elasticsearch集群的网络。

通过Spark迁移数据

Spark支持从一个Elasticsearch集群中读取数据然后写入到另一个Elasticsearch集群。

使用方式

Spark代码参考如下:

Java
1import org.apache.spark.SparkConf;
2import org.apache.spark.SparkContext;
3import org.apache.spark.api.java.JavaRDD;
4import org.apache.spark.api.java.JavaSparkContext;
5import org.elasticsearch.spark.rdd.api.java.JavaEsSpark;
6
7import java.util.HashMap;
8import java.util.Map;
9
10import static org.elasticsearch.spark.rdd.api.java.JavaEsSpark.esRDD;
11
12public class MigrateESToES {
13
14    public static void main(String[] args) {
15
16        SparkConf conf = new SparkConf();
17
18        conf.setAppName("Baidu-Cloud-Migrate-ES-Example");
19        conf.set("es.index.auto.create", "true");
20
21        SparkContext sc = new SparkContext();
22
23        // source cluster configuration
24        Map<String, String> inputCfg = new HashMap<>();
25        // node ip:port
26        inputCfg.put("es.nodes", "{Source Elasticseach Host}:8200");
27        // 要迁移的索引和文档类型,例如test_size索引,文档类型_doc
28        inputCfg.put("es.resource", "test_size/_doc");
29        // 如果没有开启认证的话,这两个选项可以不写
30        // username
31        // inputCfg.put("es.net.http.auth.user", "superuser");
32        // password
33        // inputCfg.put("es.net.http.auth.pass", "xxxx");
34        // es_version,条件可以的话建议写上
35        inputCfg.put("es.internal.es.version", "7.4.2");
36        // 读取_version 字段
37        // inputCfg.put("es.read.metadata.version", "true");
38        // 读取_matedata 字段
39        // inputCfg.put("es.read.metadata", "true");
40
41        // dst cluster configuration
42        Map<String, String> outputCfg = new HashMap<>();
43        outputCfg.put("es.nodes", "{Dst Cluster}:8200");
44        outputCfg.put("es.net.http.auth.user", "root");
45        outputCfg.put("es.net.http.auth.pass", "root");
46        outputCfg.put("es.internal.es.version", "7.4.2");
47        // 目标索引和文档类型
48        outputCfg.put("es.resource", "test_size/_doc");
49
50        // 指定写入version type 为外部指定
51        // outputCfg.put("es.mapping.version.type", "external");
52        // 指定 index id
53        // outputCfg.put("es.mapping.id", "_metadata._id");
54        // 指定 routing id
55        // outputCfg.put("es.mapping.routing", "_metadata._id");
56        // 指定 doc version
57        // outputCfg.put("es.mapping.version", "_metadata._version");
58        // 关闭refresh
59        outputCfg.put("es.batch.write.refresh", "false");
60        // 单次写15mb
61        outputCfg.put("es.batch.size.bytes", "15mb");
62        //size (in entries) for batch writes using Elasticsearch bulk API
63        outputCfg.put("es.batch.size.entries", "5000");
64        // 异常不处理,打log,避免conflict 导致task failed
65        outputCfg.put("es.write.rest.error.handlers", "log");
66        // 异常日志前缀
67        outputCfg.put("es.write.rest.error.handler.log.logger.name", "es_error_handler");
68        // outputCfg.put("es.mapping.exclude", "_metadata");
69
70        // create spark context
71        JavaSparkContext jsc = new JavaSparkContext(sc);
72
73        JavaRDD<Map<String, Object>> sourceRDD = esRDD(jsc, inputCfg).values();
74
75        JavaEsSpark.saveToEs(sourceRDD, outputCfg);
76        sc.stop();
77    }
78}

注意事项

  • 需要准备Spark环境并连通Elasticsearch集群的网络。
  • 需要自行实现Spark业务逻辑。

HDFS快照迁移

使用方式

BES已预装了repository-hdfs插件。您可以通过HDFS进行快照备份并恢复,使用方式与基于BOS的快照备份与恢复相类似,只是中转媒介换成了HDFS。

创建快照仓库的命令示例:

JSON
1PUT _snapshot/my_hdfs_repository
2{
3  "type": "hdfs",
4  "settings": {
5    "uri": "hdfs://192.168.1.101:8020",
6    "path": "/tmp/hadoop/es_repo"
7  }
8}

配置详情可参考ES官网介绍repository-hdfs-config。

注意事项

  • 需要准备资源足够的HDFS环境并连通网络。
  • BES已预装了repository-hdfs插件。
  • 目的集群恢复数据时,需要关闭相应index。
  • 对源集群和目的集群的版本匹配有严格要求,具体要求参见上文表格。

上一篇
智能巡检
下一篇
基于BOS的快照与恢复