Retrofit結(jié)合RxJava源碼分析

  • 為什么要使用RxJava處理Retrofit網(wǎng)絡(luò)請求

    Retrofit的調(diào)用過程我們前面文章已經(jīng)整理過了,對于Android來講呢载弄,絕大部分的網(wǎng)絡(luò)請求任務(wù)都是需要回調(diào)操作進行UI修改的浓恳,Retrofit網(wǎng)絡(luò)請求底層是socket通信琢感,因為網(wǎng)絡(luò)的不確定性所以是阻塞性的,那么網(wǎng)絡(luò)請求的工作就要放在子線程里去做糕殉,比如下面這行代碼:

    List<User> users = service.groupList(1001).execute().body();
    

    我們需要把這塊代碼放進子線程中去調(diào)用亩鬼,然后調(diào)用定義在主線程的方法去更新UI,這塊工作實現(xiàn)起來也不麻煩阿蝶,但是看起來沒有那么優(yōu)雅且需要自己維護線程間的通信雳锋。

    RxJava是一個優(yōu)秀的基于觀察者模式的異步調(diào)用框架,結(jié)合Retrofit可以完成優(yōu)雅的網(wǎng)絡(luò)請求回調(diào)任務(wù)羡洁。

  • 結(jié)合ViewModel中調(diào)用應(yīng)用場景

    實際開發(fā)中玷过,為了解除耦合,MVC模式下我們通常會把網(wǎng)絡(luò)請求的代碼放在ViewModel中筑煮,下面的代碼可以全部用kotlin或者java實現(xiàn)辛蚊,這里都用到了而已∨匚粒看下面的調(diào)用代碼:

    BaseRequest req = new BaseRequest();
    MyServiceImpl.getData(req, lifecycleProvider)
            .subscribe(new HttpObserver<MyResponse, BaseRequest>(req,
                    RxApiManager.HttpTaskId.TASK_ID_FLIGHT_DETAILS_FLIGHTWEATHERINFO, getIViewModelService()) {
                @Override
                public void onSuccess(@NonNull MyResponse response) {
                    dataBeanData.postValue(response);
                }
    
                @Override
                public void onFailure(@NotNull BaseResponse response) {
                    dataBeanData.postValue(null);
                }
          });
    

    getData內(nèi)容下面會講到嚼隘,這里先知道它返回一個Observable對象,然后通過subscribe方法和Observer聯(lián)系起來袒餐。這里的dataBeanData是一個MutableLiveData飞蛹,它會通過post方法把數(shù)據(jù)發(fā)送到主線程。

    看一下getData:

    public static Observable<MyResponse> getData(BaseRequest request, @NonNull LifecycleProvider lifecycle) {
        return RxObservable.INSTANCE.getObservable(new MyApi().getData(request), lifecycle);
    }
    

    RxObservable.INSTANCE.getObservable:

    fun <T> getObservable(@NonNull apiObservable: Observable<T>, @NonNull lifecycle: LifecycleProvider<*>): Observable<T> {
        return apiObservable
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .compose<T>(lifecycle.bindToLifecycle())
    }
    

    apiObservable是什么灸眼?通過new MyApi().getData(request)獲得:

    fun getData(request: BaseRequest): Observable<MyResponse> {
        return mApi.getData(getRequestBody(request))
    }
    

    mApi是:

    private var mApi: MyService = getApiService(MyService::class.java)
    

    getApiService是其父類中的方法:

    fun <T> getApiService(apiService: Class<T>): T {
        return HttpCall.getInstance().client.httpClient.create(apiService)
    }
    

    HttpCall是自定義的擴展工具類:

    public class HttpCall {
    
        private static HttpCall call;
        private HttpClient client;
    
        private HttpCall() {
            client = new HttpClient() {
                @Override
                protected String setBaseUrl() {
                    LogUtils.v(this, "client.getUrl() = "+HttpUrlUtil.getInstance().getUrl());
                    return HttpUrlUtil.getInstance().getUrl();
                }
            };
        }
    
        public static HttpCall getInstance() {
            if (call == null) {
                synchronized (HttpCall.class) {
                    if (call == null) {
                        call = new HttpCall();
                    }
                }
            }
            return call;
        }
    
        public HttpClient getClient() {
            return client;
        }
    }
    

    它的client是自定義的HttpClient:

    abstract class HttpClient {
    
        private lateinit var client: Retrofit
    
        val httpClient: Retrofit
            get() {
                if (!::client.isInitialized) {
                    val okHttpClientBuilder = OkHttpClient.Builder()
                            .retryOnConnectionFailure(true)
                            .addInterceptor(HeaderInterceptor())
                            .addInterceptor(LoggingInterceptor())
                            .addInterceptor(TransactionIdInterceptor())
                            .addInterceptor(EncodeRequestInterceptor())
                            .addInterceptor(DecodeResponseInterceptor())
                            .addInterceptor(DynamicTimeoutInterceptor())
                            .hostnameVerifier(SafeHostnameVerifier())
    
                    val sslSocketFactory = SslContextFactory.getSSLSocketFactory()
                    if (sslSocketFactory != null) {
                        okHttpClientBuilder.sslSocketFactory(sslSocketFactory, CustomTrustManager())
                    }
    
                    val okHttpClient = okHttpClientBuilder.build()
    
                    client = Retrofit.Builder()
                            .baseUrl(setBaseUrl())
                            .client(okHttpClient)
                            .addConverterFactory(GsonConverterFactory.create(CustomGson.buildGson()))
                            .addCallAdapterFactory(RxJava2CallAdapterFactory.create())
                            .build()
                }
                return client
            }
    
        /**
         * base url 統(tǒng)一以/http
         *
         * @return String
         */
        protected abstract fun setBaseUrl(): String
    }
    

    可以看到卧檐,這個類封裝了創(chuàng)建Retrofit對象的工作。

    所以mApi其實就是Retrofit對象創(chuàng)建的MyService的代理對象焰宣,那么mApi.getData就是MyService里面的getData方法:

    @POST(Api.RELATIVE_URL)
    Observable<MyResponse> queryFlightList(@Body RequestBody requestBody);
    

    Api.RELATIVE_URL是一個相對的url地址霉囚,結(jié)合前面的baseUrl組合成完整的請求url。

    到這一步就相當(dāng)于Retrofit的invoke方法執(zhí)行完返回Call了匕积,那這里為什么是返回的Observable<MyResponse>呢?

    還記得我們前面分析Retrofit源碼的時候有一個addCallAdapterFactory的api嗎盈罐,這里添加的是RxJava2CallAdapterFactory.create()榜跌,返回了RxJava2CallAdapterFactory,這就是原因盅粪。

    這還得從Retrofit源碼說起钓葫,HttpServiceMethod的parseAnnotations中返回了一個CallAdapted,構(gòu)造它的方法入?yún)⒂幸粋€callAdapter票顾,這個callAdapter通過createCallAdapter創(chuàng)建:

    public CallAdapter<?, ?> nextCallAdapter(
        @Nullable CallAdapter.Factory skipPast, Type returnType, Annotation[] annotations) {
      Objects.requireNonNull(returnType, "returnType == null");
      Objects.requireNonNull(annotations, "annotations == null");
    
      int start = callAdapterFactories.indexOf(skipPast) + 1;
      for (int i = start, count = callAdapterFactories.size(); i < count; i++) {
        CallAdapter<?, ?> adapter = callAdapterFactories.get(i).get(returnType, annotations, this);
        if (adapter != null) {
          return adapter;
        }
      }
    
      StringBuilder builder =
          new StringBuilder("Could not locate call adapter for ").append(returnType).append(".\n");
      if (skipPast != null) {
        builder.append("  Skipped:");
        for (int i = 0; i < start; i++) {
          builder.append("\n   * ").append(callAdapterFactories.get(i).getClass().getName());
        }
        builder.append('\n');
      }
      builder.append("  Tried:");
      for (int i = start, count = callAdapterFactories.size(); i < count; i++) {
        builder.append("\n   * ").append(callAdapterFactories.get(i).getClass().getName());
      }
      throw new IllegalArgumentException(builder.toString());
    }
    

    callAdapterFactories.get(i).get(returnType, annotations, this)會得到一個CallAdapter础浮,構(gòu)建Retrofit的build方法中:

    List<CallAdapter.Factory> callAdapterFactories = new ArrayList<>(this.callAdapterFactories);
    callAdapterFactories.addAll(platform.defaultCallAdapterFactories(callbackExecutor));
    

    this.callAdapterFactories又是通過addCallAdapterFactory方法添加的元素:

    public Builder addCallAdapterFactory(CallAdapter.Factory factory) {
      callAdapterFactories.add(Objects.requireNonNull(factory, "factory == null"));
      return this;
    }
    

    所以callAdapterFactories中含有了RxJava2CallAdapterFactory,那么調(diào)用它的get方法就得到了一個CallAdapter:

    @Override
    public @Nullable CallAdapter<?, ?> get(
        Type returnType, Annotation[] annotations, Retrofit retrofit) {
      //getRawType的作用:譬如List[]奠骄、List<>等形式的類型會得到List.class
      Class<?> rawType = getRawType(returnType);
    
      if (rawType == Completable.class) {
        // Completable is not parameterized (which is what the rest of this method deals with) so it
        // can only be created with a single configuration.
        return new RxJava2CallAdapter(
            Void.class, scheduler, isAsync, false, true, false, false, false, true);
      }
    
      boolean isFlowable = rawType == Flowable.class;
      boolean isSingle = rawType == Single.class;
      boolean isMaybe = rawType == Maybe.class;
      if (rawType != Observable.class && !isFlowable && !isSingle && !isMaybe) {
        return null;
      }
    
      boolean isResult = false;
      boolean isBody = false;
      Type responseType;
      //如果rawType不是前面的五種則必須是ParameterizedType(T<>這種泛型類型)
      if (!(returnType instanceof ParameterizedType)) {
        String name =
            isFlowable ? "Flowable" : isSingle ? "Single" : isMaybe ? "Maybe" : "Observable";
        throw new IllegalStateException(
            name
                + " return type must be parameterized"
                + " as "
                + name
                + "<Foo> or "
                + name
                + "<? extends Foo>");
      }
      //得到泛型參數(shù)中的上界(<? extends String>會得到String的Type,如果不是WildcardType這中<?>通配符的類型的則直接返回returnType)
      //前兩個if-elseif直接按照Observable<Foo<Foo2>>這樣理解,如果取到的上界類型是Response或Result則必須上界類型也是ParameterizedType豆同,因為這兩個類的定義里有泛型要求(Response<T>、Result<T>)
      Type observableType = getParameterUpperBound(0, (ParameterizedType) returnType);
      Class<?> rawObservableType = getRawType(observableType);
      if (rawObservableType == Response.class) {
        if (!(observableType instanceof ParameterizedType)) {
          throw new IllegalStateException(
              "Response must be parameterized" + " as Response<Foo> or Response<? extends Foo>");
        }
        responseType = getParameterUpperBound(0, (ParameterizedType) observableType);
      } else if (rawObservableType == Result.class) {
        if (!(observableType instanceof ParameterizedType)) {
          throw new IllegalStateException(
              "Result must be parameterized" + " as Result<Foo> or Result<? extends Foo>");
        }
        responseType = getParameterUpperBound(0, (ParameterizedType) observableType);
        isResult = true;
      } else {
        responseType = observableType;
        isBody = true;
      }
    
      return new RxJava2CallAdapter(
          responseType, scheduler, isAsync, isResult, isBody, isFlowable, isSingle, isMaybe, false);
    }
    

    所以callAdapter.adapt方法實際上就是RxJava2CallAdapter的adapt方法:

    @Override
    public Object adapt(Call<R> call) {
      Observable<Response<R>> responseObservable =
          isAsync ? new CallEnqueueObservable<>(call) : new CallExecuteObservable<>(call);
    
      Observable<?> observable;
      if (isResult) {
        observable = new ResultObservable<>(responseObservable);
      } else if (isBody) {
        observable = new BodyObservable<>(responseObservable);
      } else {
        observable = responseObservable;
      }
    
      if (scheduler != null) {
        observable = observable.subscribeOn(scheduler);
      }
    
      if (isFlowable) {
        return observable.toFlowable(BackpressureStrategy.LATEST);
      }
      if (isSingle) {
        return observable.singleOrError();
      }
      if (isMaybe) {
        return observable.singleElement();
      }
      if (isCompletable) {
        return observable.ignoreElements();
      }
      return RxJavaPlugins.onAssembly(observable);
    }
    

    所以為什么上面的MyService的方法返回為什么是Observable含鳞。

    那現(xiàn)在RxObservable.INSTANCE.getObservable中的apiObservable就得到了影锈,再貼一遍:

      fun <T> getObservable(@NonNull apiObservable: Observable<T>, @NonNull lifecycle: LifecycleProvider<*>): Observable<T> {
          return apiObservable
                  .subscribeOn(Schedulers.io())
                  .observeOn(AndroidSchedulers.mainThread())
                  .compose<T>(lifecycle.bindToLifecycle())
      }
    

    subscribeOn(Schedulers.io())表示在非阻塞線程中進行網(wǎng)絡(luò)請求,observeOn(AndroidSchedulers.mainThread())表示在主線程中處理回調(diào)蝉绷,lifecycleProvider是Activity或Fragment的生命周期引用精居,compose將這個RxJava請求和生命周期綁定在一起實現(xiàn)其生命周期自動跟隨組件生命周期一致,這個知識會再寫一篇文章分析潜必。

    至此靴姿,MyServiceImpl.getData(req, lifecycleProvider)就走完了,接下來就是執(zhí)行網(wǎng)絡(luò)請求了磁滚。

  • 調(diào)用過程

    ViewModel中的subscribe方法是開始的起點佛吓,其參數(shù)HttpObserver是一個自定義的實現(xiàn)了Observer接口的類:

    @SuppressWarnings("all")
    abstract class HttpObserver<T : BaseResponse, K : BaseRequest> : Observer<T> {
    
        private var mIViewModelService: IViewModelService? = null
        private var showErrorMessage = true
        private var showLoading = true
        private var manuelFinishLoading = false
        private var needSuccessResultMsg = true
        private var showSuccessMessage = false
        private var httpTaskId: RxApiManager.HttpTaskId
        private var request: K
    
        /**
         * 默認(rèn)只有1個任務(wù)處理
         */
        private var finishCount = 1
    
        /**
         * @param httpTaskId         注意:HttpTaskId 非同一請求不可使用相同ID
         * @param mIViewModelService IViewModelService
         */
        constructor(request: K, httpTaskId: RxApiManager.HttpTaskId, mIViewModelService: IViewModelService?) {
            this.request = request
            this.mIViewModelService = mIViewModelService
            this.httpTaskId = httpTaskId
            LogUtils.e(this, "HttpObserver transactionId = "+request.transactionId)
        }
    
        /**
         * @param httpTaskId         注意:HttpTaskId 非同一請求不可使用相同ID
         * @param mIViewModelService IViewModelService
         */
        constructor(request: K, httpTaskId: RxApiManager.HttpTaskId, mIViewModelService: IViewModelService?,
                    showErrorMessage: Boolean = true) {
            this.mIViewModelService = mIViewModelService
            this.showErrorMessage = showErrorMessage
            this.httpTaskId = httpTaskId
            this.request = request
        }
    
        /**
         * @param httpTaskId         注意:HttpTaskId 非同一請求不可使用相同ID
         * @param mIViewModelService IViewModelService
         */
        constructor(request: K, httpTaskId: RxApiManager.HttpTaskId, mIViewModelService: IViewModelService?,
                    showErrorMessage: Boolean = true, showLoading: Boolean = true) {
            this.mIViewModelService = mIViewModelService
            this.showErrorMessage = showErrorMessage
            this.showLoading = showLoading
            this.httpTaskId = httpTaskId
            this.request = request
        }
    
        constructor(request: K, httpTaskId: RxApiManager.HttpTaskId, mIViewModelService: IViewModelService?,
                    showErrorMessage: Boolean = true, showLoading: Boolean = true,
                    showSuccessMessage: Boolean = false) {
            this.mIViewModelService = mIViewModelService
            this.showErrorMessage = showErrorMessage
            this.showSuccessMessage = showSuccessMessage
            this.showLoading = showLoading
            this.httpTaskId = httpTaskId
            this.request = request
        }
    
        /**
         * 手動調(diào)用結(jié)束loading
         *
         * @param manuelFinishLoading
         */
        fun setManuelFinishLoading(manuelFinishLoading: Boolean): HttpObserver<T, K> {
            this.manuelFinishLoading = manuelFinishLoading
            return this
        }
    
        /**
         * 多個任務(wù)情況下,根據(jù)任務(wù)數(shù)量結(jié)束loading
         *
         * @param finishCount
         * @return
         */
        fun setFinishCount(finishCount: Int): HttpObserver<T, K> {
            this.finishCount = finishCount
            return this
        }
    
        /**
         * 設(shè)置是否顯示loading
         *
         * @param isShowLoading
         * @return
         */
        fun setShowLoading(isShowLoading: Boolean): HttpObserver<T, K> {
            this.showLoading = isShowLoading
            return this
        }
    
        /**
         * 是否顯示錯誤信息
         *
         * @param showErrorMessage
         * @return
         */
        fun setShowErrorMsg(showErrorMessage: Boolean): HttpObserver<T, K> {
            this.showErrorMessage = showErrorMessage
            return this
        }
    
        /**
         * 是否默認(rèn)設(shè)置"成功"
         */
        fun setNeedSuccessResultMsg(needSuccessResultMsg: Boolean): HttpObserver<T, K> {
            this.needSuccessResultMsg = needSuccessResultMsg
            return this
        }
    
        override fun onComplete() {
            RxApiManager.get().cancel(this.httpTaskId)
            if (!manuelFinishLoading && showLoading && finishCount == 1) {
                mIViewModelService?.finishLoading()
            }
        }
    
        override fun onSubscribe(d: Disposable) {
            RxApiManager.get().add(httpTaskId, d)
            if (showLoading) {
                mIViewModelService?.preLoading()
            }
        }
    
        /**
         * 根據(jù)具體的Api 業(yè)務(wù)邏輯去重寫 onSuccess 方法垂攘!
         * http==0&&code==0 回調(diào)
         * 必須實現(xiàn)
         *
         * @param response response
         */
        abstract fun onSuccess(response: T)
    
        /**
         * 選擇性重寫
         * 如果只需要code跟msg重寫此方法
         */
        open fun onFailure(response: BaseResponse) {
    
        }
    
        /**
         * 選擇性重寫
         * 如果需要用到code维雇、msg、data重寫此方法
         */
        open fun onServerFailure(response: T) {
            onError(ServerException(response.resultCode, response.resultMsg))
        }
    
        /**
         * Gio埋點處理
         */
        open fun onGioHttpErrorInfoCollect(response: T){
            //兩個都為null的時候不埋
            if(response.resultCode.isEmpty() && response.resultMsg.isEmpty()){
                return
            }
            val gioHttpErrorInfo = GioHttpErrorInfo(response.resultCode, response.resultMsg, response.transactionId)
            CollectDataUtil.setGioEvent(CollectDataUtil.GioEventKey.ERRORMESSAGE, gioHttpErrorInfo)
        }
    
        /**
         * 處理公共邏輯
         * 包含顯示公共彈框方法和邏輯
         */
        open fun onServerHandle(response: T) {
            val hasPublicDialog = response.publicDialogResponse != null
                    && !response.publicDialogResponse?.content.isNullOrEmpty()
            val hasPublicDialogs = response.publicDialogResponses?.dialogs != null
            if (hasPublicDialog || hasPublicDialogs) {
                //顯示公共彈框使用dialog和對應(yīng)的值
                val publicDialogBean = PublicDialogBean()
                        .setPublicDialogResponse(response.publicDialogResponse)
                        .setPublicDialogResponses(response.publicDialogResponses)
                        .setHttpObserverPublicDialogListener(HttpObserverPublicDialogListener { _: PublicDialogBean, _: T, _: Byte ->
    
                        })
                ARouterManager.startDialogActivityPublic(response, publicDialogBean)
    
                //使用透明activity彈對話框(暫時使用延時跳轉(zhuǎn)晒他,防止跳轉(zhuǎn)頁面時候被覆蓋)
    //            Handler().postDelayed({
    //                ARouterManager.startDialogActivityPublic(response, publicDialogBean)
    //            }, 500)
    
    //            //使用普通dialog模式
    //            mIViewModelService?.showPublicDialog(publicDialogBean, response, HttpObserverPublicDialogListener {
    //                publicDialogBean : PublicDialogBean, response : T , clickType : Byte->
    //                mIViewModelService?.publicDialogExcute(publicDialogBean, response, clickType)
    //            })
            }
        }
    
        override fun onNext(t: T) {
            if (finishCount > 1) {
                finishCount--
            }
            when {
                HttpErrorCode.SUCCESS_CODE == t.resultCode -> {
                    CacheManager.saveResponse(httpTaskId, t)
                    if (TextUtils.isEmpty(t.resultMsg)) {
                        //成功情況若后臺沒有msg且自定義設(shè)置默認(rèn)吱型,則默認(rèn)"成功"
                        if (needSuccessResultMsg) {
                            t.resultMsg = BaseApp.getBaseApplication().getString(R.string.success)
                        }
                    }
                    if (showSuccessMessage) {
                        if (TextUtils.isEmpty(t.resultMsg)) {
                            t.resultMsg = BaseApp.getBaseApplication().getString(R.string.success)
                        }
                        mIViewModelService?.showSuccessMessage(t.resultMsg)
                    }
                    onSuccess(t)
                    onServerHandle(t)
                }
                HttpErrorCode.SESSION_OUT_CODE == t.resultCode -> {
                    //session驗證不通過,會報會話超時陨仅,客戶端與服務(wù)重新握手津滞,獲取session,并刪除本地用戶信息
                    AppDataManager.mAppSpService.cleanPublicDialog()
                    AppDataManager.mAppSpService.setLogout()
                    JPushLocalUtil.instance.cleanAliasAndTag()
                    AppDataManager.mAppSpService.appSessionId = ""
                    if (mIViewModelService != null) {
                        if (BuildConfig.DEBUG && TextUtils.isEmpty(t.resultMsg)) {
                            t.resultMsg = BaseApp.getAppString(R.string.http_session_error)
                        }
                        //GIO Http錯誤信息埋點
                        onGioHttpErrorInfoCollect(t)
                        mIViewModelService?.onFailure(true, t.resultMsg)
                    }
                    getTimeOut()
                }
                else -> {
                    //GIO Http錯誤信息埋點
                    onGioHttpErrorInfoCollect(t)
                    if (BuildConfig.DEBUG && TextUtils.isEmpty(t.resultMsg)) {
                        //服務(wù)器報錯時如果沒有錯誤信息灼伤,則默認(rèn)未知錯誤
                        t.resultMsg = BaseApp.getBaseApplication().getString(R.string.http_un_known_error)
                    }
                    onServerFailure(t)
                    onServerHandle(t)
                }
            }
    
            if (!manuelFinishLoading && showLoading && finishCount == 1) {
                mIViewModelService?.finishLoading()
            }
        }
    
        override fun onError(t: Throwable) {
            LogUtils.e(this, t)
            var code = HttpErrorCode.CONNECT_ERROR_CODE
            var errorMessage = ""
            if (t is NetErrorException) {
                errorMessage = t.errorMessage
            } else if (t is HttpException) {
                errorMessage = t.message ?: ""
            } else if (t is JsonParseException
                    || t is JSONException
                    || t is ParseException
                    || t is MalformedJsonException) {
                //解析數(shù)據(jù)錯誤
                errorMessage = BaseApp.getBaseApplication().getString(R.string.http_analytic_data_error)
            } else if (t is SocketTimeoutException) {
                errorMessage = BaseApp.getBaseApplication().getString(R.string.http_service_time_out_error)
            } else if (t is ConnectException) {
                errorMessage = BaseApp.getBaseApplication().getString(R.string.http_connect_error)
            } else if (t is UnknownHostException) {
                errorMessage = BaseApp.getBaseApplication().getString(R.string.http_analytic_server_error)
            } else if (t is UnknownServiceException) {
                errorMessage = BaseApp.getBaseApplication().getString(R.string.http_un_known_error)
            } else if (t is ServerException) {
                //服務(wù)器非0
                code = t.code
                errorMessage = t.message ?: ""
            }
            if (TextUtils.isEmpty(errorMessage)) {
                errorMessage = BaseApp.getBaseApplication().getString(R.string.http_un_known_error)
            }
            //showLoading為true時触徐,防止靜默加載接口finishLoading,影響其他同時調(diào)用的線程
            if (!manuelFinishLoading && showLoading) {
                mIViewModelService?.finishLoading()
            }
            mIViewModelService?.onFailure(showErrorMessage, errorMessage)
            val baseResponse = BaseResponse()
            baseResponse.resultCode = code
            baseResponse.resultMsg = errorMessage
            onFailure(baseResponse)
        }
    
        /**
         * session驗證不通過狐赡,會報會話超時撞鹉,客戶端與服務(wù)重新握手,獲取session
         * 完成后跳轉(zhuǎn)首頁
         */
        private fun getTimeOut() {
            LibApi().getTimeout(TimeoutRequest())
                    .subscribeOn(Schedulers.io())
                    .observeOn(AndroidSchedulers.mainThread())
                    .subscribe(object : Observer<TimeoutResponse> {
                        override fun onSubscribe(d: Disposable) {
                            RxApiManager.get().add(RxApiManager.HttpTaskId.TASK_ID_GET_TIME_OUT, d)
                        }
    
                        override fun onNext(timeoutResponse: TimeoutResponse) {
                            if (HttpErrorCode.SUCCESS_CODE == timeoutResponse.resultCode && timeoutResponse.data != null) {
                                AppDataManager.mAppSpService.appSessionId = timeoutResponse.data?.appSessionId
                                NbsUtil.setNbsUserIdentifier()
                            }
                        }
    
                        override fun onError(e: Throwable) {
                            RxApiManager.get().cancel(RxApiManager.HttpTaskId.TASK_ID_GET_TIME_OUT)
                        }
    
                        override fun onComplete() {
                            mIViewModelService?.finishLoading()
    
                            if ("MainActivity" != ActivityStackManager.getManager().lastActivityClassName?.simpleName) {
                                ARouterManager.startToMainActivity(AppConstant.MAIN_TAB_INDEX_HOME)
                            }
                            RxApiManager.get().cancel(RxApiManager.HttpTaskId.TASK_ID_GET_TIME_OUT)
                        }
                    })
        }
    
    }
    

    這就是觀察者,通過subscribe方法和前面得到的的Observable綁定在一起鸟雏。根據(jù)RxJava的設(shè)計原理享郊,subscribe最終就是調(diào)用被觀察者的subscribeActual方法,這里的被觀察者要回到RxJava2CallAdapter的adapt方法中找孝鹊,根據(jù)RxJava2CallAdapterFactory的get方法得知我們這里的返回類型的rawType不是Flowable拂蝎、Maybe、Single惶室、Completable,其第一個泛型類型的rawType也不是Response和Result玄货,所以isBody是true皇钞,所以adapt返回的是BodyObservable,它持有了一個Observable<Response<R>>對象松捉,因為我們是通過RxJava2CallAdapterFactory.create()創(chuàng)建的而不是RxJava2CallAdapterFactory.createAsync()夹界,所以isAsync是false,所以這里的Observable<Response<R>>是CallExecuteObservable隘世,它的subscribeActual如下:

    @Override
    protected void subscribeActual(Observer<? super Response<T>> observer) {
      // Since Call is a one-shot type, clone it for each new observer.
      Call<T> call = originalCall.clone();
      CallDisposable disposable = new CallDisposable(call);
      observer.onSubscribe(disposable);
      if (disposable.isDisposed()) {
        return;
      }
    
      boolean terminated = false;
      try {
        Response<T> response = call.execute();
        if (!disposable.isDisposed()) {
          observer.onNext(response);
        }
        if (!disposable.isDisposed()) {
          terminated = true;
          observer.onComplete();
        }
      } catch (Throwable t) {
        Exceptions.throwIfFatal(t);
        if (terminated) {
          RxJavaPlugins.onError(t);
        } else if (!disposable.isDisposed()) {
          try {
            observer.onError(t);
          } catch (Throwable inner) {
            Exceptions.throwIfFatal(inner);
            RxJavaPlugins.onError(new CompositeException(t, inner));
          }
        }
      }
    }
    

    CallDisposable是:

    private static final class CallDisposable implements Disposable {
      private final Call<?> call;
      private volatile boolean disposed;
    
      CallDisposable(Call<?> call) {
        this.call = call;
      }
    
      @Override
      public void dispose() {
        disposed = true;
        call.cancel();
      }
    
      @Override
      public boolean isDisposed() {
        return disposed;
      }
    }
    

    observer.onSubscribe就是HttpObserver的onSubscribe:

    override fun onSubscribe(d: Disposable) {
        RxApiManager.get().add(httpTaskId, d)
        if (showLoading) {
            mIViewModelService?.preLoading()
        }
    }
    

    RxApiManager保存此請求的taskId可柿,如果需要的話彈出加載框。

    RxApiManager是用來管理taskId的:

    public class RxApiManager {
    
        private static RxApiManager sInstance = null;
    
        private ArrayMap<HttpTaskId, Disposable> maps = new ArrayMap<>();
        private ArrayMap<LogicTaskId, Disposable> logicMaps = new ArrayMap<>();
    
        public static RxApiManager get() {
            if (sInstance == null) {
                synchronized (RxApiManager.class) {
                    if (sInstance == null) {
                        sInstance = new RxApiManager();
                    }
                }
            }
            return sInstance;
        }
    
        public boolean hasHttpTaskById(HttpTaskId tag) {
            if (tag == null) {
                return false;
            }
            if (maps.containsKey(tag)) {
                return true;
            }
            return false;
        }
    
        public void add(HttpTaskId tag, Disposable disposable) {
            if (tag == null) {
                return;
            }
            if (maps.containsKey(tag)) {
                Disposable mapDisposable = maps.get(tag);
                if (mapDisposable != null && !mapDisposable.isDisposed()) {
                    mapDisposable.dispose();
                    maps.remove(tag);
                }
            }
            maps.put(tag, disposable);
        }
    
        public void add(LogicTaskId tag, Disposable disposable) {
            if (tag == null) {
                return;
            }
            if (logicMaps.containsKey(tag)) {
                Disposable mapDisposable = logicMaps.get(tag);
                if (mapDisposable != null && !mapDisposable.isDisposed()) {
                    mapDisposable.dispose();
                    logicMaps.remove(tag);
                }
            }
            logicMaps.put(tag, disposable);
        }
    
        public void remove(HttpTaskId tag) {
            if (!maps.isEmpty()) {
                maps.remove(tag);
            }
        }
    
        public void removeAll() {
            maps.clear();
            logicMaps.clear();
        }
    
        public void cancel(HttpTaskId tag) {
            if (tag == null) {
                return;
            }
            if (maps.isEmpty()) {
                return;
            }
            Disposable disposable = maps.get(tag);
            if (disposable == null) {
                return;
            }
            if (!disposable.isDisposed()) {
                disposable.dispose();
                maps.remove(tag);
            }
        }
    
        public void cancel(LogicTaskId tag) {
            if (tag == null) {
                return;
            }
            if (logicMaps.isEmpty()) {
                return;
            }
            Disposable disposable = logicMaps.get(tag);
            if (disposable == null) {
                return;
            }
            if (!disposable.isDisposed()) {
                disposable.dispose();
                logicMaps.remove(tag);
            }
        }
    
        public void cancel(HttpTaskId... tags) {
            HttpTaskId[] taskIds = tags;
            if (tags == null) {
                return;
            }
            if (maps.isEmpty()) {
                return;
            }
            for (HttpTaskId tag : taskIds) {
                Disposable disposable = maps.get(tag);
                if (disposable == null) {
                    continue;
                }
                if (!disposable.isDisposed()) {
                    disposable.dispose();
                    maps.remove(tag);
                }
            }
        }
    
        public void cancelAll() {
            if (maps.isEmpty()) {
                return;
            }
            Set<HttpTaskId> keys = maps.keySet();
            for (HttpTaskId apiKey : keys) {
                Disposable disposable = maps.get(apiKey);
                if (disposable == null) {
                    continue;
                }
                if (!disposable.isDisposed()) {
                    disposable.dispose();
                }
            }
            maps.clear();
        }
    
        public void cancelLogicAll() {
            if (logicMaps.isEmpty()) {
                return;
            }
            Set<LogicTaskId> keys = logicMaps.keySet();
            for (LogicTaskId apiKey : keys) {
                Disposable disposable = logicMaps.get(apiKey);
                if (disposable == null) {
                    continue;
                }
                if (!disposable.isDisposed()) {
                    disposable.dispose();
                }
            }
            logicMaps.clear();
        }
    }
    

    IViewModelService是構(gòu)造HttpObserver的時候傳入的丙者,因為Activity和Fragment的自定義基類實現(xiàn)了這個接口复斥,所有的Activity和Fragment都繼承自基類,所以mIViewModelService就是Activity或者Fragment的實例械媒,這個接口提供網(wǎng)絡(luò)請求過程中伴隨的顯示/隱藏加載框和提示信息的功能:

    public interface IViewModelService {
        /*** 預(yù)加載*/
        void preLoading();
        /**
         * 完成加載
         */
        void finishLoading();
        /**
         * 加載失敗
         * @param showErrorMessage 是否顯示錯誤信息
         * @param errorMessage     錯誤信息
         */
        void onFailure(boolean showErrorMessage, String errorMessage);
        /**
         * 成功msg
         *
         * @param message message
         */
        void showSuccessMessage(String message);
        /**
         * 顯示緊急公共彈框
         * @param publicDialogBean PublicDialogBean
         * @param listener HttpObserverPublicDialogListener
         */
        <T extends BaseResponse> void  showPublicDialog(PublicDialogBean publicDialogBean, BaseResponse response, HttpObserverPublicDialogListener<T> listener);
    
        /**
         * 公共彈框點擊處理回調(diào)
         * @param publicDialogBean
         * @param response
         * @param clickType
         */
        void publicDialogExcute(PublicDialogBean publicDialogBean, BaseResponse response, byte clickType);
    }
    

    接下來調(diào)用call.execute()目锭,call就是HttpServiceMethod的invoke中構(gòu)造的OkHttpCall,就調(diào)用到Retrofit的部分了纷捞。

    call.execute()返回response之后調(diào)用observer.onNext(response)方法痢虹,這個方法中會根據(jù)resultCode執(zhí)行不同操作,注意onSuccess()就是交給HttpObserver的匿名類對象覆寫的onSuccess主儡,這個方法是abstract方法所以必須進行覆寫奖唯,再然后就是我們前面說的在onSuccess中通過MutableLiveData的postValue方法通知界面組件更新UI。

    還需要注意的是onNext結(jié)束之后會調(diào)用observer.onComplete()方法糜值,這里的onComplete中會通過RxApiManager把當(dāng)前這個CallDisposable進行dispose操作丰捷,dispose中調(diào)用call.cancel()把socket給close()掉。

    如果出現(xiàn)異常會走到catch塊中調(diào)用observer.onError()進行錯誤情況處理寂汇。

  • 關(guān)于線程間通信

    目前為止瓢阴,我們知道了call和回調(diào)是怎么一起工作的,但是把call放進子線程健无,回調(diào)回到主線程的操作在哪里產(chǎn)生影響的呢荣恐?

    還記得RxObservable.INSTANCE.getObservable嗎?這個方法返回的Observable才是要執(zhí)行subscribe方法的,所以這個方法里設(shè)置的就是線程間切換的關(guān)鍵:

      fun <T> getObservable(@NonNull apiObservable: Observable<T>, @NonNull lifecycle: LifecycleProvider<*>): Observable<T> {
          return apiObservable
                  .subscribeOn(Schedulers.io())
                  .observeOn(AndroidSchedulers.mainThread())
                  .compose<T>(lifecycle.bindToLifecycle())
      }
    

    通過源碼叠穆,我們得到這塊代碼的調(diào)用鏈?zhǔn)?

    CallExecuteObservable->ObservableSubscribeOn->ObservableObserveOn->ObservableFromUnsafeSource

    返回的就是ObservableFromUnsafeSource少漆,也就是入口是調(diào)用它的subscribeActual方法:

    @Override
    protected void subscribeActual(Observer<? super T> observer) {
        source.subscribe(observer);
    }
    

    這里的source是lifecycle.bindToLifecycle()返回的對象,這里牽扯到和UI組件生命周期綁定防止內(nèi)存泄露的問題硼被,由于較復(fù)雜會再寫一篇文章分析它示损,這里先不研究,現(xiàn)在只需要知道返回的對象是ObservableTakeUntil嚷硫,所以這里就是調(diào)用它的subscribeActual方法:

    @Override
    public void subscribeActual(Observer<? super T> child) {
        TakeUntilMainObserver<T, U> parent = new TakeUntilMainObserver<T, U>(child);
        child.onSubscribe(parent);
    
        other.subscribe(parent.otherObserver);
        source.subscribe(parent);
    }
    

    other.subscribe(parent.otherObserver)是關(guān)于驗證生命周期的功能检访,source.subscribe(parent)就是我們現(xiàn)在要講的開始的地方。

    這個source就是compose的調(diào)用者ObservableObserveOn仔掸,它的的subscribeActual方法:

    @Override
    protected void subscribeActual(Observer<? super T> observer) {
        if (scheduler instanceof TrampolineScheduler) {
            source.subscribe(observer);
        } else {
            Scheduler.Worker w = scheduler.createWorker();
    
            source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
        }
    }
    

    這里創(chuàng)建了一個ObserveOnObserver對象脆贵,把上面的HttpObserver添加進去,還有scheduler.createWorker()創(chuàng)建的一個Worker起暮,scheduler是AndroidSchedulers.mainThread()卖氨,也就是:

    private static final Scheduler MAIN_THREAD = RxAndroidPlugins.initMainThreadScheduler(
            new Callable<Scheduler>() {
                @Override public Scheduler call() throws Exception {
                    return MainHolder.DEFAULT;
                }
            });
    

    繼續(xù)往上返,到了ObservableSubscribeOn的subscribeActual:

    @Override
    public void subscribeActual(final Observer<? super T> observer) {
        final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(observer);
    
        observer.onSubscribe(parent);
    
        parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
    }
    

    沿著observer.onSubscribe(parent)往下追溯會走到HttpObserver的onSubscribe方法里负懦,也就是顯示加載框等邏輯的地方筒捺,此時真正的網(wǎng)絡(luò)請求還沒開始,還是在主線程中操作纸厉,然后執(zhí)行scheduler.scheduleDirect(new SubscribeTask(parent))系吭,scheduler也就是Schedulers.io():

    @NonNull
    public static Scheduler io() {
        return RxJavaPlugins.onIoScheduler(IO);
    }
    
    IO = RxJavaPlugins.initIoScheduler(new IOTask());
    
    static final class IOTask implements Callable<Scheduler> {
        @Override
        public Scheduler call() throws Exception {
            return IoHolder.DEFAULT;
        }
    }
    

    initIoScheduler:

    @NonNull
    public static Scheduler initIoScheduler(@NonNull Callable<Scheduler> defaultScheduler) {
        ObjectHelper.requireNonNull(defaultScheduler, "Scheduler Callable can't be null");
        Function<? super Callable<Scheduler>, ? extends Scheduler> f = onInitIoHandler;
        if (f == null) {
            return callRequireNonNull(defaultScheduler);
        }
        return applyRequireNonNull(f, defaultScheduler);
    }
    

    callRequireNonNull:

    @NonNull
    static Scheduler callRequireNonNull(@NonNull Callable<Scheduler> s) {
        try {
            return ObjectHelper.requireNonNull(s.call(), "Scheduler Callable result can't be null");
        } catch (Throwable ex) {
            throw ExceptionHelper.wrapOrThrow(ex);
        }
    }
    

    s.call也就是得到了IoHolder.DEFAULT:

    static final class IoHolder {
        static final Scheduler DEFAULT = new IoScheduler();
    }
    

    scheduleDirect的調(diào)用:

    @NonNull
    public Disposable scheduleDirect(@NonNull Runnable run) {
        return scheduleDirect(run, 0L, TimeUnit.NANOSECONDS);
    }
    
    @NonNull
    public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
        final Worker w = createWorker();
    
        final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
    
        DisposeTask task = new DisposeTask(decoratedRun, w);
    
        w.schedule(task, delay, unit);
    
        return task;
    }
    

    可以看到Worker通過createWorker()創(chuàng)建,也就是前面的IoScheduler的createWorker方法:

    @NonNull
    @Override
    public Worker createWorker() {
        return new EventLoopWorker(pool.get());
    }
    

    接下來執(zhí)行schedule方法就是EventLoopWorker的schedule方法:

    @NonNull
    @Override
    public Disposable schedule(@NonNull Runnable action, long delayTime, @NonNull TimeUnit unit) {
        if (tasks.isDisposed()) {
            // don't schedule, we are unsubscribed
            return EmptyDisposable.INSTANCE;
        }
    
        return threadWorker.scheduleActual(action, delayTime, unit, tasks);
    }
    

    threadWorker是EventLoopWorker構(gòu)造方法中pool.get()所得颗品,pool又是createWorker構(gòu)造時傳入的pool.get()所得村斟,createWorker中的pool又是:

    public IoScheduler(ThreadFactory threadFactory) {
        this.threadFactory = threadFactory;
        this.pool = new AtomicReference<CachedWorkerPool>(NONE);
        start();
    }
    

    調(diào)用get()得到NONE,NONE是:

    NONE = new CachedWorkerPool(0, null, WORKER_THREAD_FACTORY);
    NONE.shutdown();
    

    所以CachedWorkerPool的get是:

    ThreadWorker get() {
        if (allWorkers.isDisposed()) {
            return SHUTDOWN_THREAD_WORKER;
        }
        while (!expiringWorkerQueue.isEmpty()) {
            ThreadWorker threadWorker = expiringWorkerQueue.poll();
            if (threadWorker != null) {
                return threadWorker;
            }
        }
    
        // No cached worker found, so create a new one.
        ThreadWorker w = new ThreadWorker(threadFactory);
        allWorkers.add(w);
        return w;
    }
    

    所以threadWorker就是ThreadWorker實例抛猫,那它的scheduleActual方法是:

    @NonNull
    public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, @NonNull TimeUnit unit, @Nullable DisposableContainer parent) {
        Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
    
        ScheduledRunnable sr = new ScheduledRunnable(decoratedRun, parent);
    
        if (parent != null) {
            if (!parent.add(sr)) {
                return sr;
            }
        }
    
        Future<?> f;
        try {
            //沒有延遲執(zhí)行蟆盹,直接submit
            if (delayTime <= 0) {
                f = executor.submit((Callable<Object>)sr);
            } else {
                f = executor.schedule((Callable<Object>)sr, delayTime, unit);
            }
            sr.setFuture(f);
        } catch (RejectedExecutionException ex) {
            if (parent != null) {
                parent.remove(sr);
            }
            RxJavaPlugins.onError(ex);
        }
    
        return sr;
    }
    

    executor通過SchedulerPoolFactory.create(threadFactory)創(chuàng)建,threadFactory不難找到:

    WORKER_THREAD_FACTORY = new RxThreadFactory(WORKER_THREAD_NAME_PREFIX, priority)
    

    SchedulerPoolFactory.create:

    public static ScheduledExecutorService create(ThreadFactory factory) {
        final ScheduledExecutorService exec = Executors.newScheduledThreadPool(1, factory);
        tryPutIntoPool(PURGE_ENABLED, exec);
        return exec;
    }
    

    看到了我們熟悉的Executors.newScheduledThreadPool闺金,所以executor是ScheduledThreadPoolExecutor逾滥,其實再往下是Executors封裝線程池的部分了,我們這里簡單的過一下败匹,找到啟動線程的地方寨昙,它的submit方法:

    public <T> Future<T> submit(Callable<T> task) {
        return schedule(task, 0, NANOSECONDS);
    }
    

    最終都會進入schedule方法:

    public <V> ScheduledFuture<V> schedule(Callable<V> callable,
                                           long delay,
                                           TimeUnit unit) {
        if (callable == null || unit == null)
            throw new NullPointerException();
        RunnableScheduledFuture<V> t = decorateTask(callable,
            new ScheduledFutureTask<V>(callable,
                                       triggerTime(delay, unit),
                                       sequencer.getAndIncrement()));
        delayedExecute(t);
        return t;
    }
    
    private void delayedExecute(RunnableScheduledFuture<?> task) {
        if (isShutdown())
            reject(task);
        else {
            super.getQueue().add(task);
            if (isShutdown() &&
                !canRunInCurrentRunState(task.isPeriodic()) &&
                remove(task))
                task.cancel(false);
            else
                ensurePrestart();
        }
    }
    

    這里的ensurePrestart會調(diào)用其父類的方法:

    void ensurePrestart() {
        int wc = workerCountOf(ctl.get());
        if (wc < corePoolSize)
            addWorker(null, true);
        else if (wc == 0)
            addWorker(null, false);
    }
    

    addWorker代碼:

    private boolean addWorker(Runnable firstTask, boolean core) {
        retry:
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);
    
            // Check if queue empty only if necessary.
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                return false;
    
            for (;;) {
                int wc = workerCountOf(c);
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                c = ctl.get();  // Re-read ctl
                if (runStateOf(c) != rs)
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
        }
    
        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            w = new Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) {
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                    // Recheck while holding lock.
                    // Back out on ThreadFactory failure or if
                    // shut down before lock acquired.
                    int rs = runStateOf(ctl.get());
    
                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                        workers.add(w);
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                if (workerAdded) {
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    }
    

    t是w.thread,w是new Worker(firstTask)掀亩,w.thread是在Worker的構(gòu)造方法里創(chuàng)建的:

    Worker(Runnable firstTask) {
        setState(-1); // inhibit interrupts until runWorker
        this.firstTask = firstTask;
        this.thread = getThreadFactory().newThread(this);
    }
    

    前面已知threadFactory是RxThreadFactory:

     @Override
        public Thread newThread(Runnable r) {
            StringBuilder nameBuilder = new StringBuilder(prefix).append('-').append(incrementAndGet());
    
    //        if (CREATE_TRACE) {
    //            nameBuilder.append("\r\n");
    //            for (StackTraceElement se :Thread.currentThread().getStackTrace()) {
    //                String s = se.toString();
    //                if (s.contains("sun.reflect.")) {
    //                    continue;
    //                }
    //                if (s.contains("junit.runners.")) {
    //                    continue;
    //                }
    //                if (s.contains("org.gradle.internal.")) {
    //                    continue;
    //                }
    //                if (s.contains("java.util.concurrent.ThreadPoolExecutor")) {
    //                    continue;
    //                }
    //                nameBuilder.append(s).append("\r\n");
    //            }
    //        }
    
            String name = nameBuilder.toString();
            Thread t = nonBlocking ? new RxCustomThread(r, name) : new Thread(r, name);
            t.setPriority(priority);
            t.setDaemon(true);
            return t;
        }
    

    返回了一個RxCustomThread舔哪,設(shè)置daemon為true(守護線程,后臺一直運行)槽棍。

    t.start就是RxCustomThread的start:

    public synchronized void start() {
        /**
         * This method is not invoked for the main method thread or "system"
         * group threads created/set up by the VM. Any new functionality added
         * to this method in the future may have to also be added to the VM.
         *
         * A zero status value corresponds to state "NEW".
         */
        // Android-changed: throw if 'started' is true
        if (threadStatus != 0 || started)
            throw new IllegalThreadStateException();
    
        /* Notify the group that this thread is about to be started
         * so that it can be added to the group's list of threads
         * and the group's unstarted count can be decremented. */
        group.add(this);
    
        started = false;
        try {
            nativeCreate(this, stackSize, daemon);
            started = true;
        } finally {
            try {
                if (!started) {
                    group.threadStartFailed(this);
                }
            } catch (Throwable ignore) {
                /* do nothing. If start0 threw a Throwable then
                  it will be passed up the call stack */
            }
        }
    }
    

    調(diào)用nativeCreate方法開啟線程捉蚤,這樣一來就會運行Runnable對象的run方法抬驴,最終的Runnable對象是ObservableSubscribeOn的subscribeActual中scheduler.scheduleDirect(new SubscribeTask(parent))傳入的SubscribeTask對象:

    final class SubscribeTask implements Runnable {
        private final SubscribeOnObserver<T> parent;
    
        SubscribeTask(SubscribeOnObserver<T> parent) {
            this.parent = parent;
        }
    
        @Override
        public void run() {
            source.subscribe(parent);
        }
    }
    

    source.subscribe(parent)!source就是最頂層的CallExecuteObservable缆巧,調(diào)用它的subscribrActual()布持,這就和我們前面對起來了,然后在CallExecuteObservable的subscribrActual中調(diào)用的onNext陕悬、onComplete和onError等方法就會沿著downstream一步步下放题暖,在ObservableObserveOn中的ObserveOnObserver中的schedule方法中:

    void schedule() {
        if (getAndIncrement() == 0) {
            worker.schedule(this);
        }
    }
    

    前面說過這個worker是通過主線程創(chuàng)建的,和前面的Schedulers.io()一樣的原理捉超,只不過這個Worker是運行在主線程中的胧卤,schedule參數(shù)傳的是this,因為ObservableObserveOn實現(xiàn)了Runnable接口:

    @Override
    public void run() {
        if (outputFused) {
            drainFused();
        } else {
            drainNormal();
        }
    }
    

    因為不滿足條件所以沒調(diào)用requestFusion拼岳,outputFused會一直是false枝誊,所以一直會執(zhí)行drainNormal:

    void drainNormal() {
        int missed = 1;
    
        final SimpleQueue<T> q = queue;
        final Observer<? super T> a = downstream;
    
        for (;;) {
            if (checkTerminated(done, q.isEmpty(), a)) {
                return;
            }
    
            for (;;) {
                boolean d = done;
                T v;
    
                try {
                    v = q.poll();
                } catch (Throwable ex) {
                    Exceptions.throwIfFatal(ex);
                    disposed = true;
                    upstream.dispose();
                    q.clear();
                    a.onError(ex);
                    worker.dispose();
                    return;
                }
                boolean empty = v == null;
    
                if (checkTerminated(d, empty, a)) {
                    return;
                }
    
                if (empty) {
                    break;
                }
    
                a.onNext(v);
            }
    
            missed = addAndGet(-missed);
            if (missed == 0) {
                break;
            }
        }
    }
    

    至此,整個子線程請求主線程回調(diào)的大概過程就完成了裂问。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市牛柒,隨后出現(xiàn)的幾起案子堪簿,更是在濱河造成了極大的恐慌,老刑警劉巖皮壁,帶你破解...
    沈念sama閱讀 210,914評論 6 490
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件椭更,死亡現(xiàn)場離奇詭異,居然都是意外死亡蛾魄,警方通過查閱死者的電腦和手機虑瀑,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 89,935評論 2 383
  • 文/潘曉璐 我一進店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來滴须,“玉大人舌狗,你說我怎么就攤上這事∪铀” “怎么了痛侍?”我有些...
    開封第一講書人閱讀 156,531評論 0 345
  • 文/不壞的土叔 我叫張陵,是天一觀的道長魔市。 經(jīng)常有香客問我主届,道長,這世上最難降的妖魔是什么待德? 我笑而不...
    開封第一講書人閱讀 56,309評論 1 282
  • 正文 為了忘掉前任君丁,我火速辦了婚禮,結(jié)果婚禮上将宪,老公的妹妹穿的比我還像新娘绘闷。我一直安慰自己橡庞,他們只是感情好,可當(dāng)我...
    茶點故事閱讀 65,381評論 5 384
  • 文/花漫 我一把揭開白布簸喂。 她就那樣靜靜地躺著毙死,像睡著了一般。 火紅的嫁衣襯著肌膚如雪喻鳄。 梳的紋絲不亂的頭發(fā)上扼倘,一...
    開封第一講書人閱讀 49,730評論 1 289
  • 那天,我揣著相機與錄音除呵,去河邊找鬼再菊。 笑死,一個胖子當(dāng)著我的面吹牛颜曾,可吹牛的內(nèi)容都是我干的纠拔。 我是一名探鬼主播,決...
    沈念sama閱讀 38,882評論 3 404
  • 文/蒼蘭香墨 我猛地睜開眼泛豪,長吁一口氣:“原來是場噩夢啊……” “哼稠诲!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起诡曙,我...
    開封第一講書人閱讀 37,643評論 0 266
  • 序言:老撾萬榮一對情侶失蹤臀叙,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后价卤,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體劝萤,經(jīng)...
    沈念sama閱讀 44,095評論 1 303
  • 正文 獨居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 36,448評論 2 325
  • 正文 我和宋清朗相戀三年慎璧,在試婚紗的時候發(fā)現(xiàn)自己被綠了床嫌。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點故事閱讀 38,566評論 1 339
  • 序言:一個原本活蹦亂跳的男人離奇死亡胸私,死狀恐怖厌处,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情岁疼,我是刑警寧澤嘱蛋,帶...
    沈念sama閱讀 34,253評論 4 328
  • 正文 年R本政府宣布,位于F島的核電站五续,受9級特大地震影響洒敏,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜疙驾,卻給世界環(huán)境...
    茶點故事閱讀 39,829評論 3 312
  • 文/蒙蒙 一凶伙、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧它碎,春花似錦函荣、人聲如沸显押。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,715評論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽乘碑。三九已至,卻和暖如春金拒,著一層夾襖步出監(jiān)牢的瞬間兽肤,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 31,945評論 1 264
  • 我被黑心中介騙來泰國打工绪抛, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留资铡,地道東北人。 一個月前我還...
    沈念sama閱讀 46,248評論 2 360
  • 正文 我出身青樓幢码,卻偏偏與公主長得像笤休,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子症副,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 43,440評論 2 348

推薦閱讀更多精彩內(nèi)容