代码之家  ›  专栏  ›  技术社区  ›  CaffeinatedMike

如何确保按给定顺序处理所有命令(和错误)

  •  2
  • CaffeinatedMike  · 技术社区  · 6 年前

    太长,读不下去了 asyncio.Queue() 并将adb命令提供给它,让它们按接收到的顺序执行(一个接一个),处理其中一个任务期间可能发生的错误(断开/重新连接),并在处理错误后继续处理队列的其余部分?


    我正在开发一个模块,利用现有的 python-adb 模块,最终控制我的android平板电脑作为媒体设备,并将其纳入我的家庭自动化设置。

    问题:
    async ,而 python-adb 模块不可用。这个 模块也不管理/限制请求。我很快发现,如果多个adb命令被请求得太快,adb连接就会过载,从而导致错误&需要在断开连接时重新连接。

    我的一个朋友设法实现了一个变通/黑客解决方案。 self._adb_lock & self._adb_error 最初设置在 AndroidDevice 班级 __init__ 功能。

    def adb_wrapper(func):
        """Wait if previous ADB commands haven't finished."""
        @functools.wraps(func)
        async def _adb_wrapper(self, *args, **kwargs):
            attempts = 0
            while self._adb_lock and attempts < 5:
                attempts += 1
                await asyncio.sleep(1)
            if (attempts == 4 and self._adb_lock) or self._adb_error:
                try:
                    await self.connect()
                    self._adb_error = False
                except self._exceptions:
                    logging.error('Failed to re-establish the ADB connection; '
                                  'will re-attempt in the next update.')
                    self._adb = None
                    self._adb_lock = False
                    self._adb_error = True
                    return
    
            self._adb_lock = True
            try:
                returns = await func(self, *args, **kwargs)
            except self._exceptions:
                returns = None
                logging.error('Failed to execute an ADB command; will attempt to '
                              're-establish the ADB connection in the next update')
                self._adb = None
                self._adb_error = True
            finally:
                self._adb_lock = False
    
            return returns
    
        return _adb_wrapper
    

    @adb_wrapper decorator高于所有发出adb调用的函数。然而,这是非常低效的&在高端设备上并不能防止adb连接过载。

    输入asyncio
    让我开始我的声明,我很少有工作经验 asyncio 在这一点上;因此,我很想找出哪些已经发布的问题对我有帮助。因此,如果答案已经在别处出现,我深表歉意。另外,为了让人们了解我的库是如何运行的,代码块会有点长,但我只包含了文件的一部分(一些函数来显示我最终是如何交互的),我试着只包含连接到显示命令链的函数。


    我的目标是能够使用 异步 要将所有命令排队并一次发送一个命令,如果命令在任何时候失败(这将导致adb断开连接),我想重新建立adb连接并继续命令队列。

    当前代码结构:

    class AndroidTV:
        """ Represents an Android TV device. """
    
        def __init__(self, host, adbkey=''):
            """ Initialize AndroidTV object.
            :param host: Host in format <address>:port.
            :param adbkey: The path to the "adbkey" file
            """
            self.host = host
            self.adbkey = adbkey
            self._adb = None
            self.state = STATE_UNKNOWN
            self.muted = False
            self.device = 'hdmi'
            self.volume = 0.
            self.app_id = None
    
            self.package_launcher = None
            self.package_settings = None
    
            self._adb_error = False
            self._adb_lock = False
            self._exceptions = (TypeError, ValueError, AttributeError,
                                InvalidCommandError, InvalidResponseError,
                                InvalidChecksumError, BrokenPipeError)
    
        @adb_wrapper
        async def connect(self):
            """ Connect to an Android TV device.
            Will attempt to establish ADB connection to the given host.
            Failure sets state to UNKNOWN and disables sending actions.
            """
            try:
                if self.adbkey:
                    signer = Signer(self.adbkey)
    
                    # Connect to the device
                    self._adb = adb_commands.AdbCommands().ConnectDevice(serial=self.host, rsa_keys=[signer])
                else:
                    self._adb = adb_commands.AdbCommands().ConnectDevice(serial=self.host)
    
                if not self.package_settings:
                    self._adb.Shell("am start -a android.settings.SETTINGS")
                    await asyncio.sleep(1)
                    logging.info("Getting Settings App Package")
                    self.package_settings = await self.current_app
                if not self.package_launcher:
                    await self.home()
                    await asyncio.sleep(1)
                    logging.info("Getting Launcher App Package")
                    self.package_launcher = await self.current_app
    
            except socket_error as serr:
                logging.warning("Couldn't connect to host: %s, error: %s", self.host, serr.strerror)
    
        @adb_wrapper
        async def update(self):
            """ Update the device status. """
            # Check if device is disconnected.
            if not self._adb:
                self.state = STATE_UNKNOWN
                self.app_id = None
            # Check if device is off.
            elif not await self._screen_on:
                self.state = STATE_OFF
                self.app_id = None
            else:
                self.app_id = await self.current_app
    
                if await self._wake_lock:
                    self.state = STATE_PLAYING
                elif self.app_id not in (self.package_launcher, self.package_settings):
                    # Check if state was playing on last update
                    if self.state == STATE_PLAYING:
                        self.state = STATE_PAUSED
                    elif self.state != STATE_PAUSED:
                        self.state = STATE_IDLE
                else:
                    # We're on either the launcher or in settings
                    self.state = STATE_ON
    
                # Get information from the audio status.
                audio_output = await self._dump('audio')
                stream_block = re.findall(BLOCK_REGEX, audio_output,
                                          re.DOTALL | re.MULTILINE)[0]
                self.muted = re.findall(MUTED_REGEX, stream_block,
                                        re.DOTALL | re.MULTILINE)[0] == 'true'
    
        @property
        async def current_app(self):
            filtered_dump = await self._dump("window windows", "mCurrentFocus")
            current_focus = filtered_dump.replace("\r", "")
            matches = WINDOW_REGEX.search(current_focus)
            if matches:
                (pkg, activity) = matches.group('package', 'activity')
                return pkg
            else:
                logging.warning("Couldn't get current app, reply was %s", current_focus)
                return None
    
        @property
        async def _screen_on(self):
            return await self._dump_has('power', 'Display Power', 'state=ON')
    
        @property
        async def _awake(self):
            return await self._dump_has('power', 'mWakefulness', 'Awake')
    
        @property
        async def _wake_lock(self):
            return not await self._dump_has('power', 'Locks', 'size=0')
    
        @adb_wrapper
        async def _input(self, cmd):
            if not self._adb:
                return
            self._adb.Shell('input {0}'.format(cmd))
    
        @adb_wrapper
        async def _dump(self, service, grep=None):
            if not self._adb:
                return
            if grep:
                return self._adb.Shell('dumpsys {0} | grep "{1}"'.format(service, grep))
            return self._adb.Shell('dumpsys {0}'.format(service))
    
        async def _dump_has(self, service, grep, search):
            dump_result = await self._dump(service, grep=grep)
            return dump_result.strip().find(search) > -1
    

    正如我前面说过的,上述方法部分工作,但基本上是一个创可贴。

    唯一的命令 直接地 制作 adb.Shell
    1. async def connect(self)
    async def update(self)
    async def _input(self, cmd)
    4. async def _dump(self, service, grep=None)
    5. async def _key(self, key)

    connect & update 函数导致多个 所以这可能是我的问题所在。



    2.按收到的顺序执行?
    3.处理任意点的错误,重新连接,然后继续执行命令队列的其余部分?

    这是我在完成这件事上失败的一半尝试。

    import asyncio
    
    async def produce_output(queue, commands):
        for command in commands:
            #execute the adb command
            if 'keypress' in command:
                #command contains 'input keypress ENTER'
                adb.Shell(command)
                #mark the task done because there's nothing to process
                queue.task_done()
            else:
                #command contains 'dumpsys audio'
                output = adb.Shell(command)
                #put result in queue
                await queue.put(output)
    
    async def process_adb(queue):
        while True:
            output = await queue.get()
            #return output (somehow?)
            queue.task_done()
    
    
    async def update():
        adb_queue = asyncio.Queue()
        asyncio.create_task(produce_output(adb_queue,
            [self._screen_on,
             self.current_app,
             self._wake_lock,
             self._dump('audio')]))
        #Not sure how to proceed
    
        if not self._adb:
            self.state = STATE_UNKNOWN
            self.app_id = None
        # Check if device is off.
        # Fetching result of first item in the queue - self._screen_on
        elif not await adb_queue.get():
            self.state = STATE_OFF
            self.app_id = None
        else:
            # Fetching result of second item in the queue - self.current_app
            self.app_id = await adb_queue.get()
    
            # Fetching result of third item in the queue - self._wake_lock
            if await adb_queue.get():
                self.state = STATE_PLAYING
            elif self.app_id not in (self.package_launcher, self.package_settings):
                # Check if state was playing on last update
                if self.state == STATE_PLAYING:
                    self.state = STATE_PAUSED
                elif self.state != STATE_PAUSED:
                    self.state = STATE_IDLE
            else:
                # We're on either the launcher or in settings
                self.state = STATE_ON
    
            # Get information from the audio status.
            # Fetching result of fourth item in the queue - self._dump('audio')
            audio_output = await adb_queue.get()
            stream_block = re.findall(BLOCK_REGEX, audio_output,
                                      re.DOTALL | re.MULTILINE)[0]
            self.muted = re.findall(MUTED_REGEX, stream_block,
                                    re.DOTALL | re.MULTILINE)[0] == 'true'
    
    1 回复  |  直到 6 年前
        1
  •  2
  •   Martijn Pieters    6 年前

    您需要确保只有一个任务在使用 adb 在任何给定时间执行命令的连接。这意味着你需要 synchronisation primitives 协调访问,或 use a queue 为单个辅助任务提供要执行的命令。

    接下来,因为 亚行 连接完全是 同步 缓慢的 ,我会用 thread pool executor 亚行 关闭asyncio循环的连接,以便asyncio可以自由地运行一些当前未在I/O上阻止的其他任务。否则,就没有必要 .Shell() async def 协同程序,您实际上并没有合作,也没有为其他任务的运行腾出空间。

    最后但并非最不重要的一点是,即使对连接对象进行串行访问,您也会发现它不能在每个时间段内执行太多命令,您可能希望使用某种速率限制技术。我已经 created an asyncio leaky bucket algorithm implementation before 如果需要的话,可以处理这个问题。

    队列或锁都可以确保命令按先到先服务的顺序执行,但是队列需要某种延迟响应机制来返回命令结果。队列可以让您将相关命令排队(您可以使用 queue.put_nowait() 不屈服或允许分组命令),而不必先等待锁。

    因为您想重试连接,所以我将连接对象封装在 asynchronous context manager

    import asyncio
    import collections
    from concurrent.futures import ThreadPoolExecutor
    from functools import partial
    
    try:  # Python 3.7
        base = contextlib.AbstractAsyncContextManager
    except AttributeError:
        base = object  # type: ignore
    
    _retry_exceptions = (...,)  # define exceptions on which to retry commands?
    
    class asyncnullcontext(base):
        def __init__(self, enter_result=None):
            self.enter_result = enter_result
        async def __aenter__(self):
            return self.enter_result
        async def __aexit__(self, *excinfo):
            pass
    
    class AsyncADBConnection(base):
        def __init__(
            self,
            host,
            adbkey=None,
            rate_limit=None,
            max_retry=None,
            loop=None
        ):
            self._lock = asyncio.Lock(loop=loop)
            self._max_retry = max_retry
            self._loop = None
            self._connection = None
            self._executor = ThreadPoolExecutor()
    
            self._connect_kwargs = {
                "serial": host,
                "rsa_keys": [Signer(adbkey)] if adbkey else []
            }
    
            if rate_limit is not None:
                # max commands per second
                self._limiter = AsyncLeakyBucket(rate_limit, 1, loop=loop)
            else:
                self._limiter = asyncnullcontext()
    
        async def __aenter__(self):
            await self._lock.acquire()
            await self._ensure_connection()
            return self
    
        async def __aexit__(self):
            self._lock.release()
    
        async def _ensure_connection(self):
            if self._connection is not None:
                return
            loop = self._loop or asyncio.get_running_loop()
            connector = partial(
                adb_commands.AdbCommands().ConnectDevice,
                **self._connect_kwargs
            )
            fut = loop.run_in_executor(pool, connector)
            self._connection = await fut
    
        async def shell(self, command):
            loop = self._loop or asyncio.get_running_loop()
            max_attempts = self._max_retry or 1
            attempts = 0
            while True:
                with self._limiter:
                    try:
                        fut = loop.run_in_executor(
                            self._executor,
                            self._connection.Shell,
                            command
                        )
                        return await fut
                    except _retry_exceptions as e:
                        attempts += 1
                        if attempts >= max_attempts:
                            raise
                        # re-connect on retry
                        self._connection = None
                        await self._ensure_connection()
    

    如果使用队列,请使用 Future() instances 传达结果。

    fut = asyncio.Future()
    await queue.put((command, fut))
    result = await fut
    

    你可以把它包装成一个实用函数或对象。这个 await fut 行仅在未来收到结果后返回。对于不关心结果的命令,只需 await 如果要确保命令已完成。

    while True:
        command, fut = await self.queue.get():
        async with self.connection as conn:
            response = await conn.shell(command)
            fut.set_result(response)
        self.queue.task_done()  # optional, only needed when joining the queue
    

    哪里 self.connection 是一个 AsyncADBConnection 实例。