目錄
從零實現(xiàn)ImageLoader(一)—— 架構
從零實現(xiàn)ImageLoader(二)—— 基本實現(xiàn)
從零實現(xiàn)ImageLoader(三)—— 線程池詳解
從零實現(xiàn)ImageLoader(四)—— Handler的內心獨白
從零實現(xiàn)ImageLoader(五)—— 內存緩存LruCache
從零實現(xiàn)ImageLoader(六)—— 磁盤緩存DiskLruCache
異步加載
既然是異步加載那新開線程自然是必不可少的。一個線程怎么樣娃善?這種情況下圖片得一個一個依次加載鲜戒,效率未免太低了境钟。那每張圖片新開一個線程怎么樣癞蚕?在圖片過多的情況下析桥,線程數(shù)量也會迅速隨之增長冻记,系統(tǒng)資源消耗太多嚴重买置,也不能接受秆麸。這時候就是線程池ExecutorService
這個線程管理工具登場的時候了失都。
public class Dispatcher {
private final String mUrl;
private final ExecutorService mExecutorService;
public Dispatcher(String url, ExecutorService executorService) {
mUrl = url;
mExecutorService = executorService;
}
public void into(ImageView imageView) {
mExecutorService.execute(() -> {
try {
Bitmap image = get();
//這一句將代碼切換到主線程缚态,下一篇文章再詳細解釋
ImageLoader.HANDLER.post(() -> imageView.setImageBitmap(image));
} catch (IOException e) {
e.printStackTrace();
}
});
}
...
}
ImageLoader
類負責線程池的創(chuàng)建:
public class ImageLoader {
...
private static final int MAX_THREAD_NUM = 3;
private final ExecutorService mExecutorService;
private ImageLoader(Context context) {
//防止單例持有Activity的Context導致內存泄露
mContext = context.getApplicationContext();
mExecutorService = Executors.newFixedThreadPool(MAX_THREAD_NUM);
}
public Dispatcher load(String url) {
return new Dispatcher(url, mExecutorService);
}
}
這樣異步加載就實現(xiàn)完成了卸留,測試一下:
public class MainActivity extends Activity {
@Override
protected void onCreate(Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
setContentView(R.layout.activity_main);
ImageView imageView = findViewById(R.id.image);
ImageLoader.with(this)
.load("https://i.redd.it/20mplvimm8ez.jpg")
.into(imageView);
}
}
線程池使用
當然我們今天的重點不在異步加載,而是在線程池上慨默。
Executors
我們平時使用線程池只需要調用Executors.new**ThreadPool()
方法贩耐,甚至都不需要關心創(chuàng)建的類是什么。那今天就從Executors
入手去探尋線程池的廬山真面目:
public class Executors {
...
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}
}
可以看到厦取,平時使用最頻繁的這幾個方法基本都是直接創(chuàng)建了ThreadPoolExecutor
類潮太,只是參數(shù)有所不同。唯一比較特殊的ScheduledThreadPoolExecutor
也繼承自ThreadPoolExecutor
虾攻。
ThreadPoolExecutor構造方法
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
-
corePoolSize
:線程池的核心線程數(shù)铡买。當線程數(shù)小于corePoolSize
時會創(chuàng)建一個線程去執(zhí)行任務,當線程數(shù)達到corePoolSize
時會將任務放入等待隊列霎箍。如果沒有手動調用核心線程超時奇钞,這些線程在創(chuàng)建后會一直存在。 -
maximumPoolSize
:線程池允許創(chuàng)建的最大線程數(shù)漂坏。在等待隊列添加滿之后景埃,線程池會創(chuàng)建臨時線程用來處理任務,臨時隊列在超時后會自動結束顶别。而臨時線程與核心線程的總數(shù)不能超過maximumPoolSize
纠亚。 -
keepAliveTime
:臨時線程超時時間。 -
unit
:keepAliveTime
時間單位筋夏。 -
workQueue
:線程池的等待隊列蒂胞。當線程數(shù)達到corePoolSize
時會任務會被放入該等待隊列。 -
threadFactory
:線程工廠条篷。用于創(chuàng)建新線程骗随。 -
handler
:飽和處理策略。當線程池關閉或者線程數(shù)達到maximumPoolSize
時赴叹,任務被放入該handler
鸿染。
在看完上面的一系列參數(shù),可能還是一臉懵逼乞巧。其實大家可以線程池當做一個工廠涨椒,這個工廠負責對一些半成品進行加工,而核心線程就是這個工廠的工人绽媒。
每個工人同時只能處理一個半成品蚕冬,多余的半成品就被放入倉庫,等哪個工人處理完了手頭的半成品再來倉庫取是辕,這個倉庫就是等待隊列囤热。
但是倉庫也有限制,隨著半成品越來越多倉庫也放不下了获三,這時候工廠就請來一些臨時工來幫忙旁蔼,等工廠的任務輕了之后再請他們回去锨苏,這些臨時工就是臨時線程。
可是工廠的資金也是有限的不能同時請?zhí)嗟墓と斯琢模@個資金限制就是maximumPoolSize伞租,而這些既沒有工人處理,倉庫又放不下的半成品就要想個辦法處理了限佩。是直接把它們丟掉葵诈?還是退回給半成品廠商?這就是飽和策略需要決定的了犀暑。
等待隊列
- 同步移交隊列:任務不在隊列中存儲驯击,而是直接交給工作線程。這時可以使用
SynchronousQueue
實現(xiàn)耐亏,該隊列保證在插入時必須有另一個線程在等待獲取徊都,如果沒有則插入失敗。Executors.newCachedThreadPool()
使用的就是該隊列广辰。 - 無界隊列:例如無界
LinkedBlockingQueue
暇矫。ExecutorService.newFixedThreadPool
和ExecutorService.newSingleThreadExecutor
使用的就是該隊列。 - 有界隊列:例如有界
LinkedBlockingQueue
和ArrayBlockingQueue
择吊。有界隊列避免了無界隊列無限制的增加導致資源耗盡的問題李根。
飽和策略
這里很明顯是策略模式,ThreadPoolExecutor
給我們提供了四個已經(jīng)實現(xiàn)好的飽和策略几睛,不過我們也可以選擇自己實現(xiàn):
-
AbortPolicy
:拋出RejectedExecutionException
房轿。 -
CallerRunsPolicy
:將任務放到調用execute()
所在線程執(zhí)行,也就是直接調用任務的run()
方法所森。 -
DiscardPolicy
:直接丟棄任務囱持,不做任何處理。 -
DiscardOldestPolicy
:將等待隊列頭部的任務刪除焕济,再重新執(zhí)行此任務纷妆。
套路
說了這么多,那到底應該怎么選擇呢晴弃?其實只要大致遵循一個規(guī)律掩幢,如果是計算密集型的任務,線程池的大小設為CPU的數(shù)目加1通常是最優(yōu)的上鞠,而如果是I/O密集型的任務就可以設置的大一些际邻,比如2倍的CPU的數(shù)目。當然旗国,具體的數(shù)目就要在運行過程中慢慢調試了枯怖。
線程池原理
講完了線程池的使用,接下來就是線程池的原理了能曾。這次的分析都基于Android 7.1.1的源碼度硝,其他版本的可能會在細節(jié)上有一些差異,不過大的方向不會有問題寿冕。
類結構
在了解ThreadPoolExecutor
的實現(xiàn)之前我們首先對類的繼承結構要有一個整體的把握:
Executor
是Java提供的用于簡化線程管理的接口蕊程,用戶只需通過execute()
方法傳入Runnable
的實現(xiàn),由Executor
決定使用哪個線程處理同時負責線程的創(chuàng)建驼唱、運行和關閉藻茂。
ExecutorService
,這個是我們平時使用線程池最用的了玫恳,在Executor
的基礎上又加入了submit()
辨赐、shutdown()
等等一些方便用戶自主管理任務的方法。
AbstractExecutorService
類實現(xiàn)了submit()
等一系列方法京办,不過最主要的execute()
依然留給了子類也就是今天的主角ThreadPoolExecutor
去實現(xiàn)掀序。
概覽
ThreadPoolExecutor
實際上使用的是生產(chǎn)者/消費者模型,在分析具體的代碼之前我們先看一下這個流程圖惭婿,有一個大概的印象不恭。
execute()
關于execute()
的處理過程,Java源碼有很詳細的注釋财饥,這里我把它翻譯為中午供大家參考换吧。
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
/*
* 三步走:
*
* 1. 如果當前運行的線程少于核心線程數(shù),嘗試開啟一個線程并將command作為
* 它的第一個任務(作者注:這里的第一個任務在后面會有所解釋)钥星。調用
* runWorker通過原子性的方式重新檢查線程池運行狀態(tài)和工作線程數(shù)沾瓦,如果
* 不能添加線程則返回false
*
* 2. 如果任務可以成功入列,我們依然要再次檢查線程池是否已經(jīng)關閉以及
* 是否需要添加一個新線程(因為存在一種情況是在上次檢查之后所有的線程都已經(jīng)
* 死光光了(作者注:至于為什么必須保證至少一個線程存活谦炒,我們在后面的
* runWorker方法中會找到答案))贯莺。如果線程池已經(jīng)關閉,則將之前加入
* 隊列的command彈出编饺;如果已經(jīng)沒有線程存活乖篷,則添加一個新線程。
*
* 3. 如果不能將任務入列透且,我們會嘗試添加一個新線程撕蔼。如果失敗了,要不
* 就是線程池已經(jīng)關閉了秽誊,要不就是線程已經(jīng)飽和了(作者注:線程數(shù)達到最大值)鲸沮,
* 這時候我們就將這個任務加入飽和策略。
*/
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
}
這里有一點需要注意的是锅论,這個ctl
變量的含義讼溺。其實ctl
就是一個保存了線程池運行狀態(tài)以及線程數(shù)的原子整形變量:
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
它的高3位存儲線程池運行狀態(tài),即RUNNING
最易、SHUTDOWN
怒坯、STOP
炫狱、TIDYING
、TERMINATED
五個狀態(tài)剔猿,低位存儲運行的線程數(shù)视译,可以用isRunning()
判斷線程池是否處于運行狀態(tài),用workerCountOf
獲取運行的線程數(shù)归敬。這里的ctl
的用法非常巧妙酷含,強烈推薦大家去看一下源碼,這里由于篇幅所限就不再多說了汪茧。
addWorker()
private boolean addWorker(Runnable firstTask, boolean core) {
// 這一段for循環(huán)用來將ctl的值加1椅亚,如果線程池關閉或者線程數(shù)量
// 達到限制,則直接返回false舱污。
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// 判斷線程是否關閉或者已經(jīng)沒有需要執(zhí)行的任務
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
// 根據(jù)core判斷當前線程數(shù)量是否已經(jīng)達到限制
// 如果core為true呀舔,則線程數(shù)不能大于核心線程數(shù)
// 否則不能大于最大線程數(shù)
int wc = workerCountOf(c);
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
// 將ctl的值加1,如果成功則跳出外循環(huán)
if (compareAndIncrementWorkerCount(c))
break retry;
// 判斷再次期間線程池狀態(tài)是否已經(jīng)發(fā)生改變慌闭,如果是則
// 重新開始外循環(huán)别威,否則在內循環(huán)中再次嘗試對ctl值加1
c = ctl.get();
if (runStateOf(c) != rs)
continue retry;
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 在得到主鎖mainLock之后再次檢查線程池的狀態(tài)
// 如果已經(jīng)關閉則不再添加
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // 檢查線程t能否啟動
throw new IllegalThreadStateException();
// 將線程t加入線程集合
workers.add(w);
// 更新目前為止線程最多時達到的數(shù)目,與最大線程數(shù)
// 無關驴剔,調試用
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
// 正式開始運行線程t
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
addWorker()
方法很有意思省古,它用一個core
參數(shù)來區(qū)分是添加核心線程還是添加臨時線程,一個方法可以有不同的功能丧失。這也是為什么上面的流程圖里豺妓,添加核心線程和臨時線程的箭頭上只有一個addWorker
方法。
而addWorker()
添加線程的邏輯可以分為四步:
- 確保線程數(shù)不超過限制布讹,并將
ctl
的計數(shù)加1琳拭。 - 將
firstTask
封裝為Worker
。 - 將
Worker
加入線程集合workers
描验。 - 啟動
Worker
的線程白嘁。
有人已經(jīng)注意到addWorker()
的參數(shù)名有點奇怪,明明只添加了一個任務為什么要叫firstTask
呢膘流?在addWorker()
的代碼里firstTask
傳入了Worker
的構造器絮缅,后面一系列操作就都是相對Worker
執(zhí)行的,那Worker
又對firstTask
做了什么呼股?
Worker
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable
{
/** Thread this worker is running in. Null if factory fails. */
final Thread thread;
/** Initial task to run. Possibly null. */
Runnable firstTask;
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
/** Delegates main run loop to outer runWorker. */
public void run() {
runWorker(this);
}
...
}
可以看到Worker
也繼承了Runnable
接口耕魄,在構造方法里Worker
通過ThreadFactory
新開了一個線程,而傳入的Runnable
卻是自己彭谁,所以之前addWorker()
里的代碼t.start()
最終執(zhí)行的將會是Worker
的run()
方法吸奴,也就是runWorker()
。
主循環(huán)runWorker
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
while (task != null || (task = getTask()) != null) {
w.lock();
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
這里的邏輯初看起來可能是一頭霧水,其實很簡單则奥。重點是這里的while循環(huán)考润,首先會判斷firstTask
是否為空,如果不為空則分四步:
- 調用
beforeExecute()
方法逞度。 - 調用
task
的run()
方法额划。 - 調用
afterExecute()
方法妙啃。 - 將
task
賦值為空档泽。
下一次循環(huán)task
必定為空,于是執(zhí)行task = getTask()
揖赴,這條語句是將Runnable
任務從等待隊列workQueue
里取出來賦值給task
馆匿,于是再次執(zhí)行上面四步,直到線程池關閉或者等待超時燥滑。
每個Worker
創(chuàng)建的線程在執(zhí)行完屬于自己的任務后渐北,還會繼續(xù)執(zhí)行等待隊列中的任務,所以這個firstTask
也可以當做每個線程的啟動任務铭拧,這就是它為什么被叫做firstTask
的原因赃蛛,也是runWorker
方法為什么被稱為主循環(huán)的原因,線程池的設計者巧妙的用這一方法實現(xiàn)了線程的復用搀菩。
這也解答了之前的許多疑問:
- 為什么沒有專門處理的等待隊列的線程?原因就在于每個線程都是處理等待隊列的線程呕臂。
- 為什么在
execute()
方法中將任務加入等待隊列時,必須保證至少有一個線程存活肪跋?這是為了確保存在存活線程去執(zhí)行等待隊列中的任務歧蒋。
總結
我們這次實現(xiàn)了圖片的異步加載,不過將重點放在了線程池的使用及其原理上州既,設計者的各種巧思也是讓我們嘆為觀止谜洽,大家如果有空可以自己嘗試看一下源碼,一定不會讓你們失望吴叶。下一篇文章我們將要講解的是Handler阐虚,敬請期待。