RxJava OutOfMemoryError crash at runtime: unable to create thread

  •  1
  • Aks4125 Kims Sifers  · Tech community  · 7 years ago

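    I am using Retrofit2 + RxJava + RxAndroid to make network calls.

    I have tried different Schedulers, such as io() and newThread(). I know the implications of using newThread(), but the app still crashes because of an OOM.

    Scenario:

    • Filter the list (65 items after filtering)
    • Download a gzip from a URL
    • Write the bytes and store them in Realm

    The network call above happens 65 times.

    Code: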

    /*form version download method*/
    public void startFormDownload(List<Form> form, ApiClient apiClient) {
        Observable.fromIterable(form)
                .concatMapIterable(Form::getFormVersions) // get form version list from single form object
                .doOnSubscribe(disposable -> AppLogger.i(tag, "Form Versions download is subscribed")) // subscribe process to rx
                .filter(this::checkFormVersionToDownloadOrNot) // only get those versions who needed to downloaded
                .doOnNext(formVersion -> { // get formVersion object
    
                    AppLogger.i(tag, "download this form ---------> " + formVersion.getFormUrl());
                    AppLogger.i(tag, formVersion.getFormUrl());
                    String constructURL = formVersion.getFormUrl();
    
                    /* download form gzip process starts from here */
                    apiClient.getJsonByFormURL(constructURL)
                            .subscribeOn(Schedulers.newThread())
                            .observeOn(AndroidSchedulers.mainThread())
                            .subscribe(new SingleObserver<Response<ResponseBody>>() {
                                @Override
                                public void onSubscribe(Disposable disposable) {
                                    AppLogger.i(tag, "gzip download ->" + "subscribed");
                                }
    
                                @Override
                                public void onSuccess(Response<ResponseBody> responseBodyResponse) {
                                    AppLogger.i(tag, "gzip downloaded " + " success");
                                    AppLogger.i(tag, "gzip downloaded ->" + responseBodyResponse.toString());
    
                                    if (responseBodyResponse.code() == 401) {
                                        //not authorized
                                        return;
                                    }
    
                                    ByteArrayInputStream bais = null;
                                    if (responseBodyResponse.body() != null)
                                        try {
                                            bais = new ByteArrayInputStream(responseBodyResponse.body().bytes());
    
                                            GZIPInputStream gzis = new GZIPInputStream(bais);
                                            InputStreamReader reader = new InputStreamReader(gzis);
                                            BufferedReader in = new BufferedReader(reader);
    
                                            String readed;
                                            StringBuilder stringBuilder = new StringBuilder();
                                            while ((readed = in.readLine()) != null) {
                                                stringBuilder.append(readed);
                                            }
    
                                            baseRealm = Realm.getDefaultInstance();
                                            formVersion.setJsonString(stringBuilder.toString());
                                            baseRealm.executeTransaction(realm ->
                                                    baseRealm.copyToRealmOrUpdate(formVersion));
    
    
    
                                        } catch (IOException e) {
                                            AppLogger.e(tag, "gzip extract exception->" + e.getLocalizedMessage(), e);
                                        }
                                }
    
                                @Override
                                public void onError(Throwable throwable) {
                                    AppLogger.e(tag, "gzip failed->" + throwable.getMessage(), throwable);
                                }
                            });
    
    
                })
                .doOnTerminate(() -> AppLogger.i(tag, "Form Versions terminated"))
                .doOnError(Throwable::printStackTrace)
                .subscribe(
                        formVersion -> AppLogger.i(tag, "Form Versions download completed"),
                        throwable -> AppLogger.e(tag, throwable.getMessage(), throwable)
                );
    }
    

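    It is possible that the InputStream is causing the OOM, but I am not sure what can be done about it here.

    P.S. The OOM happens only occasionally, not every time.

    Crash report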

    java.lang.OutOfMemoryError: Could not allocate JNI Env
    at java.lang.Thread.nativeCreate(Native Method)
    at java.lang.Thread.start(Thread.java:730)
    at java.util.concurrent.ThreadPoolExecutor.addWorker(ThreadPoolExecutor.java:941)
    at java.util.concurrent.ThreadPoolExecutor.ensurePrestart(ThreadPoolExecutor.java:1582)
    at java.util.concurrent.ScheduledThreadPoolExecutor.delayedExecute(ScheduledThreadPoolExecutor.java:313)
    at java.util.concurrent.ScheduledThreadPoolExecutor.schedule(ScheduledThreadPoolExecutor.java:550)
    at java.util.concurrent.ScheduledThreadPoolExecutor.submit(ScheduledThreadPoolExecutor.java:651)
    at io.reactivex.internal.schedulers.NewThreadWorker.scheduleActual(NewThreadWorker.java:146)
    at io.reactivex.internal.schedulers.NewThreadWorker.schedule(NewThreadWorker.java:51)
    at io.reactivex.Scheduler.scheduleDirect(Scheduler.java:135)
    at io.reactivex.Scheduler.scheduleDirect(Scheduler.java:111)
    at io.reactivex.internal.operators.single.SingleSubscribeOn.subscribeActual(SingleSubscribeOn.java:37)
    at io.reactivex.Single.subscribe(Single.java:2779)
    at io.reactivex.internal.operators.single.SingleObserveOn.subscribeActual(SingleObserveOn.java:35)
    at io.reactivex.Single.subscribe(Single.java:2779)
    at com.visualogyx.app.fragments.VisualogyxBaseFragment.lambda$startFormDownload$10$VisualogyxBaseFragment(VisualogyxBaseFragment.java:623)
    at com.visualogyx.app.fragments.VisualogyxBaseFragment$$Lambda$9.accept(Unknown Source)
    at io.reactivex.internal.operators.observable.ObservableDoOnEach$DoOnEachObserver.onNext(ObservableDoOnEach.java:95)
    at io.reactivex.internal.operators.observable.ObservableFilter$FilterObserver.onNext(ObservableFilter.java:52)
    at io.reactivex.internal.observers.DisposableLambdaObserver.onNext(DisposableLambdaObserver.java:58)
    at io.reactivex.internal.operators.observable.ObservableFlattenIterable$FlattenIterableObserver.onNext(ObservableFlattenIterable.java:111)
    at io.reactivex.internal.operators.observable.ObservableFromIterable$FromIterableDisposable.run(ObservableFromIterable.java:98)
    at io.reactivex.internal.operators.observable.ObservableFromIterable.subscribeActual(ObservableFromIterable.java:58)
    at io.reactivex.Observable.subscribe(Observable.java:10910)
    at io.reactivex.internal.operators.observable.ObservableFlattenIterable.subscribeActual(ObservableFlattenIterable.java:44)
    at io.reactivex.Observable.subscribe(Observable.java:10910)
    at io.reactivex.internal.operators.observable.ObservableDoOnLifecycle.subscribeActual(ObservableDoOnLifecycle.java:33)
    at io.reactivex.Observable.subscribe(Observable.java:10910)
    at io.reactivex.internal.operators.observable.ObservableFilter.subscribeActual(ObservableFilter.java:30)
    at io.reactivex.Observable.subscribe(Observable.java:10910)
    at io.reactivex.internal.operators.observable.ObservableDoOnEach.subscribeActual(ObservableDoOnEach.java:42)
    at io.reactivex.Observable.subscribe(Observable.java:10910)
    at io.reactivex.internal.operators.observable.ObservableDoOnEach.subscribeActual(ObservableDoOnEach.java:42)
    at io.reactivex.Observable.subscribe(Observable.java:10910)
    at io.reactivex.internal.operators.observable.ObservableDoOnEach.subscribeActual(ObservableDoOnEach.java:42)
    at io.reactivex.Observable.subscribe(Observable.java:10910)
    at io.reactivex.Observable.subscribe(Observable.java:10896)
    at io.reactivex.Observable.subscribe(Observable.java:10825)
    at com.visualogyx.app.fragments.VisualogyxBaseFragment.startFormDownload(VisualogyxBaseFragment.java:683)
    at com.visualogyx.app.fragments.VisualogyxBaseFragment.updateJobAndDownloadCheckListForm(VisualogyxBaseFragment.java:239)
    at com.visualogyx.app.fragments.VisualogyxBaseFragment.lambda$null$1$VisualogyxBaseFragment(VisualogyxBaseFragment.java:221)
    at com.visualogyx.app.fragments.VisualogyxBaseFragment$$Lambda$15.test(Unknown Source)
    at io.reactivex.internal.operators.observable.ObservableAllSingle$AllObserver.onNext(ObservableAllSingle.java:69)
    at io.reactivex.internal.operators.observable.ObservableFromIterable$FromIterableDisposable.run(ObservableFromIterable.java:98)
    at io.reactivex.internal.operators.observable.ObservableFromIterable.subscribeActual(ObservableFromIterable.java:58)
    at io.reactivex.Observable.subscribe(Observable.java:10910)
    at io.reactivex.internal.operators.observable.ObservableAllSingle.subscribeActual(ObservableAllSingle.java:34)
    at io.reactivex.Single.subscribe(Single.java:2779)
    at io.reactivex.Single.subscribe(Single.java:2765)
    at io.reactivex.Single.subscribe(Single.java:2686)
    at com.visualogyx.app.fragments.VisualogyxBaseFragment.lambda$storeAndRetrieveListFromDB$2$VisualogyxBaseFragment(VisualogyxBaseFragment.java:226)
    at com.visualogyx.app.fragments.VisualogyxBaseFragment$$Lambda$0.execute(Unknown Source)
    at io.realm.Realm.executeTransaction(Realm.java:1394)
    at com.visualogyx.app.fragments.VisualogyxBaseFragment.storeAndRetrieveListFromDB(VisualogyxBaseFragment.java:200)
    at com.visualogyx.app.fragments.HomeFragment$1.onSuccess(HomeFragment.java:59)
    at com.visualogyx.app.fragments.HomeFragment$1.onSuccess(HomeFragment.java:54)
    at com.visualogyx.app.controller.ApiCallback.onResponse(ApiCallback.java:36)
    at com.visualogyx.app.controller.ApiClient$11.onResponse(ApiClient.java:264)
    at retrofit2.ExecutorCallAdapterFactory$ExecutorCallbackCall$1$1.run(ExecutorCallAdapterFactory.java:70)
    at android.os.Handler.handleCallback(Handler.java:836)
    at android.os.Handler.dispatchMessage(Handler.java:103)
    at android.os.Looper.loop(Looper.java:203)
    at android.app.ActivityThread.main(ActivityThread.java:6293)
    at java.lang.reflect.Method.invoke(Native Method)
    at com.android.internal.os.ZygoteInit$MethodAndArgsCaller.run(ZygoteInit.java:1065)
    at com.android.internal.os.ZygoteInit.main(ZygoteInit.java:926)
    
    3 replies  |  as of 7 years ago
        1
  •  2
  •   Krutik    7 years ago

    You can read the body with byteStream() instead of bytes(), so the whole response is not loaded into a byte[] first.

    String body = null;
    String charset = "UTF-8"; // You should determine it based on response header.
    
    try (
        InputStream gzippedResponse = responseBodyResponse.body().byteStream();
        InputStream ungzippedResponse = new GZIPInputStream(gzippedResponse);
        Reader reader = new InputStreamReader(ungzippedResponse, charset);
        Writer writer = new StringWriter();
    ) {
        char[] buffer = new char[10240];
        for (int length = 0; (length = reader.read(buffer)) > 0;) {
            writer.write(buffer, 0, length);
        }
        body = writer.toString();
    }
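
    To make the streaming approach above easy to try outside Android, here is a minimal, JDK-only sketch of the same chunked gzip decoding. The class and method names (GzipStreamDemo, gunzipToString, gzip) are made up for illustration and are not part of OkHttp or Retrofit; in the app, the InputStream would presumably come from responseBodyResponse.body().byteStream().

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.Reader;
import java.io.StringWriter;
import java.io.UncheckedIOException;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

// Hypothetical helper class, for illustration only.
class GzipStreamDemo {

    // Decompresses a gzip stream into a String, reading fixed-size chunks
    // instead of materializing the compressed body as one byte[].
    static String gunzipToString(InputStream gzipped, Charset charset) {
        try (Reader reader = new InputStreamReader(new GZIPInputStream(gzipped), charset);
             StringWriter writer = new StringWriter()) {
            char[] buffer = new char[10240];
            int length;
            while ((length = reader.read(buffer)) != -1) {
                writer.write(buffer, 0, length);
            }
            return writer.toString();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Test helper: compresses text so the demo has gzip input to decode.
    static byte[] gzip(String text, Charset charset) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (GZIPOutputStream out = new GZIPOutputStream(bytes)) {
            out.write(text.getBytes(charset));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    public static void main(String[] args) {
        byte[] compressed = gzip("{\"form\": \"demo\"}", StandardCharsets.UTF_8);
        String json = gunzipToString(new ByteArrayInputStream(compressed), StandardCharsets.UTF_8);
        System.out.println(json);
    }
}
```

    Unlike the readLine() loop in the question, which discards line terminators, this copies the decoded characters through unchanged.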
    
        2
  •  0
  •   yu wang    7 years ago

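    Maybe you should add logging to different parts of your code to track the memory footprint and find out which part is using the memory. Alternatively, you can use the Android Studio memory allocation tool to monitor allocations.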
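
    One possible way to follow this suggestion, sketched with plain JDK calls only. Because the crash report in the question fails inside Thread.nativeCreate, the snapshot records the live thread count as well as heap usage. ResourceSnapshot and its methods are hypothetical helper names; on Android you would probably route the string through your own logger (AppLogger in the question) rather than System.out.

```java
import java.util.Locale;
import java.util.concurrent.CountDownLatch;

// Hypothetical helper class, for illustration only.
class ResourceSnapshot {

    // Heap currently in use by this process, in bytes.
    static long usedHeapBytes() {
        Runtime rt = Runtime.getRuntime();
        return rt.totalMemory() - rt.freeMemory();
    }

    // Number of threads that are alive right now.
    static int liveThreadCount() {
        return Thread.getAllStackTraces().size();
    }

    // One log line that can be emitted before and after a suspicious step.
    static String describe(String label) {
        long maxKiB = Runtime.getRuntime().maxMemory() / 1024;
        return String.format(Locale.ROOT,
                "%s: heap used=%d KiB of max %d KiB, live threads=%d",
                label, usedHeapBytes() / 1024, maxKiB, liveThreadCount());
    }

    public static void main(String[] args) {
        System.out.println(describe("before"));

        // Start a few parked threads so the difference shows up in the log.
        CountDownLatch release = new CountDownLatch(1);
        for (int i = 0; i < 5; i++) {
            Thread t = new Thread(() -> {
                try {
                    release.await();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            t.start();
        }
        System.out.println(describe("with 5 extra threads"));
        release.countDown(); // let the parked threads finish
    }
}
```

    Calling describe(...) just before each download is subscribed and again in onSuccess/onError would show whether the thread count keeps climbing across the 65 calls.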

        3
  •  0
  •   Aks4125 Kims Sifers    7 years ago

    I had to move the Realm transaction into the doFinally() block. So far, so good.

    Observable.fromIterable(form)
                .concatMapIterable(Form::getFormVersions) // get form version list from single form object
                .doOnSubscribe(disposable -> AppLogger.i(tag, "Form Versions download is subscribed")) // subscribe process to rx
                .filter(formVersion -> formVersion.getJsonString() == null) // only get those versions who is not downloaded yet
                .doOnNext(formVersion -> { // get formVersion object
    
                    AppLogger.i(tag, "download this form ---------> " + formVersion.getFormUrl());
                    AppLogger.i(tag, formVersion.getFormUrl());
                    String constructURL = formVersion.getFormUrl();
    
                    /* download form gzip process starts from here */
                    apiClient.getJsonByFormURL(constructURL)
                            .subscribeOn(Schedulers.io())
                            .subscribe(new SingleObserver<Response<ResponseBody>>() {
                                @Override
                                public void onSubscribe(Disposable disposable) {
                                    AppLogger.i(tag, "gzip download ->" + "subscribed");
    
                                }
    
                                @Override
                                public void onSuccess(Response<ResponseBody> responseBodyResponse) {
                                    AppLogger.i(tag, "gzip downloaded " + " success");
                                    AppLogger.i(tag, "gzip downloaded ->" + responseBodyResponse.toString());
    
                                    if (responseBodyResponse.code() == 401) {
                                        //not authorized
                                        return;
                                    }
                                    // You should determine it based on response header.
    
                                    if (responseBodyResponse.body() != null)
                                        // try-with-resources closes every stream, even if decoding fails
                                        try (java.io.InputStream gzis = responseBodyResponse.body().byteStream();
                                             java.io.InputStream ungzippedResponse = new GZIPInputStream(gzis);
                                             java.io.Reader reader = new InputStreamReader(ungzippedResponse, charset);
                                             java.io.Writer writer = new java.io.StringWriter()) {

                                            // copy the decoded characters in 10 KB chunks
                                            char[] buffer = new char[10240];
                                            for (int length = 0; (length = reader.read(buffer)) > 0; ) {
                                                writer.write(buffer, 0, length);
                                            }
                                            String gzipBody = writer.toString();

                                            formVersion.setJsonString(gzipBody);
                                            AppLogger.i(tag, "set json string ->" + gzipBody);

                                        } catch (IOException e) {
                                            AppLogger.e(tag, "gzip extract exception->" + e.getLocalizedMessage(), e);
                                        }
                                }
    
                                @Override
                                public void onError(Throwable throwable) {
                                    AppLogger.e(tag, "gzip failed->" + throwable.getMessage(), throwable);
                                }
                            });
    
    
                })
                .doOnTerminate(() -> AppLogger.i(tag, "Form Versions terminated"))
                .doOnError(Throwable::printStackTrace)
                .doFinally(() -> {
                    baseRealm = Realm.getDefaultInstance();
                    baseRealm.executeTransaction(realm ->
                            realm.copyToRealmOrUpdate(form));
                })
                .subscribe(
                        formVersion -> AppLogger.i(tag, "Form Versions download completed"),
                        throwable -> AppLogger.e(tag, throwable.getMessage(), throwable)
                );
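
    The stack trace in the question shows the error being thrown while a new thread was being started (Thread.nativeCreate, reached through NewThreadWorker), so another way to look at the fix is to cap how many downloads run at once and do one write after all of them finish. The sketch below illustrates that idea with a plain JDK fixed thread pool rather than RxJava or Realm. BoundedDownloads, fetchAll and the placeholder URLs are hypothetical names, and the fetch function stands in for the real network call.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

// Hypothetical helper class, for illustration only.
class BoundedDownloads {

    // Runs fetch for every URL on at most maxThreads threads and returns the
    // results in input order once all of them have completed.
    static <T> List<T> fetchAll(List<String> urls, int maxThreads, Function<String, T> fetch) {
        ExecutorService pool = Executors.newFixedThreadPool(maxThreads);
        try {
            List<Callable<T>> tasks = new ArrayList<>();
            for (String url : urls) {
                tasks.add(() -> fetch.apply(url));
            }
            List<T> results = new ArrayList<>();
            // invokeAll blocks until every task is done.
            for (Future<T> future : pool.invokeAll(tasks)) {
                results.add(future.get());
            }
            return results;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while downloading", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("download failed", e.getCause());
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        List<String> urls = new ArrayList<>();
        for (int i = 0; i < 65; i++) {
            urls.add("https://example.invalid/form/" + i); // placeholder URLs
        }
        AtomicInteger running = new AtomicInteger();
        AtomicInteger peak = new AtomicInteger();
        List<Integer> lengths = fetchAll(urls, 4, url -> {
            peak.accumulateAndGet(running.incrementAndGet(), Math::max);
            try {
                Thread.sleep(5); // stand-in for the network call
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            running.decrementAndGet();
            return url.length();
        });
        // All 65 results are available here, so a single write can follow.
        System.out.println("results: " + lengths.size() + ", peak concurrency: " + peak.get());
    }
}
```

    With 65 inputs and maxThreads set to 4, the peak concurrency reported can never exceed 4. Because fetchAll blocks until everything is done, it should be called from a background thread, not from the Android main thread.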