代码之家  ›  专栏  ›  技术社区  ›  leonard

在python中并行调用多个对象的方法

  •  2
  • leonard  · 技术社区  · 6 年前

    我有两节课。一个叫 algorithm 另一个打电话来 Chain . 在 算法

    换言之 算法 类实例化n个链,我想运行 _sample 方法,它属于 类中的每个平行链 班级。

    我在这里看到过类似的问题: Apply a method to a list of objects in parallel using multi-processing ,但如函数所示 _sample_chains_parallel_worker

    问题1 :为什么在这种情况下不起作用?

    sample_chains_parallel 甚至不并行运行。

    问题2 :为什么?

    问题3

    import time
    import multiprocessing
    
    class Chain():
    
        def __init__(self):
            self.thetas = []
    
        def _sample(self):
            for i in range(3):
                time.sleep(1)
                self.thetas.append(i)
    
        def clear_thetas(self):
            self.thetas = []
    
    class algorithm():
    
        def __init__(self, n=3):
            self.n = n
            self.chains = []
    
        def _init_chains(self):
            for _ in range(self.n):
                self.chains.append(Chain())
    
        def _sample_chains(self):
            for chain in self.chains:
                chain.clear_thetas()
                chain._sample()
    
        def _sample_chains_parallel(self):
            pool = multiprocessing.Pool(processes=self.n)
            for chain in self.chains:
                chain.clear_thetas()
                pool.apply_async(chain._sample())
            pool.close()
            pool.join()
    
        def _sample_chains_parallel_worker(self):
    
            def worker(obj):
                obj._sample()
    
            pool = multiprocessing.Pool(processes=self.n)
            pool.map(worker, self.chains)
    
            pool.close()
            pool.join()
    
    
    if __name__=="__main__":
        import time
    
        alg = algorithm()
        alg._init_chains()
    
        start = time.time()
        alg._sample_chains()
        end = time.time()
        print "sequential", end - start
    
        start = time.time()
        alg._sample_chains_parallel()
        end = time.time()
        print "parallel", end - start
    
        start = time.time()
        alg._sample_chains_parallel_worker()
        end = time.time()
        print "parallel, map and worker", end - start
    
    1 回复  |  直到 6 年前
        1
  •  1
  •   Darkonaut    6 年前

    _sample_chains_parallel 你在打电话吗 chain._sample() 不只是传递函数: pool.apply_async(chain._sample()) . 所以你把结果作为一个论点来传递,而不是让 apply_async 算算吧。

    () 对您帮助不大,因为python2不能pickle实例方法(Python+3.5也可以)。除非你打电话,否则不会出错 get() 在result对象上,如果您看到这种方法的时间很短,请不要高兴,这是因为它很快就会退出,并出现一个未确认的异常。

    对于并行版本,您必须重新定位 worker 到模块级并调用它 pool.apply_async(worker (chain,)) pool.map(worker, self.chains) .

    注意你忘了 clear_thetas() 对于 _sample_chains_parallel_worker . 更好的解决办法是让 Chain._sample self._clear_thetas() .