java.util.concurrent概覽

該系列文章為翻譯自國(guó)外博客http://www.baeldung.com

1.概覽

java.util.concurrent 包提供了創(chuàng)建多線程應(yīng)用所需的工具荔烧, 在本文中长豁,我們會(huì)對(duì)整個(gè)java.util.concurrent包做一個(gè)概覽夯到。

2.核心組件

? ? .?Executor

? ? .ExecutorService

? ? .ScheduledExecutorService?

? ? .Future?

? ? ?.CountDownLatch?

? ? ?.CyclicBarrier?

? ? ?.Semaphore?

? ? ?.ThreadFactory

? ? ?.BlockingQueue

? ? ?.DelayQueue

? ? ?.Locks

? ? ?.Phaser

針對(duì)上述的每一個(gè)組件弧腥,我們都用專題文章张抄。

2.1?Executor

? Executor代表的是一個(gè)能執(zhí)行給定任務(wù)的接口(interface)孤页。一個(gè)任務(wù)應(yīng)該在一個(gè)新線程上運(yùn)行還是在一個(gè)并發(fā)線程上運(yùn)行,取決于特定的實(shí)現(xiàn)(調(diào)用從何處開始) 奕纫。因此提陶,使用這個(gè)接口我們可以把任務(wù)執(zhí)行流從實(shí)際的任務(wù)執(zhí)行鏈中相分離。

不過有一點(diǎn)需要注意匹层,Executor并不嚴(yán)格地要求任務(wù)執(zhí)行一定要是異步的隙笆。在下面這個(gè)簡(jiǎn)單得例子中,一個(gè)executor可以立即在當(dāng)前調(diào)用的線程中執(zhí)行已提交的任務(wù)task.

我們需要建立一個(gè)invoker去創(chuàng)建executor實(shí)例:

publicclassInvoker implementsExecutor {

????@Override

? ? ?publicvoidexecute(Runnable r) {

????????r.run();

????}

}

現(xiàn)在升筏,我們就可以使用這個(gè)invoker來執(zhí)行任務(wù)啦撑柔。

publicvoidexecute() {

????Executor executor = newInvoker();

????executor.execute( () -> {

????????// 要執(zhí)行的任務(wù)

????});

}

注意:如果這個(gè)executor不能接受這個(gè)要執(zhí)行的任務(wù),它會(huì)拋出RejectedExecutionException異常您访。


2.2?ExecutorService

ExecutorService是一個(gè)針對(duì)異步處理的完整解決方案铅忿,它負(fù)責(zé)管理一個(gè)內(nèi)存隊(duì)列并且基于線程的可用性調(diào)度已提交的任務(wù)。

?在使用ExecutorService 之前灵汪,我們需要?jiǎng)?chuàng)建一個(gè)Runnable類.

publicclassTask implementsRunnable {

????@Override

????publicvoidrun() {

????????// task details

????}

}

現(xiàn)在我們就能創(chuàng)建一個(gè)ExecutorService實(shí)例并分配這個(gè)任務(wù)辆沦。在創(chuàng)建的時(shí)候,我們需要指定線程池的大小识虚。

ExecutorService executor = Executors.newFixedThreadPool(10);

如果你想創(chuàng)建一個(gè)單線程的ExecutorService實(shí)例,我們可以使用new SingleThreadExecutor(ThreadFactory threadFactory) 創(chuàng)建這個(gè)實(shí)例妒茬。

一旦這個(gè)executor被創(chuàng)建担锤,我們就可以使用它來提交任務(wù)了。

publicvoidexecute() {

????executor.submit(newTask());

}

當(dāng)然乍钻,我們也能在提交任務(wù)的時(shí)候肛循,創(chuàng)建這個(gè)Runnable實(shí)例、

executor.submit(() -> {

????newTask();

});

