代码之家  ›  专栏  ›  技术社区  ›  Steve Dunn supercat

有没有办法限制任务并行库使用的线程?

  •  4
  • Steve Dunn supercat  · 技术社区  · 14 年前

    我正在使用TPL,但我发现使用它的单元测试代码很棘手。

    我尽量不去 introduce a wrapper 因为我觉得这会引起一些问题。

    我知道您可以在TPL中设置处理器关联,但真正好的是设置线程最大值(可能是每个应用程序域)。因此,当将线程最大值设置为1时,TPL将被迫使用它在其上使用的任何线程。

    你怎么认为?这有可能吗(我很肯定不是),以及 应该

    编辑:下面是一个例子

    public class Foo
    {
        public Foo( )
        {
            Task.Factory.StartNew( () => somethingLong( ) )
                .ContinueWith( a => Bar = 1 ) ;
        }
    }
    
    [Test] public void Foo_should_set_Bar_to_1( )
    {
        Assert.Equal(1, new Foo( ).Bar ) ;
    }
    

    测试 可能 除非我提出延期,否则不会通过。我想要一些 Task.MaximumThreads=1 这样TPL就可以连续运行了。

    3 回复  |  直到 8 年前
        1
  •  4
  •   Ian Mercer    14 年前

    TaskScheduler 任务调度器 ,把它传给 TaskFactory . 现在你可以吃了 Task 您创建的对象所针对的对象 那个

    不需要设置为使用一个线程。

    那么,就在你断言之前,打电话给我 Dispose() 任务调度器 :-

    public void Dispose()
    {
        if (tasks != null)
        {
            tasks.CompleteAdding();
    
            foreach (var thread in threads) thread.Join();
    
            tasks.Dispose();
            tasks = null;
        }
    }
    

    这将保证所有任务都已运行。现在你可以继续你的主张了。

    你也可以用 ContinueWith(...)

        2
  •  2
  •   Ade Miller    14 年前

    实际上,这更多的是lambda重代码的可测试性问题,而不是TPL问题。Hightechrider的建议是一个很好的建议,但是基本上你的测试仍然在测试TPL,就像测试你的代码一样。当第一个任务结束,下一个任务开始的时候,你不需要测试。

    说到这里,我想看看调度器方法是否可行。下面是一个使用修改后的StaTaskScheduler的实现 http://code.msdn.microsoft.com/ParExtSamples

        using System;
        using System.Collections.Concurrent;
        using System.Collections.Generic;
        using System.Linq;
        using System.Threading;
        using System.Threading.Tasks;
        using Xunit;
    
        namespace Example
        {
          public class Foo
          {
            private TaskScheduler _scheduler;
    
        public int Bar { get; set; }
    
        private void SomethingLong()
        {
          Thread.SpinWait(10000);
        }
    
        public Foo()
          : this(TaskScheduler.Default)
        {
        }
    
        public Foo(TaskScheduler scheduler)
        {
          _scheduler = scheduler;
        }
    
        public void DoWork()
        {
          var factory = new TaskFactory(_scheduler);
    
          factory.StartNew(() => SomethingLong())
          .ContinueWith(a => Bar = 1, _scheduler);
        }
      }
    
      public class FooTests
      {
        [Fact]
        public void Foo_should_set_Bar_to_1()
        {
          var sch = new StaTaskScheduler(3);
          var target = new Foo(sch);
          target.DoWork();
    
          sch.Dispose();
          Assert.Equal(1, target.Bar);
        }
      }
    
      public sealed class StaTaskScheduler : TaskScheduler, IDisposable
      {
        /// <summary>Stores the queued tasks to be executed by our pool of STA threads.</summary>
        private BlockingCollection<Task> _tasks;
        /// <summary>The STA threads used by the scheduler.</summary>
        private readonly List<Thread> _threads;
    
        /// <summary>Initializes a new instance of the StaTaskScheduler class with the specified concurrency level.</summary>
        /// <param name="numberOfThreads">The number of threads that should be created and used by this scheduler.</param>
        public StaTaskScheduler(int numberOfThreads)
        {
          // Validate arguments
          if (numberOfThreads < 1) throw new ArgumentOutOfRangeException("concurrencyLevel");
    
          // Initialize the tasks collection
          _tasks = new BlockingCollection<Task>();
    
          // Create the threads to be used by this scheduler
          _threads = Enumerable.Range(0, numberOfThreads).Select(i =>
          {
            var thread = new Thread(() =>
            {
              // Continually get the next task and try to execute it.
              // This will continue until the scheduler is disposed and no more tasks remain.
              foreach (var t in _tasks.GetConsumingEnumerable())
              {
                TryExecuteTask(t);
              }
            });
            thread.IsBackground = true;
            // NO STA REQUIREMENT!
            // thread.SetApartmentState(ApartmentState.STA);
            return thread;
          }).ToList();
    
          // Start all of the threads
          _threads.ForEach(t => t.Start());
        }
    
        /// <summary>Queues a Task to be executed by this scheduler.</summary>
        /// <param name="task">The task to be executed.</param>
        protected override void QueueTask(Task task)
        {
          // Push it into the blocking collection of tasks
          _tasks.Add(task);
        }
    
        /// <summary>Provides a list of the scheduled tasks for the debugger to consume.</summary>
        /// <returns>An enumerable of all tasks currently scheduled.</returns>
        protected override IEnumerable<Task> GetScheduledTasks()
        {
          // Serialize the contents of the blocking collection of tasks for the debugger
          return _tasks.ToArray();
        }
    
        /// <summary>Determines whether a Task may be inlined.</summary>
        /// <param name="task">The task to be executed.</param>
        /// <param name="taskWasPreviouslyQueued">Whether the task was previously queued.</param>
        /// <returns>true if the task was successfully inlined; otherwise, false.</returns>
        protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued)
        {
          // Try to inline if the current thread is STA
          return
          Thread.CurrentThread.GetApartmentState() == ApartmentState.STA &&
          TryExecuteTask(task);
        }
    
        /// <summary>Gets the maximum concurrency level supported by this scheduler.</summary>
        public override int MaximumConcurrencyLevel
        {
          get { return _threads.Count; }
        }
    
        /// <summary>
        /// Cleans up the scheduler by indicating that no more tasks will be queued.
        /// This method blocks until all threads successfully shutdown.
        /// </summary>
        public void Dispose()
        {
          if (_tasks != null)
          {
            // Indicate that no new tasks will be coming in
            _tasks.CompleteAdding();
    
            // Wait for all threads to finish processing tasks
            foreach (var thread in _threads) thread.Join();
    
            // Cleanup
            _tasks.Dispose();
            _tasks = null;
          }
        }
      }
    }
    
        3
  •  1
  •   Jason    14 年前

    public class Foo
    {
        public Foo( )
        {
            Task.Factory.StartNew( () => somethingLong( ) )
                .ContinueWith( a => Bar = 1 ) ;
        }
    }
    
    [Test] public void Foo_should_set_Bar_to_1( )
    {
        Foo foo;
        Task.Factory.ContinueWhenAll(
            new [] {
                new Task(() => {
                    foo = new Foo();
                })
            },
            asserts => { 
                Assert.Equal(1, foo.Bar ) ;
            }
        ).Wait;
    }
    

    希望能听到一些关于这种方法的反馈。