代码之家  ›  专栏  ›  技术社区  ›  Benyamin Jafari

如何由工作进程重置异步事件循环?

  •  2
  • Benyamin Jafari  · 技术社区  · 6 年前

    我正在使用一个 asyncio 的 run_forever() 事件循环。现在我想在某个处理过程、信号或文件变更之后重启该循环(停止当前循环并重新创建一个新循环),但在实现时遇到了一些问题:


    下面是三个简化的代码片段,演示了几个协程 worker 以及一个用于重启事件循环的协程:


    第 1 次尝试:

    import asyncio
    
    async def coro_worker(proc):
        """Print a start-up line, then a heartbeat line every ``proc`` seconds.

        ``proc`` serves both as the worker's label in the printed messages
        and as the delay passed to ``asyncio.sleep``. The loop has no exit
        condition, so the coroutine only ends when something outside stops
        it (for example task cancellation).
        """
        print(f'Worker: {proc} started.')
        # The heartbeat text never changes, so build it once up front.
        heartbeat = f'Worker: {proc} process.'
        while True:
            print(heartbeat)
            await asyncio.sleep(proc)
    
    async def reset_loop(loop):
        """Count five one-second ticks, then call ``main(loop)`` to request a restart.

        The countdown stands in for whatever real condition (a process, a
        signal, a file change) should trigger the restart. ``main`` is called
        synchronously from inside this coroutine, passing ``loop`` as its
        ``previous_loop`` argument.
        """
        tick = 0
        while tick < 5:
            print(f'{tick} counting for reset the eventloop.')
            await asyncio.sleep(1)
            tick += 1

        # The intent is for main() to tear down the current loop and start a new one.
        main(loop)
    
    def main(previous_loop=None):
        """Schedule two workers and the reset task, then run the loop forever.

        First restart attempt: when ``previous_loop`` is not None, every
        known task is cancelled and the current loop is asked to stop before
        a fresh batch of tasks is scheduled.

        NOTE(review): ``reset_loop`` calls this from inside a task of the
        loop that is still running, so ``run_forever()`` and ``close()``
        below raise ``RuntimeError`` (see the traceback that follows this
        snippet).
        """
        offset = 0
        if previous_loop is not None:  # A restart was requested: tear down the old tasks.
            offset = 1  # Shifts the worker ids so restarted workers are distinguishable.
            # NOTE(review): Task.all_tasks() is deprecated since Python 3.7 and
            # removed in 3.9; asyncio.all_tasks() is the replacement.
            for task in asyncio.Task.all_tasks():
                print('Cancel the tasks')  # Why it increase up?
                task.cancel()
                # task.clear()
                # task.close()
                # task.stop()
    
            print("Done cancelling tasks")
            # NOTE(review): stop() presumably only takes effect once control
            # returns to the loop, i.e. after this call chain unwinds -- confirm.
            asyncio.get_event_loop().stop()
    
        process = [1 + offset, 2 + offset]
        # previous_loop is not used here; the loop comes from get_event_loop().
        loop = asyncio.get_event_loop()
        futures = [loop.create_task(coro_worker(proc)) for proc in process]
        futures.append(loop.create_task(reset_loop(loop)))
    
        try:
            loop.run_forever()
        except KeyboardInterrupt:
            pass
        except asyncio.CancelledError:
            print('Tasks has been canceled')
            main()  # Recursively
        finally:
            print("Closing Loop")
            loop.close()
    main()
    

    输出〔1〕:

    Worker: 1 started.
    Worker: 1 process.
    Worker: 2 started.
    Worker: 2 process.
    0 counting for reset the eventloop.
    Worker: 1 process.
    1 counting for reset the eventloop.
    Worker: 2 process.
    Worker: 1 process.
    2 counting for reset the eventloop.
    Worker: 1 process.
    3 counting for reset the eventloop.
    Worker: 2 process.
    Worker: 1 process.
    4 counting for reset the eventloop.
    Worker: 1 process.
    Cancel the tasks
    Cancel the tasks
    Cancel the tasks
    Done cancelling tasks
    Closing Loop
    Closing Loop
    Task exception was never retrieved
    future: <Task cancelling coro=<reset_loop() done, defined at reset_asycio.py:11> exception=RuntimeError('Cannot close a running event loop',)>
    Traceback (most recent call last):
      File "reset_asycio.py", line 40, in main
        loop.run_forever()
      File "/usr/lib/python3.6/asyncio/base_events.py", line 425, in run_forever
        raise RuntimeError('This event loop is already running')
    RuntimeError: This event loop is already running
    
    During handling of the above exception, another exception occurred:
    
    Traceback (most recent call last):
      File "reset_asycio.py", line 17, in reset_loop
        main(loop)  # Expected close the current loop and start a new loop!
      File "reset_asycio.py", line 48, in main
        loop.close()
      File "/usr/lib/python3.6/asyncio/unix_events.py", line 63, in close
        super().close()
      File "/usr/lib/python3.6/asyncio/selector_events.py", line 96, in close
        raise RuntimeError("Cannot close a running event loop")
    RuntimeError: Cannot close a running event loop
    Task was destroyed but it is pending!
    task: <Task pending coro=<reset_loop() running at reset_asycio.py:11>>
    reset_asycio.py:51: RuntimeWarning: coroutine 'reset_loop' was never awaited
      main()
    Task was destroyed but it is pending!
    task: <Task pending coro=<coro_worker() running at reset_asycio.py:4>>
    reset_asycio.py:51: RuntimeWarning: coroutine 'coro_worker' was never awaited
      main()
    Task was destroyed but it is pending!
    task: <Task pending coro=<coro_worker() running at reset_asycio.py:4>>
    Task was destroyed but it is pending!
    task: <Task pending coro=<coro_worker() running at reset_asycio.py:8> wait_for=<Future cancelled>>
    Task was destroyed but it is pending!
    task: <Task pending coro=<coro_worker() running at reset_asycio.py:8> wait_for=<Future cancelled>>
    

    第 2 次尝试:

    .
    .
    .
    
    def main(previous_loop=None):
        """Second restart attempt: stop and close ``previous_loop`` directly.

        With no ``previous_loop`` it schedules two workers plus the reset
        task and runs the loop forever.

        NOTE(review): ``reset_loop`` calls this while ``previous_loop`` is
        still running, so ``previous_loop.close()`` raises ``RuntimeError``
        (see the traceback that follows this snippet) and the rest of this
        call is never reached.
        """
        offset = 0
        if previous_loop is not None:  # A restart was requested: shut the old loop down.
            previous_loop.stop()
            previous_loop.close()
            offset = 1  # Shifts the worker ids so restarted workers are distinguishable.
    
        process = [1 + offset, 2 + offset]
        loop = asyncio.get_event_loop()
        futures = [loop.create_task(coro_worker(proc)) for proc in process]
        futures.append(loop.create_task(reset_loop(loop)))
    
        try:
            loop.run_forever()
        except KeyboardInterrupt:
            pass
        except asyncio.CancelledError:
            print('Tasks has been canceled')
            main()  # Recursively
        finally:
            print("Closing Loop")
            loop.close()
    main()
    

    输出〔2〕:

    Worker: 1 started.
    Worker: 1 process.
    Worker: 2 started.
    Worker: 2 process.
    0 counting for reset the eventloop.
    Worker: 1 process.
    1 counting for reset the eventloop.
    Worker: 2 process.
    Worker: 1 process.
    2 counting for reset the eventloop.
    Worker: 1 process.
    3 counting for reset the eventloop.
    Worker: 2 process.
    Worker: 1 process.
    4 counting for reset the eventloop.
    Worker: 1 process.
    Closing Loop
    Task exception was never retrieved
    future: <Task finished coro=<reset_loop() done, defined at reset_asycio.py:9> exception=RuntimeError('Cannot close a running event loop',)>
    Traceback (most recent call last):
      File "reset_asycio.py", line 15, in reset_loop
        main(loop)  # Expected close the current loop and start new loop!
      File "reset_asycio.py", line 21, in main
        previous_loop.close()
      File "/usr/lib/python3.6/asyncio/unix_events.py", line 63, in close
        super().close()
      File "/usr/lib/python3.6/asyncio/selector_events.py", line 96, in close
        raise RuntimeError("Cannot close a running event loop")
    RuntimeError: Cannot close a running event loop
    Task was destroyed but it is pending!
    task: <Task pending coro=<coro_worker() done, defined at reset_asycio.py:3> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x7efed846f138>()]>>
    Task was destroyed but it is pending!
    task: <Task pending coro=<coro_worker() done, defined at reset_asycio.py:3> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x7efed846f048>()]>>
    

    第 3 次尝试:

    .
    .
    .
    
    def main(previous_loop=None):
        """Third restart attempt: only cancel the tasks, then schedule new ones.

        Unlike the earlier attempts, neither ``stop()`` nor ``close()`` is
        called on ``previous_loop`` before the new tasks are created; the
        ``finally`` clause still calls ``loop.close()``.
        """
        offset = 0
        if previous_loop is not None:  # A restart was requested: cancel the old tasks.
            offset = 1  # Shifts the worker ids so restarted workers are distinguishable.
            # NOTE(review): Task.all_tasks() is deprecated since Python 3.7 and
            # removed in 3.9; asyncio.all_tasks() is the replacement.
            # NOTE(review): on 3.6 this set presumably also contains finished
            # tasks that are still referenced, which would explain the growing
            # number of 'Cancel the tasks' lines -- confirm.
            for task in asyncio.Task.all_tasks():
                print('Cancel the tasks')  # Why it increase up?
                task.cancel()
    
        process = [1 + offset, 2 + offset]
        loop = asyncio.get_event_loop()
        futures = [loop.create_task(coro_worker(proc)) for proc in process]
        futures.append(loop.create_task(reset_loop(loop)))
    
        try:
            loop.run_forever()
        except KeyboardInterrupt:
            pass
        except asyncio.CancelledError:
            print('Tasks has been canceled')
            main()  # Recursively
        finally:
            print("Closing Loop")
            loop.close()
    main()
    

    输出〔3〕:

    Worker: 1 started.
    Worker: 1 process.
    Worker: 2 started.
    Worker: 2 process.
    0 counting for reset the eventloop.
    Worker: 1 process.
    1 counting for reset the eventloop.
    Worker: 2 process.
    Worker: 1 process.
    2 counting for reset the eventloop.
    Worker: 1 process.
    3 counting for reset the eventloop.
    Worker: 2 process.
    Worker: 1 process.
    4 counting for reset the eventloop.
    Worker: 1 process.
    Cancel the tasks
    Cancel the tasks
    Cancel the tasks
    Closing Loop
    Worker: 2 started.
    Worker: 2 process.
    Worker: 3 started.
    Worker: 3 process.
    0 counting for reset the eventloop.
    1 counting for reset the eventloop.
    Worker: 2 process.
    2 counting for reset the eventloop.
    Worker: 3 process.
    3 counting for reset the eventloop.
    Worker: 2 process.
    4 counting for reset the eventloop.
    Cancel the tasks
    Cancel the tasks
    Cancel the tasks
    Cancel the tasks
    Cancel the tasks
    Cancel the tasks
    Closing Loop
    Worker: 2 started.
    Worker: 2 process.
    Worker: 3 started.
    Worker: 3 process.
    .
    .
    .
    

    问题:

    • 在第 3 次尝试中,我看起来做到了,但是每次重启之后 print('Cancel the tasks') 的输出次数都在增加,原因是什么?!

    • 有没有更好的方法来克服这个问题?

    请原谅我问了这么长的问题,我试图简化它!


    [注记]:

    • 我不是在找 asyncio.timeout()
    • 我还尝试使用另一个线程来重新启动EventLoop,但没有成功。
    • 我在用 Python 3.6
    1 回复  |  直到 6 年前
        1
  •  1
  •   user4815162342    6 年前

    对 main() 的递归调用以及新建事件循环增加了不必要的复杂性。下面是一个更简单的原型——它监视一个外部来源(文件系统),一旦有文件被创建,就直接停止事件循环。main() 中包含一个循环,负责(重新)创建和取消任务:

    import os, asyncio, random
    
    async def monitor():
        """Poll once per second for a file named ``reset`` and stop the loop when it appears.

        The marker file is deleted as part of detecting it, so each creation
        of the file triggers exactly one ``loop.stop()``. The coroutine never
        returns on its own.
        """
        loop = asyncio.get_event_loop()
        while True:
            # Delete first and treat "not there" as "no reset requested".
            # A separate exists() check would race with anything else that
            # removes the file, and the resulting FileNotFoundError would
            # kill this task so later resets would go unnoticed.
            try:
                os.unlink('reset')
            except FileNotFoundError:
                pass
            else:
                print('reset!')
                loop.stop()
            await asyncio.sleep(1)
    
    async def work(workid):
        """Endlessly print ``workid`` with a random delay, then sleep for that delay.

        Each delay comes from ``random.random()``. The coroutine has no exit
        condition of its own.
        """
        # iter(callable, sentinel) keeps calling random.random(); it returns
        # floats, never None, so the iteration never terminates.
        for delay in iter(random.random, None):
            print(workid, 'sleeping for', delay)
            await asyncio.sleep(delay)
    
    def main():
        """Run batches of three workers, replacing the batch whenever the loop stops.

        A single ``monitor()`` task is scheduled once. Each pass of the outer
        loop creates three ``work`` tasks with ids ``offset + 1`` to
        ``offset + 3`` and blocks in ``run_forever()`` until the loop is
        stopped (``monitor`` calls ``loop.stop()``). The batch is then
        cancelled and the ids advance by three. The function never returns.
        """
        loop = asyncio.get_event_loop()
        loop.create_task(monitor())
        offset = 0
        while True:
            workers = [loop.create_task(work(offset + k)) for k in (1, 2, 3)]
            loop.run_forever()
            # The loop is stopped here, so this only requests cancellation of
            # the finished batch before the next batch is created.
            for task in workers:
                task.cancel()
            offset += 3
    
    if __name__ == '__main__':
        main()
    

    另一种选择是永远不要停止事件循环,只需触发重置事件:

    async def monitor(evt):
        """Poll once per second for a file named ``reset`` and set ``evt`` when it appears.

        Args:
            evt: an object with a ``set()`` method (an ``asyncio.Event`` in
                the accompanying ``main``), signalled once per detected file.

        The marker file is deleted as part of detecting it. The coroutine
        never returns on its own.
        """
        while True:
            # Delete first and treat "not there" as "no reset requested".
            # A separate exists() check would race with anything else that
            # removes the file, and the resulting FileNotFoundError would
            # kill this task so later resets would go unnoticed.
            try:
                os.unlink('reset')
            except FileNotFoundError:
                pass
            else:
                print('reset!')
                evt.set()
            await asyncio.sleep(1)
    

    在这种设计中,main() 本身可以是一个协程:

    async def main():
        """Run batches of three workers, replacing the batch on every reset event.

        A single ``monitor`` task is scheduled once and given the shared
        ``asyncio.Event``. Each pass creates three ``work`` tasks with ids
        ``offset + 1`` to ``offset + 3``, waits for the event, clears it,
        cancels the batch and advances the ids by three. The event loop
        itself is never stopped, and the coroutine never returns.
        """
        loop = asyncio.get_event_loop()
        reset_evt = asyncio.Event()
        loop.create_task(monitor(reset_evt))
        offset = 0
        while True:
            workers = [loop.create_task(work(offset + k)) for k in (1, 2, 3)]
            await reset_evt.wait()
            # Re-arm the event before the next batch waits on it.
            reset_evt.clear()
            for task in workers:
                task.cancel()
            offset += 3
    
    if __name__ == '__main__':
        asyncio.run(main())
        # Without asyncio.run (added in Python 3.7), the equivalent is:
        # asyncio.get_event_loop().run_until_complete(main())
    
    

    请注意,在这两种方案中,任务的取消都是通过在 await 处抛出 CancelledError 异常来实现的。任务不得用 try: ... except: ... 捕获所有异常;如果确实这样做了,就必须重新抛出该异常。