业务迁移

消息服务 for Kafka Kafka

  • 功能发布记录
  • 共享版
    • 产品定价
    • 产品描述
      • 介绍
    • 快速入门
      • 多用户访问控制
      • 监控报警
      • 操作流程
    • 常见问题
      • 常见问题总览
      • 安全类问题
      • 配置类问题
    • API文档
      • 通用说明
      • 简介
      • 公共头
      • 接口说明
      • 模型定义
      • 服务域名
      • 错误返回
  • 专享版
    • 产品描述
      • 使用限制
      • 基本概念
      • 产品规格
      • 产品优势
      • 产品介绍
      • 应用场景
      • 产品架构
    • 开发指南
      • 访问协议介绍
      • 接入点查看
      • 概述
      • Python示例
        • VPC网络PLAINTEXT方式生产和消费
        • VPC网络SASL_PLAINTEXT方式生产和消费消息
        • SSL协议生产和消费消息
        • 公网SASL_SSL方式生产和消费消息
      • Go示例
        • VPC网络PLAINTEXT方式生产和消费
        • VPC网络SASL_PLAINTEXT方式生产和消费消息
        • SSL协议生产和消费消息
        • 公网SASL_SSL方式生产和消费消息
      • Java示例
        • VPC网络PLAINTEXT方式生产和消费
        • VPC网络SASL_PLAINTEXT方式生产和消费消息
        • SSL协议生产和消费消息
        • 公网SASL_SSL方式生产和消费消息
      • PHP示例
        • VPC网络PLAINTEXT方式生产和消费
        • VPC网络SASL_PLAINTEXT方式生产和消费消息
        • SSL协议生产和消费消息
        • 公网SASL_SSL方式生产和消费消息
      • C++示例
        • VPC网络PLAINTEXT方式生产和消费
        • VPC网络SASL_PLAINTEXT方式生产和消费消息
        • SSL协议生产和消费消息
        • 公网SASL_SSL方式生产和消费消息
    • 快速入门
      • 步骤二:创建主题
      • 步骤一:创建Kafka集群实例
      • 步骤三:配置权限认证
      • 概述
      • 步骤四:访问Kafka集群
        • 使用SASL_SSL协议访问集群
        • 使用PLAINTEXT协议访问集群
        • 使用SASL_PLAINTEXT协议访问集群
        • 使用SSL协议访问集群
    • 操作指南
      • 多用户访问控制
      • 消息查询
      • 标签管理
      • 存储分析
      • 集群日志
      • 操作审计
      • 任务管理
        • 任务类型介绍
        • 查看任务详情
      • 主题管理
        • 主题重新分区
        • 删除主题
        • 创建主题
        • 修改主题分区数
        • 查看主题订阅关系
        • 查看主题分区详情
        • 查看主题详情
        • 修改主题配置
      • 用户管理
        • 删除用户
        • 重置用户密码
        • 创建用户
      • 监控报警
        • 报警策略配置
        • 集群监控
        • 事件策略配置
      • 消费组管理
        • 查看消费组订阅信息
        • 消费组重置位点
        • 删除消费组
      • 集群配置管理
        • 配置参数介绍
        • 查看集群配置详情
        • 创建集群配置
        • 删除集群配置
      • 权限管理
        • 创建权限
        • 删除权限
      • 集群管理
        • 删除集群
        • 查看集群信息
        • 变更访问配置
        • 集群变更
        • 磁盘水位处理
        • 查看Controller信息
        • 重启节点
        • 查看集群接入点
        • 集群启停
    • 最佳实践
      • Flink接入Kafka专享版
      • 如何选择合适的集群规格
      • Logstash接入Kafka专享版
      • Filebeat接入Kafka专享版
      • 业务迁移
    • API参考
      • 更新记录
      • 调用说明
      • 附录
      • 服务域名
      • 错误返回
      • 主题管理接口
        • 查询主题列表
        • 查询主题订阅详情
        • 删除主题
        • 查询主题详情
        • 创建主题
        • 获取订阅主题的消费组列表
        • 查询主题分区详情
        • 变更主题
        • 查询主题分区列表
      • 用户管理接口
        • 查询用户列表
        • 删除用户
        • 重置用户密码
        • 创建用户
      • 消费组管理接口
        • 删除消费组
        • 重置消费组位点
        • 查询消费组列表
        • 查询消费组订阅的主题列表
      • 集群管理接口
        • 查询集群参数
        • 创建集群
        • 释放集群
        • 查询集群节点列表
        • 停止集群
        • 查询集群详情
        • 启动集群
        • 查询集群列表
        • 查询集群接入点
      • 集群变更接口
        • 变更访问配置
        • 扩容磁盘容量
        • 增加节点数量
        • 变更用户安全组
        • 变更节点机型
        • 变更集群配置
        • 集群公网开关
        • 变更存储策略
        • 变更公网带宽
      • 权限管理接口
        • 查询权限列表
        • 创建权限
        • 删除权限
      • 任务管理接口
        • 暂停任务
        • 恢复任务
        • 查询操作详情
        • 查询任务列表
        • 取消任务
        • 查询任务详情
        • 启动任务
      • 集群配置管理接口
        • 查询集群配置列表
        • 查询集群配置详情
        • 创建集群配置
        • 查询集群配置版本列表
        • 新增集群配置版本
        • 查询集群配置版本详情
        • 删除集群配置
    • 产品定价
      • 余额不足提醒和欠费处理
      • 计费说明
      • 变配规则说明
      • 续费说明
