写点什么

走进 Tokio 的异步世界

用户头像
lipi
关注
发布于: 2021 年 02 月 12 日

Tokio 是一个基于 Rust 语言开发的异步运行时。初接触的开发者可能会存在两个疑问,为什么要异步,什么要基于 Rust 来做异步?

简单的说,异步更符合计算机的运行机制,能够更大的发挥计算能力。当然,这个是针对 IO 密集型的任务。如果是 CPU 密集型的,长耗时的纯计算,那还是同步机制好

从通常的场景来看,大部分的应用都是 IO 密集型的。长耗时的纯 CPU 计算只需要写一个脚本跑就可以了,比较简单

为什么采用 Rust 来做异步,这个可能要说的内容比较长。个人的理解,从 Python 的 yield 和 green thread ,到后来的 Node callback ,到 go 和 java 的异步,都实现的比较别扭,没有完全释放异步的性能。究其原因,还是因为之前的语言有自己的设计模式,只是在原有语言的能力上增加了一些语法糖来实现 Async ,没有专门针对异步来设计。通过 green thread 或者 callback 都不是异步的全部。Rust 是系统级语言,也就是底层的能力透明的使用 Linux 系统能力,也就有可能充分利用系统能力来打造完全为异步而设计的框架,Tokio 就是这么一个框架。

有一篇文章:Rust’s Journey to Async/Await 讲述了 Rust Async 的历史。我在 Rust 高性能开发 里面也描述了这个过程:

Async 为什么会采用 Native Thread ,什么是 Native Thread ?

核心的原因是,Rust 是 "system programming language" ,和 C 之间不能有 overhead 。也就是说,Rust 必须使用系统 Native 的 Thread,才能和 C 的转换没有额外的 IO 损耗。

Rust 的 Async 采用了一种 "Synchronous non-blocking network I/O" (同步非阻塞 IO)。这个看上去有些矛盾,但是仔细了解一下,感觉挺有道理的。同步阻塞的问题,就是效率较低。异步非阻塞的问题,对于长耗时的操作效率较低。异步阻塞,能够让长耗时的任务安排到独立线程运行,达到更好的性能。同步非阻塞 IO,就是用同步的方法来写代码,但是内部其实是异步调用。

async-std 在这篇博客 这样说:"The new runtime detects blocking automatically. We don’t need spawn_blocking 。anymore and can simply deprecate it " 。系统 runtime 竟然能够自动检测是不是阻塞操作,不需要显式调用 spawn_blocking 来针对阻塞操作。但是 Tokio 没有采用这个办法,Tokio 提供了 spawn_blocking 来创造阻塞任务

但是 Native Thread 在应对 IO 请求的时候,存在问题。它会针对每个请求,准备一个线程。这样会极大消耗系统资源,并且这些线程在等待的时候什么都不做。这样的机制面对大量请求的异步操作时会非常低效。

Go 和 Erlang 都是采用 Green Thread 来解决这个问题。但是 Rust 因为不想和 C 之间有更多的隔阂,不想采用 Green Thread 模式。

Rust 参考了 Nginx 的 Event Poll 模型,还有 Node.js 的 "Evented non-blocking IO" 模型。withoutboats 非常推崇 Node.js 模型,但是 Node.js 带来了回调地狱 (callback hell) 。Javascript 又创造了 Promise 来避免回调的一些问题。Promise 就是 Future 的思路来源。

Twitter 的工程师在处理这个问题的时候,放弃了 JVM 转而用 Scala ,获得了非常大的性能提升 。然后他们写了一个 Paper 叫做 "Your Server as a Function" 。介绍了一个概念,叫做 Future 。这样描述:

A future is a container used to hold the result of an asynchronous operation such as a network RPC, a timeout, or a disk I/O operation. A future is either empty—the result is not yet available; succeeded—the producer has completed and has populated the future with the result of the operation; or failed—the producer failed, and the future contains the resulting exception

Future 是一个容器用来收置一个异步操作,例如网络、RPC、超时、或者磁盘 I/O。Future 或者是空,这个时候结果还没有返回;或者是成功,生产者( 生产 Future 的函数或者进程)已经提交并且完成了 Future 的操作,获得了结果;或者失败,生产者出现了错误,future 返回了异常结果

