代码之家  ›  专栏  ›  技术社区  ›  Prune

从子进程流式读取

  •  10
  • Prune  · 技术社区  · 6 年前

    我需要在子进程生成时读取它的输出——也许不是在每个进程上 write 但是在这个过程完成之前。我试过从python3医生那里得到解决方案,所以有问题。 here here 但是在孩子死之前我什么也得不到。

    该应用程序用于监控深度学习模型的培训。我需要获取测试输出(每次迭代大约250字节,大约间隔1分钟),并观察统计失败。

    • 我不能更改培训引擎;例如,我不能插入 stdout.flush() 在子进程代码中。
    • 我可以合理地等待十几行输出的累积;我希望有一个缓冲区填充来解决我的问题。

    代码:变更被注释掉。

    起源

    cmd = ["/usr/bin/python3", "zzz.py"]
    # test_proc = subprocess.Popen(
    test_proc = subprocess.run(
        cmd,
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT
        )
    
    out_data = ""
    print(time.time(), "START")
    while not "QUIT" in str(out_data):
        out_data = test_proc.stdout
        # out_data, err_data = test_proc.communicate()
        print(time.time(), "MAIN received", out_data)
    

    儿童(ZZZ,PY)

    from time import sleep
    import sys
    
    for _ in range(5):
        print(_, "sleeping", "."*1000)
        # sys.stdout.flush()
        sleep(1)
    
    print("QUIT this exercise")
    

    尽管发送1000多字节的行,但是缓冲区(在其他地方测试为2KB;在这里,我已经达到50KB)填充不会导致父级“看到”新文本。

    我怎么会错过这个工作?


    更新链接、评论和 iBug 发布的答案:

    • Popen 而不是 run 修复了阻塞问题。不知怎么的,我在文档和我的实验中都错过了这一点。
    • universal_newline=True 整齐地改变字节返回到字符串:在接收端更容易处理,尽管有交错的空行(容易检测和丢弃)。
    • 设置 bufsize 小的东西(例如 1 )没有任何影响;家长仍然需要等待孩子填写 stdout 缓冲区,在我的情况下是8K。
    • export PYTHONUNBUFFERED=1 执行前 解决缓冲问题。多亏了 wim 用于链接。

    除非有人提出一个规范的、漂亮的解决方案,使这些过时,我明天将接受IBUG的答案。

    1 回复  |  直到 6 年前
        1
  •  3
  •   iBug    6 年前

    subprocess.run 总是产生子进程,并且 在线程退出之前将其阻塞 .

    你唯一的选择就是 p = subprocess.Popen(...) 和阅读行 s = p.stdout.readline() p.stdout.__iter__() (见下文)。

    如果子进程在打印行后刷新stdout,则此代码对我有效(请参阅下面的扩展说明)。

    cmd = ["/usr/bin/python3", "zzz.py"]
    test_proc = subprocess.Popen(
        cmd,
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT
    )
    
    out_data = ""
    print(time.time(), "START")
    while not "QUIT" in str(out_data):
        out_data = test_proc.stdout.readline()
        print(time.time(), "MAIN received", out_data)
    test_proc.communicate()  # shut it down
    

    查看我的终端日志(删除点 zzz.py ):

    ibug@ubuntu:~/t $ python3 p.py
    1546450821.9174328 START
    1546450821.9793346 MAIN received b'0 sleeping \n'
    1546450822.987753 MAIN received b'1 sleeping \n'
    1546450823.993136 MAIN received b'2 sleeping \n'
    1546450824.997726 MAIN received b'3 sleeping \n'
    1546450825.9975247 MAIN received b'4 sleeping \n'
    1546450827.0094354 MAIN received b'QUIT this exercise\n'
    

    你也可以用 for 循环:

    for out_data in test_proc.stdout:
        if "QUIT" in str(out_data):
            break
        print(time.time(), "MAIN received", out_data)
    

    如果无法修改子进程, unbuffer (从包装中) expect -使用apt或yum)安装可能会有所帮助。这是我的工作父代码 没有 更改子代码。

    test_proc = subprocess.Popen(
        ["unbuffer"] + cmd,
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT
    )