Java 多線程(三)優(yōu)化任務執(zhí)行

本篇文章通過服務器通信頁面渲染兩個功能的實現(xiàn)來加深多線程中FutureExecutor的理解哪痰。

服務器通信

串行執(zhí)行任務

任務執(zhí)行最簡單的策略就是在單線程中串行執(zhí)行各項任務遂赠,并不會涉及多線程。

以創(chuàng)建通訊服務為例晌杰,我們可以這樣實現(xiàn)(很low)

    @Test
    public void singleThread() throws IOException {
        ServerSocket serverSocket= new ServerSocket(8088);
        while (true){
            Socket conn = serverSocket.accept();
            handleRequest(conn);
        }
    }

代碼很簡單跷睦,理論上沒什么毛病,但是實際使用中只能處理一個請求肋演。但是當處理任務很耗時并且在多次請求時會阻塞無法及時響應抑诸。
由此可見串行處理機制通常都無法提供高吞吐率或快速響應性。

顯式的為任務創(chuàng)建線程

串行執(zhí)行任務這么 low爹殊,我們來通過多線程來處理請求吧:當接收到請求后創(chuàng)建新的線程去執(zhí)行任務蜕乡。new Thread()應該就能實現(xiàn)。

初級版本:

    @Test
    public void perThreadTask() throws IOException {
        ServerSocket serverSocket = new ServerSocket(8088);
        while (true) {
            Socket conn = serverSocket.accept();
            Runnable r = new Runnable() {
                @Override
                public void run() {
                    handleRequest(conn);
                }
            };
            new Thread(r).start();
        }
    }

微弱的優(yōu)點

  • 對于每個請求边灭,都創(chuàng)建了一個線程來處理异希,達到多線程并行效果
  • 任務處理從主線程分離出來,使得主循環(huán)能更快的處理下一個請求

為每個任務分配一個線程存在一些缺陷绒瘦,尤其當需要創(chuàng)建大量的線程時

  • 線程生命周期的開銷非常高称簿。根據(jù)平臺的不同,實際的開銷也不同惰帽。但是線程的創(chuàng)建過程都會需要時間憨降,并且需要 JVM 和操作系統(tǒng)提供一些輔助操作。
  • 資源消耗该酗∈谝活躍的線程會消耗系統(tǒng)資源,尤其是內存呜魄。如果可運行的線程數(shù)量多余可用處理器的數(shù)量悔叽,那么有些線程將閑置。大量閑置的線程會占用許多內存爵嗅,給垃圾回收器帶來壓力娇澎。如果你已經擁有足夠多的線程使所有 CPU 保持忙碌狀態(tài),那么多余的線程反而會降低性能睹晒。
  • 穩(wěn)定性趟庄。隨著平臺的不同括细,可創(chuàng)建線程數(shù)量的限制是不同的,并受多個因素制約戚啥,包括 JVM 的啟動參數(shù)奋单、Thread 構造函數(shù)中請求的棧大小,以及底層操作系統(tǒng)對線程的限制等猫十。如果破壞了這些限制览濒,很可能拋出 OOM 異常。

<h5>
上面兩種方式都存在一些問題:單線程串行的問題在于其糟糕的響應性和吞吐量炫彩;而為每個任務分配線程的問題在于資源消耗和管理的復雜性匾七。</h5>
<h5>
在 Java 類庫中絮短,任務執(zhí)行的主要抽象不是 Thread江兢,而是 Executor
</h5>

public interface Executor {
    void execute(Runnable command);
}

Executor 框架

Executor 基于生產者-消費者模式,提交任務的操作相當于生產者丁频,執(zhí)行任務的線程相當于消費者杉允。

image

通訊優(yōu)化

對于以前的通訊服務我們可以用 Executor 進一步優(yōu)化一下

    @Test
    public void limitExecutorTask() throws IOException {
        final int nThreads  = 100;
        ExecutorService exec = Executors.newFixedThreadPool(nThreads);
        ServerSocket serverSocket = new ServerSocket(8088);
        while (true) {
            Socket conn = serverSocket.accept();
            Runnable r = new Runnable() {
                @Override
                public void run() {
                    handleRequest(conn);
                }
            };
            exec.execute(r);
        }
    }

線程池

線程池從字面來看時指管理一組同構工作線程的資源池。它與工作隊列密切相關席里,它在工作隊列中保存了所有等待執(zhí)行的任務叔磷。

