本篇文章通過服務器通信
和頁面渲染
兩個功能的實現(xiàn)來加深多線程中Future
和Executor
的理解哪痰。
服務器通信
串行執(zhí)行任務
任務執(zhí)行最簡單的策略就是在單線程中串行執(zhí)行各項任務遂赠,并不會涉及多線程。
以創(chuàng)建通訊服務為例晌杰,我們可以這樣實現(xiàn)(很low)
@Test
public void singleThread() throws IOException {
ServerSocket serverSocket= new ServerSocket(8088);
while (true){
Socket conn = serverSocket.accept();
handleRequest(conn);
}
}
代碼很簡單跷睦,理論上沒什么毛病,但是實際使用中只能處理一個請求肋演。但是當處理任務很耗時并且在多次請求時會阻塞無法及時響應抑诸。
由此可見串行處理機制通常都無法提供高吞吐率或快速響應性。
顯式的為任務創(chuàng)建線程
串行執(zhí)行任務這么 low爹殊,我們來通過多線程來處理請求吧:當接收到請求后創(chuàng)建新的線程去執(zhí)行任務蜕乡。new Thread()應該就能實現(xiàn)。
初級版本:
@Test
public void perThreadTask() throws IOException {
ServerSocket serverSocket = new ServerSocket(8088);
while (true) {
Socket conn = serverSocket.accept();
Runnable r = new Runnable() {
@Override
public void run() {
handleRequest(conn);
}
};
new Thread(r).start();
}
}
微弱的優(yōu)點
- 對于每個請求边灭,都創(chuàng)建了一個線程來處理异希,達到多線程并行效果
- 任務處理從主線程分離出來,使得主循環(huán)能更快的處理下一個請求
為每個任務分配一個線程存在一些缺陷绒瘦,尤其當需要創(chuàng)建大量的線程時
- 線程生命周期的開銷非常高称簿。根據(jù)平臺的不同,實際的開銷也不同惰帽。但是線程的創(chuàng)建過程都會需要時間憨降,并且需要 JVM 和操作系統(tǒng)提供一些輔助操作。
- 資源消耗该酗∈谝活躍的線程會消耗系統(tǒng)資源,尤其是內存呜魄。如果可運行的線程數(shù)量多余可用處理器的數(shù)量悔叽,那么有些線程將閑置。大量閑置的線程會占用許多內存爵嗅,給垃圾回收器帶來壓力娇澎。如果你已經擁有足夠多的線程使所有 CPU 保持忙碌狀態(tài),那么多余的線程反而會降低性能睹晒。
- 穩(wěn)定性趟庄。隨著平臺的不同括细,可創(chuàng)建線程數(shù)量的限制是不同的,并受多個因素制約戚啥,包括 JVM 的啟動參數(shù)奋单、Thread 構造函數(shù)中請求的棧大小,以及底層操作系統(tǒng)對線程的限制等猫十。如果破壞了這些限制览濒,很可能拋出 OOM 異常。
<h5>
上面兩種方式都存在一些問題:單線程串行的問題在于其糟糕的響應性和吞吐量炫彩;而為每個任務分配線程的問題在于資源消耗和管理的復雜性匾七。</h5>
<h5>
在 Java 類庫中絮短,任務執(zhí)行的主要抽象不是 Thread江兢,而是 Executor
</h5>
public interface Executor {
void execute(Runnable command);
}
Executor 框架
Executor 基于生產者-消費者模式,提交任務的操作相當于生產者丁频,執(zhí)行任務的線程相當于消費者杉允。
通訊優(yōu)化
對于以前的通訊服務我們可以用 Executor
進一步優(yōu)化一下
@Test
public void limitExecutorTask() throws IOException {
final int nThreads = 100;
ExecutorService exec = Executors.newFixedThreadPool(nThreads);
ServerSocket serverSocket = new ServerSocket(8088);
while (true) {
Socket conn = serverSocket.accept();
Runnable r = new Runnable() {
@Override
public void run() {
handleRequest(conn);
}
};
exec.execute(r);
}
}
線程池
線程池從字面來看時指管理一組同構工作線程的資源池。它與工作隊列密切相關席里,它在工作隊列中保存了所有等待執(zhí)行的任務叔磷。
線程池通過重用現(xiàn)有的線程而不是創(chuàng)建新線程,可以在處理多個請求時分攤在線程創(chuàng)建和銷毀過程中產生的巨大開銷奖磁。另一個額外的好處是改基,當請求到達時,工作線程已經存在咖为,因此不會由于等待創(chuàng)建線程而延遲任務的執(zhí)行秕狰,挺高響應性。
JAVA 類庫中提供了一個靈活的線程池以及一些有用的默認配置躁染∶В可以通過 Executors 中的靜態(tài)工廠方法來創(chuàng)建。
newFixedThreadPool
將創(chuàng)建一個固定長度的線程池吞彤,每當提交一個任務時就創(chuàng)建一個線程我衬,直到達到線程的最大數(shù)量。newCacheedThreadPool
將創(chuàng)建一個可緩存的線程池饰恕,如果線程池的當前規(guī)模超過了處理需求時挠羔,那么將回收空閑的線程,而當需求增加時,則可以添加新的線程埋嵌,線程池的規(guī)模則不存在限制破加。newSingleThreadPool
是一個單線程的 Executor,它創(chuàng)建單個工作者線程來執(zhí)行任務莉恼,如果這個線程異常結束拌喉,會創(chuàng)建另一個線程來替代速那。newSingleThreadPool
能確保依照任務在隊列中的順序來串行執(zhí)行。newScheduledThreadPool
創(chuàng)建一個固定長度的線程池尿背,而且以延遲或定時的方式來執(zhí)行任務端仰,類似于 Timer。
Executor 生命周期
為了解決執(zhí)行服務的聲明周期問題田藐,Executor
擴展了 ExecutorService
接口荔烧,添加了一些用于管理生命周期的方法shutdown()
,shutdownNow()
,isShutdown()
,isTerminated()
,awaitTermination()
。
ExecutorService
的生命周期有3中狀態(tài):運行汽久、關閉和已終止鹤竭。初始創(chuàng)建時處于運行狀態(tài)。
-
shutdown()
方法將執(zhí)行平緩的關閉過程:不再接受新的任務景醇,同時等待已經提交的任務執(zhí)行完成臀稚,包括那些還未開始執(zhí)行的任務。 -
shutdownNow()
方法將執(zhí)行粗暴的關閉過程:它將嘗試取消所有運行中任務三痰,并且不再啟動隊列中尚未開始執(zhí)行的任務吧寺。
等待所有任務完成后,ExecutorService
將轉入終止狀態(tài)散劫≈苫可以調用awaitTermination
來等待到達終止狀態(tài)逗扒,或者通過isTerminated
來輪詢是否已終止唱逢。
服務器通訊初步牛批版本
class LifecycleWebServer {
private ExecutorService exec;
public void start() throws IOException {
ServerSocket socket = new ServerSocket(80);
while (!exec.isShutdown()) {
try {
Socket conn = socket.accept();
exec.execute(new Runnable() {
@Override
public void run() {
handleRequest(conn);
}
});
}catch (RejectedExecutionException e){
if (!exec.isShutdown()){
System.out.println("task submission reject::"+e);
}
}
}
}
public void stop(){
exec.shutdown();
}
void handleRequest(Socket conn) {
Request req = readRequest(conn);
if(isShutdownRequest(req)){
stop();
}else {
dispatchRequest(req);
}
}
private void dispatchRequest(Request req) {
//......分發(fā)請求
}
private boolean isShutdownRequest(Request req) {
//......判斷是否是 shutdown 請求
}
private Request readRequest(Socket conn) {
//......解析請求
}
}
通過 ExecutorService
增加對任務生命周期的管理像鸡。
延遲任務與生命周期
Timer
是作者使用較多的任務類杭跪,主要用來管理延遲任務以及周期任務喳逛。因為 Timer
本身還是存在一些缺陷:
-
Timer
在執(zhí)行所有定時任務時只會創(chuàng)建一個線程犹撒。如果某個任務的執(zhí)行時間過長跋选,那么將破壞其他TimerTask
的定時精確性斋射。public void timerTest() { Timer timer = new Timer(); System.out.println("Timer Test Start " +new Date()); timer.schedule(new TimerTask() { @Override public void run() { System.out.println("001 working current " +new Date()); try { Thread.sleep(4*1000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("001 working current " +new Date()); } },1000); timer.schedule(new TimerTask() { @Override public void run() { try { Thread.sleep(1000); System.out.println("002 working current " +new Date()); Thread.sleep(1000); System.out.println("002 working current " +new Date()); Thread.sleep(1000); System.out.println("002 working current " +new Date()); Thread.sleep(1000); System.out.println("002 working current " +new Date()); } catch (InterruptedException e) { e.printStackTrace(); } } },2000); }
打印 log:
Timer Test Start Tue Dec 10 11:52:44 CST 2019
001 working current Tue Dec 10 11:52:45 CST 2019
001 working current Tue Dec 10 11:52:49 CST 2019
002 working current Tue Dec 10 11:52:50 CST 2019
002 working current Tue Dec 10 11:52:51 CST 2019
002 working current Tue Dec 10 11:52:52 CST 2019
002 working current Tue Dec 10 11:52:53 CST 2019
從時間戳上可以看出兩個 TimerTask 是串行執(zhí)行的症概。時間調度出現(xiàn)了問題
- 另一個是線程泄露問題:當 TimerTask 拋出一個未檢查的異常蕾额,那么 Timer 將表現(xiàn)出糟糕的行為。Timer 線程并不捕獲異常彼城,因此當 TimerTask 拋出未檢查的異常時將終止定時線程诅蝶,并且不會恢復線程的執(zhí)行。
請盡量減少或者停止 Timer 的使用募壕,ScheduledThreadPoolExecutor
能夠正確處理這些表現(xiàn)出錯誤行為的任務调炬。
public void testScheduled(){
ScheduledThreadPoolExecutor executor = (ScheduledThreadPoolExecutor) Executors.newScheduledThreadPool(10);
System.out.println("scheduled test " + new Date());
ScheduledFuture<?> work1 = executor.schedule(new Callable<String>() {
@Override
public String call() throws Exception {
try {
Thread.sleep(4000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("001 Worker " + new Date());
return "work1 finish";
}
}, 1, TimeUnit.SECONDS);
ScheduledFuture<?> work2 = executor.schedule(new Callable<String>() {
@Override
public String call() throws Exception {
try {
Thread.sleep(1000);
System.out.println("002 Worker " + new Date());
Thread.sleep(1000);
System.out.println("002 Worker " + new Date());
Thread.sleep(1000);
System.out.println("002 Worker " + new Date());
Thread.sleep(1000);
System.out.println("002 Worker " + new Date());
} catch (InterruptedException e) {
e.printStackTrace();
}
return "work2 Finish";
}
}, 2, TimeUnit.SECONDS);
}
輸出 log:
scheduled test Tue Dec 10 15:54:10 CST 2019
002 Worker Tue Dec 10 15:54:13 CST 2019
002 Worker Tue Dec 10 15:54:14 CST 2019
001 Worker Tue Dec 10 15:54:15 CST 2019
002 Worker Tue Dec 10 15:54:15 CST 2019
002 Worker Tue Dec 10 15:54:16 CST 2019
從 log 來看,時間調度上符合我們的預期舱馅,棒棒噠缰泡。
頁面渲染
來自面試官的提問:瀏覽器是怎樣加載網(wǎng)頁的?
方法一:使用簡單串行
最簡單的方法是對HTML文檔進行串行處理。當遇到文本標簽時棘钞,將其繪制到圖像緩存中缠借。當遇到圖像引用時,先通過網(wǎng)絡獲取宜猜,然后再將其繪制到圖像緩存中泼返。這種方式算是一種思路,但是可能會令使用者感到方案姨拥,他們必須等待很長時間绅喉,直到顯示所有的文本。
@Test
public void singleThreadRender() {
CharSequence source = "";
renderText(source);
List<ImageData> imageDatas = new ArrayList<>();
for (ImageInfo imageInfo : scanForImageInfo(source)) {
imageDatas.add(imageInfo.downloadImage());
}
for (ImageData imageData : imageDatas) {
renderImage(imageData);
}
}
了解 Callable
和 Future
Executor
框架使用 Runnable
作為其基本的任務表示形式叫乌。Runnable
是一種有很大局限的抽象柴罐,雖然能夠異步執(zhí)行任務,但是它不能返回一個值或者拋出受檢查的異常憨奸。
許多任務實際上都是存在延遲的計算(像執(zhí)行數(shù)據(jù)庫查詢革屠、從網(wǎng)絡上獲取資源、或者計算某個復雜的功能)膀藐。對于這些任務屠阻,Callable
是一種更好的抽象:它認為主入口點應該返回一個值,并可能拋出一個異常额各。
Runnable
和Callable
描述的都是抽象的計算任務。這些任務通常都應該有一個明確的起始點吧恃,并且最終會結束虾啦。Executor
執(zhí)行任務有4個生命周期階段:創(chuàng)建、提交痕寓、開始和完成傲醉。由于有些任務可能需要很長的時間,因此通常希望能夠及時取消呻率。再 Executor
框架中硬毕,已提交但尚未開始的任務可以取消,但是對于那些已經開始的任務礼仗,只有當它們能響應中斷時吐咳,才能取消。
Future
表示一個任務的生命周期元践,并提供了相應的方法來判斷任務是否已經完成或取消韭脊,以及獲取任務的結果和取消任務等。在 Future
規(guī)范中包含的隱含意義是单旁,任務的聲明周期只能前進沪羔,不能后腿,就像ExcutorService
的生命周期一樣象浑。當某個任務完成后蔫饰,它就永遠停留在完成
狀態(tài)上琅豆。
Future 包含如下方法:
interface Future{
boolean cancel()
boolean get()
boolean isCancelled()
boolean isDone()
}
get()
方法的行為取決于任務的狀態(tài)(尚未開始、正在運行篓吁、已完成)趋距。如果任務已完成,方法會立即返回或者拋出一個異常越除;如果任務沒有完成节腐,方法 將阻塞直到任務完成。
可以通過多種方法創(chuàng)建一個Future
來描述任務摘盆。ExecutorService
中的所有的 submit 方法都將返回一個Future
翼雀,從而將一個Runnable
或者Callable
提交給 Executor
,并得到一個 Future
用來獲取任務的執(zhí)行結果或者取消任務孩擂。
方法二:使用Future
實現(xiàn)渲染
為了使頁面渲染具有更高的并發(fā)性狼渊,我們分解成兩個任務:一個是渲染所有的文本(
CPU 密集型
);另一個是下載所有的圖像(I/O 密集型
)类垦。
Callable
和 Future
有助于協(xié)同任務之間的交互狈邑。
@Test
public void futureRender() {
CharSequence source = "";
ExecutorService executor = Executors.newFixedThreadPool(10);
List<ImageInfo> imageInfos = scanForImageInfo(source);
Callable<List<ImageData>> task = new Callable<List<ImageData>>() {
@Override
public List<ImageData> call() throws Exception {
List<ImageData> result = new ArrayList<>();
for (ImageInfo imageInfo : imageInfos) {
result.add(imageInfo.downloadImage());
}
return result;
}
};
Future<List<ImageData>> future = executor.submit(task);
renderText(source);
try {
List<ImageData> imageDatas = future.get();
for (ImageData imageData : imageDatas) {
renderImage(imageData);
}
} catch (InterruptedException e) {
e.printStackTrace();
Thread.currentThread().interrupt();
future.cancel(true);
} catch (ExecutionException e) {
e.printStackTrace();
}
}
futureRender
使得渲染文本與下載圖像數(shù)據(jù)的任務并發(fā)執(zhí)行,當所有圖像下載完成后蚤认,會顯示到頁面上米苹。對比串行版本已經提高了效率和用戶體驗。但我們還可以做得更好砰琢,我們不必等到所有的圖像都下載完成蘸嘶,而是希望沒下載完一副圖像就顯示出來。
了解CompletionService
CompletionService
的實現(xiàn)類是ExecutorCompletionService
陪汽,它將Executor
和BlockingQueue
的功能融合在一起训唱。
如果想及時獲取任計算的結果,按照前面的思路我們可以先保留任務提交Executor
后返回的 Future
挚冤,然后不斷的調用get()
方法來獲取况增。這種方式雖然可行,但是不夠優(yōu)雅训挡。幸運的是有CompletionService
澳骤。
請仔細閱讀take()
方法說明:
/**
* Retrieves and removes the Future representing the next
* completed task, waiting if none are yet present.
*
* @return the Future representing the next completed task
* @throws InterruptedException if interrupted while waiting
*/
Future<V> take() throws InterruptedException;
take()
會取出并從隊列移除已完成的任務。so舍哄,我們可以這樣實現(xiàn):
使用CompletionService
實現(xiàn)頁面渲染
@Test
public void completionServiceRender(ExecutorService executor, CharSequence source) {
List<ImageInfo> info = scanForImageInfo(source);
CompletionService<ImageData> completionService = new ExecutorCompletionService<>(executor);
for (ImageInfo imageInfo : info) {
completionService.submit(new Callable<ImageData>() {
@Override
public ImageData call() throws Exception {
return imageInfo.downloadImage();
}
});
}
renderText(source);
try {
int taskSize = info.size();
for (int i = 0; i < taskSize; i++) {
Future<ImageData> f = completionService.take();
ImageData data = f.get();
renderImage(data);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
為任務設置時限
新需求:對于耗時任務宴凉,等待特定時間后仍未完成,則取消任務表悬。
需求合情合理弥锄。這種情況下,我們可以使用Future
的get()
方法,官方描述如下:
/**
* Waits if necessary for at most the given time for the computation
* to complete, and then retrieves its result, if available.
*
* @param timeout the maximum time to wait
* @param unit the time unit of the timeout argument
* @return the computed result
* @throws CancellationException if the computation was cancelled
* @throws ExecutionException if the computation threw an
* exception
* @throws InterruptedException if the current thread was interrupted
* while waiting
* @throws TimeoutException if the wait timed out
*/
V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
- 兩個參數(shù):等待的時間籽暇、時間單位温治。
- 請注意拋出的異常,我們可以通過捕獲
TimeoutException
來處理超時情況戒悠。