亚欧色一区w666天堂,色情一区二区三区免费看,少妇特黄A片一区二区三区,亚洲人成网站999久久久综合,国产av熟女一区二区三区

  • 發布文章
  • 消息中心
點贊
收藏
評論
分享
原創

Rust多線程:Worker 結構體與線程池中任務的傳遞機制

2024-09-24 10:07:29
19
0

Rust多線程:Worker 結構體與線程池中任務的傳遞機制

**在實現一個多線程的 Web 服務器時,我們會遇到一個問題:如何在創建線程之后讓它們在沒有任務時保持等待狀態,并且在任務到來時可以立即執行。這是一個典型的線程池設計問題。在 Rust 中,我們需要通過自定義設計來實現這個功能,因為標準庫中的 **thread::spawn 并不直接支持這種用法。

問題描述

**Rust 的 **thread::spawn 方法會立即執行傳入的閉包。如果我們想要在線程池中創建線程并讓它們等待任務(即在創建時不執行任何任務),我們就需要自己設計一種機制,能夠在稍后將任務傳遞給這些已經創建好的線程。

解決方案:引入 Worker 結構體

**為了解決這個問題,我們引入了一個 **Worker 結構體來管理線程池中的每個線程。Worker 的作用類似于一個工人,它等待任務的到來并在接收到任務時執行。

1. Worker 結構體的定義

Worker 結構體包含兩個字段:

  • id:用于標識每個 Worker
  • thread:存放線程的 JoinHandle<()>,它是由 thread::spawn 返回的。

代碼如下:

struct Worker {
    id: usize,
    thread: thread::JoinHandle<()>,
}
2. 創建 Worker 實例

**為了讓 **Worker 在沒有任務時處于等待狀態,我們可以在 Worker::new 函數中使用 thread::spawn 創建線程,并傳入一個空的閉包:

impl Worker {
    fn new(id: usize) -> Worker {
        let thread = thread::spawn(|| {});
?
        Worker { id, thread }
    }
}

**在這里,我們創建了一個 **Worker 實例,每個 Worker 都會啟動一個線程。但這個線程目前還什么都不做,因為我們傳遞給 spawn 的閉包是空的。

3. 將 Worker 集成到線程池中

**接下來,我們修改 **ThreadPool 的實現,使其存儲 Worker 的實例而不是直接存儲線程的 JoinHandle<()>。在 ThreadPool::new 中,我們使用一個 for 循環創建多個 Worker 實例,并將它們存儲在一個 Vec<Worker> 中:

pub struct ThreadPool {
    workers: Vec<Worker>,
}
?
impl ThreadPool {
    pub fn new(size: usize) -> ThreadPool {
        assert!(size > 0);
?
        let mut workers = Vec::with_capacity(size);
?
        for id in 0..size {
            workers.push(Worker::new(id));
        }
?
        ThreadPool { workers }
    }
}

**這樣,我們就為線程池創建了一個由多個 **Worker 組成的集合。每個 Worker 都有一個唯一的 ID,并且都啟動了一個線程,雖然這些線程目前還沒有執行任何有用的任務。

向 Worker 發送任務

現在,我們解決了創建線程并讓它們等待任務的問題。接下來,我們需要設計一個機制,使得線程池能夠在任務到來時將任務發送給等待中的線程。

1. 使用信道傳遞任務

**在 Rust 中,信道(channel)是一種非常適合在線程之間傳遞數據的工具。我們可以使用一個信道來傳遞任務。線程池會創建一個信道的發送端,每個 **Worker 會擁有信道的接收端。任務通過信道從線程池傳遞到 Worker,再由 Worker 中的線程執行。

use std::{sync::mpsc, thread};
?
pub struct ThreadPool {
    workers: Vec<Worker>,
    sender: mpsc::Sender<Job>,
}
?
struct Job;
?
impl ThreadPool {
    pub fn new(size: usize) -> ThreadPool {
        assert!(size > 0);
?
        let (sender, receiver) = mpsc::channel();
?
        let mut workers = Vec::with_capacity(size);
?
        for id in 0..size {
            workers.push(Worker::new(id));
        }
?
        ThreadPool { workers, sender }
    }
}
2. Worker 處理任務

