共享订阅

物联网核心套件 IoTCore

  • 旧版文档
  • 产品定价
  • 功能发布记录
  • 产品描述
    • 产品概述
    • 产品优势
    • 核心概念
    • 系统限制
  • 快速入门
    • 使用规则引擎将设备消息流转到业务服务器
    • 创建设备
    • 操作演示
    • 获取连接信息
    • 快速入门流程
    • 创建 IoT Core 实例
    • 使用 MQTT 模拟器连接及收发消息
  • 典型实践
    • 数据转发到函数计算服务CFC
    • 数据转发到用户 Kafka
    • 设备状态数据存储到时序数据库TSDB
    • 数据转发到百度消息服务BMS
  • 服务等级协议SLA
    • 服务等级协议SLA(V2.0)
  • API参考
    • 设备管理
    • 概述
  • 常见问题
    • MQTT客户端及设备SDK相关问题
    • 使用选择
  • 开发者指南
    • 设备侧
      • 通过 IoT Core 官方 SDK 接入
      • 通过开源MQTT Client SDK接入
    • 服务侧
      • IoT Core 日志 SDK (Java)
      • 概述
  • 操作指南
    • 设备接入与管理
      • 应用权限
      • 创建设备
      • 设备影子
      • 管理模板
      • 共享订阅
      • 设备预配
      • 管理设备
      • 通过开放协议接入
        • 使用证书鉴权建立MQTT连接
        • 通过CoAP发布消息
        • 通过MQTT连接及收发消息
        • 通过HTTP发布消息
    • 实例管理
      • 修改实例
      • 创建实例
      • 停止及启动实例
      • 删除实例
    • 运维管理
      • 日志服务
    • 规则引擎
      • 常用查询语句示例
      • 数据目的地管理
      • 数据查询语法和函数
      • 操作步骤
      • 数据目的地
      • 数据筛选
      • 概述
所有文档
menu
没有找到结果,请重新输入

物联网核心套件 IoTCore

  • 旧版文档
  • 产品定价
  • 功能发布记录
  • 产品描述
    • 产品概述
    • 产品优势
    • 核心概念
    • 系统限制
  • 快速入门
    • 使用规则引擎将设备消息流转到业务服务器
    • 创建设备
    • 操作演示
    • 获取连接信息
    • 快速入门流程
    • 创建 IoT Core 实例
    • 使用 MQTT 模拟器连接及收发消息
  • 典型实践
    • 数据转发到函数计算服务CFC
    • 数据转发到用户 Kafka
    • 设备状态数据存储到时序数据库TSDB
    • 数据转发到百度消息服务BMS
  • 服务等级协议SLA
    • 服务等级协议SLA(V2.0)
  • API参考
    • 设备管理
    • 概述
  • 常见问题
    • MQTT客户端及设备SDK相关问题
    • 使用选择
  • 开发者指南
    • 设备侧
      • 通过 IoT Core 官方 SDK 接入
      • 通过开源MQTT Client SDK接入
    • 服务侧
      • IoT Core 日志 SDK (Java)
      • 概述
  • 操作指南
    • 设备接入与管理
      • 应用权限
      • 创建设备
      • 设备影子
      • 管理模板
      • 共享订阅
      • 设备预配
      • 管理设备
      • 通过开放协议接入
        • 使用证书鉴权建立MQTT连接
        • 通过CoAP发布消息
        • 通过MQTT连接及收发消息
        • 通过HTTP发布消息
    • 实例管理
      • 修改实例
      • 创建实例
      • 停止及启动实例
      • 删除实例
    • 运维管理
      • 日志服务
    • 规则引擎
      • 常用查询语句示例
      • 数据目的地管理
      • 数据查询语法和函数
      • 操作步骤
      • 数据目的地
      • 数据筛选
      • 概述
  • 文档中心
  • arrow
  • 物联网核心套件IoTCore
  • arrow
  • 操作指南
  • arrow
  • 设备接入与管理
  • arrow
  • 共享订阅

共享订阅

更新时间:2025-08-21

MQTT 基于 PUB/SUB 消息模式,同一消息可以分发到多个订阅了此消息topic的客户端。如果单个客户端处理大量消息出现瓶颈时,可以采用共享 订阅的方式实现负载均衡。

共享订阅属于MQTT 5.0中的特性,IOT Core 在3.x版本中提前支持了此功能。 共享订阅模式的TopicFilter格式:

$share/{ShareName}/{filter}

其中:

  • $share 为共享订阅模式的固定前缀(所有非共享订阅不可以此作为topic的前缀)
  • {ShareName} 为共享组,所有拥有相同 {ShareName} 的属于同一个共享组
  • {ShareName} 最少一个字符,最大40个字符,且不可包含 /、+ 以及 #
  • filter 为实际的 topic,格式限制跟非共享订阅的 topic 一致

多个客户端可以使用共享订阅的方式订阅同一个topic,此topic的消息将以共享组的方式分发消息。以单条消息为粒度,每条消息只会被发送同一 共享组内的一个客户端。

如下图所示:

image.png

上图中,subscriber1使用非共享订阅,将会收到所有的消息。subscriber2 和 subscriber3 使用了共享订阅,且属于同一个共享组 'group1',则一 条消息只会发送给其中一个客户端,每次随机选择一个客户端发送。

同样,subscriber4 和 subscriber5 使用共享订阅且属于同一个共享组 'group2',每条消息将随机发送给其中一个客户端。 非共享订阅以及共享订阅的不同组之间则互不影响。

Paho客户端代码示例:

Plain Text
1MqttClient sampleClient1 = ...
2// topic
3sampleClient1.subscribe($share/group1/testTopic, 0);
4//  Callback
5sampleClient1.setCallback(new MqttCallback() {
6        @Override
7        public void connectionLost(Throwable cause) {
8        }
9@Override
10        public void messageArrived(String topic, MqttMessage message)
11throws Exception {
12}
13}
14//  topic  'testTopic' '$share/group1/testTopic'
15@Override
16public void deliveryComplete(IMqttDeliveryToken token) {
17}

参考 MQTT 协议原文 https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Shared_Subscriptions

上一篇
管理模板
下一篇
设备预配