它同時(shí)也提供了兩個(gè)十分便捷地結(jié)束運(yùn)行的方法银择,第一個(gè)是shutdown()多糠,它會(huì)一直等待到所有被提交的任務(wù)結(jié)束運(yùn)行。另一個(gè)方法是 shutdownNow() ,這個(gè)方法會(huì)立即終結(jié)所有的pending或executing 的任務(wù)浩考。

還有一個(gè)方法 awaitTermination(long timeout,TimeUnit unit) ,在一個(gè)關(guān)閉時(shí)間觸發(fā)或 執(zhí)行超時(shí)發(fā)生 又或者 執(zhí)行線程自身被中斷之后夹孔, 這個(gè)方法 會(huì)強(qiáng)制阻塞直到所有的所有的任務(wù)完成運(yùn)行。

try{

????executor.awaitTermination( 20l, TimeUnit.NANOSECONDS );

} catch(InterruptedException e) {

????e.printStackTrace();

}

2.3?ScheduledExecutorService

ScheduledExecutorService 是一個(gè)類似于 ExecutorService的接口,但是它可以周期性地執(zhí)行任務(wù)搭伤。Executor和ExecutorService的方法會(huì)立刻被調(diào)度沒有任何的人為延遲只怎。0或任何負(fù)數(shù)都表明這個(gè)請(qǐng)求需要立即執(zhí)行。

我們可以使用Runnable和Callable接口來定義task:

publicvoidexecute() {

????ScheduledExecutorService executorService

??????= Executors.newSingleThreadScheduledExecutor();

????Future future = executorService.schedule(() -> {

????????// ...

????????return"Hello world";

????}, 1, TimeUnit.SECONDS);

????ScheduledFuture scheduledFuture = executorService.schedule(() -> {

????????// ...

????}, 1, TimeUnit.SECONDS);

?executorService.shutdown();

}

ScheduledExecutorService也可以在一個(gè)給定的延遲時(shí)間后再調(diào)度這個(gè)任務(wù):

executorService.scheduleAtFixedRate(() -> {

????// ...

}, 1, 10, TimeUnit.SECONDS);


executorService.scheduleWithFixedDelay(() -> {

????// ...

}, 1, 10, TimeUnit.SECONDS);

這里怜俐,scheduleAtFixedRate( Runnable command, long initialDelay, long period, TimeUnit unit ) 方法會(huì)創(chuàng)建并執(zhí)行一個(gè)周期性的行為身堡,

? 在給定的初始延遲后,它會(huì)被首次執(zhí)行拍鲤,緊接著在 給定的周期內(nèi)陸續(xù)被調(diào)用贴谎,直到這個(gè)服務(wù)實(shí)例關(guān)閉(shutdown).

? scheduleWithFixedDelay( Runnable command, long initialDelay, long delay, TimeUnit unit ) 方法會(huì)創(chuàng)建并執(zhí)行一個(gè)周期性的行為。

? 在給定的初始延遲后被首次調(diào)用季稳,并在再給定的正在執(zhí)行的和下一個(gè)被調(diào)用的之間的延遲時(shí)間內(nèi)重復(fù)擅这。

2.4 Future

Future是用于表示一個(gè)異步操作的結(jié)果。它同時(shí)還伴隨有檢查異步操作是否完成的方法以及獲取計(jì)算結(jié)果的方法等等绞幌。另外蕾哟,cancel(boolean mayInterruptIfRunning) 這個(gè)API會(huì)取消操作并且釋放正在執(zhí)行的線程。如果mayInterruptIfRunning的值為true的話莲蜘,正在執(zhí)行任務(wù)的線程將會(huì)被立即終結(jié)谭确。否則的話,就允許進(jìn)行中的任務(wù)完成票渠。

我們可以使用下面的代碼片去創(chuàng)建一個(gè)future 實(shí)例:

publicvoidinvoke() {

????ExecutorService executorService = Executors.newFixedThreadPool(10);

????Future future = executorService.submit(() -> {

? ? ? // ...

? ? ? Thread.sleep(10000l);

????????return"Hello world";

????});

}

