操作流程

消息服务 for Kafka Kafka

  • 功能发布记录
  • 共享版
    • 产品定价
    • 产品描述
      • 介绍
    • 快速入门
      • 多用户访问控制
      • 监控报警
      • 操作流程
    • 常见问题
      • 常见问题总览
      • 安全类问题
      • 配置类问题
    • API文档
      • 通用说明
      • 简介
      • 公共头
      • 接口说明
      • 模型定义
      • 服务域名
      • 错误返回
  • 专享版
    • 产品描述
      • 使用限制
      • 基本概念
      • 产品规格
      • 产品优势
      • 产品介绍
      • 应用场景
      • 产品架构
    • 开发指南
      • 访问协议介绍
      • 接入点查看
      • 概述
      • Python示例
        • VPC网络PLAINTEXT方式生产和消费
        • VPC网络SASL_PLAINTEXT方式生产和消费消息
        • SSL协议生产和消费消息
        • 公网SASL_SSL方式生产和消费消息
      • Go示例
        • VPC网络PLAINTEXT方式生产和消费
        • VPC网络SASL_PLAINTEXT方式生产和消费消息
        • SSL协议生产和消费消息
        • 公网SASL_SSL方式生产和消费消息
      • Java示例
        • VPC网络PLAINTEXT方式生产和消费
        • VPC网络SASL_PLAINTEXT方式生产和消费消息
        • SSL协议生产和消费消息
        • 公网SASL_SSL方式生产和消费消息
      • PHP示例
        • VPC网络PLAINTEXT方式生产和消费
        • VPC网络SASL_PLAINTEXT方式生产和消费消息
        • SSL协议生产和消费消息
        • 公网SASL_SSL方式生产和消费消息
      • C++示例
        • VPC网络PLAINTEXT方式生产和消费
        • VPC网络SASL_PLAINTEXT方式生产和消费消息
        • SSL协议生产和消费消息
        • 公网SASL_SSL方式生产和消费消息
    • 快速入门
      • 步骤二:创建主题
      • 步骤一:创建Kafka集群实例
      • 步骤三:配置权限认证
      • 概述
      • 步骤四:访问Kafka集群
        • 使用SASL_SSL协议访问集群
        • 使用PLAINTEXT协议访问集群
        • 使用SASL_PLAINTEXT协议访问集群
        • 使用SSL协议访问集群
    • 操作指南
      • 多用户访问控制
      • 消息查询
      • 标签管理
      • 存储分析
      • 集群日志
      • 操作审计
      • 任务管理
        • 任务类型介绍
        • 查看任务详情
      • 主题管理
        • 主题重新分区
        • 删除主题
        • 创建主题
        • 修改主题分区数
        • 查看主题订阅关系
        • 查看主题分区详情
        • 查看主题详情
        • 修改主题配置
      • 用户管理
        • 删除用户
        • 重置用户密码
        • 创建用户
      • 监控报警
        • 报警策略配置
        • 集群监控
        • 事件策略配置
      • 消费组管理
        • 查看消费组订阅信息
        • 消费组重置位点
        • 删除消费组
      • 集群配置管理
        • 配置参数介绍
        • 查看集群配置详情
        • 创建集群配置
        • 删除集群配置
      • 权限管理
        • 创建权限
        • 删除权限
      • 集群管理
        • 删除集群
        • 查看集群信息
        • 变更访问配置
        • 集群变更
        • 磁盘水位处理
        • 查看Controller信息
        • 重启节点
        • 查看集群接入点
        • 集群启停
    • 最佳实践
      • Flink接入Kafka专享版
      • 如何选择合适的集群规格
      • Logstash接入Kafka专享版
      • Filebeat接入Kafka专享版
      • 业务迁移
    • API参考
      • 更新记录
      • 调用说明
      • 附录
      • 服务域名
      • 错误返回
      • 主题管理接口
        • 查询主题列表
        • 查询主题订阅详情
        • 删除主题
        • 查询主题详情
        • 创建主题
        • 获取订阅主题的消费组列表
        • 查询主题分区详情
        • 变更主题
        • 查询主题分区列表
      • 用户管理接口
        • 查询用户列表
        • 删除用户
        • 重置用户密码
        • 创建用户
      • 消费组管理接口
        • 删除消费组
        • 重置消费组位点
        • 查询消费组列表
        • 查询消费组订阅的主题列表
      • 集群管理接口
        • 查询集群参数
        • 创建集群
        • 释放集群
        • 查询集群节点列表
        • 停止集群
        • 查询集群详情
        • 启动集群
        • 查询集群列表
        • 查询集群接入点
      • 集群变更接口
        • 变更访问配置
        • 扩容磁盘容量
        • 增加节点数量
        • 变更用户安全组
        • 变更节点机型
        • 变更集群配置
        • 集群公网开关
        • 变更存储策略
        • 变更公网带宽
      • 权限管理接口
        • 查询权限列表
        • 创建权限
        • 删除权限
      • 任务管理接口
        • 暂停任务
        • 恢复任务
        • 查询操作详情
        • 查询任务列表
        • 取消任务
        • 查询任务详情
        • 启动任务
      • 集群配置管理接口
        • 查询集群配置列表
        • 查询集群配置详情
        • 创建集群配置
        • 查询集群配置版本列表
        • 新增集群配置版本
        • 查询集群配置版本详情
        • 删除集群配置
    • 产品定价
      • 余额不足提醒和欠费处理
      • 计费说明
      • 变配规则说明
      • 续费说明
