eKuiper 与百度智能边缘框架 BIE 集成方案
本文转自百度智能边缘 BIE
Baetyl 项目由百度发起,是国内首个加入 LF Edge 的边缘计算项目,旨在将云计算能力拓展至用户现场,提供临时离线、低延时的计算服务,包括设备接入、消息路由、消息远程同步、函数计算、设备信息上报、配置下发等功能。Baetyl 和智能边缘 BIE(Baidu-IntelliEdge)云端管理套件配合使用,整体可达到边缘计算、云端管理、边云协同的效果,满足各种边缘计算场景。
本文以一个常见的物联网使用场景为案例,介绍了如何利用边缘计算框架 Baetyl 来实现对业务的快速、低成本和有效地处理。
在各类物联网项目中,比如智能楼宇项目,需要将楼宇的数据(比如电梯、燃气、水电等)进行采集和分析。一种解决方案是将所有的设备直接接入在云端的物联网平台,类似于像 Baidu IoT Core 或者 AWS IoT Core。这种解决方案的问题在于,数据处理时延较长:数据处理时延较长:通过 Internet 传输和云端的处理后返回给设备,所需时间较长 数据传输和存储成本:通过 Internet 传输需要带宽,对于大规模连接的物联网项目来说,耗费的带宽会相当可观 数据的安全性:有些物联网的数据会相当敏感,全部通过物联网传输的话会有风险 为了解决以上的问题,业界提出了边缘计算的方案,边缘计算的核心就在于把数据进行就近处理,避免不必要的时延、成本和安全问题。开源框架 Baetyl 是百度贡献给 Linux 基金会的开源边缘计算框架,主推物联网场景下端侧的边缘计算解决方案。本文将流处理模块 eKuiper 部署到边缘计算框架 baetyl 上,对一段时间内边缘侧的设备消息进行流式处理,并将处理结果上传云端进行存储。
业务场景
假设现有一组设备,组中的每个设备有一个 id,通过 MQTT 协议往 MQTT 消息服务器上相应的主题发送数据。主题的设计如下,其中 {device_id} 为设备的 id。
每个设备发送的数据格式为 JSON,发送的通过该传感器采集的温度与湿度数据。
现在需要实时分析数据,并提出以下的需求:对每个设备的温度数据按照每 10 秒钟计算平均值(t_av),并且记下 10 秒钟内的最大值 (t_max)、最小值(t_min) 和数据条数(t_count),计算完毕后将这 4 个结果进行保存,以下为样例结果数据:
方案介绍
如下图所示,我们将在 baetyl 边缘计算框架上,采用边缘分析/流式数据处理的方式,从 baetyl-broker 订阅相关设备消息,最后将处理结果输出到 Baidu 的 IoT Core 中。
baetyl-broker (https://github.com/baetyl/baetyl-broker) 是 Baetyl 框架端侧的消息中间件,采用 MQTT3.1.1 协议,可在低带宽、不可靠网络中提供可靠的消息传输服务。
eKuiper (https://github.com/lf-edge/ekuiper/)是基于 SQL 的轻量级边缘流式数据分析引擎,安装包只有约 7MB,非常适合于运行在边缘设备端。
Baidu IoT Core (https://cloud.baidu.com/doc/IoTCore/index.html) 提供了比较全的设备接入和数据分析的方案,此处用于云端的结果数据接入,以及应用所需的结果数据分析。
安装 baetyl 计算框架
在云端新建边缘节点并安装到边缘设备。安装成功后如下所示:
新建 IoT Core 实例
参考百度云 IoTCore 文档 (https://cloud.baidu.com/doc/IoTCore/s/Akck4811r) 新建 IoT Core 实例并进行相关的设备模板配置。配置成功后,使用 MqttBox 进行连接,其中 $iot/test/user/# 主题是我们自定义的具有收发权限的用户主题。
配置 baetyl-broker
编辑 baetyl-broker 配置,暴露一个外部端口供测试使用。
然后编辑配置文件,配置文件如下:
新增 8003 端口供测试使用。并且需要设置映射宿主机端口以供连接。
然后在本地使用 MqttBox 连接 baetyl-broker,来测试连通性。
安装 eKuiper
从 eKuiper 官方镜像仓库选取 eKuiper 的官方 Docker 镜像,这里选取的是:
然后创建容器服务,并添加 eKuiper 服务,设置镜像、添加端口映射以及环境变量。
用户在端侧可以通过 telnet 命令来判断边缘设备上 eKuiper 是否启动成功。
更多 eKuiper 资料可以参考 eKuiper (https://github.com/lf-edge/ekuiper/) 官网 。
安装集成 eKuiper 插件
eKuiper 原生的 stream、rule 创建都是通过 Http 请求,为了适配 baetyl 平台,可以使用 eKuiper 推出的适配插件: kuiper-kubernetes-tool (https://github.com/lf-edge/ekuiper/tree/master/tools/kubernetes) ,支持从配置文件加载 stream、rule 配置。
从 eKuiper 官方镜像仓库镜像仓库选取 eKuiper 的官方 Docker 镜像,这里选取的是:
我们在新建 eKuiper 插件应用时,先新建对应的配置文件。
创建流语法解析
创建流的目的是为了定义发送到该流上的数据格式,类似于在关系数据库中定义表的结构。eKuiper 中所有支持的数据类型,可以参考 eKuiper (https://github.com/lf-edge/ekuiper/) 官网 。
上述语句在 eKuiper 中创建了一个名为 demo 的流定义,包含了两个字段,分别为 temperature 和 humidity,数据源为订阅 MQTT 的主题 devices/+/messages,这里请注意采用了通配符 +,用于订阅不同设备的消息。
数据业务逻辑处理语法解析
eKuiper 采用 SQL 实现业务逻辑,每 10 秒钟统计温度的平均值、最大值、最小值和次数,并根据设备 ID 进行分组,实现的 SQL 如下所示。
这里的 SQL 用了四个聚合函数,用于统计在 10 秒钟窗口期内的相关值。
· avg: 平均值
· max: 最大值
· min: 最小值
· count: 计数
另外还使用了几个基本的函数:
· mqtt: 消息中取出 MQTT 协议的信息,mqtt(topic) 就是取得当前取得消息的主题名称
· split_value: 该函数将第一个参数使用第二个参数进行分割,然后第三个参数指定下标,取得分割后的值。所以函数 split_value("devices/001/messages", "/", 1)调用就返回 001
· GROUP BY 跟的是分组的字段,分别为计算字段 device_id;时间窗口 TUMBLINGWINDOW(ss, 10),该时间窗口的含义为每 10 秒钟生成一批统计数据。
actions 列表中的 mqtt 类型的 action 的相关配置信息是 IoT Core 的连接信息。这里注意替换 IoTCore 的连接信息。
创建命令配置项
将上述两步的语法填写到配置项中。创建配置项如下:
创建配置信息配置项
配置信息用于 kuiper-kubernetes-tool 连接 eKuiper 模块,其中指定了 eKuiper 的 ip、port 等信息。
关于配置详情可以参考 kuiper-kubernetes-tool (https://github.com/lf-edge/ekuiper/tree/master/tools/kubernetes) 文档 。其中 9081 端口是 eKuiper 默认的 Restful API 端口。
创建配置项如下:
创建 kuiper-tool 应用
新建容器服务,并添加 kuiper-kubernetes-tool 服务,设置镜像、添加上两步的配置项。
如果上述步骤都安装正确,在边缘设备执行如下命令,可以得到如下结果:
测试
我们使用 Mqtt Box 模拟设备向事先约定的 Topic 主题发送消息,观察 IoT Core 是否可以收到流式处理的结果。
我们分别向 Baetyl-Broker 发送两条消息:
预期 10s 后 IoT Core 会收到如下消息:
实际操作:
向 baetyl-broker 发送两条消息。
IoTCore 查看:
如上图,符合预期。
此时观察端上应用的资源消耗:
可以看出流式处理引擎 eKuiper 只消耗了极小的内存和 CPU。
通过本文,读者可以基于 Baetyl 边缘计算框架快速集成 eKuiper 流式处理引擎,快速搭建边缘侧的流式解决方案,灵活地开发出基于边缘数据分析的系统,实现数据的低时延、低成本和安全的处理。
评论