物联网设备实时监控预警

百度流式计算 BSC

  • 产品定价
  • 功能发布记录
  • 产品描述
    • 产品优势
    • 应用场景
    • 产品功能
    • 核心概念
    • 概述
  • 快速入门
    • 开通服务
    • 开发作业
  • 典型实践
    • CDN 日志提取中转(ETL)
    • API 日志调用统计
    • CDN 接口日志聚合统计
    • 物联网设备实时报警统计(流表Join)
    • 物设备报警情况实时统计
    • 物联网设备实时监控预警
  • 操作指南
    • 多用户访问控制
    • 作业运维
    • 扩展内置函数
    • 编辑作业
    • 模板管理
    • 新增作业
    • 资源管理
  • API参考
    • 接口概述
    • 通用说明
    • 公共头
    • 模板相关接口
    • 模型定义
    • 服务域名
    • 请求返回格式
    • 作业实例相关接口
    • 作业相关接口
    • 资源相关接口
  • 常见问题
    • 一般问题
  • 自定义JAR作业
    • Flink自定义JAR作业
    • Spark自定义JAR作业
  • SQL
    • DML语句
    • SET 语句
    • 内置函数
    • 标识符和关键字
    • 窗口函数
    • 概述
    • DDL 语句
      • KAFKA
      • Formats
      • ES
      • Overview
      • PALO
      • TSDB
      • BKAFKA
      • RDS
      • BOS
      • MQTT
所有文档
menu
没有找到结果,请重新输入

百度流式计算 BSC

  • 产品定价
  • 功能发布记录
  • 产品描述
    • 产品优势
    • 应用场景
    • 产品功能
    • 核心概念
    • 概述
  • 快速入门
    • 开通服务
    • 开发作业
  • 典型实践
    • CDN 日志提取中转(ETL)
    • API 日志调用统计
    • CDN 接口日志聚合统计
    • 物联网设备实时报警统计(流表Join)
    • 物设备报警情况实时统计
    • 物联网设备实时监控预警
  • 操作指南
    • 多用户访问控制
    • 作业运维
    • 扩展内置函数
    • 编辑作业
    • 模板管理
    • 新增作业
    • 资源管理
  • API参考
    • 接口概述
    • 通用说明
    • 公共头
    • 模板相关接口
    • 模型定义
    • 服务域名
    • 请求返回格式
    • 作业实例相关接口
    • 作业相关接口
    • 资源相关接口
  • 常见问题
    • 一般问题
  • 自定义JAR作业
    • Flink自定义JAR作业
    • Spark自定义JAR作业
  • SQL
    • DML语句
    • SET 语句
    • 内置函数
    • 标识符和关键字
    • 窗口函数
    • 概述
    • DDL 语句
      • KAFKA
      • Formats
      • ES
      • Overview
      • PALO
      • TSDB
      • BKAFKA
      • RDS
      • BOS
      • MQTT
  • 文档中心
  • arrow
  • 百度流式计算BSC
  • arrow
  • 典型实践
  • arrow
  • 物联网设备实时监控预警
本页目录
  • 概览
  • 需求场景
  • 方案概述
  • 配置步骤
  • 定义 MQTT Source 表
  • 定义 MQTT Sink 表
  • 编写数据提取DML语句
  • 相关产品

物联网设备实时监控预警

更新时间:2025-08-21

概览

监控、预警工厂设备的用电情况。

需求场景

用户拥有大量的大功率设备,如果没有在下班之前及时关闭,会造成用电浪费,甚至引起重大安全事故。每个设备上的传感器定时(5~30秒不等)将设备当前的情况推送到 物联网核心套件(IoT Core)或 物接入(IoT Hub) 的 MQTT 当中作为 source,在我们 BSC 中创建 FLINK_STREAM/SQL 类型的作业用于设备关键信息的提取,并实时将处理结果推送到 物联网核心套件(IoT Core)或 物接入(IoT Hub) 的 MQTT 当中,方便下游 规则引擎(Rule Engine) 和 时序时空数据库(TSDB)消费。用户可以基于 智能小程序 开发小程序或第三方平台调用 TSDB 的数据 API,并完成数据展示、历史数据分析、故障预警等功能。可以有效发现安全隐患、及时更换老旧设备、发现异常用电情况,为工厂运转节省成本、提升安全系数。

