集成Kuiper流式计算引擎

智能边缘 BIE

  • 产品描述
    • 名词解释
    • 使用限制
    • 产品概述
    • 产品优势
    • 应用场景
  • 配置文件说明
    • baetyl-rule
    • baetyl-remote-object
    • baetyl-function
    • baetyl-broker
  • 快速入门
    • 融合模式节点安装
    • 快速入门指南
    • 离线安装k3s+docker
    • 进程模式节点安装
      • 在linux上安装
      • 在windows上安装
  • 典型实践
    • OPC-UA使用
    • 使用BIE函数计算调用文心一言
    • 边缘规则引擎实战
      • 集成Kuiper流式计算引擎
      • 边缘规则实例
    • 将AI中台模型部署至边缘节点
      • 获取AI中台模型部署包进行应用部署
      • 部署模型SDK至Atlas200DK
      • 模型中心1.0
        • 通过AI中台转换模型并下发至边缘
        • 通过AI中台直接将模型下发至边缘
      • 模型中心2.0
        • 将物体检测模型部署至边缘
        • 将图像分类模型部署至边缘
    • 部署PaddleServing模型
      • 使用BIE下发PaddleOCR模型
      • 制作GPU版本Paddle Serving推理镜像
      • 通过BIE部署Paddle Serving
    • Modbus协议采集温湿度传感器
      • 连接边缘baetyl-broker
      • 使用内置modbus驱动采集数据并进行边缘函数计算
      • 数据从baetyl-broker上传至IoTCore
    • 部署EasyDL模型SDK
      • 部署EasyDL烟火检测模型至ARM64节点
    • 部署EasyEdge模型SDK
      • 进程模式下发python SDK模型
      • 使用EasyEdge模型转换获取边缘模型
      • 部署模型SDK至Windows设备
      • 部署模型SDK至Intel Movidius设备
      • 部署模型SDK至Jetson Xavier
    • 部署自定义容器镜像模型
      • 部署mnist手写数字识别模型
      • 部署物体检测模型
    • video-infer实现边缘视频AI推断
      • 使用Movidius VPU加速边缘视频推断
      • 使用CPU实现边缘视频推断
  • 操作指南
    • 主子用户鉴权
    • 设备管理
      • 设备模拟器
      • 子设备数据云边交互方式
      • 进程模式软网关设备接入指南
      • 子设备数据云边交互方式-v2
      • 视频流管理
        • IPC子设备和驱动
        • 边缘转发RTSP视频流
      • 设备接入
        • 设备模型管理
        • 接入模板
        • 设备管理
        • 子设备绑定
      • 驱动管理
        • 进程模式软网关自定义驱动开发
        • 驱动管理
        • GO语言实现示例
        • 自定义驱动开发指南
      • 不同协议设备接入
        • BACnet设备接入
        • OPC-UA设备接入
        • Modbus 设备接入
        • IEC104设备接入
        • OPC-DA设备接入
    • 节点管理
      • 边缘应用获取云端STS鉴权
      • 进程模式节点
      • 远程MQTT消息下发
      • 节点运行模式说明
      • 节点影子
      • 远程调用
      • 容器模式节点
      • 远程调试
      • 远程SSH边缘节点
      • 边缘节点OpenAPI
      • 证书管理
      • 节点预配
    • 业务编排
      • 单元管理
      • 技能管理
      • 常见技能
    • 应用管理
      • 业务模版
      • 应用部署
        • 应用运行模式与分类说明
        • 函数应用
          • 自定义函数与依赖包解耦下发
          • 从CFC引入多个函数下发
          • 典型使用说明
          • 使用函数调用边缘AI模型
          • 自定义函数与依赖包整体下发
        • 容器应用
          • subpath子路径使用说明
          • workdir工作目录使用说明
          • Job类型容器应用
          • 容器应用状态说明
          • 原生yaml接入使用说明
          • 端口映射说明
          • 容器应用工作负载类型说明
          • Deployment类型容器应用
          • DaemonSet类型容器应用
          • QPS监控
          • emptyDir卷使用说明
          • 边缘服务调用
        • 进程应用
          • 进程应用概述
          • 可执行二进制程序类型进程应用
          • 可执行脚本类型进程应用
      • 配置管理
        • 证书
        • 函数
        • 镜像仓库凭证
        • 配置项
        • 密文
        • 镜像
        • 进程程序包
    • AI加速卡
      • AI加速卡通用资源调度方法
      • 自定义加速卡算力指标采集
      • 华为昇腾
        • 昇腾310资源监控
      • 英伟达
        • GPU资源调度-显存隔离
        • Jetson资源监控
        • GPU资源调度-显存共享
        • Jetson依赖说明
        • NVIDIA GPU资源监控
      • 寒武纪
        • MLU270资源监控
      • 百度昆仑
        • 昆仑芯片资源监控
      • 比特大陆
        • 挂载比特大陆边缘计算盒子tpu资源
        • BM-SE5资源监控
  • 服务等级协议SLA
    • 服务等级协议SLA(V1.0)
  • 备份
    • 进程模式应用
    • 部署通用CPU模型
    • 部署模型SDK至Atlas200DK
    • 适配列表
    • 连接边缘节点本地baetyl-broker
    • 使用自定义modbus应用采集
    • NVIDIA GPU资源管理
    • FAQ
    • NVIDIA Jetson专用模型部署-进程模式
    • 容器模式应用
    • 连接边缘节点本地baetyl-broker服务
    • DaemonSet类型和job类型服务部署
    • 通用CPU模型部署-容器模式
    • NVIDIA Jetson专用模型部署-容器模式
    • 功能发布记录
    • 在BIE控制台部署从AI中台下载的模型
    • EasyEdge概述
    • Nvidia_Jetson
      • 使用NVIDIA官方镜像运行模型-容器模式
      • 二进制程序运行模型-进程模式
      • 使用edgekit镜像运行模型-容器模式
    • 下载专区
      • 相关资源下载
  • 产品定价
    • 产品定价
