拆輪子:OkHttp 的源碼解析(三):任務分發(fā)器(Dispatcher)

OkHttp3.7源碼分析文章列表如下:
拆輪子:OkHttp 的源碼解析(一):概述
拆輪子:OkHttp 的源碼解析(二):流程分析
拆輪子:OkHttp 的源碼解析(三):任務分發(fā)器(Dispatcher)
拆輪子:OkHttp 的源碼解析(四):攔截器(Interceptor)

從上篇文章我們可以看到 OkHttp 的同步和異步都使用了 Dispatcher 登馒,它的主要作用就是一個任務隊列。

我們都聽過 OkHttp 的一個高效之處在于在內部維護了一個線程池,方便高效地執(zhí)行異步請求框冀,這個線程池就在 Dispatcher 類里面苫费。

Dispatcher 類去掉注解只有一百多行挟伙,建議自己看下并不是很難,我們來分析:

1脐雪、Dispatcher 的成員變量

public final class Dispatcher {
  /** 最大并發(fā)請求數為64 */
  private int maxRequests = 64;
  /** 每個主機最大請求數為5 */
  private int maxRequestsPerHost = 5;

  /** 線程池 */
  private ExecutorService executorService;

  /** 準備執(zhí)行的異步請求 */
  private final Deque<AsyncCall> readyAsyncCalls = new ArrayDeque<>();

  /** 正在執(zhí)行的異步請求,包含已經取消但未執(zhí)行完的請求 */
  private final Deque<AsyncCall> runningAsyncCalls = new ArrayDeque<>();

  /** 正在執(zhí)行的同步請求恢共,包含已經取消單未執(zhí)行完的請求 */
  private final Deque<RealCall> runningSyncCalls = new ArrayDeque<>();
  ······
}

問:為什么要使用2個異步請求隊列呢战秋?

一個是正準備執(zhí)行的:readyAsyncCalls ,用來做一個緩沖使用讨韭;另外一個是正在執(zhí)行的:runningAsyncCalls 脂信。這里其實是一個生產者-消費者模型,如下圖所示:

生產者-消費者模型.png
  • Dispatcher: 生產者(默認在主線程)
  • AsyncCall: 隊列中需要處理的Runnable(包裝了異步回調接口)
  • ExecutorService:消費者池(也就是線程池)
  • Deque<readyAsyncCalls>:緩存(用數組實現透硝,可自動擴容狰闪,無大小限制)
  • Deque<runningAsyncCalls>:正在運行的任務,僅僅是用來引用正在運行的任務以判斷并發(fā)量濒生,注意它并不是消費者緩存

根據生產者消費者模型的模型理論埋泵,當入隊(enqueue)請求時,如果滿足條件,那么就直接把AsyncCall直接加到runningCalls的隊列中丽声,并在線程池中執(zhí)行礁蔗。如果消費者緩存滿了,就放入readyAsyncCalls進行緩存等待雁社。

當任務執(zhí)行完成后,調用finished的promoteCalls()函數浴井,手動移動緩存區(qū)(可以看出這里是主動清理的,因此不會發(fā)生死鎖)歧胁。

2滋饲、Dispatcher 的線程池

  public synchronized ExecutorService executorService() {
    if (executorService == null) {
      executorService = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS,
          new SynchronousQueue<Runnable>(), Util.threadFactory("OkHttp Dispatcher", false));
    }
    return executorService;
  }

根據上面的源碼,OkHttp 使用的是單例的線程池喊巍,有些朋友對線程池不太了解屠缭,解釋下幾個參數的意思:

  • 0(corePoolSize):核心線程池的數量為 0,空閑一段時間后所有線程將全部被銷毀崭参。
  • Integer.MAX_VALUE(maximumPoolSize): 最大線程數呵曹,當任務進來時可以擴充的線程最大值,相當于無限大何暮。
  • 60(keepAliveTime): 當線程數大于corePoolSize時奄喂,多余的空閑線程的最大存活時間。
  • TimeUnit.SECONDS:存活時間的單位是秒海洼。
  • new SynchronousQueue<Runnable>():工作隊列,先進先出跨新。
  • Util.threadFactory("OkHttp Dispatcher", false):單個線程的工廠 。

也就是說坏逢,在實際運行中域帐,當收到10個并發(fā)請求時,線程池會創(chuàng)建十個線程是整,當工作完成后肖揣,線程池會在60s后相繼關閉所有線程。

