将Kafka作为DTS目标端

数据传输服务 DTS

  • 任务管理
    • 任务限速
    • 修改迁移任务的当前位点
    • 查看任务进度
    • 创建类似任务
    • 一键反向
    • 管理任务对象
      • 同步Online DDL
      • 迁移数据库账号
      • 修改同步对象
      • 传输对象范围
      • 设置过滤条件
      • 库表列名映射
    • 生命周期
      • 结束任务
      • 删除任务
      • 暂停任务
      • 变更任务链路规格
      • 启动任务
  • API3.0
    • 调用说明
    • API概览
    • 目录
    • 数据传输任务
      • 结束任务
      • 更新任务名称
      • 暂停任务
      • 查询前置检查结果
      • 释放任务
      • 修改同步对象
      • 修改同步对象记录
      • 创建任务
      • 前置检查
      • 强制跳过预检查
      • 查询任务列表
      • 变更链路规格
      • 配置任务
      • 查询任务信息
      • 启动任务
      • demo
        • 专线迁移Redis Cluster迁移到云数据库Redis企业集群版-一键反向
        • 专线迁移MongoDB副本集迁移到云数据库MongoDB副本集
        • 专线迁移Redis Cluster迁移到云数据库Redis标准版
        • 专线迁移Redis Cluster迁移到云数据库Redis标准版-数据校验
        • 专线迁移Redis Cluster迁移到云数据库Redis企业集群版-数据校验
        • 公网sql迁移到公网kafka
        • 专线迁移Redis标准版迁移到云数据库Redis标准版-一键反向
        • 专线迁移的MongoDB分片集迁移到云数据库MongoDB分片集-数据校验
        • 专线迁移MongoDB副本集迁移到云数据库MongoDB副本集-一键反向
        • 专线迁移MySQL迁移到云数据库RDS-一键反向
        • 专线迁移的Redis标准版迁移到云数据库Redis标准版-数据校验
        • 专线迁移MySQL迁移到云数据库RDS-数据校验
        • 专线迁移MongoDB分片集迁移到云数据库MongoDB分片集
        • 专线迁移MongoDB副本集迁移到云数据库MongoDB副本集-数据校验
        • 专线迁移MySQL迁移到云数据库RDS
        • 专线迁移Redis Cluster迁移到云数据库Redis企业集群版
        • 专线迁移Redis标准版迁移到云数据库Redis标准版
      • 其他
        • 一键反向查询前置检查结果
        • 展示授权白名单
        • 一键反向延迟检查
        • 获取订单状态
        • 配置一键反向
        • 查询vpc列表
        • 查询MySQL实例拥有的账号列表
        • 绑定&解除标签
        • 查询可用区列表
        • 检查数据库连通性
        • 一键反向
      • 数据校验
        • 查询数据校验任务列表
        • 数据校验任务启动
        • 创建数据校验任务
        • 配置数据校验任务
        • 停止校验
        • 查看数据校验比对结果
        • 数据校验前置检查
        • 查看数据校验任务信息
  • 监控报警
    • 查看任务监控
    • 设置报警策略
  • 产品计费
    • 计费概述
    • 到期或欠费说明
    • 计费方式
    • 变更和退订说明
  • 数据校验
    • 配置数据校验
    • 查看数据校验
    • 数据校验概述
  • Java_SDK
    • SDK下载页
    • 安装-DTS-Java-SDK
    • 使用数据订阅SDK
    • SDK接口简介
    • SDK下载
    • 概述
  • 快速入门
    • 购买流程
    • 数据迁移操作指导
    • 配置迁移任务
  • 典型实践
    • 使用NimoShake将Amazon DynamoDB迁移至百度云DocDB
    • 不同库名间的数据同步
    • 获取主账号ID
    • 使用DTS实现目标端为Elasticsearch的数据迁移
    • 使用DTS实现MySQL数据拆分
    • 数据库迁移上云
    • 业务切换流程
    • DTS支持专线迁移
    • DTS支持专线迁移(新)
    • 修改RDS MySQL参数提升迁移性能
    • MySQL实例的双向同步
  • 准备工作
    • 自建MySQL创建账号并设置binlog
    • 迁移评估
      • 源端MySQL迁移评估
    • 网络准备
      • 网络准备概述
      • VPC接入
      • 添加DTS服务IP白名单
      • 本地IDC接入百度智能云
        • VPN接入
        • 专线接入
        • 公网接入
    • 访问控制
      • 子用户权限管理
  • 产品简介
    • 支持的数据流
    • 功能概览
    • 基本概念
    • 产品优势
    • 架构原理
    • 应用场景
    • 什么是数据库传输服务DTS
    • 链路规格说明
  • 数据迁移
    • 跨账号迁移云数据库实例
    • 迁移方案概览
    • TiDB为源的迁移
      • 自建TiDB迁移至GaiaDB
    • Oracle为源的迁移
      • 自建Oracle迁移至RDS MySQL
    • GaiaDB为源的迁移
      • GaiaDB实例间的迁移
    • MySQL为源的迁移
      • 腾讯云MySQL迁移至RDS MySQL
      • RDS MySQL迁移至GaiaDB
      • 自建MySQL迁移至GaiaDB
      • 自建MySQL迁移至RDS MySQL
      • 阿里云PolarDB迁移至GaiaDB
      • RDS MySQL实例间的迁移
      • 阿里云MySQL迁移至RDS MySQL
      • MySQL迁移至Kafka
    • 异构数据库间的数据类型映射关系
      • 百度智能云DTS数据类型
      • DTS支持的数据源端
        • 将Kafka作为源端
        • 将Redis标准版作为源端
        • 将GaiaDB作为源端
        • 将Oracle作为源端
        • 将SQL Server作为源端
        • 将云上百度DRDS作为源端
        • 将MongoDB分片集作为源端
        • 将PostgreSQL作为源端
        • 将MongoDB副本集作为源端
        • 将MySQL作为源端
      • DTS支持的数据目的端
        • 将SQL Server作为目标端
        • 将Elasticsearch作为DTS目的端
        • 将GaiaDB作为目标端
        • 将Palo作为目标端
        • 将DataHub作为目标端
        • 将PostgreSQL作为目标端
        • 将MySQL作为目标端
        • 将Kafka作为DTS目标端
    • SQL Server 为源的迁移
      • 自建SQL Server迁移至RDS SQL Server
    • MongoDB为源的迁移
      • 自建MongoDB分片集迁移至DocDB MongoDB
      • DocDB MongoDB副本集迁移至DocDB MongoDB
    • Kafka为源的迁移
      • 自建Kafka迁移至消息服务 for Kafka
    • PostgreSQL为源的迁移
      • RDS PostgreSQL实例间的迁移
      • 自建PostgreSQL迁移至RDS PostgreSQL
      • AWS PostgreSQL迁移至RDS PostgreSQL
    • Redis为源的迁移
      • 使用DTS实现自建Redis标准版到百度智能云SCS for Redis标准版的数据迁移
      • 云数据库Redis实例间的迁移
      • 自建Redis迁移至云数据库Redis(含PegaDB)
    • Milvus为源的迁移
      • 自建 Milvus 迁移至向量数据库 VectorDB
  • 产品动态
    • 功能发布记录
    • 公告
      • 数据传输服务DTS支持多规格数据传输任务
      • 数据传输服务DTS任务状态升级通知
  • 预检查项
    • TiDB检查项
    • PostgreSQL检查项
    • MySQL检查项
    • Palo检查项
    • Oracle检查项
    • GaiaDB-X检查项
    • RocketMQ检查项
    • MongoDB检查项
    • Redis检查项
    • Elasticsearch检查项
    • GaiaDB检查项
    • Kafka检查项
    • 预检查不通过处理方法
      • 源库、目标库的连接数是否满足要求检查
      • 迁移表依赖的外键父表是否迁移检查
      • 目的数据库是否只读检查
      • 目的数据库的账号权限是否满足迁移要求检查
      • 目的库中表是否为空检查
      • 源库和目标库中SQL_MODE是否合法检查
      • 数据传输服务器是否能连通目的数据库检查
      • 源数据库的版本号检查
      • 源数据库的账号权限是否满足迁移要求检查
      • 迁移表是否有不支持的存储引擎检查
      • 预检查项汇总
      • 迁移表的表结构在目的库是否存在检查
      • 目的数据库待迁入的数据库是否可用检查
      • 目的库是否存在跟待迁移对象同名的结构对象检查
      • 数据传输服务器是否能连通源数据库检查
  • 服务支持
    • 相关协议
      • DTS服务等级协议SLA
    • 常见问题
      • 使用类问题
      • 数据迁移问题
      • 常见问题总览
      • 数据同步问题
