Aeron 是如何实现的?—— Conductor
接上文
《Aeron 是什么?》https://xie.infoq.cn/article/27422063a2cadcc054187135e
Aeron 中这么多空闲策略选哪个?https://xie.infoq.cn/article/41d0885f46594e90cbdba4b2b
0. 简介
最近我们用 Aeron 实现了 Mesh agent 与 sdk 之间的共享内存通信,但是在使用过程中越来越感觉到 Aeron 框架太重了,其中很大部分功能完全用不到,有些想要自定义的逻辑很难在现有框架中实现。所以我们计划深入到 Aeron 源码中,看看它是如何实现的,最终尝试实现一个轻量的 Mesh 共享内存通信类库。再回过头来看一下这张架构图:
Media Driver 组件负责管理数据通过媒介(UDP 或 IPC)的发送与接收。其中的 Sender 和 Receiver 负责数据的发送与接收,Driver Conductor 接收 Aeron Client 的命令,协调 Media Driver 的动作。本文将分析一下 Driver Conductor 的实现,以及与 Client Conductor 的交互逻辑。
1. Driver Conductor 主干逻辑
Driver Conductor 的逻辑封装在 io.aeron.driver.DriverConductor 类中,该类是一个 org.agrona.concurrent.Agent,所以直接看一下 doWork() 方法:
processTimers(nowNs) 处理心跳和超时
clientCommandAdapter.receive() 处理来自 Client Conductor 的命令(也就是上图中的红线)
driverCmdQueue.drain(...) 处理来自 Sender 和 Receiver 的命令(也就是上图中的蓝线)
维护 publisher 的 limit 位置
name resolver
本文主要分析第 2 部分,其它暂且不看。
2. Client Conductor --> Driver Conductor
Driver Conductor 处理命令的逻辑封装在 io.aeron.driver.ClientCommandAdapter 中。业务逻辑在 onMessage 方法中,也就是针对不同 msgTypeId 调用对应的处理函数。命令传输的逻辑封装在 toDriverCommands 中,他是一个 RingBuffer,其初始化逻辑在 io.aeron.driver.MediaDriver.Context 的 concludeDependantProperties() 方法中:
可见就是将 cncByteBuffer 的一部分包装为 ManyToOneRingBuffer,Client Conductor 写入命令,Driver Conductor 处理命令。
2.1 cncByteBuffer 是什么?
cncByteBuffer 是 ${aeronDirectory}/cnc.dat 文件 mmap 映射的内存,而 ${aeronDirectory} 默认是在/dev/shm/下,也就是说这是一块共享内存。具体看一下初始化逻辑,位于 io.aeron.driver.MediaDriver.Context 的 conclude() 方法中:
mmap 映射的逻辑在 org.agrona.IoUtil 的 mapNewFile 方法中:
暂且不再深究操作系统细节,回到 cnc.dat 文件,其格式定义在 io.aeron.CncFileDescriptor 中:
toDriverCommands 也就是基于 ”to-driver Buffer“ 包装的 ManyToOneRingBuffer。Client Conductor 通过 cnc.dat 中的这部分共享内存向 Driver Conductor 发命令。
2.2 ManyToOneRingBuffer 是个啥?
顾名思义就是个环形缓冲区,状态信息存储在 buffer 末尾:
tailPosition 维护尾部的位置信息
headPosition 和 headCachePosition 维护头部的位置信息
correlationIdCounter 用于生成唯一的命令 ID,用于关联命令响应
consumerHeartbeat 消费端的心跳,也就是消费时更新个时间戳(更新的逻辑就位于上面提到的 processTimers(nowNs) 方法中)
2.3 Client Conductor 如何发送命令?
Client Conductor 向 Driver Conductor ”建连接“ 的逻辑位于 io.aeron.Aeron.Context 的 connectToDriver 中:
此处用”建连接“这个词并不准确,该步骤的主要逻辑是判断 driverTimeoutMs 时间(默认 10s)内 Driver 是否准备就绪。判断标准有三个,在超时时间内:
cnc.dat 文件可以映射
cncVersion 信息不为空,也就是已经初始了
to-driver Buffer 有心跳,也就是 Driver 的消费逻辑正常启动了
发送命令的逻辑比较清晰,以 ADD_PUBLICATION 为例:
3. Driver Conductor --> Client Conductor
上面是 Client Conductor 向 Driver Conductor 发送命令的逻辑,接下来看一下 Driver Conductor 向 Client Conductor 回复响应的逻辑。实际上实现方式是一样的,只场景稍微有些区别,所以用到的工具类不一样。Driver Conductor 向 Client Conductor 回复响应用的是 cnc.dat 中的 to-clients Buffer,具体封装在 io.aeron.driver.ClientProxy 中,初始化逻辑为:
由于存在多个 Aeron Client 连接一个 Media Driver,所以回复响应使用的是一个广播的工具类 org.agrona.concurrent.broadcast.BroadcastTransmitter,Client 根据 id 匹配需要的响应消息。Client Conductor 对应的消费逻辑封装在 io.aeron.DriverEventsAdapter 中,消费用到的广播工具类为 org.agrona.concurrent.broadcast.BroadcastReceiver
这一对广播工具类的细节就不展开了。
版权声明: 本文为 InfoQ 作者【BUG侦探】的原创文章。
原文链接:【http://xie.infoq.cn/article/b4953b06323cd26e3a1397874】。文章转载请联系作者。
评论