写点什么

5 分钟速读之 Rust 权威指南(三十一)线程通信

用户头像
码生笔谈
关注
发布于: 1 小时前
5分钟速读之Rust权威指南(三十一)线程通信

线程通信

前面我们讲到了如何使用多线程来处理耗时逻辑,但是实际工作中需求不会那么简单,通常用时多线程时,线程不是简单的独立运行的,而是伴随着多线程之间的消息通信

使用消息通道

rust 提供了 mpsc 模块允许我们在线程间传递消息(mpsc 是英文“multiple producer, single consumer”(多个生产者,单个消费者)的缩写):


use std::thread;use std::sync::mpsc; // 引入mpsc模块
// 创建消息通道,tx是生产者,rx是消费者let (tx, rx) = mpsc::channel();
thread::spawn(move || { // 在新线程中向主线程发送消息,send返回Result<T>类型, // 这里简单使用unwrap,遇到错误时将抛出panic! tx.send(String::from("Hi")).unwrap();});
// 在主线程中接收消息,recv会阻塞当前线程,直到收到消息,// 如果在调用recv之前已经收到消息,那么调用recv会立即取到结果// recv同样返回返回Result<T>类型let received = rx.recv().unwrap();println!("收到消息:{}", received); // 收到消息:Hi
复制代码

使用 try_recv 非阻塞监听消息

上面的 recv 方法会阻塞主线程,直到主线程收到消息为止,但是主线程还在进行其他工作的时候 recv 方法就不是很方便,这个时候我们需要用到一个非阻塞的方法 try_recv 来接收消息:


use std::time::Duration;let (tx, rx) = mpsc::channel();thread::spawn(move || {  // 5秒之后发送消息给主线程  thread::sleep(Duration::from_secs(5));  tx.send(String::from("Hi")).unwrap();});
for s in 1..10 { println!("每秒轮询消息,第{}次", s); // 每秒查看是否收到消息 thread::sleep(Duration::from_secs(1)); if let Result::Ok(received) = rx.try_recv() { println!("收到消息:{}", received); break // 收到消息后终止轮询 }}// 每秒轮询消息,第1次// 每秒轮询消息,第2次// 每秒轮询消息,第3次// 每秒轮询消息,第4次// 每秒轮询消息,第5次// 收到消息:Hi
复制代码

通道和所有权转移

由于 rust 所有权机制,发送的数据将会转移所有权,试想一下如果不转移的话,两个线程可能同时修改数据,就会造成数据竞争的问题:


let (tx, rx) = mpsc::channel();thread::spawn(move || {  let s = String::from("Hi");  tx.send(s).unwrap();  println!("{}", s); // 报错,s已经被转移所有权到主线程});let received = rx.recv().unwrap();println!("{}", received);
复制代码

发送多个值并观察接收者的等待过程

实际上消息接收者(也就是 rx)实现了 Iterator trait,所以我们可以使用循环来将 rx 进行迭代,迭代次数取决于收到了多少个消息:


let (tx, rx) = mpsc::channel();thread::spawn(move || {  let v = vec![    "Hi~",    "Hello~",    "Welcome~",    "Bye~",  ];  for item in v {    tx.send(item).unwrap();    thread::sleep(Duration::from_secs(1));  }});
// 将rx视作迭代器,而不再显式地调用recv函数。// 迭代中的代码会打印出每个接收到的值,// 并在tx被销毁时(也就是发送完成时)退出循环。for msg in rx { println!("{}", msg)}println!("done")// Hi~// Hello~// Welcome~// Bye~// done
复制代码

多个生产者

上面我们的代码值一直只有一个生产者,文章开头介绍了 mpsc 支持多个生产者,下面我们看下这种多个生产者的情况:


let (tx, rx) = mpsc::channel();
// 使用Sender::clone方法克隆一个生产者let tx2 = mpsc::Sender::clone(&tx);
// 线程1thread::spawn(move || { let v = vec![ "Hi~", "Hello~", "Welcome~", "Bye~", ]; for item in v { tx.send(String::from("from spawn:") + &item).unwrap(); thread::sleep(Duration::from_secs(1)); }});
// 线程2thread::spawn(move || { let v = vec![ "Hi~", "Hello~", "Welcome~", "Bye~", ]; for item in v { tx2.send(String::from("from spawn2:") + &item).unwrap(); thread::sleep(Duration::from_secs(1)); }});
for msg in rx { println!("{}", msg)}// from spawn:Hi~// from spawn2:Hi~// from spawn2:Hello~// from spawn:Hello~// from spawn:Welcome~// from spawn2:Welcome~// from spawn2:Bye~// from spawn:Bye~
复制代码


通过 Sender::clone 方法实现了两个线程同时向主线程发送消息的功能。

发布于: 1 小时前阅读数: 2
用户头像

码生笔谈

关注

欢迎关注「码生笔谈」公众号 2018.09.09 加入

前端、Rust干货分享,一起成为更好的Javascripter & Rustacean

评论

发布
暂无评论
5分钟速读之Rust权威指南(三十一)线程通信