ReentrantLock 互斥鎖(獨(dú)占鎖)
鎖
我們知道鎖的基本原理是,基于將多線程并行任務(wù)通過某一種機(jī)制實(shí)現(xiàn)線程的串 行執(zhí)行锰茉,從而達(dá)到線程安全性的目的呢蔫。而Lock是juc中實(shí)現(xiàn)的鎖接口,他定義了鎖的一些行為規(guī)范飒筑,他的設(shè)計(jì)目的是為了解決synchronized關(guān)鍵字在一些并發(fā)場景下不適用的問題片吊。
Lock
juc 包下的接口,定義了鎖的規(guī)范协屡。有多種實(shí)現(xiàn)類俏脊。
ReentrantLock
ReentrantLock 重入鎖 一個(gè)持有鎖的線程,在釋放鎖之前肤晓。此線程如果再次訪問了該同步鎖的其他的方法联予,這個(gè)線程不需要再次競爭鎖,只需要記錄重入次數(shù)材原。重入鎖的設(shè)計(jì)目的是為了解決死鎖的問題
public class ReentrantLockDemo {
private static Lock lock = new ReentrantLock();
private static int count = 0;
private static void inrc() {
try {
//加鎖
lock.lock();
Thread.sleep(10);
count++;
//模擬重入鎖
dec();//2
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
lock.unlock();
}
}
private static void dec(){
lock.lock();
count--;
lock.unlock();
}
public static void main(String[] args) throws InterruptedException {
for (int i = 0; i < 100; i++) {
new Thread(() -> {
inrc();//1
}).start();
}
TimeUnit.SECONDS.sleep(3);
System.out.println(count);
}
}
inr() 方法獲取鎖成功并沒有釋放鎖的情況下調(diào)用dec()再次獲取鎖,假如沒有重入鎖的話這里會(huì)導(dǎo)致死鎖季眷。此線程如果再次訪問了該同步鎖的其他的方法余蟹,這個(gè)線程不需要再次競爭鎖,只需要記錄重入次數(shù)子刮。
<span style='color:red'>內(nèi)部是如何實(shí)現(xiàn)的威酒?假如線程中斷鎖沒有及時(shí)釋放怎么辦呢</span>
架構(gòu)分析
- 在多線程環(huán)境下如何實(shí)互斥的邏輯? 多線程環(huán)境下通過更改邊界值state來實(shí)現(xiàn)獲取鎖與釋放鎖的邏輯挺峡。通過原子操作更改成員變量state葵孤,如果state=1則代表當(dāng)前線程獲取了鎖其他線程阻塞。
- 獲取鎖失敗的線程如何處理橱赠?獲取鎖失敗的線程進(jìn)行阻塞而不是循環(huán)尤仍。之所以阻塞是為了讓這些獲取鎖失敗的線程讓出cpu資源
- 如何存儲(chǔ)阻塞的線程呢?用一個(gè)雙端列隊(duì)queue存儲(chǔ)線程
- 當(dāng)前線程釋放鎖之后狭姨,如何喚醒阻塞的宰啦?釋放鎖的時(shí)候如果head節(jié)點(diǎn)的waitState= -1 那么通過LockSupport.unpark(thread) 喚醒header的后續(xù)waitState<= 0的節(jié)點(diǎn)
- 公平鎖與公平鎖什么區(qū)別苏遥? 阻塞列隊(duì)中的阻塞線程按照先進(jìn)先出的順序被喚醒的,第一個(gè)進(jìn)來的第一個(gè)被喚醒去搶占鎖赡模,但是這個(gè)時(shí)候新的線程T2獲取鎖田炭,那么T2并不是先插入阻塞列隊(duì)中,而是先嘗試獲取一個(gè)鎖漓柑,那么這種行為對(duì)于那些先進(jìn)入阻塞列隊(duì)中排隊(duì)的線程來說是不公平的教硫,就好比說大家排隊(duì)買票一樣,一個(gè)新來的沒有排隊(duì)直接去窗口買票了辆布,所以叫做非公平鎖反之則是公平鎖.
使用
private final Sync sync; //內(nèi)部類 實(shí)現(xiàn)類AbstractQueuedSynchronized抽象類
//默認(rèn)實(shí)現(xiàn)是非公平鎖
public ReentrantLock() {
sync = new NonfairSync();
}
//true 公平鎖 false 非公平鎖
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}
//獲取鎖
public void lock() {
sync.lock();
}
源碼分析
獲取鎖
NonfairSync 重入鎖的核心實(shí)現(xiàn)
//中間態(tài)變量瞬矩,通過更改state的值判斷是否占有鎖 state=0 代表無線程占用
//state >= 1 代表有鎖 state =1 代表有一個(gè)線程占用鎖 state=5假如是重入鎖那么代表這個(gè)線程獲取了5次鎖,相應(yīng)的需要釋放5次鎖使得state=0從而達(dá)到釋放的目的谚殊。
private volatile int state;
//當(dāng)前占有鎖的線程
private transient Thread exclusiveOwnerThread;
//嘗試獲取鎖
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
compareAndSetState 通過調(diào)用unsafe的提供的cas本地方法(調(diào)用c語言) 嘗試把state=0的值更改為1丧鸯,如果成功設(shè)置占有鎖的線程為當(dāng)前線程,如果獲取失敗則調(diào)用acquire嘗試獲取1次
acquire
public final void acquire(int arg) {
//嘗試獲取一個(gè)鎖嫩絮,如果失敗了添加線程到等待的雙端隊(duì)列中
if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)){
selfInterrupt();
}
}
//嘗試獲取鎖
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
//此處體現(xiàn)了nonfair非公平的意思
if (c == 0) {
//說明已經(jīng)有線程釋放了鎖丛肢,那么當(dāng)前線程嘗試獲取一次,如果獲取成功返回true
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
//次數(shù)體現(xiàn)了重入鎖的邏輯
//如果獲取鎖的線程是當(dāng)前線程
else if (current == getExclusiveOwnerThread()) {
//獲取鎖的次數(shù)+acquires
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
//這里無需cas操作剿干,因?yàn)楸厝灰呀?jīng)獲取鎖了
setState(nextc);
return true;
}
return false;
}
acquireQueued(addWaiter(Node.EXCLUSIVE), arg) 調(diào)用tryAcquire嘗試獲取鎖失敗的時(shí)候才會(huì)執(zhí)行這個(gè)方法蜂怎,先執(zhí)行里面的類addWaiter(Node.EXCLUSIVE)
addWaiter
AQS中維護(hù)了一個(gè)存儲(chǔ)了等待線程的Node節(jié)點(diǎn)的雙端鏈表,有首節(jié)點(diǎn)head與尾節(jié)點(diǎn)tail置尔,創(chuàng)建一個(gè)Node節(jié)點(diǎn)里面存儲(chǔ)的是當(dāng)前線程杠步,如果已經(jīng)有了tail節(jié)點(diǎn)則嘗試cas操作添加當(dāng)前節(jié)點(diǎn)到鏈表的尾結(jié)點(diǎn),如果沒有則進(jìn)行初始化操作cas操作創(chuàng)建一個(gè)head節(jié)點(diǎn)并且自旋(沒有任何結(jié)束條件的循環(huán))cas操作添加尾結(jié)點(diǎn)到鏈表的尾部榜轿,最終返回新增的Node節(jié)點(diǎn)幽歼。
//線程被中斷 或者 等待超時(shí)
static final int CANCELLED = 1;
//線程之間通訊的標(biāo)識(shí),是否需要喚醒后續(xù)節(jié)點(diǎn) NodeA = -1 那么當(dāng)NodeA釋放鎖的時(shí)候就需要喚醒被阻塞的后續(xù)節(jié)點(diǎn)NodeB
static final int SIGNAL = -1;
//
static final int CONDITION = -2;
static final int PROPAGATE = -3;
//節(jié)點(diǎn)的等待狀態(tài)
volatile int waitStatus;
//添加當(dāng)前線程到等待列隊(duì)中
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
// cas設(shè)置尾部節(jié)點(diǎn)
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
//把節(jié)點(diǎn)插入到列隊(duì)中
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) {
//初始化header節(jié)點(diǎn)
if (compareAndSetHead(new Node()))
tail = head;
} else {
//cas操作添加尾結(jié)點(diǎn)
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
acquireQueued
對(duì)于插入到等待隊(duì)列中的Node節(jié)點(diǎn)通過 addWaiter 方法把線程添加到鏈表后谬盐,會(huì)接著把 Node 作為參數(shù)傳遞給 acquireQueued 方法甸私,去再次競爭鎖
獲取當(dāng)前節(jié)點(diǎn)的 prev 節(jié)點(diǎn)
-
如果 prev 節(jié)點(diǎn)為 head 節(jié)點(diǎn),那么它就有資格去爭搶鎖飞傀,調(diào)用 tryAcquire 搶占
鎖
-
搶占鎖成功以后皇型,把獲得鎖的節(jié)點(diǎn)設(shè)置為 head,并且移除原來的初始化 head
節(jié)點(diǎn)
如果獲得鎖失敗砸烦,則根據(jù)shouldParkAfterFailedAcquire決定是否需要掛起線程()
最后弃鸦,通過 cancelAcquire 取消獲得鎖的操作
//等待隊(duì)列中的線程嘗試獲取一個(gè)鎖
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
//獲取當(dāng)前節(jié)點(diǎn)的pre節(jié)點(diǎn)
final Node p = node.predecessor();
//如果當(dāng)前節(jié)點(diǎn)是等待隊(duì)列中的第2個(gè)節(jié)點(diǎn) 這里會(huì)再次嘗試獲取一次鎖
if (p == head && tryAcquire(arg)) {
//成功了 設(shè)置當(dāng)前節(jié)點(diǎn)為頭節(jié)點(diǎn) 也就是說首節(jié)點(diǎn)是獲取了鎖的節(jié)點(diǎn)
setHead(node);
//這里幫助不必要的Node被gc回收
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
//線程掛起并讓出cup,當(dāng)被喚醒的時(shí)候返回中斷狀態(tài)
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
//取消鎖
cancelAcquire(node);
}
}
shouldParkAfterFailedAcquire
//是否應(yīng)該阻塞獲取鎖失敗的節(jié)點(diǎn) pred 前置節(jié)點(diǎn) node當(dāng)前節(jié)點(diǎn)
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
//前節(jié)點(diǎn)已經(jīng)知道了 釋放鎖的時(shí)候喚醒你,所以安心阻塞就可以了
return true;
if (ws > 0) {
//如果前置節(jié)點(diǎn)已經(jīng)被中斷或者等待超時(shí)了幢痘,這里會(huì)會(huì)找到一個(gè)有效的前置節(jié)點(diǎn)并添加到當(dāng)前節(jié)點(diǎn)中
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
//更新前置節(jié)點(diǎn)的狀態(tài)
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
shouldParkAfterFailedAcquire 判斷節(jié)點(diǎn)是否被掛起唬格,如果節(jié)點(diǎn)被掛起了那么一定是需要一種機(jī)制它前面的節(jié)點(diǎn)在釋放鎖的時(shí)候能夠喚醒它,如果沒有這種機(jī)制線程被掛起了沒有喚醒豈不是有很大問題。這種機(jī)制就是通過設(shè)置前節(jié)點(diǎn)的共享變量waitStatus=-1來實(shí)現(xiàn)的西轩。如果前節(jié)點(diǎn)為-1那么我就可以安心的掛起了因?yàn)榍肮?jié)點(diǎn)會(huì)在釋放鎖的時(shí)候喚醒我员舵。如果前節(jié)點(diǎn)>0也就是被刪除了,那么就刪除處于waitStatus=1狀態(tài)的節(jié)點(diǎn)直到找到一個(gè)前節(jié)點(diǎn)的waitStatus <=0并相連接藕畔。
我當(dāng)時(shí)看到這里的時(shí)候一直不理解為什么要for (;;) 循環(huán)中判斷是否需要阻塞呢马僻? 直接把當(dāng)前Node對(duì)應(yīng)的線程阻塞不就行了嗎。假如TreadA 與 TheadB 注服,TreadA獲取了鎖韭邓,TreadB添加到等待隊(duì)列中,那么這個(gè)時(shí)候TreadA釋放了鎖溶弟,
ThreadB進(jìn)入到了acquireQueued方法還未阻塞并且還未獲取鎖女淑,所以ThreadA不需要喚醒ThreadB,因?yàn)門hreadB沒有阻塞辜御,它會(huì)在acquireQueued的后續(xù)方法中獲取鎖的鸭你。
//釋放鎖邏輯 if (h != null && h.waitStatus != 0) unparkSuccessor(h);
ThreadB進(jìn)入到了acquireQueued方法的,這里已經(jīng)獲取鎖失敗了擒权。TreadA鎖已經(jīng)被釋放了袱巨,所以ThreadB調(diào)用shouldParkAfterFailedAcquire方法并設(shè)置前置節(jié)點(diǎn)的狀態(tài)為-1并放回false不阻塞,為的是能夠再次循環(huán)一遍嘗試獲取鎖碳抄。如果這里設(shè)置節(jié)點(diǎn)狀態(tài)為-1沒有重新獲取的話愉老,會(huì)導(dǎo)致ThreadB一直阻塞不會(huì)獲取鎖,因?yàn)門hreadA已經(jīng)釋放了剖效。
parkAndCheckInterrupt
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
掛起當(dāng)前線程嫉入。這里調(diào)用了LockSupport.park(this)把線程掛起了并返回 Thread.interrupted()線程復(fù)位。
我這里一直不明白線程被中斷后為什么調(diào)用 Thread.interrupted()進(jìn)行復(fù)位然后節(jié)點(diǎn)繼續(xù)去搶占鎖璧尸。
if (shouldParkAfterFailedAcquire(p, node) &&parkAndCheckInterrupt()) interrupted = true;
這里只是單純的標(biāo)識(shí)了線程被中斷過咒林,在獲取鎖成功后會(huì)再次發(fā)起一次中斷。
static void selfInterrupt() { Thread.currentThread().interrupt(); }
線程被中斷后直接退出不就行了嗎爷光? 線程被干掉并不是簡單的kill就完事了映九,直接刪除線程是不合理的比如說一些內(nèi)存資源沒有被釋放,所以Thread提供了一個(gè)interrupt()方法發(fā)送一個(gè)中斷的信號(hào)瞎颗,當(dāng)開發(fā)人員收到這個(gè)信號(hào)之后去進(jìn)行處理比如說釋放資源然后執(zhí)行完Runable方法讓系統(tǒng)去回收Thread。所以在重入鎖中業(yè)務(wù)代碼需要明確收到中斷信息后進(jìn)行釋放資源等操作捌议,前提是線程需要獲取鎖哼拔,釋放資源的操作肯定也是Lock.lock()中。所以AQS收到中斷信號(hào)后并沒有直接釋放線程瓣颅,而是記錄標(biāo)識(shí)然后繼續(xù)在等待列隊(duì)中嘗試獲取鎖倦逐,獲取鎖之后調(diào)用 interrupt()方法再次中斷。
釋放鎖
更新state的值如果為0則喚醒后續(xù)節(jié)點(diǎn)
release
public final boolean release(int arg) {
if (tryRelease(arg)) {
//釋放鎖成功
Node h = head; //得到 aqs 中 head 節(jié)點(diǎn)
if (h != null && h.waitStatus != 0)
//如果 head 節(jié)點(diǎn)不 為空并且狀態(tài)!=0.
//調(diào)用 unparkSuccessor(h) 喚醒后續(xù)節(jié)點(diǎn)
unparkSuccessor(h);
return true;
}
return false;
}
tryRelease
釋放鎖的業(yè)務(wù)邏輯不需要考慮多線程的問題宫补,他還是被一個(gè)線程持有檬姥。因?yàn)橹厝腈i的機(jī)制state>=1 釋放就是 getState() - releases并跟新state為最新值曾我,如果state=0則返回。
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}