Java基礎(chǔ)-線程-線程池

Java工程師知識樹 / Java基礎(chǔ)


為什么使用線程池

池化技術(shù)相比大家已經(jīng)屢見不鮮了种蘸,線程池、數(shù)據(jù)庫連接池、Http 連接池等等都是對這個思想的應(yīng)用次舌。池化技術(shù)的思想主要是為了減少每次獲取資源的消耗,提高對資源的利用率。

線程池提供了一種限制和管理資源(包括執(zhí)行一個任務(wù))。每個線程池還維護(hù)一些基本統(tǒng)計(jì)信息睹逃,例如已完成任務(wù)的數(shù)量。

使用線程池的好處

  • 降低資源消耗祷肯。通過重復(fù)利用已創(chuàng)建的線程降低線程創(chuàng)建和銷毀造成的消耗沉填。
  • 提高響應(yīng)速度。當(dāng)任務(wù)到達(dá)時佑笋,任務(wù)可以不需要的等到線程創(chuàng)建就能立即執(zhí)行翼闹。
  • 提高線程的可管理性。線程是稀缺資源蒋纬,如果無限制的創(chuàng)建猎荠,不僅會消耗系統(tǒng)資源,還會降低系統(tǒng)的穩(wěn)定性蜀备,使用線程池可以進(jìn)行統(tǒng)一的分配关摇,調(diào)優(yōu)和監(jiān)控。

四種線程池

線程池創(chuàng)建最終是通過ThreadPoolExecutor創(chuàng)建的碾阁。根據(jù)構(gòu)造方法傳參數(shù)的不同又可以分為不同的線程池输虱。

ThreadPoolExecutor 類圖結(jié)構(gòu):

ThreadPoolExecutor構(gòu)造方法為:

public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }

參數(shù)說明

  • corePoolSize:線程池中核心線程數(shù)的數(shù)量

  • maximumPoolSize:在線程池中允許存在的最大線程數(shù)

  • keepAliveTime:當(dāng)存在的線程數(shù)大于corePoolSize,那么會找到空閑線程去銷毀脂凶,此參數(shù)是設(shè)置空閑多久的線程才被銷毀宪睹。

  • unit:時間單位 TimeUnit工具類

  • workQueue:工作隊(duì)列,線程池中的當(dāng)前線程數(shù)大于核心線程的話艰猬,那么接下來的任務(wù)會放入到隊(duì)列中

    • ArrayBlockingQueue:基于數(shù)組有界阻塞隊(duì)列
    • LinkedBlockingQueue:基于鏈表阻塞隊(duì)列
    • SynchronousQueue:不存儲元素的阻塞隊(duì)列(讀寫須等待一并進(jìn)行)
    • PriorityBlockingQueue:支持優(yōu)先級的無界隊(duì)列
  • threadFactory:在創(chuàng)建線程的時候横堡,通過工廠模式來生產(chǎn)線程埋市。這個參數(shù)就是設(shè)置我們自定義的線程創(chuàng)建工廠冠桃。

  • handler:拒絕策略,如果超過了最大線程數(shù)道宅,那么就會執(zhí)行我們設(shè)置的拒絕策略

    • AbortPolicy:直接拋出異常食听。
    • CallerRunsPolicy:只用調(diào)用者所在線程來運(yùn)行任務(wù)胸蛛。
    • DiscardOldestPolicy:丟棄隊(duì)列里最近的一個任務(wù),并執(zhí)行當(dāng)前任務(wù)樱报。
    • DiscardPolicy:不處理葬项,丟棄掉。

線程池處理邏輯:

  1. corePoolSize個任務(wù)時迹蛤,來一個任務(wù)就創(chuàng)建一個線程
  2. 如果當(dāng)前線程池的線程數(shù)大于了corePoolSize那么接下來再來的任務(wù)就會放入到我們上面設(shè)置的workQueue隊(duì)列中
  3. 如果此時workQueue也滿了民珍,那么再來任務(wù)時,就會新建臨時線程盗飒,那么此時如果我們設(shè)置了keepAliveTime或者設(shè)置了allowCoreThreadTimeOut嚷量,那么系統(tǒng)就會進(jìn)行線程的活性檢查,一旦超時便銷毀線程
  4. 如果此時線程池中的當(dāng)前線程大于了maximumPoolSize最大線程數(shù)逆趣,那么就會執(zhí)行我們剛才設(shè)置的handler拒絕策略