我們可以使用下面的代碼片去檢查這個(gè)future結(jié)果已經(jīng)準(zhǔn)備好并且在運(yùn)算完成時(shí)逐哈,獲取數(shù)據(jù):

if(future.isDone() && !future.isCancelled()) {

????try{

????????str = future.get();

????} catch(InterruptedException | ExecutionException e) {

????????e.printStackTrace();

????}

}

我們也能為一個(gè)指定的操作指定超時(shí)時(shí)間,如果這個(gè)任務(wù)花費(fèi)的時(shí)間超過了指定值问顷,就會(huì)拋出

? 一個(gè)TimeoutException:

try{

????future.get(10, TimeUnit.SECONDS);

} catch(InterruptedException | ExecutionException | TimeoutException e) {

????e.printStackTrace();

}

2.5?CountDownLatch

CountDownLatch 是一個(gè)功能類昂秃,它能阻塞一組線程直到一些操作完成。CountDownLatch在初始化時(shí)杜窄,會(huì)伴隨著一個(gè)counter(Integer type);

當(dāng)依賴的線程結(jié)束運(yùn)行時(shí)肠骆,該計(jì)數(shù)器會(huì)遞減。但是塞耕,一旦計(jì)數(shù)器的值到達(dá)0蚀腿,其他線程就會(huì)被釋放。關(guān)于CountDownLatch的詳情請(qǐng)點(diǎn)擊這里扫外。


2.6??CyclicBarrier

CyclicBarrier 的作用幾乎和CountDownLatch 一樣莉钙,除了 我們可以復(fù)用它。? 不像CountDownLatch,在調(diào)用最終的任務(wù)之前筛谚,它允許多線程使用 await()方法(就是大家所熟知的 barrier 條件) 等待彼此磁玉。我們需要?jiǎng)?chuàng)建一個(gè)Runnable task實(shí)例 來初始化這個(gè)barrier 條件:

publicclassTask implementsRunnable {

????privateCyclicBarrier barrier;

? ? publicTask(CyclicBarrier barrier) {

????????this.barrier = barrier;

????}

????@Override

????publicvoidrun() {

????????try{

????????????LOG.info(Thread.currentThread().getName() +

??????????????" is waiting");

????????????barrier.await();

????????????LOG.info(Thread.currentThread().getName() +

??????????????" is released");

????????} catch(InterruptedException | BrokenBarrierException e) {

????????????e.printStackTrace();

????????}

? }

}

現(xiàn)在我們可以調(diào)用一些線程去競(jìng)爭(zhēng)這個(gè)barrier 條件:

publicvoidstart() {

????CyclicBarrier cyclicBarrier = newCyclicBarrier(3, () -> {

????????// ...

????????LOG.info("All previous tasks are completed");

????});

? ? ?Thread t1 = newThread(newTask(cyclicBarrier), "T1");

????Thread t2 = newThread(newTask(cyclicBarrier), "T2");

????Thread t3 = newThread(newTask(cyclicBarrier), "T3");

????if(!cyclicBarrier.isBroken()) {

????????t1.start();

????????t2.start();

????????t3.start();

????}

}

這里,isBroken()方法會(huì)檢查是否有線程在執(zhí)行期間被中斷驾讲,我們應(yīng)該在執(zhí)行實(shí)際的處理之前先執(zhí)行該檢查蚊伞。

2.7?Semaphore

Semaphore是用于阻塞對(duì)某些物理或邏輯資源的線程級(jí)別的訪問席赂。一個(gè)semaphore包含了一套許可證;當(dāng)一個(gè)線程試圖進(jìn)入臨界區(qū)時(shí)厚柳,它需要檢查

? 該semaphore,以便于知道有沒有可用的許可證氧枣。

? 如果許可證不可用的話(通過 tryAcquire()),線程不允許跳到臨界區(qū);然而别垮,如果許可證可用便监,訪問被準(zhǔn)許的話,該許可證計(jì)數(shù)器(the permit counter)就會(huì)減少.

