1.J.U.C 簡介
Java.util.concurrent 是在并發(fā)編程中比較常用的工具類乐设,里面包含很多用來在并發(fā)場景中使用的組件。比如線程池碗短、阻塞隊列、計時器题涨、同步器偎谁、并發(fā)集合等等。并發(fā)包的作者是大名鼎鼎的 Doug Lea纲堵。我們在接下來的課程中巡雨,回去剖析一些經(jīng)典的比較常用的組件的設(shè)計思想
2.Lock
Lock 在 J.U.C 中是最核心的組件,前面我們講 synchronized 的時候說過席函,鎖最重要的特性就是解決并發(fā)安全問題铐望。為什么要以 Lock 作為切入點呢?如果有同學(xué)看過 J.U.C 包中的所有組件,一定會發(fā)現(xiàn)絕大部分的組件都有用到了 Lock正蛙。所以通過 Lock 作為切入點使得在后續(xù)的學(xué)習(xí)過程中會更加輕松督弓。
Lock 簡介
在 Lock 接口出現(xiàn)之前,Java 中的應(yīng)用程序?qū)τ诙嗑€程的并發(fā)安全處理只能基于
synchronized 關(guān)鍵字來解決乒验。但是 synchronized 在有些場景中會存在一些短板愚隧,
也就是它并不適合于所有的并發(fā)場景。但是在 Java5 以后锻全,Lock 的出現(xiàn)可以解決
synchronized 在某些場景中的短板狂塘,它比 synchronized 更加靈活。
Lock 的實現(xiàn)
Lock 本質(zhì)上是一個接口鳄厌,它定義了釋放鎖和獲得鎖的抽象方法荞胡,定義成接口就意
味著它定義了鎖的一個標準規(guī)范,也同時意味著鎖的不同實現(xiàn)了嚎。實現(xiàn) Lock 接口的
類有很多泪漂,以下為幾個常見的鎖實現(xiàn)
ReentrantLock
:表示重入鎖,它是唯一一個實現(xiàn)了 Lock 接口的類歪泳。重入鎖指的是
線程在獲得鎖之后窖梁,再次獲取該鎖不需要阻塞,而是直接關(guān)聯(lián)一次計數(shù)器增加重入
次數(shù)
ReentrantReadWriteLock
:重入讀寫鎖夹囚,它實現(xiàn)了 ReadWriteLock 接口邀窃,在這個
類中維護了兩個鎖,一個是 ReadLock瞬捕,一個是 WriteLock,他們都分別實現(xiàn)了 Lock
接口劣砍。讀寫鎖是一種適合讀多寫少的場景下解決線程安全問題的工具刑枝,基本原則是: 讀和讀不互斥、讀和寫互斥迅腔、寫和寫互斥装畅。也就是說涉及到影響數(shù)據(jù)變化的操作都會存在互斥。
StampedLock
: stampedLock 是 JDK8 引入的新的鎖機制沧烈,可以簡單認為是讀寫鎖的一個改進版本掠兄,讀寫鎖雖然通過分離讀和寫的功能使得讀和讀之間可以完全并發(fā),但是讀和寫是有沖突的,如果大量的讀線程存在蚂夕,可能會引起寫線程的饑餓迅诬。
stampedLock 是一種樂觀的讀策略,使得樂觀鎖完全不會阻塞寫線程
Lock 的類關(guān)系圖
Lock 有很多的鎖的實現(xiàn)婿牍,但是直觀的實現(xiàn)是 ReentrantLock 重入鎖
下面是lock的方法介紹
void lock() // 如果鎖可用就 獲得鎖侈贷,如果鎖不可用就阻塞 直到鎖釋放
void lockInterruptibly() // 和 lock()方法相似, 但阻塞的線程可中斷, 拋出java.lang.InterruptedExcepti on異常
boolean tryLock() // 非阻塞獲取鎖;嘗試獲取鎖,如果成功返回true
boolean tryLock(long timeout, TimeUnit timeUnit) //帶有超時時間的獲取鎖方法
void unlock() // 釋放鎖
3.ReentrantLock 重入鎖
重入鎖牍汹,表示支持重新進入的鎖铐维,也就是說,如果當前線程t1通過調(diào)用lock方 法獲取了鎖之后慎菲,再次調(diào)用lock嫁蛇,是不會再阻塞去獲取鎖的,直接增加重試次數(shù) 就行了露该。synchronized和ReentrantLock都是可重入鎖睬棚。不理解為什么鎖會存在重入的特性,那是因為對于同步鎖的理解程度還不夠解幼,比如在下面這類 的場景中抑党,存在多個加鎖的方法的相互調(diào)用,其實就是一種重入特性的場景底靠。
重入鎖的設(shè)計目的
比如調(diào)用demo方法獲得了當前的對象鎖,然后在這個方法中再去調(diào)用 demo2鳄逾,demo2中的存在同一個實例鎖,這個時候當前線程會因為無法獲得 demo2的對象鎖而阻塞枚抵,就會產(chǎn)生死鎖俄精。重入鎖的設(shè)計目的是避免線程的死鎖嫌套。
public class App {
public synchronized void demo(){ //main獲得對象鎖
System.out.println("demo");
demo2();
}
public void demo2(){
synchronized (this) { //增加重入次數(shù)就行
System.out.println("demo2");
}//減少重入次數(shù)
}
public static void main(String[] args) {
App app=new App();
app.demo();
}
}
ReentrantLock 的使用案例
/**
* @Project: ThreadExample
* @description: ReentrantLock的使用
* @author: sunkang
* @create: 2020-06-27 22:39
* @ModificationHistory who when What
**/
public class ReentrantLockDemo {
private int count = 0;
Lock lock = new ReentrantLock();
public void increment(){
lock.lock();
try {
Thread.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
count++;
lock.unlock();
}
public static void main(String[] args) throws InterruptedException {
ReentrantLockDemo reentrantLock = new ReentrantLockDemo();
for (int i=0 ;i<1000;i++){ //使用1000個線程進行計數(shù)器累加
new Thread(()->reentrantLock.increment()).start();
}
Thread.sleep(3000);
System.out.println(reentrantLock.count);
}
}
ReentrantReadWriteLock
我們以前理解的鎖,基本都是排他鎖痹筛,也就是這些鎖在同一時刻只允許一個線程進 行訪問,而讀寫所在同一時刻可以允許多個線程訪問滋早,但是在寫線程訪問時杆麸,所有 的讀線程和其他寫線程都會被阻塞。讀寫鎖維護了一對鎖揭斧,一個讀鎖未蝌、一個寫鎖; 一般情況下左冬,讀寫鎖的性能都會比排它鎖好梅忌,因為大多數(shù)場景讀是多于寫的牧氮。在讀 多于寫的情況下丹莲,讀寫鎖能夠提供比排它鎖更好的并發(fā)性和吞吐量.
public class RWLock {
static ReentrantReadWriteLock wrl=new ReentrantReadWriteLock();
static Map<String,Object> cacheMap=new HashMap<>();
static Lock read=wrl.readLock();
static Lock write=wrl.writeLock();
//線程B/C/D
public static final Object get(String key){
System.out.println("begin read data:"+key);
read.lock(); //獲得讀鎖-> 阻塞
try {
return cacheMap.get(key);
}finally {
read.unlock();
}
}
//線程A
public static final Object put(String key,Object val){
write.lock();//獲得了寫鎖
try{
return cacheMap.put(key,val);
}finally {
write.unlock();
}
}
}
在這個案例中,通過hashmap來模擬了一個內(nèi)存緩存洲赵,然后使用讀寫所來保證這 個內(nèi)存緩存的線程安全性叠萍。當執(zhí)行讀操作的時候,需要獲取讀鎖抄腔,在并發(fā)訪問的時 候赫蛇,讀鎖不會被阻塞,因為讀操作不會影響執(zhí)行結(jié)果暂幼。
在執(zhí)行寫操作是旺嬉,線程必須要獲取寫鎖,當已經(jīng)有線程持有寫鎖的情況下雨效,當前線 程會被阻塞徽龟,只有當寫鎖釋放以后传透,其他讀寫操作才能繼續(xù)執(zhí)行。使用讀寫鎖提升 讀操作的并發(fā)性托享,也保證每次寫操作對所有的讀寫操作的可見性
讀鎖與讀鎖可以共享
讀鎖與寫鎖不可以共享(排他)
寫鎖與寫鎖不可以共享(排他)
4.ReentrantLock 的實現(xiàn)原理
我們知道鎖的基本原理是闰围,基于將多線程并行任務(wù)通過某一種機制實現(xiàn)線程的串 行執(zhí)行,從而達到線程安全性的目的校仑。在 synchronized 中迄沫,我們分析了偏向鎖羊瘩、 輕量級鎖、樂觀鎖睬捶∏苊常基于樂觀鎖以及自旋鎖來優(yōu)化了 synchronized 的加鎖開銷积仗, 同時在重量級鎖階段寂曹,通過線程的阻塞以及喚醒來達到線程競爭和同步的目的。 那么在ReentrantLock中翔烁,也一定會存在這樣的需要去解決的問題蹬屹。就是在多線程 競爭重入鎖時,競爭失敗的線程是如何實現(xiàn)阻塞以及被喚醒的呢厦取?
AQS 是什么
在 Lock 中虾攻,用到了一個同步隊列 AQS,全稱 AbstractQueuedSynchronizer朋沮,它 是一個同步工具也是Lock用來實現(xiàn)線程同步的核心組件樊拓。如果你搞懂了AQS,那 么J.U.C中絕大部分的工具都能輕松掌握图呢。
AQS 的兩種功能
從使用層面來說赴叹,AQS的功能分為兩種:獨占和共享
獨占鎖,每次只能有一個線程持有鎖绽媒,比如前面給大家演示的ReentrantLock就是 以獨占方式實現(xiàn)的互斥鎖
共享鎖是辕,允許多個線程同時獲取鎖旁蔼,并發(fā)訪問共享資源牌芋,比如 ReentrantReadWriteLock
AQS 的內(nèi)部實現(xiàn)
AQS 隊列內(nèi)部維護的是一個 FIFO 的雙向鏈表躺屁,這種結(jié)構(gòu)的特點是每個數(shù)據(jù)結(jié)構(gòu)都有兩個指針,分別指向直接的后繼節(jié)點和直接前驅(qū)節(jié)點烁兰。所以雙向鏈表可以從任 意一個節(jié)點開始很方便的訪問前驅(qū)和后繼广辰。每個 Node 其實是由線程封裝,當線 程爭搶鎖失敗后會封裝成Node加入到ASQ隊列中去几睛;當獲取鎖的線程釋放鎖以 后,會從隊列中喚醒一個阻塞的節(jié)點(線程)焕济。
Node類的組成如下
tatic final class Node {
static final Node SHARED = new Node();
static final Node EXCLUSIVE = null;
static final int CANCELLED = 1;
static final int SIGNAL = -1;
static final int CONDITION = -2;
static final int PROPAGATE = -3;
volatile int waitStatus;
volatile Node prev; //前驅(qū)節(jié)點
volatile Node next; //后繼節(jié)點
volatile Thread thread;//當前線程
Node nextWaiter; //存儲在condition隊列中的后繼節(jié)點
//是否為共享鎖
final boolean isShared() {
return nextWaiter == SHARED;
}
final Node predecessor() throws NullPointerException {
Node p = prev;
if (p == null)
throw new NullPointerException();
else
return p;
}
Node() { // Used to establish initial head or SHARED marker
}
//將線程構(gòu)造成一個Node,添加到等待隊列
Node(Thread thread, Node mode) { // Used by addWaiter
this.nextWaiter = mode;
this.thread = thread;
}
//這個方法會在Condition隊列使用,后續(xù)單獨寫一篇文章分析condition
Node(Thread thread, int waitStatus) { // Used by Condition
this.waitStatus = waitStatus;
this.thread = thread;
}
}
釋放鎖以及添加線程對于隊列的變化
添加節(jié)點
當出現(xiàn)鎖競爭以及釋放鎖的時候,AQS同步隊列中的節(jié)點會發(fā)生變化能曾,首先看一下添加節(jié)點的場景。
這里會涉及到兩個變化
新的線程封裝成Node節(jié)點追加到同步隊列中驼唱,設(shè)置prev節(jié)點以及修改當前節(jié)點的前置節(jié)點的next節(jié)點指向自己
通過CAS講tail重新指向新的尾部節(jié)點
head節(jié)點表示獲取鎖成功的節(jié)點,當頭結(jié)點在釋放同步狀態(tài)時京办,會喚醒后繼節(jié)點惭婿, 如果后繼節(jié)點獲得鎖成功,會把自己設(shè)置為頭結(jié)點佑力,節(jié)點的變化過程如下
這個過程也是涉及到兩個變化
- 修改head節(jié)點指向下一個獲得鎖的節(jié)點
- 新的獲得鎖的節(jié)點漓滔,將prev的指針指向null
設(shè)置 head 節(jié)點不需要用 CAS透且,原因是設(shè)置 head 節(jié)點是由獲得鎖的線程來完成 的鲸沮,而同步鎖只能由一個線程獲得讼溺,所以不需要 CAS 保證,只需要把 head 節(jié)點設(shè)置為原首節(jié)點的后繼節(jié)點,并且斷開原h(huán)ead節(jié)點的next引用即可
5.ReentrantLock 的源碼分析
清楚了AQS的基本架構(gòu)以后归敬,我們來分析一下AQS的源碼,仍然以ReentrantLock為模型鄙早。
5.1 ReentrantLock的時序圖
調(diào)用ReentrantLock中的lock()方法弄慰,源碼的調(diào)用過程我使用了時序圖來展現(xiàn)
從圖上可以看出來,當鎖獲取失敗時蝶锋,會調(diào)用addWaiter()方法將當前線程封裝成Node節(jié)點加入到AQS隊列陆爽,基于這個思路,我們來分析AQS的源碼實現(xiàn)
5.2ReentrantLock.lock()
public void lock() {
sync.lock();
}
這個是獲取鎖的入口扳缕,調(diào)用sync這個類里面的方法,sync是什么呢粥庄?
abstract static class Sync extends AbstractQueuedSynchronizer
sync是一個靜態(tài)內(nèi)部類描验,它繼承了AQS這個抽象類耕魄,前面說過AQS是一個同步工具甩鳄,主要用來實現(xiàn)同步控制。我們在利用這個工具的時候阿逃,會繼承它來實現(xiàn)同步控制功能。
通過進一步分析萝映,發(fā)現(xiàn)Sync這個類有兩個具體的實現(xiàn)造寝,分別是NofairSync(非公平鎖)
,FailSync(公平鎖)
.
公平鎖 表示所有線程嚴格按照FIFO來獲取鎖
非公平鎖 表示可以存在搶占鎖的功能,也就是說不管當前隊列上是否存在其他線程等待尺上,新線程都有機會搶占鎖
公平鎖和非公平鎖的實現(xiàn)上的差異挣菲,我會在文章后面做一個解釋,接下來的分析仍然以非公平鎖
作為主要分析邏輯元暴。
5.3NonfairSync.lock
以非公平鎖為例,來看看lock中的實現(xiàn)
- 非公平鎖和公平鎖最大的區(qū)別在于啸驯,在非公平鎖中我搶占鎖的邏輯是,不管有 沒有線程排隊饲趋,我先上來cas去搶占一下
- CAS成功反镇,就表示成功獲得了鎖
- CAS失敗,調(diào)用acquire(1)走鎖競爭邏輯
final void lock() {
if (compareAndSetState(0, 1)) //通過cas操作來修改state狀態(tài),表示爭搶鎖的操作
setExclusiveOwnerThread(Thread.currentThread());//設(shè)置當前獲得鎖狀態(tài)的線程
else
acquire(1); //嘗試去獲取鎖
}
CAS的實現(xiàn)原理
protected final boolean compareAndSetState(int expect, int update) {
// See below for intrinsics setup to support this
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}
通過 cas 樂觀鎖的方式來做比較并替換,這段代碼的意思是,如果當前內(nèi)存中的 state的值和預(yù)期值expect相等,則替換為update。更新成功返回true,否則返 回false.
這個操作是原子的叁扫,不會出現(xiàn)線程安全問題,這里面涉及到Unsafe這個類的操作糖儡, 以及涉及到state這個屬性的意義。
state 是 AQS 中的一個屬性,它在不同的實現(xiàn)中所表達的含義不一樣,對于重入 鎖的實現(xiàn)來說,表示一個同步狀態(tài)。它有兩個含義的表示
- 當state=0時殖告,表示無鎖狀態(tài)
- 當 state>0 時墓懂,表示已經(jīng)有線程獲得了鎖焰宣,也就是 state=1,但是因為ReentrantLock允許重入捕仔,所以同一個線程多次獲得同步鎖的時候匕积,state會遞增, 比如重入5次榜跌,那么state=5闪唆。 而在釋放鎖的時候,同樣需要釋放5次直到state=0 其他線程才有資格獲得鎖
Unsafe類
Unsafe類是在sun.misc包下钓葫,不屬于Java標準悄蕾。但是很多Java的基礎(chǔ)類庫,包 括一些被廣泛使用的高性能開發(fā)庫都是基于 Unsafe 類開發(fā)的础浮,比如 Netty帆调、 Hadoop、Kafka等豆同;
Unsafe 可認為是 Java 中留下的后門番刊,提供了一些低層次操作,如直接內(nèi)存訪問影锈、 線程的掛起和恢復(fù)芹务、CAS、線程同步鸭廷、內(nèi)存屏障
而 CAS 就是 Unsafe 類中提供的一個原子操作枣抱,第一個參數(shù)為需要改變的對象, 第二個為偏移量(即之前求出來的 headOffset 的值)靴姿,第三個參數(shù)為期待的值沃但,第 四個為更新后的值整個方法的作用是如果當前時刻的值等于預(yù)期值var4相等磁滚,則 更新為新的期望值 var5佛吓,如果更新成功,則返回true垂攘,否則返回false维雇;
stateOffset
一個Java對象可以看成是一段內(nèi)存,每個字段都得按照一定的順序放在這段內(nèi)存 里晒他,通過這個方法可以準確地告訴你某個字段相對于對象的起始內(nèi)存地址的字節(jié) 偏移吱型。用于在后面的compareAndSwapInt中,去根據(jù)偏移量找到對象在內(nèi)存中的 具體位置
所以stateOffset表示state這個字段在AQS類的內(nèi)存中相對于該類首地址的偏移 量
compareAndSwapInt
在 unsafe.cpp 文件中陨仅,可以找到 compareAndSwarpInt 的實現(xiàn)
UNSAFE_ENTRY(jboolean, Unsafe_CompareAndSwapInt(JNIEnv *env, jobject unsafe, jobject obj, jlong offset, jint e, jint x))
UnsafeWrapper("Unsafe_CompareAndSwapInt");
oop p = JNIHandles::resolve(obj); //將 Java 對象解析成 JVM 的 oop(普通對象指針),
jint* addr = (jint *) index_oop_from_field_offset_long(p, offset); //根據(jù)對象 p和地址偏移量找到地址
return (jint)(Atomic::cmpxchg(x, addr, e)) == e;
//基于 cas 比較并替換津滞, x 表示需要更新的值铝侵,addr 表示 state 在內(nèi)存中的地址,e 表示預(yù)期值
UNSAFE_END
5.4AQS.accquire
acquire 是 AQS 中的方法触徐,如果 CAS 操作未能成功咪鲜,說明 state 已經(jīng)不為 0,此 時繼續(xù)acquire(1)操作
大家思考一下撞鹉,acquire方法中的1的參數(shù)是用來做什么呢疟丙?
這個方法的主要邏輯是
- 通過tryAcquire嘗試獲取獨占鎖,如果成功返回true鸟雏,失敗返回false
- 如果tryAcquire失敗享郊,則會通過addWaiter方法將當前線程封裝成Node添加 到AQS隊列尾部
- acquireQueued,將Node作為參數(shù)孝鹊,通過自旋去嘗試獲取鎖炊琉。
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
NonfairSync.tryAcquire
這個方法的作用是嘗試獲取鎖,如果成功返回true又活,不成功返回false 它是重寫 AQS 類中的 tryAcquire 方法温自,并且大家仔細看一下 AQS 中 tryAcquire 方法的定義,并沒有實現(xiàn)皇钞,而是拋出異常悼泌。
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
ReentrantLock.nofairTryAcquire
- 獲取當前線程,判斷當前的鎖的狀態(tài)
- 如果state=0表示當前是無鎖狀態(tài)夹界,通過cas更新state狀態(tài)的值
- 當前線程是屬于重入馆里,則增加重入次數(shù)
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();//獲取當前執(zhí)行的線程
int c = getState();//獲得 state 的值
if (c == 0) {//表示無鎖狀態(tài)
if (compareAndSetState(0, acquires)) {//cas 替換 state 的值,cas 成功表示獲取鎖成功
setExclusiveOwnerThread(current);//保存當前獲得鎖的線程,下次再來的時候不要再嘗試競爭鎖
return true;
}
}
else if (current == getExclusiveOwnerThread()) {//如果同一個線程來獲得鎖可柿,直接增加重入次數(shù)
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
5.5AQS.addWaiter
當 tryAcquire 方法獲取鎖失敗以后鸠踪,則會先調(diào)用 addWaiter 將當前線程封裝成 Node.
入?yún)?mode 表示當前節(jié)點的狀態(tài),傳遞的參數(shù)是 Node.EXCLUSIVE,表示獨占狀 態(tài)盔性。意味著重入鎖用到了AQS的獨占鎖功能
- 將當前線程封裝成Node
- 當前鏈表中的 tail 節(jié)點是否為空南缓,如果不為空,則通過 cas 操作把當前線程的node添加到AQS隊列
- 如果為空或者cas失敗评汰,調(diào)用enq將節(jié)點添加到AQS隊列
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);//把當前線程封裝為 Node
Node pred = tail; //tail 是 AQS 中表示同比隊列隊尾的屬性,默認是 null
if (pred != null) {//tail 不為空的情況下痢虹,說明隊列中存在節(jié)點
node.prev = pred;//把當前線程的 Node 的 prev 指向 tail
if (compareAndSetTail(pred, node)) {//通過 cas 把 node加入到 AQS 隊列被去,也就是設(shè)置為 tail
pred.next = node;//設(shè)置成功以后,把原 tail 節(jié)點的 next指向當前 node
return node;
}
}
enq(node);//tail=null或者compareAndSetTail(pred, node)=false,把 node 添加到同步隊列
return node;
}
enq
enq就是通過自旋操作把當前節(jié)點加入到隊列中
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
圖解分析
假設(shè)3個線程來爭搶鎖奖唯,那么截止到enq方法運行結(jié)束之后惨缆,或者調(diào)用addwaiter 方法結(jié)束后,AQS中的鏈表結(jié)構(gòu)圖
5.6AQS.acquireQueued
通過 addWaiter 方法把線程添加到鏈表后,會接著把 Node 作為參數(shù)傳遞給acquireQueued方法坯墨,去競爭鎖
- 獲取當前節(jié)點的prev節(jié)點
- 如果prev節(jié)點為head節(jié)點寂汇,那么它就有資格去爭搶鎖,調(diào)用tryAcquire搶占 鎖
- 搶占鎖成功以后捣染,把獲得鎖的節(jié)點設(shè)置為 head健无,并且移除原來的初始化 head 節(jié)點
- 如果獲得鎖失敗,則根據(jù)waitStatus決定是否需要掛起線程
- 最后液斜,通過cancelAcquire取消獲得鎖的操作
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();//獲取當前節(jié)點的 prev 節(jié)點
if (p == head && tryAcquire(arg)) {//如果是 head 節(jié)點累贤,說明有資格去爭搶鎖
setHead(node);//獲取鎖成功,也就是ThreadA 已經(jīng)釋放了鎖少漆,然后設(shè)置 head 為 ThreadB 獲得執(zhí)行權(quán)限
p.next = null; //把原 head 節(jié)點從鏈表中移除
failed = false;
return interrupted;
}
//ThreadA 可能還沒釋放鎖臼膏,使得 ThreadB 在執(zhí)行 tryAcquire 時會返回 false
if (shouldParkAfterFailedAcquire(p,node) && parkAndCheckInterrupt())
interrupted = true; //并且返回當前線程在等待過程中有沒有中斷過。
}
} finally {
if (failed)
cancelAcquire(node);
}
}
NofairSync.tryAcquire
這個方法在前面分析過示损,就是通過state的狀態(tài)來判斷是否處于無鎖狀態(tài)渗磅,然后在 通過cas進行競爭鎖操作。成功表示獲得鎖检访,失敗表示獲得鎖失敗
shouldParkAfterFailedAcquire
如果ThreadA的鎖還沒有釋放的情況下始鱼,ThreadB和ThreadC來爭搶鎖肯定是會 失敗,那么失敗以后會調(diào)用shouldParkAfterFailedAcquire方法
Node 有 5 中狀態(tài)脆贵,分別是:CANCELLED(1)医清,SIGNAL(-1) CONDITION(2)、PROPAGATE(-3)卖氨、默認狀態(tài)(0)
CANCELLED: 在同步隊列中等待的線程等待超時或被中斷会烙,需要從同步隊列中取消該Node的結(jié)點, 其結(jié)點的waitStatus為CANCELLED,即結(jié)束狀態(tài)筒捺,進入該狀 態(tài)后的結(jié)點將不會再變化
SIGNAL: 只要前置節(jié)點釋放鎖柏腻,就會通知標識為SIGNAL狀態(tài)的后續(xù)節(jié)點的線程
CONDITION: 和Condition有關(guān)系
PROPAGATE:共享模式下,PROPAGATE狀態(tài)的線程處于可運行狀態(tài)
0.初始狀態(tài)
這個方法的主要作用是系吭,通過 Node 的狀態(tài)來判斷五嫂,ThreadA 競爭鎖失敗以后是否應(yīng)該被掛起。
- 如果ThreadA的pred節(jié)點狀態(tài)為SIGNAL肯尺,那就表示可以放心掛起當前線程
- 通過循環(huán)掃描鏈表把CANCELLED狀態(tài)的節(jié)點移除
- 修改pred節(jié)點的狀態(tài)為SIGNAL,返回false.
返回false時沃缘,也就是不需要掛起,返回true蟆盹,則需要調(diào)用parkAndCheckInterrupt 掛起當前線程
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;//前置節(jié)點的waitStatus
if (ws == Node.SIGNAL)//如果前置節(jié)點為 SIGNAL孩灯,意味著只需要等待其他前置節(jié)點的線程被釋放闺金,
return true;//返回 true逾滥,意味著可以直接放心的掛起了
if (ws > 0) {//ws 大于 0,意味著 prev 節(jié)點取消了排隊,直接移除這個節(jié)點就行
do {
node.prev = pred = pred.prev;//相當于: pred=pred.prev;
node.prev=pred;
} while (pred.waitStatus > 0); //這里采用循環(huán)寨昙,從雙向列表中移除 CANCELLED 的節(jié)點
pred.next = node;
} else {//利用 cas 設(shè)置 prev 節(jié)點的狀態(tài)為 SIGNAL(-1)
compareAndSetWaitStatus(pred, ws,Node.SIGNAL);
}
return false;
}
parkAndCheckInterrupt
使用LockSupport.park掛起當前線程編程WATING狀態(tài)
Thread.interrupted讥巡,返回當前線程是否被其他線程觸發(fā)過中斷請求,也就是thread.interrupt();
如果有觸發(fā)過中斷請求舔哪,那么這個方法會返回當前的中斷標識 true欢顷,并且對中斷標識進行復(fù)位標識已經(jīng)響應(yīng)過了中斷請求。如果返回true捉蚤,意味 著在acquire方法中會執(zhí)行selfInterrupt()抬驴。
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
selfInterrupt: 標識如果當前線程在acquireQueued中被中斷過,則需要產(chǎn)生一 個中斷請求缆巧,原因是線程在調(diào)用acquireQueued方法的時候是不會響應(yīng)中斷請求 的
static void selfInterrupt() {
Thread.currentThread().interrupt();
}
圖解分析
通過acquireQueued方法來競爭鎖布持,如果ThreadA還在執(zhí)行中沒有釋放鎖的話, 意味著ThreadB和ThreadC只能掛起了陕悬。
LockSupport
LockSupport類是Java6引入的一個類题暖,提供了基本的線程同步原語。LockSupport 實際上是調(diào)用了Unsafe類里的函數(shù)捉超,歸結(jié)到Unsafe里胧卤,只有兩個函數(shù)
public native void unpark(Object var1);
public native void park(boolean var1, long var2);
unpark函數(shù)為線程提供“許可(permit)”,線程調(diào)用park函數(shù)則等待“許可”拼岳。這個有 點像信號量枝誊,但是這個“許可”是不能疊加的,“許可”是一次性的惜纸。
permit 相當于 0/1 的開關(guān)侧啼,默認是 0,調(diào)用一次unpark 就加 1變成了 1.調(diào)用一次park會消費permit堪簿,又會變成0痊乾。 如果再調(diào)用一次park會阻塞,因為permit已 經(jīng)是0了椭更。直到permit變成1.這時調(diào)用unpark會把permit設(shè)置為1.每個線程都 有一個相關(guān)的permit,permit最多只有一個哪审,重復(fù)調(diào)用unpark不會累積
6.鎖的釋放流程
如果這個時候ThreadA釋放鎖了,那么我們來看鎖被釋放后會產(chǎn)生什么效果
ReentrantLock.unlock
在unlock中虑瀑,會調(diào)用release方法來釋放鎖
public final boolean release(int arg) {
if (tryRelease(arg)) { //釋放鎖成功
Node h = head; //得到 aqs 中 head 節(jié)點
if (h != null && h.waitStatus != 0)//如果 head 節(jié)點不為空并且狀態(tài)湿滓!=0.調(diào)用 unparkSuccessor(h)喚醒后續(xù)節(jié)點
unparkSuccessor(h);
return true;
}
return false;
}
ReentrantLock.tryRelease
這個方法可以認為是一個設(shè)置鎖狀態(tài)的操作,通過將state狀態(tài)減掉傳入的參數(shù)值 (參數(shù)是1)舌狗,如果結(jié)果狀態(tài)為0叽奥,就將排它鎖的Owner設(shè)置為null,以使得其它的線程有機會進行執(zhí)行痛侍。
在排它鎖中朝氓,加鎖的時候狀態(tài)會增加 1(當然可以自己修改這個值),在解鎖的時 候減掉1,同一個鎖赵哲,在可以重入后待德,可能會被疊加為2、 3枫夺、 4這些值将宪,只有unlock() 的次數(shù)與 lock()的次數(shù)對應(yīng)才會將 Owner 線程設(shè)置為空,而且也只有這種情況下 才會返回true橡庞。
protected final boolean tryRelease(int releases){
int c = getState() - releases;
if (Thread.currentThread() !=getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
unparkSuccessor
private void unparkSuccessor(Node node) {
int ws = node.waitStatus;//獲得 head 節(jié)點的狀態(tài)
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);// 設(shè)置 head 節(jié)點狀態(tài)為 0
Node s = node.next;//得到 head 節(jié)點的下一個節(jié)點
if (s == null || s.waitStatus > 0) {
//如果下一個節(jié)點為 null 或者 status>0 表示 cancelled 狀態(tài).
//通過從尾部節(jié)點開始掃描较坛,找到距離 head 最近的一個waitStatus<=0 的節(jié)點
s = null;
for (Node t = tail; t != null && t != node; t =t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null) //next 節(jié)點不為空,直接喚醒這個線程即可
LockSupport.unpark(s.thread);
}
為什么在釋放鎖的時候是從 tail 進行掃描
我覺得有必要單獨擰出來說一下扒最,我們再回到 enq 那個方法燎潮、。在標注為紅色部分的代碼來看一個新的節(jié)點是如何加入到鏈表中的
- 將新的節(jié)點的prev指向tail
- 通過cas將tail設(shè)置為新的節(jié)點扼倘,因為cas是原子操作所以能夠保證線程安全性
- t.next=node确封;設(shè)置原tail的next節(jié)點指向新的節(jié)點
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
在cas操作之后, t.next=node操作之前再菊。 存在其他線程調(diào)用unlock方法從head 開始往后遍歷爪喘,由于 t.next=node 還沒執(zhí)行意味著鏈表的關(guān)系還沒有建立完整。 就會導(dǎo)致遍歷到 t 節(jié)點的時候被中斷纠拔。所以從后往前遍歷秉剑,一定不會存在這個問 題。
圖解分析
通過鎖的釋放稠诲,原本的結(jié)構(gòu)就發(fā)生了一些變化侦鹏。head節(jié)點的waitStatus變成了0, ThreadB被喚醒
7.原本掛起的線程繼續(xù)執(zhí)行
通過ReentrantLock.unlock臀叙,原本掛起的線程被喚醒以后繼續(xù)執(zhí)行略水,應(yīng)該從哪里執(zhí) 行大家還有印象吧。 原來被掛起的線程是在 acquireQueued 方法中劝萤,所以被喚 醒以后繼續(xù)從這個方法開始執(zhí)行
AQS.acquireQueued
這個方法前面已經(jīng)完整分析過了渊涝,我們只關(guān)注一下 ThreadB 被喚醒以后的執(zhí)行流 程。
由于ThreadB的prev節(jié)點指向的是head床嫌,并且ThreadA已經(jīng)釋放了鎖跨释。所以這 個時候調(diào)用tryAcquire方法時,可以順利獲取到鎖
- 把ThreadB節(jié)點當成head
- 把原h(huán)ead節(jié)點的next節(jié)點指向為null
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p,node) && parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
圖解分析
- 設(shè)置新head節(jié)點的prev=null
設(shè)置原h(huán)ead節(jié)點的next節(jié)點為null
8.公平鎖和非公平鎖的區(qū)別
鎖的公平性是相對于獲取鎖的順序而言的厌处,如果是一個公平鎖鳖谈,那么鎖的獲取順序 就應(yīng)該符合請求的絕對時間順序,也就是 FIFO阔涉。
在上面分析的例子來說缆娃,只要 CAS 設(shè)置同步狀態(tài)成功捷绒,則表示當前線程獲取了鎖,而公平鎖則不一樣龄恋,差異點 有兩個
FairSync.tryAcquire
final void lock() {
acquire(1);
}
非公平鎖在獲取鎖的時候疙驾,會先通過CAS進行搶占凶伙,而公平鎖則不會
FairSync.tryAcquire
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
這個方法與 nonfairTryAcquire(int acquires)比較郭毕,不同的地方在于判斷條件多了hasQueuedPredecessors()方法,也就是加入了同步隊列中當前節(jié)點是否有前驅(qū)節(jié)點
的判斷函荣,如果該方法返回true显押,則表示有線程比當前線程更早地請求獲取鎖, 因此需要等待前驅(qū)線程獲取并釋放鎖之后才能繼續(xù)獲取鎖傻挂。