Future
Future是J.U.C中的一個(gè)接口募寨,它代表著一個(gè)異步執(zhí)行結(jié)果族展。
Future可以看成線程在執(zhí)行時(shí)留給調(diào)用者的一個(gè)存根,通過這個(gè)存在可以進(jìn)行查看線程執(zhí)行狀態(tài)(isDone)拔鹰、取消執(zhí)行(cancel)仪缸、阻塞等待執(zhí)行完成并返回結(jié)果(get)、異步執(zhí)行回調(diào)函數(shù)(callback)等操作列肢。
public interface Future<V> {
/** 取消恰画,mayInterruptIfRunning-false:不允許在線程運(yùn)行時(shí)中斷 **/
boolean cancel(boolean mayInterruptIfRunning);
/** 是否取消**/
boolean isCancelled();
/** 是否完成 **/
boolean isDone();
/** 同步獲取結(jié)果 **/
V get() throws InterruptedException, ExecutionException;
/** 同步獲取結(jié)果,響應(yīng)超時(shí) **/
V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}
用ReentrantLock實(shí)現(xiàn)Future
一個(gè)小栗子:
public class ResponseFuture implements Future<String> {
private final ResponseCallback callback;
private String responsed;
private final ReentrantLock lock = new ReentrantLock();
private final Condition condition = lock.newCondition();
public ResponseFuture(ResponseCallback callback) {
this.callback = callback;
}
public boolean isDone() {
return null != this.responsed;
}
public String get() throws InterruptedException, ExecutionException {
if (!isDone()) {
try {
this.lock.lock();
while (!this.isDone()) {
condition.await();
if (this.isDone()) break;
}
} finally {
this.lock.unlock();
}
}
return this.responsed;
}
// 返回完成
public void done(String responsed) throws Exception{
this.responsed = responsed;
try {
this.lock.lock();
this.condition.signal();
if(null != this.callback) this.callback.call(this.responsed);
} finally {
this.lock.unlock();
}
}
public String get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
throw new UnsupportedOperationException();
}
public boolean cancel(boolean mayInterruptIfRunning) {
throw new UnsupportedOperationException();
}
public boolean isCancelled() {
throw new UnsupportedOperationException();
}
}
使用這個(gè)Future:
public class ResponseCallback {
public String call(String o) {
System.out.println("ResponseCallback:處理完成瓷马,返回:"+o);
return o;
}
}
public class FutureTest {
public static void main(String[] args) {
final ResponseFuture responseFuture = new ResponseFuture(new ResponseCallback());
new Thread(new Runnable() {// 請(qǐng)求線程
public void run() {
System.out.println("發(fā)送一個(gè)同步請(qǐng)求");
// System.out.println(responseFuture.get()); 放開這句拴还,就是同步
System.out.println("接著處理其他事情,過一會(huì)ResponseCallback會(huì)打印結(jié)果");
}
}).start();
new Thread(new Runnable() {// 處理線程
public void run() {
try {
Thread.sleep(10000);// 模擬處理一會(huì)
responseFuture.done("ok");// 處理完成
}catch (Exception e) {
e.printStackTrace();
}
}).start();
}
}
這個(gè)Future使用了Condition的await和signal方法
同步操作:調(diào)用get時(shí)如果發(fā)現(xiàn)未有結(jié)果返回,線程阻塞等待欧聘,直到有結(jié)果返回時(shí)通知Future喚醒等待的線程片林。
異步操作:調(diào)用線性不用等待,有結(jié)果返回時(shí)調(diào)用signal()發(fā)現(xiàn)沒有等待線程怀骤,直接調(diào)用Callback费封。
FutureTask
FutureTask是J.U.C中Future的實(shí)現(xiàn),它同時(shí)也實(shí)現(xiàn)了Runnable接口蒋伦,可以直接在ExecutorService中提交執(zhí)行這個(gè)Future弓摘。
一個(gè)小栗子:
public class FutureTaskTest {
public static class CallableImpl implements Callable<String>{
public String call() throws Exception {
String tName = Thread.currentThread().getName();
System.out.println(tName+" :開始執(zhí)行");
Thread.sleep(6000);
return tName+" ok";
}
}
public static void main(String[] args) throws Exception {
ExecutorService es = Executors.newFixedThreadPool(1);
// // 同步執(zhí)行,等待6000毫秒拿到結(jié)果
// FutureTask<String> future = new FutureTask<String>(new CallableImpl());
// es.submit(future);
// System.out.println(future.get());
// 異步執(zhí)行
FutureTask<String> future = new FutureTask<String>(new CallableImpl()) {
protected void done() {
try {
System.out.println("異步返回的結(jié)果:"+this.get());
} catch (Exception e) {
e.printStackTrace();
}
}
};
es.submit(future);
System.out.println("執(zhí)行其他操作");
es.shutdown();
}
}
FutureTask實(shí)現(xiàn)
使用單向鏈表WaitNode
表示排隊(duì)等待的線程痕届, volatile state
表示當(dāng)前線程的執(zhí)行狀態(tài)衣盾,狀態(tài)轉(zhuǎn)換如下:
NEW -> COMPLETING -> NORMAL
NEW -> COMPLETING -> EXCEPTIONAL
NEW -> CANCELLED
NEW -> INTERRUPTING -> INTERRUPTED
awaitDone
阻塞等待
private int awaitDone(boolean timed, long nanos)
throws InterruptedException {
final long deadline = timed ? System.nanoTime() + nanos : 0L;
WaitNode q = null;
boolean queued = false;
for (;;) {
if (Thread.interrupted()) {// 當(dāng)前線程中斷
removeWaiter(q);// 移除等待的節(jié)點(diǎn)
throw new InterruptedException();
}
int s = state;// 當(dāng)前狀態(tài)
if (s > COMPLETING) {//已經(jīng)執(zhí)行完成
if (q != null)
q.thread = null;// 節(jié)點(diǎn)綁定的線程置空
return s;
}
else if (s == COMPLETING) //完成中
Thread.yield();// 出讓CPU
else if (q == null)// 等待節(jié)點(diǎn)為空寺旺,構(gòu)建節(jié)點(diǎn)
q = new WaitNode();
else if (!queued)// waiters設(shè)置為q
queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
q.next = waiters, q);
else if (timed) {
nanos = deadline - System.nanoTime();
if (nanos <= 0L) {// 超時(shí),摘除節(jié)點(diǎn)
removeWaiter(q);
return state;
}
LockSupport.parkNanos(this, nanos);//掛起線程势决,響應(yīng)超時(shí)
}
else
LockSupport.park(this);// 掛起線程
}
}
run
線程執(zhí)行
// s1
public void run() {
// state!=NEW,說明沒必要執(zhí)行,返回蓝撇。
// CAS操作果复,將runner設(shè)置為當(dāng)前線程,如果失敗渤昌,說明有其他線程在執(zhí)行虽抄,返回。
if (state != NEW ||
!UNSAFE.compareAndSwapObject(this, runnerOffset,
null, Thread.currentThread()))
return;
try {
Callable<V> c = callable;
if (c != null && state == NEW) {
V result;
boolean ran;
try {
result = c.call();//執(zhí)行
ran = true;
} catch (Throwable ex) {
result = null;
ran = false;
setException(ex);// 異常 s2
}
if (ran)
set(result);// 完成 s3
}
} finally {
runner = null;// 置空runner
int s = state;
if (s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);
}
}
s1:state不為NEW或者有其他線程在執(zhí)行時(shí)独柑,都不會(huì)執(zhí)行run方法迈窟。
執(zhí)行中出現(xiàn)異常轉(zhuǎn)入s2
執(zhí)行結(jié)束轉(zhuǎn)入s3
// s2
protected void setException(Throwable t) {
// 完成 NEW-COMPLETING-EXCEPTIONAL 狀態(tài)置換
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
outcome = t;
UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state
finishCompletion();
}
}
s2:這步完成狀態(tài):NEW-COMPLETING-EXCEPTIONAL 置換
將異常賦給結(jié)果outcome
轉(zhuǎn)入s4
// s3
protected void set(V v) {
// 完成NEW-COMPLETING-NORMAL狀態(tài)置換
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
outcome = v;
UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
finishCompletion();
}
}
s3:這步完成狀態(tài):NEW-COMPLETING-NORMAL 置換
將執(zhí)行結(jié)果賦給outcome
轉(zhuǎn)入s4
// s4
private void finishCompletion() {
for (WaitNode q; (q = waiters) != null;) {
if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {//waiters置空
for (;;) {// 依次喚醒所有等待的節(jié)點(diǎn)
Thread t = q.thread;
if (t != null) {
q.thread = null;// 置空
LockSupport.unpark(t);// 喚醒
}
WaitNode next = q.next;//向后探測(cè)
if (next == null)
break;
q.next = null; // unlink to help gc
q = next;
}
break;
}
}
done();// 回調(diào)鉤子
callable = null; // to reduce footprint
}
s4:for(;;)中異常喚醒等待線程
調(diào)用回調(diào)鉤子done()
被喚醒的線程會(huì)在awaitDone中讓出CPU,退出循環(huán)忌栅。
小結(jié)
1车酣、Future代表著一個(gè)異步執(zhí)行結(jié)果
2、get是同步操作索绪,當(dāng)前線程會(huì)阻塞等待執(zhí)行完畢湖员,返回結(jié)果
3、異步操作使用鉤子進(jìn)行回調(diào)瑞驱,不阻塞當(dāng)前線程