所有文档
menu
没有找到结果,请重新输入

数据传输服务 DTS

  • 任务管理
    • 任务限速
    • 修改迁移任务的当前位点
    • 查看任务进度
    • 创建类似任务
    • 一键反向
    • 管理任务对象
      • 同步Online DDL
      • 迁移数据库账号
      • 修改同步对象
      • 传输对象范围
      • 设置过滤条件
      • 库表列名映射
    • 生命周期
      • 结束任务
      • 删除任务
      • 暂停任务
      • 变更任务链路规格
      • 启动任务
  • API3.0
    • 调用说明
    • API概览
    • 目录
    • 数据传输任务
      • 结束任务
      • 更新任务名称
      • 暂停任务
      • 查询前置检查结果
      • 释放任务
      • 修改同步对象
      • 修改同步对象记录
      • 创建任务
      • 前置检查
      • 强制跳过预检查
      • 查询任务列表
      • 变更链路规格
      • 配置任务
      • 查询任务信息
      • 启动任务
      • demo
        • 专线迁移Redis Cluster迁移到云数据库Redis企业集群版-一键反向
        • 专线迁移MongoDB副本集迁移到云数据库MongoDB副本集
        • 专线迁移Redis Cluster迁移到云数据库Redis标准版
        • 专线迁移Redis Cluster迁移到云数据库Redis标准版-数据校验
        • 专线迁移Redis Cluster迁移到云数据库Redis企业集群版-数据校验
        • 公网sql迁移到公网kafka
        • 专线迁移Redis标准版迁移到云数据库Redis标准版-一键反向
        • 专线迁移的MongoDB分片集迁移到云数据库MongoDB分片集-数据校验
        • 专线迁移MongoDB副本集迁移到云数据库MongoDB副本集-一键反向
        • 专线迁移MySQL迁移到云数据库RDS-一键反向
        • 专线迁移的Redis标准版迁移到云数据库Redis标准版-数据校验
        • 专线迁移MySQL迁移到云数据库RDS-数据校验
        • 专线迁移MongoDB分片集迁移到云数据库MongoDB分片集
        • 专线迁移MongoDB副本集迁移到云数据库MongoDB副本集-数据校验
        • 专线迁移MySQL迁移到云数据库RDS
        • 专线迁移Redis Cluster迁移到云数据库Redis企业集群版
        • 专线迁移Redis标准版迁移到云数据库Redis标准版
      • 其他
        • 一键反向查询前置检查结果
        • 展示授权白名单
        • 一键反向延迟检查
        • 获取订单状态
        • 配置一键反向
        • 查询vpc列表
        • 查询MySQL实例拥有的账号列表
        • 绑定&解除标签
        • 查询可用区列表
        • 检查数据库连通性
        • 一键反向
      • 数据校验
        • 查询数据校验任务列表
        • 数据校验任务启动
        • 创建数据校验任务
        • 配置数据校验任务
        • 停止校验
        • 查看数据校验比对结果
        • 数据校验前置检查
        • 查看数据校验任务信息
  • 监控报警
    • 查看任务监控
    • 设置报警策略
  • 产品计费
    • 计费概述
    • 到期或欠费说明
    • 计费方式
    • 变更和退订说明
  • 数据校验
    • 配置数据校验
    • 查看数据校验
    • 数据校验概述
  • Java_SDK
    • SDK下载页
    • 安装-DTS-Java-SDK
    • 使用数据订阅SDK
    • SDK接口简介
    • SDK下载
    • 概述
  • 快速入门
    • 购买流程
    • 数据迁移操作指导
    • 配置迁移任务
  • 典型实践
    • 使用NimoShake将Amazon DynamoDB迁移至百度云DocDB
    • 不同库名间的数据同步
    • 获取主账号ID
    • 使用DTS实现目标端为Elasticsearch的数据迁移
    • 使用DTS实现MySQL数据拆分
    • 数据库迁移上云
    • 业务切换流程
    • DTS支持专线迁移
    • DTS支持专线迁移(新)
    • 修改RDS MySQL参数提升迁移性能
    • MySQL实例的双向同步
  • 准备工作
    • 自建MySQL创建账号并设置binlog
    • 迁移评估
      • 源端MySQL迁移评估
    • 网络准备
      • 网络准备概述
      • VPC接入
      • 添加DTS服务IP白名单
      • 本地IDC接入百度智能云
        • VPN接入
        • 专线接入
        • 公网接入
    • 访问控制
      • 子用户权限管理
  • 产品简介
    • 支持的数据流
    • 功能概览
    • 基本概念
    • 产品优势
    • 架构原理
    • 应用场景
    • 什么是数据库传输服务DTS
    • 链路规格说明
  • 数据迁移
    • 跨账号迁移云数据库实例
    • 迁移方案概览
    • TiDB为源的迁移
      • 自建TiDB迁移至GaiaDB
    • Oracle为源的迁移
      • 自建Oracle迁移至RDS MySQL
    • GaiaDB为源的迁移
      • GaiaDB实例间的迁移
    • MySQL为源的迁移
      • 腾讯云MySQL迁移至RDS MySQL
      • RDS MySQL迁移至GaiaDB
      • 自建MySQL迁移至GaiaDB
      • 自建MySQL迁移至RDS MySQL
      • 阿里云PolarDB迁移至GaiaDB
      • RDS MySQL实例间的迁移
      • 阿里云MySQL迁移至RDS MySQL
      • MySQL迁移至Kafka
    • 异构数据库间的数据类型映射关系
      • 百度智能云DTS数据类型
      • DTS支持的数据源端
        • 将Kafka作为源端
        • 将Redis标准版作为源端
        • 将GaiaDB作为源端
        • 将Oracle作为源端
        • 将SQL Server作为源端
        • 将云上百度DRDS作为源端
        • 将MongoDB分片集作为源端
        • 将PostgreSQL作为源端
        • 将MongoDB副本集作为源端
        • 将MySQL作为源端
      • DTS支持的数据目的端
        • 将SQL Server作为目标端
        • 将Elasticsearch作为DTS目的端
        • 将GaiaDB作为目标端
        • 将Palo作为目标端
        • 将DataHub作为目标端
        • 将PostgreSQL作为目标端
        • 将MySQL作为目标端
        • 将Kafka作为DTS目标端
    • SQL Server 为源的迁移
      • 自建SQL Server迁移至RDS SQL Server
    • MongoDB为源的迁移
      • 自建MongoDB分片集迁移至DocDB MongoDB
      • DocDB MongoDB副本集迁移至DocDB MongoDB
    • Kafka为源的迁移
      • 自建Kafka迁移至消息服务 for Kafka
    • PostgreSQL为源的迁移
      • RDS PostgreSQL实例间的迁移
      • 自建PostgreSQL迁移至RDS PostgreSQL
      • AWS PostgreSQL迁移至RDS PostgreSQL
    • Redis为源的迁移
      • 使用DTS实现自建Redis标准版到百度智能云SCS for Redis标准版的数据迁移
      • 云数据库Redis实例间的迁移
      • 自建Redis迁移至云数据库Redis(含PegaDB)
    • Milvus为源的迁移
      • 自建 Milvus 迁移至向量数据库 VectorDB
  • 产品动态
    • 功能发布记录
    • 公告
      • 数据传输服务DTS支持多规格数据传输任务
      • 数据传输服务DTS任务状态升级通知
  • 预检查项
    • TiDB检查项
    • PostgreSQL检查项
    • MySQL检查项
    • Palo检查项
    • Oracle检查项
    • GaiaDB-X检查项
    • RocketMQ检查项
    • MongoDB检查项
    • Redis检查项
    • Elasticsearch检查项
    • GaiaDB检查项
    • Kafka检查项
    • 预检查不通过处理方法
      • 源库、目标库的连接数是否满足要求检查
      • 迁移表依赖的外键父表是否迁移检查
      • 目的数据库是否只读检查
      • 目的数据库的账号权限是否满足迁移要求检查
      • 目的库中表是否为空检查
      • 源库和目标库中SQL_MODE是否合法检查
      • 数据传输服务器是否能连通目的数据库检查
      • 源数据库的版本号检查
      • 源数据库的账号权限是否满足迁移要求检查
      • 迁移表是否有不支持的存储引擎检查
      • 预检查项汇总
      • 迁移表的表结构在目的库是否存在检查
      • 目的数据库待迁入的数据库是否可用检查
      • 目的库是否存在跟待迁移对象同名的结构对象检查
      • 数据传输服务器是否能连通源数据库检查
  • 服务支持
    • 相关协议
      • DTS服务等级协议SLA
    • 常见问题
      • 使用类问题
      • 数据迁移问题
      • 常见问题总览
      • 数据同步问题
  • 文档中心
  • arrow
  • 数据传输服务DTS
  • arrow
  • 数据迁移
  • arrow
  • 异构数据库间的数据类型映射关系
  • arrow
  • DTS支持的数据目的端
  • arrow
  • 将Kafka作为DTS目标端
