OkHttp Dispatcher的調(diào)度過程分析

Dispatcher是負(fù)責(zé)對okhttp所有的請求進(jìn)行調(diào)度管理的類惶楼「汛椋可以通過Dispatcher獲取劳曹,或者取消所有請求。這里指的一個請求就是對應(yīng)的Call蔬螟,并不是Request此迅,下面出現(xiàn)的所有的請求都是指Call。首先看下調(diào)度的整個流程圖旧巾。接著通過分析跟蹤okhttp發(fā)送請求的過程來分析Dispatcher是如何維護(hù)和調(diào)度我們發(fā)出的所有請求的耸序。

Dispatcher調(diào)度流程

Call其實就是對Request的封裝。

OkHttp請求方式

通過okhttp發(fā)送請求主要有兩種方式鲁猩。

  1. 通過execute()調(diào)用坎怪,此時request會被馬上發(fā)出, 直到返回response或者發(fā)生錯誤前會一直阻塞±眨可以理解為一個立即執(zhí)行的同步請求搅窿。
  2. 通過enqueue()調(diào)用,此時request將會在未來的某個時間點被執(zhí)行隙券,具體由dispatcher進(jìn)行調(diào)度戈钢,這種方式是異步返回結(jié)果的∈嵌可以理解為會被盡快執(zhí)行的一個異步請求殉了。

第一種方式

通過execute()調(diào)用,一般是這樣的

okHttpClient.newCall(request).execute();

通過OkHttpClient的代碼可以看出newCall()方法其實是new了一個RealCall拟枚,所以這里直接查看RealCallexecute()方法薪铜。

@Override public Call newCall(Request request) {
  return new RealCall(this, request, false /* for web socket */);
}

RealCallexecute()方法,這里只看下核心的代碼:

