
Bandwidth throttling in C#

  •  17
  • Christian P.  · 16 years ago

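    I am developing a program that continuously sends a stream of data in the background, and I want to let the user set a cap on both upload and download rates.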

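    I have read up on the token bucket and leaky bucket algorithms, and the latter seems to fit the description, since this is not a matter of maximizing network bandwidth but of being as unobtrusive as possible.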
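For comparison, here is a minimal token bucket sketch. The `TokenBucket` class and its member names are hypothetical (they do not come from any library), and the clock is injected so the logic can be exercised without real delays. Tokens accumulate at `ratePerSecond` up to `capacity`, and a transfer of n bytes is allowed only when n tokens are available, so short bursts up to `capacity` are permitted. A leaky bucket used as a queue would instead release data at a constant rate.

```csharp
using System;

// Hypothetical helper for illustration; not part of any library.
public sealed class TokenBucket
{
    private readonly double capacity;       // maximum burst size in bytes
    private readonly double ratePerSecond;  // refill rate in bytes per second
    private readonly Func<TimeSpan> clock;  // injectable monotonic clock
    private double tokens;
    private TimeSpan lastRefill;

    public TokenBucket(double capacity, double ratePerSecond, Func<TimeSpan> clock)
    {
        this.capacity = capacity;
        this.ratePerSecond = ratePerSecond;
        this.clock = clock;
        tokens = capacity;      // start full, so an initial burst is allowed
        lastRefill = clock();
    }

    // Returns true and consumes the tokens if `bytes` may be transferred now.
    // Requests larger than `capacity` never succeed; callers should split them.
    public bool TryConsume(int bytes)
    {
        Refill();
        if (bytes > tokens)
            return false;
        tokens -= bytes;
        return true;
    }

    // How long the caller would have to wait until `bytes` tokens are available.
    public TimeSpan TimeUntilAvailable(int bytes)
    {
        Refill();
        if (bytes <= tokens)
            return TimeSpan.Zero;
        return TimeSpan.FromSeconds((bytes - tokens) / ratePerSecond);
    }

    // Adds the tokens earned since the last call, capped at `capacity`.
    private void Refill()
    {
        var now = clock();
        var elapsed = (now - lastRefill).TotalSeconds;
        tokens = Math.Min(capacity, tokens + elapsed * ratePerSecond);
        lastRefill = now;
    }
}
```

In a real program the clock would typically wrap a `Stopwatch`, and a caller would sleep for `TimeUntilAvailable(n)` before retrying `TryConsume(n)`.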

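    However, I am a bit unsure how to implement this. A natural approach is to extend the abstract Stream class so that throttling existing traffic is simple, but wouldn't that require extra threads to send data while simultaneously receiving it (leaky bucket)? Any hints about other implementations that do the same would be appreciated.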

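    Also, although I can control how much data the program receives, how well does bandwidth throttling work at the C# level? Will the computer still receive the data and simply buffer it, effectively cancelling the throttling, or will it wait until I ask to receive more?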

    Edit: I am interested in throttling both incoming and outgoing data, and I have no control over the other end of the stream.

    3 answers  |  last active 8 years ago
        1
  •  24
  •   arul    16 years ago

    See this article about the ThrottledStream class. It should fit your needs.

        2
  •  2
  •   Johannes Egger    8 years ago

    Based on @0xDEADBEEF's solution, I created the following (testable) solution based on Rx schedulers:

    using System;
    using System.IO;
    using System.Reactive.Concurrency;
    using System.Threading;

    public class ThrottledStream : Stream
    {
        private readonly Stream parent;
        private readonly int maxBytesPerSecond;
        private readonly IScheduler scheduler;
        private readonly IStopwatch stopwatch;
    
        private long processed;
    
        public ThrottledStream(Stream parent, int maxBytesPerSecond, IScheduler scheduler)
        {
            this.maxBytesPerSecond = maxBytesPerSecond;
            this.parent = parent;
            this.scheduler = scheduler;
            stopwatch = scheduler.StartStopwatch();
            processed = 0;
        }
    
        public ThrottledStream(Stream parent, int maxBytesPerSecond)
            : this (parent, maxBytesPerSecond, Scheduler.Immediate)
        {
        }
    
        protected void Throttle(int bytes)
        {
            processed += bytes;
            var targetTime = TimeSpan.FromSeconds((double)processed / maxBytesPerSecond);
            var actualTime = stopwatch.Elapsed;
            var sleep = targetTime - actualTime;
            if (sleep > TimeSpan.Zero)
            {
                using (var waitHandle = new AutoResetEvent(initialState: false))
                {
                    scheduler.Sleep(sleep).GetAwaiter().OnCompleted(() => waitHandle.Set());
                    waitHandle.WaitOne();
                }
            }
        }
    
        public override bool CanRead
        {
            get { return parent.CanRead; }
        }
    
        public override bool CanSeek
        {
            get { return parent.CanSeek; }
        }
    
        public override bool CanWrite
        {
            get { return parent.CanWrite; }
        }
    
        public override void Flush()
        {
            parent.Flush();
        }
    
        public override long Length
        {
            get { return parent.Length; }
        }
    
        public override long Position
        {
            get
            {
                return parent.Position;
            }
            set
            {
                parent.Position = value;
            }
        }
    
        public override int Read(byte[] buffer, int offset, int count)
        {
            var read = parent.Read(buffer, offset, count);
            Throttle(read);
            return read;
        }
    
        public override long Seek(long offset, SeekOrigin origin)
        {
            return parent.Seek(offset, origin);
        }
    
        public override void SetLength(long value)
        {
            parent.SetLength(value);
        }
    
        public override void Write(byte[] buffer, int offset, int count)
        {
            Throttle(count);
            parent.Write(buffer, offset, count);
        }
    }
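The `Throttle` method above compares the time the transfer should have taken at the configured rate with the time that has actually elapsed, and sleeps for the difference. A small sketch of just that arithmetic may make the behaviour easier to follow. The `ThrottleMath.ComputeDelay` helper is a hypothetical name introduced here for illustration and is not part of the answer's class.

```csharp
using System;

// Hypothetical helper that isolates the arithmetic used by Throttle above.
public static class ThrottleMath
{
    // Returns how long to pause so that `processedBytes` never gets ahead
    // of `maxBytesPerSecond`, given the time elapsed since the stream started.
    public static TimeSpan ComputeDelay(long processedBytes, int maxBytesPerSecond, TimeSpan elapsed)
    {
        if (maxBytesPerSecond <= 0)
            throw new ArgumentOutOfRangeException(nameof(maxBytesPerSecond));

        // Time the transfer should have taken at the configured rate.
        var target = TimeSpan.FromSeconds((double)processedBytes / maxBytesPerSecond);

        // Positive difference: we are ahead of schedule and must wait.
        var delay = target - elapsed;
        return delay > TimeSpan.Zero ? delay : TimeSpan.Zero;
    }
}
```

For example, with 1 MiB processed at a limit of 1 MiB / 8 bytes per second, the target time is 8 s. If only 2 s have elapsed, `ComputeDelay(1024 * 1024, 1024 * 1024 / 8, TimeSpan.FromSeconds(2))` yields a 6 s pause. Because the calculation uses the running total since the stream was created, a stream that sat idle for a while can transfer a burst before throttling resumes.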
    

    And some tests that take only a few milliseconds:

    [TestMethod]
    public void ShouldThrottleReading()
    {
        var content = Enumerable
            .Range(0, 1024 * 1024)
            .Select(_ => (byte)'a')
            .ToArray();
        var scheduler = new TestScheduler();
        var source = new ThrottledStream(new MemoryStream(content), content.Length / 8, scheduler);
        var target = new MemoryStream();
    
        var t = source.CopyToAsync(target);
    
        t.Wait(10).Should().BeFalse();
        scheduler.AdvanceTo(TimeSpan.FromSeconds(4).Ticks);
        t.Wait(10).Should().BeFalse();
        scheduler.AdvanceTo(TimeSpan.FromSeconds(8).Ticks - 1);
        t.Wait(10).Should().BeFalse();
        scheduler.AdvanceTo(TimeSpan.FromSeconds(8).Ticks);
        t.Wait(10).Should().BeTrue();
    }
    
    [TestMethod]
    public void ShouldThrottleWriting()
    {
        var content = Enumerable
            .Range(0, 1024 * 1024)
            .Select(_ => (byte)'a')
            .ToArray();
        var scheduler = new TestScheduler();
        var source = new MemoryStream(content);
        var target = new ThrottledStream(new MemoryStream(), content.Length / 8, scheduler);
    
        var t = source.CopyToAsync(target);
    
        t.Wait(10).Should().BeFalse();
        scheduler.AdvanceTo(TimeSpan.FromSeconds(4).Ticks);
        t.Wait(10).Should().BeFalse();
        scheduler.AdvanceTo(TimeSpan.FromSeconds(8).Ticks - 1);
        t.Wait(10).Should().BeFalse();
        scheduler.AdvanceTo(TimeSpan.FromSeconds(8).Ticks);
        t.Wait(10).Should().BeTrue();
    }
    
        3
  •  1
  •   0xDEADBEEF    10 years ago

    I came up with a different implementation of the ThrottledStream class mentioned by arul. My version uses a WaitHandle and a Timer with a 1 s interval:

    public ThrottledStream(Stream parentStream, int maxBytesPerSecond=int.MaxValue) 
    {
        MaxBytesPerSecond = maxBytesPerSecond;
        parent = parentStream;
        processed = 0;
        resettimer = new System.Timers.Timer();
        resettimer.Interval = 1000;
        resettimer.Elapsed += resettimer_Elapsed;
        resettimer.Start();         
    }
    
    protected void Throttle(int bytes)
    {
        try
        {
            processed += bytes;
            if (processed >= maxBytesPerSecond)
                wh.WaitOne();
        }
        catch
        {
        }
    }
    
    private void resettimer_Elapsed(object sender, ElapsedEventArgs e)
    {
        processed = 0;
        wh.Set();
    }
    

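    Whenever the bandwidth limit is exceeded, the thread sleeps until the next second begins. There is no need to calculate the optimal sleep duration.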

    Full implementation:

    public class ThrottledStream : Stream
    {
        #region Properties
    
        private int maxBytesPerSecond;
        /// <summary>
        /// Number of Bytes that are allowed per second
        /// </summary>
        public int MaxBytesPerSecond
        {
            get { return maxBytesPerSecond; }
            set 
            {
                if (value < 1)
                    throw new ArgumentException("MaxBytesPerSecond has to be >0");
    
                maxBytesPerSecond = value; 
            }
        }
    
        #endregion
    
    
        #region Private Members
    
        private int processed;
        System.Timers.Timer resettimer;
        AutoResetEvent wh = new AutoResetEvent(true);
        private Stream parent;
    
        #endregion
    
        /// <summary>
        /// Creates a new stream with a bandwidth cap
        /// </summary>
        /// <param name="parentStream"></param>
        /// <param name="maxBytesPerSecond"></param>
        public ThrottledStream(Stream parentStream, int maxBytesPerSecond=int.MaxValue) 
        {
            MaxBytesPerSecond = maxBytesPerSecond;
            parent = parentStream;
            processed = 0;
            resettimer = new System.Timers.Timer();
            resettimer.Interval = 1000;
            resettimer.Elapsed += resettimer_Elapsed;
            resettimer.Start();         
        }
    
        protected void Throttle(int bytes)
        {
            try
            {
                processed += bytes;
                if (processed >= maxBytesPerSecond)
                    wh.WaitOne();
            }
            catch
            {
            }
        }
    
        private void resettimer_Elapsed(object sender, ElapsedEventArgs e)
        {
            processed = 0;
            wh.Set();
        }
    
        #region Stream-Overrides
    
        public override void Close()
        {
            resettimer.Stop();
            resettimer.Close();
            base.Close();
        }
        protected override void Dispose(bool disposing)
        {
            resettimer.Dispose();
            base.Dispose(disposing);
        }
    
        public override bool CanRead
        {
            get { return parent.CanRead; }
        }
    
        public override bool CanSeek
        {
            get { return parent.CanSeek; }
        }
    
        public override bool CanWrite
        {
            get { return parent.CanWrite; }
        }
    
        public override void Flush()
        {
            parent.Flush();
        }
    
        public override long Length
        {
            get { return parent.Length; }
        }
    
        public override long Position
        {
            get
            {
                return parent.Position;
            }
            set
            {
                parent.Position = value;
            }
        }
    
        public override int Read(byte[] buffer, int offset, int count)
        {
            // Throttle on the bytes actually read, which may be fewer than requested.
            var read = parent.Read(buffer, offset, count);
            Throttle(read);
            return read;
        }
    
        public override long Seek(long offset, SeekOrigin origin)
        {
            return parent.Seek(offset, origin);
        }
    
        public override void SetLength(long value)
        {
            parent.SetLength(value);
        }
    
        public override void Write(byte[] buffer, int offset, int count)
        {
            Throttle(count);
            parent.Write(buffer, offset, count);
        }
    
        #endregion
    
    
    }
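The fixed one-second window used by this implementation can also be modelled without timers or threads, which makes its behaviour easy to inspect. The sketch below is an assumption-laden illustration and not the answer's code: `FixedWindowCounter` and `Record` are hypothetical names, and the current time is passed in explicitly instead of being driven by `System.Timers.Timer`.

```csharp
using System;

// Hypothetical fixed-window counter; mirrors the idea above without timers or threads.
public sealed class FixedWindowCounter
{
    private readonly int maxBytesPerWindow;
    private readonly TimeSpan window;
    private long windowIndex;
    private long processed;

    public FixedWindowCounter(int maxBytesPerWindow, TimeSpan window)
    {
        if (maxBytesPerWindow < 1)
            throw new ArgumentException("maxBytesPerWindow has to be > 0");
        if (window <= TimeSpan.Zero)
            throw new ArgumentException("window has to be > 0");
        this.maxBytesPerWindow = maxBytesPerWindow;
        this.window = window;
    }

    // Records `bytes` at time `now` and returns how long the caller should wait:
    // zero while the budget of the current window has not been reached,
    // otherwise the time remaining until the next window starts.
    public TimeSpan Record(int bytes, TimeSpan now)
    {
        var index = now.Ticks / window.Ticks;
        if (index != windowIndex)
        {
            windowIndex = index;
            processed = 0;  // a new window starts with a fresh budget
        }

        processed += bytes;
        if (processed < maxBytesPerWindow)
            return TimeSpan.Zero;

        var nextWindowStart = TimeSpan.FromTicks((index + 1) * window.Ticks);
        return nextWindowStart - now;
    }
}
```

One consequence of resetting the counter at each boundary is that a burst late in one window followed by a burst early in the next can briefly exceed the nominal rate. With a limit of 1000 bytes per second, 600 bytes at 0.9 s and another 600 bytes at 1.0 s are both accepted without waiting. Whether that matters depends on how strict the cap needs to be.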
    