為什么要用線程池
使用線程池管理線程有如下優(yōu)點(diǎn):
- 降低資源消耗:通過(guò)重復(fù)利用已創(chuàng)建的線程降低線程創(chuàng)建和銷(xiāo)毀造成的消耗。
- 提高響應(yīng)速度:當(dāng)任務(wù)到達(dá)時(shí),任務(wù)可以不需要等到線程創(chuàng)建就能立即執(zhí)行药有。
- 提高線程的可管理性:線程是稀缺資源鸟廓,如果無(wú)限制的創(chuàng)建部念,不僅會(huì)消耗系統(tǒng)資源邀窃,還會(huì)降低系統(tǒng)的穩(wěn)定性,使用線程池可以進(jìn)行統(tǒng)一的分配锦积,調(diào)優(yōu)和監(jiān)控藏否。
線程池介紹
ThreadPoolExecutor
Java 為我們提供了 ThreadPoolExecutor 來(lái)創(chuàng)建一個(gè)線程池,其完整構(gòu)造函數(shù)如下所示:
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
int corePoolSize(核心線程數(shù)):線程池新建線程的時(shí)候充包,如果當(dāng)前線程總數(shù)小于corePoolSize副签,則新建的是核心線程,如果超過(guò)corePoolSize基矮,則新建的是非核心線程淆储;核心線程默認(rèn)情況下會(huì)一直存活在線程池中,即使這個(gè)核心線程啥也不干(閑置狀態(tài))家浇;如果設(shè)置了 allowCoreThreadTimeOut 為 true本砰,那么核心線程如果不干活(閑置狀態(tài))的話,超過(guò)一定時(shí)間(時(shí)長(zhǎng)下面參數(shù)決定)钢悲,就會(huì)被銷(xiāo)毀掉点额。
int maximumPoolSize(線程池能容納的最大線程數(shù)量):線程總數(shù) = 核心線程數(shù) + 非核心線程數(shù)。
long keepAliveTime(非核心線程空閑存活時(shí)長(zhǎng)):非核心線程空閑時(shí)長(zhǎng)超過(guò)該時(shí)長(zhǎng)將會(huì)被回收莺琳,主要應(yīng)用在緩存線程池中还棱,當(dāng)設(shè)置了 allowCoreThreadTimeOut 為 true 時(shí),對(duì)核心線程同樣起作用惭等。
TimeUnit unit(keepAliveTime 的單位):它是一個(gè)枚舉類(lèi)型珍手,常用的如:TimeUnit.SECONDS(秒)、TimeUnit.MILLISECONDS(毫秒)辞做。
-
BlockingQueue workQueue(任務(wù)隊(duì)列):當(dāng)所有的核心線程都在干活時(shí)琳要,新添加的任務(wù)會(huì)被添加到這個(gè)隊(duì)列中等待處理,如果隊(duì)列滿了秤茅,則新建非核心線程執(zhí)行任務(wù)稚补,常用的 workQueue 類(lèi)型:
SynchronousQueue:這個(gè)隊(duì)列接收到任務(wù)的時(shí)候,會(huì)直接提交給線程處理框喳,而不保留它课幕,如果所有線程都在工作怎么辦?那就新建一個(gè)線程來(lái)處理這個(gè)任務(wù)帖努!所以為了保證不出現(xiàn) 線程數(shù)達(dá)到了 maximumPoolSize 而不能新建線程 的錯(cuò)誤撰豺,使用這個(gè)類(lèi)型隊(duì)列的時(shí)候,maximumPoolSize 一般指定成 Integer.MAX_VALUE拼余,即無(wú)限大。
LinkedBlockingQueue:這個(gè)隊(duì)列接收到任務(wù)的時(shí)候亩歹,如果當(dāng)前線程數(shù)小于核心線程數(shù)匙监,則新建線程(核心線程)處理任務(wù)凡橱;如果當(dāng)前線程數(shù)等于核心線程數(shù),則進(jìn)入隊(duì)列等待亭姥。由于這個(gè)隊(duì)列沒(méi)有最大值限制稼钩,即所有超過(guò)核心線程數(shù)的任務(wù)都將被添加到隊(duì)列中,這也就導(dǎo)致了 maximumPoolSize 的設(shè)定失效达罗,因?yàn)榭偩€程數(shù)永遠(yuǎn)不會(huì)超過(guò) corePoolSize坝撑。
ArrayBlockingQueue:可以限定隊(duì)列的長(zhǎng)度,接收到任務(wù)的時(shí)候粮揉,如果沒(méi)有達(dá)到 corePoolSize 的值巡李,則新建線程(核心線程)執(zhí)行任務(wù),如果達(dá)到了扶认,則入隊(duì)等候侨拦,如果隊(duì)列已滿,則新建線程(非核心線程)執(zhí)行任務(wù)辐宾,又如果總線程數(shù)到了 maximumPoolSize狱从,并且隊(duì)列也滿了,則發(fā)生錯(cuò)誤叠纹。
DelayQueue:隊(duì)列內(nèi)元素必須實(shí)現(xiàn) Delayed 接口季研,這就意味著你傳進(jìn)去的任務(wù)必須先實(shí)現(xiàn) Delayed 接口。這個(gè)隊(duì)列接收到任務(wù)時(shí)誉察,首先先入隊(duì)训貌,只有達(dá)到了指定的延時(shí)時(shí)間,才會(huì)執(zhí)行任務(wù)冒窍。
ThreadFactory threadFactory(線程工廠):用來(lái)創(chuàng)建線程池中的線程递沪,通常用默認(rèn)的即可。
-
RejectedExecutionHandler handler(拒絕策略):在線程池已經(jīng)關(guān)閉的情況下和任務(wù)太多導(dǎo)致最大線程數(shù)和任務(wù)隊(duì)列已經(jīng)飽和综液,無(wú)法再接收新的任務(wù)款慨,在上面兩種情況下,只要滿足其中一種時(shí)谬莹,在使用 execute() 來(lái)提交新的任務(wù)時(shí)將會(huì)拒絕檩奠,線程池提供了以下 4 種策略:
AbortPolicy:默認(rèn)策略,在拒絕任務(wù)時(shí)附帽,會(huì)拋出RejectedExecutionException埠戳。
CallerRunsPolicy:只要線程池未關(guān)閉,該策略直接在調(diào)用者線程中蕉扮,運(yùn)行當(dāng)前的被丟棄的任務(wù)整胃。
DiscardOldestPolicy:該策略將丟棄最老的一個(gè)請(qǐng)求,也就是即將被執(zhí)行的任務(wù)喳钟,并嘗試再次提交當(dāng)前任務(wù)屁使。
DiscardPolicy:該策略默默的丟棄無(wú)法處理的任務(wù)在岂,不予任何處理。
線程池執(zhí)行策略
當(dāng)一個(gè)任務(wù)要被添加進(jìn)線程池時(shí)蛮寂,有以下四種執(zhí)行策略:
- 線程數(shù)量未達(dá)到 corePoolSize蔽午,則新建一個(gè)線程(核心線程)執(zhí)行任務(wù)。
- 線程數(shù)量達(dá)到了 corePoolsSize酬蹋,則將任務(wù)移入隊(duì)列等待及老。
- 隊(duì)列已滿,新建非核心線程執(zhí)行任務(wù)范抓。
- 隊(duì)列已滿骄恶,總線程數(shù)又達(dá)到了 maximumPoolSize,就會(huì)由 RejectedExecutionHandler 拋出異常尉咕。
其流程圖如下所示:
常見(jiàn)的四類(lèi)線程池
常見(jiàn)的四類(lèi)線程池分別有 FixedThreadPool叠蝇、SingleThreadExecutor、ScheduledThreadPool 和 CachedThreadPool年缎,它們其實(shí)都是通過(guò) ThreadPoolExecutor 創(chuàng)建的悔捶,其參數(shù)如下表所示:
參數(shù) | FixedThreadPool | SingleThreadExecutor | ScheduledThreadPool | CachedThreadPool |
---|---|---|---|---|
corePoolSize | nThreads | 1 | corePoolSize | 0 |
maximumPoolSize | nThreads | 1 | Integer.MAX_VALUE | Integer.MAX_VALUE |
keepAliveTime | 0 | 0 | 10 | 60 |
unit | MILLISECONDS | MILLISECONDS | MILLISECONDS | SECONDS |
workQueue | LinkedBlockingQueue | LinkedBlockingQueue | DelayedWorkQueue | SynchronousQueue |
threadFactory | defaultThreadFactory | defaultThreadFactory | defaultThreadFactory | defaultThreadFactory |
handler | defaultHandler | defaultHandler | defaultHandler | defaultHandler |
適用場(chǎng)景 | 已知并發(fā)壓力的情況下,對(duì)線程數(shù)做限制 | 需要保證順序執(zhí)行的場(chǎng)景单芜,并且只有一個(gè)線程在執(zhí)行 | 需要多個(gè)后臺(tái)線程執(zhí)行周期任務(wù)的場(chǎng)景 | 處理執(zhí)行時(shí)間比較短的任務(wù) |
如果你不想自己寫(xiě)一個(gè)線程池蜕该,那么你可以從上面看看有沒(méi)有符合你要求的(一般都?jí)蛴昧耍绻兄摒敲春芎媚阒苯佑镁托辛颂玫绻麤](méi)有,那你就老老實(shí)實(shí)自己去寫(xiě)一個(gè)吧扒腕。
合理地配置線程池
需要針對(duì)具體情況而具體處理绢淀,不同的任務(wù)類(lèi)別應(yīng)采用不同規(guī)模的線程池,任務(wù)類(lèi)別可劃分為 CPU 密集型任務(wù)瘾腰、IO 密集型任務(wù)和混合型任務(wù)皆的。
CPU 密集型任務(wù):線程池中線程個(gè)數(shù)應(yīng)盡量少,推薦配置為 (CPU 核心數(shù) + 1)蹋盆;
IO 密集型任務(wù):由于 IO 操作速度遠(yuǎn)低于 CPU 速度费薄,那么在運(yùn)行這類(lèi)任務(wù)時(shí),CPU 絕大多數(shù)時(shí)間處于空閑狀態(tài)栖雾,那么線程池可以配置盡量多些的線程楞抡,以提高 CPU 利用率,推薦配置為 (2 * CPU 核心數(shù) + 1)析藕;
混合型任務(wù):可以拆分為 CPU 密集型任務(wù)和 IO 密集型任務(wù)召廷,當(dāng)這兩類(lèi)任務(wù)執(zhí)行時(shí)間相差無(wú)幾時(shí),通過(guò)拆分再執(zhí)行的吞吐率高于串行執(zhí)行的吞吐率,但若這兩類(lèi)任務(wù)執(zhí)行時(shí)間有數(shù)據(jù)級(jí)的差距柱恤,那么沒(méi)有拆分的意義数初。
線程池工具類(lèi)封裝及使用
為了提升開(kāi)發(fā)效率及更好地使用和管理線程池找爱,我已經(jīng)為你們封裝好了線程工具類(lèi)----ThreadUtils梗顺,依賴 AndroidUtilCode 1.16.1 版本即可使用,其 API 如下所示:
isMainThread : 判斷當(dāng)前是否主線程
getFixedPool : 獲取固定線程池
getSinglePool : 獲取單線程池
getCachedPool : 獲取緩沖線程池
getIoPool : 獲取 IO 線程池
getCpuPool : 獲取 CPU 線程池
executeByFixed : 在固定線程池執(zhí)行任務(wù)
executeByFixedWithDelay : 在固定線程池延時(shí)執(zhí)行任務(wù)
executeByFixedAtFixRate : 在固定線程池按固定頻率執(zhí)行任務(wù)
executeBySingle : 在單線程池執(zhí)行任務(wù)
executeBySingleWithDelay: 在單線程池延時(shí)執(zhí)行任務(wù)
executeBySingleAtFixRate: 在單線程池按固定頻率執(zhí)行任務(wù)
executeByCached : 在緩沖線程池執(zhí)行任務(wù)
executeByCachedWithDelay: 在緩沖線程池延時(shí)執(zhí)行任務(wù)
executeByCachedAtFixRate: 在緩沖線程池按固定頻率執(zhí)行任務(wù)
executeByIo : 在 IO 線程池執(zhí)行任務(wù)
executeByIoWithDelay : 在 IO 線程池延時(shí)執(zhí)行任務(wù)
executeByIoAtFixRate : 在 IO 線程池按固定頻率執(zhí)行任務(wù)
executeByCpu : 在 CPU 線程池執(zhí)行任務(wù)
executeByCpuWithDelay : 在 CPU 線程池延時(shí)執(zhí)行任務(wù)
executeByCpuAtFixRate : 在 CPU 線程池按固定頻率執(zhí)行任務(wù)
executeByCustom : 在自定義線程池執(zhí)行任務(wù)
executeByCustomWithDelay: 在自定義線程池延時(shí)執(zhí)行任務(wù)
executeByCustomAtFixRate: 在自定義線程池按固定頻率執(zhí)行任務(wù)
cancel : 取消任務(wù)的執(zhí)行
如果你使用 RxJava 很 6车摄,而且項(xiàng)目中已經(jīng)使用了 RxJava寺谤,那么你可以繼續(xù)使用 RxJava 來(lái)做線程切換的操作;如果你并不會(huì) RxJava 或者是在開(kāi)發(fā) SDK吮播,那么這個(gè)工具類(lèi)再適合你不過(guò)了变屁,它可以為你統(tǒng)一管理線程池的使用,不至于讓你的項(xiàng)目中出現(xiàn)過(guò)多的線程池意狠。
ThreadUtils 使用極為方便粟关,看 API 即可明白相關(guān)意思,F(xiàn)ixedPool环戈、SinglePool闷板、CachedPool 分別對(duì)應(yīng)了上面介紹的 FixedThreadPool、SingleThreadExecutor院塞、CachedThreadPool 這三種遮晚,IoPool 是創(chuàng)建 (CPU_COUNT * 2 + 1) 個(gè)核心線程數(shù),CpuPool 是建立 (CPU_COUNT + 1) 個(gè)核心線程數(shù)拦止;而所有的 execute 都是線程池外圍裹了一層 ScheduledThreadPool县遣,這里和 RxJava 線程池的實(shí)現(xiàn)有所相似,可以更方便地提供延時(shí)任務(wù)和固定頻率執(zhí)行的任務(wù)汹族,當(dāng)然也可以更方便地取消任務(wù)的執(zhí)行萧求,下面讓我們來(lái)簡(jiǎn)單地來(lái)介紹其使用,以從 assets 中拷貝 APK 到 SD 卡為例顶瞒,其代碼如下所示:
public static void releaseInstallApk(final OnReleasedListener listener) {
if (!FileUtils.isFileExists(Config.TEST_APK_PATH)) {
ThreadUtils.executeByIo(new ThreadUtils.SimpleTask<Void>() {
@Override
public Void doInBackground() throws Throwable {
ResourceUtils.copyFileFromAssets("test_install", Config.TEST_APK_PATH);
return null;
}
@Override
public void onSuccess(Void result) {
if (listener != null) {
listener.onReleased();
}
}
});
} else {
if (listener != null) {
listener.onReleased();
}
LogUtils.d("test apk existed.");
}
}
看起來(lái)還不是很優(yōu)雅是吧夸政,你可以把相關(guān)的 Task 都抽出來(lái)放到合適的包下,這樣每個(gè) Task 的指責(zé)一看便知搁拙,如上例子可以改裝成如下所示:
public class ReleaseInstallApkTask extends ThreadUtils.SimpleTask<Void> {
private OnReleasedListener mListener;
public ReleaseInstallApkTask(final OnReleasedListener listener) {
mListener = listener;
}
@Override
public Void doInBackground() throws Throwable {
ResourceUtils.copyFileFromAssets("test_install", Config.TEST_APK_PATH);
return null;
}
@Override
public void onSuccess(Void result) {
if (mListener != null) {
mListener.onReleased();
}
}
public void execute() {
ThreadUtils.executeByIo(this);
}
}
public static void releaseInstallApk(final OnReleasedListener listener) {
if (!FileUtils.isFileExists(Config.TEST_APK_PATH)) {
new ReleaseInstallApkTask(listener).execute();
} else {
if (listener != null) {
listener.onReleased();
}
LogUtils.d("test apk existed.");
}
}
是不是瞬間清爽了很多秒梳,如果執(zhí)行成功的回調(diào)中涉及了 View 相關(guān)的操作,那么你需要在 destroy 中取消 task 的執(zhí)行哦箕速,否則會(huì)內(nèi)存泄漏哦酪碘,繼續(xù)你上面的例子為例,代碼如下所示:
public class XXActivity extends Activity {
···
@Override
protected void onDestroy() {
// ThreadUtils.cancel(releaseInstallApkTask); 或者下面的取消都可以
releaseInstallApkTask.cancel();
super.onDestroy();
}
}
以上是以 SimpleTask 為例盐茎,Task 的話會(huì)多兩個(gè)回調(diào)兴垦,onCancel() 和 onFail(Throwable t),它們和 onSuccess(T result) 都是互斥的,最終回調(diào)只會(huì)走它們其中之一探越,并且在 Android 端是發(fā)送到主線程中執(zhí)行狡赐,如果是 Java 端的話那就還是會(huì)在相應(yīng)的線程池中執(zhí)行,這點(diǎn)也方便了我做單元測(cè)試钦幔。
線程池工具類(lèi)單元測(cè)試
如果遇到了異步的單測(cè)枕屉,你會(huì)發(fā)現(xiàn)單測(cè)很快就跑完呢,并沒(méi)有等待我們線程跑完再結(jié)束鲤氢,我們可以用 CountDownLatch 來(lái)等待線程的結(jié)束搀擂,或者化異步為同步的做法,這里我們使用 CountDownLatch 來(lái)實(shí)現(xiàn)卷玉,我進(jìn)行了簡(jiǎn)單的封裝哨颂,測(cè)試 Fixed 的代碼如下所示:
public class ThreadUtilsTest {
@Test
public void executeByFixed() throws Exception {
asyncTest(10, new TestRunnable<String>() {
@Override
public void run(final int index, CountDownLatch latch) {
final TestTask<String> task = new TestTask<String>(latch) {
@Override
public String doInBackground() throws Throwable {
Thread.sleep(500 + index * 10);
if (index < 4) {
return Thread.currentThread() + " :" + index;
} else if (index < 7) {
cancel();
return null;
} else {
throw new NullPointerException(String.valueOf(index));
}
}
@Override
void onTestSuccess(String result) {
System.out.println(result);
}
};
ThreadUtils.executeByFixed(3, task);
}
});
}
@Test
public void executeByFixedWithDelay() throws Exception {
asyncTest(10, new TestRunnable<String>() {
@Override
public void run(final int index, CountDownLatch latch) {
final TestTask<String> task = new TestTask<String>(latch) {
@Override
public String doInBackground() throws Throwable {
Thread.sleep(500);
if (index < 4) {
return Thread.currentThread() + " :" + index;
} else if (index < 7) {
cancel();
return null;
} else {
throw new NullPointerException(String.valueOf(index));
}
}
@Override
void onTestSuccess(String result) {
System.out.println(result);
}
};
ThreadUtils.executeByFixedWithDelay(3, task, 500 + index * 10, TimeUnit.MILLISECONDS);
}
});
}
@Test
public void executeByFixedAtFixRate() throws Exception {
asyncTest(10, new TestRunnable<String>() {
@Override
public void run(final int index, CountDownLatch latch) {
final TestScheduledTask<String> task = new TestScheduledTask<String>(latch, 3) {
@Override
public String doInBackground() throws Throwable {
Thread.sleep(500 + index * 10);
if (index < 4) {
return Thread.currentThread() + " :" + index;
} else if (index < 7) {
cancel();
return null;
} else {
throw new NullPointerException(String.valueOf(index));
}
}
@Override
void onTestSuccess(String result) {
System.out.println(result);
}
};
ThreadUtils.executeByFixedAtFixRate(3, task, 3000 + index * 10, TimeUnit.MILLISECONDS);
}
});
}
abstract static class TestScheduledTask<T> extends ThreadUtils.Task<T> {
private static final AtomicInteger ATOMIC_INTEGER = new AtomicInteger();
private int mTimes;
CountDownLatch mLatch;
TestScheduledTask(final CountDownLatch latch, final int times) {
mLatch = latch;
mTimes = times;
}
abstract void onTestSuccess(T result);
@Override
public void onSuccess(T result) {
onTestSuccess(result);
if (ATOMIC_INTEGER.addAndGet(1) % mTimes == 0) {
mLatch.countDown();
}
}
@Override
public void onCancel() {
System.out.println(Thread.currentThread() + " onCancel: ");
mLatch.countDown();
}
@Override
public void onFail(Throwable t) {
System.out.println(Thread.currentThread() + " onFail: " + t);
mLatch.countDown();
}
}
abstract static class TestTask<T> extends ThreadUtils.Task<T> {
CountDownLatch mLatch;
TestTask(final CountDownLatch latch) {
mLatch = latch;
}
abstract void onTestSuccess(T result);
@Override
public void onSuccess(T result) {
onTestSuccess(result);
mLatch.countDown();
}
@Override
public void onCancel() {
System.out.println(Thread.currentThread() + " onCancel: ");
mLatch.countDown();
}
@Override
public void onFail(Throwable t) {
System.out.println(Thread.currentThread() + " onFail: " + t);
mLatch.countDown();
}
}
<T> void asyncTest(int threadCount, TestRunnable<T> runnable) throws Exception {
CountDownLatch latch = new CountDownLatch(threadCount);
for (int i = 0; i < threadCount; i++) {
runnable.run(i, latch);
}
latch.await();
}
interface TestRunnable<T> {
void run(final int index, CountDownLatch latch);
}
}
最后想說(shuō)的話
感謝大家一起陪伴 AndroidUtilCode 的成長(zhǎng),核心工具類(lèi)幾乎都已囊括相种,也是匯集了我大量的心血威恼,把開(kāi)源做到了極致,希望大家可以用的舒心寝并,大大提升開(kāi)發(fā)效率箫措,早日贏取白富美,走上人生巔峰食茎。
后文再添加一個(gè)個(gè)人對(duì) OkHttp 的線程池的使用分析蒂破,算是送上個(gè)小福利。
OkHttp 中的線程池使用
查看 OkHttp 的源碼發(fā)現(xiàn)别渔,不論是同步請(qǐng)求還是異步請(qǐng)求附迷,最終都是交給 Dispatcher 做處理,我們看下該類(lèi)和線程池有關(guān)的的主要代碼:
public final class Dispatcher {
// 最大請(qǐng)求數(shù)
private int maxRequests = 64;
// 相同 host 最大請(qǐng)求數(shù)
private int maxRequestsPerHost = 5;
// 請(qǐng)求執(zhí)行線程池哎媚,懶加載
private @Nullable ExecutorService executorService;
// 就緒狀態(tài)的異步請(qǐng)求隊(duì)列
private final Deque<AsyncCall> readyAsyncCalls = new ArrayDeque<>();
// 運(yùn)行中的異步請(qǐng)求隊(duì)列喇伯,包括還沒(méi)完成的請(qǐng)求
private final Deque<AsyncCall> runningAsyncCalls = new ArrayDeque<>();
public Dispatcher(ExecutorService executorService) {
this.executorService = executorService;
}
public Dispatcher() {
}
public synchronized ExecutorService executorService() {
if (executorService == null) {
// 和 CachedThreadPool 很相似
executorService = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(), Util.threadFactory("OkHttp Dispatcher", false));
}
return executorService;
}
synchronized void enqueue(AsyncCall call) {
// 不超過(guò)最大請(qǐng)求數(shù)并且不超過(guò) host 最大請(qǐng)求數(shù)
if (runningAsyncCalls.size() < maxRequests && runningCallsForHost(call) < maxRequestsPerHost) {
// 添加到運(yùn)行中的異步請(qǐng)求隊(duì)列
runningAsyncCalls.add(call);
// 添加到線程池中運(yùn)行
executorService().execute(call);
} else {
// 添加到就緒的異步請(qǐng)求隊(duì)列
readyAsyncCalls.add(call);
}
}
// 當(dāng)該異步請(qǐng)求結(jié)束的時(shí)候,會(huì)調(diào)用此方法拨与,用于將運(yùn)行中的異步請(qǐng)求隊(duì)列中的該請(qǐng)求移除并調(diào)整請(qǐng)求隊(duì)列
// 此時(shí)就緒隊(duì)列中的請(qǐng)求就可以進(jìn)入運(yùn)行中的隊(duì)列
void finished(AsyncCall call) {
finished(runningAsyncCalls, call, true);
}
private <T> void finished(Deque<T> calls, T call, boolean promoteCalls) {
int runningCallsCount;
Runnable idleCallback;
synchronized (this) {
if (!calls.remove(call)) throw new AssertionError("Call wasn't in-flight!");
if (promoteCalls) promoteCalls();
runningCallsCount = runningCallsCount();
idleCallback = this.idleCallback;
}
if (runningCallsCount == 0 && idleCallback != null) {
idleCallback.run();
}
}
// 根據(jù) maxRequests 和 maxRequestsPerHost 來(lái)調(diào)整 runningAsyncCalls 和 readyAsyncCalls
// 使運(yùn)行中的異步請(qǐng)求不超過(guò)兩種最大值稻据,并且如果隊(duì)列有空閑,將就緒狀態(tài)的請(qǐng)求歸類(lèi)為運(yùn)行中买喧。
private void promoteCalls() {
// 如果運(yùn)行中的異步隊(duì)列不小于最大請(qǐng)求數(shù)捻悯,直接返回
if (runningAsyncCalls.size() >= maxRequests) return;
// 如果就緒隊(duì)列為空,直接返回
if (readyAsyncCalls.isEmpty()) return; // No ready calls to promote.
// 遍歷就緒隊(duì)列并插入到運(yùn)行隊(duì)列
for (Iterator<AsyncCall> i = readyAsyncCalls.iterator(); i.hasNext(); ) {
AsyncCall call = i.next();
if (runningCallsForHost(call) < maxRequestsPerHost) {
i.remove();
runningAsyncCalls.add(call);
executorService().execute(call);
}
// 運(yùn)行隊(duì)列中的數(shù)量到達(dá)最大請(qǐng)求數(shù)淤毛,直接返回
if (runningAsyncCalls.size() >= maxRequests) return;
}
}
}
可以發(fā)現(xiàn) OkHttp 不是在線程池中維護(hù)線程的個(gè)數(shù)今缚,線程是通過(guò) Dispatcher 間接控制,線程池中的請(qǐng)求都是運(yùn)行中的請(qǐng)求低淡,這也就是說(shuō)線程的重用不是線程池控制的姓言,通過(guò)源碼我們發(fā)現(xiàn)線程重用的地方是請(qǐng)求結(jié)束的地方 finished(AsyncCall call)
瞬项,而真正的控制是通過(guò) promoteCalls
方法, 根據(jù) maxRequests
和 maxRequestsPerHost
來(lái)調(diào)整 runningAsyncCalls
和 readyAsyncCalls
何荚,使運(yùn)行中的異步請(qǐng)求不超過(guò)兩種最大值囱淋,并且如果隊(duì)列有空閑,將就緒狀態(tài)的請(qǐng)求歸類(lèi)為運(yùn)行中餐塘。