線程池ThreadPoolExecutor的使用
標(biāo)簽(空格分隔): ThreadPoolExecutor
一 創(chuàng)建ThreadPoolExecutor
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler){}
參數(shù)說(shuō)明:
- corePoolSize著角,核心線程數(shù)这溅,池中所保存的最少線程個(gè)數(shù)。
- maximumPoolSize腺律,池中允許的最大線程數(shù)戈盈,核心線程數(shù)+非核心線程數(shù)浇雹。
- keepAliveTime橄霉,當(dāng)池中總線程數(shù)大于核心時(shí)晌姚,此為終止多余的空閑線程 等待新任務(wù)的最長(zhǎng)時(shí)間。
- unit痘括,keepAliveTime參數(shù)的時(shí)間單位长窄,有DAYS、HOURS远寸、 MINUTES抄淑、SECONDS屠凶、MILLISECONDS驰后、MICROSECONDS、NANOSECONDS矗愧。常用MILLISECONDS灶芝。
- workQueue,執(zhí)行前用于保持任務(wù)的隊(duì)列唉韭。
- threadFactory夜涕,執(zhí)行程序創(chuàng)建新線程時(shí)使用的工廠,默認(rèn)使用Executors.defaultThreadFactory()属愤。
- handler女器,由于超出線程范圍和隊(duì)列容量而使執(zhí)行被阻塞時(shí)所使用的處理程序(策略),詳情見(jiàn)ThreadPoolExecutor的內(nèi)部靜態(tài)類(一共有4個(gè)住诸,都實(shí)現(xiàn)了RejectedExecutionHandler接口)驾胆,默認(rèn)使用AbortPolicy(用于被拒絕任務(wù)的處理程序,它將拋出 RejectedExecutionException)贱呐。
二 提交任務(wù)
public Future<?> submit(Runnable task){}
public <T> Future<T> submit(Callable<T> task){}
public void execute(Runnable command){}
說(shuō)明:
- 常見(jiàn)有3種提交任務(wù)方式丧诺,其中submit是繼承是基類AbstractExecutorService(類如其名,是抽樣類)奄薇,execute方法是其自己實(shí)現(xiàn)驳阎,查看源碼可知submit底層還是調(diào)用execute方法。
- execute和submit方法的區(qū)別,二者的入?yún)㈩愋兔黠@不同呵晚,而且execute沒(méi)有返回值蜘腌,submit是帶有返回值的;
三 基本知識(shí)點(diǎn)
- 使用完線程池后劣纲,corePoolSize會(huì)保持不變逢捺,在核心線程創(chuàng)建之后的非核心線程會(huì)在keepAliveTime之后失效。非核心線程就像任務(wù)比較多的時(shí)候臨時(shí)招進(jìn)來(lái)的臨時(shí)工癞季,當(dāng)活干完就不再需要劫瞳。
@Test
public void corePoolSizeIsKeep(){
ThreadPoolExecutor executor =
new ThreadPoolExecutor(6, 10, 5L, TimeUnit.SECONDS, new SynchronousQueue<>());
submit9(executor);
watch(executor);
}
運(yùn)行結(jié)果見(jiàn)下圖:
corePoolSize不會(huì)變化.png
- 當(dāng)使用無(wú)界的任務(wù)隊(duì)列創(chuàng)建線程池時(shí),即便提交的任務(wù)數(shù)超過(guò)了核心線程數(shù)绷柒,也不會(huì)再開(kāi)辟新的非核心線程志于。
@Test
public void meaninglessMaximunPoolSize(){
ThreadPoolExecutor executor =
new ThreadPoolExecutor(3, 6, 5L, TimeUnit.SECONDS, new LinkedBlockingQueue<>());
submit9(executor);
watch(executor);
}
運(yùn)行結(jié)果見(jiàn)下圖:
無(wú)意義的maximumPoolSize.png
- 當(dāng)使用了有界的任務(wù)隊(duì)列創(chuàng)建線程,并且提交的任務(wù)數(shù)超過(guò)核心線程數(shù)加上任務(wù)隊(duì)列废睦,但不超過(guò)最大線程數(shù)+任務(wù)隊(duì)列大小時(shí)伺绽,會(huì)開(kāi)辟新的非核心線程。
@Test
public void maximunPoolSize(){
ThreadPoolExecutor executor =
new ThreadPoolExecutor(3, 4, 5L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(2));
submit6(executor);
watch(executor);
}
運(yùn)行結(jié)果見(jiàn)下圖:
maximumPoolSize.png
- 當(dāng)使用了有界的任務(wù)隊(duì)列創(chuàng)建線程嗜湃,并且提交的任務(wù)數(shù)超過(guò)最大線程數(shù)+任務(wù)隊(duì)列大小時(shí)奈应,會(huì)啟用拒絕策略AbortPolicy并拋出異常。
@Test
public void abortPolicyReject(){
ThreadPoolExecutor executor =
new ThreadPoolExecutor(3, 5, 5L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(1));
submit6(executor);
executor.submit(new SleepChild());
watch(executor);
}
----------
java.util.concurrent.RejectedExecutionException: Task java.util.concurrent.FutureTask@3f8f9dd6 rejected from java.util.concurrent.ThreadPoolExecutor@aec6354[Running, pool size = 5, active threads = 5, queued tasks = 1, completed tasks = 0]
at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2047)
at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:823)
at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1369)
at java.util.concurrent.AbstractExecutorService.submit(AbstractExecutorService.java:112)
at com.taobao.juc.ExecutorsTest.abortPolicyReject(ExecutorsTest.java:123)
- ThreadPoolExecutor.CallerRunsPolicy策略的解決方法是提交任務(wù)的線程不再提交任務(wù)购披,而是幫助執(zhí)行任務(wù)杖挣。
@Test
public void CallerRunsPolicy(){
ThreadPoolExecutor executor =
new ThreadPoolExecutor(3, 5, 5L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(1), new CallerRunsPolicy());
submit6(executor);
executor.submit(new SleepChild());
watch(executor);
}
運(yùn)行結(jié)果如下:
CallerRunsPolicy策略.png
四 附錄
- SleepChild的源碼
public class SleepChild implements Runnable {
@Override
public void run() {
try {
Thread.sleep(2000);
System.out.println(Thread.currentThread().getName() + " run.");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
- 其他源碼
private void submit6(ThreadPoolExecutor executor){
executor.submit(new SleepChild());
executor.submit(new SleepChild());
executor.submit(new SleepChild());
System.out.println("===================first====================");
System.out.println("core pool size: " + executor.getCorePoolSize() + ", pool size: " + executor.getPoolSize() +
", work queue size: " + executor.getQueue().size());
executor.submit(new SleepChild());
executor.submit(new SleepChild());
executor.submit(new SleepChild());
System.out.println("===================second====================");
System.out.println("core pool size: " + executor.getCorePoolSize() + ", pool size: " + executor.getPoolSize() +
", work queue size: " + executor.getQueue().size());
}
----------
private void submit9(ThreadPoolExecutor executor){
executor.submit(new SleepChild());
executor.submit(new SleepChild());
executor.submit(new SleepChild());
System.out.println("===================first====================");
System.out.println("core pool size: " + executor.getCorePoolSize() + ", pool size: " + executor.getPoolSize() +
", work queue size: " + executor.getQueue().size());
executor.submit(new SleepChild());
executor.submit(new SleepChild());
executor.submit(new SleepChild());
System.out.println("===================second====================");
System.out.println("core pool size: " + executor.getCorePoolSize() + ", pool size: " + executor.getPoolSize() +
", work queue size: " + executor.getQueue().size());
executor.submit(new SleepChild());
executor.submit(new SleepChild());
executor.submit(new SleepChild());
System.out.println("===================third====================");
System.out.println("core pool size: " + executor.getCorePoolSize() + ", pool size: " + executor.getPoolSize() +
", work queue size: " + executor.getQueue().size());
}
-------------
private void watch(ThreadPoolExecutor executor){
//主線程檢測(cè)線程池的狀態(tài)
while(true){
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("watch, core pool size: " + executor.getCorePoolSize() + ", pool size: " + executor.getPoolSize() +
", work queue size: " + executor.getQueue().size());
}
}