一享甸、通過(guò)ReentrantLock來(lái)分析AbstractQueuedSynchronizer源碼
//初始化一個(gè)公平鎖
ReentrantLock lock = new ReentrantLock(true);
加鎖lock
//java.util.concurrent.locks.ReentrantLock.FairSync
final void lock() {
acquire(1);
}
//java.util.concurrent.locks.AbstractQueuedSynchronizer
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
- 線程嘗試獲取鎖 tryAcquire
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
列出一些AQS中的對(duì)象屬性域
//If head exists, its waitStatus is guaranteed not to be CANCELLED
private transient volatile Node head;
private transient volatile Node tail;
private volatile int state;
private Thread exclusiveOwnerThread;
若state == 0;
若等待隊(duì)列中沒(méi)有線程排隊(duì),compareAndSetState(0, acquires)成功暮蹂;
setExclusiveOwnerThread(current)搞糕,返回true贩虾。表示加鎖成功;
若current == getExclusiveOwnerThread();
state += acquires旅薄,更新state旷赖,返回true顺又。表示鎖重入成功
判斷等待隊(duì)列中是否有線程排隊(duì)。若 head == tail 說(shuō)明等待隊(duì)列沒(méi)有線線程入隊(duì)等孵;若 head != tail稚照,head.next != null,當(dāng)前的線程與head.next中的線程不同俯萌,說(shuō)明等待隊(duì)列中有線程在等待果录;若 head != tail,head.next == null 咐熙,說(shuō)明等待隊(duì)列中有線程在等待弱恒。(這種情況怎么產(chǎn)生的?)
- tryAcquired失敗棋恼,創(chuàng)建node并加入等待隊(duì)列 addWaiter
//return the new node
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
//return the node's predecessor
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
列出一些Node中的對(duì)象屬性域
//The field is initialized to 0 for normal sync nodes, and CONDITION for condition nodes.
volatile int waitStatus;
volatile Node prev;
volatile Node next;
volatile Thread thread;
最初等待隊(duì)列未初始化返弹,head和tail都為null,通過(guò)CAS競(jìng)爭(zhēng)某個(gè)線程會(huì)將head和tail初始化為一個(gè)空Node爪飘;
不斷嘗試讓當(dāng)前Node成為tail义起,并返回此Node;
- 線程再次嘗試獲取鎖或者Park
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
若當(dāng)前節(jié)點(diǎn)是等待隊(duì)列中的第一個(gè)節(jié)點(diǎn)师崎,若嘗試獲取鎖成功默终,設(shè)置head為當(dāng)前Node,返回中斷狀態(tài)犁罩;
判斷是否需要Park齐蔽,若需要?jiǎng)tPark當(dāng)前線程;
判斷當(dāng)前線程是否需要Park床估,總是檢查前一個(gè)Node的waitStatus是否為-1含滴,一般Node初始化時(shí)設(shè)置為0。所以Park當(dāng)前線程總會(huì)經(jīng)歷這樣一個(gè)流程顷窒,將前一個(gè)Node的waitStatus設(shè)置為-1蛙吏,然后Park當(dāng)前線程。
解鎖release
//java.util.concurrent.locks.ReentrantLock.FairSync
public void unlock() {
sync.release(1);
}
//java.util.concurrent.locks.AbstractQueuedSynchronizer
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
- 解鎖 tryRelease
//java.util.concurrent.locks.ReentrantLock.Sync
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
若當(dāng)前線程沒(méi)有持有鎖則拋出異常鞋吉;
state -= releases鸦做,若 state == 0,設(shè)置setExclusiveOwnerThread(null)谓着,設(shè)置解鎖狀態(tài) free = true泼诱;
更新state;
返回解鎖狀態(tài) free;
- tryRelease成功,UnPark等待隊(duì)列中的第一個(gè)線程
顯示鎖的API | 特性 |
---|---|
public boolean tryLock() | 可輪詢赊锚,非阻塞治筒。非公平 |
public final boolean tryAcquireNanos(int arg, long nanosTimeout) | 可配置超時(shí)屉栓,可定時(shí)中斷。 |
public void lockInterruptibly() throws InterruptedException | 可中斷 |
條件隊(duì)列Condition
//java.util.concurrent.locks.ReentrantLock.Sync
final ConditionObject newCondition() {
return new ConditionObject();
}
列出一些ConditionObject中的對(duì)象屬性域
/** First node of condition queue. */
private transient Node firstWaiter;
/** Last node of condition queue. */
private transient Node lastWaiter;
- 等待 await
public final void await() throws InterruptedException {
//首先檢查中斷
if (Thread.interrupted())
throw new InterruptedException();
//將當(dāng)前線程包裝成Node加入“條件隊(duì)列”
Node node = addConditionWaiter();
//解鎖(包括重入鎖)耸袜,保留state(下次獲取鎖時(shí)會(huì)用到)
int savedState = fullyRelease(node);
int interruptMode = 0;
//Node沒(méi)有轉(zhuǎn)移到“阻塞隊(duì)列”友多,則Park。signal喚醒會(huì)使得Node轉(zhuǎn)移到阻塞隊(duì)列(相當(dāng)于擁有了競(jìng)爭(zhēng)鎖的權(quán)力)
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
//嘗試獲取鎖或者被Park
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
- 喚醒 signal
public final void signal() {
//檢查當(dāng)前線程是否持有鎖(上面await方法在fullyRelease時(shí)執(zhí)行了隱式檢查)
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
//喚醒“條件隊(duì)列”中第一個(gè)Node
Node first = firstWaiter;
if (first != null)
doSignal(first);
}
private void doSignal(Node first) {
do {
//設(shè)置firstWaiter 為下一個(gè)節(jié)點(diǎn)堤框,若下一個(gè)節(jié)點(diǎn)為null則表明“條件隊(duì)列”中已經(jīng)沒(méi)有Node了域滥,設(shè)置lastWaiter為null
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
//first節(jié)點(diǎn)將被轉(zhuǎn)移到“阻塞隊(duì)列”,沒(méi)有必要保留與下一個(gè)Node的關(guān)系了
first.nextWaiter = null;
} while (!transferForSignal(first) &&//轉(zhuǎn)移Node失敗則嘗試轉(zhuǎn)移下一個(gè)Node
(first = firstWaiter) != null);
}
final boolean transferForSignal(Node node) {
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;
//將Node加入“阻塞隊(duì)列”
Node p = enq(node);
int ws = p.waitStatus;
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
//unpark這個(gè)Node
LockSupport.unpark(node.thread);
return true;
}
二蜈抓、常用工具類CountDownLatch启绰、CyclicBarrier 、Semaphore沟使、ReadWriteLock
CountDownLatch
//java.util.concurrent.CountDownLatch
public CountDownLatch(int count) {
if (count < 0) throw new IllegalArgumentException("count < 0");
this.sync = new Sync(count);
}
//java.util.concurrent.CountDownLatch.Sync
Sync(int count) {
setState(count);
}
初始化CountDownLatch委可,實(shí)質(zhì)上就是設(shè)置了state值。
- 等待 await
//java.util.concurrent.CountDownLatch
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
//java.util.concurrent.locks.AbstractQueuedSynchronizer
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
//響應(yīng)中斷
if (Thread.interrupted())
throw new InterruptedException();
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}
//java.util.concurrent.CountDownLatch.Sync
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}
state沒(méi)有countdown到0腊嗡,則會(huì)執(zhí)行下面的方法着倾。
喚醒隊(duì)列中所有線程(state為0時(shí))或Park當(dāng)前線程
//java.util.concurrent.locks.AbstractQueuedSynchronizer
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
//包裝當(dāng)前線程為Node并加到“阻塞隊(duì)列”
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
//若為“阻塞隊(duì)列”中的頭節(jié)點(diǎn),檢查state是否為0燕少,若為0
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
//會(huì)喚醒下一個(gè)Node
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return;
}
}
//Park當(dāng)前線程
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
//java.util.concurrent.locks.AbstractQueuedSynchronizer
//將當(dāng)前Node設(shè)置為“阻塞隊(duì)列”的頭節(jié)點(diǎn)屈呕,執(zhí)行doReleaseShared(喚醒“阻塞隊(duì)列”中排頭Node)
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head; // Record old head for check below
setHead(node);
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
Node s = node.next;
if (s == null || s.isShared())
doReleaseShared();
}
}
//java.util.concurrent.locks.AbstractQueuedSynchronizer
//ShareMode模式下的釋放操作(喚醒后繼節(jié)點(diǎn),保證傳播棺亭。對(duì)于ExclusiveMode,釋放通過(guò)多次調(diào)用unparkSuccessor)
private void doReleaseShared() {
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
unparkSuccessor(h);
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break;
}
}
//java.util.concurrent.locks.AbstractQueuedSynchronizer
//如果當(dāng)前Node的下一個(gè)節(jié)點(diǎn)存在蟋软,Unpark它镶摘。
private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}
- 遞減state countDown
//java.util.concurrent.CountDownLatch
public void countDown() {
sync.releaseShared(1);
}
//java.util.concurrent.locks.AbstractQueuedSynchronizer
public final boolean releaseShared(int arg) {
//state為0時(shí),執(zhí)行doReleaseShared(喚醒“阻塞隊(duì)列”中其他Node)
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
//java.util.concurrent.CountDownLatch.Sync
//遞減state
protected boolean tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero
for (;;) {
int c = getState();
if (c == 0)
return false;
int nextc = c-1;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
CyclicBarrier
public CyclicBarrier(int parties, Runnable barrierAction) {
if (parties <= 0) throw new IllegalArgumentException();
this.parties = parties;
this.count = parties;
this.barrierCommand = barrierAction;
}
初始化CyclicBarrier
- 等待 await
public int await() throws InterruptedException, BrokenBarrierException {
try {
return dowait(false, 0L);
} catch (TimeoutException toe) {
throw new Error(toe); // cannot happen
}
}
private int dowait(boolean timed, long nanos)
throws InterruptedException, BrokenBarrierException,
TimeoutException {
final ReentrantLock lock = this.lock;
lock.lock();
try {
final Generation g = generation;
if (g.broken)
throw new BrokenBarrierException();
if (Thread.interrupted()) {
breakBarrier();
throw new InterruptedException();
}
//所有線程到達(dá)“柵欄”后岳守,由最后一個(gè)到達(dá)的線程執(zhí)行barrierCommand凄敢。并且喚醒其他await的線程。
int index = --count;
if (index == 0) { // tripped
boolean ranAction = false;
try {
final Runnable command = barrierCommand;
if (command != null)
command.run();
ranAction = true;
nextGeneration();
return 0;
} finally {
if (!ranAction)
breakBarrier();
}
}
// loop until tripped, broken, interrupted, or timed out
for (;;) {
try {
if (!timed)
//線程await湿痢,這里會(huì)釋放鎖
trip.await();
else if (nanos > 0L)
nanos = trip.awaitNanos(nanos);
} catch (InterruptedException ie) {
if (g == generation && ! g.broken) {
breakBarrier();
throw ie;
} else {
// We're about to finish waiting even if we had not
// been interrupted, so this interrupt is deemed to
// "belong" to subsequent execution.
Thread.currentThread().interrupt();
}
}
if (g.broken)
throw new BrokenBarrierException();
if (g != generation)
return index;
if (timed && nanos <= 0L) {
breakBarrier();
throw new TimeoutException();
}
}
} finally {
lock.unlock();
}
}
列出一些CyclicBarrier中的對(duì)象屬性域
//The number of parties
private final int parties;
//Number of parties still waiting
private int count;
private final ReentrantLock lock = new ReentrantLock();
//Condition to wait on until tripped
private final Condition trip = lock.newCondition();
//The command to run when tripped
private final Runnable barrierCommand;
private Generation generation = new Generation();
//用于重置“柵欄”
private static class Generation {
boolean broken = false;
}
private void nextGeneration() {
// signal completion of last generation
trip.signalAll();
// set up next generation
count = parties;
generation = new Generation();
}
執(zhí)行await時(shí)涝缝,count等于0時(shí),表示所有線程已到達(dá)trip譬重。喚醒所有await的線程拒逮,重置“柵欄”。
Semaphore
public Semaphore(int permits, boolean fair) {
sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}
//java.util.concurrent.Semaphore.Sync
Sync(int permits) {
setState(permits);
}
初始化Semaphore臀规,設(shè)置state值滩援。默認(rèn)使用非公平模式。
- 獲取資源 acquire
//java.util.concurrent.Semaphore
public void acquire() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
//java.util.concurrent.locks.AbstractQueuedSynchronizer
//支持中斷的塔嬉,共享獲取模式
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
//當(dāng)tryAcquireShared返回負(fù)值玩徊,那么doAcquireSharedInterruptibly可能會(huì)Park當(dāng)前線程
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}
//java.util.concurrent.Semaphore.FairSync
protected int tryAcquireShared(int acquires) {
//當(dāng)remain不小于0時(shí)租悄,需要自旋重試
for (;;) {
if (hasQueuedPredecessors())
return -1;
int available = getState();
int remaining = available - acquires;
//可用資源available 為0,remaining<0恩袱;可用資源available>=請(qǐng)求的資源acquires泣棋,則remaining >=0;
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
- 釋放資源 release
//java.util.concurrent.Semaphore
public void release() {
sync.releaseShared(1);
}
//java.util.concurrent.locks.AbstractQueuedSynchronizer
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
//這里會(huì)unpark“阻塞隊(duì)列”中的排頭的Node(非Head)
doReleaseShared();
return true;
}
return false;
}
//java.util.concurrent.Semaphore.Sync
protected final boolean tryReleaseShared(int releases) {
for (;;) {
int current = getState();
int next = current + releases;
if (next < current) // overflow
throw new Error("Maximum permit count exceeded");
if (compareAndSetState(current, next))
return true;
}
}
ReentrantReadWriteLock
//java.util.concurrent.locks.ReentrantReadWriteLock
//默認(rèn)初始化一個(gè)非公平的Sync
public ReentrantReadWriteLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
readerLock = new ReadLock(this);
writerLock = new WriteLock(this);
}
ReadLock、WriteLock是ReentrantReadWriteLock的 靜態(tài)內(nèi)部類
畔塔,可以發(fā)現(xiàn)它們用的 sync
是同一個(gè)潭辈。
static final int SHARED_SHIFT = 16;
static final int SHARED_UNIT = (1 << SHARED_SHIFT);
static final int MAX_COUNT = (1 << SHARED_SHIFT) - 1;
static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;
//ReadLock使用state的高16位
/** Returns the number of shared holds represented in count */
static int sharedCount(int c) { return c >>> SHARED_SHIFT; }
//WriteLock使用state的低16位
/** Returns the number of exclusive holds represented in count */
static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }
ReadLock、WriteLock共用state俩檬。ReadLock使用state的高16位萎胰,WriteLock使用state的低16位。(這種技巧在ExecutorThreadPool也用到了)
- 讀鎖的加鎖
//java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock
public void lock() {
sync.acquireShared(1);
}
//
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
可以明顯的看到棚辽,ReadLock使用的是共享式的加鎖方式技竟。
//java.util.concurrent.locks.ReentrantReadWriteLock.Sync
protected final int tryAcquireShared(int unused) {
Thread current = Thread.currentThread();
int c = getState();
//exclusiveCount(c)取state的低16位,不為0屈藐,則說(shuō)明存在寫(xiě)鎖榔组。這時(shí)若當(dāng)前線程不是持有寫(xiě)鎖的線程,則獲取鎖失敗
if (exclusiveCount(c) != 0 &&
getExclusiveOwnerThread() != current)
return -1;
//獲取state的高16位(這是留給讀鎖用的)
int r = sharedCount(c);
if (!readerShouldBlock() &&
r < MAX_COUNT &&
//嘗試CAS修改state(這里沒(méi)有區(qū)分當(dāng)前讀鎖是否是重入)
compareAndSetState(c, c + SHARED_UNIT)) {
//下面都是一些統(tǒng)計(jì)信息联逻,暫時(shí)忽略
if (r == 0) {
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) {
firstReaderHoldCount++;
} else {
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
cachedHoldCounter = rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
}
return 1;
}
//Full version of acquire for reads, that handles CAS misses and reentrant reads not dealt with in tryAcquireShared.
return fullTryAcquireShared(current);
}
- 寫(xiě)鎖的加鎖
//java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock
public void lock() {
sync.acquire(1);
}
//java.util.concurrent.locks.AbstractQueuedSynchronizer
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
可以明顯的看到搓扯,WriteLock使用的是獨(dú)占式的加鎖方式。
//java.util.concurrent.locks.ReentrantReadWriteLock.Sync
protected final boolean tryAcquire(int acquires) {
Thread current = Thread.currentThread();
int c = getState();
int w = exclusiveCount(c);
//c !=0 說(shuō)明此時(shí)有讀鎖或?qū)戞i存在
if (c != 0) {
//c !=0,w==0 代表此時(shí)有讀鎖
if (w == 0 || current != getExclusiveOwnerThread())
return false;
if (w + exclusiveCount(acquires) > MAX_COUNT)
throw new Error("Maximum lock count exceeded");
// Reentrant acquire
//此時(shí)有寫(xiě)鎖且為當(dāng)前線程持有
setState(c + acquires);
return true;
}
if (writerShouldBlock() ||
!compareAndSetState(c, c + acquires))
return false;
setExclusiveOwnerThread(current);
return true;
}
ReentrantReadWriteLock的核心規(guī)律就是包归,寫(xiě)鎖和寫(xiě)鎖互斥锨推,寫(xiě)鎖和讀鎖互斥,讀鎖和讀鎖不互斥公壤。其目的是在讀并發(fā)比較高的情況下换可,會(huì)有更好的效率。
三厦幅、總結(jié)
常用工具類 | |
---|---|
CyclicBarrier | 基于ReentrantLock實(shí)現(xiàn)沾鳄,主要應(yīng)用了Condition |
ReentrantLock | 基于AQS獨(dú)占模式實(shí)現(xiàn) |
CountDownLatch | 基于AQS共享模式實(shí)現(xiàn) |
Semaphore | 基于AQS共享模式實(shí)現(xiàn) |
ReentrantReadWriteLock | 基于AQS獨(dú)占模式(寫(xiě)鎖)和共享模式(讀鎖)的混合型 |