Retrofit 2.1 + Rxjava 源碼解析(一)

1.創(chuàng)建Retrofit對象


OkHttpClient.Builder okHttpClient = new OkHttpClient.Builder();
retrofit = new Retrofit.Builder()
                .client(okHttpClient.build())
                .addConverterFactory(GsonConverterFactory.create())
                .addCallAdapterFactory(RxJavaCallAdapterFactory.create())
                .baseUrl(base_url)
                .build();

這里是普通的 Retrofit 對象創(chuàng)建過程,傳入一些必要的參數(shù):okHttpClient旁瘫,converterFactorycallAdapterFactory(不搭配 Rxjava 的時候使用 Retrofit 默認的 callAdapterFactory拂玻,什么都不做)地粪,baseUrl 取募。

這里特別要注意的是傳入了 RxJavaCallAdapterFactory.create() 這個RxjavaCallAdapter 對象,這個對象將徹底改變 Retrofit 的使用蟆技。使得 Retrofit 搭配 Rxjava 變成可能玩敏,不得不佩服 Retrofit 作者的編程功底,開放 CallAdapterFactory 這個接口质礼,使 Retrofit 的靈活性更高旺聚。

2.創(chuàng)建接口的動態(tài)代理對象

給出實驗的接口

public interface NetApiService {

    //post請求
    @FormUrlEncoded
    @POST("{url}")
    Observable<ResponseBody> executePost(
            @Path("url") String url,
            @Field("params") String params,
            @Field("signature") String signature
    );

}
netApiService = retrofit.create(NetApiService .class);  //返回一個動態(tài)代理對象

這里也是 Retrofit 神奇的地方,傳入一個接口眶蕉,就可以生成實現(xiàn)了這個接口的對象砰粹,當然這個只是 Java 代碼生成的動態(tài)代理對象。下面我們進
create() 方法看看造挽。

public <T> T create(final Class<T> service) {
    Utils.validateServiceInterface(service);  //驗證外部傳進的“服務”接口是否合法
    if (validateEagerly) {
      eagerlyValidateMethods(service);  //根據(jù)validateEagerly判斷是否對接口中的全部方法進行緩存
    }
    //使用Proxy工廠類返回一個泛型動態(tài)代理實例碱璃。
    return (T) Proxy.newProxyInstance(service.getClassLoader(), new Class<?>[] { service },
        new InvocationHandler() {
          private final Platform platform = Platform.get();

          @Override public Object invoke(Object proxy, Method method, Object... args)
              throws Throwable {
            // If the method is a method from Object then defer to normal invocation.
            if (method.getDeclaringClass() == Object.class) {
              return method.invoke(this, args);
            }
            if (platform.isDefaultMethod(method)) {
              return platform.invokeDefaultMethod(method, service, proxy, args);
            }
            ServiceMethod serviceMethod = loadServiceMethod(method);
            OkHttpCall okHttpCall = new OkHttpCall<>(serviceMethod, args);
            return serviceMethod.callAdapter.adapt(okHttpCall);
          }
        });
  }

在這里我們首先對傳入的接口進行檢驗是否是接口,然后根據(jù) validateEagerly 判斷是否對接口中的全部方法進行緩存饭入,最后我們用 java.lang.reflect.Proxy; 創(chuàng)建一個泛型的動態(tài)代理對象嵌器,返回這個對象。(不懂 java動態(tài)代理技術 的同學別著急谐丢,我會在文末給出參考資料)

3.創(chuàng)建Observable

Observable<ResponseBody> observable = netApiService.executePost(url, params, signature);

調(diào)用動態(tài)代理對象的接口方法爽航,這時候會調(diào)用

new InvocationHandler() {
          private final Platform platform = Platform.get();

          @Override public Object invoke(Object proxy, Method method, Object... args)
              throws Throwable {
            // If the method is a method from Object then defer to normal invocation.
            if (method.getDeclaringClass() == Object.class) {
              return method.invoke(this, args);
            }
            if (platform.isDefaultMethod(method)) {
              return platform.invokeDefaultMethod(method, service, proxy, args);
            }
            ServiceMethod serviceMethod = loadServiceMethod(method);
            OkHttpCall okHttpCall = new OkHttpCall<>(serviceMethod, args);
            return serviceMethod.callAdapter.adapt(okHttpCall);
          }
        }

InvocationHandlerinvoke() 方法,在這里我們可以看到有三個參數(shù):
proxy 表示通過 Proxy.newProxyInstance() 生成的代理類對象乾忱。
method 表示代理對象被調(diào)用的函數(shù)岳掐。
args 表示代理對象被調(diào)用的函數(shù)的參數(shù)。
調(diào)用代理對象的每個函數(shù)實際最終都是調(diào)用了 InvocationHandlerinvoke 函數(shù)饭耳。

