接下來探討ReadWriteLock的公平鎖實現(xiàn), 也是分如下場景分析
情景1 三個線程都是讀
public static void main(String[] args){
final Printer printer = new Printer();
Thread thread1 = new Thread(){
@Override
public void run() {
try {
printer.read("test1");
} catch (Exception e) {
e.printStackTrace();
}
}
};
Thread thread2 = new Thread(){
@Override
public void run() {
try {
printer.read("test2");
} catch (Exception e) {
e.printStackTrace();
}
}
};
Thread thread3 = new Thread(){
@Override
public void run() {
try {
printer.read("test3");
} catch (Exception e) {
e.printStackTrace();
}
}
};
thread1.start();
thread2.start();
thread3.start();
}
ReadWriteLock lock = new ReentrantReadWriteLock(true);
public void read(String str) throws Exception{
lock.readLock().lock();
try {
System.out.println(str);
Thread.sleep(200);
}finally {
lock.readLock().unlock();
}
}
public void lock() {
sync.acquireShared(1);
}
// AQS
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
// sync
protected final int tryAcquireShared(int unused) {
// Thread-0
Thread current = Thread.currentThread();
// c = 0
int c = getState();
// 不走此分支
if (exclusiveCount(c) != 0 &&
getExclusiveOwnerThread() != current)
return -1;
// r = 0
int r = sharedCount(c);
//
if (!readerShouldBlock() &&
r < MAX_COUNT &&
compareAndSetState(c, c + SHARED_UNIT)) {
......
}
......
}
static final class FairSync extends Sync {
final boolean readerShouldBlock() {
return hasQueuedPredecessors();
}
}
// 這段代碼目的就是看queue里是否還存在想要獲取鎖的線程
// 此段代碼解析在《ReentrantLock之公平鎖unlock》這片文章中可看到
public final boolean hasQueuedPredecessors() {
Node t = tail;
Node h = head;
Node s;
// 此時h和t都是null, h == t, 所以返回false
return h != t &&
((s = h.next) == null || s.thread != Thread.currentThread());
}
接下來回到tryAcquireShared方法中
protected final int tryAcquireShared(int unused) {
......
// readerShouldBlock方法返回false,
if (!readerShouldBlock() &&
// r = 0, 確實比MAX_COUNT小
r < MAX_COUNT &&
// 成功設(shè)置state為65536
compareAndSetState(c, c + SHARED_UNIT)) {
// 因為r=0, 進入此分支
if (r == 0) {
// firstReader為thread-0
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) {
......
} else {
......
}
// 從此返回
return 1;
}
......
}
此時線程1已經(jīng)獲取讀鎖, 接下來線程2開始執(zhí)行
public void lock() {
sync.acquireShared(1);
}
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
protected final int tryAcquireShared(int unused) {
// thread-1
Thread current = Thread.currentThread();
// state = 65536
int c = getState();
// 沒加寫鎖, 跳過此分支
if (exclusiveCount(c) != 0 &&
getExclusiveOwnerThread() != current)
return -1;
// r = 1, 表示此時已經(jīng)獲得一次讀鎖
int r = sharedCount(c);
// readerShouldBlock方法仍舊返回false
if (!readerShouldBlock() &&
// MAX_COUNT為65535, 也就是獲取讀鎖的最大次數(shù)就是65535
// 此時為1, 并沒有大于MAX_COUNT
r < MAX_COUNT &&
// 已經(jīng)設(shè)置state為65536*2
compareAndSetState(c, c + SHARED_UNIT)) {
// 不走此分支
if (r == 0) {
......
}
// 由于非重入鎖也不走此分支
else if (firstReader == current) {
firstReaderHoldCount++;
}
// 最終走此分支
else {
// rh為null
HoldCounter rh = cachedHoldCounter;
// 走此分支
if (rh == null || rh.tid != getThreadId(current))
// 新建HoldCounter對象
cachedHoldCounter = rh = readHolds.get();
else if (rh.count == 0)
......
// 并將HoldCounter對象count(對應(yīng)重入次數(shù))+1
rh.count++;
}
return 1;
}
......
}
到此第二個線程也獲得讀鎖成功
接下來第三個線程開始執(zhí)行
// ReadLock
public void lock() {
sync.acquireShared(1);
}
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
protected final int tryAcquireShared(int unused) {
// thread-2
Thread current = Thread.currentThread();
// 131072 ==> 65536 * 2
int c = getState();
// 沒有讀鎖, 此分支不走
if (exclusiveCount(c) != 0 &&
getExclusiveOwnerThread() != current)
return -1;
// r = 2
int r = sharedCount(c);
// 都是讀操作, 沒有阻塞
if (!readerShouldBlock() &&
// 讀次數(shù)小于65535
r < MAX_COUNT &&
// state = 65536 * 3 ==> 196608
compareAndSetState(c, c + SHARED_UNIT)) {
if (r == 0) {
......
} else if (firstReader == current) {
......
}
// 走此分支
else {
// cachedHoldCounter表示上一個成功獲取讀鎖的線程
// 在此就是線程thread-1
HoldCounter rh = cachedHoldCounter;
// 由于rh.tid保存的是thread-1的線程id
// 與當前線程保存的線程id不相等
// 所以最終還是走此分支
if (rh == null || rh.tid != getThreadId(current))
// 創(chuàng)建一個HoldCounter對象
cachedHoldCounter = rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
// 將其獲得鎖次數(shù)加一
rh.count++;
}
// 返回
return 1;
}
...
}
這時線程3也獲取到讀鎖
情景2 前兩個線程是讀, 最后一個線程是寫
由于線程1和線程2的執(zhí)行流程與情景1相同, 所以不再做分析, 接下來就做線程3的分析
public void lock() {
sync.acquire(1);
}
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
protected final boolean tryAcquire(int acquires) {
// thread-2
Thread current = Thread.currentThread();
// c = 65536 * 2
int c = getState();
// w = 0
int w = exclusiveCount(c);
// 進入此分支
if (c != 0) {
// 由于w為0, 進入此分支
if (w == 0 || current != getExclusiveOwnerThread())
return false;
......
}
......
}
由于之前已經(jīng)有讀操作獲取鎖, 所以寫操作會被阻塞住, 接下來是執(zhí)行寫操作的線程如何被阻塞
回到acquire方法
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
// 這塊代碼就沒什么特別的
// ReentrantLock阻塞線程也用的這個
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}