代码之家  ›  专栏  ›  技术社区  ›  cf-

从asyncio.run_coroutine_threadsafe的未来永远挂起?

  •  0
  • cf-  · 技术社区  · 5 年前

    作为我的后续行动 previous question about calling an async function from a synchronous one ,我发现 asyncio.run_coroutine_threadsafe .

    从纸面上看,这很理想。基于中的评论 this StackOverflow question ,这看起来很理想。我可以创建一个新线程,获取对原始事件循环的引用,并安排异步函数在原始事件循环内运行,同时只阻塞新线程。

    class _AsyncBridge:
        def call_async_method(self, function, *args, **kwargs):
            print(f"call_async_method {threading.get_ident()}")
            event_loop = asyncio.get_event_loop()
            thread_pool = ThreadPoolExecutor()
            return thread_pool.submit(asyncio.run, self._async_wrapper(event_loop, function, *args, **kwargs)).result()
    
        async def _async_wrapper(self, event_loop, function, *args, **kwargs):
            print(f"async_wrapper {threading.get_ident()}")
            future = asyncio.run_coroutine_threadsafe(function(*args, **kwargs), event_loop)
            return future.result()
    

    这没有错,但它也永远不会回来。未来只是挂起,异步调用永远不会命中。我是否利用未来 call_async_method , _async_wrapper ,或者两者都有;无论我在哪里使用未来,它都会挂起。

    我试着把 run_coroutine_threadsafe 直接在主事件循环中调用:

    event_loop = asyncio.get_event_loop()
    future = asyncio.run_coroutine_threadsafe(cls._do_work_async(arg1, arg2, arg3), event_loop)
    return_value = future.result()
    

    未来也悬在这里。

    我试着用 LoopExecutor 类定义 here ,看起来像 准确的 满足我的需要。

    event_loop = asyncio.get_event_loop()
    loop_executor = LoopExecutor(event_loop)
    future = loop_executor.submit(cls._do_work_async, arg1=arg1, arg2=arg2, arg3=arg3)
    return_value = future.result()
    

    在那里,返回的未来也悬着。

    我胡思乱想,我正在阻塞我原来的事件循环,因此计划的任务永远不会运行,所以我创建了一个新的事件循环:

    event_loop = asyncio.get_event_loop()
    new_event_loop = asyncio.new_event_loop()
    print(event_loop == new_event_loop) # sanity check to make sure the new loop is actually different from the existing one - prints False as expected
    loop_executor = LoopExecutor(new_event_loop)
    future = loop_executor.submit(cls._do_work_async, arg1=arg1, arg2=arg2, arg3=arg3)
    return_value = future.result()
    return return_value
    

    仍在等待 future.result() 我不明白为什么。

    怎么了 asyncio.run_coroutine_线程安全 /我用它的方式?

    1 回复  |  直到 5 年前
        1
  •  1
  •   Sraw    5 年前

    我认为有两个问题。第一个是 run_coroutine_threadsafe 只提交协程但不真正运行它。

    所以

    event_loop = asyncio.get_event_loop()
    future = asyncio.run_coroutine_threadsafe(cls._do_work_async(arg1, arg2, arg3), event_loop)
    return_value = future.result()
    

    不工作,因为你从来没有运行过这个循环。

    理论上,你可以用 asyncio.run(future) ,但实际上,您不能,可能是因为它是由 运行协同程序线程安全 . 以下操作将起作用:

    import asyncio
    
    async def stop():
        await asyncio.sleep(3)
    
    event_loop = asyncio.get_event_loop()
    coro = asyncio.sleep(1, result=3)
    future = asyncio.run_coroutine_threadsafe(coro, event_loop)
    event_loop.run_until_complete(stop())
    print(future.result())
    

    第二个问题是,我想你已经注意到你的结构在某种程度上是颠倒的。您应该在分离的线程中运行事件循环,但从主线程提交任务。如果在分离的线程中提交它,仍然需要在主线程中运行事件循环来实际执行它。我建议在分离的线程中创建另一个事件循环。