写点什么

RPC 框架编写实践——服务治理的基石,这位阿里 P7 大牛分析总结的属实到位

作者:Java高工P7
  • 2021 年 11 月 10 日
  • 本文字数:3684 字

    阅读完需:约 12 分钟

里面有三个方法, set 负责把数据存入到汇聚桶中, get 负责从固定桶获取数据, 两个方法调用的是不同的桶, 而 replace_dict 的方法就是负责替换桶。 那么, 该怎么确保每个窗口时间到的时候进行切换呢, 由于我的 RPC 框架是基于 Asyncio 的, 所以会考虑用到 Asyncio 来进行调度,在 Asyncio 中有个 loop.call_at 的方法, 用于指定几时执行这个函数, 假设间隔时间为 10 秒, 可以把方法改为:


import asyncio


class Demo:


def init(self) -> None:

汇聚桶

self.temp_dict: dict = {}

固定桶

self.fixed_dict: dict = {}


self.loop: asyncio.AbstractEventLoop = asyncio.get_event_loop()


def set(self, key: Any, value: Any) -> None:


self.temp_dict[key] = value


def get(self, key: Any) -> None:


return self.fixed_dict.get(key)


def replace_dict(self) -> None:

切换逻辑

self.fixed_dict = self.temp_dict


self.temp_dict = {}

表示 10 秒后调用 replace_dict 这个函数

self.loop.call_at(self.loop.time() + 10, self.replace_dict)


之后只要事件循环一直在运行, replace_dict 从第一次被调用后就会被一直运行, 而且是通过事件循环运行的, 还不会出现最大递归的问题。


但是这里会暴露出两个问题, 一个是当窗口时间非常小时, 比如 1 秒, 这时候事件循环调度的误差会被扩大, 因为调度是非常复杂的, 它只能尽量的确定在那个时间点附近能调用到, 误差有多少会由很多的复杂因素共同决定的(对于线程调度等也是一样的原理);另一个问题是当这个窗口时间被限定为 1 秒时, 如果要统计 2 秒的数据, 只能另开一个时间窗口为 2 秒的统计, 如果要统计 3 秒的数据, 就只能再开一个时间窗口为 3 的统计...以此类推, 会非常麻烦, 需要一种好的统一的方法来处理。


启用一个线程


《Android学习笔记总结+最新移动架构视频+大厂安卓面试真题+项目实战源码讲义》
浏览器打开:qq.cn.hn/FTe 免费领取
复制代码


在后台循环, 每隔 n 秒自动切换这个方案看似没问题, 但是线程也是由调用器来调度的, 当时间窗口再小一些时,也会暴露一些问题。同时我会在切换数据那里会调用一些钩子函数, 所以也会增加间隔的误差, 该方案不可采纳。


3.通过时间轮实现一段时间内的数据收集统计


=====================


上面说到, 滑动窗口的方案会暴露出两个问题,导致使用体验上没那么好, 需要解决掉。


首先是第一个问题: 交由调度在自动切换时会存在一些调用误差。 这个问题的核心是, 数据跟时间是强相关的, 所以一定会依赖于时间, 数据的读写必须要跟时间相关, 而上面时间窗口的方案使用的是被动切换的, 他会降低时间的准确性, 那被动方案不行就可以反过来, 使用主动切换。


主动切换的逻辑是根据操作时的时间来判断要操作哪个槽, 再读取数据或者写入数据(性能会稍微降低), 这个方案需要把读写的逻辑改一下:


  • 1.首先存储的数组要把销毁阶段的槽复原,不然在极端的情况下写入和读取的下标可能会一致的, 这时候这个数组的长度为 3, 然后记录一个开始时间。

  • 2.假设间隔为 1 秒, 每次读写数据时, 就会根据当前时间与起始时间的差除以 1 再与数组长度求出余数, 余数的范围在 0, 1, 2 之中,而这个 0, 1, 2 刚好是存数据的数组的下标范围, 随着时间的推移, 得到的下标会一直以 0, 1, 2, 0, 1, 2 一直循环变化着。

  • 3.基本逻辑都搞定了, 但是每次读取数据的时间是不固定的, 比如第一次写入数据时, 刚好命中下标 0, 第二次写入数据时, 刚好命中下标 2,如果刚好遇到读数据 1, 这时候要识别这里 1 的数据并不是有效的, 同时 0, 1, 2 这样一直循环着, 1 可能已经走到第二轮了, 2 还是第一轮的数据, 所以这时候就需要有个变量来标记轮数, 而轮数也很简单, 就是当前时间与起始时间相差再于时间间隔取商。


大概逻辑了解完了, 可以把上面的代码进行改写, 改写完代码如下:


import time


from typing import Dict, List, Tuple


class Demo:


def init(self) -> None:

首先那个销毁的桶要复原, 不然在极端情况下会导致放置和获取的下标是一致的, 然后就像真正的滑动窗口一样变为一个数组

元素中 cnt 代表这是第几轮的数据

self.bucket_list: List[Tuple[dict]] = [{'cnt': 0}, {'cnt': 0}, {'cnt': 0}]

总长度

self.bucket_len: int = len(self.bucket_list)

设置一个变量, 记录开始时间

self.start_timestamp: float = time.time()


def get_index(self) -> Tuple[int, int]:

*1000 是兼容有些间隔是毫秒的

返回的数据中, 第一个为数组下标(也就是指针), 第二个代表第几轮

