写点什么

Rust 从 0 到 1- 并发 - 状态共享

用户头像
关注
发布于: 2 小时前
Rust从0到1-并发-状态共享

虽然利用消息传递处理并发是一种很好的方式,但并不是唯一的。让我们重新思考一下 Go 语言文档中提出的:“不要通过内存共享进行通讯”(do not communicate by sharing memory.)。那么,通过内存共享通讯会是什么样子的?是什么原因不这样做,而是反过来“通过通讯来共享内存”(share memory by communicating)?

从某方面来说,任何编程语言中的 channel 都类似于单一所有权,因为将一个数据通过 channel 发送出去以后,将无法再使用“这个”数据。而内存共享类似于多所有权:多个线程可以同时访问内存中的同一个位置。就像我们前面介绍过的,智能指针使得多所有权成为可能,这会增加额外的复杂性,因为我们需管理这些不同的所有者。而 Rust 的类型系统和所有权规则在正确管理这些所有权方面给予我们极大的帮助。下面,让我们以 mutexes(互斥器、互斥锁或互斥量),用于内存共享中常见的并发原语(primitives,在学习操作系统的时候大家可能会比较常看到这个词,我理解为底层的一些概念和操作,在此之上我们可以封装更高级的行为),为例子来看看。

使用 Mutexes 控制并发

