源碼地址:https://github.com/square/okhttp
針對(duì)具體一個(gè)請(qǐng)求的流程警儒,前面已經(jīng)做了學(xué)習(xí)分析授翻,現(xiàn)在對(duì)OkHttp的請(qǐng)求任務(wù)管理進(jìn)行分析學(xué)習(xí)句灌。
使用過(guò)OkHttp的都知道,調(diào)用分為同步阻塞式的請(qǐng)求execute(),以及異步調(diào)用 enqueue(Callback responseCallback)
棠隐,同步請(qǐng)求沒(méi)有什么好分析的,基本就是直接發(fā)起了請(qǐng)求檐嚣。這里主要分析異步請(qǐng)求助泽,是如何進(jìn)行請(qǐng)求的管理和分配的。
主要的類:Dispatcher.
主要內(nèi)容:
- 線程池
- 任務(wù)分發(fā)模型
1. 線程池
線程池嚎京,為解決的問(wèn)題嗡贺,很多資料都有具體的闡述,這里就引用一些專業(yè)的解釋多線程:
多線程技術(shù)主要解決處理器單元內(nèi)多個(gè)線程執(zhí)行的問(wèn)題挖藏,它可以顯著減少處理器單元的閑置時(shí)間暑刃,增加處理器單元的吞吐能力。但如果對(duì)多線程應(yīng)用不當(dāng)膜眠,會(huì)增加對(duì)單個(gè)任務(wù)的處理時(shí)間岩臣。可以舉一個(gè)簡(jiǎn)單的例子:
假設(shè)在一臺(tái)服務(wù)器完成一項(xiàng)任務(wù)的時(shí)間為T
T1 創(chuàng)建線程的時(shí)間
T2 在線程中執(zhí)行任務(wù)的時(shí)間宵膨,包括線程間同步所需時(shí)間
T3 線程銷毀的時(shí)間
顯然T = T1+T2+T3架谎。注意這是一個(gè)極度簡(jiǎn)化的假設(shè)。
可以看出T1,T3是多線程本身的帶來(lái)的開(kāi)銷(在Java中辟躏,通過(guò)映射pThead谷扣,并進(jìn)一步通過(guò)SystemCall實(shí)現(xiàn)native線程),我們渴望減少T1,T3所用的時(shí)間,從而減少T的時(shí)間会涎。但一些線程的使用者并沒(méi)有注意到這一點(diǎn)裹匙,所以在程序中頻繁的創(chuàng)建或銷毀線程,這導(dǎo)致T1和T3在T中占有相當(dāng)比例末秃。顯然這是突出了線程的弱點(diǎn)(T1概页,T3),而不是優(yōu)點(diǎn)(并發(fā)性)练慕。
線程池惰匙,就是針對(duì)解決減少T1 和 T3的時(shí)間,提高服務(wù)的性能铃将。
1.1 OkHttp的線程池
public synchronized ExecutorService executorService() {
if (executorService == null) {
executorService = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(), Util.threadFactory("OkHttp Dispatcher", false));
}
return executorService;
}
Dispatcher 通過(guò)單例創(chuàng)建了一個(gè)線程池项鬼,針對(duì)幾個(gè)參數(shù),可以發(fā)現(xiàn)劲阎,OkHttp的線程池具備特點(diǎn):
- 線程數(shù)區(qū)間[0绘盟,Integer.MAX_VALUE],不保留最少線程數(shù)哪工,隨時(shí)創(chuàng)建更多線程 奥此;
- 當(dāng)線程空閑的時(shí)候,最多毖惚龋活時(shí)間為60s;
- 使用一個(gè)同步隊(duì)列作為工作隊(duì)列撤嫩,先進(jìn)先出偎捎;
- 創(chuàng)建一個(gè)名為“OkHttp Dispatcher” 的線程工廠 ThreadFactory 。
線程池的處理序攘,就到這里茴她,下面就開(kāi)始,分析OkHttp是如何使用這個(gè)線程池來(lái)進(jìn)行請(qǐng)求任務(wù)的調(diào)度和分配的程奠。
2. 任務(wù)分發(fā)模型
在我們發(fā)起一個(gè)異步請(qǐng)求的時(shí)候丈牢,其實(shí)是交給了Dispatcher來(lái)處理的
RealCall.java
@Override public void enqueue(Callback responseCallback) {
synchronized (this) {
if (executed) throw new IllegalStateException("Already Executed");
executed = true;
}
captureCallStackTrace();
//處理再這里
client.dispatcher().enqueue(new AsyncCall(responseCallback));
}
在Dispatcher中:
synchronized void enqueue(AsyncCall call) {
if (runningAsyncCalls.size() < maxRequests && runningCallsForHost(call) < maxRequestsPerHost) {
// 添加到runningAsyncCalls隊(duì)列中
runningAsyncCalls.add(call);
//線程池的調(diào)用
executorService().execute(call);
} else {
//添加到準(zhǔn)備中的隊(duì)列中
readyAsyncCalls.add(call);
}
}
上面的代碼中幾個(gè)重要的變量:
- runningAsyncCalls 儲(chǔ)存運(yùn)行中的異步請(qǐng)求隊(duì)列
- readyAsyncCalls 儲(chǔ)存準(zhǔn)備中的異步請(qǐng)求隊(duì)列
- maxRequests 最大請(qǐng)求數(shù)量(64個(gè))
- maxRequestsPerHost 相同Host最大請(qǐng)求數(shù)量(5)
當(dāng)如果運(yùn)行中的請(qǐng)求少用64個(gè)以及相同 Host的請(qǐng)求小于5個(gè)的時(shí)候,直接添加到runningAsyncCalls并且調(diào)用線程池來(lái)執(zhí)行這個(gè)AsyncCall,交給線程池去調(diào)度瞄沙。 如果已經(jīng)超出了這個(gè)限制己沛,就把這個(gè)請(qǐng)求添加到 readyAsyncCalls,等待調(diào)用距境。但是這個(gè)準(zhǔn)備中的隊(duì)列是什么時(shí)候被調(diào)用的呢申尼?
擼一下代碼,發(fā)現(xiàn)在AsyncCall的execute方法里面垫桂。也就是這個(gè)異步線程的執(zhí)行方法里面:
@Override protected void execute() {
boolean signalledCallback = false;
try {
//執(zhí)行真正的請(qǐng)求
Response response = getResponseWithInterceptorChain();
//通過(guò)CallBack 回調(diào)給用戶
if (retryAndFollowUpInterceptor.isCanceled()) {
signalledCallback = true;
responseCallback.onFailure(RealCall.this, new IOException("Canceled"));
} else {
signalledCallback = true;
responseCallback.onResponse(RealCall.this, response);
}
} catch (IOException e) {
if (signalledCallback) {
// Do not signal the callback twice!
Platform.get().log(INFO, "Callback failure for " + toLoggableString(), e);
} else {
responseCallback.onFailure(RealCall.this, e);
}
} finally {
//重點(diǎn)在這里
client.dispatcher().finished(this);
}
}
最后的時(shí)候調(diào)用了Dispatcher的finished师幕,繼續(xù)向下擼:
private <T> void finished(Deque<T> calls, T call, boolean promoteCalls) {
int runningCallsCount;
Runnable idleCallback;
synchronized (this) {
if (!calls.remove(call)) throw new AssertionError("Call wasn't in-flight!");
//重點(diǎn)在這里
if (promoteCalls) promoteCalls();
runningCallsCount = runningCallsCount();
idleCallback = this.idleCallback;
}
if (runningCallsCount == 0 && idleCallback != null) {
idleCallback.run();
}
}
這里會(huì)調(diào)用一個(gè) promoteCalls()方法:
private void promoteCalls() {
if (runningAsyncCalls.size() >= maxRequests) return; // Already running max capacity.
if (readyAsyncCalls.isEmpty()) return; // No ready calls to promote.
for (Iterator<AsyncCall> i = readyAsyncCalls.iterator(); i.hasNext(); ) {
AsyncCall call = i.next();
if (runningCallsForHost(call) < maxRequestsPerHost) {
i.remove();
// 加入到運(yùn)行中的隊(duì)列,并執(zhí)行诬滩。
runningAsyncCalls.add(call);
executorService().execute(call);
}
if (runningAsyncCalls.size() >= maxRequests) return; // Reached max capacity.
}
}
終于找到了霹粥,在請(qǐng)求完成的時(shí)候灭将,調(diào)用Dispatcher的finished同時(shí),會(huì)檢查這個(gè)時(shí)候準(zhǔn)備中的請(qǐng)求后控,是否有可以添加到運(yùn)行請(qǐng)求中線程池中去的庙曙。
3. 總結(jié)
整個(gè)任務(wù)管理的流程,其實(shí)也不復(fù)雜:
- 通過(guò)兩個(gè)請(qǐng)求隊(duì)列忆蚀,來(lái)管理請(qǐng)求數(shù)量以及準(zhǔn)備中的請(qǐng)求的數(shù)量矾利;
- 請(qǐng)求交一個(gè)線程池來(lái)完成。
最后用一個(gè)圖的形式總結(jié)一下馋袜,OkHttp的請(qǐng)求任務(wù)管理的實(shí)現(xiàn):
系列:
OKhttp源碼學(xué)習(xí)(一)—— 基本請(qǐng)求流程
OKhttp源碼學(xué)習(xí)(二)—— OkHttpClient
OKhttp源碼學(xué)習(xí)(三)—— Request, RealCall
OKhttp源碼學(xué)習(xí)(四)—— RetryAndFollowUpInterceptor攔截器
OKhttp源碼學(xué)習(xí)(五)—— BridgeInterceptor攔截器
OKhttp源碼學(xué)習(xí)(六)—— CacheInterceptor攔截器
OKhttp源碼學(xué)習(xí)(七)—— ConnectInterceptor攔截器
OKhttp源碼學(xué)習(xí)(八)—— CallServerInterceptor攔截器