一裹虫、前言
借用Java并發(fā)編程實(shí)踐中的話"編寫(xiě)正確的程序并不容易,而編寫(xiě)正常的并發(fā)程序就更難了",相比于順序執(zhí)行的情況壹士,多線程的線程安全問(wèn)題是微妙而且出乎意料的,因?yàn)樵跊](méi)有進(jìn)行適當(dāng)同步的情況下多線程中各個(gè)操作的順序是不可預(yù)期的偿警,本文算是對(duì)多線程情況下同步策略的一個(gè)一個(gè)簡(jiǎn)單介紹躏救。
二、 什么是線程安全問(wèn)題
線程安全問(wèn)題是指當(dāng)多個(gè)線程同時(shí)讀寫(xiě)一個(gè)狀態(tài)變量螟蒸,并且沒(méi)有任何同步措施時(shí)候盒使,導(dǎo)致臟數(shù)據(jù)或者其他不可預(yù)見(jiàn)的結(jié)果的問(wèn)題。Java中首要的同步策略是使用Synchronized關(guān)鍵字七嫌,它提供了可重入的獨(dú)占鎖少办。
三、 什么是共享變量可見(jiàn)性問(wèn)題
要談可見(jiàn)性首先需要介紹下多線程處理共享變量時(shí)候的Java中內(nèi)存模型诵原。
Java內(nèi)存模型規(guī)定了所有的變量都存放在主內(nèi)存中英妓,當(dāng)線程使用變量時(shí)候都是把主內(nèi)存里面的變量拷貝到了自己的工作內(nèi)存中的。
當(dāng)線程操作一個(gè)共享變量時(shí)候操作流程為:
線程首先從主內(nèi)存拷貝共享變量到自己的工作內(nèi)存
然后對(duì)工作內(nèi)存里的變量進(jìn)行處理
處理完后更新變量值到主內(nèi)存
那么假如線程A和B同時(shí)去處理一個(gè)共享變量绍赛,會(huì)出現(xiàn)什么情況那蔓纠?
首先他們都會(huì)去走上面的三個(gè)流程,假如線程A拷貝共享變量到了工作內(nèi)存惹资,并且已經(jīng)對(duì)數(shù)據(jù)進(jìn)行了更新但是還沒(méi)有更新會(huì)主內(nèi)存贺纲,這時(shí)候線程B拷貝共享變量到了自己的工內(nèi)存進(jìn)行處理,處理后褪测,線程A才把自己的處理結(jié)果更更新到主內(nèi)存猴誊,可知 線程B處理的并不是線程A處理后的結(jié)果,也就是說(shuō)線程A處理后的變量值對(duì)線程B不可見(jiàn)侮措,這就是共享變量的可見(jiàn)性問(wèn)題懈叹。
構(gòu)成共享變量?jī)?nèi)存不可見(jiàn)原因是因?yàn)槿搅鞒滩皇窃有圆僮鳎旅嬷朗褂们‘?dāng)同步就可以解決這個(gè)問(wèn)題分扎。
我們知道ArrayList是線程不安全的澄成,因?yàn)樗淖x寫(xiě)方法沒(méi)有同步策略,會(huì)導(dǎo)致臟數(shù)據(jù)和不可預(yù)期的結(jié)果畏吓,下面我們就一一講解如何解決墨状。
這是線程不安全的
public class ArrayList<E>
{
public E get(int index) {
rangeCheck(index);
return elementData(index);
}
public E set(int index, E element) {
rangeCheck(index);
E oldValue = elementData(index);
elementData[index] = element;
return oldValue;
}
}
四、原子性
4.1 介紹
假設(shè)線程A執(zhí)行操作Ao和線程B執(zhí)行操作Bo 菲饼,那么從A看肾砂,當(dāng)B線程執(zhí)行Bo操作時(shí)候,那么Bo操作全部執(zhí)行宏悦,要么全部不執(zhí)行镐确,我們稱Ao和Bo操作互為原子性操作包吝,在設(shè)計(jì)計(jì)數(shù)器時(shí)候一般都是先讀取當(dāng)前值,然后+1源葫,然后更新會(huì)變量诗越,是讀-改-寫(xiě)的過(guò)程,這個(gè)過(guò)程必須是原子性的操作息堂。
public class ThreadNotSafeCount {
private Long value;
public Long getCount() {
return value;
}
public void inc() {
++value;
}
}
如上代碼是線程不安全的嚷狞,因?yàn)椴荒鼙WC++value是原子性操作。方法一是使用Synchronized進(jìn)行同步如下:
public class ThreadSafeCount {
private Long value;
public synchronized Long getCount() {
return value;
}
public synchronized void inc() {
++value;
}
}
注意储矩,這里不能簡(jiǎn)單的使用volatile修飾value進(jìn)行同步感耙,因?yàn)樽兞恐狄蕾嚵水?dāng)前值
使用Synchronized確實(shí)可以實(shí)現(xiàn)線程安全,即實(shí)現(xiàn)可見(jiàn)性和同步持隧,但是Synchronized是獨(dú)占鎖,沒(méi)有獲取內(nèi)部鎖的線程會(huì)被阻塞掉逃片,那么有沒(méi)有剛好的實(shí)現(xiàn)那屡拨?答案是肯定的。
4.2 原子變量類(lèi)
原子變量類(lèi)比鎖更輕巧褥实,比如AtomicLong代表了一個(gè)Long值呀狼,并提供了get,set方法,get损离,set方法語(yǔ)音和volatile相同哥艇,因?yàn)锳tomicLong內(nèi)部就是使用了volatile修飾的真正的Long變量。另外提供了原子性的自增自減操作僻澎,所以計(jì)數(shù)器可以改下為:
public class ThreadSafeCount {
private AtomicLong value = new AtomicLong(0L);
public Long getCount() {
return value.get();
}
public void inc() {
value.incrementAndGet();
}
}
那么相比使用synchronized的好處在于原子類(lèi)操作不會(huì)導(dǎo)致線程的掛起和重新調(diào)度貌踏,因?yàn)樗麅?nèi)部使用的是cas的非阻塞算法。
常用的原子類(lèi)變量為:AtomicLong窟勃,AtomicInteger祖乳,AtomicBoolean,AtomicReference.
五 CAS介紹
CAS 即CompareAndSet,也就是比較并設(shè)置秉氧,CAS有三個(gè)操作數(shù)分別為:內(nèi)存位置眷昆,舊的預(yù)期值,新的值汁咏,操作含義是當(dāng)內(nèi)存位置的變量值為舊的預(yù)期值時(shí)候使用新的值替換舊的值亚斋。通俗的說(shuō)就是看內(nèi)存位置的變量值是不是我給的舊的預(yù)期值,如果是則使用我給的新的值替換他攘滩,如果不是返回給我舊值帅刊。這個(gè)是處理器提供的一個(gè)原子性指令。上面介紹的AtomicLong的自增就是使用這種方式實(shí)現(xiàn):
public final long incrementAndGet() {
for (;;) {
long current = get();(1)
long next = current + 1;(2)
if (compareAndSet(current, next))(3)
return next;
}
}
public final boolean compareAndSet(long expect, long update) {
return unsafe.compareAndSwapLong(this, valueOffset, expect, update);
}
假如當(dāng)前值為1轰驳,那么線程A和檢查B同時(shí)執(zhí)行到了(3)時(shí)候各自的next都是2厚掷,current=1弟灼,假如線程A先執(zhí)行了3,那么這個(gè)是原子性操作冒黑,會(huì)把檔期值更新為2并且返回1田绑,if判斷true所以incrementAndGet返回2.這時(shí)候線程B執(zhí)行3,因?yàn)閏urrent=1而當(dāng)前變量實(shí)際值為2,所以if判斷為false抡爹,繼續(xù)循環(huán)掩驱,如果沒(méi)有其他線程去自增變量的話,這次線程B就會(huì)更新變量為3然后退出冬竟。
這里使用了無(wú)限循環(huán)使用CAS進(jìn)行輪詢檢查欧穴,雖然一定程度浪費(fèi)了cpu資源,但是相比鎖來(lái)說(shuō)避免的線程上下文切換和調(diào)度泵殴。
六涮帘、什么是可重入鎖
當(dāng)一個(gè)線程要獲取一個(gè)被其他線程占用的鎖時(shí)候,該線程會(huì)被阻塞笑诅,那么當(dāng)一個(gè)線程再次獲取它自己已經(jīng)獲取的鎖時(shí)候是否會(huì)被阻塞那?如果不需要阻塞那么我們說(shuō)該鎖是可重入鎖调缨,也就是鎖只要該線程獲取了該鎖,那么可以無(wú)限制次數(shù)進(jìn)入被該鎖鎖住的代碼吆你。
先看一個(gè)例子如果鎖不是可重入的弦叶,看看會(huì)出現(xiàn)什么問(wèn)題。
public class Hello{
public Synchronized void helloA(){
System.out.println("hello");
}
public Synchronized void helloB(){
System.out.println("hello B");
helloA();
}
}
如上面代碼當(dāng)調(diào)用helloB函數(shù)前會(huì)先獲取內(nèi)置鎖妇多,然后打印輸出伤哺,然后調(diào)用helloA方法,調(diào)用前會(huì)先去獲取內(nèi)置鎖者祖,如果內(nèi)置鎖不是可重入的那么該調(diào)用就會(huì)導(dǎo)致死鎖了立莉,因?yàn)榫€程持有并等待了鎖導(dǎo)致helloB永遠(yuǎn)不會(huì)獲取內(nèi)置鎖。
實(shí)際上內(nèi)部鎖是可重入鎖咸包,例如synchronized關(guān)鍵字管理的方法桃序,可重入鎖的原理是在所內(nèi)部維護(hù)了一個(gè)線程標(biāo)示標(biāo)示該鎖目前被那個(gè)線程占用,然后關(guān)聯(lián)一個(gè)計(jì)數(shù)器烂瘫,一開(kāi)始計(jì)數(shù)器值為0媒熊,說(shuō)明該鎖沒(méi)有被任何線程占用,當(dāng)一個(gè)線程獲取了該鎖坟比,計(jì)數(shù)器會(huì)變成1芦鳍,其他線程在獲取該鎖時(shí)候發(fā)現(xiàn)鎖的所有者不是自己所以被阻塞,但是當(dāng)獲取該鎖的線程再次獲取鎖時(shí)候發(fā)現(xiàn)鎖擁有者是自己會(huì)把計(jì)數(shù)器值+1葛账, 當(dāng)釋放鎖后計(jì)數(shù)器會(huì)-1柠衅,當(dāng)計(jì)數(shù)器為0時(shí)候,鎖里面的線程標(biāo)示重置為null,這時(shí)候阻塞的線程會(huì)獲取被喚醒來(lái)獲取該鎖籍琳。
七菲宴、Synchronized關(guān)鍵字
7.1 Synchronized介紹
synchronized塊是Java提供的一種強(qiáng)制性內(nèi)置鎖贷祈,每個(gè)Java對(duì)象都可以隱式的充當(dāng)一個(gè)用于同步的鎖的功能,這些內(nèi)置的鎖被稱為內(nèi)部鎖或者叫監(jiān)視器鎖喝峦,執(zhí)行代碼在進(jìn)入synchronized代碼塊前會(huì)自動(dòng)獲取內(nèi)部鎖势誊,這時(shí)候其他線程訪問(wèn)該同步代碼塊時(shí)候會(huì)阻塞掉。拿到內(nèi)部鎖的線程會(huì)在正常退出同步代碼塊或者異常拋出后釋放內(nèi)部鎖谣蠢,這時(shí)候阻塞掉的線程才能獲取內(nèi)部鎖進(jìn)入同步代碼塊粟耻。
7.2 Synchronized同步實(shí)例
內(nèi)部鎖是一種互斥鎖,具體說(shuō)是同時(shí)已有一個(gè)線程可以拿到該鎖眉踱,當(dāng)一個(gè)線程拿到該鎖并且沒(méi)有釋放的情況下挤忙,其他線程只能等待。
對(duì)于上面說(shuō)的ArrayList可以使用synchronized進(jìn)行同步來(lái)處理可見(jiàn)性問(wèn)題谈喳。
使用synchronized對(duì)方法進(jìn)行同步
public class ArrayList<E>
{
public synchronized E get(int index) {
rangeCheck(index);
return elementData(index);
}
public synchronized E set(int index, E element) {
rangeCheck(index);
E oldValue = elementData(index);
elementData[index] = element;
return oldValue;
}
}
如圖當(dāng)線程A獲取內(nèi)部鎖進(jìn)入同步代碼塊后册烈,線程B也準(zhǔn)備要進(jìn)入同步塊,但是由于A還沒(méi)釋放鎖婿禽,所以B現(xiàn)在進(jìn)入等待茄厘,使用同步可以保證線程A獲取鎖到釋放鎖期間的變量值對(duì)B獲取鎖后都可見(jiàn)。也就是說(shuō)當(dāng)B開(kāi)始執(zhí)行A執(zhí)行的代碼同步塊時(shí)候可以看到A操作的所有變量值谈宛,這里具體說(shuō)是當(dāng)線程B獲取b的值時(shí)候能夠保證獲取的值是2。這時(shí)因?yàn)榫€程A進(jìn)入同步塊修改變量值后胎署,會(huì)在退出同步塊前把值刷新到主內(nèi)存吆录,而線程B在進(jìn)入同步塊前會(huì)首先清空本地內(nèi)存內(nèi)容,從主內(nèi)存重新獲取變量值琼牧,所以實(shí)現(xiàn)了可見(jiàn)性恢筝。但是要注意一點(diǎn)所有線程使用的是同一個(gè)鎖。
注意 Synchronized關(guān)鍵字會(huì)引起現(xiàn)場(chǎng)上下文切換和線程調(diào)度巨坊。
八撬槽、 ReentrantReadWriteLock介紹
使用synchronized可以實(shí)現(xiàn)同步,但是缺點(diǎn)是同時(shí)只有一個(gè)線程可以訪問(wèn)共享變量趾撵,但是正常情況下侄柔,對(duì)于多個(gè)讀操作操作共享變量時(shí)候是不需要同步的,synchronized時(shí)候無(wú)法實(shí)現(xiàn)多個(gè)讀線程同時(shí)執(zhí)行占调,而大部分情況下讀操作次數(shù)多于寫(xiě)操作暂题,所以這大大降低了并發(fā)性,所以出現(xiàn)了ReentrantReadWriteLock究珊,它可以實(shí)現(xiàn)讀寫(xiě)分離薪者,運(yùn)行多個(gè)線程同時(shí)進(jìn)行讀取,但是最多運(yùn)行一個(gè)寫(xiě)現(xiàn)線程存在剿涮。
對(duì)于上面的方法現(xiàn)在可以修改為:
public class ArrayList<E>
{
private final ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
public E get(int index) {
Lock readLock = readWriteLock.readLock();
readLock.lock();
try {
return list.get(index);
} finally {
readLock.unlock();
}
}
public E set(int index, E element) {
Lock wirteLock = readWriteLock.writeLock();
wirteLock.lock();
try {
return list.set(index, element);
} finally {
wirteLock.unlock();
}
}
}
如代碼在get方法時(shí)候通過(guò) readWriteLock.readLock()獲取了讀鎖言津,多個(gè)線程可以同時(shí)獲取這讀鎖攻人,set方法通過(guò)readWriteLock.writeLock()獲取了寫(xiě)鎖,同時(shí)只有一個(gè)線程可以獲取寫(xiě)鎖悬槽,其他線程在獲取寫(xiě)鎖時(shí)候會(huì)阻塞直到寫(xiě)鎖被釋放怀吻。假如一個(gè)線程已經(jīng)獲取了讀鎖,這時(shí)候如果一個(gè)線程要獲取寫(xiě)鎖時(shí)候要等待直到釋放了讀鎖陷谱,如果一個(gè)線程獲取了寫(xiě)鎖烙博,那么所有獲取讀鎖的線程需要等待直到寫(xiě)鎖被釋放。所以相比synchronized來(lái)說(shuō)運(yùn)行多個(gè)讀者同時(shí)存在烟逊,所以提高了并發(fā)量渣窜。
注意 需要使用者顯式調(diào)用Lock與unlock操作
九、 Volatile變量
對(duì)于避免不可見(jiàn)性問(wèn)題宪躯,Java還提供了一種弱形式的同步乔宿,即使用了volatile關(guān)鍵字。該關(guān)鍵字確保了對(duì)一個(gè)變量的更新對(duì)其他線程可見(jiàn)访雪。當(dāng)一個(gè)變量被聲明為volatile時(shí)候详瑞,線程寫(xiě)入時(shí)候不會(huì)把值緩存在寄存器或者或者在其他地方,當(dāng)線程讀取的時(shí)候會(huì)從主內(nèi)存重新獲取最新值臣缀,而不是使用當(dāng)前線程的拷貝內(nèi)存變量值坝橡。即所謂的內(nèi)存屏障作用。
volatile雖然提供了可見(jiàn)性保證精置,但是不能使用他來(lái)構(gòu)建復(fù)合的原子性操作计寇,也就是說(shuō)當(dāng)一個(gè)變量依賴其他變量或者更新變量值時(shí)候新值依賴當(dāng)前老值時(shí)候不在適用。與synchronized相似之處在于如圖
如圖線程A修改了volatile變量b的值脂倦,然后線程B讀取了改變量值番宁,那么所有A線程在寫(xiě)入變量b值前可見(jiàn)的變量值,在B讀取volatile變量b后對(duì)線程B都是可見(jiàn)的赖阻,途中線程B對(duì)A操作的變量a,b的值都可見(jiàn)的蝶押。volatile的內(nèi)存語(yǔ)義和synchronized有類(lèi)似之處,具體說(shuō)是說(shuō)當(dāng)線程寫(xiě)入了volatile變量值就等價(jià)于線程退出synchronized同步塊(會(huì)把寫(xiě)入到本地內(nèi)存的變量值同步到主內(nèi)存)火欧,讀取volatile變量值就相當(dāng)于進(jìn)入同步塊(會(huì)先清空本地內(nèi)存變量值棋电,從主內(nèi)存獲取最新值)。
下面的Integer也是線程不安全的布隔,因?yàn)闆](méi)有進(jìn)行同步措施
public class ThreadNotSafeInteger {
private int value;
public int get() {
return value;
}
public void set(int value) {
this.value = value;
}
}
使用synchronized關(guān)鍵字進(jìn)行同步如下:
public class ThreadSafeInteger {
private int value;
public synchronized int get() {
return value;
}
public synchronized void set(int value) {
this.value = value;
}
}
等價(jià)于使用volatile進(jìn)行同步如下:
public class ThreadSafeInteger {
private volatile int value;
public int get() {
return value;
}
public void set(int value) {
this.value = value;
}
}
這里使用synchronized和使用volatile是等價(jià)的离陶,但是并不是所有情況下都是等價(jià),一般只有滿足下面所有條件才能使用volatile
寫(xiě)入變量值時(shí)候不依賴變量的當(dāng)前值衅檀,或者能夠保證只有一個(gè)線程修改變量值招刨。
寫(xiě)入的變量值不依賴其他變量的參與。
讀取變量值時(shí)候不能因?yàn)槠渌蜻M(jìn)行加鎖哀军。
另外 加鎖可以同時(shí)保證可見(jiàn)性和原子性沉眶,而volatile只保證變量值的可見(jiàn)性打却。
為什么volitale不能保證原子性?
因?yàn)関olitale只是加了內(nèi)存屏障谎倔,保證了可見(jiàn)性柳击。而不能保證多個(gè)操作執(zhí)行過(guò)程中不存在其他線程獲取cpu時(shí)間片。
注意 volatile關(guān)鍵字不會(huì)引起線程上下文切換和線程調(diào)度片习。
十捌肴、 樂(lè)觀鎖與悲觀鎖
10.1 悲觀鎖
悲觀鎖,指數(shù)據(jù)被外界修改持保守態(tài)度(悲觀),在整個(gè)數(shù)據(jù)處理過(guò)程中藕咏,將數(shù)據(jù)處于鎖定狀態(tài)状知。 悲觀鎖的實(shí)現(xiàn),往往依靠數(shù)據(jù)庫(kù)提供的鎖機(jī)制 孽查。數(shù)據(jù)庫(kù)中實(shí)現(xiàn)是對(duì)數(shù)據(jù)記錄進(jìn)行操作前饥悴,先給記錄加排它鎖,如果獲取鎖失敗盲再,則說(shuō)明數(shù)據(jù)正在被其他線程修改西设,則等待或者拋出異常。如果加鎖成功答朋,則獲取記錄贷揽,對(duì)其修改,然后事務(wù)提交后釋放排它鎖梦碗。
一個(gè)例子:select * from 表 where .. for update;
悲觀鎖是先加鎖再訪問(wèn)策略擒滑,處理加鎖會(huì)讓數(shù)據(jù)庫(kù)產(chǎn)生額外的開(kāi)銷(xiāo),還有增加產(chǎn)生死鎖的機(jī)會(huì)叉弦,另外在多個(gè)線程只讀情況下不會(huì)產(chǎn)生數(shù)據(jù)不一致行問(wèn)題,沒(méi)必要使用鎖藻糖,只會(huì)增加系統(tǒng)負(fù)載淹冰,降低并發(fā)性,因?yàn)楫?dāng)一個(gè)事務(wù)鎖定了該條記錄巨柒,其他讀該記錄的事務(wù)只能等待樱拴。
10.2 樂(lè)觀鎖
樂(lè)觀鎖是相對(duì)悲觀鎖來(lái)說(shuō)的,它認(rèn)為數(shù)據(jù)一般情況下不會(huì)造成沖突洋满,所以在訪問(wèn)記錄前不會(huì)加排他鎖晶乔,而是在數(shù)據(jù)進(jìn)行提交更新的時(shí)候,才會(huì)正式對(duì)數(shù)據(jù)的沖突與否進(jìn)行檢測(cè)牺勾,具體說(shuō)根據(jù)update返回的行數(shù)讓用戶決定如何去做正罢。樂(lè)觀鎖并不會(huì)使用數(shù)據(jù)庫(kù)提供的鎖機(jī)制,一般在表添加version字段或者使用業(yè)務(wù)狀態(tài)來(lái)做驻民。
樂(lè)觀鎖直到提交的時(shí)候才去鎖定翻具,所以不會(huì)產(chǎn)生任何鎖和死鎖履怯。
十一、獨(dú)占鎖與共享鎖
根據(jù)鎖能夠被單個(gè)線程還是多個(gè)線程共同持有裆泳,鎖又分為獨(dú)占鎖和共享鎖叹洲。獨(dú)占鎖保證任何時(shí)候都只有一個(gè)線程能讀寫(xiě)權(quán)限,ReentrantLock就是以獨(dú)占方式實(shí)現(xiàn)的互斥鎖工禾。共享鎖則可以同時(shí)有多個(gè)讀線程运提,但最多只能有一個(gè)寫(xiě)線程觅丰,讀和寫(xiě)是互斥的杆烁,例如ReadWriteLock讀寫(xiě)鎖,它允許一個(gè)資源可以被多線程同時(shí)進(jìn)行讀操作候味,或者被一個(gè)線程 寫(xiě)操作笙隙,但兩者不能同時(shí)進(jìn)行洪灯。
獨(dú)占鎖是一種悲觀鎖,每次訪問(wèn)資源都先加上互斥鎖竟痰,這限制了并發(fā)性签钩,因?yàn)樽x操作并不會(huì)影響數(shù)據(jù)一致性,而獨(dú)占鎖只允許同時(shí)一個(gè)線程讀取數(shù)據(jù)坏快,其他線程必須等待當(dāng)前線程釋放鎖才能進(jìn)行讀取铅檩。
共享鎖則是一種樂(lè)觀鎖,它放寬了加鎖的條件莽鸿,允許多個(gè)線程同時(shí)進(jìn)行讀操作昧旨。
十二、公平鎖與非公平鎖
根據(jù)線程獲取鎖的搶占機(jī)制鎖可以分為公平鎖和非公平鎖祥得,公平鎖表示線程獲取鎖的順序是按照線程加鎖的時(shí)間先后來(lái)分決定的兔沃,也就是最早加鎖鎖的線程將最早獲取鎖,也就是先來(lái)先得的FIFO順序级及。而非公平鎖則運(yùn)行闖入乒疏,也就是先來(lái)不一定先得。
ReentrantLock提供了公平和非公平鎖的實(shí)現(xiàn):
公平鎖ReentrantLock pairLock = new ReentrantLock(true);
非公平鎖 ReentrantLock pairLock = new ReentrantLock(false);
如果構(gòu)造函數(shù)不傳遞參數(shù)饮焦,則默認(rèn)是非公平鎖怕吴。
在沒(méi)有公平性需求的前提下盡量使用非公平鎖,因?yàn)楣芥i會(huì)帶來(lái)性能開(kāi)銷(xiāo)县踢。
假設(shè)線程A已經(jīng)持有了鎖转绷,這時(shí)候線程B請(qǐng)求該鎖將會(huì)被掛起,當(dāng)線程A釋放鎖后硼啤,假如當(dāng)前有線程C也需要獲取該鎖议经,如果采用非公平鎖方式,則根據(jù)線程調(diào)度策略線程B和C兩者之一可能獲取鎖,這時(shí)候不需要任何其他干涉爸业,如果使用公平鎖則需要把C掛起其骄,讓B獲取當(dāng)前鎖。
十三扯旷、 AbstractQueuedSynchronizer介紹
AbstractQueuedSynchronizer提供了一個(gè)隊(duì)列拯爽,大多數(shù)開(kāi)發(fā)者可能從來(lái)不會(huì)直接用到AQS,AQS中這個(gè)單一的狀態(tài)信息 state,可以通過(guò)protected的getState,setState,compareAndSetState函數(shù)進(jìn)行調(diào)用钧忽。對(duì)于ReentrantLock來(lái)說(shuō)毯炮,state可以用來(lái)表示該線程獲可重入鎖的次數(shù),semaphore來(lái)說(shuō)state用來(lái)表示當(dāng)前可用信號(hào)的個(gè)數(shù)耸黑,F(xiàn)utuerTask用來(lái)表示任務(wù)狀態(tài)(例如還沒(méi)開(kāi)始桃煎,運(yùn)行,完成大刊,取消)为迈。
十四、CountDownLatch原理
14.1 一個(gè)例子
public class Test {
private static final int ThreadNum = 10;
public static void main(String[] args) {
//創(chuàng)建一個(gè)CountDownLatch實(shí)例缺菌,管理計(jì)數(shù)為T(mén)hreadNum
CountDownLatch countDownLatch = new CountDownLatch(ThreadNum);
//創(chuàng)建一個(gè)固定大小的線程池
ExecutorService executor = Executors.newFixedThreadPool(ThreadNum);
//添加線程到線程池
for(int i =0;i<ThreadNum;++i){
executor.execute(new Person(countDownLatch, i+1));
}
System.out.println("開(kāi)始等待全員簽到...");
try {
//等待所有線程執(zhí)行完畢
countDownLatch.await();
System.out.println("簽到完畢葫辐,開(kāi)始吃飯");
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
executor.shutdown();
}
}
static class Person implements Runnable{
private CountDownLatch countDownLatch;
private int index;
public Person(CountDownLatch cdl,int index){
this.countDownLatch = cdl;
this.index = index;
}
@Override
public void run() {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
System.out.println("person " + index +"簽到");
//線程執(zhí)行完畢,計(jì)數(shù)器減一
countDownLatch.countDown();
}
}
}
如上代碼伴郁,創(chuàng)建一個(gè)線程池和CountDownLatch實(shí)例耿战,每個(gè)線程通過(guò)構(gòu)造函數(shù)傳入CountDownLatch的實(shí)例,主線程通過(guò)await等待線程池里面線程任務(wù)全部執(zhí)行完畢焊傅,子線程則執(zhí)行完畢后調(diào)用countDown計(jì)數(shù)器減一剂陡,等所有子線程執(zhí)行完畢后,主線程的await才會(huì)返回狐胎。
14.2 原理
先看下類(lèi)圖:
可知CountDownLatch內(nèi)部還是使用AQS實(shí)現(xiàn)的鸭栖。
首先通過(guò)構(gòu)造函數(shù)初始化AQS的狀態(tài)值
public CountDownLatch(int count) {
if (count < 0) throw new IllegalArgumentException("count < 0");
this.sync = new Sync(count);
}
Sync(int count) {
setState(count);
}
然后看下await方法:
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
//如果線程被中斷則拋異常
if (Thread.interrupted())
throw new InterruptedException();
//嘗試看當(dāng)前是否計(jì)數(shù)值為0,為0則直接返回握巢,否者進(jìn)入隊(duì)列等待
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}
如果tryAcquireShared返回-1則 進(jìn)入doAcquireSharedInterruptibly
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
//加入隊(duì)列狀態(tài)為共享節(jié)點(diǎn)
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
//如果多個(gè)線程調(diào)用了await被放入隊(duì)列則一個(gè)個(gè)返回纤泵。
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return;
}
}
//shouldParkAfterFailedAcquire會(huì)把當(dāng)前節(jié)點(diǎn)狀態(tài)變?yōu)镾IGNAL類(lèi)型,然后調(diào)用park方法把當(dāng)先線程掛起镜粤,
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
調(diào)用await后,當(dāng)前線程會(huì)被阻塞玻褪,知道所有子線程調(diào)用了countdown方法肉渴,并在在計(jì)數(shù)為0時(shí)候調(diào)用該線程unpark方法激活線程,然后該線程重新tryAcquireShared會(huì)返回1带射。
然后看下 countDown方法:
委托給sync
public void countDown() {
sync.releaseShared(1);
}
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
首先看下tryReleaseShared
protected boolean tryReleaseShared(int releases) {
//循環(huán)進(jìn)行cas同规,直到當(dāng)前線程成功完成cas使計(jì)數(shù)值(狀態(tài)值state)減一更新到state
for (;;) {
int c = getState();
if (c == 0)
return false;
int nextc = c-1;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
該函數(shù)一直返回false直到當(dāng)前計(jì)數(shù)器為0時(shí)候才返回true。
返回true后會(huì)調(diào)用doReleaseShared,該函數(shù)主要作用是調(diào)用uppark方法激活調(diào)用await的線程券勺,代碼如下:
private void doReleaseShared() {
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
//節(jié)點(diǎn)類(lèi)型為SIGNAL绪钥,把類(lèi)型在通過(guò)cas設(shè)置回去,然后調(diào)用unpark激活調(diào)用await的線程
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
unparkSuccessor(h);
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break;
}
}
激活主線程后关炼,主線程會(huì)在調(diào)用tryAcquireShared獲取鎖程腹。
十五、ReentrantLock獨(dú)占鎖原理
15.1 ReentrantLock結(jié)構(gòu)
先上類(lèi)圖:
可知ReentrantLock最終還是使用AQS來(lái)實(shí)現(xiàn)儒拂,并且根據(jù)參數(shù)決定內(nèi)部是公平還是非公平鎖寸潦,默認(rèn)是非公平鎖
public ReentrantLock() {
sync = new NonfairSync();
}
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}
加鎖代碼:
public void lock() {
sync.lock();
}
15.2 公平鎖原理
先看Lock方法:
lock方法最終調(diào)用FairSync重寫(xiě)的tryAcquire方法
protected final boolean tryAcquire(int acquires) {
//獲取當(dāng)前線程和狀態(tài)值
final Thread current = Thread.currentThread();
int c = getState();
//狀態(tài)為0說(shuō)明該鎖未被任何線程持有
if (c == 0) {
//為了實(shí)現(xiàn)公平,首先看隊(duì)列里面是否有節(jié)點(diǎn)社痛,有的話再看節(jié)點(diǎn)所屬線程是不是當(dāng)前線程见转,是的話hasQueuedPredecessors返回false,然后使用原子操作compareAndSetState保證一個(gè)線程更新?tīng)顟B(tài)為1,設(shè)置排他鎖歸屬與當(dāng)前線程蒜哀。其他線程通過(guò)cass則返回false.
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
//狀態(tài)不為0說(shuō)明該鎖已經(jīng)被線程持有斩箫,則看是否是當(dāng)前線程持有,是則重入鎖次數(shù)+1.
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
}
公平性保證代碼:
public final boolean hasQueuedPredecessors() {
Node t = tail; // Read fields in reverse initialization order
Node h = head;
Node s;
return h != t &&
((s = h.next) == null || s.thread != Thread.currentThread());
}
再看看unLock方法撵儿,最終調(diào)用了Sync的tryRelease方法:
protected final boolean tryRelease(int releases) {
//如果不是鎖持有者調(diào)用UNlock則拋出異常乘客。
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
//如果當(dāng)前可重入次數(shù)為0,則清空鎖持有線程
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
//設(shè)置可重入次數(shù)為原始值-1
setState(c);
return free;
}
15.3 非公平鎖原理
final void lock() {
//如果當(dāng)前鎖空閑0统倒,則設(shè)置狀態(tài)為1寨典,并且設(shè)置當(dāng)前線程為鎖持有者
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);//調(diào)用重寫(xiě)的tryAcquire方法-》nonfairTryAcquire方法
}
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {//狀態(tài)為0說(shuō)明沒(méi)有線程持有該鎖
if (compareAndSetState(0, acquires)) {//cass原子性操作,保證只有一個(gè)線程可以設(shè)置狀態(tài)
setExclusiveOwnerThread(current);//設(shè)置鎖所有者
return true;
}
}//如果當(dāng)前線程是鎖持有者則可重入鎖計(jì)數(shù)+1
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
15.3 總結(jié)
可知公平與非公平都是先執(zhí)行tryAcquire嘗試獲取鎖房匆,如果成功則直接獲取鎖耸成,如果不成功則把當(dāng)前線程放入隊(duì)列。對(duì)于放入隊(duì)列里面的第一個(gè)線程A在unpark后會(huì)進(jìn)行自旋調(diào)用tryAcquire嘗試獲取鎖浴鸿,假如這時(shí)候有一個(gè)線程B執(zhí)行了lock操作井氢,那么也會(huì)調(diào)用tryAcquire方法嘗試獲取鎖,但是線程B并不在隊(duì)列里面岳链,但是線程B有可能比線程A優(yōu)先獲取到鎖花竞,也就是說(shuō)雖然線程A先請(qǐng)求的鎖,但是卻有可能沒(méi)有B先獲取鎖掸哑,這是非公平鎖實(shí)現(xiàn)约急。而公平鎖要保證線程A要比線程B先獲取鎖。所以公平鎖相比非公平鎖在tryAcquire里面添加了hasQueuedPredecessors方法用來(lái)保證公平性苗分。
十六厌蔽、ReentrantReadWriteLock原理
如圖讀寫(xiě)鎖內(nèi)部維護(hù)了一個(gè)ReadLock和WriteLock,并且也提供了公平和非公平的實(shí)現(xiàn)摔癣,下面只介紹下非公平的讀寫(xiě)鎖實(shí)現(xiàn)奴饮。我們知道AQS里面只維護(hù)了一個(gè)state狀態(tài)纬向,而ReentrantReadWriteLock則需要維護(hù)讀狀態(tài)和寫(xiě)狀態(tài),一個(gè)state是無(wú)法表示寫(xiě)和讀狀態(tài)的戴卜。所以ReentrantReadWriteLock使用state的高16位表示讀狀態(tài)也就是讀線程的個(gè)數(shù)逾条,低16位表示寫(xiě)鎖可重入量。
static final int SHARED_SHIFT = 16;
共享鎖(讀鎖)狀態(tài)單位值65536
static final int SHARED_UNIT = (1 << SHARED_SHIFT);
共享鎖線程最大個(gè)數(shù)65535
static final int MAX_COUNT = (1 << SHARED_SHIFT) - 1;
排它鎖(寫(xiě)鎖)掩碼 二進(jìn)制 15個(gè)1
static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;
返回讀鎖線程數(shù)
static int sharedCount(int c) { return c >>> SHARED_SHIFT; }
返回寫(xiě)鎖可重入個(gè)數(shù)
static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }
16.1 WriteLock
lock 獲取鎖 對(duì)應(yīng)寫(xiě)鎖只需要分析下Sync的tryAcquire和tryRelease
protected final boolean tryAcquire(int acquires) {
Thread current = Thread.currentThread();
int c = getState();
int w = exclusiveCount(c);
//c!=0說(shuō)明讀鎖或者寫(xiě)鎖已經(jīng)被某線程獲取
if (c != 0) {
//w=0說(shuō)明已經(jīng)有線程獲取了讀鎖或者w!=0并且當(dāng)前線程不是寫(xiě)鎖擁有者投剥,則返回false
if (w == 0 || current != getExclusiveOwnerThread())
return false;
//說(shuō)明某線程獲取了寫(xiě)鎖师脂,判斷可重入個(gè)數(shù)
if (w + exclusiveCount(acquires) > MAX_COUNT)
throw new Error("Maximum lock count exceeded");
// 設(shè)置可重入數(shù)量(1)
setState(c + acquires);
return true;
}
//第一個(gè)寫(xiě)線程獲取寫(xiě)鎖
if (writerShouldBlock() ||
!compareAndSetState(c, c + acquires))
return false;
setExclusiveOwnerThread(current);
return true;
}
unlock 釋放鎖
protected final boolean tryRelease(int releases) {
// 看是否是寫(xiě)鎖擁有者調(diào)用的unlock
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
//獲取可重入值,這里沒(méi)有考慮高16位薇缅,因?yàn)閷?xiě)鎖時(shí)候讀鎖狀態(tài)值肯定為0
int nextc = getState() - releases;
boolean free = exclusiveCount(nextc) == 0;
//如果寫(xiě)鎖可重入值為0則釋放鎖危彩,否者只是簡(jiǎn)單更新?tīng)顟B(tài)值。
if (free)
setExclusiveOwnerThread(null);
setState(nextc);
return free;
}
16.2 ReadLock
對(duì)應(yīng)讀鎖只需要分析下Sync的tryAcquireShared和tryReleaseShared
lock 獲取鎖
protected final int tryAcquireShared(int unused) {
//獲取當(dāng)前狀態(tài)值
Thread current = Thread.currentThread();
int c = getState();
//如果寫(xiě)鎖計(jì)數(shù)不為0說(shuō)明已經(jīng)有線程獲取了寫(xiě)鎖泳桦,然后看是不是當(dāng)前線程獲取的寫(xiě)鎖汤徽。
if (exclusiveCount(c) != 0 &&
getExclusiveOwnerThread() != current)
return -1;
//獲取讀鎖計(jì)數(shù)
int r = sharedCount(c);
//嘗試獲取鎖,多個(gè)讀線程只有一個(gè)會(huì)成功灸撰,不成功的進(jìn)入下面fullTryAcquireShared進(jìn)行重試
if (!readerShouldBlock() &&
r < MAX_COUNT &&
compareAndSetState(c, c + SHARED_UNIT)) {
if (r == 0) {
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) {
firstReaderHoldCount++;
} else {
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != current.getId())
cachedHoldCounter = rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
}
return 1;
}
return fullTryAcquireShared(current);
}
unlock 釋放鎖
protected final boolean tryReleaseShared(int unused) {
Thread current = Thread.currentThread();
if (firstReader == current) {
// assert firstReaderHoldCount > 0;
if (firstReaderHoldCount == 1)
firstReader = null;
else
firstReaderHoldCount--;
} else {
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != current.getId())
rh = readHolds.get();
int count = rh.count;
if (count <= 1) {
readHolds.remove();
if (count <= 0)
throw unmatchedUnlockException();
}
--rh.count;
}
//循環(huán)直到自己的讀計(jì)數(shù)-1 cas更新成功
for (;;) {
int c = getState();
int nextc = c - SHARED_UNIT;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
十七谒府、參考
Java并發(fā)編程實(shí)踐
http://www.hollischuang.com/archives/934
http://ifeve.com/juc-reentrantreadwritelock/