從零實現(xiàn)ImageLoader(三)—— 線程池詳解

目錄

從零實現(xiàn)ImageLoader(一)—— 架構
從零實現(xiàn)ImageLoader(二)—— 基本實現(xiàn)
從零實現(xiàn)ImageLoader(三)—— 線程池詳解
從零實現(xiàn)ImageLoader(四)—— Handler的內心獨白
從零實現(xiàn)ImageLoader(五)—— 內存緩存LruCache
從零實現(xiàn)ImageLoader(六)—— 磁盤緩存DiskLruCache

異步加載

既然是異步加載那新開線程自然是必不可少的。一個線程怎么樣娃善?這種情況下圖片得一個一個依次加載鲜戒,效率未免太低了境钟。那每張圖片新開一個線程怎么樣癞蚕?在圖片過多的情況下析桥,線程數(shù)量也會迅速隨之增長冻记,系統(tǒng)資源消耗太多嚴重买置,也不能接受秆麸。這時候就是線程池ExecutorService這個線程管理工具登場的時候了失都。

public class Dispatcher {
    private final String mUrl;
    private final ExecutorService mExecutorService;

    public Dispatcher(String url, ExecutorService executorService) {
        mUrl = url;
        mExecutorService = executorService;
    }

    public void into(ImageView imageView) {
        mExecutorService.execute(() -> {
            try {
                Bitmap image = get();
                //這一句將代碼切換到主線程缚态,下一篇文章再詳細解釋
                ImageLoader.HANDLER.post(() -> imageView.setImageBitmap(image));
            } catch (IOException e) {
                e.printStackTrace();
            }
        });
    }

    ...
}

ImageLoader類負責線程池的創(chuàng)建:

public class ImageLoader {
    ...
    
    private static final int MAX_THREAD_NUM = 3;
    private final ExecutorService mExecutorService;

    private ImageLoader(Context context) {
        //防止單例持有Activity的Context導致內存泄露
        mContext = context.getApplicationContext();
        mExecutorService = Executors.newFixedThreadPool(MAX_THREAD_NUM);
    }

    public Dispatcher load(String url) {
        return new Dispatcher(url, mExecutorService);
    }
}

這樣異步加載就實現(xiàn)完成了卸留,測試一下:

public class MainActivity extends Activity {
    @Override
    protected void onCreate(Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        setContentView(R.layout.activity_main);

        ImageView imageView = findViewById(R.id.image);
        ImageLoader.with(this)
                .load("https://i.redd.it/20mplvimm8ez.jpg")
                .into(imageView);
    }
}
效果圖

線程池使用

當然我們今天的重點不在異步加載,而是在線程池上慨默。

Executors

我們平時使用線程池只需要調用Executors.new**ThreadPool()方法贩耐,甚至都不需要關心創(chuàng)建的類是什么。那今天就從Executors入手去探尋線程池的廬山真面目:

public class Executors {
    ...

    public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }
    
    public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    }

    public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }

    public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
        return new ScheduledThreadPoolExecutor(corePoolSize);
    }
}

可以看到厦取,平時使用最頻繁的這幾個方法基本都是直接創(chuàng)建了ThreadPoolExecutor類潮太,只是參數(shù)有所不同。唯一比較特殊的ScheduledThreadPoolExecutor也繼承自ThreadPoolExecutor虾攻。

ThreadPoolExecutor構造方法

    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }
  • corePoolSize:線程池的核心線程數(shù)铡买。當線程數(shù)小于corePoolSize時會創(chuàng)建一個線程去執(zhí)行任務,當線程數(shù)達到corePoolSize時會將任務放入等待隊列霎箍。如果沒有手動調用核心線程超時奇钞,這些線程在創(chuàng)建后會一直存在。
  • maximumPoolSize:線程池允許創(chuàng)建的最大線程數(shù)漂坏。在等待隊列添加滿之后景埃,線程池會創(chuàng)建臨時線程用來處理任務,臨時隊列在超時后會自動結束顶别。而臨時線程與核心線程的總數(shù)不能超過maximumPoolSize纠亚。
  • keepAliveTime:臨時線程超時時間。
  • unitkeepAliveTime時間單位筋夏。
  • workQueue:線程池的等待隊列蒂胞。當線程數(shù)達到corePoolSize時會任務會被放入該等待隊列。
  • threadFactory:線程工廠条篷。用于創(chuàng)建新線程骗随。
  • handler:飽和處理策略。當線程池關閉或者線程數(shù)達到maximumPoolSize時赴叹,任務被放入該handler鸿染。

