Java 8 并發(fā)教程:線程和執(zhí)行器
譯者:BlankKelly
歡迎閱讀我的Java8并發(fā)教程的第一部分凭迹。這份指南將會以簡單易懂的代碼示例來教給你如何在Java8中進(jìn)行并發(fā)編程。這是一系列教程中的第一部分。在接下來的15分鐘,你將會學(xué)會如何通過線程淑掌,任務(wù)(tasks)和 exector services來并行執(zhí)行代碼。
- 第一部分:線程和執(zhí)行器
- 第二部分:同步和鎖
- 第三部分:原子操作和 ConcurrentMap
并發(fā)在Java5中首次被引入并在后續(xù)的版本中不斷得到增強(qiáng)趋距。在這篇文章中介紹的大部分概念同樣適用于以前的Java版本月劈。不過我的代碼示例聚焦于Java8,大量使用lambda表達(dá)式和其他新特性敌买。如果你對lambda表達(dá)式不屬性简珠,我推薦你首先閱讀我的Java 8 教程。
Thread
和 Runnable
所有的現(xiàn)代操作系統(tǒng)都通過進(jìn)程和線程來支持并發(fā)虹钮。進(jìn)程是通常彼此獨(dú)立運(yùn)行的程序的實(shí)例聋庵,比如,如果你啟動了一個Java程序芙粱,操作系統(tǒng)產(chǎn)生一個新的進(jìn)程祭玉,與其他程序一起并行執(zhí)行。在這些進(jìn)程的內(nèi)部春畔,我們使用線程并發(fā)執(zhí)行代碼脱货,因此,我們可以最大限度的利用CPU可用的核心(core)律姨。
Java從JDK1.0開始執(zhí)行線程振峻。在開始一個新的線程之前,你必須指定由這個線程執(zhí)行的代碼择份,通常稱為task扣孟。這可以通過實(shí)現(xiàn)Runnable
——一個定義了一個無返回值無參數(shù)的run()
方法的函數(shù)接口,如下面的代碼所示:
Runnable task = () -> {
String threadName = Thread.currentThread().getName();
System.out.println("Hello " + threadName);
};
task.run();
Thread thread = new Thread(task);
thread.start();
System.out.println("Done!");
因為Runnable
是一個函數(shù)接口缓淹,所以我們利用lambda表達(dá)式將當(dāng)前的線程名打印到控制臺哈打。首先塔逃,在開始一個線程前我們在主線程中直接運(yùn)行runnable。
控制臺輸出的結(jié)果可能像下面這樣:
Hello main
Hello Thread-0
Done!
或者這樣:
Hello main
Done!
Hello Thread-0
由于我們不能預(yù)測這個runnable是在打印'done'前執(zhí)行還是在之后執(zhí)行料仗。順序是不確定的湾盗,因此在大的程序中編寫并發(fā)程序是一個復(fù)雜的任務(wù)。
我們可以將線程休眠確定的時間立轧。在這篇文章接下來的代碼示例中我們可以通過這種方法來模擬長時間運(yùn)行的任務(wù)格粪。
Runnable runnable = () -> {
try {
String name = Thread.currentThread().getName();
System.out.println("Foo " + name);
TimeUnit.SECONDS.sleep(1);
System.out.println("Bar " + name);
}
catch (InterruptedException e) {
e.printStackTrace();
}
};
Thread thread = new Thread(runnable);
thread.start();
當(dāng)你運(yùn)行上面的代碼時,你會注意到在第一條打印語句和第二條打印語句之間存在一分鐘的延遲氛改。TimeUnit
在處理單位時間時一個有用的枚舉類帐萎。你可以通過調(diào)用Thread.sleep(1000)
來達(dá)到同樣的目的。
使用Thread
類是很單調(diào)的且容易出錯胜卤。由于并發(fā)API在2004年Java5發(fā)布的時候才被引入疆导。這些API位于java.util.concurrent
包下,包含很多處理并發(fā)編程的有用的類葛躏。自從這些并發(fā)API引入以來澈段,在隨后的新的Java版本發(fā)布過程中得到不斷的增強(qiáng),甚至Java8提供了新的類和方法來處理并發(fā)舰攒。
接下來败富,讓我們走進(jìn)并發(fā)API中最重要的一部——executor services。
Executor
并發(fā)API引入了ExecutorService
作為一個在程序中直接使用Thread的高層次的替換方案摩窃。Executos支持運(yùn)行異步任務(wù)兽叮,通常管理一個線程池,這樣一來我們就不需要手動去創(chuàng)建新的線程猾愿。在不斷地處理任務(wù)的過程中鹦聪,線程池內(nèi)部線程將會得到復(fù)用,因此匪蟀,在我們可以使用一個executor service來運(yùn)行和我們想在我們整個程序中執(zhí)行的一樣多的并發(fā)任務(wù)椎麦。
下面是使用executors的第一個代碼示例:
ExecutorService executor = Executors.newSingleThreadExecutor();
executor.submit(() -> {
String threadName = Thread.currentThread().getName();
System.out.println("Hello " + threadName);
});
// => Hello pool-1-thread-1
Executors
類提供了便利的工廠方法來創(chuàng)建不同類型的 executor services。在這個示例中我們使用了一個單線程線程池的 executor材彪。
代碼運(yùn)行的結(jié)果類似于上一個示例观挎,但是當(dāng)運(yùn)行代碼時,你會注意到一個很大的差別:Java進(jìn)程從沒有停止段化!Executors必須顯式的停止-否則它們將持續(xù)監(jiān)聽新的任務(wù)嘁捷。
ExecutorService
提供了兩個方法來達(dá)到這個目的——shutdwon()
會等待正在執(zhí)行的任務(wù)執(zhí)行完而shutdownNow()
會終止所有正在執(zhí)行的任務(wù)并立即關(guān)閉execuotr。
這是我喜歡的通常關(guān)閉executors的方式:
try {
System.out.println("attempt to shutdown executor");
executor.shutdown();
executor.awaitTermination(5, TimeUnit.SECONDS);
}
catch (InterruptedException e) {
System.err.println("tasks interrupted");
}
finally {
if (!executor.isTerminated()) {
System.err.println("cancel non-finished tasks");
}
executor.shutdownNow();
System.out.println("shutdown finished");
}
executor通過等待指定的時間讓當(dāng)前執(zhí)行的任務(wù)終止來“溫柔的”關(guān)閉executor显熏。在等待最長5分鐘的時間后雄嚣,execuote最終會通過中斷所有的正在執(zhí)行的任務(wù)關(guān)閉。
Callable
和 Future
除了Runnable
,executor還支持另一種類型的任務(wù)——Callable
缓升。Callables也是類似于runnables的函數(shù)接口鼓鲁,不同之處在于,Callable返回一個值港谊。
下面的lambda表達(dá)式定義了一個callable:在休眠一分鐘后返回一個整數(shù)骇吭。
Callable<Integer> task = () -> {
try {
TimeUnit.SECONDS.sleep(1);
return 123;
}
catch (InterruptedException e) {
throw new IllegalStateException("task interrupted", e);
}
};
Callbale也可以像runnbales一樣提交給 executor services。但是callables的結(jié)果怎么辦歧寺?因為submit()
不會等待任務(wù)完成燥狰,executor service不能直接返回callable的結(jié)果。不過斜筐,executor 可以返回一個Future
類型的結(jié)果龙致,它可以用來在稍后某個時間取出實(shí)際的結(jié)果。
ExecutorService executor = Executors.newFixedThreadPool(1);
Future<Integer> future = executor.submit(task);
System.out.println("future done? " + future.isDone());
Integer result = future.get();
System.out.println("future done? " + future.isDone());
System.out.print("result: " + result);
在將callable提交給exector之后顷链,我們先通過調(diào)用isDone()
來檢查這個future是否已經(jīng)完成執(zhí)行目代。我十分確定這會發(fā)生什么,因為在返回那個整數(shù)之前callable會休眠一分鐘蕴潦、
在調(diào)用get()
方法時像啼,當(dāng)前線程會阻塞等待,直到callable在返回實(shí)際的結(jié)果123之前執(zhí)行完成√栋現(xiàn)在future執(zhí)行完畢,我們可以在控制臺看到如下的結(jié)果:
future done? false
future done? true
result: 123
Future與底層的executor service緊密的結(jié)合在一起真朗。記住此疹,如果你關(guān)閉executor,所有的未中止的future都會拋出異常遮婶。
executor.shutdownNow();
future.get();
你可能注意到我們這次創(chuàng)建executor的方式與上一個例子稍有不同蝗碎。我們使用newFixedThreadPool(1)
來創(chuàng)建一個單線程線程池的 execuot service。
這等同于使用newSingleThreadExecutor
不過使用第二種方式我們可以稍后通過簡單的傳入一個比1大的值來增加線程池的大小旗扑。
超時
任何future.get()
調(diào)用都會阻塞蹦骑,然后等待直到callable中止。在最糟糕的情況下臀防,一個callable持續(xù)運(yùn)行——因此使你的程序?qū)]有響應(yīng)眠菇。我們可以簡單的傳入一個時長來避免這種情況。
ExecutorService executor = Executors.newFixedThreadPool(1);
Future<Integer> future = executor.submit(() -> {
try {
TimeUnit.SECONDS.sleep(2);
return 123;
}
catch (InterruptedException e) {
throw new IllegalStateException("task interrupted", e);
}
});
future.get(1, TimeUnit.SECONDS);
運(yùn)行上面的代碼將會產(chǎn)生一個TimeoutException
:
Exception in thread "main" java.util.concurrent.TimeoutException
at java.util.concurrent.FutureTask.get(FutureTask.java:205)
你可能已經(jīng)猜到俄為什么會排除這個異常袱衷。我們指定的最長等待時間為1分鐘捎废,而這個callable在返回結(jié)果之前實(shí)際需要兩分鐘。
invokeAll
Executors支持通過invokeAll()
一次批量提交多個callable致燥。這個方法結(jié)果一個callable的集合登疗,然后返回一個future的列表。
ExecutorService executor = Executors.newWorkStealingPool();
List<Callable<String>> callables = Arrays.asList(
() -> "task1",
() -> "task2",
() -> "task3");
executor.invokeAll(callables)
.stream()
.map(future -> {
try {
return future.get();
}
catch (Exception e) {
throw new IllegalStateException(e);
}
})
.forEach(System.out::println);
在這個例子中,我們利用Java8中的函數(shù)流(stream)來處理invokeAll()
調(diào)用返回的所有future辐益。我們首先將每一個future映射到它的返回值断傲,然后將每個值打印到控制臺。如果你還不屬性stream智政,可以閱讀我的Java8 Stream 教程认罩。
invokeAny
批量提交callable的另一種方式就是invokeAny()
,它的工作方式與invokeAll()
稍有不同女仰。在等待future對象的過程中猜年,這個方法將會阻塞直到第一個callable中止然后返回這一個callable的結(jié)果。
為了測試這種行為疾忍,我們利用這個幫助方法來模擬不同執(zhí)行時間的callable乔外。這個方法返回一個callable,這個callable休眠指定 的時間直到返回給定的結(jié)果一罩。
Callable<String> callable(String result, long sleepSeconds) {
return () -> {
TimeUnit.SECONDS.sleep(sleepSeconds);
return result;
};
}
我們利用這個方法創(chuàng)建一組callable杨幼,這些callable擁有不同的執(zhí)行時間,從1分鐘到3分鐘聂渊。通過invokeAny()
將這些callable提交給一個executor差购,返回最快的callable的字符串結(jié)果-在這個例子中為任務(wù)2:
ExecutorService executor = Executors.newWorkStealingPool();
List<Callable<String>> callables = Arrays.asList(
callable("task1", 2),
callable("task2", 1),
callable("task3", 3));
String result = executor.invokeAny(callables);
System.out.println(result);
// => task2
上面這個例子又使用了另一種方式來創(chuàng)建executor——調(diào)用newWorkStealingPool()
。這個工廠方法是Java8引入的汉嗽,返回一個ForkJoinPool
類型的 executor欲逃,它的工作方法與其他常見的execuotr稍有不同。與使用一個固定大小的線程池不同饼暑,ForkJoinPools
使用一個并行因子數(shù)來創(chuàng)建稳析,默認(rèn)值為主機(jī)CPU的可用核心數(shù)。
ForkJoinPools 在Java7時引入弓叛,將會在這個系列后面的教程中詳細(xì)講解彰居。讓我們深入了解一下 scheduled executors 來結(jié)束本次教程。
ScheduledExecutor
我們已經(jīng)學(xué)習(xí)了如何在一個 executor 中提交和運(yùn)行一次任務(wù)撰筷。為了持續(xù)的多次執(zhí)行常見的任務(wù)陈惰,我們可以利用調(diào)度線程池。
ScheduledExecutorService
支持任務(wù)調(diào)度毕籽,持續(xù)執(zhí)行或者延遲一段時間后執(zhí)行抬闯。
下面的實(shí)例,調(diào)度一個任務(wù)在延遲3分鐘后執(zhí)行:
ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
Runnable task = () -> System.out.println("Scheduling: " + System.nanoTime());
ScheduledFuture<?> future = executor.schedule(task, 3, TimeUnit.SECONDS);
TimeUnit.MILLISECONDS.sleep(1337);
long remainingDelay = future.getDelay(TimeUnit.MILLISECONDS);
System.out.printf("Remaining Delay: %sms", remainingDelay);
調(diào)度一個任務(wù)將會產(chǎn)生一個專門的future類型——ScheduleFuture
影钉,它除了提供了Future的所有方法之外画髓,他還提供了getDelay()
方法來獲得剩余的延遲。在延遲消逝后平委,任務(wù)將會并發(fā)執(zhí)行奈虾。
為了調(diào)度任務(wù)持續(xù)的執(zhí)行,executors 提供了兩個方法scheduleAtFixedRate()
和scheduleWithFixedDelay()
。第一個方法用來以固定頻率來執(zhí)行一個任務(wù)肉微,比如匾鸥,下面這個示例中,每分鐘一次:
ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
Runnable task = () -> System.out.println("Scheduling: " + System.nanoTime());
int initialDelay = 0;
int period = 1;
executor.scheduleAtFixedRate(task, initialDelay, period, TimeUnit.SECONDS);
另外碉纳,這個方法還接收一個初始化延遲勿负,用來指定這個任務(wù)首次被執(zhí)行等待的時長。
請記桌筒堋:scheduleAtFixedRate()
并不考慮任務(wù)的實(shí)際用時奴愉。所以,如果你指定了一個period為1分鐘而任務(wù)需要執(zhí)行2分鐘铁孵,那么線程池為了性能會更快的執(zhí)行锭硼。
在這種情況下,你應(yīng)該考慮使用scheduleWithFixedDelay()
蜕劝。這個方法的工作方式與上我們上面描述的類似檀头。不同之處在于等待時間 period 的應(yīng)用是在一次任務(wù)的結(jié)束和下一個任務(wù)的開始之間。例如:
ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
Runnable task = () -> {
try {
TimeUnit.SECONDS.sleep(2);
System.out.println("Scheduling: " + System.nanoTime());
}
catch (InterruptedException e) {
System.err.println("task interrupted");
}
};
executor.scheduleWithFixedDelay(task, 0, 1, TimeUnit.SECONDS);
這個例子調(diào)度了一個任務(wù)岖沛,并在一次執(zhí)行的結(jié)束和下一次執(zhí)行的開始之間設(shè)置了一個1分鐘的固定延遲暑始。初始化延遲為0,任務(wù)執(zhí)行時間為0婴削。所以我們分別在0s,3s,6s,9s等間隔處結(jié)束一次執(zhí)行廊镜。如你所見,scheduleWithFixedDelay()
在你不能預(yù)測調(diào)度任務(wù)的執(zhí)行時長時是很有用的唉俗。
這是并發(fā)系列教程的第一部分期升。我推薦你親手實(shí)踐一下上面的代碼示例。你可以從 Github 上找到這篇文章中所有的代碼示例互躬,所以歡迎你fork這個倉庫,并收藏它颂郎。
我希望你會喜歡這篇文章吼渡。如果你有任何的問題都可以在下面評論或者通過 Twitter 向我反饋。
- 第一部分:線程和執(zhí)行器
- 第二部分:同步和鎖
- 第三部分:原子操作和 ConcurrentMap