媽媽再也不用擔(dān)心你不會(huì)使用線程池了(ThreadUtils)

為什么要用線程池

使用線程池管理線程有如下優(yōu)點(diǎn):

  1. 降低資源消耗:通過(guò)重復(fù)利用已創(chuàng)建的線程降低線程創(chuàng)建和銷(xiāo)毀造成的消耗。
  2. 提高響應(yīng)速度:當(dāng)任務(wù)到達(dá)時(shí),任務(wù)可以不需要等到線程創(chuàng)建就能立即執(zhí)行药有。
  3. 提高線程的可管理性:線程是稀缺資源鸟廓,如果無(wú)限制的創(chuàng)建部念,不僅會(huì)消耗系統(tǒng)資源邀窃,還會(huì)降低系統(tǒng)的穩(wěn)定性,使用線程池可以進(jìn)行統(tǒng)一的分配锦积,調(diào)優(yōu)和監(jiān)控藏否。

線程池介紹

ThreadPoolExecutor

Java 為我們提供了 ThreadPoolExecutor 來(lái)創(chuàng)建一個(gè)線程池,其完整構(gòu)造函數(shù)如下所示:

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler)
  • int corePoolSize(核心線程數(shù)):線程池新建線程的時(shí)候充包,如果當(dāng)前線程總數(shù)小于corePoolSize副签,則新建的是核心線程,如果超過(guò)corePoolSize基矮,則新建的是非核心線程淆储;核心線程默認(rèn)情況下會(huì)一直存活在線程池中,即使這個(gè)核心線程啥也不干(閑置狀態(tài))家浇;如果設(shè)置了 allowCoreThreadTimeOut 為 true本砰,那么核心線程如果不干活(閑置狀態(tài))的話,超過(guò)一定時(shí)間(時(shí)長(zhǎng)下面參數(shù)決定)钢悲,就會(huì)被銷(xiāo)毀掉点额。

  • int maximumPoolSize(線程池能容納的最大線程數(shù)量):線程總數(shù) = 核心線程數(shù) + 非核心線程數(shù)。

  • long keepAliveTime(非核心線程空閑存活時(shí)長(zhǎng)):非核心線程空閑時(shí)長(zhǎng)超過(guò)該時(shí)長(zhǎng)將會(huì)被回收莺琳,主要應(yīng)用在緩存線程池中还棱,當(dāng)設(shè)置了 allowCoreThreadTimeOut 為 true 時(shí),對(duì)核心線程同樣起作用惭等。

  • TimeUnit unit(keepAliveTime 的單位):它是一個(gè)枚舉類(lèi)型珍手,常用的如:TimeUnit.SECONDS(秒)、TimeUnit.MILLISECONDS(毫秒)辞做。

  • BlockingQueue workQueue(任務(wù)隊(duì)列):當(dāng)所有的核心線程都在干活時(shí)琳要,新添加的任務(wù)會(huì)被添加到這個(gè)隊(duì)列中等待處理,如果隊(duì)列滿了秤茅,則新建非核心線程執(zhí)行任務(wù)稚补,常用的 workQueue 類(lèi)型:

    1. SynchronousQueue:這個(gè)隊(duì)列接收到任務(wù)的時(shí)候,會(huì)直接提交給線程處理框喳,而不保留它课幕,如果所有線程都在工作怎么辦?那就新建一個(gè)線程來(lái)處理這個(gè)任務(wù)帖努!所以為了保證不出現(xiàn) 線程數(shù)達(dá)到了 maximumPoolSize 而不能新建線程 的錯(cuò)誤撰豺,使用這個(gè)類(lèi)型隊(duì)列的時(shí)候,maximumPoolSize 一般指定成 Integer.MAX_VALUE拼余,即無(wú)限大。

    2. LinkedBlockingQueue:這個(gè)隊(duì)列接收到任務(wù)的時(shí)候亩歹,如果當(dāng)前線程數(shù)小于核心線程數(shù)匙监,則新建線程(核心線程)處理任務(wù)凡橱;如果當(dāng)前線程數(shù)等于核心線程數(shù),則進(jìn)入隊(duì)列等待亭姥。由于這個(gè)隊(duì)列沒(méi)有最大值限制稼钩,即所有超過(guò)核心線程數(shù)的任務(wù)都將被添加到隊(duì)列中,這也就導(dǎo)致了 maximumPoolSize 的設(shè)定失效达罗,因?yàn)榭偩€程數(shù)永遠(yuǎn)不會(huì)超過(guò) corePoolSize坝撑。

    3. ArrayBlockingQueue:可以限定隊(duì)列的長(zhǎng)度,接收到任務(wù)的時(shí)候粮揉,如果沒(méi)有達(dá)到 corePoolSize 的值巡李,則新建線程(核心線程)執(zhí)行任務(wù),如果達(dá)到了扶认,則入隊(duì)等候侨拦,如果隊(duì)列已滿,則新建線程(非核心線程)執(zhí)行任務(wù)辐宾,又如果總線程數(shù)到了 maximumPoolSize狱从,并且隊(duì)列也滿了,則發(fā)生錯(cuò)誤叠纹。

    4. DelayQueue:隊(duì)列內(nèi)元素必須實(shí)現(xiàn) Delayed 接口季研,這就意味著你傳進(jìn)去的任務(wù)必須先實(shí)現(xiàn) Delayed 接口。這個(gè)隊(duì)列接收到任務(wù)時(shí)誉察,首先先入隊(duì)训貌,只有達(dá)到了指定的延時(shí)時(shí)間,才會(huì)執(zhí)行任務(wù)冒窍。

  • ThreadFactory threadFactory(線程工廠):用來(lái)創(chuàng)建線程池中的線程递沪,通常用默認(rèn)的即可。

  • RejectedExecutionHandler handler(拒絕策略):在線程池已經(jīng)關(guān)閉的情況下和任務(wù)太多導(dǎo)致最大線程數(shù)和任務(wù)隊(duì)列已經(jīng)飽和综液,無(wú)法再接收新的任務(wù)款慨,在上面兩種情況下,只要滿足其中一種時(shí)谬莹,在使用 execute() 來(lái)提交新的任務(wù)時(shí)將會(huì)拒絕檩奠,線程池提供了以下 4 種策略:

    1. AbortPolicy:默認(rèn)策略,在拒絕任務(wù)時(shí)附帽,會(huì)拋出RejectedExecutionException埠戳。

    2. CallerRunsPolicy:只要線程池未關(guān)閉,該策略直接在調(diào)用者線程中蕉扮,運(yùn)行當(dāng)前的被丟棄的任務(wù)整胃。

    3. DiscardOldestPolicy:該策略將丟棄最老的一個(gè)請(qǐng)求,也就是即將被執(zhí)行的任務(wù)喳钟,并嘗試再次提交當(dāng)前任務(wù)屁使。

    4. DiscardPolicy:該策略默默的丟棄無(wú)法處理的任務(wù)在岂,不予任何處理。

