最終會形成一條被觀察者鏈子,每個被觀察者對象都有各自的線程Schedulers用來切換線程击困。
OkHttpCall封裝okhttp3相關(guān)的操作。
我們分5個步驟來分析流程:
第一步:
在retrofit.addCallApdapterFactory(RxJava2CallAdapterFactory.create())捂龄,會把RxJava2CallAdapterFactory加入retrofit中的變量private final List<CallAdapter.Factory> adapterFactories = new ArrayList<>();
中夕晓,后面要用到這個生成適配類RxJava2CallAdapter
這個類會調(diào)用adapt(Call<R> call)生成被觀察者Observable<T>的對象。
這是典型的橋接模式
第二步:
create函數(shù)中動態(tài)代理對象唱星,動態(tài)代理類都必須要實現(xiàn)InvocationHandler這個接口喊衫,當(dāng)我們通過代理對象調(diào)用一個方法的時候跌造,這個方法的調(diào)用就會被轉(zhuǎn)發(fā)為由InvocationHandler這個接口的 invoke 方法來進(jìn)行調(diào)用。我們來看看InvocationHandler這個接口的唯一一個方法 invoke 方法族购,即后面調(diào)用service.getSearchBook()時壳贪,會走到這個invoke的方法中。
public <T> T create(final Class<T> service) {
Utils.validateServiceInterface(service);
if (validateEagerly) {
eagerlyValidateMethods(service);
}
return (T) Proxy.newProxyInstance(service.getClassLoader(), new Class<?>[] { service },
new InvocationHandler() {
private final Platform platform = Platform.get();
@Override public Object invoke(Object proxy, Method method, @Nullable Object[] args)
throws Throwable {
// If the method is a method from Object then defer to normal invocation.
if (method.getDeclaringClass() == Object.class) {
return method.invoke(this, args);
}
if (platform.isDefaultMethod(method)) {
return platform.invokeDefaultMethod(method, service, proxy, args);
}
ServiceMethod<Object, Object> serviceMethod =
(ServiceMethod<Object, Object>) loadServiceMethod(method);
OkHttpCall<Object> okHttpCall = new OkHttpCall<>(serviceMethod, args);
return serviceMethod.callAdapter.adapt(okHttpCall);
}
});
}
ServiceMethod<?, ?> loadServiceMethod(Method method) {
ServiceMethod<?, ?> result = serviceMethodCache.get(method);
if (result != null) return result;
synchronized (serviceMethodCache) {
result = serviceMethodCache.get(method);
if (result == null) {
result = new ServiceMethod.Builder<>(this, method).build();
serviceMethodCache.put(method, result);
}
}
return result;
}
其中SerivceMethod.callAdapter.adapt(okHttpCall),生成流程:
ServiceMethod.Builder().build()中寝杖,調(diào)用retroift.callAdatper()生成callAdapter對象违施。
在retrofit中,callAdapter又調(diào)用nextCallAdapter,在這里面瑟幕,會從adapterFactories變量中取出RxJava2CallAdapterFactory對象磕蒲,調(diào)用get(),生成RxJava2CallAdapter.
最后serviceMethod.callAdapter.adapt(okHttpCall)只盹,會到RxJava2CallAdapter.adapt函數(shù)中亿卤。
在RxJava2CallAdapter函數(shù)中:
@Override public <R> Object adapt(Call<R> call) {
Observable<Response<R>> responseObservable = new CallObservable<>(call);
Observable<?> observable;
if (isResult) {
observable = new ResultObservable<>(responseObservable);
} else if (isBody) {
observable = new BodyObservable<>(responseObservable);
} else {
observable = responseObservable;
}
if (scheduler != null) {
observable = observable.subscribeOn(scheduler);
}
if (isFlowable) {
return observable.toFlowable(BackpressureStrategy.LATEST);
}
if (isSingle) {
return observable.singleOrError();
}
if (isMaybe) {
return observable.singleElement();
}
if (isCompletable) {
return observable.ignoreElements();
}
return observable;
}
會生成被觀察者BodyObservable<>(responseObservable);其中Observable<Response<R>> responseObservable = new CallObservable<>(call);即BodyObservable的變量Observable<Response<T>> upstream
這樣在第二步中生成的對象就是BodyObservable
第三步:生成被觀察者ObservableSubscribeOn:
observable.subscribeOn(Schedulers.io())
public final Observable<T> subscribeOn(Scheduler scheduler) {
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
}
就是把BodyObservable和IoScheduler放到ObservableSubscribeOn涵但,
其中ObservableSource<T> source是BodyObservable
第四步:生成被觀察者ObservableObserveOn
observeOn(AndroidSchedulers.mainThread())
public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
ObjectHelper.verifyPositive(bufferSize, "bufferSize");
return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize));
}
把ObservableSubscribeOn 和 HandlerScheduler
放入到被觀察者 ObservableObserveOn<T>飞蹂,其中ObservableSource<T> source是ObservableSubscribeOn對象拉队,形成一條被觀察者的責(zé)任鏈模式,對應(yīng)各個的Schedulers來實現(xiàn)線程之間的切換懦鼠,以及Disposable的接口钻哩,實現(xiàn)中斷等。
第五步:創(chuàng)建觀察者肛冶,通過訂閱者連接被觀察者街氢。
subscribe(observer);
這個observer是觀察者,通過subscribe就開始處理各個事件睦袖。