代码之家  ›  专栏  ›  技术社区  ›  Rico Kahler

有优先权的RxJS解除缓冲

  •  1
  • Rico Kahler  · 技术社区  · 6 年前

    我想起来这条小溪有点困难。

    我要找的是 debounceTime 但有优先权。

    所以如果我有关于形状的事情 { type: 'a', priority: 2 } . 这些事件需要在几秒钟内解除警告,但不会发出最后一个事件,而是发出优先级最高的事件。

    input stream:
    ------(a|1)--(b|3)---(c|2)-----------------------(a|1)-----------------
    
    
    output stream:
    -----------------------------------(b|3)---------------------(a|1)-----
    

    我试着看看其他的接线员 window 并筛选最后一个事件的结果,但这并不理想,因为 窗口 在一个固定的节奏上工作,我希望计时器像去抖动一样在第一个事件上开始。

    0 回复  |  直到 6 年前
        1
  •  2
  •   dmcgrandle    6 年前

    我将提供以下解决方案,基于使用 scan 提供迄今为止最高优先排放量供 debounceTime() . 注意,每次成功去噪后,扫描需要重新考虑新数据,所以我使用运算符 window() 为了分解排放量,在每次排放后启动一个新的可观察窗口 解除暂停时间() .

    这是 CodeSandbox

    下面是来自CodeSandbox的一些简化代码,显示了重要的位:

    const resetScan$ = new Subject();
    
    source$.pipe(
      window(resetScan$),
      mergeMap(win$ => win$.pipe(
        scan((acc, cur) => acc.priority >= cur.priority ? acc : cur )
      )),
      debounceTime(debounceDelay),
      tap(() => resetScan$.next())
    );
    
        2
  •  3
  •   frido    6 年前

    您必须以最高优先级存储和更新项目,并映射到此 highest 然后传递给的值 debounceTime .

    let highest = null;
    source$.pipe(
      map(v => highest = highest && highest.priority > v.priority ? highest : v),
      debounceTime(2000),
      tap(() => highest = null)
    );
    

    您可以创建自己的操作员,在 defer . 推迟 确保每个订户都有自己的 最高 变量,因为每个订阅服务器都将通过调用给定的工厂函数获得自己的新可观察值。

    function debounceTimeHighest<T>(dueTime: number, getHighest: (curr: T, high: T) => T): MonoTypeOperatorFunction<T> {
      return (source: Observable<T>) => defer(() => {
        let highest: T = null;
        return source.pipe(
          map(item => highest = highest ? getHighest(item, highest) : item),
          debounceTime(dueTime),
          tap(() => highest = null)
        );
      });
    }
    
    // Usage
    source$.pipe(
      debounceTimeHighest(2000, (v1, v2) => v1.priority >= v2.priority ? v1 : v2)
    )
    

    上面的代码是Typescript。如果您想要纯Javascript,只需删除所有类型。

    https://stackblitz.com/edit/rxjs-hitqxk

        3
  •  2
  •   Ashish Patel    6 年前

    您可以将debounceTime、buffer和filter操作符结合起来,以实现所需的功能。我为它开发了这个小例子。

    https://stackblitz.com/edit/typescript-lwzt4k

    /*
    Collect clicks that occur, after 250ms emit array of clicks
    */
    clicks$.pipe(
      buffer(clicks$.pipe(debounceTime(1000))),
      // if array is greater than 1, double click occured
      map((clickArray) => {
        document.querySelector('#emittedObjects').innerHTML = (`<div>${JSON.stringify(clickArray)}</div>`); 
        const sortedArray = clickArray.sort((a, b) => {
          return a.priority < b.priority ? 1 : -1;
        });
    
        const output = sortedArray.length > 0 ? sortedArray[0] : null;
        document.querySelector('#mappedOutput').innerHTML = JSON.stringify(output);
        return output;
      })
    )
    .subscribe((obj) => {
      const str = obj ? JSON.stringify(obj) : 'NULL';
      document.querySelector('#throttledOutput').innerHTML = `<div>THROTTLED: ${str}</div>`;
    });