線程池執(zhí)行策略

當(dāng)一個(gè)任務(wù)要被添加進(jìn)線程池時(shí)蛮寂,有以下四種執(zhí)行策略:

  1. 線程數(shù)量未達(dá)到 corePoolSize蔽午,則新建一個(gè)線程(核心線程)執(zhí)行任務(wù)。
  2. 線程數(shù)量達(dá)到了 corePoolsSize酬蹋,則將任務(wù)移入隊(duì)列等待及老。
  3. 隊(duì)列已滿,新建非核心線程執(zhí)行任務(wù)范抓。
  4. 隊(duì)列已滿骄恶,總線程數(shù)又達(dá)到了 maximumPoolSize,就會(huì)由 RejectedExecutionHandler 拋出異常尉咕。

其流程圖如下所示:

image

常見(jiàn)的四類(lèi)線程池

常見(jiàn)的四類(lèi)線程池分別有 FixedThreadPool叠蝇、SingleThreadExecutor、ScheduledThreadPool 和 CachedThreadPool年缎,它們其實(shí)都是通過(guò) ThreadPoolExecutor 創(chuàng)建的悔捶,其參數(shù)如下表所示:

參數(shù) FixedThreadPool SingleThreadExecutor ScheduledThreadPool CachedThreadPool
corePoolSize nThreads 1 corePoolSize 0
maximumPoolSize nThreads 1 Integer.MAX_VALUE Integer.MAX_VALUE
keepAliveTime 0 0 10 60
unit MILLISECONDS MILLISECONDS MILLISECONDS SECONDS
workQueue LinkedBlockingQueue LinkedBlockingQueue DelayedWorkQueue SynchronousQueue
threadFactory defaultThreadFactory defaultThreadFactory defaultThreadFactory defaultThreadFactory
handler defaultHandler defaultHandler defaultHandler defaultHandler
適用場(chǎng)景 已知并發(fā)壓力的情況下,對(duì)線程數(shù)做限制 需要保證順序執(zhí)行的場(chǎng)景单芜,并且只有一個(gè)線程在執(zhí)行 需要多個(gè)后臺(tái)線程執(zhí)行周期任務(wù)的場(chǎng)景 處理執(zhí)行時(shí)間比較短的任務(wù)