diff: int = int((time.time() - self.start_timestamp) * 1000)


return diff % self.bucket_len, diff // self.bucket_len


def set(self, key: Any, value: Any) -> None:


index, cnt = self.get_index()


bucket: dict = self.bucket_list[index]


if bucket['cnt'] != cnt:

轮次不一样需要初始化

bucket = {key: value, 'cnt': cnt}


else:

轮次一样更新数据

bucket[key] = value


def get(self, key: Any) -> None:


index, cnt = self.get_index()


index = index - 1

获取的数据要小于当前的指针

if index == -1:


index = self.bucket_len - 1


bucket: dict = self.bucket_list[index]


if bucket['cnt'] != cnt:

轮次不一样, 代表数据不存在

raise KeyError(key)


else:

轮次一样更新数据

return bucket.get(key)


代码非常简单, 移除了 replace_dict 这个被动切换的逻辑, 改为每次 set 和 get 来计算应该落到哪个区间。 这样第一个问题就解决了, 可以来考虑第二个问题, 从问题一中的代码可以发现里面的 bucket_list 虽然是一个数组, 但通过时间和数组长度算出的商来获得下标会一直以 0, 1, 2, 0, 1,2 循环, 所以实际运行起来就像一个时间轮一直转着,新的轮次的数据会覆盖旧的轮次, 而这个时间轮的槽总量不变的, 如下图(draw.io 画出来的有点点丑):



而时间轮也是刚好可以解决问题二的一个方案, 首先假设我们的时间间隔为 60 秒, 如果要兼容 1 秒, 2 秒,3 秒,4 秒,5 秒等多种的时间间隔统计, 那么就可以取他们的最小公因数 1 作为一个子间隔, 它需要 60 个槽位, 我们可以把时间轮当成一个整体, 然后把这个时间轮切为 60 个等份的槽, 再通过时间来算出指针要指向哪个槽位(指针会跟着操作时间与时间轮总槽数取的余数来变化), 写数据时会写入指针指定的槽位, 这个槽位也就是汇聚数据槽, 而指针逆时针方向后面的槽位根据是否有数据来区分固定槽和空槽(初始化时, 所有槽位都是空槽, 第一次写了数据后就只能成为固定槽或者汇聚槽), 取数据只能取当前指针后面的, 且是本轮的数据且不为空槽的数据 。


然后 1 秒间隔的统计只取指针后面一格的槽数据, 2 秒间隔的统计只取指针后面两格的槽数据求和, 3 秒间隔的统计只取指针间隔后面三格的槽数据求和, 以此类推, 这样就能以一个类来兼容多种时间间隔的统计了。不过这个方案有个缺点, 就是上述的时间轮有 60 个槽位, 一个槽被定义会数据汇聚槽, 那剩下的最多只有 59 个固定槽可以获取数据, 还缺少一个槽位, 这时候就需要进行补槽处理, 比如 60 个槽就可以拓展为 65 个槽, 然后限制只能获取最近的 60 个槽即可, 剩下的 5 个槽可以用来当做缓冲位, 重新更改后的代码如下:


这个方案是兼容小范围的数据, 使用 1 分钟的时间间隔来兼容秒级的时钟间隔, 如果要兼容大范围的时间间隔, 比如 5 分钟, 10 分钟,15 分钟等等, 则可以采用指针式时钟一样的方案。 如果你了解 Kafka 的时间轮设计, 就可以知道 Kafka 有多层时间轮, 小层转动一层, 大层就转动一格(就像秒指针跑完一圈, 分指针就动一格一样), 等到大层的数据的到期时间范围小于等于小层时, 数据就会流向小层。不过服务治理的数据还是需要尽量的有效, 所以它的时间跨度一般不会超过一分钟。


import time


from typing import Dict, List, Tuple


class Demo:


def init(self, max_interval: int = 60, min_interval: int = 1) -> None:


self.max_interval = max_interval

算出有多少个槽

self.bucket_len: int = (max_interval // min_interval) + 5

设置槽,元素中第一位代表这是第几轮的数据, 方便覆盖

self.bucket_list: List[dict] = [{'cnt': 0} for _ in range(self.bucket_len)]

设置一个变量, 记录开始时间

self.start_timestamp: float = time.time()


def get_index(self) -> Tuple[int, int]:

*1000 是兼容有些间隔是毫秒的

返回的数据中, 第一个为数组下标(也就是指针), 第二个代表第几轮

diff: int = int((time.time() - self.start_timestamp) * 1000)


return diff % self.bucket_len, diff // self.bucket_len


def set(self, key: Any, value: Any) -> None:


index, cnt = self.get_index()


bucket: dict = self.bucket_list[index]


if bucket['cnt'] != cnt:

轮次不一样需要初始化

bucket = {'cnt': cnt, key: value}


else:

轮次一样更新数据

bucket[1][key] = value


def get(self, key: Any, diff: int = 0) -> None:

新增 diff 参数, 用于指定要获取从指针后到第几个槽的数据

if diff > max_interval:


raise ValueError(f"diff:{diff} > {max_interval}")


index, cnt = self.get_index()


value: int = 0


for i in range(diff):


bucket: dict = self.bucket_list[index - i]


if bucket['cnt'] != cnt:

用户头像

Java高工P7

关注

还未添加个人签名 2021.11.08 加入

还未添加个人简介

评论

发布
暂无评论
RPC框架编写实践——服务治理的基石,这位阿里P7大牛分析总结的属实到位