Pipy MQTT 代理之(三)Logging
Pipy MQTT 代理系列:
如今可观测性成为一个产品的重要属性,其包含了三个方向:日志、跟踪和度量。在系列的第一篇中,我们简单实现了 metrics 的统计,这次对日志进行实现。至于追踪,MQTT 5 版本加入了 User Property 的支持,在消息头部可以承载扩展信息。对于 MQTT 的追踪目前业界并没有标准。如果端侧可以生成跟踪数据,代理可以进行透传并记录在日志中。因此本篇我们聚焦在日志和度量。
日志是比较常见的记录系统运行的手段之一,通过日志我们可以记录应用过去一段时间行为,进而掌握系统的运行状态,有时也会作为问题排查的依据。对 IoT 物联网的场景下,端侧的问题排查受到很大的限制,且数量众多。
实现
Pipy 处理的是网络流,将流输入到由过滤器组成的管道中。
当前的 MQTT 代理已经实现了限流、指标统计和负载均衡三项功能,对应着除了入口主脚本外的三个模块:throttle、metrics、balancer。模块中的管道彼此连接,如下图所示。
请求的流经过限流、指标统计、负载均衡管道的处理后被发送给上游,上游返回的响应流又会经过管道的处理,并最终发送给客户端。
如果要对请求和响应进行记录,则需要将日志管道接入到请求和响应的处理流中。为此,我们加入新的模块 logger,在新的模块中定义三个管道 request、response 和 logging(泛指在脚本中的多个管道)。
顾名思义,前两个管道分别接入到请求和响应的流程中,而最后一个则是对记录的信息进行输出。可以输出到控制台、或者类似 ClickHouse 的存储中(可以减少对数据的采集组件,直接送入存储中)。这里为了演示功能,直接将信息输出到控制台。
管道名字在同一个模块中唯一,在多个模块中可以不唯一。
logger 作为消息的处理环节,会记录请求和响应的详细信息,与其他几个管道不同不会对流本身产生影响。这里就用到 Pipy 中很重要的一个过滤器 fork。
fork 过滤器
fork 是一种比较特殊的连接过滤器:它将输入的流拷贝一份发送到另一个管道,原来的流会按照原来的流程继续处理,不会对流。这种特性正好满足了我们的需求。
Demo
环境
继续使用之前的 MQTT broker 集群:
192.168.1.11:代理 Pipy proxy
192.168.1.12:客户端 emqx-bench
192.168.1.13:节点 EMQ X Broker
192.168.1.14:节点 EMQ X Broker
测试
熟悉 MQTT 的应该对消息的 qos 不会陌生,不同设定(0、1、2)broker 的处理各不相同。
我们使用 1 个客户端发布 1 条消息分别验证不同 qos 的日志采集。
qos=0 日志
日志:
qos=1 日志
qos=2 日志
对于 qos 为 2 的场景,我们需要借助另一个客户端来订阅消息。
注册为订阅者:
发布消息:
总结
这次我们通过插件化的方式引入了 logger 模块,对请求和响应的信息进行了记录。对 Pipy 来说,代理功能都是通过引入新管道的方式进行实现的,新的管道通过连接过滤器与其他管道进行连接,接入到流的处理流程中。可能这个流程比较抽象,后面会单独一篇文章来描述 Pipy 对流的处理。
版权声明: 本文为 InfoQ 作者【Flomesh】的原创文章。
原文链接:【http://xie.infoq.cn/article/0702c31fd6620066f795dab0a】。文章转载请联系作者。
评论