代码之家  ›  专栏  ›  技术社区  ›  Rafa Viotti aiwiguna

如何用电机干净地关闭变更流?

  •  8
  • Rafa Viotti aiwiguna  · 技术社区  · 6 年前

    TL;博士

    这确实是Motor 1.2.0中的一个bug,由a.Jesse Jiryu Davis及时修复,并在1.2.1或更高版本的驱动程序中提供。

    原始问题

    我在Python 3上编写了一个程序,使用MongoDB集合的新更改流特性来监视对MongoDB集合的更改。以下是MCVE:

    from asyncio import get_event_loop, CancelledError
    from contextlib import suppress
    from motor.motor_asyncio import AsyncIOMotorClient
    
    async def watch(collection):
        async with collection.watch([]) as stream:
            async for change in stream:
                print(change)
    
    async def cleanup():
        task.cancel()
    
        with suppress(CancelledError):
            await task
    
    if __name__ == '__main__':
        conn = AsyncIOMotorClient()
        loop = get_event_loop()
        task = loop.create_task(watch(conn.database.collection))  # Replace with a real collection.
    
        try:
            loop.run_forever()
    
        except KeyboardInterrupt:
            pass
    
        finally:
            loop.run_until_complete(cleanup())
            loop.shutdown_asyncgens()
            loop.close()
    

    当我用CTRL+C终止程序时,它会引发三个不同的异常。

    ^Cexception calling callback for <Future at 0x102efea58 state=finished raised InvalidStateError>
    Traceback (most recent call last):
      File "/Users/viotti/motor/lib/python3.6/site-packages/motor/core.py", line 1259, in _next
        change = self.delegate.next()
      File "/Users/viotti/motor/lib/python3.6/site-packages/pymongo/change_stream.py", line 79, in next
        change = self._cursor.next()
      File "/Users/viotti/motor/lib/python3.6/site-packages/pymongo/command_cursor.py", line 292, in next
        raise StopIteration
    StopIteration
    
    During handling of the above exception, another exception occurred:
    
    Traceback (most recent call last):
      File "/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/concurrent/futures/thread.py", line 56, in run
        result = self.fn(*self.args, **self.kwargs)
      File "/Users/viotti/motor/lib/python3.6/site-packages/motor/core.py", line 1264, in _next
        future.set_exception(StopAsyncIteration())
    asyncio.base_futures.InvalidStateError: invalid state
    
    During handling of the above exception, another exception occurred:
    
    Traceback (most recent call last):
      File "/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/concurrent/futures/_base.py", line 324, in _invoke_callbacks
        callback(self)
      File "/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/asyncio/futures.py", line 414, in _call_set_state
        dest_loop.call_soon_threadsafe(_set_state, destination, source)
      File "/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/asyncio/base_events.py", line 620, in call_soon_threadsafe
        self._check_closed()
      File "/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/asyncio/base_events.py", line 357, in _check_closed
        raise RuntimeError('Event loop is closed')
    RuntimeError: Event loop is closed
    

    有没有办法让程序安静地关闭?

    我正在macOS Sierra上使用Python 3.6.4、Motor 1.2和pymongo 3.6.0进行测试。

    1 回复  |  直到 4 年前
        1
  •  4
  •   Mikhail Gerasimov    6 年前

    我认为你的代码是正确的,问题出在 motor 的一侧。

    在调查过程中,我发现了两个问题:

    1. 如果此时未建立闭环连接,您将 exception calling callback for <Future 由于循环在异步回调完成之前关闭,因此出错。它似乎与异步生成器或流无关,但与任何 发动机 用法
    2. AgnosticChangeStream 异步迭代机制( _next function )当它被取消时,没有考虑到案例。尝试将异常设置为已取消的未来潜在客户 InvalidStateError .

    此代码演示了两个问题和可能的解决方法:

    import types
    import asyncio
    from contextlib import suppress
    from motor.motor_asyncio import AsyncIOMotorClient
    
    
    async def test():
        while True:
            await asyncio.sleep(0.1)
    
    
    async def cleanup(task):
        task.cancel()
        with suppress(asyncio.CancelledError):
            await task
    
    
    def _next(self, future):
        try:
            if not self.delegate:
                self.delegate = self._collection.delegate.watch(**self._kwargs)
    
            change = self.delegate.next()
            self._framework.call_soon(self.get_io_loop(),
                                      future.set_result,
                                      change)
        except StopIteration:
            future.set_exception(StopAsyncIteration())
        except Exception as exc:
    
            # CASE 2:
            # Cancellation of async iteration (and future with it) happens immediately
            # and trying to set exception to cancelled future leads to InvalidStateError,
            # we should prevent it:
            if future.cancelled():
                return
    
            future.set_exception(exc)
    
    
    async def watch(collection):
        async with collection.watch([]) as stream:
    
            # Patch stream to achieve CASE 2:
            stream._next = types.MethodType(_next, stream)
    
            async for change in stream:
                print(change)
    
    
    if __name__ == '__main__':
        loop = asyncio.get_event_loop()
        tmp = asyncio.ensure_future(test())  # Way to receive KeyboardInterrupt immediately.
    
        client = AsyncIOMotorClient()
        collection = client.test_database.test_collection
        task = asyncio.ensure_future(watch(collection))
    
        try:
            loop.run_forever()
        except KeyboardInterrupt:
            print('KeyboardInterrupt')
        finally:
            loop.run_until_complete(cleanup(tmp))
            loop.run_until_complete(cleanup(task))
    
            # CASE 1:
            # Looks like propagating KeyboardInterrupt doesn't affect motor's try
            # to establish connection to db and I didn't find a way to stop this manually.
            # We should keep event loop alive until we receive ServerSelectionTimeoutError
            # and motor would be able to execute it's asyncio callbacks:
            loop.run_until_complete(asyncio.sleep(client.server_selection_timeout))
    
            loop.shutdown_asyncgens()
            loop.close()
    

    由于添加了修复程序,它完成时没有警告/异常(至少在我的机器上)。

    我不建议你使用上面的黑客! 这只是为了说明问题所在和可能的解决方案。我不确定它是否一切正常。

    相反,我建议你 create issue at motor user group / Jira 将您的代码片段和我的答案添加到此处,然后等待bug得到修复。

    推荐文章