代码之家  ›  专栏  ›  技术社区  ›  andras

RxJava:concatMap()和zip()卡住了

  •  0
  • andras  · 技术社区  · 7 年前

    我有一个虚拟网络数据源:

        fun networkDataSource(): Single<List<Int>> {
            return Single.just((0 until 100).toList())
                    .delay(150, TimeUnit.MILLISECONDS)
        }
    

    这是一个无止境的观察。它的主要用途是它的计算应该被“保护”,所以 . (这里的值是1。)

        val endless = Observable
                .just(1)
                .observeOn(Schedulers.io())
                .delay(500, TimeUnit.MILLISECONDS)
                // Counts as heavy operation, do not calculate this here once again
                .doOnNext { println("=> E: Calculated once") }
                .cache()
                //.doOnNext { println("=> E: From cache") }
                .repeat()
    

        val mainStream = Observable.range(0, 6)
                .doOnNext { println("=> M: Main stream $it") }
    

    将3个可观察对象压缩到一起,并优化网络使用,这样就不会调用过多(一旦满足数据的数量(本例中为整数)。

    方法:

        mainStream
                .concatMap {index ->
                    Observables.zip(
                            Observable.just(index),
                            endless,
                            networkDataSource()
                                    .toObservable()
                                    .doOnNext { println("#> N: Network data fetch $index") }
                    )
                }
                .doOnNext { println("=> After concatmap: ${it.first}") }
                .take(4)
                .doOnNext { println("=> After take: ${it.first}") }
                .subscribe(
                        { println("=> Last onnext") },
                        { it.printStackTrace() },
                        { synchronized(check) { check.notifyAll() } }
                )
    

    完成锁紧螺纹-仅用于测试:

    synchronized(check) {
        check.wait()
    }
    println("Ending")
    

    以下是输出:

    => M: Main stream 0
    => M: Main stream 1
    => M: Main stream 2
    => M: Main stream 3
    => M: Main stream 4
    => M: Main stream 5
    #> N: Network data fetch 0
    => E: Calculated once
    => After concatmap: 0
    => After take: 0
    => Last onnext
    #> N: Network data fetch 1
    => After concatmap: 1
    => After take: 1
    => Last onnext
    

    作为旁注,如果我从 endless 可观察的:

    .doOnNext { println("=> E: From cache") }
    

    无止境的 每次迭代调用这么多次?

    flatMap() 这不是一个解决方案,因为它不需要 take(4) 并继续完成所有网络通话。

    concatMap() 工作?

    (我还补充道 RxJS公司 标签,因为这是一个反应性的问题,绝对不是关于科特林。如果RxJava库中存在这些函数,那么JS解决方案也是受欢迎的。)

    我查看了代码,这两个输出可能是因为 prefetch

    @CheckReturnValue
    @SchedulerSupport(SchedulerSupport.NONE)
    public final <R> Observable<R> concatMap(Function<? super T, ? extends ObservableSource<? extends R>> mapper) {
        return concatMap(mapper, 2);
    }
    

    但我还是不明白它是怎么工作的。我只是读过 concatMap() flatmap()

    1 回复  |  直到 7 年前
        1
  •  1
  •   akarnokd    7 年前

    从评论来看:

    repeat 在里面 endless cache