写点什么

Aeron 是如何实现的?—— Conductor

作者:BUG侦探
  • 2021 年 11 月 10 日
  • 本文字数:1831 字

    阅读完需:约 6 分钟

Aeron是如何实现的?—— Conductor

接上文

《Aeron 是什么?》https://xie.infoq.cn/article/27422063a2cadcc054187135e

Aeron 中这么多空闲策略选哪个?https://xie.infoq.cn/article/41d0885f46594e90cbdba4b2b

0. 简介

最近我们用 Aeron 实现了 Mesh agent 与 sdk 之间的共享内存通信,但是在使用过程中越来越感觉到 Aeron 框架太重了,其中很大部分功能完全用不到,有些想要自定义的逻辑很难在现有框架中实现。所以我们计划深入到 Aeron 源码中,看看它是如何实现的,最终尝试实现一个轻量的 Mesh 共享内存通信类库。再回过头来看一下这张架构图:

Media Driver 组件负责管理数据通过媒介(UDP 或 IPC)的发送与接收。其中的 Sender 和 Receiver 负责数据的发送与接收,Driver Conductor 接收 Aeron Client 的命令,协调 Media Driver 的动作。本文将分析一下 Driver Conductor 的实现,以及与 Client Conductor 的交互逻辑。

1. Driver Conductor 主干逻辑

Driver Conductor 的逻辑封装在 io.aeron.driver.DriverConductor 类中,该类是一个 org.agrona.concurrent.Agent,所以直接看一下 doWork() 方法:

  1. processTimers(nowNs) 处理心跳和超时

  2. clientCommandAdapter.receive() 处理来自 Client Conductor 的命令(也就是上图中的红线)

  3. driverCmdQueue.drain(...) 处理来自 Sender 和 Receiver 的命令(也就是上图中的蓝线)

  4. 维护 publisher 的 limit 位置

  5. name resolver

本文主要分析第 2 部分,其它暂且不看。

2. Client Conductor --> Driver Conductor

Driver Conductor 处理命令的逻辑封装在 io.aeron.driver.ClientCommandAdapter 中。业务逻辑在 onMessage 方法中,也就是针对不同 msgTypeId 调用对应的处理函数。命令传输的逻辑封装在 toDriverCommands 中,他是一个 RingBuffer,其初始化逻辑在 io.aeron.driver.MediaDriver.Context 的 concludeDependantProperties() 方法中:

可见就是将 cncByteBuffer 的一部分包装为 ManyToOneRingBuffer,Client Conductor 写入命令,Driver Conductor 处理命令。

2.1 cncByteBuffer 是什么?

cncByteBuffer 是 ${aeronDirectory}/cnc.dat 文件 mmap 映射的内存,而 ${aeronDirectory} 默认是在/dev/shm/下,也就是说这是一块共享内存。具体看一下初始化逻辑,位于 io.aeron.driver.MediaDriver.Context 的 conclude() 方法中:

mmap 映射的逻辑在 org.agrona.IoUtil 的 mapNewFile 方法中:

暂且不再深究操作系统细节,回到 cnc.dat 文件,其格式定义在 io.aeron.CncFileDescriptor 中:

toDriverCommands 也就是基于 ”to-driver Buffer“ 包装的 ManyToOneRingBuffer。Client Conductor 通过 cnc.dat 中的这部分共享内存向 Driver Conductor 发命令。

2.2 ManyToOneRingBuffer 是个啥?

顾名思义就是个环形缓冲区,状态信息存储在 buffer 末尾:

  1. tailPosition 维护尾部的位置信息

  2. headPosition 和 headCachePosition 维护头部的位置信息

  3. correlationIdCounter 用于生成唯一的命令 ID,用于关联命令响应

  4. consumerHeartbeat 消费端的心跳,也就是消费时更新个时间戳(更新的逻辑就位于上面提到的 processTimers(nowNs) 方法中)

2.3 Client Conductor 如何发送命令?

Client Conductor 向 Driver Conductor ”建连接“ 的逻辑位于 io.aeron.Aeron.Context 的 connectToDriver 中:

此处用”建连接“这个词并不准确,该步骤的主要逻辑是判断 driverTimeoutMs 时间(默认 10s)内 Driver 是否准备就绪。判断标准有三个,在超时时间内:

  1. cnc.dat 文件可以映射

  2. cncVersion 信息不为空,也就是已经初始了

  3. to-driver Buffer 有心跳,也就是 Driver 的消费逻辑正常启动了

发送命令的逻辑比较清晰,以 ADD_PUBLICATION 为例:

3. Driver Conductor --> Client Conductor

上面是 Client Conductor 向 Driver Conductor 发送命令的逻辑,接下来看一下 Driver Conductor 向 Client Conductor 回复响应的逻辑。实际上实现方式是一样的,只场景稍微有些区别,所以用到的工具类不一样。Driver Conductor 向 Client Conductor 回复响应用的是 cnc.dat 中的 to-clients Buffer,具体封装在 io.aeron.driver.ClientProxy 中,初始化逻辑为:

由于存在多个 Aeron Client 连接一个 Media Driver,所以回复响应使用的是一个广播的工具类 org.agrona.concurrent.broadcast.BroadcastTransmitter,Client 根据 id 匹配需要的响应消息。Client Conductor 对应的消费逻辑封装在 io.aeron.DriverEventsAdapter 中,消费用到的广播工具类为 org.agrona.concurrent.broadcast.BroadcastReceiver

这一对广播工具类的细节就不展开了。






发布于: 54 分钟前阅读数: 3
用户头像

BUG侦探

关注

还未添加个人签名 2021.06.08 加入

专注于发掘程序员/工程师的有趣灵魂,对工作中的思路与总结进行闪光播报。

评论

发布
暂无评论
Aeron是如何实现的?—— Conductor