之前的Retrofit學(xué)習(xí)(一)了解了一下Retrofit
的最基本使用驹愚,不過目前最流行的Retrofit
使用方式是Retrofit + RxJava + Gson
, 如果要使用RxJava
, 需要在創(chuàng)建Retrofit
時配置RxJava
對應(yīng)的CallAdapter:
OkHttpclient client = new OkHttpClient.Builder().build();
Retrofit retrofit = new Retrofit.Builder()
.client(client)
.addCallAdapterFactory(RxJava2CallAdapterFactory.create())
.baseUrl("http://localhost:4567/")
.build();
下面就主要分析一下RxJava2CallAdapterFactory
是如何工作的
RxJava2CallAdapterFactory
public static RxJava2CallAdapterFactory create() {
return new RxJava2CallAdapterFactory(null, false);
}
public static RxJava2CallAdapterFactory createAsync() {
return new RxJava2CallAdapterFactory(null, true);
}
public static RxJava2CallAdapterFactory createWithScheduler(Scheduler scheduler) {
if (scheduler == null) throw new NullPointerException("scheduler == null");
return new RxJava2CallAdapterFactory(scheduler, false);
}
private final Scheduler scheduler;
private final boolean isAsync;
private RxJava2CallAdapterFactory(Scheduler scheduler, boolean isAsync) {
this.scheduler = scheduler;
this.isAsync = isAsync;
}
RxJava2CallAdapterFactory
提供了三個create
系列方法辙芍,一般情況下氯檐,最常使用的是第一個沒有參數(shù)的create
方法孟岛,即不指定Scheduler
,isAsync = false
, isAsync
指明是調(diào)用Call.execute
還是Call.enqueue
RxJava2CallAdapterFactory.get
public CallAdapter<?, ?> get(Type returnType, Annotation[] annotations, Retrofit retrofit) {
Class<?> rawType = getRawType(returnType);
if (rawType == Completable.class) {
return new RxJava2CallAdapter(Void.class, scheduler, isAsync, false, true, false, false,
false, true);
}
boolean isFlowable = rawType == Flowable.class;
boolean isSingle = rawType == Single.class;
boolean isMaybe = rawType == Maybe.class;
//確保返回值是Observable, Flowable, Single, Maybe
if (rawType != Observable.class && !isFlowable && !isSingle && !isMaybe) {
return null;
}
boolean isResult = false;
boolean isBody = false;
Type responseType;
//ParameterizedType指泛型類型针炉,如List<T>
if (!(returnType instanceof ParameterizedType)) {
String name = isFlowable ? "Flowable"
: isSingle ? "Single"
: isMaybe ? "Maybe" : "Observable";
throw new IllegalStateException(...);
}
//獲取返回值的泛型參數(shù)
Type observableType = getParameterUpperBound(0, (ParameterizedType) returnType);
Class<?> rawObservableType = getRawType(observableType);
if (rawObservableType == Response.class) {
//如果泛型參數(shù)是Response, 且Response沒有泛型參數(shù),拋出異常,Response必須是以泛型形式使用
if (!(observableType instanceof ParameterizedType)) {
throw new IllegalStateException(...);
}
//獲取Response<T>中的T
responseType = getParameterUpperBound(0, (ParameterizedType) observableType);
} else if (rawObservableType == Result.class) {
//如果泛型參數(shù)是Result, 且Result沒有泛型參數(shù)扳抽, 拋出異常篡帕,Result必須是以泛型形式使用
if (!(observableType instanceof ParameterizedType)) {
throw new IllegalStateException(...);
}
//獲取Result<T>中的T
responseType = getParameterUpperBound(0, (ParameterizedType) observableType);
isResult = true;
} else {
responseType = observableType;
isBody = true;
}
return new RxJava2CallAdapter(responseType, scheduler, isAsync, isResult, isBody, isFlowable,
isSingle, isMaybe, false);
}``
- 如果返回值類型是
Completable
, 直接返回一個RxJava2CallAdapter
指定其中的isBody = true, isCompletable = true
- 確保方法的返回值
Observable
,Flowable
,Single
,Maybe
, 否則返回null, 如果是null, 則根據(jù)之前的分析贸呢,如果CallAdapter.Factory.get()
返回null, 在Retrofit.nextCallAdapter
中會拋出異常(Retrofit
默認有一個ExecutorCallAdapterFactory
, 但如果方法返回值不是Call
類型镰烧,其get()
方法會直接返回null) - 如果返回值不是泛型,拋出異常
- 獲取返回值中的泛型參數(shù)的類型楞陷,假設(shè)返回值類型
Observable<Bean>
, 這里獲取到的泛型參數(shù)類型就是Bean
- 如果泛型參數(shù)的類型是
retrofit2.Response
或retrofit2.Result
怔鳖,確保Response
和Result
也是以泛型的形式聲明的,即返回值類型是Observable<Response<T>>
或Observable<Result<T>>
固蛾;Retrofit + RxJava2
最常見的的用法是Observable<Bean> getName(...)
, 一般都會直接使用自定義的Bean
類结执, 所以一般情況下度陆,isBody = true
- 返回
RxJava2CallAdapter
RxJava2CallAdapter.adapt
public Object adapt(Call<R> call) {
//call是OkHttpCall, 將R轉(zhuǎn)換為Response<R>
Observable<Response<R>> responseObservable = isAsync
? new CallEnqueueObservable<>(call)
: new CallExecuteObservable<>(call);
Observable<?> observable;
if (isResult) {
observable = new ResultObservable<>(responseObservable);
} else if (isBody) {
//將Response<T>轉(zhuǎn)換為T
observable = new BodyObservable<>(responseObservable);
} else {
observable = responseObservable;
}
if (scheduler != null) {
observable = observable.subscribeOn(scheduler);
}
if (isFlowable) {
return observable.toFlowable(BackpressureStrategy.LATEST);
}
if (isSingle) {
return observable.singleOrError();
}
if (isMaybe) {
return observable.singleElement();
}
if (isCompletable) {
return observable.ignoreElements();
}
return observable;
}
- 首先生成一個
Obsersavle<Response<R>>
, 這里會根據(jù)isAsync
是否為tru,來確定是生成CallEnqueueObservable
還是CallExecuteObservable
,這兩者的區(qū)別在于前者最終實際是調(diào)用OkHttpCall.enqueue
方法献幔,后者實際調(diào)用了OkHttpCall.execute
方法懂傀, 前者異步執(zhí)行,后者同步執(zhí)行 -
Retrofit + RxJava2
最常見的的用法是Observable<Bean> getName(...)
, 一般都會直接使用自定義的Bean
類蜡感, - 根據(jù)之前的分析可以知道蹬蚁,一般情況下,由于開發(fā)者都直接使用自己的Bean, 所以
isBody = true
郑兴, 會創(chuàng)建一個BodyObservable
- 之后會根據(jù)方法返回值是否是
Flowable, Single, Maybe, Completable
其中一種來做出具體的轉(zhuǎn)換
BodyObservable
final class BodyObservable<T> extends Observable<T> {
private final Observable<Response<T>> upstream;
BodyObservable(Observable<Response<T>> upstream) {
this.upstream = upstream;
}
@Override
protected void subscribeActual(Observer<? super T> observer) {
//upstream是CallExecuteObservable或CallEnqueueObservable, BodyObserver是一個代理
//作用就是把Response<T>轉(zhuǎn)換為T
upstream.subscribe(new BodyObserver<T>(observer));
}
private static class BodyObserver<R> implements Observer<Response<R>> {
private final Observer<? super R> observer;
private boolean terminated;
BodyObserver(Observer<? super R> observer) {
this.observer = observer;
}
@Override
public void onSubscribe(Disposable disposable) {
observer.onSubscribe(disposable);
}
@Override
public void onNext(Response<R> response) {
if (response.isSuccessful()) {
observer.onNext(response.body());
} else {
...
} }
}
@Override
public void onComplete() {
if (!terminated) {
observer.onComplete();
}
}
@Override
public void onError(Throwable throwable) {
if (!terminated) {
observer.onError(throwable);
} else {
...
}
}
}
}
這里upstream
是CallExecutorObservable
或CallEnqueueObservable
犀斋,一般情況下,這里會是CallExecutorObservable
情连;在subscribeActual
中可以看到叽粹,在傳入的observer
外又包裝了一個BodyObserver
, 傳入的observer
就是開發(fā)者傳入的自定義Observer
CallExecutorObservable
final class CallExecuteObservable<T> extends Observable<Response<T>> {
private final Call<T> originalCall;
CallExecuteObservable(Call<T> originalCall) {
this.originalCall = originalCall;
}
@Override
protected void subscribeActual(Observer<? super Response<T>> observer) {
//originalCall是OkHttpCall
Call<T> call = originalCall.clone();
observer.onSubscribe(new CallDisposable(call));
boolean terminated = false;
try {
//call.execute會調(diào)用OkHttpCall.execute(), 從而得到okhttp3.Response
//再從okhttp3.Response中得到okhttp3.ResponseBody, 然后調(diào)用 Converter.convert轉(zhuǎn)換結(jié)果,最后將接過封裝為retrofit.Response
Response<T> response = call.execute();
if (!call.isCanceled()) {
//BodyObserver.onNext -> 自定義Onserver.onNext
observer.onNext(response);
}
if (!call.isCanceled()) {
terminated = true;
observer.onComplete();
}
} catch (Throwable t) {
...
}
}
private static final class CallDisposable implements Disposable {
private final Call<?> call;
CallDisposable(Call<?> call) {
this.call = call;
}
@Override
public void dispose() {
call.cancel();
}
@Override
public boolean isDisposed() {
return call.isCanceled();
}
}
}
originalCall
是OkHttpCall
, 可以看到在subscribeActual
中會調(diào)用call.execute
即調(diào)用OkHttpCall.execute
來從而得到okhttp3.Response, 再從okhttp3.Response中得到okhttp3.ResponseBody, 然后調(diào)用 Converter.convert
轉(zhuǎn)換結(jié)果蒙具,最后將結(jié)果封裝為retrofit.Response
,
subscribeActual
的參數(shù)observer
是BodyObserver
, 而從之前的分析中可以看到BodyObserver
的各方法會再調(diào)用開發(fā)人員自定義的Observer
的響應(yīng)方法
這里看似比較抽象的一系列操作球榆,是為了獲得一個統(tǒng)一的結(jié)果, 一般開發(fā)人員在定義請求方法時,都會使用Observable<Bean>
這種方式禁筏,即直接將Observable<T>
中的T設(shè)為自定義Bean, 但不同場景下肯定會定義不同的類型Bean, Retrofit不可能預(yù)先知道開發(fā)人員會定義那些Bean, 為了得到一個統(tǒng)一的結(jié)果持钉,先將Bean轉(zhuǎn)換為Reponse<T>
,這樣無論T是哪種類型,都可以得到一個統(tǒng)一的結(jié)果篱昔,之后再通過Response.body()
將Response轉(zhuǎn)換為自定義Bean