共享状态的并发
前面讲了如何在 rust 中使用多线程和如何在线程之间通信,本节我们介绍多线程之间如何共享状态,一般情况下当多个线程同时修改同一个数据的时候会造成数据竞争,从而导致不确定的结果,rust 中提供了互斥体才解决这个问题
使用互斥体
互斥体 Mutex<T>类型,互斥体(mutex)是英文 mutual exclusion 的缩写,也就是说,一个互斥体在任意时刻只允许一个线程访问数据:
use std::sync::Mutex;
let m = Mutex::new(5);
{
// 获取锁,并阻塞当前线程直到取到锁为止
let mut num = m.lock().unwrap();
// 取到的num是一个MutexGuard<T>类型的智能指针,是指向内部数据的可变引用
*num = 6;
} // 在离开所在的作用域时会自动解锁
println!("{:?}", m);
复制代码
在多个线程间共享 Mutex<T>
假如我们启动 10 个线程,并在每个线程中分别为共享的计数器的值加 1。正常执行完成的话,这最终会让计数器的值从 0 累计到 10:
use std::thread;
use std::sync::Mutex;
// 用于计数的互斥体
let counter = Mutex::new(0);
// 用于存储线程
let mut handles = vec![];
for _ in 0..10 {
let handle = thread::spawn(move || {
let mut num = counter.lock().unwrap(); // 报错,所有权问题,当循环到第二次的时候,counter已经被移动
*num += 1;
});
handles.push(handle);
}
for handle in handles {
// 等待所有线程都执行完毕
handle.join().unwrap();
}
println!("counter: {}", *counter.lock().unwrap())
复制代码
上边当我们尝试将 counter 移动到线程中,会无法编译通过,因为第一次循环创建第一个子线程的时候,counter 的所有权已经被移动进去,等到第二次循环创建第二个子线程,counter 已经没有了所有权。
尝试使用 Rc<T>来共享 counter
还记得我们前面学过使用 Rc<T>来共享数据,我们可以尝试一下再多线程的场景下是否可用:
use std::rc::Rc;
// 将Mutex再包裹一层Rc
let counter = Rc::new(Mutex::new(0));
let mut handles = vec![];
for _ in 0..10 {
let counter = Rc::clone(&counter);
let handle = thread::spawn(move || { // 报错,Rc<Mutex<i32>>类型无法安全地在线程中传递
let mut num = counter.lock().unwrap();
*num += 1;
});
handles.push(handle);
}
for handle in handles {
handle.join().unwrap();
}
println!("counter: {}", *counter.lock().unwrap())
复制代码
当 Rc<T>管理引用计数时,它会在每次调用 clone 的过程中增加引用计数,并在克隆出的实例被丢弃时减少引用计数,但它并没有使用任何并发原语来保证修改计数的过程不会被另一个线程所打断。
使用原子引用计数 Arc<T>
rust 还提供了 Arc<T>类型,来代替 Rc<T>类型来解决上面问题,它既拥有类似于 Rc<T>的行为,又保证了自己可以被安全地用于并发场景:
use std::sync::Arc;
// 将Rc替换为Arc
let counter = Arc::new(Mutex::new(0));
let mut handles = vec![];
for _ in 0..10 {
let counter = Arc::clone(&counter);
let handle = thread::spawn(move || {
let mut num = counter.lock().unwrap();
*num += 1;
});
handles.push(handle);
}
for handle in handles {
handle.join().unwrap();
}
println!("counter: {}", *counter.lock().unwrap());
// counter: 10
复制代码
改用 Arc<T>后上面成功计算出了 counter 的值。
RefCell<T>、Rc<T>、Mutex<T>、Arc<T>之间的相似性
死锁是什么
当某个操作需要同时锁住两个资源,而两个线程分别持有其中一个锁并相互请求另外一个锁时,这两个线程就会陷入无穷尽的等待过程:
use std::thread;
use std::time::Duration;
use std::sync::{Arc,Mutex};
// 定义两个可以用在多线程中共享的互斥类型a,b
let a = Arc::new(Mutex::new("a"));
let b = Arc::new(Mutex::new("b"));
// 在handle1中使用a2,b2
let a2 = Arc::clone(&a);
let b2 = Arc::clone(&b);
let handle1 = thread::spawn(move || {
// 获取a2的锁,取名a3
let a3 = a2.lock().unwrap();
println!("a3:{}", a3); // a3:a
// 等待3秒
thread::sleep(Duration::from_secs(3));
// 3秒之后获取b2的锁,取名b3
let b3 = b2.lock().unwrap();
println!("b3:{}", b3); // 不会输出
});
// 在handle1中使用a4,b4
let a4 = Arc::clone(&a);
let b4 = Arc::clone(&b);
let handle2 = thread::spawn(move || {
// 获取b4的锁,取名b5
let b5 = b4.lock().unwrap();
println!("b5:{}", b5); // b5:b
// 等待3秒
thread::sleep(Duration::from_secs(3));
// 3秒之后获取a4的锁,取名a5
let a5 = a4.lock().unwrap();
println!("a5:{}", a5); // 不会输出
});
handle1.join().unwrap();
handle2.join().unwrap();
println!("ok");
// a3:a
// b5:b
复制代码
上面程序陷入了死锁状态,程序永远不会停止,也不会输出 b3、b5,原因在于 handle1 获取 a2 的锁之后,并没有对 a3 进行解锁,此时 handle2 获取 b4 的锁之后,也没有对 b5 进行解锁,过了三秒,handle1 又去获取 b2 的锁,handle2 也去获取 a4 的锁(注意这里的所有的 a(n)变量都是 a 的引用,b(n)也都是 b 的引用),handle1 和 handle2 都同时拿着对方需要的锁,导致程序陷入“死锁”状态。
评论