所有文档
menu
没有找到结果,请重新输入

消息服务 for Kafka Kafka

  • 功能发布记录
  • 共享版
    • 产品定价
    • 产品描述
      • 介绍
    • 快速入门
      • 多用户访问控制
      • 监控报警
      • 操作流程
    • 常见问题
      • 常见问题总览
      • 安全类问题
      • 配置类问题
    • API文档
      • 通用说明
      • 简介
      • 公共头
      • 接口说明
      • 模型定义
      • 服务域名
      • 错误返回
  • 专享版
    • 产品描述
      • 使用限制
      • 基本概念
      • 产品规格
      • 产品优势
      • 产品介绍
      • 应用场景
      • 产品架构
    • 开发指南
      • 访问协议介绍
      • 接入点查看
      • 概述
      • Python示例
        • VPC网络PLAINTEXT方式生产和消费
        • VPC网络SASL_PLAINTEXT方式生产和消费消息
        • SSL协议生产和消费消息
        • 公网SASL_SSL方式生产和消费消息
      • Go示例
        • VPC网络PLAINTEXT方式生产和消费
        • VPC网络SASL_PLAINTEXT方式生产和消费消息
        • SSL协议生产和消费消息
        • 公网SASL_SSL方式生产和消费消息
      • Java示例
        • VPC网络PLAINTEXT方式生产和消费
        • VPC网络SASL_PLAINTEXT方式生产和消费消息
        • SSL协议生产和消费消息
        • 公网SASL_SSL方式生产和消费消息
      • PHP示例
        • VPC网络PLAINTEXT方式生产和消费
        • VPC网络SASL_PLAINTEXT方式生产和消费消息
        • SSL协议生产和消费消息
        • 公网SASL_SSL方式生产和消费消息
      • C++示例
        • VPC网络PLAINTEXT方式生产和消费
        • VPC网络SASL_PLAINTEXT方式生产和消费消息
        • SSL协议生产和消费消息
        • 公网SASL_SSL方式生产和消费消息
    • 快速入门
      • 步骤二:创建主题
      • 步骤一:创建Kafka集群实例
      • 步骤三:配置权限认证
      • 概述
      • 步骤四:访问Kafka集群
        • 使用SASL_SSL协议访问集群
        • 使用PLAINTEXT协议访问集群
        • 使用SASL_PLAINTEXT协议访问集群
        • 使用SSL协议访问集群
    • 操作指南
      • 多用户访问控制
      • 消息查询
      • 标签管理
      • 存储分析
      • 集群日志
      • 操作审计
      • 任务管理
        • 任务类型介绍
        • 查看任务详情
      • 主题管理
        • 主题重新分区
        • 删除主题
        • 创建主题
        • 修改主题分区数
        • 查看主题订阅关系
        • 查看主题分区详情
        • 查看主题详情
        • 修改主题配置
      • 用户管理
        • 删除用户
        • 重置用户密码
        • 创建用户
      • 监控报警
        • 报警策略配置
        • 集群监控
        • 事件策略配置
      • 消费组管理
        • 查看消费组订阅信息
        • 消费组重置位点
        • 删除消费组
      • 集群配置管理
        • 配置参数介绍
        • 查看集群配置详情
        • 创建集群配置
        • 删除集群配置
      • 权限管理
        • 创建权限
        • 删除权限
      • 集群管理
        • 删除集群
        • 查看集群信息
        • 变更访问配置
        • 集群变更
        • 磁盘水位处理
        • 查看Controller信息
        • 重启节点
        • 查看集群接入点
        • 集群启停
    • 最佳实践
      • Flink接入Kafka专享版
      • 如何选择合适的集群规格
      • Logstash接入Kafka专享版
      • Filebeat接入Kafka专享版
      • 业务迁移
    • API参考
      • 更新记录
      • 调用说明
      • 附录
      • 服务域名
      • 错误返回
      • 主题管理接口
        • 查询主题列表
        • 查询主题订阅详情
        • 删除主题
        • 查询主题详情
        • 创建主题
        • 获取订阅主题的消费组列表
        • 查询主题分区详情
        • 变更主题
        • 查询主题分区列表
      • 用户管理接口
        • 查询用户列表
        • 删除用户
        • 重置用户密码
        • 创建用户
      • 消费组管理接口
        • 删除消费组
        • 重置消费组位点
        • 查询消费组列表
        • 查询消费组订阅的主题列表
      • 集群管理接口
        • 查询集群参数
        • 创建集群
        • 释放集群
        • 查询集群节点列表
        • 停止集群
        • 查询集群详情
        • 启动集群
        • 查询集群列表
        • 查询集群接入点
      • 集群变更接口
        • 变更访问配置
        • 扩容磁盘容量
        • 增加节点数量
        • 变更用户安全组
        • 变更节点机型
        • 变更集群配置
        • 集群公网开关
        • 变更存储策略
        • 变更公网带宽
      • 权限管理接口
        • 查询权限列表
        • 创建权限
        • 删除权限
      • 任务管理接口
        • 暂停任务
        • 恢复任务
        • 查询操作详情
        • 查询任务列表
        • 取消任务
        • 查询任务详情
        • 启动任务
      • 集群配置管理接口
        • 查询集群配置列表
        • 查询集群配置详情
        • 创建集群配置
        • 查询集群配置版本列表
        • 新增集群配置版本
        • 查询集群配置版本详情
        • 删除集群配置
    • 产品定价
      • 余额不足提醒和欠费处理
      • 计费说明
      • 变配规则说明
      • 续费说明
  • 文档中心
  • arrow
  • 消息服务 for KafkaKafka
  • arrow
  • 专享版
  • arrow
  • 最佳实践
  • arrow
  • 业务迁移
