OkHttp3.7源碼分析文章列表如下:
拆輪子:OkHttp 的源碼解析(一):概述
拆輪子:OkHttp 的源碼解析(二):流程分析
拆輪子:OkHttp 的源碼解析(三):任務分發(fā)器(Dispatcher)
拆輪子:OkHttp 的源碼解析(四):攔截器(Interceptor)
從上篇文章我們可以看到 OkHttp 的同步和異步都使用了 Dispatcher 登馒,它的主要作用就是一個任務隊列。
我們都聽過 OkHttp 的一個高效之處在于在內部維護了一個線程池,方便高效地執(zhí)行異步請求框冀,這個線程池就在 Dispatcher 類里面苫费。
Dispatcher 類去掉注解只有一百多行挟伙,建議自己看下并不是很難,我們來分析:
1脐雪、Dispatcher 的成員變量
public final class Dispatcher {
/** 最大并發(fā)請求數為64 */
private int maxRequests = 64;
/** 每個主機最大請求數為5 */
private int maxRequestsPerHost = 5;
/** 線程池 */
private ExecutorService executorService;
/** 準備執(zhí)行的異步請求 */
private final Deque<AsyncCall> readyAsyncCalls = new ArrayDeque<>();
/** 正在執(zhí)行的異步請求,包含已經取消但未執(zhí)行完的請求 */
private final Deque<AsyncCall> runningAsyncCalls = new ArrayDeque<>();
/** 正在執(zhí)行的同步請求恢共,包含已經取消單未執(zhí)行完的請求 */
private final Deque<RealCall> runningSyncCalls = new ArrayDeque<>();
······
}
問:為什么要使用2個異步請求隊列呢战秋?
一個是正準備執(zhí)行的:readyAsyncCalls ,用來做一個緩沖使用讨韭;另外一個是正在執(zhí)行的:runningAsyncCalls 脂信。這里其實是一個生產者-消費者模型,如下圖所示:
- Dispatcher: 生產者(默認在主線程)
- AsyncCall: 隊列中需要處理的Runnable(包裝了異步回調接口)
- ExecutorService:消費者池(也就是線程池)
- Deque<readyAsyncCalls>:緩存(用數組實現透硝,可自動擴容狰闪,無大小限制)
- Deque<runningAsyncCalls>:正在運行的任務,僅僅是用來引用正在運行的任務以判斷并發(fā)量濒生,注意它并不是消費者緩存
根據生產者消費者模型的模型理論埋泵,當入隊(enqueue)請求時,如果滿足條件,那么就直接把AsyncCall直接加到runningCalls的隊列中丽声,并在線程池中執(zhí)行礁蔗。如果消費者緩存滿了,就放入readyAsyncCalls進行緩存等待雁社。
當任務執(zhí)行完成后,調用finished的promoteCalls()函數浴井,手動移動緩存區(qū)(可以看出這里是主動清理的,因此不會發(fā)生死鎖)歧胁。
2滋饲、Dispatcher 的線程池
public synchronized ExecutorService executorService() {
if (executorService == null) {
executorService = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(), Util.threadFactory("OkHttp Dispatcher", false));
}
return executorService;
}
根據上面的源碼,OkHttp 使用的是單例的線程池喊巍,有些朋友對線程池不太了解屠缭,解釋下幾個參數的意思:
- 0(corePoolSize):核心線程池的數量為 0,空閑一段時間后所有線程將全部被銷毀崭参。
- Integer.MAX_VALUE(maximumPoolSize): 最大線程數呵曹,當任務進來時可以擴充的線程最大值,相當于無限大何暮。
- 60(keepAliveTime): 當線程數大于corePoolSize時奄喂,多余的空閑線程的最大存活時間。
- TimeUnit.SECONDS:存活時間的單位是秒海洼。
- new SynchronousQueue<Runnable>():工作隊列,先進先出跨新。
- Util.threadFactory("OkHttp Dispatcher", false):單個線程的工廠 。
也就是說坏逢,在實際運行中域帐,當收到10個并發(fā)請求時,線程池會創(chuàng)建十個線程是整,當工作完成后肖揣,線程池會在60s后相繼關閉所有線程。
3浮入、同步調用
@Override public Response execute() throws IOException {
synchronized (this) {
if (executed) throw new IllegalStateException("Already Executed");
executed = true;
}
captureCallStackTrace();
try {
client.dispatcher().executed(this);
Response result = getResponseWithInterceptorChain();
if (result == null) throw new IOException("Canceled");
return result;
} finally {
client.dispatcher().finished(this);
}
}
dispatcher().executed(this)
方法的源碼很簡單:
/** Used by {@code Call#execute} to signal it is in-flight. */
synchronized void executed(RealCall call) {
runningSyncCalls.add(call);
}
這里主要有4點:
- 檢查這個 call 是否已經被執(zhí)行了龙优,每個 call 只能被執(zhí)行一次,如果想要一個完全一樣的 call事秀,可以利用 call#clone 方法進行克隆彤断。
- 利用 client.dispatcher().executed(this) 來進行實際執(zhí)行,將請求的 call 插入到同步隊列中秽晚。
- 調用 getResponseWithInterceptorChain() 獲取 HTTP網絡請求返回結果瓦糟,拋出給最上層的。從方法名可以看出赴蝇,這一步還會進行一系列“攔截”操作菩浙,這個方法很重要待會細說。
- 當任務執(zhí)行完成后,無論成功與否都會調用 dispatcher.finished 方法劲蜻,通知分發(fā)器相關任務已結束陆淀,finished 方法的源碼就不去看了。
4先嬉、異步調用
@Override public void enqueue(Callback responseCallback) {
synchronized (this) {
if (executed) throw new IllegalStateException("Already Executed");
executed = true;
}
captureCallStackTrace();
client.dispatcher().enqueue(new AsyncCall(responseCallback));
}
dispatcher.enqueue 方法的源碼也很簡單:
synchronized void enqueue(AsyncCall call) {
if (runningAsyncCalls.size() < maxRequests && runningCallsForHost(call) < maxRequestsPerHost) {
runningAsyncCalls.add(call);
executorService().execute(call);
} else {
readyAsyncCalls.add(call);
}
}
從上述源碼分析轧苫,如果當前滿足
(runningRequests<64 && runningRequestsPerHost<5)
則把異步請求加入 runningAsyncCalls ,并在線程池中執(zhí)行(線程池會根據當前負載自動創(chuàng)建疫蔓,銷毀含懊,緩存相應的線程)。否則加入 readyAsyncCalls 緩沖排隊衅胀。
問:異步調用為什么返回 void岔乔,那我們請求網絡的數據在哪?
我們在異步調用時使用的是接口回調的方式:
call.enqueue(new Callback() {
@Override
public void onFailure(Call call, IOException e) {
}
@Override
public void onResponse(Call call, Response response) {
}
});
這就涉及到一個新的類: AsyncCalll(它實現了Runnable接口)滚躯,AsyncCall的excute方法最終將會被執(zhí)行雏门,它是 RealCall 的內部類:
final class AsyncCall extends NamedRunnable {
private final Callback responseCallback;
AsyncCall(Callback responseCallback) {
super("OkHttp %s", redactedUrl());
this.responseCallback = responseCallback;
}
······
@Override protected void execute() {
boolean signalledCallback = false;
try {
Response response = getResponseWithInterceptorChain();
if (retryAndFollowUpInterceptor.isCanceled()) {
signalledCallback = true;
responseCallback.onFailure(RealCall.this, new IOException("Canceled"));
} else {
signalledCallback = true;
responseCallback.onResponse(RealCall.this, response);
}
} catch (IOException e) {
if (signalledCallback) {
// Do not signal the callback twice!
Platform.get().log(INFO, "Callback failure for " + toLoggableString(), e);
} else {
responseCallback.onFailure(RealCall.this, e);
}
} finally {
client.dispatcher().finished(this);
}
}
}
在它的 execute() 方法中,主要是2點:
- 我們又看到了同步調用中熟悉的:
Response response = getResponseWithInterceptorChain();
所以不管是同步還是異步掸掏,都會使用 getResponseWithInterceptorChain() 獲取網絡請求的返回值茁影。
- 不管請求成功還是失敗,通知任務分發(fā)器 (client.dispatcher().finished(this)) 該任務已結束丧凤,將其銷毀募闲。
5、getResponseWithInterceptorChain() 分析
這個方法實在太復雜了愿待,在新的博文中來分析蝇更。
6、Dispatcher線程池總結
1)調度線程池Disptcher實現了高并發(fā)呼盆,低阻塞的實現
2)采用Deque作為緩存,先進先出的順序執(zhí)行
3)任務在try/finally中調用了finished函數蚁廓,控制任務隊列的執(zhí)行順序访圃,而不是采用鎖,減少了編碼復雜性提高性能相嵌。
7腿时、Dispatcher 全部源碼
public final class Dispatcher {
private int maxRequests = 64;
private int maxRequestsPerHost = 5;
private Runnable idleCallback;
private ExecutorService executorService;
private final Deque<AsyncCall> readyAsyncCalls = new ArrayDeque<>();
private final Deque<AsyncCall> runningAsyncCalls = new ArrayDeque<>();
private final Deque<RealCall> runningSyncCalls = new ArrayDeque<>();
public Dispatcher(ExecutorService executorService) {
this.executorService = executorService;
}
public Dispatcher() {
}
public synchronized ExecutorService executorService() {
if (executorService == null) {
executorService = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(), Util.threadFactory("OkHttp Dispatcher", false));
}
return executorService;
}
public synchronized void setMaxRequests(int maxRequests) {
if (maxRequests < 1) {
throw new IllegalArgumentException("max < 1: " + maxRequests);
}
this.maxRequests = maxRequests;
promoteCalls();
}
public synchronized int getMaxRequests() {
return maxRequests;
}
public synchronized void setMaxRequestsPerHost(int maxRequestsPerHost) {
if (maxRequestsPerHost < 1) {
throw new IllegalArgumentException("max < 1: " + maxRequestsPerHost);
}
this.maxRequestsPerHost = maxRequestsPerHost;
promoteCalls();
}
public synchronized int getMaxRequestsPerHost() {
return maxRequestsPerHost;
}
public synchronized void setIdleCallback(Runnable idleCallback) {
this.idleCallback = idleCallback;
}
synchronized void enqueue(AsyncCall call) {
if (runningAsyncCalls.size() < maxRequests && runningCallsForHost(call) < maxRequestsPerHost) {
runningAsyncCalls.add(call);
executorService().execute(call);
} else {
readyAsyncCalls.add(call);
}
}
public synchronized void cancelAll() {
for (AsyncCall call : readyAsyncCalls) {
call.get().cancel();
}
for (AsyncCall call : runningAsyncCalls) {
call.get().cancel();
}
for (RealCall call : runningSyncCalls) {
call.cancel();
}
}
private void promoteCalls() {
if (runningAsyncCalls.size() >= maxRequests) return; // Already running max capacity.
if (readyAsyncCalls.isEmpty()) return; // No ready calls to promote.
for (Iterator<AsyncCall> i = readyAsyncCalls.iterator(); i.hasNext(); ) {
AsyncCall call = i.next();
if (runningCallsForHost(call) < maxRequestsPerHost) {
i.remove();
runningAsyncCalls.add(call);
executorService().execute(call);
}
if (runningAsyncCalls.size() >= maxRequests) return; // Reached max capacity.
}
}
private int runningCallsForHost(AsyncCall call) {
int result = 0;
for (AsyncCall c : runningAsyncCalls) {
if (c.host().equals(call.host())) result++;
}
return result;
}
synchronized void executed(RealCall call) {
runningSyncCalls.add(call);
}
void finished(AsyncCall call) {
finished(runningAsyncCalls, call, true);
}
void finished(RealCall call) {
finished(runningSyncCalls, call, false);
}
private <T> void finished(Deque<T> calls, T call, boolean promoteCalls) {
int runningCallsCount;
Runnable idleCallback;
synchronized (this) {
if (!calls.remove(call)) throw new AssertionError("Call wasn't in-flight!");
if (promoteCalls) promoteCalls();
runningCallsCount = runningCallsCount();
idleCallback = this.idleCallback;
}
if (runningCallsCount == 0 && idleCallback != null) {
idleCallback.run();
}
}
public synchronized List<Call> queuedCalls() {
List<Call> result = new ArrayList<>();
for (AsyncCall asyncCall : readyAsyncCalls) {
result.add(asyncCall.get());
}
return Collections.unmodifiableList(result);
}
public synchronized List<Call> runningCalls() {
List<Call> result = new ArrayList<>();
result.addAll(runningSyncCalls);
for (AsyncCall asyncCall : runningAsyncCalls) {
result.add(asyncCall.get());
}
return Collections.unmodifiableList(result);
}
public synchronized int queuedCallsCount() {
return readyAsyncCalls.size();
}
public synchronized int runningCallsCount() {
return runningAsyncCalls.size() + runningSyncCalls.size();
}
}