代码之家  ›  专栏  ›  技术社区  ›  Boris Gorelik

平滑不规则采样时间数据

  •  6
  • Boris Gorelik  · 技术社区  · 15 年前

    给定一个表,其中第一列是经过某个参考点的秒数,第二列是任意测量:

    6   0.738158581
    21  0.801697222
    39  1.797224596
    49  2.77920469
    54  2.839757536
    79  3.832232283
    91  4.676794376
    97  5.18244704
    100 5.521878863
    118 6.316630137
    131 6.778507504
    147 7.020395216
    157 7.331607129
    176 7.637492223
    202 7.848079136
    223 7.989456499
    251 8.76853608
    278 9.092367123 
        ...
    

    如您所见,测量是在不规则的时间点进行采样的。我需要在每次测量之前(在python中)将读数平均为100秒来平滑数据。因为数据表很大,所以最好使用基于迭代器的方法。 不幸的是,经过两个小时的编码,我无法找到高效而优雅的解决方案。

    有人能帮我吗?

    编辑 S

    1. 我希望每个原始读数都有一个平滑读数,平滑读数是原始读数和前100(delta)秒内任何其他读数的算术平均值。(约翰,你说得对)

    2. 大型~1E6-10E6线路+需要使用紧密的RAM

    3. 数据大致是随机行走

    4. 数据已排序

    分辨率

    我已经测试了J Machin和Yairchu提出的解决方案。它们都给出了相同的结果,但是,在我的数据集中,J machine的版本是指数级的,而Yairchu的版本是线性的。下面是IPython的执行时间 %时间 (以微秒计):

    data size   J Machin    yairchu
    10        90.2        55.6
    50          930         258
    100         3080        514
    500         64700       2660
    1000        253000      5390
    2000        952000      11500
    

    谢谢大家的帮助。

    8 回复  |  直到 15 年前
        1
  •  2
  •   yairchu    15 年前

    我使用的是一个求和结果,我要将新成员和旧成员相减。然而,这样一来,人们可能会遭受累积的浮点误差。

    因此,我用一个列表来实现一个“deque”。每当我的德克重新分配到一个较小的规模。我在同一时间重新计算总数。

    我还计算到x点的平均值,包括x点,所以至少有一个样本点要平均。

    def getAvgValues(data, avgSampleTime):
      lastTime = 0
      prevValsBuf = []
      prevValsStart = 0
      tot = 0
      for t, v in data:
        avgStart = t - avgSampleTime
        # remove too old values
        while prevValsStart < len(prevValsBuf):
          pt, pv = prevValsBuf[prevValsStart]
          if pt > avgStart:
            break
          tot -= pv
          prevValsStart += 1
        # add new item
        tot += v
        prevValsBuf.append((t, v))
        # yield result
        numItems = len(prevValsBuf) - prevValsStart
        yield (t, tot / numItems)
        # clean prevVals if it's time
        if prevValsStart * 2 > len(prevValsBuf):
          prevValsBuf = prevValsBuf[prevValsStart:]
          prevValsStart = 0
          # recalculate tot for not accumulating float precision error
          tot = sum(v for (t, v) in prevValsBuf)
    
        2
  •  2
  •   John Machin Santi    15 年前

    你还没有确切地说出你想要什么时候输出。我假设您希望每个原始读数都有一个平滑读数,平滑读数是原始读数和之前100(delta)秒内任何其他读数的算术平均值。

    简短回答:使用collections.deque…它的读数永远不会超过“delta”秒。按照我的设置,你可以像对待列表一样对待deque,并且很容易计算出平均值或一些花哨的gizmoid,它为最近的读数提供了更多的权重。

    长回答:

    >>> the_data = [tuple(map(float, x.split())) for x in """\
    ... 6       0.738158581
    ... 21      0.801697222
    [snip]
    ... 251     8.76853608
    ... 278     9.092367123""".splitlines()]
    >>> import collections
    >>> delta = 100.0
    >>> q = collections.deque()
    >>> for t, v in the_data:
    ...     while q and q[0][0] <= t - delta:
    ...         # jettison outdated readings
    ...         _unused = q.popleft()
    ...     q.append((t, v))
    ...     count = len(q)
    ...     print t, sum(item[1] for item in q) / count, count
    ...
    ...
    6.0 0.738158581 1
    21.0 0.7699279015 2
    39.0 1.112360133 3
    49.0 1.52907127225 4
    54.0 1.791208525 5
    79.0 2.13137915133 6
    91.0 2.49500989771 7
    97.0 2.8309395405 8
    100.0 3.12993279856 9
    118.0 3.74976297144 9
    131.0 4.41385300278 9
    147.0 4.99420529389 9
    157.0 5.8325615685 8
    176.0 6.033109419 9
    202.0 7.15545189083 6
    223.0 7.4342562845 6
    251.0 7.9150342134 5
    278.0 8.4246097095 4
    >>>
    

    编辑

    一站式商店:在这里买你的小发明。代码如下:

    numerator = sum(item[1] * upsilon ** (t - item[0]) for item in q)
    denominator = sum(upsilon ** (t - item[0]) for item in q)
    gizmoid = numerator / denominator
    

    其中upsilon应该小于1.0(<=0是非法的,刚好高于零的平滑度很小,一个得到算术平均值加上浪费的CPU时间,而大于一个则给出与您的目的相反的结果)。

        3
  •  0
  •   rix0rrr    15 年前

    您的数据似乎大致呈线性:

    Plot of your data http://rix0r.nl/~rix0r/share/shot-20090621.144851.gif

    你想要什么样的平滑?一条直线与该数据集的最小二乘拟合?某种低通滤波器?或者别的什么?

    请把申请表告诉我们,以便我们能给你更好的建议。

    编辑:例如,根据应用程序的不同,在第一个点和最后一个点之间插入一条线就足够了。

        4
  •  0
  •   nosklo    15 年前

    这一条使得它是线性的:

    def process_data(datafile):
        previous_n = 0
        previous_t = 0
        for line in datafile:
            t, number = line.strip().split()
            t = int(t)
            number = float(number)
            delta_n = number - previous_n
            delta_t = t - previous_t
            n_per_t = delta_n / delta_t
            for t0 in xrange(delta_t):
                yield previous_t + t0, previous_n + (n_per_t * t0)
            previous_n = n
            previous_t = t
    
    f = open('datafile.dat')
    
    for sample in process_data(f):
        print sample
    
        5
  •  0
  •   yairchu    15 年前

    o(1)内存,以防您可以多次迭代输入-您可以使用一个迭代器表示“左”,一个表示“右”。

    def getAvgValues(makeIter, avgSampleTime):
      leftIter = makeIter()
      leftT, leftV = leftIter.next()
      tot = 0
      count = 0
      for rightT, rightV in makeIter():
        tot += rightV
        count += 1
        while leftT <= rightT - avgSampleTime:
          tot -= leftV
          count -= 1
          leftT, leftV = leftIter.next()
        yield rightT, tot / count
    
        6
  •  0
  •   Community CDub    8 年前

    虽然它给出的是指数衰减平均值,而不是总平均值,但我想你可能需要我所说的 exponential moving average with varying alpha 这是一个真正的单极低通滤波器。现在有了这个问题的解决方案,它以时间线性的方式运行到数据点的数量。看看它是否适合你。

        7
  •  -1
  •   Anurag Uniyal    15 年前

    这样的情况怎么样,一直存储值,直到与上一次的时间差为100,平均并产生这样的值 例如

    def getAvgValues(data):
        lastTime = 0
        prevValues = []
        avgSampleTime=100
    
        for t, v in data:
            if t - lastTime < avgSampleTime:
                prevValues.append(v)
            else:
                avgV = sum(prevValues)/len(prevValues)
                lastTime = t
                prevValues = [v]
                yield (t,avgV)
    
    for v in getAvgValues(data):
        print v
    
        8
  •  -2
  •   Brent Baisley    15 年前

    听起来你需要一个简单的四舍五入公式。要将任何数字四舍五入为任意间隔:

    圆形(数字/间隔)*间隔

    你可以用地板或天花板来代替“前导”或“自”影响。它可以用任何语言工作,包括SQL。