本页目录
  • 1. 适用场景
  • 2. 将Kafka作为DTS目标端的限制
  • 3. 将Kafka作为DTS目标端的前置条件
  • 3.1 环境要求
  • 3.2 权限要求
  • 3.3 目标端Kafka推荐配置
  • 3.3.1 目标端为百度消息服务主题
  • 3.3.2 目标端为自建Kafka集群
  • 3.3.2.1 通过公网访问您的Kafka集群
  • listeners
  • advertised.listeners
  • listener.security.protocol.map
  • inter.broker.listener.name
  • 启动broker
  • 3.3.2.2 通过百度云内部网络访问您的Kafka集群
  • 查询PNET IP
  • listeners
  • advertised.listeners
  • listener.security.protocol.map
  • inter.broker.listener.name
  • 4. 使用Kafka作为DTS目标端
  • 4.1 任务配置
  • 4.2 对象映射
  • 5. Kafka数据格式
  • 5.1 消息格式版本

将Kafka作为DTS目标端

更新时间:2025-08-21

1. 适用场景

本文适用于使用百度智能云数据传输服务DTS(以下简称 DTS),将DTS已经支持的数据源迁移至Kafka目标端中的场景。

2. 将Kafka作为DTS目标端的限制

  • 增量同步不支持同步关系型数据库的DDL语句。
  • DTS向Kafka发送消息满足At-Least-Once约束(消息不丢但可能重复),重启任务等行为会导致下游Kafka出现少量重复数据(建议使用每条数据的GLOBAL_ID做去重)。

