
Passing a master class's state to workers in another process

  •  1
  • Sisoviromol  · Tech Community  · 1 year ago

    I want to share the state of a master class with multiple workers. To achieve this, I created a Master and a Worker class. The workers are triggered by a simple button tracked by Eel.

    My master increments an internal state every second and can return that state through a method.

    import multiprocessing
    import time
    
    
    class Master:
        def __init__(self):
            self.number = 0
            self.state = {}
            self.lock = multiprocessing.Lock()
    
        def calculate(self):
            while True:
                self.number += 1
                with self.lock:
                    # Calculate dicts and save them into the state
                    self.state = {"data": self.number}
                time.sleep(1)
    
        def get_state(self):
            with self.lock:
                return self.state
    

    My Worker class is initialized with the master instance. The worker takes data from a queue, reads the master's current state, and prints it.

    class Worker:
        def __init__(self, master, cpu_id):
            self.master = master
            self.busy = False
    
        def process(self, data, queue_size):
            print(f"Queue size: {queue_size}")
            self.busy = True
            state = self.master.get_state()
            # Process the data using the state and the unique CPU
            print(f"Processing {[x - self.cpu_id for x in data]} using state {state} on CPU {self.cpu_id}")
            time.sleep(2)
            self.busy = False
    
    def worker_process(worker, queue):
        while True:
            if not queue.empty():
                data = queue.get()
                worker.process(data, queue.qsize())
    
    
    def main():
        master = Master()
        workers = [Worker(master, i) for i in range(multiprocessing.cpu_count())]
    
        queue = multiprocessing.Queue()
    
        for worker in workers:
            multiprocessing.Process(target=worker_process, args=(worker, queue)).start()
    
        master_process = multiprocessing.Process(target=master.calculate)
        master_process.start()
    
        eel_process = multiprocessing.Process(target=start_eel, args=(queue,))
        eel_process.start()
    
    
    if __name__ == "__main__":
        main()
    
    

    My problem is that after the workers are initialized, the state never changes. From my research, Python does not pass the master by reference. Each worker therefore initializes its own copy of the master class, whose calculate loop was never started, so nothing changes. I also considered a global variable in a config file containing a queue. But how would each worker tell which output in the queue belongs to its particular get_state() call? If multiple workers request the state and the replies all go into one queue, would I also have to pass a worker ID so each worker can pick out its own element? Unfortunately research2 didn't help me either.

    0 replies  |  1 year ago
        1
  •  1
  •   Booboo    1 year ago

    If you really need to store your state as a dictionary (the code you posted doesn't, since only a single value in self.state is being updated) and need to share it across processes, then I suggest using a managed dictionary.

    I found several other problems with your code, so please read the comments I added, each starting with "Booboo".

    import multiprocessing
    import time
    
    
    class Master:
        def __init__(self, managed_dict):
            self.number = 0
            self.state = managed_dict
            self.lock = multiprocessing.Lock()
    
        def calculate(self):
            while True:
                self.number += 1
                with self.lock:
                    # Calculate dicts and save them into the state
                    self.state.update({"data": self.number})
                time.sleep(1)
    
        def get_state(self):
            with self.lock:
                return self.state
    
    class Worker:
        def __init__(self, master, cpu_id):
            self.master = master
        self.cpu_id = cpu_id  # Booboo - This was missing
            self.busy = False
    
        def process(self, data, queue_size):
            print(f"Queue size: {queue_size}")
            self.busy = True
            state = self.master.get_state()
            # Process the data using the state and the unique CPU
            print(f"Processing {[x - self.cpu_id for x in data]} using state {state} on CPU {self.cpu_id}")
            time.sleep(2)
            self.busy = False
    
    def worker_process(worker, queue):
        while True:
            # Booboo - queue.empty is not reliable and you are burning
            # needless CPU cycles testing it:
            data = queue.get()  # Booboo - just block until data is available
            # Booboo - Warning: queue.qsize() is unreliable
            worker.process(data, queue.qsize())
    
    
    # Booboo - Missing start_eel function
    
    def start_eel(queue):
        for i in range(100, 121):
            queue.put([i, i])
            time.sleep(.5)
    
    def main():
        with multiprocessing.Manager() as manager:
            managed_dict = manager.dict()
            master = Master(managed_dict)
            workers = [Worker(master, i) for i in range(multiprocessing.cpu_count())]
    
            queue = multiprocessing.Queue()
    
            for worker in workers:
                # Booboo - Make these daemon processes so that they will automatically
                # terminate when the main process terminates
                multiprocessing.Process(target=worker_process, args=(worker, queue), daemon=True).start()
    
            # Booboo - make daemon:
            master_process = multiprocessing.Process(target=master.calculate, daemon=True)
            master_process.start()
    
            # Booboo - start_eel is not defined, so I will define it
            eel_process = multiprocessing.Process(target=start_eel, args=(queue,))
            eel_process.start()
            # Booboo - So that we eventually terminate.
            # If this process hasn't completed in 10 seconds,
            # we will quit anyway:
            eel_process.join(10)
    
    if __name__ == "__main__":
        main()
    

    Prints:

    Queue size: 0
    Processing [99, 99] using state {'data': 1} on CPU 1
    Queue size: 0
    Processing [98, 98] using state {'data': 1} on CPU 3
    Queue size: 0
    Processing [102, 102] using state {'data': 2} on CPU 0
    Queue size: 0
    Processing [98, 98] using state {'data': 2} on CPU 5
    Queue size: 0
    Processing [100, 100] using state {'data': 3} on CPU 4
    Queue size: 0
    Processing [103, 103] using state {'data': 3} on CPU 2
    Queue size: 0
    Processing [99, 99] using state {'data': 4} on CPU 7
    Queue size: 0
    Processing [101, 101] using state {'data': 4} on CPU 6
    Queue size: 0
    Processing [107, 107] using state {'data': 5} on CPU 1
    Queue size: 0
    Processing [106, 106] using state {'data': 5} on CPU 3
    Queue size: 0
    Processing [110, 110] using state {'data': 6} on CPU 0
    Queue size: 0
    Processing [106, 106] using state {'data': 6} on CPU 5
    Queue size: 0
    Processing [108, 108] using state {'data': 7} on CPU 4
    Queue size: 0
    Processing [111, 111] using state {'data': 7} on CPU 2
    Queue size: 0
    Processing [107, 107] using state {'data': 8} on CPU 7
    Queue size: 0
    Processing [109, 109] using state {'data': 8} on CPU 6
    Queue size: 0
    Processing [115, 115] using state {'data': 9} on CPU 1
    Queue size: 0
    Processing [114, 114] using state {'data': 9} on CPU 3
    Queue size: 0
    Processing [118, 118] using state {'data': 10} on CPU 0
    Queue size: 0
    Processing [114, 114] using state {'data': 10} on CPU 5
    
        2
  •  1
  •   RaJa    1 year ago

    I suggest explicitly sharing the master's state via a shared-memory object. That way, any change made to the object is immediately visible to all other processes that read it.

    Since your state is a simple counter, a shared-memory object of type Value should be sufficient.

    Here is a very simple example with two processes: one changes the variable, and the second only reads it:

    from multiprocessing import Process, Value
    import time, random
    
    def writer(n):
        n.value = random.random()
    
    def reader(n):
        time.sleep(1)
        print(n.value)
    
    if __name__ == '__main__':
        num = Value('d', 0.0)
    
        p = Process(target=reader, args=(num,), daemon=True)
        p.start()
        q = Process(target=writer, args=(num,), daemon=True)
        q.start()
    
        p.join()
        q.join()
    

    Read the Python docs for more information.