3浮入、同步調用

  @Override public Response execute() throws IOException {
    synchronized (this) {
      if (executed) throw new IllegalStateException("Already Executed");
      executed = true;
    }
    captureCallStackTrace();
    try {
      client.dispatcher().executed(this);
      Response result = getResponseWithInterceptorChain();
      if (result == null) throw new IOException("Canceled");
      return result;
    } finally {
      client.dispatcher().finished(this);
    }
  }

dispatcher().executed(this) 方法的源碼很簡單:

  /** Used by {@code Call#execute} to signal it is in-flight. */
  synchronized void executed(RealCall call) {
    runningSyncCalls.add(call);
  }

這里主要有4點:

  • 檢查這個 call 是否已經被執(zhí)行了龙优,每個 call 只能被執(zhí)行一次,如果想要一個完全一樣的 call事秀,可以利用 call#clone 方法進行克隆彤断。
  • 利用 client.dispatcher().executed(this) 來進行實際執(zhí)行,將請求的 call 插入到同步隊列中秽晚。
  • 調用 getResponseWithInterceptorChain() 獲取 HTTP網絡請求返回結果瓦糟,拋出給最上層的。從方法名可以看出赴蝇,這一步還會進行一系列“攔截”操作菩浙,這個方法很重要待會細說。
  • 當任務執(zhí)行完成后,無論成功與否都會調用 dispatcher.finished 方法劲蜻,通知分發(fā)器相關任務已結束陆淀,finished 方法的源碼就不去看了。

4先嬉、異步調用

  @Override public void enqueue(Callback responseCallback) {
    synchronized (this) {
      if (executed) throw new IllegalStateException("Already Executed");
      executed = true;
    }
    captureCallStackTrace();
    client.dispatcher().enqueue(new AsyncCall(responseCallback));
  }

dispatcher.enqueue 方法的源碼也很簡單:

  synchronized void enqueue(AsyncCall call) {
    if (runningAsyncCalls.size() < maxRequests && runningCallsForHost(call) < maxRequestsPerHost) {
      runningAsyncCalls.add(call);
      executorService().execute(call);
    } else {
      readyAsyncCalls.add(call);
    }
  }

從上述源碼分析轧苫,如果當前滿足

(runningRequests<64 && runningRequestsPerHost<5)

則把異步請求加入 runningAsyncCalls ,并在線程池中執(zhí)行(線程池會根據當前負載自動創(chuàng)建疫蔓,銷毀含懊,緩存相應的線程)。否則加入 readyAsyncCalls 緩沖排隊衅胀。

問:異步調用為什么返回 void岔乔,那我們請求網絡的數據在哪?
我們在異步調用時使用的是接口回調的方式:

call.enqueue(new Callback() {
    @Override
    public void onFailure(Call call, IOException e) {
    }

    @Override
    public void onResponse(Call call, Response response) {
    }
});

這就涉及到一個新的類: AsyncCalll(它實現了Runnable接口)滚躯,AsyncCall的excute方法最終將會被執(zhí)行雏门,它是 RealCall 的內部類:

  final class AsyncCall extends NamedRunnable {
    private final Callback responseCallback;

    AsyncCall(Callback responseCallback) {
      super("OkHttp %s", redactedUrl());
      this.responseCallback = responseCallback;
    }

    ······

    @Override protected void execute() {
      boolean signalledCallback = false;
      try {
        Response response = getResponseWithInterceptorChain();
        if (retryAndFollowUpInterceptor.isCanceled()) {
          signalledCallback = true;
          responseCallback.onFailure(RealCall.this, new IOException("Canceled"));
        } else {
          signalledCallback = true;
          responseCallback.onResponse(RealCall.this, response);
        }
      } catch (IOException e) {
        if (signalledCallback) {
          // Do not signal the callback twice!
          Platform.get().log(INFO, "Callback failure for " + toLoggableString(), e);
        } else {
          responseCallback.onFailure(RealCall.this, e);
        }
      } finally {
        client.dispatcher().finished(this);
      }
    }
  }

在它的 execute() 方法中,主要是2點:

  • 我們又看到了同步調用中熟悉的:

Response response = getResponseWithInterceptorChain();

所以不管是同步還是異步掸掏,都會使用 getResponseWithInterceptorChain() 獲取網絡請求的返回值茁影。

  • 不管請求成功還是失敗,通知任務分發(fā)器 (client.dispatcher().finished(this)) 該任務已結束丧凤,將其銷毀募闲。

