The best solution I've found (and it only works with some types of problems) is to use a client/server setup with Python's BaseManager and SyncManager classes. To do this you first set up a server that serves up a proxy class for the data.
DataServer.py
from multiprocessing.managers import SyncManager
import numpy

gData = {}  # global data store, loaded once and shared with all clients

class DataProxy(object):
    def __init__(self):
        pass

    def getData(self, key, default=None):
        global gData
        return gData.get(key, default)

if __name__ == '__main__':
    port = 5000

    print('Simulate loading some data')
    for i in range(1000):
        gData[i] = numpy.random.rand(1000)

    # Serve the proxy class; clients connect with the same port and authkey
    print('Serving data. Press <ctrl>-c to stop.')
    class myManager(SyncManager): pass
    myManager.register('DataProxy', DataProxy)
    mgr = myManager(address=('', port), authkey=b'DataProxy01')
    server = mgr.get_server()
    server.serve_forever()
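If you just want to poke at the server from an interactive session, a minimal sketch like this works (assuming the server above is running on port 5000; the variable names here are my own):

from multiprocessing.managers import BaseManager

class myManager(BaseManager): pass
myManager.register('DataProxy')

mgr = myManager(address=('localhost', 5000), authkey=b'DataProxy01')
mgr.connect()                  # fails if the server is not reachable
proxy = mgr.DataProxy()
print(proxy.getData(0)[:5])    # first few values of array 0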
Run the above once and leave it running. Below is the client class used to access the data.
DataClient.py
from multiprocessing.managers import BaseManager
import psutil

class DataClient(object):
    def __init__(self, port):
        assert self._checkForProcess('DataServer.py'), 'Must have DataServer running'
        class myManager(BaseManager): pass
        myManager.register('DataProxy')
        self.mgr = myManager(address=('localhost', port), authkey=b'DataProxy01')
        self.mgr.connect()
        self.proxy = self.mgr.DataProxy()

    @staticmethod
    def _checkForProcess(name):
        # Match against the command line; the process name itself is
        # usually just 'python', not the script name
        for proc in psutil.process_iter():
            try:
                if any(name in arg for arg in proc.cmdline()):
                    return True
            except (psutil.NoSuchProcess, psutil.AccessDenied):
                pass
        return False
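As a usage example, a hypothetical one-off script (not part of the setup above) could grab a single array from the running server like this:

# getone.py -- hypothetical example; assumes DataServer.py is already running
from DataClient import DataClient

client = DataClient(5000)
array = client.proxy.getData(42)   # comes back as a copy of the server's array
print(array.sum())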
Below is the test code for trying this out with multiprocessing.
TestMP.py
import time
import multiprocessing as mp
import numpy
from DataClient import *

gProxy = None
gMode = None
gDummy = None

def init(port, mode):
    # Runs once in each worker process: connect to the data server
    global gProxy, gMode, gDummy
    gProxy = DataClient(port).proxy
    gMode = mode
    gDummy = numpy.random.rand(1000)

def worker(key):
    global gProxy, gMode, gDummy
    if 0 == gMode:
        array = gProxy.getData(key)   # fetch from the server
    elif 1 == gMode:
        array = gDummy                # use a local copy
    else:
        assert 0, 'unknown mode: %s' % gMode
    for i in range(1000):
        x = sum(array)
    return x

if __name__ == '__main__':
    port = 5000
    maxkey = 1000
    numpts = 100

    for mode in [1, 0]:
        for nprocs in [16, 1]:
            if 0 == mode: print('Using client/server and %d processes' % nprocs)
            if 1 == mode: print('Using local data and %d processes' % nprocs)
            keys = [numpy.random.randint(0, maxkey) for k in range(numpts)]
            pool = mp.Pool(nprocs, initializer=init, initargs=(port, mode))
            start = time.time()
            ret_data = pool.map(worker, keys, chunksize=1)
            print('  took %4.3f seconds' % (time.time() - start))
            pool.close()
When I run this on my machine I get...
Using local data and 16 processes
took 0.695 seconds
Using local data and 1 processes
took 5.849 seconds
Using client/server and 16 processes
took 0.811 seconds
Using client/server and 1 processes
took 5.956 seconds
Whether this helps you in a multiprocessing setting depends on how often you have to grab the data. There's a small overhead with each transfer. You can see this if you reduce the work done in the x = sum(array) loop: at some point you'll spend more time fetching the data than working on it.
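To measure that overhead directly, a rough sketch like the following times the bare cost of one proxy fetch with no work on the data (again assuming the server is up on port 5000; the script name is made up):

# overhead.py -- hypothetical timing of the per-fetch transfer cost
import time
from DataClient import DataClient

proxy = DataClient(5000).proxy
n = 100
start = time.time()
for i in range(n):
    proxy.getData(i % 1000)    # fetch only, no computation
print('%.3f ms per fetch' % (1000.0 * (time.time() - start) / n))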
Beyond multiprocessing, I also like this pattern because I only have to load my big array data once in the server program, and it stays loaded until I kill the server. That means I can run a bunch of separate scripts against the data and they execute quickly; no waiting for the data to load.
While the approach here is somewhat similar to using a database, it has the advantage of working with any type of Python object, not just DB tables of simple strings and ints etc. I've found that using a DB is a bit faster for those simple types, but for me it tends to be more programming work, and my data doesn't always port over easily to a database.