代码之家  ›  专栏  ›  技术社区  ›  Cerin

跟踪joblib.并行执行的进度

  •  57
  • Cerin  · 技术社区  · 11 年前

    是否有一种简单的方法来跟踪 joblib.Parallel 处决

    我有一个由数千个作业组成的长时间运行的执行,我想在数据库中跟踪和记录这些作业。然而,要做到这一点,每当Parallel完成一项任务时,我需要它执行回调,报告剩余的作业数。

    我以前用Python的stdlib multiprocessing.Pool完成过类似的任务,方法是启动一个线程,在Pool的作业列表中记录挂起的作业数。

    看着代码,Parallel继承了Pool,所以我想我可以完成同样的技巧,但它似乎没有使用这些列表,而且我还无法找到其他方法来“读取”它的内部状态。

    9 回复  |  直到 11 年前
        1
  •  79
  •   AlanSTACK    3 年前

    与dano和Connor的答案相比,还有一步要走,那就是将整个事情作为一个上下文管理器:

    import contextlib
    import joblib
    from tqdm import tqdm
    
    @contextlib.contextmanager
    def tqdm_joblib(tqdm_object):
        """Context manager to patch joblib to report into tqdm progress bar given as argument"""
        class TqdmBatchCompletionCallback(joblib.parallel.BatchCompletionCallBack):
            def __call__(self, *args, **kwargs):
                tqdm_object.update(n=self.batch_size)
                return super().__call__(*args, **kwargs)
    
        old_batch_callback = joblib.parallel.BatchCompletionCallBack
        joblib.parallel.BatchCompletionCallBack = TqdmBatchCompletionCallback
        try:
            yield tqdm_object
        finally:
            joblib.parallel.BatchCompletionCallBack = old_batch_callback
            tqdm_object.close()
    

    然后,您可以这样使用它,并且在完成后不要留下猴子补丁代码:

    from math import sqrt
    from joblib import Parallel, delayed
    
    with tqdm_joblib(tqdm(desc="My calculation", total=10)) as progress_bar:
        Parallel(n_jobs=16)(delayed(sqrt)(i**2) for i in range(10))
    

    我认为这太棒了,它看起来像tqdm熊猫集成。

        2
  •  24
  •   Jon    8 年前

    为什么不能简单地使用 tqdm ? 以下内容对我有用

    from joblib import Parallel, delayed
    from datetime import datetime
    from tqdm import tqdm
    
    def myfun(x):
        return x**2
    
    results = Parallel(n_jobs=8)(delayed(myfun)(i) for i in tqdm(range(1000))
    100%|██████████| 1000/1000 [00:00<00:00, 10563.37it/s]
    
        3
  •  22
  •   dano    11 年前

    您链接到的文档说明 Parallel 有一个可选的进度表。它是通过使用 callback 关键字参数由提供 multiprocessing.Pool.apply_async :

    # This is inside a dispatch function
    self._lock.acquire()
    job = self._pool.apply_async(SafeFunction(func), args,
                kwargs, callback=CallBack(self.n_dispatched, self))
    self._jobs.append(job)
    self.n_dispatched += 1
    

    ...

    class CallBack(object):
        """ Callback used by parallel: it is used for progress reporting, and
            to add data to be processed
        """
        def __init__(self, index, parallel):
            self.parallel = parallel
            self.index = index
    
        def __call__(self, out):
            self.parallel.print_progress(self.index)
            if self.parallel._original_iterable:
                self.parallel.dispatch_next()
    

    这里是 print_progress :

    def print_progress(self, index):
        elapsed_time = time.time() - self._start_time
    
        # This is heuristic code to print only 'verbose' times a messages
        # The challenge is that we may not know the queue length
        if self._original_iterable:
            if _verbosity_filter(index, self.verbose):
                return
            self._print('Done %3i jobs       | elapsed: %s',
                        (index + 1,
                         short_format_time(elapsed_time),
                        ))
        else:
            # We are finished dispatching
            queue_length = self.n_dispatched
            # We always display the first loop
            if not index == 0:
                # Display depending on the number of remaining items
                # A message as soon as we finish dispatching, cursor is 0
                cursor = (queue_length - index + 1
                          - self._pre_dispatch_amount)
                frequency = (queue_length // self.verbose) + 1
                is_last_item = (index + 1 == queue_length)
                if (is_last_item or cursor % frequency):
                    return
            remaining_time = (elapsed_time / (index + 1) *
                        (self.n_dispatched - index - 1.))
            self._print('Done %3i out of %3i | elapsed: %s remaining: %s',
                        (index + 1,
                         queue_length,
                         short_format_time(elapsed_time),
                         short_format_time(remaining_time),
                        ))
    

    老实说,他们实现这一点的方式有点奇怪——似乎假设任务总是按照开始的顺序完成。这个 index 变量 打印进度 只是 self.n_dispatched 变量。因此,启动的第一个作业总是以 指数 即使是说,第三个作业首先完成,也为0。这也意味着他们实际上没有跟踪 完整的 工作。因此,没有要监视的实例变量。

    我认为你最好的方法是创建自己的CallBack类,并使用monkey补丁Parallel:

    from math import sqrt
    from collections import defaultdict
    from joblib import Parallel, delayed
    
    class CallBack(object):
        completed = defaultdict(int)
    
        def __init__(self, index, parallel):
            self.index = index
            self.parallel = parallel
    
        def __call__(self, index):
            CallBack.completed[self.parallel] += 1
            print("done with {}".format(CallBack.completed[self.parallel]))
            if self.parallel._original_iterable:
                self.parallel.dispatch_next()
    
    import joblib.parallel
    joblib.parallel.CallBack = CallBack
    
    if __name__ == "__main__":
        print(Parallel(n_jobs=2)(delayed(sqrt)(i**2) for i in range(10)))
    

    输出:

    done with 1
    done with 2
    done with 3
    done with 4
    done with 5
    done with 6
    done with 7
    done with 8
    done with 9
    done with 10
    [0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0]
    

    这样,每当作业完成时都会调用回调,而不是默认的回调。

        4
  •  11
  •   Connor Clark    9 年前

    扩展dano对joblib库最新版本的回答。内部实现有一些变化。

    from joblib import Parallel, delayed
    from collections import defaultdict
    
    # patch joblib progress callback
    class BatchCompletionCallBack(object):
      completed = defaultdict(int)
    
      def __init__(self, time, index, parallel):
        self.index = index
        self.parallel = parallel
    
      def __call__(self, index):
        BatchCompletionCallBack.completed[self.parallel] += 1
        print("done with {}".format(BatchCompletionCallBack.completed[self.parallel]))
        if self.parallel._original_iterator is not None:
          self.parallel.dispatch_next()
    
    import joblib.parallel
    joblib.parallel.BatchCompletionCallBack = BatchCompletionCallBack
    
        5
  •  8
  •   Magdrop    6 年前

    TLDR解决方案 :

    使用python 3.5与joblib 0.14.0和tqdm 4.46.0配合使用。感谢frenzykriger提供的上下文建议,感谢dano和Connor提出的猴子修补方案。

    import contextlib
    import joblib
    from tqdm import tqdm
    from joblib import Parallel, delayed
    
    @contextlib.contextmanager
    def tqdm_joblib(tqdm_object):
        """Context manager to patch joblib to report into tqdm progress bar given as argument"""
    
        def tqdm_print_progress(self):
            if self.n_completed_tasks > tqdm_object.n:
                n_completed = self.n_completed_tasks - tqdm_object.n
                tqdm_object.update(n=n_completed)
    
        original_print_progress = joblib.parallel.Parallel.print_progress
        joblib.parallel.Parallel.print_progress = tqdm_print_progress
    
        try:
            yield tqdm_object
        finally:
            joblib.parallel.Parallel.print_progress = original_print_progress
            tqdm_object.close()
    

    你可以用frenzykroger描述的相同方式使用

    import time
    def some_method(wait_time):
        time.sleep(wait_time)
    
    with tqdm_joblib(tqdm(desc="My method", total=10)) as progress_bar:
        Parallel(n_jobs=2)(delayed(some_method)(0.2) for i in range(10))
    

    更长的解释:

    Jon的解决方案很容易实现,但它只测量分派的任务。如果任务需要很长时间,则在等待最后一个分派的任务完成执行时,条形图将保持在100%。

    frenzykroger的上下文管理器方法(由dano和Connor改进而来)更好,但 BatchCompletionCallBack 也可以用 ImmediateResult 在任务完成之前(请参见 Intermediate results from joblib ). 这将使我们的计数超过100%。

    而不是猴子修补 批次完成回调 ,我们可以修补 print_progress 中的函数 Parallel 这个 批次完成回调 已调用此 打印进度 无论如何如果设置了verbose(即。 Parallel(n_jobs=2, verbose=100) ) 打印进度 将打印完成的任务,尽管不如tqdm好。查看代码 打印进度 是类方法,因此它已经 self.n_completed_tasks 记录我们想要的号码。我们所要做的只是将其与joblib的当前进度进行比较,并仅在存在差异时进行更新。

    这在joblib 0.14.0和tqdm 4.46.0中使用python 3.5进行了测试。

        6
  •  4
  •   jsta    4 年前

    文本进度条

    对于那些想要文本进度条而不需要像tqdm这样的附加模块的人来说,还有一个变体。joblib=0.11,python 3.5.2在2018年4月16日在linux上的实际值,并显示子任务完成时的进度。

    重新定义本机类:

    class BatchCompletionCallBack(object):
        # Added code - start
        global total_n_jobs
        # Added code - end
        def __init__(self, dispatch_timestamp, batch_size, parallel):
            self.dispatch_timestamp = dispatch_timestamp
            self.batch_size = batch_size
            self.parallel = parallel
    
        def __call__(self, out):
            self.parallel.n_completed_tasks += self.batch_size
            this_batch_duration = time.time() - self.dispatch_timestamp
    
            self.parallel._backend.batch_completed(self.batch_size,
                                               this_batch_duration)
            self.parallel.print_progress()
            # Added code - start
            progress = self.parallel.n_completed_tasks / total_n_jobs
            print(
                "\rProgress: [{0:50s}] {1:.1f}%".format('#' * int(progress * 50), progress*100)
                , end="", flush=True)
            if self.parallel.n_completed_tasks == total_n_jobs:
                print('\n')
            # Added code - end
            if self.parallel._original_iterator is not None:
                self.parallel.dispatch_next()
    
    import joblib.parallel
    import time
    joblib.parallel.BatchCompletionCallBack = BatchCompletionCallBack
    

    在使用作业总数之前定义全局常量:

    total_n_jobs = 10
    

    这将导致如下情况:

    Progress: [########################################          ] 80.0%
    
        7
  •  1
  •   Community Mohan Dere    9 年前

    以下是您的问题的另一个答案,语法如下:

    aprun = ParallelExecutor(n_jobs=5)
    
    a1 = aprun(total=25)(delayed(func)(i ** 2 + j) for i in range(5) for j in range(5))
    a2 = aprun(total=16)(delayed(func)(i ** 2 + j) for i in range(4) for j in range(4))
    a2 = aprun(bar='txt')(delayed(func)(i ** 2 + j) for i in range(4) for j in range(4))
    a2 = aprun(bar=None)(delayed(func)(i ** 2 + j) for i in range(4) for j in range(4))
    

    https://stackoverflow.com/a/40415477/232371

        8
  •  0
  •   Дмитро Олександрович    5 年前

    在Jupyter中,tqdm每次输出时都会在输出中开始一行。 因此,对于Jupyter Notebook,它将是:

    用于Jupyter笔记本。 不睡觉:

    from joblib import Parallel, delayed
    from datetime import datetime
    from tqdm import notebook
    
    def myfun(x):
        return x**2
    
    results = Parallel(n_jobs=8)(delayed(myfun)(i) for i in notebook.tqdm(range(1000)))  
    

    100%1000/1000[00:06<00:00,143.70it/s]

    有时间睡眠:

    from joblib import Parallel, delayed
    from datetime import datetime
    from tqdm import notebook
    from random import randint
    import time
    
    def myfun(x):
        time.sleep(randint(1, 5))
        return x**2
    
    results = Parallel(n_jobs=7)(delayed(myfun)(i) for i in notebook.tqdm(range(100)))
    

    我当前使用的是什么而不是joblib。Parallel:

    import concurrent.futures
    from tqdm import notebook
    from random import randint
    import time
    
    iterable = [i for i in range(50)]
    
    def myfun(x):
        time.sleep(randint(1, 5))
        return x**2
    
    def run(func, iterable, max_workers=8):
        with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
            results = list(notebook.tqdm(executor.map(func, iterable), total=len(iterable)))
        return results
    
    run(myfun, iterable)
    
        9
  •  0
  •   Carlos H. Mendoza-Cardenas    3 年前

    背景 verbose=13 对我来说已经足够了: https://joblib.readthedocs.io/en/latest/generated/joblib.Parallel.html

    我在stderr上看到一行代码,内容如下:

    [Parallel(n_jobs=16)]: Done 134 tasks      | elapsed:  7.7min