代码之家  ›  专栏  ›  技术社区  ›  Eren

多处理不起作用,挂在windows 10上的连接上

  •  0
  • Eren  · 技术社区  · 4 年前

    我对排队有疑问 multiprocessing python 3中的模块

    这就是他们在 programming guidelines :

    请记住,将项目放入队列的进程将在之前等待 终止,直到所有缓冲的项目都由“进料器”线程送入 下面的管道。(子进程可以调用 排队取消_连接_线程 队列的方法来避免这种行为。)

    这意味着,无论何时使用队列,都需要确保所有 已放入队列的项目最终将在 过程被加入。否则,您无法确定哪些流程 将项目放入队列将终止。还要记住,非守护进程 进程将自动加入。

    死锁的示例如下:

    from multiprocessing import Process, Queue
    
    def f(q):
        q.put('X' * 1000000)
    
    if __name__ == '__main__':
        queue = Queue()
        p = Process(target=f, args=(queue,))
        p.start()
        p.join()                    # this deadlocks
        obj = queue.get()
    

    这里的一个修复方法是交换最后两行(或者简单地删除 p.join()行)。

    所以很明显, queue.get() 不应该在a之后被调用 join() .

    然而,也有使用队列的例子,其中 get 在a之后被调用 join 比如:

    import multiprocessing as mp
    import random
    import string
    
    # define a example function
    def rand_string(length, output):
        """ Generates a random string of numbers, lower- and uppercase chars. """
        rand_str = ''.join(random.choice(
                    string.ascii_lowercase
                    + string.ascii_uppercase
                    + string.digits)
        for i in range(length))
            output.put(rand_str)
    
     if __name__ == "__main__":
         # Define an output queue
         output = mp.Queue()
    
         # Setup a list of processes that we want to run
         processes = [mp.Process(target=rand_string, args=(5, output))
                        for x in range(2)]
    
         # Run processes
        for p in processes:
            p.start()
    
        # Exit the completed processes
        for p in processes:
            p.join()
    
        # Get process results from the output queue
        results = [output.get() for p in processes]
    
        print(results)
    

    我已经运行了这个程序,它是有效的(也作为StackOverFlow问题的解决方案发布) Python 3 - Multiprocessing - Queue.get() does not respond ).

    有人能帮我理解僵局的规则是什么吗?

    0 回复  |  直到 8 年前
        1
  •  56
  •   Community CDub    8 年前

    允许在进程之间传输数据的多处理中的队列实现依赖于标准操作系统管道。

    操作系统管道不是无限长的,因此在操作系统中排队数据的进程可能会被阻塞 put() 操作,直到其他进程使用 get() 从队列中检索数据。

    对于少量数据,如示例中的数据,主进程可以 join() 所有生成的子流程,然后获取数据。这通常效果很好,但无法扩展,也不清楚何时会破裂。

    但它肯定会与大量数据决裂。子流程将在中被阻止 put() 等待主进程从队列中删除一些数据 get() ,但主进程被阻塞 join() 等待子流程完成。这导致了僵局。

    下面是一个用户有 this exact issue 我在那里的答案中发布了一些代码,帮助他解决了问题。

        2
  •  7
  •   Alexander Pravdin    7 年前

    不要打电话 join() 在从共享队列获取所有消息之前,先对流程对象进行处理。

    我使用了以下解决方法,允许进程在处理所有结果之前退出:

    results = []
    while True:
        try:
            result = resultQueue.get(False, 0.01)
            results.append(result)
        except queue.Empty:
            pass
        allExited = True
        for t in processes:
            if t.exitcode is None:
                allExited = False
                break
        if allExited & resultQueue.empty():
            break
    

    它可以缩短,但我留了更长的时间,以便新手更清楚。

    在这里 resultQueue multiprocess.Queue 与分享 multiprocess.Process 物体。在这段代码之后,您将得到 result 包含队列中所有消息的数组。

    问题是,接收消息的队列管道的输入缓冲区可能已满,导致写入器无限阻塞,直到有足够的空间接收下一条消息。因此,有三种方法可以避免阻塞:

    • 增加 multiprocessing.connection.BUFFER 尺寸(不太好)
    • 减小邮件大小或数量(不太好)
    • 收到消息后立即从队列中获取消息(好方法)