這章將研究AsyncTask的實(shí)現(xiàn)原理撮胧,并自己嘗試編寫一個(gè)相同的異步操作類
這章你將學(xué)習(xí)到的關(guān)鍵詞:
AsyncTask
線程相關(guān):
ThreadPoolExector,Exector, ArrayDeque雙向隊(duì)列
任務(wù)操作相關(guān):
FutureTask<?>(Callable<?> r) [?表示泛型]
其他類:
AtomicBoolean或者Atomic* [*表示基本數(shù)據(jù)類型, Atomic表示自增長(zhǎng)]
下面我們便開始著手編寫我們的異步線程并結(jié)合AsyncTask源碼
需求:
1.我們要寫一個(gè)異步線程,這個(gè)線程需要寫在線程池里面
2.我們的線程需要傳入自定義類型老翘,回調(diào)能夠有自定義類型參數(shù)回調(diào)
3.我們的線程必須要能有相應(yīng)的回調(diào)芹啥,以及能夠取消
1.編寫異步線程
你需要一個(gè)線程池TheadPoolExector, 你需要一個(gè)Exector來實(shí)現(xiàn)runnable
一般你可能會(huì)這樣寫:
ExecutorService service = Executors.newFixedThreadPool(3);
這樣就創(chuàng)建了Exector與TheadPoolExector, 但是阿里的編碼規(guī)范里面推薦手動(dòng)創(chuàng)建線程池,那么我們可以自定義實(shí)現(xiàn)Executor接口來創(chuàng)建我們的線程池TheadPoolExector铺峭,這里直接上代碼:
/**
* Created by Qzhu on 2018/5/18.
*/
public class ThreadExector implements Executor{
private static final int CPU_COUNT = Runtime.getRuntime().availableProcessors();
private static final int CORE_POOL_SIZE = Math.max(2, Math.min(CPU_COUNT - 1, 4));
private static final int MAXIMUM_POOL_SIZE = CPU_COUNT * 2 + 1;
private static final int KEEP_ALIVE_SECONDS = 30;
private static final BlockingQueue<Runnable> sPoolWorkQueue =
new LinkedBlockingQueue<Runnable>(128);
private static final ThreadFactory sThreadFactory = new ThreadFactory() {
private final AtomicInteger atomicInteger = new AtomicInteger(1);
@Override
public Thread newThread(Runnable r) {
return new Thread("ThreadExector #" + atomicInteger.getAndIncrement());
}
};
static {
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
CORE_POOL_SIZE, MAXIMUM_POOL_SIZE, KEEP_ALIVE_SECONDS, TimeUnit.SECONDS,
sPoolWorkQueue, sThreadFactory);
threadPoolExecutor.allowCoreThreadTimeOut(true);
THREAD_POOL_EXECUTOR = threadPoolExecutor;
}
/**
* An {@link Executor} that can be used to execute tasks in parallel.
*/
public static final Executor THREAD_POOL_EXECUTOR;
final ArrayDeque<Runnable> queue = new ArrayDeque<>();
Runnable point = null;
@Override
public void execute(final Runnable command) {
queue.offer(new Runnable() {
@Override
public void run() {
try {
command.run();
}finally {
scheduleNext();
}
}
});
if(point == null) {
scheduleNext();
}
}
private void scheduleNext() {
if((point = queue.poll()) != null) {
THREAD_POOL_EXECUTOR.execute(point);
}
}
}
類解析:
我們可以看到這是一個(gè)自定義的Exector實(shí)現(xiàn)類墓怀,里面初始化了ThreadPoolExector,定制了參數(shù)(初始化請(qǐng)自行百度~)卫键,實(shí)現(xiàn)的execute里面的runnable使用了雙向隊(duì)列ArrayDeque包裝傀履,這樣可以不停取出里面的runnable并放入ThreadPoolExector執(zhí)行,只要execute執(zhí)行多少次都是排入隊(duì)列并等待彈出執(zhí)行(等下我們將分析怎樣限制其之執(zhí)行一次[其實(shí)只要加個(gè)狀態(tài)就行啦])
以上我們便準(zhǔn)備好了線程執(zhí)行者莉炉,以及可以先這樣用了
new ThreadExector().execute(Runnable r);
下面我們需要對(duì)參數(shù)Runnaber r進(jìn)行包裝使用的是FutureTask<?>(Callable<?> r)钓账,先來大概了解以下FutureTask<?>(Callable<?> r)這個(gè)類對(duì)Runnable的包裝吧碴犬,F(xiàn)utureTask實(shí)現(xiàn)了Runnable將執(zhí)行Run方法,然后里面執(zhí)行Callable的call方法梆暮,或者其他情況將執(zhí)行自身的done方法服协,比如取消或者異常等,而?泛型代表返回參數(shù)啦粹,這就符合條件2的自定義結(jié)果參數(shù)偿荷,下面缺少自定義入?yún)ⅲ@一點(diǎn)只要添加一個(gè)泛型就行了
我們先實(shí)現(xiàn)FutureTask<?>(Callable<?> r)里面的Callable<?>:
public abstract class WorkParam<R,P> implements Callable<R>{
P[] param;
}
其中泛型R代表結(jié)果參數(shù)卖陵,P代表傳入的參數(shù)
初始化如下:
private final AtomicBoolean mTaskInvoked = new AtomicBoolean(false);
private final AtomicBoolean mCancelled = new AtomicBoolean(false);
WorkParam worker = new WorkParam<Result,Params>() {
@Override
public Result call() throws Exception {
Result r = null;
mTaskInvoked.set(true);
android.os.Process.setThreadPriority(android.os.Process.THREAD_PRIORITY_BACKGROUND);
try {
r = doInBackgroud(param);
}catch (Exception e) {
mCancelled.set(true);
r = null;
}finally {
postResult(r);
}
Binder.flushPendingCommands();
return r;
}
};
這段代碼編寫參考AsyncTask,實(shí)現(xiàn)call的方法遭顶,在里面回調(diào)doInBackgroud執(zhí)行具體異步操作,同名AsyncTask接口方法泪蔫,而onPostExecute也是AsyncTask接口方法棒旗,這里先調(diào)用doInBackgroud放到后臺(tái)讓Exector工作,然后執(zhí)行postResult(見下面具體實(shí)現(xiàn))撩荣,最后就是回調(diào)onPostExecute接口方法
以上是Callable<?> r的包裝铣揉,還有一個(gè)FutureTask<?>如下:
futureTask = new FutureTask<Result>(worker){
@Override
protected void done() {
super.done();
try {
postIfNotInvoke(get());
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}catch (CancellationException e) {
postIfNotInvoke(null);
}
}
};
private void postIfNotInvoke(Result r){
if(!mTaskInvoked.get()) {
postResult(r);
}
}
private Result postResult(Result result) {
@SuppressWarnings("unchecked")
Message message = innerHandller.obtainMessage(MESSAGE_POST_RESULT,
new AsyncTaskResult<Result>(this, result));
message.sendToTarget();
return result;
}
//定義一個(gè)InnerHandler來處理線程切換回調(diào)
private static final int MESSAGE_POST_RESULT = 0x1;
private static final int MESSAGE_POST_PROGRESS = 0x2;
private Handler innerHandller = new Handler(Looper.getMainLooper()){
@Override
public void handleMessage(Message msg) {
AsyncTaskResult<?> result = (AsyncTaskResult<?>) msg.obj;
switch (msg.what) {
case MESSAGE_POST_RESULT:
result.mTask.onPostExecute(result.mData[0]);
break;
case MESSAGE_POST_PROGRESS:
result.mTask.onProgressUpdate(result.mData);
break;
default:
break;
}
}
};
private static class AsyncTaskResult<Data> {
final AsyncTask mTask;
final Data[] mData;
AsyncTaskResult(AsyncTask task, Data... data) {
mTask = task;
mData = data;
}
}
這個(gè)方法表示如果操作被其他行為中斷,比如取消餐曹,執(zhí)行postIfNotInvoke方法逛拱,而這個(gè)方法將判斷線程是否執(zhí)行過Callable<?>的call方法,里面的mTaskInvoked是一個(gè)標(biāo)志台猴,用于區(qū)分是否被執(zhí)行過朽合,如果執(zhí)行過將回調(diào)postExecute,使用handler來切換到主線程回調(diào)饱狂,這樣我們就實(shí)現(xiàn)了doInBackgroud與onPostExecute 回調(diào)
至于如何實(shí)現(xiàn)onpreExecute其實(shí)只要在execute方法里面調(diào)用就行的曹步,如下:
public AsyncTask<Params, Process, Result> execute(Params... params){
return executeOnExecutor(new ThreadExector(), params);
}
private final AsyncTask<Params, Process, Result> executeOnExecutor(Executor exec,Params... params){
onPreExecute();
worker.param = params;
exec.execute(futureTask);
return this;
}
關(guān)于實(shí)現(xiàn)進(jìn)度更新,其實(shí)上面自定義的InnerHandler里面有切換到主線程回調(diào)跟新休讳,只需要定義公開方法就行讲婚,如下:
protected final void publishProgress(Progress... values) {
if (!isCancelled()) {
getHandler().obtainMessage(MESSAGE_POST_PROGRESS,
new AsyncTaskResult<Progress>(this, values)).sendToTarget();
}
}
protected void onProgressUpdate(Progress... values) {
}
以上過程便是AsyncTask的實(shí)現(xiàn)流程,首先在構(gòu)造函數(shù)里面初始化FutureTask<?>(Callable<?> r),然后執(zhí)行execute的時(shí)候?qū)?zhǔn)備好的線程池Exector來執(zhí)行該FutureTask俊柔,回調(diào)對(duì)應(yīng)實(shí)現(xiàn)體內(nèi)部筹麸,線程的取消可以由FutureTask控制,使用幾個(gè)AtomicBoolean作為標(biāo)志位來控制結(jié)果的回調(diào)雏婶,狀態(tài)是否取消等等物赶,在Callable執(zhí)行call方法里面可以使用InnerHandler來切換到主線程,這便是AsyncTask源碼的實(shí)現(xiàn)方式
總結(jié)以下:
1.[重要]自定義Exector線程池(ArrayDeque雙向無限擴(kuò)充隊(duì)列)
2.[重要]Runnable包裝類FutureTask<?>(Callable<?> r)的使用尚骄,泛型的傳遞
3.線程切換回調(diào)方法InnerHandler的定義與參數(shù)的定義
4.Atomic*類型控制結(jié)果