1吵冒、線程池ThreadPoolExecutor介紹
構(gòu)造方法
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
默認(rèn)構(gòu)造方法
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), defaultHandler);
}
默認(rèn)的線程工廠和拒絕策略
private static final RejectedExecutionHandler defaultHandler =
new AbortPolicy();
static class DefaultThreadFactory implements ThreadFactory {
private static final AtomicInteger poolNumber = new AtomicInteger(1);
private final ThreadGroup group;
private final AtomicInteger threadNumber = new AtomicInteger(1);
private final String namePrefix;
DefaultThreadFactory() {
SecurityManager s = System.getSecurityManager();
group = (s != null) ? s.getThreadGroup() :
Thread.currentThread().getThreadGroup();
namePrefix = "pool-" +
poolNumber.getAndIncrement() +
"-thread-";
}
public Thread newThread(Runnable r) {
Thread t = new Thread(group, r,
namePrefix + threadNumber.getAndIncrement(),
0);
if (t.isDaemon())
t.setDaemon(false);
if (t.getPriority() != Thread.NORM_PRIORITY)
t.setPriority(Thread.NORM_PRIORITY);
return t;
}
}
corePoolSize
線程池中的核心線程數(shù)椿猎,當(dāng)提交一個任務(wù)時,線程池創(chuàng)建一個新線程執(zhí)行任務(wù),直到當(dāng)前線程數(shù)等于corePoolSize;如果當(dāng)前線程數(shù)為corePoolSize啡省,繼續(xù)提交的任務(wù)被保存到阻塞隊列中,等待被執(zhí)行髓霞;如果執(zhí)行了線程池的prestartAllCoreThreads()方法卦睹,線程池會提前創(chuàng)建并啟動所有核心線程。
maximumPoolSize
線程池中允許的最大線程數(shù)方库。如果當(dāng)前阻塞隊列滿了结序,且繼續(xù)提交任務(wù),則創(chuàng)建新的線程執(zhí)行任務(wù)纵潦,前提是當(dāng)前線程數(shù)小于maximumPoolSize徐鹤;
keepAliveTime
線程空閑時的存活時間配喳,即當(dāng)線程沒有任務(wù)執(zhí)行時,繼續(xù)存活的時間凳干;默認(rèn)情況下,該參數(shù)只在線程數(shù)大于corePoolSize時才有用被济;
unit
keepAliveTime的單位救赐;
workQueue
用來保存等待被執(zhí)行的任務(wù)的阻塞隊列,且任務(wù)必須實現(xiàn)Runable接口只磷,在JDK中提供了如下阻塞隊列:
1经磅、ArrayBlockingQueue:基于數(shù)組結(jié)構(gòu)的有界阻塞隊列,按FIFO排序任務(wù)钮追;
2预厌、LinkedBlockingQuene:基于鏈表結(jié)構(gòu)的阻塞隊列,按FIFO排序任務(wù)元媚,吞吐量通常要高于ArrayBlockingQuene轧叽;
3、SynchronousQuene:一個不存儲元素的阻塞隊列刊棕,每個插入操作必須等到另一個線程調(diào)用移除操作巧娱,否則插入操作一直處于阻塞狀態(tài)碌秸,吞吐量通常要高于LinkedBlockingQuene;
4、priorityBlockingQuene:具有優(yōu)先級的無界阻塞隊列墓怀;
threadFactory
創(chuàng)建線程的工廠,通過自定義的線程工廠可以給每個新建的線程設(shè)置一個具有識別度的線程名壶愤。
handler
線程池的飽和策略对雪,當(dāng)阻塞隊列滿了,且沒有空閑的工作線程当犯,如果繼續(xù)提交任務(wù)垢村,必須采取一種策略處理該任務(wù),線程池提供了4種策略:
1灶壶、AbortPolicy:直接拋出異常肝断,默認(rèn)策略;
2驰凛、CallerRunsPolicy:用調(diào)用者所在的線程來執(zhí)行任務(wù)胸懈;
3、DiscardOldestPolicy:丟棄阻塞隊列中靠最前的任務(wù)恰响,并執(zhí)行當(dāng)前任務(wù)趣钱;
4、DiscardPolicy:直接丟棄任務(wù)胚宦;
當(dāng)然也可以根據(jù)應(yīng)用場景實現(xiàn)RejectedExecutionHandler接口首有,自定義飽和策略燕垃,如記錄日志或持久化存儲不能處理的任務(wù)。
Executors
Executors.newFixedThreadPool創(chuàng)建的線程池corePoolSize和maximumPoolSize值是相等的井联,它使用的LinkedBlockingQueue卜壕;
Executors.newSingleThreadExecutor將corePoolSize和maximumPoolSize都設(shè)置為1,也使用的LinkedBlockingQueue烙常;
Executors.newCachedThreadPool將corePoolSize設(shè)置為0轴捎,將maximumPoolSize設(shè)置為Integer.MAX_VALUE,使用的SynchronousQueue蚕脏,也就是說來了任務(wù)就創(chuàng)建線程運行侦副,當(dāng)線程空閑超過60秒,就銷毀線程驼鞭。
2秦驯、實現(xiàn)原理
線程池里面的線程的時序圖
線程池內(nèi)部狀態(tài)
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
// runState is stored in the high-order bits
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
// Packing and unpacking ctl
private static int runStateOf(int c) { return c & ~CAPACITY; }
private static int workerCountOf(int c) { return c & CAPACITY; }
private static int ctlOf(int rs, int wc) { return rs | wc; }
其中AtomicInteger變量ctl的功能非常強大:利用低29位表示線程池中線程數(shù),通過高3位表示線程池的運行狀態(tài):
1挣棕、RUNNING:-1 << COUNT_BITS译隘,即高3位為111,該狀態(tài)的線程池會接收新任務(wù)洛心,并處理阻塞隊列中的任務(wù)细燎;
2、SHUTDOWN: 0 << COUNT_BITS皂甘,即高3位為000玻驻,該狀態(tài)的線程池不會接收新任務(wù),但會處理阻塞隊列中的任務(wù)偿枕;
3璧瞬、STOP : 1 << COUNT_BITS,即高3位為001渐夸,該狀態(tài)的線程不會接收新任務(wù)嗤锉,也不會處理阻塞隊列中的任務(wù),而且會中斷正在運行的任務(wù)墓塌;
4瘟忱、TIDYING : 2 << COUNT_BITS,即高3位為010苫幢;
5访诱、TERMINATED: 3 << COUNT_BITS,即高3位為011韩肝;
任務(wù)提交
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
/*
* Proceed in 3 steps:
*
* 1. If fewer than corePoolSize threads are running, try to
* start a new thread with the given command as its first
* task. The call to addWorker atomically checks runState and
* workerCount, and so prevents false alarms that would add
* threads when it shouldn't, by returning false.
*
* 2. If a task can be successfully queued, then we still need
* to double-check whether we should have added a thread
* (because existing ones died since last checking) or that
* the pool shut down since entry into this method. So we
* recheck state and if necessary roll back the enqueuing if
* stopped, or start a new thread if there are none.
*
* 3. If we cannot queue task, then we try to add a new
* thread. If it fails, we know we are shut down or saturated
* and so reject the task.
*/
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
}
//submit調(diào)用execute方法
public <T> Future<T> submit(Callable<T> task) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task);
execute(ftask);
return ftask;
}
小于核心線程數(shù)corePoolSize直接增加線程(第1步)触菜,否則向阻塞隊列添加任務(wù)(第2步),如果阻塞隊列已滿且小于最大線程數(shù)則增加線程(第3步)哀峻,超過最大線程數(shù)執(zhí)行拒絕策略(第4步)涡相。
Worker類實現(xiàn)
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable
{
/**
* This class will never be serialized, but we provide a
* serialVersionUID to suppress a javac warning.
*/
private static final long serialVersionUID = 6138294804551838833L;
/** Thread this worker is running in. Null if factory fails. */
final Thread thread;
/** Initial task to run. Possibly null. */
Runnable firstTask;
/** Per-thread task counter */
volatile long completedTasks;
/**
* Creates with given first task and thread from ThreadFactory.
* @param firstTask the first task (null if none)
*/
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
/** Delegates main run loop to outer runWorker */
public void run() {
runWorker(this);
}
// Lock methods
//
// The value 0 represents the unlocked state.
// The value 1 represents the locked state.
protected boolean isHeldExclusively() {
return getState() != 0;
}
protected boolean tryAcquire(int unused) {
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
protected boolean tryRelease(int unused) {
setExclusiveOwnerThread(null);
setState(0);
return true;
}
public void lock() { acquire(1); }
public boolean tryLock() { return tryAcquire(1); }
public void unlock() { release(1); }
public boolean isLocked() { return isHeldExclusively(); }
void interruptIfStarted() {
Thread t;
if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
}
}
}
}
Worker類設(shè)計如下:
1哲泊、繼承了AQS類,可以方便的實現(xiàn)工作線程的同步和中止操作(催蝗?)切威;
2、實現(xiàn)了Runnable接口丙号,可以將自身作為一個任務(wù)在工作線程中執(zhí)行牢屋;
3、當(dāng)前提交的任務(wù)firstTask作為參數(shù)傳入Worker的構(gòu)造方法槽袄;
4、線程工廠在創(chuàng)建線程thread時锋谐,將Woker實例本身this作為參數(shù)傳入遍尺,當(dāng)執(zhí)行start方法啟動線程thread時,本質(zhì)是執(zhí)行了Worker的runWorker方法涮拗。
addWorker方法實現(xiàn)
private boolean addWorker(Runnable firstTask, boolean core) {
//狀態(tài)和線程數(shù)判斷
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
int wc = workerCountOf(c);
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
//HashSet中添加Worker乾戏,并執(zhí)行start方法
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask);//執(zhí)行new Worker方法,創(chuàng)建線程
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
addWoker方法實現(xiàn)的前半部分:
1三热、判斷線程池的狀態(tài)鼓择,如果線程池的狀態(tài)值大于或等SHUTDOWN,則不處理提交的任務(wù)就漾,直接返回呐能;
2、通過參數(shù)core判斷當(dāng)前需要創(chuàng)建的線程是否為核心線程抑堡,如果core為true摆出,且當(dāng)前線程數(shù)小于corePoolSize,則跳出循環(huán)首妖,開始創(chuàng)建新的線程偎漫。
addWoker方法實現(xiàn)的前半部分:線程池的工作線程通過Woker類實現(xiàn),在ReentrantLock鎖的保證下有缆,把Woker實例插入到HashSet后象踊,并啟動Woker中的線程( t.start();最終調(diào)用Woker的runWorker方法)
runWorker實現(xiàn)
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
while (task != null || (task = getTask()) != null) {
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
runWorker方法是線程池的核心:
1、線程啟動之后棚壁,通過unlock方法釋放鎖杯矩,設(shè)置AQS的state為0,表示運行中斷袖外;
2菊碟、獲取第一個任務(wù)firstTask,執(zhí)行任務(wù)的run方法在刺,不過在執(zhí)行任務(wù)之前逆害,會進行加鎖操作头镊,任務(wù)執(zhí)行完會釋放鎖;
3魄幕、在執(zhí)行任務(wù)的前后相艇,可以根據(jù)業(yè)務(wù)場景自定義beforeExecute和afterExecute鉤子方法;
4纯陨、firstTask執(zhí)行完成之后坛芽,(循環(huán))通過getTask方法從阻塞隊列中獲取等待的任務(wù),如果隊列中沒有任務(wù)翼抠,getTask方法會被阻塞并掛起咙轩,不會占用cpu資源;
getTask實現(xiàn)
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
int wc = workerCountOf(c);
// Are workers subject to culling?
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
整個getTask操作在自旋下完成:
1阴颖、workQueue.take:如果阻塞隊列為空活喊,當(dāng)前線程會被掛起等待;當(dāng)隊列中有任務(wù)加入時量愧,線程被喚醒钾菊,take方法返回任務(wù),并執(zhí)行偎肃;
2煞烫、workQueue.poll:如果在keepAliveTime時間內(nèi),阻塞隊列還是沒有任務(wù)累颂,則返回null滞详;(?keepAliveTime作用)
所以紊馏,線程池中實現(xiàn)的線程可以一直執(zhí)行由用戶提交的任務(wù)茵宪。
FutureTask.get實現(xiàn)
public V get() throws InterruptedException, ExecutionException {
int s = state;
if (s <= COMPLETING)
s = awaitDone(false, 0L);
return report(s);
}
private int awaitDone(boolean timed, long nanos)
throws InterruptedException {
final long deadline = timed ? System.nanoTime() + nanos : 0L;
WaitNode q = null;
boolean queued = false;
for (;;) {
if (Thread.interrupted()) {
removeWaiter(q);
throw new InterruptedException();
}
int s = state;
if (s > COMPLETING) {
if (q != null)
q.thread = null;
return s;
}
else if (s == COMPLETING) // cannot time out yet
Thread.yield();
else if (q == null)
q = new WaitNode();
else if (!queued)
queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
q.next = waiters, q);
else if (timed) {
nanos = deadline - System.nanoTime();
if (nanos <= 0L) {
removeWaiter(q);
return state;
}
LockSupport.parkNanos(this, nanos);
}
else
LockSupport.park(this);
}
}
內(nèi)部通過awaitDone方法對主線程進行阻塞:
1、如果主線程被中斷瘦棋,則拋出中斷異常稀火;
2、判斷FutureTask當(dāng)前的state赌朋,如果大于COMPLETING凰狞,說明任務(wù)已經(jīng)執(zhí)行完成,則直接返回沛慢;
3赡若、如果當(dāng)前state等于COMPLETING,說明任務(wù)已經(jīng)執(zhí)行完团甲,這時主線程只需通過yield方法讓出cpu資源逾冬,等待state變成NORMAL;
4、通過WaitNode類封裝當(dāng)前線程身腻,并通過UNSAFE添加到waiters鏈表产还;
5、最終通過LockSupport的park或parkNanos掛起線程嘀趟;
refer
http://www.reibang.com/p/87bff5cc8d8c
http://blog.csdn.net/wxq544483342/article/details/53118674
http://blog.csdn.net/evankaka/article/details/51489322