5、getResponseWithInterceptorChain() 分析

這個方法實在太復雜了愿待,在新的博文中來分析蝇更。

6、Dispatcher線程池總結

1)調度線程池Disptcher實現了高并發(fā)呼盆,低阻塞的實現
2)采用Deque作為緩存,先進先出的順序執(zhí)行
3)任務在try/finally中調用了finished函數蚁廓,控制任務隊列的執(zhí)行順序访圃,而不是采用鎖,減少了編碼復雜性提高性能相嵌。

拆輪子:OkHttp 的源碼解析(四):攔截器(Interceptor)

7腿时、Dispatcher 全部源碼

public final class Dispatcher {
  private int maxRequests = 64;
  private int maxRequestsPerHost = 5;
  private Runnable idleCallback;

  private ExecutorService executorService;

  private final Deque<AsyncCall> readyAsyncCalls = new ArrayDeque<>();

  private final Deque<AsyncCall> runningAsyncCalls = new ArrayDeque<>();

  private final Deque<RealCall> runningSyncCalls = new ArrayDeque<>();

  public Dispatcher(ExecutorService executorService) {
    this.executorService = executorService;
  }

  public Dispatcher() {
  }

  public synchronized ExecutorService executorService() {
    if (executorService == null) {
      executorService = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS,
          new SynchronousQueue<Runnable>(), Util.threadFactory("OkHttp Dispatcher", false));
    }
    return executorService;
  }

  public synchronized void setMaxRequests(int maxRequests) {
    if (maxRequests < 1) {
      throw new IllegalArgumentException("max < 1: " + maxRequests);
    }
    this.maxRequests = maxRequests;
    promoteCalls();
  }

  public synchronized int getMaxRequests() {
    return maxRequests;
  }

  public synchronized void setMaxRequestsPerHost(int maxRequestsPerHost) {
    if (maxRequestsPerHost < 1) {
      throw new IllegalArgumentException("max < 1: " + maxRequestsPerHost);
    }
    this.maxRequestsPerHost = maxRequestsPerHost;
    promoteCalls();
  }

  public synchronized int getMaxRequestsPerHost() {
    return maxRequestsPerHost;
  }

  public synchronized void setIdleCallback(Runnable idleCallback) {
    this.idleCallback = idleCallback;
  }

  synchronized void enqueue(AsyncCall call) {
    if (runningAsyncCalls.size() < maxRequests && runningCallsForHost(call) < maxRequestsPerHost) {
      runningAsyncCalls.add(call);
      executorService().execute(call);
    } else {
      readyAsyncCalls.add(call);
    }
  }

  public synchronized void cancelAll() {
    for (AsyncCall call : readyAsyncCalls) {
      call.get().cancel();
    }

    for (AsyncCall call : runningAsyncCalls) {
      call.get().cancel();
    }

    for (RealCall call : runningSyncCalls) {
      call.cancel();
    }
  }

  private void promoteCalls() {
    if (runningAsyncCalls.size() >= maxRequests) return; // Already running max capacity.
    if (readyAsyncCalls.isEmpty()) return; // No ready calls to promote.

    for (Iterator<AsyncCall> i = readyAsyncCalls.iterator(); i.hasNext(); ) {
      AsyncCall call = i.next();

      if (runningCallsForHost(call) < maxRequestsPerHost) {
        i.remove();
        runningAsyncCalls.add(call);
        executorService().execute(call);
      }

      if (runningAsyncCalls.size() >= maxRequests) return; // Reached max capacity.
    }
  }

  private int runningCallsForHost(AsyncCall call) {
    int result = 0;
    for (AsyncCall c : runningAsyncCalls) {
      if (c.host().equals(call.host())) result++;
    }
    return result;
  }

  synchronized void executed(RealCall call) {
    runningSyncCalls.add(call);
  }

  void finished(AsyncCall call) {
    finished(runningAsyncCalls, call, true);
  }

  void finished(RealCall call) {
    finished(runningSyncCalls, call, false);
  }

  private <T> void finished(Deque<T> calls, T call, boolean promoteCalls) {
    int runningCallsCount;
    Runnable idleCallback;
    synchronized (this) {
      if (!calls.remove(call)) throw new AssertionError("Call wasn't in-flight!");
      if (promoteCalls) promoteCalls();
      runningCallsCount = runningCallsCount();
      idleCallback = this.idleCallback;
    }

    if (runningCallsCount == 0 && idleCallback != null) {
      idleCallback.run();
    }
  }

  public synchronized List<Call> queuedCalls() {
    List<Call> result = new ArrayList<>();
    for (AsyncCall asyncCall : readyAsyncCalls) {
      result.add(asyncCall.get());
    }
    return Collections.unmodifiableList(result);
  }

  public synchronized List<Call> runningCalls() {
    List<Call> result = new ArrayList<>();
    result.addAll(runningSyncCalls);
    for (AsyncCall asyncCall : runningAsyncCalls) {
      result.add(asyncCall.get());
    }
    return Collections.unmodifiableList(result);
  }

  public synchronized int queuedCallsCount() {
    return readyAsyncCalls.size();
  }

  public synchronized int runningCallsCount() {
    return runningAsyncCalls.size() + runningSyncCalls.size();
  }
}
最后編輯于
?著作權歸作者所有,轉載或內容合作請聯系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市饭宾,隨后出現的幾起案子批糟,更是在濱河造成了極大的恐慌,老刑警劉巖看铆,帶你破解...
    沈念sama閱讀 221,820評論 6 515
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件徽鼎,死亡現場離奇詭異,居然都是意外死亡,警方通過查閱死者的電腦和手機否淤,發(fā)現死者居然都...
    沈念sama閱讀 94,648評論 3 399
  • 文/潘曉璐 我一進店門悄但,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人石抡,你說我怎么就攤上這事檐嚣。” “怎么了啰扛?”我有些...
    開封第一講書人閱讀 168,324評論 0 360
  • 文/不壞的土叔 我叫張陵嚎京,是天一觀的道長。 經常有香客問我隐解,道長鞍帝,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 59,714評論 1 297
  • 正文 為了忘掉前任厢漩,我火速辦了婚禮膜眠,結果婚禮上,老公的妹妹穿的比我還像新娘溜嗜。我一直安慰自己宵膨,他們只是感情好,可當我...
    茶點故事閱讀 68,724評論 6 397
  • 文/花漫 我一把揭開白布炸宵。 她就那樣靜靜地躺著辟躏,像睡著了一般。 火紅的嫁衣襯著肌膚如雪土全。 梳的紋絲不亂的頭發(fā)上捎琐,一...
    開封第一講書人閱讀 52,328評論 1 310
  • 那天,我揣著相機與錄音裹匙,去河邊找鬼瑞凑。 笑死,一個胖子當著我的面吹牛概页,可吹牛的內容都是我干的籽御。 我是一名探鬼主播,決...
    沈念sama閱讀 40,897評論 3 421
  • 文/蒼蘭香墨 我猛地睜開眼惰匙,長吁一口氣:“原來是場噩夢啊……” “哼技掏!你這毒婦竟也來了?” 一聲冷哼從身側響起项鬼,我...
    開封第一講書人閱讀 39,804評論 0 276
  • 序言:老撾萬榮一對情侶失蹤哑梳,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后绘盟,有當地人在樹林里發(fā)現了一具尸體鸠真,經...
    沈念sama閱讀 46,345評論 1 318
  • 正文 獨居荒郊野嶺守林人離奇死亡悯仙,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內容為張勛視角 年9月15日...
    茶點故事閱讀 38,431評論 3 340
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現自己被綠了弧哎。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片雁比。...
    茶點故事閱讀 40,561評論 1 352
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖撤嫩,靈堂內的尸體忽然破棺而出偎捎,到底是詐尸還是另有隱情,我是刑警寧澤序攘,帶...
    沈念sama閱讀 36,238評論 5 350
  • 正文 年R本政府宣布茴她,位于F島的核電站,受9級特大地震影響程奠,放射性物質發(fā)生泄漏丈牢。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 41,928評論 3 334
  • 文/蒙蒙 一瞄沙、第九天 我趴在偏房一處隱蔽的房頂上張望己沛。 院中可真熱鬧,春花似錦距境、人聲如沸申尼。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,417評論 0 24
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽师幕。三九已至,卻和暖如春诬滩,著一層夾襖步出監(jiān)牢的瞬間霹粥,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 33,528評論 1 272
  • 我被黑心中介騙來泰國打工疼鸟, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留后控,地道東北人。 一個月前我還...
    沈念sama閱讀 48,983評論 3 376
  • 正文 我出身青樓空镜,卻偏偏與公主長得像忆蚀,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子姑裂,可洞房花燭夜當晚...
    茶點故事閱讀 45,573評論 2 359

推薦閱讀更多精彩內容