亚欧色一区w666天堂,色情一区二区三区免费看,少妇特黄A片一区二区三区,亚洲人成网站999久久久综合,国产av熟女一区二区三区

  • 發布文章
  • 消息中心
點贊
收藏
評論
分享
原創

Rust 中的 Tokio 線程同步機制

2024-08-16 09:37:07
51
0

Rust 中的 Tokio 線程同步機制

在并發編程(cheng)中(zhong),線(xian)(xian)程(cheng)同(tong)步(bu)是(shi)一個(ge)(ge)重要的概念(nian),用于確保多個(ge)(ge)線(xian)(xian)程(cheng)在訪問(wen)共享資源(yuan)時(shi)能(neng)夠正確地協(xie)調(diao)。Tokio 是(shi)一個(ge)(ge)強大的異步(bu)運行時(shi)庫,為 Rust 提供(gong)了多種線(xian)(xian)程(cheng)同(tong)步(bu)機制(zhi)。以(yi)下是(shi)一些常見的同(tong)步(bu)機制(zhi):

  1. Mutex
  2. RwLock
  3. Barrier
  4. Semaphore
  5. Notify
  6. oneshot 和 mpsc 通道
  7. watch 通道

1. Mutex

Mutex(互斥鎖)是最常見(jian)的同步原語之一(yi),用于保護(hu)共享數據(ju)。它確保同一(yi)時間只有一(yi)個線(xian)程(cheng)能夠訪問數據(ju),從(cong)而避免競爭條件。

use tokio::sync::Mutex;
use std::sync::Arc;
?
#[tokio::main]
async fn main() {
    let data = Arc::new(Mutex::new(0));
?
    let mut handles = vec![];
    for _ in 0..10 {
        let data = data.clone();
        let handle = tokio::spawn(async move {
            let mut lock = data.lock().await;
            *lock += 1;
        });
        handles.push(handle);
    }
?
    for handle in handles {
        handle.await.unwrap();
    }
?
    println!("Result: {}", *data.lock().await);
}

2. RwLock

RwLock(讀寫鎖)允許多線程同時讀取數據,但只允許一個線程寫入數據。它比 Mutex 更(geng)加(jia)靈活(huo),因為在(zai)讀取多于寫入的場景下(xia),它能(neng)提高性能(neng)。功能(neng)上(shang),他是讀寫互斥、寫寫互斥、讀讀兼容。

use tokio::sync::RwLock;
use std::sync::Arc;
?
#[tokio::main]
async fn main() {
    let data = Arc::new(RwLock::new(0));
?
    let read_data = data.clone();
    let read_handle = tokio::spawn(async move {
        let lock = read_data.read().await;
        println!("Read: {}", *lock);
    });
?
    let write_data = data.clone();
    let write_handle = tokio::spawn(async move {
        let mut lock = write_data.write().await;
        *lock += 1;
        println!("Write: {}", *lock);
    });
?
    read_handle.await.unwrap();
    write_handle.await.unwrap();
}

3. Barrier

Barrier 是一種同步(bu)機制,允許(xu)多(duo)個線(xian)程(cheng)在(zai)某個點上(shang)進(jin)行同步(bu)。當線(xian)程(cheng)到(dao)(dao)達(da)屏(ping)障(zhang)時,它們會(hui)等待直到(dao)(dao)所(suo)有線(xian)程(cheng)都到(dao)(dao)達(da),然后一起繼(ji)續(xu)執行。

use tokio::sync::Barrier;
use std::sync::Arc;
?
#[tokio::main]
async fn main() {
    let barrier = Arc::new(Barrier::new(3));
?
    let mut handles = vec![];
    for i in 0..3 {
        let barrier = barrier.clone();
        let handle = tokio::spawn(async move {
            println!("Before wait: {}", i);
            barrier.wait().await;
            println!("After wait: {}", i);
        });
        handles.push(handle);
    }
?
    for handle in handles {
        handle.await.unwrap();
    }
}

4. Semaphore

Semaphore(信號量)是(shi)一(yi)種用于控制對資(zi)源訪(fang)問(wen)的同(tong)步原語。它允(yun)許多個線程訪(fang)問(wen)資(zi)源,但有一(yi)個最大并發數限制。

