太长,读不下去了
asyncio.Queue()
并将adb命令提供给它,让它们按接收到的顺序执行(一个接一个),处理其中一个任务期间可能发生的错误(断开/重新连接),并在处理错误后继续处理队列的其余部分?
我正在开发一个模块,利用现有的
python-adb
模块,最终控制我的android平板电脑作为媒体设备,并将其纳入我的家庭自动化设置。
问题:
async
,而
python-adb
模块不可用。这个
模块也不管理/限制请求。我很快发现,如果多个adb命令被请求得太快,adb连接就会过载,从而导致错误&需要在断开连接时重新连接。
我的一个朋友设法实现了一个变通/黑客解决方案。
self._adb_lock
&
self._adb_error
最初设置在
AndroidDevice
班级
__init__
功能。
def adb_wrapper(func):
"""Wait if previous ADB commands haven't finished."""
@functools.wraps(func)
async def _adb_wrapper(self, *args, **kwargs):
attempts = 0
while self._adb_lock and attempts < 5:
attempts += 1
await asyncio.sleep(1)
if (attempts == 4 and self._adb_lock) or self._adb_error:
try:
await self.connect()
self._adb_error = False
except self._exceptions:
logging.error('Failed to re-establish the ADB connection; '
'will re-attempt in the next update.')
self._adb = None
self._adb_lock = False
self._adb_error = True
return
self._adb_lock = True
try:
returns = await func(self, *args, **kwargs)
except self._exceptions:
returns = None
logging.error('Failed to execute an ADB command; will attempt to '
're-establish the ADB connection in the next update')
self._adb = None
self._adb_error = True
finally:
self._adb_lock = False
return returns
return _adb_wrapper
@adb_wrapper
decorator高于所有发出adb调用的函数。然而,这是非常低效的&在高端设备上并不能防止adb连接过载。
输入asyncio
让我开始我的声明,我很少有工作经验
asyncio
在这一点上;因此,我很想找出哪些已经发布的问题对我有帮助。因此,如果答案已经在别处出现,我深表歉意。另外,为了让人们了解我的库是如何运行的,代码块会有点长,但我只包含了文件的一部分(一些函数来显示我最终是如何交互的),我试着只包含连接到显示命令链的函数。
我的目标是能够使用
异步
要将所有命令排队并一次发送一个命令,如果命令在任何时候失败(这将导致adb断开连接),我想重新建立adb连接并继续命令队列。
当前代码结构:
class AndroidTV:
""" Represents an Android TV device. """
def __init__(self, host, adbkey=''):
""" Initialize AndroidTV object.
:param host: Host in format <address>:port.
:param adbkey: The path to the "adbkey" file
"""
self.host = host
self.adbkey = adbkey
self._adb = None
self.state = STATE_UNKNOWN
self.muted = False
self.device = 'hdmi'
self.volume = 0.
self.app_id = None
self.package_launcher = None
self.package_settings = None
self._adb_error = False
self._adb_lock = False
self._exceptions = (TypeError, ValueError, AttributeError,
InvalidCommandError, InvalidResponseError,
InvalidChecksumError, BrokenPipeError)
@adb_wrapper
async def connect(self):
""" Connect to an Android TV device.
Will attempt to establish ADB connection to the given host.
Failure sets state to UNKNOWN and disables sending actions.
"""
try:
if self.adbkey:
signer = Signer(self.adbkey)
# Connect to the device
self._adb = adb_commands.AdbCommands().ConnectDevice(serial=self.host, rsa_keys=[signer])
else:
self._adb = adb_commands.AdbCommands().ConnectDevice(serial=self.host)
if not self.package_settings:
self._adb.Shell("am start -a android.settings.SETTINGS")
await asyncio.sleep(1)
logging.info("Getting Settings App Package")
self.package_settings = await self.current_app
if not self.package_launcher:
await self.home()
await asyncio.sleep(1)
logging.info("Getting Launcher App Package")
self.package_launcher = await self.current_app
except socket_error as serr:
logging.warning("Couldn't connect to host: %s, error: %s", self.host, serr.strerror)
@adb_wrapper
async def update(self):
""" Update the device status. """
# Check if device is disconnected.
if not self._adb:
self.state = STATE_UNKNOWN
self.app_id = None
# Check if device is off.
elif not await self._screen_on:
self.state = STATE_OFF
self.app_id = None
else:
self.app_id = await self.current_app
if await self._wake_lock:
self.state = STATE_PLAYING
elif self.app_id not in (self.package_launcher, self.package_settings):
# Check if state was playing on last update
if self.state == STATE_PLAYING:
self.state = STATE_PAUSED
elif self.state != STATE_PAUSED:
self.state = STATE_IDLE
else:
# We're on either the launcher or in settings
self.state = STATE_ON
# Get information from the audio status.
audio_output = await self._dump('audio')
stream_block = re.findall(BLOCK_REGEX, audio_output,
re.DOTALL | re.MULTILINE)[0]
self.muted = re.findall(MUTED_REGEX, stream_block,
re.DOTALL | re.MULTILINE)[0] == 'true'
@property
async def current_app(self):
filtered_dump = await self._dump("window windows", "mCurrentFocus")
current_focus = filtered_dump.replace("\r", "")
matches = WINDOW_REGEX.search(current_focus)
if matches:
(pkg, activity) = matches.group('package', 'activity')
return pkg
else:
logging.warning("Couldn't get current app, reply was %s", current_focus)
return None
@property
async def _screen_on(self):
return await self._dump_has('power', 'Display Power', 'state=ON')
@property
async def _awake(self):
return await self._dump_has('power', 'mWakefulness', 'Awake')
@property
async def _wake_lock(self):
return not await self._dump_has('power', 'Locks', 'size=0')
@adb_wrapper
async def _input(self, cmd):
if not self._adb:
return
self._adb.Shell('input {0}'.format(cmd))
@adb_wrapper
async def _dump(self, service, grep=None):
if not self._adb:
return
if grep:
return self._adb.Shell('dumpsys {0} | grep "{1}"'.format(service, grep))
return self._adb.Shell('dumpsys {0}'.format(service))
async def _dump_has(self, service, grep, search):
dump_result = await self._dump(service, grep=grep)
return dump_result.strip().find(search) > -1
正如我前面说过的,上述方法部分工作,但基本上是一个创可贴。
唯一的命令
直接地
制作
adb.Shell
1.
async def connect(self)
async def update(self)
async def _input(self, cmd)
4.
async def _dump(self, service, grep=None)
5.
async def _key(self, key)
connect
&
update
函数导致多个
所以这可能是我的问题所在。
2.按收到的顺序执行?
3.处理任意点的错误,重新连接,然后继续执行命令队列的其余部分?
这是我在完成这件事上失败的一半尝试。
import asyncio
async def produce_output(queue, commands):
for command in commands:
#execute the adb command
if 'keypress' in command:
#command contains 'input keypress ENTER'
adb.Shell(command)
#mark the task done because there's nothing to process
queue.task_done()
else:
#command contains 'dumpsys audio'
output = adb.Shell(command)
#put result in queue
await queue.put(output)
async def process_adb(queue):
while True:
output = await queue.get()
#return output (somehow?)
queue.task_done()
async def update():
adb_queue = asyncio.Queue()
asyncio.create_task(produce_output(adb_queue,
[self._screen_on,
self.current_app,
self._wake_lock,
self._dump('audio')]))
#Not sure how to proceed
if not self._adb:
self.state = STATE_UNKNOWN
self.app_id = None
# Check if device is off.
# Fetching result of first item in the queue - self._screen_on
elif not await adb_queue.get():
self.state = STATE_OFF
self.app_id = None
else:
# Fetching result of second item in the queue - self.current_app
self.app_id = await adb_queue.get()
# Fetching result of third item in the queue - self._wake_lock
if await adb_queue.get():
self.state = STATE_PLAYING
elif self.app_id not in (self.package_launcher, self.package_settings):
# Check if state was playing on last update
if self.state == STATE_PLAYING:
self.state = STATE_PAUSED
elif self.state != STATE_PAUSED:
self.state = STATE_IDLE
else:
# We're on either the launcher or in settings
self.state = STATE_ON
# Get information from the audio status.
# Fetching result of fourth item in the queue - self._dump('audio')
audio_output = await adb_queue.get()
stream_block = re.findall(BLOCK_REGEX, audio_output,
re.DOTALL | re.MULTILINE)[0]
self.muted = re.findall(MUTED_REGEX, stream_block,
re.DOTALL | re.MULTILINE)[0] == 'true'