由于這個是接口的方法串述,所以不會進第一個 if ,因為也不是默認方法寞肖,所以也不會進第二個 if 纲酗。這樣就可以看到我們的代理對象在調(diào)用了接口的方法后實際上是 new 了一個 okHttpCall<> 對象,然后將這個對象作為參數(shù)傳進了 callAdapter.adapt(); 方法中新蟆。

由于我們之前傳入的是 RxJavaCallAdapterFactory.create() 觅赊,所以我們深入 RxJavaCallAdapterFactory.java 看看構造 Observable 的方法,在adapt()可以看到:

static final class ResponseCallAdapter implements CallAdapter<Observable<?>> {
    private final Type responseType;
    private final Scheduler scheduler;

    ResponseCallAdapter(Type responseType, Scheduler scheduler) {
      this.responseType = responseType;
      this.scheduler = scheduler;
    }

    @Override public Type responseType() {
      return responseType;
    }

    @Override public <R> Observable<Response<R>> adapt(Call<R> call) {
      Observable<Response<R>> observable = Observable.create(new CallOnSubscribe<>(call));
      if (scheduler != null) {
        return observable.subscribeOn(scheduler);
      }
      return observable;
    }
  }

在這里我們看到琼稻,這里將傳入的 okHttp 對象 作為參數(shù)吮螺,構造了 CallOnSubscribe 對象
CallOnSubscribe 是何方神圣?鸠补?萝风?按照 Rxjava 構造 Observable 方法來說,這個 CallOnSubscribe 應該是一個實現(xiàn)了Observable.OnSubscribe<T> 接口的對象紫岩。

我們看看源碼规惰,果然如此。

static final class CallOnSubscribe<T> implements Observable.OnSubscribe<Response<T>> {
    private final Call<T> originalCall;

    CallOnSubscribe(Call<T> originalCall) {
      this.originalCall = originalCall;
    }

    @Override public void call(final Subscriber<? super Response<T>> subscriber) {
      // Since Call is a one-shot type, clone it for each new subscriber.
      Call<T> call = originalCall.clone();

      // Wrap the call in a helper which handles both unsubscription and backpressure.
      RequestArbiter<T> requestArbiter = new RequestArbiter<>(call, subscriber);
      subscriber.add(requestArbiter);
      subscriber.setProducer(requestArbiter);
    }
  }

看到這里泉蝌,相信你大概也懂了為什么調(diào)用生成的動態(tài)代理對象的接口方法不像只使用 Retrofit 那樣返回一個 okHttpCall<> 對象歇万,而是返回一個 Observable<ResponseBody> 對象。其實這就是
RxJavaCallAdapterFactory 做的轉換勋陪。

仔細看 CallOnSubscribe<T>call() 方法贪磺,我們發(fā)現(xiàn)這里的 subscriber (其實是調(diào)用 subscribe() 方法時傳進來的 subscriber ,就是外部的觀察者)添加了一個 requestArbiter 對象诅愚。這個對象很重要寒锚,在 subscriber.setProducer(requestArbiter); 時,它會控制
okHttpCall對象 直接聯(lián)網(wǎng)獲取數(shù)據(jù)呻粹,然后回調(diào)給觀察者 subscriber 壕曼。

4.observable.subscribe(subscriber);訂閱