所有文档
menu
没有找到结果,请重新输入

智能边缘 BIE

  • 产品描述
    • 名词解释
    • 使用限制
    • 产品概述
    • 产品优势
    • 应用场景
  • 配置文件说明
    • baetyl-rule
    • baetyl-remote-object
    • baetyl-function
    • baetyl-broker
  • 快速入门
    • 融合模式节点安装
    • 快速入门指南
    • 离线安装k3s+docker
    • 进程模式节点安装
      • 在linux上安装
      • 在windows上安装
  • 典型实践
    • OPC-UA使用
    • 使用BIE函数计算调用文心一言
    • 边缘规则引擎实战
      • 集成Kuiper流式计算引擎
      • 边缘规则实例
    • 将AI中台模型部署至边缘节点
      • 获取AI中台模型部署包进行应用部署
      • 部署模型SDK至Atlas200DK
      • 模型中心1.0
        • 通过AI中台转换模型并下发至边缘
        • 通过AI中台直接将模型下发至边缘
      • 模型中心2.0
        • 将物体检测模型部署至边缘
        • 将图像分类模型部署至边缘
    • 部署PaddleServing模型
      • 使用BIE下发PaddleOCR模型
      • 制作GPU版本Paddle Serving推理镜像
      • 通过BIE部署Paddle Serving
    • Modbus协议采集温湿度传感器
      • 连接边缘baetyl-broker
      • 使用内置modbus驱动采集数据并进行边缘函数计算
      • 数据从baetyl-broker上传至IoTCore
    • 部署EasyDL模型SDK
      • 部署EasyDL烟火检测模型至ARM64节点
    • 部署EasyEdge模型SDK
      • 进程模式下发python SDK模型
      • 使用EasyEdge模型转换获取边缘模型
      • 部署模型SDK至Windows设备
      • 部署模型SDK至Intel Movidius设备
      • 部署模型SDK至Jetson Xavier
    • 部署自定义容器镜像模型
      • 部署mnist手写数字识别模型
      • 部署物体检测模型
    • video-infer实现边缘视频AI推断
      • 使用Movidius VPU加速边缘视频推断
      • 使用CPU实现边缘视频推断
  • 操作指南
    • 主子用户鉴权
    • 设备管理
      • 设备模拟器
      • 子设备数据云边交互方式
      • 进程模式软网关设备接入指南
      • 子设备数据云边交互方式-v2
      • 视频流管理
        • IPC子设备和驱动
        • 边缘转发RTSP视频流
      • 设备接入
        • 设备模型管理
        • 接入模板
        • 设备管理
        • 子设备绑定
      • 驱动管理
        • 进程模式软网关自定义驱动开发
        • 驱动管理
        • GO语言实现示例
        • 自定义驱动开发指南
      • 不同协议设备接入
        • BACnet设备接入
        • OPC-UA设备接入
        • Modbus 设备接入
        • IEC104设备接入
        • OPC-DA设备接入
    • 节点管理
      • 边缘应用获取云端STS鉴权
      • 进程模式节点
      • 远程MQTT消息下发
      • 节点运行模式说明
      • 节点影子
      • 远程调用
      • 容器模式节点
      • 远程调试
      • 远程SSH边缘节点
      • 边缘节点OpenAPI
      • 证书管理
      • 节点预配
    • 业务编排
      • 单元管理
      • 技能管理
      • 常见技能
    • 应用管理
      • 业务模版
      • 应用部署
        • 应用运行模式与分类说明
        • 函数应用
          • 自定义函数与依赖包解耦下发
          • 从CFC引入多个函数下发
          • 典型使用说明
          • 使用函数调用边缘AI模型
          • 自定义函数与依赖包整体下发
        • 容器应用
          • subpath子路径使用说明
          • workdir工作目录使用说明
          • Job类型容器应用
          • 容器应用状态说明
          • 原生yaml接入使用说明
          • 端口映射说明
          • 容器应用工作负载类型说明
          • Deployment类型容器应用
          • DaemonSet类型容器应用
          • QPS监控
          • emptyDir卷使用说明
          • 边缘服务调用
        • 进程应用
          • 进程应用概述
          • 可执行二进制程序类型进程应用
          • 可执行脚本类型进程应用
      • 配置管理
        • 证书
        • 函数
        • 镜像仓库凭证
        • 配置项
        • 密文
        • 镜像
        • 进程程序包
    • AI加速卡
      • AI加速卡通用资源调度方法
      • 自定义加速卡算力指标采集
      • 华为昇腾
        • 昇腾310资源监控
      • 英伟达
        • GPU资源调度-显存隔离
        • Jetson资源监控
        • GPU资源调度-显存共享
        • Jetson依赖说明
        • NVIDIA GPU资源监控
      • 寒武纪
        • MLU270资源监控
      • 百度昆仑
        • 昆仑芯片资源监控
      • 比特大陆
        • 挂载比特大陆边缘计算盒子tpu资源
        • BM-SE5资源监控
  • 服务等级协议SLA
    • 服务等级协议SLA(V1.0)
  • 备份
    • 进程模式应用
    • 部署通用CPU模型
    • 部署模型SDK至Atlas200DK
    • 适配列表
    • 连接边缘节点本地baetyl-broker
    • 使用自定义modbus应用采集
    • NVIDIA GPU资源管理
    • FAQ
    • NVIDIA Jetson专用模型部署-进程模式
    • 容器模式应用
    • 连接边缘节点本地baetyl-broker服务
    • DaemonSet类型和job类型服务部署
    • 通用CPU模型部署-容器模式
    • NVIDIA Jetson专用模型部署-容器模式
    • 功能发布记录
    • 在BIE控制台部署从AI中台下载的模型
    • EasyEdge概述
    • Nvidia_Jetson
      • 使用NVIDIA官方镜像运行模型-容器模式
      • 二进制程序运行模型-进程模式
      • 使用edgekit镜像运行模型-容器模式
    • 下载专区
      • 相关资源下载
  • 产品定价
    • 产品定价
  • 文档中心
  • arrow
  • 智能边缘BIE
  • arrow
  • 典型实践
  • arrow
  • 边缘规则引擎实战
  • arrow
  • 集成Kuiper流式计算引擎
