Android 線程簡單分析(一)
Android 并發(fā)之synchronized鎖住的是代碼還是對象(二)
Android 并發(fā)之CountDownLatch、CyclicBarrier的簡單應(yīng)用(三)
Android 并發(fā)HashMap和ConcurrentHashMap的簡單應(yīng)用(四)(待發(fā)布)
Android 并發(fā)之Lock募逞、ReadWriteLock和Condition的簡單應(yīng)用(五)
Android 并發(fā)之CAS(原子操作)簡單介紹(六)
Android 并發(fā)Kotlin協(xié)程的重要性(七)(待發(fā)布)
Android 并發(fā)之AsyncTask原理分析(八)
Android 并發(fā)之Handler肢专、Looper圾笨、MessageQueue和ThreadLocal消息機制原理分析(九)
Android 并發(fā)之HandlerThread和IntentService原理分析(十)
相信AsyncTask大家都不陌生。AsyncTask內(nèi)部封裝了線程池和Handler逊移,AsyncTask允許我們在后臺進行耗時操作且把結(jié)果及時更新到UI上逻翁。當(dāng)然AsyncTask從最開始到現(xiàn)在已經(jīng)經(jīng)過了幾次代碼修改,任務(wù)的執(zhí)行邏輯慢慢地發(fā)生了改變氧急,并不是大家所想象的那樣:AsyncTask是完全并行執(zhí)行的就像多個線程一樣,其實不是的毫深,所以用AsyncTask的時候還是要注意吩坝,下面會一一說明。
AsyncTask到底是串行還是并行哑蔫?
new MyAsyncTask("MyAsyncTask1").execute("");
new MyAsyncTask("MyAsyncTask2").execute("");
new MyAsyncTask("MyAsyncTask3").execute("");
new MyAsyncTask("MyAsyncTask4").execute("");
new MyAsyncTask("MyAsyncTask5").execute("");
看一下日志:
MyAsyncTask1 === 2019-05-03 07:35:23
MyAsyncTask2 === 2019-05-03 07:35:26
MyAsyncTask3 === 2019-05-03 07:35:29
MyAsyncTask4 === 2019-05-03 07:35:32
MyAsyncTask5 === 2019-05-03 07:35:35
MyAsyncTask6 === 2019-05-03 07:35:38
從5個AsyncTask共耗時15s且時間間隔為3s钉寝,很顯然是串行執(zhí)行的。
既然下來我們來分析一下源碼:
public abstract class AsyncTask<Params, Progress, Result> {
private static final String LOG_TAG = "AsyncTask";
// 獲取當(dāng)前CPU數(shù)量
private static final int CPU_COUNT = Runtime.getRuntime().availableProcessors();
// 核心線程數(shù)量
private static final int CORE_POOL_SIZE = Math.max(2, Math.min(CPU_COUNT - 1, 4));
// 線程池最大容量
private static final int MAXIMUM_POOL_SIZE = CPU_COUNT * 2 + 1;
// 空閑線程存活時間
private static final int KEEP_ALIVE_SECONDS = 30;
//ThreadFactory 線程工廠闸迷,通過工廠方法newThread來獲取新線程
private static final ThreadFactory sThreadFactory = new ThreadFactory() {
//原子操作嵌纲,保證并發(fā)不影響共享資源
private final AtomicInteger mCount = new AtomicInteger(1);
public Thread newThread(Runnable r) {
return new Thread(r, "AsyncTask #" + mCount.getAndIncrement());
}
};
//阻塞式隊列,用來存放待并發(fā)執(zhí)行的任務(wù)腥沽,初始容量:128個
private static final BlockingQueue<Runnable> sPoolWorkQueue = new LinkedBlockingQueue<Runnable>(128);
/**
* 并發(fā)線程池逮走,可以用來并行執(zhí)行任務(wù),從3.0開始AsyncTask默認(rèn)是串行執(zhí)行任務(wù)
* 但是我們?nèi)匀荒軜?gòu)造出并行的AsyncTask
*/
public static final Executor THREAD_POOL_EXECUTOR;
static {
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
CORE_POOL_SIZE, MAXIMUM_POOL_SIZE, KEEP_ALIVE_SECONDS, TimeUnit.SECONDS,
sPoolWorkQueue, sThreadFactory);
threadPoolExecutor.allowCoreThreadTimeOut(true);
THREAD_POOL_EXECUTOR = threadPoolExecutor;
}
/**
* 串行任務(wù)執(zhí)行器今阳,其內(nèi)部實現(xiàn)了串行控制师溅,
* 循環(huán)的取出一個個任務(wù)交給上述的并發(fā)線程池去執(zhí)行
*/
public static final Executor SERIAL_EXECUTOR = new SerialExecutor();
//消息類型:發(fā)送結(jié)果
private static final int MESSAGE_POST_RESULT = 0x1;
//消息類型:更新進度
private static final int MESSAGE_POST_PROGRESS = 0x2;
/**
* 靜態(tài)Handler茅信,用來發(fā)送上述兩種通知,采用UI線程的Looper來處理消息,
* 在Android 5.2之前墓臭,AsyncTask必須在UI線程調(diào)用蘸鲸,因為子線程
* 默認(rèn)沒有Looper無法創(chuàng)建下面的Handler,程序會直接Crash窿锉,但是在5.2之后InternalHandler內(nèi)部通過
* sHandler = new InternalHandler(Looper.getMainLooper())酌摇,所以沒必要在UI線程也是可以的
*/
private static InternalHandler sHandler;
private final Handler mHandler;
/**
* 默認(rèn)任務(wù)執(zhí)行器,被賦值為串行任務(wù)執(zhí)行器嗡载,AsyncTask變成串行執(zhí)行任務(wù)窑多,SERIAL_EXECUTOR其內(nèi)部通過
* try {r.run();} finally {scheduleNext();}實現(xiàn)了串行執(zhí)行。
*/
private static volatile Executor sDefaultExecutor = SERIAL_EXECUTOR;
// 這兩個就是任務(wù)執(zhí)行體
private final WorkerRunnable<Params, Result> mWorker;
private final FutureTask<Result> mFuture;
//任務(wù)的狀態(tài) 默認(rèn)為等待狀態(tài)鼻疮,即等待執(zhí)行怯伊,其類型標(biāo)識為volatile線程可見
private volatile Status mStatus = Status.PENDING;
//原子布爾型,支持高并發(fā)訪問判沟,標(biāo)識任務(wù)是否被取消
private final AtomicBoolean mCancelled = new AtomicBoolean();
//原子布爾型耿芹,支持高并發(fā)訪問,標(biāo)識任務(wù)是否被執(zhí)行過
private final AtomicBoolean mTaskInvoked = new AtomicBoolean();
/**
* 串行執(zhí)行器的實現(xiàn)挪哄,我們要好好看看吧秕,它是怎么把并行轉(zhuǎn)為串行的
* 目前我們需要知道,asyncTask.execute(Params ...)實際上會調(diào)用
* SerialExecutor的execute方法迹炼,這一點后面再說明砸彬。也就是說:當(dāng)你的asyncTask執(zhí)行的時候,
* 首先你的task會被加入到任務(wù)隊列斯入,然后排隊砂碉,一個個執(zhí)行。
* 總結(jié):
* <p>
* 當(dāng)任務(wù)來臨時刻两,首先就是入隊列增蹭,然后判斷當(dāng)前有沒有任務(wù)正在執(zhí)行
* 如果沒有任務(wù)執(zhí)行則直接調(diào)用scheduleNext(),執(zhí)行任務(wù)磅摹。
* 當(dāng)當(dāng)前任務(wù)執(zhí)行完成又再次在finally塊調(diào)用scheduleNext執(zhí)行下一個任務(wù)
*/
private static class SerialExecutor implements Executor {
//線性雙向隊列滋迈,用來存儲所有的AsyncTask任務(wù)
final ArrayDeque<Runnable> mTasks = new ArrayDeque<Runnable>();
//當(dāng)前正在執(zhí)行的AsyncTask任務(wù)
Runnable mActive;
public synchronized void execute(final Runnable r) {
//將新的AsyncTask任務(wù)加入到雙向隊列中
mTasks.offer(new Runnable() {
public void run() {
try {
r.run();//執(zhí)行AsyncTask任務(wù)
} finally {
/**
* 當(dāng)前AsyncTask任務(wù)執(zhí)行完畢后,進行下一輪執(zhí)行户誓,如果還有未執(zhí)行任務(wù)的話
* 這一點很明顯體現(xiàn)了AsyncTask是串行執(zhí)行任務(wù)的饼灿,總是一個任務(wù)執(zhí)行完畢才會執(zhí)行下一個任務(wù)
*/
scheduleNext();
}
}
});
//如果當(dāng)前沒有任務(wù)在執(zhí)行,直接進入執(zhí)行邏輯
if (mActive == null) {
scheduleNext();
}
}
//從任務(wù)隊列中取出隊列頭部的任務(wù)帝美,如果有就交給并發(fā)線程池去執(zhí)行
protected synchronized void scheduleNext() {
if ((mActive = mTasks.poll()) != null) {
THREAD_POOL_EXECUTOR.execute(mActive);
}
}
}
public enum Status {
/**
* 任務(wù)等待執(zhí)行
*/
PENDING,
/**
* 任務(wù)正在執(zhí)行
*/
RUNNING,
/**
* 任務(wù)已經(jīng)執(zhí)行結(jié)束
*/
FINISHED,
}
/**
* 在Android 5.2之前碍彭,AsyncTask必須在UI線程調(diào)用,因為子線程
* 默認(rèn)沒有Looper無法創(chuàng)建下面的Handler,程序會直接Crash硕旗,但是在5.2之后InternalHandler內(nèi)部通過
* sHandler = new InternalHandler(Looper.getMainLooper())窗骑,所以沒必要在UI線程也是可以的
*/
private static Handler getMainHandler() {
synchronized (AsyncTask.class) {
if (sHandler == null) {
sHandler = new InternalHandler(Looper.getMainLooper());
}
return sHandler;
}
}
private Handler getHandler() {
return mHandler;
}
/**
* @hide 設(shè)置默認(rèn)執(zhí)行器
*/
public static void setDefaultExecutor(Executor exec) {
sDefaultExecutor = exec;
}
/**
* 創(chuàng)建新的任務(wù),這個方法必須在UI線程中調(diào)用,其實在5.2之后工作線程也是可以的
*/
public AsyncTask() { this((Looper) null); }
/**
* 創(chuàng)建新的任務(wù)漆枚,這個方法必須在UI線程中調(diào)用,其實在5.2之后工作線程也是可以的
*
* @hide
*/
public AsyncTask(@Nullable Handler handler) { this(handler != null ? handler.getLooper() : null); }
/**
* 創(chuàng)建新的任務(wù)创译,這個方法必須在UI線程中調(diào)用,其實在5.2之后工作線程也是可以的
*
* @hide
*/
public AsyncTask(@Nullable Looper callbackLooper) {
/**構(gòu)造Handler*/
mHandler = callbackLooper == null || callbackLooper == Looper.getMainLooper() ? getMainHandler() : new Handler(callbackLooper);
/**構(gòu)造任務(wù)執(zhí)行體*/
mWorker = new WorkerRunnable<Params, Result>() {
public Result call() throws Exception {
mTaskInvoked.set(true);
Result result = null;
try {
Process.setThreadPriority(Process.THREAD_PRIORITY_BACKGROUND);
//noinspection unchecked
result = doInBackground(mParams);
Binder.flushPendingCommands();
} catch (Throwable tr) {
mCancelled.set(true);
throw tr;
} finally {
postResult(result);
}
return result;
}
};
/**構(gòu)造任務(wù)執(zhí)行體,傳入mWorker*/
mFuture = new FutureTask<Result>(mWorker) {
@Override
protected void done() {
try {
//執(zhí)行Future墙基,通過get獲取結(jié)果
Result result = get();
postResultIfNotInvoked(result);
} catch (InterruptedException e) {
android.util.Log.w(LOG_TAG, e);
} catch (ExecutionException e) {
throw new RuntimeException("An error occurred while executing doInBackground()",
e.getCause());
} catch (CancellationException e) {
postResultIfNotInvoked(null);
}
}
};
}
private void postResultIfNotInvoked(Result result) {
final boolean wasTaskInvoked = mTaskInvoked.get();
if (!wasTaskInvoked) {
postResult(result);
}
}
//doInBackground執(zhí)行完畢软族,發(fā)送消息
private Result postResult(Result result) {
@SuppressWarnings("unchecked")
Message message = getHandler().obtainMessage(MESSAGE_POST_RESULT,
new AsyncTaskResult<Result>(this, result));
message.sendToTarget();
return result;
}
public final Status getStatus() { return mStatus; }
/**
* 這個方法是我們必須要重寫的,用來做后臺計算残制,使用指定參數(shù)通過execute方法執(zhí)行任務(wù)立砸。
* 這個方法能夠調(diào)用publishProgress方法去更新進度。
* 所在線程:后臺線程
*/
@WorkerThread
protected abstract Result doInBackground(Params... params);
/**
* 在doInBackground之前調(diào)用初茶,用來做初始化工作
* 所在線程:UI線程
*/
@MainThread
protected void onPreExecute() { }
/**
* 在doInBackground之后調(diào)用颗祝,用來接受后臺計算結(jié)果更新UI
* 所在線程:UI線程
*/
@SuppressWarnings({"UnusedDeclaration"})
@MainThread
protected void onPostExecute(Result result) { }
/**
* 在publishProgress之后調(diào)用,用來更新計算進度
* 所在線程:UI線程
*/
@MainThread
protected void onProgressUpdate(Progress... values) { }
@MainThread
protected void onCancelled(Result result) { onCancelled(); }
/**
* cancel被調(diào)用并且doInBackground執(zhí)行結(jié)束恼布,會調(diào)用onCancelled螺戳,表示任務(wù)被取消
* 這個時候onPostExecute不會再被調(diào)用,二者是互斥的折汞,分別表示任務(wù)取消和任務(wù)執(zhí)行完成
* 所在線程:UI線程
*/
@MainThread
protected void onCancelled() { }
public final boolean isCancelled() {
return mCancelled.get();
}
public final boolean cancel(boolean mayInterruptIfRunning) {
mCancelled.set(true);
return mFuture.cancel(mayInterruptIfRunning);
}
public final Result get() throws InterruptedException, ExecutionException {
return mFuture.get();
}
public final Result get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
return mFuture.get(timeout, unit);
}
/**
* 這個方法如何執(zhí)行和系統(tǒng)版本有關(guān)倔幼,在AsyncTask的使用規(guī)則里已經(jīng)說明,如果你真的想使用并行AsyncTask爽待,
* 也是可以的损同,只要稍作修改
* 必須在UI線程調(diào)用此方法,5.2之后這不是必須的
*/
@MainThread
public final AsyncTask<Params, Progress, Result> execute(Params... params) {
return executeOnExecutor(sDefaultExecutor, params);
}
/**
* 通過這個方法我們可以自定義AsyncTask的執(zhí)行方式鸟款,串行or并行膏燃,甚至可以采用自己的Executor
* 為了實現(xiàn)并行,我們可以在外部這么用AsyncTask:
* asyncTask.executeOnExecutor(AsyncTask.THREAD_POOL_EXECUTOR, Params... params);
* 必須在UI線程調(diào)用此方法何什,5.2之后這不是必須的
*/
@MainThread
public final AsyncTask<Params, Progress, Result> executeOnExecutor(Executor exec, Params... params) {
if (mStatus != Status.PENDING) {
switch (mStatus) {
case RUNNING:
throw new IllegalStateException("Cannot execute task:"
+ " the task is already running.");
case FINISHED:
throw new IllegalStateException("Cannot execute task:"
+ " the task has already been executed "
+ "(a task can be executed only once)");
}
}
mStatus = Status.RUNNING;
//任務(wù)執(zhí)行之前這里調(diào)用了onPreExecute
onPreExecute();
mWorker.mParams = params;
//然后后臺計算doInBackground才準(zhǔn)備開始
exec.execute(mFuture);
return this;
}
/**這是AsyncTask提供的一個靜態(tài)方法蹄梢,方便我們直接執(zhí)行一個runnable*/
@MainThread
public static void execute(Runnable runnable) {
sDefaultExecutor.execute(runnable);
}
/**打印后臺計算進度,onProgressUpdate會被調(diào)用*/
@WorkerThread
protected final void publishProgress(Progress... values) {
if (!isCancelled()) {
getHandler().obtainMessage(MESSAGE_POST_PROGRESS,
new AsyncTaskResult<Progress>(this, values)).sendToTarget();
}
}
//任務(wù)結(jié)束的時候會進行判斷富俄,如果任務(wù)沒有被取消,則onPostExecute會被調(diào)用
private void finish(Result result) {
if (isCancelled()) {
onCancelled(result);
} else {
onPostExecute(result);
}
mStatus = Status.FINISHED;
}
//AsyncTask內(nèi)部Handler而咆,用來發(fā)送后臺計算進度更新消息和計算完成消息
private static class InternalHandler extends Handler {
public InternalHandler(Looper looper) {
super(looper);
}
@Override
public void handleMessage(Message msg) {
AsyncTaskResult<?> result = (AsyncTaskResult<?>) msg.obj;
switch (msg.what) {
case MESSAGE_POST_RESULT:
// There is only one result
result.mTask.finish(result.mData[0]);
break;
case MESSAGE_POST_PROGRESS:
result.mTask.onProgressUpdate(result.mData);
break;
}
}
}
private static abstract class WorkerRunnable<Params, Result> implements Callable<Result> { Params[] mParams;}
private static class AsyncTaskResult<Data> {
final AsyncTask mTask;
final Data[] mData;
AsyncTaskResult(AsyncTask task, Data... data) {
mTask = task;
mData = data;
}
}
}
AsynTask是如何實現(xiàn)串行的霍比?
public final AsyncTask<Params, Progress, Result> execute(Params... params) {
return executeOnExecutor(sDefaultExecutor, params);
}
public final AsyncTask<Params, Progress, Result> executeOnExecutor(Executor exec, Params... params) {
if (mStatus != Status.PENDING) {
switch (mStatus) {
case RUNNING:
throw new IllegalStateException("Cannot execute task:"
+ " the task is already running.");
case FINISHED:
throw new IllegalStateException("Cannot execute task:"
+ " the task has already been executed "
+ "(a task can be executed only once)");
}
}
mStatus = Status.RUNNING;
//任務(wù)執(zhí)行之前這里調(diào)用了onPreExecute
onPreExecute();
mWorker.mParams = params;
//然后后臺計算doInBackground才準(zhǔn)備開始
exec.execute(mFuture);
return this;
}
是不是上面看不出什么來?那是必須的暴备,AsyncTask實現(xiàn)串行是通過控制Executor .execute方法來實現(xiàn)的悠瞬。
private static class SerialExecutor implements Executor {
//線性雙向隊列,用來存儲所有的AsyncTask任務(wù)
final ArrayDeque<Runnable> mTasks = new ArrayDeque<Runnable>();
//當(dāng)前正在執(zhí)行的AsyncTask任務(wù)
Runnable mActive;
public synchronized void execute(final Runnable r) {
//將新的AsyncTask任務(wù)加入到雙向隊列中
mTasks.offer(new Runnable() {
public void run() {
try {
r.run();//執(zhí)行AsyncTask任務(wù)
} finally {
/**
* 當(dāng)前AsyncTask任務(wù)執(zhí)行完畢后,進行下一輪執(zhí)行浅妆,如果還有未執(zhí)行任務(wù)的話
* 這一點很明顯體現(xiàn)了AsyncTask是串行執(zhí)行任務(wù)的望迎,總是一個任務(wù)執(zhí)行完畢才會執(zhí)行下一個任務(wù)
*/
scheduleNext();
}
}
});
//如果當(dāng)前沒有任務(wù)在執(zhí)行,直接進入執(zhí)行邏輯
if (mActive == null) {
scheduleNext();
}
}
//從任務(wù)隊列中取出隊列頭部的任務(wù)凌外,如果有就交給并發(fā)線程池去執(zhí)行
protected synchronized void scheduleNext() {
if ((mActive = mTasks.poll()) != null) {
THREAD_POOL_EXECUTOR.execute(mActive);
}
}
}
1辩尊、 當(dāng)任務(wù)來臨時,首先就是加入隊列康辑,然后判斷當(dāng)前有沒有任務(wù)正在執(zhí)行摄欲;
2、 如果沒有任務(wù)正在執(zhí)行則直接調(diào)用scheduleNext()疮薇,執(zhí)行任務(wù)胸墙。
3、 當(dāng)當(dāng)前任務(wù)執(zhí)行完成又再次在finally塊調(diào)用scheduleNext執(zhí)行下一個任務(wù)按咒。這樣就實現(xiàn)了串行執(zhí)行任務(wù)迟隅,當(dāng)然串行執(zhí)行任務(wù)也是使用了線程池,那還用說是吧励七。
那如果我們想要并行智袭,如何操作,很簡單之間替換默認(rèn)的SerialExecutor 呀伙,就是因為它补履,才串行的,所以我們只要像這樣就可以實現(xiàn)并行執(zhí)行任務(wù)了:
new MyAsyncTask("MyAsyncTask1").executeOnExecutor(AsyncTask.THREAD_POOL_EXECUTOR,"");
其實AsyncTask源碼并不多剿另,文章到此結(jié)束箫锤,歡迎拍磚。