所有文档
menu
没有找到结果,请重新输入

消息服务 for Kafka Kafka

  • 功能发布记录
  • 共享版
    • 产品定价
    • 产品描述
      • 介绍
    • 快速入门
      • 多用户访问控制
      • 监控报警
      • 操作流程
    • 常见问题
      • 常见问题总览
      • 安全类问题
      • 配置类问题
    • API文档
      • 通用说明
      • 简介
      • 公共头
      • 接口说明
      • 模型定义
      • 服务域名
      • 错误返回
  • 专享版
    • 产品描述
      • 使用限制
      • 基本概念
      • 产品规格
      • 产品优势
      • 产品介绍
      • 应用场景
      • 产品架构
    • 开发指南
      • 访问协议介绍
      • 接入点查看
      • 概述
      • Python示例
        • VPC网络PLAINTEXT方式生产和消费
        • VPC网络SASL_PLAINTEXT方式生产和消费消息
        • SSL协议生产和消费消息
        • 公网SASL_SSL方式生产和消费消息
      • Go示例
        • VPC网络PLAINTEXT方式生产和消费
        • VPC网络SASL_PLAINTEXT方式生产和消费消息
        • SSL协议生产和消费消息
        • 公网SASL_SSL方式生产和消费消息
      • Java示例
        • VPC网络PLAINTEXT方式生产和消费
        • VPC网络SASL_PLAINTEXT方式生产和消费消息
        • SSL协议生产和消费消息
        • 公网SASL_SSL方式生产和消费消息
      • PHP示例
        • VPC网络PLAINTEXT方式生产和消费
        • VPC网络SASL_PLAINTEXT方式生产和消费消息
        • SSL协议生产和消费消息
        • 公网SASL_SSL方式生产和消费消息
      • C++示例
        • VPC网络PLAINTEXT方式生产和消费
        • VPC网络SASL_PLAINTEXT方式生产和消费消息
        • SSL协议生产和消费消息
        • 公网SASL_SSL方式生产和消费消息
    • 快速入门
      • 步骤二:创建主题
      • 步骤一:创建Kafka集群实例
      • 步骤三:配置权限认证
      • 概述
      • 步骤四:访问Kafka集群
        • 使用SASL_SSL协议访问集群
        • 使用PLAINTEXT协议访问集群
        • 使用SASL_PLAINTEXT协议访问集群
        • 使用SSL协议访问集群
    • 操作指南
      • 多用户访问控制
      • 消息查询
      • 标签管理
      • 存储分析
      • 集群日志
      • 操作审计
      • 任务管理
        • 任务类型介绍
        • 查看任务详情
      • 主题管理
        • 主题重新分区
        • 删除主题
        • 创建主题
        • 修改主题分区数
        • 查看主题订阅关系
        • 查看主题分区详情
        • 查看主题详情
        • 修改主题配置
      • 用户管理
        • 删除用户
        • 重置用户密码
        • 创建用户
      • 监控报警
        • 报警策略配置
        • 集群监控
        • 事件策略配置
      • 消费组管理
        • 查看消费组订阅信息
        • 消费组重置位点
        • 删除消费组
      • 集群配置管理
        • 配置参数介绍
        • 查看集群配置详情
        • 创建集群配置
        • 删除集群配置
      • 权限管理
        • 创建权限
        • 删除权限
      • 集群管理
        • 删除集群
        • 查看集群信息
        • 变更访问配置
        • 集群变更
        • 磁盘水位处理
        • 查看Controller信息
        • 重启节点
        • 查看集群接入点
        • 集群启停
    • 最佳实践
      • Flink接入Kafka专享版
      • 如何选择合适的集群规格
      • Logstash接入Kafka专享版
      • Filebeat接入Kafka专享版
      • 业务迁移
    • API参考
      • 更新记录
      • 调用说明
      • 附录
      • 服务域名
      • 错误返回
      • 主题管理接口
        • 查询主题列表
        • 查询主题订阅详情
        • 删除主题
        • 查询主题详情
        • 创建主题
        • 获取订阅主题的消费组列表
        • 查询主题分区详情
        • 变更主题
        • 查询主题分区列表
      • 用户管理接口
        • 查询用户列表
        • 删除用户
        • 重置用户密码
        • 创建用户
      • 消费组管理接口
        • 删除消费组
        • 重置消费组位点
        • 查询消费组列表
        • 查询消费组订阅的主题列表
      • 集群管理接口
        • 查询集群参数
        • 创建集群
        • 释放集群
        • 查询集群节点列表
        • 停止集群
        • 查询集群详情
        • 启动集群
        • 查询集群列表
        • 查询集群接入点
      • 集群变更接口
        • 变更访问配置
        • 扩容磁盘容量
        • 增加节点数量
        • 变更用户安全组
        • 变更节点机型
        • 变更集群配置
        • 集群公网开关
        • 变更存储策略
        • 变更公网带宽
      • 权限管理接口
        • 查询权限列表
        • 创建权限
        • 删除权限
      • 任务管理接口
        • 暂停任务
        • 恢复任务
        • 查询操作详情
        • 查询任务列表
        • 取消任务
        • 查询任务详情
        • 启动任务
      • 集群配置管理接口
        • 查询集群配置列表
        • 查询集群配置详情
        • 创建集群配置
        • 查询集群配置版本列表
        • 新增集群配置版本
        • 查询集群配置版本详情
        • 删除集群配置
    • 产品定价
      • 余额不足提醒和欠费处理
      • 计费说明
      • 变配规则说明
      • 续费说明
  • 文档中心
  • arrow
  • 消息服务 for KafkaKafka
  • arrow
  • 共享版
  • arrow
  • 快速入门
  • arrow
  • 操作流程
