ps: 以前好多東西都寫在一起了鸠项,現(xiàn)在拆開
我們使用 Runnable 可以構(gòu)建一個(gè) Thread 對象
// 普通線程任務(wù)
Runnable runnable = new Runnable() {
@Override
public void run() {
Log.d("AA", Thread.currentThread().getName() + "啟動(dòng),time:" + System.currentTimeMillis());
}
};
但是 Runnable 不能給我們一個(gè)返回值干跛,基本上我們使用多線程都需要返回值的,所以就引出了下面這3個(gè)類:
- Callable - 構(gòu)建帶返回值的任務(wù)
- Future - 和異步線程通信的接口
- FutureTask - Future 的唯一實(shí)現(xiàn)類祟绊,通過 get() 可以拿到結(jié)果楼入,但是阻塞當(dāng)前線程
public interface Callable<V> {
V call() throws Exception;
}
public interface Future<V> {
boolean cancel(boolean mayInterruptIfRunning);
boolean isCancelled();
boolean isDone();
V get() throws InterruptedException, ExecutionException;
V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;
}
先來跑個(gè)小例子哥捕,先啟動(dòng)一個(gè)線程,睡眠2秒嘉熊,之后返回結(jié)果遥赚,我們在 UI 線程里等待結(jié)果(此時(shí)會(huì)阻塞 UI 線程) ,收到結(jié)果后再啟動(dòng)一個(gè)線程阐肤,2個(gè)線程都打印下時(shí)間
// 帶返回結(jié)果的線程任務(wù)凫佛,String 泛型是我們預(yù)訂的返回?cái)?shù)據(jù)類型
Callable<String> callable1 = new Callable<String>() {
@Override
public String call() throws Exception {
Log.d("AA", Thread.currentThread().getName() + "啟動(dòng),time:" + System.currentTimeMillis());
Thread.sleep(2000);
return "AA";
}
};
// 用 Callable 對象構(gòu)建一個(gè) FutureTask 線程返回結(jié)果
FutureTask<String> futureTask = new FutureTask<String>(callable1);
// 創(chuàng)建出2個(gè)線程
Thread thread1 = new Thread(futureTask);
Thread thread2 = new Thread(runnable);
// 先啟動(dòng)會(huì)返回結(jié)果的線程
thread1.start();
try {
// 這里阻塞 UI 線程,等待線程1 的返回?cái)?shù)據(jù)孕惜,再去啟動(dòng)線程2
futureTask.get();
thread2.start();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
ok 看過例子我們就知道怎么回事了愧薛,然后我們再繼續(xù)深入吧,我們重點(diǎn)是看看 FutureTask 給我們提供的方法:
- get() - 獲取異步執(zhí)行的結(jié)果衫画,如果沒有結(jié)果可用厚满,此方法會(huì)阻塞直到異步計(jì)算完成
- get(Long timeout , TimeUnit unit) - 獲取異步執(zhí)行結(jié)果,如果沒有結(jié)果可用碧磅,此方法會(huì)阻塞碘箍,但是會(huì)有時(shí)間限制,如果阻塞時(shí)間超過設(shè)定的timeout時(shí)間鲸郊,該方法將返回null
- isDone() - 如果任務(wù)執(zhí)行結(jié)束丰榴,無論是正常結(jié)束或是中途取消還是發(fā)生異常,都返回tru
- isCanceller() 如果任務(wù)完成前被取消秆撮,則返回true
- cancel(boolean mayInterruptRunning) - 取消任務(wù)四濒,cancel(false) 等待任務(wù)執(zhí)行完畢然后回收資源,cancel(true) 嘗試直接終中斷任務(wù)
上面是結(jié)合 Thread 的使用場景职辨,但是一般我們真不這么寫盗蟆,我們都是在線程池中開干
Callable<String> callable = new Callable<String>() {
@Override
public String call() throws Exception {
return "AA";
}
};
// new 一個(gè)線程池對象
ExecutorService executorService = Executors.newFixedThreadPool(5);
// 把任務(wù)添加到線程池中去執(zhí)行,獲取到一個(gè) Future 任務(wù)結(jié)果操作對象
Future<String> future = executorService.submit(callable);
try {
future.get();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
Log.d("AA", Thread.currentThread().getName() + "獲取到異步線程結(jié)果,time:" + System.currentTimeMillis());
線程池中關(guān)于 FutureTask 的使用
FutureTask 有必要說意下舒裤,因?yàn)?FutureTask 帶給我們在線程池外堆線程的控制手段喳资,我覺得搞清楚很有必要
1. 線程池中添加任務(wù)的2種方式:
//Runnable
fun main(args: Array<String>) {
val pool= Executors.newFixedThreadPool(3)
pool.execute{
Thread.sleep(2000)
println("無返回值")
pool.shutdown()
}
}
//FutureTask
fun main(args: Array<String>) {
val pool= Executors.newFixedThreadPool(3)
val futureTask=FutureTask<String>(Callable<String>{
Thread.sleep(2000)
return@Callable "有返回值"
})
pool.submit(futureTask)
//block current thread
val result=futureTask.get()
println(result)
pool.shutdown()
}
2. 添加 FutureTask 到線程池中后的執(zhí)行特征
FutureTask.get() 會(huì)阻塞所在線程,那么我們是不是每次調(diào)用 get() 都會(huì)阻塞所在線程腾供?
我們設(shè)計(jì)一個(gè) demo 自己跑起來試試:我們往線程池中添加一個(gè) FutureTask 任務(wù)仆邓,等待 FutureTask 任務(wù)執(zhí)行時(shí)間過后再點(diǎn)擊按鈕獲取數(shù)據(jù)
class Main2Activity : AppCompatActivity() {
override fun onCreate(savedInstanceState: Bundle?) {
super.onCreate(savedInstanceState)
setContentView(R.layout.activity_main2)
val pool = Executors.newFixedThreadPool(3)
val task = FutureTask<String>({
Log.d("AA", "FutureTask_111:${Thread.currentThread().name}")
Thread.sleep(1000)
Log.d("AA", "FutureTask_222")
Log.d("AA", "FutureTask_333")
return@FutureTask "AA"
})
pool.submit(task)
btn_right.setOnClickListener {
val data = task.get()
Log.d("AA", "task 執(zhí)行結(jié)果:$data")
}
}
}
-
頁面啟動(dòng)后,添加入線程池的任務(wù)會(huì)直接執(zhí)行伴鳖,才不會(huì)管你 get 與否节值,線程池只認(rèn) Runnable 接口,由任務(wù)就執(zhí)行
頁面啟動(dòng)后打印 -
我們等幾秒點(diǎn)擊按鈕榜聂,不管按幾次都沒有阻塞當(dāng)前線程搞疗,而是直接獲取到結(jié)果,這說明 FutureTask 會(huì)保存任務(wù)結(jié)算的結(jié)果须肆,并且會(huì)判斷任務(wù)進(jìn)度匿乃,任務(wù)執(zhí)行結(jié)束之后就不再阻塞當(dāng)前線程了
多次點(diǎn)擊按鈕
3. 探尋 FutureTask 的執(zhí)行過程
基于上面的執(zhí)行結(jié)果脐往,我們心里清楚了 FutureTask 的運(yùn)行特征,這樣在使用時(shí)就不會(huì)有沒有由來的擔(dān)憂了扳埂,怕會(huì)不會(huì)這樣业簿,會(huì)不會(huì)那樣,如果你有這樣的疑問阳懂,說明這個(gè)知識(shí)點(diǎn)你沒學(xué)透梅尤,言歸正傳,這激起了我對 FutureTask 探尋一番的好奇
3.1 先來看 run 方法:
FutureTask 實(shí)現(xiàn)了 Runnable 接口岩调,自然也是有 run 方法的
public void run() {
if (state != NEW ||
!UNSAFE.compareAndSwapObject(this, runnerOffset,
null, Thread.currentThread()))
return;
try {
Callable<V> c = callable;
if (c != null && state == NEW) {
V result;
boolean ran;
try {
//調(diào)用callback的方法巷燥,并獲取執(zhí)行結(jié)果
result = c.call();
ran = true;
} catch (Throwable ex) {
result = null;
ran = false;
setException(ex);
}
if (ran)
set(result);//將執(zhí)行結(jié)果設(shè)置到實(shí)例變量中。
}
} finally {
// runner must be non-null until state is settled to
// prevent concurrent calls to run()
runner = null;
// state must be re-read after nulling runner to prevent
// leaked interrupts
int s = state;
if (s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);
}
}
- run 方法首先判斷了狀態(tài)号枕,不是 new 直接退出
- 之后調(diào)了 callable.call() 獲取結(jié)果
- 完事用 set(result) 保存結(jié)果缰揪,這說明 FutureTask 的確會(huì)保存結(jié)果值
3.2 set 方法保存結(jié)果,喚醒當(dāng)前線程:
protected void set(V v) {
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
//保存執(zhí)行結(jié)果
outcome = v;
UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
//喚醒當(dāng)前線程葱淳。
finishCompletion();
}
}
/**
* Removes and signals all waiting threads, invokes done(), and
* nulls out callable.
*/
private void finishCompletion() {
// assert state > COMPLETING;
for (WaitNode q; (q = waiters) != null;) {
if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
for (;;) {
Thread t = q.thread;
if (t != null) {
q.thread = null;
//喚醒線程
LockSupport.unpark(t);
}
WaitNode next = q.next;
if (next == null)
break;
q.next = null; // unlink to help gc
q = next;
}
break;
}
}
done();
callable = null; // to reduce footprint
}
public V get() throws InterruptedException, ExecutionException {
int s = state;
if (s <= COMPLETING)
//阻塞當(dāng)前線程
s = awaitDone(false, 0L);
//當(dāng)線程從阻塞狀態(tài)被喚醒钝腺,將結(jié)果返回。
return report(s);
}
//Awaits completion or aborts on interrupt or timeout.
private int awaitDone(boolean timed, long nanos)
throws InterruptedException {
final long deadline = timed ? System.nanoTime() + nanos : 0L;
WaitNode q = null;
boolean queued = false;
for (;;) {
if (Thread.interrupted()) {
removeWaiter(q);
throw new InterruptedException();
}
int s = state;
if (s > COMPLETING) {
if (q != null)
q.thread = null;
return s;
}
else if (s == COMPLETING) // cannot time out yet
Thread.yield();
else if (q == null)
q = new WaitNode();
else if (!queued)
queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
q.next = waiters, q);
else if (timed) {
nanos = deadline - System.nanoTime();
if (nanos <= 0L) {
removeWaiter(q);
return state;
}
LockSupport.parkNanos(this, nanos);
}
else
//阻塞當(dāng)前線程
LockSupport.park(this);
}
}
- 首先又是判斷了任務(wù)狀態(tài)赞厕,不是已經(jīng)完成就阻塞線程艳狐,但是任務(wù)結(jié)速之后直接返回結(jié)果,就不再阻塞線程了皿桑,這和我們上面的執(zhí)行特性吻合
- awaitDone 里面就是根據(jù)任務(wù)的狀態(tài)各種阻塞線程
簡單的看過源碼之后毫目,我想大家對于 FutureTask 就能做到比較通透的了解了,至少在使用時(shí)不會(huì)再束手束腳了诲侮,放心大膽的使用镀虐,別人問起也知道問題在哪,怎么說沟绪,我覺得這就是我們學(xué)習(xí)源碼最大的動(dòng)力和目的刮便,搞清楚運(yùn)行機(jī)制
FutureTask 也有自己的回調(diào)函數(shù)
FutureTask 自己的回調(diào)函數(shù)就是 done 這個(gè)方法,我們可以繼承 FutureTask 來重寫 done 這個(gè)函數(shù)實(shí)現(xiàn)自己的目的
比如下面我簡單實(shí)現(xiàn)一個(gè):
class MyFutureTask<V>(callable: Callable<V>) : FutureTask<V>(callable) {
override fun done() {
Log.d("AA", "futrueTask 回調(diào)執(zhí)行近零,所在線程:${Thread.currentThread().name}")
}
}
val pool = Executors.newFixedThreadPool(3)
val task = MyFutureTask<String>(object : Callable<String> {
override fun call(): String {
Log.d("AA", "FutureTask_111:${Thread.currentThread().name}")
Thread.sleep(1000)
Log.d("AA", "FutureTask_222")
Log.d("AA", "FutureTask_333")
return "AA"
}
})
btn_right.setOnClickListener({
pool.submit(task)
val data = task.get()
Log.d("AA", "task 執(zhí)行結(jié)果:$data")
})
從結(jié)果上看 done 的確是在最后執(zhí)行的诺核,但是吧不是我們期盼的在線程池調(diào)用線程執(zhí)行,而是在線程池所在線程執(zhí)行久信,這樣的話,這個(gè) done 的應(yīng)用范圍就很狹窄了漓摩,至于 done 是在哪里調(diào)的 finishCompletion 最后是也