**為了讓 **Worker 能夠處理任務,我們將信道的接收端傳遞給每個 Worker 的線程。線程會不斷地從信道中接收任務,并執行這些任務。

impl Worker {
    fn new(id: usize, receiver: mpsc::Receiver<Job>) -> Worker {
        let thread = thread::spawn(move || {
            receiver;
        });
?
        Worker { id, thread }
    }
}

**不過在這段代碼中,存在一個問題:信道的接收端 **receiver 被移交給了第一個 Worker,導致無法將其傳遞給其他 Worker

3. 使用 Arc 和 Mutex 共享接收端

**為了解決這個問題,我們需要使用 **Arc<Mutex<T>> 來共享信道的接收端,這樣所有的 Worker 都可以安全地從同一個信道接收任務:

use std::{sync::{mpsc, Arc, Mutex}, thread};
?
type Job = Box<dyn FnOnce() + Send + 'static>;
?
impl ThreadPool {
    pub fn new(size: usize) -> ThreadPool {
        assert!(size > 0);
?
        let (sender, receiver) = mpsc::channel();
        let receiver = Arc::new(Mutex::new(receiver));
?
        let mut workers = Vec::with_capacity(size);
?
        for id in 0..size {
            workers.push(Worker::new(id, Arc::clone(&receiver)));
        }
?
        ThreadPool { workers, sender }
    }
    
     pub fn execute<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
        let job = Box::new(f);
?
        self.sender.send(job).unwrap();
    }
}
?
impl Worker {
    fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
        let thread = thread::spawn(move || loop {
            let job = receiver.lock().unwrap().recv().unwrap();
?
            println!("Worker {id} got a job; executing.");
?
            job();
        });
?
        Worker { id, thread }
    }
}

**在 **Worker::new 中,線程會不斷地嘗試獲取鎖來接收任務,并在收到任務后執行。這里我們使用了 Arc 來共享接收端,使用 Mutex 來確保一次只有一個 Worker 能夠接收任務。

type Job = Box<dyn FnOnce() + Send + 'static>;

**這行代碼定義了一個類型別名 **Job。它代表了一個特定的任務類型:

  • Box<dyn FnOnce() + Send + 'static> 是一個動態分發的閉包(或函數),其具體實現類型在編譯時不確定。Box 是一個堆分配的智能指針,用于將閉包存儲在堆上。
  • dyn FnOnce() 表示這個閉包實現了 FnOnce trait,可以被調用一次。
  • Send 表示這個閉包可以在線程之間安全地傳遞。
  • 'static 表示閉包的生命周期是整個程序的生命周期,確保閉包在多個線程中可以安全使用。
execute 方法

**這個方法的功能是將一個新的任務(閉包)添加到線程池的任務隊列中,以供線程池中的工作線程執行。下面是對 **F: FnOnce() + Send + 'static 的解釋:

  • F: FnOnce() + Send + 'static
    

    ** 是一個泛型約束,表示必須是一個實現了 FnOnce、Send和 'static的閉包類型。**

    • FnOnce() 確保閉包可以被調用一次。
    • Send 確保閉包可以安全地在線程之間傳遞。
    • 'static 確保閉包的生命周期足夠長,可以在整個程序運行期間有效。

