代码之家  ›  专栏  ›  技术社区  ›  Toekan

在Python中分析来自连续输入流(记录)的数据,多处理?

  •  2
  • Toekan  · 技术社区  · 9 年前

    我正在实时分析用麦克风录制的数据。到目前为止,我一直在以线性方式进行这项工作:

    • 记录1秒(需要1秒)
    • 分析数据(例如50 ms)
    • 记录一秒钟
    • 分析数据

    我认为多处理将是解决方案:我启动一个单独的进程,不间断地记录特定长度的数据块,每次都通过管道将其发送到主进程,然后主进程分析数据。不幸的是,通过管道发送大量数据(或者通常,从一个进程向另一个进程发送大量数据)显然远远不够理想。还有其他方法吗?我只想让我的计算机记录数据并将其导入python(所有这些我已经在做了),同时它也在分析数据。

    谢谢

    1 回复  |  直到 9 年前
        1
  •  3
  •   danny    8 年前

    简单的生产者/消费者实施。

    虽然来回移动数据确实会导致开销并增加内存使用,但只要多个进程不需要相同的数据,那么开销是最小的。试试看:)可以通过更改队列和池大小数字来调整内存占用。

    线程化是减少内存使用的另一种选择,但代价是在GIL上被阻塞,如果处理采用python字节码,则实际上是单线程的。

    import multiprocessing
    # Some fixed size to avoid run away memory use
    recorded_data = multiprocessing.Queue(100)
    
    def process(recorded_data):
        while True:
            data = recorded_data.get()
            <process data>
    
    def record(recorded_data):
        for data in input_stream:
            recorded_data.put(data)
    
    producer = multiprocessing.Process(target=record, 
                                       args=(recorded_data,))
    producer.start()
    
    # Pool of 10 processes
    num_proc = 10
    consumer_pool = multiprocessing.Pool(num_proc)
    results = []
    for _ in xrange(num_proc):
        results.append(
          consumer_pool.apply_async(process,
                                    args=(recorded_data,)))
    
    producer.join()
    
    # If processing actually returns something
    for result in results:
        print result
    # Consumers wait for data from queue forever
    #  so terminate them when done
    consumer_pool.terminate()