代码之家  ›  专栏  ›  技术社区  ›  Kenneth Breugelmans

Python在多个处理进程之间共享一个deque

  •  4
  • Kenneth Breugelmans  · 技术社区  · 7 年前

    在pas一小时内,我一直在看以下问题,但没有任何运气:

    Python sharing a dictionary between parallel processes

    multiprocessing: sharing a large read-only object between processes?

    multiprocessing in python - sharing large object (e.g. pandas dataframe) between multiple processes

    我已经编写了一个非常基本的测试文件来说明我正在尝试做什么:

    from collections import deque
    from multiprocessing import Process
    import numpy as np
    
    
    class TestClass:
        def __init__(self):
            self.mem = deque(maxlen=4)
            self.process = Process(target=self.run)
    
        def run(self):
            while True:
                self.mem.append(np.array([0, 1, 2, 3, 4]))
    
    
    def print_values(x):
        while True:
            print(x)
    
    
    test = TestClass()
    process = Process(target=print_values(test.mem))
    
    test.process.start()
    process.start()
    

    目前,这将输出以下内容:

    deque([], maxlen=4)
    

    如何从主代码或运行“print\u values”的进程访问mem值?

    2 回复  |  直到 7 年前
        1
  •  7
  •   bivouac0    7 年前

    不幸地 multiprocessing.Manager() 不支持 deque 但它确实与 list , dict , Queue , Value Array . A. 列表 非常接近,所以我在下面的示例中使用了它。。

    from multiprocessing import Process, Manager, Lock
    import numpy as np
    
    class TestClass:
        def __init__(self):
            self.maxlen = 4
            self.manager = Manager()
            self.mem = self.manager.list()
            self.lock = self.manager.Lock()
            self.process = Process(target=self.run, args=(self.mem, self.lock))
    
        def run(self, mem, lock):
            while True:
                array = np.random.randint(0, high=10, size=5)
                with lock:
                    if len(mem) >= self.maxlen:
                        mem.pop(0)
                    mem.append(array)
    
    def print_values(mem, lock):
        while True:
            with lock:
                print mem
    
    test = TestClass()
    print_process = Process(target=print_values, args=(test.mem, test.lock))
    test.process.start()
    print_process.start()
    
    test.process.join()
    print_process.join()
    

    使用管理器对象时必须稍微小心。你可以像他们引用的对象一样使用它们,但你不能像。。。 mem = mem[-4:] 因为要更改引用的对象,所以要截断这些值。

    至于编码风格,我可能会移动 Manager 对象或移动 print_values 函数,但举个例子,这是可行的。如果你移动东西,请注意你不能使用 self.mem 直接在 run 方法您需要在启动流程或 fork python在后台执行的操作将创建一个新实例,并且不会被共享。

    希望这对你的情况有效,如果不行,我们可以试着调整一下。

        2
  •  3
  •   Kenneth Breugelmans    7 年前

    因此,通过结合@bivouac0提供的代码和@Marijn Pieters发布的评论,我提出了以下解决方案:

    from multiprocessing import Process, Manager, Queue
    
    
    class testClass:
        def __init__(self, maxlen=4):
            self.mem = Queue(maxsize=maxlen)
            self.process = Process(target=self.run)
    
        def run(self):
            i = 0
    
            while True:
                self.mem.empty()
                while not self.mem.full():
                    self.mem.put(i)
                    i += 1
    
    
    def print_values(queue):
        while True:
            values = queue.get()
            print(values)
    
    
    if __name__ == "__main__":
        test = testClass()
        print_process = Process(target=print_values, args=(test.mem,))
    
        test.process.start()
        print_process.start()
    
        test.process.join()
        print_process.join()