3. 将Kafka作为DTS目标端的前置条件

3.1 环境要求

已创建作为迁移目标端的Kafka集群或百度消息服务主题。自建Kafka集群支持版本为0.9或0.10。

3.2 权限要求

要求给定账号拥有向Kafka指定topic写入数据的权限。

3.3 目标端Kafka推荐配置

3.3.1 目标端为百度消息服务主题

无需额外配置,可直接配置DTS任务,步骤参见下文:目标端Kafka任务配置。

3.3.2 目标端为自建Kafka集群

由于DTS服务管控节点和目标端自建Kafka集群间存在网络隔离,因而需要配置自建Kafka集群的访问路由规则。您可以根据需要选择不同的访问方式,并按步骤配置您的Kafka集群。

3.3.2.1 通过公网访问您的Kafka集群

如果您希望DTS通过公网链路访问您的Kafka集群,您需要为Kafka集群中的每一台机器配置公网访问。假设目标端Kafka集群有三个broker,其公网IP分别为:106.0.0.1、106.0.0.2、106.0.0.3;其内部网络IP分别为:172.16.0.1、172.16.0.2、172.16.0.3。您需要在每个broker的配置文件server.properties做如下配置,以broker1为例(公网IP:106.0.0.1,内网IP:172.16.0.1):