如果你不想自己寫(xiě)一個(gè)線程池蜕该,那么你可以從上面看看有沒(méi)有符合你要求的(一般都?jí)蛴昧耍绻兄摒敲春芎媚阒苯佑镁托辛颂玫绻麤](méi)有,那你就老老實(shí)實(shí)自己去寫(xiě)一個(gè)吧扒腕。

合理地配置線程池

需要針對(duì)具體情況而具體處理绢淀,不同的任務(wù)類(lèi)別應(yīng)采用不同規(guī)模的線程池,任務(wù)類(lèi)別可劃分為 CPU 密集型任務(wù)瘾腰、IO 密集型任務(wù)和混合型任務(wù)皆的。

  • CPU 密集型任務(wù):線程池中線程個(gè)數(shù)應(yīng)盡量少,推薦配置為 (CPU 核心數(shù) + 1)蹋盆;

  • IO 密集型任務(wù):由于 IO 操作速度遠(yuǎn)低于 CPU 速度费薄,那么在運(yùn)行這類(lèi)任務(wù)時(shí),CPU 絕大多數(shù)時(shí)間處于空閑狀態(tài)栖雾,那么線程池可以配置盡量多些的線程楞抡,以提高 CPU 利用率,推薦配置為 (2 * CPU 核心數(shù) + 1)析藕;

  • 混合型任務(wù):可以拆分為 CPU 密集型任務(wù)和 IO 密集型任務(wù)召廷,當(dāng)這兩類(lèi)任務(wù)執(zhí)行時(shí)間相差無(wú)幾時(shí),通過(guò)拆分再執(zhí)行的吞吐率高于串行執(zhí)行的吞吐率,但若這兩類(lèi)任務(wù)執(zhí)行時(shí)間有數(shù)據(jù)級(jí)的差距柱恤,那么沒(méi)有拆分的意義数初。

線程池工具類(lèi)封裝及使用

為了提升開(kāi)發(fā)效率及更好地使用和管理線程池找爱,我已經(jīng)為你們封裝好了線程工具類(lèi)----ThreadUtils梗顺,依賴 AndroidUtilCode 1.16.1 版本即可使用,其 API 如下所示:

