代码之家  ›  专栏  ›  技术社区  ›  krupal.agile

如何取消RxJava改造中的单个网络请求?

  •  2
  • krupal.agile  · 技术社区  · 8 年前

    我正在使用改型和rxjava从网络下载一些文件。在我的应用程序中,用户可以取消下载。

    伪代码:

       Subscription subscription = Observable.from(urls)
                .concatMap(this::downloadFile)
                .subscribe(file -> addFileToUI(file), Throwable::printStackTrace);
    

    现在,如果我取消订阅,那么所有请求都会被取消。我想逐一下载,这就是为什么使用concatMap。如何取消特定请求?

    1 回复  |  直到 8 年前
        1
  •  3
  •   akarnokd    8 年前

    有一种机制可以通过外部刺激来抵消个人资金流: takeUntil . 不过,您必须使用一些外部跟踪:

    ConcurrentHashMap<String, PublishSubject<Void>> map =
         new ConcurrentHashMap<>();
    
    
    Observable.from(urls)
    .concatMap(url -> {
        PublishSubject<Void> subject = PublishSubject.create();
        if (map.putIfAbsent(url, subject) == null) {
            return downloadFile(url)
                .takeUntil(subject)
                .doAfterTerminate(() -> map.remove(url))
                .doOnUnsubscribe(() -> map.remove(url));
        }
        return Observable.empty();
    })
    .subscribe(file -> addFileToUI(file), Throwable::printStackTrace);
    
    // sometime later
    
    PublishSubject<Void> ps = map.putIfAbsent("someurl", PublishSubject.create());
    ps.onCompleted();