#[tokio::test]
async fn test_sem() {
    let semaphore = Arc::new(Semaphore::new(3));
?
    let mut handles = vec![];
    for i in 0..5 {
        let semaphore = semaphore.clone();
        let handle = tokio::spawn(async move {
            let permit = semaphore.acquire().await.unwrap();
            let now = Local::now();
            println!("Got permit: {} at {:?}", i, now);
            println!(
                "Semaphore available permits before sleep: {}",
                semaphore.available_permits()
            );
            sleep(Duration::from_secs(5)).await;
            drop(permit);
            println!(
                "Semaphore available permits after sleep: {}",
                semaphore.available_permits()
            );
        });
        handles.push(handle);
    }
?
    for handle in handles {
        handle.await.unwrap();
    }
}

最終的結果如下

Got permit: 0 at 2024-08-08T21:03:04.374666+08:00
Semaphore available permits before sleep: 2
Got permit: 1 at 2024-08-08T21:03:04.375527800+08:00
Semaphore available permits before sleep: 1
Got permit: 2 at 2024-08-08T21:03:04.375563+08:00
Semaphore available permits before sleep: 0
Semaphore available permits after sleep: 0
Semaphore available permits after sleep: 0
Semaphore available permits after sleep: 1
Got permit: 3 at 2024-08-08T21:03:09.376722800+08:00
Semaphore available permits before sleep: 1
Got permit: 4 at 2024-08-08T21:03:09.376779200+08:00
Semaphore available permits before sleep: 1
Semaphore available permits after sleep: 2
Semaphore available permits after sleep: 3

5. Notify

Notify 是一(yi)種用于(yu)線程(cheng)間通知的(de)(de)簡單機(ji)制。它允(yun)許(xu)一(yi)個線程(cheng)通知其他線程(cheng)某些事件的(de)(de)發生。

use tokio::sync::Notify;
use std::sync::Arc;
?
#[tokio::main]
async fn main() {
    let notify = Arc::new(Notify::new());
    let notify_clone = notify.clone();
?
    let handle = tokio::spawn(async move {
        notify_clone.notified().await;
        println!("Received notification");
    });
?
    notify.notify_one();
    handle.await.unwrap();
}

6. oneshot 和 mpsc 通道

oneshot 通道用于一次性發送消息,而 mpsc 通道則允許多個生產者發送消(xiao)息到一個消(xiao)費者。一般(ban)地onshot用(yong)于異(yi)常通知、啟動分(fen)析等功(gong)能。mpsc用(yong)于實現異(yi)步(bu)(bu)消(xiao)息同(tong)步(bu)(bu)

oneshot

use tokio::sync::oneshot;
?
#[tokio::main]
async fn main() {
    let (tx, rx) = oneshot::channel();
?
    tokio::spawn(async move {
        tx.send("Hello, world!").unwrap();
    });
?
    let message = rx.await.unwrap();
    println!("Received: {}", message);
}

mpsc

use tokio::sync::mpsc;
?
#[tokio::main]
async fn main() {
    let (tx, mut rx) = mpsc::channel(32);
?
    tokio::spawn(async move {
        tx.send("Hello, world!").await.unwrap();
    });
?
    while let Some(message) = rx.recv().await {
        println!("Received: {}", message);
    }
}

7. watch 通道

watch 通(tong)道(dao)用于發送和接收共享(xiang)狀態的更(geng)新。它允許多個消費者(zhe)監聽狀態的變化。

use tokio::sync::watch;
?
#[tokio::main]
async fn main() {
    let (tx, mut rx) = watch::channel("initial");
?
    tokio::spawn(async move {
        tx.send("updated").unwrap();
    });
?
    while rx.changed().await.is_ok() {
        println!("Received: {}", *rx.borrow());
    }
}

?watch通道?:

  • 用于廣播狀態更新,一個生產者更新狀態,多個消費者獲取最新狀態。
  • 適合配置變更、狀態同步等場景。

?mpsc通道?:

  • 用于傳遞消息隊列,多個生產者發送消息,一個消費者逐條處理。
  • 適合任務隊列、事件驅動等場景。

總結

Rust 中的 Tokio 提供了豐富的線程同步機制,可以根據具體需求選擇合適的同步原語。常用的同步機制包括:

  1. Mutex:互斥鎖,保護共享數據。
  2. RwLock:讀寫鎖,允許并發讀,寫時獨占。
  3. Barrier:屏障,同步多個線程在某一點。
  4. Semaphore:信號量,控制并發訪問資源。
  5. Notify:通知機制,用于線程間通知。
  6. oneshotmpsc 通道:消息傳遞機制。
  7. watch 通道:狀態更新機制。

