写点什么

Pipy MQTT 代理之(三)Logging

作者:Flomesh
  • 2022 年 4 月 21 日
  • 本文字数:4557 字

    阅读完需:约 15 分钟

Pipy MQTT 代理之(三)Logging

Pipy MQTT 代理系列:



如今可观测性成为一个产品的重要属性,其包含了三个方向:日志、跟踪和度量。在系列的第一篇中,我们简单实现了 metrics 的统计,这次对日志进行实现。至于追踪,MQTT 5 版本加入了 User Property 的支持,在消息头部可以承载扩展信息。对于 MQTT 的追踪目前业界并没有标准。如果端侧可以生成跟踪数据,代理可以进行透传并记录在日志中。因此本篇我们聚焦在日志和度量。


日志是比较常见的记录系统运行的手段之一,通过日志我们可以记录应用过去一段时间行为,进而掌握系统的运行状态,有时也会作为问题排查的依据。对 IoT 物联网的场景下,端侧的问题排查受到很大的限制,且数量众多。

实现

Pipy 处理的是网络流,将流输入到由过滤器组成的管道中。


当前的 MQTT 代理已经实现了限流、指标统计和负载均衡三项功能,对应着除了入口主脚本外的三个模块throttlemetricsbalancer。模块中的管道彼此连接,如下图所示。


请求的流经过限流、指标统计、负载均衡管道的处理后被发送给上游,上游返回的响应流又会经过管道的处理,并最终发送给客户端。



如果要对请求和响应进行记录,则需要将日志管道接入到请求和响应的处理流中。为此,我们加入新的模块 logger,在新的模块中定义三个管道 requestresponselogging(泛指在脚本中的多个管道)。


顾名思义,前两个管道分别接入到请求和响应的流程中,而最后一个则是对记录的信息进行输出。可以输出到控制台、或者类似 ClickHouse 的存储中(可以减少对数据的采集组件,直接送入存储中)。这里为了演示功能,直接将信息输出到控制台。


管道名字在同一个模块中唯一,在多个模块中可以不唯一。



logger 作为消息的处理环节,会记录请求和响应的详细信息,与其他几个管道不同不会对流本身产生影响。这里就用到 Pipy 中很重要的一个过滤器 fork

fork 过滤器

fork 是一种比较特殊的连接过滤器:它将输入的流拷贝一份发送到另一个管道,原来的流会按照原来的流程继续处理,不会对流。这种特性正好满足了我们的需求。

Demo

环境

继续使用之前的 MQTT broker 集群:


  • 192.168.1.11:代理 Pipy proxy

  • 192.168.1.12:客户端 emqx-bench

  • 192.168.1.13:节点 EMQ X Broker

  • 192.168.1.14:节点 EMQ X Broker

测试

熟悉 MQTT 的应该对消息的 qos 不会陌生,不同设定(0、1、2)broker 的处理各不相同。


$ emqtt_bench pub -h 192.168.1.11 -p 1884 -c 1 -L 1 -I 1 -t bench/1 -m 'hello world!' -q 0
复制代码


我们使用 1 个客户端发布 1 条消息分别验证不同 qos 的日志采集。

qos=0 日志

日志:


2022-04-20 14:01:35.823 [INF] {"clientID":"ubuntu-dev2_bench_pub_1_3823694902","broker":null,"clintAddress":"192.168.1.12","clintPort":44726,"timestamp":1650463292823,"type":"CONNECT","qos":0,"dup":false,"retain":false,"protocolLevel":5,"keepAlive":300,"properties":{"sessionExpiryInterval":0},"cleanStart":true,"direction":"request"}2022-04-20 14:01:35.823 [INF] {"clientID":"ubuntu-dev2_bench_pub_1_3823694902","broker":"192.168.1.13:1883","clintAddress":"192.168.1.12","clintPort":44726,"timestamp":1650463292834,"type":"CONNACK","qos":0,"dup":false,"retain":false,"sessionPresent":false,"reasonCode":0,"properties":{"maximumPacketSize":4096,"retainAvailable":1,"sharedSubscriptionAvailable":1,"subscriptionIdentifierAvailable":1,"topicAliasMaximum":65535,"wildcardSubscriptionAvailable":1},"direction":"response"}2022-04-20 14:01:35.823 [INF] {"clientID":"ubuntu-dev2_bench_pub_1_3823694902","broker":"192.168.1.13:1883","clintAddress":"192.168.1.12","clintPort":44726,"timestamp":1650463292835,"type":"PUBLISH","qos":0,"dup":false,"retain":false,"topicName":"bench/1","direction":"request"}
复制代码