isMainThread            : 判斷當(dāng)前是否主線程
getFixedPool            : 獲取固定線程池
getSinglePool           : 獲取單線程池
getCachedPool           : 獲取緩沖線程池
getIoPool               : 獲取 IO 線程池
getCpuPool              : 獲取 CPU 線程池
executeByFixed          : 在固定線程池執(zhí)行任務(wù)
executeByFixedWithDelay : 在固定線程池延時(shí)執(zhí)行任務(wù)
executeByFixedAtFixRate : 在固定線程池按固定頻率執(zhí)行任務(wù)
executeBySingle         : 在單線程池執(zhí)行任務(wù)
executeBySingleWithDelay: 在單線程池延時(shí)執(zhí)行任務(wù)
executeBySingleAtFixRate: 在單線程池按固定頻率執(zhí)行任務(wù)
executeByCached         : 在緩沖線程池執(zhí)行任務(wù)
executeByCachedWithDelay: 在緩沖線程池延時(shí)執(zhí)行任務(wù)
executeByCachedAtFixRate: 在緩沖線程池按固定頻率執(zhí)行任務(wù)
executeByIo             : 在 IO 線程池執(zhí)行任務(wù)
executeByIoWithDelay    : 在 IO 線程池延時(shí)執(zhí)行任務(wù)
executeByIoAtFixRate    : 在 IO 線程池按固定頻率執(zhí)行任務(wù)
executeByCpu            : 在 CPU 線程池執(zhí)行任務(wù)
executeByCpuWithDelay   : 在 CPU 線程池延時(shí)執(zhí)行任務(wù)
executeByCpuAtFixRate   : 在 CPU 線程池按固定頻率執(zhí)行任務(wù)
executeByCustom         : 在自定義線程池執(zhí)行任務(wù)
executeByCustomWithDelay: 在自定義線程池延時(shí)執(zhí)行任務(wù)
executeByCustomAtFixRate: 在自定義線程池按固定頻率執(zhí)行任務(wù)
cancel                  : 取消任務(wù)的執(zhí)行

如果你使用 RxJava 很 6车摄,而且項(xiàng)目中已經(jīng)使用了 RxJava寺谤,那么你可以繼續(xù)使用 RxJava 來(lái)做線程切換的操作;如果你并不會(huì) RxJava 或者是在開(kāi)發(fā) SDK吮播,那么這個(gè)工具類(lèi)再適合你不過(guò)了变屁,它可以為你統(tǒng)一管理線程池的使用,不至于讓你的項(xiàng)目中出現(xiàn)過(guò)多的線程池意狠。

ThreadUtils 使用極為方便粟关,看 API 即可明白相關(guān)意思,F(xiàn)ixedPool环戈、SinglePool闷板、CachedPool 分別對(duì)應(yīng)了上面介紹的 FixedThreadPool、SingleThreadExecutor院塞、CachedThreadPool 這三種遮晚,IoPool 是創(chuàng)建 (CPU_COUNT * 2 + 1) 個(gè)核心線程數(shù),CpuPool 是建立 (CPU_COUNT + 1) 個(gè)核心線程數(shù)拦止;而所有的 execute 都是線程池外圍裹了一層 ScheduledThreadPool县遣,這里和 RxJava 線程池的實(shí)現(xiàn)有所相似,可以更方便地提供延時(shí)任務(wù)和固定頻率執(zhí)行的任務(wù)汹族,當(dāng)然也可以更方便地取消任務(wù)的執(zhí)行萧求,下面讓我們來(lái)簡(jiǎn)單地來(lái)介紹其使用,以從 assets 中拷貝 APK 到 SD 卡為例顶瞒,其代碼如下所示:

public static void releaseInstallApk(final OnReleasedListener listener) {
    if (!FileUtils.isFileExists(Config.TEST_APK_PATH)) {
        ThreadUtils.executeByIo(new ThreadUtils.SimpleTask<Void>() {
            @Override
            public Void doInBackground() throws Throwable {
                ResourceUtils.copyFileFromAssets("test_install", Config.TEST_APK_PATH);
                return null;
            }

            @Override
            public void onSuccess(Void result) {
                if (listener != null) {
                    listener.onReleased();
                }
            }
        });
    } else {
        if (listener != null) {
            listener.onReleased();
        }
        LogUtils.d("test apk existed.");
    }
}

看起來(lái)還不是很優(yōu)雅是吧夸政,你可以把相關(guān)的 Task 都抽出來(lái)放到合適的包下,這樣每個(gè) Task 的指責(zé)一看便知搁拙,如上例子可以改裝成如下所示:

public class ReleaseInstallApkTask extends ThreadUtils.SimpleTask<Void> {

    private OnReleasedListener mListener;

    public ReleaseInstallApkTask(final OnReleasedListener listener) {
        mListener = listener;
    }

    @Override
    public Void doInBackground() throws Throwable {
        ResourceUtils.copyFileFromAssets("test_install", Config.TEST_APK_PATH);
        return null;
    }

    @Override
    public void onSuccess(Void result) {
        if (mListener != null) {
            mListener.onReleased();
        }
    }

