代码之家  ›  专栏  ›  技术社区  ›  D-Dᴙum

在另一个线程上订阅BehaviorSubject

  •  4
  • D-Dᴙum  · 技术社区  · 6 年前

    我刚接触到RXJava,我选择使用它是因为我认为它非常适合我的用例。

    我有一些 Integer 我希望在无限长的时间内观察到的值。每当这些值中的一个发生更改(即事件)时,我希望调用它的所有观察者 另一个线程 .

    由于观察时间长,我认为我需要使用 BehaviorSubject 课程(尽管最初我认为 Observable 是我所需要的……因为我只需要“观察”),我可以使用 subscribeOn() 方法来设置调度程序,从而实现在后台线程上调用订阅服务器:

    private BehaviorSubject<Integer> rotationPositionSubject = BehaviorSubject.createDefault(getRotorPosition());
    rotationPositionSubject.subscribeOn(scheduler);
    

    我有一个 rotate() 我用来更新的方法 rotationPositionSubject 从主线程调用:

    @Override
    public synchronized int rotate()
    {
        final int newRotorPosition = super.rotate();
        rotationPositionSubject.onNext(newRotorPosition);
    
        return newRotorPosition;
    }
    

    但是,通过上面的代码,我发现订阅服务器是在“主”线程上调用的。正在检查文档 subscribeOn() :

    返回:

    源ObservableSource已修改,以便其订阅发生在指定的计划程序上

    所以我上面的代码不会工作,因为我没有使用返回的ObservableSource,但是返回对象是 可观察的 哪个对我的申请没有用处?

    问题是,我如何长期观察RxJava后台线程上的任何对象和调用订阅服务器,或者RxJava是错误的选择?

    1 回复  |  直到 6 年前
        1
  •  0
  •   D-Dᴙum    6 年前

    经过一些试验,人们在使用时似乎需要一些照顾。 BehaviorSubject 对象及其使用并不像我从各种接口的名称中推断的那样明显。

    作为一种测试方法来演示我目前正在做的工作:

    @Test
    public void test()
    {
        System.out.println("Executing test on thread ID: " + Thread.currentThread().getId());
        final BehaviorSubject<Integer> rotorBehaviour = BehaviorSubject.create();
        rotorBehaviour.subscribeOn(Schedulers.single());
        rotorBehaviour.subscribe(new Observer<Integer>()
        {
            @Override
            public void onSubscribe(final Disposable d)
            {
                System.out.println("onSubscribe() called on thread ID: " + Thread.currentThread().getId());
            }
    
            @Override
            public void onNext(final Integer integer)
            {
                System.out.println("onNext() called on thread ID: " + Thread.currentThread().getId());
            }
    
            @Override
            public void onError(final Throwable e)
            {
                System.out.println("onError() called on thread ID: " + Thread.currentThread().getId());
            }
    
            @Override
            public void onComplete()
            {
                System.out.println("onComplete() called on thread ID: " + Thread.currentThread().getId());
            }
        });
    
    
        rotorBehaviour.onNext(1);
        rotorBehaviour.onNext(2);
    
    }
    

    这会导致不期望的结果:

    对线程ID:1执行测试
    在线程ID:1上调用OnSubscribe()。
    在线程ID:1上调用OnNext()。
    在线程ID:1上调用OnNext()。

    进程已完成,退出代码为0

    (不希望是因为 onNext() 在主线程上调用)

    修改代码以使用 Observable 从呼叫返回到 subscribeOn 给出相同的不期望结果:

    @Test
    public void test()
    {
        System.out.println("Executing test on thread ID: " + Thread.currentThread().getId());
        final BehaviorSubject<Integer> rotorBehaviour = BehaviorSubject.create();
        Observable<Integer> rotorObservable = rotorBehaviour.subscribeOn(Schedulers.single());
    
        rotorObservable.subscribe(new Observer<Integer>()
        {
            @Override
            public void onSubscribe(final Disposable d)
            {
                System.out.println("onSubscribe() called on thread ID: " + Thread.currentThread().getId());
            }
    
            @Override
            public void onNext(final Integer integer)
            {
                System.out.println("onNext() called on thread ID: " + Thread.currentThread().getId());
            }
    
            @Override
            public void onError(final Throwable e)
            {
                System.out.println("onError() called on thread ID: " + Thread.currentThread().getId());
            }
    
            @Override
            public void onComplete()
            {
                System.out.println("onComplete() called on thread ID: " + Thread.currentThread().getId());
            }
        });
        rotorBehaviour.onNext(1);
        rotorBehaviour.onNext(2);
    }
    

    结果:

    对线程ID:1执行测试
    在线程ID:1上调用OnSubscribe()。
    在线程ID:1上调用OnNext()。
    在线程ID:1上调用OnNext()。

    进程已完成,退出代码为0

    但是使用 observeOn() 方法确实给出了所需的结果:

    @Test
    public void test()
    {
        System.out.println("Executing test on thread ID: " + Thread.currentThread().getId());
        final BehaviorSubject<Integer> rotorBehaviour = BehaviorSubject.create();
        Observable<Integer>rotorObservable = rotorBehaviour.observeOn(Schedulers.single());
    
        rotorObservable.subscribe(new Observer<Integer>()
        {
            @Override
            public void onSubscribe(final Disposable d)
            {
                System.out.println("onSubscribe() called on thread ID: " + Thread.currentThread().getId());
            }
    
            @Override
            public void onNext(final Integer integer)
            {
                System.out.println("onNext() called on thread ID: " + Thread.currentThread().getId());
            }
    
            @Override
            public void onError(final Throwable e)
            {
                System.out.println("onError() called on thread ID: " + Thread.currentThread().getId());
            }
    
            @Override
            public void onComplete()
            {
                System.out.println("onComplete() called on thread ID: " + Thread.currentThread().getId());
            }
        });
        rotorBehaviour.onNext(1);
        rotorBehaviour.onNext(2);
    }
    

    对线程ID:1执行测试
    在线程ID:1上调用OnSubscribe()。
    在线程id:13上调用了onnext()。
    在线程id:13上调用了onnext()。

    进程已完成,退出代码为0

    同样,在所有示例中,我仍然使用 行为主体 对象发起一个事件,我只是偶然发现,这将提供所需的结果。

    我担心的是我可能正在使用 可观察的 行为主体 以一种错误的方式,即在后台线程上调用订户,从而“碰巧”给出正确的结果。除非我在文档的某个地方遗漏了它,否则如何使用这些对象来获得所需的结果似乎并不明显。