写点什么

Rust 从 0 到 1- 并发 - 线程间消息传递

用户头像
关注
发布于: 10 小时前
Rust从0到1-并发-线程间消息传递

目前通过消息传递(message passing),即线程或 actors (参考 Actor 模型)之间通过互相发送包含数据的消息进行通信,来确保并发安全的方式越来越流行。Rust 中的这个编程思想来源于 Go 编程提出的:不要通过内存共享进行通讯,应当通过通讯来共享内存(Do not communicate by sharing memory; instead, share memory by communicating)。

Rust 标准库实现的用于并发间的消息传递的主要工具是 channel(和 Go 类似,可能是受 Go 语言的影响)。我们可以把 channel 想象为水流的通道,譬如小溪或河流。如果我们将橡皮鸭或小船之类的东西放入水流中,它们会顺流而下到达水道的尽头。

编程中的 channel  包含两部分:发送者(transmitter)和接收者(receiver)。发送者位于上游位置,也就是我们把橡皮鸭放入河流中的地方;橡皮鸭最终会漂流至下游位置,到达接收者那里。与之对应,我们的一部分代码会调用发送者的方法发送我们希望发送的数据,另一部分代码则会在接收端检查收到的消息。当发送者或接收者任何一方被丢弃时,通道就可以被看作关闭了。

下面让我们来看一个例子,它会在一个线程生成值,并通过 channel  发送,而另一个线程会接收值并打印出来。在例子中我们只是使用比较简单的值来演示功能,一旦你掌握了这项技术,就可以利用它来实现诸如聊天程序,或是一个利用多线程进行计算,然后将结果进行聚合的分布式计算程序。我们首先来创建一个 channel(暂时还无法通过编译):

use std::sync::mpsc;
fn main() { let (tx, rx) = mpsc::channel();}
复制代码

在上面的例子中我们使用 mpsc::channel 函数创建了一个新的通道。mpsc 代表多个生产者,单个消费者(multiple producer, single consumer)。Rust 标准库中提供的 channel  实现方式,简单来说,就是一个 channel  可以有多个产生数据的发送方,但只有一个消费这些数据的接收方。我们可以想象一下,就像多条小溪最终汇聚成大河:橡皮鸭通过任何一条小溪顺流而下最后都会到达下游的大河。我们先从演示单个生产者开始,当示例可以工作后,接下来我们会增加多个生产者。

mpsc::channel 函数返回一个元组类型(tuple)数据,其中第一个元素是发送端,而第二个元素是接收端。通常我们使用 tx 和 rx 分别作为发送者(transmitter)和 接收者(receiver)的缩写,因此,在这里我们使用它们做为变量的名字。我们在 let 语句中使用模式匹配来获取 mpsc::channel 返回的元组类型数据中的元素并赋给变量,非常方便(在后面章节我们会对 let 语句中使用模式匹配 进行讨论)。

下面我们将在一个新建线程中使用发送端(tx)发送一个字符串消息,这样就实现了和主线程之间的通讯,参考下面的例子:

use std::sync::mpsc;use std::thread;
fn main() { let (tx, rx) = mpsc::channel();
thread::spawn(move || { let val = String::from("hi"); tx.send(val).unwrap(); });}
复制代码

在上面的例子中我们使用 thread::spawn 创建了一个新线程并使用 move 关键字将 tx 移动到闭包中(新建线程需要获得 tx 的所有权,以向通道发送消息)。我们通过调用 tx 的 send 方法向 channel 发送相应的值。send 方法会返回一个 Result<T, E> 类型。因此,如果接收端不存在,将没有发送目标,发送操作会返回错误。在上面的例子中我们简单的调用 unwrap 来产生 panic(在编写真实的程序时,我们需要根据实际场景对错误进行正确的处理)。

下面,我们在主线程中接收 channel 中的数据(就像接收对方发送的聊天信息):

use std::sync::mpsc;use std::thread;
fn main() { let (tx, rx) = mpsc::channel();
thread::spawn(move || { let val = String::from("hi"); tx.send(val).unwrap(); });
let received = rx.recv().unwrap(); println!("Got: {}", received);}
复制代码

在接收端有两个常用的方法:recv 和 try_recv。在例子中我们使用的是 recv(receive 的缩写)。它会阻塞线程执行直到从 channel 中接收到一个值,当收到发送端发送的值后,recv 会返回 Result<T, E> 类型的结果(当发送端关闭时,recv 会返回一个错误,代表不会再收到新的值了)。相反,try_recv 不会阻塞,它会立刻返回 Result<T, E> 类型的结果:如果是 Ok ,则包含发送端发送的值;如果是 Err ,一般情况下代表此时没有任何消息。这在线程还有其他工作时很有用:譬如,我们可以在循环中反复调用 try_recv,在消息到达时进行处理,否则就先处理一会其他工作,然后再进行下一次检查。

出于简单考虑,我们在例子使用的是 recv。因为,主线程中除了等待消息到来之后把它打印出来,没有任何其它工作,所以适用于阻塞主线程。尝试运行例子中的代码,我们得到类似下面的结果:

Got: hi
复制代码

Channel 和所有权

