書接上文喇勋,<a href="http://www.reibang.com/p/aa5884bcd032">Java線程池</a>涤姊。
接下來記錄一下線程池的工作機制和原理
線程池的兩個核心隊列:
- 線程等待池籽懦,即線程隊列BlockingQueue财骨。
- 任務處理池(PoolWorker)肋拔,即正在工作的Thread列表(HashSet<Worker>)锈津。
線程池的核心參數(shù):
- 核心池大小(corePoolSize)只损,即固定大小一姿,設定好之后,線程池的穩(wěn)定峰值跃惫,達到這個值之后池的線程數(shù)大小不會釋放叮叹。
- 最大處理線程池數(shù)(maximumPoolSize),當線程池里面的線程數(shù)超過corePoolSize爆存,小于maximumPoolSize時會動態(tài)創(chuàng)建與回收線程池里面的線程池資源蛉顽。
線程池的運行機制:
舉個栗子。假如有一個工廠先较,工廠里面有10個人携冤,每個工人同時只能做一件事情。因此只要當10個工人中有工人是空閑的闲勺,來了任務就分配給空閑的工人做曾棕;當10個工人都有任務時,如果還來任務菜循,就把任務進行排隊等待翘地。
如果說新任務數(shù)目增長的速度遠遠大于工作做任務的速度,那么此時工廠的主管可能就需要采取補救措施了癌幕,比如重新招4個工人進來衙耕;然后就將任務分配給這4個剛招進來的工人處理。
如果說這14個工人做任務的速度還是不夠勺远,此時工廠主管就要考慮不再接受新的任務或者拋棄前面的一些任務了橙喘。當這14個工人當中有人空閑時,而新任務增長的速度又比較緩慢胶逢,工廠主管就要考慮辭掉4個臨時工了厅瞎,只保持原來10個工人饰潜,比較額外的工人是需要花費的。
而這個栗子中永遠等待干活的10個工人機制就是workerQueue磁奖。這個栗子中的corePoolSize就是10囊拜,而maximumPoolSize就是14(10+4)。也就是說corePoolSize就是線程池的大小比搭,maximumPoolSize在我看來就是一種線程池任務超過負荷的一種補救措施冠跷,即任務量突然過大時的一種補救措施。再看看下面圖好好理解一下身诺。工人永遠在等待干活蜜托,就像workerQueue永遠在循環(huán)干活一樣,除非霉赡,整個線程池停止了橄务。
線程池里面的線程的時序圖如下圖所示:
自定義線程池與ExecutorService
自定義線程池需要用到ThreadFactory,本節(jié)將通過創(chuàng)建一個線程的例子對ExecutorService及其參數(shù)進行詳細講解穴亏。
1.認識ExecutorService家族
ExecutorService家族成員如下所示:
使用startUML畫的蜂挪,我是UML菜鳥,所以湊合著看下嗓化。
上圖中主要元素說明如下:
Executor:線程池的頂級接口棠涮,但是嚴格意義上講Executor并不是一個線程池,而只是一個執(zhí)行線程的工具刺覆。
ExecutorService:真正線程池接口严肪。這個接口繼承了Executor接口,并聲明了一些方法:
submit谦屑、invokeAll驳糯、invokeAny以及shutDown等。
AbstractExecutorService實現(xiàn)了ExecutorService接口氢橙,基本實現(xiàn)了ExecutorService中聲明的所有方法酝枢。
ThreadPoolExecutor:ExecutorService的默認實現(xiàn),繼承了類AbstractExecutorService悍手。
ScheduledExecutorService:與Timer/TimerTask類似隧枫,解決那些需要任務重復執(zhí)行的問題。
ScheduledThreadPoolExecutor:繼承ThreadPoolExecutor的ScheduledExecutorService接口實現(xiàn)谓苟,周期性任務調(diào)度的類實現(xiàn)。
Executors是個線程工廠類协怒,方便我們快速地創(chuàng)建線程池涝焙。
2.利用ThreadFactory創(chuàng)建一個線程
java.util.concurrent.ThreadFactory提供了一個創(chuàng)建線程的工廠的接口。
ThreadFactory源碼如下:
public interface ThreadFactory{
@override
public Thread newThread(Runnable r);
}
我們可以看到上面的接口類中有一個newThread()的方法孕暇,為此我們自己手動定義一個線程工廠類仑撞,有木有激動啊赤兴,呵呵,下面我們就手動寫一個自己的線程工廠類吧隧哮!
MyThreadFactory.java
public class MyThreadFactory implements ThreadFactory{
@Override
public Thread newThread(Runnable r){
return new Thread(r);
}
}
上面已經(jīng)創(chuàng)建好了我們自己的線程工廠類桶良,但是啥都沒有做,就是直接new了一個Thread就返回回去了沮翔,我們一般在創(chuàng)建線程的時候陨帆,都需要定義其線程的名字,因為我們在定義了線程的名字之后就能在出現(xiàn)問題的時候根據(jù)監(jiān)視工具來查找錯誤的來源采蚀,所以我們來看下官方實現(xiàn)的ThreadFactory吧疲牵!
這個類在java.util.concurrent.Executors類中的靜態(tài)類中DefaultThreadFactory
/**
* The default thread factory
*/
static class DefaultThreadFactory implements ThreadFactory{
private static final AtomicInteger poolNumber=new AtomicInteger(1);
private final ThreadGroup group;
private final AtomicInteger threadNumber=new AtomicInteger(1);
private final String namePrefix;
DefaultThreadFactory(){
SecurityManager s=System.getSecurityManager();
group=(s!=null)?s.getThreadGroup():Thread.currentThread().getThreadGroup();
namePrefix="pool-"+poolNumber.getAndIncrement()+"-thread-";
}
public Thread newThread(Runnable r){
Thread t=new Thread(group,r,namePrefix+threadNumber.getAndIncrement(),0);
if((t.isDaemon())
t.setDaemon(false);
if(t.getPriority()!=Thread.NORM_PRIORITY)
t.setPriority(Thread.NORM_PRIORITY);
return t;
}
}
3.了解線程池的拒絕策略(RejectExecutionHandler)
當調(diào)用ThreadPoolExecutor的execute方法時,而此時線程池處于一個飽和的狀態(tài)榆鼠,并且任務隊列也已經(jīng)滿了那么就需要做丟棄處理纲爸,RejectExecutionHandler就是這樣的一個處理接口類。
RejectExecutionHandler.java
public interface RejectedExecutionHandler {
/**
* Method that may be invoked by a {@link ThreadPoolExecutor} when
* {@link ThreadPoolExecutor#execute execute} cannot accept a
* task. This may occur when no more threads or queue slots are
* available because their bounds would be exceeded, or upon
* shutdown of the Executor.
*
* <p>In the absence of other alternatives, the method may throw
* an unchecked {@link RejectedExecutionException}, which will be
* propagated to the caller of {@code execute}.
*
* @param r the runnable task requested to be executed
* @param executor the executor attempting to execute this task
* @throws RejectedExecutionException if there is no remedy
*/
void rejectedExecution(Runnable r, ThreadPoolExecutor executor);
}
在JDK里面有4中拒絕策略妆够,如下圖所示:
- AbortPolicy:一言不合就拋異常(默認使用策略)识啦。
- CallerRunsPolicy:只用調(diào)用者所在線程來運行任務。
- DiscardOldestPolicy:丟棄隊列里最近的一個任務神妹,并執(zhí)行當前任務颓哮。
- DiscardPolicy:不處理,直接丟棄灾螃。
來看下源碼吧:
AbortPolicy : 一言不合就拋異常的
/**
* A handler for rejected tasks that throws a
* {@code RejectedExecutionException}.
*/
public static class AbortPolicy implements RejectedExecutionHandler {
/**
* Creates an {@code AbortPolicy}.
*/
public AbortPolicy() { }
/**
* Always throws RejectedExecutionException.
*
* @param r the runnable task requested to be executed
* @param e the executor attempting to execute this task
* @throws RejectedExecutionException always.
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
throw new RejectedExecutionException("Task " + r.toString() +
" rejected from " +
e.toString());
}
}
CallerRunsPolicy:調(diào)用者所在線程來運行任務
/**
* A handler for rejected tasks that runs the rejected task
* directly in the calling thread of the {@code execute} method,
* unless the executor has been shut down, in which case the task
* is discarded.
*/
public static class CallerRunsPolicy implements RejectedExecutionHandler {
/**
* Creates a {@code CallerRunsPolicy}.
*/
public CallerRunsPolicy() { }
/**
* Executes task r in the caller's thread, unless the executor
* has been shut down, in which case the task is discarded.
*
* @param r the runnable task requested to be executed
* @param e the executor attempting to execute this task
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
r.run();
}
}
}
DiscardOldestPolicy :丟棄隊列里面最近的一個任務,并執(zhí)行當前任務
/**
* A handler for rejected tasks that discards the oldest unhandled
* request and then retries {@code execute}, unless the executor
* is shut down, in which case the task is discarded.
*/
public static class DiscardOldestPolicy implements RejectedExecutionHandler {
/**
* Creates a {@code DiscardOldestPolicy} for the given executor.
*/
public DiscardOldestPolicy() { }
/**
* Obtains and ignores the next task that the executor
* would otherwise execute, if one is immediately available,
* and then retries execution of task r, unless the executor
* is shut down, in which case task r is instead discarded.
*
* @param r the runnable task requested to be executed
* @param e the executor attempting to execute this task
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
e.getQueue().poll();
e.execute(r);
}
}
}
DiscardPolicy : 不處理题翻,直接丟棄
/**
* A handler for rejected tasks that silently discards the
* rejected task.
*/
public static class DiscardPolicy implements RejectedExecutionHandler {
/**
* Creates a {@code DiscardPolicy}.
*/
public DiscardPolicy() { }
/**
* Does nothing, which has the effect of discarding task r.
*
* @param r the runnable task requested to be executed
* @param e the executor attempting to execute this task
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
}
}
思考問題:
為什么有任務拒絕的情況發(fā)生呢:
這里先假設有一個前提:線程池里面有一個任務隊列,用于緩存所有待處理的任務腰鬼,正在處理的任務將從任務隊列中移除嵌赠。因此,在任務隊列長度有限的情況下熄赡,就會出現(xiàn)現(xiàn)任務的拒絕情況姜挺,需要一種策略來處理發(fā)生這種已滿無法加入的情況。另外彼硫,在線程池關閉的時候炊豪,也需要對任務加入隊列操作進行額外的協(xié)調(diào)處理。
4.ThreadPoolExecutor詳解
ThreadPoolExecutor類是線程池中最核心的一個類拧篮,因此如果要想透徹的了解Java線程池词渤,必須先了解這個大BOSS,下面來看下其源碼:
4種構造方法:
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime,TimeUnit unit,
BlockingQueue<Runnable> workQueue) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,Executors.defaultThreadFactory(), defaultHandler);
}
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize,long keepAliveTime,TimeUnit unit,
BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,threadFactory, defaultHandler);
}
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize,long keepAliveTime,TimeUnit unit,
BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), handler);
}
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,
BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory,RejectedExecutionHandler handler) {
if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
通過源碼我們清楚的看到串绩,最終構造函數(shù)調(diào)用了最后一個構造函數(shù)缺虐,后面的那個構造函數(shù)才是真正的構造函數(shù),接下來研究一下參數(shù)礁凡。
- int corePoolSize:核心池大小高氮,這個參數(shù)跟后面講的線程池原理有很大的關系慧妄。在創(chuàng)建了線程池之后,默認情況下剪芍,線程池中并沒有任何線程塞淹,而是等待所有的任務到來之時才進行創(chuàng)建線程去執(zhí)行任務,除非調(diào)用了prestartAllCoreThreads()或者prestartCoreThread()方法 罪裹,從這個兩個方法的名字可以知道是預創(chuàng)建線程的意思饱普,即在沒有任務來臨之前先創(chuàng)建好corePoolSize個線程或者一個線程。默認情況下坊谁,在創(chuàng)建好線程池之后费彼,線程池中的線程數(shù)為0,當有任務來之后口芍,就會創(chuàng)建一個線程去執(zhí)行任務箍铲,當線程池中的線程數(shù)量達到corePoolSize后,就會把達到的任務放到緩存隊列中去鬓椭。
- int maximumPoolSize:線程池最大線程數(shù)量颠猴,這是個非常重要的參數(shù),它表示在線程池中最多能創(chuàng)建線程的數(shù)量小染;在corePoolSize和maximumPoolSize的線程數(shù)會被自動釋放翘瓮,而小于corePoolSize的則不會。
- long keepAliveTime:表示線程沒有執(zhí)行任務時最多保持多久時間會終止裤翩。默認情況下资盅,只有當線程池中的線程數(shù)大于corePoolSize時,keepAliveTime才會生效,直到線程池數(shù)量不大于corePoolSize踊赠,即只有當線程池數(shù)量大于corePoolSize數(shù)量呵扛,超出這個數(shù)量的線程一旦到達keepAliveTime就會終止。但是如果調(diào)用了allowCoreThreadTimeout(boolean)方法筐带,即使線程池的線程數(shù)量不大于corePoolSize今穿,線程也會在keepAliveTime之后就終止,知道線程池的數(shù)量為0為止伦籍。
- TimeUnit unit:參數(shù)keepAliveTime的時間單位蓝晒,一個時間單位枚舉類。
- BlockingQueue workQueue:一個阻塞隊列帖鸦,用來存儲等待執(zhí)行任務的隊列芝薇,這個參數(shù)選擇也很重要,會對線程池的運行過程產(chǎn)生重大影響作儿,一般來說剩燥,這里的阻塞隊列就是(ArrayBlockingQueue、LinkedBlockingQueue、SynchronousQueue灭红;)。
- ThreadFactory ThreadFactory:線程工廠口注,主要用來創(chuàng)建線程变擒;可以是一個自定義的線程工廠,默認就是Executors.defaultThreadFactory()寝志。用來在線程池中創(chuàng)建線程娇斑。
- RejectedExecutionHandler handler:表示當拒絕處理任務時的策略,也是可以自定義的材部,默認是我們前面的4種取值:
- ThreadPoolExecutor.AbortPolicy(默認的毫缆,一言不合即拋異常的)
- ThreadPoolExecutor.DiscardPolicy(一言不合就丟棄任務)
- ThreadPoolExecutor.DiscardOldestPolicy(一言不合就把最近的任務給拋棄,然后執(zhí)行當前任務)
- ThreadPoolExecutor.CallerRunsPolicy(由調(diào)用者所在線程來執(zhí)行任務)
所以想自定義線程池就可以從上面的幾個參數(shù)入手乐导。接下來具體看下代碼,了解一下實現(xiàn)原理:
// 默認異常處理機制
private static final RejectedExecutionHandler defaultHandler = new AbortPolicy();
//任務緩存隊列苦丁,用來存放等待執(zhí)行的任務
private final BlockingQueue<Runnable> workQueue;
//線程池的主要狀態(tài)鎖,對線程狀態(tài)(比如線程大小物臂、runState等)的改變都需要這個鎖
private final ReentrantLock mainLock = new ReentrantLock();
//用來存放工作集
private final HashSet<Worker> workers = new HashSet<Worker>();
//volatile 可變變量關鍵字旺拉,寫的時候用mainLock做鎖,讀的時候無鎖棵磷,高性能
private volatile long keepAliveTime;
//是否允許核心線程超時
private volatile boolean allowCoreThreadTimeOut;
//核心線程數(shù)量
private volatile int corePoolSize;
//線程最大線程數(shù)量
private volatile int maximumPoolSize;
//任務拒絕策略
private volatile RejectedExcutionHandler handler;
結合之前的知識蛾狗,大概就能猜出里面是怎么實現(xiàn)的了,具體可以參考一下JDK的源代碼仪媒,這樣我們就能做到了解原理又會用了沉桌。
5.自定義實現(xiàn)一個簡單的Web請求連接池
我們來自定義一個簡單的Web請求線程池。模仿Web服務的需求場景說明如下:
- 服務器可容納的最小請求數(shù)是多少算吩。
- 可以動態(tài)擴充的請求數(shù)大小是多少留凭。
- 多久回收多余線程數(shù)即請求數(shù)。
- 用戶訪問量打了怎么處理赌莺。
- 線程隊列機制采取有優(yōu)先級的排隊的執(zhí)行機制冰抢。
根據(jù)上面的場景,看下這個線程池如何編寫艘狭?
public class MyExecutors extends Executors{
//利用默認線程工廠和PriorityBlockingQueue隊列機制鹦倚,當然了沮明,我們還可以自定義ThreadFactory和繼承queue進行自定義擴展
public static ExecutorService newMyWebThreadPool(int minSpareThreads,int maxThreads,int maxIdleTime){
return new ThreadPoolExecutor(minSpareThread,maxThreads,maxIdleTime,TimeUnit.MILLISECONDS,
new PriorityBlockingQueue<Runnable>());
}
}
6.線程池在工作中的錯誤使用
- (1)分不清楚線程是單例還是多對象。
- (2)線程池數(shù)量設置很大邑贴。
- (3)注意死鎖問題