我尝试使用以下命令组合阻塞任务和非阻塞(I/O绑定)任务 ProcessPoolExecutor 发现自己的行为出乎意料。
ProcessPoolExecutor
class BlockingQueueListener(BaseBlockingListener): def run(self): # Continioulsy listening a queue blocking_listen() class NonBlockingListener(BaseNonBlocking): def non_blocking_listen(self): while True: await self.get_message() def run(blocking): blocking.run() if __name__ == "__main__": loop = asyncio.get_event_loop() executor = ProcessPoolExecutor() blocking = BlockingQueueListener() non_blocking = NonBlockingListener() future = loop.run_in_executor(executor, run(blocking)) loop.run_until_complete( asyncio.gather( non_blocking.main(), future ) )
我原以为这两个任务将同时具有控制权,但阻止任务是在中开始的 ProcessPoolExecutor进程池执行器 阻止和从不返回控制。怎么会这样?在multiprocessing executor中,将常规协同路由和未来相结合的正确方法是什么?
ProcessPoolExecutor进程池执行器
该行:
future = loop.run_in_executor(executor, run(blocking))
将实际运行阻塞函数并将其结果提供给执行器。
根据 documentation ,您需要显式地传递函数,后跟其参数。
future = loop.run_in_executor(executor, run, blocking)