代码之家  ›  专栏  ›  技术社区  ›  jedierikb grijalvaromero

高效的圆形缓冲器?

  •  86
  • jedierikb grijalvaromero  · 技术社区  · 14 年前

    我想创造一个高效的 circular buffer 在python中(目标是获取缓冲区中整数值的平均值)。

    这是使用列表收集值的有效方法吗?

    def add_to_buffer( self, num ):
        self.mylist.pop( 0 )
        self.mylist.append( num )
    

    什么更有效率(为什么)?

    13 回复  |  直到 6 年前
        1
  •  170
  •   aaronasterling    14 年前

    我会用 collections.deque 用一个 maxlen 精胺

    >>> import collections
    >>> d = collections.deque(maxlen=10)
    >>> d
    deque([], maxlen=10)
    >>> for i in xrange(20):
    ...     d.append(i)
    ... 
    >>> d
    deque([10, 11, 12, 13, 14, 15, 16, 17, 18, 19], maxlen=10)
    

    有一个 recipe 在文档中 deque 这和你想要的差不多。我断言它是最有效的,完全是因为它是由一个非常熟练的团队在C语言中实现的,这个团队习惯于开发出一流的代码。

        2
  •  11
  •   John La Rooy    14 年前

    从列表头弹出会导致整个列表被复制,因此效率低下

    您应该使用固定大小的列表/数组和在添加/删除项时在缓冲区中移动的索引。

        3
  •  8
  •   Community CDub    8 年前

    巨蟒的出界很慢。你也可以用numpy.roll代替 How do you rotate the numbers in an numpy array of shape (n,) or (n,1)?

    在这个基准测试中,deque为448ms,numpy.roll为29ms。 http://scimusing.wordpress.com/2013/10/25/ring-buffers-in-pythonnumpy/

        4
  •  6
  •   Community CDub    8 年前

    基于 MoonCactus's answer ,这是一个 circularlist 类。与他的说法不同的是 c[0] 将始终给出最旧的附加元素, c[-1] 最新的附加元素, c[-2] 倒数第二个…这对于应用来说更自然。

    c = circularlist(4)
    c.append(1); print c, c[0], c[-1]    #[1] (1 items)              first 1, last 1
    c.append(2); print c, c[0], c[-1]    #[1, 2] (2 items)           first 1, last 2
    c.append(3); print c, c[0], c[-1]    #[1, 2, 3] (3 items)        first 1, last 3
    c.append(8); print c, c[0], c[-1]    #[1, 2, 3, 8] (4 items)     first 1, last 8
    c.append(10); print c, c[0], c[-1]   #[10, 2, 3, 8] (4 items)    first 2, last 10
    c.append(11); print c, c[0], c[-1]   #[10, 11, 3, 8] (4 items)   first 3, last 11
    

    班级:

    class circularlist(object):
        def __init__(self, size):
            """Initialization"""
            self.index = 0
            self.size = size
            self._data = []
    
        def append(self, value):
            """Append an element"""
            if len(self._data) == self.size:
                self._data[self.index] = value
            else:
                self._data.append(value)
            self.index = (self.index + 1) % self.size
    
        def __getitem__(self, key):
            """Get element by index, relative to the current index"""
            if len(self._data) == self.size:
                return(self._data[(key + self.index) % self.size])
            else:
                return(self._data[key])
    
        def __repr__(self):
            """Return string representation"""
            return self._data.__repr__() + ' (' + str(len(self._data))+' items)'
    
        5
  •  5
  •   SmartElectron    12 年前

    可以使用deque类,但对于问题(平均值)的要求,这是我的解决方案:

    >>> from collections import deque
    >>> class CircularBuffer(deque):
    ...     def __init__(self, size=0):
    ...             super(CircularBuffer, self).__init__(maxlen=size)
    ...     @property
    ...     def average(self):  # TODO: Make type check for integer or floats
    ...             return sum(self)/len(self)
    ...
    >>>
    >>> cb = CircularBuffer(size=10)
    >>> for i in range(20):
    ...     cb.append(i)
    ...     print "@%s, Average: %s" % (cb, cb.average)
    ...
    @deque([0], maxlen=10), Average: 0
    @deque([0, 1], maxlen=10), Average: 0
    @deque([0, 1, 2], maxlen=10), Average: 1
    @deque([0, 1, 2, 3], maxlen=10), Average: 1
    @deque([0, 1, 2, 3, 4], maxlen=10), Average: 2
    @deque([0, 1, 2, 3, 4, 5], maxlen=10), Average: 2
    @deque([0, 1, 2, 3, 4, 5, 6], maxlen=10), Average: 3
    @deque([0, 1, 2, 3, 4, 5, 6, 7], maxlen=10), Average: 3
    @deque([0, 1, 2, 3, 4, 5, 6, 7, 8], maxlen=10), Average: 4
    @deque([0, 1, 2, 3, 4, 5, 6, 7, 8, 9], maxlen=10), Average: 4
    @deque([1, 2, 3, 4, 5, 6, 7, 8, 9, 10], maxlen=10), Average: 5
    @deque([2, 3, 4, 5, 6, 7, 8, 9, 10, 11], maxlen=10), Average: 6
    @deque([3, 4, 5, 6, 7, 8, 9, 10, 11, 12], maxlen=10), Average: 7
    @deque([4, 5, 6, 7, 8, 9, 10, 11, 12, 13], maxlen=10), Average: 8
    @deque([5, 6, 7, 8, 9, 10, 11, 12, 13, 14], maxlen=10), Average: 9
    @deque([6, 7, 8, 9, 10, 11, 12, 13, 14, 15], maxlen=10), Average: 10
    @deque([7, 8, 9, 10, 11, 12, 13, 14, 15, 16], maxlen=10), Average: 11
    @deque([8, 9, 10, 11, 12, 13, 14, 15, 16, 17], maxlen=10), Average: 12
    @deque([9, 10, 11, 12, 13, 14, 15, 16, 17, 18], maxlen=10), Average: 13
    @deque([10, 11, 12, 13, 14, 15, 16, 17, 18, 19], maxlen=10), Average: 14
    
        6
  •  3
  •   scls    11 年前

    你也可以看到这个相当古老 Python recipe .

    以下是我自己的numpy数组版本:

    #!/usr/bin/env python
    
    import numpy as np
    
    class RingBuffer(object):
        def __init__(self, size_max, default_value=0.0, dtype=float):
            """initialization"""
            self.size_max = size_max
    
            self._data = np.empty(size_max, dtype=dtype)
            self._data.fill(default_value)
    
            self.size = 0
    
        def append(self, value):
            """append an element"""
            self._data = np.roll(self._data, 1)
            self._data[0] = value 
    
            self.size += 1
    
            if self.size == self.size_max:
                self.__class__  = RingBufferFull
    
        def get_all(self):
            """return a list of elements from the oldest to the newest"""
            return(self._data)
    
        def get_partial(self):
            return(self.get_all()[0:self.size])
    
        def __getitem__(self, key):
            """get element"""
            return(self._data[key])
    
        def __repr__(self):
            """return string representation"""
            s = self._data.__repr__()
            s = s + '\t' + str(self.size)
            s = s + '\t' + self.get_all()[::-1].__repr__()
            s = s + '\t' + self.get_partial()[::-1].__repr__()
            return(s)
    
    class RingBufferFull(RingBuffer):
        def append(self, value):
            """append an element when buffer is full"""
            self._data = np.roll(self._data, 1)
            self._data[0] = value
    
        7
  •  3
  •   djvg Carl Meyer    7 年前

    虽然这里已经有很多好的答案,但我找不到任何直接比较所提到的选项的时间。因此,请在下面比较一下我的谦虚尝试。

    仅用于测试目的,类可以在 list -基于缓冲区,A collections.deque -基于缓冲区,以及 Numpy.roll -基于缓冲区。

    注意, update 方法一次只添加一个值,以保持简单。

    import numpy
    import timeit
    import collections
    
    
    class CircularBuffer(object):
        buffer_methods = ('list', 'deque', 'roll')
    
        def __init__(self, buffer_size, buffer_method):
            self.content = None
            self.size = buffer_size
            self.method = buffer_method
    
        def update(self, scalar):
            if self.method == self.buffer_methods[0]:
                # Use list
                try:
                    self.content.append(scalar)
                    self.content.pop(0)
                except AttributeError:
                    self.content = [0.] * self.size
            elif self.method == self.buffer_methods[1]:
                # Use collections.deque
                try:
                    self.content.append(scalar)
                except AttributeError:
                    self.content = collections.deque([0.] * self.size,
                                                     maxlen=self.size)
            elif self.method == self.buffer_methods[2]:
                # Use Numpy.roll
                try:
                    self.content = numpy.roll(self.content, -1)
                    self.content[-1] = scalar
                except IndexError:
                    self.content = numpy.zeros(self.size, dtype=float)
    
    # Testing and Timing
    circular_buffer_size = 100
    circular_buffers = [CircularBuffer(buffer_size=circular_buffer_size,
                                       buffer_method=method)
                        for method in CircularBuffer.buffer_methods]
    timeit_iterations = 1e4
    timeit_setup = 'from __main__ import circular_buffers'
    timeit_results = []
    for i, cb in enumerate(circular_buffers):
        # We add a convenient number of convenient values (see equality test below)
        code = '[circular_buffers[{}].update(float(j)) for j in range({})]'.format(
            i, circular_buffer_size)
        # Testing
        eval(code)
        buffer_content = [item for item in cb.content]
        assert buffer_content == range(circular_buffer_size)
        # Timing
        timeit_results.append(
            timeit.timeit(code, setup=timeit_setup, number=int(timeit_iterations)))
        print '{}: total {:.2f}s ({:.2f}ms per iteration)'.format(
            cb.method, timeit_results[-1],
            timeit_results[-1] / timeit_iterations * 1e3)
    

    在我的系统中,这会产生:

    list:  total 1.06s (0.11ms per iteration)
    deque: total 0.87s (0.09ms per iteration)
    roll:  total 6.27s (0.63ms per iteration)
    
        8
  •  2
  •   MoonCactus    8 年前

    这个不需要任何图书馆。它增加一个列表,然后按索引在其中循环。

    占地面积非常小(没有库),它的运行速度至少是出列的两倍。这确实有助于计算移动平均值,但请注意,这些项目不会按上述年龄进行排序。

    class CircularBuffer(object):
        def __init__(self, size):
            """initialization"""
            self.index= 0
            self.size= size
            self._data = []
    
        def record(self, value):
            """append an element"""
            if len(self._data) == self.size:
                self._data[self.index]= value
            else:
                self._data.append(value)
            self.index= (self.index + 1) % self.size
    
        def __getitem__(self, key):
            """get element by index like a regular array"""
            return(self._data[key])
    
        def __repr__(self):
            """return string representation"""
            return self._data.__repr__() + ' (' + str(len(self._data))+' items)'
    
        def get_all(self):
            """return a list of all the elements"""
            return(self._data)
    

    要获得平均值,例如:

    q= CircularBuffer(1000000);
    for i in range(40000):
        q.record(i);
    print "capacity=", q.size
    print "stored=", len(q.get_all())
    print "average=", sum(q.get_all()) / len(q.get_all())
    

    结果:

    capacity= 1000000
    stored= 40000
    average= 19999
    
    real 0m0.024s
    user 0m0.020s
    sys  0m0.000s
    

    这大约是出列时间的1/3。

        9
  •  2
  •   d8aninja    6 年前

    怎么样 the solution from the Python Cookbook ,包括当环缓冲区实例变满时对其重新分类吗?

    class RingBuffer:
        """ class that implements a not-yet-full buffer """
        def __init__(self,size_max):
            self.max = size_max
            self.data = []
    
        class __Full:
            """ class that implements a full buffer """
            def append(self, x):
                """ Append an element overwriting the oldest one. """
                self.data[self.cur] = x
                self.cur = (self.cur+1) % self.max
            def get(self):
                """ return list of elements in correct order """
                return self.data[self.cur:]+self.data[:self.cur]
    
        def append(self,x):
            """append an element at the end of the buffer"""
            self.data.append(x)
            if len(self.data) == self.max:
                self.cur = 0
                # Permanently change self's class from non-full to full
                self.__class__ = self.__Full
    
        def get(self):
            """ Return a list of elements from the oldest to the newest. """
            return self.data
    
    # sample usage
    if __name__=='__main__':
        x=RingBuffer(5)
        x.append(1); x.append(2); x.append(3); x.append(4)
        print(x.__class__, x.get())
        x.append(5)
        print(x.__class__, x.get())
        x.append(6)
        print(x.data, x.get())
        x.append(7); x.append(8); x.append(9); x.append(10)
        print(x.data, x.get())
    

    在实现中值得注意的设计选择是,因为 对象在 它们的生命周期从非完全缓冲区到完全缓冲区(以及行为 在那一点上的变化)我通过改变 self.__class__ 。 即使在python 2.2中,只要两个类都具有相同的属性,这也是有效的。 插槽(例如,对于两个经典类,它可以很好地工作,例如 环形缓冲器和 __Full 在这个配方中)。

    在许多语言中,更改实例的类可能很奇怪, 但它是一种与其他表现方式不同的蟒蛇。 偶然的、大规模的、不可逆的和离散的状态变化 极大地影响行为,如在这个配方中。幸好那条巨蟒 支持所有类型的类。

    学分:斯巴斯滕·凯姆

        10
  •  1
  •   sirlark    8 年前

    我在做串行编程之前遇到过这个问题。就在一年多以前,我也找不到任何有效的实现,所以我最后写了 one as a C extension 而且也有 on pypi 在麻省理工学院的执照下。它是超基本的,只处理8位有符号字符的缓冲区,但是它的长度是灵活的,所以如果您需要字符以外的东西,可以在上面使用struct或其他东西。我现在通过谷歌搜索发现,现在有几个选择,所以你可能也想看看这些。

        11
  •  0
  •   Schmouk    9 年前

    最初的问题是:“ 有效率的 “圆形缓冲区。 根据所要求的效率,阿隆斯特林的回答似乎是绝对正确的。 使用在python中编程的专用类,并将时间处理与collections.deque进行比较,显示了deque的5.2倍加速! 下面是测试这个的非常简单的代码:

    class cb:
        def __init__(self, size):
            self.b = [0]*size
            self.i = 0
            self.sz = size
        def append(self, v):
            self.b[self.i] = v
            self.i = (self.i + 1) % self.sz
    
    b = cb(1000)
    for i in range(10000):
        b.append(i)
    # called 200 times, this lasts 1.097 second on my laptop
    
    from collections import deque
    b = deque( [], 1000 )
    for i in range(10000):
        b.append(i)
    # called 200 times, this lasts 0.211 second on my laptop
    

    要将deque转换为列表,只需使用:

    my_list = [v for v in my_deque]
    

    然后您将获得o(1)随机访问deque项。当然,只有在设置了deque一次之后,需要对它进行许多随机访问时,这才是有价值的。

        12
  •  0
  •   Johnny Wong    8 年前

    你的回答不对。 圆形缓冲干线有两个原则 https://en.wikipedia.org/wiki/Circular_buffer )

    1. 设置缓冲器的长度;
    2. 先进先出;
    3. 添加或删除项目时,其他项目不应移动其位置。

    您的代码如下:

    def add_to_buffer( self, num ):
        self.mylist.pop( 0 )
        self.mylist.append( num )
    

    让我们通过使用代码来考虑列表已满的情况:

    self.mylist = [1, 2, 3, 4, 5]
    

    现在我们附加6,列表更改为

    self.mylist = [2, 3, 4, 5, 6]
    

    列表中预期为1的项已更改其位置

    您的代码是一个队列,而不是一个循环缓冲区。

    巴斯基的回答,我认为是最有效的。

    另外,一个循环缓冲区可以提高操作的性能。 添加项目。

        13
  •  0
  •   David Torrens    7 年前

    这将相同的主体应用于一些用于保存最新文本消息的缓冲区。

    import time
    import datetime
    import sys, getopt
    
    class textbffr(object):
        def __init__(self, size_max):
            #initialization
            self.posn_max = size_max-1
            self._data = [""]*(size_max)
            self.posn = self.posn_max
    
        def append(self, value):
            #append an element
            if self.posn == self.posn_max:
                self.posn = 0
                self._data[self.posn] = value   
            else:
                self.posn += 1
                self._data[self.posn] = value
    
        def __getitem__(self, key):
            #return stored element
            if (key + self.posn+1) > self.posn_max:
                return(self._data[key - (self.posn_max-self.posn)])
            else:
                return(self._data[key + self.posn+1])
    
    
    def print_bffr(bffr,bffer_max): 
        for ind in range(0,bffer_max):
            stored = bffr[ind]
            if stored != "":
                print(stored)
        print ( '\n' )
    
    def make_time_text(time_value):
        return(str(time_value.month).zfill(2) + str(time_value.day).zfill(2)
          + str(time_value.hour).zfill(2) +  str(time_value.minute).zfill(2)
          + str(time_value.second).zfill(2))
    
    
    def main(argv):
        #Set things up 
        starttime = datetime.datetime.now()
        log_max = 5
        status_max = 7
        log_bffr = textbffr(log_max)
        status_bffr = textbffr(status_max)
        scan_count = 1
    
        #Main Loop
        # every 10 secounds write a line with the time and the scan count.
        while True: 
    
            time_text = make_time_text(datetime.datetime.now())
            #create next messages and store in buffers
            status_bffr.append(str(scan_count).zfill(6) + " :  Status is just fine at : " + time_text)
            log_bffr.append(str(scan_count).zfill(6) + " : " + time_text + " : Logging Text ")
    
            #print whole buffers so far
            print_bffr(log_bffr,log_max)
            print_bffr(status_bffr,status_max)
    
            time.sleep(2)
            scan_count += 1 
    
    if __name__ == '__main__':
        main(sys.argv[1:])