代码之家  ›  专栏  ›  技术社区  ›  Nate Glenn

使用Popen控制更少

  •  4
  • Nate Glenn  · 技术社区  · 7 年前

    我在努力控制 less 来自Mac OSX上的Python脚本。基本上,我希望能够转发控制字符(上/下/左/右),但在Python程序中处理其他输入。我正在使用 Popen 开始 较少的 但是 less reads user input from a source other than stdin . 因此,我不确定如何将任何字符发送到less。

    程序打开较少,等待一秒钟,然后尝试发送 q 要使用两个单独的频道退出,请执行以下操作: 标准DIN /dev/tty (因为我在上面链接的SO问题中提到了这一点)。两者都不起作用。

    from subprocess import Popen, PIPE
    import time
    p1 = Popen("echo hello | less -K -R", stdin=PIPE, shell=True)
    time.sleep(1)
    p1.stdin.write(bytes('q', 'utf-8'))
    with open("/dev/tty", 'w') as tty:
        tty.write('q')
    p1.wait()
    

    我如何控制 较少的 来自Python脚本?

    1 回复  |  直到 7 年前
        1
  •  1
  •   ephemient    7 年前

    这有点复杂,但可以使用 forkpty(3) 创建一个新的TTY,您可以在其中完全控制 less ,将输入和输出转发到原始TTY,使其感觉无缝。

    下面的代码使用Python 3及其标准库。 pexpect 可以做很多繁重的工作,但Python并没有提供这种功能。此外,这种方式更有教育意义。

    import contextlib
    import fcntl
    import io
    import os
    import pty
    import select
    import signal
    import struct
    import termios
    import time
    import tty
    

    假设其余代码缩进以在此上下文管理器中运行。

    with contextlib.ExitStack() as stack:
    

    我们需要获取真实的TTY并将其设置为原始模式。这可能会混淆TTY的其他用户(例如,此程序退出后的shell),因此请确保在退出后将其恢复到相同的状态。

    tty_fd = os.open('/dev/tty', os.O_RDWR | os.O_CLOEXEC)
    stack.callback(os.close, tty_fd)
    tc = termios.tcgetattr(tty_fd)
    stack.callback(termios.tcsetattr, tty_fd, termios.TCSANOW, tc)
    tty.setraw(tty_fd, when=termios.TCSANOW)
    

    然后我们可以调用 forkpty ,其名称为 pty.fork() 在Python中。这有两件事:

    • 创建 pseudoterminal .
    • 分叉一个新的孩子。
    • 将孩子连接到PTY的从端。
    • 将子PID和PTY的主端返回到原始流程。

    孩子应该跑 较少的 . 注意使用 _exit(2) 因为在 fork .

    child_pid, master_fd = pty.fork()
    if child_pid == 0:
        os.execv('/bin/sh', ('/bin/sh', '-c', 'echo hello | less -K -R'))
        os._exit(0)
    stack.callback(os.close, master_fd)
    

    然后需要做一些工作来设置一些异步信号处理程序。

    • SIGCHLD 在子进程更改状态(例如退出)时接收。我们可以用它来跟踪孩子是否还在跑步。
    • SIGWINCH 当控制终端改变大小时接收。我们将此大小转发给PTY(PTY将自动向附加到它的进程发送另一个窗口更改信号)。我们也应该将PTY的窗口大小设置为与开始匹配。

    转发信号也有意义,例如 SIGINT , SIGTERM

    child_is_running = True
    def handle_chld(signum, frame):
        while True:
            pid, status = os.waitpid(-1, os.P_NOWAIT)
            if not pid:
                break
            if pid == child_pid:
                child_is_running = False
    def handle_winch(signum, frame):
        tc = struct.pack('HHHH', 0, 0, 0, 0)
        tc = fcntl.ioctl(tty_fd, termios.TIOCGWINSZ, tc)
        fcntl.ioctl(master_fd, termios.TIOCSWINSZ, tc)
    handler = signal.signal(signal.SIGCHLD, handle_chld)
    stack.callback(signal.signal, signal.SIGCHLD, handler)
    handler = signal.signal(signal.SIGWINCH, handle_winch)
    stack.callback(signal.signal, signal.SIGWINCH, handler)
    handle_winch(0, None)
    

    现在来看真正的肉:在真实和虚假TTY之间复制数据。

    target_time = time.clock_gettime(time.CLOCK_MONOTONIC_RAW) + 1
    has_sent_q = False
    with contextlib.suppress(OSError):
        while child_is_running:
            now = time.clock_gettime(time.CLOCK_MONOTONIC_RAW)
            if now < target_time:
                timeout = target_time - now
            else:
                timeout = None
                if not has_sent_q:
                    os.write(master_fd, b'q')
                    has_sent_q = True
            rfds, wfds, xfds = select.select((tty_fd, master_fd), (), (), timeout)
            if tty_fd in rfds:
                data = os.read(tty_fd, io.DEFAULT_BUFFER_SIZE)
                os.write(master_fd, data)
            if master_fd in rfds:
                data = os.read(master_fd, io.DEFAULT_BUFFER_SIZE)
                os.write(tty_fd, data)
    

    它看起来很简单,虽然我在修饰一些东西,比如适当的短文和 SIGTTIN / SIGTTOU 处理(通过抑制部分隐藏 OSError ).