本页目录
  • 创建主题
  • 创建消费组
  • 下载证书
  • 证书管理
  • 连接服务
  • 通过样例代码连接服务
  • 逐步配置信息连接服务

操作流程

更新时间:2025-08-21

创建主题

  1. 注册并登录百度智能云平台,具体操作请参考注册和登录。
  2. 登录成功后,选择“产品服务 -> 消息服务 for Kafka”,点击“主题”后进入主题列表页。
  3. 点击“创建主题”,设置主题名称和分区个数:

    • 若输入主题名称是“demo”,则系统会创建一个主题:<accountID>__demo。
    • 分区数目决定了本主题可以处理的消息通量(throughput),请根据业务需求选择。

    创建主题1.png

  4. 点击“确定”即创建了百度消息服务主题,可直接使用。
  5. 可点击“查看证书”以查看/编辑该主题的证书权限。

    查看证书.png

  6. 证书权限中显示“特权证书”、“普通证书”和“他人的证书”对该主题的权限。

    查看证书2.png

特权证书:权限适用于所有主题的自有证书;

普通证书:权限适用于指定主题的自有证书;

他人的证书:用户可通过添加他人的证书,将自己主题的读/写权限授权给他人账号下,授权后他人将有权限读/写用户的主题。

注意,当前百度消息服务的数据存储时间默认为24小时,过期后自动删除,若有特殊需求,请提工单联系我们。

创建消费组

消费组是Kafka提供的可扩展且具有容错性的消费者机制。通过console创建的消费组享有完善的安全保护机制,强烈推荐用户采用这种方式使用消费者组功能。

  1. 注册并登录百度智能云平台,具体操作请参考注册和登录。
  2. 登录成功后,选择“产品服务>百度消息服务”,点击“消费组管理”后进入列表页。
  3. (可选)选择区域,请根据实际需求切换选择。
  4. 点击“创建消费组”,输入消费组名称。

    注:通过console创建的消费组会在名称前自动加上Account ID前缀,用户使用时需输入完整的消费组名称。

    创建消费组.png

  5. 消费组的使用权限与证书绑定。可点击“查看证书”以查看/编辑该消费组的证书权限。
  6. 证书权限中显示“特权证书”、“普通证书”和“他人的证书”对该消费组的权限。用户使用消费组时,会对所使用的证书进行鉴权,验证是否有该消费组的使用权限。

    注:对于未通过console创建的消费组不进行权限控制,但是可能存在数据安全问题,强烈建议用户通过console创建并进行使用。

    image.png

下载证书

Kafka客户端在连接百度消息服务之前需要提供相应的证书来用以鉴权授权。请至百度智能云平台的“产品服务>百度消息服务-证书”页下载用于认证客户和云服务的身份的证书:kafka-key.zip。

