代码之家  ›  专栏  ›  技术社区  ›  James Black

在使用ConcurrentQueue时,尝试在并行循环时退出队列

  •  2
  • James Black  · 技术社区  · 15 年前

    我在.NET4应用程序中使用并行数据结构,我有一个 ConcurrentQueue

    我想做一些类似的事情:

    personqueue.AsParallel().WithDegreeOfParallelism(20).ForAll(i => ... );

    当我调用数据库来保存数据时,我限制了并发线程的数量。

    ForAll

    ForAll(i => {
        personqueue.personqueue.TryDequeue(...);
        ...
    });
    

    因为不能保证我会跳出正确的一个。

    或者,使用 PLINQ 要并行处理吗?

    2 回复  |  直到 15 年前
        1
  •  4
  •   Gamlor Gabriele Ran    15 年前

    我不是100%确定你想在这里存档什么。你是想把所有的东西都排出来直到什么都不剩吗?或者一次就把很多东西排出来?

     theQueue.AsParallel()
    

    对于ConcurrentQueue,您将得到一个“快照”枚举器。因此,当您迭代并发堆栈时,只迭代快照,而不迭代“活动”队列。

    总的来说,我认为在迭代过程中迭代你正在改变的东西不是一个好主意。

            // this way it's more clear, that we only deque for theQueue.Count items
            // However after this, the queue is probably not empty
            // or maybe the queue is also empty earlier   
            Parallel.For(0, theQueue.Count,
                         new ParallelOptions() {MaxDegreeOfParallelism = 20},
                         () => { 
                             theQueue.TryDequeue(); //and stuff
                         });
    

    这样可以避免在迭代时对某个对象进行操作。但是,在该语句之后,队列仍然可以包含在for循环期间添加的数据。

    要让队列及时清空,您可能需要做更多的工作。这里有一个非常丑陋的解决方案。当队列仍有项目时,创建新任务。每个任务启动时尽可能从队列中退出。最后,我们等待所有任务结束。为了限制并行性,我们从不创建超过20个任务。

            // Probably a kitty died because of this ugly code ;)
            // However, this code tries to get the queue empty in a very aggressive way
            Action consumeFromQueue = () =>
                                          {
                                              while (tt.TryDequeue())
                                              {
                                                  ; // do your stuff
                                              }
                                          };
            var allRunningTasks = new Task[MaxParallism];
            for(int i=0;i<MaxParallism && tt.Count>0;i++)
            {
                allRunningTasks[i] = Task.Factory.StartNew(consumeFromQueue);  
            }
            Task.WaitAll(allRunningTasks);
    
        2
  •  0
  •   ZXX    15 年前

    如果你的目标是一个高吞吐量的真实网站,你不必立即更新数据库,你会更好地选择非常保守的解决方案,而不是额外的层库。

    制作固定大小的数组(guestimate size-比如1000个项目或N秒的请求)和互锁索引,这样请求就可以将数据放入插槽并返回。当一个块被填满(继续检查计数)时,创建另一个块并生成异步委托来处理并将刚刚被填满的块发送给SQL。根据您的数据结构,委托可以将所有数据打包到逗号分隔的数组中,甚至可以是一个简单的XML(当然要测试该数组的性能)并将它们发送到SQL存储过程,这样就最好逐个记录地处理它们—决不要持有大锁。如果它变重了,你可以把你的积木分成几个小的积木。关键的是,您最小化了对SQL的请求数量,始终保持一个分离度,甚至不必为线程池付出代价—您可能根本不需要使用超过2个异步线程。

    这比摆弄Parallel-s要快得多。