
Creating a queue delay in a Python pool without blocking

  •  0
  • tashton  ·  7 years ago

    I have a large program (specifically, a function) that I'm attempting to parallelize using a JoinableQueue and the multiprocessing map_async method. The function I'm working with performs several operations on multidimensional arrays, so I break each array up into sections and have each section evaluated independently; however, I need to stitch one of those arrays back together early on, but the "stitching" happens before the "evaluating", so I need to introduce some kind of delay into the JoinableQueue. I've searched all over for a workable solution, but I'm very new to multiprocessing and most of it goes over my head.

    That phrasing may be confusing, apologies. Here is an outline of my code (I can't put all of it here because it's quite long, but I can provide additional detail if needed):

    import numpy as np
    import multiprocessing as mp
    from multiprocessing import Pool, Pipe, JoinableQueue
    
    def main_function(section_number):
    
        #define section sizes
        array_this_section = array[:,start:end+1,:]
        histogram_this_section = np.zeros((3, dataset_size, dataset_size))
        #start and end are defined according to the size of the array
        #dataset_size is to show that the histogram is a different size than the array
    
        for m in range(1,num_iterations+1):
            #do several operations- each section of the array 
                     #corresponds to a section on the histogram
    
            hist_queue.put(histogram_this_section)
    
            #each process sends their own part of the histogram outside of the pool 
                     #to be combined with every other part- later operations 
                     #in this function must use the full histogram
    
            hist_queue.join()
            full_histogram = full_hist_queue.get()
            full_hist_queue.task_done()
    
            #do many more operations
    
    
    hist_queue = JoinableQueue()
    full_hist_queue = JoinableQueue()
    
    if __name__ == '__main__':
        pool = mp.Pool(num_sections)
        args = np.arange(num_sections)
        pool.map_async(main_function, args, chunksize=1)    
    
        #I need the map_async because the program is designed to display an output at the 
            #end of each iteration, and each output must be a compilation of all processes
    
        #a few variable definitions go here
    
        for m in range(1,num_iterations+1):
            for i in range(num_sections):
                temp_hist = hist_queue.get()    #the code hangs here because the queue 
                                                #is attempting to get before anything 
                                                #has been put
                hist_full += temp_hist
            for i in range(num_sections):
                hist_queue.task_done()
            for i in range(num_sections):
                full_hist_queue.put(hist_full)    #the full histogram is sent back into 
                                                  #the pool
    
    
                full_hist_queue.join()
    
            #etc etc
    
        pool.close()
        pool.join()
    
    1 Answer  |  7 years ago
  •  1
  •   Blckknght    7 years ago

    I'm pretty sure your problem is in how you're creating the Queue objects and trying to share them with the child processes. If you just have them as global variables, they may be re-created in the child processes instead of being inherited (the exact details depend on the OS and/or the start-method context you're using with multiprocessing).
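
    Here's a minimal, self-contained demo of that re-creation behaviour (separate from your histogram code; how it behaves depends on which start method your platform uses by default). Put something on a module-level queue before starting a child and check whether the child ever receives it:

    import multiprocessing as mp
    from queue import Empty

    global_queue = mp.Queue()   # re-created during the child's re-import under "spawn"

    def child():
        try:
            # under "fork" the child inherits the parent's queue and receives the item
            print("child got:", global_queue.get(timeout=1))
        except Empty:
            # under "spawn" this queue is a fresh, unrelated copy, so nothing ever arrives
            print("child saw nothing")

    if __name__ == '__main__':
        global_queue.put("hello")
        p = mp.Process(target=child)
        p.start()
        p.join()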

    A better way to solve this is to avoid using multiprocessing.Pool to spawn your processes, and instead create the Process instances for your workers yourself. That way you can pass the Queue instances to the processes that need them without any difficulty (it's technically possible to pass the queues to Pool workers, but it's awkward).
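
    (For what it's worth, the "awkward" Pool route would look roughly like the sketch below: hand the queues to each pool worker through the Pool's initializer rather than as map arguments. init_worker is just a name I made up here, and the rest reuses the names from your outline.)

    from multiprocessing import Pool, JoinableQueue

    def init_worker(hq, fhq):
        # store the queues as module globals inside each pool worker process,
        # so main_function can keep using hist_queue / full_hist_queue as before
        global hist_queue, full_hist_queue
        hist_queue, full_hist_queue = hq, fhq

    if __name__ == '__main__':
        hist_queue = JoinableQueue()
        full_hist_queue = JoinableQueue()
        pool = Pool(num_sections, initializer=init_worker,
                    initargs=(hist_queue, full_hist_queue))
        pool.map_async(main_function, range(num_sections), chunksize=1)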

    I'd try something like this:

    from multiprocessing import Process, JoinableQueue

    def worker_function(section_number, hist_queue, full_hist_queue): # take queues as arguments
        # ... the rest of the function can work as before
        # note, I renamed this from "main_function" since it's not running in the main process
    
    if __name__ == '__main__':
        hist_queue = JoinableQueue()   # create the queues only in the main process
        full_hist_queue = JoinableQueue()  # the workers don't need to access them as globals
    
        processes = [Process(target=worker_function, args=(i, hist_queue, full_hist_queue))
                     for i in range(num_sections)]
        for p in processes:
            p.start()
    
        # ...
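
    The main process can then run essentially the same get/accumulate/put cycle as in your outline, and join the workers when the iterations are done. A rough sketch of that side, reusing the names from your question (num_iterations, num_sections, hist_full), might look like:

    # main-process side of the explicit-Process approach
    # (hist_full is assumed to be initialized above, as in your outline)
    for m in range(1, num_iterations + 1):
        for i in range(num_sections):
            hist_full += hist_queue.get()    # collect one partial histogram per worker
        for i in range(num_sections):
            hist_queue.task_done()           # releases the workers' hist_queue.join()
        for i in range(num_sections):
            full_hist_queue.put(hist_full)   # send the stitched histogram back, one copy per worker
        full_hist_queue.join()               # wait until every worker has taken its copy

    for p in processes:
        p.join()                             # wait for all workers to finish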
    

    If the different stages of your worker function are more or less independent of one another (that is, the "do many more operations" step doesn't depend directly on the "do several operations" step above it, only on full_histogram), you might be able to keep the Pool and instead split the different steps into separate functions that the main process calls via a couple of map calls on the pool. You don't need your own Queues in this approach, only the ones built into the Pool. This may be best, especially if the number of "sections" you're splitting the work into doesn't correspond closely to the number of processor cores in your machine. You can let the Pool size match the core count and have each worker process several sections of the data in turn.

    A rough sketch of that would look something like this:

    import itertools
    import multiprocessing

    def worker_make_hist(section_number):
        # do several operations, get a partial histogram
        return histogram_this_section
    
    def worker_do_more_ops(section_number, full_histogram):
        # whatever...
        return some_result
    
    if __name__ == "__main__":
        pool = multiprocessing.Pool() # by default the size will be equal to the number of cores
    
        for temp_hist in pool.imap_unordered(worker_make_hist, range(number_of_sections)):
            hist_full += temp_hist
    
        some_results = pool.starmap(worker_do_more_ops, zip(range(number_of_sections),
                                                            itertools.repeat(hist_full)))
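
    As before, this is only a sketch: hist_full would need to be initialized (and reset) for each of your num_iterations passes, the two pool calls would sit inside that outer loop so you can display an output per iteration, and you'd still call pool.close() and pool.join() at the end, just as in your original code.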