我有一个虚拟网络数据源:
fun networkDataSource(): Single<List<Int>> {
return Single.just((0 until 100).toList())
.delay(150, TimeUnit.MILLISECONDS)
}
这是一个无止境的观察。它的主要用途是它的计算应该被“保护”,所以
.
(这里的值是1。)
val endless = Observable
.just(1)
.observeOn(Schedulers.io())
.delay(500, TimeUnit.MILLISECONDS)
// Counts as heavy operation, do not calculate this here once again
.doOnNext { println("=> E: Calculated once") }
.cache()
//.doOnNext { println("=> E: From cache") }
.repeat()
val mainStream = Observable.range(0, 6)
.doOnNext { println("=> M: Main stream $it") }
将3个可观察对象压缩到一起,并优化网络使用,这样就不会调用过多(一旦满足数据的数量(本例中为整数)。
方法:
mainStream
.concatMap {index ->
Observables.zip(
Observable.just(index),
endless,
networkDataSource()
.toObservable()
.doOnNext { println("#> N: Network data fetch $index") }
)
}
.doOnNext { println("=> After concatmap: ${it.first}") }
.take(4)
.doOnNext { println("=> After take: ${it.first}") }
.subscribe(
{ println("=> Last onnext") },
{ it.printStackTrace() },
{ synchronized(check) { check.notifyAll() } }
)
完成锁紧螺纹-仅用于测试:
synchronized(check) {
check.wait()
}
println("Ending")
以下是输出:
=> M: Main stream 0
=> M: Main stream 1
=> M: Main stream 2
=> M: Main stream 3
=> M: Main stream 4
=> M: Main stream 5
#> N: Network data fetch 0
=> E: Calculated once
=> After concatmap: 0
=> After take: 0
=> Last onnext
#> N: Network data fetch 1
=> After concatmap: 1
=> After take: 1
=> Last onnext
作为旁注,如果我从
endless
可观察的:
.doOnNext { println("=> E: From cache") }
无止境的
每次迭代调用这么多次?
flatMap()
这不是一个解决方案,因为它不需要
take(4)
并继续完成所有网络通话。
concatMap()
工作?
(我还补充道
RxJS公司
标签,因为这是一个反应性的问题,绝对不是关于科特林。如果RxJava库中存在这些函数,那么JS解决方案也是受欢迎的。)
我查看了代码,这两个输出可能是因为
prefetch
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public final <R> Observable<R> concatMap(Function<? super T, ? extends ObservableSource<? extends R>> mapper) {
return concatMap(mapper, 2);
}
但我还是不明白它是怎么工作的。我只是读过
concatMap()
flatmap()