這里的代碼不多苏研,就一行 observable.subscribe(subscriber); 等浊。我們仔細看看在 subscribe 方法里面發(fā)生了什么神奇的事?(提前劇透一下摹蘑,Observable.OnSubscribe<T>對象 很棒筹燕,它就相當于橋梁,將 Observable 和 Observer 連接起來)

 public final Subscription subscribe(Subscriber<? super T> subscriber) {
        return Observable.subscribe(subscriber, this);
    }
    
    static <T> Subscription subscribe(Subscriber<? super T> subscriber, Observable<T> observable) {
     // validate and proceed
    //判斷傳進來的參數(shù)衅鹿,即是觀察者對象撒踪,被觀察者對象,是否為空大渤。
        if (subscriber == null) {
            throw new IllegalArgumentException("observer can not be null");
        }
        if (observable.onSubscribe == null) {
            throw new IllegalStateException("onSubscribe function can not be null.");
            /*
             * the subscribe function can also be overridden but generally that's not the appropriate approach
             * so I won't mention that in the exception
             */
        }
        
        // new Subscriber so onStart it
       //重要的操作制妄,可以在訂閱之前做一些準備工作
        subscriber.onStart();
        
        /*
         * See https://github.com/ReactiveX/RxJava/issues/216 for discussion on "Guideline 6.4: Protect calls
         * to user code from within an Observer"
         */
        // if not already wrapped
        if (!(subscriber instanceof SafeSubscriber)) {
            // assign to `observer` so we return the protected version
            subscriber = new SafeSubscriber<T>(subscriber);
        }

        // The code below is exactly the same an unsafeSubscribe but not used because it would 
        // add a significant depth to already huge call stacks.
        try {
            // allow the hook to intercept and/or decorate
            hook.onSubscribeStart(observable, observable.onSubscribe).call(subscriber);
            return hook.onSubscribeReturn(subscriber);
        } catch (Throwable e) {
            // special handling for certain Throwable/Error/Exception types
            Exceptions.throwIfFatal(e);
            // in case the subscriber can't listen to exceptions anymore
            if (subscriber.isUnsubscribed()) {
                RxJavaPluginUtils.handleException(hook.onSubscribeError(e));
            } else {
                // if an unhandled error occurs executing the onSubscribe we will propagate it
                try {
                    subscriber.onError(hook.onSubscribeError(e));
                } catch (Throwable e2) {
                    Exceptions.throwIfFatal(e2);
                    // if this happens it means the onError itself failed (perhaps an invalid function implementation)
                    // so we are unable to propagate the error correctly and will just throw
                    RuntimeException r = new OnErrorFailedException("Error occurred attempting to subscribe [" + e.getMessage() + "] and then again while trying to pass to onError.", e2);
                    // TODO could the hook be the cause of the error in the on error handling.
                    hook.onSubscribeError(r);
                    // TODO why aren't we throwing the hook's return value.
                    throw r;
                }
            }
            return Subscriptions.unsubscribed();
        }
    }

在這段代碼中,我們看到 subscribe() 過程中泵三,會先調(diào)用 onStart() 耕捞,一般這個方法在 subscribe 剛開始,而事件還未發(fā)送之前被調(diào)用烫幕,可以用于做一些準備工作俺抽,例如數(shù)據(jù)的清零或重置。這是一個可選方法较曼,默認情況下它的實現(xiàn)為空磷斧。需要注意的是,如果對準備工作的線程有要求(例如彈出一個顯示進度的對話框,這必須在主線程執(zhí)行)弛饭, onStart()就不適用了(因為它總是在 subscribe 所發(fā)生的線程被調(diào)用冕末,而不能指定線程。要在指定的線程來做準備工作孩哑,可以使用 doOnSubscribe()
方法)(先記住這個知識點栓霜,請保留,現(xiàn)在先不管線程切換)

高能來了:嵫选8炻!
我們重點看看這句代碼:

hook.onSubscribeStart(observable, observable.onSubscribe).call(subscriber);
            return hook.onSubscribeReturn(subscriber);
 public <T> OnSubscribe<T> onSubscribeStart(Observable<? extends T> observableInstance, final OnSubscribe<T> onSubscribe) {
        // pass through by default
        return onSubscribe;
    }

其實 onSubscribeStart() 方法直接返回了 onSubscribe 對象丛晌,然后直接調(diào)用 onSubscribecall(subscriber) 方法仅炊。記得我們剛剛分析,這里其實是調(diào)用了 CallOnSubscribe<T>對象call() 方法澎蛛。也就是在這里進行了聯(lián)網(wǎng)獲取數(shù)據(jù)抚垄,然后回調(diào) Subscriber 觀察者的方法。(具體的代碼就是 call() 方法的subscriber.setProducer(requestArbiter);)谋逻。

public void setProducer(Producer p) {
        long toRequest;
        boolean passToSubscriber = false;
        synchronized (this) {
            toRequest = requested;
            producer = p;
            if (subscriber != null) {
                // middle operator ... we pass through unless a request has been made
                if (toRequest == NOT_SET) {
                    // we pass through to the next producer as nothing has been requested
                    passToSubscriber = true;
                }
            }
        }
        // do after releasing lock
        if (passToSubscriber) {
            subscriber.setProducer(producer);
        } else {
            // we execute the request with whatever has been requested (or Long.MAX_VALUE)
            if (toRequest == NOT_SET) {
                producer.request(Long.MAX_VALUE);
            } else {
                producer.request(toRequest);
            }
        }
    }

最后會調(diào)用 producer.request(toRequest); 方法呆馁。
這個 request() 方法,就是 RequestArbiter<T> requestArbiter = new RequestArbiter<>(call, subscriber);request() 毁兆。

static final class RequestArbiter<T> extends AtomicBoolean implements Subscription, Producer {
    private final Call<T> call;
    private final Subscriber<? super Response<T>> subscriber;

    RequestArbiter(Call<T> call, Subscriber<? super Response<T>> subscriber) {
      this.call = call;
      this.subscriber = subscriber;
    }

