代码之家  ›  专栏  ›  技术社区  ›  user4113344

如何使用urwid和asyncio使长任务无阻塞?

  •  5
  • user4113344  · 技术社区  · 8 年前

    我正在编写一个Python curses应用程序,它通过进程发送和接收字符串来控制外部(Linux,如果有帮助的话)进程' stdin stdout 分别地接口使用 urwid . 我编写了一个类来控制外部进程,并为几个urwid组件编写了几个其他类。

    我还有一个按钮,用来向外部进程发送命令。然而,进程不会立即响应,其任务通常需要几秒钟,在此期间,我希望界面不要冻结。

    下面是我如何运行子进程:

    def run(self, args):
        import io, fcntl, os
        from subprocess import Popen, PIPE
    
        # Run wpa_cli with arguments, use a thread to feed the process with an input queue
        self._pss = Popen(["wpa_cli"] + args, stdout=PIPE, stdin=PIPE)
        self.stdout = io.TextIOWrapper(self._pss.stdout, encoding="utf-8")
        self.stdin = io.TextIOWrapper(self._pss.stdin, encoding="utf-8", line_buffering=True)
    
        # Make the process' stdout a non-blocking file
        fd = self.stdout.fileno()
        fl = fcntl.fcntl(fd, fcntl.F_GETFL)
        fcntl.fcntl(fd, fcntl.F_SETFL, fl | os.O_NONBLOCK)
        ...
    

    我必须使进程的输出流无阻塞,才能解析其输出。我不知道这对我的问题是否重要。

    以下是我用来控制子进程输入和输出流的方法:

    def read(self, parser=None, transform=None, sentinel='>'):
        """ Reads from the controlled process' standard output until a sentinel
        is found. Optionally execute a callable object with every line. Parsed
        lines are placed in a list, which the function returns upon exiting. """
    
        if not transform:
            transform = lambda str: str
    
        def readline():
            return transform(self.stdout.readline().strip())
    
        # Keep a list of (non-empty) parsed lines
        items = []
        for line in iter(readline, sentinel):
            if callable(parser):
                item = parser(line)
                if item is not None:
                    items.append(item)
        return items
    
    def send(self, command, echo=True):
        """ Sends a command to the controlled process. Action commands are
        echoed to the standard output. Argument echo controls whether or not
        they're removed by the reader function before parsing. """
    
        print(command, file=self.stdin)
    
        # Should we remove the echoed command?
        if not echo:
            self.read(sentinel=command)
    

    我提到的按钮只是从主脚本输入函数中设置了回调。该回调应该向子进程发送命令,并在结果输出行中循环,直到找到给定的文本,在这种情况下,回调函数退出。在此之前,该过程会输出一些我需要捕捉并显示在用户界面中的有趣信息。

    例如:

    def button_callback():
        # This is just an illustration
    
        filter = re.compile('(event1|event2|...)')
    
        def formatter(text):
            try:
                return re.search(filter, text).group(1)
            except AttributeError:
                return text
    
        def parser(text):
            if text == 'event1':
                # Update the info Text accordingly
            if text == 'event2':
                # Update the info Text accordingly
    
        controller.send('command')
        controller.read(sentinel='beacon', parser=parser, transform=formatter)
    

    需要注意的是:

    • 这个 read() 函数旋转(我找不到其他方法),即使进程输出流是静默的,直到从(可选)解析的行中读取sentinel值,
    • 直到按钮回调函数退出,urwid接口才会刷新,这会阻止 乌尔维德 刷新屏幕的主循环。

    我可以用一根线,但从我读到的 乌尔维德 支架 asyncio 这就是我想要实现的。你可以叫我哑巴,因为我只是不清楚,即使在浏览之后 乌尔维德 asyncio示例和Python阅读 异步IO 文档

    考虑到有修改这些方法的余地,我仍然希望保留进程控制类,即包含 读取() send() 尽可能通用。

    到目前为止,我所做的任何尝试都没有导致在进程繁忙时更新接口。接收进程“通知”的组件是普通的 urwid.Text() 小装置。

    1 回复  |  直到 8 年前
        1
  •  4
  •   Elias Dorneles    8 年前

    首先有两件事:

    • 使用urwid执行异步操作不一定需要asyncio,因为它已经有了 simple event loop 这通常已经足够好了,它有用于处理许多IO场景的原语。

    • 在编写异步代码时,您需要注意在找到sentinel值之前编写类似于那些函数循环的同步代码,因为它会阻止任何其他代码(包括事件循环本身)的执行:这意味着UI将冻结,直到该函数返回

    对于您的情况,您可能可以使用默认的简单事件循环,并使用 MainLoop.watch_pipe 方法创建一个可在子流程中使用的管道(已将其置于异步/非阻塞模式,顺便说一下:),并在有新数据写入管道时调用回调。

    下面是一个使用它的简单示例,显示shell命令的输出,同时保持UI不被阻止(注意,由于懒惰,使用一些全局变量):

    #!/usr/bin/env python
    # -*- coding: utf-8 -*-
    
    from __future__ import print_function, absolute_import, division
    import subprocess
    import urwid
    
    
    def show_or_exit(key):
        if key in ('q', 'Q', 'esc'):
            raise urwid.ExitMainLoop()
    
    
    def update_text(read_data):
        text.set_text(text.text + read_data)
    
    
    def enter_idle():
        loop.remove_watch_file(pipe.stdout)
    
    
    if __name__ == '__main__':
        widget = urwid.Pile([
            urwid.Button('Here is a button'),
            urwid.Button('And here another button'),
            urwid.Button('One more, just to be sure'),
            urwid.Button("Heck, let's add yet another one!"),
        ])
        text = urwid.Text('PROCESS OUTPUT:\n')
        widget = urwid.Columns([widget, text])
        widget = urwid.Filler(widget, 'top')
    
        loop = urwid.MainLoop(widget, unhandled_input=show_or_exit)
        stdout = loop.watch_pipe(update_text)
        stderr = loop.watch_pipe(update_text)
        pipe = subprocess.Popen('for i in $(seq 50); do echo -n "$i "; sleep 0.5; done',
                                shell=True, stdout=stdout, stderr=stderr)
        loop.run()
    

    注意回调中的代码 update_text 没有理由阻止:它获取已读取的数据,更新组件,仅此而已。在循环等待其他事情发生时没有。

    在您的情况下,您可能需要调整解析 wpa_cli 所以他们也没有理由阻止。例如,当他们找到或没有找到感兴趣的哨兵值时,他们可以设置一些变量或其他信号,而不是等待循环直到找到值。

    我希望这是有意义的,如果你需要澄清什么,请告诉我!:)