代码之家  ›  专栏  ›  技术社区  ›  fizzer

如何在多线程C++中分解观察者关系?

  •  11
  • fizzer  · 技术社区  · 17 年前

    Subscribe(Observer*) Unsubscribe(Observer*) 客户。Subject在自己的线程中运行(从中调用 Notify()

    • 持有互斥体——即使是递归的 互斥体-当我通知观察者时 僵局风险。
    • 来自主题线程。然后 客户可以等待一个特殊的 “可安全删除”通知。这 看起来很安全,但很麻烦

    #include <set>
    #include <functional>
    #include <boost/thread.hpp>
    #include <boost/bind.hpp>
    
    using namespace std;
    using namespace boost;
    
    class Observer
    {
    public:
        void Notify() {}
    };
    
    class Subject
    {
    public:
        Subject() : t(bind(&Subject::Run, this))
        {
        }
    
        void Subscribe(Observer* o)
        {
            mutex::scoped_lock l(m);
            observers.insert(o);
        }
    
        void Unsubscribe(Observer* o)
        {
            mutex::scoped_lock l(m);
            observers.erase(o);
        }
    
        void Run()
        {
            for (;;)
            {
                WaitForSomethingInterestingToHappen();
                set<Observer*> notifyList;
                {
                    mutex::scoped_lock l(m);
                    notifyList = observers;
                }
                // Problem here
                for_each(notifyList.begin(), notifyList.end(), 
                         mem_fun(&Observer::Notify));
            }
        }
    
    private:
        set<Observer*> observers;
        thread t;
        mutex m;
    };
    

    由于死锁风险,在持有互斥体时无法通知观察者。最明显的发生方式是客户端从Notify内部调用Subscribe或Unsubscribe,通过使互斥体递归可以很容易地纠正这种情况。更阴险的是不同线程上间歇性死锁的风险。

    9 回复  |  直到 17 年前
        1
  •  7
  •   Rob K    17 年前

    Unsubscribes()应该是同步的,这样它就不会返回,直到Observer被保证不再在Subject的列表中。这是唯一安全的方法。

    由于时间似乎不是问题,在通知每个观察者之间获取并释放互斥体。你不能像现在这样使用for_each,你必须检查迭代器以确保它仍然有效。

    for ( ... )
    {
        take mutex
        check iterator validity
        notify
        release mutex
    }
    

        2
  •  3
  •   Éric Malenfant    17 年前

    关于这很难“正确”的例子,请参阅 Boost.Signals adopted -但尚未分发 Boost.Signals2

        3
  •  1
  •   Matthieu M.    11 年前

    “理想”的解决方案将涉及使用 shared_ptr weak_ptr 然而,为了具有通用性,它还必须考虑到 Subject Observer (是的,这也可能发生)。

    class Subject {
    public:
        void Subscribe(std::weak_ptr<Observer> o);
        void Unsubscribe(std::weak_ptr<Observer> o);
    
    private:
        std::mutex mutex;
        std::set< std::weak_ptr<Observer> > observers;
    };
    
    class Observer: boost::noncopyable {
    public:
        ~Observer();
    
        void Notify();
    
    private:
        std::mutex;
        std::weak_ptr<Subject> subject;
    };
    

    通过这种结构,我们创建了一个循环图,但要明智地使用 weak_ptr 观察者

    观察者 观察一个 一次,但它可以很容易地观察到多个受试者。


    Unsubscribe 。或者至少,打电话给 退订 将从外部同步,但异步实现。

    • 呼叫 退订 在队列中发布事件(payload Observer*
    • 主题 退订 事件,它唤醒等待的线程

    注意:此解决方案完全无法解释 主题 过早死亡。

        4
  •  1
  •   anon anon    17 年前

    subject.Unsubscribe( obsever );l
    while( subject.IsSubscribed( observer ) ) {
       sleep_some_short_time;   // OS specific sleep stuff
    }
    delete observer;
    

    这并不太繁重。

        5
  •  1
  •   m-sharp    17 年前

        6
  •  1
  •   Diego Sevilla    17 年前

    重新思考编辑 好的,现在我想我明白你的问题了。我认为解决你问题的最好办法是做以下事情:

    鉴于取消订阅操作将阻塞互斥体以重置有效标志(并且该特定观察者将不再在您的线程中使用),该代码是线程安全的,客户端可以在取消订阅返回后立即删除任何观察者。

        7
  •  1
  •   Greg Rogers    17 年前

    这样的东西令人满意吗?尽管如此,在收到通知时取消订阅观察者仍然是不安全的,因为你需要一个像你提到的那样的界面(据我所知)。

    Subscribe(Observer *x)
    {
        mutex.lock();
        // add x to the list
        mutex.unlock();
    }
    
    Unsubscribe(Observer *x)
    {
        mutex.lock();
        while (!ok_to_delete)
            cond.wait(mutex);
        // remove x from list
        mutex.unlock();
    }
    
    NotifyLoop()
    {
        while (true) {
            // wait for something to trigger a notify
    
            mutex.lock();
            ok_to_delete = false;
            // build a list of observers to notify
            mutex.unlock();
    
            // notify all observers from the list saved earlier
    
            mutex.lock();
            ok_to_delete = true;
            cond.notify_all();
            mutex.unlock();
        }
    }
    

    (客户IMO的一个糟糕的设计决定……) 您可以将通知线程的线程id添加到数据结构中。在Unsubscribe函数中,您可以根据当前线程的id检查该线程id(大多数线程库都提供此功能,例如pthread_self)。如果它们相同,则无需等待条件变量即可继续。

    注意:如果客户端负责删除观察者,这意味着您会遇到这样的情况,即在Notify回调中,您将取消订阅并删除了观察者,但仍在使用该指针执行某些操作。客户端必须意识到这一点,并且只能在Notify()结束时删除它。

        8
  •  0
  •   Patrick    17 年前

    class Subject {
    public:
    Subject() : t(bind(&Subject::Run, this)),m_key(0)    {    }
    void Subscribe(Observer* o) {
        mutex::scoped_lock l(m);
        InternalObserver io( o );
        boost::shared_ptr<InternalObserver> sp(&io);
        observers.insert(pair<int,boost::shared_ptr<InternalObserver>> (MakeKey(o),sp));
    }
    
    void Unsubscribe(Observer* o) {
        mutex::scoped_lock l(m);
        observers.find( MakeKey(o) )->second->exists = false;    }
    
    void WaitForSomethingInterestingToHappen() {}
    void Run()
    {
        for (;;)
        {
            WaitForSomethingInterestingToHappen();
            for( unsigned int i = 0; i < observers.size(); ++ i )
            {
                mutex::scoped_lock l(m);
                if( observers[i]->exists )
                {
                    mem_fun(&Observer::Notify);//needs changing
                }
                else
                {
                    observers.erase(i);
                    --i;
                }
            }
        }
    }
    private:
    
    int MakeKey(Observer* o) {
        return ++m_key;//needs changeing, sha of the object?
    }
    class InternalObserver {
    public:
        InternalObserver(Observer* o) : m_o( o ), exists( true ) {}
        Observer* m_o;
        bool exists;
    };
    
    map< int, boost::shared_ptr<InternalObserver> > observers;
    thread t;
    mutex m;
    int m_key;
    };
    
        9
  •  0
  •   Sameer    11 年前

    observers map Observer* Observer volatile 观察者 是有效的。在……里面 subscribe 有效的 国家。在……里面 unsubscribe 无效 Notify 被召唤 包装器 而不是 实际观察员 。包装器将调用 通知 实际观察员 如果它有效(仍然订阅)

    #include <map>
    #include <functional>
    #include <boost/thread.hpp>
    #include <boost/bind.hpp>
    
    using namespace std;
    using namespace boost;
    
    class Observer
    {
    public:
        void Notify() {}
    };
    
    class ObserverWrapper : public Observer
    {
    public:
        Observer* wrappee;
        volatile bool valid;
        ObserverWrapper(Observer* o) 
        {
            wrappee = o;
            valid = true;
        }
    
        void Notify() 
        {
            if (valid) wrappee->Notify();
        }
    }
    class Subject
    {
    public:
        Subject() : t(bind(&Subject::Run, this))
        {
        }
    
        void Subscribe(Observer* o)
        {
            mutex::scoped_lock l(m);
            boost::shared_ptr<ObserverWrapper> sptr(new ObserverWrapper(o));
            observers.insert(pair<Observer*, sptr));
        }
    
        void Unsubscribe(Observer* o)
        {
            mutex::scoped_lock l(m);
            observers.find(o)->second->valid = false;
            observers.erase(o);
        }
    
        void Run()
        {
            for (;;)
            {
                WaitForSomethingInterestingToHappen();
                vector<ObserverWrapper*> notifyList;
                {
                    mutex::scoped_lock l(m);
                    boost::copy(observers | boost::adaptors::map_values, std::back_inserter(notifyList));
                }
                // Should be no problem here
                for_each(notifyList.begin(), notifyList.end(), 
                         mem_fun(&ObserverWrapper::Notify));
            }
        }
    
    private:
        map<Observer*, ObserverWrapper*> observers;
        thread t;
        mutex m;
    };
    
    推荐文章