listeners
Plain Text
1listeners=INTERNAL://172.16.0.1:9092,PUBLIC://172.16.0.1:19092

listeners是用来定义broker的listener的配置项。 INTERNAL标签下的连接信息(172.16.0.1:9092),用于broker间的内部通信。这里配置了内网IP(172.16.0.1),表示broker间可以通过内部网络实现网络通信。如果您希望broker间的通信走公网链路,可以改为配置公网IP(106.0.0.1) PUBLIC标签标记的连接信息(172.16.0.1:19092),用于与公网进行网络通信。注意:这里配置的IP应当与INTERNAL标签下的IP保持一致,但端口务必要不同。

advertised.listeners
Plain Text
1advertised.listeners=INTERNAL://172.16.0.1:9092,PUBLIC://106.0.0.1:19092

advertised.listeners用于将broker的listener信息发布到Zookeeper中,供客户端或其他broker查询。如果配置了advertised.listeners,那么就不会将listeners配置的信息发布到Zookeeper中, INTERNAL标签下的连接信息(172.16.0.1:9092)与上文listeners配置保持一致,但PUBLIC标签下的连接信息(106.0.0.1:19092)需要填入公网IP。

listener.security.protocol.map
Plain Text
1listener.security.protocol.map=INTERNAL:PLAINTEXT,PUBLIC:PLAINTEXT

