代码之家  ›  专栏  ›  技术社区  ›  Łukasz

RXJava 1.3-AMB不订阅或取消订阅较慢的流

  •  0
  • Łukasz  · 技术社区  · 7 年前
        Observable<Object> obs1 = Observable
                .create(subscriber -> subscriber.onNext("obs 1 event"))
                .doOnSubscribe(() -> System.out.println("obs1 sub"))
                .doOnUnsubscribe(() -> System.out.println("obs1 unsub"));
    
        Observable<Object> obs2 = Observable
                .create(subscriber -> subscriber.onNext("obs 2 event"))
                .doOnSubscribe(() -> System.out.println("obs2 sub"))
                .doOnUnsubscribe(() -> System.out.println("obs2 unsub"));
    
        Observable
                .amb(obs1, obs2)
                .subscribe(System.out::println);
    
        Thread.sleep(500);
    

    应该调用obs2 doon*方法,并且应该只发出两个事件中的一个。程序输出:

    obs1 sub
    obs 1 event
    

    没有调用obs2的*订阅方法。

    1 回复  |  直到 7 年前
        1
  •  1
  •   akarnokd    7 年前

    RxJava源和运算符在默认情况下是同步的,除非它们使用调度程序引入异步。上面的代码都没有涉及任何调度程序,因此执行将是同步的。 amb 不要试图订阅第二个,因为第一个已经赢得了比赛。

    当我将create替换为 just 它按我的预期工作

    原因 只是 产生不同的结果是反压力,你没有用你不赞成的方法实现它。 create 用法。 磁悬浮轴承 先订阅源,然后从源请求,这样就得到了订阅的副作用。在您中断的实现中,第一个源会立即将其项推送到 磁悬浮轴承 到它的win状态,防止第二次订阅发生。