@Override public Response execute() throws IOException {
  synchronized (this) {
  //此處除去一些其他代碼
  //...
  try {
    //通知Dispatcher這個Call正在被執(zhí)行,同時將此Call交給Dispatcher
    //Dispatcher可以對此Call進(jìn)行管理
    client.dispatcher().executed(this);
    //請求的過程恩溅,注意這行代碼是阻塞的隔箍,直到返回result!
    Response result = getResponseWithInterceptorChain();
    if (result == null) throw new IOException("Canceled");
    return result;
  } finally {
    //此時這個請求已經(jīng)執(zhí)行完畢了脚乡,通知Dispatcher蜒滩,不要再維護(hù)這個Call了
    client.dispatcher().finished(this);
  }
}

首先注意這行代碼

client.dispatcher().executed(this);

它是調(diào)用的Dispatcher的executed()方法滨达,注意看方法名是executed并不是execute。接下來去Dispatcher里看下這個方法做了什么俯艰。

/** Used by {@code Call#execute} to signal it is in-flight. */
synchronized void executed(RealCall call) {
  runningSyncCalls.add(call);
}

看注釋就明白了捡遍,這里他只是一個通知的作用,通知Dispatcher我這個call立即要被或者正在被執(zhí)行竹握,然后Dispatcher會把加入一個名為runningSyncCalls的雙端隊列中画株,這個隊列中存儲著所有的正在運(yùn)行的同步請求。這樣Dispatcher就可以很方便的對所有的同步請求進(jìn)行管理了啦辐。既然有添加谓传,那么也應(yīng)該有刪除,在請求執(zhí)行完畢時調(diào)用了這行代碼:

client.dispatcher().finished(this);

通過字面意思理解他應(yīng)該就是刪除的操作芹关,通知Dispatcher這個請求已經(jīng)被執(zhí)行完畢了续挟。這里暫時理解為調(diào)用finished方法就是將此call從runningSyncCalls中移除,后面會再討論finished方法的細(xì)節(jié)侥衬。

因為同步請求是被馬上執(zhí)行的庸推,所以Dispatcher能對同步請求進(jìn)行的調(diào)度也只有cancel了。具體可以通過調(diào)用Dispatcher.cancelAll()方法進(jìn)行取消浇冰。

所以真正執(zhí)行請求的只有這行代碼了。

Response result = getResponseWithInterceptorChain();

這個方法先不管他聋亡,就可以理解為這行代碼的執(zhí)行就是請求從發(fā)出到完成的過程肘习。在分析攔截器的實現(xiàn)原理的時候再來討論。

第二種方式

client.newCall(request).enqueue(new Callback() {
    @Override
    public void onFailure(Call call, IOException e) {
        
    }

    @Override
    public void onResponse(Call call, Response response) throws IOException {

    }
});

通過上面我們已經(jīng)知道這里調(diào)用的也是RealCall的enqueue方法坡倔,我們直接來看代碼:

@Override public void enqueue(Callback responseCallback) {
  synchronized (this) {
  //判斷是否已經(jīng)執(zhí)行過了
    if (executed) throw new IllegalStateException("Already Executed");
    executed = true;
  }
  //捕獲調(diào)用棧的信息漂佩,用來分析連接泄露
  captureCallStackTrace();
  //封裝一個AsyncCall交給Dispatcher調(diào)度
  client.dispatcher().enqueue(new AsyncCall(responseCallback));
}

通過上面的代碼可以看出調(diào)用enqueue()方法,其實是調(diào)用了Dispatcher的enqueue()方法罪塔,并且new了一個AsyncCall作為參數(shù)投蝉。AsyncCall為RealCall的一個內(nèi)部類,下面繼續(xù)看AsyncCall類里到底做了什么征堪。

final class AsyncCall extends NamedRunnable {
    private final Callback responseCallback;
    
    AsyncCall(Callback responseCallback) {
      super("OkHttp %s", redactedUrl());
      this.responseCallback = responseCallback;
    }
    //...
    /**
    *真正執(zhí)行發(fā)出請求的地方瘩缆,為了看起來清晰,精簡了部分代碼
    */
    @Override protected void execute() {
      try {
        //請求的過程佃蚜,注意這里也是阻塞的
        Response response = getResponseWithInterceptorChain();
        //先不管這個Interceptor是干嘛的庸娱,下面的代碼可以理解為:
        //如果沒有被取消,并且沒有發(fā)生異常谐算,回調(diào)onResponse方法熟尉。
        //如果發(fā)生了異常或者被取消洲脂,回調(diào)onFailure方法斤儿。
        if (retryAndFollowUpInterceptor.isCanceled()) {
          //此請求被取消了,回調(diào)onFailure
          responseCallback.onFailure(RealCall.this, new IOException("Canceled"));
        } else {
          //此請求成功了,回調(diào)onResponse
          responseCallback.onResponse(RealCall.this, response);
        }
      } catch (IOException e) {
          //發(fā)生了異常往果,回調(diào)onFailure
          responseCallback.onFailure(RealCall.this, e);
      } finally {
        //通知Dispatcher Call被執(zhí)行完畢了
        client.dispatcher().finished(this);
      }
    }
  }

可以看到AsyncCall的execute()就是具體請求執(zhí)行的地方疆液,只不過和上面的RealCall的execute()方法相比,多了回調(diào)的處理棚放。retryAndFollowUpInterceptor其實是負(fù)責(zé)請求超時的重試和重定向操作的枚粘,retryAndFollowUpInterceptor.isCanceled()就是用來判斷這個請求是否被取消了,這里就不深入展開了飘蚯。那么AsyncCall的execute()方法是怎么被執(zhí)行的呢馍迄,繼續(xù)來看AsyncCall的父類NamedRunnable。

/**
 * Runnable implementation which always sets its thread name.
 */
public abstract class NamedRunnable implements Runnable {
  protected final String name;

  public NamedRunnable(String format, Object... args) {
    this.name = Util.format(format, args);
  }

  @Override public final void run() {
    String oldName = Thread.currentThread().getName();
    Thread.currentThread().setName(name);
    try {
      //注意這里調(diào)用了execute方法
      execute();
    } finally {
      Thread.currentThread().setName(oldName);
    }
  }

  protected abstract void execute();
}

可以看到NamedRunnable其實就是一個實現(xiàn)了Runnable接口的抽象類局骤,并且在run方法中調(diào)用了execute()攀圈。也就是說AsyncCal其實就是一個Runnable,當(dāng)這個Runnable被調(diào)用的時候execute()方法自然會被調(diào)用峦甩∽咐矗看到這里就很清晰了,再回過頭來看RealCall的enqueue()中調(diào)用的這段代碼

//封裝一個AsyncCall交給Dispatcher調(diào)度
client.dispatcher().enqueue(new AsyncCall(responseCallback));

其實這里的new AsyncCall(responseCallback)就是new了一個封裝的Runnable對象凯傲,這個Runnable的執(zhí)行犬辰,就是整個請求的發(fā)起與回調(diào)的過程。好啦冰单,這里搞明白了其實調(diào)用Dispatcher().enqueue()方法傳遞過去的是一個Runnable對象幌缝,接下來就去Dispatcher中看下,他對這個Runnable做了什么诫欠。

synchronized void enqueue(AsyncCall call) {
  //判斷正在執(zhí)行的異步請求數(shù)沒有達(dá)到閾值涵卵,并且每一個Host的請求數(shù)也沒有達(dá)到閾值
  if (runningAsyncCalls.size() < maxRequests && runningCallsForHost(call) < maxRequestsPerHost) {
    //加入到正在執(zhí)行隊列,并立即執(zhí)行
    runningAsyncCalls.add(call);
    executorService().execute(call);
  } else {
    //加入到等待隊列
    readyAsyncCalls.add(call);
  }
}

上面的代碼中又出現(xiàn)了兩個雙端隊列荒叼,runningAsyncCalls和readyAsyncCalls轿偎,加上上面出現(xiàn)的runningSyncCalls可以看到Dispatcher一共維護(hù)了3個請求隊列,分別是

  1. runningAsyncCalls被廓,正在請求的異步隊列
  2. readyAsyncCalls坏晦,準(zhǔn)備請求的異步隊列\(zhòng)等待請求的異步隊列
  3. runningSyncCalls,正在請求的同步隊列

還出現(xiàn)了一個方法executorService()嫁乘,接下來看下這個方法是干嘛的英遭。

private ExecutorService executorService;
public synchronized ExecutorService executorService() {
  if (executorService == null) {
    executorService = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS,
        new SynchronousQueue<Runnable>(), Util.threadFactory("OkHttp Dispatcher", false));
  }
  return executorService;
}