listener.security.protocol.map用于配置监听者的安全协议。 这里您可以按照自己的需要为不同的连接方式配置不同的安全协议。示例中默认为INTERNAL和PUBLIC都配置了无访问控制(PLAINTEXT)的安全协议。

inter.broker.listener.name
Plain Text
1inter.broker.listener.name=INTERNAL

inter.broker.listener.name用于指定某一个标签作为internal listener的连接方式,这个标签所代表的listener专门用于Kafka集群中broker之间的通信。 示例中将字段取值配置为INTERNAL,表示希望broker之间通过内部网络进行通信。

启动broker

完成上述4个参数的配置后,保存修改并退出到kafka根目录,重新启动broker1。然后按照相同步骤配置并启动broker2和broker3。

3.3.2.2 通过百度云内部网络访问您的Kafka集群

除了公网自建实例外,目前DTS还支持下游是BBC或BCC自建的Kafka集群。由于集群部署在百度云上,用户可以选择绑定EIP后,让DTS通过公网访问Kafka集群,或直接通过百度云内部网络访问Kafka集群。

公网访问请参见上一章节,本章节介绍如何配置BBC/BCC自建的Kafka集群,使DTS能够通过百度云内部网络访问Kafka集群。

查询PNET IP

在百度云内部网络中,PNET IP用于唯一标识某一虚机实例,DTS使用PNET IP才能在百度云内部网络中正确访问到您的Kafka集群。在自己的BBC/BCC实例命令行执行如下命令,可以获得实例的PNET IP。

Plain Text
1curl http://169.254.169.254/2009-04-04/meta-data/public-ipv4

PNET IP.png

这里同样以broker1为例(PNET IP:10.0.0.1,内网IP:192.168.0.1),修改server.properties中的4个网络通信配置项,各个配置项含义详见上文通过公网访问章节。

listeners
Plain Text
1listeners=INTERNAL://192.168.0.1:9092,EXTERNAL://192.168.0.1:19092

这里INTERNAL配置的IP是百度云VPC内网IP,您可以在BCC或BBC的实例详情页查询到实例的内网IP。

BCC.png

EXTERNAL标签下的listener表示通过PNET IP访问broker的连接信息,注意:这里配置的IP应当与INTERNAL标签下的IP保持一致,但端口务必要不同。

advertised.listeners
Plain Text
1advertised.listeners=INTERNAL://192.168.0.1:9092,EXTERNAL://10.0.0.1:19092

这里EXTERNAL标签对应的advertised.listeners配置为PNET IP:监听端口,INTERNAL标签的内容与配置项listeners一致。

listener.security.protocol.map
Plain Text
1listener.security.protocol.map=INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT

这里您可以按照自己的需要为不同的连接方式配置不同的安全协议。示例中默认为INTERNAL和EXTERNAL都配置了无访问控制(PLAINTEXT)的安全协议。

inter.broker.listener.name
Plain Text
1inter.broker.listener.name=INTERNAL

示例中将字段取值配置为INTERNAL,表示broker之间可以通过百度云VPC子网进行通信。

4. 使用Kafka作为DTS目标端

使用Kafka作为目标端,在任务创建、前置检查、任务启动、任务暂停、任务终止的操作流程请参考操作指南。在任务配置和对象映射部分与其他数据源有些许不同。

4.1 任务配置

