写点什么

使用 MASA 全家桶从零开始搭建 IoT 平台(三)管理设备的连接状态

  • 2023-05-10
    浙江
  • 本文字数:4763 字

    阅读完需:约 16 分钟

使用MASA全家桶从零开始搭建IoT平台(三)管理设备的连接状态

前言

获取一个设备的在线和离线状态,是一个很关键的功能。我们对设备下发的控制指令,设备处于在线状态才能及时给我们反馈。这里的在线和离线,我们可以简单的理解为设备与 MQTT 的连接状态。

分析

我们打电话的时候经常能听到:"您拨打的用户已关机“和”用户不在服务区或暂时无法接通“,这两种的区别是什么?



1、当用户开机时,会自动向最近的移动基站注册,基站标记该用户为"attach"(在线)状态。2、当用户关机时,手机会发起 datach 流程,告知基站自己关机了,基站标记该用户为"detach"(离线)状态。这样再次拨打就可以节省寻呼资源,直接提示用户关机。3、当用户忽然进入无网络的环境,或者手机故障,导致来不及发起 datach 流程,基站还认为用户"在线",当有人拨打用户号码时,基站测会对用户进行寻呼,但是超时得不到回应后,就会提示"不在服务区"或者"暂时无法接通" 的语音。


其实这个方案在 IoT 上也是可行的,我们可以让设备在线和离线的过程中向特定 Topic 发送状态消息,但是存在问题,我们需要一个单独的 Broker 去订阅这个 Topic,但是这个单独的 Broker 很容易成为单点故障点。而且如果设备数量很大,这种意外离线的设备也很难及时发现,需要下发指令后等待设备响应超时才能发现。

方案 1:遗嘱消息

MQTT 遗嘱消息可以在客户端意外断线时将“遗嘱”优雅地发送给第三方订阅者,以实现离线通知、设备状态更新等业务。其中意外断线指客户端断开前未向服务器发送 DISCONNECT 消息,比如:


因网络故障或网络波动,设备在保持连接周期内未能通讯,连接被服务端关闭设备意外掉电设备尝试进行不被允许的操作而被服务端关闭连接,例如订阅自身权限以外的主题等遗嘱消息在 MQTT 客户端向服务器端 CONNECT 请求时设置,可选属性包括是否发送遗嘱消息 (Will Message)标志,和遗嘱消息主题 (Topic) 与内容(Payload) 以及 Properties。


值得一提的,遗嘱消息发布的时间可能会有延迟:通常意外断线时,服务器无法立即检测到断线行为,需要通过连接保活心跳机制并经过一定周期后才会触发;MQTT 5.0 提供的遗嘱延迟间隔(Will Delay Interval)属性也会影响发布时间。

演示遗嘱消息的使用

我们使用 A、B 两台电脑使用 MQTT X 来演示。我们在 A 电脑的 MQTT X 中新建一个名为 Test 的连接,Host 修改为 修改为我们的 MQTT 地址(192.120.5.204),并输入账号密码,在 Advanced 部分选择 MQTT Version 为 5.0,并且将 Session Expiry Interval 设置为 10,确保会话不会在遗嘱消息发布前过期。



然后在 Lass Will and Testament 部分将 Last-Will Topic 设置为 offline,Last-Will Payload 设置为 I'm offline,Will Delay Interval (s) 设置为 5。



完成以上设置后,我们点击右上角的 Connect 按钮以建立连接。

我们在 B 电脑的 MQTTX 中新建一个连接 Sub,mqtt 地址同样指向我们的 mqtt 服务器(192.120.5.204)



并订阅 offline 主题



我们用任务管理器直接结束 A 电脑的 MQTTX 进程,这是连接会被直接断开,模拟了设备断电的场景,在 5s 之后,在 B 电脑的 MQTTX 订阅中收到了一条内容为 I‘m offline 的遗嘱消息。


实施流程

1、设备遗嘱消息内容设置为 offline,该遗嘱主题与一个普通发送状态的主题设定成同一个 {设备名称}/status。例如 284202304230001/status

