写点什么

Rust 中的 Tokio 线程同步机制

  • 2025-09-18
    北京
  • 本文字数:3104 字

    阅读完需:约 10 分钟

本文分享自天翼云开发者社区《Rust 中的 Tokio 线程同步机制》,作者:l****n

Rust 中的 Tokio 线程同步机制

在并发编程中,线程同步是一个重要的概念,用于确保多个线程在访问共享资源时能够正确地协调。Tokio 是一个强大的异步运行时库,为 Rust 提供了多种线程同步机制。以下是一些常见的同步机制:

  1. Mutex

  2. RwLock

  3. Barrier

  4. Semaphore

  5. Notify

  6. oneshot 和 mpsc 通道

  7. watch 通道

1. Mutex

Mutex(互斥锁)是最常见的同步原语之一,用于保护共享数据。它确保同一时间只有一个线程能够访问数据,从而避免竞争条件。

use tokio::sync::Mutex;use std::sync::Arc;#[tokio::main]async fn main() {    let data = Arc::new(Mutex::new(0));    let mut handles = vec![];    for _ in 0..10 {        let data = data.clone();        let handle = tokio::spawn(async move {            let mut lock = data.lock().await;            *lock += 1;        });        handles.push(handle);    }    for handle in handles {        handle.await.unwrap();    }    println!("Result: {}", *data.lock().await);}
复制代码

2. RwLock

RwLock(读写锁)允许多线程同时读取数据,但只允许一个线程写入数据。它比 Mutex 更加灵活,因为在读取多于写入的场景下,它能提高性能。功能上,他是读写互斥、写写互斥、读读兼容。

use tokio::sync::RwLock;use std::sync::Arc;#[tokio::main]async fn main() {    let data = Arc::new(RwLock::new(0));    let read_data = data.clone();    let read_handle = tokio::spawn(async move {        let lock = read_data.read().await;        println!("Read: {}", *lock);    });    let write_data = data.clone();    let write_handle = tokio::spawn(async move {        let mut lock = write_data.write().await;        *lock += 1;        println!("Write: {}", *lock);    });    read_handle.await.unwrap();    write_handle.await.unwrap();}
复制代码

3. Barrier

Barrier 是一种同步机制,允许多个线程在某个点上进行同步。当线程到达屏障时,它们会等待直到所有线程都到达,然后一起继续执行。

use tokio::sync::Barrier;use std::sync::Arc;#[tokio::main]async fn main() {    let barrier = Arc::new(Barrier::new(3));    let mut handles = vec![];    for i in 0..3 {        let barrier = barrier.clone();        let handle = tokio::spawn(async move {            println!("Before wait: {}", i);            barrier.wait().await;            println!("After wait: {}", i);        });        handles.push(handle);    }    for handle in handles {        handle.await.unwrap();    }}
复制代码

4. Semaphore

Semaphore(信号量)是一种用于控制对资源访问的同步原语。它允许多个线程访问资源,但有一个最大并发数限制。

#[tokio::test]async fn test_sem() {    let semaphore = Arc::new(Semaphore::new(3));    let mut handles = vec![];    for i in 0..5 {        let semaphore = semaphore.clone();        let handle = tokio::spawn(async move {            let permit = semaphore.acquire().await.unwrap();            let now = Local::now();            println!("Got permit: {} at {:?}", i, now);            println!(                "Semaphore available permits before sleep: {}",                semaphore.available_permits()            );            sleep(Duration::from_secs(5)).await;            drop(permit);            println!(                "Semaphore available permits after sleep: {}",                semaphore.available_permits()            );        });        handles.push(handle);    }    for handle in handles {        handle.await.unwrap();    }}
复制代码

最终的结果如下

Got permit: 0 at 2024-08-08T21:03:04.374666+08:00Semaphore available permits before sleep: 2Got permit: 1 at 2024-08-08T21:03:04.375527800+08:00Semaphore available permits before sleep: 1Got permit: 2 at 2024-08-08T21:03:04.375563+08:00Semaphore available permits before sleep: 0Semaphore available permits after sleep: 0Semaphore available permits after sleep: 0Semaphore available permits after sleep: 1Got permit: 3 at 2024-08-08T21:03:09.376722800+08:00Semaphore available permits before sleep: 1Got permit: 4 at 2024-08-08T21:03:09.376779200+08:00Semaphore available permits before sleep: 1Semaphore available permits after sleep: 2Semaphore available permits after sleep: 3
复制代码

5. Notify

Notify 是一种用于线程间通知的简单机制。它允许一个线程通知其他线程某些事件的发生。

use tokio::sync::Notify;use std::sync::Arc;#[tokio::main]async fn main() {    let notify = Arc::new(Notify::new());    let notify_clone = notify.clone();    let handle = tokio::spawn(async move {        notify_clone.notified().await;        println!("Received notification");    });    notify.notify_one();    handle.await.unwrap();}
复制代码

6. oneshot 和 mpsc 通道

oneshot 通道用于一次性发送消息,而 mpsc 通道则允许多个生产者发送消息到一个消费者。一般地 onshot 用于异常通知、启动分析等功能。mpsc 用于实现异步消息同步

oneshot

use tokio::sync::oneshot;#[tokio::main]async fn main() {    let (tx, rx) = oneshot::channel();    tokio::spawn(async move {        tx.send("Hello, world!").unwrap();    });    let message = rx.await.unwrap();    println!("Received: {}", message);}
复制代码

mpsc

use tokio::sync::mpsc;#[tokio::main]async fn main() {    let (tx, mut rx) = mpsc::channel(32);    tokio::spawn(async move {        tx.send("Hello, world!").await.unwrap();    });    while let Some(message) = rx.recv().await {        println!("Received: {}", message);    }}
复制代码

7. watch 通道

watch 通道用于发送和接收共享状态的更新。它允许多个消费者监听状态的变化。

use tokio::sync::watch;#[tokio::main]async fn main() {    let (tx, mut rx) = watch::channel("initial");    tokio::spawn(async move {        tx.send("updated").unwrap();    });    while rx.changed().await.is_ok() {        println!("Received: {}", *rx.borrow());    }}
复制代码

watch 通道​:

  • 用于广播状态更新,一个生产者更新状态,多个消费者获取最新状态。

  • 适合配置变更、状态同步等场景。

mpsc 通道​:

  • 用于传递消息队列,多个生产者发送消息,一个消费者逐条处理。

  • 适合任务队列、事件驱动等场景。

总结

Rust 中的 Tokio 提供了丰富的线程同步机制,可以根据具体需求选择合适的同步原语。常用的同步机制包括:

  1. Mutex:互斥锁,保护共享数据。

  2. RwLock:读写锁,允许并发读,写时独占。

  3. Barrier:屏障,同步多个线程在某一点。

  4. Semaphore:信号量,控制并发访问资源。

  5. Notify:通知机制,用于线程间通知。

  6. oneshot 和 mpsc 通道:消息传递机制。

  7. watch 通道:状态更新机制。

通过这些同步机制,可以在 Rust 中编写高效、安全的并发程序。

用户头像

还未添加个人签名 2022-02-22 加入

天翼云是中国电信倾力打造的云服务品牌,致力于成为领先的云计算服务提供商。提供云主机、CDN、云电脑、大数据及AI等全线产品和场景化解决方案。

评论

发布
暂无评论
Rust 中的 Tokio 线程同步机制_云计算_天翼云开发者社区_InfoQ写作社区