
How to change the SemaphoreSlim size at runtime

  •  0
  • Etienne Charland  · Tech Community  · 5 years ago

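    I'm facing a problem where I need to limit the number of calls to another web server. The limit will vary because the server is shared and may have more or less capacity at any given time.

    I was thinking of using the SemaphoreSlim class, but it has no public property for changing the max count.

    Should I wrap SemaphoreSlim in another class that manages the max count? Is there a better approach?

    Edit:

    Here is what I'm trying: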

    using System;
    using System.Collections.Generic;
    using System.Linq;
    using System.Text;
    using System.Threading;
    using System.Threading.Tasks;
    
    namespace Semaphore
    {
    class Program
    {
        static SemaphoreSlim _sem = new SemaphoreSlim(10,10000);
    
        static void Main(string[] args)
        {
            int max = 15;
    
            for (int i = 1; i <= 50; i++)
            {
                new Thread(Enter).Start(new int[] { i, max});
            }
    
            Console.ReadLine();
    
            max = 11;
    
            for (int i = 1; i <= 50; i++)
            {
                new Thread(Enter).Start(new int[] { i, max });
            }
        }
    
        static void Enter(object param)
        {
            int[] arr = (int[])param;
            int id = arr[0];
            int max = arr[1];
    
            try
            {
                Console.WriteLine(_sem.CurrentCount);
    
                if (_sem.CurrentCount <= max)
                    _sem.Release(1);
                else
                {
                    _sem.Wait(1000);
    
                    Console.WriteLine(id + " wants to enter");
    
                    Thread.Sleep((1000 * id) / 2); // can be here at
    
                    Console.WriteLine(id + " is in!"); // Only three threads
    
                }
            }
            catch(Exception ex)
            {
                Console.WriteLine("opps ", id);
                Console.WriteLine(ex.Message);
            }
            finally            
            {
                _sem.Release();
            }
        }
    }
    }
    

    Questions:

    1 - _sem.Wait(1000) should cancel the execution of threads that run for more than 1000 ms, shouldn't it?

    2 - Did I get the idea of using Release/Wait right?
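
    For reference on question 1: the timeout passed to SemaphoreSlim.Wait(int) only bounds how long the caller waits for a free slot. It returns false when the timeout elapses and does not interrupt work that is already running. The sketch below is a minimal illustration; the WaitTimeoutDemo class and the TryDoWork helper are hypothetical names, not part of the code above.

```csharp
using System;
using System.Threading;

// Hypothetical helper, for illustration only.
public static class WaitTimeoutDemo
{
    // Returns true if a slot was acquired within timeoutMs and the work ran.
    public static bool TryDoWork(SemaphoreSlim sem, int timeoutMs, Action work)
    {
        // Wait(int) returns false on timeout. It never cancels a thread
        // that already holds a slot.
        if (!sem.Wait(timeoutMs))
        {
            return false; // no slot acquired, so there is nothing to release
        }

        try
        {
            work();
            return true;
        }
        finally
        {
            // Release exactly once, and only because Wait succeeded.
            sem.Release();
        }
    }
}
```

    Calling Release() in a finally block that also runs when Wait timed out, as the code in the question does, adds a slot that was never taken and so raises the effective limit each time.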

        1
  •  27
  •   Jim Mischel    11 years ago

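    You can't change the max count, but you can create a SemaphoreSlim that has a very high maximum count and reserve some of its slots. See the SemaphoreSlim(int initialCount, int maxCount) constructor.

    So, let's say the absolute maximum number of concurrent calls is 100, but initially you want it to be 25. You initialize your semaphore: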

    SemaphoreSlim sem = new SemaphoreSlim(25, 100);
    

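    So 25 is the number of requests that can be serviced concurrently. You have reserved the other 75.

    If you then want to increase the number allowed, just call Release(num). If you call Release(10), the number goes to 35.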

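    Now, if you want to reduce the number of available requests, you have to call Wait multiple times (SemaphoreSlim has no WaitOne method; that name belongs to the WaitHandle-based Semaphore class). For example, if you want to remove 10 from the available count:

    for (var i = 0; i < 10; ++i)
    {
        sem.Wait();
    }
    

    This can block until other clients release the semaphore. That is, if you allow 35 concurrent requests and want to reduce that to 25, but there are already 35 clients with active requests, Wait will block until a client calls Release, and the loop won't terminate until 10 clients have released.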
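
    The whole sequence can be sketched in one place. This is a minimal single-threaded illustration, so no Wait call blocks here; the ReserveDemo class and its Run method are hypothetical names.

```csharp
using System;
using System.Threading;

// Hypothetical demo class, for illustration only.
public static class ReserveDemo
{
    public static int[] Run()
    {
        // 25 slots usable now, 75 held in reserve (absolute ceiling: 100).
        var sem = new SemaphoreSlim(25, 100);
        int initial = sem.CurrentCount;   // 25

        // Grow: hand 10 of the reserved slots to callers.
        sem.Release(10);
        int grown = sem.CurrentCount;     // 35

        // Shrink: take 10 slots back and never release them.
        // Each Wait() blocks if every slot is currently in use.
        for (var i = 0; i < 10; ++i)
        {
            sem.Wait();
        }
        int shrunk = sem.CurrentCount;    // 25

        return new[] { initial, grown, shrunk };
    }
}
```

    If blocking during the shrink is a concern, Wait(int millisecondsTimeout) can be used instead, at the cost of having to handle a partially completed reduction.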

        2
  •  6
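  •   Servy    11 years ago
    1. Get a semaphore.
    2. Set its capacity to something quite a bit higher than you need it to be.
    3. Set the initial capacity to what you want your actual max capacity to be.
    4. Give the semaphore out to others to use.

    At this point you can wait on the semaphore as many times as you want (without a corresponding release call) to lower the capacity. You can release the semaphore a number of times (without a corresponding wait call) to increase the effective capacity.

    If this is something you do often enough, you can create your own semaphore class that composes a SemaphoreSlim and encapsulates this logic. This composition is also essential if you have code that already releases the semaphore without first waiting on it; with your own class you can ensure that such releases are no-ops. (That said, you really should avoid putting yourself in that position to begin with.)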
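
    A minimal sketch of such a composing class follows. The ResizableSemaphore name and its members are hypothetical, and it assumes callers always pair WaitAsync with Release; it does not implement the "unpaired releases become no-ops" guard mentioned above.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical wrapper, for illustration only.
public sealed class ResizableSemaphore
{
    private readonly SemaphoreSlim _inner;
    private readonly object _gate = new object();
    private readonly int _ceiling;
    private int _limit;

    public ResizableSemaphore(int initialLimit, int ceiling)
    {
        if (ceiling < 1) throw new ArgumentOutOfRangeException(nameof(ceiling));
        if (initialLimit < 0 || initialLimit > ceiling)
            throw new ArgumentOutOfRangeException(nameof(initialLimit));

        // The ceiling is fixed; only the effective limit moves below it.
        _inner = new SemaphoreSlim(initialLimit, ceiling);
        _ceiling = ceiling;
        _limit = initialLimit;
    }

    public int Limit { get { lock (_gate) { return _limit; } } }
    public int CurrentCount => _inner.CurrentCount;

    public Task WaitAsync() => _inner.WaitAsync();
    public void Release() => _inner.Release();

    // Growing returns immediately. Shrinking blocks until enough
    // slots have been handed back by callers.
    public void SetLimit(int newLimit)
    {
        if (newLimit < 0 || newLimit > _ceiling)
            throw new ArgumentOutOfRangeException(nameof(newLimit));

        lock (_gate)
        {
            int delta = newLimit - _limit;
            if (delta > 0)
            {
                _inner.Release(delta);   // release without a matching wait
            }
            else
            {
                for (int i = 0; i < -delta; i++)
                {
                    _inner.Wait();       // wait without a matching release
                }
            }
            _limit = newLimit;
        }
    }
}
```

    Tracking the limit in its own field matters because CurrentCount only reports free slots, not the capacity that callers are currently using.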

        3
  •  4
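  •   desautelsj    6 years ago

    Here is how I solved this situation: I created a custom semaphore slim class that lets me increase and decrease the number of slots. The class also lets me set a maximum number of slots, so I never exceed a "reasonable" number, and a minimum number of slots, so I never go below a "reasonable" threshold.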

    using Picton.Messaging.Logging;
    using System;
    using System.Threading;
    
    namespace Picton.Messaging.Utils
    {
        /// <summary>
        /// An improvement over System.Threading.SemaphoreSlim that allows you to dynamically increase and
        /// decrease the number of threads that can access a resource or pool of resources concurrently.
        /// </summary>
        /// <seealso cref="System.Threading.SemaphoreSlim" />
        public class SemaphoreSlimDynamic : SemaphoreSlim
        {
            #region FIELDS
    
            private static readonly ILog _logger = LogProvider.GetLogger(typeof(SemaphoreSlimDynamic));
            private readonly ReaderWriterLockSlim _lock;
    
            #endregion
    
            #region PROPERTIES
    
            /// <summary>
            /// Gets the minimum number of slots.
            /// </summary>
            /// <value>
            /// The minimum slots count.
            /// </value>
            public int MinimumSlotsCount { get; private set; }
    
            /// <summary>
            /// Gets the number of slots currently available.
            /// </summary>
            /// <value>
            /// The available slots count.
            /// </value>
            public int AvailableSlotsCount { get; private set; }
    
            /// <summary>
            /// Gets the maximum number of slots.
            /// </summary>
            /// <value>
            /// The maximum slots count.
            /// </value>
            public int MaximumSlotsCount { get; private set; }
    
            #endregion
    
            #region CONSTRUCTOR
    
            /// <summary>
            /// Initializes a new instance of the <see cref="SemaphoreSlimDynamic"/> class.
            /// </summary>
            /// <param name="minCount">The minimum number of slots.</param>
            /// <param name="initialCount">The initial number of slots.</param>
            /// <param name="maxCount">The maximum number of slots.</param>
            public SemaphoreSlimDynamic(int minCount, int initialCount, int maxCount)
                : base(initialCount, maxCount)
            {
                _lock = new ReaderWriterLockSlim(LockRecursionPolicy.NoRecursion);
    
                this.MinimumSlotsCount = minCount;
                this.AvailableSlotsCount = initialCount;
                this.MaximumSlotsCount = maxCount;
            }
    
            #endregion
    
            #region PUBLIC METHODS
    
            /// <summary>
            /// Attempts to increase the number of slots
            /// </summary>
            /// <param name="millisecondsTimeout">The timeout in milliseconds.</param>
            /// <param name="increaseCount">The number of slots to add</param>
            /// <returns>true if the attempt was successful; otherwise, false.</returns>
            public bool TryIncrease(int millisecondsTimeout = 500, int increaseCount = 1)
            {
                return TryIncrease(TimeSpan.FromMilliseconds(millisecondsTimeout), increaseCount);
            }
    
            /// <summary>
            /// Attempts to increase the number of slots
            /// </summary>
            /// <param name="timeout">The timeout.</param>
            /// <param name="increaseCount">The number of slots to add</param>
            /// <returns>true if the attempt was successful; otherwise, false.</returns>
            public bool TryIncrease(TimeSpan timeout, int increaseCount = 1)
            {
                if (increaseCount < 0) throw new ArgumentOutOfRangeException(nameof(increaseCount));
                else if (increaseCount == 0) return false;
    
                var increased = false;
    
                try
                {
                    if (this.AvailableSlotsCount < this.MaximumSlotsCount)
                    {
                        var lockAcquired = _lock.TryEnterWriteLock(timeout);
                        if (lockAcquired)
                        {
                            try
                            {
                                for (int i = 0; i < increaseCount; i++)
                                {
                                    if (this.AvailableSlotsCount < this.MaximumSlotsCount)
                                    {
                                        Release();
                                        this.AvailableSlotsCount++;
                                        increased = true;
                                    }
                                }

                                if (increased) _logger.Trace($"Semaphore slots increased: {this.AvailableSlotsCount}");
                            }
                            finally
                            {
                                // Always exit the write lock, even if Release() throws SemaphoreFullException
                                _lock.ExitWriteLock();
                            }
                        }
                    }
                }
                catch (SemaphoreFullException)
                {
                    // An exception is thrown if we attempt to exceed the max number of concurrent tasks
                    // It's safe to ignore this exception
                }
    
                return increased;
            }
    
            /// <summary>
            /// Attempts to decrease the number of slots
            /// </summary>
            /// <param name="millisecondsTimeout">The timeout in milliseconds.</param>
            /// <param name="decreaseCount">The number of slots to remove</param>
            /// <returns>true if the attempt was successful; otherwise, false.</returns>
            public bool TryDecrease(int millisecondsTimeout = 500, int decreaseCount = 1)
            {
                return TryDecrease(TimeSpan.FromMilliseconds(millisecondsTimeout), decreaseCount);
            }
    
            /// <summary>
            /// Attempts to decrease the number of slots
            /// </summary>
            /// <param name="timeout">The timeout.</param>
            /// <param name="decreaseCount">The number of slots to remove</param>
            /// <returns>true if the attempt was successful; otherwise, false.</returns>
            public bool TryDecrease(TimeSpan timeout, int decreaseCount = 1)
            {
                if (decreaseCount < 0) throw new ArgumentOutOfRangeException(nameof(decreaseCount));
                else if (decreaseCount == 0) return false;
    
                var decreased = false;
    
                if (this.AvailableSlotsCount > this.MinimumSlotsCount)
                {
                    var lockAcquired = _lock.TryEnterWriteLock(timeout);
                    if (lockAcquired)
                    {
                        for (int i = 0; i < decreaseCount; i++)
                        {
                            if (this.AvailableSlotsCount > this.MinimumSlotsCount)
                            {
                                if (Wait(timeout))
                                {
                                    this.AvailableSlotsCount--;
                                    decreased = true;
                                }
                            }
                        }
    
                        if (decreased) _logger.Trace($"Semaphore slots decreased: {this.AvailableSlotsCount}");
    
                        _lock.ExitWriteLock();
                    }
                }
    
                return decreased;
            }
    
            #endregion
        }
    }
    
        4
  •  2
  •   Thiago Custodio    11 years ago

    OK, I was able to solve my problem by looking at the Mono project's implementation.

    // SemaphoreSlim.cs
    //
    // Copyright (c) 2008 Jérémie "Garuma" Laval
    //
    // Permission is hereby granted, free of charge, to any person obtaining a copy
    // of this software and associated documentation files (the "Software"), to deal
    // in the Software without restriction, including without limitation the rights
    // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
    // copies of the Software, and to permit persons to whom the Software is
    // furnished to do so, subject to the following conditions:
    //
    // The above copyright notice and this permission notice shall be included in
    // all copies or substantial portions of the Software.
    //
    // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
    // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
    // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
    // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
    // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
    // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
    // THE SOFTWARE.
    //
    //
    
    using System;
    using System.Diagnostics;
    using System.Threading.Tasks;
    
    namespace System.Threading
    {
        public class SemaphoreSlimCustom : IDisposable
        {
            const int spinCount = 10;
            const int deepSleepTime = 20;
            private object _sync = new object();
    
    
            int maxCount;
            int currCount;
            bool isDisposed;
    
            public int MaxCount
            {
                get { lock (_sync) { return maxCount; } }
                set
                {
                    lock (_sync)
                    {
                        maxCount = value;
                    }
                }
            }
    
            EventWaitHandle handle;
    
            public SemaphoreSlimCustom (int initialCount) : this (initialCount, int.MaxValue)
            {
            }
    
            public SemaphoreSlimCustom (int initialCount, int maxCount)
            {
                if (initialCount < 0 || initialCount > maxCount || maxCount < 0)
                    throw new ArgumentOutOfRangeException ("The initialCount  argument is negative, initialCount is greater than maxCount, or maxCount is not positive.");
    
                this.maxCount = maxCount;
                this.currCount = initialCount;
                this.handle = new ManualResetEvent (initialCount > 0);
            }
    
            public void Dispose ()
            {
                Dispose(true);
            }
    
            protected virtual void Dispose (bool disposing)
            {
                isDisposed = true;
            }
    
            void CheckState ()
            {
                if (isDisposed)
                    throw new ObjectDisposedException ("The SemaphoreSlim has been disposed.");
            }
    
            public int CurrentCount {
                get {
                    return currCount;
                }
            }
    
            public int Release ()
            {
                return Release(1);
            }
    
            public int Release (int releaseCount)
            {
                CheckState ();
                if (releaseCount < 1)
                    throw new ArgumentOutOfRangeException ("releaseCount", "releaseCount is less than 1");
    
                // As we have to take care of the max limit we resort to CAS
                int oldValue, newValue;
                do {
                    oldValue = currCount;
                    newValue = (currCount + releaseCount);
                    newValue = newValue > maxCount ? maxCount : newValue;
                } while (Interlocked.CompareExchange (ref currCount, newValue, oldValue) != oldValue);
    
                handle.Set ();
    
                return oldValue;
            }
    
            public void Wait ()
            {
                Wait (CancellationToken.None);
            }
    
            public bool Wait (TimeSpan timeout)
            {
                return Wait ((int)timeout.TotalMilliseconds, CancellationToken.None);
            }
    
            public bool Wait (int millisecondsTimeout)
            {
                return Wait (millisecondsTimeout, CancellationToken.None);
            }
    
            public void Wait (CancellationToken cancellationToken)
            {
                Wait (-1, cancellationToken);
            }
    
            public bool Wait (TimeSpan timeout, CancellationToken cancellationToken)
            {
                CheckState();
                return Wait ((int)timeout.TotalMilliseconds, cancellationToken);
            }
    
            public bool Wait (int millisecondsTimeout, CancellationToken cancellationToken)
            {
                CheckState ();
                if (millisecondsTimeout < -1)
                    throw new ArgumentOutOfRangeException ("millisecondsTimeout",
                                                           "millisecondsTimeout is a negative number other than -1");
    
                Stopwatch sw = Stopwatch.StartNew();
    
                Func<bool> stopCondition = () => millisecondsTimeout >= 0 && sw.ElapsedMilliseconds > millisecondsTimeout;
    
                do {
                    bool shouldWait;
                    int result;
    
                    do {
                        cancellationToken.ThrowIfCancellationRequested ();
                        if (stopCondition ())
                            return false;
    
                        shouldWait = true;
                        result = currCount;
    
                        if (result > 0)
                            shouldWait = false;
                        else
                            break;
                    } while (Interlocked.CompareExchange (ref currCount, result - 1, result) != result);
    
                    if (!shouldWait) {
                        if (result == 1)
                            handle.Reset ();
                        break;
                    }
    
                    SpinWait wait = new SpinWait ();
    
                    while (Thread.VolatileRead (ref currCount) <= 0) {
                        cancellationToken.ThrowIfCancellationRequested ();
                        if (stopCondition ())
                            return false;
    
                        if (wait.Count > spinCount) {
                            int diff = millisecondsTimeout - (int)sw.ElapsedMilliseconds;
    
                            int timeout = millisecondsTimeout < 0 ? deepSleepTime :
                                Math.Min (Math.Max (diff, 1), deepSleepTime);
                            handle.WaitOne (timeout);
                        } else
                            wait.SpinOnce ();
                    }
                } while (true);
    
                return true;
            }
    
            public WaitHandle AvailableWaitHandle {
                get {
                    return handle;
                }
            }
    
            public Task WaitAsync ()
            {
                return Task.Factory.StartNew (() => Wait ());
            }
    
            public Task WaitAsync (CancellationToken cancellationToken)
            {
                return Task.Factory.StartNew (() => Wait (cancellationToken), cancellationToken);
            }
    
            public Task<bool> WaitAsync (int millisecondsTimeout)
            {
                return Task.Factory.StartNew (() => Wait (millisecondsTimeout));
            }
    
            public Task<bool> WaitAsync (TimeSpan timeout)
            {
                return Task.Factory.StartNew (() => Wait (timeout));
            }
    
            public Task<bool> WaitAsync (int millisecondsTimeout, CancellationToken cancellationToken)
            {
                return Task.Factory.StartNew (() => Wait (millisecondsTimeout, cancellationToken), cancellationToken);
            }
    
            public Task<bool> WaitAsync (TimeSpan timeout, CancellationToken cancellationToken)
            {
                return Task.Factory.StartNew (() => Wait (timeout, cancellationToken), cancellationToken);
            }
        }
    }
    
        5
  •  -1
  •   Tod    3 years ago

    Updated .NET 5 answer:

    Let's say I want a lock with a maximum of 10 requests, but most of the time I only want 1.

    private readonly static SemaphoreSlim semLock = new(1, 10);
    

    Now, when I want to release some resources, I can do:

    semLock.Release(Math.Min(9, requiredAmount));
    

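    Note that 9 is one less than 10, because one slot is already released initially.

    Once I want to restrict the available resources again, I can call: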

    while(semLock.CurrentCount > 1)
    {
        await semLock.WaitAsync();
    }
    

    This awaits until the count is back down to 1.
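
    One caveat: CurrentCount reports only the slots that are free right now, so a loop that stops at CurrentCount == 1 can finish early while other callers still hold slots, leaving the effective limit above 1. A hedged sketch of one way around this is to track the limit in a separate variable and count the shrink against it. The CurrentCountDemo class, the limit variable and the ShrinkToAsync helper are hypothetical names.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical demo class, for illustration only.
public static class CurrentCountDemo
{
    public static async Task<int[]> RunAsync()
    {
        var semLock = new SemaphoreSlim(1, 10);
        int limit = 1;                      // effective limit, tracked by us

        semLock.Release(4);                 // grow the limit from 1 to 5
        limit += 4;

        await semLock.WaitAsync();          // one caller takes a slot
        await semLock.WaitAsync();          // a second caller takes a slot
        int freeWhileBusy = semLock.CurrentCount;   // 3 free, limit is still 5

        // Shrink by counting against the tracked limit, not CurrentCount.
        async Task ShrinkToAsync(int target)
        {
            while (limit > target)
            {
                await semLock.WaitAsync();  // waits for busy callers if needed
                limit--;
            }
        }

        Task shrink = ShrinkToAsync(1);     // takes the 3 free slots, then waits
        semLock.Release();                  // first caller finishes
        semLock.Release();                  // second caller finishes
        await shrink;

        return new[] { freeWhileBusy, limit, semLock.CurrentCount };
    }
}
```

    In this scenario a loop on CurrentCount > 1 would have stopped after taking two of the three free slots, leaving an effective limit of 3 once the two busy callers released.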