1 概述
Future代表異步計(jì)算返回的結(jié)果祭椰,提供了檢查是否結(jié)束蒲讯、等待結(jié)束以及獲取計(jì)算結(jié)果的方法垫蛆。Executor框架使用Runnable作為其任務(wù)表示形式违孝,但它不能返回一個值府蛇。許多任務(wù)實(shí)際上都是存在延遲計(jì)算的:執(zhí)行數(shù)據(jù)庫查詢集索,從網(wǎng)絡(luò)上獲取資源,或者某個復(fù)雜耗時的計(jì)算汇跨。對于這種任務(wù)务荆,Callable是一個更好的抽象,他能返回一個值穷遂,并可能拋出一個異常函匕。
/**
* A task that returns a result and may throw an exception.
* Implementors define a single method with no arguments called
* <tt>call</tt>.
*
* <p>The <tt>Callable</tt> interface is similar to {@link
* java.lang.Runnable}, in that both are designed for classes whose
* instances are potentially executed by another thread. A
* <tt>Runnable</tt>, however, does not return a result and cannot
* throw a checked exception.
*
* <p> The {@link Executors} class contains utility methods to
* convert from other common forms to <tt>Callable</tt> classes.
*
* @see Executor
* @since 1.5
* @author Doug Lea
* @param <V> the result type of method <tt>call</tt>
*/
public interface Callable<V> {
/**
* Computes a result, or throws an exception if unable to do so.
*
* @return computed result
* @throws Exception if unable to compute a result
*/
V call() throws Exception;
}
public interface Future<V> {
/**
* Attempts to cancel execution of this task. This attempt will
* fail if the task has already completed, has already been cancelled,
* or could not be cancelled for some other reason. If successful,
* and this task has not started when <tt>cancel</tt> is called,
* this task should never run. If the task has already started,
* then the <tt>mayInterruptIfRunning</tt> parameter determines
* whether the thread executing this task should be interrupted in
* an attempt to stop the task.
*
* <p>After this method returns, subsequent calls to {@link #isDone} will
* always return <tt>true</tt>. Subsequent calls to {@link #isCancelled}
* will always return <tt>true</tt> if this method returned <tt>true</tt>.
*
* @param mayInterruptIfRunning <tt>true</tt> if the thread executing this
* task should be interrupted; otherwise, in-progress tasks are allowed
* to complete
* @return <tt>false</tt> if the task could not be cancelled,
* typically because it has already completed normally;
* <tt>true</tt> otherwise
*/
boolean cancel(boolean mayInterruptIfRunning);
/**
* Returns <tt>true</tt> if this task was cancelled before it completed
* normally.
*
* @return <tt>true</tt> if this task was cancelled before it completed
*/
boolean isCancelled();
/**
* Returns <tt>true</tt> if this task completed.
*
* Completion may be due to normal termination, an exception, or
* cancellation -- in all of these cases, this method will return
* <tt>true</tt>.
*
* @return <tt>true</tt> if this task completed
*/
boolean isDone();
/**
* Waits if necessary for the computation to complete, and then
* retrieves its result.
*
* @return the computed result
* @throws CancellationException if the computation was cancelled
* @throws ExecutionException if the computation threw an
* exception
* @throws InterruptedException if the current thread was interrupted
* while waiting
*/
V get() throws InterruptedException, ExecutionException;
/**
* Waits if necessary for at most the given time for the computation
* to complete, and then retrieves its result, if available.
*
* @param timeout the maximum time to wait
* @param unit the time unit of the timeout argument
* @return the computed result
* @throws CancellationException if the computation was cancelled
* @throws ExecutionException if the computation threw an
* exception
* @throws InterruptedException if the current thread was interrupted
* while waiting
* @throws TimeoutException if the wait timed out
*/
V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}
可以通過多種方法來創(chuàng)建一個Future來描述任務(wù)。ExecutorService中的submit方法接受一個Runnable或者Callable蚪黑,然后返回一個Future來獲得任務(wù)的執(zhí)行結(jié)果或者取消任務(wù)盅惜。
public interface ExecutorService extends Executor {
/**
* Submits a value-returning task for execution and returns a
* Future representing the pending results of the task. The
* Future's <tt>get</tt> method will return the task's result upon
* successful completion.
*
* <p>
* If you would like to immediately block waiting
* for a task, you can use constructions of the form
* <tt>result = exec.submit(aCallable).get();</tt>
*
* <p> Note: The {@link Executors} class includes a set of methods
* that can convert some other common closure-like objects,
* for example, {@link java.security.PrivilegedAction} to
* {@link Callable} form so they can be submitted.
*
* @param task the task to submit
* @return a Future representing pending completion of the task
* @throws RejectedExecutionException if the task cannot be
* scheduled for execution
* @throws NullPointerException if the task is null
*/
<T> Future<T> submit(Callable<T> task);
/**
* Submits a Runnable task for execution and returns a Future
* representing that task. The Future's <tt>get</tt> method will
* return the given result upon successful completion.
*
* @param task the task to submit
* @param result the result to return
* @return a Future representing pending completion of the task
* @throws RejectedExecutionException if the task cannot be
* scheduled for execution
* @throws NullPointerException if the task is null
*/
<T> Future<T> submit(Runnable task, T result);
/**
* Submits a Runnable task for execution and returns a Future
* representing that task. The Future's <tt>get</tt> method will
* return <tt>null</tt> upon <em>successful</em> completion.
*
* @param task the task to submit
* @return a Future representing pending completion of the task
* @throws RejectedExecutionException if the task cannot be
* scheduled for execution
* @throws NullPointerException if the task is null
*/
Future<?> submit(Runnable task);
/**
* Executes the given tasks, returning a list of Futures holding
* their status and results when all complete.
* {@link Future#isDone} is <tt>true</tt> for each
* element of the returned list.
* Note that a <em>completed</em> task could have
* terminated either normally or by throwing an exception.
* The results of this method are undefined if the given
* collection is modified while this operation is in progress.
*
* @param tasks the collection of tasks
* @return A list of Futures representing the tasks, in the same
* sequential order as produced by the iterator for the
* given task list, each of which has completed.
* @throws InterruptedException if interrupted while waiting, in
* which case unfinished tasks are cancelled.
* @throws NullPointerException if tasks or any of its elements are <tt>null</tt>
* @throws RejectedExecutionException if any task cannot be
* scheduled for execution
*/
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
throws InterruptedException;
/**
* Executes the given tasks, returning a list of Futures holding
* their status and results
* when all complete or the timeout expires, whichever happens first.
* {@link Future#isDone} is <tt>true</tt> for each
* element of the returned list.
* Upon return, tasks that have not completed are cancelled.
* Note that a <em>completed</em> task could have
* terminated either normally or by throwing an exception.
* The results of this method are undefined if the given
* collection is modified while this operation is in progress.
*
* @param tasks the collection of tasks
* @param timeout the maximum time to wait
* @param unit the time unit of the timeout argument
* @return a list of Futures representing the tasks, in the same
* sequential order as produced by the iterator for the
* given task list. If the operation did not time out,
* each task will have completed. If it did time out, some
* of these tasks will not have completed.
* @throws InterruptedException if interrupted while waiting, in
* which case unfinished tasks are cancelled
* @throws NullPointerException if tasks, any of its elements, or
* unit are <tt>null</tt>
* @throws RejectedExecutionException if any task cannot be scheduled
* for execution
*/
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException;
/**
* Executes the given tasks, returning the result
* of one that has completed successfully (i.e., without throwing
* an exception), if any do. Upon normal or exceptional return,
* tasks that have not completed are cancelled.
* The results of this method are undefined if the given
* collection is modified while this operation is in progress.
*
* @param tasks the collection of tasks
* @return the result returned by one of the tasks
* @throws InterruptedException if interrupted while waiting
* @throws NullPointerException if tasks or any element task
* subject to execution is <tt>null</tt>
* @throws IllegalArgumentException if tasks is empty
* @throws ExecutionException if no task successfully completes
* @throws RejectedExecutionException if tasks cannot be scheduled
* for execution
*/
<T> T invokeAny(Collection<? extends Callable<T>> tasks)
throws InterruptedException, ExecutionException;
/**
* Executes the given tasks, returning the result
* of one that has completed successfully (i.e., without throwing
* an exception), if any do before the given timeout elapses.
* Upon normal or exceptional return, tasks that have not
* completed are cancelled.
* The results of this method are undefined if the given
* collection is modified while this operation is in progress.
*
* @param tasks the collection of tasks
* @param timeout the maximum time to wait
* @param unit the time unit of the timeout argument
* @return the result returned by one of the tasks.
* @throws InterruptedException if interrupted while waiting
* @throws NullPointerException if tasks, or unit, or any element
* task subject to execution is <tt>null</tt>
* @throws TimeoutException if the given timeout elapses before
* any task successfully completes
* @throws ExecutionException if no task successfully completes
* @throws RejectedExecutionException if tasks cannot be scheduled
* for execution
*/
<T> T invokeAny(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}
假設(shè)我們通過一個方法從遠(yuǎn)程獲取一些計(jì)算結(jié)果,假設(shè)方法是List getDataFromRemote()忌穿,如果采用同步的方法抒寂,代碼大概是List data = getDataFromRemote(),我們將一直等待getDataFromRemote返回,然后才能繼續(xù)后面的工作掠剑,這個函數(shù)是從遠(yuǎn)程獲取計(jì)算結(jié)果的屈芜,如果需要很長時間,后面的代碼又和這個數(shù)據(jù)沒有什么關(guān)系的話朴译,阻塞在那里就會浪費(fèi)很多時間井佑。我們有什么辦法可以改進(jìn)呢?动分?毅糟?
能夠想到的辦法是調(diào)用函數(shù)后,立即返回澜公,然后繼續(xù)執(zhí)行姆另,等需要用數(shù)據(jù)的時候喇肋,再取或者等待這個數(shù)據(jù)。具體實(shí)現(xiàn)有兩種方式迹辐,一個是用Future蝶防,另一個是回調(diào)。
Future<List> future = getDataFromRemoteByFuture();
//do something....
List data = future.get();
可以看到我們返回的是一個Future對象明吩,然后接著自己的處理后面通過future.get()來獲得我們想要的值间学。也就是說在執(zhí)行g(shù)etDataFromRemoteByFuture的時候,就已經(jīng)啟動了對遠(yuǎn)程計(jì)算結(jié)果的獲取印荔,同時自己的線程還繼續(xù)執(zhí)行不阻塞低葫。知道獲取時候再拿數(shù)據(jù)就可以∪月桑看一下getDataFromRemoteByFuture的實(shí)現(xiàn):
private Future<List> getDataFromRemoteByFuture() {
return threadPool.submit(new Callable<List>() {
@Override
public List call() throws Exception {
return getDataFromRemote();
}
});
}
我們在這個方法中調(diào)用getDataFromRemote方法嘿悬,并且用到了線程池。把任務(wù)加入線程池之后水泉,理解返回Future對象善涨。Future的get方法,還可以傳入一個超時參數(shù)草则,用來設(shè)置等待時間钢拧,不會一直等下去。
也可以利用FutureTask來獲取結(jié)果:
FutureTask<List> futureTask = new FutureTask<List>(new Callable<List>() {
@Override
public List call() throws Exception {
return getDataFromRemote();
}
});
threadPool.submit(futureTask);
futureTask.get();
FutureTask是一個具體的實(shí)現(xiàn)類炕横,ThreadPoolExecutor的submit方法返回的就是一個Future的實(shí)現(xiàn)源内,這個實(shí)現(xiàn)就是FutureTask的一個具體實(shí)例,F(xiàn)utureTask幫助實(shí)現(xiàn)了具體的任務(wù)執(zhí)行看锉,以及和Future接口中的get方法的關(guān)聯(lián)姿锭。FutureTask除了幫助ThreadPool很好的實(shí)現(xiàn)了對加入線程池任務(wù)的Future支持外,也為我們提供了很大的便利伯铣,使得我們自己也可以實(shí)現(xiàn)支持Future的任務(wù)調(diào)度呻此。
Java本身沒有提供Promise方式實(shí)現(xiàn),在Scala以及Netty中都有Promise模式的實(shí)現(xiàn)腔寡,可以讓用戶控制future的完成狀態(tài)(成功焚鲜、失敗等)以及future的返回值,參考文檔中是一個自定義Promise的簡單實(shí)現(xiàn)放前。