可以看出來這個方法就是以懶漢的方式創(chuàng)建最大容量為 Integer.MAX_VALUE, 存活等待時間為60S的線程池(其實這里的最大容量并沒什么用,因為他的最大容量不會超過runningAsyncCalls的size亦渗,即設(shè)置的并發(fā)請求數(shù)的閾值)挖诸。executorService().execute(call)就是把這個請求丟進(jìn)去執(zhí)行。那么enqueue()方法執(zhí)行的過程大概就是法精,首先判斷當(dāng)前正在執(zhí)行的異步請求總數(shù)是否已經(jīng)達(dá)到的閾值(默認(rèn)為64)多律,針對每個host的同時請求數(shù)量是否達(dá)到了閾值(默認(rèn)為5)痴突。如果都沒有達(dá)到那么將這個請求加入到runningAsyncCalls隊列中,馬上執(zhí)行狼荞。

否則辽装,會將這個請求加入到readyAsyncCalls中,準(zhǔn)備執(zhí)行相味。那么readyAsyncCalls中的請求時何時被調(diào)用的呢拾积?掐指一算,應(yīng)該是在runningAsyncCalls中某些請求被執(zhí)行完畢時丰涉,不滿足上面的兩個條件自然會被調(diào)用拓巧。是不是呢?接下來看上面一直忽略的Dispatcher的三個finished方法:

/** Used by {@code AsyncCall#run} to signal completion. */
void finished(AsyncCall call) {
  //異步請求結(jié)束時調(diào)用此方法
  finished(runningAsyncCalls, call, true);
}

/** Used by {@code Call#execute} to signal completion. */
void finished(RealCall call) {
  //同步請求結(jié)束時調(diào)用此方法
  finished(runningSyncCalls, call, false);
}
/**
*將執(zhí)行完畢的call從相應(yīng)的隊列移除
*/
private <T> void finished(Deque<T> calls, T call, boolean promoteCalls) {
  int runningCallsCount;
  Runnable idleCallback;
  synchronized (this) {
    //從相應(yīng)的隊列中移除相應(yīng)的call一死,如果不包含肛度,拋異常
    if (!calls.remove(call)) throw new AssertionError("Call wasn't in-flight!");
    //是否需要提升Call的級別
    if (promoteCalls) promoteCalls();
    runningCallsCount = runningCallsCount();
    idleCallback = this.idleCallback;
  }
    //如果沒有任何需要執(zhí)行的請求,那么執(zhí)行idleCallBack
  if (runningCallsCount == 0 && idleCallback != null) {
    idleCallback.run();
  }
}

可以看出來不管是異步調(diào)用結(jié)束投慈,還是同步調(diào)用結(jié)束承耿,最終都是調(diào)用的這個被private修飾的finished方法,都會將完成的call從相應(yīng)的隊列中移除伪煤。唯一不同的是調(diào)用時傳遞的promoteCalls參數(shù)不同加袋,異步請求結(jié)束時傳入的是true,同步請求時結(jié)束傳入的是false抱既。并且會根據(jù)這個flag來判斷是否執(zhí)行promoteCalls()方法职烧,接下來看promoteCalls()里做了什么。

/**
*提升call的優(yōu)先級
*/
private void promoteCalls() {
  //runningAsyncCalls已經(jīng)滿了蝙砌,不能再加了
  if (runningAsyncCalls.size() >= maxRequests) return; 
  //沒有請求在readyAsyncCalls等著被執(zhí)行
  if (readyAsyncCalls.isEmpty()) return; 
  //遍歷準(zhǔn)備隊列里的請求
  for (Iterator<AsyncCall> i = readyAsyncCalls.iterator(); i.hasNext(); ) {
    AsyncCall call = i.next();
    //判斷該請求的host是否小于每個host最大請求閾值
    if (runningCallsForHost(call) < maxRequestsPerHost) {
      //將該請求從readyAsyncCalls移除,加入runningAsyncCalls并執(zhí)行
      i.remove();
      runningAsyncCalls.add(call);
      executorService().execute(call);
    }
    //如果runningAsyncCalls數(shù)量已經(jīng)達(dá)到閾值跋理,終止遍歷
    if (runningAsyncCalls.size() >= maxRequests) return; // Reached max capacity.
  }
}

可以看出promoteCalls()方法就是試圖去readyAsyncCalls中取出Call來加入runningAsyncCalls中執(zhí)行择克。所以上面的兩個finished方法調(diào)用方式的區(qū)別也就明晰了。同步調(diào)用結(jié)束因為并沒有涉及到runningAsyncCalls中的任何東西前普,對runningAsyncCalls沒任何影響肚邢,所以不需要調(diào)用promoteCalls。而異步的調(diào)用結(jié)束意味著runningAsyncCalls中會出現(xiàn)一個空位值拭卿,所以它會調(diào)用promoteCalls去嘗試從readyAsyncCalls中拉一個進(jìn)來骡湖。

總結(jié)