線程池分類:

newFixedThreadPool---固定大小的線程池

java.util.concurrent.Executors
    
public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>());
}
public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>(),
                                  threadFactory);
}

使用newFixedThreadPool方法創(chuàng)建出來的線程池為固定大小的線程池蝶溶,可以通過第一個靜態(tài)方法指定線程池的大小,該線程池corePoolSizemaximumPoolSize相等宣渗,阻塞隊(duì)列使用的是LinkedBlockingQueue抖所,理論上大小為整數(shù)最大值。

使用newFixedThreadPool方法創(chuàng)建出來的線程池中的線程數(shù)量始終不變痕囱,當(dāng)有新任務(wù)提交時田轧,線程池中有空閑線程則會立即執(zhí)行,如果沒有咐蝇,則會暫存到阻塞隊(duì)列涯鲁。

對于固定大小的線程池,不存在線程數(shù)量的變有序,同時使用無界的LinkedBlockingQueue來存放執(zhí)行的任務(wù)抹腿。

存在的問題:

當(dāng)任務(wù)提交十分頻繁的時候,LinkedBlockingQueue迅速增大旭寿,存在著耗盡系統(tǒng)資源的問題警绩。而且在線程池空閑時,即線程池中沒有可運(yùn)行任務(wù)時盅称,它也不會釋放工作線程肩祥,還會占用一定的系統(tǒng)資源,需要shutdown缩膝。

ExecutorService fixPool = Executors.newFixedThreadPool(5);
for (int i = 0; i < 10; i++) {
    fixPool.execute(() -> System.out.println(Thread.currentThread().getName() + " : " + atomicInteger.getAndIncrement()));
}
fixPool.shutdown();
// 打印結(jié)果 5個線程
pool-1-thread-1 : 0
pool-1-thread-5 : 4
pool-1-thread-4 : 3
pool-1-thread-3 : 2
pool-1-thread-2 : 1
pool-1-thread-3 : 8
pool-1-thread-4 : 7
pool-1-thread-5 : 6
pool-1-thread-1 : 5
pool-1-thread-2 : 9

newSingleThreadExecutor---單個線程的線程池

java.util.concurrent.Executors
    
public static ExecutorService newSingleThreadExecutor() {
    return new FinalizableDelegatedExecutorService
        (new ThreadPoolExecutor(1, 1,
                                0L, TimeUnit.MILLISECONDS,
                                new LinkedBlockingQueue<Runnable>()));
}

public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {
    return new FinalizableDelegatedExecutorService
        (new ThreadPoolExecutor(1, 1,
                                0L, TimeUnit.MILLISECONDS,
                                new LinkedBlockingQueue<Runnable>(),
                                threadFactory));
}

使用newSingleThreadExecutor方法創(chuàng)建的線程池為單個線程線程池混狠,只有一個線程的線程池,阻塞隊(duì)列使用的是LinkedBlockingQueue,若有多余的任務(wù)提交到線程池中疾层,則會被暫存到阻塞隊(duì)列将饺,待空閑時再去執(zhí)行。按照先入先出的順序執(zhí)行任務(wù)。

ExecutorService singlePool = Executors.newSingleThreadExecutor();
for (int i = 0; i < 10; i++) {
    singlePool.execute(() -> System.out.println(Thread.currentThread().getName() + " : " + atomicInteger.getAndIncrement()));
}
singlePool.shutdown();
// 打印結(jié)果  只有一個線程
pool-1-thread-1 : 0
pool-1-thread-1 : 1
pool-1-thread-1 : 2
pool-1-thread-1 : 3
pool-1-thread-1 : 4
pool-1-thread-1 : 5
pool-1-thread-1 : 6
pool-1-thread-1 : 7
pool-1-thread-1 : 8
pool-1-thread-1 : 9