2、当设备连接时,向主题 {设备名称}/status 发送内容为 online 的 Retained 消息,其它客户端订阅主题 {设备名称}/status 的时候,将获取到 Retained 消息为 online。


保留消息(Retain )MQTT 服务端收到 Retain 标志为 1 的 PUBLISH 报文时,会将该报文视为保留消息,除了被正常转发以外, 保留消息会被存储在服务端,每个主题下只能存在一份保留消息,因此如果已经存在相同主题的保留消息,则该保留消息被替换。当客户端建立订阅时,如果服务端存在主题匹配的保留消息,则这些保留消息将被立即发送给该客户端。 借助保留消息,新的订阅者能够立即获取最近的状态,而不需要等待无法预期的时间,这在很多场景下是非常重要的。EMQX 默认开启保留消息的能力和服务,可以在 etc/emqx.conf 中修改 mqtt.retain_available 为 false 来关闭保留消息的能力, 这样客户端将被禁止发送 Retain 标志为 1 的 PUBLISH 报文,否则,客户端将会收到原因码为 0x9A(不支持保留消息)的 DISCONNECT 报文。保留消息的服务会存储和管理客户端发送的保留消息,并发送给相应的订阅者。


3、当客户端异常断开时,系统自动向主题 {设备名称}/status 发送内容为 offline 的消息,其它订阅了此主题的客户端会马上收到 offline 消息;如果遗嘱消息设置了 Will Retain,那么此时如果有新的订阅 A/status 主题的客户端上线,也将获取到内容为 offline 的遗嘱消息。

方案 2:使用 WebHook

方案 1 需要设备主动设置遗嘱消息才能实现,那么有没有更简单的方式,直接通过设备与 Mqtt 的连接事件来获取连接状态呢。EMQX 设计了一套 WebHook 系统,可以通过这个自带的 WebHook 系统获取内部的事件并进行处理。WebHook 的原理很简单,当设备与 mqtt 建立连接或者断开连接时,EMQX 会把事件的信息通过我们的配置调用特定的 URL 上的接口,实现通知。使用 WebHook 还可以有限避免单点故障。所以本项目会采用 WebHook 的方式来实现对设备在线和离线的管理。

开启 WebHook

数据集成 -> 数据桥接 中创建一个 Webhook



名称设置为 ConnectedEvent,URL 中填写我们的 Webhook 地址,也就是触发事件之后的调用接口地址,这里我们填:


http://192.120.5.204:5000/api/Device/ConnectedEvent


请求方式为 Post,其他内容保持默认不变



这里注意 URL 可以通过 ${field}的方式拼接,请求体也可以自己指定,如果留空会原样转发消息,我们这里请求体留空设备在线和离线的事件转发的消息格式如下


{  "username": "284202304230001",  "timestamp": 1682652598840,  "sockname": "172.17.0.5:1883",  "receive_maximum": 32,  "proto_ver": 5,  "proto_name": "MQTT",  "peername": "172.17.0.1:48524",  "node": "emqx@172.17.0.5",  "mountpoint": "undefined",  "metadata": {    "rule_id": "rule_3hsx"  },  "keepalive": 60,  "is_bridge": false,  "expiry_interval": 10,  "event": "client.connected",  "connected_at": 1682652598840,  "conn_props": {    "User-Property": {},    "Session-Expiry-Interval": 10  },  "clientid": "mqttx_c4491df0",  "clean_start": false}
复制代码


我们点击 创建 ,并继续点击 创建规则



我们在创建规则中指定新的规则名称 rule_client_connected,并在 SQL 编辑器复制以下内容


SELECT  *FROM  "$events/client_connected",  "$events/client_disconnected"
复制代码


在右侧的事件中,我们可以看到所有可用的事件,我们选择了连接和断开两个事件,在这两个事件触发时会通过 Webhook 调用我们配置的接口,这样我们就能获取到设备的在线、离线状态了。



我们点击 创建按钮 完成规则的创建我们可以看见我们创建好的规则



点击规则 ID,还可以看到统计数据



在 FLows 中还可以看到整个工作流程


演示 Webhook

我们使用 MQTTX 模拟一次设备连接和断开动作,可以在规则统计界面看到我们的操作已经被记录。


编写代码

