作为我的后续行动
previous question about calling an async function from a synchronous one
,我发现
asyncio.run_coroutine_threadsafe
.
从纸面上看,这很理想。基于中的评论
this StackOverflow question
,这看起来很理想。我可以创建一个新线程,获取对原始事件循环的引用,并安排异步函数在原始事件循环内运行,同时只阻塞新线程。
class _AsyncBridge:
def call_async_method(self, function, *args, **kwargs):
print(f"call_async_method {threading.get_ident()}")
event_loop = asyncio.get_event_loop()
thread_pool = ThreadPoolExecutor()
return thread_pool.submit(asyncio.run, self._async_wrapper(event_loop, function, *args, **kwargs)).result()
async def _async_wrapper(self, event_loop, function, *args, **kwargs):
print(f"async_wrapper {threading.get_ident()}")
future = asyncio.run_coroutine_threadsafe(function(*args, **kwargs), event_loop)
return future.result()
这没有错,但它也永远不会回来。未来只是挂起,异步调用永远不会命中。我是否利用未来
call_async_method
,
_async_wrapper
,或者两者都有;无论我在哪里使用未来,它都会挂起。
我试着把
run_coroutine_threadsafe
直接在主事件循环中调用:
event_loop = asyncio.get_event_loop()
future = asyncio.run_coroutine_threadsafe(cls._do_work_async(arg1, arg2, arg3), event_loop)
return_value = future.result()
未来也悬在这里。
我试着用
LoopExecutor
类定义
here
,看起来像
准确的
满足我的需要。
event_loop = asyncio.get_event_loop()
loop_executor = LoopExecutor(event_loop)
future = loop_executor.submit(cls._do_work_async, arg1=arg1, arg2=arg2, arg3=arg3)
return_value = future.result()
在那里,返回的未来也悬着。
我胡思乱想,我正在阻塞我原来的事件循环,因此计划的任务永远不会运行,所以我创建了一个新的事件循环:
event_loop = asyncio.get_event_loop()
new_event_loop = asyncio.new_event_loop()
print(event_loop == new_event_loop) # sanity check to make sure the new loop is actually different from the existing one - prints False as expected
loop_executor = LoopExecutor(new_event_loop)
future = loop_executor.submit(cls._do_work_async, arg1=arg1, arg2=arg2, arg3=arg3)
return_value = future.result()
return return_value
仍在等待
future.result()
我不明白为什么。
怎么了
asyncio.run_coroutine_线程安全
/我用它的方式?