代码之家  ›  专栏  ›  技术社区  ›  Ivan

限制核心数量:新任务将被阻止,直到核心从其他任务中释放

  •  0
  • Ivan  · 技术社区  · 3 年前

    我有1000条输入消息要处理。我正在循环输入集合,并为要处理的每条消息启动新任务。

    //Assume this messages collection contains 1000 items
    var messages = new List<string>();
    
    foreach (var msg in messages)
    {
       Task.Factory.StartNew(() =>
       {
        Process(msg);
       });
     }
    

    我们能猜测一次同时处理的最大消息数量吗(假设是正常的四核处理器),或者我们能限制一次处理的最大信息数量吗?

    如何确保此消息按照集合的相同顺序/顺序进行处理?

    0 回复  |  直到 10 年前
        1
  •  78
  •   Hari Prasad    10 年前

    你可以使用 Parallel.Foreach 并依赖 MaxDegreeOfParallelism 相反

    Parallel.ForEach(messages, new ParallelOptions {MaxDegreeOfParallelism = 10},
    msg =>
    {
         // logic
         Process(msg);
    });
    
        2
  •  66
  •   CarenRose    7 年前

    在这种情况下,SemaphoreSlim是一个非常好的解决方案,我强烈建议OP尝试一下,但@Manoj的回答有评论中提到的缺陷。在生成这样的任务之前,应该等待semaphore。

    更新的答案: 正如@Vasyl所指出的,信号量可能在任务完成前被处理,并在 Release() 方法,因此在退出using块之前必须等待所有创建的Task完成。

    int maxConcurrency=10;
    var messages = new List<string>();
    using(SemaphoreSlim concurrencySemaphore = new SemaphoreSlim(maxConcurrency))
    {
        List<Task> tasks = new List<Task>();
        foreach(var msg in messages)
        {
            concurrencySemaphore.Wait();
    
            var t = Task.Factory.StartNew(() =>
            {
                try
                {
                     Process(msg);
                }
                finally
                {
                    concurrencySemaphore.Release();
                }
            });
    
            tasks.Add(t);
        }
    
        Task.WaitAll(tasks.ToArray());
    }
    

    对意见的答复 对于那些想了解如何在没有 Task.WaitAll 在控制台应用程序中运行以下代码,将引发此异常。

    System.ObjectDisposedException:“信号量已被释放。”

    static void Main(string[] args)
    {
        int maxConcurrency = 5;
        List<string> messages =  Enumerable.Range(1, 15).Select(e => e.ToString()).ToList();
    
        using (SemaphoreSlim concurrencySemaphore = new SemaphoreSlim(maxConcurrency))
        {
            List<Task> tasks = new List<Task>();
            foreach (var msg in messages)
            {
                concurrencySemaphore.Wait();
    
                var t = Task.Factory.StartNew(() =>
                {
                    try
                    {
                        Process(msg);
                    }
                    finally
                    {
                        concurrencySemaphore.Release();
                    }
                });
    
                tasks.Add(t);
            }
    
           // Task.WaitAll(tasks.ToArray());
        }
        Console.WriteLine("Exited using block");
        Console.ReadKey();
    }
    
    private static void Process(string msg)
    {            
        Thread.Sleep(2000);
        Console.WriteLine(msg);
    }
    
        3
  •  11
  •   Mr.Hunt Жорик Пургидзе    7 年前

    我认为最好使用并行LINQ

      Parallel.ForEach(messages ,
         new ParallelOptions{MaxDegreeOfParallelism = 4},
                x => Process(x);
            );
    

    其中x是最大并行度

        4
  •  7
  •   5andr0    4 年前

    具有 .NET 5.0 核心3.0 channels 介绍。
    这种生产者/消费者并发模式的主要好处是,您还可以限制输入数据处理,以减少对资源的影响。
    这在处理数百万条数据记录时尤其有用。
    现在,您可以连续地只查询数据块,等待工作人员处理后再查询更多数据,而不是一次将整个数据集读取到内存中。

    队列容量为50条消息和5个使用者线程的代码示例:

    /// <exception cref="System.AggregateException">Thrown on Consumer Task exceptions.</exception>
    public static async Task ProcessMessages(List<string> messages)
    {
        const int producerCapacity = 10, consumerTaskLimit = 3;
        var channel = Channel.CreateBounded<string>(producerCapacity);
    
        _ = Task.Run(async () =>
        {
            foreach (var msg in messages)
            {
                await channel.Writer.WriteAsync(msg);
                // blocking when channel is full
                // waiting for the consumer tasks to pop messages from the queue
            }
    
            channel.Writer.Complete();
            // signaling the end of queue so that 
            // WaitToReadAsync will return false to stop the consumer tasks
        });
    
        var tokenSource = new CancellationTokenSource();
        CancellationToken ct = tokenSource.Token;
    
        var consumerTasks = Enumerable
        .Range(1, consumerTaskLimit)
        .Select(_ => Task.Run(async () =>
        {
            try
            {
                while (await channel.Reader.WaitToReadAsync(ct))
                {
                    ct.ThrowIfCancellationRequested();
                    while (channel.Reader.TryRead(out var message))
                    {
                        await Task.Delay(500);
                        Console.WriteLine(message);
                    }
                }
            }
            catch (OperationCanceledException) { }
            catch
            {
                tokenSource.Cancel();
                throw;
            }
        }))
        .ToArray();
    
        Task waitForConsumers = Task.WhenAll(consumerTasks);
        try { await waitForConsumers; }
        catch
        {
            foreach (var e in waitForConsumers.Exception.Flatten().InnerExceptions)
                Console.WriteLine(e.ToString());
    
            throw waitForConsumers.Exception.Flatten();
        }
    }
    

    正如所指出的 Theodor Zoulias : 对于多个使用者异常,剩余的任务将继续运行,并且必须承担终止任务的负载。为了避免这种情况,我实现了一个CancellationToken来停止所有剩余的任务,并处理 聚合异常 属于 等待消费者。异常

    旁注:
    这个 Task Parallel Library (TPL) 可能擅长根据您的本地资源自动限制任务。但是,当您通过RPC远程处理数据时,有必要手动限制RPC调用,以避免填充网络/处理堆栈!

        5
  •  2
  •   empz    5 年前

    如果你 Process 方法是异步的,不能使用 Task.Factory.StartNew 因为它不能很好地处理异步委托。此外,使用它时还有一些其他细微差别(请参阅 this 例如)。

    在这种情况下,正确的方法是使用 Task.Run 。这是为异步进程方法修改的@ClearLogic答案。

    static void Main(string[] args)
    {
        int maxConcurrency = 5;
        List<string> messages =  Enumerable.Range(1, 15).Select(e => e.ToString()).ToList();
    
        using (SemaphoreSlim concurrencySemaphore = new SemaphoreSlim(maxConcurrency))
        {
            List<Task> tasks = new List<Task>();
            foreach (var msg in messages)
            {
                concurrencySemaphore.Wait();
    
                var t = Task.Run(async () =>
                {
                    try
                    {
                        await Process(msg);
                    }
                    finally
                    {
                        concurrencySemaphore.Release();
                    }
                });
    
                tasks.Add(t);
            }
    
           Task.WaitAll(tasks.ToArray());
        }
        Console.WriteLine("Exited using block");
        Console.ReadKey();
    }
    
    private static async Task Process(string msg)
    {            
        await Task.Delay(2000);
        Console.WriteLine(msg);
    }
    
        6
  •  0
  •   Community Mohan Dere    9 年前

    您可以创建自己的TaskScheduler并在那里覆盖QueueTask。

    protected virtual void QueueTask(Task task)
    

    然后你可以做任何你喜欢的事。

    这里有一个例子:

    Limited concurrency level task scheduler (with task priority) handling wrapped tasks

        7
  •  0
  •   error_handler    10 年前

    您可以这样简单地设置最大并发度:

    int maxConcurrency=10;
    var messages = new List<1000>();
    using(SemaphoreSlim concurrencySemaphore = new SemaphoreSlim(maxConcurrency))
    {
        foreach(var msg in messages)
        {
            Task.Factory.StartNew(() =>
            {
                concurrencySemaphore.Wait();
                try
                {
                     Process(msg);
                }
                finally
                {
                    concurrencySemaphore.Release();
                }
            });
        }
    }
    
        8
  •  0
  •   Neil Hunt    8 年前

    如果您需要按顺序排队(处理可能以任何顺序完成),则不需要信号量。老式的if语句运行良好:

            const int maxConcurrency = 5;
            List<Task> tasks = new List<Task>();
            foreach (var arg in args)
            {
                var t = Task.Run(() => { Process(arg); } );
    
                tasks.Add(t);
    
                if(tasks.Count >= maxConcurrency)
                    Task.WaitAny(tasks.ToArray());
            }
    
            Task.WaitAll(tasks.ToArray());
    
        9
  •  0
  •   Patrick Knott    4 年前

    我遇到了一个类似的问题,我想在调用api等时产生5000个结果。因此,我进行了一些速度测试。

    Parallel.ForEach(products.Select(x => x.KeyValue).Distinct().Take(100), id =>
    {
        new ParallelOptions { MaxDegreeOfParallelism = 100 };
        GetProductMetaData(productsMetaData, client, id).GetAwaiter().GetResult();
    });
    

    在30秒内产生了100个结果。

    Parallel.ForEach(products.Select(x => x.KeyValue).Distinct().Take(100), id =>
    {
        new ParallelOptions { MaxDegreeOfParallelism = 100 };
        GetProductMetaData(productsMetaData, client, id);
    });
    

    将GetAwaiter().GetResult()移动到GetProductMetaData内部的各个异步api调用,结果是在14.09秒内生成100个结果。

    foreach (var id in ids.Take(100))
    {
        GetProductMetaData(productsMetaData, client, id);
    }
    

    使用GetAwaiter()完成非异步编程。api调用中的GetResult()只需13.417秒。

    var tasks = new List<Task>();
    while (y < ids.Count())
    {
        foreach (var id in ids.Skip(y).Take(100))
        {
            tasks.Add(GetProductMetaData(productsMetaData, client, id));
        }
    
        y += 100;
        Task.WhenAll(tasks).GetAwaiter().GetResult();
        Console.WriteLine($"Finished {y}, {sw.Elapsed}");
    }
    

    形成一份任务清单,一次完成100个任务,结果速度为7.36秒。

                using (SemaphoreSlim cons = new SemaphoreSlim(10))
                {
                    var tasks = new List<Task>();
                    foreach (var id in ids.Take(100))
                    {
                        cons.Wait();
                        var t = Task.Factory.StartNew(() =>
                        {
                            try
                            {
                                GetProductMetaData(productsMetaData, client, id);
                            }
                            finally
                            {
                                cons.Release();
                            }
                        });
    
                        tasks.Add(t);
                    }
    
                    Task.WaitAll(tasks.ToArray());
                }
    

    使用SemaphoreSlim只需13.369秒,但也需要一段时间才能启动使用。

    var throttler = new SemaphoreSlim(initialCount: take);
    foreach (var id in ids)
    {
        throttler.WaitAsync().GetAwaiter().GetResult();
        tasks.Add(Task.Run(async () =>
        {
            try
            {
                skip += 1;
                await GetProductMetaData(productsMetaData, client, id);
    
                if (skip % 100 == 0)
                {
                    Console.WriteLine($"started {skip}/{count}, {sw.Elapsed}");
                }
            }
            finally
            {
                throttler.Release();
            }
        }));
    }
    

    将Semaphore Slim与节流器一起用于我的异步任务花费了6.12秒。

    在这个特定的项目中,我的答案是使用Semaphore Slim的节流器。尽管while foreach任务列表有时确实击败了throttler,但throttler有4/6次赢得了1000条记录。

    我意识到我没有使用OP代码,但我认为这很重要,并增加了这一讨论,因为如何有时不是唯一应该问的问题,答案有时是“这取决于你试图做什么。”

    现在回答具体问题:

    1. 如何限制c#中并行任务的最大数量:我展示了如何限制一次完成的任务数量。
    2. 我们能猜测一次同时处理的最大消息数量吗(假设是正常的四核处理器),或者我们能限制一次处理的最大信息数量吗?我无法猜测一次会处理多少个,除非我设定了上限,但我可以设定上限。显然,由于CPU、RAM等以及程序本身可以访问的线程和内核数量,以及在同一台计算机上串联运行的其他程序,不同的计算机以不同的速度运行。
    3. 如何确保此消息按照集合的相同顺序/顺序进行处理?如果你想按特定的顺序处理所有事情,那就是同步编程。能够异步运行的要点是确保他们可以在没有订单的情况下完成所有工作。从我的代码中可以看出,除非使用异步代码,否则100条记录中的时间差是最小的。如果您需要一个正在做的事情的顺序,那么在这之前使用异步编程,然后等待并从那里同步地做事情。例如,task1a.start、task2a.start,然后是更晚的task1a.wait、task2a.await…然后是更迟的task1b.start task1b.wait和task2b.start task2b.wait。
        10
  •  -1
  •   Daniel    7 年前
     public static void RunTasks(List<NamedTask> importTaskList)
        {
            List<NamedTask> runningTasks = new List<NamedTask>();
    
            try
            {
                foreach (NamedTask currentTask in importTaskList)
                {
                    currentTask.Start();
                    runningTasks.Add(currentTask);
    
                    if (runningTasks.Where(x => x.Status == TaskStatus.Running).Count() >= MaxCountImportThread)
                    {
                        Task.WaitAny(runningTasks.ToArray());
                    }
                }
    
                Task.WaitAll(runningTasks.ToArray());
            }
            catch (Exception ex)
            {
                Log.Fatal("ERROR!", ex);
            }
        }
    
        11
  •  -1
  •   Shahar Shokrani    6 年前

    你可以使用 BlockingCollection ,如果已达到消耗收集限制,生产将停止生产,直到消耗过程结束。我发现这种模式比 SemaphoreSlim

    int TasksLimit = 10;
    BlockingCollection<Task> tasks = new BlockingCollection<Task>(new ConcurrentBag<Task>(), TasksLimit);
    
    void ProduceAndConsume()
    {
        var producer = Task.Factory.StartNew(RunProducer);
        var consumer = Task.Factory.StartNew(RunConsumer);
    
        try
        {
            Task.WaitAll(new[] { producer, consumer });
        }
        catch (AggregateException ae) { }
    }
    
    void RunConsumer()
    {
        foreach (var task in tasks.GetConsumingEnumerable())
        {
            task.Start();
        }
    }
    
    void RunProducer()
    {
        for (int i = 0; i < 1000; i++)
        {
            tasks.Add(new Task(() => Thread.Sleep(1000), TaskCreationOptions.AttachedToParent));
        }
    }
    

    请注意 RunProducer RunConsumer 产生了两个独立的任务。