Rust 在这个基础上,完善并推出了 zero cost future 。Aaron Turon 写了另外一篇文章:Zero-cost futures in Rust 来详细说明这个。这应该是 Rust 语言级别对 Async Future 的优化,也是 Rust Async 的精华所在,Aaron 在里面是这样说的:

I’ve claimed a few times that our futures library provides a zero-cost abstraction, in that it compiles to something very close to the state machine code you’d write by hand.

  • None of the future combinators impose any allocation. When we do things like chain uses of and_then, not only are we not allocating, we are in fact building up a big enum that represents the state machine. (There is one allocation needed per “task”, which usually works out to one per connection.)

  • When an event arrives, only one dynamic dispatch is required.

  • There are essentially no imposed synchronization costs; if you want to associate data that lives on your event loop and access it in a single-threaded way from futures, we give you the tools to do so.

意思就是 Rust 设计了一个类似“状态机”的机制来优化 future 的调度。减少多层嵌套 future 的额外分配的开销;多次事件处理一次分配;提供工具在同步进程关联数据。这个挺有意思的,我们可以在下一个篇幅里面研究一下 Rust 是怎么实现的。这篇先说 Tokio

Tokio 经过几次更迭,到 1.0 的时候,已经是一个非常优雅的多层技术栈了。



这是 Tokio 主页的图。Runtime 是核心,承载 I/O、文件、同步和调度,是异步框架的基础;Hyper 是 http1/2 的网路协议实现;Tonic 是 gRPC 的 Rust 实现;Tower 是网络组件,提供负载均衡、超时重试等客户端之间的网络管理能力;Mio 是系统级的事件驱动的最小实现;Tracing 调试跟踪能力;Bytes 流式数据处理能力

既然 Async Future 是 核心,那就让我们从 Future 开始:

Future 在 Rust 的定义是这样:

A future represents an asyncchronous computation

A future is a value tha may not have finished computing yet. This kind of "asynchronous value" make it possible for a thread to continue doing useful work whilt it waits for the value to become available

Future 表示的是一个异步计算,还没有完成的计算的值。这个“异步的值”让线程在等待这个值可用之前,可以做其他事情

异步就是提交了一个操作之后,本线程继续执行。这个操作由其他线程承载,在操作完成之后,通过事件机制记录异步的操作完成了。然后提交这个操作的线程下一次轮询的时候就可以直接获取这个值。这个在很多文章里面都有介绍。



上面是 Rust 异步调用的一个示意图,原图来自于 ira

Future 提交到任务执行队列(1)。执行后台是一个 event loop 和线程池。Future 提交之后,会有一个线程 poll 这个队列(2),然后分配线程来执行 Future 。执行的时候会调用 Future 的 poll,如果发现 poll 的状态是 Ready(3),就把返回值返回给在 await 位置等待的线程(4)。如果 poll 的状态是 Pending,则把在事件循环里面的事件树上做记录,线程继续执行 Future,主 poll 线程离开。

线程在执行 Future 之后,执行完成。就通过 waker 提交到事件树(5),告诉本线程执行完成。事件循环到这个事件的时候,发现 Future 执行完成。就通过回掉 Waker(6),把 Future 重新推送到执行队列。这个时候,Future 已经执行完成,Future 也有返回值。再次 poll 的时候,就返回结果给在 await 等待的线程

上面的过程有些复杂,核心包括两个:

  • 线程池和轮询分配机制

  • 事件树和轮询机制

可以简化理解为,后面有一堆线程。提交一个 Future 到队列。后面有个分配者(线程)分配线程池的线程来执行。分配线程只做分配,同时看队列的线程执行完成没有(通过查看 poll 状态)。

事件和 waker 就是查看和调度在执行 Future 的线程。一旦发现执行完成,就把执行完成的 Future 再推到执行线程,通过 poll 返回给调用者

所以我们还是回到 Future 的定义,以及怎么执行 Future。就可能会更清晰一些

