Callable接口
Java中的子線程通常是通過Thread
或者Runnable
的方式實(shí)現(xiàn)的伺帘,但是這種方式只能通過回調(diào),或者共享變量等方式來傳遞數(shù)據(jù)簸州,而Callable
則是可以獲取返回結(jié)果的一種子線程實(shí)現(xiàn)方式谨敛。
Callable是一個(gè)接口尊蚁,源碼如下:
public interface Callable<V> {
V call() throws Exception;
}
非常簡(jiǎn)單,只有一個(gè)方法吃嘿,和一個(gè)泛型V祠乃,所以我們創(chuàng)建Callable
對(duì)象的時(shí)候,也只需要指定返回類型并實(shí)現(xiàn)call
方法就可以了兑燥。
Future接口
看完了Callable
接口亮瓷,會(huì)發(fā)現(xiàn)它非常簡(jiǎn)單,沒有辦法在子線程中直接通過它來獲取到返回結(jié)果的降瞳,這時(shí)候就需要Future
發(fā)揮作用了嘱支。源碼如下:
public interface Future<V> {
boolean cancel(boolean mayInterruptIfRunning);
boolean isCancelled();
boolean isDone();
V get() throws InterruptedException, ExecutionException;
V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}
-
boolean cancel(boolean mayInterruptIfRunning)
在任務(wù)開始前取消,傳入值表示任務(wù)開始后是否允許打斷力崇,返回值表示是否取消成功(任務(wù)已經(jīng)開始不允許打斷斗塘,已經(jīng)運(yùn)行結(jié)束,已經(jīng)取消等等狀態(tài)會(huì)返回失斄裂ァ) -
boolean isCancelled()
是否已經(jīng)被取消 -
boolean isDone()
是否完成任務(wù) -
V get()
嘗試獲取返回結(jié)果馍盟,阻塞方法 -
V get(long timeout, TimeUnit unit)
同上,可以指定超時(shí)時(shí)間
可以看到茧吊,Future
實(shí)際上可以理解為Callable
的管理類贞岭。
在線程池中執(zhí)行任務(wù)時(shí),除了execute
方法之外搓侄,還有一個(gè)submit
方法:
public <T> Future<T> submit(Callable<T> task) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task);
execute(ftask);
return ftask;
}
它返回的就是一個(gè)Future
對(duì)象瞄桨,可以通過它來回去Callable
任務(wù)的執(zhí)行結(jié)果:
Callable<Integer> callable = new Callable<Integer>() {
@Override
public Integer call() throws Exception {
System.out.println("Sub Thread is calculating...");
Thread.sleep(10000);
return 10;
}
};
Future<Integer> future = Executors.newCachedThreadPool().submit(callable);
try {
System.out.println("Main Thread start waiting result... ");
int res = future.get();
System.out.println("Main Thread get result: " + res);
} catch (ExecutionException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
//運(yùn)行結(jié)果:
Main Thread start waiting result...
Sub Thread is calculating...
Main Thread get result: 10
FutureTask
如果不用Java提供的線程池,直接用Thread
怎樣在子線程中運(yùn)行Callable
呢讶踪? 這時(shí)候就要用到FutureTask
類了芯侥。
FutureTask
實(shí)現(xiàn)了Future
和Runnable
接口,這就意味著乳讥,它既可以放在Thread
中去運(yùn)行柱查,又能夠?qū)θ蝿?wù)進(jìn)行管理,下面是源碼:
public FutureTask(Callable<V> callable) {
if (callable == null)
throw new NullPointerException();
this.callable = callable;
this.state = NEW; // ensure visibility of callable
}
public FutureTask(Runnable runnable, V result) {
this.callable = Executors.callable(runnable, result);
this.state = NEW; // ensure visibility of callable
}
可以看到云石,構(gòu)造函數(shù)中傳入了待運(yùn)行的任務(wù)Callable
對(duì)象或者Runnable
對(duì)象和指定的返回結(jié)果唉工,V result
用來指定運(yùn)行完成后的返回值,如果不想用指定值汹忠,可以用Future<?> f = new FutureTask<Void>(runnable, null)
來返回null淋硝。采用Callable構(gòu)造方法創(chuàng)建的FutureTask對(duì)象雹熬,執(zhí)行完畢返回的是實(shí)際運(yùn)算結(jié)果,而Runnable 構(gòu)造函數(shù)返回值是傳入的result谣膳。
task的狀態(tài)
FutureTask中持有的任務(wù)對(duì)象竿报,有以下幾種狀態(tài):
private static final int NEW = 0; //新建或運(yùn)行中
private static final int COMPLETING = 1;//任務(wù)運(yùn)行結(jié)束,正在處理一些后續(xù)操作
private static final int NORMAL = 2;//任務(wù)已經(jīng)完成参歹,COMPLETING的下一個(gè)狀態(tài)
private static final int EXCEPTIONAL = 3;//任務(wù)拋出異常仰楚,COMPLETING的下一個(gè)狀態(tài)
private static final int CANCELLED = 4;//任務(wù)被取消
private static final int INTERRUPTING = 5;//收到打斷指令,還沒有執(zhí)行interrupt
private static final int INTERRUPTED = 6;//收到打斷指令犬庇,也執(zhí)行了interrupt
可能的狀態(tài)變化主要有以下幾種:
* Possible state transitions:
* NEW -> COMPLETING -> NORMAL
* NEW -> COMPLETING -> EXCEPTIONAL
* NEW -> CANCELLED
* NEW -> INTERRUPTING -> INTERRUPTED
task的執(zhí)行過程
FutureTask實(shí)現(xiàn)了Runnable接口僧界,是可以直接放在Thread中執(zhí)行的,實(shí)際上運(yùn)行的就是它的run方法:
public void run() {
//r如果當(dāng)前狀態(tài)不是NEW臭挽,說明任務(wù)已經(jīng)執(zhí)行完成了捂襟,直接返回
//如果當(dāng)前狀態(tài)是NEW,嘗試用CAS方式將當(dāng)前線程賦值給RUNNER欢峰,賦值前RUNNER的值應(yīng)該是null葬荷,否則賦值失敗
//賦值失敗表示已經(jīng)有線程執(zhí)行了run方法,直接返回
if (state != NEW ||
!U.compareAndSwapObject(this, RUNNER, null, Thread.currentThread()))
return;
try {
Callable<V> c = callable;
if (c != null && state == NEW) {
V result;
boolean ran;
try {
result = c.call();
ran = true;
} catch (Throwable ex) {
result = null;
ran = false;
//運(yùn)行中拋出異常
setException(ex);
}
//ran為true纽帖,說明正常運(yùn)行結(jié)束宠漩,得到了返回結(jié)果
if (ran)
set(result);
}
} finally {
// runner must be non-null until state is settled to
// prevent concurrent calls to run()
runner = null;
// state must be re-read after nulling runner to prevent
// leaked interrupts
int s = state;
if (s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);
}
}
執(zhí)行結(jié)果其實(shí)是比較簡(jiǎn)單的,通過RUNNER
來記錄執(zhí)行任務(wù)的線程懊直,從而保證只有一個(gè)線程可以執(zhí)行該任務(wù)扒吁。運(yùn)行結(jié)束后有兩個(gè)出口:
-
setException(ex);
運(yùn)行中出錯(cuò),拋出異常 -
set(result);
任務(wù)執(zhí)行完畢室囊,獲取到返回值
protected void setException(Throwable t) {
if (U.compareAndSwapInt(this, STATE, NEW, COMPLETING)) {
outcome = t;
U.putOrderedInt(this, STATE, EXCEPTIONAL); // final state
finishCompletion();
}
}
protected void set(V v) {
if (U.compareAndSwapInt(this, STATE, NEW, COMPLETING)) {
outcome = v;
U.putOrderedInt(this, STATE, NORMAL); // final state
finishCompletion();
}
}
private void finishCompletion() {
// assert state > COMPLETING;
for (WaitNode q; (q = waiters) != null;) {
if (U.compareAndSwapObject(this, WAITERS, q, null)) {
for (;;) {
Thread t = q.thread;
if (t != null) {
q.thread = null;
//喚醒等待線程
LockSupport.unpark(t);
}
WaitNode next = q.next;
if (next == null)
break;
q.next = null; // unlink to help gc
q = next;
}
break;
}
}
done();
callable = null; // to reduce footprint
}
這兩個(gè)方法實(shí)際上是一樣的雕崩,都是將狀態(tài)賦值為COMPLETING
,然后保存結(jié)果(運(yùn)行結(jié)果或錯(cuò)誤信息)融撞,再執(zhí)行finishCompletion
方法盼铁,通知WAITERS
里記錄的等待線程繼續(xù)執(zhí)行,并清空WAITERS
尝偎。
獲取返回結(jié)果
獲取返回結(jié)果是通過get()
方法:
public V get() throws InterruptedException, ExecutionException {
int s = state;
if (s <= COMPLETING)
s = awaitDone(false, 0L);
return report(s);
}
private int awaitDone(boolean timed, long nanos)
throws InterruptedException {
// The code below is very delicate, to achieve these goals:
// - call nanoTime exactly once for each call to park
// - if nanos <= 0L, return promptly without allocation or nanoTime
// - if nanos == Long.MIN_VALUE, don't underflow
// - if nanos == Long.MAX_VALUE, and nanoTime is non-monotonic
// and we suffer a spurious wakeup, we will do no worse than
// to park-spin for a while
long startTime = 0L; // Special value 0L means not yet parked
WaitNode q = null;
boolean queued = false;
for (;;) {
int s = state;
if (s > COMPLETING) {
if (q != null)
q.thread = null;
return s;
}
else if (s == COMPLETING)
// We may have already promised (via isDone) that we are done
// so never return empty-handed or throw InterruptedException
Thread.yield();
else if (Thread.interrupted()) {
removeWaiter(q);
throw new InterruptedException();
}
else if (q == null) {
if (timed && nanos <= 0L)
return s;
q = new WaitNode();
}
else if (!queued)
//cas機(jī)制饶火,將新建節(jié)點(diǎn)q的next指向原來的節(jié)點(diǎn)workers,然后將workers更新為新建的節(jié)點(diǎn)致扯。workers(WAITERS)實(shí)際上就是持有了所有等待線程的一個(gè)鏈表
queued = U.compareAndSwapObject(this, WAITERS,
q.next = waiters, q);
else if (timed) {
final long parkNanos;
if (startTime == 0L) { // first time
startTime = System.nanoTime();
if (startTime == 0L)
startTime = 1L;
parkNanos = nanos;
} else {
long elapsed = System.nanoTime() - startTime;
if (elapsed >= nanos) {
removeWaiter(q);
return state;
}
parkNanos = nanos - elapsed;
}
// nanoTime may be slow; recheck before parking
if (state < COMPLETING)
LockSupport.parkNanos(this, parkNanos);
}
else
LockSupport.park(this);
}
}
static final class WaitNode {
volatile Thread thread;
volatile WaitNode next;
WaitNode() { thread = Thread.currentThread(); }
}
get()
方法比較簡(jiǎn)單趁窃,先判斷當(dāng)前狀態(tài),如果狀態(tài) < COMPLETING
急前,說明任務(wù)沒有執(zhí)行完畢,直接調(diào)用awaitDone
方法瀑构。
awaitDone
方法可以接受兩個(gè)參數(shù)裆针,用來指定是否設(shè)置超時(shí)時(shí)間刨摩。它內(nèi)部是一個(gè)無限for循環(huán)。下面是awaitDone
方法的執(zhí)行步驟(忽略超時(shí)設(shè)置):
進(jìn)入
awaitDone
方法時(shí)世吨,state一定是小于COMPLETING
的澡刹,第一次會(huì)走else if (q == null)
分支,創(chuàng)建一個(gè)WaitNode()
對(duì)象用來保存當(dāng)前線程第二次循環(huán)q已經(jīng)不是null了耘婚,如果任務(wù)仍然沒有結(jié)束罢浇,會(huì)執(zhí)行
else if (!queued)
分支,queued
表示創(chuàng)建的WaitNode()
是否已經(jīng)添加到鏈表里沐祷,如果沒有嘗試添加嚷闭,直到添加成功為止。等待線程添加成功以后進(jìn)入下一個(gè)循環(huán)赖临,此時(shí)如果任務(wù)仍然沒有結(jié)束胞锰,會(huì)走到else分支,掛起當(dāng)前線程(阻塞)
此處阻塞的是等待結(jié)果的線程兢榨,也就是調(diào)用
FutureTask
的get()
方法的線程嗅榕,而不是執(zhí)行任務(wù)的線程。阻塞線程用的是LockSupport.park(this)
方法吵聪,喚醒的方法是LockSupport.unpark()
凌那,該方法在上邊的finishCompletion()
中出現(xiàn)了,也就是說吟逝,任務(wù)執(zhí)行結(jié)束(運(yùn)行完帽蝶,拋出異常,被取消)時(shí)澎办,等待的線程才會(huì)被喚醒嘲碱,繼續(xù)下一次循環(huán)。任務(wù)結(jié)束以后局蚀,如果state是
COMPLETING
狀態(tài)麦锯,說明一些清理任務(wù)還沒有執(zhí)行完,等待的線程會(huì)讓出cpu琅绅,讓其他線程優(yōu)先執(zhí)行直到state 大于
COMPLETING
扶欣,說明FutureTask
已經(jīng)完全結(jié)束了,此時(shí)會(huì)會(huì)執(zhí)行(s > COMPLETING)
分支千扶,把節(jié)點(diǎn)置空料祠,并返回。
awaitDone
返回以后澎羞,說明任務(wù)已經(jīng)執(zhí)行完成了髓绽,會(huì)進(jìn)入report
方法:
private V report(int s) throws ExecutionException {
Object x = outcome;
if (s == NORMAL)
return (V)x;
if (s >= CANCELLED)
throw new CancellationException();
throw new ExecutionException((Throwable)x);
}
可以看到,如果是正常結(jié)束妆绞,或者拋出異常結(jié)束顺呕,會(huì)返回結(jié)果枫攀,而如果是被取消,則會(huì)拋出異常株茶。
使用
Callable<Integer> call = new Callable<Integer>() {
@Override
public Integer call() throws Exception {
System.out.println("正在計(jì)算結(jié)果...");
Thread.sleep(3000);
return 1;
}
};
FutureTask<Integer> task = new FutureTask<>(call);
Thread thread = new Thread(task);
thread.start();
Integer result = task.get();
System.out.println("結(jié)果為:" + result);
總結(jié)
FutureTask
可以視為一個(gè)管理Callable
任務(wù)的工具類来涨,執(zhí)行Callable
任務(wù)的是FutureTask
的run
方法,所以启盛,可以通過new Thread(futuretask)
的方法來實(shí)現(xiàn)子線程執(zhí)行任務(wù)獲取執(zhí)行結(jié)果是通過
FutureTask
的get
方法蹦掐,調(diào)用該方法后,如果線程會(huì)被掛起僵闯,知道任務(wù)結(jié)束為止獲取結(jié)果的線程數(shù)量沒有限定卧抗,可以是任意個(gè)線程
獲取結(jié)果的線程被掛起以后,可以通過取消棍厂,超時(shí)等方法在任務(wù)執(zhí)行完畢以前結(jié)束掛起狀態(tài)颗味。