    public void execute() {
        ThreadUtils.executeByIo(this);
    }
}

public static void releaseInstallApk(final OnReleasedListener listener) {
    if (!FileUtils.isFileExists(Config.TEST_APK_PATH)) {
        new ReleaseInstallApkTask(listener).execute();
    } else {
        if (listener != null) {
            listener.onReleased();
        }
        LogUtils.d("test apk existed.");
    }
}

是不是瞬間清爽了很多秒梳,如果執(zhí)行成功的回調(diào)中涉及了 View 相關(guān)的操作,那么你需要在 destroy 中取消 task 的執(zhí)行哦箕速,否則會(huì)內(nèi)存泄漏哦酪碘,繼續(xù)你上面的例子為例,代碼如下所示:

public class XXActivity extends Activity {
    ···
    
    @Override
    protected void onDestroy() {
        // ThreadUtils.cancel(releaseInstallApkTask); 或者下面的取消都可以
        releaseInstallApkTask.cancel();
        super.onDestroy();
    }
}

以上是以 SimpleTask 為例盐茎,Task 的話會(huì)多兩個(gè)回調(diào)兴垦,onCancel() 和 onFail(Throwable t),它們和 onSuccess(T result) 都是互斥的,最終回調(diào)只會(huì)走它們其中之一探越,并且在 Android 端是發(fā)送到主線程中執(zhí)行狡赐,如果是 Java 端的話那就還是會(huì)在相應(yīng)的線程池中執(zhí)行,這點(diǎn)也方便了我做單元測(cè)試钦幔。

線程池工具類(lèi)單元測(cè)試

如果遇到了異步的單測(cè)枕屉,你會(huì)發(fā)現(xiàn)單測(cè)很快就跑完呢,并沒(méi)有等待我們線程跑完再結(jié)束鲤氢,我們可以用 CountDownLatch 來(lái)等待線程的結(jié)束搀擂,或者化異步為同步的做法,這里我們使用 CountDownLatch 來(lái)實(shí)現(xiàn)卷玉,我進(jìn)行了簡(jiǎn)單的封裝哨颂,測(cè)試 Fixed 的代碼如下所示:

public class ThreadUtilsTest {

    @Test
    public void executeByFixed() throws Exception {
        asyncTest(10, new TestRunnable<String>() {
            @Override
            public void run(final int index, CountDownLatch latch) {
                final TestTask<String> task = new TestTask<String>(latch) {
                    @Override
                    public String doInBackground() throws Throwable {
                        Thread.sleep(500 + index * 10);
                        if (index < 4) {
                            return Thread.currentThread() + " :" + index;
                        } else if (index < 7) {
                            cancel();
                            return null;
                        } else {
                            throw new NullPointerException(String.valueOf(index));
                        }
                    }

                    @Override
                    void onTestSuccess(String result) {
                        System.out.println(result);
                    }
                };
                ThreadUtils.executeByFixed(3, task);
            }
        });
    }

    @Test
    public void executeByFixedWithDelay() throws Exception {
        asyncTest(10, new TestRunnable<String>() {
            @Override
            public void run(final int index, CountDownLatch latch) {
                final TestTask<String> task = new TestTask<String>(latch) {
                    @Override
                    public String doInBackground() throws Throwable {
                        Thread.sleep(500);
                        if (index < 4) {
                            return Thread.currentThread() + " :" + index;
                        } else if (index < 7) {
                            cancel();
                            return null;
                        } else {
                            throw new NullPointerException(String.valueOf(index));
                        }
                    }

                    @Override
                    void onTestSuccess(String result) {
                        System.out.println(result);
                    }
                };
                ThreadUtils.executeByFixedWithDelay(3, task, 500 + index * 10, TimeUnit.MILLISECONDS);
            }
        });
    }

