1. Runnable、Callable球散、Future、FutureTask的區(qū)別與聯(lián)系
和Java異步打交道就不能回避掉Runnable
,Callable
,Future
,FutureTask
等類摇肌,首先來介紹下這幾個類的區(qū)別烟零。
1.1 Runnable
Runnable接口是我們最熟悉的,它只有一個run函數(shù)暴备。然后使用某個線程去執(zhí)行該runnable即可實(shí)現(xiàn)多線程悠瞬,Thread類在調(diào)用start()函數(shù)后就是執(zhí)行的是Runnable的run()函數(shù)。Runnable最大的缺點(diǎn)在于run函數(shù)沒有返回值涯捻。
1.2 Callable
Callable接口和Runnable接口類似浅妆,它有一個call函數(shù)。使用某個線程執(zhí)行Callable接口實(shí)質(zhì)就是執(zhí)行其call函數(shù)障癌。call方法和run方法最大的區(qū)別就是call方法有返回值:
public interface Callable<V> {
/**
* Computes a result, or throws an exception if unable to do so.
*
* @return computed result
* @throws Exception if unable to compute a result
*/
V call() throws Exception;
}
1.3 Future
Future就是對于具體的Runnable或者Callable任務(wù)的執(zhí)行結(jié)果進(jìn)行取消凌外、查詢是否完成、獲取結(jié)果涛浙、設(shè)置結(jié)果操作康辑。get
方法會阻塞,直到任務(wù)返回結(jié)果(Future簡介)轿亮。
1.4 FutureTask
Future只是一個接口疮薇,在實(shí)際使用過程中,諸如ThreadPoolExecutor返回的都是一個FutureTask實(shí)例我注。
public class FutureTask<V> implements RunnableFuture<V>
public interface RunnableFuture<V> extends Runnable, Future<V> {
/**
* Sets this Future to the result of its computation
* unless it has been cancelled.
*/
void run();
}
可以看到按咒,F(xiàn)utureTask是一個RunnableFuture<V>,而RunnableFuture實(shí)現(xiàn)了Runnbale又實(shí)現(xiàn)了Futrue<V>這兩個接口但骨。
2 FutureTask的構(gòu)造過程
事實(shí)上励七,通過ExecutorService接口的相關(guān)submit方法,實(shí)際上都是提交的Callable或者Runnable嗽冒,包裝成一個FutureTask對象呀伙。
public abstract class AbstractExecutorService implements ExecutorService {
...
//將Runable包裝成FutureTask之后,再調(diào)用execute方法
public Future<?> submit(Runnable task) {
if (task == null) throw new NullPointerException();
RunnableFuture<Void> ftask = newTaskFor(task, null);
execute(ftask);
return ftask;
}
//調(diào)用newTaskFor方法添坊,利用Callable構(gòu)造一個FutureTask對象
protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
return new FutureTask<T>(callable);
}
}
可以看到AbstractExecutorService
的submit方法調(diào)用后返回的就是一個FutureTask對象剿另,接下來看下FutureTask的構(gòu)造方法:
//接受Callable對象作為參數(shù)
public FutureTask(Callable<V> callable) {
if (callable == null)
throw new NullPointerException();
this.callable = callable;
this.state = NEW;
}
//接受Runnable對象作為參數(shù)
public FutureTask(Runnable runnable, V result) {
this.callable = Executors.callable(runnable, result);//將Runnable轉(zhuǎn)為Callable對象
this.state = NEW;
}
//callable方法,將Runnable轉(zhuǎn)為一個Callable對象贬蛙,包裝設(shè)計(jì)模式
public static <T> Callable<T> callable(Runnable task, T result) {
if (task == null)
throw new NullPointerException();
return new RunnableAdapter<T>(task, result);
}
//RunnableAdapter是Executors的一個內(nèi)部類雨女,實(shí)現(xiàn)了Callable接口
static final class RunnableAdapter<T> implements Callable<T> {
final Runnable task;
final T result;
RunnableAdapter(Runnable task, T result) {
this.task = task;
this.result = result;
}
public T call() {
task.run();
return result;
}
}
可以看到,構(gòu)造FutureTask時(shí)阳准,無論傳入的是Runnable還是Callable氛堕,最終都實(shí)現(xiàn)了Callable接口。
3 FutureTask主要成員
接下來看下FutureTask類的主要成員變量:
public class FutureTask<V> implements RunnableFuture<V> {
/*
* FutureTask中定義了一個state變量野蝇,用于記錄任務(wù)執(zhí)行的相關(guān)狀態(tài) 讼稚,狀態(tài)的變化過程如下
* NEW -> COMPLETING -> NORMAL
* NEW -> COMPLETING -> EXCEPTIONAL
* NEW -> CANCELLED
* NEW -> INTERRUPTING -> INTERRUPTED
*/
private volatile int state;
//主流程狀態(tài)
private static final int NEW = 0; //當(dāng)FutureTask實(shí)例剛剛創(chuàng)建到callbale的call方法執(zhí)行完成前括儒,處于此狀態(tài)
private static final int COMPLETING = 1; //callable的call方法執(zhí)行完成或出現(xiàn)異常時(shí),首先進(jìn)行此狀態(tài)
private static final int NORMAL = 2;//callable的call方法正常結(jié)束時(shí)锐想,進(jìn)入此狀態(tài)帮寻,將outcom設(shè)置為正常結(jié)果
private static final int EXCEPTIONAL = 3;//callable的call方法異常結(jié)束時(shí),進(jìn)入此狀態(tài)赠摇,將outcome設(shè)置為拋出的異常
//取消任務(wù)執(zhí)行時(shí)可能處于的狀態(tài)
private static final int CANCELLED= 4;// FutureTask任務(wù)尚未執(zhí)行固逗,即還在任務(wù)隊(duì)列的時(shí)候,調(diào)用了cancel方法藕帜,進(jìn)入此狀態(tài)
private static final int INTERRUPTING = 5;// FutureTask的run方法已經(jīng)在執(zhí)行烫罩,收到中斷信號,進(jìn)入此狀態(tài)
private static final int INTERRUPTED = 6;// 任務(wù)成功中斷后洽故,進(jìn)入此狀態(tài)
private Callable<V> callable;//需要執(zhí)行的任務(wù)贝攒,提示:如果提交的是Runnable對象,會先轉(zhuǎn)換為Callable對象收津,這是構(gòu)造方法參數(shù)
private Object outcome; //任務(wù)運(yùn)行的結(jié)果
private volatile Thread runner;//執(zhí)行此任務(wù)的線程
//等待該FutureTask的線程鏈表饿这,對于同一個FutureTask,如果多個線程調(diào)用了get方法撞秋,對應(yīng)的線程都會加入到waiters鏈表中长捧,同時(shí)當(dāng)FutureTask執(zhí)行完成后,也會告知所有waiters中的線程
private volatile WaitNode waiters;
......
}
FutureTask的成員變量并不復(fù)雜吻贿,主要記錄以下幾部分信息:
- 狀態(tài)
- 任務(wù)(callable)
- 結(jié)果(outcome)
- 等待線程(waiters)
4 FutureTask的執(zhí)行過程
4.1 run
接下來開始看一個FutureTask的執(zhí)行過程串结,F(xiàn)utureTask執(zhí)行任務(wù)的方法當(dāng)然還是run方法:
public void run() {
//保證callable任務(wù)只被運(yùn)行一次
if (state != NEW ||
!UNSAFE.compareAndSwapObject(this, runnerOffset,
null, Thread.currentThread()))
return;
try {
Callable<V> c = callable;
if (c != null && state == NEW) {
V result;
boolean ran;
try {
//執(zhí)行任務(wù)
result = c.call();
ran = true;
} catch (Throwable ex) {
result = null;
ran = false;
setException(ex);
}
if (ran)
set(result);
}
} finally {
runner = null;
int s = state;
//判斷該任務(wù)是否正在響應(yīng)中斷,如果中斷沒有完成舅列,則等待中斷操作完成
if (s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);
}
}
- 如果狀態(tài)不為new或者運(yùn)行線程runner失敗肌割,說明當(dāng)前任務(wù)已經(jīng)被其他線程啟動或者已經(jīng)被執(zhí)行過,直接返回false
- 調(diào)用call方法執(zhí)行核心任務(wù)邏輯帐要。如果調(diào)用成功則執(zhí)行set(result)方法把敞,將state狀態(tài)設(shè)置成NORMAL。如果調(diào)用失敗拋出異常則執(zhí)行setException(ex)方法榨惠,將state狀態(tài)設(shè)置成EXCEPTIONAL奋早,喚醒所有在get()方法上等待的線程
- 如果當(dāng)前狀態(tài)為INTERRUPTING(步驟2已CAS失敗),則一直調(diào)用Thread.yield()直至狀態(tài)不為INTERRUPTING
4.2 set赠橙、setException方法
protected void set(V v) {
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
outcome = v;
UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
finishCompletion();
}
}
protected void setException(Throwable t) {
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
outcome = t;
UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state
finishCompletion();
}
}
兩個方法的邏輯基本一致耽装,先通過CAS操作將狀態(tài)從NEW置為COMPLETING,然后再將最終狀態(tài)分別置為NORMAL或者EXCEPTIONAL期揪,最后再調(diào)用finishCompletion方法掉奄。
4.3 finishCompletion
private void finishCompletion() {
for (WaitNode q; (q = waiters) != null;) {
//通過CAS把棧頂?shù)脑刂脼閚ull,相當(dāng)于彈出棧頂元素
if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
for (;;) {
Thread t = q.thread;
if (t != null) {
q.thread = null;
LockSupport.unpark(t);
}
WaitNode next = q.next;
if (next == null)
break;
q.next = null; // unlink to help gc
q = next;
}
break;
}
}
done();
callable = null; // to reduce footprint
}
finishCompletion的邏輯也比較簡單:
- 遍歷waiters鏈表凤薛,取出每一個節(jié)點(diǎn):每個節(jié)點(diǎn)都代表一個正在等待該FutureTask結(jié)果(即調(diào)用過get方法)的線程
- 通過 LockSupport.unpark(t)喚醒每一個節(jié)點(diǎn)姓建,通知每個線程诞仓,該任務(wù)執(zhí)行完成
4.4 get
在finishCompletion方法中,F(xiàn)utureTask會通知waiters鏈表中的每一個等待線程速兔,那么這些線程是怎么被加入到waiters鏈表中的呢狂芋?上文已經(jīng)講過,當(dāng)在一個線程中調(diào)用了get方法憨栽,該線程就會被加入到waiters鏈表中。所以接下來看下get方法:
public V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
if (unit == null)
throw new NullPointerException();
int s = state;
if (s <= COMPLETING &&
(s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)
throw new TimeoutException();
return report(s);
}
get方法很簡答翼虫,主要就是調(diào)用awaitDone
方法:
private int awaitDone(boolean timed, long nanos)
throws InterruptedException {
final long deadline = timed ? System.nanoTime() + nanos : 0L;
WaitNode q = null;
boolean queued = false;
for (;;) {
//如果該線程執(zhí)行interrupt()方法屑柔,則從隊(duì)列中移除該節(jié)點(diǎn),并拋出異常
if (Thread.interrupted()) {
removeWaiter(q);
throw new InterruptedException();
}
int s = state;
//如果state狀態(tài)大于COMPLETING 則說明任務(wù)執(zhí)行完成珍剑,或取消
if (s > COMPLETING) {
if (q != null)
q.thread = null;
return s;
}
//如果state=COMPLETING掸宛,則使用yield,因?yàn)榇藸顟B(tài)的時(shí)間特別短招拙,通過yield比掛起響應(yīng)更快唧瘾。
else if (s == COMPLETING) // cannot time out yet
Thread.yield();
//構(gòu)建節(jié)點(diǎn)
else if (q == null)
q = new WaitNode();
//把當(dāng)前節(jié)點(diǎn)入棧
else if (!queued)
queued = UNSAFE.compareAndSwapObject(this, waitersOffset, q.next = waiters, q);
//如果需要阻塞指定時(shí)間,則使用LockSupport.parkNanos阻塞指定時(shí)間
//如果到指定時(shí)間還沒執(zhí)行完别凤,則從隊(duì)列中移除該節(jié)點(diǎn)饰序,并返回當(dāng)前狀態(tài)
else if (timed) {
nanos = deadline - System.nanoTime();
if (nanos <= 0L) {
removeWaiter(q);
return state;
}
LockSupport.parkNanos(this, nanos);
}
//阻塞當(dāng)前線程
else
LockSupport.park(this);
}
}
整個方法的大致邏輯主要分為以下幾步:
- 如果當(dāng)前狀態(tài)值大于COMPLETING,說明已經(jīng)執(zhí)行完成或者取消规哪,直接返回
- 如果state=COMPLETING求豫,則使用yield,因?yàn)榇藸顟B(tài)的時(shí)間特別短诉稍,通過yield比掛起響應(yīng)更快
- 如果當(dāng)前線程是首次進(jìn)入循環(huán)蝠嘉,為當(dāng)前線程創(chuàng)建wait節(jié)點(diǎn)加入到waiters鏈表中
- 根據(jù)是否定時(shí)將當(dāng)前線程掛起(
LockSupport.parkNanos
LockSupport.park
)來阻塞當(dāng)前線程,直到超時(shí)或者線程被finishCompletion方法喚醒 - 當(dāng)線程掛起超時(shí)或者被喚醒后杯巨,重新循環(huán)執(zhí)行上述邏輯
get方法是FutureTask中的關(guān)鍵方法蚤告,了解了get方法邏輯也就了解為什么當(dāng)調(diào)用get方法時(shí)線程會被阻塞直到任務(wù)運(yùn)行完成。
4.5 cancel
cancel方法用于結(jié)束當(dāng)前任務(wù):
public boolean cancel(boolean mayInterruptIfRunning) {
if (!(state == NEW &&
UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
return false;
try { // in case call to interrupt throws exception
if (mayInterruptIfRunning) {
try {
Thread t = runner;
if (t != null)
t.interrupt();
} finally { // final state
UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
}
}
} finally {
finishCompletion();
}
return true;
}
- 根據(jù)mayInterruptIfRunning是否為true服爷,CAS設(shè)置狀態(tài)為INTERRUPTING或CANCELLED杜恰,設(shè)置成功,繼續(xù)第二步层扶,否則返回false
- 如果mayInterruptIfRunning為true箫章,調(diào)用runner.interupt(),設(shè)置狀態(tài)為INTERRUPTED
- 喚醒所有在get()方法等待的線程