1.介紹
這篇文章將介紹一下java中的線程池-一開始我們先看一下java標準庫中的實現(xiàn),之后再看一下Google的Guava庫中的實現(xiàn)。
2.線程池
在java中凶赁,線程會被映射到系統(tǒng)級別的線程也就是操作系統(tǒng)的資源咧栗。如果你不可控地創(chuàng)建線程,那么你就有可能很快地耗盡這些資源虱肄。線程間的上下文切換也是由操作系統(tǒng)來完成的-為了模擬并發(fā)致板。一個過于簡單化的觀點是:你生產的線程越多,每一個線程在處理實際工作時咏窿,花費的時間越少斟或。
多線程模式可以幫助我們在多線程程序中有效地節(jié)省資源并且在一定的限制下確保并發(fā)性。 在你使用線程池時集嵌,你會以并行任務的形式編寫你的并發(fā)代碼并且把他們提交給一個線程池實例運行萝挤。這個實例控制著多個可復用的線程御毅。
?多線程模式允許你控制應用程序所創(chuàng)建的線程數(shù)量,它們的生命周期平斩,以及調度這些任務的執(zhí)行并且把進來的線程保存到一個隊列中亚享。
3.java中線程池
?3.1 Executors, Executor 以及ExecutorService
Executor 類為你包含了多個用于創(chuàng)建預配置的線程池實例的方法,使用它你無需進行任何自定義的調整绘面。Executor和ExecutorService接口是用于和不同線程池實現(xiàn)一起工作的欺税。通常,你應該把你的代碼和線程池的實現(xiàn)相隔離揭璃,并且在你的應用程序中使用這些接口晚凿。
Executor接口有一個單獨的execute()方法,這個方法可以用于提交那些需要運行的Runnable實例瘦馍。
? ? 這里有一個簡單的例子歼秽,講述了如何使用Executor API來獲取一個由單個線程池支持的Executor實例? 以及一個用于次序地執(zhí)行任務的無界隊列。這里情组,我們執(zhí)行了一個單任務燥筷,這個單任務就是簡單地在屏幕上打印"Hello World",該任務會作為一個lambda表達式被提交給ExecutorService院崇。
Executor executor = Executors.newSingleThreadExecutor();
executor.execute(() -> System.out.println("Hello World"));
ExecutorService 接口包含了大量用于控制任務處理以及管理服務結束的方法肆氓。使用這個接口,你可以提交要執(zhí)行的任務并且通過使用返回的Future實例 底瓣,你可以控制這些任務的執(zhí)行谢揪。在下面的例子中,我們創(chuàng)建了一個ExecutorService捐凭,提交了一個任務拨扶,之后使用其返回的Future對象的get()方法會導致等待直到提交的任務完成并且有返回值返回:
ExecutorService executorService = Executors.newFixedThreadPool(10);
Future future = executorService.submit(() -> "Hello World");
// some operations
String result = future.get();
當然,在現(xiàn)實生活中的場景下茁肠,通常你并不想立即調用future.get()方法患民,而是推遲到在你真正需要這個返回值的時候再調用它。
另外垦梆,submit方法也有兩種重載形式匹颤,要么接收一個Runnable接口或者接收一個Callable接口,這倆個接口都是很實用的接口奶赔,它們都能作為lambda表達式被傳遞過去惋嚎。
Runnable接口的單一方法不會拋出異常也不會返回值杠氢。而Callable接口可能就更加方便了站刑,因為它允許拋出異常并且返回值。最后-為了能讓編譯器 推斷出這是一個Callable 類型鼻百,只需簡單地從lambda表達式中返回一個值即可绞旅。
更多使用ExecutorService接口和future的例子摆尝,請查看http://www.baeldung.com/java-executor-service-tutorial。
3.2 ThreadPoolExecutor
ThreadPoolExecutor是一個可擴展的線程池實現(xiàn)因悲,它提供了許多參數(shù)和用于調整的鉤子(with lots of parameters and hooks for fine-tuning)
我們接下來主要討論的配置參數(shù)是: corePoolSize堕汞, maximumPoolSize 以及keepAliveTime。
線程池會由一定固定數(shù)量的核心線程組成晃琳,這些線程會一直保存在線程池中讯检。其他額外的線程會在需要時被生產出來,并且在不需要時被終結掉卫旱。該corePoolSize參數(shù)就是核心線程的數(shù)量人灼,這些線程會在實例化并且一直保存在線程池中。如果所有的核心線程都繁忙顾翼,同時又有更多的任務被提交上來投放,那么,該線程池就會擴大到maximumPoolSize數(shù)量适贸。keepAliveTime參數(shù)(本姆迹活時間)是一個時間間隔,它用于控制那些額外創(chuàng)建的線程在空閑狀態(tài)下能夠存活時間間隔拜姿。
這些參數(shù)覆蓋了廣泛的用例烙样,但是 大多數(shù)典型的配置都被預定義在Executor的靜態(tài)方法中。
例如砾隅,newFixedThreadPool方法會創(chuàng)建一個ThreadPoolExecutor,該線程池的corePoolSize和maximumPoolSize參數(shù)值是相等的误阻,并且keepAliveTime為0,這也就意味著:該線程池中的線程數(shù)量總是固定不變的:
ThreadPoolExecutor executor =? (ThreadPoolExecutor) Executors.newFixedThreadPool(2);
executor.submit(() -> {
????Thread.sleep(1000);
????returnnull;
});
executor.submit(() -> {
????Thread.sleep(1000);
????returnnull;
});
executor.submit(() -> {
????Thread.sleep(1000);
????returnnull;
});
assertEquals(2, executor.getPoolSize());
assertEquals(1, executor.getQueue().size());
在上面的例子中晴埂,我們用2個固定數(shù)量的線程實例化了一個ThreadPoolExecutor究反,這意味著,如果同時運行的任務數(shù)小于等于2的話儒洛,它們能立馬被運行精耐。否則的話,它們中的一些任務會被放入到一個隊列中等待運行(put into a queue to wait for their turn)琅锻。
我們創(chuàng)建了三個Callable任務卦停,并且讓它們睡眠1000毫秒從而模擬一個繁重的任務。頭倆個任務會被立即執(zhí)行恼蓬,但是第三個將不得不在隊列中等待惊完,我們可以在提交這些任務之后,調用getPoolSize()和getQueueSize()方法來驗證处硬。
另一個預配置的ThreadPoolExecutor可以使用Executor.newCachedThreadPool()方法來創(chuàng)建小槐,該方法不會接收線程數(shù)量,corePoolSize被設置為0,并且maximumPoolSize被設置為Integer.MAX_VALUE,keepAliveTime被設置為60s凿跳。
這些參數(shù)值意味著:該CachedThreadPool線程池可以無限地增長以容納提交任意數(shù)量的任務件豌。但是當那些線程不再需要時,如果它們在60s內不活躍的話控嗜,就會被處理掉茧彤。
一個典型的用例(use case)就是 當在你的應用中存在許多短期存活的任務時,CachedThreadPool線程池將會是最佳的選擇疆栏。
ThreadPoolExecutor executor =
??(ThreadPoolExecutor) Executors.newCachedThreadPool();
executor.submit(() -> {
????Thread.sleep(1000);
????returnnull;
});
executor.submit(() -> {
????Thread.sleep(1000);
????returnnull;
});
executor.submit(() -> {
????Thread.sleep(1000);
????returnnull;
});
assertEquals(3, executor.getPoolSize());
assertEquals(0, executor.getQueue().size());
上面例子中的隊列大小總會是0曾掂,因為它在內部使用的是一個SynchronousQueue實例,在Synchronous實例中壁顶,成對的insert和remove操作總是同時發(fā)生遭殉,因此,這個隊列中不會包含任何東西博助。
Executors.newSingleThreadExecutor() 會創(chuàng)建一個典型的只包含單個線程的ThreadPoolExecutor险污,這個單線程的executor對于創(chuàng)建事件線程來說是非常理想的。
它的corePoolSize以及maximumPoolSize參數(shù)大小都等于1富岳,并且keepAliveTime為0蛔糯。上面案例中的任務會被順序地執(zhí)行,因此窖式,在這個任務結束之后蚁飒,這個標識的值將會是2.
AtomicInteger counter = newAtomicInteger();
ExecutorService executor = Executors.newSingleThreadExecutor();
executor.submit(() -> {
????counter.set(1);
});
executor.submit(() -> {
????counter.compareAndSet(1, 2);
});
另外,這個ThreadPoolExecutor用了一個互斥的包裝所修飾萝喘,因此淮逻,在它創(chuàng)建之后,就不能重新配置了阁簸。注意:這也就是為什么我們不能把它轉成一個ThreadPoolExecutor的原因爬早。
3.3 ScheduledThreadPooolExecutor
ScheduledThreadPooolExecutor 繼承了ThreadPoolExecutor類,同時實現(xiàn)了ScheduledExecutorService 接口的好幾個方法:
? .schedule方法允許在一個指定的延遲之后启妹,再執(zhí)行一個任務筛严。
? .scheduleAtFixedRate方法則允許在一個指定的初始延遲之后再執(zhí)行任務,并且之后再一定的周期內循環(huán)執(zhí)行該任務饶米。所以這個執(zhí)行頻率是固定的fixed桨啃。
? .scheduleWithFixedDelay 方法類似于scheduleAtFixedRate,因為檬输, 它會重復執(zhí)行給定的任務照瘾,但是給定的延遲指的是:上個任務執(zhí)行結束和下個任務開始執(zhí)行之間的間隔;執(zhí)行頻率因執(zhí)行一個給定任務所花費的時間的不同而不同丧慈。
該Executors.newScheduledThreadPool() 方法主要是用于創(chuàng)建一個調度線程池執(zhí)行器ScheduledThreadPoolExecutor析命,該ScheduledThreadPoolExecutor帶有給定corePoolSize大小,無限的maximumPoolSize以及keepAliveTime的值為0.這里有一個如何在500毫秒之后調度一個任務執(zhí)行的案例:
ScheduledExecutorService executor = Executors.newScheduledThreadPool(5);
executor.schedule(() -> {
????System.out.println("Hello World");
}, 500, TimeUnit.MILLISECONDS);
下面的代碼展示了如何在500毫秒的延遲之后,執(zhí)行一個任務碳却,并且之后每隔100毫秒重復執(zhí)行一次。在調度完該任務之后笑旺,我們會一直等待直到它用CountDownLatch鎖倒計時三次昼浦。然后,使用Future.cancel()方法取消它:
CountDownLatch lock = newCountDownLatch(3);
ScheduledExecutorService executor = Executors.newScheduledThreadPool(5);
ScheduledFuture future = executor.scheduleAtFixedRate(() -> {
????System.out.println("Hello World");
????lock.countDown();
}, 500, 100, TimeUnit.MILLISECONDS);
lock.await(1000, TimeUnit.MILLISECONDS);
future.cancel(true);
3.4 ForkJoinPool
ForkJoinPool是java7中fork/join框架的核心部分筒主,它解決 在遞歸算法中創(chuàng)建多任務的常見問題关噪。使用一個簡單的ThreadPoolExecutor,你將會把線程迅速地耗完乌妙,因為每一個任務或子任務都需要一個它自己的線程來運行使兔。
在fork/join框架中,任一個任務都能創(chuàng)建一定數(shù)量的子任務藤韵,并且使用join方法等待他們完成虐沥。 fork/join框架的好處是:它不會為每一個任務或子任務創(chuàng)建一個新的線程而是去實現(xiàn)Work Stealing 算法。該框架在《帶你進入java中的Fork/join框架》文章中進行了全面的介紹泽艘。
我們來看一個簡單的例子欲险,使用ForkJoinPool遍歷一個樹節(jié)點并且計算所有葉子值得總和。這里是一個樹的簡單實現(xiàn)匹涮,它由一個節(jié)點天试,一個int值,以及一組子節(jié)點構成然低。
static class TreeNode { int value; Set children;
? ? TreeNode(int value, TreeNode... children) {
? ? ? ? this.value = value;
? ? ? ? this.children = Sets.newHashSet(children);
? ? }
}
現(xiàn)在如果我們想并發(fā)地計算樹上的所有值喜每,我們需要實現(xiàn)RecursiveTask接口,每一個任務接收它自己的節(jié)點并且把它的值和它的所有孩子值的總和相加雳攘。
要計算孩子值得總和带兜,該任務實現(xiàn)需要做如下事情:
? .游歷子節(jié)點集合
? .在游歷過程中做映射,為每一個元素新建一個CountingTask
? .fork一個CountingTask來執(zhí)行每一個子任務吨灭。
? .在每一個forked任務上調用join方法來收集處理結果鞋真。
? .使用Collectors.summingInt 集合來總計結果。
public static class CountingTask extends RecursiveTask {
? ? private final TreeNode node;
? ? public CountingTask(TreeNode node) {
? ? ? ? this.node = node;
? ? }
? ? @Override
? ? protected Integer compute() {
? ? ? ? return node.value + node.children.stream()
? ? ? ? ? .map(childNode -> new CountingTask(childNode).fork())
? ? ? ? ? .collect(Collectors.summingInt(ForkJoinTask::join));
? ? }
}
在一個實際的樹上運行該計算的代碼非常簡單:
reeNode tree = new TreeNode(5,
? new TreeNode(3), new TreeNode(2,
? ? new TreeNode(2), new TreeNode(8)));
ForkJoinPool forkJoinPool = ForkJoinPool.commonPool();
int sum = forkJoinPool.invoke(new CountingTask(tree));
4.Guava中的線程池實現(xiàn)
Guava是一個流行的Google庫沃于。它有許多實用的并發(fā)類涩咖,包括 ExecutorService的幾個handy實現(xiàn)。這些實現(xiàn)都不能直接實例化或繼承繁莹,因此檩互,創(chuàng)建它們實例的唯一入口點就是:MoreExecutor類。
4.1 添加Guava的maven依賴
???? 把下面的maven依賴添加到你的pom文件中咨演,這樣你的項目中就包含了Guava庫啦闸昨。
4.2 DirectExecutor和DirectExecutorService
有時候,你想在當前線程或者一個線程池中執(zhí)行任務,取決于一些條件饵较。你可能更傾向于使用單一的Executor接口拍嵌,然后僅僅切換一下實現(xiàn)。雖然? 實現(xiàn)一個Executor或ExecutorService來讓當前線程執(zhí)行任務并不難循诉,但它仍然要我們寫一些模板化的代碼横辆。
幸運地是,Guava為我們提供了預定義的實例茄猫。
下面有一個例子狈蚤,這個例子表明了在同一個線程中執(zhí)行任務。雖然該任務睡眠了500毫秒划纽,它會阻塞當前線程 但當執(zhí)行完成之后脆侮,結果便是立即可用的。
Executor executor = MoreExecutors.directExecutor();
AtomicBoolean executed = newAtomicBoolean();
executor.execute(() -> {
????try{
????????Thread.sleep(500);
????} catch(InterruptedException e) {
????????e.printStackTrace();
????}
????executed.set(true);
});
assertTrue(executed.get());
directExecutor()方法返回的實際是一個靜態(tài)單例對象勇劣,所以靖避,使用這個方法在對象創(chuàng)建時不會造成任何花銷。
相比較于 MoreExecutors.newDirectExecutorService()比默,你可能更喜歡這個方法筋蓖,因為newDirectExecutorService,會在每一次調用時退敦,都創(chuàng)建一個完整的對象粘咖。
4.3 退出Executor Service
另一個常見的問題是,當線程池還在執(zhí)行它的任務時侈百,你把虛擬機關閉了瓮下。即使有一個恰當?shù)娜∠麢C制,也無法保證任務可以完美地表現(xiàn)钝域,并且在executor service關閉時讽坏,能夠停止它們的工作。這有可能導致在任務的執(zhí)行過程中例证,JVM不定期地掛起路呜。
為了解決這個問題,Guava 引入了一系列退出executor service的機制织咧。它們都是基于守護線程來實現(xiàn)的胀葱,這些守護線程是伴隨著JVM一起終止的。這些service使用Runtime.getRuntime().addShutdownHook()方法增加了一個關閉鉤子笙蒙。這樣抵屿,在放棄掛起任務之前,就可以在一定時間內防止VM虛擬機終止(terminate)捅位。
在下面的例子中轧葛,我們提交了一個包含無限循環(huán)的任務搂抒,但是我們使用了一個現(xiàn)存的executor service,這個executor service會在VM結束時尿扯,等待100毫秒求晶。如果沒有這個exitingExecutorService 的話,這個任務可能會導致虛擬機無限期地掛起衷笋。
ThreadPoolExecutor executor =(ThreadPoolExecutor) Executors.newFixedThreadPool(5);
ExecutorService executorService =MoreExecutors.getExitingExecutorService(executor,? 100, TimeUnit.MILLISECONDS);
executorService.submit(() -> {
????while(true) {
????}
});
4.4 Listening Decorators
Listening Decorators允許你包裝ExecutorService并且在任務提交時芳杏,接收一個ListenableFuture實例而不是一個簡單得Future實例。該ListenableFuture接口繼承了Future接口右莱,并且有一個額外的方法addListener.這個方法允許添加一個監(jiān)聽器,這個監(jiān)聽器會在future完成之后被調用档插。
你很少想直接使用ListenableFuture.addListener() 慢蜓,但是對于 大多數(shù)Future 功能類來說,它確實必須的郭膛。例如晨抡,借助于Futures.allAsList()方法你可以在單個ListenableFuture中組合多個ListenableFuture實例:
ExecutorService executorService = Executors.newCachedThreadPool();
ListeningExecutorService listeningExecutorService = MoreExecutors.listeningDecorator(executorService);
ListenableFuture future1 =? listeningExecutorService.submit(() -> "Hello");
ListenableFuture future2 =? listeningExecutorService.submit(() -> "World");
String greeting = Futures.allAsList(future1, future2).get()
??.stream()
??.collect(Collectors.joining(" "));
assertEquals("Hello World", greeting);
5.總結
在本篇文章中,我們已經討論了線程池模式和它在java標準庫以及Google的Guava庫中的實現(xiàn)则剃。本篇文章的源代碼都在github上:代碼地址