在看完上面的一系列參數(shù),可能還是一臉懵逼乞巧。其實大家可以線程池當做一個工廠涨椒,這個工廠負責對一些半成品進行加工,而核心線程就是這個工廠的工人绽媒。

每個工人同時只能處理一個半成品蚕冬,多余的半成品就被放入倉庫,等哪個工人處理完了手頭的半成品再來倉庫取是辕,這個倉庫就是等待隊列囤热。

但是倉庫也有限制,隨著半成品越來越多倉庫也放不下了获三,這時候工廠就請來一些臨時工來幫忙旁蔼,等工廠的任務輕了之后再請他們回去锨苏,這些臨時工就是臨時線程

可是工廠的資金也是有限的不能同時請?zhí)嗟墓と斯琢模@個資金限制就是maximumPoolSize伞租,而這些既沒有工人處理,倉庫又放不下的半成品就要想個辦法處理了限佩。是直接把它們丟掉葵诈?還是退回給半成品廠商?這就是飽和策略需要決定的了犀暑。

等待隊列

  • 同步移交隊列:任務不在隊列中存儲驯击,而是直接交給工作線程。這時可以使用SynchronousQueue實現(xiàn)耐亏,該隊列保證在插入時必須有另一個線程在等待獲取徊都,如果沒有則插入失敗。Executors.newCachedThreadPool()使用的就是該隊列广辰。
  • 無界隊列:例如無界LinkedBlockingQueue暇矫。ExecutorService.newFixedThreadPoolExecutorService.newSingleThreadExecutor使用的就是該隊列。
  • 有界隊列:例如有界LinkedBlockingQueueArrayBlockingQueue择吊。有界隊列避免了無界隊列無限制的增加導致資源耗盡的問題李根。

飽和策略

這里很明顯是策略模式,ThreadPoolExecutor給我們提供了四個已經(jīng)實現(xiàn)好的飽和策略几睛,不過我們也可以選擇自己實現(xiàn):

  • AbortPolicy:拋出RejectedExecutionException房轿。
  • CallerRunsPolicy:將任務放到調用execute()所在線程執(zhí)行,也就是直接調用任務的run()方法所森。
  • DiscardPolicy:直接丟棄任務囱持,不做任何處理。
  • DiscardOldestPolicy:將等待隊列頭部的任務刪除焕济,再重新執(zhí)行此任務纷妆。

套路

說了這么多,那到底應該怎么選擇呢晴弃?其實只要大致遵循一個規(guī)律掩幢,如果是計算密集型的任務,線程池的大小設為CPU的數(shù)目加1通常是最優(yōu)的上鞠,而如果是I/O密集型的任務就可以設置的大一些际邻,比如2倍的CPU的數(shù)目。當然旗国,具體的數(shù)目就要在運行過程中慢慢調試了枯怖。

線程池原理

講完了線程池的使用,接下來就是線程池的原理了能曾。這次的分析都基于Android 7.1.1的源碼度硝,其他版本的可能會在細節(jié)上有一些差異,不過大的方向不會有問題寿冕。

類結構

在了解ThreadPoolExecutor的實現(xiàn)之前我們首先對類的繼承結構要有一個整體的把握:

類結構

Executor是Java提供的用于簡化線程管理的接口蕊程,用戶只需通過execute()方法傳入Runnable的實現(xiàn),由Executor決定使用哪個線程處理同時負責線程的創(chuàng)建驼唱、運行和關閉藻茂。