    @Test
    public void executeByFixedAtFixRate() throws Exception {
        asyncTest(10, new TestRunnable<String>() {
            @Override
            public void run(final int index, CountDownLatch latch) {
                final TestScheduledTask<String> task = new TestScheduledTask<String>(latch, 3) {
                    @Override
                    public String doInBackground() throws Throwable {
                        Thread.sleep(500 + index * 10);
                        if (index < 4) {
                            return Thread.currentThread() + " :" + index;
                        } else if (index < 7) {
                            cancel();
                            return null;
                        } else {
                            throw new NullPointerException(String.valueOf(index));
                        }
                    }

                    @Override
                    void onTestSuccess(String result) {
                        System.out.println(result);
                    }
                };
                ThreadUtils.executeByFixedAtFixRate(3, task, 3000 + index * 10, TimeUnit.MILLISECONDS);
            }
        });
    }

    abstract static class TestScheduledTask<T> extends ThreadUtils.Task<T> {

        private static final AtomicInteger ATOMIC_INTEGER = new AtomicInteger();
        private int mTimes;
        CountDownLatch mLatch;

        TestScheduledTask(final CountDownLatch latch, final int times) {
            mLatch = latch;
            mTimes = times;
        }

        abstract void onTestSuccess(T result);

        @Override
        public void onSuccess(T result) {
            onTestSuccess(result);
            if (ATOMIC_INTEGER.addAndGet(1) % mTimes == 0) {
                mLatch.countDown();
            }
        }

        @Override
        public void onCancel() {
            System.out.println(Thread.currentThread() + " onCancel: ");
            mLatch.countDown();
        }

        @Override
        public void onFail(Throwable t) {
            System.out.println(Thread.currentThread() + " onFail: " + t);
            mLatch.countDown();
        }
    }

    abstract static class TestTask<T> extends ThreadUtils.Task<T> {
        CountDownLatch mLatch;

        TestTask(final CountDownLatch latch) {
            mLatch = latch;
        }

        abstract void onTestSuccess(T result);

        @Override
        public void onSuccess(T result) {
            onTestSuccess(result);
            mLatch.countDown();
        }

        @Override
        public void onCancel() {
            System.out.println(Thread.currentThread() + " onCancel: ");
            mLatch.countDown();
        }

        @Override
        public void onFail(Throwable t) {
            System.out.println(Thread.currentThread() + " onFail: " + t);
            mLatch.countDown();
        }
    }

    <T> void asyncTest(int threadCount, TestRunnable<T> runnable) throws Exception {
        CountDownLatch latch = new CountDownLatch(threadCount);
        for (int i = 0; i < threadCount; i++) {
            runnable.run(i, latch);
        }
        latch.await();
    }

    interface TestRunnable<T> {
        void run(final int index, CountDownLatch latch);
    }
}

最后想說(shuō)的話

感謝大家一起陪伴 AndroidUtilCode 的成長(zhǎng),核心工具類(lèi)幾乎都已囊括相种,也是匯集了我大量的心血威恼,把開(kāi)源做到了極致,希望大家可以用的舒心寝并,大大提升開(kāi)發(fā)效率箫措,早日贏取白富美,走上人生巔峰食茎。

后文再添加一個(gè)個(gè)人對(duì) OkHttp 的線程池的使用分析蒂破,算是送上個(gè)小福利。

OkHttp 中的線程池使用

查看 OkHttp 的源碼發(fā)現(xiàn)别渔,不論是同步請(qǐng)求還是異步請(qǐng)求附迷,最終都是交給 Dispatcher 做處理,我們看下該類(lèi)和線程池有關(guān)的的主要代碼:

public final class Dispatcher {
  // 最大請(qǐng)求數(shù)
  private int maxRequests = 64;
  // 相同 host 最大請(qǐng)求數(shù)
  private int maxRequestsPerHost = 5;
  // 請(qǐng)求執(zhí)行線程池哎媚,懶加載
  private @Nullable ExecutorService executorService;
  // 就緒狀態(tài)的異步請(qǐng)求隊(duì)列
  private final Deque<AsyncCall> readyAsyncCalls = new ArrayDeque<>();
  // 運(yùn)行中的異步請(qǐng)求隊(duì)列喇伯,包括還沒(méi)完成的請(qǐng)求
  private final Deque<AsyncCall> runningAsyncCalls = new ArrayDeque<>();

  public Dispatcher(ExecutorService executorService) {
      this.executorService = executorService;
  }

  public Dispatcher() {
  }

