这有点复杂,但可以使用
forkpty(3)
创建一个新的TTY,您可以在其中完全控制
less
,将输入和输出转发到原始TTY,使其感觉无缝。
下面的代码使用Python 3及其标准库。
pexpect
可以做很多繁重的工作,但Python并没有提供这种功能。此外,这种方式更有教育意义。
import contextlib
import fcntl
import io
import os
import pty
import select
import signal
import struct
import termios
import time
import tty
假设其余代码缩进以在此上下文管理器中运行。
with contextlib.ExitStack() as stack:
我们需要获取真实的TTY并将其设置为原始模式。这可能会混淆TTY的其他用户(例如,此程序退出后的shell),因此请确保在退出后将其恢复到相同的状态。
tty_fd = os.open('/dev/tty', os.O_RDWR | os.O_CLOEXEC)
stack.callback(os.close, tty_fd)
tc = termios.tcgetattr(tty_fd)
stack.callback(termios.tcsetattr, tty_fd, termios.TCSANOW, tc)
tty.setraw(tty_fd, when=termios.TCSANOW)
然后我们可以调用
forkpty
,其名称为
pty.fork()
在Python中。这有两件事:
孩子应该跑
较少的
. 注意使用
_exit(2)
因为在
fork
.
child_pid, master_fd = pty.fork()
if child_pid == 0:
os.execv('/bin/sh', ('/bin/sh', '-c', 'echo hello | less -K -R'))
os._exit(0)
stack.callback(os.close, master_fd)
然后需要做一些工作来设置一些异步信号处理程序。
-
SIGCHLD
在子进程更改状态(例如退出)时接收。我们可以用它来跟踪孩子是否还在跑步。
-
SIGWINCH
当控制终端改变大小时接收。我们将此大小转发给PTY(PTY将自动向附加到它的进程发送另一个窗口更改信号)。我们也应该将PTY的窗口大小设置为与开始匹配。
转发信号也有意义,例如
SIGINT
,
SIGTERM
等
child_is_running = True
def handle_chld(signum, frame):
while True:
pid, status = os.waitpid(-1, os.P_NOWAIT)
if not pid:
break
if pid == child_pid:
child_is_running = False
def handle_winch(signum, frame):
tc = struct.pack('HHHH', 0, 0, 0, 0)
tc = fcntl.ioctl(tty_fd, termios.TIOCGWINSZ, tc)
fcntl.ioctl(master_fd, termios.TIOCSWINSZ, tc)
handler = signal.signal(signal.SIGCHLD, handle_chld)
stack.callback(signal.signal, signal.SIGCHLD, handler)
handler = signal.signal(signal.SIGWINCH, handle_winch)
stack.callback(signal.signal, signal.SIGWINCH, handler)
handle_winch(0, None)
现在来看真正的肉:在真实和虚假TTY之间复制数据。
target_time = time.clock_gettime(time.CLOCK_MONOTONIC_RAW) + 1
has_sent_q = False
with contextlib.suppress(OSError):
while child_is_running:
now = time.clock_gettime(time.CLOCK_MONOTONIC_RAW)
if now < target_time:
timeout = target_time - now
else:
timeout = None
if not has_sent_q:
os.write(master_fd, b'q')
has_sent_q = True
rfds, wfds, xfds = select.select((tty_fd, master_fd), (), (), timeout)
if tty_fd in rfds:
data = os.read(tty_fd, io.DEFAULT_BUFFER_SIZE)
os.write(master_fd, data)
if master_fd in rfds:
data = os.read(master_fd, io.DEFAULT_BUFFER_SIZE)
os.write(tty_fd, data)
它看起来很简单,虽然我在修饰一些东西,比如适当的短文和
SIGTTIN
/
SIGTTOU
处理(通过抑制部分隐藏
OSError
).