? 一旦正在執(zhí)行的線程釋放臨界區(qū)碳想,許可證計(jì)數(shù)器的值又會(huì)被再一次增加(通過release()方法來完成)烧董。

? 通過使用? tryAcquire(long timeout,TimeUnit unit)方法,我們?cè)讷@取訪問時(shí)指定超時(shí)時(shí)間胧奔。

? 下面的代碼片段可用于實(shí)現(xiàn)一個(gè)semaphore:

staticSemaphore semaphore = newSemaphore(10);

publicvoidexecute() throwsInterruptedException {

????LOG.info("Available permit : "+ semaphore.availablePermits());

????LOG.info("Number of threads waiting to acquire: "+

??????semaphore.getQueueLength());

????if(semaphore.tryAcquire()) {

????????semaphore.acquire();

????????// ...

????????semaphore.release();

????}

}

使用semaphore逊移,我們可以實(shí)現(xiàn)一個(gè)像數(shù)據(jù)結(jié)構(gòu)一樣的互斥量。更多詳情龙填,請(qǐng)看胳泉。


2.8?ThreadFactory 線程工廠

正如名字所提示的意思,ThreadFactory 表現(xiàn)的就像線程池一樣岩遗,它會(huì)在需要時(shí)創(chuàng)建新線程扇商。在實(shí)現(xiàn)有效的線程創(chuàng)建鏈時(shí),ThreadFactory消除了許多樣板化的代碼宿礁。

?我們可以定義一個(gè)ThreadFactory:

publicclassBaeldungThreadFactory implementsThreadFactory {

????privateintthreadId;

????privateString name;

????publicBaeldungThreadFactory(String name) {

????????threadId = 1;

????????this.name = name;

????}

????@Override

????publicThread newThread(Runnable r) {

????????Thread t = newThread(r, name + "-Thread_"+ threadId);

????????LOG.info("created new thread with id : "+ threadId +

????????????" and name : "+ t.getName());

????????threadId++;

????????returnt;

????}

}

我們可以使用這個(gè)newThread(Runnable r)方法在運(yùn)行時(shí)去創(chuàng)建一個(gè)新線程:

BaeldungThreadFactory factory = newBaeldungThreadFactory(

????"BaeldungThreadFactory");

for(inti = 0; i < 10; i++) {

????Thread t = factory.newThread(newTask());

????t.start();

}

2.9 BlockingQueue 阻塞隊(duì)列

在異步編程中案铺,使用最普遍的模式就是 生產(chǎn)者-消費(fèi)者模式。 java.util.concurrent 包中提供的有一個(gè)數(shù)據(jù)類型即眾所周知的BlockingQueue -它在這些異步場(chǎng)景中非常有用梆靖。


2.10 DelayQueue延遲隊(duì)列


DelayQueue是一個(gè)無窮大的阻塞隊(duì)列控汉,它里面的元素只有在它的過期時(shí)間完成時(shí),才會(huì)被拉過來返吻。因此赂鲤,最頂層的元素(頭)將會(huì)擁有最大的延遲藤为,同時(shí)它也是最后一個(gè)被投票的饥臂。

2.11 Locks 鎖

?鎖Lock是用于阻塞其他線程訪問某一段特定代碼的小工具望浩,用于和當(dāng)前正在持有鎖線程分隔開。 Lock和Synchronized block同步代碼塊最主要的區(qū)別是恨课,同步代碼塊主要是被包含在方法中,而Lcok的API lock()以及unlock()方法卻可以在分開的方法中使用岳服。

2.12? Phaser

?比起CyclicBarrier和CountDownLatch來說剂公,Phaser是一個(gè)更靈活的解決方案 -它可以作為一個(gè)可復(fù)用的barrier。動(dòng)態(tài)線程數(shù)在繼續(xù)執(zhí)行之前需要等待這個(gè)barrier.我們可以協(xié)同執(zhí)行的多個(gè)phase吊宋,為每一個(gè)program phase復(fù)用一個(gè) Phaser實(shí)例纲辽。

