本文是自己學(xué)習(xí)Java中Future機(jī)制的筆記函卒。閱讀了很多網(wǎng)上的源碼分析,自己對(duì)照著JDK1.8源碼走了一遍。算是稍微理解了一下Future機(jī)制土全。
本文的內(nèi)容包含如下:
- 為什么出現(xiàn)Future機(jī)制
- 如何使用Future機(jī)制
- Future 的 UML 圖
- Future和FutureTask的關(guān)系,以及FutureTask的源碼解析
- 用的知識(shí)點(diǎn)補(bǔ)充会涎,比如
Unsafe
類(lèi)中compareAndSwap
等
一裹匙、為什么出現(xiàn)Future機(jī)制
常見(jiàn)的兩種創(chuàng)建線程的方式。一種是直接繼承Thread末秃,另外一種就是實(shí)現(xiàn)Runnable接口概页。
這兩種方式都有一個(gè)缺陷就是:在執(zhí)行完任務(wù)之后無(wú)法獲取執(zhí)行結(jié)果。
從Java 1.5開(kāi)始练慕,就提供了Callable和Future惰匙,通過(guò)它們可以在任務(wù)執(zhí)行完畢之后得到任務(wù)執(zhí)行結(jié)果。
Future模式的核心思想是能夠讓主線程將原來(lái)需要同步等待的這段時(shí)間用來(lái)做其他的事情铃将。(因?yàn)榭梢援惒将@得執(zhí)行結(jié)果项鬼,所以不用一直同步等待去獲得執(zhí)行結(jié)果)
上圖簡(jiǎn)單描述了不使用Future和使用Future的區(qū)別,不使用Future模式劲阎,主線程在invoke完一些耗時(shí)邏輯之后需要等待绘盟,這個(gè)耗時(shí)邏輯在實(shí)際應(yīng)用中可能是一次RPC調(diào)用,可能是一個(gè)本地IO操作等。B圖表達(dá)的是使用Future模式之后龄毡,我們主線程在invoke之后可以立即返回吠卷,去做其他的事情,回頭再來(lái)看看剛才提交的invoke有沒(méi)有結(jié)果沦零。
二撤嫩、Future的相關(guān)類(lèi)圖
2.1 Future 接口
首先,我們需要清楚蠢终,F(xiàn)utrue是個(gè)接口序攘。Future就是對(duì)于具體的Runnable或者Callable任務(wù)的執(zhí)行結(jié)果進(jìn)行取消、查詢(xún)是否完成寻拂、獲取結(jié)果程奠。必要時(shí)可以通過(guò)get方法獲取執(zhí)行結(jié)果,該方法會(huì)阻塞直到任務(wù)返回結(jié)果祭钉。
接口定義行為瞄沙,我們通過(guò)上圖可以看到實(shí)現(xiàn)Future接口的子類(lèi)會(huì)具有哪些行為:
- 我們可以取消這個(gè)執(zhí)行邏輯,如果這個(gè)邏輯已經(jīng)正在執(zhí)行慌核,提供可選的參數(shù)來(lái)控制是否取消已經(jīng)正在執(zhí)行的邏輯距境。
- 我們可以判斷執(zhí)行邏輯是否已經(jīng)被取消。
- 我們可以判斷執(zhí)行邏輯是否已經(jīng)執(zhí)行完成垮卓。
- 我們可以獲取執(zhí)行邏輯的執(zhí)行結(jié)果垫桂。
- 我們可以允許在一定時(shí)間內(nèi)去等待獲取執(zhí)行結(jié)果,如果超過(guò)這個(gè)時(shí)間粟按,拋
TimeoutException
诬滩。
2.2 FutureTask 類(lèi)
類(lèi)圖如下:
FutureTask是Future的具體實(shí)現(xiàn)。FutureTask
實(shí)現(xiàn)了RunnableFuture
接口灭将。RunnableFuture
接口又同時(shí)繼承了Future
和 Runnable
接口疼鸟。所以FutureTask
既可以作為Runnable被線程執(zhí)行,又可以作為Future得到Callable的返回值庙曙。
三空镜、FutureTask的使用方法
舉個(gè)例子,假設(shè)我們要執(zhí)行一個(gè)算法捌朴,算法需要兩個(gè)輸入 input1
和 input2
, 但是input1
和input2
需要經(jīng)過(guò)一個(gè)非常耗時(shí)的運(yùn)算才能得出吴攒。由于算法必須要兩個(gè)輸入都存在,才能給出輸出男旗,所以我們必須等待兩個(gè)輸入的產(chǎn)生舶斧。接下來(lái)就模仿一下這個(gè)過(guò)程。
package src;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
public class FutureTaskTest {
public static void main(String[] args) throws InterruptedException, ExecutionException {
long starttime = System.currentTimeMillis();
//input2生成察皇, 需要耗費(fèi)3秒
FutureTask<Integer> input2_futuretask = new FutureTask<>(new Callable<Integer>() {
@Override
public Integer call() throws Exception {
Thread.sleep(3000);
return 5;
}
});
new Thread(input2_futuretask).start();
//input1生成茴厉,需要耗費(fèi)2秒
FutureTask<Integer> input1_futuretask = new FutureTask<>(new Callable<Integer>() {
@Override
public Integer call() throws Exception {
Thread.sleep(2000);
return 3;
}
});
new Thread(input1_futuretask).start();
Integer integer2 = input2_futuretask.get();
Integer integer1 = input1_futuretask.get();
System.out.println(algorithm(integer1, integer2));
long endtime = System.currentTimeMillis();
System.out.println("用時(shí):" + String.valueOf(endtime - starttime));
}
//這是我們要執(zhí)行的算法
public static int algorithm(int input, int input2) {
return input + input2;
}
}
輸出結(jié)果:
我們可以看到用時(shí)3001毫秒泽台,與最費(fèi)時(shí)的input2生成時(shí)間差不多。
注意矾缓,我們?cè)诔绦蛑猩蒳nput1時(shí)怀酷,也讓線程休眠了2秒,但是結(jié)果不是3+2嗜闻。說(shuō)明FutureTask是被異步執(zhí)行了蜕依。
四、FutureTask源碼分析
4.1 state字段
volatile修飾的state字段琉雳;表示FutureTask當(dāng)前所處的狀態(tài)样眠。可能的轉(zhuǎn)換過(guò)程見(jiàn)注釋翠肘。
/**
* Possible state transitions:
* NEW -> COMPLETING -> NORMAL
* NEW -> COMPLETING -> EXCEPTIONAL
* NEW -> CANCELLED
* NEW -> INTERRUPTING -> INTERRUPTED
*/
private volatile int state;
private static final int NEW = 0;
private static final int COMPLETING = 1;
private static final int NORMAL = 2;
private static final int EXCEPTIONAL = 3;
private static final int CANCELLED = 4;
private static final int INTERRUPTING = 5;
private static final int INTERRUPTED = 6;
4.2 其他變量
/** 任務(wù) */
private Callable<V> callable;
/** 儲(chǔ)存結(jié)果*/
private Object outcome; // non-volatile, protected by state reads/writes
/** 執(zhí)行任務(wù)的線程*/
private volatile Thread runner;
/** get方法阻塞的線程隊(duì)列 */
private volatile WaitNode waiters;
//FutureTask的內(nèi)部類(lèi)檐束,get方法的等待隊(duì)列
static final class WaitNode {
volatile Thread thread;
volatile WaitNode next;
WaitNode() { thread = Thread.currentThread(); }
}
4.3 CAS工具初始化
// Unsafe mechanics
private static final sun.misc.Unsafe UNSAFE;
private static final long stateOffset;
private static final long runnerOffset;
private static final long waitersOffset;
static {
try {
UNSAFE = sun.misc.Unsafe.getUnsafe();
Class<?> k = FutureTask.class;
stateOffset = UNSAFE.objectFieldOffset
(k.getDeclaredField("state"));
runnerOffset = UNSAFE.objectFieldOffset
(k.getDeclaredField("runner"));
waitersOffset = UNSAFE.objectFieldOffset
(k.getDeclaredField("waiters"));
} catch (Exception e) {
throw new Error(e);
}
}
這段代碼是為了后面使用CAS而準(zhǔn)備的∈叮可以這么理解:
一個(gè)java對(duì)象可以看成是一段內(nèi)存被丧,各個(gè)字段都得按照一定的順序放在這段內(nèi)存里,同時(shí)考慮到對(duì)齊要求绪妹,可能這些字段不是連續(xù)放置的甥桂,用這個(gè)UNSAFE.objectFieldOffset()
方法能準(zhǔn)確地告訴你某個(gè)字段相對(duì)于對(duì)象的起始內(nèi)存地址的字節(jié)偏移量,因?yàn)槭窍鄬?duì)偏移量邮旷,所以它其實(shí)跟某個(gè)具體對(duì)象又沒(méi)什么太大關(guān)系黄选,跟class的定義和虛擬機(jī)的內(nèi)存模型的實(shí)現(xiàn)細(xì)節(jié)更相關(guān)。
4.4 構(gòu)造函數(shù)
FutureTask有兩個(gè)構(gòu)造函數(shù):
public FutureTask(Callable<V> callable) {
if (callable == null)
throw new NullPointerException();
this.callable = callable;
this.state = NEW; // ensure visibility of callable
}
public FutureTask(Runnable runnable, V result) {
this.callable = Executors.callable(runnable, result);
this.state = NEW; // ensure visibility of callable
}
這兩個(gè)構(gòu)造函數(shù)區(qū)別在于廊移,如果使用第一個(gè)構(gòu)造函數(shù)最后獲取線程實(shí)行結(jié)果就是callable的執(zhí)行的返回結(jié)果糕簿;而如果使用第二個(gè)構(gòu)造函數(shù)那么最后獲取線程實(shí)行結(jié)果就是參數(shù)中的result,接下來(lái)讓我們看一下FutureTask的run方法狡孔。
同時(shí)兩個(gè)構(gòu)造函數(shù)都把當(dāng)前狀態(tài)設(shè)置為NEW。
4.5 run方法及其他
構(gòu)造完FutureTask后蜂嗽,會(huì)把它當(dāng)做線程的參數(shù)傳進(jìn)去苗膝,然后線程就會(huì)運(yùn)行它的run方法。所以我們先來(lái)看一下run方法:
public void run() {
//如果狀態(tài)不是new植旧,或者runner舊值不為null(已經(jīng)啟動(dòng)過(guò)了)辱揭,就結(jié)束
if (state != NEW ||
!UNSAFE.compareAndSwapObject(this, runnerOffset,
null, Thread.currentThread()))
return;
try {
Callable<V> c = callable; // 這里的callable是從構(gòu)造方法里面?zhèn)魅说? if (c != null && state == NEW) {
V result;
boolean ran;
try {
result = c.call(); //執(zhí)行任務(wù),并將結(jié)果保存在result字段里病附。
ran = true;
} catch (Throwable ex) {
result = null;
ran = false;
setException(ex); // 保存call方法拋出的異常
}
if (ran)
set(result); // 保存call方法的執(zhí)行結(jié)果
}
} finally {
// runner must be non-null until state is settled to
// prevent concurrent calls to run()
runner = null;
// state must be re-read after nulling runner to prevent
// leaked interrupts
int s = state;
if (s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);
}
}
其中问窃,catch語(yǔ)句中的setException(ex)如下:
//發(fā)生異常時(shí)設(shè)置state和outcome
protected void setException(Throwable t) {
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
outcome = t;
UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL);
finishCompletion();// 喚醒get()方法阻塞的線程
}
}
而正常完成時(shí),set(result);方法如下:
//正常完成時(shí)完沪,設(shè)置state和outcome
protected void set(V v) {
//正常完成時(shí)域庇,NEW->COMPLETING->NORMAL
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
outcome = v;
UNSAFE.putOrderedInt(this, stateOffset, NORMAL);
finishCompletion(); // 喚醒get方法阻塞的線程
}
}
這兩個(gè)set方法中嵌戈,都是用到了finishCompletion()
去喚醒get方法阻塞的線程。下面來(lái)看看這個(gè)方法:
//移除并喚醒所有等待的線程听皿,調(diào)用done熟呛,并清空callable
private void finishCompletion() {
// assert state > COMPLETING;
for (WaitNode q; (q = waiters) != null;) {
if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
for (;;) {
Thread t = q.thread;
if (t != null) {
q.thread = null;
LockSupport.unpark(t); //喚醒線程
}
//接下來(lái)的這幾句代碼是將當(dāng)前節(jié)點(diǎn)剝離出隊(duì)列,然后將q指向下一個(gè)等待節(jié)點(diǎn)尉姨。被剝離的節(jié)點(diǎn)由于thread和next都為null庵朝,所以會(huì)被GC回收。
WaitNode next = q.next;
if (next == null)
break;
q.next = null; // unlink to help gc
q = next;
}
break;
}
}
done(); //這個(gè)是空的方法又厉,子類(lèi)可以覆蓋九府,實(shí)現(xiàn)回調(diào)的功能。
callable = null; // to reduce footprint
}
好覆致,到這里我們把運(yùn)行以及設(shè)置結(jié)果的流程分析完了侄旬。那接下來(lái)看一下怎么獲得執(zhí)行結(jié)果把。也就是get
方法篷朵。
get方法有兩個(gè)勾怒,一個(gè)是有超時(shí)時(shí)間設(shè)置,另一個(gè)沒(méi)有超時(shí)時(shí)間設(shè)置声旺。
public V get() throws InterruptedException, ExecutionException {
int s = state;
if (s <= COMPLETING)
s = awaitDone(false, 0L);
return report(s);
}
public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
// get(timeout, unit) 也很簡(jiǎn)單, 主要還是在 awaitDone里面
if(unit == null){
throw new NullPointerException();
}
int s = state;
// 判斷state狀態(tài)是否 <= Completing, 調(diào)用awaitDone進(jìn)行自旋等待
if(s <= COMPLETING && (s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING){
throw new TimeoutException();
}
// 根據(jù)state的值進(jìn)行返回結(jié)果或拋出異常
return report(s);
}
兩個(gè)get方法都用到了awaitDone()
笔链。這個(gè)方法的作用是: 等待任務(wù)執(zhí)行完成、被中斷或超時(shí)腮猖〖ǎ看一下源碼:
//等待完成,可能是是中斷澈缺、異常坪创、正常完成,timed:true姐赡,考慮等待時(shí)長(zhǎng)莱预,false:不考慮等待時(shí)長(zhǎng)
private int awaitDone(boolean timed, long nanos) throws InterruptedException {
final long deadline = timed ? System.nanoTime() + nanos : 0L; //如果設(shè)置了超時(shí)時(shí)間
WaitNode q = null;
boolean queued = false;
for (;;) {
/**
* 有優(yōu)先級(jí)順序
* 1、如果線程已中斷项滑,則直接將當(dāng)前節(jié)點(diǎn)q從waiters中移出
* 2依沮、如果state已經(jīng)是最終狀態(tài)了,則直接返回state
* 3枪狂、如果state是中間狀態(tài)(COMPLETING),意味很快將變更過(guò)成最終狀態(tài)危喉,讓出cpu時(shí)間片即可
* 4、如果發(fā)現(xiàn)尚未有節(jié)點(diǎn)州疾,則創(chuàng)建節(jié)點(diǎn)
* 5辜限、如果當(dāng)前節(jié)點(diǎn)尚未入隊(duì),則將當(dāng)前節(jié)點(diǎn)放到waiters中的首節(jié)點(diǎn)严蓖,并替換舊的waiters
* 6薄嫡、線程被阻塞指定時(shí)間后再喚醒
* 7氧急、線程一直被阻塞直到被其他線程喚醒
*
*/
if (Thread.interrupted()) {// 1
removeWaiter(q);
throw new InterruptedException();
}
int s = state;
if (s > COMPLETING) {// 2
if (q != null)
q.thread = null;
return s;
}
else if (s == COMPLETING) // 3
Thread.yield();
else if (q == null) // 4
q = new WaitNode();
else if (!queued) // 5
queued = UNSAFE.compareAndSwapObject(this, waitersOffset, q.next = waiters, q);
else if (timed) {// 6
nanos = deadline - System.nanoTime();
if (nanos <= 0L) {
removeWaiter(q); //從waiters中移出節(jié)點(diǎn)q
return state;
}
LockSupport.parkNanos(this, nanos);
}
else // 7
LockSupport.park(this);
}
}
接下來(lái)看下removeWaiter()
移除等待節(jié)點(diǎn)的源碼:
private void removeWaiter(WaitNode node) {
if (node != null) {
node.thread = null; // 將移除的節(jié)點(diǎn)的thread=null, 為移除做標(biāo)示
retry:
for (;;) { // restart on removeWaiter race
for (WaitNode pred = null, q = waiters, s; q != null; q = s) {
s = q.next;
//通過(guò) thread 判斷當(dāng)前 q 是否是需要移除的 q節(jié)點(diǎn),因?yàn)槲覀儎偛艠?biāo)示過(guò)了
if (q.thread != null)
pred = q; //當(dāng)不是我們要移除的節(jié)點(diǎn)岂座,就往下走
else if (pred != null) {
//當(dāng)p.thread==null時(shí)态蒂,到這里。下面這句話费什,相當(dāng)于把q從隊(duì)列移除钾恢。
pred.next = s;
//pred.thread == null 這種情況是在多線程進(jìn)行并發(fā) removeWaiter 時(shí)產(chǎn)生的
//此時(shí)正好移除節(jié)點(diǎn) node 和 pred, 所以loop跳到retry, 從新進(jìn)行這個(gè)過(guò)程。想象一下鸳址,如果在并發(fā)的情況下瘩蚪,其他線程把pred的線程置為空了。那說(shuō)明這個(gè)鏈表不應(yīng)該包含pred了稿黍。所以我們需要跳到retry從新開(kāi)始疹瘦。
if (pred.thread == null) // check for race
continue retry;
}
//到這步說(shuō)明p.thread==null 并且 pred==null。說(shuō)明node是頭結(jié)點(diǎn)巡球。
else if (!UNSAFE.compareAndSwapObject(this, waitersOffset,
q, s))
continue retry;
}
break;
}
}
}
最后在get方法中調(diào)用report(s)
言沐,根據(jù)狀態(tài)s的不同進(jìn)行返回結(jié)果或拋出異常。
private V report(int s) throws ExecutionException {
Object x = outcome; //之前我們set的時(shí)候酣栈,已經(jīng)設(shè)置過(guò)這個(gè)值了险胰。所以直接用。
if (s == NORMAL)
return (V)x; //正常執(zhí)行結(jié)束矿筝,返回結(jié)果
if (s >= CANCELLED)
throw new CancellationException(); //被取消或中斷了起便,就拋異常。
throw new ExecutionException((Throwable)x);
}
以上就是FutureTask的源碼分析窖维。經(jīng)過(guò)了一天的折騰剧董,算是弄明白了窘俺。
最后總結(jié)一下:
FutureTask既可以當(dāng)做Runnable也可以當(dāng)做Future。線程通過(guò)執(zhí)行FutureTask的run方法混狠,將正常運(yùn)行的結(jié)果放入FutureTask類(lèi)的result變量中皂林。使用get方法可以阻塞直到獲得結(jié)果揩抡。
參考資料:
Java并發(fā)編程:Callable褒墨、Future和FutureTask
更好地理解與使用Future
java Unsafe類(lèi)中compareAndSwap相關(guān)介紹
FutureTask源碼解讀