本页目录
  • 迁移前提
  • 集群准备
  • 1. 购买专享版消息服务for Kafka集群
  • 2. 为购买的集群创建主题
  • 迁移方案
  • 方案一:先迁生产,后迁消费
  • 适用场景
  • 迁移步骤
  • 方案二:同时消费,后迁生产
  • 适用场景
  • 迁移步骤
  • 使用MirrorMaker迁移
  • 概述
  • 迁移原理
  • 环境准备
  • 迁移步骤
  • 迁移结果查看

业务迁移

更新时间:2025-08-21

本文主要介绍将自建Kafka集群迁移到专享版 消息服务 for Kafka集群的相关操作。

迁移前提

  • 存在一个可用的专享版 消息服务 for Kafka。
  • 为专享版 消息服务 for Kafka创建主题。

集群准备

1. 购买专享版消息服务for Kafka集群

开通消息服务 for Kafka服务后,在控制台页面点击『创建集群』,即可进行购买。

业务迁移1.png

2. 为购买的集群创建主题

在控制台页面点击集群名称,进入集群详情页面。

在左侧的边栏中点击『主题管理』,进入主题管理页面。

业务迁移2.png

在主题管理页面点击『创建主题』,进行主题的创建。

开通消息服务 for Kafka服务后,在控制台页面点击『创建集群』,即可进行购买。

