代码之家  ›  专栏  ›  技术社区  ›  goji

Python中的threading.condition变量是否有一种替代方法,可以更好地支持不进行轮询的超时?

  •  3
  • goji  · 技术社区  · 15 年前

    我在需要超时的线程中使用条件变量。在看到大量线程运行时的CPU使用情况之前,我没有注意到线程模块中提供的条件变量实际上没有休眠,而是在超时作为参数提供时进行轮询。

    有没有其他的方法可以像pthreads那样睡觉?

    很多线程以多秒的间隔休眠似乎很痛苦,只是让它仍然占用CPU时间。

    谢谢!

    2 回复  |  直到 15 年前
        1
  •  3
  •   David Underhill    15 年前

    这在Python中似乎很难做到,但这里有一个解决方案。它依赖于生成额外的线程,但不使用轮询,并确保在超时到期或原始wait()返回时立即唤醒原始线程。

    注意:下面的代码包括一个测试用例,它测试由于超时而结束的条件等待和由于通知而结束的条件等待。

    from thread import start_new_thread
    from threading import Condition, Timer
    
    class ConditionWithoutPolling():
        """Implements wait() with a timeout without polling.  Wraps the Condition
        class."""
        def __init__(self, condition):
            self.condition = condition
            self.wait_timeout_condition = Condition()
    
        def wait(self, timeout=None):
            """Same as Condition.wait() but it does not use a poll-and-sleep method
            to implement timeouts.  Instead, if a timeout is requested two new
            threads are spawned to implement a non-pol-and-wait method."""
            if timeout is None:
                # just use the original implementation if no waiting is involved
                self.condition.wait()
                return
            else:
                # this new boolean will tell us whether we are done waiting or not
                done = [False]
    
                # wait on the original condition in a new thread
                start_new_thread(self.wait_on_original, (done,))
    
                # wait for a timeout (without polling) in a new thread
                Timer(timeout, lambda : self.wait_timed_out(done)).start()
    
                # wait for EITHER of the previous threads to stop waiting
                with self.wait_timeout_condition:
                    while not done[0]:
                        self.wait_timeout_condition.wait()
    
        def wait_on_original(self, done):
            """Waits on the original Condition and signals wait_is_over when done."""
            self.condition.wait()
            self.wait_is_over(done)
    
        def wait_timed_out(self, done):
            """Called when the timeout time is reached."""
            # we must re-acquire the lock we were waiting on before we can return
            self.condition.acquire()
            self.wait_is_over(done)
    
        def wait_is_over(self, done):
            """Modifies done to indicate that the wait is over."""
            done[0] = True
            with self.wait_timeout_condition:
                self.wait_timeout_condition.notify()
    
        # wrap Condition methods since it wouldn't let us subclass it ...
        def acquire(self, *args):
            self.condition.acquire(*args)
        def release(self):
            self.condition.release()
        def notify(self):
            self.condition.notify()
        def notify_all(self):
            self.condition.notify_all()
        def notifyAll(self):
            self.condition.notifyAll()
    
    def test(wait_timeout, wait_sec_before_notification):
        import time
        from threading import Lock
        lock = Lock()
        cwp = ConditionWithoutPolling(Condition(lock))
        start = time.time()
    
        def t1():
            with lock:
                print 't1 has the lock, will wait up to %f sec' % (wait_timeout,)
                cwp.wait(wait_timeout)
            time_elapsed = time.time() - start
            print 't1: alive after %f sec' % (time_elapsed,)        
    
        # this thread will acquire the lock and then conditionally wait for up to 
        # timeout seconds and then print a message 
        start_new_thread(t1, ())
    
        # wait until it is time to send the notification and then send it
        print 'main thread sleeping (will notify in %f sec)' % (wait_sec_before_notification,)
        time.sleep(wait_sec_before_notification)
        with lock:
            cwp.notifyAll()
            print 'notification sent, will continue in 2sec'
        time.sleep(2.0) # give the other time thread to finish before exiting
    
    if __name__ == "__main__":
        print 'test wait() ending before the timeout ...'
        test(2.0, 1.0)
    
        print '\ntest wait() ending due to the timeout ...'
        test(2.0, 4.0)
    
        2
  •  1
  •   Ioan    15 年前

    我不熟悉Python,但是如果您能够在条件变量上阻塞(没有超时),那么您可以自己实现超时。让阻塞线程存储它开始阻塞的时间,并设置一个计时器来通知它。当它唤醒时,检查超时所用的时间。这不是一个很好的方法,除非您可以将计时器聚合到一个线程,否则,您的线程计数将毫无理由地增加一倍。