newCachedThreadPool---緩存線程池

java.util.concurrent.Executors
    
public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS, // 緩存的線程默認(rèn)存活時間
                                  new SynchronousQueue<Runnable>());
}

public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,// 緩存的線程默認(rèn)存活時間
                                  new SynchronousQueue<Runnable>(),
                                  threadFactory);
}

使用newCachedThreadPool方法創(chuàng)建的線程池為緩存線程池予弧。

緩存線程池刮吧,緩存的線程默認(rèn)存活60秒。線程的核心池corePoolSize大小為0掖蛤,核心池最大為Integer.MAX_VALUE,阻塞隊(duì)列使用的是SynchronousQueue杀捻。

SynchronousQueue是一個直接提交的阻塞隊(duì)列, SynchronousQueue總會迫使線程池增加新的線程去執(zhí)行新的任務(wù)蚓庭。在沒有任務(wù)執(zhí)行時致讥,當(dāng)線程的空閑時間超過keepAliveTime(60秒),則工作線程將會終止被回收器赞,當(dāng)提交新任務(wù)時拄踪,如果沒有空閑線程,則創(chuàng)建新線程執(zhí)行任務(wù)拳魁,會導(dǎo)致一定的系統(tǒng)開銷惶桐。

存在的問題:

如果同時又大量任務(wù)被提交,而且任務(wù)執(zhí)行的時間不是特別快潘懊,那么線程池便會新增出等量的線程池處理任務(wù)姚糊,這很可能會很快耗盡系統(tǒng)的資源。

ExecutorService cachedPool = Executors.newCachedThreadPool();
for (int i = 0; i < 10; i++) {
    cachedPool.execute(() -> System.out.println(Thread.currentThread().getName() + " : " + atomicInteger.getAndIncrement()));
}
//打印結(jié)果
pool-1-thread-1 : 0
pool-1-thread-4 : 3
pool-1-thread-3 : 2
pool-1-thread-2 : 1
pool-1-thread-6 : 5
pool-1-thread-5 : 4
pool-1-thread-7 : 6
pool-1-thread-8 : 7
pool-1-thread-8 : 8     
pool-1-thread-2 : 9

newScheduledThreadPool---定時線程池

java.util.concurrent.Executors
    
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
    return new ScheduledThreadPoolExecutor(corePoolSize);
}

public static ScheduledExecutorService newScheduledThreadPool(
        int corePoolSize, ThreadFactory threadFactory) {
    return new ScheduledThreadPoolExecutor(corePoolSize, threadFactory);
}

使用newScheduledThreadPool方法創(chuàng)建的線程池為定時線程池授舟。

定時線程池救恨,可用于周期性地去執(zhí)行任務(wù),通常用于周期性的同步數(shù)據(jù)释树。

ScheduledExecutorService ses = Executors.newScheduledThreadPool(5);
//5個線程每兩秒執(zhí)行一次 隨機(jī)哪個線程執(zhí)行
ses.scheduleAtFixedRate(() -> System.out.println(Thread.currentThread().getName() + " : " + "執(zhí)行了"), 0, 2, TimeUnit.SECONDS);
打印結(jié)果:  5個線程每兩秒執(zhí)行一次 隨機(jī)哪個線程執(zhí)行
pool-1-thread-1 : 執(zhí)行了
pool-1-thread-1 : 執(zhí)行了
pool-1-thread-2 : 執(zhí)行了
pool-1-thread-1 : 執(zhí)行了
pool-1-thread-3 : 執(zhí)行了
pool-1-thread-3 : 執(zhí)行了
pool-1-thread-3 : 執(zhí)行了
... ...

線程池的使用

線程池不允許使用Executors去創(chuàng)建肠槽,而是通過ThreadPoolExecutor的方式,這樣的處理方式讓寫的同學(xué)更加明確線程池的運(yùn)行規(guī)則奢啥,規(guī)避資源耗盡的風(fēng)險秸仙。

