代码之家  ›  专栏  ›  技术社区  ›  Makogan

工人池设计

  •  0
  • Makogan  · 技术社区  · 2 年前

    我正试着把工人的水池弄得锈迹斑斑。设计本身很简单,其想法是将函数指针作为任务添加到结构中(假设为独立的)。然后,该结构生成的线程数等于机器上的最大线程数。

    然后,每个线程从队列中获取一个任务,执行该任务,并在完成后获取下一个任务直到它们全部用完。

    为了实现这一目标,我尝试了以下方法:

    use std::sync::{Arc, Mutex};
    use std::thread;
    
    pub struct ThreadPool<'a>
    {
        task_queue: Vec<Task<'a>>,
        task_count: Arc<Mutex<usize>>,
    }
    
    pub struct Task<'a>(Box<dyn FnMut() + 'a>);
    unsafe impl<'a> Send for Task<'a> {}
    
    impl<'a> Task<'a>
    {
        fn call(&mut self) { self.0() }
    }
    
    impl<'a> ThreadPool<'a>
    {
        fn add_task<T>(&mut self, task: T)
        where
            T: 'a + FnMut() -> (),
        {
            self.task_queue.push(Task(Box::new(task)));
        }
    
        fn run(&mut self)
        {
            let thread_count = thread::available_parallelism().unwrap().get();
            println!("{}", thread_count);
    
            let mut handlers = Vec::with_capacity(thread_count);
            for _ in 0..thread_count
            {
                unsafe {
                    let queue = &mut self.task_queue as *mut Vec<Task<'a>>;
                    let task_count = Arc::clone(&self.task_count);
                    handlers.push(thread::spawn(move || {
                        let index = task_count.lock().unwrap().overflowing_add(1).0 - 1;
                        (*queue)[index].call();
                    }));
                }
            }
        }
    }
    

    这甚至不是编译。

    我想要的接口类型如下:

     let mut thread_pool = ThreadPool {
                task_queue: Vec::new(),
                task_count: Arc::new(Mutex::new(0)),
            };
    
            for i in 0..100
            {
                thread_pool.add_task(move || println!(r"ran {i} task"));
            }
    
            thread_pool.run();
    

    也就是说,你注册了你想要的任务,每个任务在周围的范围内捕获它之外的数据,然后它全部运行。

    我试着搜索类似的例子,但 thread pool example 在文档中是非常不同的。

    error[E0277]: `*mut std::vec::Vec<Task<'a>>` cannot be sent between threads safely
       --> examples/06_fluid/thread_pool.rs:38:45
        |
    38  |                   handlers.push(thread::spawn(move || {
        |                                 ------------- ^------
        |                                 |             |
        |  _______________________________|_____________within this `[closure@examples/06_fluid/thread_pool.rs:38:45: 38:52]`
        | |                               |
        | |                               required by a bound introduced by this call
    39  | |                     let index = task_count.lock().unwrap().overflowing_add(1).0 - 1;
    40  | |                     (*queue)[index].call();
    41  | |                 }));
        | |_________________^ `*mut std::vec::Vec<Task<'a>>` cannot be sent between threads safely
        |
        = help: within `[closure@examples/06_fluid/thread_pool.rs:38:45: 38:52]`, the trait `Send` is not implemented for `*mut std::vec::Vec<Task<'a>>`
    note: required because it's used within this closure
       --> examples/06_fluid/thread_pool.rs:38:45
        |
    38  |                 handlers.push(thread::spawn(move || {
        |                                             ^^^^^^^
    note: required by a bound in `spawn`
       --> /home/makogan/.rustup/toolchains/nightly-2022-10-29-x86_64-unknown-linux-gnu/lib/rustlib/src/rust/library/std/src/thread/mod.rs:705:8
        |
    705 |     F: Send + 'static,
        |        ^^^^ required by this bound in `spawn`
    
    For more information about this error, try `rustc --explain E0277`.
    error: could not compile `neverengine` due to previous error
    
    0 回复  |  直到 2 年前
        1
  •  2
  •   Finomnis    2 年前

    正如@ChayimFriedman所指出的,您的代码有很多错误。

    • 最重要的是, 不要 使用 unsafe 如果您不了解错误的根本原因,可以修复错误。你肯定只能解决症状,但根本原因仍然存在,并会在以后咬你。
    • 永远不要 ,甚至不在里面 不安全的 ,铸造 & 对的引用 &mut 参考这总是一个坏主意,也是其他事情已经非常错误的有力迹象。
    • 不使用 Arc s过早。大多数时候,还有更好的选择,比如 std::thread::scope .
    • 不强制执行 Send 通过 不安全的 。每种类型都是 邮寄 默认情况下,如果一个类型不是,那么这是有原因的。在您的情况下,原因是您的任务类型应该是 Box<dyn FnMut() + 'a + Send> ,然后整个 Task 结构也是 邮寄 自动地
    • 不使用 Mutex 过早地。在许多情况下(当然不是所有情况),有更好的选择,比如 atomics .
    • 如果您通过的内部索引迭代数组 不安全的 制作 绝对确定 你不会跑出界。您的代码在访问之前不执行溢出检查 (*queue)[index] .
    • 你从不循环;目前,每个线程只执行相同的最后一个任务。这已经证明了为什么要强制执行 & &mut 是个坏主意。
    • 请注意 Fn , FnMut FnOnce 。在您的情况下,如果您确定每个任务只运行一次,请不要使用 FnMut 使用 FnOnce 相反全部的 FnMut 是自动的 FnOnce ,但不是反过来。
    • 最后但同样重要的是:有非常好的板条箱可以完全实现这种并行性,比如 rayon 。使用这些,会更安全,性能更好。

    也就是说,这里有两个可能的解决方案示例:

    • 根据您的代码,但我更改了:
      • 去除 和使用 std::thread::scope 相反
      • 删除整个 不安全的 事情并介绍一个 Mutex 围绕 self.task_queue 相反使用 VecDeque 而不是 Vec ,所以我们可以简单地弹出项目,而不必计数。有了这些知识,我们可以移除 task_count 所有这些。
      • 改变 FnMut FnOnce ,因为我们在过程中销毁了任务对象,因此拥有它的所有权。这将增加我们允许的函数类型。
      • 修复Bug
    use std::collections::VecDeque;
    use std::sync::Mutex;
    use std::thread;
    
    pub struct ThreadPool<'a> {
        task_queue: Mutex<VecDeque<Task<'a>>>,
    }
    
    pub struct Task<'a>(Box<dyn FnOnce() + 'a + Send>);
    
    impl<'a> Task<'a> {
        fn call(self) {
            self.0()
        }
    }
    
    impl<'a> ThreadPool<'a> {
        pub fn new() -> Self {
            Self {
                task_queue: Mutex::new(VecDeque::new()),
            }
        }
    
        pub fn add_task<T>(&mut self, task: T)
        where
            T: 'a + FnOnce() -> () + Send,
        {
            // Use `get_mut()` because we already have exclusive access to the mutex
            self.task_queue
                .get_mut()
                .unwrap()
                .push_back(Task(Box::new(task)));
        }
    
        pub fn run(&mut self) {
            let thread_count = thread::available_parallelism().unwrap().get();
            println!("Threads: {}", thread_count);
    
            thread::scope(|s| {
                let mut handlers = Vec::with_capacity(thread_count);
    
                for _ in 0..thread_count {
                    handlers.push(s.spawn(|| loop {
                        let potential_task = {
                            // Important: put this in its own scope, to make sure `task_queue.lock()` is
                            // released before `task.call()`. Otherwise no two threads will be allowed
                            // to execute a task simultaneously.
                            self.task_queue.lock().unwrap().pop_front()
                        };
                        if let Some(task) = potential_task {
                            task.call();
                        } else {
                            break;
                        }
                    }));
                }
    
                // Do something with `handlers`. Or don't, `handlers` can also be removed entirely.
                // Threads get joined automatically at the end of `scope`.
            });
        }
    }
    
    use std::{thread::sleep, time::Duration};
    
    use rust_playground::ThreadPool;
    
    fn main() {
        let mut threadpool = ThreadPool::new();
    
        for i in 0..10 {
            threadpool.add_task(move || {
                println!(
                    "Running task {} on thread {:?} ...",
                    i,
                    std::thread::current().id()
                );
                sleep(Duration::from_millis(100));
            })
        }
    
        threadpool.run();
    }
    
    Threads: 4
    Running task 0 on thread ThreadId(3) ...
    Running task 1 on thread ThreadId(4) ...
    Running task 2 on thread ThreadId(2) ...
    Running task 3 on thread ThreadId(5) ...
    Running task 4 on thread ThreadId(3) ...
    Running task 5 on thread ThreadId(2) ...
    Running task 7 on thread ThreadId(5) ...
    Running task 6 on thread ThreadId(4) ...
    Running task 8 on thread ThreadId(3) ...
    Running task 9 on thread ThreadId(2) ...
    
    use rayon::prelude::*;
    
    pub struct ThreadPool<'a> {
        task_queue: Vec<Task<'a>>,
    }
    
    pub struct Task<'a>(Box<dyn FnOnce() + 'a + Send>);
    
    impl<'a> Task<'a> {
        fn call(self) {
            self.0()
        }
    }
    
    impl<'a> ThreadPool<'a> {
        pub fn new() -> Self {
            Self {
                task_queue: Vec::new(),
            }
        }
    
        pub fn add_task<T>(&mut self, task: T)
        where
            T: 'a + FnMut() -> () + Send,
        {
            self.task_queue.push(Task(Box::new(task)));
        }
    
        pub fn run(&mut self) {
            let task_queue = std::mem::take(&mut self.task_queue);
            task_queue.into_par_iter().for_each(|task| task.call());
        }
    }
    

    使用相同的 main() 和以前一样,我们得到:

    Running task 0 on thread ThreadId(2) ...
    Running task 5 on thread ThreadId(3) ...
    Running task 2 on thread ThreadId(5) ...
    Running task 3 on thread ThreadId(4) ...
    Running task 1 on thread ThreadId(2) ...
    Running task 6 on thread ThreadId(3) ...
    Running task 7 on thread ThreadId(5) ...
    Running task 4 on thread ThreadId(4) ...
    Running task 8 on thread ThreadId(2) ...
    Running task 9 on thread ThreadId(3) ...
    

    最后但同样重要的是,我认为一旦你了解了如何使用 人造丝 正确地说,您甚至不再需要编写自己的工人池。 人造丝 已经在内部处理了所有这些,只需使用其并行迭代器;)

    use std::{thread::sleep, time::Duration};
    
    use rayon::prelude::*;
    
    fn main() {
        let mut tasks = vec![];
    
        for i in 0..10 {
            tasks.push(move || {
                println!(
                    "Running task {} on thread {:?} ...",
                    i,
                    std::thread::current().id()
                );
                sleep(Duration::from_millis(100));
            })
        }
    
        tasks.par_iter_mut().for_each(|task| task());
    }
    
    Running task 0 on thread ThreadId(2) ...
    Running task 5 on thread ThreadId(5) ...
    Running task 7 on thread ThreadId(3) ...
    Running task 2 on thread ThreadId(4) ...
    Running task 1 on thread ThreadId(2) ...
    Running task 6 on thread ThreadId(5) ...
    Running task 8 on thread ThreadId(3) ...
    Running task 3 on thread ThreadId(4) ...
    Running task 4 on thread ThreadId(5) ...
    Running task 9 on thread ThreadId(3) ...