本页目录
  • 业务场景
  • 方案介绍
  • 安装 baetyl 计算框架
  • 新建 Iot Core 实例
  • 配置 baetyl-broker
  • 安装 kuiper
  • 安装集成 kuiper 插件
  • 创建流语法解析
  • 数据业务逻辑处理语法解析
  • 创建命令配置项
  • 创建配置信息配置项
  • 创建 kuiper-tool 应用
  • 测试

集成Kuiper流式计算引擎

更新时间:2025-08-21

本文以一个常见的物联网使用场景为案例,介绍了如何利用边缘计算框架 Baetyl 来实现对业务的快速、低成本和有效地处理。

在各类物联网项目中,比如智能楼宇项目,需要采集和分析楼宇数据,如电梯、燃气、水电等。一种解决方案是将所有的设备直接接入在云端的物联网平台,类似于像 Baidu IoT Core 或者 AWS IoT Core。这种解决方案的问题在于:

  • 数据处理时延较长:通过 Internet 传输和云端的处理后返回给设备,所需时间较长
  • 数据传输和存储成本:通过 Internet 传输需要带宽,对于大规模连接的物联网项目来说,耗费的带宽会相当可观
  • 数据的安全性:有些物联网的数据会相当敏感,全部通过物联网传输的话会有风险

