写点什么

Aeron 是如何实现的?—— Ipc Subscription

作者:BUG侦探
  • 2021 年 12 月 08 日
  • 本文字数:1906 字

    阅读完需:约 6 分钟

Aeron 是如何实现的?—— Ipc Subscription

接上文

Aeron 是什么?https://xie.infoq.cn/article/27422063a2cadcc054187135e

Aeron 中这么多空闲策略选哪个?https://xie.infoq.cn/article/41d0885f46594e90cbdba4b2b

Aeron 是如何实现的?—— Conductor https://xie.infoq.cn/article/b4953b06323cd26e3a1397874

Aeron 是如何实现的?—— Ipc Publication https://xie.infoq.cn/article/9c3c085daef8eb12d533d2f66

0. 简介

最近我们用 Aeron 实现了 Mesh agent 与 sdk 之间的共享内存通信,但是在使用过程中越来越感觉到 Aeron 框架太重了,其中很大部分功能完全用不到,有些想要自定义的逻辑很难在现有框架中实现。所以我们计划深入到 Aeron 源码中,看看它是如何实现的,最终尝试实现一个轻量的 Mesh 共享内存通信类库。上文分析了 Ipc Publication 的逻辑,本文继续分析 Ipc Subscription 的逻辑。

1. Driver Conductor - onAddIpcSubscription

在读取数据之前,需要先向 Driver Conductor 发送 ADD_SUBSCRIPTION 命令,在 Driver 中构建发送接收的关系。至于 Conductor 交互的逻辑不再赘述,直接看 Driver Conductor 处理 ADD_SUBSCRIPTION 命令的逻辑。处理逻辑的入口在 io.aeron.driver.ClientCommandAdapter,这里只关心 Ipc 的情况,找到 DriverConductor 中的处理逻辑:

构建完 IpcSubscriptionLink 之后,就返回 ON_SUBSCRIPTION_READY。然后再匹配当前的 ipcPublications,如果有 match 的,那么将订阅关系添加到对应的 IpcPublication 中:

1.1 UnsafeBufferPosition

在发送接收关系中,最核心的数据结构是 Position,记录消费的位置,本质上就是 cnc.dat 中的一个 Counter。

type id 是 SUBSCRIBER_POSITION_TYPE_ID(4),label 是 "sub-pos: ${registrationId} ${sessionId} ${streamId} ${channel} @${joinPosition}"。这个信息可以通过 io.aeron.samples.AeronStat 工具查看。

1.2 关系映射

在 SubscriptionLink 中,positionBySubscribableMap 维护着订阅的多个 Publication 的消费位置。此处之所以是多个的原因是,同一个 stream 可以有多个 session。

在 IpcPublication 中,subscriberPositions 维护着多个 subscriber 的消费位置。(tether 的逻辑暂时忽略,异常情况处理下一篇再分析)

1.3 ON_AVAILABLE_IMAGE

Driver 这边的关系信息维护好之后,向 Client 广播可用 ON_AVAILABLE_IMAGE 信息。

其中包含 Publication 生成的 logbuffer 信息,Subscription 也是直接读该共享内存。

2. Client Conductor - addSubscription

Client Conductor 的处理逻辑也很清晰,主要的处理逻辑就是响应 ON_SUBSCRIPTION_READY 和 ON_AVAILABLE_IMAGE 这两个消息。

收到 ON_SUBSCRIPTION_READY 之后,Client Conductor 构建相应的 io.aeron.Subscription 封装类,然后添加到 resourceByRegIdMap 这个映射关系中。此时调用 poll 拉取消息是不行的,因为还没有 image,也就是对应 Publication 的信息。这个信息是通过 ON_AVAILABLE_IMAGE 消息通知的,处理的方法是 onAvailableImage:

构建的 io.aeron.Image 包装类主要包含 logbuffer 和 position counter 这两个信息。接着调 Subscription 的 addImage 方法,将其添加到 images 数组中。

3. Subscription.poll & controlledPoll

此时就可以调用 poll 方法拉取消息了。

最终调用的是 image 的 poll 方法:

根据上次读的位置(首次是 joinPosition)计算对应的 term 读取。

逻辑很清楚,读数据,最后更新位置。有个细节,此处没有检查数据覆盖的情况,这说明在 Driver 中严格控制了 Publication 的最大位置。读取数据还有个 controlledPoll 方法,主干逻辑与 poll 一样,区别主要在于更新位置信息的逻辑。

4. Driver 对 publisherLimit 的维护

Driver 维护 position 的逻辑在 DriverConductor 的 trackStreamPositions 方法中。对于 ipc 场景调用的是 IpcPublication 的 updatePublisherLimit 方法:

核心逻辑就是更新 publisherLimit。publisherLimit 和 consumerPosition 的起始位置都是 Publication 的初始写入位置。这说明对于一个没有 subscriber 的 Publication 是不能写入的,当有 ADD_SUBSCRIPTION 后,就进入第一个 if 逻辑:

  1. 遍历所有 subscriberPositions,找出最大位置 maxSubscriberPosition 和最小位置 minSubscriberPosition

  2. consumerPosition 设定为最大读取位置

  3. proposedLimit 也就是提议给 Publication 的写入限制,初始值为最小读取位置加上 termWindowLength(termWindowLength 默认为 termLength 的一半,且自定义不能大于该值)

  4. 通过 tripLimit 和 tripGain 限制更新的步长,默认为 termWindowLength 的 1/8

  5. cleanBufferTo 方法清理之前的共享内存

从这里的逻辑看,ipc 场景用两个 termBuffer 就可以,三个岂不是浪费内存?再精简一下,一个 RingBuffer 看起来也可以。







发布于: 2021 年 12 月 08 日阅读数: 10
用户头像

BUG侦探

关注

还未添加个人签名 2021.06.08 加入

专注于发掘程序员/工程师的有趣灵魂,对工作中的思路与总结进行闪光播报。

评论

发布
暂无评论
Aeron 是如何实现的?—— Ipc Subscription