说明:

  • 请妥善保存证书,该证书用于建立SSL连接,同时用于认证客户端的AccountId。一旦丢失或泄漏,请至百度智能云平台重新生成。
  • 若使用香港的服务,请先提工单申请开通香港区域,通过后至香港区域下载对应的证书。
  • 各个区域的证书相互独立,不能够跨区域使用。
  • 广州和北京的证书,因历史原因可跨区域使用。

证书管理

证书列表中可查看证书的相关信息:名称、序列号、类型、密钥、创建时间;可选择“重新生成”更新证书,选择“删除证书”删除该证书;选择“查看主题”查看/编辑该证书对主题的权限,选择“查看消费组”查看/编辑该证书对消费组的权限。

证书列表.png

连接服务

您可直接使用百度智能云提供的样例代码连接百度消息服务,也可从Kafka官网下载Kafka二进制包并逐步配置相关信息后连接百度消息服务。

通过样例代码连接服务

  1. 下载代码样例。请登录控制台,打开“产品服务>百度消息服务-证书”页,点击页面右上角“代码样例”,下载至本地,下载后解压。
  2. 百度消息服务提供的代码样例是Maven项目包。
  3. 运行样例代码。cd至代码目录“sample-with-kafka-key”下,执行命令run.bat <accountID>_demo,执行完毕则建立了与百度消息服务的连接。

此外,我们提供Kafka服务的Python和Golang两种语言代码样例,已上传至GitHub中的Python样例和Golang样例,您可通过GitHub克隆代码至本地设计自己的程序。

逐步配置信息连接服务

  1. 从Kafka官网下载Kafka二进制包:http://kafka.apache.org/downloads.html,目前已支持了0.10版本。
  2. 通过Java访问Kafka服务的用户需使用Java客户端的pom依赖:kafka-client/0.10;我们推荐用户使用JDK 7 或JDK 8 版本,最低配置须为Java 1.7.0-b147。
  3. 配置client.properties。请至百度智能云平台的百度“产品服务>百度消息服务-证书”页下载用于认证客户和云服务的身份的证书:kafka-key.zip,请将zip包解压到kafka二进制包的根目录下,client.properties,client.keystore.jks,client.truststore.jks都需在同级目录中。解压后打开client.properties,该文件是UTF8编码的文本文件,显示如下内容:

    Plain Text
    1 security.protocol=SSL
    2 ssl.truststore.location=client.truststore.jks
    3 ssl.truststore.password=<your certificate password>
    4 ssl.keystore.location=client.keystore.jks
    5 ssl.keystore.password=<your certificate password>
  4. 使用客户端代码中的脚本与百度消息服务通讯,连接通过SSL加密,保证数据传输无法被监听与篡改。以使用北京区域的百度消息服务为例,具体操作如下(操作前请确保client.properties,client.keystore.jks,client.truststore.jks都在kafka二进制包的根目录下):

    1. 启动消费者。打开命令行工具并输入以下命令:

      Plain Text
      1sh bin/kafka-console-consumer.sh --topic <accountID>__demo --bootstrap-server kafka.bj.baidubce.com:9091 --consumer.config client.properties --from-beginning --new-consumer
      • 针对windows,启动命令如下:

        Plain Text
        1bin\windows\kafka-console-consumer.bat --topic <accountID>__demo --bootstrap-server kafka.bj.baidubce.com:9091 --consumer.config client.properties --from-beginning --new-consumer
    2. 发布消息。打开一个新的命令行工具并输入以下命令:

      Plain Text
      1sh bin/kafka-console-producer.sh --producer.config client.properties --topic <accountID>__demo --sync --broker-list kafka.bj.baidubce.com:9091
    • 针对windows,发布命令如下:

      Plain Text
      1bin\windows\kafka-console-producer.bat --producer.config client.properties --topic <accountID>__demo --broker-list kafka.bj.baidubce.com:9091
    1. 消息发送以后,消费者命令行中返回消息已经正确接收到信息。

      说明:

      • 北京区域对应的域名和端口号是:kafka.bj.baidubce.com:9091;
      • 广州区域对应的域名和端口号是:kafka.gz.baidubce.com:9092;
      • 苏州区域对应的域名和端口号是:kafka.su.baidubce.com:9091;
      • 上海区域对应的域名和端口号是:kafka.fsh.baidubce.com:9091;
      • 香港区域对应的域名和端口号是:kafka.hk02.hk.baidubce.com:9092;
      • 保定区域对应的域名和端口号是:kafka.bd.baidubce.com:9071。

上一篇
监控报警
下一篇
常见问题