业务迁移3.png

迁移方案

方案一:先迁生产,后迁消费

先将生产消息的业务迁移到专享版 消息服务 for Kafka集群,这样原Kafka集群不会有新的消息生产。

待原有Kafka集群的消息全部消费完成后,再将消费消息业务迁移到专享版 消息服务 for Kafka集群,开始消费消息。

img

适用场景

本方案为业界通用的迁移方案,操作步骤简单,迁移过程由业务侧自主控制,整个过程中消息不会存在乱序问题,适用于对消息顺序有要求的场景。但是该方案中需要等待消费者业务直至消费完毕,存在一个时间差的问题,部分数据可能存在较大的端到端时延。

迁移步骤

  1. 专享版 消息服务 for Kafka集群,在集群中创建好 topic 等资源,具体操作见『集群准备』。
  2. 切换生产者写入地址:
    • 将生产客户端的Kafka连接地址修改为专享版 消息服务 for Kafka集群所提供的接入点地址。接入点查看请参考:查看集群接入点
    • 重启生产业务,使得生产者将新的消息发送到专享版 消息服务 for Kafka集群中。
  1. 观察各消费组在原Kafka的消费进度,直到原Kafka中数据都已经被消费完毕。
  2. 切换消费者
    • 将消费客户端的Kafka连接地址修改为专享版 消息服务 for Kafka集群所提供的接入点地址。接入点查看请参考:查看集群接入点
    • 重启消费业务,使得消费者从专享版 消息服务 for Kafka集群中消费消息。
  1. 观察消费者是否能正常从专享版 消息服务 for Kafka集群中获取数据。
  2. 迁移结束。

方案二:同时消费,后迁生产

指消费者业务启用多个消费客户端,分别向原Kafka集群和专享版 消息服务 for Kafka集群消费消息,然后将生产业务切到专享版 消息服务 for Kafka集群,这样能确保所有消息都被及时消费。

img

适用场景

迁移过程由业务自主控制。本方案中消费业务会在一段时间内同时消费原Kafka集群和专享版 消息服务 for Kafka集群。由于在迁移生产业务之前,已经有消费业务运行在专享版 消息服务 for Kafka集群上,因此不会存在端到端时延的问题。但在迁移生产的开始阶段,同时消费原Kafka集群与专享版 消息服务 for Kafka集群,会导致部分消息之间的生产顺序无法保证,存在消息乱序的问题。此场景适用于对端到端时延有要求,却对消息顺序不敏感的业务。

迁移步骤

  1. 专享版 消息服务 for Kafka集群,在集群中创建好 topic 等资源,具体操作见『集群准备』。
  2. 启动新的消费客户端,配置Kafka连接地址为修改为专享版 消息服务 for Kafka集群所提供的接入点地址。消费专享版 消息服务 for Kafka集群中的数据。(原有消费客户端需继续运行,消费业务同时消费原Kafka集群与专享版 消息服务 for Kafka集群中的消息。)
  3. 切换生产者写入地址:
    • 修改生产客户端,Kafka连接地址改为专享版 消息服务 for Kafka集群所提供的接入点地址。接入点查看请参考:查看集群接入点
    • 重启生产客户端,将生产业务迁移到专享版 消息服务 for Kafka集群中。
    • 生产业务迁移后,观察连接专享版 消息服务 for Kafka集群的消费业务是否正常。
  1. 等待原Kafka集群中数据消费完毕,关闭原有消费业务客户端。
  2. 迁移结束。

使用MirrorMaker迁移

概述

MirrorMaker为Kafka自带的数据迁移工具,可以利用此种方式直接进行kafka集群之间的数据迁移;减少中间组件的使用。

具体介绍请参考Kakfa官方网站:MirrorMaker

迁移原理

