写点什么

EMQX + PolarDB-X 一站式 IoT 数据解决方案

  • 2022 年 8 月 26 日
    北京
  • 本文字数:3496 字

    阅读完需:约 11 分钟

本文整理自 EMQX 产品经理李国伟,在 PolarDB 开源社区中关于 EMQX 与 PolarDB-X 构建一站式 IoT 数据解决方案的分享。本篇内容主要分为四个部分:


1. IoT 数据特性

2. EMQX 介绍

3. EMQX 与 PolarDB-X 集成

4. EMQX+PolarDB-X 方案 DEMO

 

一、IoT 数据特性



物联网应用场景离不开数据的采集、传输、存储、分析等过程。大家可以按照数据特性以及业务需求,把物联网数据划分为元数据、消息数据、时序数据。如上图所示,数据集通过 EMQX 连接设备,实现数据存储。


元数据是设备最新的状态数据,如在线状态、当前传感器数值。消息数据是设备发布的消息,包括上报数据和下发指令。时序数据是持续变化的元数据和消息数据。


 

在物联网应用中,数据存储的需求无处不在。元数据、消息数据经过存储使用后,可以支撑起各类业务需求。由于连接数规模、采集点数量庞大,不同的数据业务,需要对应不同的数据库选型。


元数据需要频繁插入更新,并且支持结构化查询,所以推荐使用关系数据库,进行存储。

消息数据不需要全量存储,只需要记录关键操作,或者提取消息数据的关键数据,应当根据业务情况适当的选型。


时序数据的特点是,数据修改频次低,海量数据对写入速度和存储压缩比敏感,查询需求多样,所以推荐使用时序数据库。

 

二、EMQX 介绍


MQTT 是一个轻量的发布订阅模式消息传输协议,专门针对低带宽和不稳定网络环境的物联网应用设计。


MQTT 协议是基于发布/订阅模式的物联网通信协议,凭借简单易实现、支持 QoS、报文小等特点,占据了物联网协议的半壁江山。


MQTT 协议广泛应用于物联网、移动互联网、智能硬件、车联网、电力、能源等领域,既能作为网关在设备侧接入通信,也能作为设备-云端的通信协议:行业内 ZigBee、LoRa、Modbus 等绝大多数网关协议最终都转换为 MQTT 协议接入上云。



EMQX 是 MQTT 代理,即 MQTT 消息服务器。它将设备和应用联系起来,充当设备与设备、设备与应用之间的桥梁。


EMQX 是一个大规模分布式的物联网专用的 MQTT 消息服务器。它可以高效可靠的连接海量物联网设备,提供高可靠、高性能的实时数据移动、处理和集成,助力构建关键业务的物联网平台和应用。


 

EMQX 基于 Apache 2.0 许可证完全开源,自 2013 年起 200+开源版本迭代,在全球都有广泛的用户。


EMQX 在协议的支持度、接入、吞吐、延迟等方面有显著的优势。在最新发布的 5.0 版本中,EMQX 单节点支持 500 万 MQTT 设备连接,集群可扩展至 1 亿并发 MQTT 连接,可以支撑超大规模的物联网应用。



EMQX 可以连接任何设备。通过开放标准物联网协议 MQTT、QUIC、LwM2M/CoAP,支持连接所有车联网、工业、能源电力等关键业务场景的异构终端设备。


EMQX 提供了安全的双向通信。通过 TLS/SSL、QUIC 和多样化的认证机制确保安全的双向 MQTT 安全通信。支持用户名/密码、JWT、PSK 和 X.509 证书认证,确保数据安全。


除此之外,EMQX 实时数据处理能力,通过一个强大的基于 SQL 的规则引擎,以数百万条/秒的速度实时过滤、转换与处理设备与云端之间双向移动的 MQTT 消息数据。


在运维管理以及可观测性方面,EMQX 通过 CLI、HTTP API 和 Dashboard,轻松管理 EMQX 集群。支持使用 Datadog、Statsd、Prometheus 和 Granfana 进行监控和报警。

 

三、EMQX 与 PolarDB-X 集成


 

接下来,看一下 EMQX 如何与 PolarDB-X 集成,实现物联网数据 PolarDB-X 存储方案。这个方案广泛用于 EMQX 开源版本。由于数据存储业务众多,连接数规模与采集点数量庞大,所以需要超高并发写入和海量存储支持。


通过 EMQX 的数据集成能力,以及 MQTT 共享订阅功能,将内部的设备、事件、消息,发送到用户编写的程序中,实现 PolarDB-X 的插入或更新。


数据集成是 EMQX 在发布订阅模型的基础之上的数据处理与分发组件,通过简单的可视化的配置,即可将消息流以及设备事件与 Kafka、RabbitMQ 等消息中间件,以及各类 SQL/NoSQL/时序数据库等数据系统集成。


 

通过上述规则 EMQX 可以处理各类数据,包括 MQTT 消息和设备生命中心事件。其中,MQTT 消息包括设备端的上报,或者云端的下发。


设备生命周期事件是指,整个设备在运行过程中的事件。这些事件对物联网的应用建设有很大帮助。比如进行设备管理,安全审计啊,消息确认等等。消息丢弃、消息投递、消息确认是精细化的 MQTT 消息传输过程。


