Future接口
Future接口被設(shè)計用來代表一個異步操作的執(zhí)行結(jié)果。你可以用它來獲取一個操作的執(zhí)行結(jié)果楣铁,取消一個操作,判斷一個操作是否已經(jīng)完成或者是否被取消
boolean cancel(boolean mayInterruptIfRunning);
boolean isCancelled();
V get() throws InterruptedException, ExecutionException;
V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
boolean isDone();
Future接口一共定義了五個方法:
- get()
該方法用來獲取執(zhí)行結(jié)果更扁,如果任務(wù)還在執(zhí)行中盖腕,就阻塞等待; - get(long timeout,TimeUnit unit)
該方法同get方法類似浓镜,所不同的是溃列,它最多等待指定的時間。如果指定的時間內(nèi)任務(wù)沒有完成膛薛,則會拋出TimeoutException異常听隐; - cancel(boolean mayInterruptIfRunnintg)
該方法用來嘗試取消一個任務(wù)的執(zhí)行,它的返回值是boolean類型哄啄,表示取消操作是否成功雅任。 - isCancelled()
該方法用來判斷任務(wù)是否被取消了。如果一個任務(wù)在正常執(zhí)行完成之前被cancel掉了咨跌,則返回true - isDone()
如果一個任務(wù)已經(jīng)結(jié)束沪么,則返回true。注意锌半,這里的任務(wù)結(jié)束包含了以下三種情況:任務(wù)正常執(zhí)行完畢禽车,任務(wù)拋出了異常,任務(wù)已經(jīng)被取消。
關(guān)于cancel 方法哭当,這里要補充說幾點:
首先有以下三種情況之一的猪腕,cancel操作一定是失敗的:
1.任務(wù)已經(jīng)執(zhí)行完成了
2.任務(wù)已經(jīng)被取消了
3.任務(wù)因為某種原因不能被取消
其他情況下,cancel操作將返回true钦勘。值得注意的是陋葡,cancel操作返回true并不代表任務(wù)真的就是被取消了,這取決于發(fā)送cancel狀態(tài)時任務(wù)所處的狀態(tài):
1.如果發(fā)起cancel時任務(wù)還沒有正式開始運行彻采,則隨后任務(wù)就不會被執(zhí)行
2.如果發(fā)起cancel時任務(wù)已經(jīng)在運行了腐缤,則這時就需要看mayInterruptIfRunning參數(shù)了:
- 如果mayInterruptIfRunning為true,則當(dāng)前在執(zhí)行的任務(wù)會被中斷
- 如果mayInterruptIfRunning為false肛响,則可以允許正在執(zhí)行的任務(wù)繼續(xù)運行岭粤,直到它執(zhí)行完畢
RunnableFuture接口
RunnableFuture接口人如其名,就是同時實現(xiàn)了Runnable接口和Future接口特笋;
public interface RunnableFuture<V> extends Runnable, Future<V> {
/**
* Sets this Future to the result of its computation
* unless it has been cancelled.
*/
void run();
}
有的同學(xué)可能會對這個接口產(chǎn)生疑惑剃浇,既然已經(jīng)繼承了Runnable,該接口自然就繼承了run方法猎物,為什么要在該接口的內(nèi)部再寫一個run方法了虎囚?
單純從理論上來說,這里確實是沒有必要的蔫磨,再多寫一遍淘讥,我覺得大概就是為了看上去直觀一點,便于文檔或者UML圖展示堤如。
在使用ThreadPoolExecutor的submit提交任務(wù)交給線程池中的線程去執(zhí)行蒲列。有幾個submit方法
/**
* @throws RejectedExecutionException {@inheritDoc}
* @throws NullPointerException {@inheritDoc}
*/
public Future<?> submit(Runnable task) {
if (task == null) throw new NullPointerException();
RunnableFuture<Void> ftask = newTaskFor(task, null);
execute(ftask);
return ftask;
}
/**
* @throws RejectedExecutionException {@inheritDoc}
* @throws NullPointerException {@inheritDoc}
*/
public <T> Future<T> submit(Runnable task, T result) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task, result);
execute(ftask);
return ftask;
}
/**
* @throws RejectedExecutionException {@inheritDoc}
* @throws NullPointerException {@inheritDoc}
*/
public <T> Future<T> submit(Callable<T> task) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task);
execute(ftask);
return ftask;
}
在submit方法中,我們看見有一行代碼:
RunnableFuture<T> ftask = newTaskFor(task);
這行代碼的功能:對我們的task進(jìn)行了類型轉(zhuǎn)行搀罢,task類型是Runnable/Callable,轉(zhuǎn)化為了一個RunnableFuture對象蝗岖。
根據(jù)task類型由于有兩種Runnable/Callable,分別有兩種不同的重載方法newTaskFor魄揉。如下:
/**
* Returns a {@code RunnableFuture} for the given runnable and default
* value.
*
* @param runnable the runnable task being wrapped
* @param value the default value for the returned future
* @param <T> the type of the given value
* @return a {@code RunnableFuture} which, when run, will run the
* underlying runnable and which, as a {@code Future}, will yield
* the given value as its result and provide for cancellation of
* the underlying task
* @since 1.6
*/
protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
return new FutureTask<T>(runnable, value);
}
/**
* Returns a {@code RunnableFuture} for the given callable task.
*
* @param callable the callable task being wrapped
* @param <T> the type of the callable's result
* @return a {@code RunnableFuture} which, when run, will call the
* underlying callable and which, as a {@code Future}, will yield
* the callable's result as its result and provide for
* cancellation of the underlying task
* @since 1.6
*/
protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
return new FutureTask<T>(callable);
}
從newTaskFor函數(shù)中可以看到剪侮,就是直接調(diào)用了FutureTask的有參構(gòu)造函數(shù)拭宁。
FutureTask是繼承了RunnableFuture類來實現(xiàn)的洛退。如下:
public class FutureTask<V> implements RunnableFuture<V>
下面來看下RunnableFuture類的內(nèi)容,如下:
/**
* A {@link Future} that is {@link Runnable}. Successful execution of
* the {@code run} method causes completion of the {@code Future}
* and allows access to its results.
* @see FutureTask
* @see Executor
* @since 1.6
* @author Doug Lea
* @param <V> The result type returned by this Future's {@code get} method
*/
//作為Runnable的Future杰标。成功執(zhí)行run方法可以完成Future并允許訪問其結(jié)果
public interface RunnableFuture<V> extends Runnable, Future<V> {
/**
* Sets this Future to the result of its computation
* unless it has been cancelled.
*/
void run();
}
RunnableFuture接口比較簡單兵怯,繼承了Runnable,F(xiàn)uture接口腔剂。并只有一個run方法媒区。
下面我們下看下FutureTask類的構(gòu)造方法內(nèi)部實現(xiàn):
/**
* Creates a {@code FutureTask} that will, upon running, execute the
* given {@code Callable}.
*
* @param callable the callable task
* @throws NullPointerException if the callable is null
*/
public FutureTask(Callable<V> callable) {
if (callable == null)
throw new NullPointerException();
this.callable = callable;
this.state = NEW; // ensure visibility of callable
}
/**
* Creates a {@code FutureTask} that will, upon running, execute the
* given {@code Runnable}, and arrange that {@code get} will return the
* given result on successful completion.
*
* @param runnable the runnable task
* @param result the result to return on successful completion. If
* you don't need a particular result, consider using
* constructions of the form:
* {@code Future<?> f = new FutureTask<Void>(runnable, null)}
* @throws NullPointerException if the runnable is null
*/
//創(chuàng)建一個FutureTask對象,執(zhí)行的還是里面所包含的Runnable對象 如果Runnable對象正常執(zhí)行完成,則FutureTask對象調(diào)用get方法的時候就會得到結(jié)果result
public FutureTask(Runnable runnable, V result) {
this.callable = Executors.callable(runnable, result);
this.state = NEW; // ensure visibility of callable
}
在第二個構(gòu)造函數(shù)中袜漩,我們看到了
this.callable=Executors.callable(runable,result);
這行代碼是將Runnable類型轉(zhuǎn)換為了Callable類型绪爸。因此,我們可以看下Executors.callable(runnable,result)方法的實現(xiàn)宙攻,到底是如何轉(zhuǎn)行的呢?
public static <T> Callable<T> callable(Runnable task, T result) {
if (task == null)
throw new NullPointerException();
return new RunnableAdapter<T>(task, result);
}
將Runnable適配為了一個Callable對象奠货,轉(zhuǎn)化為的對象雖然是Callable對象了,但是調(diào)用此對象的call方法其實就是調(diào)用了Runnable接口的run方法并且返回值是null座掘。
看下RunnableAdapter類递惋,此類實現(xiàn)了Callable接口.
/**
* A callable that runs given task and returns given result.
*/
private static final class RunnableAdapter<T> implements Callable<T> {
private final Runnable task;
private final T result;
RunnableAdapter(Runnable task, T result) {
this.task = task;
this.result = result;
}
public T call() {
task.run();
return result;
}
}
可以明顯看出來,這個方法采用了設(shè)計模式種的適配器模式溢陪,將一個Runnable類型對象適配成Callable類型萍虽。
因為Runnable接口沒有返回值,所以為了與Callable兼容形真,我們額外傳入了一個result參數(shù)杉编,使得返回的Callable對象的call方法直接執(zhí)行Runnable的run方法,然后返回傳入的result參數(shù)咆霜。
有些人可能會有這個疑問王财,你把result參數(shù)傳進(jìn)去,又原封不動的返回出來裕便,有什么意義呀绒净?這樣做確實沒有什么意義,result參數(shù)的存在只是為了將一個Runnable對象適配成Callable類型偿衰。
總結(jié)一下上面的邏輯:
- 首先挂疆,在我們寫程序的時候,我們可能在線程池中的submit方法來提交一個任務(wù)task下翎,這個task可能是runnable對象缤言,也可能是callable對象。為了方便處理视事,我們需要將Runnable胆萧,Callable統(tǒng)一起來,因此借助了Executors類中的RunnableAdapter類將Runnable對象適配為了一個Callable對象俐东。
- 而submit方法要求返回一個Future對象跌穗,我們可以通過這個對象來獲取任務(wù)的運行結(jié)果。而FutureTask作為Future的實現(xiàn)類實現(xiàn)了對象任務(wù)task的封裝虏辫,并且可以通過封裝后的對象獲取返回值蚌吸。
FutureTask
public class FutureTask<V> implements RunnableFuture<V>
public interface RunnableFuture<V> extends Runnable, Future<V>
FutureTask,人如其名砌庄,F(xiàn)utureTask包含了Future和Task兩部分羹唠。
FutureTask實現(xiàn)了RunnableFuture接口奕枢,即Runnable接口和Future接口,其中Runnable接口對應(yīng)了FutureTask名字種的Task
,代表FutureTask本質(zhì)上也是一個任務(wù)佩微。而Future接口就對應(yīng)了FutureTask名字種的Future缝彬,表示了我們對于這個任務(wù)可以執(zhí)行某些操作,列如:判斷任務(wù)是否執(zhí)行完畢哺眯,獲取任務(wù)的執(zhí)行結(jié)果跌造,取消執(zhí)行的任務(wù)等。
所以簡單來說族购,F(xiàn)utureTask本質(zhì)上就是一個"Task",我們可以把它當(dāng)作簡單的Runnable對象來使用壳贪。但是它又同時實現(xiàn)了Future接口,因此我們可以對它所代表的"Task"進(jìn)行額外的控制操作寝杖。
Java并發(fā)工具類的三板斧
狀態(tài)
隊列
CAS
狀態(tài)
首先是找狀態(tài)违施,在FutureTask中,狀態(tài)是由state屬性來表示瑟幕,主要有:
1.NEW(開始狀態(tài))
2.COMPLETING(正在運行狀態(tài))
3.NORMAL(正常運行完結(jié)束狀態(tài))
4.EXCEPTIONAL(異常狀態(tài))
5.CANCELLED(取消狀態(tài))
6.INTERRUPTING(中斷)
7.INTERRUPTED(中斷結(jié)束狀態(tài))
源碼中對這幾個狀態(tài)以及狀態(tài)之間的轉(zhuǎn)換關(guān)系如下所示:
/**
* The run state of this task, initially NEW. The run state
* transitions to a terminal state only in methods set,
* setException, and cancel. During completion, state may take on
* transient values of COMPLETING (while outcome is being set) or
* INTERRUPTING (only while interrupting the runner to satisfy a
* cancel(true)). Transitions from these intermediate to final
* states use cheaper ordered/lazy writes because values are unique
* and cannot be further modified.
*
* Possible state transitions:
初始態(tài) 中間態(tài) 終止態(tài)
* NEW -> COMPLETING -> NORMAL
* NEW -> COMPLETING -> EXCEPTIONAL
* NEW -> CANCELLED
* NEW -> INTERRUPTING -> INTERRUPTED
*/
任務(wù)的初始狀態(tài)都是
NEW
磕蒲,這一點是構(gòu)造函數(shù)保證的。任務(wù)的終止?fàn)顟B(tài)有4種:
-NORMAL
:任務(wù)正常執(zhí)行完畢
-EXCEPTIONAL
:任務(wù)執(zhí)行過程中發(fā)生異常
-CANCELLED
:任務(wù)被取消
-INTERRUPTED
:任務(wù)被中斷-
任務(wù)的中間狀態(tài)有兩種:
-
COMPLETING
正在設(shè)置任務(wù)結(jié)果 -
INTERRUPTING
正在中斷運行任務(wù)的線程
值得一提的是只盹,任務(wù)的中間狀態(tài)是一個瞬時的辣往,它非常短暫,而且任務(wù)的中間態(tài)并不代表任務(wù)正在執(zhí)行殖卑,而是任務(wù)已經(jīng)執(zhí)行完了站削,正在設(shè)置最終的返回結(jié)果,
,所以可以這么說:
-
只要state 不處于
NEW
狀態(tài),就說明任務(wù)已經(jīng)執(zhí)行完畢了孵稽。
注意许起,這里的執(zhí)行完畢就是指傳入的Callable對象的call方法執(zhí)行完畢,或者拋出異常菩鲜,所以這里的COMPLETING
的名字顯得有點迷惑园细,它并不意味著任務(wù)正在執(zhí)行中,而意味著call方法執(zhí)行完畢接校,正在設(shè)置任務(wù)執(zhí)行的結(jié)果猛频。
而將一個任務(wù)的狀態(tài)設(shè)置為終止態(tài)只有三種方法:
- set
- setException
- cancel
隊列
接著我們來看隊列,在FutureTask中蛛勉,隊列的實現(xiàn)是一個單向的鏈表鹿寻,它表示所有等待任務(wù)執(zhí)行完畢的線程的集合。
我們知道董习,F(xiàn)utureTask實現(xiàn)了Future接口烈和,可以獲取"Task"的執(zhí)行結(jié)果,那么如果獲取結(jié)果時皿淋,任務(wù)還沒有執(zhí)行完畢怎么辦呢招刹?那么獲取結(jié)果的線程就會在一個等待隊列中掛起,直到任務(wù)執(zhí)行完畢被喚醒窝趣。這一點有點類似我們之前學(xué)習(xí)的AQS中的sync queue
疯暑。
我們之前說過,在并發(fā)編程中使用隊列通常是將當(dāng)前線程包裝成某種類型的數(shù)據(jù)結(jié)構(gòu)仍到等待隊列中哑舒,我們先來看看隊列中的每一個節(jié)點是怎樣的結(jié)構(gòu):
static final class WaitNode {
volatile Thread thread;
volatile WaitNode next;
WaitNode() { thread = Thread.currentThread(); }
}
可見妇拯,相比于AQS的sync queue
所使用的雙向鏈表中的Node,這個WaitNode要簡單多了洗鸵,它只包含了一個記錄線程的thread
屬性和指向下一個節(jié)點的next
屬性越锈。FutureTask中的這個單向鏈表是當(dāng)做棧
來使用的。它是一個線程安全的棧膘滨,為啥要使用一個線程安全的棧呢甘凭?因為同一時刻可能有多個線程都在獲取任務(wù)的執(zhí)行結(jié)果,如果任務(wù)還在執(zhí)行過程中火邓,則這些線程就要被包裝成WaitNode扔到棧頂丹弱,即完成入棧操作,這樣就有可能會出行多個線程同時入棧的情況铲咨,因此需要使用CAS操作保證入棧的線程安全躲胳,對于出棧也是同理。
由于FutureTask中的隊列本質(zhì)上是一個棧纤勒,那么使用這個隊列就只需要一個指向棧節(jié)點的指針就行了坯苹,在FutureTask中,就是waiters
屬性;
/** Treiber stack of waiting threads */
private volatile WaitNode waiters;
事實上摇天,它就是整個單向鏈表的頭節(jié)點北滥,
綜上,F(xiàn)utureTask中所使用的隊列結(jié)構(gòu)如下:
CAS操作
CAS操作大多數(shù)是用來改變狀態(tài)的闸翅,在FutureTask中也不例外再芋。我們一般在靜態(tài)代碼塊中初始化需要CAS操作的屬性和偏移量
// Unsafe mechanics
private static final sun.misc.Unsafe U = sun.misc.Unsafe.getUnsafe();
private static final long STATE;
private static final long RUNNER;
private static final long WAITERS;
static {
try {
STATE = U.objectFieldOffset
(FutureTask.class.getDeclaredField("state"));
RUNNER = U.objectFieldOffset
(FutureTask.class.getDeclaredField("runner"));
WAITERS = U.objectFieldOffset
(FutureTask.class.getDeclaredField("waiters"));
} catch (ReflectiveOperationException e) {
throw new Error(e);
}
// Reduce the risk of rare disastrous classloading in first call to
// LockSupport.park: https://bugs.openjdk.java.net/browse/JDK-8074773
Class<?> ensureLoaded = LockSupport.class;
}
從這個靜態(tài)代碼塊中我們也可以看出溶弟,CAS操作主要針對3個屬性攘残,包括state
,runner
,和waiters
,說明這3個屬性基本是會被多個線程同時訪問的易茬。其中state
屬性代表了任務(wù)的狀態(tài)神凑,waiters
屬性代表了指向棧頂節(jié)點的指針鹊奖,這兩個我們上面分析了密任。runner
屬性代表了執(zhí)行FutureTask中的"Task"的線程刁岸。為什么需要一個屬性來記錄執(zhí)行任務(wù)的線程呢独郎?這是為了中斷或者取消任務(wù)做準(zhǔn)備的液南,只有知道了執(zhí)行任務(wù)的線程是誰壳猜,我們才能區(qū)中斷它。
FutureTask類run方法
public void run() {
if (state != NEW ||
!U.compareAndSwapObject(this, RUNNER, null, Thread.currentThread()))
return;
try {
Callable<V> c = callable;
if (c != null && state == NEW) {
V result;
boolean ran;
try {
result = c.call();
ran = true;
} catch (Throwable ex) {
result = null;
ran = false;
setException(ex);
}
if (ran)
set(result);
}
} finally {
// runner must be non-null until state is settled to
// prevent concurrent calls to run()
runner = null;
// state must be re-read after nulling runner to prevent
// leaked interrupts
int s = state;
if (s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);
}
}
/**
* Ensures that any interrupt from a possible cancel(true) is only
* delivered to a task while in run or runAndReset.
*/
private void handlePossibleCancellationInterrupt(int s) {
// It is possible for our interrupter to stall before getting a
// chance to interrupt us. Let's spin-wait patiently.
if (s == INTERRUPTING)
while (state == INTERRUPTING)
Thread.yield(); // wait out pending interrupt
// assert state == INTERRUPTED;
// We want to clear any interrupt we may have received from
// cancel(true). However, it is permissible to use interrupts
// as an independent mechanism for a task to communicate with
// its caller, and there is no way to clear only the
// cancellation interrupt.
//
// Thread.interrupted();
}
首先我們看到滑凉,在run方法的一開始统扳,就檢查當(dāng)前狀態(tài)是不是NEW喘帚,并且使用CAS操作將runner
屬性設(shè)置為當(dāng)前線程,即記錄執(zhí)行任務(wù)的線程咒钟〈涤桑可見,runner
屬性是在運行時被初始化的朱嘴。
接下來倾鲫,我們就調(diào)用了Callable對象的call方法來執(zhí)行任務(wù),如果任務(wù)執(zhí)行成功萍嬉,就使用set(result)
設(shè)置結(jié)果乌昔,否則,用setException(ex)
設(shè)置拋出的異常壤追。
我們先來看看set(result)
方法:
/**
* Sets the result of this future to the given value unless
* this future has already been set or has been cancelled.
*
* <p>This method is invoked internally by the {@link #run} method
* upon successful completion of the computation.
*
* @param v the value
*/
protected void set(V v) {
if (U.compareAndSwapInt(this, STATE, NEW, COMPLETING)) {
outcome = v;
U.putOrderedInt(this, STATE, NORMAL); // final state
finishCompletion();
}
}
這個方法一開始通過CAS操作將state
屬性由原理的NEW
狀態(tài)修改為COMPLETING
狀態(tài)磕道,我們一開始介紹state狀態(tài)的時候說過,COMPLETING
是一個非常短暫的中間態(tài)大诸,表示正在設(shè)置執(zhí)行的結(jié)果捅厂。
狀態(tài)設(shè)置成功后,我們就把任務(wù)執(zhí)行結(jié)果賦值給outcome资柔,然后直接把state狀態(tài)設(shè)置為NORMAL
,注意焙贷,這里是直接設(shè)置,沒有先比較再設(shè)置的操作贿堰,由于state屬性被設(shè)置成volatile辙芍,保證了state狀態(tài)對其他線程的可見性。在這之后羹与,我們調(diào)用了finishCompletion
來完成執(zhí)行結(jié)果的設(shè)置故硅。
接下來我們再來看看發(fā)生了異常的版本setException(ex)
;
protected void setException(Throwable t) {
if (U.compareAndSwapInt(this, STATE, NEW, COMPLETING)) {
outcome = t;
U.putOrderedInt(this, STATE, EXCEPTIONAL); // final state
finishCompletion();
}
}
可見,除了將outcome屬性賦值為異常對象纵搁,以及將state的終止?fàn)顟B(tài)修改為EXCEPTIONAL
吃衅,其余都和set方法類似。在方法的最后腾誉,都調(diào)用了finishCompletion()
來完成執(zhí)行結(jié)果的設(shè)置徘层。那么我們就來看看finishCompletion()
干了點啥:
/**
* Removes and signals all waiting threads, invokes done(), and
* nulls out callable.
*/
private void finishCompletion() {
// assert state > COMPLETING;
for (WaitNode q; (q = waiters) != null;) {
if (U.compareAndSwapObject(this, WAITERS, q, null)) {
for (;;) {
Thread t = q.thread;
if (t != null) {
q.thread = null;
LockSupport.unpark(t);
}
WaitNode next = q.next;
if (next == null)
break;
q.next = null; // unlink to help gc
q = next;
}
break;
}
}
done();
callable = null; // to reduce footprint
}
這個方法事實上完成了一個善后工作。我們先來看看if條件語句中的CAS操作:
U.compareAndSwapObject(this, WAITERS, q, null)
該方法是將waiters
屬性的值由原值設(shè)置為null利职,我們知道,waiters
屬性指向了棧的棧頂節(jié)點趣效,可以說是代表了整個棧,將該值設(shè)為null的目的就是清空整個棧猪贪。如果設(shè)置不成功跷敬,則if語句塊不會被指向,又進(jìn)行下一輪for循環(huán)热押,而下一輪for循環(huán)的判斷條件又是waiters!=null
,由此我們知道西傀,雖然最外層的for循環(huán)乍一看好像是什么遍歷節(jié)點的操作斤寇,其實只是為了確保waiters屬性被成功設(shè)置為null,本質(zhì)上相當(dāng)于一個自旋操作池凄。
將waiters屬性設(shè)置為null以后抡驼,接下來for(;;)
死循環(huán)才是真正的遍歷節(jié)點鬼廓,可以看出肿仑,循環(huán)內(nèi)部就是一個普通的遍歷鏈表的操作,這個循環(huán)的作用也正式遍歷鏈表中所有等待的線程碎税,并喚醒他們尤慰。
最后我們來看看finally塊
finally {
// runner must be non-null until state is settled to
// prevent concurrent calls to run()
runner = null;
// state must be re-read after nulling runner to prevent
// leaked interrupts
int s = state;
if (s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);
}
在finally塊中,我們將runner屬性置為null雷蹂,并且檢查有沒有遺漏的中斷伟端,如果發(fā)現(xiàn)s>INTERRUPTING
說明執(zhí)行任務(wù)的線程有可能被中斷了,因為s>=INTERRUPTING
只有兩種可能匪煌,state狀態(tài)為INTERRUPTING
和INTERRUPTED
责蝠。
有的人就要問了,我們已經(jīng)執(zhí)行過set方法或者setException放不是已經(jīng)將state狀態(tài)設(shè)置成NORMAL
或者EXCEPTIONAL
了嗎萎庭?怎么會出行INTERRUPTING
或者INTERRUPTED
狀態(tài)呢霜医?別忘了,咱們在多線程的環(huán)境中驳规,在當(dāng)前線程執(zhí)行run方法的同時肴敛,有可能其他線程取消了任務(wù)的執(zhí)行,此時線程就可能對state狀態(tài)進(jìn)行改寫吗购,這也就是我們在設(shè)置終止?fàn)顟B(tài)的時候用putOrderedInt
方法医男,而沒有用CAS操作的原因--我們無法確信在設(shè)置state前是處于COMPLETING
中間態(tài)還是INTERRUPTING
中間態(tài)。
run方法重點做了以下幾件事:
1.將runner屬性設(shè)置為當(dāng)前正在執(zhí)行run方法的線程
2.調(diào)用callable成員變量的call方法來執(zhí)行任務(wù)
3.設(shè)置執(zhí)行結(jié)果outcome捻勉,如果執(zhí)行成功镀梭,則outcome保存的就是執(zhí)行結(jié)果,如果執(zhí)行過程中發(fā)生了異常踱启,則outcome中保存的就是異常报账,在設(shè)置結(jié)果之前,先將state狀態(tài)設(shè)為中間態(tài)
4.對outcome的賦值完成后禽捆,設(shè)置state狀態(tài)為終止態(tài)(NORAML
或者EXCEPTIONAL
)
5.喚醒Treiber棧中所有等待的線程
6.善后清理(waiters,callable,runner設(shè)為null)
7,檢查是否有遺漏的中斷笙什,如果有,等待中斷狀態(tài)完成胚想。
這里再說一句琐凭,我們前面說的state只要不是NEW狀態(tài),就說明任務(wù)已經(jīng)執(zhí)行完成了
就體現(xiàn)再這里浊服,因為run方法中统屈,我們是在c.call()
執(zhí)行完畢后或者拋出異常之后才開始設(shè)置中間態(tài)和終止態(tài)的胚吁。
cancel(boolean mayInterruptIfRunning)
public boolean cancel(boolean mayInterruptIfRunning) {
if (!(state == NEW &&
U.compareAndSwapInt(this, STATE, NEW,
mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
return false;
try { // in case call to interrupt throws exception
if (mayInterruptIfRunning) {
try {
Thread t = runner;
if (t != null)
t.interrupt();
} finally { // final state
U.putOrderedInt(this, STATE, INTERRUPTED);
}
}
} finally {
finishCompletion();
}
return true;
}
首先,對于任務(wù)已經(jīng)執(zhí)行完成了或者任務(wù)已經(jīng)被取消過了愁憔,則cancel操作一定是失敗的(返回false)腕扶,這兩條,是通過簡單判斷state值是否為NEW
實現(xiàn)的吨掌,因為我們前面說過半抱,只要state不為NEW,說明任務(wù)已經(jīng)執(zhí)行完畢了膜宋。從代碼中也可以看出窿侈,只要state部位NEW,則直接返回false秋茫。
如果state還是NEW狀態(tài)史简,我們再往下看:
U.compareAndSwapInt(this, STATE, NEW,
mayInterruptIfRunning ? INTERRUPTING : CANCELLED)
這一段根據(jù)mayInterruptIfRunning
的值將state的狀態(tài)由 NEW
設(shè)置為INTERRUPTING
或者CANCELLED
,當(dāng)操作成功之后肛著,就可以執(zhí)行后面的try語句圆兵,但無論怎么,該方法最后都返回了true枢贿。
我們再接著看try快干了啥:
try { // in case call to interrupt throws exception
if (mayInterruptIfRunning) {
try {
Thread t = runner;
if (t != null)
t.interrupt();
} finally { // final state
U.putOrderedInt(this, STATE, INTERRUPTED);
}
}
} finally {
finishCompletion();
}
我們知道殉农,runner
屬性中存放的是當(dāng)前正在執(zhí)行任務(wù)的線程,因此萨咕,這個try快的目的就是中斷當(dāng)前正在執(zhí)行任務(wù)的線程统抬,最后將state的狀態(tài)設(shè)置為INTERRUPTED
,當(dāng)然危队,中斷操作完成后聪建,還需要通過finishCompleted()
來喚醒所有再棧中等待的線程。
我們現(xiàn)在總結(jié)一下茫陆,cancel方法實際上完成以下兩種狀態(tài)轉(zhuǎn)換之一:
1.NEW-> CANCELLED
(對應(yīng)于 mayInterruptIfRunning=false)
2.NEW->INTERRUPTING->INTERRUPTED
(對應(yīng)于mayInterruptIfRunning=true)
對于第一條路徑金麸,雖說cancel方法最終返回了true,但它只是簡單的把state狀態(tài)設(shè)為CANCELLED簿盅,并不會中斷線程的執(zhí)行挥下。但是這樣帶來的后果是,任務(wù)即使執(zhí)行完畢了桨醋,也無法設(shè)置任務(wù)的執(zhí)行結(jié)果棚瘟,因為前面分析run方法的時候我們知道,設(shè)置任務(wù)結(jié)果有一個中間狀態(tài)喜最,而這個中間態(tài)的設(shè)置偎蘸,是以當(dāng)前state狀態(tài)為NEW為前提的。
對于第二條路徑,則會中斷執(zhí)行任務(wù)的線程迷雪。但是響不響應(yīng)這個中斷是由執(zhí)行任務(wù)的線程自己決定的限书,更具體的說,這取決于c.call()
方法內(nèi)部是否對中斷進(jìn)行了響應(yīng)章咧,是否將中斷異常拋出倦西。
那call方法中是怎么處理中斷的呢?從上面的代碼中可以看出,catch語句處理了所有的Throwable
的異常赁严,這自然也包括了中斷異常扰柠。
然而,值得一提的是误澳,即使這里進(jìn)入了catch(Throwable ex){}
代碼塊耻矮,setException(ex)
的操作一定是失敗的秦躯,因為再我們?nèi)∠蝿?wù)執(zhí)行的線程中忆谓,我們已經(jīng)先把state狀態(tài)設(shè)為了INTERRUPTING
了,而setException(ex)
的操作要求設(shè)置前線程的狀態(tài)為NEW
踱承。所以這里響應(yīng)cancel方法方法所造成的中斷最大的意義不是為了對中斷進(jìn)行處理倡缠,而是簡單的停止任務(wù)線程的執(zhí)行,節(jié)省CPU資源茎活。
那么有人要問了昙沦,既然這個setException(ex)
的操作一定是失敗的,那放在這里有什么用呢载荔?事實上盾饮,這個setException(ex)
是用來處理任務(wù)自己再正常執(zhí)行過程中產(chǎn)生的異常的,在我們沒有主動去cancel任務(wù)時懒熙,任務(wù)的state狀態(tài)在執(zhí)行過程中就回始終時NEW
丘损,如果任務(wù)此時自己發(fā)生了異常,則這個異常就會被setException(ex)
方法成功的記錄到outcome
中工扎。
反正無論如何徘钥,run方法最終都會進(jìn)入finally塊,而這時候它會發(fā)現(xiàn)s>=INTERRUPTING
,如果檢測發(fā)現(xiàn)s=INTERRUPTING
,說明cancel方法還沒有執(zhí)行到中斷當(dāng)前線程的地方肢娘,那就等待它將state狀態(tài)設(shè)置成INTERUPTED
呈础。到這里,對cancel方法的分析就和上面對run方法的分析對接上了橱健。
isCancelled()
說完了cancel而钞,我們再來看看isCancelled()方法,相較而言拘荡,它就簡單多了:
public boolean isCancelled() {
return state >= CANCELLED;
}
那么state>=CANCELLED
包含了那些狀態(tài)呢,它包括了:CANCELLED INTERRUPTING INTERRUPTED
isDone()
與isCancelled方法類似臼节,isDone方法也是簡單地通過state狀態(tài)來判斷。
public boolean isDone() {
return state != NEW;
}
關(guān)于這一點,其實我們之前已經(jīng)說過了官疲,只要state狀態(tài)不是NEW袱结,則任務(wù)已經(jīng)執(zhí)行完畢了,因為state狀態(tài)不存在類似任務(wù)正在執(zhí)行中這種狀態(tài)途凫,即使是短暫的中間態(tài)垢夹,也是發(fā)生在任務(wù)已經(jīng)執(zhí)行完畢,正在設(shè)置結(jié)果的時候维费。
FutureTask類中的get方法介紹
當(dāng)我們使用如下代碼的時候:
Future f=pool.submit(new Runnable(){});
....
Object obj=f.get();//獲取任務(wù)的返回結(jié)果
Future中的get方法獲取結(jié)果時里面的內(nèi)部實現(xiàn)時怎么樣的呢果元?我們一起來看看:
/**
* @throws CancellationException {@inheritDoc}
*/
public V get() throws InterruptedException, ExecutionException {
int s = state;
//說明任務(wù)還正在執(zhí)行,需要等待
//返回值為任務(wù)執(zhí)行后的狀態(tài)值犀盟,可能時正常執(zhí)行完而晒,也可能時中斷拋出異常返回,也可能是超時返回
if (s <= COMPLETING)
s = awaitDone(false, 0L);
return report(s);
}
/**
* @throws CancellationException {@inheritDoc}
*/
public V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
if (unit == null)
throw new NullPointerException();
int s = state;
if (s <= COMPLETING &&
(s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)
throw new TimeoutException();
return report(s);
}
/**
* Awaits completion or aborts on interrupt or timeout.
*
* @param timed true if use timed waits
* @param nanos time to wait, if timed
* @return state upon completion or at timeout
*/
private int awaitDone(boolean timed, long nanos)
throws InterruptedException {
// The code below is very delicate, to achieve these goals:
// - call nanoTime exactly once for each call to park
// - if nanos <= 0L, return promptly without allocation or nanoTime
// - if nanos == Long.MIN_VALUE, don't underflow
// - if nanos == Long.MAX_VALUE, and nanoTime is non-monotonic
// and we suffer a spurious wakeup, we will do no worse than
// to park-spin for a while
long startTime = 0L; // Special value 0L means not yet parked
WaitNode q = null;
boolean queued = false;
for (;;) {
int s = state;
if (s > COMPLETING) { //執(zhí)行完畢了
if (q != null)
q.thread = null;
return s;
}
else if (s == COMPLETING) //正在執(zhí)行
// We may have already promised (via isDone) that we are done
// so never return empty-handed or throw InterruptedException
Thread.yield(); //等待執(zhí)行完畢
else if (Thread.interrupted()) {
removeWaiter(q);
throw new InterruptedException();
}
else if (q == null) {
if (timed && nanos <= 0L)
return s;
q = new WaitNode();
}
else if (!queued)
queued = U.compareAndSwapObject(this, WAITERS,
q.next = waiters, q);
else if (timed) {//檢查等待是否超時了阅畴,如果超時倡怎,則返回此時的狀態(tài),否則繼續(xù)等待掛起
final long parkNanos;
if (startTime == 0L) { // first time
startTime = System.nanoTime();
if (startTime == 0L)
startTime = 1L;
parkNanos = nanos;
} else {
long elapsed = System.nanoTime() - startTime;
if (elapsed >= nanos) { //已經(jīng)超時 返回此時的狀態(tài)
removeWaiter(q);
return state;
}
parkNanos = nanos - elapsed;
}
// nanoTime may be slow; recheck before parking
if (state < COMPLETING)
LockSupport.parkNanos(this, parkNanos);
}
else
LockSupport.park(this);//上面條件如果都不滿足贱枣,則喚醒該線程
}
}
/**
* Returns result or throws exception for completed task.
*
* @param s completed state value
*/
//如果正常執(zhí)行完监署,則返回結(jié)果,否則根據(jù)任務(wù)的狀態(tài)拋出相應(yīng)的異常
@SuppressWarnings("unchecked")
private V report(int s) throws ExecutionException {
Object x = outcome;
if (s == NORMAL)
return (V)x;
if (s >= CANCELLED)
throw new CancellationException();
throw new ExecutionException((Throwable)x);
}
有一點我們先特別說明一下纽哥,F(xiàn)utureTask中會涉及到兩類線程钠乏,一類是執(zhí)行任務(wù)的線程,它只有一個,FutureTask的run方法就由該線程來執(zhí)行春塌,一類是獲取任務(wù)執(zhí)行結(jié)果的線程晓避,它可以有多個,這些線程可以并發(fā)執(zhí)行只壳,每一個線程都是獨立的俏拱,都可以調(diào)用get方法來獲取任務(wù)的執(zhí)行結(jié)果。
如果任務(wù)還沒有執(zhí)行完吕世,則這些線程就需要進(jìn)入棧中掛起彰触,直到任務(wù)執(zhí)行結(jié)束,或者等待的線程自身被中斷命辖。
...............
for (;;) {
if (Thread.interrupted()) {
removeWaiter(q);
throw new InterruptedException();
}
//.................
}
我們先檢測當(dāng)前線程是否被中斷了况毅,這是因為get方法是阻塞式的,如果等待的任務(wù)還沒有執(zhí)行完尔艇,則調(diào)用get方法的線程會被仍到棧中掛起等待尔许,直到任務(wù)執(zhí)行完畢。但是终娃,如果任務(wù)遲遲沒有執(zhí)行完畢味廊,則我們也有可能直接中斷在棧中的線程,以停止等待。
當(dāng)檢測到線程被中斷后余佛,我們調(diào)用了removeWaiter:
- 如果任務(wù)已經(jīng)進(jìn)入終止態(tài)(
S>COMPLETING
),我們就直接返回任務(wù)的狀態(tài)柠新; - 否則,如果任務(wù)正在設(shè)置執(zhí)行結(jié)果(
s==COMPLETING
),我們就讓出當(dāng)前線程的CPU資源繼續(xù)等待 - 否則辉巡,就說明任務(wù)還沒有執(zhí)行恨憎,或者任務(wù)正在執(zhí)行過程中,那么這時郊楣,如果q現(xiàn)在還為null憔恳,說明當(dāng)前線程還沒有進(jìn)入等待隊列,于是我們新建了一個
WaitNode
净蚤,WaitNode
的構(gòu)造函數(shù)我們之前已經(jīng)看過了钥组,就是生成一個記錄了當(dāng)前線程的節(jié)點; - 如果q不為null今瀑,說明代表當(dāng)前線程的WaitNode已經(jīng)被創(chuàng)建出來了程梦,則接下來如果
queued=false
,表示當(dāng)前線程還沒有入隊,所以我們執(zhí)行入隊操作放椰。 - 如果以上的條件都不滿足作烟,則接下來因為現(xiàn)在是不帶超時機制的get,timed為false,則
else if
代碼塊跳過,然后來到最后一個else砾医,把當(dāng)前線程掛起,此時線程就處于阻塞等待的狀態(tài)衣厘。
至此如蚜,在任務(wù)沒有執(zhí)行完畢的情況下,獲取任務(wù)執(zhí)行結(jié)果的線程就會在棧中被LockSupport.park(this)
掛起影暴。
那么這個掛起的線程什么時候會被喚醒呢错邦?有兩種情況:
1.任務(wù)執(zhí)行完畢了,在finishCompletion
方法中會喚醒所有的在棧中等待的線程型宙。
2.等待的線程自身因為被中斷等原因還被喚醒撬呢。
至此我們知道,除非被中斷妆兑, 否則get方法會在原地自旋等待或者直接掛起(對應(yīng)任務(wù)還沒有執(zhí)行完的情況)魂拦,知道任務(wù)執(zhí)行完成。前面分析run方法和cancel方法的時候知道搁嗓,在run方法結(jié)束后芯勘,或者cancel方法取消完成后,都會調(diào)用finishCompletion()
來喚醒掛起的線程腺逛,使他們得以進(jìn)入下一輪循環(huán)荷愕,獲取任務(wù)執(zhí)行結(jié)果。
最后,等awaitDone函數(shù)返回后安疗,get方法返回了report(s)
抛杨,根據(jù)任務(wù)的狀態(tài),匯報執(zhí)行結(jié)果:
/**
* Returns result or throws exception for completed task.
*
* @param s completed state value
*/
@SuppressWarnings("unchecked")
private V report(int s) throws ExecutionException {
Object x = outcome;
if (s == NORMAL)
return (V)x;
if (s >= CANCELLED)
throw new CancellationException();
throw new ExecutionException((Throwable)x);
}
report方法非常簡單荐类,它根據(jù)當(dāng)前state狀態(tài)蝶桶,返回正常執(zhí)行的結(jié)果,或者拋出指定的異常掉冶。
值得注意的是真竖,awiteDone
方法和get方法都沒有加鎖,這在多個線程同時執(zhí)行g(shù)et方法的時候會不會產(chǎn)生線程安全問題呢厌小?通過查看方法內(nèi)部的參數(shù)我們知道恢共,整個方法內(nèi)部用的大多數(shù)是局部變量,因此不會產(chǎn)生線程安全問題璧亚,對于全局的共享變量waiters
的修改時讨韭,也使用了CAS操作,保證了線程安全癣蟋,而state變量本身時volatile的透硝,保證了讀取時的可見性,因此整個方法調(diào)用雖然沒有加鎖疯搅,它仍然時線程安全 的濒生。
總結(jié)
Runnable對象可以通過RunnableAdapter適配器適配到Callable對象
Future對象可以通過get方法獲取任務(wù)的返回值
FutureTask可以簡單來看是對任務(wù)Runnable/Callable的封裝。
FutureTask實現(xiàn)了Runnable和Future接口幔欧,它表示了一個帶有任務(wù)狀態(tài)和任務(wù)結(jié)果的任務(wù)罪治,它的各種操作都是圍繞著任務(wù)的狀態(tài)展開的,值得注意的是礁蔗,在所有7個任務(wù)狀態(tài)中觉义,只要不是NEW
狀態(tài),就表示任務(wù)已經(jīng)執(zhí)行完畢或者不再執(zhí)行了浴井,并沒有表示任務(wù)正在執(zhí)行中的狀態(tài)晒骇。
除了代表了任務(wù)的Callable對象,代表任務(wù)執(zhí)行結(jié)果的outcome屬性磺浙,F(xiàn)utureTask還包含了一個代表所有等待任務(wù)結(jié)束的線程的Treiber棧洪囤,這一點其實和各種鎖的等待隊列特別像,即如果拿不到鎖屠缭,則當(dāng)前線程就會被仍進(jìn)等待隊列中箍鼓;這里則是如果任務(wù)還沒有執(zhí)行結(jié)束,則所有等待任務(wù)執(zhí)行完畢的線程就會被放入棧中呵曹,直到任務(wù)執(zhí)行完畢了款咖,才會被喚醒何暮。
FutureTask雖然為我們提供了獲取任務(wù)執(zhí)行的結(jié)果,遺憾的是铐殃,在獲取任務(wù)結(jié)果時海洼,如果任務(wù)還沒有執(zhí)行完成,則當(dāng)前線程會自旋或者掛起等待富腊,這和我們實現(xiàn)異步的初衷時相違背的坏逢。 我們后面將繼續(xù)介紹另外一個同步工具類CompletableFuture。