通过Consumer从旧的Kafka集群中消费数据,然后通过Producer将数据生产在目标kafka集群中,来实现数据同步备份。

环境准备

准备一台与专享版 消息服务 for Kafka集群网络相通的服务器,例如:云服务器BCC。

可通过如下链接下载迁移工具:迁移工具,并将其上传到BCC中进行解压。

BCC中进行如下配置操作:

Bash
1# 1. 安装JDK 1.8
2yum install java
3# 2. 下载并解压kafka_2.12-2.7.2.tgz文件
4tar -zxvf kafka_2.12-2.7.2.tgz

迁移步骤

  1. 进入kafka_2.12-2.7.2/config路径下,找到connect-mirror-maker.properties文件,并进行如下配置:
XML
1# Licensed to the Apache Software Foundation (ASF) under A or more
2# contributor license agreements.  See the NOTICE file distributed with
3# this work for additional information regarding copyright ownership.
4# The ASF licenses this file to You under the Apache License, Version 2.0
5# (the "License"); you may not use this file except in compliance with
6# the License.  You may obtain a copy of the License at
7# 
8#    http://www.apache.org/licenses/LICENSE-2.0
9# 
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15# see org.apache.kafka.clients.consumer.ConsumerConfig for more details
16
17# Sample MirrorMaker 2.0 top-level configuration file
18# Run with ./bin/connect-mirror-maker.sh connect-mirror-maker.properties 
19
20# 指定任意数量的集群别名
21clusters = A, B
22
23# 配置新旧集群的地址
24# 例如A配置为原Kakfa集群的地址,B则配置为专享版 消息服务 for Kafka提供的接入点地址
25A.bootstrap.servers = A_host1:port, A_host2:port, A_host3:port
26B.bootstrap.servers = B_host1:port, B_host2:port, B_host3:port
27
28# 指定数据同步的方向(单向同步、相互同步)
29A->B.enabled = true
30
31# 指定需要同步的主题,默认全部同步
32A->B.topics = .*
33
34# 下面两行配置表示两个集群相互同步
35B->A.enabled = true
36B->A.topics = .*
37
38# 设置副本数
39replication.factor=1
40
41############################# Internal Topic Settings  #############################
42# The replication factor for mm2 internal topics "heartbeats", "B.checkpoints.internal" and
43# "mm2-offset-syncs.B.internal"
44# 下面的配置在测试环境中可以设置为1,生产环境中建议设置大于1的值,例如:3
45checkpoints.topic.replication.factor=1
46heartbeats.topic.replication.factor=1
47offset-syncs.topic.replication.factor=1
48
49# The replication factor for connect internal topics "mm2-configs.B.internal", "mm2-offsets.B.internal" and
50# "mm2-status.B.internal"
51# 下面的配置在测试环境中可以设置为1,生产环境中建议设置大于1的值,例如:3
52offset.storage.replication.factor=1
53status.storage.replication.factor=1
54config.storage.replication.factor=1
55
56# 有需要可以自行配置
57# replication.policy.separator = _
58# sync.topic.acls.enabled = false
59# emit.heartbeats.interval.seconds = 5
  1. 进入kafka_2.12-2.7.2/bin路径下,启动MirrorMaker脚本工具,执行如下命令:
Bash
1./bin/connect-mirror-maker.sh config/connect-mirror-maker.properties
  1. 等待数据迁移。

迁移结果查看

  • 在原集群生产并消费消息。
  • 在专享版 消息服务 for Kafka的集群监控中查看数据同步情况。

监控查看步骤如下:

(1)在专享版消息服务 for Kafka的控制台页面找到需要连接的集群,点击集群名称进入集群详情页面。

监控信息1.png

(2)页面跳转后,进入左侧边中的集群详情页面。

监控信息2.png

(3)点击左侧边栏中的集群监控,进入集群监控页面。

监控信息3.png

(4)通过查看集群监控页面,提供的不同纬度的监控信息(集群监控、节点监控、主题监控、消费组监控),即可获知集群的生产和消费情况。

集群监控的具体使用请参考:集群监控

监控信息4.png

上一篇
Filebeat接入Kafka专享版
下一篇
API参考