代码之家  ›  专栏  ›  技术社区  ›  n0shadow

RxJava:为一个完整表列表指定最大并发性

  •  2
  • n0shadow  · 技术社区  · 7 年前

    我试图查询一个API,该API为我提供了一个要下载的文件列表(如下所示)。然后,我继续下载这些文件,同时重新查询API以查找在初始调用中可能遗漏的任何内容。

    Completable#mergeDelayError(Iterable<? extends CompletableSource> sources) 用于确保我可以并行执行多个任务,并在完成所有任务后收到通知。

    fun fetchAndDownload(details: List<String>): Completable = 
        exampleApi.fetchPackages(details) // This is a Single
            .flatMapCompletable { (results, retry) -> 
                val completables = mutableListOf<Completable>()
                results.mapTo(completables) { value ->
                    exampleApi.download(value).subscribeOn(Schedulers.io())
                }
    
                if (retry.isNotEmpty()) { 
                    completables += fetchAndDownload(retry)
                        .delay(3L, TimeUnit.SECONDS)
                        .subscribeOn(Schedulers.io())
                }
                Completable.mergeDelayError(completables)
            }
    

    然而,这种实现有可能通过一次执行太多的事情来压倒网络和/或线程数。因此,我想知道限制 completables 正在同时运行。

    我知道 Completable#mergeDelayError(Publisher<? extends CompletableSource> sources, int maxConcurrency) 但我不知道如何转换我的 List<Completable> 到所需的 Publisher 。另一种解决方案是提供自定义 Scheduler 这有一个最大线程数,但我也不知道如何提供这样一个 Schduler (当不再需要时,我可以清洁并丢弃)。

    2 回复  |  直到 7 年前
        1
  •  2
  •   n0shadow    7 年前

    最简单的方法是使用 Floable.fromIterable 转换 List 属于 Completable 到a Publisher

    这将允许使用 Completable#mergeDelayError(Publisher<? extends CompletableSource> sources, int maxConcurrency)

        2
  •  1
  •   paul    7 年前

    你可以使用 平面图 具有 最大并发 值,然后使管道异步运行。

      @Test
        public void asyncFlatMapWithMaxConcurrent() {
            Observable.from(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10))
                    .flatMap(value -> Observable.just(value)
                            .map(number -> {
                                try {
                                    Thread.sleep(1000);
                                    System.out.println(String.format("Value %s in Thread execution:%s",number, Thread.currentThread().getName()));
                                } catch (InterruptedException e) {
                                    e.printStackTrace();
                                }
                                return number;
                            }).subscribeOn(Schedulers.newThread())
                            , 2)//This is the max concurrenrt
            .subscribe();
            new TestSubscriber()
                    .awaitTerminalEvent(15, TimeUnit.SECONDS);    }
    

    如果您看到flatMap函数之后的第二个参数,那么我们将传递一个值2,它是可以为该flatMap运行的最大并发线程数

    你可以在这里看到这个例子和更多。

    https://github.com/politrons/reactive/blob/master/src/test/java/rx/observables/transforming/ObservableFlatMap.java