Mutex 的全称是 mutual exclusion,在任意时刻,它只允许一个线程对数据进行访问。为了访问 mutex 中的数据,线程首先需要尝试获得互斥锁(mutex's lock)。锁是 mutex 的一部分,它用于记录当前谁有享有数据的访问权(排他的)。因此,mutex 是通过锁机制来保护数据。

众所周知,mutex 很难用,因为我们必须需要记住以下两条规则:

  • 在使用数据之前必须先尝试获取锁。

  • 在使用完数据之后,必须要释放锁,这样其它线程才能获取锁。

以现实中的例子类比,可以想象以下场景:在某次小组会议上,只有一个麦克风。而如果要发言,必须先要求或示意需要使用麦克风。在获得麦克风后,就可畅所欲言,发言结束后需要将麦克风交给下一位要发言的成员。如果某位成员在结束发言后忘记交还麦克风,那么其他人都将无法发言。可见,如果对麦克风的管理出现问题,会议将无法按计划进行下去!(这个大家可能在一些线上会议中会遇到过类似场景)

正确的管理 mutexes 可能会非常的复杂,这也是为什么许多人倾向于使用 channel。不然怎样,在 Rust 中,非常庆幸有类型系统和所有权规则,至少我们不会在加锁和解锁上出错。

Mutex<T>的 API

让我们先从单线程开始展示如何使用 Mutex,参考下面面的例子:

use std::sync::Mutex;
fn main() { let m = Mutex::new(5);
{ let mut num = m.lock().unwrap(); *num = 6; }
println!("m = {:?}", m);}
复制代码

在上面的例子中,我们通过关联函数 new 创建了一个 Mutex<T> 类型变量;然后为了访问 mutex 中的数据,我们调用了其 lock 方法来尝试获取锁,lock 方法会阻塞当前线程,直到我们获得锁。如果有另一个拥有锁的线程发生了 panic,lock 方法的调用会失败。这种场景下,没有人可以获得锁,因此,我们使用 unwrap 在遇到这种情况时产生 panic。

当我们获得锁以后,就可以将返回值 num 看作是 m 中数据的可变引用(前面介绍过强制隐式转换)。类型系统确保我们必须先获得锁才能使用 m 中的值:Mutex<i32> 无法像 i32 一样直接使用,所以必须先获取锁才能使用其中的 i32 值。由于类型机制的这种限制,我们是不可能忘记要先获取锁的,这解决了第一条规则,即先获得锁。

你可能会觉得 Mutex<T> 是一个智能指针,或者更准确的说,lock 方法 返回了 MutexGuard 类型的智能指针(包含在 LockResut 里)。智能指针 MutexGuard 实现了 Deref ,并指向指向我们传入的数据;同时其也实现了 Drop ,在离开作用域时自动释放锁。因为锁的释放是自动的,我们就不需要时刻记着要释放锁,从而避免了阻塞其它线程获取 mutex 的锁。这解决了第二条规则,即使用完数据之后,必须要释放锁。

运行上面的例子,我们可以看到将打印出结果 m = 6。

多个线程之间共享 Mutex<T>

下面我们将尝试通过 Mutex<T> 在多个线程间共享数据。我们将启动十个线程,并在每个线程中对计数加 1,那么计数应该是 从最开始的 0 变为 10。参考下面的例子:

use std::sync::Mutex;use std::thread;
fn main() { let counter = Mutex::new(0); let mut handles = vec![];
for _ in 0..10 { let handle = thread::spawn(move || { let mut num = counter.lock().unwrap();
*num += 1; }); handles.push(handle); }
for handle in handles { handle.join().unwrap(); }
println!("Result: {}", *counter.lock().unwrap());}
复制代码

在上面的例子中,我们创建了一个 Mutex<i32> 类型的 counter 变量用来计数。接着我们通过循环创建了 10 个线程,并且它们使用了相同的闭包:调用 counter 的 lock 方法获取锁,并将其加 1。最后,在主线程中,我们调用每个线程的 join  方法来确保它们都执行完成。看上去好像没什么问题,但是目前是无法编译通过的,我们会得到类似下面的错误:

$ cargo run   Compiling shared-state v0.1.0 (file:///projects/shared-state)error[E0382]: use of moved value: `counter`  --> src/main.rs:9:36   |5  |     let counter = Mutex::new(0);   |         ------- move occurs because `counter` has type `Mutex<i32>`, which does not implement the `Copy` trait...9  |         let handle = thread::spawn(move || {   |                                    ^^^^^^^ value moved into closure here, in previous iteration of loop10 |             let mut num = counter.lock().unwrap();   |                           ------- use occurs due to use in closure
error: aborting due to previous error
For more information about this error, try `rustc --explain E0382`.error: could not compile `shared-state`
To learn more, run the command again with --verbose.
复制代码

编译器提示我们 counter 在上一次循环中已经被移动到了一个线程里,因此,无法在移动到当前的线程里。也就是说,counter 的需要可以支持多所有权。下面让我们尝试通过前面介绍的获得多所有权的方法来修复这个问题。

让多个线程获得所有权

在前面章节我们介绍过使用智能指针 Rc<T> 来让数据可以拥有多个所有者。让我们尝试使用这种方法对例子进行修改,看看会发生什么。参考下面的例子:

use std::rc::Rc;use std::sync::Mutex;use std::thread;
fn main() { let counter = Rc::new(Mutex::new(0)); let mut handles = vec![];
for _ in 0..10 { let counter = Rc::clone(&counter); let handle = thread::spawn(move || { let mut num = counter.lock().unwrap();
*num += 1; }); handles.push(handle); }
for handle in handles { handle.join().unwrap(); }
println!("Result: {}", *counter.lock().unwrap());}
复制代码

再次编译,还是无法通过,我们会得到类似下面的错误(编译器真是我们的好老师):

$ cargo run   Compiling shared-state v0.1.0 (file:///projects/shared-state)error[E0277]: `Rc<Mutex<i32>>` cannot be sent between threads safely   --> src/main.rs:11:22    |11  |           let handle = thread::spawn(move || {    |  ______________________^^^^^^^^^^^^^_-    | |                      |    | |                      `Rc<Mutex<i32>>` cannot be sent between threads safely12  | |             let mut num = counter.lock().unwrap();13  | |14  | |             *num += 1;15  | |         });    | |_________- within this `[closure@src/main.rs:11:36: 15:10]`    |    = help: within `[closure@src/main.rs:11:36: 15:10]`, the trait `Send` is not implemented for `Rc<Mutex<i32>>`    = note: required because it appears within the type `[closure@src/main.rs:11:36: 15:10]`
error: aborting due to previous error
For more information about this error, try `rustc --explain E0277`.error: could not compile `shared-state`
To learn more, run the command again with --verbose.
复制代码

编译器告诉我们 `std::rc::Rc<std::sync::Mutex<i32>>` cannot be sent between threads safely(想起来没有,在前面介绍 Rc<T> 的时候,我们说过它只适用于单线程场景)。编译器同时也提示了我们原因 the trait bound `Send` is not satisfied。在后面我们会介绍 Send:这是保障类型是并发安全(或者说线程安全)的 trait 之一。

现在我们清楚的知道 Rc<T> 是线程不安全的。它用于管理和记录应用的计数,并没有使用任何并发“原语”来确保操作不受其它线程的影响。因此,在并发场景下计数可能会出错并导致诡异的 bug,比如造成内存泄漏,或在数据被过早的丢弃。我们所需要的是一个类似 Rc<T> 的并发安全的类型。下面让我们看看 Rust 给我们提供了什么吧。

使用 Arc<T> 进行原子引用计数

Arc<T> 正是一个类似 Rc<T> 并且并发安全的类型。其中 a 意思是原子的(atomic),意味着它是一个原子引用计数(atomically reference counted)类型。和 mutex 类似,atomic 是另一个并发原语(可以查阅标准库文档中 std::sync::atomic 部分进一步了解)。在这里我们现在只需要知道它可以安全的在线程间共享。

我们可能不禁会想为什么不是所有的基本类型都是原子的?为什么标准库中的类型不都使用 Arc<T> 实现?这是因为为了保证线程安全会损失一定的性能,我们希望只在必要时才使用。如果只是在单线程中运行,保证线程安全完全没有必要,我们可以获得更“快”的代码。

Arc<T> 和 Rc<T> 的 API 基本相同,因此我们简单的进行替换就可以完成修改。参考下面的例子:

use std::sync::{Arc, Mutex};use std::thread;
fn main() { let counter = Arc::new(Mutex::new(0)); let mut handles = vec![];
for _ in 0..10 { let counter = Arc::clone(&counter); let handle = thread::spawn(move || { let mut num = counter.lock().unwrap();
*num += 1; }); handles.push(handle); }
for handle in handles { handle.join().unwrap(); }
println!("Result: {}", *counter.lock().unwrap());}
复制代码

现在运行上面的例子,我们会得到类似下面的结果:

Result: 10
复制代码

我们做到了!这个例子本身可能比较简单,不过通过它我们一步步了解了关于 Mutex<T> 和线程安全的内容!利用这些方法我们可以进行比计数更为复杂的操作,譬如,将某个计算过程拆分成独立的部分,分散到多个线程中进行,然后通过 Mutex<T> 将计算结果合并。

RefCell<T>/Rc<T> 与 Mutex<T>/Arc<T>

我们可能已经注意到了,在前面的例子中,counter 是不可变的,不过我们可以获取其内部值的可变引用,也就是说 Mutex<T> 提供了内部可变性,这和 RefCell<T> 类型类似。就像前面我们使用 RefCell<T> 改变 Rc<T> 中的内容一样,我们可以使用 Mutex<T> 来改变 Arc<T> 中的内容。

另一个需要我们注意的是 Rust 并不能避免在使用 Mutex<T> 时所有的逻辑错误。就像前面介绍的使用 Rc<T> 造成循环引用的问题:两个 Rc<T> 类型的变量相互引用,造成内存泄露。同样,Mutex<T> 也可能造成死锁(deadlock)。譬如,当一个操作需要锁住两个资源,而两个线程各自持有其中一个资源的锁,这会造成它们陷入永远的等待(都在等待对方释放锁)。如果对这方面比较感兴趣,可以尝试编写一个产生死锁的 Rust 程序,接着尝试使用其它语言中规避死锁的策略来解决。标准库中 Mutex<T> 和 MutexGuard 的 API 文档也提及了这方面的内容,感兴趣的话可以进一步阅读。

发布于: 2 小时前阅读数: 2
用户头像

关注

公众号"山 顽石"欢迎大家关注;) 2021.03.23 加入

IT老兵,关注Rust、Java、前端、架构、大数据、AI、算法等;平时喜欢美食、旅游、宠物、健身、篮球、乒乓球等。希望能和大家交流分享。

评论

发布
暂无评论
Rust从0到1-并发-状态共享