線程池通過重用現(xiàn)有的線程而不是創(chuàng)建新線程,可以在處理多個請求時分攤在線程創(chuàng)建和銷毀過程中產生的巨大開銷奖磁。另一個額外的好處是改基,當請求到達時,工作線程已經存在咖为,因此不會由于等待創(chuàng)建線程而延遲任務的執(zhí)行秕狰,挺高響應性。

JAVA 類庫中提供了一個靈活的線程池以及一些有用的默認配置躁染∶В可以通過 Executors 中的靜態(tài)工廠方法來創(chuàng)建。

  • newFixedThreadPool將創(chuàng)建一個固定長度的線程池吞彤,每當提交一個任務時就創(chuàng)建一個線程我衬,直到達到線程的最大數(shù)量。

  • newCacheedThreadPool將創(chuàng)建一個可緩存的線程池饰恕,如果線程池的當前規(guī)模超過了處理需求時挠羔,那么將回收空閑的線程,而當需求增加時,則可以添加新的線程埋嵌,線程池的規(guī)模則不存在限制破加。

  • newSingleThreadPool是一個單線程的 Executor,它創(chuàng)建單個工作者線程來執(zhí)行任務莉恼,如果這個線程異常結束拌喉,會創(chuàng)建另一個線程來替代速那。newSingleThreadPool能確保依照任務在隊列中的順序來串行執(zhí)行。

  • newScheduledThreadPool 創(chuàng)建一個固定長度的線程池尿背,而且以延遲或定時的方式來執(zhí)行任務端仰,類似于 Timer。

Executor 生命周期

為了解決執(zhí)行服務的聲明周期問題田藐,Executor 擴展了 ExecutorService接口荔烧,添加了一些用于管理生命周期的方法shutdown(),shutdownNow(),isShutdown(),isTerminated(),awaitTermination()

ExecutorService的生命周期有3中狀態(tài):運行汽久、關閉和已終止鹤竭。初始創(chuàng)建時處于運行狀態(tài)。

  • shutdown() 方法將執(zhí)行平緩的關閉過程:不再接受新的任務景醇,同時等待已經提交的任務執(zhí)行完成臀稚,包括那些還未開始執(zhí)行的任務。
  • shutdownNow() 方法將執(zhí)行粗暴的關閉過程:它將嘗試取消所有運行中任務三痰,并且不再啟動隊列中尚未開始執(zhí)行的任務吧寺。
    等待所有任務完成后,ExecutorService 將轉入終止狀態(tài)散劫≈苫可以調用awaitTermination來等待到達終止狀態(tài)逗扒,或者通過isTerminated來輪詢是否已終止唱逢。

服務器通訊初步牛批版本

class LifecycleWebServer {
        private ExecutorService exec;

        public void start() throws IOException {
            ServerSocket socket = new ServerSocket(80);
            while (!exec.isShutdown()) {
                try {
                    Socket conn = socket.accept();
                    exec.execute(new Runnable() {
                        @Override
                        public void run() {
                            handleRequest(conn);
                        }

                    });
                }catch (RejectedExecutionException e){
                    if (!exec.isShutdown()){
                        System.out.println("task submission reject::"+e);
                    }
                }

            }
        }

        public void stop(){
            exec.shutdown();
        }

        void handleRequest(Socket conn) {
            Request req = readRequest(conn);
            if(isShutdownRequest(req)){
                stop();
            }else {
                dispatchRequest(req);
            }
        }

        private void dispatchRequest(Request req) {
            //......分發(fā)請求
        }

        private boolean isShutdownRequest(Request req) {
            //......判斷是否是 shutdown 請求
        }

        private Request readRequest(Socket conn) {
            //......解析請求
        }
    }

通過 ExecutorService 增加對任務生命周期的管理像鸡。

延遲任務與生命周期

