Dispatcher是負(fù)責(zé)對okhttp所有的請求進(jìn)行調(diào)度管理的類惶楼「汛椋可以通過Dispatcher獲取劳曹,或者取消所有請求。這里指的一個請求就是對應(yīng)的Call蔬螟,并不是指Request此迅,下面出現(xiàn)的所有的請求都是指Call。首先看下調(diào)度的整個流程圖旧巾。接著通過分析跟蹤okhttp發(fā)送請求的過程來分析Dispatcher是如何維護(hù)和調(diào)度我們發(fā)出的所有請求的耸序。
Call其實就是對Request的封裝。
OkHttp請求方式
通過okhttp發(fā)送請求主要有兩種方式鲁猩。
- 通過
execute()
調(diào)用坎怪,此時request會被馬上發(fā)出, 直到返回response或者發(fā)生錯誤前會一直阻塞±眨可以理解為一個立即執(zhí)行的同步請求搅窿。 - 通過
enqueue()
調(diào)用,此時request將會在未來的某個時間點被執(zhí)行隙券,具體由dispatcher進(jìn)行調(diào)度戈钢,這種方式是異步返回結(jié)果的∈嵌可以理解為會被盡快執(zhí)行的一個異步請求殉了。
第一種方式
通過execute()
調(diào)用,一般是這樣的
okHttpClient.newCall(request).execute();
通過OkHttpClient的代碼可以看出newCall()
方法其實是new了一個RealCall拟枚,所以這里直接查看RealCall的execute()
方法薪铜。
@Override public Call newCall(Request request) {
return new RealCall(this, request, false /* for web socket */);
}
RealCall的execute()
方法,這里只看下核心的代碼:
@Override public Response execute() throws IOException {
synchronized (this) {
//此處除去一些其他代碼
//...
try {
//通知Dispatcher這個Call正在被執(zhí)行,同時將此Call交給Dispatcher
//Dispatcher可以對此Call進(jìn)行管理
client.dispatcher().executed(this);
//請求的過程恩溅,注意這行代碼是阻塞的隔箍,直到返回result!
Response result = getResponseWithInterceptorChain();
if (result == null) throw new IOException("Canceled");
return result;
} finally {
//此時這個請求已經(jīng)執(zhí)行完畢了脚乡,通知Dispatcher蜒滩,不要再維護(hù)這個Call了
client.dispatcher().finished(this);
}
}
首先注意這行代碼
client.dispatcher().executed(this);
它是調(diào)用的Dispatcher的executed()
方法滨达,注意看方法名是executed并不是execute。接下來去Dispatcher里看下這個方法做了什么俯艰。
/** Used by {@code Call#execute} to signal it is in-flight. */
synchronized void executed(RealCall call) {
runningSyncCalls.add(call);
}
看注釋就明白了捡遍,這里他只是一個通知的作用,通知Dispatcher我這個call立即要被或者正在被執(zhí)行竹握,然后Dispatcher會把加入一個名為runningSyncCalls的雙端隊列中画株,這個隊列中存儲著所有的正在運(yùn)行的同步請求。這樣Dispatcher就可以很方便的對所有的同步請求進(jìn)行管理了啦辐。既然有添加谓传,那么也應(yīng)該有刪除,在請求執(zhí)行完畢時調(diào)用了這行代碼:
client.dispatcher().finished(this);
通過字面意思理解他應(yīng)該就是刪除的操作芹关,通知Dispatcher這個請求已經(jīng)被執(zhí)行完畢了续挟。這里暫時理解為調(diào)用finished方法就是將此call從runningSyncCalls中移除,后面會再討論finished方法的細(xì)節(jié)侥衬。
因為同步請求是被馬上執(zhí)行的庸推,所以Dispatcher能對同步請求進(jìn)行的調(diào)度也只有cancel了。具體可以通過調(diào)用Dispatcher.cancelAll()
方法進(jìn)行取消浇冰。
所以真正執(zhí)行請求的只有這行代碼了。
Response result = getResponseWithInterceptorChain();
這個方法先不管他聋亡,就可以理解為這行代碼的執(zhí)行就是請求從發(fā)出到完成的過程肘习。在分析攔截器的實現(xiàn)原理的時候再來討論。
第二種方式
client.newCall(request).enqueue(new Callback() {
@Override
public void onFailure(Call call, IOException e) {
}
@Override
public void onResponse(Call call, Response response) throws IOException {
}
});
通過上面我們已經(jīng)知道這里調(diào)用的也是RealCall的enqueue方法坡倔,我們直接來看代碼:
@Override public void enqueue(Callback responseCallback) {
synchronized (this) {
//判斷是否已經(jīng)執(zhí)行過了
if (executed) throw new IllegalStateException("Already Executed");
executed = true;
}
//捕獲調(diào)用棧的信息漂佩,用來分析連接泄露
captureCallStackTrace();
//封裝一個AsyncCall交給Dispatcher調(diào)度
client.dispatcher().enqueue(new AsyncCall(responseCallback));
}
通過上面的代碼可以看出調(diào)用enqueue()
方法,其實是調(diào)用了Dispatcher的enqueue()
方法罪塔,并且new了一個AsyncCall作為參數(shù)投蝉。AsyncCall為RealCall的一個內(nèi)部類,下面繼續(xù)看AsyncCall類里到底做了什么征堪。
final class AsyncCall extends NamedRunnable {
private final Callback responseCallback;
AsyncCall(Callback responseCallback) {
super("OkHttp %s", redactedUrl());
this.responseCallback = responseCallback;
}
//...
/**
*真正執(zhí)行發(fā)出請求的地方瘩缆,為了看起來清晰,精簡了部分代碼
*/
@Override protected void execute() {
try {
//請求的過程佃蚜,注意這里也是阻塞的
Response response = getResponseWithInterceptorChain();
//先不管這個Interceptor是干嘛的庸娱,下面的代碼可以理解為:
//如果沒有被取消,并且沒有發(fā)生異常谐算,回調(diào)onResponse方法熟尉。
//如果發(fā)生了異常或者被取消洲脂,回調(diào)onFailure方法斤儿。
if (retryAndFollowUpInterceptor.isCanceled()) {
//此請求被取消了,回調(diào)onFailure
responseCallback.onFailure(RealCall.this, new IOException("Canceled"));
} else {
//此請求成功了,回調(diào)onResponse
responseCallback.onResponse(RealCall.this, response);
}
} catch (IOException e) {
//發(fā)生了異常往果,回調(diào)onFailure
responseCallback.onFailure(RealCall.this, e);
} finally {
//通知Dispatcher Call被執(zhí)行完畢了
client.dispatcher().finished(this);
}
}
}
可以看到AsyncCall的execute()
就是具體請求執(zhí)行的地方疆液,只不過和上面的RealCall的execute()
方法相比,多了回調(diào)的處理棚放。retryAndFollowUpInterceptor其實是負(fù)責(zé)請求超時的重試和重定向操作的枚粘,retryAndFollowUpInterceptor.isCanceled()
就是用來判斷這個請求是否被取消了,這里就不深入展開了飘蚯。那么AsyncCall的execute()
方法是怎么被執(zhí)行的呢馍迄,繼續(xù)來看AsyncCall的父類NamedRunnable。
/**
* Runnable implementation which always sets its thread name.
*/
public abstract class NamedRunnable implements Runnable {
protected final String name;
public NamedRunnable(String format, Object... args) {
this.name = Util.format(format, args);
}
@Override public final void run() {
String oldName = Thread.currentThread().getName();
Thread.currentThread().setName(name);
try {
//注意這里調(diào)用了execute方法
execute();
} finally {
Thread.currentThread().setName(oldName);
}
}
protected abstract void execute();
}
可以看到NamedRunnable其實就是一個實現(xiàn)了Runnable接口的抽象類局骤,并且在run方法中調(diào)用了execute()
攀圈。也就是說AsyncCal其實就是一個Runnable,當(dāng)這個Runnable被調(diào)用的時候execute()
方法自然會被調(diào)用峦甩∽咐矗看到這里就很清晰了,再回過頭來看RealCall的enqueue()
中調(diào)用的這段代碼
//封裝一個AsyncCall交給Dispatcher調(diào)度
client.dispatcher().enqueue(new AsyncCall(responseCallback));
其實這里的new AsyncCall(responseCallback)
就是new了一個封裝的Runnable對象凯傲,這個Runnable的執(zhí)行犬辰,就是整個請求的發(fā)起與回調(diào)的過程。好啦冰单,這里搞明白了其實調(diào)用Dispatcher().enqueue()
方法傳遞過去的是一個Runnable對象幌缝,接下來就去Dispatcher中看下,他對這個Runnable做了什么诫欠。
synchronized void enqueue(AsyncCall call) {
//判斷正在執(zhí)行的異步請求數(shù)沒有達(dá)到閾值涵卵,并且每一個Host的請求數(shù)也沒有達(dá)到閾值
if (runningAsyncCalls.size() < maxRequests && runningCallsForHost(call) < maxRequestsPerHost) {
//加入到正在執(zhí)行隊列,并立即執(zhí)行
runningAsyncCalls.add(call);
executorService().execute(call);
} else {
//加入到等待隊列
readyAsyncCalls.add(call);
}
}
上面的代碼中又出現(xiàn)了兩個雙端隊列荒叼,runningAsyncCalls和readyAsyncCalls轿偎,加上上面出現(xiàn)的runningSyncCalls可以看到Dispatcher一共維護(hù)了3個請求隊列,分別是
- runningAsyncCalls被廓,正在請求的異步隊列
- readyAsyncCalls坏晦,準(zhǔn)備請求的異步隊列\(zhòng)等待請求的異步隊列
- runningSyncCalls,正在請求的同步隊列
還出現(xiàn)了一個方法executorService()
嫁乘,接下來看下這個方法是干嘛的英遭。
private ExecutorService executorService;
public synchronized ExecutorService executorService() {
if (executorService == null) {
executorService = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(), Util.threadFactory("OkHttp Dispatcher", false));
}
return executorService;
}
可以看出來這個方法就是以懶漢的方式創(chuàng)建最大容量為 Integer.MAX_VALUE
, 存活等待時間為60S的線程池(其實這里的最大容量并沒什么用,因為他的最大容量不會超過runningAsyncCalls的size亦渗,即設(shè)置的并發(fā)請求數(shù)的閾值)挖诸。executorService().execute(call)
就是把這個請求丟進(jìn)去執(zhí)行。那么enqueue()
方法執(zhí)行的過程大概就是法精,首先判斷當(dāng)前正在執(zhí)行的異步請求總數(shù)是否已經(jīng)達(dá)到的閾值(默認(rèn)為64)多律,針對每個host的同時請求數(shù)量是否達(dá)到了閾值(默認(rèn)為5)痴突。如果都沒有達(dá)到那么將這個請求加入到runningAsyncCalls隊列中,馬上執(zhí)行狼荞。
否則辽装,會將這個請求加入到readyAsyncCalls中,準(zhǔn)備執(zhí)行相味。那么readyAsyncCalls中的請求時何時被調(diào)用的呢拾积?掐指一算,應(yīng)該是在runningAsyncCalls中某些請求被執(zhí)行完畢時丰涉,不滿足上面的兩個條件自然會被調(diào)用拓巧。是不是呢?接下來看上面一直忽略的Dispatcher的三個finished方法:
/** Used by {@code AsyncCall#run} to signal completion. */
void finished(AsyncCall call) {
//異步請求結(jié)束時調(diào)用此方法
finished(runningAsyncCalls, call, true);
}
/** Used by {@code Call#execute} to signal completion. */
void finished(RealCall call) {
//同步請求結(jié)束時調(diào)用此方法
finished(runningSyncCalls, call, false);
}
/**
*將執(zhí)行完畢的call從相應(yīng)的隊列移除
*/
private <T> void finished(Deque<T> calls, T call, boolean promoteCalls) {
int runningCallsCount;
Runnable idleCallback;
synchronized (this) {
//從相應(yīng)的隊列中移除相應(yīng)的call一死,如果不包含肛度,拋異常
if (!calls.remove(call)) throw new AssertionError("Call wasn't in-flight!");
//是否需要提升Call的級別
if (promoteCalls) promoteCalls();
runningCallsCount = runningCallsCount();
idleCallback = this.idleCallback;
}
//如果沒有任何需要執(zhí)行的請求,那么執(zhí)行idleCallBack
if (runningCallsCount == 0 && idleCallback != null) {
idleCallback.run();
}
}
可以看出來不管是異步調(diào)用結(jié)束投慈,還是同步調(diào)用結(jié)束承耿,最終都是調(diào)用的這個被private修飾的finished方法,都會將完成的call從相應(yīng)的隊列中移除伪煤。唯一不同的是調(diào)用時傳遞的promoteCalls參數(shù)不同加袋,異步請求結(jié)束時傳入的是true,同步請求時結(jié)束傳入的是false抱既。并且會根據(jù)這個flag來判斷是否執(zhí)行promoteCalls()
方法职烧,接下來看promoteCalls()
里做了什么。
/**
*提升call的優(yōu)先級
*/
private void promoteCalls() {
//runningAsyncCalls已經(jīng)滿了蝙砌,不能再加了
if (runningAsyncCalls.size() >= maxRequests) return;
//沒有請求在readyAsyncCalls等著被執(zhí)行
if (readyAsyncCalls.isEmpty()) return;
//遍歷準(zhǔn)備隊列里的請求
for (Iterator<AsyncCall> i = readyAsyncCalls.iterator(); i.hasNext(); ) {
AsyncCall call = i.next();
//判斷該請求的host是否小于每個host最大請求閾值
if (runningCallsForHost(call) < maxRequestsPerHost) {
//將該請求從readyAsyncCalls移除,加入runningAsyncCalls并執(zhí)行
i.remove();
runningAsyncCalls.add(call);
executorService().execute(call);
}
//如果runningAsyncCalls數(shù)量已經(jīng)達(dá)到閾值跋理,終止遍歷
if (runningAsyncCalls.size() >= maxRequests) return; // Reached max capacity.
}
}
可以看出promoteCalls()
方法就是試圖去readyAsyncCalls中取出Call來加入runningAsyncCalls中執(zhí)行择克。所以上面的兩個finished方法調(diào)用方式的區(qū)別也就明晰了。同步調(diào)用結(jié)束因為并沒有涉及到runningAsyncCalls中的任何東西前普,對runningAsyncCalls沒任何影響肚邢,所以不需要調(diào)用promoteCalls。而異步的調(diào)用結(jié)束意味著runningAsyncCalls中會出現(xiàn)一個空位值拭卿,所以它會調(diào)用promoteCalls去嘗試從readyAsyncCalls中拉一個進(jìn)來骡湖。
總結(jié)
好啦 到這里整個dispatcher的調(diào)度分析算是完成了【瘢總結(jié)起來其實他就是維護(hù)了三個隊列响蕴,三個隊列中包含了正在執(zhí)行或者將要執(zhí)行的所有請求』萏遥總結(jié)起來就是:
- 當(dāng)發(fā)送一個異步請求時:如果runningAsyncCalls沒達(dá)到閾值浦夷,那么會將這個請求加入到runningAsyncCalls立即執(zhí)行辖试,否則會將這個請求加入到readyAsyncCalls中等待執(zhí)行。當(dāng)一個異步請求執(zhí)行完畢時會試圖去執(zhí)行readyAsyncCalls中的請求劈狐。
- 當(dāng)發(fā)送一個同步請求時:該請求會直接加入到runningSyncCalls中罐孝,并且馬上開始執(zhí)行,注意這個執(zhí)行并不是由Dispatcher調(diào)度的肥缔。
- 所有異步執(zhí)行的請求都會通過executorService線程池來執(zhí)行莲兢,這是個懶漢方式創(chuàng)建的線程池。
最后再看下上面的流程圖整個過程是不是都清晰了呢续膳!