該系列文章為翻譯自國(guó)外博客http://www.baeldung.com
1.概覽
java.util.concurrent 包提供了創(chuàng)建多線程應(yīng)用所需的工具荔烧, 在本文中长豁,我們會(huì)對(duì)整個(gè)java.util.concurrent包做一個(gè)概覽夯到。
2.核心組件
? ? .?Executor
? ? .ExecutorService
? ? .ScheduledExecutorService?
? ? .Future?
? ? ?.CountDownLatch?
? ? ?.CyclicBarrier?
? ? ?.Semaphore?
? ? ?.ThreadFactory
? ? ?.BlockingQueue
? ? ?.DelayQueue
? ? ?.Locks
? ? ?.Phaser
針對(duì)上述的每一個(gè)組件弧腥,我們都用專題文章张抄。
2.1?Executor
? Executor代表的是一個(gè)能執(zhí)行給定任務(wù)的接口(interface)孤页。一個(gè)任務(wù)應(yīng)該在一個(gè)新線程上運(yùn)行還是在一個(gè)并發(fā)線程上運(yùn)行,取決于特定的實(shí)現(xiàn)(調(diào)用從何處開始) 奕纫。因此提陶,使用這個(gè)接口我們可以把任務(wù)執(zhí)行流從實(shí)際的任務(wù)執(zhí)行鏈中相分離。
不過有一點(diǎn)需要注意匹层,Executor并不嚴(yán)格地要求任務(wù)執(zhí)行一定要是異步的隙笆。在下面這個(gè)簡(jiǎn)單得例子中,一個(gè)executor可以立即在當(dāng)前調(diào)用的線程中執(zhí)行已提交的任務(wù)task.
我們需要建立一個(gè)invoker去創(chuàng)建executor實(shí)例:
publicclassInvoker implementsExecutor {
????@Override
? ? ?publicvoidexecute(Runnable r) {
????????r.run();
????}
}
現(xiàn)在升筏,我們就可以使用這個(gè)invoker來執(zhí)行任務(wù)啦撑柔。
publicvoidexecute() {
????Executor executor = newInvoker();
????executor.execute( () -> {
????????// 要執(zhí)行的任務(wù)
????});
}
注意:如果這個(gè)executor不能接受這個(gè)要執(zhí)行的任務(wù),它會(huì)拋出RejectedExecutionException異常您访。
2.2?ExecutorService
ExecutorService是一個(gè)針對(duì)異步處理的完整解決方案铅忿,它負(fù)責(zé)管理一個(gè)內(nèi)存隊(duì)列并且基于線程的可用性調(diào)度已提交的任務(wù)。
?在使用ExecutorService 之前灵汪,我們需要?jiǎng)?chuàng)建一個(gè)Runnable類.
publicclassTask implementsRunnable {
????@Override
????publicvoidrun() {
????????// task details
????}
}
現(xiàn)在我們就能創(chuàng)建一個(gè)ExecutorService實(shí)例并分配這個(gè)任務(wù)辆沦。在創(chuàng)建的時(shí)候,我們需要指定線程池的大小识虚。
ExecutorService executor = Executors.newFixedThreadPool(10);
如果你想創(chuàng)建一個(gè)單線程的ExecutorService實(shí)例,我們可以使用new SingleThreadExecutor(ThreadFactory threadFactory) 創(chuàng)建這個(gè)實(shí)例妒茬。
一旦這個(gè)executor被創(chuàng)建担锤,我們就可以使用它來提交任務(wù)了。
publicvoidexecute() {
????executor.submit(newTask());
}
當(dāng)然乍钻,我們也能在提交任務(wù)的時(shí)候肛循,創(chuàng)建這個(gè)Runnable實(shí)例、
executor.submit(() -> {
????newTask();
});
它同時(shí)也提供了兩個(gè)十分便捷地結(jié)束運(yùn)行的方法银择,第一個(gè)是shutdown()多糠,它會(huì)一直等待到所有被提交的任務(wù)結(jié)束運(yùn)行。另一個(gè)方法是 shutdownNow() ,這個(gè)方法會(huì)立即終結(jié)所有的pending或executing 的任務(wù)浩考。
還有一個(gè)方法 awaitTermination(long timeout,TimeUnit unit) ,在一個(gè)關(guān)閉時(shí)間觸發(fā)或 執(zhí)行超時(shí)發(fā)生 又或者 執(zhí)行線程自身被中斷之后夹孔, 這個(gè)方法 會(huì)強(qiáng)制阻塞直到所有的所有的任務(wù)完成運(yùn)行。
try{
????executor.awaitTermination( 20l, TimeUnit.NANOSECONDS );
} catch(InterruptedException e) {
????e.printStackTrace();
}
2.3?ScheduledExecutorService
ScheduledExecutorService 是一個(gè)類似于 ExecutorService的接口,但是它可以周期性地執(zhí)行任務(wù)搭伤。Executor和ExecutorService的方法會(huì)立刻被調(diào)度沒有任何的人為延遲只怎。0或任何負(fù)數(shù)都表明這個(gè)請(qǐng)求需要立即執(zhí)行。
我們可以使用Runnable和Callable接口來定義task:
publicvoidexecute() {
????ScheduledExecutorService executorService
??????= Executors.newSingleThreadScheduledExecutor();
????Future future = executorService.schedule(() -> {
????????// ...
????????return"Hello world";
????}, 1, TimeUnit.SECONDS);
????ScheduledFuture scheduledFuture = executorService.schedule(() -> {
????????// ...
????}, 1, TimeUnit.SECONDS);
?executorService.shutdown();
}
ScheduledExecutorService也可以在一個(gè)給定的延遲時(shí)間后再調(diào)度這個(gè)任務(wù):
executorService.scheduleAtFixedRate(() -> {
????// ...
}, 1, 10, TimeUnit.SECONDS);
executorService.scheduleWithFixedDelay(() -> {
????// ...
}, 1, 10, TimeUnit.SECONDS);
這里怜俐,scheduleAtFixedRate( Runnable command, long initialDelay, long period, TimeUnit unit ) 方法會(huì)創(chuàng)建并執(zhí)行一個(gè)周期性的行為身堡,
? 在給定的初始延遲后,它會(huì)被首次執(zhí)行拍鲤,緊接著在 給定的周期內(nèi)陸續(xù)被調(diào)用贴谎,直到這個(gè)服務(wù)實(shí)例關(guān)閉(shutdown).
? scheduleWithFixedDelay( Runnable command, long initialDelay, long delay, TimeUnit unit ) 方法會(huì)創(chuàng)建并執(zhí)行一個(gè)周期性的行為。
? 在給定的初始延遲后被首次調(diào)用季稳,并在再給定的正在執(zhí)行的和下一個(gè)被調(diào)用的之間的延遲時(shí)間內(nèi)重復(fù)擅这。
2.4 Future
Future是用于表示一個(gè)異步操作的結(jié)果。它同時(shí)還伴隨有檢查異步操作是否完成的方法以及獲取計(jì)算結(jié)果的方法等等绞幌。另外蕾哟,cancel(boolean mayInterruptIfRunning) 這個(gè)API會(huì)取消操作并且釋放正在執(zhí)行的線程。如果mayInterruptIfRunning的值為true的話莲蜘,正在執(zhí)行任務(wù)的線程將會(huì)被立即終結(jié)谭确。否則的話,就允許進(jìn)行中的任務(wù)完成票渠。
我們可以使用下面的代碼片去創(chuàng)建一個(gè)future 實(shí)例:
publicvoidinvoke() {
????ExecutorService executorService = Executors.newFixedThreadPool(10);
????Future future = executorService.submit(() -> {
? ? ? // ...
? ? ? Thread.sleep(10000l);
????????return"Hello world";
????});
}
我們可以使用下面的代碼片去檢查這個(gè)future結(jié)果已經(jīng)準(zhǔn)備好并且在運(yùn)算完成時(shí)逐哈,獲取數(shù)據(jù):
if(future.isDone() && !future.isCancelled()) {
????try{
????????str = future.get();
????} catch(InterruptedException | ExecutionException e) {
????????e.printStackTrace();
????}
}
我們也能為一個(gè)指定的操作指定超時(shí)時(shí)間,如果這個(gè)任務(wù)花費(fèi)的時(shí)間超過了指定值问顷,就會(huì)拋出
? 一個(gè)TimeoutException:
try{
????future.get(10, TimeUnit.SECONDS);
} catch(InterruptedException | ExecutionException | TimeoutException e) {
????e.printStackTrace();
}
2.5?CountDownLatch
CountDownLatch 是一個(gè)功能類昂秃,它能阻塞一組線程直到一些操作完成。CountDownLatch在初始化時(shí)杜窄,會(huì)伴隨著一個(gè)counter(Integer type);
當(dāng)依賴的線程結(jié)束運(yùn)行時(shí)肠骆,該計(jì)數(shù)器會(huì)遞減。但是塞耕,一旦計(jì)數(shù)器的值到達(dá)0蚀腿,其他線程就會(huì)被釋放。關(guān)于CountDownLatch的詳情請(qǐng)點(diǎn)擊這里扫外。
2.6??CyclicBarrier
CyclicBarrier 的作用幾乎和CountDownLatch 一樣莉钙,除了 我們可以復(fù)用它。? 不像CountDownLatch,在調(diào)用最終的任務(wù)之前筛谚,它允許多線程使用 await()方法(就是大家所熟知的 barrier 條件) 等待彼此磁玉。我們需要?jiǎng)?chuàng)建一個(gè)Runnable task實(shí)例 來初始化這個(gè)barrier 條件:
publicclassTask implementsRunnable {
????privateCyclicBarrier barrier;
? ? publicTask(CyclicBarrier barrier) {
????????this.barrier = barrier;
????}
????@Override
????publicvoidrun() {
????????try{
????????????LOG.info(Thread.currentThread().getName() +
??????????????" is waiting");
????????????barrier.await();
????????????LOG.info(Thread.currentThread().getName() +
??????????????" is released");
????????} catch(InterruptedException | BrokenBarrierException e) {
????????????e.printStackTrace();
????????}
? }
}
現(xiàn)在我們可以調(diào)用一些線程去競(jìng)爭(zhēng)這個(gè)barrier 條件:
publicvoidstart() {
????CyclicBarrier cyclicBarrier = newCyclicBarrier(3, () -> {
????????// ...
????????LOG.info("All previous tasks are completed");
????});
? ? ?Thread t1 = newThread(newTask(cyclicBarrier), "T1");
????Thread t2 = newThread(newTask(cyclicBarrier), "T2");
????Thread t3 = newThread(newTask(cyclicBarrier), "T3");
????if(!cyclicBarrier.isBroken()) {
????????t1.start();
????????t2.start();
????????t3.start();
????}
}
這里,isBroken()方法會(huì)檢查是否有線程在執(zhí)行期間被中斷驾讲,我們應(yīng)該在執(zhí)行實(shí)際的處理之前先執(zhí)行該檢查蚊伞。
2.7?Semaphore
Semaphore是用于阻塞對(duì)某些物理或邏輯資源的線程級(jí)別的訪問席赂。一個(gè)semaphore包含了一套許可證;當(dāng)一個(gè)線程試圖進(jìn)入臨界區(qū)時(shí)厚柳,它需要檢查
? 該semaphore,以便于知道有沒有可用的許可證氧枣。
? 如果許可證不可用的話(通過 tryAcquire()),線程不允許跳到臨界區(qū);然而别垮,如果許可證可用便监,訪問被準(zhǔn)許的話,該許可證計(jì)數(shù)器(the permit counter)就會(huì)減少.
? 一旦正在執(zhí)行的線程釋放臨界區(qū)碳想,許可證計(jì)數(shù)器的值又會(huì)被再一次增加(通過release()方法來完成)烧董。
? 通過使用? tryAcquire(long timeout,TimeUnit unit)方法,我們?cè)讷@取訪問時(shí)指定超時(shí)時(shí)間胧奔。
? 下面的代碼片段可用于實(shí)現(xiàn)一個(gè)semaphore:
staticSemaphore semaphore = newSemaphore(10);
publicvoidexecute() throwsInterruptedException {
????LOG.info("Available permit : "+ semaphore.availablePermits());
????LOG.info("Number of threads waiting to acquire: "+
??????semaphore.getQueueLength());
????if(semaphore.tryAcquire()) {
????????semaphore.acquire();
????????// ...
????????semaphore.release();
????}
}
使用semaphore逊移,我們可以實(shí)現(xiàn)一個(gè)像數(shù)據(jù)結(jié)構(gòu)一樣的互斥量。更多詳情龙填,請(qǐng)看胳泉。
2.8?ThreadFactory 線程工廠
正如名字所提示的意思,ThreadFactory 表現(xiàn)的就像線程池一樣岩遗,它會(huì)在需要時(shí)創(chuàng)建新線程扇商。在實(shí)現(xiàn)有效的線程創(chuàng)建鏈時(shí),ThreadFactory消除了許多樣板化的代碼宿礁。
?我們可以定義一個(gè)ThreadFactory:
publicclassBaeldungThreadFactory implementsThreadFactory {
????privateintthreadId;
????privateString name;
????publicBaeldungThreadFactory(String name) {
????????threadId = 1;
????????this.name = name;
????}
????@Override
????publicThread newThread(Runnable r) {
????????Thread t = newThread(r, name + "-Thread_"+ threadId);
????????LOG.info("created new thread with id : "+ threadId +
????????????" and name : "+ t.getName());
????????threadId++;
????????returnt;
????}
}
我們可以使用這個(gè)newThread(Runnable r)方法在運(yùn)行時(shí)去創(chuàng)建一個(gè)新線程:
BaeldungThreadFactory factory = newBaeldungThreadFactory(
????"BaeldungThreadFactory");
for(inti = 0; i < 10; i++) {
????Thread t = factory.newThread(newTask());
????t.start();
}
2.9 BlockingQueue 阻塞隊(duì)列
在異步編程中案铺,使用最普遍的模式就是 生產(chǎn)者-消費(fèi)者模式。 java.util.concurrent 包中提供的有一個(gè)數(shù)據(jù)類型即眾所周知的BlockingQueue -它在這些異步場(chǎng)景中非常有用梆靖。
2.10 DelayQueue延遲隊(duì)列
DelayQueue是一個(gè)無窮大的阻塞隊(duì)列控汉,它里面的元素只有在它的過期時(shí)間完成時(shí),才會(huì)被拉過來返吻。因此赂鲤,最頂層的元素(頭)將會(huì)擁有最大的延遲藤为,同時(shí)它也是最后一個(gè)被投票的饥臂。
2.11 Locks 鎖
?鎖Lock是用于阻塞其他線程訪問某一段特定代碼的小工具望浩,用于和當(dāng)前正在持有鎖線程分隔開。 Lock和Synchronized block同步代碼塊最主要的區(qū)別是恨课,同步代碼塊主要是被包含在方法中,而Lcok的API lock()以及unlock()方法卻可以在分開的方法中使用岳服。
2.12? Phaser
?比起CyclicBarrier和CountDownLatch來說剂公,Phaser是一個(gè)更靈活的解決方案 -它可以作為一個(gè)可復(fù)用的barrier。動(dòng)態(tài)線程數(shù)在繼續(xù)執(zhí)行之前需要等待這個(gè)barrier.我們可以協(xié)同執(zhí)行的多個(gè)phase吊宋,為每一個(gè)program phase復(fù)用一個(gè) Phaser實(shí)例纲辽。
3. 結(jié)論 Conclusion
在這篇概覽文章中,我們聚焦了java.util.concurrent包中的幾個(gè)不同特性,和往常一樣拖吼,全部的源代碼都在在github上鳞上,地址是:https://github.com/eugenp/tutorials/tree/master/core-java-concurrency