ThreadPollExecutor的源碼中有一段
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
COUNT_BITS 的十進制值是29, 它是用來干什么的呢?
RUNNING用二進制值表示是 11100000 00000000 00000000 00000000
SHUTDOWN用二進制表示是 00000000 00000000 00000000 00000000
STOP用二進制表示是 00100000 00000000 00000000 00000000
TIDYING 用二進制表示 01000000 00000000 00000000 00000000
TERMINATED 二進制表示 01100000 00000000 00000000 00000000
可見這幾個值就是用一個32位的數(shù)(即Integer) 的前三位表示,
private static int ctlOf(int rs, int wc) { return rs | wc; }
那么ctl是一個什么樣的值呢
可以看出來ctl的初始值 RUNNING | 0 ,結果是RUNNING 也就是 11100000 00000000 00000000 00000000
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
/*
* Proceed in 3 steps:
*
* 1. If fewer than corePoolSize threads are running, try to
* start a new thread with the given command as its first
* task. The call to addWorker atomically checks runState and
* workerCount, and so prevents false alarms that would add
* threads when it shouldn't, by returning false.
*
* 2. If a task can be successfully queued, then we still need
* to double-check whether we should have added a thread
* (because existing ones died since last checking) or that
* the pool shut down since entry into this method. So we
* recheck state and if necessary roll back the enqueuing if
* stopped, or start a new thread if there are none.
*
* 3. If we cannot queue task, then we try to add a new
* thread. If it fails, we know we are shut down or saturated
* and so reject the task.
*/
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
}
ThreadPoolExecutor的execute
方法中, 先獲取了ctl
然后執(zhí)行workerCountOf(c),我們看看workerCountOf(c)
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
private static int workerCountOf(int c) { return c & CAPACITY; }
CAPACITY的二進制值 1左移29位減去1.也就是 00011111 11111111 11111111 11111111
表示的是后29位表示容量
ctl & CAPACITY 得到的就是 后29位表示的值,
由此可知,
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
這段代碼的意思是, ctl這個int值 的后29位如果小于 corePoolSize
, 就執(zhí)行addWorker
看看addWorker的代碼
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
int wc = workerCountOf(c);
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
//................為了方便查看,這里省略了一些代碼
}
return workerStarted;
}
for循環(huán)里第二行 runStateOf
private static int runStateOf(int c) { return c & ~CAPACITY; }
~CAPACITY表示的當然就是 11100000 00000000 00000000 00000000
所以這個代碼是取ctl的高三位, 也就是狀態(tài)部分
下面的判斷語句,
rs >= SHUTDOWN 也就是說 rs是 除了RUNNING以外的狀態(tài)值, 因為RUNNING是負值,SHUTDOWN等都是正值
也就是說這里的if語句 如果當前ctl獲得的狀態(tài)不是RUNNING 并且 狀態(tài)等于SHUTDOWN或者firstTask不為空, 或者workQueue為空 ,就直接return false
那么走下來的一定是, ctl獲得的狀態(tài)是RUNNING 或者 (狀態(tài)不是SHUTDOWN, 且firstTask為空, 且workQueue不為空)
這種情況下走進下一個for循環(huán)
里面如果worker的數(shù)量大于最大容量CAPACITY 也就是2^29, 或者大于core/maximumPoolSize,直接返回false
否則執(zhí)行CAI
private boolean compareAndIncrementWorkerCount(int expect) {
return ctl.compareAndSet(expect, expect + 1);
}
這個方法的代碼是把ctl的值原子的+1
所以結論
ctl是存放線程池狀態(tài)和工作線程數(shù)量的變量
前三位存放線程池的數(shù)量.后29位存放wrokerCount, 通過移位運算和邏輯運算來獲取具體的值
為什么要這么做呢?
實際上是為了在存儲workCount和線程池狀態(tài)的時候要保證線程安全,這樣做的話就可以只用一個AtomicInteger保證了線程安全.減少了一定的資源消耗