Future 的定义如下:

pub trait Future{  type Output;  fn poll(self:Pin<&mut Self>, cx:&mut Context<`_>) -> Poll<Self::Output>;}
复制代码


Output 是当 Future 完成之后的结果

poll 是一个函数,返回结果有两个。如果 Future 还没完成就返回 Poll::Pending;如果 Future 执行完成就返回 Poll::Ready(val),val 就是具体的返回值

Poll 参数有两个,一个是 Pin,一个是 Context。Pin 的作用是让这个 trait 不要移动。因为移动会造成额外的开销。withoutboats 有一篇文章 Zero Cost Abstractions 里面有一段说明了 Pin 的作用:

Async/await and Futures. The Futures API is an important example, because the early versions of futures hit the “zero cost” part of zero cost abstraction really well, but was not actually providing a good enough UX to drive adoption. By adding pinning to support async/await, references across awaits, and so on, we’ve made a product that I really think will solve users’ problems and make Rust more viable for writing high performance network services.

Zero Cost Abstraction(零成本抽象) 是 Rust 设计 Zero Cost Future 的初衷。withoutboats 在上面这边文章里面引用了 C++ 设计者 Bjarne Stroustrup 的一段话作为 Zero Cost Abstract 的定义

What you don’t use, you don’t pay for. And further: What you do use, you couldn’t hand code any better.

这句话原始的来源于 Foundations of C++



Withoutboat 描述 Pin 的设计,是提供了一种更好 UX 来支持 Async 的操作

poll 还有一个参数,叫做 Context,属于 waker 的一个 trait。这个也引入了另外一个概念叫 waker。在上面的异步流程图里面也可以看到,waker 是事件驱动的很关键的机制。

Pin<P> 让被 pin 的 P 所指向的内存块在内存里固定,不能被移动,在 P 被 dropped 的时候也不能清除或者分配这块内存

在 Async 的实现过程中,函数的执行是分段的。例如在下面的代码里面:

async fn foo() -> u8 { 5 }async fn bar() -> impl Future<Output = u8> {  let i = 1;  let x: u8 = foo().await;  let y = &i;  *y + x}
复制代码


foo().await 调用这里,前面的 let i =1; 和 await 后面执行的可能不在一个线程。那 i 的状态就需要保留。

程序执行的时候会保存状态到 heap 和 stack,就是堆和栈。Stack 是每个函数创建的时候会在内存有一块区域,如果这个函数退出了,对象这个区域的内容就清除;Heap 是一块集中的区域,由 rust 的编译器管理。大致示意图如下:



上面的 async 代码,let i = 1,就属于某一个线程的 stack。然后到了 let x: u8 = foo().await 的时候,前一个线程已经结束了。那 i 的值就需要保持住,然后下一个线程在 foo() 运行完之后,可以获得借用 ilet y = %i

上面定义的两个都是 Future,通过 tokio::spawn 或者 #[tokio::main] 运行。在运行之前,编译器会把 async 后面的部分解开。

我们看一下 Tokio::spawn代码:

/// Spawns a future onto the thread poolpub(crate) fn spawn<F>(&self, future: F) -> JoinHandle<F::Output>whereF: Future + Send + 'static,F::Output: Send + 'static,{  let (task, handle) = task::joinable(future);  self.shared.schedule(task, false);  handle}
复制代码


joinable 是把 future 包装为一个 RawTask 和一个 包裹着 RawTask 的 JoinHandle,如下:

pub(crate) fn joinable<T, S>(task: T) -> (Notified<S>, JoinHandle<T::Output>)whereT: Future + Send + 'static,S: Schedule,{  let raw = RawTask::new::<_, S>(task);
let task = Task { raw, _p: PhantomData, };
let join = JoinHandle::new(raw);
(Notified(task), join)}
复制代码


task 就可以进入队列来调度,Spawn 的返回值是 JoinHandle 类型。JoinHandle 是对线程的协作管理。因为 Future 的执行就是嵌套循环的执行 poll,调用 poll,发现数据 Ready,就执行返回;发现数据 Pending,就再推送到队列等待下次 poll

如果在一个 task 做了一个阻塞或者长耗时的操作而没有通过 .await 提交到其他线程,那就会阻塞这个线程,后面的任务就无法执行

所以 tokio 设置了一个 Budget 的机制,Budget 标记了一个 task 可以执行的一个阈值,默认是 128

回到前面, spawn 的时候,返回了一个 JoinHandle 的实例,针对 spawn 的 future。在程序开始执行的时候,就会执行这个 实例的 poll()。JoinHandle 的 poll 函数是这样:

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {  let mut ret = Poll::Pending;
// Keep track of task budget let coop = ready!(crate::coop::poll_proceed(cx)); // read task output unsafe { raw.try_read_output(&mut ret as *mut _ as *mut (), cx.waker()); }
if ret.is_ready() { coop.made_progress(); }
ret}
复制代码


crate::coop::poll_proceed(cx) 的作用是递减 coop 的 budget ,就是上面说的默认 128 。如果递减 budget ,发现 budget 为 0 ,也就是这个 task 执行时间较长了。那就需要把这个任务重新标记投入到任务队列,释放这个线程

那这个时间间隔是多少呢?budget 定为 128 ,也就是执行 128 个 poll 循环之后,发现任务还没完成,就再打包投入到任务队列

异步架构对于长耗时阻塞的处理是一个难点。写代码的时候,很难完全预知哪些任务会阻塞。但是,一旦有一个任务耗时很久,就会导致整个 poll 阻塞



tokio 采用了一种没有 Global 的 poll 方式。每个 spawned task 处理自己的 future。长耗时任务不会占用主线程。

Tokio 官方还有一篇文章:Reducing tail latencies with automatic cooperative task yielding 详细说明了这个机制,叫做 Pre-task operation budget

each Tokio task has an operation budget. This budget is reset when the scheduler switches to the task

Each asynchronous operation (actions that users must .await on) decrements the task's budget. Once the task is out of budget, all Tokio resources will perpetually return "not ready" until the task yields back to the scheduler. At that point, the budget is reset, and future .awaits on Tokio resources will again function normally.

为什么设置为 128

The number 128 was picked mostly because it felt good and seemed to work well with the cases we were testing against (Noria and HTTP).

If the budget is zero, the task yields back to the scheduler

顺便说一下,Noria 也是一个潜力巨大的项目,是 tokio 开发成员 Jon Gjengset 的另外一个项目。采用预计算和缓存查询结果的方式的提供快速动态的数据流查询

Noria uses partially-stateful data-flow to reduce memory overhead, and supports dynamic, runtime data-flow and query change.

Tokio 这个机制的设计来源是 Ryan Dahl ,Node 和 Deno 的设计者

Ryan mentioned that, when he worked on node.js, they handled the problem by adding per resource limits. So, if a TCP socket was always ready, it would force a yield every so often.

在上面文章的最后,有一段,A note on blocking 详细描述在异步中阻塞的处理方式。

Although automatic cooperative task yielding improves performance in many cases, it cannot preempt tasks. Users of Tokio must still take care to avoid both CPU intensive work and blocking APIs. The spawn_blocking function can be used to "asyncify" these sorts of tasks by running them on a thread pool where blocking is allowed.

Tokio 提供了协作 task 的机制来增加性能。但是仍然需要避免 CPU 敏感和阻塞的任务。Tokio 提供了 spawn_blocking 函数在线程池“异步化”阻塞任务。也就是用异步的写法,运行的却是阻塞任务

以前的方式是通过检测阻塞任务并通过向任务增加线程来进行优化。这样的做法是通过增加一个监控线程,这个线程轮询其他线程,并且检查运行进度。如果监控线程发现其他线程在进行阻塞操作,就 spawn 一个新的线程来增加接收任务的能力。10 年前的 .Net 就是这样的机制,CarlDB 认为这样的机制有问题,所以 Go、Java、Erlang 就没有采用

一个问题就是很难定义阻塞任务的“进度”,还有就是检测容易收到突发的请求的影响。导致这样的机制效果不好。Go 采用的是抢占式调度策略,所以 Go 也不检测阻塞任务的进度

Tokio 提供了 spawn_blocking 函数来固定一个线程执行长耗时任务。因为 task 上下文的来回切换和调度也会耗费时间,如果 cpu 核数够用的话,可以固定一些线程执行长耗时任务。Tokio 提供了一种灵活内部机制,不用改变写代码的方式,而能够避免长耗时任务降低框架性能,也提供了手工的执行长耗时任务的函数,这样的做法比较人性化

前面插入了一段关于线程协作机制,让我们继续看 spawn task:

let (task, handle) = task::joinable(future);self.shared.schedule(task, false);handle
复制代码


spawn 把 future 包装成 task,然后 schedule ,返回一个 JoinHandle。在 poll 开始运行的时候,JoinHandle 会查看 Budget 资源,同时尝试获取任务输出,如果任务还没完成。就更新 Waker ,标记 task 还没完成。如果发现任务完成,就返回结果

在前面的调度过程中,coop::poll_processed 对 budget 资源不足的时候,需要讲 task yield 到等待的任务线程。如下:

pub(crate) fn poll_proceed(cx: &mut Context<'_>) -> Poll<RestoreOnPending> {  CURRENT.with(|cell| {    let mut budget = cell.get();
if budget.decrement() { let restore = RestoreOnPending(Cell::new(cell.get())); cell.set(budget); Poll::Ready(restore) } else { cx.waker().wake_by_ref(); Poll::Pending } })}
复制代码


这里面有一个操作 cx.waker().wake_by_ref(),就是通过 waker 来调度任务。

调度是异步操作里面最核心的内容,Tokio 通过 Task、Cell、Waker、Scheduler、JoinHandler 等组件来实现独特的异步调度。让我们从 Task 开始,看看 Tokio 是怎么做的


Task 结构里面是一个 RawTask,RawTask 里面是一个 Header

Cell 是包含了 Task 所有组件的结构,包含 header、core 和 trailer。header 是 "Hot Task" 的状态数据,也就是提交但还没有执行完的数据(待确认);core 包含 task 对应的 future 以及 task 的 Scheduler(调度);trailer 是 “Cold data”,也就是已经执行完,等待获取的 Waker

回顾一下产生 Task 的过程:


通过 spawn 把带有 async 注解的代码块包裹成 Future, 然后通过 RawTask::new 和 Task 构造函数生成为一个 Task 的实例。然后通过 JoinHandle::new 生成 JoinHandle;另外 Task 经过 Notified 包裹之后,通过 scheduler,提交到 Header 类型的任务信息队列

这是在程序编译器生成的过程。在 tokio 程序运行起来后,就会获取 task,然后执行 task 的 poll,就是上面描述过的运行的流程。再具体一些,就是下图:


这里面有两个核心,一个是 Cell、一个是 VTable

VTable 是一个包含了 future 操作的函数结构体。poll 就对应 poll;dealloc 代表释放内存;try_read_output 代表尝试读取 future 的 output;drop_join_handle_slow 代表清除 JoinHandle;shutdown 代表停止 scheduler

RawTask 实现了对应上面的 5 种操作,例如:

impl RawTask{  // ...  pub(super) fn poll(self) {    let vtable = self.header().vtable;    unsafe { (vtable.poll)(self.ptr) }  }}
复制代码


所以,当程序运行起来,获取 task ,然后运行 task 的 poll 。 就回去 Cell 找到对应 task 的 Header ,然后获取 Header 对应 VTable

Harness 是一个包裹 Cell 的结构。每次在进行 poll 和其他操作的时候。都会通过 Header 生成一个 Harness 的实例,然后再执行对应操作。例如:

unsafe fn poll<T: Future, S: Schedule>(ptr: NonNull<Header>) {    let harness = Harness::<T, S>::from_raw(ptr);    harness.poll();}
复制代码

因为 Header 对应的 VTable 是虚函数,只有实例化之后才行执行

在 Harness 获取 Task 的 Header 之后,开始执行 poll 的时候,会生成一个 Waker

// harness.rs,poll()--> poll_inner()let waker_ref = waker_ref::<T, S>(self.header());let cx = Context::from_waker(&*waker_ref);poll_future(self.header(), &self.core().stage, snapshot, cx)
复制代码

然后带着 waker 进入 poll_future:

// harness.rs poll_future()
let res = guard.core.poll(cx);// prevent the guard from dropping the futuremem::forget(guard);
复制代码

在 core.poll 里面:

// core.rs poll()let future = match unsafe { &mut *ptr } {  Stage::Running(future) => future,  _ => unreachable!("unexpected stage"),};
// Safety: The caller ensures the future is pinned.let future = unsafe { Pin::new_unchecked(future) };
future.poll(&mut cx)
复制代码

上面两段可以看到,提交 Future 的时候用 Guard 包裹,并且调用 mem:forget ,在这段程序执行完成之后,future 还保留。然后在 core.poll 的时候,使用了 Pin 方法固定了 future 的内存位置,这个在上面描述过。关于 Pin 和自引用结构,可以看这两篇文章:Rust的Pin与UnpinPin 概念解析

如果不想让一个 T类型的实例移动,只需要把它分配在堆上,用智能指针(如 Box<T>)访问就行了, 因为移动 Box<T>只是 memcpy 了指针,原对象并没有被移动。不过由于 Box提供的 api 中可以获取到 &mut T,进而可以通过 mem::swap间接将 T 移出。 所以只需要提供一个弱化版的智能指针 api,防止泄露 &mut T就能够达到防止对象被移动。这就是实现 Pin api 的主要思路。

在上面的过程中,Harness 又引入了一个新的概念 waker。 在 Harness 创建了 Waker,然后带入到 future 的 poll 里面。Future 是 Rust stdlib,所以 Waker 也是 Rust 异步的一个重要概念

回到前面构建 Waker 的过程:

// harness.rs,poll()--> poll_inner()let waker_ref = waker_ref::<T, S>(self.header());let cx = Context::from_waker(&*waker_ref);poll_future(self.header(), &self.core().stage, snapshot, cx)
复制代码

Waker 通过 Header 构建,Header 的结构除了任务的状态、队列信息之外,就是一个 VTable ,在上面的图里有描述。

那 Waker 的结构是怎么样呢,Waker 包裹了一个 RawWaker,结构如下:

pub struct RawWaker {    /// A data pointer, which can be used to store arbitrary data as required    /// by the executor.    data: *const (),    /// Virtual function pointer table that customizes the behavior of this waker.    vtable: &'static RawWakerVTable,}
复制代码

Waker 包含一个 data 指针和一个 RawWakerVTable。RawWakerVTable 包含四个虚函数,clone、waker、wake_by_ref 和 drop

Waker 是对 Future 的一个数据和操作的包装,在 poll 的过程中被传递或引用。用来实现对 Future 的管理。例如在上面 coop::poll_process 函数,就通过 cx.waker().wake_by_ref(); 来获取 waker 并执行 waker 对应的函数实现协作调度:

pub fn wake_by_ref(&self) {  unsafe { (self.waker.vtable.wake_by_ref)(self.waker.data) }}
复制代码

在 Tokio 的异步调用过程中,有很多地方,甚至是贯穿全流程的都使用 Waker。所以 Waker 和 Poll 可以说是 Tokio 异步调用的两个核心概念,掌握了这两个概念,主干就找到了

Tokio 还有很多概念,一篇文章只能描述很少的部分。主要目的是自己做个记录,把对 Tokio 的一个浅显理解写出来。Tokio 应该会是异步框架的卓越代表,异步在解决了回掉地狱、阻塞处理以及基础生态库异步化改造之后,应该会在应用层带来极大的发展


发布于: 2021 年 02 月 12 日阅读数: 21
用户头像

lipi

关注

fn 2018.08.28 加入

上海,从事金融行业 喜欢 Rust、React 和 Nodejs ,对分布式计算和数据处理比较熟悉。喜欢和热爱开发的人交流。微信:lipengsh

评论

发布
暂无评论
走进 Tokio 的异步世界