如果你真的需要将你的状态存储为字典(你发布的代码不是这样,因为里面只有一个值
self.state
正在更新),并且需要跨进程共享,那么我建议使用
托管字典(managed dict,即 multiprocessing.Manager().dict())
。
我发现你的代码还有其他几个问题,所以请阅读我在代码中添加的、以“Booboo”开头的注释:
import multiprocessing
import threading
import time
class Master:
    """Producer of the shared state that the workers read.

    The state is kept in the mapping handed to the constructor. ``main``
    passes a ``multiprocessing.Manager().dict()`` proxy so that updates made
    in one process are visible in the others.
    """

    def __init__(self, managed_dict):
        """Store the shared mapping and create the lock that guards it.

        Args:
            managed_dict: Mapping that holds the shared state.
        """
        self.number = 0
        self.state = managed_dict
        self.lock = multiprocessing.Lock()

    def calculate(self):
        """Publish an incrementing counter under the ``"data"`` key, forever.

        This method never returns; ``main`` runs it in its own daemon process.
        """
        while True:
            self.number += 1
            with self.lock:
                self.state.update({"data": self.number})
            # Sleep only after the lock is released so that readers calling
            # get_state() are not blocked for the whole pause.
            time.sleep(1)

    def get_state(self):
        """Return the shared state mapping.

        The very object passed to the constructor is returned, not a copy, so
        the caller observes later updates through it. The lock is held only
        while the reference is fetched.
        """
        with self.lock:
            return self.state
class Worker:
    """Consumer that processes queue items using the master's current state."""

    def __init__(self, master, cpu_id):
        """Remember the master to query and this worker's identifier.

        Args:
            master: Object exposing ``get_state()``.
            cpu_id: Number identifying this worker; it is subtracted from
                every element of the processed data and shown in the output.
        """
        self.master = master
        self.cpu_id = cpu_id
        self.busy = False

    def process(self, data, queue_size):
        """Process one item and report it on standard output.

        Args:
            data: Iterable of numbers to process.
            queue_size: Queue length to report; it is only printed.
        """
        print(f"Queue size: {queue_size}")
        self.busy = True
        try:
            state = self.master.get_state()
            shifted = [x - self.cpu_id for x in data]
            print(f"Processing {shifted} using state {state} on CPU {self.cpu_id}")
            time.sleep(2)
        finally:
            # Always clear the flag, otherwise a failure above would leave
            # this worker marked busy for good.
            self.busy = False
def worker_process(worker, queue):
    """Feed items from ``queue`` to ``worker`` until a ``None`` item arrives.

    Args:
        worker: Object exposing ``process(data, queue_size)``.
        queue: Object exposing blocking ``get()`` and ``qsize()``.

    A ``None`` item is a stop request for the one consumer that receives it.
    Without any such item the loop runs forever, which is how ``main`` uses
    it (the consumers are daemon processes).
    """
    while True:
        data = queue.get()
        if data is None:
            return
        # NOTE: the Python docs state that multiprocessing.Queue.qsize() is
        # approximate and may raise NotImplementedError on platforms such as
        # macOS.
        worker.process(data, queue.qsize())
def start_eel(queue, *, start=100, stop=121, delay=.5):
    """Put the pairs ``[i, i]`` on ``queue``, pausing after each one.

    Args:
        queue: Object exposing ``put(item)``.
        start: First value of ``i`` (inclusive).
        stop: End of the range (exclusive).
        delay: Seconds to sleep after each ``put``.

    With the defaults this enqueues ``[100, 100]`` through ``[120, 120]`` at
    half-second intervals.
    """
    for i in range(start, stop):
        queue.put([i, i])
        time.sleep(delay)
def main():
    """Wire up the master, the workers and the producer, then run briefly.

    One daemon process is started per worker (one worker per CPU reported by
    ``multiprocessing.cpu_count()``), plus a daemon process running
    ``Master.calculate`` and a non-daemon process running ``start_eel``.
    """
    # Everything happens inside the manager context: the managed dict is only
    # usable while the manager is alive.
    with multiprocessing.Manager() as manager:
        managed_dict = manager.dict()
        master = Master(managed_dict)
        workers = [Worker(master, i) for i in range(multiprocessing.cpu_count())]
        queue = multiprocessing.Queue()

        for worker in workers:
            multiprocessing.Process(
                target=worker_process, args=(worker, queue), daemon=True
            ).start()

        master_process = multiprocessing.Process(target=master.calculate, daemon=True)
        master_process.start()

        eel_process = multiprocessing.Process(target=start_eel, args=(queue,))
        eel_process.start()
        # Wait at most 10 seconds for the producer; the timeout bounds the
        # run even if the producer has not finished by then.
        eel_process.join(10)


if __name__ == "__main__":
    main()
打印:
Queue size: 0
Processing [99, 99] using state {'data': 1} on CPU 1
Queue size: 0
Processing [98, 98] using state {'data': 1} on CPU 3
Queue size: 0
Processing [102, 102] using state {'data': 2} on CPU 0
Queue size: 0
Processing [98, 98] using state {'data': 2} on CPU 5
Queue size: 0
Processing [100, 100] using state {'data': 3} on CPU 4
Queue size: 0
Processing [103, 103] using state {'data': 3} on CPU 2
Queue size: 0
Processing [99, 99] using state {'data': 4} on CPU 7
Queue size: 0
Processing [101, 101] using state {'data': 4} on CPU 6
Queue size: 0
Processing [107, 107] using state {'data': 5} on CPU 1
Queue size: 0
Processing [106, 106] using state {'data': 5} on CPU 3
Queue size: 0
Processing [110, 110] using state {'data': 6} on CPU 0
Queue size: 0
Processing [106, 106] using state {'data': 6} on CPU 5
Queue size: 0
Processing [108, 108] using state {'data': 7} on CPU 4
Queue size: 0
Processing [111, 111] using state {'data': 7} on CPU 2
Queue size: 0
Processing [107, 107] using state {'data': 8} on CPU 7
Queue size: 0
Processing [109, 109] using state {'data': 8} on CPU 6
Queue size: 0
Processing [115, 115] using state {'data': 9} on CPU 1
Queue size: 0
Processing [114, 114] using state {'data': 9} on CPU 3
Queue size: 0
Processing [118, 118] using state {'data': 10} on CPU 0
Queue size: 0
Processing [114, 114] using state {'data': 10} on CPU 5