ExecutorService,這個是我們平時使用線程池最用的了玫恳,在Executor的基礎上又加入了submit()辨赐、shutdown()等等一些方便用戶自主管理任務的方法。

AbstractExecutorService類實現(xiàn)了submit()等一系列方法京办,不過最主要的execute()依然留給了子類也就是今天的主角ThreadPoolExecutor去實現(xiàn)掀序。

概覽

ThreadPoolExecutor實際上使用的是生產(chǎn)者/消費者模型,在分析具體的代碼之前我們先看一下這個流程圖惭婿,有一個大概的印象不恭。

流程圖

execute()

關于execute()的處理過程,Java源碼有很詳細的注釋财饥,這里我把它翻譯為中午供大家參考换吧。

    public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        /*
         * 三步走:
         *
         * 1. 如果當前運行的線程少于核心線程數(shù),嘗試開啟一個線程并將command作為
         * 它的第一個任務(作者注:這里的第一個任務在后面會有所解釋)钥星。調用
         * runWorker通過原子性的方式重新檢查線程池運行狀態(tài)和工作線程數(shù)沾瓦,如果
         * 不能添加線程則返回false
         *
         * 2. 如果任務可以成功入列,我們依然要再次檢查線程池是否已經(jīng)關閉以及
         * 是否需要添加一個新線程(因為存在一種情況是在上次檢查之后所有的線程都已經(jīng)
         * 死光光了(作者注:至于為什么必須保證至少一個線程存活谦炒,我們在后面的
         * runWorker方法中會找到答案))贯莺。如果線程池已經(jīng)關閉,則將之前加入
         * 隊列的command彈出编饺;如果已經(jīng)沒有線程存活乖篷,則添加一個新線程。
         *
         * 3. 如果不能將任務入列透且,我們會嘗試添加一個新線程撕蔼。如果失敗了,要不
         * 就是線程池已經(jīng)關閉了秽誊,要不就是線程已經(jīng)飽和了(作者注:線程數(shù)達到最大值)鲸沮,
         * 這時候我們就將這個任務加入飽和策略。
         */
        int c = ctl.get();
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        else if (!addWorker(command, false))
            reject(command);
    }

這里有一點需要注意的是锅论,這個ctl變量的含義讼溺。其實ctl就是一個保存了線程池運行狀態(tài)以及線程數(shù)的原子整形變量:

    private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

它的高3位存儲線程池運行狀態(tài),即RUNNING最易、SHUTDOWN怒坯、STOP炫狱、TIDYINGTERMINATED五個狀態(tài)剔猿,低位存儲運行的線程數(shù)视译,可以用isRunning()判斷線程池是否處于運行狀態(tài),用workerCountOf獲取運行的線程數(shù)归敬。這里的ctl的用法非常巧妙酷含,強烈推薦大家去看一下源碼,這里由于篇幅所限就不再多說了汪茧。

