前言
幾個(gè)月前,跟過 OkHttp 的流程源碼馆类,但是時(shí)間久了弹谁,現(xiàn)在能夠回想起來的的,只有幾個(gè)攔截器了预愤,那我豈不是沒什么收獲了植康。所以,好好想想销睁,我從 OkHttp 中能夠?qū)W到什么
疑問
- OkHttp 是怎么拆分功能的冻记,大概有幾個(gè)模塊
- 它所用到的責(zé)任鏈模式,在實(shí)際開發(fā)中適合哪些場(chǎng)景
- 它是怎么使用線程池的冗栗,這么用有什么好處
OkHttp 是怎么拆分功能的供搀,大概有幾個(gè)模塊
作為一個(gè)網(wǎng)絡(luò)框架葛虐,最核心的功能就是發(fā)起請(qǐng)求棉钧,處理響應(yīng)了,這倆個(gè)是功能部分摄悯, OkHttp 使用 Dispatcher 執(zhí)行任務(wù)愧捕,內(nèi)部是一個(gè)高并發(fā)的線程池,另外整個(gè)流程的處理使用到了Interceptor 攔截器
請(qǐng)求執(zhí)行的調(diào)用流程
- Recall.enqueue(Callback)
- client.dispatcher().enqueue(new AsyncCall(responseCallback))
停一下,跟進(jìn)一下分發(fā)器器的 enqueue 方法,內(nèi)部時(shí)怎么處理異步請(qǐng)求的
// 創(chuàng)建 AsyncCall 對(duì)象
void enqueue(AsyncCall call) {
synchronized (this) {
// 添加到待執(zhí)行隊(duì)列中,雙向隊(duì)列
readyAsyncCalls.add(call);
}
// 執(zhí)行的重要代碼
promoteAndExecute();
}
private boolean promoteAndExecute() {
assert (!Thread.holdsLock(this));
// 創(chuàng)建一個(gè)可執(zhí)行隊(duì)列,目的是限制 64 個(gè)最大連接數(shù),每個(gè) Host 最多 5 個(gè)連接的限制
List<AsyncCall> executableCalls = new ArrayList<>();
boolean isRunning;
synchronized (this) {
for (Iterator<AsyncCall> i = readyAsyncCalls.iterator(); i.hasNext(); ) {
AsyncCall asyncCall = i.next();
if (runningAsyncCalls.size() >= maxRequests) break; // Max capacity.
if (runningCallsForHost(asyncCall) >= maxRequestsPerHost) continue; // Host max capacity.
i.remove();
// 符合條件的,從準(zhǔn)備中的隊(duì)列挪到其他兩個(gè)隊(duì)列中
executableCalls.add(asyncCall);
runningAsyncCalls.add(asyncCall);
}
isRunning = runningCallsCount() > 0;
}
// 放到線程池中執(zhí)行
for (int i = 0, size = executableCalls.size(); i < size; i++) {
AsyncCall asyncCall = executableCalls.get(i);
asyncCall.executeOn(executorService());
}
return isRunning;
}
首先 AsyncCall 是實(shí)現(xiàn)了 Runnable 的一個(gè)類,也難怪,畢竟是要放到線程池中執(zhí)行的
響應(yīng)執(zhí)行的調(diào)用過程
剛才看了請(qǐng)求入隊(duì)后,經(jīng)過篩選,開始遍歷放到線程池中去執(zhí)行,那下來就是等待響應(yīng)了
具體代碼在 Recall 類的 run() 方法中,里面就是我們之前了解到的使用責(zé)任鏈模式-攔截器的代碼了
為什么要使用雙端隊(duì)列 ArrayDeque ?
OkHttp 源碼系列 之 ArrayDeque - 雙端隊(duì)列
首先 Jdk 提供的雙端隊(duì)列主要有兩個(gè):
- LinkedList 鏈表實(shí)現(xiàn)的雙端隊(duì)列
- ArrayDeque 循環(huán)數(shù)組實(shí)現(xiàn)的雙端隊(duì)列
那為什么偏偏就選了 ArrayDeque 了,因?yàn)樾矢?
Jdk 的說明中就說了,ArrayDeque 作為隊(duì)列使用時(shí),將比 LinkedList 更快
ArrayQueue 是線程不安全的,Okhttp 是怎樣保證同步問題的? synchronized 關(guān)鍵字
OkHttp 是怎樣使用責(zé)任鏈模式的,Android 源碼中還有其他地方用到了嘛管跺?
那就首先要補(bǔ)習(xí)下責(zé)任鏈模式是什么禾进,以及怎樣用代碼實(shí)現(xiàn)
在網(wǎng)上查了很多文檔,有一種表述我覺得很形象泻云,小張去外地出差回來宠纯,其中 2w 要去招公司報(bào)銷,他去找到組長(zhǎng)
- 組長(zhǎng)看到發(fā)票婆瓜,面值超過了權(quán)限廉白,說讓小張去找主管
- 主管一看,自己最大只能簽 3k 的猴蹂,讓其去找經(jīng)理
- 經(jīng)理最大只能批 1w 的,讓小張去找老板
- 最終老板簽字處理
整個(gè)流程涉及到多個(gè)類(組長(zhǎng)覆获、主管瓢省、經(jīng)理等),一級(jí)一級(jí)的處理摹量,最終處理結(jié)束
Android 源碼中 View 的事件分發(fā)也是使用了責(zé)任鏈模式馒胆,其中被分發(fā)的 MotionEvent 經(jīng)過 ViewGroup 層層分發(fā),最終被消費(fèi)或者重新返回到最上層的 View
- 那 OkHttp 是怎樣實(shí)現(xiàn)責(zé)任鏈的呢睦尽?
Let is see fucking code 型雳,Woohooo
// 同步執(zhí)行的代碼中
@Override protected void execute() {
boolean signalledCallback = false;
transmitter.timeoutEnter();
try {
// 使用責(zé)任鏈處理請(qǐng)求響應(yīng),重點(diǎn)看這里
Response response = getResponseWithInterceptorChain();
......
}
}
Response getResponseWithInterceptorChain() throws IOException {
// 創(chuàng)建一個(gè)攔截器隊(duì)列
List<Interceptor> interceptors = new ArrayList<>();
interceptors.addAll(client.interceptors());
interceptors.add(new RetryAndFollowUpInterceptor(client));
interceptors.add(new BridgeInterceptor(client.cookieJar()));
interceptors.add(new CacheInterceptor(client.internalCache()));
interceptors.add(new ConnectInterceptor(client));
if (!forWebSocket) {
interceptors.addAll(client.networkInterceptors());
}
interceptors.add(new CallServerInterceptor(forWebSocket));
// 添加完了沿量,開始逐個(gè)處理了
Interceptor.Chain chain = new RealInterceptorChain(interceptors, transmitter, null, 0,
originalRequest, this, client.connectTimeoutMillis(),
client.readTimeoutMillis(), client.writeTimeoutMillis());
boolean calledNoMoreExchanges = false;
try {
// proceed 執(zhí)行后得到響應(yīng)結(jié)果
Response response = chain.proceed(originalRequest);
if (transmitter.isCanceled()) {
closeQuietly(response);
throw new IOException("Canceled");
}
return response;
} catch (IOException e) {
calledNoMoreExchanges = true;
throw transmitter.noMoreExchanges(e);
} finally {
if (!calledNoMoreExchanges) {
transmitter.noMoreExchanges(null);
}
}
}
這里使用了隊(duì)列保存所有的攔截器朴则,然后一股腦傳進(jìn)了 RealInterceptorChain 對(duì)象中钓简,最后調(diào)用 proceed 就有個(gè)返回結(jié)果,停芥被,一下子就獲取到了結(jié)果了嘛坐榆? 進(jìn)去看看
RealInterceptorChain 里面是怎么處理的
public Response proceed(Request request, Transmitter transmitter, @Nullable Exchange exchange)
throws IOException {
... 省略部分代碼
// 看到 next 我就想起來鏈表里的 next,這個(gè) next 是干嘛的呢匹中?
RealInterceptorChain next = new RealInterceptorChain(interceptors, transmitter, exchange,
index + 1, request, call, connectTimeout, readTimeout, writeTimeout);
// 根據(jù) index 獲取對(duì)應(yīng)位置的攔截器
Interceptor interceptor = interceptors.get(index);
// 調(diào)用攔截器的攔截方法
Response response = interceptor.intercept(next);
// Confirm that the next interceptor made its required call to chain.proceed()
... 省略部分代碼
return response;
}
有一個(gè)地方?jīng)]看明白豪诲,創(chuàng)建 Chain 后,調(diào)用 proceed 服赎,剛開始 index 為 0,那整個(gè)隊(duì)列是怎么遍歷的践付,沒看到有循環(huán)遍歷語句啊
進(jìn)入 interceptor.intercept(next) 后缺厉,一切截然而止了,懷著好奇心提针,點(diǎn)開了 interceptor 的實(shí)現(xiàn)類 CacheInterceptor 辐脖,果不其然,在 intercept 方法中揖曾,再次看到了 proceed 的身影,終于破案了
@Override public Response intercept(Chain chain) throws IOException {
... 省略部分代碼
// 每調(diào)用一次练链,內(nèi)部的 index 自增奴拦,就意味著不斷的傳遞到下一級(jí)攔截器
networkResponse = chain.proceed(networkRequest);
... 省略部分代碼
return response;
OkHttp 是怎么使用線程池的
最后一個(gè)問題,怎么使用線程池的绿鸣,首先補(bǔ)習(xí)下線程池的各個(gè)參數(shù)的含義暂氯,以及線程池的工作原理
// OkHttp 中的線程池
public synchronized ExecutorService executorService() {
if (executorService == null) {
executorService = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS,
new SynchronousQueue<>(), Util.threadFactory("OkHttp Dispatcher", false));
}
return executorService;
}
// 線程池的構(gòu)造方法
// corePoolSize – the number of threads to keep in the pool, even if they are idle, unless allowCoreThreadTimeOut is set
// maximumPoolSize – the maximum number of threads to allow in the pool
// keepAliveTime – when the number of threads is greater than the core, this is the maximum time that excess idle threads will wait for new tasks before terminating.
// unit – the time unit for the keepAliveTime argument
// workQueue – the queue to use for holding tasks before they are executed. This queue will hold only the Runnable tasks submitted by the execute method.
// threadFactory – the factory to use when the executor creates a new thread
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
threadFactory, defaultHandler);
}
創(chuàng)建參數(shù):
- 核心線程數(shù)痴施,核心線程將保持存活,及時(shí)是空閑的动遭,這里是 0
- 池中允許的最大線程數(shù)神得,這里是 Int 的最大值,實(shí)際到不了這么多宵蕉,OkHttp 有 64 最大連接數(shù)的限制
- 等待時(shí)間 可以理解為非核心線程等待任務(wù)時(shí)的超時(shí)時(shí)間 ,這里為 60 秒
- 等待時(shí)間的單位
- 工作隊(duì)列国裳,這里是一個(gè)同步的阻塞隊(duì)列缝左,內(nèi)部沒有容器浓若,傳入一個(gè)時(shí)就會(huì)阻塞下一個(gè)的傳入
- 線程工廠
- 為什么 OkHttp 要這么設(shè)置線程池,有什么好處呢挪钓?
其實(shí)這種參數(shù)設(shè)置,就是 Excutor.newCachedThreadPool() ,
首先一個(gè)阻塞的同步隊(duì)列,內(nèi)部沒有容器,意味著什么呢,每當(dāng)一個(gè)網(wǎng)絡(luò)請(qǐng)求發(fā)起,只要核心線程滿了,就會(huì)在池中創(chuàng)建新的線程
如果線程池中的線程數(shù)大于核心線程數(shù)且隊(duì)列滿了碌上,且線程數(shù)小于最大線程數(shù),則會(huì)創(chuàng)建新的線程,剛好 Okhttp 的最大線程數(shù)時(shí)是一個(gè)極大值,那就會(huì)不斷創(chuàng)建線程,是一個(gè)高并發(fā)的線程池