Rust 中的 Tokio 線程同步機制
在并發編程(cheng)中(zhong),線(xian)(xian)程(cheng)同(tong)步(bu)是(shi)一個(ge)(ge)重要的概念(nian),用于確保多個(ge)(ge)線(xian)(xian)程(cheng)在訪問(wen)共享資源(yuan)時(shi)能(neng)夠正確地協(xie)調(diao)。Tokio 是(shi)一個(ge)(ge)強大的異步(bu)運行時(shi)庫,為 Rust 提供(gong)了多種線(xian)(xian)程(cheng)同(tong)步(bu)機制(zhi)。以(yi)下是(shi)一些常見的同(tong)步(bu)機制(zhi):
- Mutex
- RwLock
- Barrier
- Semaphore
- Notify
- oneshot 和 mpsc 通道
- watch 通道
1. Mutex
Mutex(互斥鎖)是最常見(jian)的同步原語之一(yi),用于保護(hu)共享數據(ju)。它確保同一(yi)時間只有一(yi)個線(xian)程(cheng)能夠訪問數據(ju),從(cong)而避免競爭條件。
use tokio::sync::Mutex;
use std::sync::Arc;
?
#[tokio::main]
async fn main() {
    let data = Arc::new(Mutex::new(0));
?
    let mut handles = vec![];
    for _ in 0..10 {
        let data = data.clone();
        let handle = tokio::spawn(async move {
            let mut lock = data.lock().await;
            *lock += 1;
        });
        handles.push(handle);
    }
?
    for handle in handles {
        handle.await.unwrap();
    }
?
    println!("Result: {}", *data.lock().await);
}
2. RwLock
RwLock(讀寫鎖)允許多線程同時讀取數據,但只允許一個線程寫入數據。它比 Mutex 更(geng)加(jia)靈活(huo),因為在(zai)讀取多于寫入的場景下(xia),它能(neng)提高性能(neng)。功能(neng)上(shang),他是讀寫互斥、寫寫互斥、讀讀兼容。
use tokio::sync::RwLock;
use std::sync::Arc;
?
#[tokio::main]
async fn main() {
    let data = Arc::new(RwLock::new(0));
?
    let read_data = data.clone();
    let read_handle = tokio::spawn(async move {
        let lock = read_data.read().await;
        println!("Read: {}", *lock);
    });
?
    let write_data = data.clone();
    let write_handle = tokio::spawn(async move {
        let mut lock = write_data.write().await;
        *lock += 1;
        println!("Write: {}", *lock);
    });
?
    read_handle.await.unwrap();
    write_handle.await.unwrap();
}
3. Barrier
Barrier 是一種同步(bu)機制,允許(xu)多(duo)個線(xian)程(cheng)在(zai)某個點上(shang)進(jin)行同步(bu)。當線(xian)程(cheng)到(dao)(dao)達(da)屏(ping)障(zhang)時,它們會(hui)等待直到(dao)(dao)所(suo)有線(xian)程(cheng)都到(dao)(dao)達(da),然后一起繼(ji)續(xu)執行。
use tokio::sync::Barrier;
use std::sync::Arc;
?
#[tokio::main]
async fn main() {
    let barrier = Arc::new(Barrier::new(3));
?
    let mut handles = vec![];
    for i in 0..3 {
        let barrier = barrier.clone();
        let handle = tokio::spawn(async move {
            println!("Before wait: {}", i);
            barrier.wait().await;
            println!("After wait: {}", i);
        });
        handles.push(handle);
    }
?
    for handle in handles {
        handle.await.unwrap();
    }
}
4. Semaphore
Semaphore(信號量)是(shi)一(yi)種用于控制對資(zi)源訪(fang)問(wen)的同(tong)步原語。它允(yun)許多個線程訪(fang)問(wen)資(zi)源,但有一(yi)個最大并發數限制。
#[tokio::test]
async fn test_sem() {
    let semaphore = Arc::new(Semaphore::new(3));
?
    let mut handles = vec![];
    for i in 0..5 {
        let semaphore = semaphore.clone();
        let handle = tokio::spawn(async move {
            let permit = semaphore.acquire().await.unwrap();
            let now = Local::now();
            println!("Got permit: {} at {:?}", i, now);
            println!(
                "Semaphore available permits before sleep: {}",
                semaphore.available_permits()
            );
            sleep(Duration::from_secs(5)).await;
            drop(permit);
            println!(
                "Semaphore available permits after sleep: {}",
                semaphore.available_permits()
            );
        });
        handles.push(handle);
    }
?
    for handle in handles {
        handle.await.unwrap();
    }
}
最終的結果如下
Got permit: 0 at 2024-08-08T21:03:04.374666+08:00
Semaphore available permits before sleep: 2
Got permit: 1 at 2024-08-08T21:03:04.375527800+08:00
Semaphore available permits before sleep: 1
Got permit: 2 at 2024-08-08T21:03:04.375563+08:00
Semaphore available permits before sleep: 0
Semaphore available permits after sleep: 0
Semaphore available permits after sleep: 0
Semaphore available permits after sleep: 1
Got permit: 3 at 2024-08-08T21:03:09.376722800+08:00
Semaphore available permits before sleep: 1
Got permit: 4 at 2024-08-08T21:03:09.376779200+08:00
Semaphore available permits before sleep: 1
Semaphore available permits after sleep: 2
Semaphore available permits after sleep: 3
5. Notify
Notify 是一(yi)種用于(yu)線程(cheng)間通知的(de)(de)簡單機(ji)制。它允(yun)許(xu)一(yi)個線程(cheng)通知其他線程(cheng)某些事件的(de)(de)發生。
use tokio::sync::Notify;
use std::sync::Arc;
?
#[tokio::main]
async fn main() {
    let notify = Arc::new(Notify::new());
    let notify_clone = notify.clone();
?
    let handle = tokio::spawn(async move {
        notify_clone.notified().await;
        println!("Received notification");
    });
?
    notify.notify_one();
    handle.await.unwrap();
}
6. oneshot 和 mpsc 通道
oneshot 通道用于一次性發送消息,而 mpsc 通道則允許多個生產者發送消(xiao)息到一個消(xiao)費者。一般(ban)地onshot用(yong)于異(yi)常通知、啟動分(fen)析等功(gong)能。mpsc用(yong)于實現異(yi)步(bu)(bu)消(xiao)息同(tong)步(bu)(bu)
oneshot
use tokio::sync::oneshot;
?
#[tokio::main]
async fn main() {
    let (tx, rx) = oneshot::channel();
?
    tokio::spawn(async move {
        tx.send("Hello, world!").unwrap();
    });
?
    let message = rx.await.unwrap();
    println!("Received: {}", message);
}
mpsc
use tokio::sync::mpsc;
?
#[tokio::main]
async fn main() {
    let (tx, mut rx) = mpsc::channel(32);
?
    tokio::spawn(async move {
        tx.send("Hello, world!").await.unwrap();
    });
?
    while let Some(message) = rx.recv().await {
        println!("Received: {}", message);
    }
}
7. watch 通道
watch 通(tong)道(dao)用于發送和接收共享(xiang)狀態的更(geng)新。它允許多個消費者(zhe)監聽狀態的變化。
use tokio::sync::watch;
?
#[tokio::main]
async fn main() {
    let (tx, mut rx) = watch::channel("initial");
?
    tokio::spawn(async move {
        tx.send("updated").unwrap();
    });
?
    while rx.changed().await.is_ok() {
        println!("Received: {}", *rx.borrow());
    }
}
?watch通道?:
- 用于廣播狀態更新,一個生產者更新狀態,多個消費者獲取最新狀態。
- 適合配置變更、狀態同步等場景。
?mpsc通道?:
- 用于傳遞消息隊列,多個生產者發送消息,一個消費者逐條處理。
- 適合任務隊列、事件驅動等場景。
總結
Rust 中的 Tokio 提供了豐富的線程同步機制,可以根據具體需求選擇合適的同步原語。常用的同步機制包括:
- Mutex:互斥鎖,保護共享數據。
- RwLock:讀寫鎖,允許并發讀,寫時獨占。
- Barrier:屏障,同步多個線程在某一點。
- Semaphore:信號量,控制并發訪問資源。
- Notify:通知機制,用于線程間通知。
- oneshot和- mpsc通道:消息傳遞機制。
- watch通道:狀態更新機制。
通過這些同步機制,可以在 Rust 中編寫高效、安全的并發程序。