addWorker()

    private boolean addWorker(Runnable firstTask, boolean core) {
        // 這一段for循環(huán)用來將ctl的值加1椅亚,如果線程池關閉或者線程數(shù)量
        // 達到限制,則直接返回false舱污。
        retry:
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            // 判斷線程是否關閉或者已經(jīng)沒有需要執(zhí)行的任務
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                return false;

            for (;;) {
                // 根據(jù)core判斷當前線程數(shù)量是否已經(jīng)達到限制
                // 如果core為true呀舔,則線程數(shù)不能大于核心線程數(shù)
                // 否則不能大于最大線程數(shù)
                int wc = workerCountOf(c);
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                // 將ctl的值加1,如果成功則跳出外循環(huán)
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                // 判斷再次期間線程池狀態(tài)是否已經(jīng)發(fā)生改變慌闭,如果是則
                // 重新開始外循環(huán)别威,否則在內循環(huán)中再次嘗試對ctl值加1
                c = ctl.get(); 
                if (runStateOf(c) != rs)
                    continue retry;
            }
        }

        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            w = new Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) {
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                    // 在得到主鎖mainLock之后再次檢查線程池的狀態(tài)
                    // 如果已經(jīng)關閉則不再添加
                    int rs = runStateOf(ctl.get());

                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        if (t.isAlive()) // 檢查線程t能否啟動
                            throw new IllegalThreadStateException();
                        // 將線程t加入線程集合
                        workers.add(w);
                        // 更新目前為止線程最多時達到的數(shù)目,與最大線程數(shù)
                        // 無關驴剔,調試用
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                if (workerAdded) {
                    // 正式開始運行線程t
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    }

addWorker()方法很有意思省古,它用一個core參數(shù)來區(qū)分是添加核心線程還是添加臨時線程,一個方法可以有不同的功能丧失。這也是為什么上面的流程圖里豺妓,添加核心線程和臨時線程的箭頭上只有一個addWorker方法。

addWorker()添加線程的邏輯可以分為四步:

  1. 確保線程數(shù)不超過限制布讹,并將ctl的計數(shù)加1琳拭。
  2. firstTask封裝為Worker
  3. Worker加入線程集合workers描验。
  4. 啟動Worker的線程白嘁。

有人已經(jīng)注意到addWorker()的參數(shù)名有點奇怪,明明只添加了一個任務為什么要叫firstTask呢膘流?在addWorker()的代碼里firstTask傳入了Worker的構造器絮缅,后面一系列操作就都是相對Worker執(zhí)行的,那Worker又對firstTask做了什么呼股?

Worker

    private final class Worker
        extends AbstractQueuedSynchronizer
        implements Runnable
    {
        /** Thread this worker is running in.  Null if factory fails. */
        final Thread thread;
        /** Initial task to run.  Possibly null. */
        Runnable firstTask;

        Worker(Runnable firstTask) {
            setState(-1); // inhibit interrupts until runWorker
            this.firstTask = firstTask;
            this.thread = getThreadFactory().newThread(this);
        }

        /** Delegates main run loop to outer runWorker. */
        public void run() {
            runWorker(this);
        }

        ...
    }

可以看到Worker也繼承了Runnable接口耕魄,在構造方法里Worker通過ThreadFactory新開了一個線程,而傳入的Runnable卻是自己彭谁,所以之前addWorker()里的代碼t.start()最終執(zhí)行的將會是Workerrun()方法吸奴,也就是runWorker()

主循環(huán)runWorker

    final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
            while (task != null || (task = getTask()) != null) {
                w.lock();
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        afterExecute(task, thrown);
                    }
                } finally {
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            processWorkerExit(w, completedAbruptly);
        }
    }

這里的邏輯初看起來可能是一頭霧水,其實很簡單则奥。重點是這里的while循環(huán)考润,首先會判斷firstTask是否為空,如果不為空則分四步:

  1. 調用beforeExecute()方法逞度。
  2. 調用taskrun()方法额划。
  3. 調用afterExecute()方法妙啃。
  4. task賦值為空档泽。

下一次循環(huán)task必定為空,于是執(zhí)行task = getTask()揖赴,這條語句是將Runnable任務從等待隊列workQueue里取出來賦值給task馆匿,于是再次執(zhí)行上面四步,直到線程池關閉或者等待超時燥滑。

每個Worker創(chuàng)建的線程在執(zhí)行完屬于自己的任務后渐北,還會繼續(xù)執(zhí)行等待隊列中的任務,所以這個firstTask也可以當做每個線程的啟動任務铭拧,這就是它為什么被叫做firstTask的原因赃蛛,也是runWorker方法為什么被稱為主循環(huán)的原因,線程池的設計者巧妙的用這一方法實現(xiàn)了線程的復用搀菩。

這也解答了之前的許多疑問:

  • 為什么沒有專門處理的等待隊列的線程?原因就在于每個線程都是處理等待隊列的線程呕臂。
  • 為什么在execute()方法中將任務加入等待隊列時,必須保證至少有一個線程存活肪跋?這是為了確保存在存活線程去執(zhí)行等待隊列中的任務歧蒋。