首先进入任务连接配置页,图中以源端为百度智能云数据库RDS for MySQL为例,选择源端实例即可。

config_src.png

配置目标端连接信息时,首先要根据目标端Kafka集群的访问方式选择接入类型。

若目标端为百度消息服务主题,则接入类型选择百度消息服务,并选择相应的地域和主题ID。

bms.png

若目标端为自建Kafka集群,则如下图所示,按照公网访问和百度云内部网络访问两种方式选择对应的接入类型。

注意:Broker列表中的IP必须填PNET IP

config_dst.png

然后按要求填入其他信息即可。注意:目前DTS仅支持0.9和0.10版本的Kafka集群,并支持0.10版本Kafka集群配置SASL访问控制。

BaiduHi_2019-8-16_11-41-21.png

最后,在参数设置部分,进行消息格式的配置。目前可选的消息格式如下:

  • BAIDU_JSON_V1: DTS默认的消息格式,详情见第5部分。
  • BAIDU_JSON_V2: 在V1的基础上,做了以下优化。

    • NULL值:输出NULL(V1版本输出为"")
    • TIMESTAMP/DATETIME: 全量、增量迁移均按UTC时区转换为时间戳格式

消息格式new.jpg

点击【授权白名单进入下一步】,选择源端实例的迁移对象。

4.2 对象映射

schemas.png

完成迁移类型和迁移对象的选择后,点击【保存并预检查】,完成新建任务,然后在任务列表查看任务状态。

  • 状态列显示“前置检查通过”,可以勾选并启动迁移任务,任务启动后可以在任务进度列查看迁移进度。
  • 状态列显示“前置检查失败”,点击旁边的按钮查看失败原因并修改,重新启动检查直到成功后再启动迁移任务。

前置检查项详细解释参见:数据迁移操作指南-预检查

5. Kafka数据格式

任务启动后,DTS将从源端数据库实例拉取到全量基准或增量变更数据,并以固定的格式写入目标端Kafka集群的指定Topic中。

5.1 消息格式版本

在参数设置部分,进行消息格式的配置。目前可选的消息格式如下:

  • BAIDU_JSON_V1: 如下所示,DTS默认的消息格式。
Plain Text
1//json结构
2[   //最外层是数组
3{   //第一行记录,一条message可能包含1到多行记录
4    "TIME":"20180831165311",          //时间戳
5    "GLOBAL_ID":"xxxxxxxx",           //全局唯一ID,消费者可用此ID来去重
6    "DATABASE":"db1",                 //数据库名
7    "SCHEMA":"schema1",               //SCHEMA名,仅在上游为PostgreSQL、SQLServer时存在
8    "TABLE":"tbl1",                   //表名
9    "TYPE":"U",                       //变更类型,I为insert,U为update,D为delete
10    "LOGICAL_CLOCK":"1617327447310000011",      //逻辑时钟,用于批处理时判断消息先后顺序
11    "OLD_VALUES":{                    //变更前每列的”列名”:”列值”组,变更类型为I时无OLD_VALUES
12        "key1":"old-value1",
13        "key2":"old-value2",
14        ...
15    },
16    "NEW_VALUES":{                    //变更后每列的”列名”:”列值”组,变更类型为D时无NEW_VALUES
17        "key1":"",                    // NULL值用""表示
18        "key2":"new-value2",
19        ...
20    },
21    "OFFSET":{                        //该条记录的位置信息,仅在上游为MySQL时的增量迁移阶段有值,对消费者无用
22        "BINLOG_NAME":"mysql-bin.xxx",
23        "BINLOG_POS":"xxx",
24        "GTID":"server_id:transaction_id"   //仅在使用GTID方式同步时存在
25    }
26},  
27{   //第二行记录
28    "TIME":"20180831165311",
29    "DATABASE":"db1",
30    ...
31}
32... //更多行记录
33]
  • BAIDU_JSON_V2: 在V1的基础上,做了以下优化:

    • 针对old-value/new-value中的NULL值:输出NULL(V1版本输出为"")
    • 针对old-value/new-value中的TIMESTAMP/DATETIME字段: 全量、增量迁移均按UTC时区转换为时间戳格式

上一篇
将MySQL作为目标端
下一篇
SQL Server 为源的迁移