代码之家  ›  专栏  ›  技术社区  ›  James Lemieux

如何使用RxJS设计声明式流?

  •  2
  • James Lemieux  · 技术社区  · 2 年前

    我的问题

    我想学习更多关于用RxJS编写声明性/函数式代码的知识。我试图使用拆分、合并和其他形式的流协调来封装逻辑,而不是强制处理事情。我有一个示例场景,希望你能帮助我。

    背景

    当用户点击时,我想记录他们的点击统计数据;悬停行为。如果统计数据被视为“主要”,我还想显示一个通知。

    const click$ = fromEvent(document, 'click');
    
    // emits very frequently, as the user moves the mouse around the screen
    const hover$ = new Subject<CustomHoverEvent>();
    
    // The user can hover their mouse around the application, and when
    // they click, emit the latest hover event. If there was a click without
    // any hovers, dont emit anything.
    const lastHoverEvent$ = hover$.pipe(
        buffer(click$),
        map(events => events[events.length - 1]),
        filter(event => !!event)
    );
    
    const [primaryHover$, secondaryHover$] = partition(
        lastHoverEvent$,
        (event) => event.isPrimaryHover()
    );
    
    const primaryHoverStats$ = primaryHover$.pipe(
        map(event => calculatePrimaryHoverStats(event)
    );
    
    const primaryHoverStatsNotification$ = primaryHoverStats$.pipe(
        map(stats => createNotification(stats)
    );
    
    const secondaryHoverStats$ = secondaryHover$.pipe(
        map(event => calculateSecondaryHoverStats(event)
    );
    
    const hoverStats$ = merge(primaryHoverStats$, secondaryHoverStats$);
    
    
    hoverStats$.subscribe(stats => logStats(stats));
    
    primaryHoverStatsNotification$.subscribe(notification => notify(notification));
    

    以下是上述流当前的工作方式。每种颜色代表因每次订阅而运行的代码。对每个订阅者重复所有上游工作(缓冲事件,并计算主要订阅者的统计数据)。

    但我实际上希望它是这样工作的。在管道中完成的工作不会重复,每个订阅者只处理自己的部分。

    为了解决这个问题,我添加了一些 share() 运营商,以便组播已经完成的工作。

    const lastHoverEvent$ = hover$.pipe(
        buffer(click$),
        map(events => events[events.length - 1]),
        filter(event => !!event),
        share()
    );
    
    ...
    
    const primaryHoverStats$ = primaryHover$.pipe(
        map(event => calculatePrimaryHoverStats(event),
        share()
    );
    

    我试图用普通函数复制的流类型看起来像这样。

    const lastHoverEvent = getLastHoverEvent();
    let hoverStats;
    if (lastHoverEvent.isPrimaryHover()) {
        hoverStats = calculatePrimaryHoverStats(lastHoverEvent);
        const notification = createNotification(hoverStats);
        notify(notification);
    } else {
        hoverStats = calculateSecondaryHoverStats(lastHoverEvent);
    }
    logStats(hoverStats);
    

    或者像这样用a map 在原始代码中

    hover$.pipe(
        buffer(click$),
        map(events => events[events.length - 1]),
        filter(event => !!event),
        map(event => {
            let hoverStats;
            if (event.isPrimaryHover()) {
                hoverStats = calculatePrimaryHoverStats(event);
                const notification = createNotification(hoverStats);
                notify(notification);
            } else {
                hoverStats = calculateSecondaryHoverStats(event);
            }
            return hoverStats;
        })
    ).subscribe(stats => logStats(stats));
    

    问题

    1. 正在添加 share 为了让RxJS以这种方式工作,你的流真的有必要吗?在上面的“普通函数”示例中,它看起来非常简单,没有什么不同寻常的,但当你试图通过将流拆分为单独的事件来实现时,会完成重复的上游工作,这对我来说似乎有点不直观。

      我觉得我错过了什么。这不是编写RxJS的好方法吗?当我根据上面的图表思考逻辑时,我感觉很对,就像相互连接的流一样。。。但开销 分享 看起来很奇怪。我应该坚持做这样的事情吗 地图 最后一个代码示例中的解决方案?


    1. (无关的问题)我用 buffer 点击时获取最新的悬停事件。我在考虑使用 ReplaySubject withLatestFrom() ,以减少悬停事件的内存存储(因为我只需要最新的一个)。

      const click$ = fromEvent(document, 'click');
      const hover$ = new ReplaySubject<CustomHoverEvent>(1);
      const lastHoverEvent$ = click$.pipe(
          withLatestFrom(hover$),
          map(([click, hover]) => hover),
          ...
      );
      

      但这行不通,因为我需要 click$ 事件以清除已通过的任何悬停事件。例如,如果用户连续点击两次而没有悬停在任何东西上,我不希望检索到过时/旧的悬停事件。

      没有更好的方法吗 缓冲器 ? 为了得到最后一个事件而将所有事件存储在内存中似乎是浪费。我希望在上找到一个尺寸配置 缓冲器 ,并将其设置为1,但我没有看到任何。

    0 回复  |  直到 2 年前
        1
  •  1
  •   Andrei Gătej    2 年前

    对问题1的答复

    虽然乍一看可能有点违反直觉, share() 为了避免不必要地订阅源,绝对需要。

    基本上,每个可观察到的源都可以被减少,就像这样:

    new Observable(subscriber => {
      ...
    });
    

    注:对于主题,情况略有不同 .

    例如,以下是一些相关部分 fromEvent :

    return new Observable<T>((subscriber) => {
      const handler = (...args: any[]) => subscriber.next(1 < args.length ? args : args[0]);
    
      // ...
    }
    

    尽管以非常声明性的方式定义数据源(例如。 hover$ -> lastHoverEvent$ -> primaryHover$ ->…),每次订阅都会导致再次订阅源。万一 悬停$ ,每次订阅其任何 衍生来源 (例如。 primary悬停$ )将导致 新订户 被添加到 悬停$ 主题的订阅者列表。

    这条路 共享() 操作员通过使用 local Subject 这将跟踪所有订阅者。原始数据源将是 subscribed only once ,通过检查有多少订阅者处于活动状态。

    对问题2的答复

    我想你还是可以去的 withLatestFrom 方法,但使用 Subject 而不是a ReplaySubject .

    const click$ = fromEvent(document, 'click');
    const hover$ = new Subject<CustomHoverEvent>();
    const lastHoverEvent$ = click$.pipe(
      withLatestFrom(hover$),
    
      // Covering the case where `hover$` has been cleared.
      filter(([, hover]) => !!hover),
      
      map(([click, hover]) => hover),
      tap({ next: () => {
        // Clearing the possible stale hover events.
        hover$.next(null);
      }}),
    );
    

    以下是我根据上面的片段可以想到的场景:

    1. 用户首先悬停 悬停$ next() 最新的悬停事件;当用户点击时,由于使用 withLatestFrom ,将仅使用最新值
    2. 如果用户点击2次:在第一次点击之后, hover$.next(null) 将被调用,因此该值 最新来源 关于 悬停$ null ; 然后,当用户第二次点击时 filter 条件不会过去;但是,如果用户再次悬停 悬停$ 将相应地更新,因此,如果用户单击 滤波器 通行证将检查