3. 結(jié)論 Conclusion

在這篇概覽文章中,我們聚焦了java.util.concurrent包中的幾個(gè)不同特性,和往常一樣拖吼,全部的源代碼都在在github上鳞上,地址是:https://github.com/eugenp/tutorials/tree/master/core-java-concurrency

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個(gè)濱河市吊档,隨后出現(xiàn)的幾起案子篙议,更是在濱河造成了極大的恐慌,老刑警劉巖怠硼,帶你破解...
    沈念sama閱讀 216,692評(píng)論 6 501
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件鬼贱,死亡現(xiàn)場(chǎng)離奇詭異,居然都是意外死亡香璃,警方通過查閱死者的電腦和手機(jī)这难,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,482評(píng)論 3 392
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來葡秒,“玉大人姻乓,你說我怎么就攤上這事∶心粒” “怎么了蹋岩?”我有些...
    開封第一講書人閱讀 162,995評(píng)論 0 353
  • 文/不壞的土叔 我叫張陵,是天一觀的道長(zhǎng)炸站。 經(jīng)常有香客問我星澳,道長(zhǎng),這世上最難降的妖魔是什么旱易? 我笑而不...
    開封第一講書人閱讀 58,223評(píng)論 1 292
  • 正文 為了忘掉前任禁偎,我火速辦了婚禮,結(jié)果婚禮上阀坏,老公的妹妹穿的比我還像新娘如暖。我一直安慰自己,他們只是感情好忌堂,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,245評(píng)論 6 388
  • 文/花漫 我一把揭開白布盒至。 她就那樣靜靜地躺著,像睡著了一般士修。 火紅的嫁衣襯著肌膚如雪枷遂。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 51,208評(píng)論 1 299
  • 那天棋嘲,我揣著相機(jī)與錄音酒唉,去河邊找鬼。 笑死沸移,一個(gè)胖子當(dāng)著我的面吹牛痪伦,可吹牛的內(nèi)容都是我干的侄榴。 我是一名探鬼主播,決...
    沈念sama閱讀 40,091評(píng)論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼网沾,長(zhǎng)吁一口氣:“原來是場(chǎng)噩夢(mèng)啊……” “哼癞蚕!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起辉哥,我...
    開封第一講書人閱讀 38,929評(píng)論 0 274
  • 序言:老撾萬榮一對(duì)情侶失蹤桦山,失蹤者是張志新(化名)和其女友劉穎,沒想到半個(gè)月后证薇,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體度苔,經(jīng)...
    沈念sama閱讀 45,346評(píng)論 1 311
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,570評(píng)論 2 333
  • 正文 我和宋清朗相戀三年浑度,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了寇窑。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 39,739評(píng)論 1 348
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡箩张,死狀恐怖甩骏,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情先慷,我是刑警寧澤饮笛,帶...
    沈念sama閱讀 35,437評(píng)論 5 344
  • 正文 年R本政府宣布,位于F島的核電站论熙,受9級(jí)特大地震影響福青,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜脓诡,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,037評(píng)論 3 326
  • 文/蒙蒙 一无午、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧祝谚,春花似錦宪迟、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,677評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)。三九已至席爽,卻和暖如春意荤,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背只锻。 一陣腳步聲響...
    開封第一講書人閱讀 32,833評(píng)論 1 269
  • 我被黑心中介騙來泰國(guó)打工玖像, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人炬藤。 一個(gè)月前我還...
    沈念sama閱讀 47,760評(píng)論 2 369
  • 正文 我出身青樓御铃,卻偏偏與公主長(zhǎng)得像,于是被迫代替她去往敵國(guó)和親沈矿。 傳聞我的和親對(duì)象是個(gè)殘疾皇子上真,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,647評(píng)論 2 354

推薦閱讀更多精彩內(nèi)容