为了解决以上的问题,业界提出了边缘计算的方案,边缘计算的核心就在于把数据进行就近处理,避免不必要的时延、成本和安全问题。开源框架 Baetyl 是百度贡献给 Linux 基金会的开源边缘计算框架,主推物联网场景下端侧的边缘计算解决方案。

本文将流处理模块 kuiper 部署到边缘计算框架 baetyl 上,对一段时间内边缘侧的设备消息进行流式处理,并将处理结果上传云端进行存储。

业务场景

假设现有一组设备,组中的每个设备有一个 ID,通过 MQTT 协议往 MQTT 消息服务器上相应的主题发送数据。主题的设计如下,其中 {device_id} 为设备的 ID。

Plain Text
1devices/{device_id}/messages

每个设备发送的数据格式为 JSON,发送的通过该传感器采集的温度与湿度数据。

Plain Text
1{
2    "temperature": 30,
3    "humidity" : 20
4}

现在需要实时分析数据,并提出以下需求:对每个设备的温度数据按照每 10 秒钟计算平均值(t_av),并且记下 10 秒钟内的最大值 (t_max)、最小值(t_min) 和数据条数(t_count),计算完毕后将这 4 个结果进行保存,以下为样例结果数据:

Plain Text
1[
2    {
3        "device_id" : "1", "t_av" : 25,  "t_max" : 45, "t_min" : 5, "t_count" : 2
4    },
5    {
6        "device_id" : "2", "t_av" : 25,  "t_max" : 45, "t_min" : 5, "t_count" : 2
7    },
8    ...
9]

方案介绍

如下图所示,我们将在 baetyl 边缘计算框架上,采用边缘分析/流式数据处理的方式,从 baetyl-broker 订阅相关设备消息,最后将处理结果输出到 Baidu 的 IoT Core 中。

image.png

baetyl-broker 是 Baetyl 框架端侧的消息中间件,采用 MQTT3.1.1 协议,可在低带宽、不可靠网络中提供可靠的消息传输服务。

