代码之家  ›  专栏  ›  技术社区  ›  Hector

如何将Firebase实时数据库回调与RxJava2结合起来

  •  0
  • Hector  · 技术社区  · 7 年前

    我当前的Android项目在Firebase实时数据库中保存着不同类型的参考数据。

    Android应用程序拥有此参考数据的本地副本,必须根据上次更新的日期进行刷新。

    我希望使用RxJava来控制并行刷新此参考数据。

    成功刷新所有参考数据后,我希望将上次本地更新的日期重置为今天的日期。

    我目前使用的解决方案没有达到预期效果。我无法重置上次更新的日期,以等待成功刷新参考数据。

    我的代码目前类似于:-

        Observable.just(Boolean.TRUE)
                .subscribeOn(Schedulers.io())
                .observeOn(Schedulers.io())
                .doOnNext(refresh -> referenceTypeA())
                .doOnNext(refresh -> referenceTypeB())
                .doOnNext(refresh -> referenceTypeC())
                .doOnNext(refresh -> recordTodaysDate())
                .subscribe();
    

    每个 referenceTypeX() 方法的结构如下所示:-

    private void referenceTypeX() {
    
            firebaseService
                    .getReferenceTypeX()
                    .doAfterSuccess(this::referenceTypeX)
                    .doOnError(this::processError)
                    .subscribe();
        }
    
    
    public Single<List<ReferenceTypeX>> getReferenceTypeX() {
        return Single.create(emitter ->
                mReferenceTypeXDatabaseReference
                        .orderByChild("typeXKey")
                        .addValueEventListener(buildTypeXListener(emitter)));
    }
    

    这就是问题所在,因为Firebase回调在主线程上运行,并且一些参考数据有1000行,我在后台线程上处理数据更改事件,如下所示:-

    public class TypeXListener extends BaseValueEventListener<List<ReferenceTypeX>> {
        private static final TypeXMapper TYPE_X_MAPPER = Selma.builder(TypeXMapper.class).build();
        private static final List<ReferenceTypeX> TYPE_X = new ArrayList<>();
    
        public TypeXListener(final SingleEmitter<List<ReferenceTypeX>> emitter) {
            super(emitter);
        }
    
        @Override
        public void onDataChange(final DataSnapshot dataSnapshot) {
            final GenericTypeIndicator<TypeXCsv> typeXTypeIndicator = new GenericTypeIndicator<TypeXCsv>() {
            };
    
            final Thread thread = new Thread(() -> {
                for (final DataSnapshot typeXDataSnapshot : dataSnapshot.getChildren()) {
                    final TypeXCsv typeXCsv = typeXDataSnapshot.getValue(typeXDataSnapshot);
                    final TypeX typeX = TypeXMapper.mapTypeX(typeXCsv);
    
                    TYPE_X_s.add(typeX);
                }
    
                emitter.onSuccess(TYPE_X_s);
            });
    
            thread.start();
    
        }
    }
    

    我认为我需要为每个 referenceTypeX() 方法并将其传递给 Observable.just()

    然后使用flatMap()并行化参考数据更新。

    我如何做到这一点?

    尤其是当我需要firebase实时回调在后台线程而不是主线程上运行时?

    我相信我的解决方案类似于

    Observable.just(referenceTypeA(), referenceTypeB(), referenceTypeC())
                    .subscribeOn(Schedulers.io())
                    .observeOn(Schedulers.io())
                    .flatMap(?????????????)
                    .doOnComplete(recordTodaysDate())
                    .subscribe();
    
    1 回复  |  直到 7 年前
        1
  •  1
  •   ESala    7 年前

    您需要修改 referenceTypeX() 函数,使其不在函数中订阅:

    private Single<List<ReferenceTypeX>> referenceTypeX() {
        return firebaseService
                .getReferenceTypeX()
                .doAfterSuccess(this::referenceTypeX)
                .doOnError(this::processError)
                .subscribe(); // <-- remove this, just return
    }
    

    然后,您可以通过组合这些函数来构建主要的可观察对象:

    Single
        .zip(
            referenceTypeA().subscribeOn(Schedulers.io()),
            referenceTypeB().subscribeOn(Schedulers.io()),
            referenceTypeC().subscribeOn(Schedulers.io()),
            Function3<A,B,C,Result> { combineIntoResult(...) })
        .flatMap(???) // is this necessary?
        .map(recordTodaysDate())
        .subscribe();