**在 **execute 方法中,你將傳入的閉包 f 轉換成 Job 類型(即 Box<dyn FnOnce() + Send + 'static>),然后通過 self.sender 將其發送到任務隊列中。這使得線程池的工作線程可以從隊列中接收并執行這些任務。

總結

**通過引入 **Worker 結構體并使用信道進行任務傳遞,我們成功地實現了一個可以延遲分配任務的線程池。每個 Worker 都是在創建時啟動的,但它們會等待任務的到來,只有在接收到任務后才會開始執行。這種設計不僅提高了服務器的吞吐量,還確保了線程資源的高效利用。

0條評論
0 / 1000
l****n
17文章數
0粉絲數
l****n
17 文章 | 0 粉絲
原創

Rust多線程:Worker 結構體與線程池中任務的傳遞機制

2024-09-24 10:07:29
19
0

Rust多線程:Worker 結構體與線程池中任務的傳遞機制

**在實現一個多線程的 Web 服務器時,我們會遇到一個問題:如何在創建線程之后讓它們在沒有任務時保持等待狀態,并且在任務到來時可以立即執行。這是一個典型的線程池設計問題。在 Rust 中,我們需要通過自定義設計來實現這個功能,因為標準庫中的 **thread::spawn 并不直接支持這種用法。

問題描述

**Rust 的 **thread::spawn 方法會立即執行傳入的閉包。如果我們想要在線程池中創建線程并讓它們等待任務(即在創建時不執行任何任務),我們就需要自己設計一種機制,能夠在稍后將任務傳遞給這些已經創建好的線程。

解決方案:引入 Worker 結構體

**為了解決這個問題,我們引入了一個 **Worker 結構體來管理線程池中的每個線程。Worker 的作用類似于一個工人,它等待任務的到來并在接收到任務時執行。

1. Worker 結構體的定義

Worker 結構體包含兩個字段:

  • id:用于標識每個 Worker
  • thread:存放線程的 JoinHandle<()>,它是由 thread::spawn 返回的。

代碼如下:

struct Worker {
    id: usize,
    thread: thread::JoinHandle<()>,
}
2. 創建 Worker 實例

**為了讓 **Worker 在沒有任務時處于等待狀態,我們可以在 Worker::new 函數中使用 thread::spawn 創建線程,并傳入一個空的閉包:

impl Worker {
    fn new(id: usize) -> Worker {
        let thread = thread::spawn(|| {});
?
        Worker { id, thread }
    }
}

**在這里,我們創建了一個 **Worker 實例,每個 Worker 都會啟動一個線程。但這個線程目前還什么都不做,因為我們傳遞給 spawn 的閉包是空的。

3. 將 Worker 集成到線程池中

**接下來,我們修改 **ThreadPool 的實現,使其存儲 Worker 的實例而不是直接存儲線程的 JoinHandle<()>。在 ThreadPool::new 中,我們使用一個 for 循環創建多個 Worker 實例,并將它們存儲在一個 Vec<Worker> 中:

pub struct ThreadPool {
    workers: Vec<Worker>,
}
?
impl ThreadPool {
    pub fn new(size: usize) -> ThreadPool {
        assert!(size > 0);
?
        let mut workers = Vec::with_capacity(size);
?
        for id in 0..size {
            workers.push(Worker::new(id));
        }
?
        ThreadPool { workers }
    }
}

**這樣,我們就為線程池創建了一個由多個 **Worker 組成的集合。每個 Worker 都有一個唯一的 ID,并且都啟動了一個線程,雖然這些線程目前還沒有執行任何有用的任務。

向 Worker 發送任務

現在,我們解決了創建線程并讓它們等待任務的問題。接下來,我們需要設計一個機制,使得線程池能夠在任務到來時將任務發送給等待中的線程。

1. 使用信道傳遞任務

**在 Rust 中,信道(channel)是一種非常適合在線程之間傳遞數據的工具。我們可以使用一個信道來傳遞任務。線程池會創建一個信道的發送端,每個 **Worker 會擁有信道的接收端。任務通過信道從線程池傳遞到 Worker,再由 Worker 中的線程執行。

use std::{sync::mpsc, thread};
?
pub struct ThreadPool {
    workers: Vec<Worker>,
    sender: mpsc::Sender<Job>,
}
?
struct Job;
?
impl ThreadPool {
    pub fn new(size: usize) -> ThreadPool {
        assert!(size > 0);
?
        let (sender, receiver) = mpsc::channel();
?
        let mut workers = Vec::with_capacity(size);
?
        for id in 0..size {
            workers.push(Worker::new(id));
        }
?
        ThreadPool { workers, sender }
    }
}
2. Worker 處理任務

**為了讓 **Worker 能夠處理任務,我們將信道的接收端傳遞給每個 Worker 的線程。線程會不斷地從信道中接收任務,并執行這些任務。

impl Worker {
    fn new(id: usize, receiver: mpsc::Receiver<Job>) -> Worker {
        let thread = thread::spawn(move || {
            receiver;
        });
?
        Worker { id, thread }
    }
}

**不過在這段代碼中,存在一個問題:信道的接收端 **receiver 被移交給了第一個 Worker,導致無法將其傳遞給其他 Worker

3. 使用 Arc 和 Mutex 共享接收端

**為了解決這個問題,我們需要使用 **Arc<Mutex<T>> 來共享信道的接收端,這樣所有的 Worker 都可以安全地從同一個信道接收任務:

use std::{sync::{mpsc, Arc, Mutex}, thread};
?
type Job = Box<dyn FnOnce() + Send + 'static>;
?
impl ThreadPool {
    pub fn new(size: usize) -> ThreadPool {
        assert!(size > 0);
?
        let (sender, receiver) = mpsc::channel();
        let receiver = Arc::new(Mutex::new(receiver));
?
        let mut workers = Vec::with_capacity(size);
?
        for id in 0..size {
            workers.push(Worker::new(id, Arc::clone(&receiver)));
        }
?
        ThreadPool { workers, sender }
    }
    
     pub fn execute<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
        let job = Box::new(f);
?
        self.sender.send(job).unwrap();
    }
}
?
impl Worker {
    fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
        let thread = thread::spawn(move || loop {
            let job = receiver.lock().unwrap().recv().unwrap();
?
            println!("Worker {id} got a job; executing.");
?
            job();
        });
?
        Worker { id, thread }
    }
}