通過這些同步機制,可以在 Rust 中編寫高效、安全的并發程序。

0條評論
0 / 1000
l****n
17文章(zhang)數(shu)
0粉絲數
l****n
17 文章 | 0 粉絲
原創

Rust 中的 Tokio 線程同步機制

2024-08-16 09:37:07
51
0

Rust 中的 Tokio 線程同步機制

在并發編程中,線程同(tong)步(bu)是一個重要的(de)概念(nian),用(yong)于確保多個線程在訪問共享資源時能(neng)夠正確地協調。Tokio 是一個強(qiang)大的(de)異(yi)步(bu)運(yun)行(xing)時庫,為 Rust 提供(gong)了多種線程同(tong)步(bu)機制。以下是一些常見的(de)同(tong)步(bu)機制:

  1. Mutex
  2. RwLock
  3. Barrier
  4. Semaphore
  5. Notify
  6. oneshot 和 mpsc 通道
  7. watch 通道

1. Mutex

Mutex(互斥鎖)是最(zui)常見的同(tong)步原語之一(yi)(yi),用于(yu)保(bao)護共享(xiang)數據。它確(que)保(bao)同(tong)一(yi)(yi)時間只(zhi)有一(yi)(yi)個(ge)線程能夠訪問數據,從而(er)避免競爭條件。

use tokio::sync::Mutex;
use std::sync::Arc;
?
#[tokio::main]
async fn main() {
    let data = Arc::new(Mutex::new(0));
?
    let mut handles = vec![];
    for _ in 0..10 {
        let data = data.clone();
        let handle = tokio::spawn(async move {
            let mut lock = data.lock().await;
            *lock += 1;
        });
        handles.push(handle);
    }
?
    for handle in handles {
        handle.await.unwrap();
    }
?
    println!("Result: {}", *data.lock().await);
}

2. RwLock

RwLock(讀寫鎖)允許多線程同時讀取數據,但只允許一個線程寫入數據。它比 Mutex 更加靈活,因(yin)為在讀(du)取(qu)多于寫(xie)(xie)入(ru)的場景下,它能提高性能。功能上,他是讀(du)寫(xie)(xie)互斥、寫(xie)(xie)寫(xie)(xie)互斥、讀(du)讀(du)兼容。

use tokio::sync::RwLock;
use std::sync::Arc;
?
#[tokio::main]
async fn main() {
    let data = Arc::new(RwLock::new(0));
?
    let read_data = data.clone();
    let read_handle = tokio::spawn(async move {
        let lock = read_data.read().await;
        println!("Read: {}", *lock);
    });
?
    let write_data = data.clone();
    let write_handle = tokio::spawn(async move {
        let mut lock = write_data.write().await;
        *lock += 1;
        println!("Write: {}", *lock);
    });
?
    read_handle.await.unwrap();
    write_handle.await.unwrap();
}

3. Barrier

Barrier 是一種同步機制(zhi),允許多個線程(cheng)在某個點上(shang)進(jin)行同步。當線程(cheng)到(dao)達(da)屏障時,它們會等待直到(dao)所有線程(cheng)都到(dao)達(da),然(ran)后一起繼續執行。

use tokio::sync::Barrier;
use std::sync::Arc;
?
#[tokio::main]
async fn main() {
    let barrier = Arc::new(Barrier::new(3));
?
    let mut handles = vec![];
    for i in 0..3 {
        let barrier = barrier.clone();
        let handle = tokio::spawn(async move {
            println!("Before wait: {}", i);
            barrier.wait().await;
            println!("After wait: {}", i);
        });
        handles.push(handle);
    }
?
    for handle in handles {
        handle.await.unwrap();
    }
}

4. Semaphore

Semaphore(信號量)是一種用于控制(zhi)(zhi)對(dui)資源訪問(wen)的同步原語。它允許多個線程訪問(wen)資源,但有一個最大并發數限制(zhi)(zhi)。

