代码之家  ›  专栏  ›  技术社区  ›  lo tolmencre

多处理-线程池内存泄漏?

  •  1
  • lo tolmencre  · 技术社区  · 6 年前

    我正在观察我无法向自己解释的记忆用法。下面我提供了一个实际代码的精简版本,它仍然显示了这种行为。本规范旨在实现以下目的:

    以1000行为一块读取文本文件。每行都是一个句子。把这1000句话分成4个生成器。将这些生成器传递到线程池,并在250个句子上并行运行特征提取。 在我的实际代码中,我从整个文件的所有句子中积累特性和标签。 现在出现了一个奇怪的问题:内存被分配,但即使不累积这些值,也不会再次释放!我想这和线程池有关。总占用的内存量取决于为任何给定的单词提取多少特性。我在这里用 range(100) . 看看:

    from sys import argv
    from itertools import chain, islice
    from multiprocessing import Pool
    from math import ceil
    
    
    # dummyfied feature extraction function
    # the lengt of the range determines howmuch mamory is used up in total,
    # eventhough the objects are never stored
    def features_from_sentence(sentence):
        return [{'some feature'  'some value'} for i in range(100)], ['some label' for i in range(100)]
    
    
    # split iterable into generator of generators of length `size`
    def chunks(iterable, size=10):
        iterator = iter(iterable)
        for first in iterator:
            yield chain([first], islice(iterator, size - 1))
    
    
    def features_from_sentence_meta(l):
        return list(map (features_from_sentence, l))
    
    
    def make_X_and_Y_sets(sentences, i):
        print(f'start: {i}')
        pool = Pool()
        # split sentences into a generator of 4 generators
        sentence_chunks = chunks(sentences, ceil(50000/4))
        # results is a list containing the lists of pairs of X and Y of all chunks
        results = map(lambda x : x[0], pool.map(features_from_sentence_meta, sentence_chunks))
        X, Y = zip(*results)
        print(f'end: {i}')
        return X, Y
    
    
    # reads file in chunks of `lines_per_chunk` lines
    def line_chunks(textfile, lines_per_chunk=1000):
        chunk = []
        i = 0
        with open(textfile, 'r') as textfile:
            for line in textfile:
                if not line.split(): continue
                i+=1
                chunk.append(line.strip())
                if i == lines_per_chunk:
                    yield chunk
                    i = 0
                    chunk = []
            yield chunk
    
    textfile = argv[1]
    
    for i, line_chunk in enumerate(line_chunks(textfile)):
        # stop processing file after 10 chunks to demonstrate
        # that memory stays occupied (check your system monitor)
        if i == 10:
            while True:
                pass
        X_chunk, Y_chunk = make_X_and_Y_sets(line_chunk, i)
    

    我用来调试的文件有50000行非空行,这就是为什么我在一个地方使用硬编码50000的原因。如果您想使用同一个文件,为了您的方便,他是一个链接:

    https://www.dropbox.com/s/v7nxb7vrrjim349/de_wiki_50000_lines?dl=0

    现在,当您运行这个脚本并打开系统监视器时,您将观察到内存已用完,并且使用情况一直持续到第10个块,在那里我人为地进入一个无止境的循环,以证明内存仍在使用中,即使我从未存储任何东西。

    你能解释一下为什么会这样吗?我似乎对多处理池的使用方式有所遗漏。

    1 回复  |  直到 6 年前
        1
  •  3
  •   abarnert    6 年前

    首先,让我们澄清一些误解,尽管事实证明,这并不是一开始就要探索的正确途径。

    当您在python中分配内存时,当然必须从操作系统中获取内存。

    然而,当您释放内存时,它很少返回到操作系统,直到您最终退出。相反,它进入一个“自由列表”,或者实际上,出于不同的目的,进入多个级别的自由列表。这意味着下次需要内存时,python已经拥有了它,并且可以立即找到它,而无需与操作系统进行对话来分配更多的内存。这通常使内存密集型程序更快。

    但这也意味着,尤其是在现代64位操作系统上,通过查看活动监视器/任务管理器等来了解您是否真的存在内存压力问题几乎是无用的。


    这个 tracemalloc 标准库中的模块提供了一些低级工具来查看内存使用情况的实际情况。在更高的层次上,你可以使用 memory_profiler ,如果启用 跟踪malloc 支持这一点很重要)可以将这些信息与来自诸如 psutil 去弄清楚事情的发展方向。

    但是,如果您没有看到任何实际问题您的系统不会进入交换地狱,您将不会得到任何 MemoryError 例外的是,你的表现并没有达到某个奇怪的悬崖,它线性上升到n,然后突然在n+1,等等,你通常一开始就不需要为此烦恼。


    如果你 发现一个问题,那么,幸运的是,你已经在半路上解决它了。正如我在顶部提到的,在您最终退出之前,您分配的大多数内存不会返回到操作系统。但是,如果您所有的内存使用都发生在子进程中,并且这些子进程没有状态,那么您可以让它们在需要时退出并重新启动。

    当然,执行soprocess拆卸和启动时间,以及必须重新启动的页面映射和缓存,以及要求操作系统重新分配内存,都会带来性能成本,等等。还有一个复杂的成本,你不能只运行一个池,让它做它的事情;你必须参与到它的事情中,让它为你再循环过程。

    没有内置的支持 multiprocessing.Pool 因为这样做。

    当然,你可以建立自己的 Pool . 如果你想得到幻想,你可以看一下来源 multiprocessing 做它所做的。或者,您可以从 Process 对象和一对 Queue 或者你可以直接使用 过程 没有抽象池的对象。


    另一个你可能有记忆问题的原因是你的个别过程是好的,但是你有太多的过程。

    事实上,这里似乎就是这样。

    你创造了一个 在该职能部门的4名工人中:

    def make_X_and_Y_sets(sentences, i):
        print(f'start: {i}')
        pool = Pool()
        # ...
    

    对于每个块都调用这个函数:

    for i, line_chunk in enumerate(line_chunks(textfile)):
        # ...
        X_chunk, Y_chunk = make_X_and_Y_sets(line_chunk, i)
    

    因此,每一块都有4个新的进程。即使每一个都有很低的内存使用率,同时拥有数百个内存也会增加内存使用率。

    更不用说,数百个进程在4核上竞争,可能会严重损害您的时间性能,因此您将时间浪费在上下文切换和操作系统调度上,而不是实际工作上。

    正如您在评论中指出的,解决这个问题的方法是微不足道的:只需使一个全局 pool 而不是每次通话都换一个新的。


    很抱歉,在这里获取了所有的Columbo,但此代码在模块的顶层运行的还有一件事:

    对于i,枚举中的行块(行块(textfile)):
    ……
    x_chunk,y_chunk=生成x_和y_集合(line_chunk,i)
    

    而这就是试图加速池和所有子任务的代码。但是池中的每个子进程都需要 import 这个模块,这意味着它们最终都将运行相同的代码,并旋转另一个池和一整套额外的子任务。

    您可能在Linux或MacOS上运行这个程序,默认情况下 startmethod fork ,也就是说 多处理 可以避免这个 进口 所以你没有问题。但是对于其他startmethods,这段代码基本上是一个叉式炸弹,它会占用您所有的系统资源。其中包括 spawn ,这是Windows上的默认StartMethod。因此,如果有任何人可能在Windows上运行此代码,您应该将所有顶级代码放在 if __name__ == '__main__': 警卫。