前言
??JDK中為了處理線(xiàn)程之間的同步問(wèn)題,除了提供鎖機(jī)制之外,還提供了幾個(gè)非常有用的并發(fā)工具類(lèi):CountDownLatch悴了、CyclicBarrier寇钉、Semphore刀疙、Exchanger、Phaser扫倡;
??CountDownLatch谦秧、CyclicBarrier、Semphore撵溃、Phaser 這四個(gè)工具類(lèi)提供一種并發(fā)流程的控制手段疚鲤;而Exchanger工具類(lèi)則提供了在線(xiàn)程之間交換數(shù)據(jù)的一種手段。
簡(jiǎn)介
??Phaser 是JDK1.7版本中新增的缘挑,是一個(gè)可重用的同步barrier集歇,它的功能與 CountDownLatch、CyclicBarrier 相似语淘,但是使用起來(lái)更加靈活诲宇。可以
用來(lái)解決控制多個(gè)線(xiàn)程分階段共同完成任務(wù)的情景問(wèn)題惶翻。
Phaser中有兩個(gè)重要的計(jì)數(shù):
phase
:當(dāng)前的周期索引(或者 階段索引)姑蓝,初始值為0,當(dāng)所有線(xiàn)程執(zhí)行完本階段的任務(wù)后吕粗,phase就會(huì)加一纺荧,進(jìn)入下一階段;可以結(jié)合onAdvance()方法,在不同的階段宙暇,執(zhí)行不同的屏障方法输枯。
parties
:注冊(cè)的線(xiàn)程數(shù),即Phaser要監(jiān)控的線(xiàn)程數(shù)量客给,或者說(shuō)是 建立的屏障的數(shù)量用押。屏障的數(shù)量不是固定的,每個(gè)階段的屏障的數(shù)量都可以是不一樣靶剑。
下面詳細(xì)介紹Phaser一些機(jī)制
1蜻拨、Registration(注冊(cè)機(jī)制):
與其他barrier不同的是,Phaser中的
“注冊(cè)的同步者(parties)”
會(huì)隨時(shí)間而變化桩引,Phaser可以通過(guò)構(gòu)造器初始化parties個(gè)數(shù)缎讼,也可以在Phaser運(yùn)行期間隨時(shí)加入(方法
register( ), bulkRegister(int)
)新的parties,以及在運(yùn)行期間注銷(xiāo)(方法
arriveAndDeregister( )
)parties坑匠。
運(yùn)行時(shí)可以隨時(shí)加入血崭、注銷(xiāo)parties,只會(huì)影響Phaser內(nèi)部的計(jì)數(shù)器厘灼,它建立任何內(nèi)部的bookkeeping(賬本)夹纫,因此task不能查詢(xún)自己是否已經(jīng)注冊(cè)了,當(dāng)然你可以通過(guò)實(shí)現(xiàn)子類(lèi)來(lái)達(dá)成這一設(shè)計(jì)要求设凹。
2舰讹、Synchronization(同步機(jī)制):
類(lèi)似于CyclicBarrier,Phaser也可以awaited多次闪朱,它的arrivedAndAwaitAdvance()方法的效果類(lèi)似于CyclicBarrier的await()月匣。Phaser的每個(gè)周期(generation)都有一個(gè)phase數(shù)字杉编,phase 從0開(kāi)始铜靶,當(dāng)所有的已注冊(cè)的parties都到達(dá)后(arrive)將會(huì)導(dǎo)致此phase數(shù)字自增(advance)勾邦,當(dāng)達(dá)到Integer.MAX_VALUE后繼續(xù)從0開(kāi)始括蝠。這個(gè)phase數(shù)字用于表示當(dāng)前parties所處于的“階段周期”,它既可以標(biāo)記和控制parties的wait行為痹愚、喚醒等待的時(shí)機(jī)春弥。
Arrival:
Phaser中的arrive()太颤、arriveAndDeregister()方法粪狼,這兩個(gè)方法不會(huì)阻塞(block)退腥,但是會(huì)返回相應(yīng)的phase數(shù)字,當(dāng)此phase中最后一個(gè)party也arrive以后再榄,phase數(shù)字將會(huì)增加,即phase進(jìn)入下一個(gè)周期享潜,同時(shí)觸發(fā)(onAdvance)那些阻塞在上一phase的線(xiàn)程困鸥。這一點(diǎn)類(lèi)似于CyclicBarrier的barrier到達(dá)機(jī)制;更靈活的是,我們可以通過(guò)重寫(xiě)onAdvance方法來(lái)實(shí)現(xiàn)更多的觸發(fā)行為疾就。
Waiting:
Phaser中的awaitAdvance()方法澜术,需要指定一個(gè)phase數(shù)字,表示此Thread阻塞直到phase推進(jìn)到此周期猬腰,arriveAndAwaitAdvance()方法阻塞到下一周期開(kāi)始(或者當(dāng)前phase結(jié)束)鸟废。不像CyclicBarrier,即使等待Thread已經(jīng)interrupted姑荷,awaitAdvance方法會(huì)繼續(xù)等待盒延。Phaser提供了Interruptible和Timout的阻塞機(jī)制,不過(guò)當(dāng)線(xiàn)程Interrupted或者timout之后將會(huì)拋出異常鼠冕,而不會(huì)修改Phaser的內(nèi)部狀態(tài)添寺。如果必要的話(huà),你可以在遇到此類(lèi)異常時(shí)懈费,進(jìn)行相應(yīng)的恢復(fù)操作计露,通常是在調(diào)用forceTermination()方法之后。
Phaser通常在ForJoinPool中執(zhí)行tasks憎乙,它可以在有task阻塞等待advance時(shí)票罐,確保其他tasks的充分并行能力。
3泞边、Termination(終止):
Phaser可以進(jìn)入Termination狀態(tài)该押,可以通過(guò)isTermination()方法判斷;當(dāng)Phaser被終止后繁堡,所有的同步方法將會(huì)立即返回(解除阻塞)沈善,不需要等到advance(即advance也會(huì)解除阻塞),且這些阻塞方法將會(huì)返回一個(gè)負(fù)值的phase值(awaitAdvance方法椭蹄、arriveAndAwaitAdvance方法)闻牡。當(dāng)然,向一個(gè)termination狀態(tài)的Phaser注冊(cè)party將不會(huì)有效绳矩;此時(shí)onAdvance()方法也將會(huì)返回true(默認(rèn)實(shí)現(xiàn))罩润,即所有的parties都會(huì)被deregister,即register個(gè)數(shù)為0翼馆。
4割以、Tiering(分層):
Phaser可以“分層”,以tree的方式構(gòu)建Phaser來(lái)降低“競(jìng)爭(zhēng)”应媚。如果一個(gè)Phaser中有大量parties严沥,這會(huì)導(dǎo)致嚴(yán)重的同步競(jìng)爭(zhēng),所以我們可以將它們分組并共享一個(gè)parent Phaser中姜,這樣可以提高吞吐能力消玄;Phaser中注冊(cè)和注銷(xiāo)parties都會(huì)有Child 和parent Phaser自動(dòng)管理跟伏。當(dāng)Child Phaser中中注冊(cè)的parties變?yōu)榉?時(shí)(在構(gòu)造函數(shù)Phaser(Phaser parent,int parties),或者register()方法)翩瓜,Child Phaser將會(huì)注冊(cè)到其Parent上受扳;當(dāng)Child Phaser中的parties變?yōu)?時(shí)(比如由arrivedAndDegister()方法),那么此時(shí)Child Phaser也將從其parent中注銷(xiāo)出去兔跌。
5勘高、Monitoring.(監(jiān)控):
同步的方法只會(huì)被register操作調(diào)用,對(duì)于當(dāng)前state的監(jiān)控方法可以在任何時(shí)候調(diào)用坟桅,比如getRegisteredParties()獲取已經(jīng)注冊(cè)的parties個(gè)數(shù)华望,getPhase()獲取當(dāng)前phase周期數(shù)等;因?yàn)檫@些方法并非同步桦卒,所以只能反映當(dāng)時(shí)的瞬間狀態(tài)立美。
Phaser的API介紹
構(gòu)造方法
方法名 描述
Phaser() 構(gòu)建一個(gè)Phaser
Phaser(int parties) 創(chuàng)建一個(gè)指定屏障數(shù)量的Phaser
Phaser(Phaser parent) 相當(dāng)于 Phaser(parent, 0)
Phaser(Phaser parent, int parties) 創(chuàng)建一個(gè)指定屏障數(shù)量的Phaser,此phaser是注冊(cè)在另一個(gè)Phaser parent下
方法摘要
方法名 描述
public int arrive() 到達(dá)此phaser的屏障點(diǎn)方灾,使phaser的到達(dá)的線(xiàn)程數(shù)加一建蹄,但不會(huì)阻塞等待其他線(xiàn)程。
返回:
phase值裕偿,即當(dāng)前階段(周期)的索引洞慎,或者是負(fù)值(當(dāng)Phaser 停止時(shí))
public int arriveAndDeregister() 到達(dá)此phaser的屏障點(diǎn),使phaser的到達(dá)的線(xiàn)程數(shù)加一,并且會(huì)取消一個(gè)屏障點(diǎn)的注冊(cè)嘿棘。也不會(huì)阻塞等待其他線(xiàn)程劲腿。
返回:
phase值,即當(dāng)前階段(周期)的索引鸟妙,或者是負(fù)值(當(dāng)Phaser 停止時(shí))
public int arriveAndAwaitAdvance() 到達(dá)此phaser的屏障點(diǎn)焦人,并且阻塞等待其他線(xiàn)程到達(dá)此屏障點(diǎn)。注意:這是
非中斷的阻塞
重父,此方法與awaitAdvance(arrive())等同花椭。如果你希望阻塞機(jī)制支持timeout、interrupted響應(yīng)房午,可以使用類(lèi)似的其他方法(參見(jiàn)下文)矿辽。如果你希望到達(dá)后且注銷(xiāo),而且阻塞等到當(dāng)前phase下其他的parties到達(dá)郭厌,可以使用awaitAdvance(arriveAndDeregister())方法組合袋倔。
返回:
phase值,即當(dāng)前階段(周期)的索引折柠;如果Phaser 停止宾娜,則返回負(fù)值
public int awaitAdvance(int phase) 在指定的階段(周期)phase下等待其他線(xiàn)程到達(dá)屏障點(diǎn),注意:這是
非中斷的阻塞
扇售。如果指定的phase與Phaser當(dāng)前的phase不一致碳默,或者Phaser 停止了贾陷,則立即返回缘眶。
參數(shù) phase:
通常就是arrive()嘱根、arriveAndDeregister()的返回值;
public int awaitAdvanceInterruptibly(int phase)
throws InterruptedException 此方法是可中斷的巷懈,其他與awaitAdvance()一致
public int awaitAdvanceInterruptibly(
int phase, long timeout,TimeUnit unit)
throws InterruptedException, TimeoutException 超時(shí)等待方法该抒,其他與awaitAdvance()一致
public int register() 新注冊(cè)一個(gè)party,導(dǎo)致Phaser內(nèi)部registerPaties數(shù)量加1顶燕;如果此時(shí)onAdvance方法正在執(zhí)行凑保,此方法將會(huì)等待它執(zhí)行完畢后才會(huì)返回。此方法返回當(dāng)前的phase周期數(shù)涌攻,如果Phaser已經(jīng)中斷欧引,將會(huì)返回負(fù)數(shù)。
public int bulkRegister(int parties) 批量注冊(cè)多個(gè)party恳谎,與register()相似
protected boolean onAdvance(int phase, int registeredParties) barrier action(屏障方法)芝此。如果需要,則必須繼承Phaser類(lèi)因痛,重寫(xiě)此方法婚苹。如果返回true表示此Phaser應(yīng)該終止(此后將會(huì)把Phaser的狀態(tài)為termination,即isTermination()將返回true鸵膏。)膊升,否則可以繼續(xù)進(jìn)行。phase參數(shù)表示當(dāng)前周期數(shù)谭企,registeredParties表示當(dāng)前已經(jīng)注冊(cè)的parties個(gè)數(shù)廓译。
默認(rèn)實(shí)現(xiàn)為:return registeredParties == 0;在很多情況下债查,開(kāi)發(fā)者可以通過(guò)重寫(xiě)此方法非区,來(lái)實(shí)現(xiàn)自定義的
public void forceTermination() 強(qiáng)制終止,此后Phaser對(duì)象將不可用攀操,即register等將不再有效院仿。此方法將會(huì)導(dǎo)致Queue中所有的waiter線(xiàn)程被喚醒。這個(gè)方法對(duì)于在一個(gè)或多個(gè)任務(wù)遇到意外異常之后協(xié)調(diào)恢復(fù)是很有用的速和。
public int getArrivedParties() 獲取已經(jīng)到達(dá)的parties個(gè)數(shù)歹垫。
public int getUnarrivedParties() 獲取沒(méi)有到達(dá)的parties個(gè)數(shù)。
public Phaser getParent() 獲取其父親類(lèi)Phaser颠放,沒(méi)有則返回null
public Phaser getRoot() 返回該phaser的根祖先排惨,如果沒(méi)有父類(lèi),返回此phaser碰凶。
public boolean isTerminated() 如果該phaser被終止暮芭,則返回true鹿驼。
@ Example1 多階段(周期)、帶屏障事件示例
??例子很簡(jiǎn)單辕宏,模擬跑步比賽的過(guò)程畜晰,分為三個(gè)階段:1、參賽者到達(dá)起跑點(diǎn)瑞筐,并在起跑點(diǎn)等待其他參賽者凄鼻;2、參賽者齊人后聚假,開(kāi)始準(zhǔn)備块蚌,并等待槍聲。3膘格、參賽這到達(dá)終點(diǎn)峭范,并結(jié)束比賽,不再等待任何情況瘪贱。
public class PhaserTest{
public static MyPhaser myPhaser = new MyPhaser();
public static void main(String[] args) {
MyPhaser myPhaser = new MyPhaser();
// 一次性注冊(cè)5個(gè)party纱控,即建立5個(gè)屏障點(diǎn)
myPhaser.bulkRegister(5);
for (int i = 0; i < 5; i++) {
Thread runner = new Thread(new Runnable() {
@Override
public void run() {
// 第一階段(周期),phaser的周期數(shù)初始值為0
System.out.println(Thread.currentThread().getName() + "到達(dá)了起跑點(diǎn)政敢!");
// 到達(dá)了屏障點(diǎn)(起跑點(diǎn))其徙,阻塞等待其他線(xiàn)程
myPhaser.arriveAndAwaitAdvance();
// 繼續(xù)運(yùn)行,將進(jìn)入第二階段喷户,phaser的周期數(shù)加一
System.out.println(Thread.currentThread().getName() + "準(zhǔn)備起跑唾那!");
// 到達(dá)了屏障點(diǎn)(準(zhǔn)備起跑),阻塞等待其他線(xiàn)程
myPhaser.arriveAndAwaitAdvance();
// 進(jìn)入第三階段
System.out.println(Thread.currentThread().getName() + "到達(dá)了終點(diǎn)褪尝!");
// 參數(shù)者到達(dá)了終點(diǎn)闹获,結(jié)束比賽,不再等待其他參賽者
myPhaser.arriveAndDeregister();// 取消注冊(cè)一個(gè)party
}
}, "參賽者" + i + "號(hào)");
runner.start();
}
}
}
MyPhaser類(lèi)河哑,定制 barrier action(屏障事件)
public class MyPhaser extends Phaser {
@Override
//改寫(xiě)onAdvance方法
public boolean onAdvance(int phase, int registeredParties) {
//判斷當(dāng)前的Phaser是否終止
if (!isTerminated()) {
// 分成三個(gè)階段避诽,在不同的階段(周期),執(zhí)行不同的屏障事件
if (phase == 0) {
// ....
System.out.println("第一階段:所有參賽者都到達(dá)了起跑點(diǎn)璃谨!");
} else if (phase == 1) {
// ....
System.out.println("第二階段:所有參賽者都已經(jīng)就位沙庐,并準(zhǔn)備好!比賽正式開(kāi)始");
} else if (phase == 2) {
// ....
System.out.println("第三階段:所有參賽者都到達(dá)終點(diǎn)佳吞,比賽結(jié)束9俺!");
}
}
return super.onAdvance(phase, registeredParties);
}
}
運(yùn)行結(jié)果:
參賽者0號(hào)到達(dá)了起跑點(diǎn)底扳!
參賽者3號(hào)到達(dá)了起跑點(diǎn)铸抑!
參賽者4號(hào)到達(dá)了起跑點(diǎn)!
參賽者2號(hào)到達(dá)了起跑點(diǎn)衷模!
參賽者1號(hào)到達(dá)了起跑點(diǎn)鹊汛!
第一階段:所有參賽者都到達(dá)了起跑點(diǎn)蒲赂!
參賽者0號(hào)準(zhǔn)備起跑!
參賽者1號(hào)準(zhǔn)備起跑刁憋!
參賽者2號(hào)準(zhǔn)備起跑滥嘴!
參賽者3號(hào)準(zhǔn)備起跑!
參賽者4號(hào)準(zhǔn)備起跑职祷!
第二階段:所有參賽者都已經(jīng)就位氏涩,并準(zhǔn)備好!比賽正式開(kāi)始
參賽者4號(hào)到達(dá)了終點(diǎn)有梆!
參賽者1號(hào)到達(dá)了終點(diǎn)!
參賽者0號(hào)到達(dá)了終點(diǎn)意系!
參賽者2號(hào)到達(dá)了終點(diǎn)泥耀!
參賽者3號(hào)到達(dá)了終點(diǎn)!
第三階段:所有參賽者都到達(dá)終點(diǎn)蛔添,比賽結(jié)束痰催!
@ Example2?分層示例
下面的例子:每一個(gè)Phaser周期類(lèi)注冊(cè)的線(xiàn)程數(shù)目不能超過(guò)TASKS_PER_PHASER(例子中是4個(gè)),否則就要增加一層子phaser層迎瞧。
public class PhaserTest6 {
//
private static final int = 4;
public static void main(String args[]) throws Exception {
//
final int phaseToTerminate = 3;
//創(chuàng)建一個(gè)Phaser父類(lèi)對(duì)象
final Phaser phaser = new Phaser() {
@Override
protected boolean onAdvance(int phase, int registeredParties) { //屏障方法
System.out.println("====== " + phase + " ======");
return phase == phaseToTerminate || registeredParties == 0;
}
};
//創(chuàng)建10個(gè)任務(wù)
final Task tasks[] = new Task[10];
build(tasks, 0, tasks.length, phaser);
for (int i = 0; i < tasks.length; i++) {
System.out.println("starting thread, id: " + i);
final Thread thread = new Thread(tasks[i]);
thread.start();
}
}
//遞歸分層夸溶,
public static void build(Task[] tasks, int lo, int hi, Phaser ph) {
//如果任務(wù)的數(shù)量超過(guò)每一層的phaser的閾值TASKS_PER_PHASER,則要繼續(xù)分層
if (hi - lo > TASKS_PER_PHASER) {
for (int i = lo; i < hi; i += TASKS_PER_PHASER) {
int j = Math.min(i + TASKS_PER_PHASER, hi);
//當(dāng)前的phaser(ph)作為父周期凶硅,來(lái)創(chuàng)建一個(gè)子phaser
build(tasks, i, j, new Phaser(ph));
}
} else {
//線(xiàn)程的數(shù)量在閾值內(nèi)缝裁,無(wú)需分成,可以直接注冊(cè)線(xiàn)程到當(dāng)前的Phaser
for (int i = lo; i < hi; ++i)
tasks[i] = new Task(i, ph);
}
}
public static class Task implements Runnable {
//
private final int id;
private final Phaser phaser;
public Task(int id, Phaser phaser) {
this.id = id;
this.phaser = phaser;
this.phaser.register();
}
@Override
public void run() {
while (!phaser.isTerminated()) {
try {
Thread.sleep(200);
} catch (InterruptedException e) {
// NOP
}
System.out.println("in Task.run(), phase: " + phaser.getPhase() + ", id: " + this.id);
phaser.arriveAndAwaitAdvance();
}
}
}
}
需要注意的是足绅,TASKS_PER_PHASER的值取決于具體的Task實(shí)現(xiàn)捷绑。對(duì)于Task執(zhí)行時(shí)間很短的場(chǎng)景(也就是競(jìng)爭(zhēng)相對(duì)激烈),可以考慮使用較小的TASKS_PER_PHASER值氢妈,例如4粹污。反之可以適當(dāng)增大
運(yùn)行結(jié)果:
in Task.run(), phase: 0, id: 2
in Task.run(), phase: 0, id: 1
in Task.run(), phase: 0, id: 3
in Task.run(), phase: 0, id: 0
in Task.run(), phase: 0, id: 8
in Task.run(), phase: 0, id: 5
in Task.run(), phase: 0, id: 9
in Task.run(), phase: 0, id: 7
in Task.run(), phase: 0, id: 4
in Task.run(), phase: 0, id: 6
====== 0 ======
in Task.run(), phase: 1, id: 9
in Task.run(), phase: 1, id: 6
in Task.run(), phase: 1, id: 1
in Task.run(), phase: 1, id: 7
in Task.run(), phase: 1, id: 8
in Task.run(), phase: 1, id: 5
in Task.run(), phase: 1, id: 0
in Task.run(), phase: 1, id: 4
in Task.run(), phase: 1, id: 3
in Task.run(), phase: 1, id: 2
====== 1 ======
in Task.run(), phase: 2, id: 6
in Task.run(), phase: 2, id: 0
in Task.run(), phase: 2, id: 2
in Task.run(), phase: 2, id: 3
in Task.run(), phase: 2, id: 7
in Task.run(), phase: 2, id: 5
in Task.run(), phase: 2, id: 8
in Task.run(), phase: 2, id: 9
in Task.run(), phase: 2, id: 1
in Task.run(), phase: 2, id: 4
====== 2 ======
in Task.run(), phase: 3, id: 3
in Task.run(), phase: 3, id: 4
in Task.run(), phase: 3, id: 9
in Task.run(), phase: 3, id: 5
in Task.run(), phase: 3, id: 8
in Task.run(), phase: 3, id: 1
in Task.run(), phase: 3, id: 7
in Task.run(), phase: 3, id: 0
in Task.run(), phase: 3, id: 2
in Task.run(), phase: 3, id: 6
====== 3 ======
文章源地址:https://www.cnblogs.com/jinggod/p/8494624.html