線程池簡(jiǎn)單實(shí)現(xiàn)
package Thread;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashSet;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
public class ThreadPoolDemo {
//1.阻塞隊(duì)列
private BlockQueue<Runnable> taskQueue;
//2.核心線程數(shù)
private int coreSize;
//3.獲取任務(wù)的超時(shí)時(shí)間
private long timeout;
//4.時(shí)間轉(zhuǎn)換
private TimeUnit timeUnit;
//5、線程集合
HashSet<Worker> workers = new HashSet<>();
//6.拒絕策略
private RejectPolicyDemo<Runnable> rejectPolicy;
public ThreadPoolDemo(int coreSize, long timeout, TimeUnit timeUnit , int queueCap , RejectPolicyDemo<Runnable> rejectPolicy) {
this.coreSize = coreSize;
this.timeout = timeout;
this.timeUnit = timeUnit;
this.taskQueue = new BlockQueue<Runnable>(queueCap);
this.rejectPolicy = rejectPolicy;
}
class Worker extends Thread{
private Runnable task;
public Worker(Runnable task) {
this.task = task;
}
@Override
public void run() {
//1.當(dāng)task 不為空缺菌,執(zhí)行任務(wù)
//2.當(dāng)task 執(zhí)行完畢夕玩,再接著從任務(wù)隊(duì)列獲取任務(wù)繼續(xù)執(zhí)行
// while(task != null || (task = taskQueue.take()) != null){ //該策略會(huì)死等 闷串,就算線程池為空窘拯,也會(huì)一直等待
while(task != null || (task = taskQueue.poll(timeout,timeUnit)) != null){
try{
task.run();
}catch (Exception e){
e.printStackTrace();
}finally {
task = null; //沒(méi)有任務(wù)了 將task置為null
}
}
synchronized (workers){
workers.remove(this);
}
}
}
//執(zhí)行任務(wù)
public void excute(Runnable task){
//如果任務(wù)數(shù)沒(méi)有超過(guò) coresize 時(shí) 瘩蚪,直接交給worker對(duì)象執(zhí)行
//如果超過(guò)了 coresize時(shí) 就乓,將任務(wù)加入到阻塞隊(duì)列中
synchronized (workers){
if (workers.size() < coreSize) {
Worker worker = new Worker(task);
workers.add(worker);
worker.start();
}
else {
//可選擇阻塞隊(duì)列滿足后 選擇拒絕策略
// taskQueue.put(task);
//1.死等
//2 帶超時(shí)等待
//3 讓調(diào)用者放棄任務(wù)
//4 讓調(diào)用者拋出異常
//5 讓調(diào)用者自己執(zhí)行任務(wù)
taskQueue.tryPut(rejectPolicy , task);
}
}
}
public static void main(String[] args) {
ThreadPoolDemo threadPool = new ThreadPoolDemo(2 , 1000 , TimeUnit.MILLISECONDS , 100 ,
(queue, task)->{
//1.死等
// queue.put(task);
//2 帶超時(shí)等待
// queue.offer(task , 500 ,TimeUnit.MILLISECONDS );
//3 讓調(diào)用者放棄任務(wù)
// System.out.println("放棄任務(wù)");
//4 讓調(diào)用者拋出異常
// throw new RuntimeException("運(yùn)行失敗"+task); 拋出異常后 后續(xù)的任務(wù)不會(huì)再執(zhí)行
//5 讓調(diào)用者自己執(zhí)行任務(wù)
task.run();
});
}
}
@FunctionalInterface
interface RejectPolicyDemo<T>{
void reject(BlockQueue<T> queue , T task);
}
class BlockQueue<T> {
public BlockQueue() {
}
public BlockQueue(int capcity) {
this.capcity = capcity;
}
// 1.阻塞隊(duì)列
private Deque<T> queue = new ArrayDeque<>();
//2.鎖
private ReentrantLock lock = new ReentrantLock();
//3. 生產(chǎn)者變量
private Condition fullWaitSet = lock.newCondition();
//4.消費(fèi)者變量
private Condition emptyWaitSet = lock.newCondition();
//5.容量
private int capcity;
//帶超時(shí)的阻塞獲取
public T poll(long timeout , TimeUnit unit){
lock.lock();
try {
//將timeout 統(tǒng)一轉(zhuǎn)換成 納秒
long nanos = unit.toNanos(timeout);
while (queue.isEmpty()) {
try {
if(nanos <= 0){
//超時(shí)
return null;
}
//防止虛假喚醒汉匙,返回的是剩余時(shí)間
nanos = emptyWaitSet.awaitNanos(nanos);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
T t = queue.removeFirst();
fullWaitSet.signal();
return t;
}
finally {
lock.unlock();
}
}
//阻塞獲取
public T take(){
lock.lock();
try {
while (queue.isEmpty()) {
try {
emptyWaitSet.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
T t = queue.removeFirst();
fullWaitSet.signal();
return t;
}
finally {
lock.unlock();
}
}
//帶超時(shí)時(shí)間的阻塞添加
public boolean offer(T task , long timeout , TimeUnit unit){
lock.lock();
try {
long nanos = unit.toNanos(timeout);
while(capcity == queue.size()){
try {
if(nanos <= 0){
return false;
}
fullWaitSet.awaitNanos(nanos);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
queue.addLast(task);
emptyWaitSet.signal();
return true;
} finally {
lock.unlock();
}
}
//阻塞添加
public void put(T element){
lock.lock();
try {
while(capcity == queue.size()){
try {
fullWaitSet.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
queue.addLast(element);
emptyWaitSet.signal();
} finally {
lock.unlock();
}
}
public int size(){
lock.unlock();
try {
return queue.size();
} finally {
lock.unlock();
}
}
public void tryPut(RejectPolicyDemo<T> rejectPolicy, T task) {
lock.lock();
try {
//判斷隊(duì)列是否已滿
if(queue.size() == capcity){
rejectPolicy.reject(this , task);
}else { //有空閑
queue.addLast(task);
emptyWaitSet.signal();
}
}finally {
lock.unlock();
}
}
}
源碼
ThreadPoolExecutor 使用 int 的高 3 位來(lái)表示線程池狀態(tài),低 29 位表示線程數(shù)量生蚁,ThreadPoolExecutor 類中的線程狀態(tài)變量如下:
// Integer.SIZE 值為 32
private static final int COUNT_BITS = Integer.SIZE - 3;
// runState is stored in the high-order bits
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
狀態(tài)名 | 高三位 | 接受新任務(wù) | 處理阻塞隊(duì)列任務(wù) | 說(shuō)明 |
---|---|---|---|---|
RUNNING | 111 | Y | Y | 接收新任務(wù)噩翠,同時(shí)處理任務(wù)隊(duì)列中的任務(wù) |
SHUTDOWN | 000 | N | Y | 不接受新任務(wù),但是處理任務(wù)隊(duì)列中的任務(wù) |
STOP | 001 | N | N | 中斷正在執(zhí)行的任務(wù)邦投,同時(shí)拋棄阻塞隊(duì)列中的任務(wù) |
TIDYING | 010 | - | - | 任務(wù)執(zhí)行完畢绎秒,活動(dòng)線程為0時(shí),即將進(jìn)入終結(jié)階段 |
TERMINATED | 011 | - | - | 終結(jié)狀態(tài) |
從數(shù)字上面比較尼摹,TERMINATED > TIDYING > STOP > SHUTDOWN > RUNNING
線程池狀態(tài)和線程池中線程的數(shù)量由一個(gè)原子整型ctl來(lái)共同表示
- 使用一個(gè)數(shù)來(lái)表示兩個(gè)值的主要原因時(shí):可以通過(guò)一次cas同時(shí)更改兩個(gè)屬性的值
構(gòu)造方法
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.acc = System.getSecurityManager() == null ?
null :
AccessController.getContext();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
構(gòu)造參數(shù)解釋:
- corePoolSize:核心線程數(shù)
- maximumPoolSize:最大線程數(shù)
- maximumPoolSize - corePoolSize = 救急線程數(shù)
- keepAliveTime:救急線程空閑時(shí)的最大生存時(shí)間
- unit:時(shí)間單位
- workQueue:阻塞隊(duì)列(存放任務(wù))
- 有界阻塞隊(duì)列 ArrayBlockingQueue
- 無(wú)界阻塞隊(duì)列 LinkedBlockingQueue
- 最多只有一個(gè)同步元素的隊(duì)列 SynchronousQueue
- 優(yōu)先隊(duì)列 PriorityBlockingQueue
- threadFactory:線程工廠(給線程取名字)
- handler:拒絕策略
救急線程在核心線程和阻塞隊(duì)列都放不下了才會(huì)使用
工作方式:
- 線程池中剛開(kāi)始沒(méi)有線程见芹,當(dāng)一個(gè)任務(wù)提交給線程池后,線程池會(huì)創(chuàng)建一個(gè)新線程來(lái)執(zhí)行任務(wù)蠢涝。
- 當(dāng)線程數(shù)達(dá)到 corePoolSize 并沒(méi)有線程空閑玄呛,這時(shí)再加入任務(wù),新加的任務(wù)會(huì)被加入 workQueue 隊(duì)列排 隊(duì)和二,直到有空閑的線程徘铝。
- 如果隊(duì)列選擇了有界隊(duì)列,那么任務(wù)超過(guò)了隊(duì)列大小時(shí)惯吕,會(huì)創(chuàng)建 maximumPoolSize - corePoolSize 數(shù)目的線 程來(lái)救急惕它。
- 如果線程到達(dá) maximumPoolSize 仍然有新任務(wù)這時(shí)會(huì)執(zhí)行拒絕策略。拒絕策略 jdk 提供了 下面的前 4 種實(shí)現(xiàn)废登,其它著名框架也提供了實(shí)現(xiàn)
- ThreadPoolExecutor.AbortPolicy 讓調(diào)用者拋出RejectedExecutionException 異常淹魄,這是默認(rèn)策略
- ThreadPoolExecutor.CallerRunsPolicy 讓調(diào)用者運(yùn)行任務(wù)
- ThreadPoolExecutor.DiscardPolicy 放棄本次任務(wù)
- ThreadPoolExecutor.DiscardOldestPolicy 放棄隊(duì)列中最早的任務(wù),本任務(wù)取而代之
- Dubbo 的實(shí)現(xiàn)堡距,在拋出 RejectedExecutionException 異常之前會(huì)記錄日志甲锡,并 dump 線程棧信息,方 便定位問(wèn)題
- Netty 的實(shí)現(xiàn)羽戒,是創(chuàng)建一個(gè)新線程來(lái)執(zhí)行任務(wù)
- ActiveMQ 的實(shí)現(xiàn)缤沦,帶超時(shí)等待(60s)嘗試放入隊(duì)列,類似我們之前自定義的拒絕策略
- PinPoint 的實(shí)現(xiàn)易稠,它使用了一個(gè)拒絕策略鏈缸废,會(huì)逐一嘗試策略鏈中每種拒絕策略
-
當(dāng)高峰過(guò)去后,超過(guò) corePoolSize 的救急線程如果一段時(shí)間沒(méi)有任務(wù)做,需要結(jié)束節(jié)省資源企量,這個(gè)時(shí)間由 keepAliveTime 和 unit 來(lái)控制测萎。
總覽
ThreadPoolExecutor
ThreadPoolExecutor 是 JDK 中的線程池實(shí)現(xiàn),這個(gè)類實(shí)現(xiàn)了一個(gè)線程池需要的各個(gè)方法梁钾,它實(shí)現(xiàn)了任務(wù)提交绳泉、線程管理逊抡、監(jiān)控等等方法姆泻。
我們經(jīng)常會(huì)使用Executors
這個(gè)工具類來(lái)快速構(gòu)造一個(gè)線程池,對(duì)于初學(xué)者而言冒嫡,這種工具類是很有用的拇勃,開(kāi)發(fā)者不需要關(guān)注太多的細(xì)節(jié),只要知道自己需要一個(gè)線程池孝凌,僅僅提供必需的參數(shù)就可以了方咆,其他參數(shù)都采用作者提供的默認(rèn)值。
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
這兩個(gè)方法都會(huì)進(jìn)行使用ThreadPoolExecutor
來(lái)創(chuàng)建一個(gè)ThreadPoolExecutor實(shí)例(具體可見(jiàn)前面構(gòu)造方法)
Doug Lea 采用一個(gè) 32 位的整數(shù)來(lái)存放線程池的狀態(tài)和當(dāng)前池中的線程數(shù)蟀架,其中高 3 位用于存放線程池狀態(tài)瓣赂,低 29 位表示線程數(shù)(即使只有 29 位,也已經(jīng)不小了片拍,大概 5 億多煌集,現(xiàn)在還沒(méi)有哪個(gè)機(jī)器能起這么多線程的吧)。
核心方法 execute()
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
// 如果當(dāng)前線程數(shù)少于核心線程數(shù)捌省,那么直接添加一個(gè) worker 來(lái)執(zhí)行任務(wù)苫纤,
// 創(chuàng)建一個(gè)新的線程,并把當(dāng)前任務(wù) command 作為這個(gè)線程的第一個(gè)任務(wù)(firstTask)
if (workerCountOf(c) < corePoolSize) {
// 添加任務(wù)成功纲缓,那么就結(jié)束了卷拘。提交任務(wù)嘛,線程池已經(jīng)接受了這個(gè)任務(wù)祝高,這個(gè)方法也就可以返回了
// 至于執(zhí)行的結(jié)果栗弟,到時(shí)候會(huì)包裝到 FutureTask 中。
// 返回 false 代表線程池不允許提交任務(wù)
if (addWorker(command, true))
return;
// 前面說(shuō)的那個(gè)表示 “線程池狀態(tài)” 和 “線程數(shù)” 的整數(shù)
c = ctl.get();
}
// 到這里說(shuō)明工闺,要么當(dāng)前線程數(shù)大于等于核心線程數(shù)横腿,要么剛剛 addWorker 失敗了
// 如果線程池處于 RUNNING 狀態(tài),把這個(gè)任務(wù)添加到任務(wù)隊(duì)列 workQueue 中
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
// 如果線程池已不處于 RUNNING 狀態(tài)斤寂,那么移除已經(jīng)入隊(duì)的這個(gè)任務(wù)耿焊,并且執(zhí)行拒絕策略
if (! isRunning(recheck) && remove(command))
reject(command);
// 如果線程池還是 RUNNING 的,并且線程數(shù)為 0遍搞,那么開(kāi)啟新的線程
// 到這里罗侯,我們知道了,這塊代碼的真正意圖是:擔(dān)心任務(wù)提交到隊(duì)列中了溪猿,但是線程都關(guān)閉了
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
// 如果 workQueue 隊(duì)列滿了钩杰,那么進(jìn)入到這個(gè)分支
// 以 maximumPoolSize 為界創(chuàng)建新的 worker纫塌,
// 如果失敗,說(shuō)明當(dāng)前線程數(shù)已經(jīng)達(dá)到 maximumPoolSize讲弄,執(zhí)行拒絕策略
else if (!addWorker(command, false))
reject(command);
}
四個(gè)拒絕策略的具體實(shí)現(xiàn)
// 只要線程池沒(méi)有被關(guān)閉措左,那么由提交任務(wù)的線程自己來(lái)執(zhí)行這個(gè)任務(wù)。
public static class CallerRunsPolicy implements RejectedExecutionHandler {
public CallerRunsPolicy() { }
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
r.run();
}
}
}
// 不管怎樣避除,直接拋出 RejectedExecutionException 異常
// 這個(gè)是默認(rèn)的策略怎披,如果我們構(gòu)造線程池的時(shí)候不傳相應(yīng)的 handler 的話,那就會(huì)指定使用這個(gè)
public static class AbortPolicy implements RejectedExecutionHandler {
public AbortPolicy() { }
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
throw new RejectedExecutionException("Task " + r.toString() +
" rejected from " +
e.toString());
}
}
// 不做任何處理瓶摆,直接忽略掉這個(gè)任務(wù)
public static class DiscardPolicy implements RejectedExecutionHandler {
public DiscardPolicy() { }
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
}
}
// 這個(gè)相對(duì)霸道一點(diǎn)凉逛,如果線程池沒(méi)有被關(guān)閉的話,
// 把隊(duì)列隊(duì)頭的任務(wù)(也就是等待了最長(zhǎng)時(shí)間的)直接扔掉群井,然后提交這個(gè)任務(wù)到等待隊(duì)列中
public static class DiscardOldestPolicy implements RejectedExecutionHandler {
public DiscardOldestPolicy() { }
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
e.getQueue().poll();
e.execute(r);
}
}
}
Executor生成不一樣的連接池
- 生成一個(gè)固定大小的線程池:
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
最大線程數(shù)設(shè)置為與核心線程數(shù)相等状飞,此時(shí) keepAliveTime 設(shè)置為 0(因?yàn)檫@里它是沒(méi)用的,即使不為 0书斜,線程池默認(rèn)也不會(huì)回收 corePoolSize 內(nèi)的線程)诬辈,任務(wù)隊(duì)列采用 LinkedBlockingQueue,無(wú)界隊(duì)列荐吉。
過(guò)程分析:剛開(kāi)始焙糟,每提交一個(gè)任務(wù)都創(chuàng)建一個(gè) worker,當(dāng) worker 的數(shù)量達(dá)到 nThreads 后稍坯,不再創(chuàng)建新的線程酬荞,而是把任務(wù)提交到 LinkedBlockingQueue 中,而且之后線程數(shù)始終為 nThreads瞧哟。
- 生成只有一個(gè)線程的固定線程池混巧,這個(gè)更簡(jiǎn)單,和上面的一樣勤揩,只要設(shè)置線程數(shù)為 1 就可以了:
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
- 生成一個(gè)需要的時(shí)候就創(chuàng)建新的線程咧党,同時(shí)可以復(fù)用之前創(chuàng)建的線程(如果這個(gè)線程當(dāng)前沒(méi)有任務(wù))的線程池:
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
核心線程數(shù)為 0,最大線程數(shù)為 Integer.MAX_VALUE陨亡,keepAliveTime 為 60 秒傍衡,任務(wù)隊(duì)列采用 SynchronousQueue。于 corePoolSize 是 0负蠕,那么提交任務(wù)的時(shí)候蛙埂,直接將任務(wù)提交到隊(duì)列中。
總結(jié)
- java線程池七大屬性
corePoolSize遮糖,
maximumPoolSize绣的,
workQueue,
keepAliveTime,
unit屡江,
threadFactory芭概,
rejectedExecutionHandler
- 線程池中的線程創(chuàng)建時(shí)機(jī)
- 如果當(dāng)前線程數(shù)少于 corePoolSize,那么提交任務(wù)的時(shí)候創(chuàng)建一個(gè)新的線程惩嘉,并由這個(gè)線程執(zhí)行這個(gè)任務(wù)罢洲;
- 如果當(dāng)前線程數(shù)已經(jīng)達(dá)到 corePoolSize,那么將提交的任務(wù)添加到隊(duì)列中文黎,等待線程池中的線程去隊(duì)列中取任務(wù)惹苗;
- 如果隊(duì)列已滿,那么創(chuàng)建新的線程來(lái)執(zhí)行任務(wù)臊诊,需要保證池中的線程數(shù)不會(huì)超過(guò) maximumPoolSize鸽粉,如果此時(shí)線程數(shù)超過(guò)了 maximumPoolSize斜脂,那么執(zhí)行拒絕策略抓艳。
- 任務(wù)執(zhí)行過(guò)程中發(fā)生異處理
如果某個(gè)任務(wù)執(zhí)行出現(xiàn)異常,那么執(zhí)行任務(wù)的線程會(huì)被關(guān)閉帚戳,而不是繼續(xù)接收其他任務(wù)玷或。然后會(huì)啟動(dòng)一個(gè)新的線程來(lái)代替它。
- 執(zhí)行拒絕策略的時(shí)機(jī)
- workers 的數(shù)量達(dá)到了 corePoolSize(任務(wù)此時(shí)需要進(jìn)入任務(wù)隊(duì)列)片任,任務(wù)入隊(duì)成功偏友,與此同時(shí)線程池被關(guān)閉了,而且關(guān)閉線程池并沒(méi)有將這個(gè)任務(wù)出隊(duì)对供,那么執(zhí)行拒絕策略位他。這里說(shuō)的是非常邊界的問(wèn)題,入隊(duì)和關(guān)閉線程池并發(fā)執(zhí)行产场,讀者仔細(xì)看看 execute 方法是怎么進(jìn)到第一個(gè) reject(command) 里面的鹅髓。
- workers 的數(shù)量大于等于 corePoolSize,將任務(wù)加入到任務(wù)隊(duì)列京景,可是隊(duì)列滿了窿冯,任務(wù)入隊(duì)失敗,那么準(zhǔn)備開(kāi)啟新的線程确徙,可是線程數(shù)已經(jīng)達(dá)到 maximumPoolSize醒串,那么執(zhí)行拒絕策略。