如果線程池的拒絕策略設(shè)置成DiscardPolicy或者DiscardOldestPolicy衔瓮,通過Future獲取執(zhí)行結(jié)果,可能導(dǎo)致線程會一直阻塞荒澡。
問題復(fù)現(xiàn)
// 創(chuàng)建一個單線程报辱,拒絕策略時 DiscardPolicy
private final static ThreadPoolExecutor executorService = new
ThreadPoolExecutor(1, 1, 1L, TimeUnit.MINUTES,
new SynchronousQueue<Runnable>(), new ThreadPoolExecutor.DiscardPolicy());
public static void main(String[] args) throws Exception {
//提交任務(wù),阻塞 5 秒
Future taskOne = executorService.submit(() -> {
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
//此時单山,隊列和線程已經(jīng)都被占用碍现,當(dāng)前提交的任務(wù)會執(zhí)行拒絕策略
Future taskTwo = null;
try {
taskTwo = executorService.submit(() -> System.out.println("start runable three"));
} catch (Exception e) {
System.out.println(e.getLocalizedMessage());
}
System.out.println("獲取結(jié)果:");
System.out.println("task one " + taskOne.get()); //(5)等待任務(wù)one執(zhí)行完畢
System.out
.println("task two " + (taskTwo == null ? null : taskTwo.get())); // (7)等待任務(wù)three執(zhí)行完畢
executorService.shutdown(); //關(guān)閉線程池,阻塞直到所有任務(wù)執(zhí)行完畢
}
執(zhí)行結(jié)果如下米奸,第一個task正持缃樱可以獲取結(jié)果,但是第二個task一直獲取不到結(jié)果悴晰,程序一直卡在這里慢睡,不會繼續(xù)執(zhí)行。
獲取結(jié)果:
task one null
問題分析
提交任務(wù)到線程池時铡溪,會包裝成 FutureTask 漂辐,初始狀態(tài)是 NEW。執(zhí)行的任務(wù)是包裝后的FutureTask對象棕硫。
public Future<?> submit(Runnable task) {
if (task == null) throw new NullPointerException();
// 包裝成 FutureTask
RunnableFuture<Void> ftask = newTaskFor(task, null);
execute(ftask);
return ftask;
}
protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
return new FutureTask<T>(runnable, value);
}
public FutureTask(Runnable runnable, V result) {
this.callable = Executors.callable(runnable, result);
this.state = NEW; // ensure visibility of callable
}
提交執(zhí)行任務(wù)方法邏輯如下髓涯。
public void execute(Runnable command) {
...
//如果線程個數(shù)小于核心線程數(shù)則新增處理線程
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
// 如果當(dāng)前線程個數(shù)已經(jīng)達(dá)到核心線程數(shù)則把任務(wù)放入隊列
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
// 嘗試新增處理線程
else if (! addWorker(command, false))
reject(command); //新增失敗則調(diào)用拒絕策略
}
示例代碼中第二個任務(wù)會執(zhí)行到reject邏輯。DiscardPolicy的方法是空實現(xiàn)哈扮,所以新創(chuàng)建的FutureTask還是NEW狀態(tài)纬纪,這個狀態(tài)和get方法阻塞有密切的關(guān)系蚓再。
DiscardPolicy 和 DiscardOldestPolicy 代碼如下。他們有一個共同點就是沒有處理task的狀態(tài)包各。
public static class DiscardPolicy implements RejectedExecutionHandler {
/**
* 空方法摘仅,task會保留在NEW狀態(tài)
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
}
}
public static class DiscardOldestPolicy implements RejectedExecutionHandler {
/**
* poll 出一個任務(wù),但是沒有任務(wù)處理问畅,所以poll出來的任務(wù)是NEW狀態(tài)
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
e.getQueue().poll();
e.execute(r);
}
}
}
先看下 FutureTask 的狀態(tài)娃属。前面我們看到了初始化狀態(tài)是NEW,其他狀態(tài)說明如下按声。
private static final int NEW = 0; 新的任務(wù)膳犹,初始狀態(tài)
private static final int COMPLETING = 1; 當(dāng)任務(wù)被設(shè)置結(jié)果時,處于COMPLETING狀態(tài)签则,這是一個中間狀態(tài)须床。
private static final int NORMAL = 2; 表示任務(wù)正常結(jié)束。
private static final int EXCEPTIONAL = 3; 表示任務(wù)因異常而結(jié)束
private static final int CANCELLED = 4; 任務(wù)還未執(zhí)行之前就調(diào)用了cancel(true)方法渐裂,任務(wù)處于CANCELLED
private static final int INTERRUPTING = 5; 當(dāng)任務(wù)調(diào)用cancel(true)中斷程序時豺旬,任務(wù)處于INTERRUPTING狀態(tài),這是一個中間狀態(tài)柒凉。
private static final int INTERRUPTED = 6; 任務(wù)調(diào)用cancel(true)中斷程序時會調(diào)用interrupt()方法中斷線程運行族阅,任務(wù)狀態(tài)由INTERRUPTING轉(zhuǎn)變?yōu)镮NTERRUPTED
繼續(xù)看下 FutureTask 的get方法。
public V get() throws InterruptedException, ExecutionException {
int s = state;
//當(dāng)狀態(tài)值<=COMPLETING時需要等待膝捞,否則調(diào)用report返回
if (s <= COMPLETING)
s = awaitDone(false, 0L);
return report(s);
}
private V report(int s) throws ExecutionException {
Object x = outcome;
// 正常結(jié)束坦刀,返回結(jié)果
if (s == NORMAL)
return (V)x;
// 如果是 >= CANCELLED 拋出取消異常,包括:CANCELLED蔬咬,INTERRUPTING鲤遥,INTERRUPTED狀態(tài)
if (s >= CANCELLED)
throw new CancellationException();
// 剩下的條件就是 EXCEPTIONAL 了,執(zhí)行的任務(wù)拋出異常
throw new ExecutionException((Throwable)x)
}
到這里已經(jīng)很清楚了林艘。FutureTask狀態(tài)>COMPLETING 才會返回盖奈。因為拒絕策略沒有修改FutureTask的狀態(tài),F(xiàn)utureTask的狀態(tài)一直是NEW狐援,所以不會返回钢坦。
其他 RejectedExecutionHandler 為什么不會導(dǎo)致阻塞
我看看下默認(rèn)的 AbortPolicy 的實現(xiàn):
public static class AbortPolicy implements RejectedExecutionHandler {
// 回憶一下submit方法,最后會執(zhí)行reject策略啥酱。
// AbortPolicy 直接拋出異常爹凹,調(diào)用方馬上可以獲取結(jié)果
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
throw new RejectedExecutionException("Task " + r.toString() +
" rejected from " +
e.toString());
}
}
CallerRunsPolicy 策略則是讓調(diào)用線程執(zhí)行提交的任務(wù),執(zhí)行任務(wù)時會更新狀態(tài)镶殷,自然也不會阻塞禾酱。
public static class CallerRunsPolicy implements RejectedExecutionHandler {
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
r.run();
}
}
}
解決方案
- 使用帶超時時間的get方法,這樣使用DiscardPolicy拒絕策略不會一直阻塞。
- 如果一定要使用Discardpolicy 拒絕策略宇植,需要自定義拒絕策略。
public void rejectedExecution(Runnable runable, ThreadPoolExecutor e) {
if (! e.isShutdown()) {
if(null ! = runable && runable instanceof FutureTask){
((FutureTask) runable).cancel(true);
}
}
}