    @Override public void request(long n) {
      if (n < 0) throw new IllegalArgumentException("n < 0: " + n);
      if (n == 0) return; // Nothing to do when requesting 0.
      if (!compareAndSet(false, true)) return; // Request was already triggered.

      try {
        Response<T> response = call.execute();
        if (!subscriber.isUnsubscribed()) {
          subscriber.onNext(response);
        }
      } catch (Throwable t) {
        Exceptions.throwIfFatal(t);
        if (!subscriber.isUnsubscribed()) {
          subscriber.onError(t);
        }
        return;
      }

      if (!subscriber.isUnsubscribed()) {
        subscriber.onCompleted();
      }
    }

    @Override public void unsubscribe() {
      call.cancel();
    }

    @Override public boolean isUnsubscribed() {
      return call.isCanceled();
    }
  }

Response<T> response = call.execute(); 這里進行了網(wǎng)絡請求浙滤;
if (!subscriber.isUnsubscribed()) { subscriber.onNext(response); } 這里進行回調(diào)。

接下來的 onError()onCompleted() 方法的回調(diào)一樣的气堕,就不分析了纺腊。

至此,我們就完整地了解了 Retrofit + Rxjava 中從創(chuàng)建 Observable 和 Observer 到 Observable 訂閱 Observer 的流程茎芭,以及中間隱藏的聯(lián)網(wǎng)和回調(diào)的過程揖膜。

參考資料

Retrofit 2.1 源碼分析
Java 動態(tài)代理技術
Rxjava 源碼分析一

最后編輯于
?著作權歸作者所有,轉載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市梅桩,隨后出現(xiàn)的幾起案子壹粟,更是在濱河造成了極大的恐慌,老刑警劉巖宿百,帶你破解...
    沈念sama閱讀 211,884評論 6 492
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件趁仙,死亡現(xiàn)場離奇詭異,居然都是意外死亡犀呼,警方通過查閱死者的電腦和手機幸撕,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,347評論 3 385
  • 文/潘曉璐 我一進店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來外臂,“玉大人坐儿,你說我怎么就攤上這事。” “怎么了貌矿?”我有些...
    開封第一講書人閱讀 157,435評論 0 348
  • 文/不壞的土叔 我叫張陵炭菌,是天一觀的道長。 經(jīng)常有香客問我逛漫,道長黑低,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 56,509評論 1 284
  • 正文 為了忘掉前任酌毡,我火速辦了婚禮克握,結果婚禮上,老公的妹妹穿的比我還像新娘枷踏。我一直安慰自己菩暗,他們只是感情好,可當我...
    茶點故事閱讀 65,611評論 6 386
  • 文/花漫 我一把揭開白布旭蠕。 她就那樣靜靜地躺著停团,像睡著了一般。 火紅的嫁衣襯著肌膚如雪掏熬。 梳的紋絲不亂的頭發(fā)上佑稠,一...
    開封第一講書人閱讀 49,837評論 1 290
  • 那天,我揣著相機與錄音旗芬,去河邊找鬼舌胶。 笑死,一個胖子當著我的面吹牛岗屏,可吹牛的內(nèi)容都是我干的辆琅。 我是一名探鬼主播漱办,決...
    沈念sama閱讀 38,987評論 3 408
  • 文/蒼蘭香墨 我猛地睜開眼这刷,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了娩井?” 一聲冷哼從身側響起暇屋,我...
    開封第一講書人閱讀 37,730評論 0 267
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎洞辣,沒想到半個月后咐刨,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 44,194評論 1 303
  • 正文 獨居荒郊野嶺守林人離奇死亡扬霜,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 36,525評論 2 327
  • 正文 我和宋清朗相戀三年定鸟,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片著瓶。...
    茶點故事閱讀 38,664評論 1 340
  • 序言:一個原本活蹦亂跳的男人離奇死亡联予,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情沸久,我是刑警寧澤季眷,帶...
    沈念sama閱讀 34,334評論 4 330
  • 正文 年R本政府宣布,位于F島的核電站卷胯,受9級特大地震影響子刮,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜窑睁,卻給世界環(huán)境...
    茶點故事閱讀 39,944評論 3 313
  • 文/蒙蒙 一挺峡、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧担钮,春花似錦沙郭、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,764評論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至鲤嫡,卻和暖如春送挑,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背暖眼。 一陣腳步聲響...
    開封第一講書人閱讀 31,997評論 1 266
  • 我被黑心中介騙來泰國打工惕耕, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留,地道東北人诫肠。 一個月前我還...
    沈念sama閱讀 46,389評論 2 360
  • 正文 我出身青樓司澎,卻偏偏與公主長得像,于是被迫代替她去往敵國和親栋豫。 傳聞我的和親對象是個殘疾皇子挤安,可洞房花燭夜當晚...
    茶點故事閱讀 43,554評論 2 349

推薦閱讀更多精彩內(nèi)容