
Sharing a dictionary of pandas DataFrames across Python multiprocessing

  •  17
  • user40780  · Tech Community  · 7 years ago

    I have a Python dictionary of pandas DataFrames. The total size of this dictionary is about 2GB. However, when I share it across 16 multiprocessing workers (in the subprocesses I only read the data from the dict without modifying it), it takes 32GB of RAM. So I would like to ask whether I can share this dictionary across multiprocessing without copying it. I tried converting it to Manager.dict(), but that seemed to take too long. What is the most standard way to achieve this? Thank you very much.
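
    For reference, a Manager.dict() attempt like the one mentioned above might look like the sketch below (a minimal illustration; the keys and DataFrame contents are made up). Every lookup pickles the whole DataFrame and pipes it back from the manager process, which is why it can be slow for large objects:

    import multiprocessing as mp
    import numpy
    import pandas

    def init(d):
        # Hand each worker a handle to the shared dict proxy
        global gShared
        gShared = d

    def worker(key):
        # Each access pipes the entire DataFrame over from the manager
        df = gShared[key]
        return df['a'].sum()

    if __name__ == '__main__':
        mgr = mp.Manager()
        shared = mgr.dict()
        for i in range(10):  # stand-in for the real ~2GB of DataFrames
            shared[i] = pandas.DataFrame({'a': numpy.random.rand(1000)})
        pool = mp.Pool(16, initializer=init, initargs=(shared,))
        print(pool.map(worker, range(10)))
        pool.close()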

    1 answer  |  7 years ago
        1
  •  15
  •   bivouac0    7 years ago

    The best solution I've found (and it only works for some types of problems) is a client/server setup using Python's BaseManager and SyncManager classes. To do this you first set up a server that serves up a proxy class for the data.

    DataServer.py

    #!/usr/bin/python
    from    multiprocessing.managers import SyncManager
    import  numpy

    # Global for storing the data to be served
    gData = {}

    # Proxy class to be shared with different processes
    # Don't put big data in here since that will force it to be piped to the
    # other process when instantiated there, instead just return a portion of
    # the global data when requested.
    class DataProxy(object):
        def __init__(self):
            pass

        def getData(self, key, default=None):
            global gData
            return gData.get(key, default)

    if __name__ == '__main__':
        port  = 5000

        print('Simulate loading some data')
        for i in range(1000):
            gData[i] = numpy.random.rand(1000)

        # Start the server on address(host,port)
        print('Serving data. Press <ctrl>-c to stop.')
        class myManager(SyncManager): pass
        myManager.register('DataProxy', DataProxy)
        mgr = myManager(address=('', port), authkey=b'DataProxy01')
        server = mgr.get_server()
        server.serve_forever()
    

    Run the above once and leave it running. Below is the client class used to access the data.

    DataClient.py

    from   multiprocessing.managers import BaseManager
    import psutil   #3rd party module for process info (not strictly required)

    # Grab the shared proxy class.  All methods in that class will be available here
    class DataClient(object):
        def __init__(self, port):
            assert self._checkForProcess('DataServer.py'), 'Must have DataServer running'
            class myManager(BaseManager): pass
            myManager.register('DataProxy')
            self.mgr = myManager(address=('localhost', port), authkey=b'DataProxy01')
            self.mgr.connect()
            self.proxy = self.mgr.DataProxy()

        # Verify the server is running (not required)
        @staticmethod
        def _checkForProcess(name):
            for proc in psutil.process_iter():
                try:
                    # Match on the command line; proc.name() is usually just
                    # 'python' for a script, so compare against the arguments
                    if name in ' '.join(proc.cmdline()):
                        return True
                except (psutil.AccessDenied, psutil.NoSuchProcess):
                    pass
            return False
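
    For a quick interactive check, assuming DataServer.py is already running on port 5000, a client session might look like this:

    from DataClient import DataClient

    client = DataClient(5000)
    arr = client.proxy.getData(42)   # fetch one numpy array by key
    print(arr.sum())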
    

    Below is the test code to try this out with multiprocessing.

    TestMP.py

    #!/usr/bin/python
    import time
    import multiprocessing as mp
    import numpy
    from   DataClient import *

    # Confusing, but the "proxy" will be global to each subprocess,
    # it's not shared across all processes.
    gProxy = None
    gMode  = None
    gDummy = None
    def init(port, mode):
        global gProxy, gMode, gDummy
        gProxy = DataClient(port).proxy
        gMode  = mode
        gDummy = numpy.random.rand(1000)  # Same size as the arrays in the server
        #print('Init proxy ', id(gProxy), 'in ', mp.current_process())

    def worker(key):
        global gProxy, gMode, gDummy
        if 0 == gMode:   # get from proxy
            array = gProxy.getData(key)
        elif 1 == gMode: # bypass retrieval to test the difference
            array = gDummy
        else: assert 0, 'unknown mode: %s' % gMode
        for i in range(1000):
            x = sum(array)
        return x

    if __name__ == '__main__':
        port   = 5000
        maxkey = 1000
        numpts = 100

        for mode in [1, 0]:
            for nprocs in [16, 1]:
                if 0 == mode: print('Using client/server and %d processes' % nprocs)
                if 1 == mode: print('Using local data and %d processes' % nprocs)
                keys = [numpy.random.randint(0, maxkey) for k in range(numpts)]
                pool = mp.Pool(nprocs, initializer=init, initargs=(port, mode))
                start = time.time()
                ret_data = pool.map(worker, keys, chunksize=1)
                print('   took %4.3f seconds' % (time.time() - start))
                pool.close()
    

    When I run this on my machine, I get...

    Using local data and 16 processes
       took 0.695 seconds
    Using local data and 1 processes
       took 5.849 seconds
    Using client/server and 16 processes
       took 0.811 seconds
    Using client/server and 1 processes
       took 5.956 seconds
    

    Whether this works well for a given multiprocessing application depends on how often you have to fetch the data, since there is a small overhead attached to each transfer. You can see this if you reduce the number of iterations in the x=sum(array) loop: at some point you'll spend more time fetching the data than working on it.
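
    If the per-transfer overhead dominates, one way to amortize it is to batch several keys into a single round trip; a hypothetical getMany helper added to DataProxy on the server might look like:

    # Hypothetical addition to the DataProxy class in DataServer.py
    def getMany(self, keys):
        # One network round trip returns the whole batch
        return {k: gData.get(k) for k in keys}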

    Besides multiprocessing, I also like this pattern because I only have to load the big array data once in the server program, and it stays loaded until I kill the server. That means I can run a bunch of separate scripts against the data and they execute quickly; no waiting for the data to load.

    While the approach here is somewhat similar to using a database, it has the advantage of working with any kind of Python object, not just DB tables of simple strings and ints, etc. I found that for those simple types a DB is a bit faster, but for me it tends to be more programming work, and my data doesn't always port easily to a database.
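
    Since the question involves pandas DataFrames rather than numpy arrays, the same pattern carries over directly. Below is a minimal sketch of the parts of DataServer.py that would change; the keys, shapes, and the getColumn helper are invented for illustration:

    import pandas
    import numpy

    gData = {}

    class DataProxy(object):
        def getData(self, key, default=None):
            # Returns one whole DataFrame; only that object is pickled across
            return gData.get(key, default)

        def getColumn(self, key, col):
            # Hypothetical helper: return a single column to cut transfer size
            return gData[key][col]

    if __name__ == '__main__':
        # Stand-in for loading the real ~2GB dictionary of DataFrames
        for name in ['a', 'b', 'c']:
            gData[name] = pandas.DataFrame(numpy.random.rand(1000, 4),
                                           columns=list('wxyz'))
        # ...then register DataProxy and serve as in DataServer.py above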