Timer是作者使用較多的任務類杭跪,主要用來管理延遲任務以及周期任務喳逛。因為 Timer 本身還是存在一些缺陷:

  • Timer在執(zhí)行所有定時任務時只會創(chuàng)建一個線程犹撒。如果某個任務的執(zhí)行時間過長跋选,那么將破壞其他 TimerTask 的定時精確性斋射。
    public void timerTest() {
        Timer timer = new Timer();
        System.out.println("Timer Test Start " +new Date());
        timer.schedule(new TimerTask() {
            @Override
            public void run() {
                System.out.println("001 working current " +new Date());
                try {
                    Thread.sleep(4*1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("001 working current " +new Date());
            }
        },1000);
    
        timer.schedule(new TimerTask() {
            @Override
            public void run() {
                try {
                    Thread.sleep(1000);
                    System.out.println("002 working current " +new Date());
    
                    Thread.sleep(1000);
                    System.out.println("002 working current " +new Date());
    
                    Thread.sleep(1000);
                    System.out.println("002 working current " +new Date());
    
                    Thread.sleep(1000);
                    System.out.println("002 working current " +new Date());
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        },2000);
    }
    

打印 log:

    Timer Test Start Tue Dec 10 11:52:44 CST 2019
    001 working current Tue Dec 10 11:52:45 CST 2019
    001 working current Tue Dec 10 11:52:49 CST 2019
    002 working current Tue Dec 10 11:52:50 CST 2019
    002 working current Tue Dec 10 11:52:51 CST 2019
    002 working current Tue Dec 10 11:52:52 CST 2019
    002 working current Tue Dec 10 11:52:53 CST 2019

從時間戳上可以看出兩個 TimerTask 是串行執(zhí)行的症概。時間調度出現(xiàn)了問題

  • 另一個是線程泄露問題:當 TimerTask 拋出一個未檢查的異常蕾额,那么 Timer 將表現(xiàn)出糟糕的行為。Timer 線程并不捕獲異常彼城,因此當 TimerTask 拋出未檢查的異常時將終止定時線程诅蝶,并且不會恢復線程的執(zhí)行。

請盡量減少或者停止 Timer 的使用募壕,ScheduledThreadPoolExecutor能夠正確處理這些表現(xiàn)出錯誤行為的任務调炬。

    public void testScheduled(){
        ScheduledThreadPoolExecutor executor = (ScheduledThreadPoolExecutor) Executors.newScheduledThreadPool(10);
        System.out.println("scheduled test " + new Date());
        ScheduledFuture<?> work1 = executor.schedule(new Callable<String>() {
            @Override
            public String call() throws Exception {
                try {
                    Thread.sleep(4000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("001 Worker " + new Date());
                return "work1 finish";
            }
        }, 1, TimeUnit.SECONDS);
        ScheduledFuture<?> work2 = executor.schedule(new Callable<String>() {
            @Override
            public String call() throws Exception {
                try {
                    Thread.sleep(1000);
                    System.out.println("002 Worker " + new Date());
                    Thread.sleep(1000);
                    System.out.println("002 Worker " + new Date());
                    Thread.sleep(1000);
                    System.out.println("002 Worker " + new Date());
                    Thread.sleep(1000);
                    System.out.println("002 Worker " + new Date());
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                return "work2 Finish";
            }
        }, 2, TimeUnit.SECONDS);
    }

輸出 log:

scheduled test Tue Dec 10 15:54:10 CST 2019
002 Worker Tue Dec 10 15:54:13 CST 2019
002 Worker Tue Dec 10 15:54:14 CST 2019
001 Worker Tue Dec 10 15:54:15 CST 2019
002 Worker Tue Dec 10 15:54:15 CST 2019
002 Worker Tue Dec 10 15:54:16 CST 2019

從 log 來看,時間調度上符合我們的預期舱馅,棒棒噠缰泡。

頁面渲染

來自面試官的提問:瀏覽器是怎樣加載網(wǎng)頁的?

方法一:使用簡單串行

最簡單的方法是對HTML文檔進行串行處理。當遇到文本標簽時棘钞,將其繪制到圖像緩存中缠借。當遇到圖像引用時,先通過網(wǎng)絡獲取宜猜,然后再將其繪制到圖像緩存中泼返。這種方式算是一種思路,但是可能會令使用者感到方案姨拥,他們必須等待很長時間绅喉,直到顯示所有的文本。

    @Test
    public void singleThreadRender() {
        CharSequence source = "";
        renderText(source);
        List<ImageData> imageDatas = new ArrayList<>();

        for (ImageInfo imageInfo : scanForImageInfo(source)) {
            imageDatas.add(imageInfo.downloadImage());
        }

        for (ImageData imageData : imageDatas) {
            renderImage(imageData);
        }
    }

了解 CallableFuture

Executor框架使用 Runnable作為其基本的任務表示形式叫乌。Runnable是一種有很大局限的抽象柴罐,雖然能夠異步執(zhí)行任務,但是它不能返回一個值或者拋出受檢查的異常憨奸。

許多任務實際上都是存在延遲的計算(像執(zhí)行數(shù)據(jù)庫查詢革屠、從網(wǎng)絡上獲取資源、或者計算某個復雜的功能)膀藐。對于這些任務屠阻,Callable是一種更好的抽象:它認為主入口點應該返回一個值,并可能拋出一個異常额各。

RunnableCallable描述的都是抽象的計算任務。這些任務通常都應該有一個明確的起始點吧恃,并且最終會結束虾啦。Executor執(zhí)行任務有4個生命周期階段:創(chuàng)建、提交痕寓、開始和完成傲醉。由于有些任務可能需要很長的時間,因此通常希望能夠及時取消呻率。再 Executor框架中硬毕,已提交但尚未開始的任務可以取消,但是對于那些已經開始的任務礼仗,只有當它們能響應中斷時吐咳,才能取消。

Future表示一個任務的生命周期元践,并提供了相應的方法來判斷任務是否已經完成或取消韭脊,以及獲取任務的結果和取消任務等。在 Future 規(guī)范中包含的隱含意義是单旁,任務的聲明周期只能前進沪羔,不能后腿,就像ExcutorService 的生命周期一樣象浑。當某個任務完成后蔫饰,它就永遠停留在完成狀態(tài)上琅豆。

Future 包含如下方法:

interface Future{
    boolean cancel()
    boolean get()
    boolean isCancelled()
    boolean isDone()
}

get()方法的行為取決于任務的狀態(tài)(尚未開始、正在運行篓吁、已完成)趋距。如果任務已完成,方法會立即返回或者拋出一個異常越除;如果任務沒有完成节腐,方法 將阻塞直到任務完成。

可以通過多種方法創(chuàng)建一個Future來描述任務摘盆。ExecutorService中的所有的 submit 方法都將返回一個Future翼雀,從而將一個Runnable或者Callable提交給 Executor,并得到一個 Future用來獲取任務的執(zhí)行結果或者取消任務孩擂。

方法二:使用Future實現(xiàn)渲染

為了使頁面渲染具有更高的并發(fā)性狼渊,我們分解成兩個任務:一個是渲染所有的文本(CPU 密集型);另一個是下載所有的圖像(I/O 密集型)类垦。

CallableFuture有助于協(xié)同任務之間的交互狈邑。

    @Test
    public void futureRender() {
        CharSequence source = "";
        ExecutorService executor = Executors.newFixedThreadPool(10);

        List<ImageInfo> imageInfos = scanForImageInfo(source);
        Callable<List<ImageData>> task = new Callable<List<ImageData>>() {
            @Override
            public List<ImageData> call() throws Exception {
                List<ImageData> result = new ArrayList<>();
                for (ImageInfo imageInfo : imageInfos) {
                    result.add(imageInfo.downloadImage());
                }
                return result;
            }
        };

        Future<List<ImageData>> future = executor.submit(task);
        renderText(source);

        try {
            List<ImageData> imageDatas = future.get();
            for (ImageData imageData : imageDatas) {
                renderImage(imageData);
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
            Thread.currentThread().interrupt();
            future.cancel(true);
        } catch (ExecutionException e) {
            e.printStackTrace();
        }

    }

futureRender使得渲染文本與下載圖像數(shù)據(jù)的任務并發(fā)執(zhí)行,當所有圖像下載完成后蚤认,會顯示到頁面上米苹。對比串行版本已經提高了效率和用戶體驗。但我們還可以做得更好砰琢,我們不必等到所有的圖像都下載完成蘸嘶,而是希望沒下載完一副圖像就顯示出來。

了解CompletionService

CompletionService的實現(xiàn)類是ExecutorCompletionService陪汽,它將ExecutorBlockingQueue的功能融合在一起训唱。

如果想及時獲取任計算的結果,按照前面的思路我們可以先保留任務提交Executor后返回的 Future挚冤,然后不斷的調用get()方法來獲取况增。這種方式雖然可行,但是不夠優(yōu)雅训挡。幸運的是有CompletionService澳骤。

請仔細閱讀take()方法說明:

    /**
     * Retrieves and removes the Future representing the next
     * completed task, waiting if none are yet present.
     *
     * @return the Future representing the next completed task
     * @throws InterruptedException if interrupted while waiting
     */
    Future<V> take() throws InterruptedException;

take() 會取出并從隊列移除已完成的任務。so舍哄,我們可以這樣實現(xiàn):

使用CompletionService 實現(xiàn)頁面渲染

    @Test
    public void completionServiceRender(ExecutorService executor, CharSequence source) {
        List<ImageInfo> info = scanForImageInfo(source);
        CompletionService<ImageData> completionService = new ExecutorCompletionService<>(executor);
        for (ImageInfo imageInfo : info) {
            completionService.submit(new Callable<ImageData>() {
                @Override
                public ImageData call() throws Exception {
                    return imageInfo.downloadImage();
                }
            });
        }
        renderText(source);

        try {
            int taskSize = info.size();
            for (int i = 0; i < taskSize; i++) {
                Future<ImageData> f = completionService.take();
                ImageData data = f.get();
                renderImage(data);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }

    }

為任務設置時限

新需求:對于耗時任務宴凉,等待特定時間后仍未完成,則取消任務表悬。

需求合情合理弥锄。這種情況下,我們可以使用Futureget()方法,官方描述如下:

/**
     * Waits if necessary for at most the given time for the computation
     * to complete, and then retrieves its result, if available.
     *
     * @param timeout the maximum time to wait
     * @param unit the time unit of the timeout argument
     * @return the computed result
     * @throws CancellationException if the computation was cancelled
     * @throws ExecutionException if the computation threw an
     * exception
     * @throws InterruptedException if the current thread was interrupted
     * while waiting
     * @throws TimeoutException if the wait timed out
     */
    V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;
  • 兩個參數(shù):等待的時間籽暇、時間單位温治。
  • 請注意拋出的異常,我們可以通過捕獲TimeoutException來處理超時情況戒悠。
?著作權歸作者所有,轉載或內容合作請聯(lián)系作者
  • 序言:七十年代末熬荆,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子绸狐,更是在濱河造成了極大的恐慌卤恳,老刑警劉巖,帶你破解...
    沈念sama閱讀 222,000評論 6 515
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件寒矿,死亡現(xiàn)場離奇詭異突琳,居然都是意外死亡,警方通過查閱死者的電腦和手機符相,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 94,745評論 3 399
  • 文/潘曉璐 我一進店門拆融,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人啊终,你說我怎么就攤上這事镜豹。” “怎么了蓝牲?”我有些...
    開封第一講書人閱讀 168,561評論 0 360
  • 文/不壞的土叔 我叫張陵趟脂,是天一觀的道長。 經常有香客問我搞旭,道長散怖,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 59,782評論 1 298
  • 正文 為了忘掉前任肄渗,我火速辦了婚禮,結果婚禮上咬最,老公的妹妹穿的比我還像新娘翎嫡。我一直安慰自己,他們只是感情好永乌,可當我...
    茶點故事閱讀 68,798評論 6 397
  • 文/花漫 我一把揭開白布惑申。 她就那樣靜靜地躺著,像睡著了一般翅雏。 火紅的嫁衣襯著肌膚如雪圈驼。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 52,394評論 1 310
  • 那天望几,我揣著相機與錄音绩脆,去河邊找鬼。 笑死,一個胖子當著我的面吹牛靴迫,可吹牛的內容都是我干的惕味。 我是一名探鬼主播,決...
    沈念sama閱讀 40,952評論 3 421
  • 文/蒼蘭香墨 我猛地睜開眼玉锌,長吁一口氣:“原來是場噩夢啊……” “哼名挥!你這毒婦竟也來了?” 一聲冷哼從身側響起主守,我...
    開封第一講書人閱讀 39,852評論 0 276
  • 序言:老撾萬榮一對情侶失蹤禀倔,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后参淫,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體救湖,經...
    沈念sama閱讀 46,409評論 1 318
  • 正文 獨居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內容為張勛視角 年9月15日...
    茶點故事閱讀 38,483評論 3 341
  • 正文 我和宋清朗相戀三年黄刚,在試婚紗的時候發(fā)現(xiàn)自己被綠了捎谨。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點故事閱讀 40,615評論 1 352
  • 序言:一個原本活蹦亂跳的男人離奇死亡憔维,死狀恐怖涛救,靈堂內的尸體忽然破棺而出,到底是詐尸還是另有隱情业扒,我是刑警寧澤检吆,帶...
    沈念sama閱讀 36,303評論 5 350
  • 正文 年R本政府宣布,位于F島的核電站程储,受9級特大地震影響蹭沛,放射性物質發(fā)生泄漏。R本人自食惡果不足惜章鲤,卻給世界環(huán)境...
    茶點故事閱讀 41,979評論 3 334
  • 文/蒙蒙 一摊灭、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧败徊,春花似錦帚呼、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,470評論 0 24
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至沪哺,卻和暖如春沈自,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背辜妓。 一陣腳步聲響...
    開封第一講書人閱讀 33,571評論 1 272
  • 我被黑心中介騙來泰國打工枯途, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留忌怎,地道東北人。 一個月前我還...
    沈念sama閱讀 49,041評論 3 377
  • 正文 我出身青樓柔袁,卻偏偏與公主長得像呆躲,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子捶索,可洞房花燭夜當晚...
    茶點故事閱讀 45,630評論 2 359