使用 Rust 实现的基础的 List 和 Watch 机制
本文分享自天翼云开发者社区《使用 Rust 实现的基础的List 和 Watch 机制》。作者:l****n
使用 Rust 实现的基础的 List 和 Watch 机制
介绍
在日常的开发过程中,有一个很重要的任务是能够通过 Rust 语言实现 K8s 中的各种生态组件,在这个过程中,既需要能过够了解 K8S 的工作原理也需要能够知道 rust 的语言特性。因此,在这个过程中有很多值得探讨的知识点。
在这里,第一步,我们将探索如何使用 Rust 实现一个类似于 Kubernetes 的 list 和 watch 机制。我们将通过 WebSocket 实现实时的消息推送,并使用一些关键的 Rust 异步编程模型来处理事件和连接管理。
我们首先默认大家能够了解 rust 语言的基本特性。下文中,将针对 rust 的知识点展开进行探讨。
目标
理解 WebSocket 连接的建立和管理。
学习如何通过 WebSocket 推送消息。
掌握消息缓存和处理的实现方式。
了解如何使用 Rust 实现一个高效的事件分发系统。
理解 K8S 中的数据一致性保障方法了解本机制的不足,以及后续如何进行改进
理解问题
什么是 list 和 watch?
List:列出当前所有资源的状态。
Watch:实时监控资源的变化,一旦有资源变化,就会立即通知客户端。
使用场景
自动化运维:实时监控系统资源状态,触发自动化运维操作。
应用监控:实时获取应用状态,及时处理异常,在很多的系统设计场景中,能够减少耦合。
K8S 中的相应设计:K8S 中,对相应资源的通知的基础即为 list and watch 机制。本人在学习 K8S 源码的第一步就是学习这一套设计架构。
分析问题
\当然,通过简单的代码仅仅通过 http 进行主动连接也可实现这个功能。但在目前阶段,我们希望能够设计一个高效的、稳定的、可扩展的 list and watch 体系,因此我们需要考虑以下几个关键问题。
关键问题
如何建立和管理 我们服务器和客户端的连接?通过什么方式进行?
如何实现高效的消息推送机制?
如何处理消息缓存和订阅管理?
技术选型
语言:Rust
Web 框架:warp 框架
WebSocket 实现和框架:tokio-tungstenite、warp
异步编程:tokio、管道机制
设计代码结构
针对以上这个需求,结合目前 kunos-system 的需求我们阐释如下
有以下几个资源,Node、Task(Task 是一个 shell 命令、镜像运行命令的载体)、Job(Task 的上层资源,一个 Job 包含多个 Task,类似于 K8s 中的 replicaset)我们需要对这几个资源的状态进行推送。
能够在服务器建立起来一个 watch and list 服务器,能够推送各种事件
能够
组件设计
Broker:管理 WebSocket 订阅者和事件分发。
Watcher:对不同资源类型进行管理和操作。
WebSocket 客户端:与服务器交互,接收实时事件。
基本原理
websocket 路由入口
1. warp::path!("watch" / "node")
*这部分代码定义了一个路径过滤器,用于匹配路径 /watch/node 的 HTTP 请求。
warp::path!是 Warp 框架提供的一个宏,用于简化路径定义。这里的
"watch" / "node"表示请求路径必须是
/watch/node` 才能匹配这个过滤器。
2. .and(warp::ws())
这一部分代码将路径过滤器与 WebSocket 协议过滤器组合起来。warp::ws()
过滤器会匹配 WebSocket 握手请求并提取一个 warp::ws::Ws
类型,表示 WebSocket 配置。这表示我们的这个路径将为一个 websocket 接口。
warp::ws()
过滤器用于匹配并提取 WebSocket 握手请求,确保该请求是 WebSocket 协议请求。
3. .map(move |ws: warp::ws::Ws| { ... })
.map
方法用于将前面的过滤器组合结果映射到一个新的处理逻辑中。这里的 move |ws: warp::ws::Ws| { ... }
是一个闭包,用于处理 WebSocket 请求。
move
关键字确保闭包捕获其环境中的所有变量的所有权,因为这些变量将在异步操作中使用。ws: warp::ws::Ws
参数是从前面的warp::ws()
过滤器中提取的 WebSocket 配置。
4. ws.on_upgrade(move |socket| async move { ... })
ws.on_upgrade
方法用于将 WebSocket 协议升级请求处理为 WebSocket 连接。它接受一个闭包作为参数,当 WebSocket 握手成功后,这个闭包会被调用。在官方定义中,这个方法主要用于自定义一个函数对建立后的 websocket 连接进行一定的操作,因此我们在这里将建立连接后一切操作,比如保持连接,发送信息等。
move |socket| async move { ... }
是一个异步闭包,它将在 WebSocket 连接成功升级后执行。socket
参数表示已经升级的 WebSocket 连接。
5. node_broker_clone.subscribe("node".to_string(), socket).await;
在异步闭包内部,调用 node_broker_clone 的
subscribe` 方法,将新的 WebSocket 连接订阅到节点(node)主题中。后续我们将展开讲解
"node".to_string()
将节点主题名称转换为字符串。socket
参数表示当前的 WebSocket 连接。await
关键字等待异步订阅操作完成。
websocket 连接处理
上面说到,我们通过 ws.on_upgrade(move |socket| async move { ... })
这个方法在连接建立之后进行处理,其中可以知道,我们处理的方法如下所示。
websocket 连接处理
let (ws_sender, mut ws_receiver) = socket.split();
这里使用原生的代码,将已经建立起来的 socket 进行分割,因为 websocket 是双向连接,因此获得针对这个 socket 的发送端(ws_sender)和接收端(ws_receiver)。建立连接并保存
在这里,我们建立了个一个管道,并将 subscriber 的信息进行保存,这里的 mpsc::unbounded_channel::<Message>();
类似于 golang 中的 channel,他会生成一个发送者、一个接收者,当往发送者发送消息的时候,接收者会受到该消息并进行一定处理。因此我们将 subscriber 的发送者(tx)保存至内存里。
建立消息发送机制
这个就是很简单了,通过如果 rx 收到了消息,则向 websocket 的 subscriber 进行发送。该任务是以新协程任务的方式启动的,在后台持续运行
建立 websocket 连接保活机制
这里我们仍然在后台启动一个守护协程,用于保活 websocket 连接,一旦发生了连接失效,则注销消息发送机制,删除 subscribers 缓存中的订阅者。
消息推送机制
事件推送事件推送时候将允许调用相关事件的推送地址,向推送端发送消息
当收到消息的时候,不直接处理消息,而是将放入缓存队列中(一个消息无界流)
事件分发同样的。将启动一个协程,用于从和 event_sender 对应的 event_receiver 中获取消息,推送给订阅者。
获取订阅者的列表并依次发送
如果发现发送失败,则将这个订阅者从缓存中删除
客户端
客户端的代码就是建立起来一个订阅者关注相关事件的动态。在相应的代码中,可以使用该方法。本方法最终返回的是一个无界流 Stream<Item = WatchEvent<R>>
,用于得到服务器推送过来的事件类型
使用验证
不足分析
经过上面的介绍,我们可以看到这个基础的 list and watch 机制能够正确运行。但是,和 K8S、ETCD 中广泛使用的 list and watch 相比仍然缺少一个机制来保证 list 和 watch 的一致性。
请考虑这样一种情况我们的服务器中会源源不断地产生数据 d1,d2,d3,...,dn。当我们使用 list 时候,能够感知到 d1,d2,d3,此时我们完成 list,开始建立 watch。加入在开始建立 watch 这个阶段,即使可能是几毫秒的时间但服务器生成了 d4,而在 watch 建立起来后,只能接收到 d5,d6,...。这就导致了数据的遗失。
在 Kubernetes 中,List
和 Watch
操作结合使用时,需要使用一个 revision 机制以确保资源的变更不会被遗漏。理解 List
和 Watch
操作时 revision
(即 resourceVersion
)的具体含义和管理方式对于保证一致性至关重要。revision 的存在有着如下的意义:
数据版本控制:
revision
是 Etcd 的全局递增计数器,用于标识数据的当前版本。当进行数据的修改、更新操作时候,revision 会+1一致性视图:确保返回的数据是一致的快照视图,表示在该
revision
之前的所有操作都已完成。
revision
与 List
和 Watch
的关系
List 操作:返回资源列表和当前的全局
revision
,作为resourceVersion
。确保获取到的资源是该revision
时刻的一致视图。Watch 操作:使用 List
操作返回的
resourceVersion` 作为起点。从该 resourceVersion开始监听资源的变化,确保在
List和
Watch` 之间的变更不会丢失。
List
操作的 revision
当进行 List 操作时,Kubernetes API Server 从 Etcd 获取当前资源的状态及其
resourceVersion 。这个
resourceVersion 是 Etcd 当前的全局
revision 。它表示在此
revision 之前的所有操作都已经完成,并确保返回的数据是这个
revision` 时刻的一致视图。
Watch
操作的 revision
Watch
操作使用 List
操作返回的 resourceVersion
作为起点,从该版本开始监听资源的变化。这确保了从 List
到 Watch
之间的变更不会被遗漏。
示例流程
List 操作:API Server 从 Etcd 获取指定资源的当前状态。Etcd 返回包含所有资源对象的列表和一个全局 revision
,这个
revision将作为
resourceVersion`。Watch 操作:API Server 使用
List
操作返回的resourceVersion
(revision) 作为起点,开始监听资源的变化。Etcd 返回从指定 revision` 开始的所有变更事件。
总结
revision
:标识数据版本,确保数据一致性。List
和Watch
:List
获取资源和revision
,Watch
从该revision
开始监听变化,确保变更的连续性和一致性。
评论