代码之家  ›  专栏  ›  技术社区  ›  arynaq

N个螺纹中有1个从未连接

  •  1
  • arynaq  · 技术社区  · 7 年前

    我有线程池实现,每当我尝试停止/加入池时,池中总是有一个随机线程不会停止( state == Running )当我打电话的时候 Stop()

    我不明白为什么,我只有一个锁,我通知谁可能被阻止等待 Dequeue 具有 Monitor.PulseAll Stop . 调试器清楚地显示,他们中的大多数人都收到了消息,只是始终有1/N仍在运行。。。

    下面是池的一个最小实现

    using System;
    using System.Collections.Generic;
    using System.Linq;
    using System.Threading;
    using System.Threading.Tasks;
    
    namespace MultiThreading
    {
        public class WorkerHub
        {
            private readonly object _listMutex = new object();
            private readonly Queue<TaskWrapper> _taskQueue;
            private readonly List<Thread> _threads;
            private int _runCondition;
            private readonly Dictionary<string, int> _statistics;
    
            public WorkerHub(int count = 4)
            {
                _statistics = new Dictionary<string, int>();
                _taskQueue = new Queue<TaskWrapper>();
                _threads = new List<Thread>();
                InitializeThreads(count);
            }
    
            private bool ShouldRun
            {
                get => Interlocked.CompareExchange(ref _runCondition, 1, 1) == 1;
                set
                {
                    if (value)
                        Interlocked.CompareExchange(ref _runCondition, 1, 0);
                    else
                        Interlocked.CompareExchange(ref _runCondition, 0, 1);
                }
            }
    
            private void InitializeThreads(int count)
            {
                Action threadHandler = () =>
                {
                    while (ShouldRun)
                    {
                        var wrapper = Dequeue();
                        if (wrapper != null)
                        {
                            wrapper.FunctionBinding.Invoke();
                            _statistics[Thread.CurrentThread.Name] += 1;
                        }
                    }
                };
    
                for (var i = 0; i < count; ++i)
                {
                    var t = new Thread(() => { threadHandler.Invoke(); });
                    t.Name = $"WorkerHub Thread#{i}";
                    _statistics[t.Name] = 0;
                    _threads.Add(t);
                }
            }
    
    
            public Task Enqueue(Action work)
            {
                var tcs = new TaskCompletionSource<bool>();
                var wrapper = new TaskWrapper();
    
                Action workInvoker = () =>
                {
                    try
                    {
                        work.Invoke();
                        tcs.TrySetResult(true);
                    }
                    catch (Exception e)
                    {
                        tcs.TrySetException(e);
                    }
                };
                Action workCanceler = () => { tcs.TrySetCanceled(); };
                wrapper.FunctionBinding = workInvoker;
                wrapper.CancelBinding = workCanceler;
    
    
                lock (_taskQueue)
                {
                    _taskQueue.Enqueue(wrapper);
                    Monitor.PulseAll(_taskQueue);
                }
    
    
                return tcs.Task;
            }
    
            private TaskWrapper Dequeue()
            {
                lock (_listMutex)
                {
                    while (_taskQueue.Count == 0)
                    {
                        if (!ShouldRun)
                            return null;
                        Monitor.Wait(_listMutex);
                    }
    
                    _taskQueue.TryDequeue(out var wrapper);
                    return wrapper;
                }
            }
    
            public void Stop()
            {
                ShouldRun = false;
    
                //Wake up whoever is waiting for dequeue
                lock (_listMutex)
                {
                    Monitor.PulseAll(_listMutex);
                }
    
                foreach (var thread in _threads)
                {
                    thread.Join();
                }
                var sum = _statistics.Sum(pair => pair.Value) * 1.0;
                foreach (var stat in _statistics)
                {
                    Console.WriteLine($"{stat.Key} ran {stat.Value} functions, {stat.Value/sum * 100} percent of the total.");
                }
            }
    
            public void Start()
            {
                ShouldRun = true;
                foreach (var thread in _threads) thread.Start();
            }
        }
    }
    

    进行测试运行

    public static async Task Main(string[] args)
        {
            var hub = new WorkerHub();
            var tasks = Enumerable.Range(0, (int) 100).Select(x => hub.Enqueue(() => Sum(x)))
                .ToArray();
            var sw = new Stopwatch();
            sw.Start();
            hub.Start();
            await Task.WhenAll(tasks);
            hub.Stop();
            sw.Start();
            Console.WriteLine($"Work took: {sw.ElapsedMilliseconds}ms.");
        }
    
        public static int Sum(int n)
        {
            var sum = 0;
            for (var i = 0; i <= n; ++i) sum += i;
            Console.WriteLine($"Sum of numbers up to {n} is {sum}");
            return sum;
        }
    

    我是不是错过了一些基本的东西?请注意,这不是生产代码(phew),而是我刚刚丢失的东西,所以您可能会发现不止一个问题:)

    1 回复  |  直到 7 年前
        1
  •  1
  •   Peter Wishart    7 年前

    一开始我无法重新调试你的MCVE,因为我是以非异步方式运行它的 Main() ...

    如果在调用时查看“线程”调试窗口 hub.Stop(); 你应该看到执行已经转到 . 这就是为什么一个工作线程没有响应。

    我认为这与所描述的问题有关 here .

    Enqueue(Action work) 使用 TaskCreationOptions.RunContinuationsAsynchronously 应该修复它:

    var tcs = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
    

    [编辑]

    public class TaskWorkerHub
    {
        ConcurrentQueue<Action> workQueue = new ConcurrentQueue<Action>();
        int concurrentTasks;
        CancellationTokenSource cancelSource;
        List<Task> workers = new List<Task>();
    
        private async Task Worker(CancellationToken cancelToken)
        {
            while (workQueue.TryDequeue(out var workTuple))
            {
                await Task.Run(workTuple, cancelToken);
            }
        }
    
        public TaskWorkerHub(int concurrentTasks = 4)
        {
            this.concurrentTasks = concurrentTasks;
        }
    
        public void Enqueue(Action work) => workQueue.Enqueue(work);
    
        public void Start()
        {
            cancelSource  = new CancellationTokenSource();
    
            for (int i = 0; i < concurrentTasks; i++)
            {        
                workers.Add(Worker(cancelSource.Token));
            }
        }
    
        public void Stop() => cancelSource.Cancel();
    
        public Task WaitAsync() => Task.WhenAll(workers);    
    }