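RxJava and Retrofit are two of the most widely used frameworks today, and many developers use both together as the basis of their network layer. I use them too. This post shows how to use RxJava to implement simple network-request callbacks (onStart, onSuccess, and so on) and how to provide a cancelRequest() method for canceling a request. The Retrofit configuration itself is left for a later post; the key points are that RxJava2CallAdapterFactory must be set when building Retrofit and that the request interface methods return Observable objects.
This post covers the callback design and request cancellation, mainly the observer side at the RxJava level. Straight to the code: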
import android.app.Dialog;
import android.content.Context;
import android.os.Handler;
import android.os.Looper;
import android.support.annotation.NonNull;
import com.dev.kit.basemodule.BuildConfig;
import com.dev.kit.basemodule.R;
import com.dev.kit.basemodule.View.NetProgressDialog;
import com.dev.kit.basemodule.netRequest.Configs.Config;
import com.dev.kit.basemodule.netRequest.subscribers.NetRequestCallback;
import com.dev.kit.basemodule.netRequest.util.OnNetProgressCancelListener;
import com.dev.kit.basemodule.result.BaseResult;
import com.dev.kit.basemodule.util.ToastUtil;
import io.reactivex.Observer;
import io.reactivex.disposables.Disposable;
import retrofit2.HttpException;
/**
* Network request subscriber
* Created by cuiyan on 16/6/2 14:09
*/
public class NetRequestSubscriber<T> implements Observer<T> {
private Dialog progressDialog;
private Disposable disposable;
private NetRequestCallback<T> netRequestCallback;
private Context context;
/**
* @param netRequestCallback network request callback
*/
public NetRequestSubscriber(@NonNull NetRequestCallback<T> netRequestCallback, Context context) {
this(netRequestCallback, context, false, null);
}
/**
* @param netRequestCallback network request callback
* @param showProgress whether to show the loading dialog during the request
* @param progressTip text shown in the loading dialog
* @see NetProgressDialog
*/
public NetRequestSubscriber(@NonNull final NetRequestCallback<T> netRequestCallback, Context context, boolean showProgress, String progressTip) {
this.netRequestCallback = netRequestCallback;
this.context = context;
if (showProgress) {
progressDialog = NetProgressDialog.getInstance(context, progressTip, new OnNetProgressCancelListener() {
@Override
public void onCancelRequest() {
cancelRequest();
}
});
}
}
/**
* @param netRequestCallback network request callback
* @param progressDialog custom dialog to show during the request
*/
public NetRequestSubscriber(@NonNull NetRequestCallback<T> netRequestCallback, Context context, @NonNull Dialog progressDialog) {
this.netRequestCallback = netRequestCallback;
this.context = context;
this.progressDialog = progressDialog;
}
@Override
public void onSubscribe(@NonNull Disposable d) {
this.disposable = d;
showProgress();
onRequestStart();
}
@Override
public synchronized void onNext(final T t) {
if (t == null) {
onRequestResultNull();
} else {
if (t instanceof BaseResult && !Config.REQUEST_SUCCESS_CODE.equals(((BaseResult) t).getCode())) {
ToastUtil.showToast(context, ((BaseResult) t).getMessage());
}
onRequestSuccess(t);
}
}
@Override
public synchronized void onError(Throwable throwable) {
dismissProgress();
onRequestError(throwable);
if (throwable instanceof HttpException) {
ToastUtil.showToast(context, ((HttpException) throwable).message() + ((HttpException) throwable).code());
} else {
if (BuildConfig.DEBUG) {
ToastUtil.showToast(context, "error:" + throwable.getMessage());
} else {
ToastUtil.showToast(context, context.getString(R.string.error_net_request_failed));
}
}
}
/**
* Mutually exclusive with {@link NetRequestSubscriber#onError(Throwable)}:
* per the {@link Observer} contract, {@link Observer#onComplete()} is not called
* once {@link Observer#onError(Throwable)} has been called.
*/
@Override
public void onComplete() {
dismissProgress();
netRequestCallback.onFinish();
}
private void onRequestStart() {
if (Looper.myLooper() != context.getMainLooper()) {
Handler handler = new Handler(context.getMainLooper());
handler.post(new Runnable() {
@Override
public void run() {
netRequestCallback.onStart();
}
});
} else {
netRequestCallback.onStart();
}
}
private void onRequestSuccess(final T t) {
if (Looper.myLooper() != context.getMainLooper()) {
Handler handler = new Handler(context.getMainLooper());
handler.post(new Runnable() {
@Override
public void run() {
netRequestCallback.onSuccess(t);
}
});
} else {
netRequestCallback.onSuccess(t);
}
}
private void onRequestResultNull() {
if (Looper.myLooper() != context.getMainLooper()) {
Handler handler = new Handler(context.getMainLooper());
handler.post(new Runnable() {
@Override
public void run() {
netRequestCallback.onResultNull();
}
});
} else {
netRequestCallback.onResultNull();
}
}
private void onRequestError(final Throwable throwable) {
throwable.printStackTrace();
if (Looper.myLooper() != context.getMainLooper()) {
Handler handler = new Handler(context.getMainLooper());
handler.post(new Runnable() {
@Override
public void run() {
netRequestCallback.onError(throwable);
netRequestCallback.onFinish();
}
});
} else {
netRequestCallback.onError(throwable);
netRequestCallback.onFinish();
}
}
/**
* Shows the progress dialog if one was supplied and it is not already showing.
*/
private void showProgress() {
if (progressDialog != null && !progressDialog.isShowing()) {
progressDialog.show();
}
}
private void dismissProgress() {
if (progressDialog != null && progressDialog.isShowing()) {
progressDialog.dismiss();
}
}
public void cancelRequest() {
dismissProgress();
if (disposable != null && !disposable.isDisposed()) {
disposable.dispose();
}
netRequestCallback.onCancel();
netRequestCallback.onFinish();
}
}
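The NetRequestCallback type that the class above delegates to is not listed in this post. Below is a minimal sketch, assuming it is an abstract class with empty default methods named after the calls made in NetRequestSubscriber (onStart, onSuccess, onResultNull, onError, onCancel, onFinish). The real class in the sample project may look different, and CallbackDemo is a hypothetical helper that exists only for illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Assumed shape of the callback. The method names come from the calls made in
// NetRequestSubscriber; the empty defaults are a guess.
abstract class NetRequestCallback<T> {
    public void onStart() {}
    public abstract void onSuccess(T result);
    public void onResultNull() {}
    public void onError(Throwable throwable) {}
    public void onCancel() {}
    public void onFinish() {}
}

// Hypothetical demo: records the order in which the callbacks fire.
class CallbackDemo {
    static List<String> run() {
        final List<String> log = new ArrayList<>();
        NetRequestCallback<String> callback = new NetRequestCallback<String>() {
            @Override public void onStart() { log.add("start"); }
            @Override public void onSuccess(String result) { log.add("success:" + result); }
            @Override public void onFinish() { log.add("finish"); }
        };
        // The order NetRequestSubscriber drives for a successful request:
        callback.onStart();         // triggered from onSubscribe
        callback.onSuccess("body"); // triggered from onNext
        callback.onFinish();        // triggered from onComplete
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

With empty defaults, a caller only overrides the events it cares about, which keeps call sites short.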
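NetRequestSubscriber implements the Observer interface. The four methods that Observer declares, listed below, are enough to implement simple network-request callbacks. These callbacks only work if RxJava2CallAdapterFactory was added when building Retrofit; that part of the setup will be covered another time.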
/**
* Provides the Observer with the means of cancelling (disposing) the
* connection (channel) with the Observable in both
* synchronous (from within {@link #onNext(Object)}) and asynchronous manner.
* @param d the Disposable instance whose {@link Disposable#dispose()} can
* be called anytime to cancel the connection
* @since 2.0
*/
void onSubscribe(@NonNull Disposable d);
/**
* Provides the Observer with a new item to observe.
* <p>
* The {@link Observable} may call this method 0 or more times.
* <p>
* The {@code Observable} will not call this method again after it calls either {@link #onComplete} or
* {@link #onError}.
*
* @param t
* the item emitted by the Observable
*/
void onNext(@NonNull T t);
/**
* Notifies the Observer that the {@link Observable} has experienced an error condition.
* <p>
* If the {@link Observable} calls this method, it will not thereafter call {@link #onNext} or
* {@link #onComplete}.
*
* @param e
* the exception encountered by the Observable
*/
void onError(@NonNull Throwable e);
/**
* Notifies the Observer that the {@link Observable} has finished sending push-based notifications.
* <p>
* The {@link Observable} will not call this method if it calls {@link #onError}.
*/
void onComplete();
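The API documentation above describes clearly enough what these four methods do and when they are called, so I won't repeat it. The one thing to note is that onError and onComplete are mutually exclusive: once onError fires, onComplete will not be called.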
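The mutual exclusion can be illustrated with a small model. This is a sketch only: MiniObserver and MiniEmitter are hypothetical, simplified stand-ins and not RxJava types. They show a source that delivers at most one terminal event and nothing after it.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified stand-in for io.reactivex.Observer (onSubscribe is left out).
interface MiniObserver<T> {
    void onNext(T item);
    void onError(Throwable e);
    void onComplete();
}

// Hypothetical source that enforces the terminal-event rule.
class MiniEmitter<T> {
    private final MiniObserver<T> observer;
    private boolean terminated;

    MiniEmitter(MiniObserver<T> observer) {
        this.observer = observer;
    }

    void next(T item) {
        if (!terminated) observer.onNext(item);
    }

    void error(Throwable e) {
        if (terminated) return;
        terminated = true;
        observer.onError(e);
    }

    void complete() {
        if (terminated) return;
        terminated = true;
        observer.onComplete();
    }
}

class TerminalEventDemo {
    static List<String> run() {
        final List<String> log = new ArrayList<>();
        MiniEmitter<String> emitter = new MiniEmitter<String>(new MiniObserver<String>() {
            @Override public void onNext(String item) { log.add("next:" + item); }
            @Override public void onError(Throwable e) { log.add("error:" + e.getMessage()); }
            @Override public void onComplete() { log.add("complete"); }
        });
        emitter.next("a");
        emitter.error(new RuntimeException("timeout"));
        emitter.complete(); // ignored: the stream already ended with an error
        emitter.next("b");  // ignored for the same reason
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

Because only one of the two terminal events ever arrives, NetRequestSubscriber calls netRequestCallback.onFinish() on both paths: once in onComplete and once in onRequestError.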
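Now let's look at cancelRequest(). As the name suggests, it cancels the network request, but how? What follows is only a brief walkthrough that locates the key classes and methods; covering every detail would be tedious and take a lot of space.
As you can see, cancelRequest() calls disposable.dispose() to cancel the request. So what on earth is disposable? It is an interface provided by RxJava. Its source is as follows: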
/**
* Represents a disposable resource.
*/
public interface Disposable {
/**
* Dispose the resource, the operation should be idempotent.
*/
void dispose();
/**
* Returns true if this resource has been disposed.
* @return true if this resource has been disposed
*/
boolean isDisposed();
}
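The Disposable source alone leaves us none the wiser, since it does not show how a network request gets canceled. On reflection, though, canceling a network request is not something RxJava can do by itself. That is the networking framework's job, and Disposable is merely a helper.
As mentioned earlier, Retrofit must be configured with RxJava2CallAdapterFactory so that network requests can be observed as Observables. So how does RxJava2CallAdapterFactory do its work, and how is it connected to the request being canceled after Disposable.dispose() is called? Let's take a brief look at the RxJava2CallAdapterFactory class.
Instances of this class are created through static factory methods. It has a single constructor, shown below: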
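The Javadoc above asks for dispose() to be idempotent. One common way to get that property is to guard the cleanup action with an atomic flag. The sketch below assumes exactly that approach; SimpleDisposable and RunOnceDisposable are hypothetical names and are not RxJava classes.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Simplified stand-in for io.reactivex.disposables.Disposable.
interface SimpleDisposable {
    void dispose();
    boolean isDisposed();
}

// Hypothetical disposable that runs its cleanup action at most once.
class RunOnceDisposable implements SimpleDisposable {
    private final AtomicBoolean disposed = new AtomicBoolean(false);
    private final Runnable onDispose;

    RunOnceDisposable(Runnable onDispose) {
        this.onDispose = onDispose;
    }

    @Override public void dispose() {
        // compareAndSet succeeds for the first caller only, so repeated calls
        // (even from different threads) trigger the action a single time.
        if (disposed.compareAndSet(false, true)) {
            onDispose.run();
        }
    }

    @Override public boolean isDisposed() {
        return disposed.get();
    }
}

class IdempotentDisposeDemo {
    static int run() {
        final AtomicInteger cancelCount = new AtomicInteger();
        SimpleDisposable disposable = new RunOnceDisposable(new Runnable() {
            @Override public void run() {
                cancelCount.incrementAndGet(); // stands in for "cancel the request"
            }
        });
        disposable.dispose();
        disposable.dispose(); // second call is a no-op
        return cancelCount.get();
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

The cleanup action is where a networking library would plug in its own cancellation.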
private RxJava2CallAdapterFactory(@Nullable Scheduler scheduler, boolean isAsync) {
this.scheduler = scheduler;
this.isAsync = isAsync;
}
The core method of RxJava2CallAdapterFactory:
public CallAdapter<?, ?> get(Type returnType, Annotation[] annotations, Retrofit retrofit) {
Class<?> rawType = getRawType(returnType);
// ... (code omitted here)
return new RxJava2CallAdapter(responseType, scheduler, isAsync, isResult, isBody, isFlowable,
isSingle, isMaybe, false);
}
Here is the key part: RxJava2CallAdapter and the isAsync parameter. The core code of RxJava2CallAdapter is as follows:
@Override
public Object adapt(Call<R> call) {
// ************************ key part ************************
Observable<Response<R>> responseObservable = isAsync ? new CallEnqueueObservable<>(call) : new CallExecuteObservable<>(call);
// **********************************************************
Observable<?> observable;
if (isResult) {
observable = new ResultObservable<>(responseObservable);
} else if (isBody) {
observable = new BodyObservable<>(responseObservable);
} else {
observable = responseObservable;
}
if (scheduler != null) {
observable = observable.subscribeOn(scheduler);
}
if (isFlowable) {
return observable.toFlowable(BackpressureStrategy.LATEST);
}
if (isSingle) {
return observable.singleOrError();
}
if (isMaybe) {
return observable.singleElement();
}
if (isCompletable) {
return observable.ignoreElements();
}
return observable;
}
Look at the key part between the two lines of asterisks. It points to two key classes, CallEnqueueObservable and CallExecuteObservable. Let's take a quick look at both:
1. The source of CallEnqueueObservable.java:
final class CallEnqueueObservable<T> extends Observable<Response<T>> {
private final Call<T> originalCall;
CallEnqueueObservable(Call<T> originalCall) {
this.originalCall = originalCall;
}
@Override protected void subscribeActual(Observer<? super Response<T>> observer) {
// Since Call is a one-shot type, clone it for each new observer.
Call<T> call = originalCall.clone();
CallCallback<T> callback = new CallCallback<>(call, observer);
observer.onSubscribe(callback);
call.enqueue(callback);
}
private static final class CallCallback<T> implements Disposable, Callback<T> {
private final Call<?> call;
private final Observer<? super Response<T>> observer;
boolean terminated = false;
CallCallback(Call<?> call, Observer<? super Response<T>> observer) {
this.call = call;
this.observer = observer;
}
@Override public void onResponse(Call<T> call, Response<T> response) {
if (call.isCanceled()) return;
try {
observer.onNext(response);
if (!call.isCanceled()) {
terminated = true;
observer.onComplete();
}
} catch (Throwable t) {
if (terminated) {
RxJavaPlugins.onError(t);
} else if (!call.isCanceled()) {
try {
observer.onError(t);
} catch (Throwable inner) {
Exceptions.throwIfFatal(inner);
RxJavaPlugins.onError(new CompositeException(t, inner));
}
}
}
}
@Override public void onFailure(Call<T> call, Throwable t) {
if (call.isCanceled()) return;
try {
observer.onError(t);
} catch (Throwable inner) {
Exceptions.throwIfFatal(inner);
RxJavaPlugins.onError(new CompositeException(t, inner));
}
}
@Override public void dispose() {
call.cancel();
}
@Override public boolean isDisposed() {
return call.isCanceled();
}
}
}
See the dispose() method? ^_^ It calls call.cancel(), and that is the crux of it:
@Override
public void dispose() {
call.cancel();
}
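To make the effect of that dispose() concrete, here is a small model of the enqueue path. It is a sketch under stated assumptions: FakeAsyncCall is a hypothetical stand-in for retrofit2.Call that stores the callback instead of doing I/O, and MiniCallCallback copies only the isCanceled() guard from CallCallback.onResponse.

```java
import java.util.ArrayList;
import java.util.List;

interface ResponseListener {
    void onResponse(FakeAsyncCall call, String body);
}

// Hypothetical stand-in for retrofit2.Call: no real networking happens here.
class FakeAsyncCall {
    private boolean canceled;
    private ResponseListener listener;

    void enqueue(ResponseListener listener) { this.listener = listener; }
    void cancel() { canceled = true; }
    boolean isCanceled() { return canceled; }

    // Simulates the network layer delivering a response some time later.
    void deliver(String body) {
        if (listener != null) listener.onResponse(this, body);
    }
}

// Plays the role of CallCallback: both the disposable and the response callback.
class MiniCallCallback implements ResponseListener {
    private final FakeAsyncCall call;
    private final List<String> received;

    MiniCallCallback(FakeAsyncCall call, List<String> received) {
        this.call = call;
        this.received = received;
    }

    void dispose() { call.cancel(); }
    boolean isDisposed() { return call.isCanceled(); }

    @Override public void onResponse(FakeAsyncCall call, String body) {
        if (call.isCanceled()) return; // same guard as CallCallback.onResponse
        received.add(body);            // stands in for observer.onNext(response)
    }
}

class EnqueueCancelDemo {
    static List<String> run() {
        List<String> received = new ArrayList<>();

        FakeAsyncCall first = new FakeAsyncCall();
        first.enqueue(new MiniCallCallback(first, received));
        first.deliver("kept");

        FakeAsyncCall second = new FakeAsyncCall();
        MiniCallCallback callback = new MiniCallCallback(second, received);
        second.enqueue(callback);
        callback.dispose();        // what cancelRequest() ends up triggering
        second.deliver("dropped"); // arrives after the cancel and is ignored
        return received;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

Only the response of the call that was not disposed reaches the observer side of the model.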
2. The source of CallExecuteObservable.java:
final class CallExecuteObservable<T> extends Observable<Response<T>> {
private final Call<T> originalCall;
CallExecuteObservable(Call<T> originalCall) {
this.originalCall = originalCall;
}
@Override protected void subscribeActual(Observer<? super Response<T>> observer) {
// Since Call is a one-shot type, clone it for each new observer.
Call<T> call = originalCall.clone();
observer.onSubscribe(new CallDisposable(call));
boolean terminated = false;
try {
Response<T> response = call.execute();
if (!call.isCanceled()) {
observer.onNext(response);
}
if (!call.isCanceled()) {
terminated = true;
observer.onComplete();
}
} catch (Throwable t) {
Exceptions.throwIfFatal(t);
if (terminated) {
RxJavaPlugins.onError(t);
} else if (!call.isCanceled()) {
try {
observer.onError(t);
} catch (Throwable inner) {
Exceptions.throwIfFatal(inner);
RxJavaPlugins.onError(new CompositeException(t, inner));
}
}
}
}
private static final class CallDisposable implements Disposable {
private final Call<?> call;
CallDisposable(Call<?> call) {
this.call = call;
}
@Override public void dispose() {
call.cancel();
}
@Override public boolean isDisposed() {
return call.isCanceled();
}
}
}
The key is the third line of subscribeActual: observer.onSubscribe(new CallDisposable(call)). CallDisposable is a nested class defined in CallExecuteObservable that implements the Disposable interface, and its dispose() method likewise calls call.cancel() to cancel the network request.
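The synchronous path can be modeled the same way. This is a sketch only: FakeSyncCall is a hypothetical stand-in for retrofit2.Call, and it assumes that executing an already canceled call throws an IOException. The subscribe method copies the isCanceled() checks from subscribeActual and records the events an observer would see.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for retrofit2.Call.
class FakeSyncCall {
    private boolean canceled;

    void cancel() { canceled = true; }
    boolean isCanceled() { return canceled; }

    // Assumption: a canceled call fails with an IOException when executed.
    String execute() throws IOException {
        if (canceled) throw new IOException("Canceled");
        return "body";
    }
}

class ExecuteCancelDemo {
    // Returns the events an observer would see for one subscription.
    static List<String> subscribe(boolean disposeInOnSubscribe) {
        List<String> events = new ArrayList<>();
        FakeSyncCall call = new FakeSyncCall();

        events.add("onSubscribe");
        if (disposeInOnSubscribe) {
            call.cancel(); // what CallDisposable.dispose() does
        }
        try {
            String response = call.execute();
            if (!call.isCanceled()) events.add("onNext:" + response);
            if (!call.isCanceled()) events.add("onComplete");
        } catch (IOException e) {
            // Errors caused by the cancel itself are swallowed, as in the source.
            if (!call.isCanceled()) events.add("onError:" + e.getMessage());
        }
        return events;
    }

    public static void main(String[] args) {
        System.out.println(subscribe(false));
        System.out.println(subscribe(true));
    }
}
```

After a dispose, the observer gets no onNext, onError, or onComplete. That is why NetRequestSubscriber.cancelRequest() calls netRequestCallback.onCancel() and onFinish() itself instead of waiting for a terminal event.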
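Important Java classes related to this post:
RxJava:
Observer.java
Observable.java
Disposable.java
Retrofit:
Retrofit.java (focus on the create method)
OkHttpCall.java
RxJava2CallAdapterFactory.java (focus on the get method)
RxJava2CallAdapter.java (focus on the adapt method)
CallEnqueueObservable.java (focus on the subscribeActual method and the CallCallback nested class)
CallExecuteObservable.java (focus on the subscribeActual method)
This post does not walk through the full flow in detail, and since my own knowledge is limited it may contain mistakes or inaccuracies. If you happen to read it, corrections are welcome.
A complete example is also available: https://github.com/670832188/TestApp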