共享订阅是在多个订阅者之间实现负载均衡的订阅方式,相当于订阅端的负载均衡功能。当发布者的生产能力较强时,可能会出现订阅者的消费能力无法及时跟上的情况。


如果在 EMQX 集群中,某个节点挂了,同时订阅多个节点避免某个节点故障导致数据丢失。


 

用户可以通过使用 $share/{group}/{topic}或 $queue/{topic}格式的主题,发起共享订阅。


如上图所示,多个客户端发布的消息。经过 EMQX 之后,共享订阅会在多个订阅者之间均衡派发消息。

 

四、EMQX+PolarDB-X 方案 DEMO



在本次演示中,使用 PolarDB-X 存储 MQTT 设备在线状态,在上下线时更新并记录更新时间。


使用 PolarDB-X 记录设备事件,记录设备上下线历史、订阅、取消订阅历史记录等行为轨迹。


使用消息存储功能,将存储设备上报的消息,来源设备、主题、消息内容进行存储,实现消息入库。


上图是 worker 的实例代码,负责接收 MQTT 消息,进行数据插入。在处理时,把它转换成 JSON,根据 JSON event 判断插入历史表,或者进行其他操作。



上图是设备在线状态表,包括一个 ID、设备 IP、在线状态,创建时间、更新时间。



历史事件和消息表都有对应字段,对应的数据类型。消息表的 Payload 可以存储较大的消息文本。



EMQX 规则将所需事件数据通过 republish 动作,转发至 worker 处理。



接下来,进入实际的操作演示。首先,选择本地部署 PolarDB-X,按照文档中的快速入门指南,在本地部署。



然后,创建表结构。上图展示了具体的 SQL 文件,方便大家进行创建。



接下来,插入数据并更新语句。更新 ip_address 和状态。当唯一键重复时,将会执行更新操作。如果不重复,会写入数据。


client_events 表包括客户端的事件。当设备断开连接,没有 topic。当会话订阅时,有 topic。



接下来,查看表里面都有哪些数据。如上图所示,表里包括了 id、clientid、event、topic、created_at、updated_at。


 接下来,插入 messages 表。将 clientid,topic,payload 插入。


接下来,在本地启动 EMQX。启动成功之后,访问 localhost18083,把语言改成简体中文。


在数据集成功能中,主要包括数据桥接以及规则,用规则获取数据。


EMQX 的事件通过主题进行获取。在客户端建立上线事件连接,用调试的方式,执行 SQL。EMQX 可以在客户端里,拿到相关信息,包括 ID、关键事件、连接时间、连接属性等等


然后,断开连接。断开连接对应的是 disconnected 事件,跟连接的结构很像。但在 event 里,是断开连接事件。所以,用户可以通过 event 字段,区分是否连接。


在设备的事件历史记录方面,获取订阅跟取消订阅事件。


session_subscribed 是订阅事件。当会话订阅执行之后,可以获取当前的上下文信息、订阅的 topic 等等。


session_unsubscribed 是取消订阅事件,跟订阅大同小异。event_topic 变成了取消订阅的主题。


在消息存储方面,用户需要从客户端发出的主题中,获取消息。比如从 t-1 主题发布消息时,需要从 t-1 主题获取消息。其中,最主要的是 Payload 信息。其他数据,可以根据需要进行存储。


当用户获取相关数据后,可以使用数据桥接进行转发,通过外置数据库或者 MQTT 进行数据发送。除此之外,用户也可以使用控制台输出,把结果打印到控制台。


接下来,进行测试。在问题分析中,找到 WebSocket 客户端建立连接。已发送是 t-1,已接收是 t-2。消息经过 republic,进行转发。


在控制台里,除了消息发布,还有控制台打印。把动作和上下文信息打印出来,方便用户进行调试。



在 EMQX 开源版里,由于没有直接编写数据库的能力。所以用户需要使用 Worker 建立一个 mysql 链接,连接到 PolarDB,使用 PolarDB 的兼容模式。


然后,创建订阅者。把客户端连接到 EMQX 上。在主程序里,会创建十个客户端。每个客户端都共享订阅主题。当收到消息时,注册一个 handleMessage 程序就。它会把 data 转成对象。


转换之后,就可以获取相应的字段。当 event 已连接或断开连接时,需要写入或更新设备表。把数据插入到历史表里。


如果是消息发布事件,把消息插入 message 表里。如果是其他事件,把这个事件插入到客户端事件表里。每来一条消息,会轮流派发到客户端。不会出现一条消息同时给十个客户端处理。



PolarDB-X 超高并发、海量存储的特性可以应对物联网大规模设备接入所需的各类数据存储场景。


其分布式特性以及存储计算分离架构带来的水平扩展、分布式事务、混合负载等能力,可以与同样是分布式的 EMQX 结合使用,打造真正的可伸缩物联网应用,应对从数千到数千万的设备接入。


除了本次分享的开源版方案,EMQX Cloud 和 EMQX 企业版中还提供了 MySQL 数据集成能力,可以直接通过 PolarDB-X 兼容语法完成数据集成,更简单高效的实现物联网数据集成。

用户头像

还未添加个人签名 2022.01.10 加入

还未添加个人简介

评论

发布
暂无评论
EMQX + PolarDB-X 一站式 IoT 数据解决方案_数据库_阿里云数据库开源_InfoQ写作社区