代码之家  ›  专栏  ›  技术社区  ›  Maksim Ostrovidov

RxJava fromCallable和unsubscription

  •  2
  • Maksim Ostrovidov  · 技术社区  · 7 年前

    我有以下代码(RxJava 1.3.8):

    Scheduler scheduler = Schedulers.from(Executors.newSingleThreadExecutor());
    
    Subscription subscription1 = Completable.fromCallable(() -> {
      Thread.sleep(1000);
      System.out.println("first Callable executed");
      return 0;
    })
        .subscribeOn(scheduler)
        .subscribe();
    
    Subscription subscription2 = Completable.fromCallable(() -> {
      Thread.sleep(1000);
      System.out.println("second Callable executed");
      return 0;
    })
        .subscribeOn(scheduler)
        .subscribe();
    
    CompositeSubscription subscriptions = new CompositeSubscription();
    subscriptions.addAll(subscription1, subscription2);
    subscriptions.clear();
    

    输出:

    执行第二次可调用

    问题是-为什么第二个可调用的被执行了?在运行之前,如果取消订阅,我希望已检查订阅并取消执行。

    1 回复  |  直到 7 年前
        1
  •  2
  •   Stuart Wakefield    6 年前

    public static Completable fromCallable(final Callable<?> callable) {
        requireNonNull(callable);
        return create(new OnSubscribe() {
            @Override
            public void call(rx.CompletableSubscriber s) {
                BooleanSubscription bs = new BooleanSubscription();
                s.onSubscribe(bs);
                try {
                    // calls the callable regardless
                    callable.call();
                } catch (Throwable e) {
                    if (!bs.isUnsubscribed()) {
                        s.onError(e);
                    }
                    return;
                }
                // the emission of the complete is guarded
                if (!bs.isUnsubscribed()) {
                    s.onCompleted();
                }
            }
        });
    }
    

    所以如果我们修改你的代码如下添加 doOnComplete 对每个人。

    Scheduler scheduler = Schedulers.from(Executors.newSingleThreadExecutor());
    
    Subscription subscription1 = Completable.fromCallable(() -> {
      Thread.sleep(1000);
      System.out.println("first Callable executed");
      return 0;
    })
        .doOnComplete(() -> System.out.println("first Completable complete"))
        .subscribeOn(scheduler)
        .subscribe();
    
    Subscription subscription2 = Completable.fromCallable(() -> {
      Thread.sleep(1000);
      System.out.println("second Callable executed");
      return 0;
    })
        .doOnComplete(() -> System.out.println("second Completable complete"))
        .subscribeOn(scheduler)
        .subscribe();
    
    CompositeSubscription subscriptions = new CompositeSubscription();
    subscriptions.addAll(subscription1, subscription2);
    subscriptions.clear();
    

    我们得到:

    first Callable executed
    second Callable executed
    

    如果我们不清除订阅,我们将看到额外的输出,即。

    first Callable executed
    first Completable complete
    second Callable executed
    second Completable complete