方案概述

用户设备 → IoT Hub → BSC → IoT Hub → Rule Engine → TSDB → 小程序

配置步骤

一个完整的 Flink SQL 作业由 source 表、sink 表和 DML 语句构成。

定义 MQTT Source 表

FLINK
1SET job.stream.timeType = 'PROCESSTIME'; -- 设置 PROCTIME
2CREATE TABLE source_mqtt_table (
3        `modbus` ROW(
4            `request` ROW(
5                `startAddr` BIGINT,
6                `length` BIGINT
7            ),
8            `response` STRING,
9            `parsedResponse` ARRAY < ROW(
10                `desc` STRING,
11                `type` STRING,
12                `unit` STRING,
13                `value` STRING,
14                `errno` BIGINT
15            ) >
16        )
17        `metrics` ROW(
18            `Settingtime_m` BIGINT,
19            `Building` BIGINT,
20            `Floor` BIGINT,
21            `Company` BIGINT,
22            `Equipment` BIGINT,
23            `C_Temperature` DOUBLE,
24            `S_Temperature` BIGINT,
25            `Cabinet` BIGINT,
26            `Runningtime_m` BIGINT,
27            `Runningtime_h` BIGINT,
28            `Settingtime_h` BIGINT
29        )
30    ) WITH (
31        'connector.type' = 'MQTT',
32        'format.encode' = 'JSON',
33        'connector.url' = 'tcp://xxxxxxxxxx.mqtt.iot.gz.baidubce.com:1883',
34        'connector.topic' = 'Device1',
35        'connector.username' = 'xxxxxxxxxx/yyyyyy',
36        'connector.password' = 'xxxxxxxx',
37        'connector.semantic' = 'at_least_once'
38    );

定义 MQTT Sink 表

FLINK
1CREATE TABLE sink_mqtt_table (
2        `field` STRING,
3        `timestamp` BIGINT,
4        `value` DOUBLE,
5        `Company` BIGINT,
6        `Building` BIGINT,
7        `Floor` BIGINT,
8        `Cabinet` BIGINT,
9        `Equipment` BIGINT
10    ) WITH (
11        'connector.type' = 'MQTT',
12        'format.encode' = 'JSON',
13        'connector.url' = 'tcp://xxxxxxx.mqtt.iot.gz.baidubce.com:1883',
14        'connector.topic' = 'Device1_Unnested',
15        'connector.username' = 'xxxxxx/yyyyy',
16        'connector.password' = 'xxxxxxxx'
17    );

编写数据提取DML语句

解析 source 表中的复杂嵌套 json, 提取出设备的关键信息,如位置编号、运行状态,并使用 PROCTIME 来为输出结果添加一条记录

FLINK
1INSERT INTO
2    sink_mqtt_table
3SELECT
4    `desc` AS field,
5    TO_BIGINT(CURRENT_TIMESTAMP) AS `timestamp`,
6    CAST(`value` AS DOUBLE) AS `value`,
7    Company,
8    Building,
9    `Floor`,
10    Cabinet,
11    Equipment
12FROM
13    source_mqtt_table,
14    UNNEST(sink_mqtt_table.parsedResponse) AS A(`desc`, `type`, `unit`, `value`, `errno`)
15WHERE
16    `desc` NOT IN (
17        'Company',
18        'Building',
19        'Floor',
20        'Cabinet',
21        'Equipment'
22    )

相关产品

物联网核心套件 IoT Core、时序时空数据库TSDB

上一篇
物设备报警情况实时统计
下一篇
操作指南