qos=1 日志

2022-04-20 14:05:11.315 [INF] {"clientID":"ubuntu-dev2_bench_pub_1_3598972520","broker":null,"clintAddress":"192.168.1.12","clintPort":44732,"timestamp":1650463508315,"type":"CONNECT","qos":0,"dup":false,"retain":false,"protocolLevel":5,"keepAlive":300,"properties":{"sessionExpiryInterval":0},"cleanStart":true,"direction":"request"}2022-04-20 14:05:11.315 [INF] {"clientID":"ubuntu-dev2_bench_pub_1_3598972520","broker":"192.168.1.14:1883","clintAddress":"192.168.1.12","clintPort":44732,"timestamp":1650463508317,"type":"CONNACK","qos":0,"dup":false,"retain":false,"sessionPresent":false,"reasonCode":0,"properties":{"maximumPacketSize":4096,"retainAvailable":1,"sharedSubscriptionAvailable":1,"subscriptionIdentifierAvailable":1,"topicAliasMaximum":65535,"wildcardSubscriptionAvailable":1},"direction":"response"}2022-04-20 14:05:11.315 [INF] {"clientID":"ubuntu-dev2_bench_pub_1_3598972520","broker":"192.168.1.14:1883","clintAddress":"192.168.1.12","clintPort":44732,"timestamp":1650463508318,"type":"PUBLISH","qos":1,"dup":false,"retain":false,"topicName":"bench/1","packetIdentifier":2,"direction":"request"}2022-04-20 14:05:11.315 [INF] {"clientID":"ubuntu-dev2_bench_pub_1_3598972520","broker":"192.168.1.14:1883","clintAddress":"192.168.1.12","clintPort":44732,"timestamp":1650463508318,"type":"PUBACK","qos":0,"dup":false,"retain":false,"packetIdentifier":2,"reasonCode":16,"direction":"response"}
复制代码

qos=2 日志

对于 qos 为 2 的场景,我们需要借助另一个客户端来订阅消息。


$ emqtt_bench sub -h 192.168.1.11 -p 1884 -t bench/1 -c 1
复制代码


注册为订阅者:


2022-04-20 14:07:12.034 [INF] {"clientID":"ubuntu-dev2_bench_sub_1_1982626972","broker":null,"clintAddress":"192.168.1.12","clintPort":44734,"timestamp":1650463629034,"type":"CONNECT","qos":0,"dup":false,"retain":false,"protocolLevel":5,"keepAlive":300,"properties":{"sessionExpiryInterval":0},"cleanStart":true,"direction":"request"}2022-04-20 14:07:12.034 [INF] {"clientID":"ubuntu-dev2_bench_sub_1_1982626972","broker":"192.168.1.13:1883","clintAddress":"192.168.1.12","clintPort":44734,"timestamp":1650463629036,"type":"CONNACK","qos":0,"dup":false,"retain":false,"sessionPresent":false,"reasonCode":0,"properties":{"maximumPacketSize":4096,"retainAvailable":1,"sharedSubscriptionAvailable":1,"subscriptionIdentifierAvailable":1,"topicAliasMaximum":65535,"wildcardSubscriptionAvailable":1},"direction":"response"}2022-04-20 14:07:12.034 [INF] {"clientID":"ubuntu-dev2_bench_sub_1_1982626972","broker":"192.168.1.13:1883","clintAddress":"192.168.1.12","clintPort":44734,"timestamp":1650463629036,"type":"SUBSCRIBE","qos":1,"dup":false,"retain":false,"packetIdentifier":2,"topicFilters":[{"filter":"bench/1","qos":0}],"direction":"request"}2022-04-20 14:07:12.034 [INF] {"clientID":"ubuntu-dev2_bench_sub_1_1982626972","broker":"192.168.1.13:1883","clintAddress":"192.168.1.12","clintPort":44734,"timestamp":1650463629037,"type":"SUBACK","qos":0,"dup":false,"retain":false,"packetIdentifier":2,"reasonCodes":[],"direction":"response"}2022-04-20 14:07:12.034 [INF] :{}
复制代码


