RPC 框架编写实践——服务治理的基石,这位阿里 P7 大牛分析总结的属实到位
里面有三个方法, set 负责把数据存入到汇聚桶中, get 负责从固定桶获取数据, 两个方法调用的是不同的桶, 而 replace_dict 的方法就是负责替换桶。 那么, 该怎么确保每个窗口时间到的时候进行切换呢, 由于我的 RPC 框架是基于 Asyncio 的, 所以会考虑用到 Asyncio 来进行调度,在 Asyncio 中有个 loop.call_at 的方法, 用于指定几时执行这个函数, 假设间隔时间为 10 秒, 可以把方法改为:
import asyncio
class Demo:
def init(self) -> None:
汇聚桶
self.temp_dict: dict = {}
固定桶
self.fixed_dict: dict = {}
self.loop: asyncio.AbstractEventLoop = asyncio.get_event_loop()
def set(self, key: Any, value: Any) -> None:
self.temp_dict[key] = value
def get(self, key: Any) -> None:
return self.fixed_dict.get(key)
def replace_dict(self) -> None:
切换逻辑
self.fixed_dict = self.temp_dict
self.temp_dict = {}
表示 10 秒后调用 replace_dict 这个函数
self.loop.call_at(self.loop.time() + 10, self.replace_dict)
之后只要事件循环一直在运行, replace_dict 从第一次被调用后就会被一直运行, 而且是通过事件循环运行的, 还不会出现最大递归的问题。
但是这里会暴露出两个问题, 一个是当窗口时间非常小时, 比如 1 秒, 这时候事件循环调度的误差会被扩大, 因为调度是非常复杂的, 它只能尽量的确定在那个时间点附近能调用到, 误差有多少会由很多的复杂因素共同决定的(对于线程调度等也是一样的原理);另一个问题是当这个窗口时间被限定为 1 秒时, 如果要统计 2 秒的数据, 只能另开一个时间窗口为 2 秒的统计, 如果要统计 3 秒的数据, 就只能再开一个时间窗口为 3 的统计...以此类推, 会非常麻烦, 需要一种好的统一的方法来处理。
启用一个线程
在后台循环, 每隔 n 秒自动切换这个方案看似没问题, 但是线程也是由调用器来调度的, 当时间窗口再小一些时,也会暴露一些问题。同时我会在切换数据那里会调用一些钩子函数, 所以也会增加间隔的误差, 该方案不可采纳。
3.通过时间轮实现一段时间内的数据收集统计
=====================
上面说到, 滑动窗口的方案会暴露出两个问题,导致使用体验上没那么好, 需要解决掉。
首先是第一个问题: 交由调度在自动切换时会存在一些调用误差。 这个问题的核心是, 数据跟时间是强相关的, 所以一定会依赖于时间, 数据的读写必须要跟时间相关, 而上面时间窗口的方案使用的是被动切换的, 他会降低时间的准确性, 那被动方案不行就可以反过来, 使用主动切换。
主动切换的逻辑是根据操作时的时间来判断要操作哪个槽, 再读取数据或者写入数据(性能会稍微降低), 这个方案需要把读写的逻辑改一下:
1.首先存储的数组要把销毁阶段的槽复原,不然在极端的情况下写入和读取的下标可能会一致的, 这时候这个数组的长度为 3, 然后记录一个开始时间。
2.假设间隔为 1 秒, 每次读写数据时, 就会根据当前时间与起始时间的差除以 1 再与数组长度求出余数, 余数的范围在 0, 1, 2 之中,而这个 0, 1, 2 刚好是存数据的数组的下标范围, 随着时间的推移, 得到的下标会一直以 0, 1, 2, 0, 1, 2 一直循环变化着。
3.基本逻辑都搞定了, 但是每次读取数据的时间是不固定的, 比如第一次写入数据时, 刚好命中下标 0, 第二次写入数据时, 刚好命中下标 2,如果刚好遇到读数据 1, 这时候要识别这里 1 的数据并不是有效的, 同时 0, 1, 2 这样一直循环着, 1 可能已经走到第二轮了, 2 还是第一轮的数据, 所以这时候就需要有个变量来标记轮数, 而轮数也很简单, 就是当前时间与起始时间相差再于时间间隔取商。
大概逻辑了解完了, 可以把上面的代码进行改写, 改写完代码如下:
import time
from typing import Dict, List, Tuple
class Demo:
def init(self) -> None:
首先那个销毁的桶要复原, 不然在极端情况下会导致放置和获取的下标是一致的, 然后就像真正的滑动窗口一样变为一个数组
元素中 cnt 代表这是第几轮的数据
self.bucket_list: List[Tuple[dict]] = [{'cnt': 0}, {'cnt': 0}, {'cnt': 0}]
总长度
self.bucket_len: int = len(self.bucket_list)
设置一个变量, 记录开始时间
self.start_timestamp: float = time.time()
def get_index(self) -> Tuple[int, int]:
*1000 是兼容有些间隔是毫秒的
返回的数据中, 第一个为数组下标(也就是指针), 第二个代表第几轮
diff: int = int((time.time() - self.start_timestamp) * 1000)
return diff % self.bucket_len, diff // self.bucket_len
def set(self, key: Any, value: Any) -> None:
index, cnt = self.get_index()
bucket: dict = self.bucket_list[index]
if bucket['cnt'] != cnt:
轮次不一样需要初始化
bucket = {key: value, 'cnt': cnt}
else:
轮次一样更新数据
bucket[key] = value
def get(self, key: Any) -> None:
index, cnt = self.get_index()
index = index - 1
获取的数据要小于当前的指针
if index == -1:
index = self.bucket_len - 1
bucket: dict = self.bucket_list[index]
if bucket['cnt'] != cnt:
轮次不一样, 代表数据不存在
raise KeyError(key)
else:
轮次一样更新数据
return bucket.get(key)
代码非常简单, 移除了 replace_dict 这个被动切换的逻辑, 改为每次 set 和 get 来计算应该落到哪个区间。 这样第一个问题就解决了, 可以来考虑第二个问题, 从问题一中的代码可以发现里面的 bucket_list 虽然是一个数组, 但通过时间和数组长度算出的商来获得下标会一直以 0, 1, 2, 0, 1,2 循环, 所以实际运行起来就像一个时间轮一直转着,新的轮次的数据会覆盖旧的轮次, 而这个时间轮的槽总量不变的, 如下图(draw.io 画出来的有点点丑):
而时间轮也是刚好可以解决问题二的一个方案, 首先假设我们的时间间隔为 60 秒, 如果要兼容 1 秒, 2 秒,3 秒,4 秒,5 秒等多种的时间间隔统计, 那么就可以取他们的最小公因数 1 作为一个子间隔, 它需要 60 个槽位, 我们可以把时间轮当成一个整体, 然后把这个时间轮切为 60 个等份的槽, 再通过时间来算出指针要指向哪个槽位(指针会跟着操作时间与时间轮总槽数取的余数来变化), 写数据时会写入指针指定的槽位, 这个槽位也就是汇聚数据槽, 而指针逆时针方向后面的槽位根据是否有数据来区分固定槽和空槽(初始化时, 所有槽位都是空槽, 第一次写了数据后就只能成为固定槽或者汇聚槽), 取数据只能取当前指针后面的, 且是本轮的数据且不为空槽的数据 。
然后 1 秒间隔的统计只取指针后面一格的槽数据, 2 秒间隔的统计只取指针后面两格的槽数据求和, 3 秒间隔的统计只取指针间隔后面三格的槽数据求和, 以此类推, 这样就能以一个类来兼容多种时间间隔的统计了。不过这个方案有个缺点, 就是上述的时间轮有 60 个槽位, 一个槽被定义会数据汇聚槽, 那剩下的最多只有 59 个固定槽可以获取数据, 还缺少一个槽位, 这时候就需要进行补槽处理, 比如 60 个槽就可以拓展为 65 个槽, 然后限制只能获取最近的 60 个槽即可, 剩下的 5 个槽可以用来当做缓冲位, 重新更改后的代码如下:
这个方案是兼容小范围的数据, 使用 1 分钟的时间间隔来兼容秒级的时钟间隔, 如果要兼容大范围的时间间隔, 比如 5 分钟, 10 分钟,15 分钟等等, 则可以采用指针式时钟一样的方案。 如果你了解 Kafka 的时间轮设计, 就可以知道 Kafka 有多层时间轮, 小层转动一层, 大层就转动一格(就像秒指针跑完一圈, 分指针就动一格一样), 等到大层的数据的到期时间范围小于等于小层时, 数据就会流向小层。不过服务治理的数据还是需要尽量的有效, 所以它的时间跨度一般不会超过一分钟。
import time
from typing import Dict, List, Tuple
class Demo:
def init(self, max_interval: int = 60, min_interval: int = 1) -> None:
self.max_interval = max_interval
算出有多少个槽
self.bucket_len: int = (max_interval // min_interval) + 5
设置槽,元素中第一位代表这是第几轮的数据, 方便覆盖
self.bucket_list: List[dict] = [{'cnt': 0} for _ in range(self.bucket_len)]
设置一个变量, 记录开始时间
self.start_timestamp: float = time.time()
def get_index(self) -> Tuple[int, int]:
*1000 是兼容有些间隔是毫秒的
返回的数据中, 第一个为数组下标(也就是指针), 第二个代表第几轮
diff: int = int((time.time() - self.start_timestamp) * 1000)
return diff % self.bucket_len, diff // self.bucket_len
def set(self, key: Any, value: Any) -> None:
index, cnt = self.get_index()
bucket: dict = self.bucket_list[index]
if bucket['cnt'] != cnt:
轮次不一样需要初始化
bucket = {'cnt': cnt, key: value}
else:
轮次一样更新数据
bucket[1][key] = value
def get(self, key: Any, diff: int = 0) -> None:
新增 diff 参数, 用于指定要获取从指针后到第几个槽的数据
if diff > max_interval:
raise ValueError(f"diff:{diff} > {max_interval}")
index, cnt = self.get_index()
value: int = 0
for i in range(diff):
bucket: dict = self.bucket_list[index - i]
if bucket['cnt'] != cnt:
评论