1.創(chuàng)建Retrofit對象
OkHttpClient.Builder okHttpClient = new OkHttpClient.Builder();
retrofit = new Retrofit.Builder()
.client(okHttpClient.build())
.addConverterFactory(GsonConverterFactory.create())
.addCallAdapterFactory(RxJavaCallAdapterFactory.create())
.baseUrl(base_url)
.build();
這里是普通的 Retrofit 對象創(chuàng)建過程,傳入一些必要的參數(shù):okHttpClient
旁瘫,converterFactory
,callAdapterFactory
(不搭配 Rxjava 的時候使用 Retrofit 默認的 callAdapterFactory
拂玻,什么都不做)地粪,baseUrl 取募。
這里特別要注意的是傳入了 RxJavaCallAdapterFactory.create()
這個RxjavaCallAdapter 對象
,這個對象將徹底改變 Retrofit 的使用蟆技。使得 Retrofit 搭配 Rxjava 變成可能玩敏,不得不佩服 Retrofit 作者的編程功底,開放 CallAdapterFactory
這個接口质礼,使 Retrofit 的靈活性更高旺聚。
2.創(chuàng)建接口的動態(tài)代理對象
給出實驗的接口
public interface NetApiService {
//post請求
@FormUrlEncoded
@POST("{url}")
Observable<ResponseBody> executePost(
@Path("url") String url,
@Field("params") String params,
@Field("signature") String signature
);
}
netApiService = retrofit.create(NetApiService .class); //返回一個動態(tài)代理對象
這里也是 Retrofit 神奇的地方,傳入一個接口眶蕉,就可以生成實現(xiàn)了這個接口的對象砰粹,當然這個只是 Java 代碼生成的動態(tài)代理對象。下面我們進
create()
方法看看造挽。
public <T> T create(final Class<T> service) {
Utils.validateServiceInterface(service); //驗證外部傳進的“服務”接口是否合法
if (validateEagerly) {
eagerlyValidateMethods(service); //根據(jù)validateEagerly判斷是否對接口中的全部方法進行緩存
}
//使用Proxy工廠類返回一個泛型動態(tài)代理實例碱璃。
return (T) Proxy.newProxyInstance(service.getClassLoader(), new Class<?>[] { service },
new InvocationHandler() {
private final Platform platform = Platform.get();
@Override public Object invoke(Object proxy, Method method, Object... args)
throws Throwable {
// If the method is a method from Object then defer to normal invocation.
if (method.getDeclaringClass() == Object.class) {
return method.invoke(this, args);
}
if (platform.isDefaultMethod(method)) {
return platform.invokeDefaultMethod(method, service, proxy, args);
}
ServiceMethod serviceMethod = loadServiceMethod(method);
OkHttpCall okHttpCall = new OkHttpCall<>(serviceMethod, args);
return serviceMethod.callAdapter.adapt(okHttpCall);
}
});
}
在這里我們首先對傳入的接口進行檢驗是否是接口,然后根據(jù) validateEagerly
判斷是否對接口中的全部方法進行緩存饭入,最后我們用 java.lang.reflect.Proxy;
創(chuàng)建一個泛型的動態(tài)代理對象嵌器,返回這個對象。(不懂 java動態(tài)代理技術 的同學別著急谐丢,我會在文末給出參考資料)
3.創(chuàng)建Observable
Observable<ResponseBody> observable = netApiService.executePost(url, params, signature);
調(diào)用動態(tài)代理對象的接口方法爽航,這時候會調(diào)用
new InvocationHandler() {
private final Platform platform = Platform.get();
@Override public Object invoke(Object proxy, Method method, Object... args)
throws Throwable {
// If the method is a method from Object then defer to normal invocation.
if (method.getDeclaringClass() == Object.class) {
return method.invoke(this, args);
}
if (platform.isDefaultMethod(method)) {
return platform.invokeDefaultMethod(method, service, proxy, args);
}
ServiceMethod serviceMethod = loadServiceMethod(method);
OkHttpCall okHttpCall = new OkHttpCall<>(serviceMethod, args);
return serviceMethod.callAdapter.adapt(okHttpCall);
}
}
InvocationHandler
的 invoke()
方法,在這里我們可以看到有三個參數(shù):
proxy
表示通過 Proxy.newProxyInstance()
生成的代理類對象乾忱。
method
表示代理對象被調(diào)用的函數(shù)岳掐。
args
表示代理對象被調(diào)用的函數(shù)的參數(shù)。
調(diào)用代理對象的每個函數(shù)實際最終都是調(diào)用了 InvocationHandler
的 invoke
函數(shù)饭耳。
由于這個是接口的方法串述,所以不會進第一個 if
,因為也不是默認方法寞肖,所以也不會進第二個 if
纲酗。這樣就可以看到我們的代理對象在調(diào)用了接口的方法后實際上是 new 了一個 okHttpCall<>
對象,然后將這個對象作為參數(shù)傳進了 callAdapter.adapt();
方法中新蟆。
由于我們之前傳入的是 RxJavaCallAdapterFactory.create()
觅赊,所以我們深入 RxJavaCallAdapterFactory.java
看看構造 Observable
的方法,在adapt()
可以看到:
static final class ResponseCallAdapter implements CallAdapter<Observable<?>> {
private final Type responseType;
private final Scheduler scheduler;
ResponseCallAdapter(Type responseType, Scheduler scheduler) {
this.responseType = responseType;
this.scheduler = scheduler;
}
@Override public Type responseType() {
return responseType;
}
@Override public <R> Observable<Response<R>> adapt(Call<R> call) {
Observable<Response<R>> observable = Observable.create(new CallOnSubscribe<>(call));
if (scheduler != null) {
return observable.subscribeOn(scheduler);
}
return observable;
}
}
在這里我們看到琼稻,這里將傳入的 okHttp 對象
作為參數(shù)吮螺,構造了 CallOnSubscribe 對象
。
CallOnSubscribe
是何方神圣?鸠补?萝风?按照 Rxjava
構造 Observable
方法來說,這個 CallOnSubscribe
應該是一個實現(xiàn)了Observable.OnSubscribe<T>
接口的對象紫岩。
我們看看源碼规惰,果然如此。
static final class CallOnSubscribe<T> implements Observable.OnSubscribe<Response<T>> {
private final Call<T> originalCall;
CallOnSubscribe(Call<T> originalCall) {
this.originalCall = originalCall;
}
@Override public void call(final Subscriber<? super Response<T>> subscriber) {
// Since Call is a one-shot type, clone it for each new subscriber.
Call<T> call = originalCall.clone();
// Wrap the call in a helper which handles both unsubscription and backpressure.
RequestArbiter<T> requestArbiter = new RequestArbiter<>(call, subscriber);
subscriber.add(requestArbiter);
subscriber.setProducer(requestArbiter);
}
}
看到這里泉蝌,相信你大概也懂了為什么調(diào)用生成的動態(tài)代理對象的接口方法不像只使用 Retrofit
那樣返回一個 okHttpCall<> 對象
歇万,而是返回一個 Observable<ResponseBody>
對象。其實這就是
RxJavaCallAdapterFactory
做的轉換勋陪。
仔細看 CallOnSubscribe<T>
的 call()
方法贪磺,我們發(fā)現(xiàn)這里的 subscriber
(其實是調(diào)用 subscribe()
方法時傳進來的 subscriber
,就是外部的觀察者)添加了一個 requestArbiter
對象诅愚。這個對象很重要寒锚,在 subscriber.setProducer(requestArbiter);
時,它會控制
okHttpCall對象
直接聯(lián)網(wǎng)獲取數(shù)據(jù)呻粹,然后回調(diào)給觀察者 subscriber
壕曼。
4.observable.subscribe(subscriber);訂閱
這里的代碼不多苏研,就一行 observable.subscribe(subscriber);
等浊。我們仔細看看在 subscribe
方法里面發(fā)生了什么神奇的事?(提前劇透一下摹蘑,Observable.OnSubscribe<T>對象
很棒筹燕,它就相當于橋梁,將 Observable 和 Observer 連接起來)
public final Subscription subscribe(Subscriber<? super T> subscriber) {
return Observable.subscribe(subscriber, this);
}
static <T> Subscription subscribe(Subscriber<? super T> subscriber, Observable<T> observable) {
// validate and proceed
//判斷傳進來的參數(shù)衅鹿,即是觀察者對象撒踪,被觀察者對象,是否為空大渤。
if (subscriber == null) {
throw new IllegalArgumentException("observer can not be null");
}
if (observable.onSubscribe == null) {
throw new IllegalStateException("onSubscribe function can not be null.");
/*
* the subscribe function can also be overridden but generally that's not the appropriate approach
* so I won't mention that in the exception
*/
}
// new Subscriber so onStart it
//重要的操作制妄,可以在訂閱之前做一些準備工作
subscriber.onStart();
/*
* See https://github.com/ReactiveX/RxJava/issues/216 for discussion on "Guideline 6.4: Protect calls
* to user code from within an Observer"
*/
// if not already wrapped
if (!(subscriber instanceof SafeSubscriber)) {
// assign to `observer` so we return the protected version
subscriber = new SafeSubscriber<T>(subscriber);
}
// The code below is exactly the same an unsafeSubscribe but not used because it would
// add a significant depth to already huge call stacks.
try {
// allow the hook to intercept and/or decorate
hook.onSubscribeStart(observable, observable.onSubscribe).call(subscriber);
return hook.onSubscribeReturn(subscriber);
} catch (Throwable e) {
// special handling for certain Throwable/Error/Exception types
Exceptions.throwIfFatal(e);
// in case the subscriber can't listen to exceptions anymore
if (subscriber.isUnsubscribed()) {
RxJavaPluginUtils.handleException(hook.onSubscribeError(e));
} else {
// if an unhandled error occurs executing the onSubscribe we will propagate it
try {
subscriber.onError(hook.onSubscribeError(e));
} catch (Throwable e2) {
Exceptions.throwIfFatal(e2);
// if this happens it means the onError itself failed (perhaps an invalid function implementation)
// so we are unable to propagate the error correctly and will just throw
RuntimeException r = new OnErrorFailedException("Error occurred attempting to subscribe [" + e.getMessage() + "] and then again while trying to pass to onError.", e2);
// TODO could the hook be the cause of the error in the on error handling.
hook.onSubscribeError(r);
// TODO why aren't we throwing the hook's return value.
throw r;
}
}
return Subscriptions.unsubscribed();
}
}
在這段代碼中,我們看到 subscribe()
過程中泵三,會先調(diào)用 onStart()
耕捞,一般這個方法在 subscribe 剛開始,而事件還未發(fā)送之前被調(diào)用烫幕,可以用于做一些準備工作俺抽,例如數(shù)據(jù)的清零或重置。這是一個可選方法较曼,默認情況下它的實現(xiàn)為空磷斧。需要注意的是,如果對準備工作的線程有要求(例如彈出一個顯示進度的對話框,這必須在主線程執(zhí)行)弛饭, onStart()
就不適用了(因為它總是在 subscribe 所發(fā)生的線程被調(diào)用冕末,而不能指定線程。要在指定的線程來做準備工作孩哑,可以使用 doOnSubscribe()
方法)(先記住這個知識點栓霜,請保留,現(xiàn)在先不管線程切換)
高能來了:嵫选8炻!
我們重點看看這句代碼:
hook.onSubscribeStart(observable, observable.onSubscribe).call(subscriber);
return hook.onSubscribeReturn(subscriber);
public <T> OnSubscribe<T> onSubscribeStart(Observable<? extends T> observableInstance, final OnSubscribe<T> onSubscribe) {
// pass through by default
return onSubscribe;
}
其實 onSubscribeStart()
方法直接返回了 onSubscribe
對象丛晌,然后直接調(diào)用 onSubscribe
的 call(subscriber)
方法仅炊。記得我們剛剛分析,這里其實是調(diào)用了 CallOnSubscribe<T>對象
的 call()
方法澎蛛。也就是在這里進行了聯(lián)網(wǎng)獲取數(shù)據(jù)抚垄,然后回調(diào) Subscriber
觀察者的方法。(具體的代碼就是 call()
方法的subscriber.setProducer(requestArbiter);
)谋逻。
public void setProducer(Producer p) {
long toRequest;
boolean passToSubscriber = false;
synchronized (this) {
toRequest = requested;
producer = p;
if (subscriber != null) {
// middle operator ... we pass through unless a request has been made
if (toRequest == NOT_SET) {
// we pass through to the next producer as nothing has been requested
passToSubscriber = true;
}
}
}
// do after releasing lock
if (passToSubscriber) {
subscriber.setProducer(producer);
} else {
// we execute the request with whatever has been requested (or Long.MAX_VALUE)
if (toRequest == NOT_SET) {
producer.request(Long.MAX_VALUE);
} else {
producer.request(toRequest);
}
}
}
最后會調(diào)用 producer.request(toRequest);
方法呆馁。
這個 request()
方法,就是 RequestArbiter<T> requestArbiter = new RequestArbiter<>(call, subscriber);
的 request()
毁兆。
static final class RequestArbiter<T> extends AtomicBoolean implements Subscription, Producer {
private final Call<T> call;
private final Subscriber<? super Response<T>> subscriber;
RequestArbiter(Call<T> call, Subscriber<? super Response<T>> subscriber) {
this.call = call;
this.subscriber = subscriber;
}
@Override public void request(long n) {
if (n < 0) throw new IllegalArgumentException("n < 0: " + n);
if (n == 0) return; // Nothing to do when requesting 0.
if (!compareAndSet(false, true)) return; // Request was already triggered.
try {
Response<T> response = call.execute();
if (!subscriber.isUnsubscribed()) {
subscriber.onNext(response);
}
} catch (Throwable t) {
Exceptions.throwIfFatal(t);
if (!subscriber.isUnsubscribed()) {
subscriber.onError(t);
}
return;
}
if (!subscriber.isUnsubscribed()) {
subscriber.onCompleted();
}
}
@Override public void unsubscribe() {
call.cancel();
}
@Override public boolean isUnsubscribed() {
return call.isCanceled();
}
}
Response<T> response = call.execute();
這里進行了網(wǎng)絡請求浙滤;
if (!subscriber.isUnsubscribed()) { subscriber.onNext(response); }
這里進行回調(diào)。
接下來的 onError()
和 onCompleted()
方法的回調(diào)一樣的气堕,就不分析了纺腊。
至此,我們就完整地了解了 Retrofit + Rxjava 中從創(chuàng)建 Observable 和 Observer 到 Observable 訂閱 Observer 的流程茎芭,以及中間隱藏的聯(lián)網(wǎng)和回調(diào)的過程揖膜。