代码之家  ›  专栏  ›  技术社区  ›  Jaume Colom Ferrer

如何在Android RxJava Observable中顺序运行2个查询?

  •  15
  • Jaume Colom Ferrer  · 技术社区  · 10 年前

    我想运行两个异步任务,一个接着一个(按顺序)。我读过一些关于ZIP或Flat的文章,但我不太理解。。。

    我的目的是从本地SQLite加载数据,完成后,它将查询调用到服务器(远程)。

    有人能给我一个建议吗?

    这是我正在使用的RxJava Observable框架(单个任务):

        // RxJava Observable
        Observable.OnSubscribe<Object> onSubscribe = subscriber -> {
            try {
    
                // Do the query or long task...
    
                subscriber.onNext(object);
                subscriber.onCompleted();
            } catch (Exception e) {
                subscriber.onError(e);
            }
        };
    
        // RxJava Observer
        Subscriber<Object> subscriber = new Subscriber<Object>() {
            @Override
            public void onCompleted() {
                // Handle the completion
            }
    
            @Override
            public void onError(Throwable e) {
                // Handle the error
            }
    
            @Override
            public void onNext(Object result) {
    
              // Handle the result
    
            }
        };
    
        Observable.create(onSubscribe)
                .subscribeOn(Schedulers.newThread())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(subscriber);
    
    3 回复  |  直到 10 年前
        1
  •  10
  •   Lukas Batteau    10 年前

    操作人员将 merge 看见 http://reactivex.io/documentation/operators/merge.html .

    我的方法是创建两个可观测值,比如 observableLocal observableRemote ,并合并输出:

    Observable<Object> observableLocal = Observable.create(...)
    Observable<Object> observableRemote = Observable.create(...)
    Observable.merge(observableLocal, observableRemote)
              .subscribe(subscriber)
    

    如果要确保远程在本地之后运行,可以使用 concat .

        2
  •  10
  •   ajplumlee33    9 年前

    如果查询不相互依赖,卢卡斯·巴托的答案是最好的。但是,如果需要从本地SQLite查询获取数据 之前 运行远程查询(例如,您需要远程查询参数或头的数据),然后可以从本地可观察值开始,然后将其平面映射以组合两个可观察值 之后 您可以从本地查询中获取数据:

       Observable<Object> localObservable = Observable.create(...)
       localObservable.flatMap(object -> 
       {
           return Observable.zip(Observable.just(object), *create remote observable here*, 
               (localObservable, remoteObservable) -> 
               {
                   *combining function*
               });
       }).subscribe(subscriber);
    

    flatmap函数允许您将局部可观测值转换为局部&通过zip函数进行远程观察。重申一下,这里的优点是这两个可观测值是连续的,zip函数只在两个相关可观测值运行后运行。

    此外,zip函数将允许您组合可观察对象,即使底层对象具有不同的类型。在这种情况下,您将提供一个组合函数作为第三个参数。如果基础数据是相同的类型,请用merge替换zip函数。

        3
  •  5
  •   Phong Nguyen    8 年前

    你可以试试我的解决方案,有几种方法可以解决你的问题。
    为了确保它正常工作,我创建了一个独立的工作示例,并使用此API进行测试: https://jsonplaceholder.typicode.com/posts/1

    private final Retrofit retrofit = new Retrofit.Builder()
                .baseUrl("https://jsonplaceholder.typicode.com/posts/")
                .addConverterFactory(GsonConverterFactory.create())
                .addCallAdapterFactory(RxJavaCallAdapterFactory.create())
                .build();
    
        private final RestPostsService restPostsService = retrofit.create(RestPostsService.class);
    
        private Observable<Posts> getPostById(int id) {
            return restPostsService.getPostsById(id);
        }
    

    RestPostService.java

    package app.com.rxretrofit;
    
    import retrofit2.http.GET;
    import retrofit2.http.Path;
    import rx.Observable;
    
    /**
     * -> Created by Think-Twice-Code-Once on 11/26/2017.
     */
    
    public interface RestPostsService {
    
        @GET("{id}")
        Observable<Posts> getPostsById(@Path("id") int id);
    }
    

    解决方案1 : 当按顺序调用多个任务时使用,前一个任务的结果始终是下一任务的输入

    getPostById(1)
                    .concatMap(posts1 -> {
                        //get post 1 success
                        return getPostById(posts1.getId() + 1);
                    })
                    .concatMap(posts2 -> {
                        //get post 2 success
                        return getPostById(posts2.getId() + 1);
                    })
                    .concatMap(posts3 -> {
                        //get post 3success
                        return getPostById(posts3.getId() + 1);
                    })
                    .subscribeOn(Schedulers.io())
                    .observeOn(AndroidSchedulers.mainThread())
                    .subscribe(finalPosts -> {
                        //get post 4 success
                        Toast.makeText(this, "Final result: " + finalPosts.getId() + " - " + finalPosts.getTitle(),
                                Toast.LENGTH_LONG).show();
                    });
    

    解决方案2 : 当按顺序调用多个任务时使用,以前任务的所有结果都是最终任务的输入(例如:上传头像图像和封面图像后,调用api创建具有这些图像URL的新用户) :

    Observable
                    .zip(getPostById(1), getPostById(2), getPostById(3), (posts1, posts2, posts3) -> {
                        //this method defines how to zip all separate results into one
                        return posts1.getId() + posts2.getId() + posts3.getId();
                    })
                    .flatMap(finalPostId -> {
                        //after get all first three posts, get the final posts,
                        // the final posts-id is sum of these posts-id
                        return getPostById(finalPostId);
                    })
                    .subscribeOn(Schedulers.io())
                    .observeOn(AndroidSchedulers.mainThread())
                    .subscribe(finalPosts -> {
                        Toast.makeText(this, "Final posts: " + finalPosts.getId() + " - " + finalPosts.getTitle(),
                                Toast.LENGTH_SHORT).show();
                    });
    

    Android清单

     <uses-permission android:name="android.permission.INTERNET"/>
    

    根构建渐变

    // Top-level build file where you can add configuration options common to all sub-projects/modules.
    
    buildscript {
        repositories {
            jcenter()
        }
        dependencies {
            classpath 'com.android.tools.build:gradle:2.3.3'
            classpath 'me.tatarka:gradle-retrolambda:3.2.0'
            classpath 'me.tatarka.retrolambda.projectlombok:lombok.ast:0.2.3.a2'
    
            // NOTE: Do not place your application dependencies here; they belong
            // in the individual module build.gradle files
        }
    
        // Exclude the version that the android plugin depends on.
        configurations.classpath.exclude group: 'com.android.tools.external.lombok'
    }
    
    allprojects {
        repositories {
            jcenter()
        }
    }
    
    task clean(type: Delete) {
        delete rootProject.buildDir
    }
    

    应用程序/内置gradle

    apply plugin: 'me.tatarka.retrolambda'
    apply plugin: 'com.android.application'
    
    android {
        compileSdkVersion 26
        buildToolsVersion "26.0.1"
        defaultConfig {
            applicationId "app.com.rxretrofit"
            minSdkVersion 15
            targetSdkVersion 26
            versionCode 1
            versionName "1.0"
            testInstrumentationRunner "android.support.test.runner.AndroidJUnitRunner"
        }
        buildTypes {
            release {
                minifyEnabled false
                proguardFiles getDefaultProguardFile('proguard-android.txt'), 'proguard-rules.pro'
            }
        }
    
        compileOptions {
            sourceCompatibility JavaVersion.VERSION_1_8
            targetCompatibility JavaVersion.VERSION_1_8
        }
    }
    
    dependencies {
        compile fileTree(dir: 'libs', include: ['*.jar'])
        androidTestCompile('com.android.support.test.espresso:espresso-core:2.2.2', {
            exclude group: 'com.android.support', module: 'support-annotations'
        })
        compile 'com.android.support:appcompat-v7:26.+'
        compile 'com.android.support.constraint:constraint-layout:1.0.2'
        testCompile 'junit:junit:4.12'
    
        provided 'org.projectlombok:lombok:1.16.6'
        compile 'com.squareup.retrofit2:retrofit:2.3.0'
        compile 'com.squareup.retrofit2:converter-gson:2.3.0'
        compile 'com.squareup.retrofit2:adapter-rxjava:2.3.0'
        compile 'io.reactivex:rxandroid:1.2.1'
    }
    

    模型

    package app.com.rxretrofit;
    import com.google.gson.annotations.SerializedName;
    /**
     * -> Created by Think-Twice-Code-Once on 11/26/2017.
     */
    public class Posts {
        @SerializedName("userId")
        private int userId;
        @SerializedName("id")
        private int id;
        @SerializedName("title")
        private String title;
        @SerializedName("body")
        private String body;
        public int getUserId() {
            return userId;
        }
        public void setUserId(int userId) {
            this.userId = userId;
        }
        public int getId() {
            return id;
        }
        public void setId(int id) {
            this.id = id;
        }
        public String getTitle() {
            return title;
        }
        public void setTitle(String title) {
            this.title = title;
        }
        public String getBody() {
            return body;
        }
        public void setBody(String body) {
            this.body = body;
        }
    }
    

    顺便说一下,使用 Rx+改装+匕首+MVP模式 是一个伟大的联合体。