#[tokio::test]
async fn test_sem() {
    let semaphore = Arc::new(Semaphore::new(3));
?
    let mut handles = vec![];
    for i in 0..5 {
        let semaphore = semaphore.clone();
        let handle = tokio::spawn(async move {
            let permit = semaphore.acquire().await.unwrap();
            let now = Local::now();
            println!("Got permit: {} at {:?}", i, now);
            println!(
                "Semaphore available permits before sleep: {}",
                semaphore.available_permits()
            );
            sleep(Duration::from_secs(5)).await;
            drop(permit);
            println!(
                "Semaphore available permits after sleep: {}",
                semaphore.available_permits()
            );
        });
        handles.push(handle);
    }
?
    for handle in handles {
        handle.await.unwrap();
    }
}

最終的結果如下

Got permit: 0 at 2024-08-08T21:03:04.374666+08:00
Semaphore available permits before sleep: 2
Got permit: 1 at 2024-08-08T21:03:04.375527800+08:00
Semaphore available permits before sleep: 1
Got permit: 2 at 2024-08-08T21:03:04.375563+08:00
Semaphore available permits before sleep: 0
Semaphore available permits after sleep: 0
Semaphore available permits after sleep: 0
Semaphore available permits after sleep: 1
Got permit: 3 at 2024-08-08T21:03:09.376722800+08:00
Semaphore available permits before sleep: 1
Got permit: 4 at 2024-08-08T21:03:09.376779200+08:00
Semaphore available permits before sleep: 1
Semaphore available permits after sleep: 2
Semaphore available permits after sleep: 3

5. Notify

Notify 是(shi)一種用于線程間通知的簡(jian)單機制(zhi)。它允許一個線程通知其他線程某些事件的發生。

use tokio::sync::Notify;
use std::sync::Arc;
?
#[tokio::main]
async fn main() {
    let notify = Arc::new(Notify::new());
    let notify_clone = notify.clone();
?
    let handle = tokio::spawn(async move {
        notify_clone.notified().await;
        println!("Received notification");
    });
?
    notify.notify_one();
    handle.await.unwrap();
}

6. oneshot 和 mpsc 通道

oneshot 通道用于一次性發送消息,而 mpsc 通道則(ze)允許多個(ge)生產者發送消息到一個(ge)消費(fei)者。一般地onshot用于異常(chang)通知、啟動(dong)分析等(deng)功能。mpsc用于實現異步消息同步

oneshot

use tokio::sync::oneshot;
?
#[tokio::main]
async fn main() {
    let (tx, rx) = oneshot::channel();
?
    tokio::spawn(async move {
        tx.send("Hello, world!").unwrap();
    });
?
    let message = rx.await.unwrap();
    println!("Received: {}", message);
}

mpsc

use tokio::sync::mpsc;
?
#[tokio::main]
async fn main() {
    let (tx, mut rx) = mpsc::channel(32);
?
    tokio::spawn(async move {
        tx.send("Hello, world!").await.unwrap();
    });
?
    while let Some(message) = rx.recv().await {
        println!("Received: {}", message);
    }
}

7. watch 通道

watch 通道用于發送(song)和(he)接收共享狀態的(de)更(geng)新。它允許多個消費者監聽(ting)狀態的(de)變化。

use tokio::sync::watch;
?
#[tokio::main]
async fn main() {
    let (tx, mut rx) = watch::channel("initial");
?
    tokio::spawn(async move {
        tx.send("updated").unwrap();
    });
?
    while rx.changed().await.is_ok() {
        println!("Received: {}", *rx.borrow());
    }
}

?watch通道?:

  • 用于廣播狀態更新,一個生產者更新狀態,多個消費者獲取最新狀態。
  • 適合配置變更、狀態同步等場景。

?mpsc通道?:

  • 用于傳遞消息隊列,多個生產者發送消息,一個消費者逐條處理。
  • 適合任務隊列、事件驅動等場景。

總結

Rust 中的 Tokio 提供了豐富的線程同步機制,可以根據具體需求選擇合適的同步原語。常用的同步機制包括:

  1. Mutex:互斥鎖,保護共享數據。
  2. RwLock:讀寫鎖,允許并發讀,寫時獨占。
  3. Barrier:屏障,同步多個線程在某一點。
  4. Semaphore:信號量,控制并發訪問資源。
  5. Notify:通知機制,用于線程間通知。
  6. oneshotmpsc 通道:消息傳遞機制。
  7. watch 通道:狀態更新機制。

通過這些同步機制,可以在 Rust 中編寫高效、安全的并發程序。

文章來自個人專欄
文章 | 訂閱(yue)
0條評論
0 / 1000
請輸入你的評論
0
0