public static Completable fromCallable(final Callable<?> callable) {
requireNonNull(callable);
return create(new OnSubscribe() {
@Override
public void call(rx.CompletableSubscriber s) {
BooleanSubscription bs = new BooleanSubscription();
s.onSubscribe(bs);
try {
// calls the callable regardless
callable.call();
} catch (Throwable e) {
if (!bs.isUnsubscribed()) {
s.onError(e);
}
return;
}
// the emission of the complete is guarded
if (!bs.isUnsubscribed()) {
s.onCompleted();
}
}
});
}
所以如果我们修改你的代码如下添加
doOnComplete
对每个人。
Scheduler scheduler = Schedulers.from(Executors.newSingleThreadExecutor());
Subscription subscription1 = Completable.fromCallable(() -> {
Thread.sleep(1000);
System.out.println("first Callable executed");
return 0;
})
.doOnComplete(() -> System.out.println("first Completable complete"))
.subscribeOn(scheduler)
.subscribe();
Subscription subscription2 = Completable.fromCallable(() -> {
Thread.sleep(1000);
System.out.println("second Callable executed");
return 0;
})
.doOnComplete(() -> System.out.println("second Completable complete"))
.subscribeOn(scheduler)
.subscribe();
CompositeSubscription subscriptions = new CompositeSubscription();
subscriptions.addAll(subscription1, subscription2);
subscriptions.clear();
我们得到:
first Callable executed
second Callable executed
如果我们不清除订阅,我们将看到额外的输出,即。
first Callable executed
first Completable complete
second Callable executed
second Completable complete