我想并行处理一些项目。此处理是独立的(顺序不重要),并返回输出。然后,这些输出应尽快按顺序返回。
也就是说,方法的行为应该与此等效(除了调用
Process
并行):
IEnumerable<T> OrderedParallelImmediateSelect<T> (IEnumerable<object> source)
{
foreach (var input in source) {
var result = Process (input);
yield return result;
}
}
因此,它需要尝试按顺序处理项目。由于这(当然)不能保证按顺序完成,因此结果收集器必须确保等待延迟的结果。
一旦收到下一个订单结果,必须立即退回。在对结果进行排序之前,我们不能等待整个输入被处理。
这是一个例子,说明这看起来是怎样的:
begin 0
begin 1 <-- we start processing in increasing order
begin 2
complete 1 <-- 1 is complete but we are still waiting for 0
begin 3
complete 0 <-- 0 is complete, so we can return it and 1, too
return 0
return 1
begin 4
begin 5
complete 4 <-- 2 and 3 are missing before we may return this
complete 2 <-- 2 is done, 4 must keep waiting
return 2
begin 6
complete 3 <-- 3 and 4 can now be returned
return 3
return 4
如果可能的话,我希望在常规线程池上执行处理。
这个场景是.NET提供的解决方案吗?我已经构建了一个定制的解决方案,但更愿意使用更简单的解决方案。
我知道许多类似的问题,但似乎它们都允许等待所有项目完成处理,或者不保证订购结果。
这是一个不幸的尝试,似乎不起作用。更换
IEnumerable
具有
ParallelQuery
没有效果。
int Process (int item)
{
Console.WriteLine ($"+ {item}");
Thread.Sleep (new Random (item).Next (100, 1000));
Console.WriteLine ($"- {item}");
return item;
}
void Output (IEnumerable<int> items)
{
foreach (var it in items) {
Console.WriteLine ($"=> {it}");
}
}
IEnumerable<int> OrderedParallelImmediateSelect (IEnumerable<int> source)
{
// This processes in parallel but does not return the results immediately
return source.AsParallel ().AsOrdered ().Select (Process);
}
var input = Enumerable.Range (0, 20);
Output (OrderedParallelImmediateSelect (input));
输出:
+0+1+3+2+4+4+5+5+6+7+7+9+10+11+11+8+1+12-3+13+13+5+14-7+15+9+16+11+17-14+18+16+19-0-18-16+19-0-18-2-4-6-8-13-10-15-17-12-19=>0=>1=>2=>2=>3=>4=>4=>5=>6=>6=>7=>8=>9=>9=>10=>10=>11=>11=>12=>12=>14=>15=>16=>17=>18=>19