代码之家  ›  专栏  ›  技术社区  ›  Dan Tao

对于并发集合/队列组合,这似乎是一种合理的方法吗?

  •  4
  • Dan Tao  · 技术社区  · 15 年前

    更新 :正如Brian指出的,我最初的想法确实存在并发问题。这在一定程度上被这名律师的签名所掩盖 ConcurrentDictionary<TKey, TValue>.AddOrUpdate

    回想起来,我真傻,竟然有这样的期望。事实上,无论执行 AddOrUpdate ,应该很清楚,在我最初的想法中,竞争条件仍然存在,正如Brian所指出的:在添加到集合之前,将发生推送到队列,因此可能会发生以下事件序列:

    1. 从队列中弹出的项
    2. 项目(未)从集合中删除
    3. 添加到集合的项

    可以

    public bool Enqueue(T item)
    {
        // This should:
        // 1. return true only when the item is first added to the set
        // 2. subsequently return false as long as the item is in the set;
        //    and it will not be removed until after it's popped
        if (_set.TryAdd(item, true))
        {
            _queue.Enqueue(item);
            return true;
        }
    
        return false;
    }
    

    以这种方式构建 Enqueue 电话只有一次-- 项目在集合中。因此,队列中的重复项应该不是问题。而且似乎由于队列操作是由set操作“bookend”的,也就是说,一个项目只被推送 之后 它被添加到集合中,并且被弹出 它已从集合中删除--上面列出的有问题的事件序列不应发生。

    人们怎么看?这能解决问题吗?(和布莱恩一样,我倾向于怀疑自己,并猜测答案是 ,我又错过了一些东西。但是嘿,如果这很容易的话,那就不是一个有趣的挑战了,对吧?)


    我基本上需要一个线程安全的集合/队列组合类。换句话说,它应该是一个不允许重复的FIFO集合(因此,如果相同的项已经在队列中,那么 排队 调用将返回false,直到该项从队列中弹出)。

    我意识到我可以用一个简单的 HashSet<T> Queue<T> 锁在所有必要的地方。然而,我有兴趣用 ConcurrentDictionary<TKey, TValue> ConcurrentQueue<T> 来自.NET4.0的类(也可以作为.NET3.5的Rx扩展的一部分,这是我正在使用的),我理解为某种程度上是无锁集合*。

    我的基本计划是实现这个集合:

    class ConcurrentSetQueue<T>
    {
        ConcurrentQueue<T> _queue;
        ConcurrentDictionary<T, bool> _set;
    
        public ConcurrentSetQueue(IEqualityComparer<T> comparer)
        {
            _queue = new ConcurrentQueue<T>();
            _set = new ConcurrentDictionary<T, bool>(comparer);
        }
    
        public bool Enqueue(T item)
        {
            // This should:
            // 1. if the key is not present, enqueue the item and return true
            // 2. if the key is already present, do nothing and return false
            return _set.AddOrUpdate(item, EnqueueFirst, EnqueueSecond);
        }
    
        private bool EnqueueFirst(T item)
        {
            _queue.Enqueue(item);
            return true;
        }
    
        private bool EnqueueSecond(T item, bool dummyFlag)
        {
            return false;
        }
    
        public bool TryDequeue(out T item)
        {
            if (_queue.TryDequeue(out item))
            {
                // Another thread could come along here, attempt to enqueue, and
                // fail; however, this seems like an acceptable scenario since the
                // item shouldn't really be considered "popped" until it's been
                // removed from both the queue and the dictionary.
                bool flag;
                _set.TryRemove(item, out flag);
    
                return true;
            }
    
            return false;
        }
    }
    

    我想清楚了吗?表面上,我看不到任何明显的错误,在这个基本的想法,因为我写了上面。但也许我忽略了什么。或者使用 ConcurrentQueue<T> ConcurrentDictionary<T, bool>

    任何关于这个问题的想法或有用的信息将不胜感激!

    2 回复  |  直到 15 年前
        1
  •  5
  •   Brian Gideon    15 年前

    简短的是否,问题中的代码不是线程安全的。

    MSDN文档在网络上非常稀疏 AddOrUpdate 所以我看了一下 反射器中的方法。这里是基本的算法(我不是出于法律原因发布反射器输出,这很容易自己做)。

    TValue value;
    do
    {
      if (!TryGetValue(...))
      {
        value = AddValueFactoryDelegate(key);
        if (!TryAddInternal(...))
        {
          continue;
        }
        return value;
      }
      value = UpdateValueFactoryDelegate(key);
    } 
    while (!TryUpdate(...))
    return value;
    

    很明显 AddValueFactoryDelegate UpdateValueFactoryDelegate 可以执行多次。这里没有必要作进一步的解释。很明显这会破坏你的代码。实际上,我有点震惊,代表可以执行多次。文件没有提到这一点。你可能会认为这是一个非常重要的点,让打电话的人知道要避免经过有副作用的代理(就像你的情况一样)。

    但即使保证代表们只执行一次,仍然会有问题。通过替换您的 Enqueue 方法的内容 添加ValueFactoryDelegate _queue ,但在将项添加到中之前,线程可能会被上下文切换中断 _set . 第二个线程可以调用 TryDequeue 方法并从中提取该项 ,但未能将其从 _套

    更新:

    好吧,我认为不可能让它起作用。 ConcurrentQueue CAS 相当于 TryDequeue公司 方法。如果有这样的行动,那么我 以下代码是正确的。我用的是神话故事 TryDequeueCas 方法,该方法接受一个比较值,该比较值用作条件,条件是当且仅当队列上的顶部项等于比较值时,才以原子方式执行此操作。这个想法和我们在 Interlocked.CompareExchange

    注意代码如何使用 bool ConcurrentDictionary TryUpdate 它被用来获取和释放这个“虚拟”锁。因为锁是“虚拟的”,实际上并不阻止并发访问 while 循环中的 TryDequeue公司 方法是必需的。这符合CAS操作的规范模式,因为它们通常在循环中执行,直到成功为止。

    .NET 4.0 style try-finally pattern 对于lock acquire symantics,可以帮助防止由带外(异步)异常引起的问题。

    注意:同样,代码使用了神秘的 ConcurrentQueue.TryDequeueCas

    class ConcurrentSetQueue<T>
    {
        ConcurrentQueue<T> _queue = new ConcurrentQueue<T>();
        ConcurrentDictionary<T, bool> _set = new ConcurrentDictionary<T, bool>();
    
        public ConcurrentSetQueue()
        {
        }
    
        public bool Enqueue(T item)
        {
            bool acquired = false;
            try
            {
                acquired = _set.TryAdd(item, true);
                if (acquired)
                {
                    _queue.Enqueue(item);
                    return true;
                }
                return false;
            }
            finally
            {
                if (acquired) _set.TryUpdate(item, false, true);
            }
        }
    
        public bool TryDequeue(out T item)
        {
            while (_queue.TryPeek(out item))
            {
                bool acquired = false;
                try
                {
                    acquired = _set.TryUpdate(item, true, false);
                    if (acquired)
                    {
                        if (_queue.TryDequeueCas(out item, item))
                        {
                            return true;
                        }
                    }
                }
                finally
                {
                    if (acquired) _set.TryRemove(item, out acquired);
                }
            }
            item = default(T);
            return false;
        }
    }
    

    更新2:

    关于你的修改,请注意,和我的相比,它是多么相似。事实上,如果你从我的变体中去掉所有的绒毛 排队 方法具有 同样的陈述顺序。

    我更担心的是 TryDequeue公司 不过,这就是为什么我添加了“虚拟”锁的概念,这在我的实现中需要很多额外的东西。我特别担心访问数据结构的顺序相反(字典然后队列) 方法,但在 方法)但是,我越想你的改进方法,我就越喜欢它。我现在想是的 因为

        2
  •  1
  •   GregC Benjamin Baumann    15 年前

    看看埃里克·Lppert的博客。你可能会找到你喜欢的东西。。。 http://blogs.msdn.com/b/ericlippert/archive/tags/immutability/