**在 **Worker::new 中,線程會不斷地嘗試獲取鎖來接收任務,并在收到任務后執行。這里我們使用了 Arc 來共享接收端,使用 Mutex 來確保一次只有一個 Worker 能夠接收任務。

type Job = Box<dyn FnOnce() + Send + 'static>;

**這行代碼定義了一個類型別名 **Job。它代表了一個特定的任務類型:

  • Box<dyn FnOnce() + Send + 'static> 是一個動態分發的閉包(或函數),其具體實現類型在編譯時不確定。Box 是一個堆分配的智能指針,用于將閉包存儲在堆上。
  • dyn FnOnce() 表示這個閉包實現了 FnOnce trait,可以被調用一次。
  • Send 表示這個閉包可以在線程之間安全地傳遞。
  • 'static 表示閉包的生命周期是整個程序的生命周期,確保閉包在多個線程中可以安全使用。
execute 方法

**這個方法的功能是將一個新的任務(閉包)添加到線程池的任務隊列中,以供線程池中的工作線程執行。下面是對 **F: FnOnce() + Send + 'static 的解釋:

  • F: FnOnce() + Send + 'static
    

    ** 是一個泛型約束,表示必須是一個實現了 FnOnce、Send和 'static的閉包類型。**

    • FnOnce() 確保閉包可以被調用一次。
    • Send 確保閉包可以安全地在線程之間傳遞。
    • 'static 確保閉包的生命周期足夠長,可以在整個程序運行期間有效。

**在 **execute 方法中,你將傳入的閉包 f 轉換成 Job 類型(即 Box<dyn FnOnce() + Send + 'static>),然后通過 self.sender 將其發送到任務隊列中。這使得線程池的工作線程可以從隊列中接收并執行這些任務。

總結

**通過引入 **Worker 結構體并使用信道進行任務傳遞,我們成功地實現了一個可以延遲分配任務的線程池。每個 Worker 都是在創建時啟動的,但它們會等待任務的到來,只有在接收到任務后才會開始執行。這種設計不僅提高了服務器的吞吐量,還確保了線程資源的高效利用。

文章來自個人專欄
文章 | 訂閱
0條評論
0 / 1000
請輸入你的評論
0
0