代码之家  ›  专栏  ›  技术社区  ›  char m

如何创建一个返回Observable的方法,该方法会发出两个需要一个接一个执行的承诺的结果?

  •  0
  • char m  · 技术社区  · 3 年前

    我问了一个问题

    Is Observable from chained promises equivalent of observables created with from and chained with concatMap?

    在完全错误的前提下。似乎我的两个解决方案都与我的意图无关。

    我创建了一个返回Observable的方法,并调用了两个返回Promise的方法。我尝试了两种方法:

      public setItemInfos(itemInfos: IItemInfo[]): Observable<number> {
        return from(this.db.selectionItemInfos.clear().then(() => {
          return this.db.selectionItemInfos.bulkAdd(itemInfos);
        }));
      }
    
      public setItemInfos(itemInfos: IItemInfo[]): Observable<number> {
        const clear$ = from(this.db.selectionItemInfos.clear());
        const bulkAdd$ = from(this.db.selectionItemInfos.bulkAdd(itemInfos));
    
        return clear$.pipe(concatMap(() => bulkAdd$))
      }
    

    用途包括:

    myService.setItemInfos(itemInfos).subsribe(count => {
      console.log(`Cleared the table 1st and then added ${count} new items`);
    });
    

    从两个版本中我都认为:

    1. 表clear是指bulkAdd启动时执行完成
    2. 当bulkAdd完成时,我从订阅中获取计数

    这到底应该怎么做?还是可以做到?

    0 回复  |  直到 3 年前
        1
  •  1
  •   Mrk Sef    3 年前

    这是(从我这里可以告诉你的),我将如何做到这一点。

    一般来说 defer (或任何高阶运算符)是从承诺中创建可观测的更好方法。“延迟”允许您获取承诺的热切评估语义,并将其转换为可观察对象的懒惰评估语义。

    然后,所有常见的可观测算子等都将按预期运行。

    public setItemInfos(itemInfos: IItemInfo[]): Observable<number> {
      const clear$ = defer(() => this.db.selectionItemInfos.clear());
      const bulkAdd$ = defer(() => this.db.selectionItemInfos.bulkAdd(itemInfos));
    
      return concat(clear$, bulkAdd$);
    }
    

    更新1:

    所以我想我可能知道你在找什么。这并不是真正惯用的RxJS,因为它是声明式、命令式代码风格的交错组合。即便如此,这应该管用吗?我还没有完全测试过它,但是做了一些修补,我认为这应该符合你的要求。

    毫无疑问,有更好的方法来完成同样的事情,但如果你没有看到你所追求的更大的图景,很难说。

    
    interface Tagged<T> {
      payload: T,
      tag: number
    }
    
    class abitraryClass{
    
      private setItemInfoSub: Subject<Tagged<IItemInfo[]>>;
      private processItemInfo: Observable<Tagged<number>>;
      private itemInfoTag = 0;
    
      constructor(){
        this.setItemInfoSub = new Subject<Tagged<IItemInfo[]>>();
        this.processItemInfo = this.setItemInfoSub.pipe(
          concatMap(({tag, payload: itemInfos}) => this.db.selectionItemInfos.clear().pipe(
            ignoreElements(),
            concatWith(defer(() => this.db.selectionItemInfos.bulkAdd(itemInfos))),
            map(response => ({
              payload: response,
              tag
            }))
          )),
          shareReplay(1)
        );
        // Make the processing pipeline live at all times.
        this.processItemInfo.subscribe();
      }
    
      public setItemInfos(itemInfos: IItemInfo[]): Observable<number> {
        const myTag = this.itemInfoTag++;
    
        this.setItemInfoSub.next({
          payload: itemInfos,
          tag: myTag
        });
    
        return this.processItemInfo.pipe(
          filter(({tag}) => tag == myTag),
          map(({payload}) => payload)
        );
      }
    }