代码之家  ›  专栏  ›  技术社区  ›  Brad Solomon

Python多处理:理解“chunksize”背后的逻辑`

  •  22
  • Brad Solomon  · 技术社区  · 7 年前

    什么因素决定了一个最佳方案 chunksize 方法的参数,如 multiprocessing.Pool.map() ? 这个 .map() 方法似乎对其默认的chunksize使用了任意的启发式(解释如下);这种选择的动机是什么?是否有一种基于特定情况/设置的更为深思熟虑的方法?

    比如说,我是:

    我天真的想法是给24名员工每人一块大小相等的块,即。 15_000_000 / 24 或625000。大批量生产应在充分利用所有员工的同时减少营业额/管理费用。但这似乎忽略了给每个工人大批量生产的一些潜在不利因素。这是一张不完整的照片吗?我错过了什么?


    我的部分问题来自if的默认逻辑 chunksize=None :两者 .map() .starmap() .map_async()

    def _map_async(self, func, iterable, mapper, chunksize=None, callback=None,
                   error_callback=None):
        # ... (materialize `iterable` to list if it's an iterator)
        if chunksize is None:
            chunksize, extra = divmod(len(iterable), len(self._pool) * 4)  # ????
            if extra:
                chunksize += 1
        if len(iterable) == 0:
            chunksize = 0
    

    背后的逻辑是什么 divmod(len(iterable), len(self._pool) * 4) ? 这意味着chunksize将更接近 15_000_000 / (24 * 4) == 156_250 . 乘法的目的是什么 len(self._pool) 4点之前?

    这使得生成的chunksize是4的一个因子 而不是我上面的“幼稚逻辑”,它只是将iterable的长度除以 pool._pool .

    snippet 从Python文档开始 .imap() 这进一步激发了我的好奇心:

    这个 块大小 参数与 map() 块大小 完成工作 比使用默认值1更快。


    Python multiprocessing: why are large chunksizes slower?

    3 回复  |  直到 4 年前
        1
  •  239
  •   Darkonaut    6 年前

    简短回答

    Pool的chunksize算法是一种启发式算法。它为您试图填充到Pool方法中的所有可想象的问题场景提供了一个简单的解决方案。因此,它无法针对任何情况进行优化 具体的 脚本

    该算法将iterable任意划分为比naive方法多大约四倍的块。更多的块意味着更多的开销,但增加了调度的灵活性。这个答案将如何显示,这将导致平均更高的工人利用率,但

    “知道这一点很好,”你可能会想,“但知道这一点如何帮助我解决具体的多处理问题呢?”嗯,事实并非如此。更诚实的简短回答是,“没有简短的答案”,“多处理是复杂的”和“这取决于”。观察到的症状可能有不同的根源,即使是在类似的情况下。

    这个答案试图为您提供一些基本概念,帮助您更清楚地了解池的调度黑匣子。它还试图为您提供一些基本的工具,帮助您识别和避免与块大小相关的潜在悬崖。


    目录

    1. 定义
    2. 并行化目标
    3. 并行化场景
    4. 池的Chunksize算法
    5. 6.3效率

      6.3.1绝对分配效率(ADE)

      6.3.2相对分配效率(RDE)

    Part II

    1. Naive vs.Pool的Chunksize算法


    1.定义


    大块

    这里的一大块是一部分 iterable -在池方法调用中指定的参数。chunksize是如何计算的,以及它会产生什么影响,这是本答案的主题。


    任务

    任务在辅助进程中的数据物理表示如下图所示。

    figure0

    此图显示了对的调用示例 pool.map() ,沿代码行显示,取自 multiprocessing.pool.worker 函数,其中任务从 inqueue 打开包装。 worker 是中的基础主功能 MainThread 池工作进程的。这个 func func 工人 -函数用于单个调用方法,如 apply_async 以及 imap 具有 chunksize=1 chunksize -处理函数的参数 func 将是一个映射器函数( mapstar starmapstar )。此函数映射指定的用户 func -iterable(->“映射任务”)传输块的每个元素上的参数。所需的时间定义了 任务 工作单位 .


    而“任务”一词的用法 整体 一个块的处理由内的代码匹配 multiprocessing.pool ,没有迹象显示 单次呼叫 发送给指定的用户 func 应引用块的元素作为参数。为避免因命名冲突而产生混淆(请考虑 maxtasksperchild -池的参数 __init__ -方法),此答案将参考 任务中的单个工作单元,如 塔塞尔

    A. 塔塞尔 (来自 任务+el EMT)是一个系统中最小的工作单元 任务 . -a参数 Pool -方法,使用从中获取的参数调用 单一元素 传输的 . A. 包括 块大小


    并行化开销(PO)

    人事军官 由Python内部开销和进程间通信(IPC)开销组成。Python中的每任务开销附带了打包和解包任务及其结果所需的代码。IPC开销伴随着线程的必要同步以及不同地址空间之间的数据复制(需要两个复制步骤:父级->队列->子级)。IPC开销的大小取决于操作系统、硬件和数据的大小,因此很难对影响进行概括。


    2.并行化目标

    当使用多处理时,我们的总体目标(显然)是最小化所有任务的总处理时间。为了实现这一总体目标,我们的 技术目标 优化硬件资源的利用 .

    • 最小化并行化开销(最著名的,但并非唯一的: IPC )
    • 所有cpu核的高利用率
    • 限制内存使用以防止操作系统过度分页( trashing )

    首先,任务的计算量必须足够大(密集),以便 我们必须为并行化支付的PO。PO的相关性随着每个任务的绝对计算时间的增加而降低。或者,换句话说,绝对计算时间越大 佩尔塔塞尔 对于你的问题,不太相关的人需要减少采购订单。如果您的计算每个任务需要几个小时,那么IPC开销相比之下可以忽略不计。这里的主要关注点是防止在分配所有任务后使工作进程空闲。保持所有内核都加载意味着,我们正在尽可能多地并行化。


    3.并行化场景

    什么因素决定了multiprocessing.Pool.map()等方法的最佳chunksize参数

    问题的主要因素是计算时间的长短 变化 变异系数 ( CV )对于每个任务的计算时间。

    从这种变化的程度来看,以下两种极端情况是:

    1. 所有Taskel都需要完全相同的计算时间。

    为了更好地记忆,我将这些场景称为:

    1. 密集场景
    2. 广泛情景


    在一个 最好一次分发所有TaskEL,以将必要的IPC和上下文切换保持在最低限度。这意味着我们只想创建尽可能多的块,尽可能多的工作进程。如上所述,PO的权重随着每个taskel的计算时间缩短而增加。

    为了获得最大吞吐量,我们还希望所有工作进程都处于繁忙状态,直到处理完所有任务(没有空闲的工作进程)。为了实现这一目标,分布式块应该具有相同的大小或接近相同的大小。


    广泛情景

    最典型的例子是 广泛情景 这将是一个优化问题,结果要么快速收敛,要么计算可能需要数小时,甚至数天。在这种情况下,通常无法预测任务将包含“轻任务”和“重任务”的混合,因此不建议一次在任务批中分发太多任务。一次分发的任务比可能的少,这意味着增加了调度的灵活性。这是实现所有核心的高利用率这一子目标所必需的。

    如果 水塘


    4.块大小的风险>1.

    广泛情景 -iterable,我们要将其传递到池方法中:

    good_luck_iterable = [60, 60, 86400, 60, 86400, 60, 60, 84600]
    

    为了简单起见,我们假设所需的计算时间为秒,而不是实际值,仅为1分钟或1天。 我们假设该池有四个工作进程(在四个核心上),并且 块大小 设置为 2 . 由于订单将被保留,因此发送给工人的区块将如下所示:

    [(60, 60), (86400, 60), (86400, 60), (60, 84600)]
    

    因为我们有足够的工人,而且计算时间足够长,我们可以说,每个工人进程首先都会得到一块工作空间。(快速完成任务的情况不一定如此)。此外,我们可以说,整个处理过程大约需要86400+60秒,因为在这个人工场景中,这是块的最高总计算时间,我们只分配一次块。

    bad_luck_iterable = [60, 60, 86400, 86400, 60, 60, 60, 84600]
    

    …以及相应的块:

    [(60, 60), (86400, 86400), (60, 60), (60, 84600)]
    

    不幸的是,我们的iterable的排序几乎是我们总处理时间的两倍(86400+86400)!获得恶意(8640086400)块的工人正在阻止任务中的第二个重taskel分配给已经完成(60,60)块的空闲工人之一。如果我们采取行动,我们显然不会冒这样一个令人不快的结果的风险 chunksize=1 .

    这就是大块头的风险。使用更高的块大小,我们可以用调度灵活性换取更少的开销,在上述情况下,这是一个糟糕的交易。

    我们将在第章中看到什么 6.量化算法效率 密集场景


    在下面,您将在源代码中找到该算法的一个稍加修改的版本。如你所见,我切断了下半部分,并将其包装成一个函数,用于计算 块大小 外部论证。我也换了 4 用一个 factor 参数,并将 len()

    # mp_utils.py
    
    def calc_chunksize(n_workers, len_iterable, factor=4):
        """Calculate chunksize argument for Pool-methods.
    
        Resembles source-code within `multiprocessing.pool.Pool._map_async`.
        """
        chunksize, extra = divmod(len_iterable, n_workers * factor)
        if extra:
            chunksize += 1
        return chunksize
    

    为了确保我们都在同一页上,下面是 divmod 做:

    divmod(x, y) 是一个内置函数,返回 (x//y, x%y) . x // y x / y 虽然 x % y 模运算是否从返回余数 x/y . 因此,例如。 divmod(10, 3) (3, 1) .

    chunksize, extra = divmod(len_iterable, n_workers * 4) ,你会注意到的 n_workers y 在里面 x/y 4. ,而无须透过 if extra: chunksize +=1 稍后,将生成初始的chunksize 至少 小四倍(用于 len_iterable >= n_workers * 4 )如果不是这样的话。

    用于查看乘法的效果 在中间块大小的结果上考虑这个函数:

    def compare_chunksizes(len_iterable, n_workers=4):
        """Calculate naive chunksize, Pool's stage-1 chunksize and the chunksize
        for Pool's complete algorithm. Return chunksizes and the real factors by
        which naive chunksizes are bigger.
        """
        cs_naive = len_iterable // n_workers or 1  # naive approach
        cs_pool1 = len_iterable // (n_workers * 4) or 1  # incomplete pool algo.
        cs_pool2 = calc_chunksize(n_workers, len_iterable)
    
        real_factor_pool1 = cs_naive / cs_pool1
        real_factor_pool2 = cs_naive / cs_pool2
    
        return cs_naive, cs_pool1, cs_pool2, real_factor_pool1, real_factor_pool2
    

    上面的函数计算原始chunksize( cs_naive )以及池的chunksize算法的第一步chunksize( cs_pool1 cs_pool2 )。进一步计算 真实因素 rf_pool1 = cs_naive / cs_pool1 rf_pool2 = cs_naive / cs_pool2 ,它告诉我们简单计算的chunkSize比池的内部版本大多少倍。

    下面您可以看到使用此函数的输出创建的两个图形。左图仅显示了的块大小 n_workers=4 直到合适的长度 500 . 右图显示了的值 rf_pool1 16 ,真正的因素变成 >=4 (用于 )它的最大值是 7 对于可调长度 28-31 4. 该算法收敛到更长的迭代时间。”这里的“更长”是相对的,取决于指定工人的数量。

    figure1

    记住块大小 政务司司长1 仍然缺乏 extra -用剩余部分进行调整 迪夫莫德 包含在 从完整的算法。

    该算法还包括:

    if extra:
        chunksize += 1
    

    现在,在一些案例中 余数 额外的 根据divmod操作),将chunksize增加1显然不能解决所有任务。毕竟,如果它愿意的话,就不会有剩余的东西了。

    在下图中,您可以看到“ 额外治疗 “具有这样的效果,即 真实因素 对于 rf_pool2 现在向 4. 从…起 在下面 偏差稍微平滑一些。标准偏差 n_工人=4 len_iterable=500 0.5233 对于 rf_池1 0.4115 对于 .

    figure2

    最终,增加 块大小 由1产生的效果是,传输的最后一个任务的大小仅为 len_iterable % chunksize or chunksize .

    更有趣的是,我们以后会看到,更重要的是 额外治疗 但是,可以观察到 生成的块数 ( n_chunks 对于足够长的iterables,Pool完成的chunksize算法( n_pool2 在下图中)将稳定块的数量 n_chunks == n_workers * 4 . 相比之下,朴素算法(在初始打嗝之后)会在 n_chunks == n_workers n_chunks == n_workers + 1 随着iterable长度的增加。

    figure3

    下面,您将找到两个用于池的增强信息函数和naive chunksize算法。下一章将需要这些函数的输出。

    # mp_utils.py
    
    from collections import namedtuple
    
    
    Chunkinfo = namedtuple(
        'Chunkinfo', ['n_workers', 'len_iterable', 'n_chunks',
                      'chunksize', 'last_chunk']
    )
    
    def calc_chunksize_info(n_workers, len_iterable, factor=4):
        """Calculate chunksize numbers."""
        chunksize, extra = divmod(len_iterable, n_workers * factor)
        if extra:
            chunksize += 1
        # `+ (len_iterable % chunksize > 0)` exploits that `True == 1`
        n_chunks = len_iterable // chunksize + (len_iterable % chunksize > 0)
        # exploit `0 == False`
        last_chunk = len_iterable % chunksize or chunksize
    
        return Chunkinfo(
            n_workers, len_iterable, n_chunks, chunksize, last_chunk
        )
    

    不要被你可能意想不到的表情弄糊涂了 calc_naive_chunksize_info . 这个 额外的 不用于计算chunksize。

    def calc_naive_chunksize_info(n_workers, len_iterable):
        """Calculate naive chunksize numbers."""
        chunksize, extra = divmod(len_iterable, n_workers)
        if chunksize == 0:
            chunksize = 1
            n_chunks = extra
            last_chunk = chunksize
        else:
            n_chunks = len_iterable // chunksize + (len_iterable % chunksize > 0)
            last_chunk = len_iterable % chunksize or chunksize
    
        return Chunkinfo(
            n_workers, len_iterable, n_chunks, chunksize, last_chunk
        )
    

    现在,在我们看到 水塘 的chunksize算法与naive算法的输出相比看起来有所不同。。。

    • 如何判断Pool的方法是否正确 改进 某物
    • 这到底是什么

    如前一章所示,对于更长的iterables(更多的Taskel),Pool的chunksize算法 将iterable分为四次 更多 块比朴素的方法。更小的块意味着更多的任务,更多的任务意味着更多 并行化开销(PO) ,这一成本必须与增加调度灵活性的好处进行权衡(召回)

    由于相当明显的原因,Pool的基本chunksize算法无法权衡调度灵活性和 人事军官 对我们来说。IPC开销取决于操作系统、硬件和数据大小。算法不知道我们在什么硬件上运行代码,也不知道taskel需要多长时间才能完成。它是一种启发式方法,为用户提供基本功能 全部的 可能的情况。这意味着它无法针对任何特定场景进行优化。如前所述,, 人事军官 随着每个任务计算时间的增加(负相关),问题也越来越少。

    当你回忆起 并行化目标 第2章中的一个要点是:

    • 所有cpu核的高利用率

    前面提到的 某物 ,池的chunksize算法 可以 努力改进是关键 最小化空闲工作进程 ,分别为 cpu核的利用 .

    multiprocessing.Pool 在您希望所有工作进程都很忙的情况下,有人会询问是否有未使用的内核/空闲的工作进程。虽然这可能有很多原因,但在计算结束时空闲工作进程是我们经常可以观察到的现象,即使是 密集场景 (每个任务的相等计算时间)在工人数量不是 除数 块的数量( n_chunks % n_workers > 0

    我们如何实际地将我们对块大小的理解转化为能够解释观察到的工人利用率的东西,甚至在这方面比较不同算法的效率?


    6.1模型

    为了在这里获得更深入的见解,我们需要一种并行计算的抽象形式,它将过于复杂的现实简化到可管理的复杂程度,同时在定义的边界内保持重要性。这种抽象称为抽象 模型 并行化模型(PM) 如果要收集数据,则生成与实际计算相同的工作映射元数据(时间戳)。模型生成的元数据允许在某些约束条件下预测并行计算的度量。

    figure4

    此处定义的模型中的两个子模型之一 颗粒物 分布模型(DM) . 这个 DM ,当不考虑除相应chunksize算法以外的其他因素时,考虑工作人员数量、输入可数(任务数)及其计算持续时间。这意味着任何形式的开销都是无效的 包括。

    颗粒物 DM 用一个 ,代表各种形式的 . 这样的模型需要针对每个节点分别进行校准(硬件依赖性、操作系统依赖性)。一个表中表示了多少种形式的开销 是开着的,所以是多个 OMs 复杂程度各不相同。实现了哪一级别的准确度 需求由产品的总重量决定 对于具体的计算。较短的Taskel会导致更高的 人事军官 ,这反过来又需要更精确的 预测 .


    这个 平行时间表 是并行计算的二维表示,其中x轴表示时间,y轴表示并行工作池。工人数量和总计算时间标志着矩形的延伸,在其中绘制较小的矩形。这些较小的矩形表示原子工作单元(taskel)。

    在下面,您可以看到一个 附言 使用来自 DM 池的chunksize算法的 .

    figure5

    • x轴被划分为相等的时间单位,其中每个单位代表taskel所需的计算时间。
    • 此处的taskel显示为最小的青色矩形,放在匿名工作进程的时间线(日程表)中。
    • 任务是工作时间线中的一个或多个任务,以相同的色调连续高亮显示。
    • 怠速时间单位用红色的瓷砖表示。

    组成部分的名称见下图。

    figure6

    完完全全 包括 这个 空转份额 不仅限于尾部,还包括任务之间甚至任务之间的空间。


    • 分配效率(DE) -借助于 (或一种简化的计算方法) ).
    • 并行化效率(PE) 颗粒物

    需要注意的是,计算出的效率 不要 给定并行化问题的总体计算。在此上下文中,工人利用率仅区分具有已启动但未完成任务的工人和没有此类“开放”任务的工人。也就是说,可能是空转 taskel的时间跨度为 注册的。

    上述所有效率基本上都是通过计算除法商得到的 判定元件 体育课 伴随着忙碌的分享而来 在整个平行进度计划中占据较小的一部分,用于延长开销 颗粒物

    这个答案将进一步讨论一个简单的计算方法 判定元件 对于密集场景。这足以比较不同的chunksize算法,因为。。。

    1. ... 这个 这部分是 颗粒物 ,它会随着所采用的chunksize算法的不同而变化。
    2. ... 这个 密集场景 每个taskel的计算持续时间相等,表示一种“稳定状态”,对于这种状态,这些时间跨度从方程中消失。任何其他场景都只会导致随机结果,因为taskel的顺序很重要。

    6.3.1绝对分配效率(ADE)

    :

    绝对分配效率(ADE) = 忙共享 / 平行时间表

    对于 密集场景

    # mp_utils.py
    
    def calc_ade(n_workers, len_iterable, n_chunks, chunksize, last_chunk):
        """Calculate Absolute Distribution Efficiency (ADE).
    
        `len_iterable` is not used, but contained to keep a consistent signature
        with `calc_rde`.
        """
        if n_workers == 1:
            return 1
    
        potential = (
            ((n_chunks // n_workers + (n_chunks % n_workers > 1)) * chunksize)
            + (n_chunks % n_workers == 1) * last_chunk
        ) * n_workers
    
        n_full_chunks = n_chunks - (chunksize > last_chunk)
        taskels_in_regular_chunks = n_full_chunks * chunksize
        real = taskels_in_regular_chunks + (chunksize > last_chunk) * last_chunk
        ade = real / potential
    
        return ade
    

    如果没有 空转份额 忙共享 相同的 平行时间表 因此,我们得到了一个 艾德 百分之百。在我们的简化模型中,这是一个场景,其中所有可用流程都将在处理所有任务所需的整个时间内忙碌。换句话说,整个作业有效地并行化到100%。

    但我为什么一直提到 体育课 完全的 体育课 在这里

    为了理解这一点,我们必须考虑CulkSead(CS)的一个可能的情况,它确保了最大的调度灵活性(也可以是高地人的数量。巧合?)

    __________________________________ ~1~ __________________________________

    chunksize=1 ,只是因为 n_工人=4 不是37的除数。除以37/4的余数是1。剩下的这一个taskel必须由一个工人来处理,而剩下的三个则处于空闲状态。

    同样地,仍然会有一名工人在工作,有39个Taskel,如下图所示。

    figure7

    当你比较上 平行时间表 对于 chunksize=1 以下版本适用于 chunksize=3 ,您会注意到上面的 平行时间表 越小,x轴上的时间轴越短。现在应该很明显了,大块头的尺寸有多大 可以 导致总体计算时间增加,即使对于 密集场景 .

    但是为什么不使用x轴的长度来计算效率呢?

    因为开销不包含在这个模型中。这两种块大小都会不同,因此x轴实际上不具有直接可比性。开销仍然会导致更长的总计算时间,如中所示 案例2 如下图所示。

    figure8


    6.3.2相对分配效率(RDE)

    这个 艾德 较好的 较好的 这里还是指一个较小的 空转份额

    得到 判定元件 为可能的最大值调整的值 判定元件 ,我们必须将 艾德 通过 我们得到了 chunksize=1 .

    相对分配效率(RDE) = 阿德_cs_x / ADE_cs_1

    下面是代码中的外观:

    # mp_utils.py
    
    def calc_rde(n_workers, len_iterable, n_chunks, chunksize, last_chunk):
        """Calculate Relative Distribution Efficiency (RDE)."""
        ade_cs1 = calc_ade(
            n_workers, len_iterable, n_chunks=len_iterable,
            chunksize=1, last_chunk=1
        )
        ade = calc_ade(n_workers, len_iterable, n_chunks, chunksize, last_chunk)
        rde = ade / ade_cs1
    
        return rde
    

    这里是如何定义的,本质上是一个关于动物尾巴的故事 平行时间表 . RDE 受尾部包含的最大有效块大小的影响。(此尾部可以是x轴长度 块大小 last_chunk .) 其结果是 RDE 对于下图所示的各种“尾部外观”,自然收敛到100%(偶数)。

    figure9

    RDE ...

    • 是优化潜力的有力暗示。
    • 自然地,对于较长的iterables来说,可能性较小,因为整个iterables的相对尾部部分 平行时间表 收缩。

    请参阅本答案的第二部分 here .

        2
  •  60
  •   Darkonaut    7 年前

    这个答案是公认答案的第二部分 above


    7.Naive vs.Pool的Chunksize算法

    在详细讨论之前,请考虑下面两个GIFS。对于一系列不同的 iterable 长度,它们显示了两个比较的算法是如何对传递的数据进行分块的

    cs_4_50

    cs_200_250

    如第章所示“ 5.池的Chunksize算法 n_chunks == n_workers * 4 对于足够大的iterables,它会在 n_chunks == n_workers n_chunks == n_workers + 1 用天真的方法。对于朴素算法的应用:因为 n_chunks % n_workers == 1 True 对于 n_块==n_工作者+1

    原始块大小算法:

    您可能认为您使用相同数量的工人创建了任务,但这仅适用于没有剩余工人的情况 len_iterable / n_workers 剩下的部分,将有一个新的部分,每个工作人员只有一个任务。在这一点上,您的计算将不再是并行的。

    下图与第5章中的图相似,但显示的是节数而不是块数。对于池的全chunksize算法( n_pool2 ), n_sections 将稳定在臭名昭著的硬编码因子 4 . 对于naive算法, n_段 将在1和2之间交替。

    figure10

    对于Pool的chunksize算法,在 n_chunks = n_workers * 4 通过前面提到的 额外治疗 ,防止在此创建新节,并保留 空转份额 限制一名工人工作足够长的时间。不仅如此,该算法还将不断缩小 空转份额 ,这导致RDE值收敛到100%。

    “足够长”的时间 n_workers=4 len_iterable=210 例如对于等于或大于该值的可比性 空转份额 将仅限于一名员工,这一特征最初是由于 4. -首先是chunksize算法中的乘法。

    figure11

    朴素的chunksize算法也会收敛到100%,但收敛速度较慢。会聚效应完全取决于这样一个事实,即在有两个截面的情况下,尾部的相对部分会收缩。只有一名工作人员的尾巴限制在x轴长度内 n_workers - 1 len_iterable/n_工人 .

    下面是两张热图,显示了 适用于最大长度为5000的所有iterable长度的值,适用于2到100的所有工人数量。 色标从0.5到1(50%-100%)。在左侧的热图中,您会发现naive算法有更多的黑暗区域(较低的RDE值)。相比之下,Pool右侧的chunksize算法绘制了一幅更加阳光明媚的画面。

    figure12

    左下角暗角与右上角亮角的对角线梯度再次显示出对工人数量的依赖性,即所谓的“长可伸缩性”。

    每种算法会有多糟糕?

    使用Pool的chunksize算法a RDE 81.25%的值是上述规定的工人范围和可承受长度的最低值:

    figure13

    RDE 这是50.72%。在这种情况下,几乎一半的计算时间只有一个工人在运行!所以,小心,骄傲的主人 Knights Landing . ;)

    figure14


    8.现实检查

    在前几章中,我们考虑了纯数学分布问题的简化模型,从使多重处理成为一个棘手话题的基本细节中剥离出来。更好地了解分销模式(DM)的发展程度 为了有助于解释实际观察到的工作人员利用率,我们现在将看一看由 真实的

    安装程序

    下面的图都处理了一个简单的、cpu绑定的伪函数的并行执行,该函数使用各种参数调用,因此我们可以观察绘制的并行计划是如何随输入值而变化的。此函数中的“功”仅由范围对象上的迭代组成。这已经足够让核心保持忙碌,因为我们传入了大量数据。可选地,该函数需要一些额外的taskel data 这是刚刚返回的原样。由于每个taskel包含完全相同的工作量,因此我们仍在这里处理密集场景。

    @stamp_taskel
    def busy_foo(i, it, data=None):
        """Dummy function for CPU-bound work."""
        for _ in range(int(it)):
            pass
        return i, data
    
    
    def stamp_taskel(func):
        """Decorator for taking timestamps on start and end of decorated
        function execution.
        """
        @wraps(func)
        def wrapper(*args, **kwargs):
            start_time = time_ns()
            result = func(*args, **kwargs)
            end_time = time_ns()
            return (current_process().name, (start_time, end_time)), result
        return wrapper
    

    Pool的starmap方法也被修饰为只对starmap调用本身进行计时。此调用的“开始”和“结束”确定生成的并行计划x轴上的最小值和最大值。

    我们将在一台具有以下规格的机器上观察四个辅助进程上40个Taskel的计算:

    将改变的输入值是for循环中的迭代次数 (30k、30M、600M)和额外发送的数据大小(每个taskel、numpy-ndarray:0 MiB、50 MiB)。

    ...
    N_WORKERS = 4
    LEN_ITERABLE = 40
    ITERATIONS = 30e3  # 30e6, 600e6
    DATA_MiB = 0  # 50
    
    iterable = [
        # extra created data per taskel
        (i, ITERATIONS, np.arange(int(DATA_MiB * 2**20 / 8)))  # taskel args
        for i in range(LEN_ITERABLE)
    ]
    
    
    with Pool(N_WORKERS) as pool:
        results = pool.starmap(busy_foo, iterable)
    

    下面显示的运行是精心挑选的,具有相同的块顺序,因此您可以更好地发现与分布模型中的并行计划相比的差异,但不要忘记工作人员获取任务的顺序是不确定的。

    DM预测

    figure15

    第一次运行:30k次迭代和;每个任务0个MiB数据

    figure16

    我们在这里的第一次跑步非常短,Taskel非常“轻”。整体 pool.starmap() -通话总共只花了14.5毫秒。 你会注意到,与 DM ,空转不仅限于尾部,还发生在任务之间,甚至任务之间。这是因为我们的实际日程安排自然包括各种开销。在这里无所事事意味着一切 塔斯克尔的名字。可能的 真实的 空转 在期间 taskel并没有像前面提到的那样被捕获。

    此外,您还可以看到,并非所有员工都能同时完成任务。这是因为所有的工人都是通过一个共享的 inqueue 而且一次只能有一个工人从中读取数据。这同样适用于 outqueue . 这可能会导致更大的混乱,只要你正在传输非边际大小的数据,我们将在稍后看到。

    turbo boost 此时worker-3/4的核心不再可用,因此他们以较低的时钟频率处理任务。

    整个计算过程非常简单,以至于硬件或操作系统引入的混沌因素会使计算结果产生偏差 附言 DM -即使对于理论上合适的情景,预测也没有什么意义。

    第二次运行:3000万次迭代和;每个任务0个MiB数据

    figure17

    将for循环中的迭代次数从30000次增加到3000万次,将产生一个真正的并行计划,该计划与由 DM DM 预测。

    第三次运行:3000万次迭代和;每个任务50个MiB数据

    figure18

    保持30M的迭代,但每个taskel来回发送50个MiB会再次扭曲图片。在这里,排队效应是显而易见的。Worker-4需要比Worker-1等待第二个任务更长的时间。现在想象一下这个有70名工人的时间表!

    如果taskel在计算上很轻,但可以提供大量数据作为有效负载,那么单个共享队列的瓶颈可能会阻止向池中添加更多工作线程的任何额外好处,即使它们由物理内核支持。在这种情况下,Worker-1可以完成其第一个任务,并在Worker-40完成其第一个任务之前等待新任务。

    现在应该很明显,为什么计算时间在 Pool 不要总是随着工人数量的增加而减少。发送相对大量的数据 可以

    第四次运行:6亿次迭代和;每个任务50个MiB数据

    figure19

    在这里,我们再次发送50个MiB,但将迭代次数从30M增加到600M,这使总计算时间从10秒增加到152秒。拟定的平行时间表 再一次


    所讨论的乘法 4. 增加了调度灵活性,但也利用了taskel分布的不均匀性。如果没有这种倍增,闲置的份额将仅限于一个工人,即使是短期的iterables(例如 DM 使用密集场景)。Pool的chunksize算法需要具有一定大小的输入项才能恢复该特性。

    正如这个答案所希望显示的那样,Pool的chunksize算法与naive方法相比,平均而言能带来更好的核心利用率,至少在平均情况下是这样,并且不考虑长开销。这里的naive算法的分布效率(DE)可以低至约51%,而Pool的chunksize算法的分布效率低至约81%。 判定元件 然而,它不像IPC那样包含并行化开销(PO)。第8章表明

    判定元件 与天真的方法相比, 它并没有为每个输入星座提供最优taskel分布。 虽然一个简单的静态分块算法不能优化(包括开销)并行化效率(PE),但没有内在的原因说明它不能 总是 判定元件 如同 chunksize=1 . 一个简单的chunksize算法只包含基本的数学知识,可以自由地以任何方式“切蛋糕”。

    与Pool实现的“等大小分块”算法不同,“等大小分块”算法将提供 每一次都是100% len_iterable n_workers 结合体在池的源代码中实现一个均匀大小的分块算法会稍微复杂一些,但可以在现有算法的基础上进行调整,只需将任务打包到外部(我将从这里链接,以防我在如何实现这一点上提出问题)。

        3
  •  9
  •   Brad Solomon    7 年前

    我认为你缺少的部分是,你天真的估计假设每个工作单元花费的时间相同,在这种情况下,你的策略是最好的。但是,如果某些作业比其他作业完成得更快,则某些内核可能会闲置,等待缓慢的作业完成。

    因此,通过将块分成4倍多的块,如果一个块提前完成,那么该核心可以开始下一个块(而其他核心继续处理其较慢的块)。

    我不知道他们为什么选择了因子4,但这将是一种在最小化map代码开销(需要尽可能大的块)和平衡不同时间段的块(需要尽可能小的块)之间的权衡。