原文鏈接:http://www.studyshare.cn/blog/details/1130/1
首先寫一個簡單的Demo
public static void main(String[] args) throws Exception{
FutureTask futureTask =new FutureTask(new Callable() {
????????@Override
? ? ? ? public Object call() throws Exception {
????????????long startTime = System.currentTimeMillis();
? ? ? ? ? ? int count =0;
? ? ? ? ? ? //進行累加汁蝶,讓子線程執(zhí)行一段時間
? ? ? ? ? ? for(int i=0;i<2000000000;i++){
????????????????count += i;
? ? ? ? ? ? }
????????????System.out.println("count = " + count +" use time is "+ (System.currentTimeMillis()-startTime));
? ? ? ? ? ? return "test";
? ? ? ? }
});
? ? new Thread(futureTask).start();
? ? //get()在主線程中執(zhí)行董瞻,阻塞方法素挽,主線程等待子線程執(zhí)行完才被喚醒執(zhí)行獲取子線程返回的結(jié)果
? ? System.out.println(futureTask.get());
}
打印結(jié)果很簡單:
更多深度技術(shù)文章锈麸,在這里搁廓。?java開發(fā)工具下載地址及安裝教程大全叠骑,點這里匪蝙。
首先解釋FutureTask的成員變量含義吓著,在它的各種方法中很多地方使用
????????volatile int state :用來表示當前執(zhí)行線程的狀態(tài),用volatile關(guān)鍵字修飾歉井,表示內(nèi)存可見性柿祈,其他線程修改volatile修飾的state后,子線程會強 ????????制到主內(nèi)存取state最新的值哩至,可參考文章:https://www.cnblogs.com/daxin/p/3364014.html
????????private static final int NEW =0;? //表示新建的狀態(tài)
????????private static final int COMPLETING? =1;//執(zhí)行中狀態(tài)(或者即將完成)
????????private static final int NORMAL? ? ? =2; //正常執(zhí)行結(jié)束狀態(tài)
????????private static final int EXCEPTIONAL? =3;//異常狀態(tài)
????????private static final int CANCELLED? ? =4;//已取消狀態(tài)
????????private static final int INTERRUPTING =5;//中斷中狀態(tài)
????????private static final int INTERRUPTED? =6;//已中斷狀態(tài)
? ??????private Callable?callable : Callable接口變量躏嚎,外部實例化傳遞到FutureTask中
? ??????private Object outcome :用于存放返回結(jié)果或者異常信息
????????private volatile Thread runner :存放當前子線程(執(zhí)行任務(wù)的線程)
? ??????private volatile WaitNode waiters :一個節(jié)點內(nèi)部類,類中包含
? ??????static final class WaitNode {
????????????????volatile Thread thread;//線程成員變量
? ? ? ? ? ? ? ? volatile WaitNode next;//下一個包裝了等待線程的節(jié)點
? ? ????????????WaitNode() {thread = Thread.currentThread(); }//初始化時當前線程設(shè)置為需要等待的線程
? ? ? ? ?}
當前子線程執(zhí)行過程中以上狀態(tài)會有以下幾種路線:
????????1.??NEW -> COMPLETING -> NORMAL : 新建 --> 執(zhí)行--> 正常結(jié)束
? ? ? ? 2.??NEW -> COMPLETING -> EXCEPTIONAL:新建--> 執(zhí)行--> 異常結(jié)束(執(zhí)行過程中發(fā)生異常)
? ? ? ? 3.? NEW -> CANCELLED : 新建--> 取消(準備執(zhí)行時主線程調(diào)用子線程的取消方法)
? ? ? ? 4.??NEW -> INTERRUPTING -> INTERRUPTED :新建--> 被通知中斷(準備執(zhí)行時主線程調(diào)用子線程取消方法并傳遞中斷標志位true)-->已中斷
FutureTask的主要方法:
? ? ? ? 1.FutureTask(Callable callable):構(gòu)造方法菩貌,初始化傳入的callable卢佣,并將state設(shè)置為NEW
? ? ? ? 2.isCancelled() :獲取當前執(zhí)行中的線程狀態(tài)是否已經(jīng)取消,已取消返回true箭阶,否則返回false
? ? ? ? 3.isDone() : 獲取當前執(zhí)行的線程的狀態(tài)是否處于運行狀態(tài)虚茶,是返回true,否則返回false
? ? ? ? 4.cancel(boolean mayInterruptIfRunning):發(fā)出取消或者中斷當前線程的信號仇参。參數(shù)為true嘹叫,則發(fā)出中斷,否則為取消
? ? ? ? 5.get()诈乒、get(long timeout, TimeUnit unit) :獲取線程執(zhí)行完成后的返回結(jié)果
? ? ? ? 6.set(V v):設(shè)置線程的結(jié)果值到成員變量outcome 中
? ? ? ? 7.setException(Throwable t):設(shè)置線程出現(xiàn)異常時的異常值到成員變量outcome 中
? ? ? ? 8.run():FutureTask實現(xiàn)了Runnable接口罩扇,因此需要覆蓋run()方法,call()方法就是在run()方法中進行調(diào)用
? ? ? ? 9.finishCompletion():該方法是執(zhí)行的線程完成(無論正常完成還是異常完成)后喚醒其他等待的線程繼續(xù)執(zhí)行怕磨。并清空callable喂饥。
? ? ? ? 10.awaitDone(boolean timed, long nanos):核心方法消约,當執(zhí)行線程在運行中,主線程調(diào)用get()方法后則加入等待隊列员帮,類似AQS的同步隊列或粮,執(zhí)行的線程執(zhí)行完畢會調(diào)用finishCompletion()方法喚醒等待隊列中的線程進行執(zhí)行。
下面對Demo中的代碼進行源碼分析:
? ? ? ? 第一步:執(zhí)行構(gòu)造方法
????????????public FutureTask(Callable callable) {
????????????????????if (callable ==null) //判斷傳入的callable不為空集侯,則初始化FutureTask中的成員變量
????????????????????????????throw new NullPointerException();
? ????????????? ????this.callable = callable;
? ? ? ? ? ? ? ? ????this.state =NEW;? ? ? // ensure visibility of callable
????????????}
? ? ? ? 第二步:new Thread(futureTask).start();
? ? ? ? ????此處實例化一個線程并調(diào)用start()方法被啼,則操作系統(tǒng)調(diào)度該線程,并分配CPU時間片棠枉,如果該線程獲取到CPU時間片后浓体,則會執(zhí)行run方法
? ? ? ? ????注意:由于使用的是FutureTask,該類實現(xiàn)了Runnable接口并覆寫了run方法,則會進入到FutrueTask的run方法中執(zhí)行辈讶,代碼如下:
? ??????????public void run() {
? ? ? ? ? ? ? ? //首先判斷state是否是處于新建以外的狀態(tài)(如果是新建以外的狀態(tài)則直接返回命浴,因為線程剛進入run只會是new狀態(tài)露泊,為了程序健壯性做此判斷)楞件,compareAndSwapObject是判斷當前FutureTask中的runner是否是null,為空則將當前執(zhí)行的這個線程賦值給runner,不為空就直接退出
????????????????if (state !=NEW || !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread()))
????????????????????return;
? ? ? ? ? ? ? ? try {
????????????????????Callable c =callable;
? ? ? ? ????????????if (c !=null &&state ==NEW) { //判斷狀態(tài)與callable
????????????????????V result;
? ? ? ? ? ????????? boolean ran;
? ? ? ? ? ? ????????try {
????????????????????????result = c.call(); //執(zhí)行Callable中的call()方法
? ? ? ? ? ? ? ? ????????ran =true;
? ? ? ? ? ? ????????}catch (Throwable ex) {//如果執(zhí)行call()方法出現(xiàn)了異常掀泳,則捕獲異常并去設(shè)置異常信息
????????????????????????result =null;
? ? ? ? ? ? ? ? ????????ran =false;
? ? ? ? ? ? ? ? ????????setException(ex);//捕獲異常并去設(shè)置異常信息月幌,將異常信息設(shè)置到Object outcome結(jié)果對象中碍讯,同時改變state的狀態(tài)為EXCEPTIONAL
? ? ? ? ? ? ????????}
????????????????????if (ran)
????????????????????????set(result);//正常執(zhí)行,設(shè)置返回結(jié)果扯躺,設(shè)置到Object outcome中捉兴,并設(shè)置state狀態(tài)為NORMAL
? ? ? ? ? ? ? ? ? ? }
????????????????????}finally {
? ? ? ? ????????????????runner =null;//清空runner,方便GC回收
? ? ? ? ????????????????int s =state;
? ? ? ? ????????????????if (s >=INTERRUPTING) //如果state狀態(tài)為中斷中或者已中斷
????????????????????????????handlePossibleCancellationInterrupt(s);//交出當前線程的執(zhí)行權(quán),與其他線程重新競爭
? ? ????????????????????}
????????????????}
? ? ? ? ? ? 此處看一下setException()與set()方法內(nèi)的源碼
? ??????????setException():
? ??????????????protected void setException(Throwable t) {
? ? ? ? ? ? ? ? ? ? //原子操作比較state的內(nèi)存地址上的值是否與NEW相等录语,相等則將state修改為COMPLETING
????????????????????if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
????????????????????????outcome = t;//使用outcome保存異常結(jié)果信息
? ? ? ? ????????????????UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // 將state設(shè)置為EXCEPTIONAL
? ? ? ? ????????????????finishCompletion();//下面貼出源碼倍啥,具體解釋
? ????????????????? }
????????????????}
? ? ? ? ? ? set():
? ??????????????protected void set(V v) {
? ? ? ? ? ? ? ? ? ? ? ? //原子操作比較state的內(nèi)存地址上的值是否與NEW相等,相等則將state修改為COMPLETING
????????????????????????if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
????????????????????????????????outcome = v;//使用outcome保存正常返回的結(jié)果信息
? ? ? ? ????????????????????????UNSAFE.putOrderedInt(this, stateOffset, NORMAL); ?// 將state設(shè)置為NORMAL
? ? ? ? ????????????????????????finishCompletion();
? ? ????????????????????}
????????????????????}
? ??????????????finishCompletion():這是線程執(zhí)行(正撑觳海或者異常)的最后一個方法虽缕,下面分析一下源碼
? ??????????????????private void finishCompletion() {
????????????????????????// 無論執(zhí)行的線程是正常還是異常都會返回結(jié)果并設(shè)置到outcome中,那么返回結(jié)果后蒲稳,主線程或者其他線程還在等待隊列中氮趋,則需要去喚醒等待隊列中的線程進行執(zhí)行。
? ? ????????????????????for (WaitNode q; (q =waiters) !=null;) {//循環(huán)等待隊列中的線程節(jié)點
????????????????????????????if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {//判斷等待線程節(jié)點是否與之前設(shè)置的等待線程一致江耀,一致則返回true
????????????????????????????????for (;;) {//自旋
????????????????????????????????????Thread t = q.thread;
? ? ? ? ? ? ? ? ????????????????????if (t !=null) {//等待線程不為空
????????????????????????????????????????q.thread =null;//等待線程節(jié)點中的線程局部變量置空
? ? ? ? ? ? ? ? ? ? ????????????????????LockSupport.unpark(t);//喚醒等待線程執(zhí)行
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? }
????????????????????????????????????WaitNode next = q.next;//獲取等待線程節(jié)點的下一個節(jié)點
? ? ? ? ? ? ? ? ????????????????????if (next ==null)// 如果為空凭峡,則說明當前等待線程節(jié)點就是頭結(jié)點,已經(jīng)沒有后續(xù)等待節(jié)點决记,
????????????????????????????????????????break;//退出自旋
? ? ? ? ? ? ? ? ????????????????????????q.next =null; // 置空,方便GC回收
? ? ? ? ? ? ? ? ????????????????????????q = next;//當前等待線程節(jié)點也置空倍踪,在上面已經(jīng)喚醒了當前的等待線程系宫,因此此處也將已經(jīng)喚醒的線程置空讓GC回收
? ? ? ? ? ? ????????????????????????}
????????????????????????????????????break;
? ? ? ? ????????????????????????}
????????????????????????????}
????????????????????????????done();
? ? ????????????????????????callable =null;? ? ? ? // 當前線程中的callable置空索昂,讓gc回收,整個線程到此執(zhí)行完畢
????????????????????????}
? ? ? ? ? ? 以上詳細說明線程start()方法調(diào)用后的一系列執(zhí)行過程扩借。那么有個問題就是這個等待線程節(jié)點是從哪里產(chǎn)生的呢椒惨?下面解釋這個問題
? ? ? ? ? ? 看Demo中的代碼:?System.out.println(futureTask.get()); 這句代碼在demo有解釋,get()方法是一個阻塞的方法潮罪,那么就是這個get()
????????????方法產(chǎn)生的WaitNode節(jié)點的康谆。看源碼:
? ??????????????public V get() throws InterruptedException, ExecutionException {
????????????????????int s =state;?
? ? ????????????????if (s <=COMPLETING) //判斷state是否處于執(zhí)行中或者新建狀態(tài)嫉到,是的話則說明調(diào)用get()方法的線程需要進入等待隊列沃暗,等待callable所在的線程執(zhí)行完畢后并等待喚醒,此處的代碼就證明了get()方法是阻塞的
????????????????????????s = awaitDone(false, 0L);//下面分析源碼
? ? ????????????????return report(s);
? ? ? ? ? ? ?????}
? ??????????????awaitDone():
? ??????????????????private int awaitDone(boolean timed, long nanos) throws InterruptedException {
????????????????????????????final long deadline = timed ? System.nanoTime() + nanos :0L;//引入等待超時機制何恶,調(diào)用get()方法傳false孽锥,0L則表示不進行超時處理
? ? ? ? ? ? ? ? ? ? ? ? ? ? WaitNode q =null;
? ? ? ? ? ? ? ? ? ? ? ? ? ? boolean queued =false;
? ? ????????????????????????for (;;) {//自旋
????????????????????????????????if (Thread.interrupted()) {//判斷當前線程(主線程)是否已經(jīng)中斷,中斷則去移除等待線程節(jié)點并拋出中斷異常细层,此代碼為程序健壯性考慮惜辑,不必過分關(guān)注
????????????????????????????????????removeWaiter(q);
? ? ? ? ? ? ????????????????????????throw new InterruptedException();
? ? ? ? ????????????????????????}
????????????????????????????????int s =state;
? ? ? ????????????????????????? if (s >COMPLETING) {//如果執(zhí)行的子線程的state狀態(tài)已經(jīng)處于NORMAL或者EXCEPTIONAL狀態(tài),說明子線程已經(jīng)執(zhí)行結(jié)束了疫赎,那么直接返回盛撑,說明不用讓阻塞線程進入等待隊列。
????????????????????????????????????if (q !=null)//等待線程節(jié)點不為空捧搞,則直接置為空抵卫,并返回state
????????????????????????????????????????q.thread =null;
? ? ? ? ? ? ????????????????????????????return s;
? ? ? ????????????????????????? }
????????????????????????????????else if (s ==COMPLETING)// 重點,當當前執(zhí)行子線程還在運行中的時候实牡,則此時要讓主線程交出CPU執(zhí)行權(quán)陌僵。
? ? ? ? ? ? ????????????????????????Thread.yield();//交出CPU執(zhí)行權(quán),yield()方法雖然是交出執(zhí)行權(quán)创坞,但主線程還是可以和其他線程進行公平競爭
? ? ? ????????????????????????? else if (q ==null)//重點碗短,以上的條件都不滿足,則說明該線程確實需要進入等待隊列進行等待
????????????????????????????????????q =new WaitNode();//構(gòu)造一個等待節(jié)點题涨,該構(gòu)造方法中將當前線程作為參數(shù)傳遞給節(jié)點中的局部變量進行保存
? ? ? ? ????????????????????????else if (!queued)//注意偎谁,此處較難理解,這是入隊列的操作纲堵,q.next=waiters 巡雨,如果是首節(jié)點,waiters一定是null的席函,則q.next=null,waitersOffset偏移量指向的地址上的初始值也是null铐望,則期望值與內(nèi)存地址值都為null,則會將q的值設(shè)置到waitersOffset指向的地址,同時返回true,等待節(jié)點進入隊列就成功正蛙。原子操作比較抽象督弓,建議去深入理解CAS操作
????????????????????????????????????queued =UNSAFE.compareAndSwapObject(this, waitersOffset, q.next =waiters, q);
? ? ? ? ????????????????????????else if (timed) {
????????????????????????????????????nanos = deadline - System.nanoTime();
? ? ? ? ? ? ????????????????????????if (nanos <=0L) {
????????????????????????????????????????removeWaiter(q);
? ? ? ? ? ? ? ? ????????????????????????return state;
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?}
????????????????????????????????????LockSupport.parkNanos(this, nanos);//這是帶有超時設(shè)置的阻塞方法
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?}
? ? ? ? ? ? ? ? ? ? ? ? else
? ? ? ? ? ? ????????????????LockSupport.park(this);//自旋到這里的時候,說明已經(jīng)加入等待隊列乒验,阻塞當前線程愚隧,讓其等待被喚醒。
? ? ????????????????????}
????????????????????}
? ? ? ? ? ? ? ? 另外最后獲取返回值的方法report()比較簡單就不多做解釋锻全。內(nèi)部就是返回outcome的值
????????????????????private V report(int s)throws ExecutionException {
????????????????????????Object x =outcome;
? ????????????????????? if (s ==NORMAL)
????????????????????????????return (V)x;
? ????????????????????? if (s >=CANCELLED)
????????????????????????????throw new CancellationException();
? ????????????????????????? throw new ExecutionException((Throwable)x);
????????????????????}
? ? ? ? ? ? ? ? 到此狂塘,F(xiàn)utureTask源碼就分析完畢,總結(jié)一下:Callable實現(xiàn)的線程內(nèi)部使用state的轉(zhuǎn)換鳄厌,這種轉(zhuǎn)換是基于原子操作來保證線程安全(多線程環(huán)境下對state進行競爭)荞胡,同時其他非Callable的外部線程調(diào)用FutureTask中的方法(主要是get()),則讓這些線程進入等待隊列,當Callable的線程執(zhí)行完畢會使用自旋對等待中的線程進行喚醒部翘。