写点什么

Aeron 是如何实现的?—— Ipc Publication

作者:BUG侦探
  • 2021 年 11 月 24 日
  • 本文字数:2231 字

    阅读完需:约 7 分钟

Aeron是如何实现的?—— Ipc Publication

接上文

Aeron 是什么?https://xie.infoq.cn/article/27422063a2cadcc054187135e

Aeron 中这么多空闲策略选哪个?https://xie.infoq.cn/article/41d0885f46594e90cbdba4b2b

Aeron 是如何实现的?—— Conductor https://xie.infoq.cn/article/b4953b06323cd26e3a1397874

0. 简介

最近我们用 Aeron 实现了 Mesh agent 与 sdk 之间的共享内存通信,但是在使用过程中越来越感觉到 Aeron 框架太重了,其中很大部分功能完全用不到,有些想要自定义的逻辑很难在现有框架中实现。所以我们计划深入到 Aeron 源码中,看看它是如何实现的,最终尝试实现一个轻量的 Mesh 共享内存通信类库。上文分析了 Conductor 的逻辑,本文继续分析 Ipc Publication 的逻辑。

1. Driver Conductor - add[Exclusive]Publication

在发送数据之前,需要先向 Driver Conductor 发送 ADD_[EXCLUSIVE_]PUBLICATION 命令,让 Driver 初始化通信的共享内存结构。至于 Conductor 交互的逻辑不再赘述,直接看 Driver Conductor 处理 ADD_[EXCLUSIVE_]PUBLICATION 命令的逻辑。处理逻辑的入口在 io.aeron.driver.ClientCommandAdapter:

这里只关心 IPC_CHANNEL,进入 io.aeron.driver.DriverConductor:

三个主要步骤:

  1. 创建 IpcPublication,核心就是 "${aeronDirectory}/${correlationId}.logbuffer" 共享内存文件;

  2. 返回 ON_[EXCLUSIVE_]PUBLICATION_READY 消息;

  3. 向所有相关的 subscribers 发送 ON_AVAILABLE_IMAGE 消息。(subscribers 收到该消息就会读取共享内存进行消费,具体逻辑下篇分析)

1.1 ${correlationId}.logbuffer 共享内存文件

logbuffer 文件结构的定义在 io.aeron.logbuffer.LogBufferDescriptor 中:

上面三个 Term 用于传输数据,这三个 Term 轮转使用,Aeron Cookbook 上有个动画演示,看起来这种设计主要有利于 UDP 数据流重建。下面的 Meta Data 比较复杂,具体字段用到的时候再详细看:

2. Client Conductor - onNew[Exclusive]Publication

client 收到 ON_[EXCLUSIVE_]PUBLICATION_READY 消息后构建相应的[Exclusive|Concurrent]Publication,该类封装了发送消息的逻辑。其中 ConcurrentPublication 支持并发发送数据,但是性能不如 ExclusivePublication 好。看一下父类 io.aeron.Publication 的构造方法:

再看一下 io.aeron.ExclusivePublication 的构造方法(ConcurrentPublication 的主干逻辑类似,这里就不详细看了):

大部分参数比较直观,termAppenders 封装了具体 Term 的写入逻辑。上面的提到的 Term 轮转用法具体到代码中,就是 activePartitionIndex 和 termBeginPosition 这两个变量的维护。首先看一下选哪个 Term:

  1. 从 Log Meta Data 中取出 Active Term Count (termCount)

  2. termCount 按照 PARTITION_COUNT(3)取余得到当前的 Term 索引 (index)

然后看一下从哪个位置开始写:

  1. 从 Log Meta Data 中获取对应的的 Tail Counter #index (rawTail)

  2. rawTail 的高 32 位是 termId,低 32 位是 termOffset

  3. termBeginPosition = (termId - initialTermId) << positionBitsToShift,其中 positionBitsToShift 这步操作本质上就是乘 Term 的长度

最后需要解释一下 positionLimit,这个值读取的是 cnc.dat 中的一个 Counters Buffer,用于表示可以写入的位置限制,主要作用是传递 Subscriber 的消费能力,用于背压。(这个值的更新在 Driver 中,下篇分析 Subscription 时再详细看)这个信息可以通过 io.aeron.samples.AeronStat 工具查看。其 label 是 "pub-lmt: ${registrationId} ${sessionId} ${streamId} ${channel}",其中的 registrationId 就是 logbuffer 文件名的 correlationId。

3. ExclusivePublication - offer

发送数据有两个方法 offer 和 tryClaim。看一下 ExclusivePublication 的 offer 方法(ConcurrentPublication 的主干逻辑与之类似,只是多了些并发控制):

首先如果写入的位置 position 大于等于 limit,也就是消费的能力更不上写入了,那么产生背压(ConcurrentPublication 由于存在并发的情况,所以并不是严格限制,但是不会超过该 term):

这里还有个细节 maxPossiblePosition,主要是限制 termCount 这个 int 值不要溢出。

3.1 termAppender.append[Un]fragmentedMessage 写入数据

对于正常可以写入的情况,如果写入的数据小于 MTU,那么调用 appendUnfragmentedMessage:

首先更新 Tail Counter #index。如果写入的长度大于本 term 剩余可写空间了,那么在 handleEndOfLogCondition 方法中处理异常情况:

剩余空间填充一个 PADDING_FRAME_TYPE 的消息,然后返回 FAILED(-1)。如果本 term 剩余可写空间足够,那么依次写入 header、reservedValue 和数据。关于长度字段的写入,有个小细节很有意思,先写入一个负值,最后再写入一个正值。通过这种方式可以保证 subscriber 读到“正值”时,数据已经全部写入了。对于数据大于 MTU 的场景,就需要对数据分段了,其它的逻辑跟上面是一样的,只是通过 BEGIN_FRAG_FLAG 和 END_FRAG_FLAG 标识了消息。

3.2 newPosition 更新位置

数据发送成功后,通过 newPosition 方法更新位置信息:

如果写入成功了,那么直接更新本地变量 termOffset 即可。如果写入失败了,又不是超过最大位置的场景,那么就需要轮转 term 了。

更新 Log Meta Data 中的 Active Term Count 和 Tail Counter #next,以及相关的本地变量。

4. tryClaim

最后看一下 tryClaim 方法,主干逻辑与 offer 一致。区别在于:offer 时,数据已经准备好,所以在处理逻辑中直接写入;tryClaim 时,只是预先占着位置,上层通过 BufferClaim 写入数据,最后 commit 时,提交“正值”的长度字段。







用户头像

BUG侦探

关注

还未添加个人签名 2021.06.08 加入

专注于发掘程序员/工程师的有趣灵魂,对工作中的思路与总结进行闪光播报。

评论

发布
暂无评论
Aeron是如何实现的?—— Ipc Publication