引言
在上篇文章深入剖析并發(fā)之AQS獨(dú)占鎖&重入鎖(ReetrantLock)及Condition實(shí)現(xiàn)原理中我們曾基于ReetrantLock鎖分析了AQS獨(dú)占模式的實(shí)現(xiàn)原理,本章則準(zhǔn)備從Semaphore信號量的角度出發(fā)一探AQS共享模式的具體實(shí)現(xiàn)忍坷。共享模式與獨(dú)占模式區(qū)別在于:共享模式下允許多條線程同時(shí)獲取鎖資源佩研,而在之前分析的獨(dú)占模式中,在同一時(shí)刻只允許一條線程持有鎖資源晰骑。
一硕舆、快速認(rèn)識Semaphore信號量及實(shí)戰(zhàn)
Semaphore信號量是java.util.concurrent(JUC)包下的一個(gè)并發(fā)工具類抚官,可以用來控制同一時(shí)刻訪問臨界資源(共享資源)的線程數(shù)耗式,以確保訪問臨界資源的線程能夠正確、合理的使用公共資源彪见。而其內(nèi)部則于ReetrantLock一樣余指,都是通過直接或間接的調(diào)用AQS框架的方法實(shí)現(xiàn)酵镜。在Semaphore中存在一個(gè)“許可”的概念:
在初始化Semaphore信號量需要為這個(gè)許可傳入一個(gè)數(shù)值,該數(shù)值表示表示同一時(shí)刻可以訪問臨界資源的最大線程數(shù)垢粮,也被稱為許可集蜡吧。一條線程想要訪問臨界資源則需要先執(zhí)行
acquire()
獲取一個(gè)許可昔善,如果線程在獲取時(shí)許可集已經(jīng)被分配完了君仆,那么該線程則會進(jìn)入阻塞等待狀態(tài),直至有其他持有許可的線程釋放后才有可能獲取到許可钥庇。當(dāng)線程訪問完成臨界資源后則需要執(zhí)行release()
方法釋放已獲取的許可上沐。
其實(shí)通過如上這段描述参咙,我們不難發(fā)現(xiàn)蕴侧,Semaphore信號量里面的“許可”概念與前面我們文章中净宵,分析的互斥鎖中的“同步狀態(tài)標(biāo)識”有著異曲同工之妙择葡,其實(shí)也就是我們所談的“鎖資源”剃氧。下面我們可以簡單看看Semaphore類中所提供的方法:
// 調(diào)用該方法后線程會從許可集中嘗試獲取一個(gè)許可
public void acquire()
// 線程調(diào)用該方法時(shí)會釋放已獲取的許可
public void release()
// Semaphore構(gòu)造方法:permits→許可集數(shù)量
Semaphore(int permits)
// Semaphore構(gòu)造方法:permits→許可集數(shù)量朋鞍,fair→公平與非公平
Semaphore(int permits, boolean fair)
// 從信號量中獲取許可滥酥,該方法不響應(yīng)中斷
void acquireUninterruptibly()
// 返回當(dāng)前信號量中未被獲取的許可數(shù)
int availablePermits()
// 獲取并返回當(dāng)前信號量中立即未被獲取的所有許可
int drainPermits()
// 返回等待獲取許可的所有線程Collection集合
protected Collection<Thread> getQueuedThreads();
// 返回等待獲取許可的線程估計(jì)數(shù)量
int getQueueLength()
// 查詢是否有線程正在等待獲取當(dāng)前信號量中的許可
boolean hasQueuedThreads()
// 返回當(dāng)前信號量的公平類型坎吻,如為公平鎖返回true禾怠,非公平鎖為false
boolean isFair()
// 獲取當(dāng)前信號量中一個(gè)許可,當(dāng)沒有許可可用時(shí)直接返回false不阻塞線程
boolean tryAcquire()
// 在給定時(shí)間內(nèi)獲取當(dāng)前信號量中一個(gè)許可芽偏,超時(shí)還未獲取成功則返回false
boolean tryAcquire(long timeout, TimeUnit unit)
如上便是Semaphore信號量提供的一些主要方法污尉,下面我們可以上個(gè)簡單小案例演示被碗,需求如下:
現(xiàn)在項(xiàng)目中有個(gè)需求,每晚需要長時(shí)間處理大量的Excel表數(shù)據(jù)與數(shù)據(jù)庫中數(shù)據(jù)對賬請求兴喂,由于文件讀取屬于IO密集型任務(wù)衣迷,我們可以使用多線程的方式優(yōu)化壶谒,加速處理速度汗菜。但是在該項(xiàng)目中因?yàn)檫€有其他業(yè)務(wù)要處理陨界,為了保證整體性能痛阻,所以對于該業(yè)務(wù)的實(shí)現(xiàn)最多只能使用三個(gè)數(shù)據(jù)庫連接對象录平。因?yàn)槿绻?dāng)前業(yè)務(wù)線程同一時(shí)刻獲取的數(shù)據(jù)庫連接數(shù)量過多斗这,會導(dǎo)致其他業(yè)務(wù)線程需要操作數(shù)據(jù)庫時(shí)獲取不到連接對象阻塞(因?yàn)閿?shù)據(jù)庫連接對象與線程對象一樣數(shù)據(jù)珍惜資源/資源有限)表箭,從而引發(fā)整體程序堆積大量客戶端請求導(dǎo)致系統(tǒng)整體癱瘓免钻。這時(shí)我們就需要控制同一時(shí)刻最多只有三條線程拿到數(shù)據(jù)庫連接進(jìn)行操作极舔,此時(shí)就可以使用Semaphore做流量控制拆魏。
public class SemaphoreDemo {
public static void main(String[] args) {
// 自定義線程池(后續(xù)文章會詳細(xì)分析到)
// 環(huán)境:四核四線程CPU 任務(wù)阻塞系數(shù)0.9
ThreadPoolExecutor threadPool = new ThreadPoolExecutor(
4*2, 40,
30, TimeUnit.SECONDS,
new LinkedBlockingQueue<Runnable>(1024*10),
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.AbortPolicy());
// 設(shè)置信號量同一時(shí)刻最大線程數(shù)為3
final Semaphore semaphore = new Semaphore(3);
// 模擬100個(gè)對賬請求
for (int index = 0; index < 100; index++) {
final int serial = index;
threadPool.execute(()->{
try {
// 使用acquire()獲取許可
semaphore.acquire();
System.out.println(Thread.currentThread().getName() +
"線程成功獲取許可!請求序號: " + serial);
// 模擬數(shù)據(jù)庫IO
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
// 臨界資源訪問結(jié)束后釋放許可
semaphore.release();
}
});
}
// 關(guān)閉線程池資源
threadPool.shutdown();
}
}
上述代碼中拥峦,在創(chuàng)建Semaphore信號量對象時(shí)為該對象初始化了三個(gè)許可略号,也就意味著在同一時(shí)刻允許三條線程同時(shí)訪問臨界資源玄柠。線程在訪問臨界資源之前随闪,需要使用acquire()
先成功獲取一個(gè)許可铐伴,才能訪問臨界資源当宴。如果一條線程獲取許可泽疆,該信號量對象的許可集已經(jīng)被分配完時(shí)殉疼,新來的線程需進(jìn)入等待狀態(tài)瓢娜。之前獲取許可成功的線程在操作完成之后需執(zhí)行release()
方法釋放已獲取的許可眠砾。我們執(zhí)行如上案例則可看到執(zhí)行結(jié)果幾乎每隔一千毫秒會出現(xiàn)三條線程同時(shí)訪問,如下:
/*
第一秒:程序運(yùn)行初
pool-1-thread-1線程成功獲取許可柒巫!請求序號: 0
pool-1-thread-2線程成功獲取許可堡掏!請求序號: 1
pool-1-thread-3線程成功獲取許可布疼!請求序號: 2
第二秒:程序運(yùn)行1000ms后
pool-1-thread-4線程成功獲取許可!請求序號: 3
pool-1-thread-5線程成功獲取許可砾层!請求序號: 4
pool-1-thread-6線程成功獲取許可肛炮!請求序號: 5
第三秒:程序運(yùn)行2000ms后
pool-1-thread-7線程成功獲取許可侨糟!請求序號: 6
pool-1-thread-8線程成功獲取許可秕重!請求序號: 7
pool-1-thread-2線程成功獲取許可溶耘!請求序號: 8
第四秒:程序運(yùn)行3000ms后
........
*/
如上便是一個(gè)簡單使用Demo服鹅,總體看來關(guān)于Semaphore信號量的用法還是比較簡單的企软。不過我們也在前面提到過這么一句話:
Semaphore信號量里的“許可”概念與前面我們文章中分析的互斥鎖的“同步狀態(tài)標(biāo)識”有著異曲同工之妙仗哨。
那我們能否使用Semaphore信號量實(shí)現(xiàn)一把獨(dú)占鎖呢厌漂?答案也是肯定的桩卵,可以。我們只需要在創(chuàng)建信號量對象時(shí)高职,只給許可集分配一個(gè)數(shù)量即可,如下:
final Semaphore semaphore = new Semaphore(1);
二寥粹、Semaphore信號量中AQS的共享模式實(shí)現(xiàn)
Semaphore信號量其實(shí)與我們上篇文章所分析的ReetrantLock類結(jié)構(gòu)大致相同,其內(nèi)部存在繼承自AbstractQueuedSynchronizer內(nèi)部Sync類以及它的兩個(gè)子類:FairSync公平鎖類和NofairSync非公平鎖類媚狰,從這我們也可以看出崭孤,Semaphore的內(nèi)部實(shí)現(xiàn)其實(shí)與ReetrantLock一樣都是基于AQS組件實(shí)現(xiàn)的辨宠。在上一篇文章中我們也曾提到货裹,AQS設(shè)計(jì)的初衷并不打算直接作為調(diào)用類對外暴露服務(wù)弧圆,而只是作為并發(fā)包基礎(chǔ)組件墓阀,為其他并發(fā)工具類提供基礎(chǔ)設(shè)施,如維護(hù)同步隊(duì)列经伙、控制/修改同步狀態(tài)等帕膜。具體的獲取鎖和釋放鎖的邏輯則交給子類自己去實(shí)現(xiàn)溢十,從而也能最大程度的保留框架的靈活性张弛。因此無論是Semaphore還是ReetrantLock都需要獨(dú)自實(shí)現(xiàn)tryAcquireShared(int arg)
獲取鎖方法以及tryReleaseShared(int arg)
釋放鎖方法吞鸭。AQS總體類圖關(guān)系如下:
如上圖刻剥,Semaphore與ReetrantLock的結(jié)構(gòu)大致相同造虏,而實(shí)現(xiàn)思路也大致相同,獲取鎖(許可)的方法tryAcquireShared(arg)分別由兩個(gè)子類FairSync和NofairSync實(shí)現(xiàn)陶珠,因?yàn)楣芥i和非公平鎖的加鎖方式畢竟存在些許不同背率,而釋放鎖tryReleaseShared(arg)的邏輯則交由Sync實(shí)現(xiàn)寝姿,因?yàn)獒尫挪僮鞫际窍嗤模虼朔旁诟割怱ync中實(shí)現(xiàn)自然是最好的方式埃篓。下面我們就從Semaphore源碼的角度分析AQS共享模式的具體實(shí)現(xiàn)原理架专,我們先從非公平鎖的獲取鎖實(shí)現(xiàn)開始部脚。
2.1委刘、AQS共享模式之Semaphore的NofairSync非公平鎖實(shí)現(xiàn)
我們在創(chuàng)建Semaphore對象時(shí)鹰椒,也和ReetrantLock一樣手動(dòng)選擇公平鎖和非公平鎖:
public Semaphore(int permits) {
sync = new NonfairSync(permits);
}
public Semaphore(int permits, boolean fair) {
sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}
通過Semaphore的構(gòu)造函數(shù)我們不難發(fā)現(xiàn)淆珊,如果在我們創(chuàng)建時(shí)不選擇聲明公平類型奸汇,Semaphore默認(rèn)創(chuàng)建的是非公平鎖類型擂找,NonfairSync構(gòu)造如下:
static final class NonfairSync extends Sync {
// 構(gòu)造函數(shù):將給定的許可數(shù)permits傳給父類同步狀態(tài)標(biāo)識state
NonfairSync(int permits) {
super(permits);
}
// 釋放鎖的方法實(shí)現(xiàn)則是直接調(diào)用父類Sync的釋放鎖方法
protected int tryAcquireShared(int acquires) {
return nonfairTryAcquireShared(acquires);
}
}
從如上源碼中婴洼,我們可以得知Semaphore中的非公平鎖NonfairSync類的構(gòu)造函數(shù)是基于調(diào)用父類Sync構(gòu)造函數(shù)完成的柬采,而在創(chuàng)建Semaphore對象時(shí)傳入的許可數(shù)permits最終則會傳遞給AQS同步器的同步狀態(tài)標(biāo)識state,如下:
// 父類 - Sync類構(gòu)造函數(shù)
Sync(int permits) {
setState(permits); // 調(diào)用AQS內(nèi)部的set方法
}
// AQS(AbstractQueuedSynchronizer)同步器
public abstract class AbstractQueuedSynchronizer
extends AbstractOwnableSynchronizer {
// 同步狀態(tài)標(biāo)識
private volatile int state;
protected final int getState() {
return state;
}
protected final void setState(int newState) {
state = newState;
}
// 對state變量進(jìn)行CAS操作
protected final boolean compareAndSetState(int expect, int update) {
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}
}
從上述分析中可知礁遣,Semaphore對象創(chuàng)建時(shí)傳入的許可數(shù)permits祟霍,實(shí)則其實(shí)最終是在對AQS內(nèi)部的state進(jìn)行初始化沸呐。初始化完成后崭添,state代表著當(dāng)前信號量對象的可用許可數(shù)呼渣。
2.1.1屁置、信號量中非公平鎖NonfairSync獲取許可/鎖實(shí)現(xiàn)
我們在使用Semaphore時(shí)獲取鎖是調(diào)用Semaphore.acquire()
方法仁连,怖糊,調(diào)用該方法的線程會開始獲取鎖/許可伍伤,嘗試對permits/state
進(jìn)行CAS加一,CAS成功則代表獲取成功麦乞。下面我們來分析一下Semaphore獲取許可的方法acquire()
的具體實(shí)現(xiàn)姐直,源碼如下:
// Semaphore類 → acquire()方法
public void acquire() throws InterruptedException {
// Sync類繼承AQS声畏,此處直接調(diào)用AQS內(nèi)部的acquireSharedInterruptibly()方法
sync.acquireSharedInterruptibly(1);
}
// AbstractQueuedSynchronizer類 → acquireSharedInterruptibly()方法
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
// 判斷是否出現(xiàn)線程中斷信號(標(biāo)志)
if (Thread.interrupted())
throw new InterruptedException();
// 如果tryAcquireShared(arg)執(zhí)行結(jié)果不小于0愿棋,則線程獲取同步狀態(tài)成功
if (tryAcquireShared(arg) < 0)
// 未獲取成功加入同步隊(duì)列阻塞等待
doAcquireSharedInterruptibly(arg);
}
信號量獲取許可的方法acquire()最終是通過Sync對象調(diào)用AQS內(nèi)部的acquireSharedInterruptibly()
方法完成的糠雨,而acquireSharedInterruptibly()
在獲取同步狀態(tài)標(biāo)識的過程中是可以響應(yīng)線程中斷操作的甘邀,如果該操作沒有沒中斷垮庐,則首先調(diào)用tryAcquireShared(arg)
嘗試獲取一個(gè)許可數(shù)突硝,獲取成功則返回執(zhí)行業(yè)務(wù)解恰,方法結(jié)束护盈。如果獲取失敗,則調(diào)用doAcquireSharedInterruptibly(arg)
將當(dāng)前線程加入同步隊(duì)列阻塞等待紊服。不過值得我們注意的是:tryAcquireShared(arg)
方法是AQS提供的模板方法欺嗤,并沒有提供具體實(shí)現(xiàn)煎饼,而是把具體實(shí)現(xiàn)的邏輯交由子類完成吆玖,我們先看看信號量中非公平鎖NonfairSync類的實(shí)現(xiàn):
// Semaphore類 → NofairSync內(nèi)部類 → tryAcquireShared()方法
protected int tryAcquireShared(int acquires) {
// 調(diào)用了父類Sync中的實(shí)現(xiàn)方法
return nonfairTryAcquireShared(acquires);
}
// Syn類 → nonfairTryAcquireShared()方法
abstract static class Sync extends AbstractQueuedSynchronizer {
final int nonfairTryAcquireShared(int acquires) {
// 開啟自旋死循環(huán)
for (;;) {
int available = getState();
int remaining = available - acquires;
// 判斷信號量中可用許可數(shù)是否已<0或者CAS執(zhí)行是否成功
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
}
nonfairTryAcquireShared(acquires)
方法首先獲取到state值后马篮,減去一得到remaining
值沾乘,如果不小于0則代表著當(dāng)前信號量中還存在可用許可,當(dāng)前線程開始嘗試cas更新state值浑测,cas成功則代表獲取同步狀態(tài)成功翅阵,返回remaining
值。反之,如果remaining
值小于0則代表著信號量中的許可數(shù)已被其他線程獲取掷匠,目前不存在可用許可數(shù)槐雾,直接返回小于0的remaining
值,nonfairTryAcquireShared(acquires)
方法執(zhí)行結(jié)束擎值,回到AQS的acquireSharedInterruptibly()
方法鸠儿。當(dāng)返回的remaining
值小于0時(shí),if(tryAcquireShared(arg)<0)
條件成立田晚,進(jìn)入if執(zhí)行doAcquireSharedInterruptibly(arg)
方法將當(dāng)前線程加入同步隊(duì)列阻塞贤徒,等待其他線程釋放同步狀態(tài)。線程入列方法如下:
// AbstractQueuedSynchronizer類 → doAcquireSharedInterruptibly()方法
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
// 創(chuàng)建節(jié)點(diǎn)狀態(tài)為Node.SHARED共享模式的節(jié)點(diǎn)并將其加入同步隊(duì)列
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
// 開啟自旋操作
for (;;) {
final Node p = node.predecessor();
// 判斷前驅(qū)節(jié)點(diǎn)是否為head
if (p == head) {
// 嘗試獲取同步狀態(tài)state
int r = tryAcquireShared(arg);
// 如果r不小于0說明獲取同步狀態(tài)成功
if (r >= 0) {
// 將當(dāng)前線程結(jié)點(diǎn)設(shè)置為頭節(jié)點(diǎn)并喚醒后繼節(jié)點(diǎn)線程
setHeadAndPropagate(node, r);
p.next = null; // 置空方便GC
failed = false;
return;
}
}
// 調(diào)整同步隊(duì)列中node節(jié)點(diǎn)的狀態(tài)并判斷是否應(yīng)該被掛起
// 并判斷是否存在中斷信號序宦,如果需要中斷直接拋出異常結(jié)束執(zhí)行
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
// 結(jié)束該節(jié)點(diǎn)線程的請求
cancelAcquire(node);
}
}
// AbstractQueuedSynchronizer類 → setHeadAndPropagate()方法
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head; // 獲取同步隊(duì)列中原本的head頭節(jié)點(diǎn)
setHead(node); // 將傳入的node節(jié)點(diǎn)設(shè)置為頭節(jié)點(diǎn)
/*
* propagate=剩余可用許可數(shù),h=舊的head節(jié)點(diǎn)
* h==null,(h=head)==null:
* 非空判斷的標(biāo)準(zhǔn)寫法疫剃,避免原本head以及新的頭節(jié)點(diǎn)node為空
* 如果當(dāng)前信號量對象中剩余可用許可數(shù)大于0或者
* 原本頭節(jié)點(diǎn)h或者新的頭節(jié)點(diǎn)node不是結(jié)束狀態(tài)則喚醒后繼節(jié)點(diǎn)線程
*
* 寫兩個(gè)if的原因在于避免造成不必要的喚醒,因?yàn)楹苡锌赡軉拘蚜撕罄m(xù)
* 節(jié)點(diǎn)的線程之后壤躲,還沒有線程釋放許可/鎖凌唬,從而導(dǎo)致再次陷入阻塞
*/
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
Node s = node.next;
// 避免傳入的node為同步隊(duì)列的唯一節(jié)點(diǎn),
// 因?yàn)殛?duì)列中如果只存在node一個(gè)節(jié)點(diǎn)更耻,那么后驅(qū)節(jié)點(diǎn)s必然為空
if (s == null || s.isShared())
doReleaseShared(); // 喚醒后繼節(jié)點(diǎn)
}
}
在doAcquireSharedInterruptibly(arg)
方法中總共做了三件事:
- 一秧均、創(chuàng)建一個(gè)狀態(tài)為Node.SHARED共享模式的節(jié)點(diǎn),并通過addWaiter()加入隊(duì)列
- 二讶隐、加入成功后開啟自旋巫延,判斷前驅(qū)節(jié)點(diǎn)是否為head,是則嘗試獲取同步狀態(tài)標(biāo)識疼阔,獲取成功后婆廊,將自己設(shè)置為head節(jié)點(diǎn),如果可用許可數(shù)大于0則喚醒后繼節(jié)點(diǎn)的線程
- 三宾舅、如果前驅(qū)節(jié)點(diǎn)不為head的節(jié)點(diǎn)以及前驅(qū)節(jié)點(diǎn)為head節(jié)點(diǎn)但獲取同步狀態(tài)失敗的節(jié)點(diǎn)扶平,則調(diào)用
shouldParkAfterFailedAcquire(p,node)
判斷前驅(qū)節(jié)點(diǎn)的狀態(tài)是否為SIGNAL狀態(tài)(一般shouldParkAfterFailedAcquire(p,node)
中的for循環(huán)至少需要執(zhí)行兩次以上才會返回ture结澄,第一次把前驅(qū)節(jié)點(diǎn)設(shè)置為SIGNAL狀態(tài),第二次檢測到SIGNAL狀態(tài)),如果是則調(diào)用parkAndCheckInterrupt()
掛起當(dāng)前線程并返回線程中斷狀態(tài)
如上便是doAcquireSharedInterruptibly(arg)
方法的大概工作破镰,接下來我們可以看看shouldParkAfterFailedAcquire()
以及parkAndCheckInterrupt()
方法:
// AbstractQueuedSynchronizer類 → shouldParkAfterFailedAcquire()方法
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
// 獲取當(dāng)前節(jié)點(diǎn)的等待狀態(tài)
int ws = pred.waitStatus;
// 如果為等待喚醒(SIGNAL)狀態(tài)則返回true
if (ws == Node.SIGNAL)
return true;
// 如果當(dāng)前節(jié)點(diǎn)等待狀態(tài)大于0則說明是結(jié)束狀態(tài),
// 遍歷前驅(qū)節(jié)點(diǎn)直到找到?jīng)]有結(jié)束狀態(tài)的節(jié)點(diǎn)
if (ws > 0) {
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
// 如果當(dāng)前節(jié)點(diǎn)等待狀態(tài)小于0又不是SIGNAL狀態(tài)孕似,
// 則將其設(shè)置為SIGNAL狀態(tài),代表該節(jié)點(diǎn)的線程正在等待喚醒
// 也就是代表節(jié)點(diǎn)是剛從Condition的條件等待隊(duì)列轉(zhuǎn)移到同步隊(duì)列泛烙,
// 節(jié)點(diǎn)狀態(tài)為CONDITION狀態(tài)(Semaphore中不存在condition的概念蔽氨,
// 所以同步隊(duì)列不會出現(xiàn)這個(gè)狀態(tài)的節(jié)點(diǎn),此處代碼不會執(zhí)行)
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
// AbstractQueuedSynchronizer類 → parkAndCheckInterrupt()方法
private final boolean parkAndCheckInterrupt() {
// 將當(dāng)前線程掛起
LockSupport.park(this);
// 獲取線程中斷狀態(tài),interrupted()是判斷當(dāng)前中斷狀態(tài)自赔,
// 而并不是中斷線程蟋滴,線程需要中斷返回true津函,反之false
return Thread.interrupted();
}
LockSupport → park()方法
public static void park(Object blocker) {
Thread t = Thread.currentThread();
// 設(shè)置當(dāng)前線程的監(jiān)視器blocker
setBlocker(t, blocker);
// 調(diào)用了native方法到JVM級別的阻塞機(jī)制阻塞當(dāng)前線程
UNSAFE.park(false, 0L);
// 阻塞結(jié)束后把blocker置空
setBlocker(t, null);
}
shouldParkAfterFailedAcquire()
方法的作用是判斷節(jié)點(diǎn)的前驅(qū)節(jié)點(diǎn)是否為等待喚醒狀態(tài)(SIGNAL狀態(tài)),如果是則返回true允坚。如果前驅(qū)節(jié)點(diǎn)的waitStatus大于0(只有CANCELLED結(jié)束狀態(tài)=1>0)稠项,既代表該前驅(qū)節(jié)點(diǎn)已沒有用了展运,應(yīng)該從同步隊(duì)列移除拗胜,執(zhí)行do/while循環(huán)遍歷所有前前驅(qū)節(jié)點(diǎn),直到尋找到非CANCELLED結(jié)束狀態(tài)的節(jié)點(diǎn)勘畔。如果節(jié)點(diǎn)狀態(tài)為SIGNAL等待喚醒狀態(tài)則直接調(diào)用parkAndCheckInterrupt()
掛起當(dāng)前線程。至此整個(gè)Semaphore.acquire()
獲取許可的方法流程結(jié)束诉字。如下圖:
如上圖琅轧,在AQS同步器中存在一個(gè)變量state冲杀,Semaphore信號量對象在初始化時(shí)傳遞的permits許可數(shù)會間接的賦值給AQS中的state同步標(biāo)識剩檀,而
permits/state
則代表著同一時(shí)刻可同時(shí)訪問臨界/共享資源的最大線程數(shù)。當(dāng)一條線程調(diào)用Semaphore.acquire()
獲取許可時(shí)运嗜,會首先判斷state是否大于0,如果大于則代表還有可用許可數(shù)奋救,state減1冗荸,線程獲取成功并返回執(zhí)行盔粹。直到state為零時(shí),代表著當(dāng)前信號量已經(jīng)不存在可用許可數(shù)了进萄,后續(xù)請求的線程則需要封裝成Node節(jié)點(diǎn)并將其加入同步隊(duì)列開啟自旋操作直至有線程釋放許可(state加一)。
至此援雇,AQS共享模式中非公平鎖的獲取鎖原理分析完畢具温。但是我們?nèi)缟戏治龅氖强身憫?yīng)線程中斷請求的獲取許可方式,而Semaphore中也實(shí)現(xiàn)了一套不可中斷式的獲取方法剂习,如下:
// Semaphore類 → acquireUninterruptibly()方法
public void acquireUninterruptibly() {
sync.acquireShared(1);
}
// AbstractQueuedSynchronizer類 → acquireShared()方法
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
// AbstractQueuedSynchronizer類 → doAcquireShared()方法
private void doAcquireShared(int arg) {
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
// 在前面的可中斷式獲取鎖方法中此處是直接拋出異常強(qiáng)制中斷線程的
// 而在不可中斷式的獲取方法中尸曼,這里是沒有拋出異常中斷線程的
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
觀察如上源碼不難發(fā)現(xiàn)冤竹,可響應(yīng)線程中斷的方法與不可響應(yīng)線程中斷的方法區(qū)別在于:
可響應(yīng)線程中斷的方法在每次操作之前會先檢測線程中斷信號,如果線程需要中斷操作,則直接拋出異常強(qiáng)制中斷線程的執(zhí)行肠阱。反之,不可響應(yīng)線程中斷的方法不會檢測線程中斷信號噪伊,而且不會拋出異常強(qiáng)制中斷。
2.1.2拙寡、信號量中非公平鎖NonfairSync釋放許可/鎖實(shí)現(xiàn)
使用Semaphore時(shí)釋放鎖則調(diào)用的是Semaphore.release()
方法般堆,調(diào)用該方法之后線程持有的許可會被釋放,同時(shí)permits/state
加一和橙,接下來Semaphore獲取許可的方法release()
的具體實(shí)現(xiàn),源碼如下:
// Semaphore類 → release()方法
public void release() {
sync.releaseShared(1);
}
// AbstractQueuedSynchronizer類 → releaseShared(arg)方法
public final boolean releaseShared(int arg) {
// 調(diào)用子類Semaphore中tryReleaseShared()方法實(shí)現(xiàn)
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
與之前獲取許可的方法一樣,Semaphore釋放許可的方法release()
也是通過間接調(diào)用AQS內(nèi)部的releaseShared(arg)
完成乡翅。因?yàn)锳QS的releaseShared(arg)
是魔法方法,所以最終的邏輯實(shí)現(xiàn)由Semaphore的子類Sync完成靶累,如下:
// Semaphore類 → Sync子類 → tryReleaseShared()方法
protected final boolean tryReleaseShared(int releases) {
for (;;) {
// 獲取AQS中當(dāng)前同步狀態(tài)state值
int current = getState();
// 對當(dāng)前的state值進(jìn)行增加操作
int next = current + releases;
// 不可能出現(xiàn)拴曲,除非傳入的releases為負(fù)數(shù)
if (next < current)
throw new Error("Maximum permit count exceeded");
// CAS更新state值為增加之后的next值
if (compareAndSetState(current, next))
return true;
}
}
釋放鎖/許可的方法邏輯相對來說比較簡單,對AQS中的state加一釋放獲取的同步狀態(tài)委乌。不過值得注意的是:在我們上篇文章分享的AQS獨(dú)占模式實(shí)現(xiàn)中心软,釋放鎖的邏輯中是沒有保證線程安全的耳贬,因?yàn)楠?dú)占模式的釋放鎖邏輯永遠(yuǎn)只會存在一條線程同時(shí)操作顷蟆。而在共享模式中,可能會存在多條線程同時(shí)釋放許可/鎖資源肮街,所以在此處使用了CAS+自旋的方式保證線程安全問題。
如果此處tryReleaseShared(releases)
CAS更新成功,那么則會進(jìn)入if(tryReleaseShared(arg))
中執(zhí)行doReleaseShared();
喚醒后繼節(jié)點(diǎn)線程仪际。
// AbstractQueuedSynchronizer類 → doReleaseShared()方法
private void doReleaseShared() {
/*
* 為了防止釋放過程中有其他線程進(jìn)入隊(duì)列,這里必須開啟自旋
* 如果頭節(jié)點(diǎn)設(shè)置失敗則重新檢測繼續(xù)循環(huán)
*/
for (;;) {
// 獲取隊(duì)列head頭節(jié)點(diǎn)
Node h = head;
// 如果頭節(jié)點(diǎn)不為空并且隊(duì)列中還存在其他節(jié)點(diǎn)
if (h != null && h != tail) {
// 獲取頭節(jié)點(diǎn)的節(jié)點(diǎn)狀態(tài)
int ws = h.waitStatus;
// 如果節(jié)點(diǎn)狀態(tài)為SIGNAL等待喚醒狀態(tài)則代表
if (ws == Node.SIGNAL) {
// 嘗試cas修改節(jié)點(diǎn)狀態(tài)值為0
// 失敗則繼續(xù)下次循環(huán)
// 成功則喚醒頭節(jié)點(diǎn)的后繼節(jié)點(diǎn)
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue;
unparkSuccessor(h); // 喚醒后繼節(jié)點(diǎn)線程
}
// 節(jié)點(diǎn)狀態(tài)為0時(shí)嘗試將節(jié)點(diǎn)狀態(tài)修改為PROPAGATE傳播狀態(tài)
// 失敗則跳出循環(huán)繼續(xù)下次循環(huán)
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue;
}
// 如果當(dāng)前隊(duì)列頭節(jié)點(diǎn)發(fā)生變化繼續(xù)循環(huán),反之則終止自旋
if (h == head)
break;
}
}
// AbstractQueuedSynchronizer類 → unparkSuccessor()方法
// 參數(shù):傳入需要喚醒后繼節(jié)點(diǎn)的節(jié)點(diǎn)
private void unparkSuccessor(Node node) {
// 獲取node節(jié)點(diǎn)的線程狀態(tài)
int ws = node.waitStatus;
if (ws < 0)
// 設(shè)置head節(jié)點(diǎn)為0
compareAndSetWaitStatus(node, ws, 0);
// 獲取后繼節(jié)點(diǎn)
Node s = node.next;
// 如果后繼節(jié)點(diǎn)為空或線程狀態(tài)已經(jīng)結(jié)束
if (s == null || s.waitStatus > 0) {
s = null;
// 遍歷整個(gè)隊(duì)列拿到可喚醒的節(jié)點(diǎn)
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
// 喚醒后繼節(jié)點(diǎn)線程
LockSupport.unpark(s.thread);
}
在doReleaseShared()
方法直接在執(zhí)行中獲取head節(jié)點(diǎn),通過調(diào)用unparkSuccessor()
方法喚醒head后繼節(jié)點(diǎn)中的線程挣输。而因?yàn)樵摲椒w的邏輯是一個(gè)for(;;){}
死循環(huán),退出的條件為:只當(dāng)隊(duì)頭不發(fā)生改變時(shí)才退出向瓷,如果發(fā)生改變了則代表著一定有其他線程在當(dāng)前線程釋放鎖/許可的過程中獲取到了鎖猖任,所以當(dāng)前循環(huán)會繼續(xù),在第二次循環(huán)過程中长搀,在滿足條件h.waitStauts==0
的情況下彻况,這里會把head節(jié)點(diǎn)的waitStauts設(shè)置為Node.PROPAGATE
傳播狀態(tài)是為了保證喚醒傳遞良蛮。因?yàn)锳QS共享模式下是會出現(xiàn)多個(gè)線程同時(shí)對同步狀態(tài)標(biāo)識state進(jìn)行操作,如線程T1在執(zhí)行release()→doReleaseShared()
釋放許可操作皮胡,剛喚醒后繼線程準(zhǔn)備替換為head頭節(jié)點(diǎn)(準(zhǔn)備替換但是還沒替換),此時(shí)另外一條線程T2正好在同一時(shí)刻執(zhí)行acquire()→doAcquireShared()→setHeadAndPropagate()
獲取鎖操作烹笔,假設(shè)T2線程獲取的是最后一個(gè)可用許可饰豺,在執(zhí)行到setHeadAndPropagate()
方法(這個(gè)方法中存在一個(gè)超長判斷)饶套,傳入的propagate=0
:
// AbstractQueuedSynchronizer類 → setHeadAndPropagate()方法
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head; // 獲取同步隊(duì)列中原本的head頭節(jié)點(diǎn)
setHead(node); // 將傳入的node節(jié)點(diǎn)設(shè)置為頭節(jié)點(diǎn)
/*
* propagate=剩余可用許可數(shù),h=舊的head節(jié)點(diǎn)
* h==null,(h=head)==null:
* 非空判斷的標(biāo)準(zhǔn)寫法,避免原本head以及新的頭節(jié)點(diǎn)node為空
* 如果當(dāng)前信號量對象中剩余可用許可數(shù)大于0或者
* 原本頭節(jié)點(diǎn)h或者新的頭節(jié)點(diǎn)node不是結(jié)束狀態(tài)則喚醒后繼節(jié)點(diǎn)線程
*
* 寫兩個(gè)if的原因在于避免造成不必要的喚醒构挤,因?yàn)楹苡锌赡軉拘蚜撕罄m(xù)
* 節(jié)點(diǎn)的線程之后,還沒有線程釋放許可/鎖矾飞,從而導(dǎo)致再次陷入阻塞
*/
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
Node s = node.next;
// 避免傳入的node為同步隊(duì)列的唯一節(jié)點(diǎn)驼鹅,
// 因?yàn)殛?duì)列中如果只存在node一個(gè)節(jié)點(diǎn)豺型,那么后驅(qū)節(jié)點(diǎn)s必然為空
if (s == null || s.isShared())
doReleaseShared(); // 喚醒后繼節(jié)點(diǎn)
}
}
// 超長判斷:該判斷的作用在于面對各種特殊情況能夠時(shí)保證及時(shí)獲取鎖
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
Node s = node.next;
// 避免傳入的node為同步隊(duì)列的唯一節(jié)點(diǎn)剪验,
// 因?yàn)殛?duì)列中如果只存在node一個(gè)節(jié)點(diǎn)娶眷,那么后驅(qū)節(jié)點(diǎn)s必然為空
if (s == null || s.isShared())
doReleaseShared(); // 喚醒后繼節(jié)點(diǎn)
}
根據(jù)這個(gè)超長判斷的邏輯,因?yàn)閭魅氲?code>propagate=0代表著當(dāng)前已經(jīng)沒有可用許可數(shù)了伤塌,不滿足超長判斷中的第一個(gè)條件propagate>0
,所以T2線程獲取鎖之后理論上是不需要再喚醒其他線程獲取鎖/許可熊痴,但是因?yàn)門1線程已經(jīng)訪問完成臨界資源了正在釋放持有的許可,那么就會造成一種情況:隊(duì)列中head節(jié)點(diǎn)的后繼節(jié)點(diǎn)如果此時(shí)嘗試獲取鎖/許可,那么有很大幾率獲取到T1線程釋放的許可鄙煤。所以在釋放鎖時(shí),將head節(jié)點(diǎn)的waitStauts設(shè)置為Node.PROPAGATE
傳播狀態(tài)值為-3,滿足這個(gè)超長判斷中的第三個(gè)條件h.waitStatus < 0
锥腻,所以此時(shí)T2也會喚醒head后繼節(jié)點(diǎn)中等待獲取鎖/許可資源的線程。這樣去實(shí)現(xiàn)的好處在于:能夠充分照顧到head的后繼節(jié)點(diǎn)同時(shí)也能保證喚醒的傳遞。
至于這里為什么獲取到鎖/許可的線程需要繼續(xù)喚醒后繼節(jié)點(diǎn)線程甲葬?因?yàn)檫@里是共享鎖,而不是獨(dú)占鎖。一個(gè)線程剛獲得了共享鎖/許可棉钧,那么很有可能還有剩余的共享鎖可供排隊(duì)在后面的線程獲得,所以需要喚醒后面的線程。
至此休溶,釋放許可邏輯結(jié)束,對比獲取許可的邏輯相對來說要簡單許多,只需要更新state值后調(diào)用doReleaseShared()方法喚醒后繼節(jié)點(diǎn)線程即可杉女。但是在理解doReleaseShared()方法時(shí)需要額外注意:調(diào)用doReleaseShared()方法的線程會存在兩種:
- 一是釋放共享鎖/許可數(shù)的線程层释。調(diào)用release()方法釋放許可時(shí)必然調(diào)用它喚醒后繼線程
- 二是剛獲取到共享鎖/許可數(shù)的線程廉白。一定情況下楣嘁,在滿足“超長判斷”的任意條件時(shí)也會調(diào)用它喚醒后繼線程
2.2、AQS共享模式之Semaphore的FairSync公平鎖實(shí)現(xiàn)
AQS共享模式中的公平鎖實(shí)現(xiàn)除開在獲取鎖的邏輯上與非公平鎖的有些許不同外,其他的實(shí)現(xiàn)大致相同漱病。
2.2.1、信號量中公平鎖FairSync獲取許可/鎖實(shí)現(xiàn)
公平鎖的概念是指先請求鎖的線程一定比后請求鎖的線程要先執(zhí)行,先獲取到鎖資源,從時(shí)間上需要保證執(zhí)行的先后順序沿量。
公平鎖獲取許可執(zhí)行邏輯:Semaphore.acquire()獲取許可方法 → AQS.acquireSharedInterruptibly()方法 → AQS.tryAcquireShared()獲取共享鎖模板方法 → FairSync.tryAcquireShared()方法
// Semaphore類 → 構(gòu)造方法
public Semaphore(int permits, boolean fair) {
sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}
// Semaphore類 → acquire()獲取鎖方法
public void acquire() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
// AbstractQueuedSynchronizer類 → acquireSharedInterruptibly()方法
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
// 調(diào)用AQS定義的獲取共享鎖的模板方法
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}
// AbstractQueuedSynchronizer類 → tryAcquireShared()模板方法
protected int tryAcquireShared(int arg) {
throw new UnsupportedOperationException();
}
// Semaphore類 → FairSync公平鎖類
static final class FairSync extends Sync {
FairSync(int permits) {
super(permits);
}
// Semaphore類 → FairSync內(nèi)部類 → tryAcquireShared()子類實(shí)現(xiàn)
protected int tryAcquireShared(int acquires) {
for (;;) {
// 不同點(diǎn):先判斷隊(duì)列中是否存在節(jié)點(diǎn)后再執(zhí)行獲取鎖操作
if (hasQueuedPredecessors())
return -1;
int available = getState();
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
}
從獲取鎖/許可的代碼中可以非常明顯的看出乌妒,公平鎖的實(shí)現(xiàn)相對來說损话,與非公平鎖的唯一不同點(diǎn)在于:公平鎖的模式下獲取鎖,會先調(diào)用hasQueuedPredecessors()
方法判斷同步隊(duì)列中是否存在節(jié)點(diǎn)忘闻,如果存在則直接返回-1回到acquireSharedInterruptibly()
方法if(tryAcquireShared(arg)<0)
判斷,調(diào)用doAcquireSharedInterruptibly(arg)
方法將當(dāng)前線程封裝成Node.SHARED
共享節(jié)點(diǎn)加入同步隊(duì)列等待。反之永高,如果隊(duì)列中不存在節(jié)點(diǎn)則嘗試直接獲取鎖/許可辐脖。
2.2.2、信號量中公平鎖FairSync釋放許可/鎖實(shí)現(xiàn)
公平鎖釋放許可的邏輯與非公平鎖的實(shí)現(xiàn)是一致的,因?yàn)槎际荢ync類的子類,而釋放鎖的邏輯都是對state減一更新后歹苦,喚醒后繼節(jié)點(diǎn)的線程蚪腋。所以關(guān)于釋放鎖的具體實(shí)現(xiàn)則是交由Sync類實(shí)現(xiàn),這里不再重復(fù)贅述。
三哩簿、ReetrantLock與Semaphore的區(qū)別
對比項(xiàng) | ReetrantLock | Semaphore |
---|---|---|
實(shí)現(xiàn)模式 | 獨(dú)占模式 | 共享模式 |
獲取鎖方法 | tryAcquire() | tryAcquireShared() |
釋放鎖方法 | tryRelease() | tryAcquireShared() |
是否支持重入 | 支持 | 不支持 |
線程中斷 | 支持 | 支持 |
Condition | 支持 | 不支持 |
隊(duì)列數(shù)量 | 一個(gè)同步+多個(gè)等待 | 單個(gè)同步 |
節(jié)點(diǎn)類型 | Node.EXCLUSIVE | Node.SHARED |
四稼稿、共享模式的其他實(shí)現(xiàn)者
除開Semaphore信號量的實(shí)現(xiàn)是基于AQS的共享模式之外,在JUC并發(fā)包中CountDownLatch、ReetrantReadWriteLock讀寫鎖的Read讀鎖等都是基于AQS的共享模式實(shí)現(xiàn)的改执,下面我們也可以簡單的看看關(guān)于CountDownLatch的用法裹粤。
4.1、CountDownLatch應(yīng)用場景實(shí)戰(zhàn)
在CountDownLatch初始化時(shí)和Semaphore一樣,我們需要傳入一個(gè)數(shù)字count作為最大線程數(shù)
CountDownLatch countDownLatch = new CountDownLatch(3);
這個(gè)參數(shù)同樣會間接的賦值給AQS內(nèi)部的state同步狀態(tài)標(biāo)識。一般我們會調(diào)用它的兩個(gè)方法:await()與countDown()
:
- await():調(diào)用await()方法的線程會被封裝成共享節(jié)點(diǎn)加入同步隊(duì)列阻塞等待葱弟,直至state=0時(shí)才會喚醒同步隊(duì)列中所有的線程
- countDown():調(diào)用countDown()方法的線程會對state減一
而關(guān)于CountDownLatch有兩種用法:
- 一射窒、多等一:初始化count=1点寥,多條線程await()阻塞弟疆,一條線程調(diào)用countDown()喚醒所有阻塞線程
- 二历葛、一等多:初始化count=x帜羊,多線程countDown()對count進(jìn)行減一稠集,一條線程await()阻塞,當(dāng)count=0時(shí)阻塞的線程開始執(zhí)行
如上兩種用法在我們的項(xiàng)目中也可以有很多的應(yīng)用場景,多等一的用法我們可以用來在一定程度上模擬并發(fā)測試接口并發(fā)安全問題线定、死鎖問題等湾趾,如:
final CountDownLatch countDownLatch = new CountDownLatch(1);
for (int i = 1; i <= 3; i++) {
new Thread(() -> {
try {
System.out.println("線程:" + Thread.currentThread().getName()
+ "....阻塞等待澳眷!");
countDownLatch.await();
// 可以在此處調(diào)用需要并發(fā)測試的方法或接口
System.out.println("線程:" + Thread.currentThread().getName()
+ "....開始執(zhí)行勿侯!");
} catch (InterruptedException e) {
e.printStackTrace();
}
}, "T" + i).start();
}
Thread.sleep(1000);
countDownLatch.countDown();
/*
程序開始運(yùn)行:
線程:T2....阻塞等待!
線程:T1....阻塞等待!
線程:T3....阻塞等待!
程序運(yùn)行一秒后(三條線程幾乎同時(shí)執(zhí)行):
線程:T2....開始執(zhí)行睦焕!
線程:T1....開始執(zhí)行本谜!
線程:T3....開始執(zhí)行!
*/
如上案例中,創(chuàng)建了一個(gè)CountDownLatch對象,初始化時(shí)傳遞count=1籽腕,循環(huán)創(chuàng)建三條線程T1,T2,T3
阻塞等待揍很,主線程在一秒后調(diào)用countDown()喚醒了同步隊(duì)列中的三條線程繼續(xù)執(zhí)行,原理如下:
我們在實(shí)際開發(fā)過程中,往往很多并發(fā)任務(wù)都存在前后依賴關(guān)系膘融,如詳情頁需要調(diào)用多個(gè)接口完成數(shù)據(jù)聚合攘宙、并行執(zhí)行獲取到數(shù)據(jù)后需要進(jìn)行結(jié)果合并铺韧、多個(gè)操作完成后需要進(jìn)行數(shù)據(jù)檢查等等讯壶,而這些場景下我們可以使用一等多的用法:
final CountDownLatch countDownLatch = new CountDownLatch(3);
Map data = new HashMap();
for (int i = 1; i <= 3; i++) {
final int page = i;
new Thread(() -> {
System.out.println("線程:" + Thread.currentThread().getName() +
"....讀取分段數(shù)據(jù):"+(page-1)*200+"-"+page*200+"行");
// 數(shù)據(jù)加入結(jié)果集:data.put();
countDownLatch.countDown();
}, "T" + i).start();
}
countDownLatch.await();
System.out.println("線程:" + Thread.currentThread().getName()
+ "....對數(shù)據(jù)集:data進(jìn)行處理");
/*
運(yùn)行結(jié)果:
線程:T1....讀取分段數(shù)據(jù):0-200行
線程:T2....讀取分段數(shù)據(jù):200-400行
線程:T3....讀取分段數(shù)據(jù):400-600行
線程main....對數(shù)據(jù)集:data進(jìn)行處理
*/
如上一等多的案例中帐萎,for循環(huán)開啟三個(gè)線程T1,T2,T3
并行執(zhí)行同時(shí)讀取數(shù)據(jù)增快處理效率葛躏,讀取完成之后將數(shù)據(jù)加入data結(jié)果集中匯總,主線程等待三條線程讀取完成后對數(shù)據(jù)集data進(jìn)行處理,如下:
4.2匪蟀、CountDownLatch實(shí)現(xiàn)原理
前面我們曾提到過段化,CountDownLatch也是基于AQS共享模式實(shí)現(xiàn)的,與Semaphore一樣,會將傳入的count間接的賦值給AQS內(nèi)部的state同步狀態(tài)標(biāo)識。
private final Sync sync;
// CountDownLatch構(gòu)造方法
public CountDownLatch(int count) {
if (count < 0) throw new IllegalArgumentException("count < 0");
// 對其內(nèi)部Sync對象進(jìn)行初始化
this.sync = new Sync(count);
}
// CountDownLatch類 → Sync內(nèi)部類
private static final class Sync extends AbstractQueuedSynchronizer {
// Sync構(gòu)造函數(shù):對AQS內(nèi)部的state進(jìn)行賦值
Sync(int count) {setState(count);}
// 調(diào)用await()方法最終會調(diào)用到這里
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}
}
// CountDownLatch類 → await()方法
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
在CountDownLatch中,存在一個(gè)Sync內(nèi)部類,當(dāng)我們創(chuàng)建一個(gè)CountDownLatch對象時(shí),實(shí)則其內(nèi)部的構(gòu)造函數(shù)是在對其sync對象進(jìn)行初始化,與我們前面所說的一樣
CountDownLatch countDownLatch = new CountDownLatch(count);
初始化時(shí)傳遞的count數(shù)字最終會通過調(diào)用setState(state)
方法賦值給AQS內(nèi)部的同步狀態(tài)標(biāo)識state變量,而當(dāng)線程調(diào)用await()
方法時(shí),會調(diào)用AQS的acquireSharedInterruptibly()
方法:
// AbstractQueuedSynchronizer類 → acquireSharedInterruptibly()方法
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
// 最終調(diào)用到CountDownLatch內(nèi)部Sync類的tryAcquireShared()方法
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}
// CountDownLatch類 → Sync內(nèi)部類 → tryAcquireShared()方法
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}
因?yàn)锳QS中tryAcquireShared(arg)
方法僅是模板方法的原因,所以線程在執(zhí)行acquireSharedInterruptibly()
方法時(shí)最終會調(diào)用到CountDownLatch內(nèi)部Sync類的tryAcquireShared()
方法眠菇。當(dāng)count/state為0時(shí)返回true,反之,count不為0時(shí)返回false,最終回到if(tryAcquireShared(arg)<0)
執(zhí)行時(shí)急凰,如果count不為0則執(zhí)行doAcquireSharedInterruptibly(arg)
方法將當(dāng)前線程信息封裝成Node.SHARED
共享節(jié)點(diǎn)加入同步隊(duì)列阻塞等待。原理如下:
如上便是
CountDownLatch.await()
的實(shí)現(xiàn)原理聂渊,大體來說還是比較簡單。下面我們接著分析一下CountDownLatch.countDown()
方法的實(shí)現(xiàn)撵孤。
// CountDownLatch類 → countDown()方法
public void countDown() {
sync.releaseShared(1);
}
// AbstractQueuedSynchronizer類 → releaseShared()方法
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
// AbstractQueuedSynchronizer類 → 模板方法:tryReleaseShared()
protected boolean tryReleaseShared(int arg) {
throw new UnsupportedOperationException();
}
// CountDownLatch類 → Sync內(nèi)部類 → tryReleaseShared()方法
// 調(diào)用countDown()方法最終會調(diào)用到這里
protected boolean tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero
for (;;) {
int c = getState();
if (c == 0)
return false;
int nextc = c-1;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
調(diào)用CountDownLatch.countDown()
方法后會調(diào)用AQS的tryReleaseShared()
模板方法奴潘,最終調(diào)用CountDownLatch
中Sync內(nèi)部類
的tryReleaseShared()
方法奈虾。在該方法中首先會先對于state/count
進(jìn)行一次為0判斷碉纳,如果不為0則對count/state減一铁孵,然后再次對更新之后的state/count
進(jìn)行為0判斷熙宇,如果減一后state等于0返回true馆蠕,回到releaseShared()
的if(tryReleaseShared(arg))
執(zhí)行doReleaseShared()
喚醒同步隊(duì)列中的阻塞線程吼渡。反之寄雀,如果減一后不為0阿趁,當(dāng)前線程則直接返回呜呐,方法結(jié)束洋魂。如下:
4.3、CountDownLatch與CyclicBarrier的區(qū)別
在JUC包中還存在另一個(gè)和CountDownLatch作用相同的工具類:CyclicBarrier。與CountDownLatch不同的是:CyclicBarrier是基于AQS的獨(dú)占模式實(shí)現(xiàn)的,其內(nèi)部通過ReetrantLock與Condition實(shí)現(xiàn)線程阻塞與喚醒。對比如下:
對比項(xiàng) | CountDownLatch | CyclicBarrier |
---|---|---|
實(shí)現(xiàn)模式 | 共享模式 | 獨(dú)占模式 |
計(jì)數(shù)方式 | 減法 | 減法 |
復(fù)用支持 | 不可復(fù)用 | 計(jì)數(shù)可置0 |
重置支持 | 不可重置 | 可重置 |
設(shè)計(jì)重點(diǎn) | 一等多 | 多等多 |
五再膳、總結(jié)
通過Semaphore與CountDownLatch原理進(jìn)行分析后灾杰,不難得知凛篙,在初始化時(shí)傳遞的許可數(shù)/計(jì)數(shù)器最終都會間接的傳遞給AQS的同步狀態(tài)標(biāo)識state填物。當(dāng)一條線程嘗試獲取共享鎖時(shí)雁刷,會對state減一目派,當(dāng)state為0時(shí)代表沒有可用共享鎖了,其他后續(xù)請求的線程會被封裝成共享節(jié)點(diǎn)加入同步隊(duì)列等待,直至其他持有共享鎖的線程釋放(state加一)。不過與獨(dú)占模式不同的是:共享模式中裳擎,除開釋放鎖時(shí)會喚醒后繼節(jié)點(diǎn)的線程外拷淘,獲取共享鎖成功的線程也會在滿足一定條件下喚醒后繼節(jié)點(diǎn)结洼。至于共享模式中的公平鎖與非公平鎖則與之前的獨(dú)占模式的公平鎖與非公平鎖相同鸣峭,公平鎖情況下,先判斷隊(duì)列是否存在Node再獲取鎖惰爬,從而保證每條線程獲取共享鎖時(shí)都是先到先得的順序執(zhí)行的咨跌。而非公平鎖情況下殉摔,通過線程競爭的方式獲取,不管隊(duì)列中是否已經(jīng)存在Node節(jié)點(diǎn)挽懦,請求的線程都會先執(zhí)行一遍獲取鎖的邏輯蔫磨,只要執(zhí)行成功就能獲取到共享鎖搀罢,獲得線程執(zhí)行權(quán)。