代码之家  ›  专栏  ›  技术社区  ›  Totty.js

RXJS按ID拆分并按顺序处理每个ID

  •  0
  • Totty.js  · 技术社区  · 6 年前

    问题:游戏:所以我有一些船可以到达许多行星。如果这两艘船同时到达新的星球,可能会导致同一个过程,两次所有权的改变。这个过程是异步的,并且应该在每个行星的所有权改变时发生一次。

    为了解决这个问题,我想通过行星ID分割船只流,这样每条流只能用于一个行星。现在最棘手的部分是,每艘船舶只有在前一艘船舶经过处理后才能进行处理。

    • 船$
    • 按行星ID划分
      • 行星ID1:顺序处理
      • 行星ID2:顺序处理

    这里有一些代码将显示它应该如何工作。

    const ships = [
      {
        id: 1,
        planetId: 1,
      },
      {
        id: 2,
        planetId: 1,
      },
      {
        id: 3,
        planetId: 2,
      },
      // ... never finishes 
    ]
    // the source observable never finishes 
    const source$ = interval(1000).pipe(
      take(ships.length),
      map(i => ships[i]),
    )
    
    const createSubject = (ship) => {
      // Doesn't need to be a subject, but needs to emit new items after a bit of time based on some other requests.
      console.log(`>>>`, ship.id);
      const subject = new Subject();
      setTimeout(() => {
        subject.next(ship.id + ' a' + new Date());
      }, 1000);
      setTimeout(() => {
        subject.next(ship.id + ' b' + new Date());
        subject.complete();
      }, 2000);
      return subject.asObservable();
    }
    
    // The result should be the following (t, is the time in seconds, t3, is time after 3 seconds)
    // t0: >>> 1
    // t0: >>> 3
    // t1: 1 a
    // t1: 2 a
    // t2: 1 b
    // t2: 2 b
    // t2: >>> 2 (note that the second ship didn't call the createSubject until the first finished)
    // t3: 1 a
    // t4: 1 2
    

    解决方案(在A.Winnen的大量帮助下,一些人找到了答案)

    在这里运行: https://stackblitz.com/edit/angular-8zopfk?file=src/app/app.component.ts

      const ships = [
        {
          id: 1,
          planetId: 1,
        },
        {
          id: 2,
          planetId: 1,
        },
        {
          id: 3,
          planetId: 2,
        }
      ];
      const createSubject = (ship) => {
        console.log(ship.id + ' a')
        const subject = new Subject();
        setTimeout(() => {
          //subject.next(ship.id + ' b');
        }, 500);//
        setTimeout(() => {
          subject.next(ship.id + ' c');
          subject.complete();//
        }, 1000);
        return subject.asObservable();
      }
      let x = 0;
      interval(10).pipe(//
        take(ships.length),
        map(i => ships[i]),
        groupBy(s => s.planetId),
        mergeMap(group$ => {//
          x++
          return group$.pipe(
            tap(i => console.log('x', i, x)),
            concatMap(createSubject)
          )
        }),
      ).subscribe(res => console.log('finish', res), undefined, () => console.log("completed"))
    

    如何在RXJS中实现这一点?

    代码:

      const shipArriveAction$ = action$.pipe<AppAction>(
        ofType(ShipActions.arrive),
        groupBy(action => action.payload.ship.toPlanetId),
        mergeMap((shipByPlanet$: Observable<ShipActions.Arrive>) => {
          return shipByPlanet$.pipe(
            groupBy(action => action.payload.ship.id),
            mergeMap((planet$) => {
              return planet$.pipe(
                concatMap((action) => {
                  console.log(`>>>concat`, new Date(), action);
                  // this code should be called in sequence for each ship with the same planet. I don't need only the results to be in order, but also this to be called in sequence.
                  const subject = new Subject();
                  const pushAction: PushAction = (pushedAction) => {
                    subject.next(pushedAction);
                  };
                  onShipArriveAction(state$.value, action, pushAction).then(() => {
                    subject.complete();
                  });
                  return subject.asObservable();
                }),
              )
            })
          );
    
        )
      ;
    

    来自A.Winnen的代码非常接近,但只适用于完成而不是连续的可观察源:

        const ships = [
          {
            id: 1,
            planetId: 1,
          },
          {
            id: 2,
            planetId: 1,
          },
          {
            id: 3,
            planetId: 2,
          }
        ];
        const createSubject = (ship) => {
          console.log(ship.id + ' a')
          const subject = new Subject();
          setTimeout(() => {
            subject.next(ship.id + ' b');
          }, 1000);//
          setTimeout(() => {
            subject.next(ship.id + ' c');
            subject.complete();//
          }, 2000);
          return subject.asObservable().pipe(
            finalize(null)
          );
        }
    
        interval(1000).pipe(
          take(ships.length),
          tap(console.log),
          map(i => ships[i]),
          groupBy(s => s.planetId),
          mergeMap(group => group.pipe(toArray())),
          mergeMap(group => from(group).pipe(
            concatMap(createSubject)
          ))
        ).subscribe(res => console.log(res), undefined, () => console.log("completed"))
    
    1 回复  |  直到 6 年前
        1
  •  1
  •   A.Winnen    6 年前

    您可以使用groupby和mergemap的组合来实现您的目标。

    from(ships).pipe(
      groupBy(ship => ship.planetId),
      mergeMap(planetGroup => planetGroup.pipe(
        concatMap(ship => {
          // do real processing in this step
         return of(`planetGroup: ${planetGroup.key} - processed ${ship.ship}`);
        })
      ))
    ).subscribe(result => console.log(result));
    

    我举了一个简单的例子: https://stackblitz.com/edit/angular-6etaja?file=src%2Fapp%2Fapp.component.ts

    编辑: 更新的BlitzStack: https://stackblitz.com/edit/angular-y7znvk