我们这里采用方案 2。我们需要实现之前配置的 ConnectedEvent 接口


    /// <summary>    /// 连接事件请求    /// </summary>    public class ConnectedEventRequest    {        /// <summary>        /// 设备名称        /// </summary>        public string Username { get; set; }        /// <summary>        /// 时间戳        /// </summary>        public long Timestamp { get; set; }
/// <summary> /// 事件(连接/断开) /// </summary> public string Event { get; set; } /// <summary> /// 连接时间(断开事件中为0) /// </summary> public long Connected_at { get; set; }
/// <summary> /// Client ID /// </summary> public string Clientid { get; set; } }
复制代码


        /// <summary>        /// 更新设备在线状态        /// </summary>        /// <param name="deviceName"></param>        /// <param name="onlineStatus"></param>        /// <returns></returns>        public async Task UpdateDeviceOnlineStatusAsync(string deviceName, OnLineStates onlineStatus)        {            var device = await _ioTDbContext.IoTDeviceInfo.Include(o => o.IoTDeviceExtend).AsNoTracking()                .FirstOrDefaultAsync(o => o.DeviceName == deviceName);            if (device == null)            {                return;            }            else            {                if (device.IoTDeviceExtend == null) //扩展表为空                {                    device.IoTDeviceExtend = new IoTDeviceExtend                    {                        DeviceInfoId = device.Id,                        OnLineStates = (int)onlineStatus,                    };                    _ioTDbContext.Attach(device.IoTDeviceExtend);                                        _ioTDbContext.Entry(device.IoTDeviceExtend).State = EntityState.Added;                    _ioTDbContext.Entry(device.IoTDeviceExtend).Property(o => o.OnLineStates).IsModified = true;                    await _ioTDbContext.SaveChangesAsync();                }                if (device.IoTDeviceExtend.OnLineStates != (int)onlineStatus)         //在线状态不一致                {                    device.IoTDeviceExtend.OnLineStates = (int)onlineStatus;
_ioTDbContext.Attach(device.IoTDeviceExtend); //防止更新其他字段 _ioTDbContext.Entry(device.IoTDeviceExtend).State = EntityState.Unchanged; _ioTDbContext.Entry(device.IoTDeviceExtend).Property(o => o.OnLineStates).IsModified = true; await _ioTDbContext.SaveChangesAsync(); } } }
复制代码


我们根据 Event 中的内容来判断是 连接(client.connected)/断开(client.disconnected) 的事件


        /// <summary>        /// 连接、断开事件        /// </summary>        /// <param name="request"></param>        /// <returns></returns>        [HttpPost]        public async Task ConnectedEventAsync([FromBody] ConnectedEventRequest request)        {            var onlineStatus = request.Event switch            {                "client.connected" => OnLineStates.OnLine,                _ => OnLineStates.OffLine            };
await _deviceHandler.UpdateDeviceOnlineStatusAsync(request.Username, onlineStatus); }
复制代码


总结


以上就是本文要讲的内容,我们可以通过 MQTTX 来测试我们的代码有效性。该方案还存在部分缺点,例如:1、每次设备上下线会导致频繁的请求接口,在大量设备接入的场景中需要考虑接口性能。2、由于网络等问题,Web 调用顺序可能不能完全保证,也许离线会比在线事件更早处理,从而导致状态不一致。我们后面会尝试用其他方案来替代 WebHook,尝试解决上述问题,在此之前我们都会继续使用 WebHook 进行功能演示。


完整代码在这里:https://github.com/sunday866/MASA.IoT-Training-Demos


如果你对我们的 MASA 感兴趣,无论是代码贡献、使用、提 Issue,欢迎联系我们

WeChat:MasaStackTechOps

QQ:7424099

发布于: 1 小时前阅读数: 10
用户头像

还未添加个人签名 2021-10-26 加入

MASA技术团队官方账号,我们专注于.NET现代应用开发解决方案,Wechat:MasaStackTechOps ,Website:www.masastack.com

评论

发布
暂无评论
使用MASA全家桶从零开始搭建IoT平台(三)管理设备的连接状态_IoT_MASA技术团队_InfoQ写作社区