關(guān)于Thread和Runnable的區(qū)別:
1)從對象編程來說:Thread是繼承;Runnable是組合,會比繼承耦合性低,更加靈活捉撮。
2)從對象共享角度:Runnable實例可以由過個線程實例共享彤断,會產(chǎn)生并發(fā)問題野舶。
3)對象創(chuàng)建成本:Thread在創(chuàng)建的時候JVM就會為其分配調(diào)用棧空間宰衙,內(nèi)核線程等資源平道;而Runnable是普通的類,作為參數(shù)傳給Thread供炼,所以Runnable創(chuàng)建成本相對較低一屋。
關(guān)于用戶線程(User)和守護(hù)線程(Daemon):
用戶線程會阻止JVM正常停止;
守護(hù)線程不會影響JVM正常停止袋哼,所以守護(hù)線程通常用于執(zhí)行一些重要性不高的任務(wù)冀墨。
關(guān)于Callable和Runnable的區(qū)別:
Callable方式需要FutureTask實現(xiàn)類的支持,用于接收運(yùn)算結(jié)果涛贯。
FutureTask是Future接口的實現(xiàn)類诽嘉,且FutureTask也可用于閉鎖的操作,因為get() 會阻塞當(dāng)前線程直到Callable返回結(jié)果弟翘。
FutureTask
FutureTask提供了支持cancel的異步計算方式虫腋,它實現(xiàn)了RunnableFuture接口,而RunnableFuture接口繼承了Runnable和Future接口衅胀。
FutureTask可以用來包裝一個Callable和Runnable對象岔乔,且因為實現(xiàn)了Runnable,所以FutureTask可以被提交到線程池處理滚躯。
源碼分析
因為FutureTask實現(xiàn)了Future接口,所以它會實現(xiàn)Future接口的相關(guān)方法嘿歌,比如說get(), cancel()等等掸掏。
成員變量以及構(gòu)造函數(shù)
有三個volatile成員變量,會通過UNSAFE類來進(jìn)行CAS操作宙帝。
另外定義了state變量對應(yīng)的7中狀態(tài)丧凤。
callable變量意味著FutureTask會將Runnable也封裝成Callable來處理。
outcome則是返回這或者異常的信息
private volatile int state;
private static final int NEW = 0;//初始是NEW
private static final int COMPLETING = 1;//在set()和setException()里面先CAS更新成COMPLETING榜配,加鎖操作
private static final int NORMAL = 2;//set()方法執(zhí)行成功從COMPLETING到NORMAL
private static final int EXCEPTIONAL = 3;//setException()方法執(zhí)行成功從COMPLETING到EXCEPTIONAL
private static final int CANCELLED = 4;//cancel(false)的時候
private static final int INTERRUPTING = 5;//cancel(true)的時候
private static final int INTERRUPTED = 6;//cancel(true)最終狀態(tài)
/** The underlying callable; nulled out after running */
private Callable<V> callable;
/** The result to return or exception to throw from get() */
private Object outcome; // non-volatile, protected by state reads/writes
/** The thread running the callable; CASed during run() */
private volatile Thread runner;
/** Treiber stack of waiting threads */
private volatile WaitNode waiters;
// Unsafe mechanics
private static final sun.misc.Unsafe UNSAFE;
private static final long stateOffset;
private static final long runnerOffset;
private static final long waitersOffset;
static {
try {
UNSAFE = sun.misc.Unsafe.getUnsafe();
Class<?> k = FutureTask.class;
stateOffset = UNSAFE.objectFieldOffset
(k.getDeclaredField("state"));
runnerOffset = UNSAFE.objectFieldOffset
(k.getDeclaredField("runner"));
waitersOffset = UNSAFE.objectFieldOffset
(k.getDeclaredField("waiters"));
} catch (Exception e) {
throw new Error(e);
}
}
構(gòu)造函數(shù)
FutureTask內(nèi)部有Callable類型的成員變量忽孽,所以Runnable會通過一個適配器RunnableAdapter实撒,轉(zhuǎn)換成Callable,內(nèi)部還是調(diào)用的Runnable的run()方法仍侥。
public FutureTask(Callable<V> callable) {
if (callable == null)
throw new NullPointerException();
this.callable = callable;
this.state = NEW; // ensure visibility of callable
}
//接受Runnable的構(gòu)造函數(shù)
public FutureTask(Runnable runnable, V result) {
this.callable = Executors.callable(runnable, result);
this.state = NEW; // ensure visibility of callable
}
//適配器將Runnable轉(zhuǎn)換成Callable
public static <T> Callable<T> callable(Runnable task, T result) {
if (task == null)
throw new NullPointerException();
return new RunnableAdapter<T>(task, result);
}
//Adapter類,Callable的call方法調(diào)用Runnable的run方法鸳君。
static final class RunnableAdapter<T> implements Callable<T> {
final Runnable task;
final T result;
RunnableAdapter(Runnable task, T result) {
this.task = task;
this.result = result;
}
public T call() {
task.run();
return result;
}
}
run()
run()方法主要是調(diào)用Callable的call方法农渊,如果有異常則通過setException(Throwable t)設(shè)置異常;如果沒有異常或颊,則通過set(V result)方法砸紊,來設(shè)置返回值传于。且這兩個set方法最終都會調(diào)用finishCompletion()來unpark喚醒WaitNode節(jié)點里的等待線程去獲得結(jié)果。
run()方法涉及到的方法有setException(), set(), finishCompletion(), handlePossibleCancellationInterrupt()醉顽。
涉及到的state有COMPLETING, EXCEPTIONAL, NORMAL
且在call()方法返回之前沼溜,都是NEW狀態(tài),這樣cancel()才會有機(jī)會游添。
且run方法最好檢查INTERRUPTING狀態(tài)盛末,會在handlePossibleCancellationInterrupt自旋直到cancel()方法結(jié)束,state變成INTERRUPTED狀態(tài)(下面代碼里分析)否淤。
public void run() {
//如果state不是NEW或者CAS更新runner成員變量失敗悄但,則直接return。
if (state != NEW ||
!UNSAFE.compareAndSwapObject(this, runnerOffset,
null, Thread.currentThread()))
return;
try {
Callable<V> c = callable;
if (c != null && state == NEW) {
V result;
boolean ran;
try {
//直接調(diào)用Callable的call方法并拿到result
result = c.call();
ran = true;
} catch (Throwable ex) {
//如果call()方法拋出異常石抡,ran=false
result = null;
ran = false;
setException(ex);
}
if (ran)
set(result);
}
} finally {
// runner must be non-null until state is settled to
// prevent concurrent calls to run()
runner = null;
// state must be re-read after nulling runner to prevent
// leaked interrupts
int s = state;
if (s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);
}
}
//如果拋出異常檐嚣,則調(diào)用該方法,更新state狀態(tài)到COMPLETING啰扛,
//將Throwable賦值給outcome變量嚎京,再更新state到EXCEPTIONAL.
//最后finishCompletion()喚醒WaitNode中等待線程節(jié)點
protected void setException(Throwable t) {
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
outcome = t;
//因為上面更新到COMPLETING成功,這邊更新EXCEPTIONAL則不需要CAS操作隐解,而是putOrderedInt內(nèi)存操作
UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state
finishCompletion();
}
}
private void finishCompletion() {
// assert state > COMPLETING;
for (WaitNode q; (q = waiters) != null;) {
//先通過CAS獲得更新資格鞍帝,將waiter變量更新為null
if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
for (;;) {
Thread t = q.thread;
if (t != null) {
//拿到q對應(yīng)的線程,更新為null煞茫,并喚醒該線程
q.thread = null;
LockSupport.unpark(t);
}
//然后操作q的后繼節(jié)點
WaitNode next = q.next;
if (next == null)
break;
q.next = null; // unlink to help gc
q = next;
}
break;
}
}
//done()方法由子類實現(xiàn)
done();
callable = null; // to reduce footprint
}
//如果沒有異常帕涌,更新state到COMPLETING先,然后將result設(shè)置到outcome续徽,更新state到NORMAL
//最后finishCompletion喚醒等待線程
protected void set(V v) {
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
outcome = v;
UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
finishCompletion();
}
}
//final的時候會檢查一下state是不是被中斷蚓曼,如果一直在中斷過程中,則當(dāng)前線程yield讓出CPU
//這邊自旋等待調(diào)用中斷的線程執(zhí)行完畢钦扭,因為此時run方法已經(jīng)結(jié)束纫版,runner也被重置為null
// 所以別的線程可能有機(jī)會提交這個FutureTask,而執(zhí)行run方法客情,這時候cancel可能被應(yīng)用到不同的Task上其弊。
// 所以這里要自旋直到cancel()方法執(zhí)行結(jié)束
private void handlePossibleCancellationInterrupt(int s) {
// It is possible for our interrupter to stall before getting a
// chance to interrupt us. Let's spin-wait patiently.
if (s == INTERRUPTING)
while (state == INTERRUPTING)
Thread.yield(); // wait out pending interrupt
}
cancel()
cancel的時候有幾種情況:
1.任務(wù)還沒開始,直接返回false
2.任務(wù)已經(jīng)開始:
2.1調(diào)用cancel(false)膀斋,就是最后調(diào)用finishCompletion()來喚醒等待線程
2.2調(diào)用cancel(true)梭伐,則通過runner獲得當(dāng)前運(yùn)行線程,調(diào)用運(yùn)行線程的Interrupt()方法來設(shè)置目標(biāo)線程的中斷標(biāo)記位為true
這里涉及到的state狀態(tài)有INTERRUPTING, CANCELLED, INTERRUPTED概页。
public boolean cancel(boolean mayInterruptIfRunning) {
if (!(state == NEW &&
//如果參數(shù)是true籽御,則更新為INTERRUPTING狀態(tài),否則CANCELLED狀態(tài)
UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
return false;
try { // in case call to interrupt throws exception
if (mayInterruptIfRunning) {
try {
Thread t = runner;
if (t != null)
t.interrupt();//通過設(shè)置中斷標(biāo)志位來控制目標(biāo)線程的生命周期
} finally { // final state
UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
}
}
} finally {
finishCompletion();
}
return true;
}
get()
如果state的狀態(tài)還沒到COMPLETING,就在awaitDone()里面通過LockSupport.park()掛起當(dāng)前線程技掏,直到run()方法執(zhí)行結(jié)束喚醒等待線程铃将。
異常:
1.get()方法響應(yīng)異常中斷,會拋出InterruptedException哑梳。
2.且如果是get(long timeout, TimeUnit unit)方法劲阎,還會拋出TimeoutException。
3.另外如果Callable的call方法沒有捕捉異常而拋出異常鸠真,則get()方法會拋出ExecutionException悯仙。FutureTask的run()方法在執(zhí)行Callable的call()方法時,會將call拋出的異常捕獲包裝成ExecutionException吠卷,這意味著call()方法出現(xiàn)異常锡垄,不會直接導(dǎo)致執(zhí)行線程運(yùn)行結(jié)束(相比正常的Thread的run()方法拋出異常可能導(dǎo)致執(zhí)行線程提前終止生命周期)祭隔。
public V get() throws InterruptedException, ExecutionException {
int s = state;
if (s <= COMPLETING)
//如果state還沒到COMPLETING货岭,則開始掛起當(dāng)前線程
s = awaitDone(false, 0L);
//run()方法結(jié)束之后會喚醒當(dāng)前線程,去拿到執(zhí)行結(jié)果疾渴。
return report(s);
}
private int awaitDone(boolean timed, long nanos)
throws InterruptedException {
final long deadline = timed ? System.nanoTime() + nanos : 0L;
WaitNode q = null;
boolean queued = false;
for (;;) {
//判斷當(dāng)前線程是否被中斷
if (Thread.interrupted()) {
//如果被中斷千贯,則需要把waitNode的節(jié)點都垃圾回收掉,拋出InterruptedException
removeWaiter(q);
throw new InterruptedException();
}
int s = state;
//如果已經(jīng)結(jié)束搞坝,則將waitNode的thread設(shè)置為null搔谴,并返回state值
if (s > COMPLETING) {
if (q != null)
q.thread = null;
return s;
}
//COMPLETING表示set開始,快結(jié)束了桩撮,則讓執(zhí)行線程讓出CPU等待一下
else if (s == COMPLETING) // cannot time out yet
Thread.yield();
//這里s就還沒到COMPLETING敦第,如果waitNode是null,則創(chuàng)建WaitNode
else if (q == null)
q = new WaitNode();
//如果q不為null距境,則需要添加到WaitNode的next后繼節(jié)點
else if (!queued)
//這個UNSAFE操作申尼,先更新q的next=waiter,再將qCAS更新到waiters變量
queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
q.next = waiters, q);
//如果是有等待時間的垫桂,則在等待時間過后,removeWaiter()來處理等待的node
else if (timed) {
nanos = deadline - System.nanoTime();
if (nanos <= 0L) {
removeWaiter(q);
return state;
}
LockSupport.parkNanos(this, nanos);
}
else
LockSupport.park(this);
}
}
//處理最終結(jié)果
private V report(int s) throws ExecutionException {
Object x = outcome;
if (s == NORMAL)
//如果state是NORMAL粟按,則直接返回outcome值
return (V)x;
if (s >= CANCELLED)
//則就是cancel()使得線程被Interrupt了或者cancel了诬滩,拋出CancellationException
throw new CancellationException();
//否則就是拋出異常了
throw new ExecutionException((Throwable)x);
}
因為是無線循環(huán),所以里面的q = new WaitNode()和UNSAFE更新操作在park之前都會做一邊灭将,即park之前會將當(dāng)前線程封裝到WaitNode鏈表里疼鸟。