概念
ReentrandLock是基于AbstractQueuedSynchronizer實(shí)現(xiàn)的恰矩,所以先來(lái)簡(jiǎn)單介紹下AQS。
我們最熟悉的同步鎖應(yīng)該是synchronized谦去,熟悉java的應(yīng)該都用過(guò)這個(gè)慷丽,它是通過(guò)底層的 monitorenter 和 monitorexit 這兩個(gè)字節(jié)碼指令來(lái)實(shí)現(xiàn)鎖的獲取和釋放。
這里介紹的AbstractQueuedSynchronizer同步器(以下簡(jiǎn)稱AQS)鳄哭,是基于FIFO隊(duì)列來(lái)實(shí)現(xiàn)的要糊,通過(guò)state的狀態(tài), 來(lái)實(shí)現(xiàn)acquire和release。
state為0妆丘,表示當(dāng)前沒(méi)有線程執(zhí)行锄俄,可以獲取鎖局劲,state為1表示已有線程正在實(shí)行
代碼分析
AQS是基于FIFO隊(duì)列來(lái)實(shí)現(xiàn)的,所以隊(duì)列必然由一個(gè)個(gè)節(jié)點(diǎn)組成奶赠,我們先從節(jié)點(diǎn)入手鱼填。
Node是AQS中的一個(gè)內(nèi)部靜態(tài)類,主要包含以下幾個(gè)成員變量:
//waitStatus有以下幾個(gè)狀態(tài)
/**
** CANCELLED,值為1毅戈,表示當(dāng)前線程被取消苹丸,被終端的Node不回去競(jìng)爭(zhēng)鎖,最終會(huì)被踢出隊(duì)列
** SIGNAL竹祷,值為-1谈跛,表示當(dāng)前節(jié)點(diǎn)的后繼節(jié)點(diǎn)被阻塞了,需要被喚醒
** CONDITION塑陵,值為-2感憾,表示當(dāng)前節(jié)點(diǎn)在等待condition,因?yàn)槟硞€(gè)條件被阻塞
** PROPAGATE令花,值為-3阻桅,表示鎖的下一次獲取可以無(wú)條件傳播
**/
volatile int waitStatus;
// 前驅(qū)節(jié)點(diǎn)
volatile Node prev;
// 后繼節(jié)點(diǎn)
volatile Node next;
// 入隊(duì)列時(shí)的當(dāng)前線程
volatile Thread thread;
// 表示下一個(gè)等待condition的Node
Node nextWaiter;
介紹完Node,再來(lái)介紹下幾個(gè)AQS比較重要的成員變量:
// 頭節(jié)點(diǎn)
private transient volatile Node head;
// 尾節(jié)點(diǎn)
private transient volatile Node tail;
// 同步狀態(tài)
private volatile int state;
介紹完AQS幾個(gè)關(guān)鍵的成員變量兼都,就可以來(lái)開干源代碼了嫂沉,下面通過(guò)一個(gè)簡(jiǎn)單demo例子,來(lái)逐步分析ReentrantLock的實(shí)現(xiàn)原理
demo代碼如下:
public static void main(String[] args) {
final ExecutorService exec = Executors.newFixedThreadPool(2);
final ReentrantLock lock = new ReentrantLock();
for(int i =0; i< 2; i++){
Runnable r = new Runnable() {
@Override
public void run() {
lock.lock();
try {
// do something start
TimeUnit.SECONDS.sleep(5);
// do something end
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
lock.unlock();
}
}
};
exec.submit(r);
}
exec.shutdown();
}
先來(lái)看下lock方法:
public void lock() {
sync.lock();
}
ReentrantLock中有一個(gè)抽象類Sync扮碧,它繼承了AQS趟章,所以ReentrantLock的實(shí)現(xiàn)大多就是基于Sync來(lái)完成實(shí)現(xiàn),ReentrantLock又分為公平鎖和非公平鎖之分慎王,所以蚓土,ReentrantLock內(nèi)部有兩個(gè)sync的實(shí)現(xiàn):
static final class NonfairSync extends Sync{...}
static final class FairSync extends Sync{...}
公平鎖:獲取鎖的順序?yàn)檎{(diào)用lock的順序
非公平鎖:每個(gè)線程搶占鎖的順序是不定的
由于ReentrantLock我們用的比較多的是非公平鎖,所以先來(lái)看下非公平鎖lock的實(shí)現(xiàn)赖淤,代碼如下:
final void lock() {
if (compareAndSetState(0, 1))
// 成功后將當(dāng)前線程設(shè)置到AQS的一個(gè)變量中蜀漆,說(shuō)明這個(gè)線程拿走了鎖。
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
demo中我啟動(dòng)了兩個(gè)線程咱旱,假設(shè)線程1進(jìn)來(lái)确丢,調(diào)用compareAndSetState方法,此時(shí)沒(méi)有任何線程占有鎖吐限,所以通過(guò)原子操作CAS(如果對(duì)cas不了解的可以百度下鲜侥,百度一堆)將state的狀態(tài)改為1,這個(gè)是肯定能成功的毯盈,之后將線程1設(shè)置到AQS的exclusiveOwnerThread個(gè)變量中剃毒,表示線程1拿走了這個(gè)鎖。這個(gè)時(shí)候線程2進(jìn)入了lock方法搂赋,此時(shí)通過(guò)CAS操作赘阀,發(fā)現(xiàn)預(yù)計(jì)值0此時(shí)已經(jīng)變成了1,所以設(shè)置失敗脑奠,進(jìn)入else分支基公,調(diào)用acquire方法。
也就是說(shuō)非公平鎖在線程第一次競(jìng)爭(zhēng)失敗后宋欺,還是會(huì)調(diào)用acquire方法轰豆,可能進(jìn)入隊(duì)列中,而公平鎖是直接調(diào)用acquire方法
acquire方法是調(diào)用父類AQS的acquire方法齿诞,代碼如下:
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
以上代碼可以看出大概的邏輯酸休,線程1嘗試去獲取鎖,如果失敗祷杈,則加入到隊(duì)列斑司,掛起
我們來(lái)看下tryAcquire方法,非公平鎖最終調(diào)用的是nonfairTryAcquire方法但汞,代碼如下:
final boolean nonfairTryAcquire(int acquires) {
// 獲取當(dāng)前線程
final Thread current = Thread.currentThread();
// 獲取AQS中鎖的標(biāo)志位state 0 表示鎖沒(méi)有被占有 1 表示鎖已經(jīng)被拿走了
int c = getState();
if (c == 0) {
// cas修改state狀態(tài)
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
假設(shè)線程1進(jìn)來(lái)后宿刮,state=0,表示此時(shí)沒(méi)有線程占用鎖私蕾,修改state狀態(tài)僵缺,成功獲取鎖,返回成功踩叭,而此時(shí)線程2也去嘗試獲取鎖磕潮,由于state是volatile修飾的,所以線程2獲取到state此時(shí)為1容贝,進(jìn)入else if的邏輯自脯,這里要注意下,不是說(shuō)沒(méi)有獲取鎖就直接失敗嗤疯,我們知道ReentrandLock是一個(gè)重入鎖冤今,就是說(shuō)一個(gè)線程可以多次調(diào)用ReentrandLock,每次調(diào)用state+1茂缚,因?yàn)槭峭粋€(gè)線程(已經(jīng)拿到了鎖)戏罢,所以這里不存在競(jìng)爭(zhēng),直接返回成功脚囊,如果條件都沒(méi)滿足則返回false龟糕。
當(dāng)tryAcquire獲取鎖失敗后,調(diào)用acquireQueued方法悔耘,調(diào)用acquireQueued我們先來(lái)看addWaiter方法:
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
mode這里表示Node的類型讲岁,在這里表示獨(dú)占鎖(AQS可以實(shí)現(xiàn)獨(dú)占鎖和共享鎖),用當(dāng)前線程構(gòu)造一個(gè)Node對(duì)象,線程1進(jìn)來(lái)后缓艳,此時(shí)隊(duì)列里面是空的校摩,所以尾節(jié)點(diǎn)tail為空,走enq方法(等會(huì)看這個(gè)方法)阶淘,假設(shè)此時(shí)enq方法已經(jīng)執(zhí)行結(jié)束衙吩,線程2進(jìn)入addWaiter方法,此時(shí)tail不為空溪窒,將線程2的Node的前驅(qū)節(jié)點(diǎn)指向pred坤塞,之后將線程2的node設(shè)置為tail,最終的目的就是將新來(lái)的線程澈蚌,放到隊(duì)列最后摹芙。等待喚醒。下面來(lái)看enq方法:
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
enq方法的目的是讓所有的線程N(yùn)ode最終都加到隊(duì)列里面宛瞄,這里代碼寫的非常的6浮禾,先一步一步的分析,假設(shè)坛悉,第一個(gè)線程進(jìn)來(lái)伐厌,tail為空,這里的判斷和之前一樣裸影,因?yàn)槭嵌嗑€程挣轨,所以需要二次判斷,為空時(shí)轩猩,先用cas操作構(gòu)造一個(gè)頭結(jié)點(diǎn)卷扮,這里會(huì)出現(xiàn)競(jìng)爭(zhēng)的情況,因此enq方法使用while(true)的方式均践,做了個(gè)循環(huán)晤锹,如果設(shè)置成功,則將head指向tail彤委,如果失敗鞭铆,則重復(fù)嘗試,直到成功為止焦影。
當(dāng)head設(shè)置成功后车遂,之后的線程進(jìn)入else邏輯,將后續(xù)進(jìn)來(lái)的Node加入到隊(duì)尾斯辰,compareAndSetTail失敗話舶担,這里沒(méi)有直接返回,而是通過(guò)for循環(huán)繼續(xù)插入到隊(duì)尾彬呻,最終所有沒(méi)有成功獲取到鎖的線程衣陶,全部加入到隊(duì)列中柄瑰。
接著看AQS的acquireQueued方法:
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
首先判斷node的前驅(qū)節(jié)點(diǎn)是否是head,如果是剪况,證明當(dāng)前線程就是下一個(gè)即將獲取到鎖的線程教沾,所以此時(shí)先嘗試獲取鎖一次這里調(diào)用了tryAcquire方法,如果獲取到鎖拯欧,則將當(dāng)前head設(shè)置成當(dāng)前node详囤,并將之前的node(之前為head的node)設(shè)置為空财骨,gc回收镐作。
如果不是head或者獲取鎖失敗,則調(diào)用shouldParkAfterFailedAcquire方法:
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
/*
* This node has already set status asking a release
* to signal it, so it can safely park.
*/
return true;
if (ws > 0) {
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
*/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
/*
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
*/
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
該方法是判斷這個(gè)節(jié)點(diǎn)是否需要被掛起隆箩,介紹這個(gè)方法回顧下之前的该贾,我們知道Node節(jié)點(diǎn)中,出了當(dāng)前線程捌臊,前驅(qū)節(jié)點(diǎn)杨蛋,后驅(qū)節(jié)點(diǎn),還有個(gè)重要的變量:waitStatus理澎, 這個(gè)變量是來(lái)表示當(dāng)前node是否還需要競(jìng)爭(zhēng)鎖逞力,某些場(chǎng)景下,有些線程是會(huì)放棄鎖的競(jìng)爭(zhēng)的糠爬,如Condition類的操作寇荧。
只有當(dāng)node的狀態(tài)為SIGNAL狀態(tài),當(dāng)前節(jié)點(diǎn)才能被掛起执隧。假設(shè)當(dāng)前節(jié)點(diǎn)的waitStatus為SIGNAL揩抡,則調(diào)用parkAndCheckInterrupt方法,此時(shí)線程被真正的掛起镀琉。
至此峦嗤,線程2進(jìn)入FOFI隊(duì)列,成功被掛起的過(guò)程就完成了屋摔。
這里有個(gè)很巧妙的設(shè)計(jì)烁设,acquireQueued方法,在線程被掛起后钓试,還是處在for循環(huán)中装黑,所以當(dāng)線程被喚醒時(shí),會(huì)繼續(xù)執(zhí)行亚侠,此時(shí)執(zhí)行tryAcquire成功后曹体,獲取鎖,之后將節(jié)點(diǎn)刪除即p.next = null; 這樣獲取鎖的Node在隊(duì)列里就沒(méi)有了
接下來(lái)我們來(lái)看看unlock方法硝烂。
unlock方法其實(shí)最終調(diào)用的是AQS的release方法:
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
調(diào)用內(nèi)部類Sync的tryRelease方法箕别,嘗試釋放鎖:
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
releases=1,后驅(qū)state的值,減1串稀,只有當(dāng)c=0的時(shí)候才能釋放所有鎖除抛,之前提到過(guò),ReentrandLock是重入鎖母截,每次lock一下到忽,state都會(huì)加1,所以需要對(duì)應(yīng)數(shù)量的unlock才能完全釋放鎖清寇,釋放鎖后喘漏,將當(dāng)前持有鎖的變量設(shè)為null,即setExclusiveOwnerThread(null);
如果解鎖成功华烟,則繼續(xù)執(zhí)行unparkSuccessor方法翩迈,喚醒線程:
private void unparkSuccessor(Node node) {
/*
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
*/
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
/*
* Thread to unpark is held in successor, which is normally
* just the next node. But if cancelled or apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor.
*/
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}
這里要注意下,如果node的下個(gè)節(jié)點(diǎn)為空盔夜,或者waitStatus>0(沒(méi)有可喚醒的線程)负饲,則從隊(duì)為倒序查詢waitStatus<=0的節(jié)點(diǎn),最終執(zhí)行的是LockSupport.unpark(s.thread);
至此釋放鎖的過(guò)程也完成了喂链。