代码之家  ›  专栏  ›  技术社区  ›  user824600

多处理池map\u async的意外行为

  •  1
  • user824600  · 技术社区  · 7 年前

    我有一些代码对python 3应用程序中的多个文件执行相同的操作,因此似乎是 multiprocessing . 我想用 Pool 将工作分配给一些进程。在进行这些计算时,我希望代码继续执行其他操作(主要是为用户显示内容),因此我希望使用 map_async 的功能 multiprocessing.Pool 类。我希望调用此函数后,代码将继续,结果将由我指定的回调处理,但这似乎没有发生。下面的代码显示了我尝试调用的三种方法 map\U异步 我看到的结果是:

    import multiprocessing
    NUM_PROCS = 4
    def func(arg_list):
        arg1 = arg_list[0]
        arg2 = arg_list[1]
        print('start func')
        print ('arg1 = {0}'.format(arg1))
        print ('arg2 = {0}'.format(arg2))
        time.sleep(1)
        result1 = arg1 * arg2
        print('end func')
        return result1
    
    def callback(result):
        print('result is {0}'.format(result))
    
    
    def error_handler(error1):
        print('error in call\n {0}'.format(error1))
    
    
    def async1(arg_list1):
        # This is how my understanding of map_async suggests i should
        # call it. When I execute this, the target function func() is not called
        with multiprocessing.Pool(NUM_PROCS) as p1:
            r1 = p1.map_async(func,
                              arg_list1,
                              callback=callback,
                              error_callback=error_handler)
    
    
    def async2(arg_list1):
        with multiprocessing.Pool(NUM_PROCS) as p1:
            # If I call the wait function on the result for a small
            # amount of time, then the target function func() is called
            # and executes sucessfully in 2 processes, but the callback
            # function is never called so the results are not processed
            r1 = p1.map_async(func,
                              arg_list1,
                              callback=callback,
                              error_callback=error_handler)
            r1.wait(0.1)
    
    
    def async3(arg_list1):
        # if I explicitly call join on the pool, then the target function func()
        # successfully executes in 2 processes and the callback function is also
        # called, but by calling join the processing is not asynchronous any more
        # as join blocks the main process until the other processes are finished.
        with multiprocessing.Pool(NUM_PROCS) as p1:
            r1 = p1.map_async(func,
                              arg_list1,
                              callback=callback,
                              error_callback=error_handler)
            p1.close()
            p1.join()
    
    
    def main():
        arg_list1 = [(5, 3), (7, 4), (-8, 10), (4, 12)]
        async3(arg_list1)
    
        print('pool executed successfully')
    
    
    if __name__ == '__main__':
        main()
    

    什么时候 async1 , async2 async3 在main中调用,结果在每个函数的注释中描述。有人能解释一下为什么不同的电话会这样吗?最后我想打电话 map\U异步 如中所述 异步1 ,这样,当工作进程繁忙时,我可以在主进程中执行其他操作。我已经用python 2.7和3.6在旧的RH6 linux机器和新的ubuntu VM上测试了这段代码,得到了相同的结果。

    1 回复  |  直到 7 年前
        1
  •  3
  •   dano    7 年前

    这是因为当您使用 multiprocessing.Pool 作为上下文管理器, pool.terminate() is called when you leave the with block ,它将立即退出所有工作人员,而无需等待正在进行的任务完成。

    版本3.3中新增: Pool 对象现在支持上下文管理协议–请参阅上下文管理器 Types. __enter__() 返回池对象,并 __exit__() 电话 terminate() .

    IMO使用 终止() 作为 __exit__ 上下文管理器的方法并不是一个很好的设计选择,因为大多数人直觉上都希望 close() 将调用,它将等待正在进行的任务完成,然后退出。不幸的是,你所能做的就是重构你的代码,避免使用上下文管理器,或者重构你的代码,以保证你不会离开上下文 具有 阻止,直到 水塘 完成了它的工作。