-
為什么要使用RxJava處理Retrofit網(wǎng)絡(luò)請求
Retrofit的調(diào)用過程我們前面文章已經(jīng)整理過了,對于Android來講呢载弄,絕大部分的網(wǎng)絡(luò)請求任務(wù)都是需要回調(diào)操作進行UI修改的浓恳,Retrofit網(wǎng)絡(luò)請求底層是socket通信琢感,因為網(wǎng)絡(luò)的不確定性所以是阻塞性的,那么網(wǎng)絡(luò)請求的工作就要放在子線程里去做糕殉,比如下面這行代碼:
List<User> users = service.groupList(1001).execute().body();
我們需要把這塊代碼放進子線程中去調(diào)用亩鬼,然后調(diào)用定義在主線程的方法去更新UI,這塊工作實現(xiàn)起來也不麻煩阿蝶,但是看起來沒有那么優(yōu)雅且需要自己維護線程間的通信雳锋。
RxJava是一個優(yōu)秀的基于觀察者模式的異步調(diào)用框架,結(jié)合Retrofit可以完成優(yōu)雅的網(wǎng)絡(luò)請求回調(diào)任務(wù)羡洁。
-
結(jié)合ViewModel中調(diào)用應(yīng)用場景
實際開發(fā)中玷过,為了解除耦合,MVC模式下我們通常會把網(wǎng)絡(luò)請求的代碼放在ViewModel中筑煮,下面的代碼可以全部用kotlin或者java實現(xiàn)辛蚊,這里都用到了而已∨匚粒看下面的調(diào)用代碼:
BaseRequest req = new BaseRequest(); MyServiceImpl.getData(req, lifecycleProvider) .subscribe(new HttpObserver<MyResponse, BaseRequest>(req, RxApiManager.HttpTaskId.TASK_ID_FLIGHT_DETAILS_FLIGHTWEATHERINFO, getIViewModelService()) { @Override public void onSuccess(@NonNull MyResponse response) { dataBeanData.postValue(response); } @Override public void onFailure(@NotNull BaseResponse response) { dataBeanData.postValue(null); } });
getData內(nèi)容下面會講到嚼隘,這里先知道它返回一個Observable對象,然后通過subscribe方法和Observer聯(lián)系起來袒餐。這里的dataBeanData是一個MutableLiveData飞蛹,它會通過post方法把數(shù)據(jù)發(fā)送到主線程。
看一下getData:
public static Observable<MyResponse> getData(BaseRequest request, @NonNull LifecycleProvider lifecycle) { return RxObservable.INSTANCE.getObservable(new MyApi().getData(request), lifecycle); }
RxObservable.INSTANCE.getObservable:
fun <T> getObservable(@NonNull apiObservable: Observable<T>, @NonNull lifecycle: LifecycleProvider<*>): Observable<T> { return apiObservable .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .compose<T>(lifecycle.bindToLifecycle()) }
apiObservable是什么灸眼?通過new MyApi().getData(request)獲得:
fun getData(request: BaseRequest): Observable<MyResponse> { return mApi.getData(getRequestBody(request)) }
mApi是:
private var mApi: MyService = getApiService(MyService::class.java)
getApiService是其父類中的方法:
fun <T> getApiService(apiService: Class<T>): T { return HttpCall.getInstance().client.httpClient.create(apiService) }
HttpCall是自定義的擴展工具類:
public class HttpCall { private static HttpCall call; private HttpClient client; private HttpCall() { client = new HttpClient() { @Override protected String setBaseUrl() { LogUtils.v(this, "client.getUrl() = "+HttpUrlUtil.getInstance().getUrl()); return HttpUrlUtil.getInstance().getUrl(); } }; } public static HttpCall getInstance() { if (call == null) { synchronized (HttpCall.class) { if (call == null) { call = new HttpCall(); } } } return call; } public HttpClient getClient() { return client; } }
它的client是自定義的HttpClient:
abstract class HttpClient { private lateinit var client: Retrofit val httpClient: Retrofit get() { if (!::client.isInitialized) { val okHttpClientBuilder = OkHttpClient.Builder() .retryOnConnectionFailure(true) .addInterceptor(HeaderInterceptor()) .addInterceptor(LoggingInterceptor()) .addInterceptor(TransactionIdInterceptor()) .addInterceptor(EncodeRequestInterceptor()) .addInterceptor(DecodeResponseInterceptor()) .addInterceptor(DynamicTimeoutInterceptor()) .hostnameVerifier(SafeHostnameVerifier()) val sslSocketFactory = SslContextFactory.getSSLSocketFactory() if (sslSocketFactory != null) { okHttpClientBuilder.sslSocketFactory(sslSocketFactory, CustomTrustManager()) } val okHttpClient = okHttpClientBuilder.build() client = Retrofit.Builder() .baseUrl(setBaseUrl()) .client(okHttpClient) .addConverterFactory(GsonConverterFactory.create(CustomGson.buildGson())) .addCallAdapterFactory(RxJava2CallAdapterFactory.create()) .build() } return client } /** * base url 統(tǒng)一以/http * * @return String */ protected abstract fun setBaseUrl(): String }
可以看到卧檐,這個類封裝了創(chuàng)建Retrofit對象的工作。
所以mApi其實就是Retrofit對象創(chuàng)建的MyService的代理對象焰宣,那么mApi.getData就是MyService里面的getData方法:
@POST(Api.RELATIVE_URL) Observable<MyResponse> queryFlightList(@Body RequestBody requestBody);
Api.RELATIVE_URL是一個相對的url地址霉囚,結(jié)合前面的baseUrl組合成完整的請求url。
到這一步就相當(dāng)于Retrofit的invoke方法執(zhí)行完返回Call了匕积,那這里為什么是返回的Observable<MyResponse>呢?
還記得我們前面分析Retrofit源碼的時候有一個addCallAdapterFactory的api嗎盈罐,這里添加的是RxJava2CallAdapterFactory.create()榜跌,返回了RxJava2CallAdapterFactory,這就是原因盅粪。
這還得從Retrofit源碼說起钓葫,HttpServiceMethod的parseAnnotations中返回了一個CallAdapted,構(gòu)造它的方法入?yún)⒂幸粋€callAdapter票顾,這個callAdapter通過createCallAdapter創(chuàng)建:
public CallAdapter<?, ?> nextCallAdapter( @Nullable CallAdapter.Factory skipPast, Type returnType, Annotation[] annotations) { Objects.requireNonNull(returnType, "returnType == null"); Objects.requireNonNull(annotations, "annotations == null"); int start = callAdapterFactories.indexOf(skipPast) + 1; for (int i = start, count = callAdapterFactories.size(); i < count; i++) { CallAdapter<?, ?> adapter = callAdapterFactories.get(i).get(returnType, annotations, this); if (adapter != null) { return adapter; } } StringBuilder builder = new StringBuilder("Could not locate call adapter for ").append(returnType).append(".\n"); if (skipPast != null) { builder.append(" Skipped:"); for (int i = 0; i < start; i++) { builder.append("\n * ").append(callAdapterFactories.get(i).getClass().getName()); } builder.append('\n'); } builder.append(" Tried:"); for (int i = start, count = callAdapterFactories.size(); i < count; i++) { builder.append("\n * ").append(callAdapterFactories.get(i).getClass().getName()); } throw new IllegalArgumentException(builder.toString()); }
callAdapterFactories.get(i).get(returnType, annotations, this)會得到一個CallAdapter础浮,構(gòu)建Retrofit的build方法中:
List<CallAdapter.Factory> callAdapterFactories = new ArrayList<>(this.callAdapterFactories); callAdapterFactories.addAll(platform.defaultCallAdapterFactories(callbackExecutor));
this.callAdapterFactories又是通過addCallAdapterFactory方法添加的元素:
public Builder addCallAdapterFactory(CallAdapter.Factory factory) { callAdapterFactories.add(Objects.requireNonNull(factory, "factory == null")); return this; }
所以callAdapterFactories中含有了RxJava2CallAdapterFactory,那么調(diào)用它的get方法就得到了一個CallAdapter:
@Override public @Nullable CallAdapter<?, ?> get( Type returnType, Annotation[] annotations, Retrofit retrofit) { //getRawType的作用:譬如List[]奠骄、List<>等形式的類型會得到List.class Class<?> rawType = getRawType(returnType); if (rawType == Completable.class) { // Completable is not parameterized (which is what the rest of this method deals with) so it // can only be created with a single configuration. return new RxJava2CallAdapter( Void.class, scheduler, isAsync, false, true, false, false, false, true); } boolean isFlowable = rawType == Flowable.class; boolean isSingle = rawType == Single.class; boolean isMaybe = rawType == Maybe.class; if (rawType != Observable.class && !isFlowable && !isSingle && !isMaybe) { return null; } boolean isResult = false; boolean isBody = false; Type responseType; //如果rawType不是前面的五種則必須是ParameterizedType(T<>這種泛型類型) if (!(returnType instanceof ParameterizedType)) { String name = isFlowable ? "Flowable" : isSingle ? "Single" : isMaybe ? "Maybe" : "Observable"; throw new IllegalStateException( name + " return type must be parameterized" + " as " + name + "<Foo> or " + name + "<? extends Foo>"); } //得到泛型參數(shù)中的上界(<? extends String>會得到String的Type,如果不是WildcardType這中<?>通配符的類型的則直接返回returnType) //前兩個if-elseif直接按照Observable<Foo<Foo2>>這樣理解,如果取到的上界類型是Response或Result則必須上界類型也是ParameterizedType豆同,因為這兩個類的定義里有泛型要求(Response<T>、Result<T>) Type observableType = getParameterUpperBound(0, (ParameterizedType) returnType); Class<?> rawObservableType = getRawType(observableType); if (rawObservableType == Response.class) { if (!(observableType instanceof ParameterizedType)) { throw new IllegalStateException( "Response must be parameterized" + " as Response<Foo> or Response<? extends Foo>"); } responseType = getParameterUpperBound(0, (ParameterizedType) observableType); } else if (rawObservableType == Result.class) { if (!(observableType instanceof ParameterizedType)) { throw new IllegalStateException( "Result must be parameterized" + " as Result<Foo> or Result<? extends Foo>"); } responseType = getParameterUpperBound(0, (ParameterizedType) observableType); isResult = true; } else { responseType = observableType; isBody = true; } return new RxJava2CallAdapter( responseType, scheduler, isAsync, isResult, isBody, isFlowable, isSingle, isMaybe, false); }
所以callAdapter.adapt方法實際上就是RxJava2CallAdapter的adapt方法:
@Override public Object adapt(Call<R> call) { Observable<Response<R>> responseObservable = isAsync ? new CallEnqueueObservable<>(call) : new CallExecuteObservable<>(call); Observable<?> observable; if (isResult) { observable = new ResultObservable<>(responseObservable); } else if (isBody) { observable = new BodyObservable<>(responseObservable); } else { observable = responseObservable; } if (scheduler != null) { observable = observable.subscribeOn(scheduler); } if (isFlowable) { return observable.toFlowable(BackpressureStrategy.LATEST); } if (isSingle) { return observable.singleOrError(); } if (isMaybe) { return observable.singleElement(); } if (isCompletable) { return observable.ignoreElements(); } return RxJavaPlugins.onAssembly(observable); }
所以為什么上面的MyService的方法返回為什么是Observable含鳞。
那現(xiàn)在RxObservable.INSTANCE.getObservable中的apiObservable就得到了影锈,再貼一遍:
fun <T> getObservable(@NonNull apiObservable: Observable<T>, @NonNull lifecycle: LifecycleProvider<*>): Observable<T> { return apiObservable .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .compose<T>(lifecycle.bindToLifecycle()) }
subscribeOn(Schedulers.io())表示在非阻塞線程中進行網(wǎng)絡(luò)請求,observeOn(AndroidSchedulers.mainThread())表示在主線程中處理回調(diào)蝉绷,lifecycleProvider是Activity或Fragment的生命周期引用精居,compose將這個RxJava請求和生命周期綁定在一起實現(xiàn)其生命周期自動跟隨組件生命周期一致,這個知識會再寫一篇文章分析潜必。
至此靴姿,MyServiceImpl.getData(req, lifecycleProvider)就走完了,接下來就是執(zhí)行網(wǎng)絡(luò)請求了磁滚。
-
調(diào)用過程
ViewModel中的subscribe方法是開始的起點佛吓,其參數(shù)HttpObserver是一個自定義的實現(xiàn)了Observer接口的類:
@SuppressWarnings("all") abstract class HttpObserver<T : BaseResponse, K : BaseRequest> : Observer<T> { private var mIViewModelService: IViewModelService? = null private var showErrorMessage = true private var showLoading = true private var manuelFinishLoading = false private var needSuccessResultMsg = true private var showSuccessMessage = false private var httpTaskId: RxApiManager.HttpTaskId private var request: K /** * 默認(rèn)只有1個任務(wù)處理 */ private var finishCount = 1 /** * @param httpTaskId 注意:HttpTaskId 非同一請求不可使用相同ID * @param mIViewModelService IViewModelService */ constructor(request: K, httpTaskId: RxApiManager.HttpTaskId, mIViewModelService: IViewModelService?) { this.request = request this.mIViewModelService = mIViewModelService this.httpTaskId = httpTaskId LogUtils.e(this, "HttpObserver transactionId = "+request.transactionId) } /** * @param httpTaskId 注意:HttpTaskId 非同一請求不可使用相同ID * @param mIViewModelService IViewModelService */ constructor(request: K, httpTaskId: RxApiManager.HttpTaskId, mIViewModelService: IViewModelService?, showErrorMessage: Boolean = true) { this.mIViewModelService = mIViewModelService this.showErrorMessage = showErrorMessage this.httpTaskId = httpTaskId this.request = request } /** * @param httpTaskId 注意:HttpTaskId 非同一請求不可使用相同ID * @param mIViewModelService IViewModelService */ constructor(request: K, httpTaskId: RxApiManager.HttpTaskId, mIViewModelService: IViewModelService?, showErrorMessage: Boolean = true, showLoading: Boolean = true) { this.mIViewModelService = mIViewModelService this.showErrorMessage = showErrorMessage this.showLoading = showLoading this.httpTaskId = httpTaskId this.request = request } constructor(request: K, httpTaskId: RxApiManager.HttpTaskId, mIViewModelService: IViewModelService?, showErrorMessage: Boolean = true, showLoading: Boolean = true, showSuccessMessage: Boolean = false) { this.mIViewModelService = mIViewModelService this.showErrorMessage = showErrorMessage this.showSuccessMessage = showSuccessMessage this.showLoading = showLoading this.httpTaskId = httpTaskId this.request = request } /** * 手動調(diào)用結(jié)束loading * * @param manuelFinishLoading */ fun setManuelFinishLoading(manuelFinishLoading: Boolean): HttpObserver<T, K> { this.manuelFinishLoading = manuelFinishLoading return this } /** * 多個任務(wù)情況下,根據(jù)任務(wù)數(shù)量結(jié)束loading * * @param finishCount * @return */ fun setFinishCount(finishCount: Int): HttpObserver<T, K> { this.finishCount = finishCount return this } /** * 設(shè)置是否顯示loading * * @param isShowLoading * @return */ fun setShowLoading(isShowLoading: Boolean): HttpObserver<T, K> { this.showLoading = isShowLoading return this } /** * 是否顯示錯誤信息 * * @param showErrorMessage * @return */ fun setShowErrorMsg(showErrorMessage: Boolean): HttpObserver<T, K> { this.showErrorMessage = showErrorMessage return this } /** * 是否默認(rèn)設(shè)置"成功" */ fun setNeedSuccessResultMsg(needSuccessResultMsg: Boolean): HttpObserver<T, K> { this.needSuccessResultMsg = needSuccessResultMsg return this } override fun onComplete() { RxApiManager.get().cancel(this.httpTaskId) if (!manuelFinishLoading && showLoading && finishCount == 1) { mIViewModelService?.finishLoading() } } override fun onSubscribe(d: Disposable) { RxApiManager.get().add(httpTaskId, d) if (showLoading) { mIViewModelService?.preLoading() } } /** * 根據(jù)具體的Api 業(yè)務(wù)邏輯去重寫 onSuccess 方法垂攘! * http==0&&code==0 回調(diào) * 必須實現(xiàn) * * @param response response */ abstract fun onSuccess(response: T) /** * 選擇性重寫 * 如果只需要code跟msg重寫此方法 */ open fun onFailure(response: BaseResponse) { } /** * 選擇性重寫 * 如果需要用到code维雇、msg、data重寫此方法 */ open fun onServerFailure(response: T) { onError(ServerException(response.resultCode, response.resultMsg)) } /** * Gio埋點處理 */ open fun onGioHttpErrorInfoCollect(response: T){ //兩個都為null的時候不埋 if(response.resultCode.isEmpty() && response.resultMsg.isEmpty()){ return } val gioHttpErrorInfo = GioHttpErrorInfo(response.resultCode, response.resultMsg, response.transactionId) CollectDataUtil.setGioEvent(CollectDataUtil.GioEventKey.ERRORMESSAGE, gioHttpErrorInfo) } /** * 處理公共邏輯 * 包含顯示公共彈框方法和邏輯 */ open fun onServerHandle(response: T) { val hasPublicDialog = response.publicDialogResponse != null && !response.publicDialogResponse?.content.isNullOrEmpty() val hasPublicDialogs = response.publicDialogResponses?.dialogs != null if (hasPublicDialog || hasPublicDialogs) { //顯示公共彈框使用dialog和對應(yīng)的值 val publicDialogBean = PublicDialogBean() .setPublicDialogResponse(response.publicDialogResponse) .setPublicDialogResponses(response.publicDialogResponses) .setHttpObserverPublicDialogListener(HttpObserverPublicDialogListener { _: PublicDialogBean, _: T, _: Byte -> }) ARouterManager.startDialogActivityPublic(response, publicDialogBean) //使用透明activity彈對話框(暫時使用延時跳轉(zhuǎn)晒他,防止跳轉(zhuǎn)頁面時候被覆蓋) // Handler().postDelayed({ // ARouterManager.startDialogActivityPublic(response, publicDialogBean) // }, 500) // //使用普通dialog模式 // mIViewModelService?.showPublicDialog(publicDialogBean, response, HttpObserverPublicDialogListener { // publicDialogBean : PublicDialogBean, response : T , clickType : Byte-> // mIViewModelService?.publicDialogExcute(publicDialogBean, response, clickType) // }) } } override fun onNext(t: T) { if (finishCount > 1) { finishCount-- } when { HttpErrorCode.SUCCESS_CODE == t.resultCode -> { CacheManager.saveResponse(httpTaskId, t) if (TextUtils.isEmpty(t.resultMsg)) { //成功情況若后臺沒有msg且自定義設(shè)置默認(rèn)吱型,則默認(rèn)"成功" if (needSuccessResultMsg) { t.resultMsg = BaseApp.getBaseApplication().getString(R.string.success) } } if (showSuccessMessage) { if (TextUtils.isEmpty(t.resultMsg)) { t.resultMsg = BaseApp.getBaseApplication().getString(R.string.success) } mIViewModelService?.showSuccessMessage(t.resultMsg) } onSuccess(t) onServerHandle(t) } HttpErrorCode.SESSION_OUT_CODE == t.resultCode -> { //session驗證不通過,會報會話超時陨仅,客戶端與服務(wù)重新握手津滞,獲取session,并刪除本地用戶信息 AppDataManager.mAppSpService.cleanPublicDialog() AppDataManager.mAppSpService.setLogout() JPushLocalUtil.instance.cleanAliasAndTag() AppDataManager.mAppSpService.appSessionId = "" if (mIViewModelService != null) { if (BuildConfig.DEBUG && TextUtils.isEmpty(t.resultMsg)) { t.resultMsg = BaseApp.getAppString(R.string.http_session_error) } //GIO Http錯誤信息埋點 onGioHttpErrorInfoCollect(t) mIViewModelService?.onFailure(true, t.resultMsg) } getTimeOut() } else -> { //GIO Http錯誤信息埋點 onGioHttpErrorInfoCollect(t) if (BuildConfig.DEBUG && TextUtils.isEmpty(t.resultMsg)) { //服務(wù)器報錯時如果沒有錯誤信息灼伤,則默認(rèn)未知錯誤 t.resultMsg = BaseApp.getBaseApplication().getString(R.string.http_un_known_error) } onServerFailure(t) onServerHandle(t) } } if (!manuelFinishLoading && showLoading && finishCount == 1) { mIViewModelService?.finishLoading() } } override fun onError(t: Throwable) { LogUtils.e(this, t) var code = HttpErrorCode.CONNECT_ERROR_CODE var errorMessage = "" if (t is NetErrorException) { errorMessage = t.errorMessage } else if (t is HttpException) { errorMessage = t.message ?: "" } else if (t is JsonParseException || t is JSONException || t is ParseException || t is MalformedJsonException) { //解析數(shù)據(jù)錯誤 errorMessage = BaseApp.getBaseApplication().getString(R.string.http_analytic_data_error) } else if (t is SocketTimeoutException) { errorMessage = BaseApp.getBaseApplication().getString(R.string.http_service_time_out_error) } else if (t is ConnectException) { errorMessage = BaseApp.getBaseApplication().getString(R.string.http_connect_error) } else if (t is UnknownHostException) { errorMessage = BaseApp.getBaseApplication().getString(R.string.http_analytic_server_error) } else if (t is UnknownServiceException) { errorMessage = BaseApp.getBaseApplication().getString(R.string.http_un_known_error) } else if (t is ServerException) { //服務(wù)器非0 code = t.code errorMessage = t.message ?: "" } if (TextUtils.isEmpty(errorMessage)) { errorMessage = BaseApp.getBaseApplication().getString(R.string.http_un_known_error) } //showLoading為true時触徐,防止靜默加載接口finishLoading,影響其他同時調(diào)用的線程 if (!manuelFinishLoading && showLoading) { mIViewModelService?.finishLoading() } mIViewModelService?.onFailure(showErrorMessage, errorMessage) val baseResponse = BaseResponse() baseResponse.resultCode = code baseResponse.resultMsg = errorMessage onFailure(baseResponse) } /** * session驗證不通過狐赡,會報會話超時撞鹉,客戶端與服務(wù)重新握手,獲取session * 完成后跳轉(zhuǎn)首頁 */ private fun getTimeOut() { LibApi().getTimeout(TimeoutRequest()) .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(object : Observer<TimeoutResponse> { override fun onSubscribe(d: Disposable) { RxApiManager.get().add(RxApiManager.HttpTaskId.TASK_ID_GET_TIME_OUT, d) } override fun onNext(timeoutResponse: TimeoutResponse) { if (HttpErrorCode.SUCCESS_CODE == timeoutResponse.resultCode && timeoutResponse.data != null) { AppDataManager.mAppSpService.appSessionId = timeoutResponse.data?.appSessionId NbsUtil.setNbsUserIdentifier() } } override fun onError(e: Throwable) { RxApiManager.get().cancel(RxApiManager.HttpTaskId.TASK_ID_GET_TIME_OUT) } override fun onComplete() { mIViewModelService?.finishLoading() if ("MainActivity" != ActivityStackManager.getManager().lastActivityClassName?.simpleName) { ARouterManager.startToMainActivity(AppConstant.MAIN_TAB_INDEX_HOME) } RxApiManager.get().cancel(RxApiManager.HttpTaskId.TASK_ID_GET_TIME_OUT) } }) } }
這就是觀察者,通過subscribe方法和前面得到的的Observable綁定在一起鸟雏。根據(jù)RxJava的設(shè)計原理享郊,subscribe最終就是調(diào)用被觀察者的subscribeActual方法,這里的被觀察者要回到RxJava2CallAdapter的adapt方法中找孝鹊,根據(jù)RxJava2CallAdapterFactory的get方法得知我們這里的返回類型的rawType不是Flowable拂蝎、Maybe、Single惶室、Completable,其第一個泛型類型的rawType也不是Response和Result玄货,所以isBody是true皇钞,所以adapt返回的是BodyObservable,它持有了一個Observable<Response<R>>對象松捉,因為我們是通過RxJava2CallAdapterFactory.create()創(chuàng)建的而不是RxJava2CallAdapterFactory.createAsync()夹界,所以isAsync是false,所以這里的Observable<Response<R>>是CallExecuteObservable隘世,它的subscribeActual如下:
@Override protected void subscribeActual(Observer<? super Response<T>> observer) { // Since Call is a one-shot type, clone it for each new observer. Call<T> call = originalCall.clone(); CallDisposable disposable = new CallDisposable(call); observer.onSubscribe(disposable); if (disposable.isDisposed()) { return; } boolean terminated = false; try { Response<T> response = call.execute(); if (!disposable.isDisposed()) { observer.onNext(response); } if (!disposable.isDisposed()) { terminated = true; observer.onComplete(); } } catch (Throwable t) { Exceptions.throwIfFatal(t); if (terminated) { RxJavaPlugins.onError(t); } else if (!disposable.isDisposed()) { try { observer.onError(t); } catch (Throwable inner) { Exceptions.throwIfFatal(inner); RxJavaPlugins.onError(new CompositeException(t, inner)); } } } }
CallDisposable是:
private static final class CallDisposable implements Disposable { private final Call<?> call; private volatile boolean disposed; CallDisposable(Call<?> call) { this.call = call; } @Override public void dispose() { disposed = true; call.cancel(); } @Override public boolean isDisposed() { return disposed; } }
observer.onSubscribe就是HttpObserver的onSubscribe:
override fun onSubscribe(d: Disposable) { RxApiManager.get().add(httpTaskId, d) if (showLoading) { mIViewModelService?.preLoading() } }
RxApiManager保存此請求的taskId可柿,如果需要的話彈出加載框。
RxApiManager是用來管理taskId的:
public class RxApiManager { private static RxApiManager sInstance = null; private ArrayMap<HttpTaskId, Disposable> maps = new ArrayMap<>(); private ArrayMap<LogicTaskId, Disposable> logicMaps = new ArrayMap<>(); public static RxApiManager get() { if (sInstance == null) { synchronized (RxApiManager.class) { if (sInstance == null) { sInstance = new RxApiManager(); } } } return sInstance; } public boolean hasHttpTaskById(HttpTaskId tag) { if (tag == null) { return false; } if (maps.containsKey(tag)) { return true; } return false; } public void add(HttpTaskId tag, Disposable disposable) { if (tag == null) { return; } if (maps.containsKey(tag)) { Disposable mapDisposable = maps.get(tag); if (mapDisposable != null && !mapDisposable.isDisposed()) { mapDisposable.dispose(); maps.remove(tag); } } maps.put(tag, disposable); } public void add(LogicTaskId tag, Disposable disposable) { if (tag == null) { return; } if (logicMaps.containsKey(tag)) { Disposable mapDisposable = logicMaps.get(tag); if (mapDisposable != null && !mapDisposable.isDisposed()) { mapDisposable.dispose(); logicMaps.remove(tag); } } logicMaps.put(tag, disposable); } public void remove(HttpTaskId tag) { if (!maps.isEmpty()) { maps.remove(tag); } } public void removeAll() { maps.clear(); logicMaps.clear(); } public void cancel(HttpTaskId tag) { if (tag == null) { return; } if (maps.isEmpty()) { return; } Disposable disposable = maps.get(tag); if (disposable == null) { return; } if (!disposable.isDisposed()) { disposable.dispose(); maps.remove(tag); } } public void cancel(LogicTaskId tag) { if (tag == null) { return; } if (logicMaps.isEmpty()) { return; } Disposable disposable = logicMaps.get(tag); if (disposable == null) { return; } if (!disposable.isDisposed()) { disposable.dispose(); logicMaps.remove(tag); } } public void cancel(HttpTaskId... tags) { HttpTaskId[] taskIds = tags; if (tags == null) { return; } if (maps.isEmpty()) { return; } for (HttpTaskId tag : taskIds) { Disposable disposable = maps.get(tag); if (disposable == null) { continue; } if (!disposable.isDisposed()) { disposable.dispose(); maps.remove(tag); } } } public void cancelAll() { if (maps.isEmpty()) { return; } Set<HttpTaskId> keys = maps.keySet(); for (HttpTaskId apiKey : keys) { Disposable disposable = maps.get(apiKey); if (disposable == null) { continue; } if (!disposable.isDisposed()) { disposable.dispose(); } } maps.clear(); } public void cancelLogicAll() { if (logicMaps.isEmpty()) { return; } Set<LogicTaskId> keys = logicMaps.keySet(); for (LogicTaskId apiKey : keys) { Disposable disposable = logicMaps.get(apiKey); if (disposable == null) { continue; } if (!disposable.isDisposed()) { disposable.dispose(); } } logicMaps.clear(); } }
IViewModelService是構(gòu)造HttpObserver的時候傳入的丙者,因為Activity和Fragment的自定義基類實現(xiàn)了這個接口复斥,所有的Activity和Fragment都繼承自基類,所以mIViewModelService就是Activity或者Fragment的實例械媒,這個接口提供網(wǎng)絡(luò)請求過程中伴隨的顯示/隱藏加載框和提示信息的功能:
public interface IViewModelService { /*** 預(yù)加載*/ void preLoading(); /** * 完成加載 */ void finishLoading(); /** * 加載失敗 * @param showErrorMessage 是否顯示錯誤信息 * @param errorMessage 錯誤信息 */ void onFailure(boolean showErrorMessage, String errorMessage); /** * 成功msg * * @param message message */ void showSuccessMessage(String message); /** * 顯示緊急公共彈框 * @param publicDialogBean PublicDialogBean * @param listener HttpObserverPublicDialogListener */ <T extends BaseResponse> void showPublicDialog(PublicDialogBean publicDialogBean, BaseResponse response, HttpObserverPublicDialogListener<T> listener); /** * 公共彈框點擊處理回調(diào) * @param publicDialogBean * @param response * @param clickType */ void publicDialogExcute(PublicDialogBean publicDialogBean, BaseResponse response, byte clickType); }
接下來調(diào)用call.execute()目锭,call就是HttpServiceMethod的invoke中構(gòu)造的OkHttpCall,就調(diào)用到Retrofit的部分了纷捞。
call.execute()返回response之后調(diào)用observer.onNext(response)方法痢虹,這個方法中會根據(jù)resultCode執(zhí)行不同操作,注意onSuccess()就是交給HttpObserver的匿名類對象覆寫的onSuccess主儡,這個方法是abstract方法所以必須進行覆寫奖唯,再然后就是我們前面說的在onSuccess中通過MutableLiveData的postValue方法通知界面組件更新UI。
還需要注意的是onNext結(jié)束之后會調(diào)用observer.onComplete()方法糜值,這里的onComplete中會通過RxApiManager把當(dāng)前這個CallDisposable進行dispose操作丰捷,dispose中調(diào)用call.cancel()把socket給close()掉。
如果出現(xiàn)異常會走到catch塊中調(diào)用observer.onError()進行錯誤情況處理寂汇。
-
關(guān)于線程間通信
目前為止瓢阴,我們知道了call和回調(diào)是怎么一起工作的,但是把call放進子線程健无,回調(diào)回到主線程的操作在哪里產(chǎn)生影響的呢荣恐?
還記得RxObservable.INSTANCE.getObservable嗎?這個方法返回的Observable才是要執(zhí)行subscribe方法的,所以這個方法里設(shè)置的就是線程間切換的關(guān)鍵:
fun <T> getObservable(@NonNull apiObservable: Observable<T>, @NonNull lifecycle: LifecycleProvider<*>): Observable<T> { return apiObservable .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .compose<T>(lifecycle.bindToLifecycle()) }
通過源碼叠穆,我們得到這塊代碼的調(diào)用鏈?zhǔn)?
CallExecuteObservable->ObservableSubscribeOn->ObservableObserveOn->ObservableFromUnsafeSource
返回的就是ObservableFromUnsafeSource少漆,也就是入口是調(diào)用它的subscribeActual方法:
@Override protected void subscribeActual(Observer<? super T> observer) { source.subscribe(observer); }
這里的source是lifecycle.bindToLifecycle()返回的對象,這里牽扯到和UI組件生命周期綁定防止內(nèi)存泄露的問題硼被,由于較復(fù)雜會再寫一篇文章分析它示损,這里先不研究,現(xiàn)在只需要知道返回的對象是ObservableTakeUntil嚷硫,所以這里就是調(diào)用它的subscribeActual方法:
@Override public void subscribeActual(Observer<? super T> child) { TakeUntilMainObserver<T, U> parent = new TakeUntilMainObserver<T, U>(child); child.onSubscribe(parent); other.subscribe(parent.otherObserver); source.subscribe(parent); }
other.subscribe(parent.otherObserver)是關(guān)于驗證生命周期的功能检访,source.subscribe(parent)就是我們現(xiàn)在要講的開始的地方。
這個source就是compose的調(diào)用者ObservableObserveOn仔掸,它的的subscribeActual方法:
@Override protected void subscribeActual(Observer<? super T> observer) { if (scheduler instanceof TrampolineScheduler) { source.subscribe(observer); } else { Scheduler.Worker w = scheduler.createWorker(); source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize)); } }
這里創(chuàng)建了一個ObserveOnObserver對象脆贵,把上面的HttpObserver添加進去,還有scheduler.createWorker()創(chuàng)建的一個Worker起暮,scheduler是AndroidSchedulers.mainThread()卖氨,也就是:
private static final Scheduler MAIN_THREAD = RxAndroidPlugins.initMainThreadScheduler( new Callable<Scheduler>() { @Override public Scheduler call() throws Exception { return MainHolder.DEFAULT; } });
繼續(xù)往上返,到了ObservableSubscribeOn的subscribeActual:
@Override public void subscribeActual(final Observer<? super T> observer) { final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(observer); observer.onSubscribe(parent); parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent))); }
沿著observer.onSubscribe(parent)往下追溯會走到HttpObserver的onSubscribe方法里负懦,也就是顯示加載框等邏輯的地方筒捺,此時真正的網(wǎng)絡(luò)請求還沒開始,還是在主線程中操作纸厉,然后執(zhí)行scheduler.scheduleDirect(new SubscribeTask(parent))系吭,scheduler也就是Schedulers.io():
@NonNull public static Scheduler io() { return RxJavaPlugins.onIoScheduler(IO); }
IO = RxJavaPlugins.initIoScheduler(new IOTask());
static final class IOTask implements Callable<Scheduler> { @Override public Scheduler call() throws Exception { return IoHolder.DEFAULT; } }
initIoScheduler:
@NonNull public static Scheduler initIoScheduler(@NonNull Callable<Scheduler> defaultScheduler) { ObjectHelper.requireNonNull(defaultScheduler, "Scheduler Callable can't be null"); Function<? super Callable<Scheduler>, ? extends Scheduler> f = onInitIoHandler; if (f == null) { return callRequireNonNull(defaultScheduler); } return applyRequireNonNull(f, defaultScheduler); }
callRequireNonNull:
@NonNull static Scheduler callRequireNonNull(@NonNull Callable<Scheduler> s) { try { return ObjectHelper.requireNonNull(s.call(), "Scheduler Callable result can't be null"); } catch (Throwable ex) { throw ExceptionHelper.wrapOrThrow(ex); } }
s.call也就是得到了IoHolder.DEFAULT:
static final class IoHolder { static final Scheduler DEFAULT = new IoScheduler(); }
scheduleDirect的調(diào)用:
@NonNull public Disposable scheduleDirect(@NonNull Runnable run) { return scheduleDirect(run, 0L, TimeUnit.NANOSECONDS); }
@NonNull public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) { final Worker w = createWorker(); final Runnable decoratedRun = RxJavaPlugins.onSchedule(run); DisposeTask task = new DisposeTask(decoratedRun, w); w.schedule(task, delay, unit); return task; }
可以看到Worker通過createWorker()創(chuàng)建,也就是前面的IoScheduler的createWorker方法:
@NonNull @Override public Worker createWorker() { return new EventLoopWorker(pool.get()); }
接下來執(zhí)行schedule方法就是EventLoopWorker的schedule方法:
@NonNull @Override public Disposable schedule(@NonNull Runnable action, long delayTime, @NonNull TimeUnit unit) { if (tasks.isDisposed()) { // don't schedule, we are unsubscribed return EmptyDisposable.INSTANCE; } return threadWorker.scheduleActual(action, delayTime, unit, tasks); }
threadWorker是EventLoopWorker構(gòu)造方法中pool.get()所得颗品,pool又是createWorker構(gòu)造時傳入的pool.get()所得村斟,createWorker中的pool又是:
public IoScheduler(ThreadFactory threadFactory) { this.threadFactory = threadFactory; this.pool = new AtomicReference<CachedWorkerPool>(NONE); start(); }
調(diào)用get()得到NONE,NONE是:
NONE = new CachedWorkerPool(0, null, WORKER_THREAD_FACTORY); NONE.shutdown();
所以CachedWorkerPool的get是:
ThreadWorker get() { if (allWorkers.isDisposed()) { return SHUTDOWN_THREAD_WORKER; } while (!expiringWorkerQueue.isEmpty()) { ThreadWorker threadWorker = expiringWorkerQueue.poll(); if (threadWorker != null) { return threadWorker; } } // No cached worker found, so create a new one. ThreadWorker w = new ThreadWorker(threadFactory); allWorkers.add(w); return w; }
所以threadWorker就是ThreadWorker實例抛猫,那它的scheduleActual方法是:
@NonNull public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, @NonNull TimeUnit unit, @Nullable DisposableContainer parent) { Runnable decoratedRun = RxJavaPlugins.onSchedule(run); ScheduledRunnable sr = new ScheduledRunnable(decoratedRun, parent); if (parent != null) { if (!parent.add(sr)) { return sr; } } Future<?> f; try { //沒有延遲執(zhí)行蟆盹,直接submit if (delayTime <= 0) { f = executor.submit((Callable<Object>)sr); } else { f = executor.schedule((Callable<Object>)sr, delayTime, unit); } sr.setFuture(f); } catch (RejectedExecutionException ex) { if (parent != null) { parent.remove(sr); } RxJavaPlugins.onError(ex); } return sr; }
executor通過SchedulerPoolFactory.create(threadFactory)創(chuàng)建,threadFactory不難找到:
WORKER_THREAD_FACTORY = new RxThreadFactory(WORKER_THREAD_NAME_PREFIX, priority)
SchedulerPoolFactory.create:
public static ScheduledExecutorService create(ThreadFactory factory) { final ScheduledExecutorService exec = Executors.newScheduledThreadPool(1, factory); tryPutIntoPool(PURGE_ENABLED, exec); return exec; }
看到了我們熟悉的Executors.newScheduledThreadPool闺金,所以executor是ScheduledThreadPoolExecutor逾滥,其實再往下是Executors封裝線程池的部分了,我們這里簡單的過一下败匹,找到啟動線程的地方寨昙,它的submit方法:
public <T> Future<T> submit(Callable<T> task) { return schedule(task, 0, NANOSECONDS); }
最終都會進入schedule方法:
public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) { if (callable == null || unit == null) throw new NullPointerException(); RunnableScheduledFuture<V> t = decorateTask(callable, new ScheduledFutureTask<V>(callable, triggerTime(delay, unit), sequencer.getAndIncrement())); delayedExecute(t); return t; }
private void delayedExecute(RunnableScheduledFuture<?> task) { if (isShutdown()) reject(task); else { super.getQueue().add(task); if (isShutdown() && !canRunInCurrentRunState(task.isPeriodic()) && remove(task)) task.cancel(false); else ensurePrestart(); } }
這里的ensurePrestart會調(diào)用其父類的方法:
void ensurePrestart() { int wc = workerCountOf(ctl.get()); if (wc < corePoolSize) addWorker(null, true); else if (wc == 0) addWorker(null, false); }
addWorker代碼:
private boolean addWorker(Runnable firstTask, boolean core) { retry: for (;;) { int c = ctl.get(); int rs = runStateOf(c); // Check if queue empty only if necessary. if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) return false; for (;;) { int wc = workerCountOf(c); if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false; if (compareAndIncrementWorkerCount(c)) break retry; c = ctl.get(); // Re-read ctl if (runStateOf(c) != rs) continue retry; // else CAS failed due to workerCount change; retry inner loop } } boolean workerStarted = false; boolean workerAdded = false; Worker w = null; try { w = new Worker(firstTask); final Thread t = w.thread; if (t != null) { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // Recheck while holding lock. // Back out on ThreadFactory failure or if // shut down before lock acquired. int rs = runStateOf(ctl.get()); if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { if (t.isAlive()) // precheck that t is startable throw new IllegalThreadStateException(); workers.add(w); int s = workers.size(); if (s > largestPoolSize) largestPoolSize = s; workerAdded = true; } } finally { mainLock.unlock(); } if (workerAdded) { t.start(); workerStarted = true; } } } finally { if (! workerStarted) addWorkerFailed(w); } return workerStarted; }
t是w.thread,w是new Worker(firstTask)掀亩,w.thread是在Worker的構(gòu)造方法里創(chuàng)建的:
Worker(Runnable firstTask) { setState(-1); // inhibit interrupts until runWorker this.firstTask = firstTask; this.thread = getThreadFactory().newThread(this); }
前面已知threadFactory是RxThreadFactory:
@Override public Thread newThread(Runnable r) { StringBuilder nameBuilder = new StringBuilder(prefix).append('-').append(incrementAndGet()); // if (CREATE_TRACE) { // nameBuilder.append("\r\n"); // for (StackTraceElement se :Thread.currentThread().getStackTrace()) { // String s = se.toString(); // if (s.contains("sun.reflect.")) { // continue; // } // if (s.contains("junit.runners.")) { // continue; // } // if (s.contains("org.gradle.internal.")) { // continue; // } // if (s.contains("java.util.concurrent.ThreadPoolExecutor")) { // continue; // } // nameBuilder.append(s).append("\r\n"); // } // } String name = nameBuilder.toString(); Thread t = nonBlocking ? new RxCustomThread(r, name) : new Thread(r, name); t.setPriority(priority); t.setDaemon(true); return t; }
返回了一個RxCustomThread舔哪,設(shè)置daemon為true(守護線程,后臺一直運行)槽棍。
t.start就是RxCustomThread的start:
public synchronized void start() { /** * This method is not invoked for the main method thread or "system" * group threads created/set up by the VM. Any new functionality added * to this method in the future may have to also be added to the VM. * * A zero status value corresponds to state "NEW". */ // Android-changed: throw if 'started' is true if (threadStatus != 0 || started) throw new IllegalThreadStateException(); /* Notify the group that this thread is about to be started * so that it can be added to the group's list of threads * and the group's unstarted count can be decremented. */ group.add(this); started = false; try { nativeCreate(this, stackSize, daemon); started = true; } finally { try { if (!started) { group.threadStartFailed(this); } } catch (Throwable ignore) { /* do nothing. If start0 threw a Throwable then it will be passed up the call stack */ } } }
調(diào)用nativeCreate方法開啟線程捉蚤,這樣一來就會運行Runnable對象的run方法抬驴,最終的Runnable對象是ObservableSubscribeOn的subscribeActual中scheduler.scheduleDirect(new SubscribeTask(parent))傳入的SubscribeTask對象:
final class SubscribeTask implements Runnable { private final SubscribeOnObserver<T> parent; SubscribeTask(SubscribeOnObserver<T> parent) { this.parent = parent; } @Override public void run() { source.subscribe(parent); } }
source.subscribe(parent)!source就是最頂層的CallExecuteObservable缆巧,調(diào)用它的subscribrActual()布持,這就和我們前面對起來了,然后在CallExecuteObservable的subscribrActual中調(diào)用的onNext陕悬、onComplete和onError等方法就會沿著downstream一步步下放题暖,在ObservableObserveOn中的ObserveOnObserver中的schedule方法中:
void schedule() { if (getAndIncrement() == 0) { worker.schedule(this); } }
前面說過這個worker是通過主線程創(chuàng)建的,和前面的Schedulers.io()一樣的原理捉超,只不過這個Worker是運行在主線程中的胧卤,schedule參數(shù)傳的是this,因為ObservableObserveOn實現(xiàn)了Runnable接口:
@Override public void run() { if (outputFused) { drainFused(); } else { drainNormal(); } }
因為不滿足條件所以沒調(diào)用requestFusion拼岳,outputFused會一直是false枝誊,所以一直會執(zhí)行drainNormal:
void drainNormal() { int missed = 1; final SimpleQueue<T> q = queue; final Observer<? super T> a = downstream; for (;;) { if (checkTerminated(done, q.isEmpty(), a)) { return; } for (;;) { boolean d = done; T v; try { v = q.poll(); } catch (Throwable ex) { Exceptions.throwIfFatal(ex); disposed = true; upstream.dispose(); q.clear(); a.onError(ex); worker.dispose(); return; } boolean empty = v == null; if (checkTerminated(d, empty, a)) { return; } if (empty) { break; } a.onNext(v); } missed = addAndGet(-missed); if (missed == 0) { break; } } }
至此,整個子線程請求主線程回調(diào)的大概過程就完成了裂问。
Retrofit結(jié)合RxJava源碼分析
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
- 文/潘曉璐 我一進店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來滴须,“玉大人舌狗,你說我怎么就攤上這事∪铀” “怎么了痛侍?”我有些...
- 文/不壞的土叔 我叫張陵,是天一觀的道長魔市。 經(jīng)常有香客問我主届,道長,這世上最難降的妖魔是什么待德? 我笑而不...
- 正文 為了忘掉前任君丁,我火速辦了婚禮,結(jié)果婚禮上将宪,老公的妹妹穿的比我還像新娘绘闷。我一直安慰自己橡庞,他們只是感情好,可當(dāng)我...
- 文/花漫 我一把揭開白布簸喂。 她就那樣靜靜地躺著毙死,像睡著了一般。 火紅的嫁衣襯著肌膚如雪喻鳄。 梳的紋絲不亂的頭發(fā)上扼倘,一...
- 文/蒼蘭香墨 我猛地睜開眼泛豪,長吁一口氣:“原來是場噩夢啊……” “哼稠诲!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起诡曙,我...
- 正文 年R本政府宣布,位于F島的核電站五续,受9級特大地震影響洒敏,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜疙驾,卻給世界環(huán)境...
- 文/蒙蒙 一凶伙、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧它碎,春花似錦函荣、人聲如沸显押。這莊子的主人今日做“春日...
- 文/蒼蘭香墨 我抬頭看了看天上的太陽乘碑。三九已至,卻和暖如春金拒,著一層夾襖步出監(jiān)牢的瞬間兽肤,已是汗流浹背。 一陣腳步聲響...
推薦閱讀更多精彩內(nèi)容
- 轉(zhuǎn)載請以鏈接形式標(biāo)明出處:本文出自:103style的博客 timer 操作符 timer 操作符實際上返回的是一...
- RxJava和Retrofit配合使用過程中的具體流程是怎么樣的呢店雅?1.如何創(chuàng)建一個請求2.具體的請求線程是什么3...
- Android RxJava詳解 響應(yīng)式代碼的基本組成部分是Observables和Subscribers(事實上...
- 本文的所有分析都是基于 RxJava2 進行的。以下的 RxJava 指 RxJava2閱讀本文你將會知道: Rx...
- 不講 rxjava 和 retrofit 而是直接上手 2 了贞铣,因為 2 封裝的更好用的更多闹啦。 1. 觀察者模式 ...