线程通信
前面我们讲到了如何使用多线程来处理耗时逻辑,但是实际工作中需求不会那么简单,通常用时多线程时,线程不是简单的独立运行的,而是伴随着多线程之间的消息通信
使用消息通道
rust 提供了 mpsc 模块允许我们在线程间传递消息(mpsc 是英文“multiple producer, single consumer”(多个生产者,单个消费者)的缩写):
use std::thread;
use std::sync::mpsc; // 引入mpsc模块
// 创建消息通道,tx是生产者,rx是消费者
let (tx, rx) = mpsc::channel();
thread::spawn(move || {
// 在新线程中向主线程发送消息,send返回Result<T>类型,
// 这里简单使用unwrap,遇到错误时将抛出panic!
tx.send(String::from("Hi")).unwrap();
});
// 在主线程中接收消息,recv会阻塞当前线程,直到收到消息,
// 如果在调用recv之前已经收到消息,那么调用recv会立即取到结果
// recv同样返回返回Result<T>类型
let received = rx.recv().unwrap();
println!("收到消息:{}", received); // 收到消息:Hi
复制代码
使用 try_recv 非阻塞监听消息
上面的 recv 方法会阻塞主线程,直到主线程收到消息为止,但是主线程还在进行其他工作的时候 recv 方法就不是很方便,这个时候我们需要用到一个非阻塞的方法 try_recv 来接收消息:
use std::time::Duration;
let (tx, rx) = mpsc::channel();
thread::spawn(move || {
// 5秒之后发送消息给主线程
thread::sleep(Duration::from_secs(5));
tx.send(String::from("Hi")).unwrap();
});
for s in 1..10 {
println!("每秒轮询消息,第{}次", s);
// 每秒查看是否收到消息
thread::sleep(Duration::from_secs(1));
if let Result::Ok(received) = rx.try_recv() {
println!("收到消息:{}", received);
break // 收到消息后终止轮询
}
}
// 每秒轮询消息,第1次
// 每秒轮询消息,第2次
// 每秒轮询消息,第3次
// 每秒轮询消息,第4次
// 每秒轮询消息,第5次
// 收到消息:Hi
复制代码
通道和所有权转移
由于 rust 所有权机制,发送的数据将会转移所有权,试想一下如果不转移的话,两个线程可能同时修改数据,就会造成数据竞争的问题:
let (tx, rx) = mpsc::channel();
thread::spawn(move || {
let s = String::from("Hi");
tx.send(s).unwrap();
println!("{}", s); // 报错,s已经被转移所有权到主线程
});
let received = rx.recv().unwrap();
println!("{}", received);
复制代码
发送多个值并观察接收者的等待过程
实际上消息接收者(也就是 rx)实现了 Iterator trait,所以我们可以使用循环来将 rx 进行迭代,迭代次数取决于收到了多少个消息:
let (tx, rx) = mpsc::channel();
thread::spawn(move || {
let v = vec![
"Hi~",
"Hello~",
"Welcome~",
"Bye~",
];
for item in v {
tx.send(item).unwrap();
thread::sleep(Duration::from_secs(1));
}
});
// 将rx视作迭代器,而不再显式地调用recv函数。
// 迭代中的代码会打印出每个接收到的值,
// 并在tx被销毁时(也就是发送完成时)退出循环。
for msg in rx {
println!("{}", msg)
}
println!("done")
// Hi~
// Hello~
// Welcome~
// Bye~
// done
复制代码
多个生产者
上面我们的代码值一直只有一个生产者,文章开头介绍了 mpsc 支持多个生产者,下面我们看下这种多个生产者的情况:
let (tx, rx) = mpsc::channel();
// 使用Sender::clone方法克隆一个生产者
let tx2 = mpsc::Sender::clone(&tx);
// 线程1
thread::spawn(move || {
let v = vec![
"Hi~",
"Hello~",
"Welcome~",
"Bye~",
];
for item in v {
tx.send(String::from("from spawn:") + &item).unwrap();
thread::sleep(Duration::from_secs(1));
}
});
// 线程2
thread::spawn(move || {
let v = vec![
"Hi~",
"Hello~",
"Welcome~",
"Bye~",
];
for item in v {
tx2.send(String::from("from spawn2:") + &item).unwrap();
thread::sleep(Duration::from_secs(1));
}
});
for msg in rx {
println!("{}", msg)
}
// from spawn:Hi~
// from spawn2:Hi~
// from spawn2:Hello~
// from spawn:Hello~
// from spawn:Welcome~
// from spawn2:Welcome~
// from spawn2:Bye~
// from spawn:Bye~
复制代码
通过 Sender::clone 方法实现了两个线程同时向主线程发送消息的功能。
评论