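In a Java thread pool, what eventually happens if you keep submitting tasks?
If the work queue is bounded, then once the queue is full, the number of threads has reached maximumPoolSize, and no thread is idle, submitting another task triggers the pool's rejection policy.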
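To make the condition above concrete, here is a minimal sketch that counts how many tasks a pool accepts before it starts rejecting. The class name PoolCapacityDemo and the method countAccepted are hypothetical names chosen for illustration; the tasks block on a latch so that no worker becomes idle while tasks are being submitted.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not part of the original article.
public class PoolCapacityDemo {

    /** Submits {@code total} blocking tasks and returns how many the pool accepted. */
    public static int countAccepted(int core, int max, int queueCapacity, int total) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(core, max, 60, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(queueCapacity));
        CountDownLatch release = new CountDownLatch(1);
        // Every task waits on the latch, so no worker becomes idle during submission.
        Runnable blocker = () -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        int accepted = 0;
        try {
            for (int i = 0; i < total; i++) {
                try {
                    pool.execute(blocker);
                    accepted++;
                } catch (RejectedExecutionException e) {
                    // Queue is full and the pool is already at maximumPoolSize.
                }
            }
        } finally {
            release.countDown(); // let the accepted tasks finish
            pool.shutdown();
        }
        return accepted;
    }

    public static void main(String[] args) {
        // Same sizing as the examples in this post: 2 threads + 5 queue slots.
        System.out.println(countAccepted(1, 2, 5, 10)); // prints 7
    }
}
```

With busy workers, the pool holds at most maximumPoolSize running tasks plus the queue capacity, here 2 + 5 = 7; everything beyond that goes to the rejection handler.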
What rejection policies does the thread pool provide?
1. AbortPolicy: reject the task and throw an exception (the default policy)
Definition
/**
 * The default rejected execution handler
 */
private static final RejectedExecutionHandler defaultHandler =
    new AbortPolicy();
/**
 * A handler for rejected tasks that throws a
 * {@code RejectedExecutionException}.
 */
public static class AbortPolicy implements RejectedExecutionHandler {
    /**
     * Creates an {@code AbortPolicy}.
     */
    public AbortPolicy() { }
    /**
     * Always throws RejectedExecutionException.
     *
     * @param r the runnable task requested to be executed
     * @param e the executor attempting to execute this task
     * @throws RejectedExecutionException always
     */
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        throw new RejectedExecutionException("Task " + r.toString() +
                                             " rejected from " +
                                             e.toString());
    }
}
Example
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class ThreadPoolRejectPolicyTest {
    public static void main(String[] args) {
        // 1 core thread, at most 2 threads, bounded queue of 5.
        // No handler is passed, so the default AbortPolicy applies.
        ThreadPoolExecutor threadPool = new ThreadPoolExecutor(1, 2, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(5));
        for (int i = 0; i < 8; i++) {
            Runnable task = new Runnable() {
                @Override
                public void run() {
                    System.out.println("Thread is submitted.");
                }
            };
            // Throws RejectedExecutionException when the queue and the pool are both full.
            threadPool.submit(task);
        }
    }
}
Thread is submitted.
Thread is submitted.
Thread is submitted.
Thread is submitted.
Thread is submitted.
Thread is submitted.
Thread is submitted.
Exception in thread "main" java.util.concurrent.RejectedExecutionException: Task java.util.concurrent.FutureTask@6bc7c054 rejected from java.util.concurrent.ThreadPoolExecutor@232204a1[Running, pool size = 2, active threads = 0, queued tasks = 0, completed tasks = 7]
at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2063)
at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:830)
at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1379)
at java.util.concurrent.AbstractExecutorService.submit(AbstractExecutorService.java:112)
at demo.multithread.ThreadPoolRejectPolicyTest.main(ThreadPoolRejectPolicyTest.java:21)
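The example above lets the exception escape from main and never shuts the pool down. As a hedged sketch of how a caller might handle this, the code below catches RejectedExecutionException per submission, records which task index was refused, and shuts the pool down in a finally block. AbortPolicyHandlingDemo and submitAll are hypothetical names; the latch-based blocking task is an assumption used only to make the result repeatable.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not part of the original article.
public class AbortPolicyHandlingDemo {

    /** Submits {@code total} tasks and returns the indices of the rejected ones. */
    public static List<Integer> submitAll(int total) {
        // No handler argument, so the default AbortPolicy is used.
        ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 2, 60, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(5));
        CountDownLatch release = new CountDownLatch(1);
        // Tasks wait on the latch so the pool stays saturated while we submit.
        Runnable blocker = () -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        List<Integer> rejected = new ArrayList<>();
        try {
            for (int i = 0; i < total; i++) {
                try {
                    pool.submit(blocker);
                } catch (RejectedExecutionException e) {
                    // The caller decides what to do: log, retry later, fall back, ...
                    rejected.add(i);
                }
            }
        } finally {
            release.countDown();
            pool.shutdown(); // without this, the non-daemon core thread keeps the JVM alive
        }
        return rejected;
    }

    public static void main(String[] args) {
        System.out.println(submitAll(8)); // prints [7]
    }
}
```

Only the 8th submission (index 7) is refused, which matches the single exception in the output above.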
2. DiscardPolicy: silently discard the task
Definition
/**
 * A handler for rejected tasks that silently discards the
 * rejected task.
 */
public static class DiscardPolicy implements RejectedExecutionHandler {
    /**
     * Creates a {@code DiscardPolicy}.
     */
    public DiscardPolicy() { }
    /**
     * Does nothing, which has the effect of discarding task r.
     *
     * @param r the runnable task requested to be executed
     * @param e the executor attempting to execute this task
     */
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
    }
}
Example
A total of 8 tasks are submitted, and one of them is silently discarded.
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class ThreadPoolRejectPolicyTest {
    public static void main(String[] args) {
        ThreadPoolExecutor threadPool = new ThreadPoolExecutor(1, 2, 60,
                TimeUnit.SECONDS, new ArrayBlockingQueue<>(5), new ThreadPoolExecutor.DiscardPolicy());
        for (int i = 0; i < 8; i++) {
            Runnable task = new Runnable() {
                @Override
                public void run() {
                    System.out.println("Thread is submitted.");
                    try {
                        // Keep the worker busy so the queue stays full.
                        Thread.sleep(60000L);
                    } catch (InterruptedException e) {
                        // Restore the interrupt flag for the pool's worker thread.
                        Thread.currentThread().interrupt();
                    }
                }
            };
            threadPool.submit(task);
        }
        // Number of worker threads currently in the pool.
        int threadCount = threadPool.getPoolSize();
        System.out.println("threadCount " + threadCount);
        // Number of tasks waiting in the work queue.
        int threadCntInWorkQueue = threadPool.getQueue().size();
        System.out.println("threadCntInWorkQueue " + threadCntInWorkQueue);
        threadPool.shutdown();
    }
}
Thread is submitted.
threadCount 2
threadCntInWorkQueue 5
Thread is submitted.
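One consequence of silent discarding is worth spelling out: when tasks are handed over with submit, the Future of a discarded task is never run, so waiting on it without a timeout would block indefinitely. The sketch below illustrates this under the same pool sizing; DiscardedFutureDemo and discardedFutureTimesOut are hypothetical names, and the 200 ms timeout is an arbitrary value chosen for the demonstration.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical demo class, not part of the original article.
public class DiscardedFutureDemo {

    /** Returns true if the Future of the discarded 8th task never completes. */
    public static boolean discardedFutureTimesOut() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 2, 60, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(5), new ThreadPoolExecutor.DiscardPolicy());
        CountDownLatch release = new CountDownLatch(1);
        Runnable blocker = () -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        try {
            Future<?> last = null;
            for (int i = 0; i < 8; i++) {
                last = pool.submit(blocker); // the 8th submission is silently dropped
            }
            release.countDown(); // the 7 accepted tasks can now finish
            try {
                last.get(200, TimeUnit.MILLISECONDS);
                return false; // would mean the task ran after all
            } catch (TimeoutException e) {
                return true; // nobody will ever run or cancel this Future
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            } catch (ExecutionException e) {
                return false;
            }
        } finally {
            release.countDown();
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(discardedFutureTimesOut()); // prints true
    }
}
```

If callers depend on the returned Future, a timeout on get, or a policy that reports the rejection, is the safer choice.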
3. DiscardOldestPolicy: discard the oldest queued task, then retry
Definition
/**
 * A handler for rejected tasks that discards the oldest unhandled
 * request and then retries {@code execute}, unless the executor
 * is shut down, in which case the task is discarded.
 */
public static class DiscardOldestPolicy implements RejectedExecutionHandler {
    /**
     * Creates a {@code DiscardOldestPolicy} for the given executor.
     */
    public DiscardOldestPolicy() { }
    /**
     * Obtains and ignores the next task that the executor
     * would otherwise execute, if one is immediately available,
     * and then retries execution of task r, unless the executor
     * is shut down, in which case task r is instead discarded.
     *
     * @param r the runnable task requested to be executed
     * @param e the executor attempting to execute this task
     */
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        if (!e.isShutdown()) {
            e.getQueue().poll();
            e.execute(r);
        }
    }
}
Example
The pool has 1 core thread, a maximum of 2 threads, and a work queue of size 5.
As the output shows, after 8 tasks are submitted, the 2nd task (id 1) is discarded. Task 0 runs on the core thread, tasks 1 to 5 fill the queue, and task 6 starts the second thread, so when task 7 is rejected, the oldest queued task, id 1, is removed to make room for it.
package demo.multithread;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class ThreadPoolRejectPolicyTest {
    public static void main(String[] args) {
        ThreadPoolExecutor threadPool = new ThreadPoolExecutor(1, 2, 60,
                TimeUnit.SECONDS, new ArrayBlockingQueue<>(5), new ThreadPoolExecutor.DiscardOldestPolicy());
        for (int i = 0; i < 8; i++) {
            Runnable task = new MyTask(i);
            threadPool.submit(task);
        }
        // Number of worker threads currently in the pool.
        int threadCount = threadPool.getPoolSize();
        System.out.println("threadCount " + threadCount);
        // Number of tasks waiting in the work queue.
        int threadCntInWorkQueue = threadPool.getQueue().size();
        System.out.println("threadCntInWorkQueue " + threadCntInWorkQueue);
        threadPool.shutdown();
    }
}
class MyTask implements Runnable {
    private int taskId;
    MyTask(int i) {
        taskId = i;
    }
    @Override
    public void run() {
        System.out.println("Running thread - id " + taskId);
        try {
            Thread.sleep(1000L);
        } catch (InterruptedException e) {
            // Restore the interrupt flag for the pool's worker thread.
            Thread.currentThread().interrupt();
        }
    }
    public int getTaskId() {
        return taskId;
    }
    public void setTaskId(int taskId) {
        this.taskId = taskId;
    }
}
Running thread - id 0
threadCount 2
threadCntInWorkQueue 5
Running thread - id 6
Running thread - id 2
Running thread - id 3
Running thread - id 4
Running thread - id 5
Running thread - id 7
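The output above depends on timing, because each task only sleeps for one second. The following sketch reproduces the same scenario in a repeatable way by holding the workers on a latch while the tasks are submitted, then collecting the ids that actually ran. DiscardOldestDemo and executedIds are hypothetical names introduced for this illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not part of the original article.
public class DiscardOldestDemo {

    /** Submits tasks 0..7 to a saturated pool and returns the sorted ids that ran. */
    public static List<Integer> executedIds() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 2, 60, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(5), new ThreadPoolExecutor.DiscardOldestPolicy());
        CountDownLatch release = new CountDownLatch(1);
        Set<Integer> executed = new ConcurrentSkipListSet<>(); // thread-safe and sorted
        try {
            for (int i = 0; i < 8; i++) {
                final int id = i;
                pool.execute(() -> {
                    try {
                        release.await();  // hold the worker until all tasks are submitted
                        executed.add(id);
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            }
        } finally {
            release.countDown();
            pool.shutdown();
        }
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new ArrayList<>(executed);
    }

    public static void main(String[] args) {
        System.out.println(executedIds()); // prints [0, 2, 3, 4, 5, 6, 7]
    }
}
```

Task 1 is missing because it sat at the head of the queue when task 7 was rejected; note that "oldest" means the oldest task still waiting in the queue, not the oldest task overall.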
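4. CallerRunsPolicy: the calling thread runs the task
Be careful with this policy: the submitting thread (here, the main thread) has to both submit tasks and run the rejected ones, and while it is running a task it cannot submit anything else, which can hurt performance.
Definition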
/**
 * A handler for rejected tasks that runs the rejected task
 * directly in the calling thread of the {@code execute} method,
 * unless the executor has been shut down, in which case the task
 * is discarded.
 */
public static class CallerRunsPolicy implements RejectedExecutionHandler {
    /**
     * Creates a {@code CallerRunsPolicy}.
     */
    public CallerRunsPolicy() { }
    /**
     * Executes task r in the caller's thread, unless the executor
     * has been shut down, in which case the task is discarded.
     *
     * @param r the runnable task requested to be executed
     * @param e the executor attempting to execute this task
     */
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        if (!e.isShutdown()) {
            r.run();
        }
    }
}
Example
In the output, the main thread's id is 1, and one of the submitted tasks (the last one, id 7) is executed by the main thread.
package demo.multithread;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class ThreadPoolRejectPolicyTest {
    public static void main(String[] args) {
        ThreadPoolExecutor threadPool = new ThreadPoolExecutor(1, 2, 60,
                TimeUnit.SECONDS, new ArrayBlockingQueue<>(5), new ThreadPoolExecutor.CallerRunsPolicy());
        System.out.println("Main thread is " + Thread.currentThread().getId());
        for (int i = 0; i < 8; i++) {
            Runnable task = new MyTask(i);
            // When the pool is saturated, this call runs the task on the main thread.
            threadPool.submit(task);
        }
        // Number of worker threads currently in the pool.
        int threadCount = threadPool.getPoolSize();
        System.out.println("threadCount " + threadCount);
        // Number of tasks waiting in the work queue.
        int threadCntInWorkQueue = threadPool.getQueue().size();
        System.out.println("threadCntInWorkQueue " + threadCntInWorkQueue);
        threadPool.shutdown();
    }
}
class MyTask implements Runnable {
    private int taskId;
    MyTask(int i) {
        taskId = i;
    }
    @Override
    public void run() {
        System.out.println("Running thread - id " + taskId);
        System.out.println("The thread id is " + Thread.currentThread().getId());
        try {
            Thread.sleep(1000L);
        } catch (InterruptedException e) {
            // Restore the interrupt flag for whichever thread ran the task.
            Thread.currentThread().interrupt();
        }
    }
    public int getTaskId() {
        return taskId;
    }
    public void setTaskId(int taskId) {
        this.taskId = taskId;
    }
}
Main thread is 1
Running thread - id 0
The thread id is 9
Running thread - id 7
The thread id is 1
Running thread - id 6
The thread id is 10
Running thread - id 1
The thread id is 9
threadCount 2
threadCntInWorkQueue 3
Running thread - id 2
The thread id is 10
Running thread - id 3
The thread id is 9
Running thread - id 4
The thread id is 10
Running thread - id 5
The thread id is 9
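To check the "runs on the caller" behaviour without reading thread ids from interleaved output, the sketch below saturates the pool with blocking tasks and then submits one more task that records the thread it runs on. CallerRunsDemo and overflowRunsOnCaller are hypothetical names; the overflow task deliberately does not block, since it runs on the very thread that later releases the latch.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical demo class, not part of the original article.
public class CallerRunsDemo {

    /** Returns true if the task that overflows the pool runs on the submitting thread. */
    public static boolean overflowRunsOnCaller() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 2, 60, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(5), new ThreadPoolExecutor.CallerRunsPolicy());
        CountDownLatch release = new CountDownLatch(1);
        Runnable blocker = () -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        AtomicReference<Thread> ranOn = new AtomicReference<>();
        try {
            // 2 running + 5 queued: the pool is now saturated.
            for (int i = 0; i < 7; i++) {
                pool.execute(blocker);
            }
            // The 8th task is rejected, so CallerRunsPolicy calls run() right here,
            // synchronously, before execute() returns.
            pool.execute(() -> ranOn.set(Thread.currentThread()));
            return ranOn.get() == Thread.currentThread();
        } finally {
            release.countDown();
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(overflowRunsOnCaller()); // prints true
    }
}
```

Because the rejected task runs synchronously inside execute, the caller cannot submit anything else until it returns, which is the back-pressure effect, and the performance cost, described above.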
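5. Custom rejection policy
Looking back at the four built-in policies:
AbortPolicy, DiscardPolicy, and DiscardOldestPolicy all drop tasks;
CallerRunsPolicy does run the task, but it slows down the main thread.
Making the work queue unbounded, or setting maximumPoolSize to Integer.MAX_VALUE, can both lead to an OutOfMemoryError.
A custom rejection policy can instead block the submitter of a late task until resources free up, so that every task eventually gets to run.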
package demo.multithread;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class ThreadPoolRejectPolicyTest {
    public static void main(String[] args) {
        ThreadPoolExecutor threadPool = new ThreadPoolExecutor(1, 2, 60,
                TimeUnit.SECONDS, new ArrayBlockingQueue<>(5), new CustomRejectPolicy());
        for (int i = 0; i < 8; i++) {
            Runnable task = new MyTask(i);
            threadPool.submit(task);
        }
        threadPool.shutdown();
    }
}
class CustomRejectPolicy implements RejectedExecutionHandler {
    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        try {
            // Block the submitting thread until the queue has room again.
            executor.getQueue().put(r);
        } catch (InterruptedException e) {
            // Restore the interrupt flag and report the rejection instead of dropping the task silently.
            Thread.currentThread().interrupt();
            throw new RejectedExecutionException("Interrupted while waiting for queue space", e);
        }
    }
}
class MyTask implements Runnable {
    private int taskId;
    MyTask(int i) {
        taskId = i;
    }
    @Override
    public void run() {
        System.out.println("Running thread - id " + taskId);
        try {
            Thread.sleep(1000L);
        } catch (InterruptedException e) {
            // Restore the interrupt flag for the pool's worker thread.
            Thread.currentThread().interrupt();
        }
    }
    public int getTaskId() {
        return taskId;
    }
    public void setTaskId(int taskId) {
        this.taskId = taskId;
    }
}
Running thread - id 0
Running thread - id 6
Running thread - id 1
Running thread - id 2
Running thread - id 3
Running thread - id 4
Running thread - id 5
Running thread - id 7
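A blocking policy like the one above can stall the submitter for as long as the pool stays saturated. One possible variation, shown here only as a sketch, waits for queue space up to a time limit and then gives up with a RejectedExecutionException. TimedBlockingPolicy, TimedBlockingPolicyDemo, and rejectsAfterTimeout are hypothetical names, and the 100 ms limit is an arbitrary value for the demonstration.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not part of the original article.
public class TimedBlockingPolicyDemo {

    /** Hypothetical handler: waits a bounded time for queue space, then rejects. */
    static class TimedBlockingPolicy implements RejectedExecutionHandler {
        private final long timeout;
        private final TimeUnit unit;

        TimedBlockingPolicy(long timeout, TimeUnit unit) {
            this.timeout = timeout;
            this.unit = unit;
        }

        @Override
        public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
            if (executor.isShutdown()) {
                throw new RejectedExecutionException("Executor is shut down");
            }
            try {
                // BlockingQueue.offer with a timeout returns false if no slot frees up in time.
                if (!executor.getQueue().offer(r, timeout, unit)) {
                    throw new RejectedExecutionException("No queue space within timeout");
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new RejectedExecutionException("Interrupted while waiting", e);
            }
        }
    }

    /** Returns true if the 8th task is rejected once the wait limit expires. */
    public static boolean rejectsAfterTimeout() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 2, 60, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(5), new TimedBlockingPolicy(100, TimeUnit.MILLISECONDS));
        CountDownLatch release = new CountDownLatch(1);
        Runnable blocker = () -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        try {
            for (int i = 0; i < 7; i++) {
                pool.execute(blocker); // saturate: 2 running + 5 queued
            }
            try {
                pool.execute(blocker); // waits 100 ms for a slot that never frees up
                return false;
            } catch (RejectedExecutionException e) {
                return true;
            }
        } finally {
            release.countDown();
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(rejectsAfterTimeout()); // prints true
    }
}
```

The trade-off is that tasks can again be refused, but the caller gets an explicit exception after a bounded wait instead of blocking without limit.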