代码之家  ›  专栏  ›  技术社区  ›  Aftab Naveed

如何在C++中添加异步等待?

  •  3
  • Aftab Naveed  · 技术社区  · 5 年前

    我有两个线程在运行,一个是产生数据并将其放入队列的生产者,另一个是消耗数据的消费者。我希望生产者生产的数据与几秒钟的延迟,但不希望消费者等待它应该处理异步数据,一旦它是没有延迟可用。这是密码。

    #include <iostream>
    #include <thread>
    #include <mutex>
    #include <condition_variable>
    #include <queue>
    #include <string>
    
    std::mutex mutex;
    std::condition_variable cond;
    std::queue<int> buffer;
    
    
    
    void producer(int val)
    {
    
        while(val) {
            std::unique_lock locker(mutex);
            cond.wait(locker, []() {
              
                return buffer.size() < 50;
            });
    
            // add pulse
            std::this_thread::sleep_for(std::chrono::seconds(1));
    
            buffer.push(val);
        
    
            std::cout << "Produced " << val << std::endl;
            val --;
    
            locker.unlock();
            cond.notify_one();
    
        }
    }
    
    void consumer()
    {
        while(true) {
            std::unique_lock locker(mutex);
            cond.wait(locker, [](){
                return buffer.size() > 0;
            });
    
            int val = buffer.front();
            std::cout << "Consumer " << val << std::endl;
    
            buffer.pop();
          
            locker.unlock();
            cond.notify_one();
    
        }
    }
    
    
    int main()
    {
    
        std::thread t1(producer, 50);
        std::thread t2(consumer);
    
        t1.join();
        t2.join();
    
        return 0;
    }
    

    结果呢

    未与std::this_线程::sleep_并行运行(std::chrono::seconds(3));

    Produced 10
    Produced 9
    Produced 8
    Produced 7
    Produced 6
    Produced 5
    Produced 4
    Produced 3
    Produced 2
    Produced 1
    Consumer 10
    Consumer 9
    Consumer 8
    Consumer 7
    Consumer 6
    Consumer 5
    Consumer 4
    Consumer 3
    Consumer 2
    

    没有等待

    Produced 40
    Produced 39
    Produced 38
    Produced 37
    Produced 36
    Produced 35
    Produced 34
    Produced 33
    Produced 32
    Produced 31
    Produced 30
    Produced 29
    Produced 28
    Produced 27
    Produced 26
    Produced 25
    Produced 24
    Produced 23
    Produced 22
    Produced 21
    Produced 20
    Produced 19
    Produced 18
    Produced 17
    Produced 16
    Produced 15
    Produced 14
    Consumer 40
    Consumer 39
    Consumer 38
    Consumer 37
    Consumer 36
    Consumer 35
    Consumer 34
    Consumer 33
    Consumer 32
    Consumer 31
    Consumer 30
    Consumer 29
    Consumer 28
    Consumer 27
    Consumer 26
    Consumer 25
    Consumer 24
    Consumer 23
    Consumer 22
    Consumer 21
    Consumer 20
    Consumer 19
    Consumer 18
    Consumer 17
    Consumer 16
    Consumer 15
    Produced 13
    Produced 12
    Produced 11
    Produced 10
    Produced 9
    Produced 8
    Produced 7
    Produced 6
    Produced 5
    Produced 4
    Produced 3
    Produced 2
    Produced 1
    Consumer 14
    Consumer 13
    Consumer 12
    Consumer 11
    Consumer 10
    Consumer 9
    Consumer 8
    Consumer 7
    Consumer 6
    Consumer 5
    Consumer 4
    Consumer 3
    Consumer 2
    

    问题

    当我不让生产者等待,但我使用添加延迟时,它工作得很好 std::this_thread::sleep_for(std::chrono::seconds(3));

    1 回复  |  直到 5 年前
        1
  •  2
  •   Sam Varshavchik    5 年前

    std::this_thread::sleep_for 继续,即使条件变量发出信号,直到它可以重新锁定互斥锁(在收到通知后)。如果另一个执行线程将其锁定,则很难做到这一点。

    确实,生产者稍后会解锁互斥锁,然后向条件变量发送信号。然而,接下来发生的事情是它在循环的下一次迭代中重新锁定互斥锁。这引发了下一个问题:即使线程正在等待一个条件变量,当发出信号时,也不能保证它在收到信号时能够重新锁定互斥锁。唯一可以保证的是,在向条件变量发出信号后的某个时刻,等待的线程将唤醒并执行此操作,但这不是一个不可分割的操作。

    这里发生的事情是,生产者只是以足够快的速度重新锁定互斥锁,以防止使用者重新锁定互斥锁并从中返回 std::condition_variable::wait

    最简单的解决办法是 sleep_for 之后 解锁互斥锁。

    此外,对于经典的互斥体语义,应该在互斥体仍然被锁定时通知条件变量,而不是在解除锁定之后。这在这里并没有太大的区别,但在某些边缘情况下,它确实很重要,而且总是使用相同的信令/锁定顺序更容易,而不是试图找出在每个特定情况下它是否无关紧要。

    #include <queue>
    #include <string>
    
    std::mutex mutex;
    std::condition_variable cond;
    std::queue<int> buffer;
    
    
    
    void producer(int val)
    {
    
        while(val) {
            std::unique_lock locker(mutex);
            cond.wait(locker, []() {
    
                return buffer.size() < 50;
            });
    
            buffer.push(val);
    
    
            std::cout << "Produced " << val << std::endl;
            val --;
    
            cond.notify_one();
            locker.unlock();
    
            // add pulse
            std::this_thread::sleep_for(std::chrono::seconds(1));
    
    
        }
    }
    
    void consumer()
    {
        while(true) {
            std::unique_lock locker(mutex);
            cond.wait(locker, [](){
                return buffer.size() > 0;
            });
    
            int val = buffer.front();
            std::cout << "Consumer " << val << std::endl;
    
            buffer.pop();
    
            cond.notify_one();
            locker.unlock();
        }
    }
    
    
    int main()
    {
    
        std::thread t1(producer, 50);
        std::thread t2(consumer);
    
        t1.join();
        t2.join();
    
        return 0;
    }