kuiper 是基于 SQL 的轻量级边缘流式数据分析引擎,安装包只有约 7MB,非常适合于运行在边缘设备端。

Baidu Iot Core 提供了比较全的设备接入和数据分析的方案,此处用于云端的结果数据接入,以及应用所需的结果数据分析。

安装 baetyl 计算框架

在云端新建边缘节点并安装到边缘设备。安装成功后如下所示:

image.png

新建 Iot Core 实例

参考百度云IoTCore文档 新建Iot Core实例并进行相关的设备模板配置。配置成功后,使用 MqttBox 进行连接,其中 $iot/test/user/# 主题是我们自定义的具有收发权限的用户主题。

image.png

image.png

配置 baetyl-broker

编辑 baetyl-broker 配置,暴露一个外部端口供测试使用。

image.png

image.png

image.png

然后编辑配置文件,配置文件如下:

YAML
1listeners:
2  - address: tcp://0.0.0.0:8003
3principals:
4  - username: test
5    password: hahaha
6    permissions:
7      - action: pub
8        permit: ["#"]
9      - action: sub
10        permit: ["#"]
11
12session:
13  sysTopics:
14    - $link
15    - $baetyl
16logger:
17  level: debug
18  encoding: console

新增 8003 端口供测试使用。 并且需要设置映射宿主机端口以供连接。

image.png

然后在本地使用MqttBox连接baetyl-broker,来测试连通性。

image.png

image.png

安装 kuiper

从 Kuiper 官方镜像仓库镜像仓库选取 Kuiper 的官方 Docker 镜像,这里选取的是:

Plain Text
1emqx/kuiper:0.5.1-alpine

然后创建容器服务,并添加 kuiper 服务,设置镜像、添加端口映射以及环境变量。

Plain Text
1MQTT_BROKER_ADDRESS=baetyl-broker.baetyl-edge-system:8003
2MQTT_BROKER_USERNAME=test
3MQTT_BROKER_PASSWORD=hahaha

image.png

image.png

image.png

image.png

用户在端侧可以通过 Telnet 命令来判断边缘设备上 Kuiper 是否启动成功。

image.png

更多 kuiper 资料可以参考 kuiper 官网 。

安装集成 kuiper 插件

kuiper 原生的 stream、rule 创建都是通过 Http 请求,为了适配 baetyl 平台,可以使用 kuiper 推出的适配插件: kuiper-kubernetes-tool ,支持从配置文件加载 stream、rule 配置。

从 kuiper 官方镜像仓库镜像仓库选取 kuiper 的官方 Docker 镜像,这里选取的是:

Plain Text
1emqx/kuiper-kubernetes-tool:0.5.1

我们在新建 kuiper 插件应用时,先新建对应的配置文件。

创建流语法解析

创建流的目的是为了定义发送到该流上的数据格式,类似于在关系数据库中定义表的结构。 kuiper 中所有支持的数据类型,可以参考 kuiper 官网 。

Plain Text
1{
2    "commands":[
3        {
4            "url":"/streams",
5            "description":"create stream1",
6            "method":"post",
7            "data":{
8                "sql":"create stream demo (temperature float, humidity bigint) WITH (FORMAT=\"JSON\", DATASOURCE=\"devices/+/messages\");"
9            }
10        }
11    ]
12}

上述语句在 kuiper 中创建了一个名为 demo 的流定义,包含了两个字段,分别为 temperature 和 humidity,数据源为订阅 MQTT 的主题 devices/+/messages,这里请注意采用了通配符 +,用于订阅不同设备的消息。

数据业务逻辑处理语法解析

kuiper 采用 SQL 实现业务逻辑,每10秒钟统计温度的平均值、最大值、最小值和次数,并根据设备 ID 进行分组,实现的 SQL 如下所示。

