代码之家  ›  专栏  ›  技术社区  ›  Yuri Astrakhan

在C#/.NET中实现生产者/消费者的异步流

  •  8
  • Yuri Astrakhan  · 技术社区  · 14 年前

    有一个lib将其结果输出到给定的Stream对象中。我想在lib完成之前开始使用结果。流应该是阻塞的,以简化使用,并避免在生产者运行过远时过度消耗内存;线程安全,以允许生产者和消费者独立存在。

    一旦lib完成,生产者线程应该关闭流,从而通知使用者没有更多的数据。

    我在考虑使用NetworkStream或PipeStream(匿名),但这两种方法在通过内核发送数据时可能都比较慢。

    
    var stream = new AsyncBlockingBufferedStream();
    
    

    void ProduceData() { // In producer thread externalLib.GenerateData(stream); stream.Close(); }

    void ConsumeData() { // In consumer thread int read; while ((read = stream.Read(...)) != 0) { ... } }

    3 回复  |  直到 14 年前
        1
  •  16
  •   Yurik    5 年前

    基于Chris Taylor之前的回答,这里是我自己的,经过修改的,有更快的基于块的操作和更正的写完成通知。它现在被标记为wiki,所以您可以更改它。

    public class BlockingStream : Stream
    {
        private readonly BlockingCollection<byte[]> _blocks;
        private byte[] _currentBlock;
        private int _currentBlockIndex;
    
        public BlockingStream(int streamWriteCountCache)
        {
            _blocks = new BlockingCollection<byte[]>(streamWriteCountCache);
        }
    
        public override bool CanTimeout { get { return false; } }
        public override bool CanRead { get { return true; } }
        public override bool CanSeek { get { return false; } }
        public override bool CanWrite { get { return true; } }
        public override long Length { get { throw new NotSupportedException(); } }
        public override void Flush() {}
        public long TotalBytesWritten { get; private set; }
        public int WriteCount { get; private set; }
    
        public override long Position
        {
            get { throw new NotSupportedException(); }
            set { throw new NotSupportedException(); }
        }
    
        public override long Seek(long offset, SeekOrigin origin)
        {
            throw new NotSupportedException();
        }
    
        public override void SetLength(long value)
        {
            throw new NotSupportedException();
        }
    
        public override int Read(byte[] buffer, int offset, int count)
        {
            ValidateBufferArgs(buffer, offset, count);
    
            int bytesRead = 0;
            while (true)
            {
                if (_currentBlock != null)
                {
                    int copy = Math.Min(count - bytesRead, _currentBlock.Length - _currentBlockIndex);
                    Array.Copy(_currentBlock, _currentBlockIndex, buffer, offset + bytesRead, copy);
                    _currentBlockIndex += copy;
                    bytesRead += copy;
    
                    if (_currentBlock.Length <= _currentBlockIndex)
                    {
                        _currentBlock = null;
                        _currentBlockIndex = 0;
                    }
    
                    if (bytesRead == count)
                        return bytesRead;
                }
    
                if (!_blocks.TryTake(out _currentBlock, Timeout.Infinite))
                    return bytesRead;
            }
        }
    
        public override void Write(byte[] buffer, int offset, int count)
        {
            ValidateBufferArgs(buffer, offset, count);
    
            var newBuf = new byte[count];
            Array.Copy(buffer, offset, newBuf, 0, count);
            _blocks.Add(newBuf);
            TotalBytesWritten += count;
            WriteCount++;
        }
    
        protected override void Dispose(bool disposing)
        {
            base.Dispose(disposing);
            if (disposing)
            {
                _blocks.Dispose();
            }
        }
    
        public override void Close()
        {
            CompleteWriting();
            base.Close();
        }
    
        public void CompleteWriting()
        {
            _blocks.CompleteAdding();
        }
    
        private static void ValidateBufferArgs(byte[] buffer, int offset, int count)
        {
            if (buffer == null)
                throw new ArgumentNullException("buffer");
            if (offset < 0)
                throw new ArgumentOutOfRangeException("offset");
            if (count < 0)
                throw new ArgumentOutOfRangeException("count");
            if (buffer.Length - offset < count)
                throw new ArgumentException("buffer.Length - offset < count");
        }
    }
    
        2
  •  3
  •   Chris Taylor    14 年前

    我将首先在这里,这是一个非常简约的实现,我还没有时间真正测试的性能特点。可能只有足够的资源可以自己进行一些性能测试。我在研究您的问题时得到的想法是创建一个使用 BlockingCollection 作为存储介质。

    基本上,这将给你一个流,你可以读/写从不同的线程,并将节流生产者,如果消费者方面落后。我重申,这不是一个健壮的实现,只是一个概念的快速证明,需要进行更多的错误检查、参数验证和一个体面的方案来处理错误 Close

    更新:

    公共类阻止流:流
    private int\u readTimeout=-1; private int\u writeTimeout=-1;

        public BlockingStream(int maxBytes)
        {
          _data = new BlockingCollection<byte>(maxBytes);      
        }
    
        public override int ReadTimeout
        {
          get
          {
            return _readTimeout;
          }
          set
          {
            _readTimeout = value;
          }
        }
    
        public override int WriteTimeout
        {
          get
          {
            return _writeTimeout;
          }
          set
          {
            _writeTimeout = value;
          }
        }
    
        public override bool CanTimeout
        {
          get
          {
            return true;
          }
        }
    
        public override bool CanRead
        {
          get { return true; }
        }
    
        public override bool CanSeek
        {
          get { return false; }
        }
    
        public override bool CanWrite
        {
          get { return true; }
        }
    
        public override void Flush()
        {
          return;
        }
    
        public override long Length
        {
          get { throw new NotImplementedException(); }
        }
    
        public override long Position
        {
          get
          {
            throw new NotImplementedException();
          }
          set
          {
            throw new NotImplementedException();
          }
        }
    
        public override long Seek(long offset, SeekOrigin origin)
        {
          throw new NotImplementedException();
        }
    
        public override void SetLength(long value)
        {
          throw new NotImplementedException();
        }
    
        public override int ReadByte()
        {
          int returnValue = -1;
          try
          {
            byte b;
            if (_data.TryTake(out b, ReadTimeout, _cts.Token))
            {
              returnValue = (int)b;
            }
          }
          catch (OperationCanceledException)
          {
          }
          return returnValue;
        }
    
        public override int Read(byte[] buffer, int offset, int count)
        {
          int bytesRead = 0;
          byte b;
          try
          {
            while (bytesRead < count && _data.TryTake(out b, ReadTimeout, _cts.Token))
            {
              buffer[offset + bytesRead] = b;
              bytesRead++;
            }
          }
          catch (OperationCanceledException)
          {
            bytesRead = 0;
          }
          return bytesRead;
        }
    
        public override void WriteByte(byte value)
        {
          try
          {
            _data.TryAdd(value, WriteTimeout, _cts.Token);  
          }
          catch (OperationCanceledException)
          {
          }
        }
    
        public override void Write(byte[] buffer, int offset, int count)
        {
          try
          {
            for (int i = offset; i < offset + count; ++i)
            {
              _data.TryAdd(buffer[i], WriteTimeout, _cts.Token);
            }
          }
          catch (OperationCanceledException)
          {
          }
        }
    
        public override void Close()
        {
          _cts.Cancel();
          base.Close();
        }
    
        protected override void Dispose(bool disposing)
        {
          base.Dispose(disposing);
          if (disposing)
          {
            _data.Dispose();
          }
        }
      }
    

    构造流时,传递流在阻塞写入程序之前应缓冲的最大字节数。这是一个小测试的功能,这是唯一的测试,是做。。。

      class Program
      {
        static BlockingStream _dataStream = new BlockingStream(10);
        static Random _rnd = new Random();
        [STAThread]
        static void Main(string[] args)
        {
          Task producer = new Task(() =>
            {
              Thread.Sleep(1000);
              for (int i = 0; i < 100; ++i)
              {
                _dataStream.WriteByte((byte)_rnd.Next(0, 255));            
              }          
            });
    
          Task consumer = new Task(() =>
            {
              int i = 0;
              while (true)
              {
                Console.WriteLine("{0} \t-\t {1}",_dataStream.ReadByte(), i++);
                // Slow the consumer down.
                Thread.Sleep(500);
              }
            });
    
          producer.Start();
          consumer.Start();
    
          Console.ReadKey();
        }
    
        3
  •  3
  •   Community CDub    8 年前

    Yuric BlockingStream 在我们的代码中运行20分钟到1小时后,性能急剧下降。我认为性能下降的原因是垃圾收集器和在使用它快速流式传输大量数据时在该方法中创建的大量缓冲区(我没有时间证明这一点)。我最终创建了一个环缓冲区版本,当与我们的代码一起使用时,它的性能不会下降。

    /// <summary>
    /// A ring-buffer stream that you can read from and write to from
    /// different threads.
    /// </summary>
    public class RingBufferedStream : Stream
    {
        private readonly byte[] store;
    
        private readonly ManualResetEventAsync writeAvailable
            = new ManualResetEventAsync(false);
    
        private readonly ManualResetEventAsync readAvailable
            = new ManualResetEventAsync(false);
    
        private readonly CancellationTokenSource cancellationTokenSource
            = new CancellationTokenSource();
    
        private int readPos;
    
        private int readAvailableByteCount;
    
        private int writePos;
    
        private int writeAvailableByteCount;
    
        private bool disposed;
    
        /// <summary>
        /// Initializes a new instance of the <see cref="RingBufferedStream"/>
        /// class.
        /// </summary>
        /// <param name="bufferSize">
        /// The maximum number of bytes to buffer.
        /// </param>
        public RingBufferedStream(int bufferSize)
        {
            this.store = new byte[bufferSize];
            this.writeAvailableByteCount = bufferSize;
            this.readAvailableByteCount = 0;
        }
    
        /// <inheritdoc/>
        public override bool CanRead => true;
    
        /// <inheritdoc/>
        public override bool CanSeek => false;
    
        /// <inheritdoc/>
        public override bool CanWrite => true;
    
        /// <inheritdoc/>
        public override long Length
        {
            get
            {
                throw new NotSupportedException(
                    "Cannot get length on RingBufferedStream");
            }
        }
    
        /// <inheritdoc/>
        public override int ReadTimeout { get; set; } = Timeout.Infinite;
    
        /// <inheritdoc/>
        public override int WriteTimeout { get; set; } = Timeout.Infinite;
    
        /// <inheritdoc/>
        public override long Position
        {
            get
            {
                throw new NotSupportedException(
                    "Cannot set position on RingBufferedStream");
            }
    
            set
            {
                throw new NotSupportedException(
                    "Cannot set position on RingBufferedStream");
            }
        }
    
        /// <summary>
        /// Gets the number of bytes currently buffered.
        /// </summary>
        public int BufferedByteCount => this.readAvailableByteCount;
    
        /// <inheritdoc/>
        public override void Flush()
        {
            // nothing to do
        }
    
        /// <summary>
        /// Set the length of the current stream. Always throws <see
        /// cref="NotSupportedException"/>.
        /// </summary>
        /// <param name="value">
        /// The desired length of the current stream in bytes.
        /// </param>
        public override void SetLength(long value)
        {
            throw new NotSupportedException(
                "Cannot set length on RingBufferedStream");
        }
    
        /// <summary>
        /// Sets the position in the current stream. Always throws <see
        /// cref="NotSupportedException"/>.
        /// </summary>
        /// <param name="offset">
        /// The byte offset to the <paramref name="origin"/> parameter.
        /// </param>
        /// <param name="origin">
        /// A value of type <see cref="SeekOrigin"/> indicating the reference
        /// point used to obtain the new position.
        /// </param>
        /// <returns>
        /// The new position within the current stream.
        /// </returns>
        public override long Seek(long offset, SeekOrigin origin)
        {
            throw new NotSupportedException("Cannot seek on RingBufferedStream");
        }
    
        /// <inheritdoc/>
        public override void Write(byte[] buffer, int offset, int count)
        {
            if (this.disposed)
            {
                throw new ObjectDisposedException("RingBufferedStream");
            }
    
            Monitor.Enter(this.store);
            bool haveLock = true;
            try
            {
                while (count > 0)
                {
                    if (this.writeAvailableByteCount == 0)
                    {
                        this.writeAvailable.Reset();
                        Monitor.Exit(this.store);
                        haveLock = false;
                        bool canceled;
                        if (!this.writeAvailable.Wait(
                            this.WriteTimeout,
                            this.cancellationTokenSource.Token,
                            out canceled) || canceled)
                        {
                            break;
                        }
    
                        Monitor.Enter(this.store);
                        haveLock = true;
                    }
                    else
                    {
                        var toWrite = this.store.Length - this.writePos;
                        if (toWrite > this.writeAvailableByteCount)
                        {
                            toWrite = this.writeAvailableByteCount;
                        }
    
                        if (toWrite > count)
                        {
                            toWrite = count;
                        }
    
                        Array.Copy(
                            buffer,
                            offset,
                            this.store,
                            this.writePos,
                            toWrite);
                        offset += toWrite;
                        count -= toWrite;
                        this.writeAvailableByteCount -= toWrite;
                        this.readAvailableByteCount += toWrite;
                        this.writePos += toWrite;
                        if (this.writePos == this.store.Length)
                        {
                            this.writePos = 0;
                        }
    
                        this.readAvailable.Set();
                    }
                }
            }
            finally
            {
                if (haveLock)
                {
                    Monitor.Exit(this.store);
                }
            }
        }
    
        /// <inheritdoc/>
        public override void WriteByte(byte value)
        {
            if (this.disposed)
            {
                throw new ObjectDisposedException("RingBufferedStream");
            }
    
            Monitor.Enter(this.store);
            bool haveLock = true;
            try
            {
                while (true)
                {
                    if (this.writeAvailableByteCount == 0)
                    {
                        this.writeAvailable.Reset();
                        Monitor.Exit(this.store);
                        haveLock = false;
                        bool canceled;
                        if (!this.writeAvailable.Wait(
                            this.WriteTimeout,
                            this.cancellationTokenSource.Token,
                            out canceled) || canceled)
                        {
                            break;
                        }
    
                        Monitor.Enter(this.store);
                        haveLock = true;
                    }
                    else
                    {
                        this.store[this.writePos] = value;
                        --this.writeAvailableByteCount;
                        ++this.readAvailableByteCount;
                        ++this.writePos;
                        if (this.writePos == this.store.Length)
                        {
                            this.writePos = 0;
                        }
    
                        this.readAvailable.Set();
                        break;
                    }
                }
            }
            finally
            {
                if (haveLock)
                {
                    Monitor.Exit(this.store);
                }
            }
        }
    
        /// <inheritdoc/>
        public override int Read(byte[] buffer, int offset, int count)
        {
            if (this.disposed)
            {
                throw new ObjectDisposedException("RingBufferedStream");
            }
    
            Monitor.Enter(this.store);
            int ret = 0;
            bool haveLock = true;
            try
            {
                while (count > 0)
                {
                    if (this.readAvailableByteCount == 0)
                    {
                        this.readAvailable.Reset();
                        Monitor.Exit(this.store);
                        haveLock = false;
                        bool canceled;
                        if (!this.readAvailable.Wait(
                            this.ReadTimeout,
                            this.cancellationTokenSource.Token,
                            out canceled) || canceled)
                        {
                            break;
                        }
    
                        Monitor.Enter(this.store);
                        haveLock = true;
                    }
                    else
                    {
                        var toRead = this.store.Length - this.readPos;
                        if (toRead > this.readAvailableByteCount)
                        {
                            toRead = this.readAvailableByteCount;
                        }
    
                        if (toRead > count)
                        {
                            toRead = count;
                        }
    
                        Array.Copy(
                            this.store,
                            this.readPos,
                            buffer,
                            offset,
                            toRead);
                        offset += toRead;
                        count -= toRead;
                        this.readAvailableByteCount -= toRead;
                        this.writeAvailableByteCount += toRead;
                        ret += toRead;
                        this.readPos += toRead;
                        if (this.readPos == this.store.Length)
                        {
                            this.readPos = 0;
                        }
    
                        this.writeAvailable.Set();
                    }
                }
            }
            finally
            {
                if (haveLock)
                {
                    Monitor.Exit(this.store);
                }
            }
    
            return ret;
        }
    
        /// <inheritdoc/>
        public override int ReadByte()
        {
            if (this.disposed)
            {
                throw new ObjectDisposedException("RingBufferedStream");
            }
    
            Monitor.Enter(this.store);
            int ret = -1;
            bool haveLock = true;
            try
            {
                while (true)
                {
                    if (this.readAvailableByteCount == 0)
                    {
                        this.readAvailable.Reset();
                        Monitor.Exit(this.store);
                        haveLock = false;
                        bool canceled;
                        if (!this.readAvailable.Wait(
                            this.ReadTimeout,
                            this.cancellationTokenSource.Token,
                            out canceled) || canceled)
                        {
                            break;
                        }
    
                        Monitor.Enter(this.store);
                        haveLock = true;
                    }
                    else
                    {
                        ret = this.store[this.readPos];
                        ++this.writeAvailableByteCount;
                        --this.readAvailableByteCount;
                        ++this.readPos;
                        if (this.readPos == this.store.Length)
                        {
                            this.readPos = 0;
                        }
    
                        this.writeAvailable.Set();
                        break;
                    }
                }
            }
            finally
            {
                if (haveLock)
                {
                    Monitor.Exit(this.store);
                }
            }
    
            return ret;
        }
    
        /// <inheritdoc/>
        protected override void Dispose(bool disposing)
        {
            if (disposing)
            {
                this.disposed = true;
                this.cancellationTokenSource.Cancel();
            }
    
            base.Dispose(disposing);
        }
    }
    

    ManualResetEventAsync 以帮助清洁关闭。

    /// <summary>
    ///     Asynchronous version of <see cref="ManualResetEvent" />
    /// </summary>
    public sealed class ManualResetEventAsync
    {
        /// <summary>
        /// The task completion source.
        /// </summary>
        private volatile TaskCompletionSource<bool> taskCompletionSource =
            new TaskCompletionSource<bool>();
    
        /// <summary>
        /// Initializes a new instance of the <see cref="ManualResetEventAsync"/>
        /// class with a <see cref="bool"/> value indicating whether to set the
        /// initial state to signaled.
        /// </summary>
        /// <param name="initialState">
        /// True to set the initial state to signaled; false to set the initial
        /// state to non-signaled.
        /// </param>
        public ManualResetEventAsync(bool initialState)
        {
            if (initialState)
            {
                this.Set();
            }
        }
    
        /// <summary>
        /// Return a task that can be consumed by <see cref="Task.Wait()"/>
        /// </summary>
        /// <returns>
        /// The asynchronous waiter.
        /// </returns>
        public Task GetWaitTask()
        {
            return this.taskCompletionSource.Task;
        }
    
        /// <summary>
        /// Mark the event as signaled.
        /// </summary>
        public void Set()
        {
            var tcs = this.taskCompletionSource;
            Task.Factory.StartNew(
                s => ((TaskCompletionSource<bool>)s).TrySetResult(true),
                tcs,
                CancellationToken.None,
                TaskCreationOptions.PreferFairness,
                TaskScheduler.Default);
            tcs.Task.Wait();
        }
    
        /// <summary>
        /// Mark the event as not signaled.
        /// </summary>
        public void Reset()
        {
            while (true)
            {
                var tcs = this.taskCompletionSource;
                if (!tcs.Task.IsCompleted
    #pragma warning disable 420
                    || Interlocked.CompareExchange(
                        ref this.taskCompletionSource,
                        new TaskCompletionSource<bool>(),
                        tcs) == tcs)
    #pragma warning restore 420
                {
                    return;
                }
            }
        }
    
        /// <summary>
        /// Waits for the <see cref="ManualResetEventAsync"/> to be signaled.
        /// </summary>
        /// <exception cref="T:System.AggregateException">
        /// The <see cref="ManualResetEventAsync"/> waiting <see cref="Task"/>
        /// was canceled -or- an exception was thrown during the execution
        /// of the <see cref="ManualResetEventAsync"/> waiting <see cref="Task"/>.
        /// </exception>
        public void Wait()
        {
            this.GetWaitTask().Wait();
        }
    
        /// <summary>
        /// Waits for the <see cref="ManualResetEventAsync"/> to be signaled.
        /// </summary>
        /// <param name="cancellationToken">
        /// A <see cref="CancellationToken"/> to observe while waiting for
        /// the task to complete.
        /// </param>
        /// <exception cref="T:System.OperationCanceledException">
        /// The <paramref name="cancellationToken"/> was canceled.
        /// </exception>
        /// <exception cref="T:System.AggregateException">
        /// The <see cref="ManualResetEventAsync"/> waiting <see cref="Task"/> was
        /// canceled -or- an exception was thrown during the execution of the
        /// <see cref="ManualResetEventAsync"/> waiting <see cref="Task"/>.
        /// </exception>
        public void Wait(CancellationToken cancellationToken)
        {
            this.GetWaitTask().Wait(cancellationToken);
        }
    
        /// <summary>
        /// Waits for the <see cref="ManualResetEventAsync"/> to be signaled.
        /// </summary>
        /// <param name="cancellationToken">
        /// A <see cref="CancellationToken"/> to observe while waiting for
        /// the task to complete.
        /// </param>
        /// <param name="canceled">
        /// Set to true if the wait was canceled via the <paramref
        /// name="cancellationToken"/>.
        /// </param>
        public void Wait(CancellationToken cancellationToken, out bool canceled)
        {
            try
            {
                this.GetWaitTask().Wait(cancellationToken);
                canceled = false;
            }
            catch (Exception ex)
                when (ex is OperationCanceledException
                    || (ex is AggregateException
                        && ex.InnerOf<OperationCanceledException>() != null))
            {
                canceled = true;
            }
        }
    
        /// <summary>
        /// Waits for the <see cref="ManualResetEventAsync"/> to be signaled.
        /// </summary>
        /// <param name="timeout">
        /// A <see cref="System.TimeSpan"/> that represents the number of
        /// milliseconds to wait, or a <see cref="System.TimeSpan"/> that
        /// represents -1 milliseconds to wait indefinitely.
        /// </param>
        /// <returns>
        /// true if the <see cref="ManualResetEventAsync"/> was signaled within
        /// the allotted time; otherwise, false.
        /// </returns>
        /// <exception cref="T:System.ArgumentOutOfRangeException">
        /// <paramref name="timeout"/> is a negative number other than -1
        /// milliseconds, which represents an infinite time-out -or-
        /// timeout is greater than <see cref="int.MaxValue"/>.
        /// </exception>
        public bool Wait(TimeSpan timeout)
        {
            return this.GetWaitTask().Wait(timeout);
        }
    
        /// <summary>
        /// Waits for the <see cref="ManualResetEventAsync"/> to be signaled.
        /// </summary>
        /// <param name="millisecondsTimeout">
        /// The number of milliseconds to wait, or
        /// <see cref="System.Threading.Timeout.Infinite"/> (-1) to wait
        /// indefinitely.
        /// </param>
        /// <returns>
        /// true if the <see cref="ManualResetEventAsync"/> was signaled within
        /// the allotted time; otherwise, false.
        /// </returns>
        /// <exception cref="T:System.ArgumentOutOfRangeException">
        /// <paramref name="millisecondsTimeout"/> is a negative number other
        /// than -1, which represents an infinite time-out.
        /// </exception>
        public bool Wait(int millisecondsTimeout)
        {
            return this.GetWaitTask().Wait(millisecondsTimeout);
        }
    
        /// <summary>
        /// Waits for the <see cref="ManualResetEventAsync"/> to be signaled.
        /// </summary>
        /// <param name="millisecondsTimeout">
        /// The number of milliseconds to wait, or
        /// <see cref="System.Threading.Timeout.Infinite"/> (-1) to wait
        /// indefinitely.
        /// </param>
        /// <param name="cancellationToken">
        /// A <see cref="CancellationToken"/> to observe while waiting for the
        /// <see cref="ManualResetEventAsync"/> to be signaled.
        /// </param>
        /// <returns>
        /// true if the <see cref="ManualResetEventAsync"/> was signaled within
        /// the allotted time; otherwise, false.
        /// </returns>
        /// <exception cref="T:System.AggregateException">
        /// The <see cref="ManualResetEventAsync"/> waiting <see cref="Task"/>
        /// was canceled -or- an exception was thrown during the execution of
        /// the <see cref="ManualResetEventAsync"/> waiting <see cref="Task"/>.
        /// </exception>
        /// <exception cref="T:System.ArgumentOutOfRangeException">
        /// <paramref name="millisecondsTimeout"/> is a negative number other
        /// than -1, which represents an infinite time-out.
        /// </exception>
        /// <exception cref="T:System.OperationCanceledException">
        /// The <paramref name="cancellationToken"/> was canceled.
        /// </exception>
        public bool Wait(int millisecondsTimeout, CancellationToken cancellationToken)
        {
            return this.GetWaitTask().Wait(millisecondsTimeout, cancellationToken);
        }
    
        /// <summary>
        /// Waits for the <see cref="ManualResetEventAsync"/> to be signaled.
        /// </summary>
        /// <param name="millisecondsTimeout">
        /// The number of milliseconds to wait, or
        /// <see cref="System.Threading.Timeout.Infinite"/> (-1) to wait
        /// indefinitely.
        /// </param>
        /// <param name="cancellationToken">
        /// A <see cref="CancellationToken"/> to observe while waiting for the
        /// <see cref="ManualResetEventAsync"/> to be signaled.
        /// </param>
        /// <param name="canceled">
        /// Set to true if the wait was canceled via the <paramref
        /// name="cancellationToken"/>.
        /// </param>
        /// <returns>
        /// true if the <see cref="ManualResetEventAsync"/> was signaled within
        /// the allotted time; otherwise, false.
        /// </returns>
        /// <exception cref="T:System.ArgumentOutOfRangeException">
        /// <paramref name="millisecondsTimeout"/> is a negative number other
        /// than -1, which represents an infinite time-out.
        /// </exception>
        public bool Wait(
            int millisecondsTimeout,
            CancellationToken cancellationToken,
            out bool canceled)
        {
            bool ret = false;
            try
            {
                ret = this.GetWaitTask().Wait(millisecondsTimeout, cancellationToken);
                canceled = false;
            }
            catch (Exception ex)
                when (ex is OperationCanceledException
                    || (ex is AggregateException
                        && ex.InnerOf<OperationCanceledException>() != null))
            {
                canceled = true;
            }
    
            return ret;
        }
    }
    

    而且, InnerOf<T> 分机。。。

    /// <summary>
    ///     Extension functions.
    /// </summary>
    public static class Extensions
    {
        /// <summary>
        /// Finds the first exception of the requested type.
        /// </summary>
        /// <typeparam name="T">
        /// The type of exception to return
        /// </typeparam>
        /// <param name="ex">
        /// The exception to look in.
        /// </param>
        /// <returns>
        /// The exception or the first inner exception that matches the
        /// given type; null if not found.
        /// </returns>
        public static T InnerOf<T>(this Exception ex)
            where T : Exception
        {
            return (T)InnerOf(ex, typeof(T));
        }
    
        /// <summary>
        /// Finds the first exception of the requested type.
        /// </summary>
        /// <param name="ex">
        /// The exception to look in.
        /// </param>
        /// <param name="t">
        /// The type of exception to return
        /// </param>
        /// <returns>
        /// The exception or the first inner exception that matches the
        /// given type; null if not found.
        /// </returns>
        public static Exception InnerOf(this Exception ex, Type t)
        {
            if (ex == null || t.IsInstanceOfType(ex))
            {
                return ex;
            }
    
            var ae = ex as AggregateException;
            if (ae != null)
            {
                foreach (var e in ae.InnerExceptions)
                {
                    var ret = InnerOf(e, t);
                    if (ret != null)
                    {
                        return ret;
                    }
                }
            }
    
            return InnerOf(ex.InnerException, t);
        }
    }