Java并發(fā)工具類的三板斧 狀態(tài)典唇,隊列馋吗,CAS
狀態(tài):
/** * 當(dāng)前任務(wù)的運(yùn)行狀態(tài)焕盟。 * * 可能存在的狀態(tài)轉(zhuǎn)換 * NEW -> COMPLETING -> NORMAL(有正常結(jié)果)
* NEW -> COMPLETING -> EXCEPTIONAL(結(jié)果為異常) * NEW -> CANCELLED(無結(jié)果) * NEW -> INTERRUPTING -> INTERRUPTED(無結(jié)果) */
private volatile int state; private static final int NEW = 0; //初始狀態(tài)
private static final int COMPLETING = 1; //結(jié)果計算完成或響應(yīng)中斷到賦值給返回值之間的狀態(tài)。
private static final int NORMAL = 2; //任務(wù)正常完成宏粤,結(jié)果被set
private static final int EXCEPTIONAL = 3; //任務(wù)拋出異常
private static final int CANCELLED = 4; //任務(wù)已被取消
private static final int INTERRUPTING = 5; //線程中斷狀態(tài)被設(shè)置ture脚翘,但線程未響應(yīng)中斷
private static final int INTERRUPTED = 6; //線程已被中斷 //將要執(zhí)行的任務(wù)
private Callable<V> callable; //用于get()返回的結(jié)果,也可能是用于get()方法拋出的異常
private Object outcome; // non-volatile, protected by state reads/writes
private volatile Thread runner; //執(zhí)行callable的線程绍哎,調(diào)用FutureTask.run()方法通過CAS設(shè)置
private volatile WaitNode waiters; //棧結(jié)構(gòu)的等待隊列来农,該節(jié)點是棧中的最頂層節(jié)點。
隊列:
在FutureTask中崇堰,隊列的實現(xiàn)是一個單向鏈表沃于,它表示所有等待任務(wù)執(zhí)行完畢的線程的集合,如果獲取結(jié)果時海诲,任務(wù)還沒有執(zhí)行完畢怎么辦呢繁莹?那么獲取結(jié)果的線程就會在一個等待隊列中掛起,直到任務(wù)執(zhí)行完畢被喚醒特幔。
掛起的線程什么時候被喚醒蒋困?
1.任務(wù)執(zhí)行完畢了,在finishCompletion方法中會喚醒所有在隊列中等待的線程
2.等待的線程自身因為被中斷等原因而被喚醒敬辣。
CAS:
使用CAS來完成入棧出棧操作。為啥要使用一個線程安全的棧呢零院,因為同一時刻可能有多個線程都在獲取任務(wù)的執(zhí)行結(jié)果溉跃,如果任務(wù)還在執(zhí)行過程中,
則這些線程就要被包裝成WaitNode扔到棧的棧頂告抄,即完成入棧操作撰茎,這樣就有可能出現(xiàn)多個線程同時入棧的情況,因此需要使用CAS操作保證入棧的線程安全打洼,對于出棧的情況也是同理龄糊。
使用這個隊列就只需要一個指向棧頂節(jié)點的指針就行了
/**
* @throws CancellationException {@inheritDoc}
*/
public V get() throws InterruptedException, ExecutionException {
int s = state;
if (s <= COMPLETING)
//如果狀態(tài)小于COMPLETING 說明還沒計算完成,則調(diào)用 awaitDone方法阻塞
s = awaitDone(false, 0L);
return report(s);
}
private V report(int s) throws ExecutionException {
Object x = outcome;
if (s == NORMAL)
return (V)x;
if (s >= CANCELLED)
throw new CancellationException();
throw new ExecutionException((Throwable)x);
}
private int awaitDone(boolean timed, long nanos)
throws InterruptedException {
final long deadline = timed ? System.nanoTime() + nanos : 0L;
WaitNode q = null;
boolean queued = false;
for (;;) {
if (Thread.interrupted()) {
// 如果該線程被中斷募疮,則從等待隊列中移除該線程炫惩,直接拋異常
removeWaiter(q);
throw new InterruptedException();
}
int s = state;
if (s > COMPLETING) {
// 如果state >COMPLETING 說明任務(wù)已經(jīng)執(zhí)行完成,或者取消阿浓,如果等待隊列不為空他嚷,將等待隊列中該節(jié)點置為null,
// 返回state 然后就可以在外層方法調(diào)用report方法獲取結(jié)果了
if (q != null)
q.thread = null;
return s;
}
else if (s == COMPLETING) // cannot time out yet
//如果state=COMPLETING,則使用yield筋蓖,因為此狀態(tài)的時間特別短卸耘,通過yield比掛起響應(yīng)更快。
//yield 表示讓出CPU執(zhí)行權(quán)粘咖,等待下一次競爭蚣抗。
Thread.yield();
else if (q == null)
q = new WaitNode();
else if (!queued)
// 如果q不為null,說明代表當(dāng)前線程的WaitNode已經(jīng)被創(chuàng)建出來了瓮下,如果queued=false翰铡,表示當(dāng)前線程還沒有入隊 接下來利用CAS入隊
queued = UNSAFE.compareAndSwapObject(this, waitersOffset,q.next = waiters, q);
//如果需要阻塞指定時間,則使用LockSupport.parkNanos阻塞指定時間
else if (timed) {
nanos = deadline - System.nanoTime();
if (nanos <= 0L) {
//如果到指定時間還沒執(zhí)行完唱捣,則從隊列中移除該節(jié)點两蟀,并返回當(dāng)前狀態(tài)
removeWaiter(q);
return state;
}
LockSupport.parkNanos(this, nanos);
}
else
LockSupport.park(this);
}
}
public void run() {
// 利用CAS設(shè)置當(dāng)前執(zhí)行線程,保證run方法只被執(zhí)行一次
if (state != NEW ||
!UNSAFE.compareAndSwapObject(this, runnerOffset,
null, Thread.currentThread()))
return;
try {
Callable<V> c = callable;
if (c != null && state == NEW) {
V result;
boolean ran;
try {
// 真正執(zhí)行任務(wù)震缭,獲取結(jié)果
result = c.call();
ran = true;
} catch (Throwable ex) {
result = null;
ran = false;
setException(ex);
}
if (ran)
// 設(shè)置結(jié)果
set(result);
}
} finally {
// runner must be non-null until state is settled to
// prevent concurrent calls to run()
runner = null;
// state must be re-read after nulling runner to prevent
// leaked interrupts
int s = state;
if (s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);
}
}
protected void set(V v) {
// 先利用CAS將狀態(tài)改為COMPLETING 保證只有一個線程能夠執(zhí)行 outcome = v;
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
// 將結(jié)果賦值給 outcome
outcome = v;
// 再利用CAS將狀態(tài)改為NORMAL
UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
// 將隊列清空赂毯,然后喚醒隊列中所有線程
finishCompletion();
}
}