正如@ChayimFriedman所指出的,您的代码有很多错误。
-
最重要的是,
不要
使用
unsafe
如果您不了解错误的根本原因,可以修复错误。你肯定只能解决症状,但根本原因仍然存在,并会在以后咬你。
-
永远不要
,甚至不在里面
不安全的
,铸造
&
对的引用
&mut
参考这总是一个坏主意,也是其他事情已经非常错误的有力迹象。
-
不使用
Arc
s过早。大多数时候,还有更好的选择,比如
std::thread::scope
.
-
不强制执行
Send
通过
不安全的
。每种类型都是
邮寄
默认情况下,如果一个类型不是,那么这是有原因的。在您的情况下,原因是您的任务类型应该是
Box<dyn FnMut() + 'a + Send>
,然后整个
Task
结构也是
邮寄
自动地
-
不使用
Mutex
过早地。在许多情况下(当然不是所有情况),有更好的选择,比如
atomics
.
-
如果您通过的内部索引迭代数组
不安全的
制作
绝对确定
你不会跑出界。您的代码在访问之前不执行溢出检查
(*queue)[index]
.
-
你从不循环;目前,每个线程只执行相同的最后一个任务。这已经证明了为什么要强制执行
&
到
&mut
是个坏主意。
-
请注意
Fn
,
FnMut
和
FnOnce
。在您的情况下,如果您确定每个任务只运行一次,请不要使用
FnMut
使用
FnOnce
相反全部的
FnMut
是自动的
FnOnce
,但不是反过来。
-
最后但同样重要的是:有非常好的板条箱可以完全实现这种并行性,比如
rayon
。使用这些,会更安全,性能更好。
也就是说,这里有两个可能的解决方案示例:
-
根据您的代码,但我更改了:
-
去除
弧
和使用
std::thread::scope
相反
-
删除整个
不安全的
事情并介绍一个
Mutex
围绕
self.task_queue
相反使用
VecDeque
而不是
Vec
,所以我们可以简单地弹出项目,而不必计数。有了这些知识,我们可以移除
task_count
所有这些。
-
改变
FnMut
到
FnOnce
,因为我们在过程中销毁了任务对象,因此拥有它的所有权。这将增加我们允许的函数类型。
-
修复Bug
use std::collections::VecDeque;
use std::sync::Mutex;
use std::thread;
pub struct ThreadPool<'a> {
task_queue: Mutex<VecDeque<Task<'a>>>,
}
pub struct Task<'a>(Box<dyn FnOnce() + 'a + Send>);
impl<'a> Task<'a> {
fn call(self) {
self.0()
}
}
impl<'a> ThreadPool<'a> {
pub fn new() -> Self {
Self {
task_queue: Mutex::new(VecDeque::new()),
}
}
pub fn add_task<T>(&mut self, task: T)
where
T: 'a + FnOnce() -> () + Send,
{
// Use `get_mut()` because we already have exclusive access to the mutex
self.task_queue
.get_mut()
.unwrap()
.push_back(Task(Box::new(task)));
}
pub fn run(&mut self) {
let thread_count = thread::available_parallelism().unwrap().get();
println!("Threads: {}", thread_count);
thread::scope(|s| {
let mut handlers = Vec::with_capacity(thread_count);
for _ in 0..thread_count {
handlers.push(s.spawn(|| loop {
let potential_task = {
// Important: put this in its own scope, to make sure `task_queue.lock()` is
// released before `task.call()`. Otherwise no two threads will be allowed
// to execute a task simultaneously.
self.task_queue.lock().unwrap().pop_front()
};
if let Some(task) = potential_task {
task.call();
} else {
break;
}
}));
}
// Do something with `handlers`. Or don't, `handlers` can also be removed entirely.
// Threads get joined automatically at the end of `scope`.
});
}
}
use std::{thread::sleep, time::Duration};
use rust_playground::ThreadPool;
fn main() {
let mut threadpool = ThreadPool::new();
for i in 0..10 {
threadpool.add_task(move || {
println!(
"Running task {} on thread {:?} ...",
i,
std::thread::current().id()
);
sleep(Duration::from_millis(100));
})
}
threadpool.run();
}
Threads: 4
Running task 0 on thread ThreadId(3) ...
Running task 1 on thread ThreadId(4) ...
Running task 2 on thread ThreadId(2) ...
Running task 3 on thread ThreadId(5) ...
Running task 4 on thread ThreadId(3) ...
Running task 5 on thread ThreadId(2) ...
Running task 7 on thread ThreadId(5) ...
Running task 6 on thread ThreadId(4) ...
Running task 8 on thread ThreadId(3) ...
Running task 9 on thread ThreadId(2) ...
use rayon::prelude::*;
pub struct ThreadPool<'a> {
task_queue: Vec<Task<'a>>,
}
pub struct Task<'a>(Box<dyn FnOnce() + 'a + Send>);
impl<'a> Task<'a> {
fn call(self) {
self.0()
}
}
impl<'a> ThreadPool<'a> {
pub fn new() -> Self {
Self {
task_queue: Vec::new(),
}
}
pub fn add_task<T>(&mut self, task: T)
where
T: 'a + FnMut() -> () + Send,
{
self.task_queue.push(Task(Box::new(task)));
}
pub fn run(&mut self) {
let task_queue = std::mem::take(&mut self.task_queue);
task_queue.into_par_iter().for_each(|task| task.call());
}
}
使用相同的
main()
和以前一样,我们得到:
Running task 0 on thread ThreadId(2) ...
Running task 5 on thread ThreadId(3) ...
Running task 2 on thread ThreadId(5) ...
Running task 3 on thread ThreadId(4) ...
Running task 1 on thread ThreadId(2) ...
Running task 6 on thread ThreadId(3) ...
Running task 7 on thread ThreadId(5) ...
Running task 4 on thread ThreadId(4) ...
Running task 8 on thread ThreadId(2) ...
Running task 9 on thread ThreadId(3) ...
最后但同样重要的是,我认为一旦你了解了如何使用
人造丝
正确地说,您甚至不再需要编写自己的工人池。
人造丝
已经在内部处理了所有这些,只需使用其并行迭代器;)
use std::{thread::sleep, time::Duration};
use rayon::prelude::*;
fn main() {
let mut tasks = vec![];
for i in 0..10 {
tasks.push(move || {
println!(
"Running task {} on thread {:?} ...",
i,
std::thread::current().id()
);
sleep(Duration::from_millis(100));
})
}
tasks.par_iter_mut().for_each(|task| task());
}
Running task 0 on thread ThreadId(2) ...
Running task 5 on thread ThreadId(5) ...
Running task 7 on thread ThreadId(3) ...
Running task 2 on thread ThreadId(4) ...
Running task 1 on thread ThreadId(2) ...
Running task 6 on thread ThreadId(5) ...
Running task 8 on thread ThreadId(3) ...
Running task 3 on thread ThreadId(4) ...
Running task 4 on thread ThreadId(5) ...
Running task 9 on thread ThreadId(3) ...