說明:Executors各個方法的弊端:

  • 1:newFixedThreadPool和newSingleThreadExecutor:
    ??主要問題是堆積的請求處理隊(duì)列可能會耗費(fèi)非常大的內(nèi)存,甚至OOM桩盲。
  • 2:newCachedThreadPool和newScheduledThreadPool:
    ??主要問題是線程數(shù)最大數(shù)是Integer.MAX_VALUE寂纪,可能會創(chuàng)建數(shù)量非常多的線程,甚至OOM赌结。

使用實(shí)例:

package com.thread.study.pool;

import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;

public class ThreadTest {

    public static void main(String[] args) throws InterruptedException {
        int corePoolSize = 3; // 線程池中核心線程數(shù)的數(shù)量
        int maximumPoolSize = 6;// 在線程池中允許存在的最大線程數(shù)
        long keepAliveTime = 10;// 存活時間
        TimeUnit unit = TimeUnit.SECONDS; // 時間單位 秒
        BlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<>(3); // 工作隊(duì)列 基于鏈表阻塞隊(duì)列
        ThreadFactory threadFactory = new NameTreadFactory();// 自定義線程工廠
        RejectedExecutionHandler handler = new MyIgnorePolicy(); // 自定義拒絕策略
        /*
            public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize,
                              long keepAliveTime, TimeUnit unit,
                              BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory,
                              RejectedExecutionHandler handler)
         */
        ThreadPoolExecutor executor = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, unit,
                workQueue, threadFactory, handler);
        executor.prestartAllCoreThreads(); // 預(yù)啟動所有核心線程
        for (int i = 1; i <= 10; i++) {
            executor.execute(new MyTask(String.valueOf(i)));
        }
        System.out.println("當(dāng)前線程池中存活的線程數(shù)為: "+executor.getActiveCount());
        Thread.sleep(5000);
        System.out.println("當(dāng)前線程池中存活的線程數(shù)為: "+executor.getActiveCount());
        executor.shutdown();
    }

    // 自定義線程工廠
    static class NameTreadFactory implements ThreadFactory {

        private final AtomicInteger mThreadNum = new AtomicInteger(1);

        @Override
        public Thread newThread(Runnable r) {
            Thread t = new Thread(r, "my-thread-" + mThreadNum.getAndIncrement());
            System.out.println(t.getName() + " has been created");
            return t;
        }
    }

    //拒絕策略
    public static class MyIgnorePolicy implements RejectedExecutionHandler {

        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            doLog(r, e);
        }

        private void doLog(Runnable r, ThreadPoolExecutor e) {
            // 可做日志記錄等
            System.err.println(r.toString() + " rejected!" + " 當(dāng)前線程池中存活的線程數(shù)為: " + e.getActiveCount());
        }
    }

    // 線程
    static class MyTask implements Runnable {
        private String name;

        public MyTask(String name) {
            this.name = name;
        }

        @Override
        public void run() {
            try {
                System.out.println(Thread.currentThread().getName() +" : "+ this.toString() + " is running!");
                Thread.sleep(1000); //讓任務(wù)執(zhí)行慢點(diǎn)
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

        public String getName() {
            return name;
        }

        @Override
        public String toString() {
            return "MyTask [name=" + name + "]";
        }
    }
}

打印結(jié)果:

my-thread-1 has been created
my-thread-2 has been created
my-thread-3 has been created
my-thread-4 has been created
my-thread-3 : MyTask [name=3] is running!
my-thread-1 : MyTask [name=1] is running!
my-thread-2 : MyTask [name=2] is running!
my-thread-5 has been created
my-thread-6 has been created
my-thread-4 : MyTask [name=4] is running!
my-thread-5 : MyTask [name=8] is running!
my-thread-6 : MyTask [name=9] is running!
當(dāng)前線程池中存活的線程數(shù)為: 6
MyTask [name=10] rejected! 當(dāng)前線程池中存活的線程數(shù)為: 6
my-thread-3 : MyTask [name=7] is running!
my-thread-2 : MyTask [name=5] is running!
my-thread-1 : MyTask [name=6] is running!
當(dāng)前線程池中存活的線程數(shù)為: 0

線程池使用場景

使用線程池異步操作常見場景

批量操作
批量操作不是實(shí)時顯示效果的操作捞蛋,比如批量導(dǎo)入配置,批量刪除或作廢柬姚,批量導(dǎo)出查詢結(jié)果等拟杉,只需要搭建一個任務(wù),在任務(wù)中處理即可量承,有錯誤對應(yīng)處理錯誤

日志
異步寫操作對功能影響不大的業(yè)務(wù)邏輯是常見的場景搬设,日志對與功能來說重要性并么有那么高

使用第三方接口
比如發(fā)郵箱啼染,發(fā)短信,發(fā)消息隊(duì)列信息焕梅,推送搜索引擎數(shù)據(jù),異步調(diào)用外圍接口等處理數(shù)據(jù)卦洽。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末贞言,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子阀蒂,更是在濱河造成了極大的恐慌该窗,老刑警劉巖,帶你破解...
    沈念sama閱讀 206,968評論 6 482
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件蚤霞,死亡現(xiàn)場離奇詭異酗失,居然都是意外死亡,警方通過查閱死者的電腦和手機(jī)昧绣,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 88,601評論 2 382
  • 文/潘曉璐 我一進(jìn)店門规肴,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人夜畴,你說我怎么就攤上這事拖刃。” “怎么了贪绘?”我有些...
    開封第一講書人閱讀 153,220評論 0 344
  • 文/不壞的土叔 我叫張陵兑牡,是天一觀的道長。 經(jīng)常有香客問我税灌,道長均函,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 55,416評論 1 279
  • 正文 為了忘掉前任菱涤,我火速辦了婚禮苞也,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘粘秆。我一直安慰自己墩朦,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 64,425評論 5 374
  • 文/花漫 我一把揭開白布翻擒。 她就那樣靜靜地躺著氓涣,像睡著了一般。 火紅的嫁衣襯著肌膚如雪陋气。 梳的紋絲不亂的頭發(fā)上劳吠,一...
    開封第一講書人閱讀 49,144評論 1 285
  • 那天,我揣著相機(jī)與錄音巩趁,去河邊找鬼痒玩。 笑死淳附,一個胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的蠢古。 我是一名探鬼主播奴曙,決...
    沈念sama閱讀 38,432評論 3 401
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼草讶!你這毒婦竟也來了洽糟?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 37,088評論 0 261
  • 序言:老撾萬榮一對情侶失蹤堕战,失蹤者是張志新(化名)和其女友劉穎坤溃,沒想到半個月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體嘱丢,經(jīng)...
    沈念sama閱讀 43,586評論 1 300
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡薪介,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 36,028評論 2 325
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了越驻。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片汁政。...
    茶點(diǎn)故事閱讀 38,137評論 1 334
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖缀旁,靈堂內(nèi)的尸體忽然破棺而出烂完,到底是詐尸還是另有隱情,我是刑警寧澤诵棵,帶...
    沈念sama閱讀 33,783評論 4 324
  • 正文 年R本政府宣布抠蚣,位于F島的核電站,受9級特大地震影響履澳,放射性物質(zhì)發(fā)生泄漏嘶窄。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 39,343評論 3 307
  • 文/蒙蒙 一距贷、第九天 我趴在偏房一處隱蔽的房頂上張望柄冲。 院中可真熱鬧,春花似錦忠蝗、人聲如沸现横。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,333評論 0 19
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽戒祠。三九已至,卻和暖如春速种,著一層夾襖步出監(jiān)牢的瞬間姜盈,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 31,559評論 1 262
  • 我被黑心中介騙來泰國打工配阵, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留馏颂,地道東北人示血。 一個月前我還...
    沈念sama閱讀 45,595評論 2 355
  • 正文 我出身青樓,卻偏偏與公主長得像救拉,于是被迫代替她去往敵國和親难审。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 42,901評論 2 345

推薦閱讀更多精彩內(nèi)容