代码之家  ›  专栏  ›  技术社区  ›  jhrmnn

检测空闲异步事件循环

  •  6
  • jhrmnn  · 技术社区  · 8 年前

    是否有某种编程模式可以让我在以下意义上检测异步IO事件循环何时变为空闲?假设我的执行路径以某种复杂的方式分支,例如,使用asyncio。gather(),但我知道每个分支最终都会等待一些空闲的协程,例如套接字或子进程。假设我知道这些协程实际上永远不会产生,因此事件循环将执行它可以执行的任何python代码,但最终只会等待那些空闲的协程。有没有一种编程方法来检测这种状态并停止循环?

    1 回复  |  直到 8 年前
        1
  •  5
  •   user4815162342    4 年前

    正如Philip Couling所指出的,解决方案如下所示 不起作用 . StackOverflow不允许删除已接受的答案,因此我添加了此免责声明。


    您所谓的“空闲”可能更准确地描述为“等待IO或超时”。在正确编写的异步代码中,不需要检测循环是否处于该状态,因为它不应该处于该状态 -循环正在完成它的工作,这取决于以下工具 asyncio.gather , asyncio.wait loop.run_until_complete 确保它在适当的时间结束。然而,事情并不总是完美的,如果你真的想这样做,这当然是可能的。

    在事件循环的每个步骤中,它都会检查准备运行的任务。如果有,则调用它们的步骤。一旦没有更多任务就绪,事件循环将等待IO事件或最快的超时,以先发生的为准。需要注意的重要一点是,运行任务始终优先于等待IO。因此,为了检测没有任务准备就绪的情况,可以安排一个已知会立即触发的虚拟IO事件。

    以下协同程序设置此类事件并等待其触发:

    import socket, asyncio
    
    async def detect_iowait():
        loop = asyncio.get_event_loop()
        rsock, wsock = socket.socketpair()
        wsock.close()
        await loop.sock_recv(rsock, 1)
        rsock.close()
    

    它设置了一个 socket pair 其中,从一个套接字读取返回写入另一个套接字的数据。它立即关闭其中一个套接字,以便从另一个套接字读取时立即返回EOF,表示为空的bytearray。等待从该套接字读取基本上是非阻塞的,但是 asyncio 不知道,所以它将套接字放在IO等待列表中。如上所述,只要不存在可运行的任务,asyncio就会等待IO和 detect_iowait 将等待套接字上的读取并退出。因此等待 detect_iowait() 自身检测IO等待。

    使用的测试代码 detect\u iowait() 可能如下所示:

    # stop loop.run_forever once iowait is detected
    async def stop_on_iowait():
        await detect_iowait()
        print('iowait detected, stopping!')
        asyncio.get_event_loop().stop()
    
    # a dummy calculation coroutine, emulating your execution path
    async def calc(n):
        print('calc %d start' % n)
        async def noop():
            pass
        for i in range(n):
            await noop()
        print('calc %d end' % n)
    
    # coroutine that waits on IO forever, also (ab)using a socket pair,
    # this time creating a socket whose recv will never complete
    async def io_forever():
        loop = asyncio.get_event_loop()
        sock, _ = socket.socketpair()
        sock.setblocking(False)
        await loop.sock_recv(sock, 1)
    
    loop = asyncio.get_event_loop()
    for t in calc(1000), calc(10000), calc(100000), io_forever():
        loop.create_task(t)
    loop.create_task(stop_on_iowait())
    loop.run_forever()