Plain Text
1{
2    "commands":[
3        {
4            "url":"/rules",
5            "description":"create rule1",
6            "method":"post",
7            "data":{
8                "id":"rule1",
9                "sql": "SELECT avg(temperature) AS t_av, max(temperature) AS t_max, min(temperature) AS t_min, COUNT(*) As t_count, split_value(mqtt(topic), \"/\", 1) AS device_id FROM demo GROUP BY device_id, TUMBLINGWINDOW(ss, 10)",
10                "actions": [
11                    {
12                        "log": {}
13                    },
14                    {
15                        "mqtt": {
16                            "server": "tcp://arncpan.iot.gz.baidubce.com:1883",
17                            "topic": "$iot/test/user/b",
18                            "protocol_version": "3.1.1",
19                            "qos": 0,
20                            "clientId": "demo_001",
21                            "username": "arncpan/test",
22                            "password": "xxxx"
23                        }
24                    }
25                ]
26            }
27        }]
28}

这里的 SQL 用了四个聚合函数,用于统计在10秒钟窗口期内的相关值。

  • avg: 平均值
  • max: 最大值
  • min: 最小值
  • count: 计数

另外还使用了几个基本的函数:

  • mqtt: 消息中取出 MQTT 协议的信息,mqtt(topic) 就是取得当前取得消息的主题名称
  • split_value: 该函数将第一个参数使用第二个参数进行分割,然后第三个参数指定下标,取得分割后的值。所以函数 split_value("devices/001/messages", "/", 1)调用就返回001
  • GROUP BY 跟的是分组的字段,分别为计算字段 device_id;时间窗口 TUMBLINGWINDOW(ss, 10),该时间窗口的含义为每10秒钟生成一批统计数据。

actions 列表中的 mqtt 类型的 action 的相关配置信息是 Iot Core 的连接信息。这里注意替换 IotCore 的连接信息。

创建命令配置项

将上述两步的语法填写到配置项中。 创建配置项如下:

image.png

创建配置信息配置项

配置信息用于 kuiper-kubernetes-tool 连接 kuiper 模块,其中指定了 kuiper 的 ip、port 等信息。

Plain Text
1port: 9081
2timeout: 500
3intervalTime: 30
4ip: "kuiper"
5logPath: "log/kuiper.log"
6commandDir: "sample"

关于配置详情可以参考 kuiper-kubernetes-tool 文档 。其中 9081 端口是 kuiper 默认的 Restful API 端口。

创建配置项如下:

image.png

创建 kuiper-tool 应用

新建容器服务,并添加 kuiper-kubernetes-tool 服务,设置镜像、添加上两步的配置项。

image.png

image.png

image.png

如果上述步骤都安装正确,在边缘设备执行如下命令,可以得到如下结果:

image.png

image.png

测试

我们使用 Mqtt Box 模拟设备向事先约定的 Topic 主题发送消息,观察 Iot Core 是否可以收到流式处理的结果。

我们分别向 Baetyl-Broker 发送两条消息:

Plain Text
1{"temperature": 30, "humidity" : 80}
2{"temperature": 60, "humidity" : 80}

预期 10s 后 Iot Core 会收到如下消息:

Plain Text
1[{"device_id":"device_001","t_av":45,"t_count":2,"t_max":60,"t_min":30}]

实际操作:

向 baetyl-broker 发送两条消息。

image.png

IotCore 查看:

image.png

如上图,符合预期。

此时观察端上应用的资源消耗:

image.png

可以看出流式处理引擎 Kuiper 只消耗了极小的内存和CPU。

通过本文,读者可以基于 Baetyl 边缘计算框架快速集成 Kuiper 流式处理引擎,快速搭建边缘侧的流式解决方案,灵活地开发出基于边缘数据分析的系统,实现数据的低时延、低成本和安全的处理。

上一篇
使用BIE函数计算调用文心一言
下一篇
边缘规则实例