所有权规则在消息传递中发挥了极为重要的作用,因为其帮助我们编写并发安全的代码。在编写 Rust 程序中始终考虑所有权将有利于防止并发编程中的错误。下面让我们通过代码切身体验下 channel 和所有权如何一起帮助我们避免并发问题的:我们将在新建线程中尝试使用通过 channel 已经发送出去的数据。参考下面的例子:

use std::sync::mpsc;use std::thread;
fn main() { let (tx, rx) = mpsc::channel();
thread::spawn(move || { let val = String::from("hi"); tx.send(val).unwrap(); println!("val is {}", val); });
let received = rx.recv().unwrap(); println!("Got: {}", received);}
复制代码

在上面的例子中,我们尝试在通过 tx.send 发送 val 到 channel 之后,调用 println! 打印它。允许这么做会是一个坏主意:一旦把 val  发送到另一个线程后,在另一个线程里可能会对其修改或者丢弃,而这些操作可能会发生在,在我们之前的线程中,尝试使用它之前;而这将会导致由于不一致或不存在的数据产生的错误或预期之外的结果。因此,在尝试编译时,Rust 会给出类似下面的错误:

$ cargo run   Compiling message-passing v0.1.0 (file:///projects/message-passing)error[E0382]: borrow of moved value: `val`  --> src/main.rs:10:31   |8  |         let val = String::from("hi");   |             --- move occurs because `val` has type `String`, which does not implement the `Copy` trait9  |         tx.send(val).unwrap();   |                 --- value moved here10 |         println!("val is {}", val);   |                               ^^^ value borrowed here after move
error: aborting due to previous error
For more information about this error, try `rustc --explain E0382`.error: could not compile `message-passing`
To learn more, run the command again with --verbose.
复制代码

我们的例子造成了一个编译时错误。send 函数会获取其参数的所有权,并且当接收者收到这个数据后,所有权会“移动”给接收者。这么做可以防止我们前面所说的可能产生的问题,而这些都是由所有权系统负责保障。

发送多次消息

在前面的例子中我们已经演示了使用 channel 发送一条消息,但是它并没有清楚的展示出两个独立的线程利用 channel 相互通讯。下面我们做一些修改,来更明确的证明它们是独立并发执行的:在新建的线程中发送端会发送多个消息并在每个消息之后暂停一秒钟。参考下面的例子:

use std::sync::mpsc;use std::thread;use std::time::Duration;
fn main() { let (tx, rx) = mpsc::channel();
thread::spawn(move || { let vals = vec![ String::from("hi"), String::from("from"), String::from("the"), String::from("thread"), ];
for val in vals { tx.send(val).unwrap(); thread::sleep(Duration::from_secs(1)); } });
for received in rx { println!("Got: {}", received); }}
复制代码

在上面修改后的例子中,我们在新建的线程中遍历一个 vector,将其中的字符串元素发送到主线程,并在每次发送后暂停一秒。在主线程中,我们利用 rx 的迭代器特性,将每一个接收到字符串打印出来。当发送端关闭时,迭代器也就会结束。尝试运行修改后的例子,我们将看到类似下面的输出结果(每一行之后会暂停一秒):

Got: hiGot: fromGot: theGot: thread
复制代码

因为在主线程中我们并没有任何暂停或等待的代码,所以说主线程是在等待新建线程中的发送端向 channel 中发送数据。

创建多个生产者

之前我们介绍过 mpsc 是 “multiple producer, single consumer” 的缩写,即多个生产者,单个消费者。这也是 Rust 标准库提供的实现,因此我们可以扩展前面例子中的代码,在多个线程中向同一接收者发送消息。那么应该如何实现呢?参考下面的例子:

// --snip--
let (tx, rx) = mpsc::channel();
let tx1 = tx.clone(); thread::spawn(move || { let vals = vec![ String::from("hi"), String::from("from"), String::from("the"), String::from("thread"), ];
for val in vals { tx1.send(val).unwrap(); thread::sleep(Duration::from_secs(1)); } });
thread::spawn(move || { let vals = vec![ String::from("more"), String::from("messages"), String::from("for"), String::from("you"), ];
for val in vals { tx.send(val).unwrap(); thread::sleep(Duration::from_secs(1)); } });
for received in rx { println!("Got: {}", received); }
// --snip--
复制代码

在上面的例子中,我们调用了 tx 的 clone 方法,克隆了一个发送端 tx1,并在新建的两个线程中分别使用 tx 和 tx1 向 channel 中发送消息。尝试运行例子,我们会看到类似下面的输出结果:

Got: hiGot: moreGot: fromGot: messagesGot: forGot: theGot: threadGot: you
复制代码

这些值可能会以不同的顺序打印出来,这依赖于运行的系统。这让并发既有趣又困难,而如果再加上 thread::sleep ,并在不同的线程中赋予不同的参数,那么运行结果将更加不确定,而且每次都可能会产生不同的输出结果。

发布于: 10 小时前阅读数: 2
用户头像

关注

公众号"山 顽石"欢迎大家关注;) 2021.03.23 加入

IT老兵,关注Rust、Java、前端、架构、大数据、AI、算法等;平时喜欢美食、旅游、宠物、健身、篮球、乒乓球等。希望能和大家交流分享。

评论

发布
暂无评论
Rust从0到1-并发-线程间消息传递