我很确定你的问题是你如何创造
Queue
并尝试与子进程共享它们。如果只是将它们作为全局变量,则可以在子进程中重新创建它们,而不是继承它们(具体细节取决于操作系统和/或
context
您正在使用
multiprocessing
).
解决此问题的更好方法是避免使用
multiprocessing.Pool
生成流程,而不是显式创建
Process
您的工人自己的实例。这样你就可以通过
队列
需要它们的过程的实例没有任何困难(它是
从技术上讲
可以将队列传递给
Pool
工人,但这很尴尬)。
我想试试这样:
def worker_function(section_number, hist_queue, full_hist_queue): # take queues as arguments
# ... the rest of the function can work as before
# note, I renamed this from "main_function" since it's not running in the main process
if __name__ == '__main__':
hist_queue = JoinableQueue() # create the queues only in the main process
full_hist_queue = JoinableQueue() # the workers don't need to access them as globals
processes = [Process(target=worker_function, args=(i, hist_queue, full_hist_queue)
for i in range(num_sections)]
for p in processes:
p.start()
# ...
如果辅助函数的不同阶段或多或少彼此独立(即“执行更多操作”步骤不直接依赖于其上方的“执行多个操作”步骤,只需
full_histogram
),你也许可以保留
水塘
而是将不同的步骤分解成不同的函数,主进程可以通过对
map
在游泳池上。你不需要用你自己的
队列
在这种方法中,只有内置到池中的那些。这可能是最好的,尤其是当您要将工作拆分成的“部分”的数量与您计算机上的处理器内核数量不一致时。您可以让
水塘
匹配核心数,让每个核心依次处理数据的多个部分。
大致可以这样描述:
def worker_make_hist(section_number):
# do several operations, get a partial histogram
return histogram_this_section
def worker_do_more_ops(section_number, full_histogram):
# whatever...
return some_result
if __name__ == "__main__":
pool = multiprocessing.Pool() # by default the size will be equal to the number of cores
for temp_hist in pool.imap_unordered(worker_make_hist, range(number_of_sections)):
hist_full += temp_hist
some_results = pool.starmap(worker_do_more_ops, zip(range(number_of_sections),
itertools.repeat(hist_full)))