為什么要有讀寫鎖
ReentrantReadWriteLock 適用于讀多寫少的場景,標(biāo)識(shí)同一時(shí)間,可以有多個(gè)線程并發(fā)讀哼凯,但是不可以多個(gè)線程并發(fā)寫。由于互斥鎖楚里,如:ReentrantLock断部,上鎖后,無論你是讀操作還是寫操作班缎,它們之間都是互斥的來保證線程安全蝴光,但是這樣做的話,效率比較低吝梅。
使用示例
public class ReentrantReadWriteLockTest {
public static void main(String[] args) throws InterruptedException {
ReentrantReadWriteLock reentrantReadWriteLock = new ReentrantReadWriteLock();
ReentrantReadWriteLock.ReadLock readLock = reentrantReadWriteLock.readLock();
ReentrantReadWriteLock.WriteLock writeLock = reentrantReadWriteLock.writeLock();
Thread r1 = new Thread(() -> {
// 上讀鎖
readLock.lock();
try {
System.out.println("r1 開始讀啦J琛!");
} finally {
// 釋放讀鎖
readLock.unlock();
}
});
Thread r2 = new Thread(() -> {
// 上讀鎖
readLock.lock();
try {
System.out.println("r2 開始讀啦K招!");
} finally {
readLock.unlock();
}
});
Thread w1 = new Thread(() -> {
// 上讀鎖
writeLock.lock();
try {
System.out.println("w1 開始寫啦6苑唷右冻!");
} finally {
// 釋放寫鎖
writeLock.unlock();
}
});
r1.start();
r2.start();
w1.start();
}
}
核心思想
- 基于AQS實(shí)現(xiàn),用state來標(biāo)識(shí)是否獲取到鎖著拭,獲取不到鎖纱扭,線程交由AQS同步隊(duì)列進(jìn)行管理
- 用32位的state來同時(shí)表示讀鎖和寫鎖,低16位表示寫鎖儡遮,高16位表示讀鎖
- 公平與非公平
- 寫寫互斥乳蛾,讀寫互斥,讀讀共享
- 鎖降級(jí)
源碼分析
構(gòu)造方法
public class ReentrantReadWriteLock
implements ReadWriteLock, java.io.Serializable {
private static final long serialVersionUID = -6992448646407690164L;
/** Inner class providing readlock */
private final ReentrantReadWriteLock.ReadLock readerLock;
/** Inner class providing writelock */
private final ReentrantReadWriteLock.WriteLock writerLock;
/** Performs all synchronization mechanics */
final Sync sync;
/**
* Creates a new {@code ReentrantReadWriteLock} with
* default (nonfair) ordering properties.
*/
public ReentrantReadWriteLock() {
this(false);
}
/**
* Creates a new {@code ReentrantReadWriteLock} with
* the given fairness policy.
*
* @param fair {@code true} if this lock should use a fair ordering policy
*/
public ReentrantReadWriteLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
readerLock = new ReadLock(this);
writerLock = new WriteLock(this);
}
public ReentrantReadWriteLock.WriteLock writeLock() { return writerLock; }
public ReentrantReadWriteLock.ReadLock readLock() { return readerLock; }
}
從構(gòu)造方法上看鄙币,默認(rèn)是使用非公平的方式獲取鎖
獲取寫鎖(互斥鎖)
AQS.aquire(int arg) 方法
/**
* 獲取互斥鎖的模版方法
* 流程:調(diào)用子類實(shí)現(xiàn)的tryAcquire方法嘗試獲取鎖肃叶,若獲取鎖成功,則終止十嘿,若獲取不成功因惭,則調(diào)用AQS的addWaiter方法,新增節(jié)點(diǎn)到AQS的雙端同步隊(duì)列里绩衷,
* 然后調(diào)用acquireQueued方法再次進(jìn)行判斷獲取鎖資源蹦魔,若還是獲取不到鎖,則將線程阻塞
* @param arg
*/
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
ReentrantReadWriteLock. tryAcquire(int arg) 方法
/**
* 獲取寫鎖(互斥鎖)
* @param acquires
* @return true:獲取鎖成功咳燕;false:獲取鎖失敗
*/
protected final boolean tryAcquire(int acquires) {
// 獲取當(dāng)前線程對(duì)象
Thread current = Thread.currentThread();
// 獲取當(dāng)前鎖狀態(tài)(AQS 的 state)
int c = getState();
// 獲取寫鎖(互斥鎖)數(shù)量
int w = exclusiveCount(c);
if (c != 0) { // 當(dāng)前有鎖狀態(tài)(讀鎖或?qū)戞i勿决,或兩者都有)
// (Note: if c != 0 and w == 0 then shared count != 0)
/**
* 重要:
* w==0 : 意味著目前沒有線程持有寫鎖。則:(c != 0 && w == 0) 意味著當(dāng)前有線程持有讀鎖招盲,但是沒有任何線程持有寫鎖
* current != getExclusiveOwnerThread(): 意味著當(dāng)前線程沒有持有寫鎖低缩。
* 所以:return false 有兩種情況:1、當(dāng)前有線程持有讀鎖(理論:不能夠進(jìn)行鎖升級(jí)宪肖,讀寫互斥) 2表制、目前有其他線程持有寫鎖(理論:寫寫互斥)
* 這兩種情況健爬,滿足其一,都會(huì)返回false --> 當(dāng)前線程進(jìn)去AQS隊(duì)列進(jìn)行等待調(diào)度
*/
if (w == 0 || current != getExclusiveOwnerThread())
return false;
// 判斷持寫鎖數(shù)量是否超過最大值么介,超過則報(bào)錯(cuò)
if (w + exclusiveCount(acquires) > MAX_COUNT)
throw new Error("Maximum lock count exceeded");
/**
* 走到這里意味著:(w != 0 && current == getExclusiveOwnerThread() && (w + exclusiveCount(acquires) < MAX_COUNT))= true
* 即:當(dāng)前線程本身就持有了寫鎖娜遵,并且寫鎖數(shù)量小于最大值。這里相當(dāng)于寫鎖的重入(鎖重入)
*/
setState(c + acquires);
return true;
}
/**
* 走到這里意味著:c==0 即 當(dāng)前屬于無鎖狀態(tài)壤短。
* writerShouldBlock():判斷是否需要阻塞當(dāng)前線程设拟,分公平和非公平兩種情況。
* 公平:看在我前面是否有線程正在阻塞(AQS隊(duì)列中久脯,是否有節(jié)點(diǎn)正在等待喚醒)纳胧,有:則writerShouldBlock() == true。若無:則為 false帘撰,表示當(dāng)前線程不需要阻塞跑慕。
* 總結(jié):公平的獲取寫鎖的機(jī)制為:查看前面是否有線程持有鎖(無論是讀鎖還是寫鎖)或有線程正在等待鎖,若有摧找,直接進(jìn)入隊(duì)列等待
* 非公平:writerShouldBlock()永遠(yuǎn)返回的是false核行,即不需要看前面是否有節(jié)點(diǎn)正在阻塞等待喚醒,直接上來就cas搶鎖
* 總結(jié):非公平的獲取寫鎖的機(jī)制為:
*
*/
if (writerShouldBlock() ||
!compareAndSetState(c, c + acquires)) // CAS搶鎖
return false;
// 走到這里意味著:線程搶鎖(寫鎖)成功蹬耘,將exclusiveOwnerThread屬性設(shè)置為自己芝雪,標(biāo)識(shí)我已經(jīng)獲得了寫鎖,你們其他線程進(jìn)隊(duì)列等待去吧
setExclusiveOwnerThread(current);
return true;
}
ReentrantReadWriteLock. addWaiter() 方法
/**
* 向隊(duì)列添加新的節(jié)點(diǎn)综苔,從尾巴插入
* @param mode 新增的節(jié)點(diǎn)對(duì)象
* @return 返回新的尾節(jié)點(diǎn)
*/
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
/**
* 向隊(duì)列添加新的節(jié)點(diǎn)惩系,從尾巴插入
* @param node 新增的節(jié)點(diǎn)對(duì)象
* @return 返回新的尾節(jié)點(diǎn)
*/
private Node enq(final Node node) {
for (;;) { // 自旋,向隊(duì)列尾部新增節(jié)點(diǎn)如筛,直到成功為止
Node t = tail;
if (t == null) { // Must initialize
/**
* 若當(dāng)前的尾節(jié)點(diǎn)為空堡牡,說明現(xiàn)在還沒有任何節(jié)點(diǎn)插入到隊(duì)列內(nèi),此時(shí)我們對(duì)頭節(jié)點(diǎn)和尾節(jié)點(diǎn)進(jìn)行初始化new Node()妙黍。
* 因此得出一個(gè)結(jié)論:頭節(jié)點(diǎn)在后續(xù)的邏輯中悴侵,永遠(yuǎn)不可能為空
*/
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) { // 將當(dāng)前節(jié)點(diǎn)設(shè)置為新的尾節(jié)點(diǎn)
t.next = node;
return t;
}
}
}
}
我們可以看到,在調(diào)用enq()方法前拭嫁,程序會(huì)先進(jìn)行cas做節(jié)點(diǎn)插入邏輯可免,但是enq()方法也有同樣的插入邏輯,這樣做會(huì)不會(huì)感覺邏輯重復(fù)了做粤,是不是多余了浇借?
實(shí)際上這是一個(gè)優(yōu)化,大部分情況下是不用走到enq方法里去自旋插入節(jié)點(diǎn)的怕品,為了提高性能妇垢,避免進(jìn)入for循環(huán),這里將插入操作提前了。
AQS.acquireQueued(final Node node, int arg) 方法
/**
* 獲取鎖資源闯估,獲取不到則對(duì)線程進(jìn)行阻塞
* @param node 已經(jīng)在隊(duì)列里的節(jié)點(diǎn)
* @param arg
* @return
*/
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
// 是否被中斷標(biāo)識(shí)
boolean interrupted = false;
for (;;) { // 自旋
// 獲取前驅(qū)節(jié)點(diǎn)
final Node p = node.predecessor();
/**
* 若前驅(qū)節(jié)點(diǎn)為頭節(jié)點(diǎn)灼舍,那么當(dāng)前線程此時(shí)立刻嘗試獲取鎖
*
* 問:為什么要在這里獲取鎖?
* 答:因?yàn)榍膀?qū)節(jié)點(diǎn)如果是頭節(jié)點(diǎn)的話涨薪,會(huì)存在兩種情況:
* 1. 該頭結(jié)點(diǎn)是初始化的虛節(jié)點(diǎn)骑素,那么當(dāng)前線程對(duì)應(yīng)的node節(jié)點(diǎn)就應(yīng)該是隊(duì)列里的第一個(gè)"線程節(jié)點(diǎn)",那么此時(shí)應(yīng)該直接嘗試獲取鎖刚夺,并且將頭結(jié)點(diǎn)更新為當(dāng)前節(jié)點(diǎn)
* 2. 該頭節(jié)點(diǎn)不是虛節(jié)點(diǎn)献丑,是正常的"線程節(jié)點(diǎn)",那么此時(shí)很有可能頭結(jié)點(diǎn)釋放了鎖侠姑,那么此時(shí)去嘗試獲取鎖就有比較大的概率獲取到创橄,并且將頭結(jié)點(diǎn)更新為當(dāng)前節(jié)點(diǎn)
*/
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
// 搶鎖成功,return false
return interrupted;
}
// 判斷線程是否可以阻塞莽红,可以的話則進(jìn)行阻塞
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
// 如果上面的代碼妥畏,出現(xiàn)異常,那么這里進(jìn)行兜底處理船老,會(huì)判斷failed標(biāo)識(shí)符咖熟,若為true,則調(diào)用cancel方法將節(jié)點(diǎn)失效掉
if (failed)
cancelAcquire(node);
}
}
/**
* 判斷當(dāng)前線程是否可以"安心"的阻塞
* @param pred
* @param node
* @return
*/
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
// 獲取當(dāng)前線程的前驅(qū)節(jié)點(diǎn)的狀態(tài)
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
/*
* This node has already set status asking a release
* to signal it, so it can safely park.
*/
// 若前驅(qū)節(jié)點(diǎn)的狀態(tài)為SIGNAL(-1),則直接返回true柳畔,表示可以安心的進(jìn)行阻塞了
return true;
if (ws > 0) {
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
*/
/**
* 若前驅(qū)節(jié)點(diǎn)的狀態(tài)>0,即為CANCELLED(1)狀態(tài)郭赐,則遍歷往前尋找到一個(gè)CANCELLED狀態(tài)節(jié)點(diǎn)為止薪韩,并將尋找到的節(jié)點(diǎn)與本節(jié)點(diǎn)進(jìn)行關(guān)聯(lián)。
* 關(guān)聯(lián)完后捌锭,返回false俘陷,由上游程序自旋重新調(diào)用本方法進(jìn)行判斷
*/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
/*
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
*/
/**
* 若前驅(qū)節(jié)點(diǎn)的狀態(tài)不為SIGNAL(-1),CANCELLED(1)。狀態(tài)可能為:CONDITION(-2),PROPAGATE(-3)或 初始化(0)狀態(tài)观谦,
* 則將前驅(qū)節(jié)點(diǎn)的狀態(tài)變成SIGNAL后拉盾,由上游程序自選重新調(diào)用本方法進(jìn)行判斷
*/
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
* 阻塞當(dāng)前線程
* @return 被中斷,則返回true豁状,否則 返回false
*/
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
AQS.cancelAcquire方法
/**
* 取消節(jié)點(diǎn)捉偏。
* 流程:將當(dāng)前節(jié)點(diǎn)Thread置為空-->獲取到當(dāng)前節(jié)點(diǎn)往前的第一個(gè)有效節(jié)點(diǎn)(非CANCELLED狀態(tài))-->將當(dāng)前節(jié)點(diǎn)的狀態(tài)置為CANCELLED(失效)-->將當(dāng)前節(jié)點(diǎn)從隊(duì)列內(nèi)清除出去-->喚醒后繼節(jié)點(diǎn)
* 雖然在這個(gè)方法里,會(huì)將當(dāng)前節(jié)點(diǎn)從隊(duì)列內(nèi)清除出去泻红,但是在并發(fā)情況下夭禽,其他線程有可能獲取到當(dāng)前節(jié)點(diǎn)的狀態(tài)為CANCEELED
* @param node
*/
private void cancelAcquire(Node node) {
// Ignore if node doesn't exist
// 日常判空
if (node == null)
return;
node.thread = null;
// 獲取前驅(qū)節(jié)點(diǎn)
Node pred = node.prev;
// 從后往前遍歷獲取到不為CANCELLED狀態(tài)的前驅(qū)節(jié)點(diǎn)為止
while (pred.waitStatus > 0)
node.prev = pred = pred.prev;
// 獲取前驅(qū)節(jié)點(diǎn)的下一個(gè)節(jié)點(diǎn)
Node predNext = pred.next;
// 將當(dāng)前線程節(jié)點(diǎn)的狀態(tài)變?yōu)镃ANCELLED狀態(tài)
node.waitStatus = Node.CANCELLED;
/**
* 若當(dāng)前線程節(jié)點(diǎn)為隊(duì)列內(nèi)的尾節(jié)點(diǎn),則將上面找到的有效的前驅(qū)節(jié)點(diǎn)作為新的尾節(jié)點(diǎn)(CAS操作谊路,將tail變量賦值為pred)讹躯,成功后tail==pred
*/
if (node == tail && compareAndSetTail(node, pred)) {
/**
* 重點(diǎn):
* 由于是雙向練表,所以這里需要將新的尾節(jié)點(diǎn)(pred)的next節(jié)點(diǎn)cas為null。此時(shí)的狀態(tài)為:pred->null pred<-node潮梯,當(dāng)前節(jié)點(diǎn)的前驅(qū)指針還是指向pred的骗灶。
* 由于此時(shí)next-null,所以整條鏈路的next是不完整的秉馏,所以后續(xù)遍歷查找節(jié)點(diǎn)應(yīng)該從后往前利用pred指針找
*/
compareAndSetNext(pred, predNext, null);
} else {
// If successor needs signal, try to set pred's next-link
// so it will get one. Otherwise wake it up to propagate.
/**
* 進(jìn)到這里意味著:當(dāng)前的節(jié)點(diǎn)不是尾節(jié)點(diǎn) 或 上面cas更新尾節(jié)點(diǎn)失敗(并發(fā)引起)
*/
int ws;
if (pred != head &&
((ws = pred.waitStatus) == Node.SIGNAL ||
(ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
pred.thread != null) {
Node next = node.next;
if (next != null && next.waitStatus <= 0)
// 將上面找到的前驅(qū)節(jié)點(diǎn)與當(dāng)前線程的后繼節(jié)點(diǎn)相連(CAS next指針)
compareAndSetNext(pred, predNext, next);
} else {
// 喚醒后繼節(jié)點(diǎn)
unparkSuccessor(node);
}
// 當(dāng)前節(jié)點(diǎn)的后繼節(jié)點(diǎn)指向自己耙旦,方便GC
node.next = node; // help GC
}
}
釋放寫鎖(互斥鎖)
AQS.release(int arg) 和unparkSuccessor(Node node)方法
// 釋放互斥鎖
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
// 喚醒后繼節(jié)點(diǎn)
unparkSuccessor(h);
return true;
}
return false;
}
// 喚醒后繼節(jié)點(diǎn)
private void unparkSuccessor(Node node) {
/*
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
*/
// 當(dāng)前釋放鎖的節(jié)點(diǎn)的狀態(tài)
int ws = node.waitStatus;
if (ws < 0)
// 若當(dāng)前節(jié)點(diǎn)狀態(tài)不是CANCELLED狀態(tài)(即:是有效節(jié)點(diǎn)),則直接cas沃饶,將當(dāng)前節(jié)點(diǎn)狀態(tài)修改為0(初始化狀態(tài))
compareAndSetWaitStatus(node, ws, 0);
/*
* Thread to unpark is held in successor, which is normally
* just the next node. But if cancelled or apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor.
*/
// 獲取后繼節(jié)點(diǎn)
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
/**
* 若當(dāng)前節(jié)點(diǎn)的后繼節(jié)點(diǎn)為空 或者 后繼節(jié)點(diǎn)的狀態(tài)為CANCELLED狀態(tài)母廷,則從后往前找到第一個(gè)狀態(tài)不為CANCELLED的節(jié)點(diǎn)
* 問:為什么要從后往前找?
* 答:這是由于在取消節(jié)點(diǎn)的cancelAcquire()方法里糊肤,我們?nèi)∠?jié)點(diǎn)時(shí)琴昆,會(huì)有短暫的時(shí)刻導(dǎo)致next指針不完整,但是pre指針對(duì)整條鏈路來說是完整的馆揉,所以需要從后往前找业舍。詳細(xì)看cancelAcquire方法
*/
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
// 喚醒上面找到的當(dāng)前線程的后繼有效節(jié)點(diǎn)
LockSupport.unpark(s.thread);
}
獲取讀鎖(共享鎖)
AQS.acquireShared(int arg) 方法
/**
* 獲取共享鎖,若獲取不成功升酣,則將線程放到隊(duì)列里面去等待舷暮。
* 流程:調(diào)用子類的tryAcquireShared方法嘗試獲取鎖,若獲取鎖不成功噩茄,則調(diào)用doAcquireShared方法下面,入隊(duì)等待
* @param arg
*/
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
ReentrantReadWriteLock.tryAcquireShared(int arg) 方法
/**
* 獲取讀鎖(共享鎖)
* @param unused
* @return
*/
protected final int tryAcquireShared(int unused) {
// 獲取當(dāng)前線程對(duì)象
Thread current = Thread.currentThread();
// 獲取當(dāng)前鎖狀態(tài)
int c = getState();
/**
* exclusiveCount(c) != 0 意味著 當(dāng)前有線程持有寫鎖(互斥鎖)
* getExclusiveOwnerThread() != current 意味著 當(dāng)前持有寫鎖(互斥鎖)的線程不是本線程
*/
if (exclusiveCount(c) != 0 &&
getExclusiveOwnerThread() != current) // 這個(gè)判斷意味著,同個(gè)線程绩聘,獲取完寫鎖后沥割,也可以直接獲取讀鎖,即同個(gè)線程同時(shí)可以持有寫鎖和讀鎖(支持鎖降級(jí))
// 走到這里表示:當(dāng)前有其他線程持有寫鎖凿菩,那么本線程獲取讀鎖失敗机杜,直接返回-1,交由AQS將本線程放入隊(duì)列內(nèi)等待喚醒調(diào)度
return -1;
// 獲取讀鎖數(shù)量
int r = sharedCount(c);
/**
* readerShouldBlock() 判斷是否需要阻塞當(dāng)前線程衅谷,分公平和非公平兩種情況椒拗。
* 公平: 看在我前面是否有線程正在阻塞(AQS隊(duì)列中,是否有節(jié)點(diǎn)正在等待喚醒)获黔,有:readerShouldBlock() == true蚀苛。若無:則為 false,表示當(dāng)前線程不需要阻塞肢执。
* 非公平:看我前面是否有線程正在等待著獲取寫鎖(即互斥節(jié)點(diǎn))枉阵,若有,則readerShouldBlock() == true预茄。若無:則為 false,表示不需要阻塞兴溜,可以直接進(jìn)行cas搶鎖
* 問:非公平方式獲取讀鎖侦厚,為什么需要判斷隊(duì)列里面是否有互斥節(jié)點(diǎn)(等待獲取寫鎖的線程)?
* 答:防止寫線程饑餓拙徽。如果獲取讀鎖不需要判斷隊(duì)列內(nèi)是否有線程在等待獲取寫鎖的話刨沦,那么如果有大量的線程在爭搶讀鎖的情況的話,
* 寫線程將會(huì)很久拿不到寫鎖膘怕,將會(huì)長久阻塞想诅,不利于寫線程的執(zhí)行。這就是線程饑餓岛心,所以才需要在獲取讀線程先判斷是否有線程正在等待獲取寫線程来破。
* 若有,則讀線程也需要進(jìn)入隊(duì)列排隊(duì)等待忘古,若沒有徘禁,則讀線程直接上去CAS搶鎖
*/
if (!readerShouldBlock() &&
r < MAX_COUNT && // 判斷讀鎖數(shù)量是否小于最大值
compareAndSetState(c, c + SHARED_UNIT)) {
// 走到這里意味著:目前隊(duì)列中沒有等待的線程(公平) 或 沒有等待獲取寫鎖的線程(非公平),并且讀鎖數(shù)量在最大值范圍內(nèi)髓堪,且cas獲取讀鎖成功
if (r == 0) { // 讀鎖數(shù)量為0送朱,即:我是第一個(gè)獲取到讀鎖的線程
/**
* 將當(dāng)前線程標(biāo)識(shí)為第一個(gè)獲取讀鎖的線程。思考:為什么要這樣干旁?有什么好處驶沼?
* 答:提升性能,因?yàn)榫S護(hù)ThreadLocal成本比較高
*/
firstReader = current;
// 第一個(gè)獲取讀鎖的線程對(duì)應(yīng)的鎖重入次數(shù)
firstReaderHoldCount = 1;
} else if (firstReader == current) {
// 進(jìn)到這里來意味著:程序允許到現(xiàn)在争群,只有一個(gè)線程在一直持有鎖回怜,就是當(dāng)前線程,所以將讀鎖重入次數(shù)++
firstReaderHoldCount++;
} else {
/**
* 進(jìn)到這里意味著:已經(jīng)有其他線程持有讀鎖了换薄,那么我需要在ThreadLocal里維護(hù)自己的讀鎖重入次數(shù)
* cachedHoldCounter:為HoldCounter類型對(duì)象,始終保存最近(最后)獲取鎖的線程及其鎖重入次數(shù)
*/
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
/**
* 進(jìn)到這里意味著:
* 情況1:cachedHoldCounter == null 鹉戚,那么此時(shí)需要進(jìn)行更新,更新最后獲取鎖的線程引用
* 情況2:當(dāng)前標(biāo)識(shí)的最后獲取鎖的線程對(duì)象不是自己本身专控,那么也需要對(duì)其進(jìn)行更新
*/
cachedHoldCounter = rh = readHolds.get();
else if (rh.count == 0)
/**
* 進(jìn)到這里意味著:當(dāng)前標(biāo)識(shí)的最后獲取鎖的線程是自己本身,并且鎖數(shù)量被釋放完了(rh.count == 0)
* 問:為什么需要readHolds.set(rh)遏餐?
* 答:怎么樣才能讓程序進(jìn)入到這里伦腐,就是最后一個(gè)線程在獲取鎖后,又將鎖給釋放掉失都,然后又重新獲取鎖柏蘑。因?yàn)獒尫沛i的同時(shí),
* 會(huì)將count--,當(dāng)減到0時(shí)粹庞,會(huì)將當(dāng)前線程在ThreadLocal里的引用給remove掉咳焚,所以才需要這里的set操作。
* new Thread(()->{
* readLock.lock();
* readLock.unlock();
* readLock.lock();
* }).start();
*/
readHolds.set(rh);
rh.count++;
}
// 返回?fù)屾i成功
return 1;
}
/**
* 走到這里意味著:
* 1庞溜、readerShouldBlock() == true 即:判斷到當(dāng)前線程需要進(jìn)行阻塞革半。
* 公平:表明前面有線程等待獲取鎖(無論是讀鎖還是寫鎖)碑定。
* 非公平:表明前面有寫線程正在等待獲取寫鎖(隊(duì)列內(nèi)有互斥節(jié)點(diǎn))
* 2、r < MAX_COUNT 讀鎖數(shù)量超過最大限制
* 3又官、CAS搶鎖失敗
*/
return fullTryAcquireShared(current);
}
/**
* 一個(gè)完整的獲取讀鎖(共享鎖)的方法延刘。當(dāng)調(diào)用tryAcquireShared()獲取鎖沒成功,就會(huì)調(diào)用此方法來獲取讀鎖六敬。
* 這里解釋一下:tryAcquireShared()方法上面講了獲取鎖的流程碘赖,其實(shí)是作者為了性能做的一個(gè)優(yōu)化,一般情況下是不用走到fullTryAcquireShared()這個(gè)方法的外构,
* 除非在tryAcquireShared()方法中獲取鎖不成功就會(huì)走到這個(gè)方法來死循環(huán)重試拿鎖普泡。
*/
final int fullTryAcquireShared(Thread current) {
HoldCounter rh = null;
// 自旋搶鎖
for (;;) {
// 獲取當(dāng)前鎖狀態(tài)
int c = getState();
// 判斷當(dāng)前有沒有線程持有寫鎖(互斥鎖),0表示沒有审编,>0表示有
if (exclusiveCount(c) != 0) {
// 判斷當(dāng)前持有寫鎖的線程是不是當(dāng)前線程本身撼班,若不是,則return -1 割笙,交由AQS管理权烧,讓本線程進(jìn)入隊(duì)列等待
if (getExclusiveOwnerThread() != current)
return -1;
// else we hold the exclusive lock; blocking here
// would cause deadlock.
} else if (readerShouldBlock()) { // 判斷當(dāng)前讀線程是否需要被阻塞。
// Make sure we're not acquiring read lock reentrantly
/**
* 進(jìn)到這里意味著:
* 1伤溉、公平:判斷到隊(duì)列內(nèi)有線程正在等待獲取鎖
* 2般码、非公平:判斷到隊(duì)列內(nèi)有寫線程在等待獲取鎖
*/
if (firstReader == current) {
// 若當(dāng)前線程是已經(jīng)獲取過讀鎖的第一個(gè)線程,那么啥這里啥也不做乱顾,交由最下面的代碼進(jìn)行CAS操作
// assert firstReaderHoldCount > 0;
} else {
// 判斷rh==null板祝,為什么需要判斷是否為空,是因?yàn)橛锌赡芡粋€(gè)線程走净,在下面CAS搶鎖失敗后券时,重新循環(huán)搶鎖,這時(shí)伏伯,rh就可能不為空
if (rh == null) {
// cachedHoldCounter:為HoldCounter類型對(duì)象,始終保存最近(最后)獲取鎖的線程及其鎖重入次數(shù)橘洞。也是一種優(yōu)化手段,避免維護(hù)ThreadLocal说搅。
rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current)) {
rh = readHolds.get();
if (rh.count == 0)
readHolds.remove();
}
}
if (rh.count == 0)
/**
* 走到這里意味著:線程在下面CAS失敗后炸枣,重新循環(huán)從頭開始走邏輯,但是此時(shí)弄唧,當(dāng)前線程的鎖已經(jīng)被釋放了适肠。
* 此時(shí)直接返回-1,讓當(dāng)前線程進(jìn)入隊(duì)列等待
*/
return -1;
}
}
if (sharedCount(c) == MAX_COUNT)
throw new Error("Maximum lock count exceeded");
/**
* 走到這里意味著:
* 情況1:當(dāng)前持有互斥鎖的線程是當(dāng)前線程本身
* 情況2:當(dāng)前線程判斷到不需要阻塞(公平:前面沒有任何線程在等待獲取鎖候引,那么直接CAS獲取鎖侯养。非公平:前面沒有任何線程在等待獲取鎖 或 前面沒有寫線程在隊(duì)列里等待)
* 情況3:當(dāng)前線程是firstReader線程
*/
if (compareAndSetState(c, c + SHARED_UNIT)) {
if (sharedCount(c) == 0) {
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) {
firstReaderHoldCount++;
} else {
if (rh == null)
rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
cachedHoldCounter = rh; // cache for release
}
// 返回?fù)屾i成功
return 1;
}
}
}
AQS.doAcquireShared(int arg) 方法
/**
* 獲取共享鎖,不成功則將線程進(jìn)行阻塞
* @param arg
*/
private void doAcquireShared(int arg) {
// 向隊(duì)列新增一個(gè)共享節(jié)點(diǎn)
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {// 自旋
// 獲取前驅(qū)節(jié)點(diǎn)
final Node p = node.predecessor();
if (p == head) {
// 若前驅(qū)節(jié)點(diǎn)為頭節(jié)點(diǎn)澄干,則嘗試獲取鎖
int r = tryAcquireShared(arg);
if (r >= 0) {
// 若獲取鎖成功逛揩,則設(shè)置新的頭節(jié)點(diǎn)柠傍,并結(jié)束程序
setHeadAndPropagate(node, r);
p.next = null; // help GC
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
// 若獲取鎖失敗,或前驅(qū)節(jié)點(diǎn)不是頭節(jié)點(diǎn)息尺,阻塞當(dāng)前線程
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
// 如果上面的代碼携兵,出現(xiàn)異常,那么這里進(jìn)行兜底處理搂誉,會(huì)判斷failed標(biāo)識(shí)符徐紧,若為true,則調(diào)用cancel方法將節(jié)點(diǎn)失效掉
cancelAcquire(node);
}
}
釋放共享鎖
AQS.releaseShared(int arg)
/**
* 流程:調(diào)用子類tryReleaseShared方法嘗試釋放鎖炭懊,釋放鎖成功后并级,則調(diào)用doReleaseShared方法,喚醒下一個(gè)節(jié)點(diǎn)進(jìn)行搶鎖
* @param arg
* @return
*/
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
ReentrantReadWriteLock.tryReleaseShared(int unused)
/**
* 釋放共享鎖(讀鎖)
* @param unused
* @return
*/
protected final boolean tryReleaseShared(int unused) {
// 獲取當(dāng)前線程對(duì)象
Thread current = Thread.currentThread();
// 判斷當(dāng)前線程是否是firstReader線程
if (firstReader == current) {
if (firstReaderHoldCount == 1)
// 若本線程的讀鎖重入次數(shù)此時(shí)判斷已經(jīng)是為1了侮腹,那么本次釋放就會(huì)將這個(gè)線程的讀鎖全部釋放完嘲碧,那么也需要釋放本線程,所以將firstReader引用指向空
firstReader = null;
else
// 讀鎖重入次數(shù)--
firstReaderHoldCount--;
} else {
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
/**
* 進(jìn)到這里意味著:
* 情況1:cachedHoldCounter == null 父阻,注意愈涩,這里跟獲取共享鎖的邏輯不同,這里是釋放鎖加矛,所以不用更新cachedHoldCounter引用
* 情況2:當(dāng)前線程不是cachedHoldCounter線程(不是最后獲取鎖的線程)履婉,那么當(dāng)前線程的持有鎖的重入次數(shù)情況需從readHolds(ThreadLocal)里面獲取
*/
rh = readHolds.get();
int count = rh.count;
if (count <= 1) {
// 鎖重入次數(shù)<=1,那么本次釋放后,需要將本個(gè)線程也釋放掉斟览,那么readHolds里面也不需要再維護(hù)本線程的持鎖重入情況了毁腿,所以需要remove掉
readHolds.remove();
if (count <= 0) // 釋放鎖太多次,導(dǎo)致益處苛茂,則拋異常
throw unmatchedUnlockException();
}
--rh.count;
}
// 自旋 CAS釋放鎖已烤,即CAS減少state值,直到CAS成功為止
for (;;) {
int c = getState();
int nextc = c - SHARED_UNIT;
if (compareAndSetState(c, nextc))
// Releasing the read lock has no effect on readers,
// but it may allow waiting writers to proceed if
// both read and write locks are now free.
// 釋放鎖成功妓羊,判斷到鎖重入次數(shù)為
return nextc == 0;
}
}
AQS.doReleaseShared()方法
/**
* 釋放共享鎖胯究,喚醒后續(xù)等待的線程節(jié)點(diǎn)
*/
private void doReleaseShared() {
for (;;) {
// 獲取當(dāng)前的頭節(jié)點(diǎn)
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) { // 若節(jié)點(diǎn)狀態(tài)為SIGNAL(-1),則cas將狀態(tài)置為初始化狀態(tài)(0)
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
// 上面cas成功躁绸,則喚醒下一個(gè)節(jié)點(diǎn)
unparkSuccessor(h);
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break;
}
}
鎖降級(jí)
鎖降級(jí):表示同一個(gè)線程唐片,獲取了寫鎖后,在釋放寫鎖前涨颜,又獲取了讀鎖,我們稱這個(gè)過程叫做鎖降級(jí)茧球。通過對(duì)tryAcquireShared源碼的分析庭瑰,ReentrantReadWriteLock是允許鎖降級(jí)的。
小試牛刀
- 問題1:ReentrantReadWriteLock是如何解決線程饑餓的抢埋?
- 問題2:firstReader和firstReaderHoldCount的作用是什么弹灭?
- 問題3:所有讀線程維護(hù)自身ThreadLocalHoldCounter督暂,將所有的ThreadLocalHoldCounter里的count相加,是否與state的高16位的值一致穷吮?
- 問題4:什么是鎖降級(jí)逻翁?
- 問題5:AQS的同步隊(duì)列為什么是雙端隊(duì)列?
這里只對(duì)問題5進(jìn)行一個(gè)回答捡鱼,其他的問題通過上面的源碼分析八回,都可以找到答案。
AQS的同步隊(duì)列為什么是雙端隊(duì)列驾诈?
我們想象一下如果是單向隊(duì)列的話缠诅,刪除一個(gè)節(jié)點(diǎn)是怎么樣的流程?
1乍迄、單向鏈表刪除其中一個(gè)節(jié)點(diǎn)管引,需要從頭遍歷到當(dāng)前節(jié)點(diǎn),并記錄我的上一個(gè)節(jié)點(diǎn)引用闯两,再將上一個(gè)節(jié)點(diǎn)的next指向我的下一個(gè)節(jié)點(diǎn)
2褥伴、通過源碼我們知道,當(dāng)線程進(jìn)入隊(duì)列時(shí)漾狼,要判斷上一個(gè)節(jié)點(diǎn)的狀態(tài)是否為signal重慢,若為signal,則當(dāng)前線程可以安心阻塞
3邦投、雙向鏈表伤锚,方便喚醒下一個(gè)節(jié)點(diǎn)。這里說一下:原本AQS只是單向鏈表志衣,并且只是維護(hù)了一個(gè)prev前指針屯援,那么此時(shí)你要喚醒你的下一個(gè)節(jié)點(diǎn)就不好喚醒了,問題就跟刪除一個(gè)節(jié)點(diǎn)一樣了念脯。
結(jié)論:AQS的同步隊(duì)列為什么是雙端隊(duì)列狞洋?AQS程序是Doug Lea設(shè)計(jì),用雙向隊(duì)列只不過是更加符合他的設(shè)計(jì)绿店,并且更加容易實(shí)現(xiàn)罷了吉懊。那么方便啥呢?方便你去獲取前一個(gè)節(jié)點(diǎn)的狀態(tài)假勿,也方便你去喚醒下一個(gè)節(jié)點(diǎn)借嗽。