发布消息:


2022-04-20 14:08:57.739 [INF] [object pipy::mqtt::MessageHead]:{"type":"CONNECT","qos":0,"dup":false,"retain":false,"protocolLevel":5,"keepAlive":300,"properties":{"sessionExpiryInterval":0},"clientID":"ubuntu-dev2_bench_pub_1_1960359114","cleanStart":true,"direction":"request"}2022-04-20 14:08:57.744 [INF] [object pipy::mqtt::MessageHead]:{"type":"PUBLISH","qos":2,"dup":false,"retain":false,"topicName":"bench/1","packetIdentifier":2,"direction":"request"}2022-04-20 14:08:57.745 [INF] [object pipy::mqtt::MessageHead]:{"type":"PUBREL","qos":1,"dup":false,"retain":false,"packetIdentifier":2,"reasonCode":0,"direction":"request"}2022-04-20 14:09:00.739 [INF] {"clientID":"ubuntu-dev2_bench_pub_1_1960359114","broker":null,"clintAddress":"192.168.1.12","clintPort":44736,"timestamp":1650463737739,"type":"CONNECT","qos":0,"dup":false,"retain":false,"protocolLevel":5,"keepAlive":300,"properties":{"sessionExpiryInterval":0},"cleanStart":true,"direction":"request"}2022-04-20 14:09:00.739 [INF] {"clientID":"ubuntu-dev2_bench_pub_1_1960359114","broker":"192.168.1.14:1883","clintAddress":"192.168.1.12","clintPort":44736,"timestamp":1650463737743,"type":"CONNACK","qos":0,"dup":false,"retain":false,"sessionPresent":false,"reasonCode":0,"properties":{"maximumPacketSize":4096,"retainAvailable":1,"sharedSubscriptionAvailable":1,"subscriptionIdentifierAvailable":1,"topicAliasMaximum":65535,"wildcardSubscriptionAvailable":1},"direction":"response"}2022-04-20 14:09:00.739 [INF] {"clientID":"ubuntu-dev2_bench_pub_1_1960359114","broker":"192.168.1.14:1883","clintAddress":"192.168.1.12","clintPort":44736,"timestamp":1650463737744,"type":"PUBLISH","qos":2,"dup":false,"retain":false,"topicName":"bench/1","packetIdentifier":2,"direction":"request"}2022-04-20 14:09:00.739 [INF] {"clientID":"ubuntu-dev2_bench_pub_1_1960359114","broker":"192.168.1.14:1883","clintAddress":"192.168.1.12","clintPort":44736,"timestamp":1650463737745,"type":"PUBREC","qos":0,"dup":false,"retain":false,"packetIdentifier":2,"reasonCode":0,"direction":"response"}2022-04-20 14:09:00.739 [INF] {"clientID":"ubuntu-dev2_bench_sub_1_1982626972","broker":"192.168.1.13:1883","clintAddress":"192.168.1.12","clintPort":44734,"timestamp":1650463737745,"type":"PUBLISH","qos":0,"dup":false,"retain":false,"topicName":"bench/1","direction":"response"}2022-04-20 14:09:00.739 [INF] {"clientID":"ubuntu-dev2_bench_pub_1_1960359114","broker":"192.168.1.14:1883","clintAddress":"192.168.1.12","clintPort":44736,"timestamp":1650463737745,"type":"PUBREL","qos":1,"dup":false,"retain":false,"packetIdentifier":2,"reasonCode":0,"direction":"request"}
复制代码

总结

这次我们通过插件化的方式引入了 logger 模块,对请求和响应的信息进行了记录。对 Pipy 来说,代理功能都是通过引入新管道的方式进行实现的,新的管道通过连接过滤器与其他管道进行连接,接入到流的处理流程中。可能这个流程比较抽象,后面会单独一篇文章来描述 Pipy 对流的处理。

发布于: 刚刚阅读数: 7
用户头像

Flomesh

关注

微信订阅号:flomesh 2022.04.07 加入

一站式云原生应用流量管理供应商 官网:https://flomesh.io

评论

发布
暂无评论
Pipy MQTT 代理之(三)Logging_IoT_Flomesh_InfoQ写作社区