
Python function will not return unless the last statement is slow

  •  3
  • aaronasterling  · 14 years ago

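    I am working on a subclass of threading.Thread that allows its methods to be called and run in the thread represented by the object they are called on, as opposed to the usual behavior of running in the caller's thread. I do this with decorators on the target methods that place each call in a collections.deque, and I use the run method to process the deque.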

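    The run method uses a while not self.__stop: statement and a threading.Condition object to wait for a call to be placed in the deque, and then calls self.__process_calls. The else part of the while loop makes a final call to __process_calls. Once self.__stop is set, __push_call raises CallToNonRunningThreadError for any further call.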

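    The problem is that __process_calls fails to return unless its last statement is a print, which I discovered while debugging. I tried a = 1 and an explicit return as the last statement, but neither works; only a print does.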

    I have confirmed this.

    import threading
    import collections    
    
    class BrokenPromise(Exception): pass    
    class CallableThreadError(Exception): pass    
    class CallToNonRunningThreadError(CallableThreadError): pass   
    
    
    class Promise(object):
        def __init__(self, deque, condition):
            self._condition = condition
            self._deque = deque
    
        def read(self, timeout=None):
            if not self._deque:
                with self._condition:
                    if timeout:
                        self._condition.wait(timeout)
                    else:
                        self._condition.wait()
            if self._deque:
                value = self._deque.popleft()
                del self._deque
                del self._condition
                return value
            else:
                raise BrokenPromise
    
        def ready(self):
            return bool(self._deque) 
    
    class CallableThread(threading.Thread):
        def __init__(self, *args, **kwargs): 
            # _enqueued_calls is used to store tuples that encode a function call.
            # It is processed by the run method 
            self.__enqueued_calls = collections.deque() 
            # _enqueue_call_permission is for callers to signal that they have
            # placed something on the queue 
            self.__enqueue_call_permission = threading.Condition()
            self.__stop = False
            super(CallableThread, self).__init__(*args, **kwargs) 
    
        @staticmethod
        def blocking_method(f): 
            u"""A decorator function to implement a blocking method on a thread""" 
            # the returned function enqueues the decorated function and blocks
            # until the decorated function is called and returns. It then returns
            # the value unmodified. The code in register runs in the calling thread
            # and the decorated method runs in the thread that it is called on
            f = CallableThread.nonblocking_method_with_promise(f)
            def register(self, *args, **kwargs):
                p = f(self, *args, **kwargs)
                return p.read()
            return register
    
        @staticmethod 
        def nonblocking_method_with_promise(f):
            u"""A decorator function to implement a non-blocking method on a
            thread
            """ 
            # the returned function enqueues the decorated function and returns a
            # Promise object. The code in register runs in the calling thread
            # and the decorated method runs in the thread that it is called on.
            def register(self, *args, **kwargs): 
                call_complete = threading.Condition() 
                response_deque = collections.deque()
                self.__push_call(f, args, kwargs, response_deque, call_complete)
                return Promise(response_deque, call_complete)
            return register
    
        @staticmethod
        def nonblocking_method(f):
            def register(self, *args, **kwargs):
                self.__push_call(f, args, kwargs)
            return register
    
        def run(self):        
            while not self.__stop:  # while we've not been killed 
                with self.__enqueue_call_permission:
                    # get the condition so that we can wait on it if we need to.
                    if not self.__enqueued_calls: 
                        self.__enqueue_call_permission.wait() 
                self.__process_calls()
            else:
                # if we exit because self.__stop is True, finish processing
                # the pending calls if there are any
                self.__process_calls()
    
        def stop(self): 
            u""" Signal the thread to stop"""
            with self.__enqueue_call_permission:
                # we do this in case the run method is stuck waiting on an update
                self.__stop = True
                self.__enqueue_call_permission.notify()
    
        def __process_calls(self):
            print "processing calls"
            while self.__enqueued_calls:
                ((f,  args, kwargs),
                response_deque, call_complete) = self.__enqueued_calls.popleft()
                if call_complete:
                    with call_complete:
                        response_deque.append(f(self, *args, **kwargs)) 
                        call_complete.notify()
                else:
                    f(self, *args, **kwargs)
            # this is where you place the print statement if you want to see the
            # behavior        
    
        def __push_call(self, f, args, kwargs, response_deque=None,
                        call_complete=None):
            if self.__stop:
                raise CallToNonRunningThreadError(
                      "This thread is no longer accepting calls")
            with self.__enqueue_call_permission:
                self.__enqueued_calls.append(((f, args, kwargs),
                                               response_deque, call_complete))
                self.__enqueue_call_permission.notify()
    
    
    # if __name__ == '__main__':  (I lost the indent on the following code
    # when copying, but it doesn't matter in this context)
    class TestThread(CallableThread): 
        u"""Increment a counter on each call and print the value""" 
        counter = 0
    
        @CallableThread.nonblocking_method_with_promise
        def increment(self): 
            self.counter += 1
            return self.counter
    
    class LogThread(CallableThread):
    
        @CallableThread.nonblocking_method
        def log(self, message):
            print message
    
    l = LogThread()
    l.start()
    l.log("logger started")
    t = TestThread() 
    t.start()
    l.log("test thread started")
    p = t.increment()
    l.log("promise acquired")
    v = p.read()
    l.log("promise read")
    l.log("{0} read from promise".format(v))
    l.stop()
    t.stop()
    l.join()
    t.join()
    
    1 answer  |  as of 14 years ago
        1
  •  1
  •   bstpierre Edgar Aviles    14 years ago
    1. __process_calls is modifying __enqueued_calls without holding the __enqueue_call_permission lock.

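    2. deque may be "thread-safe" (i.e. not corrupted by access from several threads), but checks of its state should still be made while holding the lock.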
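
    To illustrate the second point, here is a minimal sketch of a promise whose state check happens under the lock. SafePromise and fulfill are hypothetical names that do not appear in the question, and the sketch assumes Python 3 (for Condition.wait_for), whereas the question's code is Python 2. In the question's Promise.read, the `if not self._deque` test runs before the condition is acquired, so a value that is appended and notified in between could be missed and the reader could wait indefinitely.

```python
import collections
import threading


class BrokenPromise(Exception):
    pass


class SafePromise:
    """Hypothetical variant of Promise that only inspects the deque under the lock."""

    def __init__(self):
        self._deque = collections.deque()
        self._condition = threading.Condition()

    def fulfill(self, value):
        # Producer side: append and notify while holding the condition.
        with self._condition:
            self._deque.append(value)
            self._condition.notify_all()

    def read(self, timeout=None):
        with self._condition:
            # wait_for tests the predicate before waiting and after every
            # wake-up, always with the lock held, so a value that arrived
            # before this call is seen without waiting.
            if not self._condition.wait_for(lambda: bool(self._deque), timeout):
                raise BrokenPromise("no value arrived before the timeout")
            return self._deque.popleft()
```

    Because the predicate is evaluated with the lock held, it does not matter whether fulfill runs before or after read starts.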

    Inline comments:

    def run(self):        
        while not self.__stop:  # while we've not been killed 
            with self.__enqueue_call_permission:
                # get the condition so that we can wait on it if we need to.
                ### should be checking __stop here; it could have been modified
                ### before you took the lock.
                if not self.__enqueued_calls: 
                    self.__enqueue_call_permission.wait() 
            self.__process_calls()
        else:
            # if we exit because self.__stop is True, finish processing
            # the pending calls if there are any
            self.__process_calls()
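
    One way to apply the inline comment is to test the stop flag and the deque together while holding the condition, and to wait in a loop. The following is a sketch under assumptions, not the asker's class: Worker, push, results and _stop_requested are hypothetical names, and it is written for Python 3.

```python
import collections
import threading


class Worker(threading.Thread):
    """Hypothetical stand-in for CallableThread with the stop check under the lock."""

    def __init__(self):
        super().__init__()
        self._calls = collections.deque()
        self._cond = threading.Condition()
        self._stop_requested = False
        self.results = []

    def push(self, f, *args):
        with self._cond:
            # Checked under the lock so no call can slip in after stop().
            if self._stop_requested:
                raise RuntimeError("this thread is no longer accepting calls")
            self._calls.append((f, args))
            self._cond.notify()

    def stop(self):
        with self._cond:
            self._stop_requested = True
            self._cond.notify()

    def run(self):
        while True:
            with self._cond:
                # Both predicates are re-tested with the lock held, so a
                # notify() sent before we start waiting cannot be lost.
                while not self._calls and not self._stop_requested:
                    self._cond.wait()
                pending = list(self._calls)
                self._calls.clear()
                stopping = self._stop_requested
            # Run the calls outside the lock so callers can keep enqueueing.
            for f, args in pending:
                self.results.append(f(*args))
            if stopping:
                break
```

    With this shape, a stop() that lands just before the thread would have started waiting is seen by the predicate, so the thread exits instead of blocking in wait().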