我不得不将领域事务相关操作移入
doFinally()
块
到现在为止,一直都还不错。
Observable.fromIterable(form)
.concatMapIterable(Form::getFormVersions) // get form version list from single form object
.doOnSubscribe(disposable -> AppLogger.i(tag, "Form Versions download is subscribed")) // subscribe process to rx
.filter(formVersion -> formVersion.getJsonString() == null) // only get those versions who is not downloaded yet
.doOnNext(formVersion -> { // get formVersion object
AppLogger.i(tag, "download this form ---------> " + formVersion.getFormUrl());
AppLogger.i(tag, formVersion.getFormUrl());
String constructURL = formVersion.getFormUrl();
/* download form gzip process starts from here */
apiClient.getJsonByFormURL(constructURL)
.subscribeOn(Schedulers.io())
.subscribe(new SingleObserver<Response<ResponseBody>>() {
@Override
public void onSubscribe(Disposable disposable) {
AppLogger.i(tag, "gzip download ->" + "subscribed");
}
@Override
public void onSuccess(Response<ResponseBody> responseBodyResponse) {
AppLogger.i(tag, "gzip downloaded " + " success");
AppLogger.i(tag, "gzip downloaded ->" + responseBodyResponse.toString());
if (responseBodyResponse.code() == 401) {
//not authorized
return;
}
// You should determine it based on response header.
if (responseBodyResponse.body() != null)
try {
gzipBody = null;
gzis = responseBodyResponse.body().byteStream();
ungzippedResponse = new GZIPInputStream(gzis);
reader = new InputStreamReader(ungzippedResponse, charset);
writer = new java.io.StringWriter();
buffer = new char[10240];
for (int length = 0; (length = reader.read(buffer)) > 0; ) {
writer.write(buffer, 0, length);
}
gzipBody = writer.toString();
formVersion.setJsonString(gzipBody);
AppLogger.i(tag, "set json string ->" + gzipBody);
} catch (IOException e) {
AppLogger.e(tag, "gzip extract exception->" + e.getLocalizedMessage(), e);
}
}
@Override
public void onError(Throwable throwable) {
AppLogger.e(tag, "gzip failed->" + throwable.getMessage(), throwable);
}
});
})
.doOnTerminate(() -> AppLogger.i(tag, "Form Versions terminated"))
.doOnError(Throwable::printStackTrace)
.doFinally(() -> {
baseRealm = Realm.getDefaultInstance();
baseRealm.executeTransaction(realm ->
realm.copyToRealmOrUpdate(form));
})
.subscribe(
formVersion -> AppLogger.i(tag, "Form Versions download completed"),
throwable -> AppLogger.e(tag, throwable.getMessage(), throwable)
);