代码之家  ›  专栏  ›  技术社区  ›  Dan Gravell

实现分支逻辑时跨Flowables的取消

  •  0
  • Dan Gravell  · 技术社区  · 4 年前

    我有一些 Flowable s组成使用 FlowableTransformer 这样地:

    public static FlowableTransformer<I, O> transformer() {
        return f -> {
            return Flowable.mergeArray(5, 5, f.filter(...).map(...), f.filter(...).map(...));
        }
    }
    

    这里的基本用例是具有某种形式的分支逻辑 -如果元素是A( filter )那么做B( map )或者如果它们是C,那么就做D。

    然而,在下游,当我储存 Disposable 稍后再打电话 cancel 在它上面,取消只会在最远的地方出现 可流动 返回由 mergeArray 上游,即呼叫上游的运营商 compose(transformer()) ,从未收到取消通知。

    所以我认为这是个问题 Flowable.merge() 本身(或者更确切地说:我缺乏理解),所以我改为使用 replay() 以及a ConnectableFlowable :

    public static FlowableTransformer<I, O> transformer() {
        return f -> {
            ConnectableFlowable<TaggedUpdates> cf = taggedUpdateFlowables.replay(5);
            
            Flowable<O> o = f
                .filter(...)
                .map(...)
                .concatWith(
                        cf
                            .filter(...)
                            .map(...)
                );
            
            cf.connect();
            return o;
        };
    }
    

    然而,同样的事情也会发生。 cancellation 只会膨胀到 可连接可流动 .

    但是,我需要在整个Flowable中传播取消,以便项目的来源停止生产。

    有办法吗?或者我应该用另一种方式来处理分支逻辑吗?

    0 回复  |  直到 4 年前
        1
  •  0
  •   Dan Gravell    4 年前

    以下是我目前的工作方法。而不是使用合并或 replay 我已经把它改成只用了 flatMap :

    public static FlowableTransformer<I, O> transformer() {
        return f -> {
            return f
                .flatMap(i -> {
                    return Flowable.just(i.items)
                         .filter(...)
                         .map(...)
                         .concatWith(
                             .flatMap(i -> {
                                 Flowable.just(i.items)
                                     .filter(...)
                                     .map(...)
                             })
                         );
                   });
        };
    }
    

    然而,我有点担心以后我将无法做到这个变通方法,因为我 需要使用 重播 例如,在上游计算昂贵的情况下。