代码之家  ›  专栏  ›  技术社区  ›  Joe Daley

切换到新的Observable,输入以前Observable的最新值

  •  2
  • Joe Daley  · 技术社区  · 7 年前

    我有一个请求对象流:

    let request$ = new Subject();
    

    我有一个函数,它接受一个请求对象和当前应用程序状态对象,并返回一个新状态对象流:

    function stream(request, state) {
        // Return a cold Observable of new state objects
    }
    

    当发出第一个请求时,我想调用 stream 然后传入该请求和初始状态对象,并开始处理它返回的状态。

    一段时间后,会发出第二个请求。第一个流需要停止并替换为通过调用 流动 再次,传入第二个请求和从第一个流发出的最新状态。

    我觉得也许 scan 高阶流可以做到,但我被困在这里:

    request$.pipe(
        scan((state$, request) => {
            let previousState = ???
            return stream(request, previousState);
        }, of(initialState)),
        switchAll()
    )
    

    编辑-示例

    在现实生活中, 流动 函数建立到服务器的WebSockets连接,发送请求中的部分请求和状态参数,然后在从服务器接收消息时发出新的状态对象。

    举个简单的例子 request state 只是数字而已 流动 函数只是重复添加 请求 状态 ,每秒发出一个结果:

    function stream(request, state) {
        return interval(1000).pipe(
            map(i => state + (i+1) * request)
        );
    }
    // e.g. stream(2, 10) will emit 12, 14, 16, 18, ...
    

    期望的输出是:

    initialState = 10
    
    request$       2------------5-----------------
    stream(2, 10)  --12--14--16-|
    stream(5, 16)               --21--26--31--36--
    ...
    output         --12--14--16---21--26--31--36--
    

    注意当时 5 发出请求,两个 最新状态 16 需要创建下一个流。

    2 回复  |  直到 7 年前
        1
  •  3
  •   cartant    7 年前

    一个简单的方法就是 defer 要创建可存储最新状态的每个订阅状态,请执行以下操作:

    const source = defer(() => {
        let lastState = initialState;
        return request$.pipe(
            switchMap(request => stream(request, lastState)),
            tap(state => lastState = state)
        );
    });
    

    用这种方式, 推迟 将确保每个订阅 source 会有自己的 lastState .

        2
  •  1
  •   ibenjelloun    7 年前

    这个 scan 运算符接受具有前一状态和当前值的函数 initialState 也不应该是 Observable .

    我认为这样会更好:

    request$.pipe(
        scan((previousState, request) => {
            return stream(request, previousState);
        }, initialState),
        switchAll()
    )