總結

我們這次實現(xiàn)了圖片的異步加載,不過將重點放在了線程池的使用及其原理上州既,設計者的各種巧思也是讓我們嘆為觀止谜洽,大家如果有空可以自己嘗試看一下源碼,一定不會讓你們失望吴叶。下一篇文章我們將要講解的是Handler阐虚,敬請期待。

最后編輯于
?著作權歸作者所有,轉載或內容合作請聯(lián)系作者
  • 序言:七十年代末蚌卤,一起剝皮案震驚了整個濱河市实束,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌造寝,老刑警劉巖磕洪,帶你破解...
    沈念sama閱讀 221,430評論 6 515
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異诫龙,居然都是意外死亡析显,警方通過查閱死者的電腦和手機,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 94,406評論 3 398
  • 文/潘曉璐 我一進店門签赃,熙熙樓的掌柜王于貴愁眉苦臉地迎上來谷异,“玉大人分尸,你說我怎么就攤上這事〈踵冢” “怎么了箩绍?”我有些...
    開封第一講書人閱讀 167,834評論 0 360
  • 文/不壞的土叔 我叫張陵,是天一觀的道長尺上。 經(jīng)常有香客問我材蛛,道長,這世上最難降的妖魔是什么怎抛? 我笑而不...
    開封第一講書人閱讀 59,543評論 1 296
  • 正文 為了忘掉前任卑吭,我火速辦了婚禮,結果婚禮上马绝,老公的妹妹穿的比我還像新娘豆赏。我一直安慰自己,他們只是感情好富稻,可當我...
    茶點故事閱讀 68,547評論 6 397
  • 文/花漫 我一把揭開白布掷邦。 她就那樣靜靜地躺著,像睡著了一般椭赋。 火紅的嫁衣襯著肌膚如雪抚岗。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 52,196評論 1 308
  • 那天纹份,我揣著相機與錄音苟跪,去河邊找鬼。 笑死蔓涧,一個胖子當著我的面吹牛件已,可吹牛的內容都是我干的。 我是一名探鬼主播元暴,決...
    沈念sama閱讀 40,776評論 3 421
  • 文/蒼蘭香墨 我猛地睜開眼篷扩,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了茉盏?” 一聲冷哼從身側響起鉴未,我...
    開封第一講書人閱讀 39,671評論 0 276
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎鸠姨,沒想到半個月后铜秆,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 46,221評論 1 320
  • 正文 獨居荒郊野嶺守林人離奇死亡讶迁,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內容為張勛視角 年9月15日...
    茶點故事閱讀 38,303評論 3 340
  • 正文 我和宋清朗相戀三年连茧,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點故事閱讀 40,444評論 1 352
  • 序言:一個原本活蹦亂跳的男人離奇死亡啸驯,死狀恐怖客扎,靈堂內的尸體忽然破棺而出,到底是詐尸還是另有隱情罚斗,我是刑警寧澤徙鱼,帶...
    沈念sama閱讀 36,134評論 5 350
  • 正文 年R本政府宣布,位于F島的核電站针姿,受9級特大地震影響袱吆,放射性物質發(fā)生泄漏。R本人自食惡果不足惜搓幌,卻給世界環(huán)境...
    茶點故事閱讀 41,810評論 3 333
  • 文/蒙蒙 一杆故、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧溉愁,春花似錦、人聲如沸饲趋。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,285評論 0 24
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽奕塑。三九已至堂污,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間龄砰,已是汗流浹背盟猖。 一陣腳步聲響...
    開封第一講書人閱讀 33,399評論 1 272
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留换棚,地道東北人式镐。 一個月前我還...
    沈念sama閱讀 48,837評論 3 376
  • 正文 我出身青樓,卻偏偏與公主長得像固蚤,于是被迫代替她去往敵國和親娘汞。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當晚...
    茶點故事閱讀 45,455評論 2 359

推薦閱讀更多精彩內容