  public synchronized ExecutorService executorService() {
      if (executorService == null) {
          // 和 CachedThreadPool 很相似
          executorService = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS,
          new SynchronousQueue<Runnable>(), Util.threadFactory("OkHttp Dispatcher", false));
      }
      return executorService;
  }

  synchronized void enqueue(AsyncCall call) {
    // 不超過(guò)最大請(qǐng)求數(shù)并且不超過(guò) host 最大請(qǐng)求數(shù)
    if (runningAsyncCalls.size() < maxRequests && runningCallsForHost(call) < maxRequestsPerHost) {
      // 添加到運(yùn)行中的異步請(qǐng)求隊(duì)列
      runningAsyncCalls.add(call);
      // 添加到線程池中運(yùn)行
      executorService().execute(call);
    } else {
      // 添加到就緒的異步請(qǐng)求隊(duì)列
      readyAsyncCalls.add(call);
    }
  }

  // 當(dāng)該異步請(qǐng)求結(jié)束的時(shí)候,會(huì)調(diào)用此方法拨与,用于將運(yùn)行中的異步請(qǐng)求隊(duì)列中的該請(qǐng)求移除并調(diào)整請(qǐng)求隊(duì)列
  // 此時(shí)就緒隊(duì)列中的請(qǐng)求就可以進(jìn)入運(yùn)行中的隊(duì)列
  void finished(AsyncCall call) {
      finished(runningAsyncCalls, call, true);
  }

  private <T> void finished(Deque<T> calls, T call, boolean promoteCalls) {
      int runningCallsCount;
      Runnable idleCallback;
      synchronized (this) {
          if (!calls.remove(call)) throw new AssertionError("Call wasn't in-flight!");
          if (promoteCalls) promoteCalls();
          runningCallsCount = runningCallsCount();
          idleCallback = this.idleCallback;
      }

      if (runningCallsCount == 0 && idleCallback != null) {
          idleCallback.run();
      }
  }

  // 根據(jù) maxRequests 和 maxRequestsPerHost 來(lái)調(diào)整 runningAsyncCalls 和 readyAsyncCalls
  // 使運(yùn)行中的異步請(qǐng)求不超過(guò)兩種最大值稻据,并且如果隊(duì)列有空閑,將就緒狀態(tài)的請(qǐng)求歸類(lèi)為運(yùn)行中买喧。
  private void promoteCalls() {
    // 如果運(yùn)行中的異步隊(duì)列不小于最大請(qǐng)求數(shù)捻悯,直接返回
    if (runningAsyncCalls.size() >= maxRequests) return;
    // 如果就緒隊(duì)列為空,直接返回
    if (readyAsyncCalls.isEmpty()) return; // No ready calls to promote.
    // 遍歷就緒隊(duì)列并插入到運(yùn)行隊(duì)列
    for (Iterator<AsyncCall> i = readyAsyncCalls.iterator(); i.hasNext(); ) {
      AsyncCall call = i.next();

      if (runningCallsForHost(call) < maxRequestsPerHost) {
        i.remove();
        runningAsyncCalls.add(call);
        executorService().execute(call);
      }
      // 運(yùn)行隊(duì)列中的數(shù)量到達(dá)最大請(qǐng)求數(shù)淤毛,直接返回
      if (runningAsyncCalls.size() >= maxRequests) return;
    }
  }
}

