代码之家  ›  专栏  ›  技术社区  ›  mafu

并行订购消耗品

  •  2
  • mafu  · 技术社区  · 7 年前

    我想并行处理一些项目。此处理是独立的(顺序不重要),并返回输出。然后,这些输出应尽快按顺序返回。

    也就是说,方法的行为应该与此等效(除了调用 Process 并行):

    IEnumerable<T> OrderedParallelImmediateSelect<T> (IEnumerable<object> source)
    {
        foreach (var input in source) {
            var result = Process (input);
            yield return result;
        }
    }
    

    因此,它需要尝试按顺序处理项目。由于这(当然)不能保证按顺序完成,因此结果收集器必须确保等待延迟的结果。

    一旦收到下一个订单结果,必须立即退回。在对结果进行排序之前,我们不能等待整个输入被处理。

    这是一个例子,说明这看起来是怎样的:

    begin 0
    begin 1     <-- we start processing in increasing order
    begin 2
    complete 1  <-- 1 is complete but we are still waiting for 0
    begin 3
    complete 0  <-- 0 is complete, so we can return it and 1, too
    return 0
    return 1
    begin 4
    begin 5
    complete 4  <-- 2 and 3 are missing before we may return this
    complete 2  <-- 2 is done, 4 must keep waiting
    return 2
    begin 6
    complete 3  <-- 3 and 4 can now be returned
    return 3
    return 4
    

    如果可能的话,我希望在常规线程池上执行处理。

    这个场景是.NET提供的解决方案吗?我已经构建了一个定制的解决方案,但更愿意使用更简单的解决方案。

    我知道许多类似的问题,但似乎它们都允许等待所有项目完成处理,或者不保证订购结果。

    这是一个不幸的尝试,似乎不起作用。更换 IEnumerable 具有 ParallelQuery 没有效果。

    int Process (int item)
    {
        Console.WriteLine ($"+ {item}");
        Thread.Sleep (new Random (item).Next (100, 1000));
        Console.WriteLine ($"- {item}");
        return item;
    }
    void Output (IEnumerable<int> items)
    {
        foreach (var it in items) {
            Console.WriteLine ($"=> {it}");
        }
    }
    
    IEnumerable<int> OrderedParallelImmediateSelect (IEnumerable<int> source)
    {
        // This processes in parallel but does not return the results immediately
        return source.AsParallel ().AsOrdered ().Select (Process);
    }
    
    var input = Enumerable.Range (0, 20);
    Output (OrderedParallelImmediateSelect (input));
    

    输出:

    +0+1+3+2+4+4+5+5+6+7+7+9+10+11+11+8+1+12-3+13+13+5+14-7+15+9+16+11+17-14+18+16+19-0-18-16+19-0-18-2-4-6-8-13-10-15-17-12-19=>0=>1=>2=>2=>3=>4=>4=>5=>6=>6=>7=>8=>9=>9=>10=>10=>11=>11=>12=>12=>14=>15=>16=>17=>18=>19

    1 回复  |  直到 7 年前
        1
  •  3
  •   Damien_The_Unbeliever    7 年前

    我创建了这个程序,作为控制台应用程序:

    using System;
    using System.Linq;
    using System.Threading;
    
    namespace PlayAreaCSCon
    {
        class Program
        {
            static void Main(string[] args)
            {
                var items = Enumerable.Range(0, 1000);
                int prodCount = 0;
    
                foreach(var item in items.AsParallel()
                .AsOrdered()
                .WithMergeOptions(ParallelMergeOptions.NotBuffered)
                .Select((i) =>
                {
                    Thread.Sleep(i % 100);
                    Interlocked.Increment(ref prodCount);
                    return i;
                }))
                {
                    Console.WriteLine(item);
                }
                Console.ReadLine();
            }
        }
    }
    

    然后我最初在上设置了一个断点 Console.WriteLine(item); 。运行程序,当我第一次点击那个断点时, prodCount 是5-我们肯定会在所有处理完成之前消耗结果。在删除断点之后,所有结果似乎都按原始顺序生成。