好啦 到這里整個dispatcher的調(diào)度分析算是完成了【瘢總結(jié)起來其實他就是維護(hù)了三個隊列响蕴,三個隊列中包含了正在執(zhí)行或者將要執(zhí)行的所有請求』萏遥總結(jié)起來就是:

  1. 當(dāng)發(fā)送一個異步請求時:如果runningAsyncCalls沒達(dá)到閾值浦夷,那么會將這個請求加入到runningAsyncCalls立即執(zhí)行辖试,否則會將這個請求加入到readyAsyncCalls中等待執(zhí)行。當(dāng)一個異步請求執(zhí)行完畢時會試圖去執(zhí)行readyAsyncCalls中的請求劈狐。
  2. 當(dāng)發(fā)送一個同步請求時:該請求會直接加入到runningSyncCalls中罐孝,并且馬上開始執(zhí)行,注意這個執(zhí)行并不是由Dispatcher調(diào)度的肥缔。
  3. 所有異步執(zhí)行的請求都會通過executorService線程池來執(zhí)行莲兢,這是個懶漢方式創(chuàng)建的線程池。
    最后再看下上面的流程圖整個過程是不是都清晰了呢续膳!
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末改艇,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子姑宽,更是在濱河造成了極大的恐慌遣耍,老刑警劉巖,帶你破解...
    沈念sama閱讀 218,284評論 6 506
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件炮车,死亡現(xiàn)場離奇詭異舵变,居然都是意外死亡,警方通過查閱死者的電腦和手機(jī)瘦穆,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,115評論 3 395
  • 文/潘曉璐 我一進(jìn)店門纪隙,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人扛或,你說我怎么就攤上這事绵咱。” “怎么了熙兔?”我有些...
    開封第一講書人閱讀 164,614評論 0 354
  • 文/不壞的土叔 我叫張陵悲伶,是天一觀的道長。 經(jīng)常有香客問我住涉,道長麸锉,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 58,671評論 1 293
  • 正文 為了忘掉前任舆声,我火速辦了婚禮花沉,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘媳握。我一直安慰自己碱屁,他們只是感情好,可當(dāng)我...
    茶點故事閱讀 67,699評論 6 392
  • 文/花漫 我一把揭開白布蛾找。 她就那樣靜靜地躺著娩脾,像睡著了一般。 火紅的嫁衣襯著肌膚如雪打毛。 梳的紋絲不亂的頭發(fā)上晦雨,一...
    開封第一講書人閱讀 51,562評論 1 305
  • 那天架曹,我揣著相機(jī)與錄音,去河邊找鬼闹瞧。 笑死绑雄,一個胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的奥邮。 我是一名探鬼主播万牺,決...
    沈念sama閱讀 40,309評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼洽腺!你這毒婦竟也來了脚粟?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 39,223評論 0 276
  • 序言:老撾萬榮一對情侶失蹤蘸朋,失蹤者是張志新(化名)和其女友劉穎核无,沒想到半個月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體藕坯,經(jīng)...
    沈念sama閱讀 45,668評論 1 314
  • 正文 獨居荒郊野嶺守林人離奇死亡团南,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,859評論 3 336
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了炼彪。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片吐根。...
    茶點故事閱讀 39,981評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖辐马,靈堂內(nèi)的尸體忽然破棺而出拷橘,到底是詐尸還是另有隱情,我是刑警寧澤喜爷,帶...
    沈念sama閱讀 35,705評論 5 347
  • 正文 年R本政府宣布冗疮,位于F島的核電站,受9級特大地震影響檩帐,放射性物質(zhì)發(fā)生泄漏术幔。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 41,310評論 3 330
  • 文/蒙蒙 一轿塔、第九天 我趴在偏房一處隱蔽的房頂上張望特愿。 院中可真熱鬧仲墨,春花似錦勾缭、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,904評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至癌蚁,卻和暖如春幻梯,著一層夾襖步出監(jiān)牢的瞬間兜畸,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 33,023評論 1 270
  • 我被黑心中介騙來泰國打工碘梢, 沒想到剛下飛機(jī)就差點兒被人妖公主榨干…… 1. 我叫王不留咬摇,地道東北人。 一個月前我還...
    沈念sama閱讀 48,146評論 3 370
  • 正文 我出身青樓煞躬,卻偏偏與公主長得像肛鹏,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子恩沛,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 44,933評論 2 355

推薦閱讀更多精彩內(nèi)容