可以發(fā)現(xiàn) OkHttp 不是在線程池中維護(hù)線程的個(gè)數(shù)今缚,線程是通過(guò) Dispatcher 間接控制,線程池中的請(qǐng)求都是運(yùn)行中的請(qǐng)求低淡,這也就是說(shuō)線程的重用不是線程池控制的姓言,通過(guò)源碼我們發(fā)現(xiàn)線程重用的地方是請(qǐng)求結(jié)束的地方 finished(AsyncCall call) 瞬项,而真正的控制是通過(guò) promoteCalls 方法, 根據(jù) maxRequestsmaxRequestsPerHost 來(lái)調(diào)整 runningAsyncCallsreadyAsyncCalls何荚,使運(yùn)行中的異步請(qǐng)求不超過(guò)兩種最大值囱淋,并且如果隊(duì)列有空閑,將就緒狀態(tài)的請(qǐng)求歸類(lèi)為運(yùn)行中餐塘。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末妥衣,一起剝皮案震驚了整個(gè)濱河市,隨后出現(xiàn)的幾起案子唠倦,更是在濱河造成了極大的恐慌称鳞,老刑警劉巖涮较,帶你破解...
    沈念sama閱讀 216,402評(píng)論 6 499
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件稠鼻,死亡現(xiàn)場(chǎng)離奇詭異,居然都是意外死亡狂票,警方通過(guò)查閱死者的電腦和手機(jī)候齿,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,377評(píng)論 3 392
  • 文/潘曉璐 我一進(jìn)店門(mén),熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái)闺属,“玉大人慌盯,你說(shuō)我怎么就攤上這事〉嗥鳎” “怎么了亚皂?”我有些...
    開(kāi)封第一講書(shū)人閱讀 162,483評(píng)論 0 353
  • 文/不壞的土叔 我叫張陵,是天一觀的道長(zhǎng)国瓮。 經(jīng)常有香客問(wèn)我灭必,道長(zhǎng),這世上最難降的妖魔是什么乃摹? 我笑而不...
    開(kāi)封第一講書(shū)人閱讀 58,165評(píng)論 1 292
  • 正文 為了忘掉前任禁漓,我火速辦了婚禮,結(jié)果婚禮上孵睬,老公的妹妹穿的比我還像新娘播歼。我一直安慰自己,他們只是感情好掰读,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,176評(píng)論 6 388
  • 文/花漫 我一把揭開(kāi)白布秘狞。 她就那樣靜靜地躺著,像睡著了一般蹈集。 火紅的嫁衣襯著肌膚如雪烁试。 梳的紋絲不亂的頭發(fā)上,一...
    開(kāi)封第一講書(shū)人閱讀 51,146評(píng)論 1 297
  • 那天雾狈,我揣著相機(jī)與錄音廓潜,去河邊找鬼。 笑死,一個(gè)胖子當(dāng)著我的面吹牛辩蛋,可吹牛的內(nèi)容都是我干的呻畸。 我是一名探鬼主播,決...
    沈念sama閱讀 40,032評(píng)論 3 417
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼悼院,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼伤为!你這毒婦竟也來(lái)了?” 一聲冷哼從身側(cè)響起据途,我...
    開(kāi)封第一講書(shū)人閱讀 38,896評(píng)論 0 274
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤绞愚,失蹤者是張志新(化名)和其女友劉穎,沒(méi)想到半個(gè)月后颖医,有當(dāng)?shù)厝嗽跇?shù)林里發(fā)現(xiàn)了一具尸體位衩,經(jīng)...
    沈念sama閱讀 45,311評(píng)論 1 310
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,536評(píng)論 2 332
  • 正文 我和宋清朗相戀三年熔萧,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了糖驴。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 39,696評(píng)論 1 348
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡佛致,死狀恐怖贮缕,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情俺榆,我是刑警寧澤感昼,帶...
    沈念sama閱讀 35,413評(píng)論 5 343
  • 正文 年R本政府宣布,位于F島的核電站罐脊,受9級(jí)特大地震影響定嗓,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜爹殊,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,008評(píng)論 3 325
  • 文/蒙蒙 一蜕乡、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧梗夸,春花似錦层玲、人聲如沸。這莊子的主人今日做“春日...
    開(kāi)封第一講書(shū)人閱讀 31,659評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)。三九已至铅碍,卻和暖如春润绵,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背胞谈。 一陣腳步聲響...
    開(kāi)封第一講書(shū)人閱讀 32,815評(píng)論 1 269
  • 我被黑心中介騙來(lái)泰國(guó)打工尘盼, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留憨愉,地道東北人。 一個(gè)月前我還...
    沈念sama閱讀 47,698評(píng)論 2 368
  • 正文 我出身青樓卿捎,卻偏偏與公主長(zhǎng)得像配紫,于是被迫代替她去往敵國(guó)和親。 傳聞我的和親對(duì)象是個(gè)殘疾皇子午阵,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,592評(píng)論 2 353

推薦閱讀更多精彩內(nèi)容