精通Java并發(fā) - 線程池

0. 前言

為什么需要學(xué)習(xí)并發(fā)編程?

Tomcat醉锅、Netty等框架源碼,需要并發(fā)編程基礎(chǔ)才能看懂;
并發(fā)也是Java程序員的必經(jīng)之路

本篇文章的學(xué)習(xí)內(nèi)容有:

  1. 20+并發(fā)工具:線程池曙搬,各種鎖,原子類鸽嫂,并發(fā)容器
  2. 兩種并發(fā)策略:ThreadLocal和final
  3. 兩大底層原理:CAS原理與AQS框架
  4. 控制并發(fā)流程:Semaphore
  5. 實(shí)戰(zhàn)高性能緩存

1. 總覽并發(fā)工具

并發(fā)工具類根據(jù)功能可分為三大類:

  1. 并發(fā)安全:
    1. 從底層原理分類:互斥同步(鎖)纵装、非互斥同步(atomic)、無同步方案(final)
    2. 從使用角度分類:限制共享變量据某,避免共享變量橡娄,成熟并發(fā)工具
  2. 管理線程:線程池
  3. 線程協(xié)作:三大并發(fā)工具類等

更加詳細(xì)的分類參考思維導(dǎo)圖的建立并發(fā)知識(shí)框架分支

2. 線程池-線程治理最大法寶

線程池是并發(fā)工具中用來管理線程的工具。

碼農(nóng)翻身的這篇小白入門線程池文章對(duì)線程池有一個(gè)大致的概念和介紹癣籽,推薦閱讀挽唉。

2.1 什么是線程池?

為什么要使用線程池筷狼?

使用 new Thread 的方式創(chuàng)建線程執(zhí)行任務(wù)瓶籽,存在兩個(gè)問題:

  1. 反復(fù)創(chuàng)建線程開銷大,線程創(chuàng)建需要開辟虛擬機(jī)棧埂材、本地方法棧塑顺、程序計(jì)數(shù)器等線程私有的內(nèi)存空間,線程銷毀時(shí)需要回收這些資源俏险,頻繁創(chuàng)建銷毀線程會(huì)浪費(fèi)大量系統(tǒng)資源
  2. 過多的線程會(huì)占用大量CPU和內(nèi)存严拒,大量線程回收會(huì)帶來GC壓力扬绪,甚至?xí)霈F(xiàn) OOM 異常,CPU頻繁的進(jìn)行上下文切換也會(huì)降低系統(tǒng)性能

線程池主要解決兩個(gè)問題:

  1. 提升響應(yīng)速度裤唠。不需要反復(fù)創(chuàng)建和回收線程挤牛,消除了創(chuàng)建線程的延遲和銷毀線程的時(shí)間
  2. 合理利用CPU和內(nèi)存資源,提供更好的性能巧骚。靈活調(diào)整線程數(shù)量赊颠,不會(huì)線程太多導(dǎo)致OOM,也不會(huì)線程太少浪費(fèi)CPU資源劈彪。如果不使用線程池竣蹦,每次需要執(zhí)行異步任務(wù)時(shí)直接 new 一個(gè)線程來運(yùn)行,而線程的創(chuàng)建和銷毀都是需要開銷的沧奴。線程池里面的線程是可復(fù)用的痘括,不需要每次執(zhí)行任務(wù)時(shí)都重新創(chuàng)建和銷毀線程,節(jié)省了計(jì)算機(jī)資源
  3. 統(tǒng)一管理線程滔吠,比如停止線程池中的3000個(gè)線程纲菌,比挨個(gè)停止線程方便很多;可以動(dòng)態(tài)新增線程疮绷,限制線程個(gè)數(shù)翰舌。每個(gè)線程池也都保留了一些基本的統(tǒng)計(jì)數(shù)據(jù),比如當(dāng)前線程池完成的任務(wù)數(shù)目等冬骚。

線程池的適用場(chǎng)景

  1. 服務(wù)器接收到大量請(qǐng)求時(shí)椅贱,適用線程池可以大大減少線程的創(chuàng)建和銷毀,提高服務(wù)器的性能只冻,實(shí)際中Tomcat也是這么做的
  2. 實(shí)際開發(fā)中庇麦,如果需要?jiǎng)?chuàng)建5個(gè)以上的線程,那么就可以使用線程池來管理喜德。

面試題1:線程不能重復(fù) start山橄,為什么線程池線程可以復(fù)用恨狈?

線程重復(fù) start薄货,會(huì)報(bào) IllegalThreadStateException 異常礁鲁,因?yàn)榫€程聲明周期就是從 start 到 terminated胶台,沒有辦法從 terminated 恢復(fù)到 start污尉,詳細(xì)答案參考面試題4與2.8 runWorker源碼解析章節(jié)

2.2 線程池詳解與 6 大屬性

JDK 中 ThreadPoolExecutor 類的構(gòu)造方法如下:

public ThreadPoolExecutor(int corePoolSize,
                            int maximumPoolSize,
                            long keepAliveTime,
                            TimeUnit unit,
                            BlockingQueue<Runnable> workQueue,
                            ThreadFactory threadFactory,
                            RejectedExecutionHandler handler) {

線程池創(chuàng)建主要有6大參數(shù)

參數(shù)名 類型 含義
corePoolSize int 核心線程數(shù)數(shù)
maxPoolSize int 最大線程數(shù)數(shù)
keepAliveTime long 線程保持存活時(shí)間
workQueue BlockingQueue 任務(wù)存儲(chǔ)隊(duì)列
threadFactory ThreadFactory 線程工廠得哆,線程池需要新線程時(shí)使用ThreadFactory創(chuàng)建
handler RejectedExecutionHandler 線程池?zé)o法接受提交的任務(wù)時(shí)的拒絕策略
  • corePoolSize 常駐核心線程數(shù)沥曹,如果等于0髓帽,則任務(wù)執(zhí)行完畢奄薇,會(huì)銷毀線程池所有線程驳阎;如果大于0,任務(wù)執(zhí)行完畢核心線程不會(huì)銷毀。這個(gè)值的設(shè)置非常關(guān)鍵呵晚,過大會(huì)浪費(fèi)資源蜘腌,過小灰導(dǎo)致線程被頻繁創(chuàng)建和銷毀。

  • maximumPoolSize 最大線程數(shù)饵隙,表示線程池能夠容納同時(shí)執(zhí)行的最大線程數(shù)撮珠,必須大于等于1。如果待執(zhí)行的線程數(shù)大于此值金矛,則緩存在任務(wù)隊(duì)列 workQueue 中芯急。如果 maxPoolSize 等于 corePoolSize,即是固定大小線程池

  • workQueue 任務(wù)隊(duì)列驶俊,當(dāng)線程數(shù)大于等于 corePoolSize娶耍,則新任務(wù)保存到 BlockingQueue 阻塞隊(duì)列。阻塞隊(duì)列是線程安全的饼酿,使用兩個(gè) ReentrantLock 鎖來保證從出隊(duì)和入隊(duì)的原子性榕酒,是一個(gè)生產(chǎn)消費(fèi)模型隊(duì)列

2.2.1 線程池添加線程規(guī)則


任務(wù)提交到線程池,添加線程規(guī)則如下:

可記為:一cool二queue三max最后 reject故俐。

  1. 如果線程數(shù)小于 corePoolSize想鹰,即使其他線程處于空閑狀態(tài),也會(huì)創(chuàng)建一個(gè)新線程來執(zhí)行新任務(wù)药版。
  2. 如果線程數(shù)等于(或大于)corePoolSize 但小于 maxPoolSize辑舷,則將任務(wù)放入隊(duì)列
  3. 如果隊(duì)列已滿,且線程數(shù)小于 maxPoolSize槽片,則創(chuàng)建一個(gè)新線程來執(zhí)行任務(wù)
  4. 如果隊(duì)列已滿惩妇,且線程數(shù)等于 maxPoolSize,則使用拒絕策略拒絕該任務(wù)

比喻: 線程池中的線程就像燒烤店的餐桌筐乳,店里面的餐桌數(shù) corePoolSize,坐滿后讓顧客在排隊(duì)區(qū) workQueue 排隊(duì)等待乔妈;如果顧客少店內(nèi)坐不滿蝙云,店內(nèi)餐桌也不會(huì)收起來;

如果排隊(duì)區(qū)也滿了路召,那只能冒著被城管罰款的風(fēng)險(xiǎn)在店外面廣場(chǎng)加桌子勃刨,廣場(chǎng)面積有限,最大餐桌數(shù)為 maxPoolSize股淡;如果廣場(chǎng)的顧客吃完了身隐,那抓緊把桌子收起來;

如果排隊(duì)區(qū)和廣場(chǎng)都坐滿了唯灵,那來了新顧客只能拒絕服務(wù) rejected 了贾铝。

添加線程規(guī)則

源碼分析: 這里參考了《碼出高效p243》,execute方法的作用是提交任務(wù)command到線程池執(zhí)行,用戶提交任務(wù)到線程池的模型圖如下所示

提交任務(wù)到線程池
/*
 * ThreadPoolExecutor 源碼
 * ctl.get()獲取表示線程池狀態(tài)和線程個(gè)數(shù)的int值垢揩,
 * workerCountOf獲取線程池中的線程數(shù)玖绿,isRunning判斷線程池是否為Running,是Running才接受新任務(wù)
 */
public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    /*
        * Proceed in 3 steps:
        *
        * 1. If fewer than corePoolSize threads are running, try to
        * start a new thread with the given command as its first
        * task.  The call to addWorker atomically checks runState and
        * workerCount, and so prevents false alarms that would add
        * threads when it shouldn't, by returning false.
        *
        * 2. If a task can be successfully queued, then we still need
        * to double-check whether we should have added a thread
        * (because existing ones died since last checking) or that
        * the pool shut down since entry into this method. So we
        * recheck state and if necessary roll back the enqueuing if
        * stopped, or start a new thread if there are none.
        *
        * 3. If we cannot queue task, then we try to add a new
        * thread.  If it fails, we know we are shut down or saturated
        * and so reject the task.
        */
    // 返回包含線程池狀態(tài)和線程數(shù)的 int 值叁巨,這個(gè)值的詳解見線程池狀態(tài)章節(jié)
    int c = ctl.get();
    // 1. 線程數(shù)小于常駐核心線程數(shù)斑匪,使用addWorker創(chuàng)建新線程,傳入任務(wù)command
    if (workerCountOf(c) < corePoolSize) {
        // 創(chuàng)建線程成功則return返回
        if (addWorker(command, true))
            return;
        // 如果創(chuàng)建失敗锋勺,防止外部已經(jīng)在線程池中加入新任務(wù)蚀瘸,重新獲取c
        c = ctl.get();
    }
    
    // 2. 如果線程數(shù)大于等于coresize,且線程池處于Running狀態(tài)庶橱,則將任務(wù)添加到隊(duì)列
    // 如果線程池調(diào)用了shutdown方法贮勃,則isRunning返回false,不會(huì)將任務(wù)添加到隊(duì)列
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        
        // 2.2 添加成功后返回true悬包,再次檢查是否需要添加一個(gè)線程(因?yàn)槿腙?duì)過程中可能有線程被銷毀core=0 keeptime=0)衙猪,
        // 因?yàn)榉荝unning狀態(tài)都不接受新任務(wù),如果isRunning返回false則從隊(duì)列移除任務(wù)command布近,并執(zhí)行拒絕策略
        if (! isRunning(recheck) && remove(command))
            reject(command);
        // 2.3 如果當(dāng)前線程池為空垫释,則新創(chuàng)建一個(gè)線程
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    // 3. 如果線程數(shù)大于等coresize,且任務(wù)隊(duì)列已滿撑瞧,則創(chuàng)建新線程棵譬,
    // 參數(shù)false表示線程數(shù)要小于maxsize,創(chuàng)建成功返回true预伺,失敗則執(zhí)行拒絕策略
    else if (!addWorker(command, false))
        // 如果addWorker返回false订咸,即創(chuàng)建線程失敗,則使用拒絕策略
        reject(command);
}

代碼1判斷如果線程池中線程個(gè)數(shù)小于 corePoolSize酬诀,則會(huì)向workers 里新增一個(gè)核心core線程執(zhí)行該任務(wù)脏嚷。

如果線程池中線程個(gè)數(shù)大于等于 corePoolSize 則執(zhí)行代碼2,如果當(dāng)前線程池是RUNNING狀態(tài)則添加任務(wù)到任務(wù)隊(duì)列瞒御,非RUNNING狀態(tài)是拒絕接受新任務(wù)的父叙。

如果向任務(wù)隊(duì)列添加成功,則代碼2.2對(duì)線程池狀態(tài)進(jìn)行二次校驗(yàn)肴裙,這是因?yàn)樘砑尤蝿?wù)到隊(duì)列后趾唱,執(zhí)行代碼2.2前有可能線程池的狀態(tài)已經(jīng)有變化了。這里二次校驗(yàn)如果線程池狀態(tài)不是RUNNING蜻懦,則把該任務(wù)從任務(wù)隊(duì)列移除甜癞,移除后政治性拒絕策略;如果二次校驗(yàn)通過宛乃,則執(zhí)行代碼2.3判斷線程池中線程個(gè)數(shù)悠咱,如果線程個(gè)數(shù)為0則新建一個(gè)線程蒸辆。

如果代碼2添加任務(wù)失敗,則說明任務(wù)隊(duì)列已滿乔煞,那么執(zhí)行代碼3吁朦,嘗試新創(chuàng)建線程來執(zhí)行該任務(wù),如果線程池線程個(gè)數(shù) ≥ maxxinumPoolSize 則執(zhí)行拒絕策略渡贾。

執(zhí)行拒絕策略的兩種情況:1. 線程池狀態(tài)為非RUNNING逗宜;2. 任務(wù)隊(duì)列已滿且線程個(gè)數(shù) ≥ maxxinumPoolSize

2.2.2 常見 3 種工作隊(duì)列


  1. 空隊(duì)列:SynchronousQueue,隊(duì)列長(zhǎng)度為0空骚,僅起到一個(gè)交接作用纺讲。任務(wù)保存到隊(duì)列后直接交給線程池,很容易創(chuàng)建新的線程囤屹,所以線程池需要設(shè)置大一點(diǎn)的 maxPoolSize熬甚。比如Executors#newCachedThreadPool創(chuàng)建線程池就使用的該隊(duì)列,并且設(shè)置 maxPoolSize 為 Integer.MAX_VALUE肋坚。
  2. 無界隊(duì)列:LinkedBlockingQueue乡括,鏈表結(jié)構(gòu),隊(duì)列長(zhǎng)度默認(rèn)為 Integer.MAX_VALUE智厌。由于隊(duì)列不會(huì)滿诲泌,所以線程數(shù)不會(huì)大于 corePoolSize,所以 maxPoolSize 相當(dāng)于沒有意義铣鹏》笊ǎ可以應(yīng)對(duì)流量突增,新任務(wù)都會(huì)被放到隊(duì)列中诚卸,缺點(diǎn)是如果處理速度小于任務(wù)提交速度葵第,則會(huì)造成 OOM 異常。比如Executors#newFixedThreadPool創(chuàng)建線程池使用的就是該隊(duì)列合溺,并且 maxPoolSize 等于 corePoolSize卒密。
  3. 有界隊(duì)列:ArrayBlockingQueue,需要手動(dòng)指定長(zhǎng)度的有界隊(duì)列棠赛,使用該隊(duì)列 maxPoolSize 有意義哮奇,當(dāng)隊(duì)列滿了之后,會(huì)創(chuàng)建新線程來執(zhí)行任務(wù)恭朗。

2.2.3 線程池 OOM


前面提到無界隊(duì)列 LinkedBlockingQueue,如果處理速度小于任務(wù)提交速度則會(huì)出現(xiàn) OOM依疼,JDK 中Executors#newFixedThreadPool創(chuàng)建的線程池就使用的 LinkedBlockingQueue痰腮,使用不當(dāng)就會(huì)出現(xiàn)OOM,示例代碼中任務(wù)的提交速度遠(yuǎn)高于執(zhí)行速度律罢,并且使用JVM 參數(shù)調(diào)小堆內(nèi)存-Xmx8m -Xms8m膀值,方便復(fù)現(xiàn)OOM棍丐。

面試題2:線程池線程數(shù)能大于maxPoolSize嗎?

// 待解答沧踏,看源碼貌似可以歌逢,但是與定義不符啊

面試題3:線程池沒有任務(wù)時(shí)會(huì)怎么樣?

會(huì)阻塞住翘狱,線程池沒有任務(wù)執(zhí)行時(shí)秘案,會(huì)從任務(wù)隊(duì)列 workQueue 取出任務(wù),如果 workQueue 為空潦匈,則會(huì)調(diào)用 wait() 方法進(jìn)入阻塞狀態(tài)阱高,直到有新任務(wù)進(jìn)來喚醒。實(shí)際中大多使用LinkedBlockingQueue 阻塞隊(duì)列茬缩,這里附上了簡(jiǎn)單的 ArrayBlockingQueue源碼

// ArrayBlockQueue#take源碼
public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        while (count == 0)
            // 如果阻塞隊(duì)列位空赤惊,則進(jìn)入阻塞狀態(tài),等待offer方法喚醒
            notEmpty.await();
        return dequeue();
    } finally {
        lock.unlock();
    }
}

查看 2.8.3 getTask源碼分析章節(jié)凰锡,當(dāng)沒有任務(wù)時(shí)未舟,核心線程會(huì)從任務(wù)隊(duì)列 take 任務(wù)進(jìn)入阻塞狀態(tài),非核心線程會(huì)從任務(wù)隊(duì)列 poll 任務(wù)掂为,若超時(shí)后沒有任務(wù)則線程被回收(執(zhí)行結(jié)束)裕膀。

  • keepAliveTime 線程空閑時(shí)間,單位為 TimeUnit菩掏。當(dāng)線程空閑時(shí)間達(dá)到 keepAliveTime 時(shí)為空閑線程魂角,空閑線程會(huì)被回收銷毀,直到只剩下 corePoolSize 個(gè)線程為止智绸,避免浪費(fèi)內(nèi)存和句柄資源野揪?。默認(rèn)情況下瞧栗,線程數(shù)大于 corePoolSize時(shí)斯稳,keepAliveTime 才會(huì)起作用。比如Executors#newCachedThreadPool就設(shè)置 keepAliveTime 為60秒迹恐,當(dāng)超過線程空閑時(shí)間超過60s時(shí)挣惰,則會(huì)回收線程直到只剩下 corePoolSize 個(gè)線程為止。
  • threadFactory 線程工廠殴边。用來創(chuàng)建線程憎茂,默認(rèn)使用 Executors.defaultThreadFactory(),創(chuàng)建出來的線程都屬于同一個(gè)線程組锤岸,相同優(yōu)先級(jí)NORM_PRIORITY竖幔,都不是守護(hù)線程,線程名稱形如 pool-poolNumber-trhead-id是偷,自定義線程工廠可以設(shè)置更加友好易讀的線程名稱拳氢。
// Executors 內(nèi)部類 DefaultThreadFactory源碼募逞,參數(shù)r是用戶傳入的任務(wù)
public Thread newThread(Runnable r) {
    Thread t = new Thread(group, r,
                            namePrefix + threadNumber.getAndIncrement(),
                            0);
    // 設(shè)置為非守護(hù)線程
    if (t.isDaemon())
        t.setDaemon(false);
    // 設(shè)置相同優(yōu)先級(jí)
    if (t.getPriority() != Thread.NORM_PRIORITY)
        t.setPriority(Thread.NORM_PRIORITY);
    return t;
}

上述的Executors 默認(rèn)的線程工廠過于簡(jiǎn)單,對(duì)用戶不夠友好馋评,線程名稱必須具有特定意義放接,如包含調(diào)用來源,業(yè)務(wù)含義等留特。清晰的線程名稱方便后期調(diào)試和分析纠脾,有助于快速定位死鎖,StackOverflow等問題磕秤。以下為簡(jiǎn)單的自定義線程工廠:
去Github查看代碼示例

2.2.4 常見 4 種拒絕策略


  • handler 線程池該屬性用于執(zhí)行拒絕策略乳乌,默認(rèn)使用 RejectedExecutionHandler。當(dāng)任務(wù)隊(duì)列 workQueue 已滿且線程數(shù)已經(jīng)達(dá)到了 maxPoolSize市咆,則通過該策略拒絕處理請(qǐng)求汉操,這是一種簡(jiǎn)單的限流保護(hù)。友好的拒絕策略應(yīng)該做到以下三點(diǎn):

    1. 保存任務(wù)到數(shù)據(jù)庫(kù)進(jìn)行削峰填谷蒙兰,在空閑時(shí)再取出來執(zhí)行
    2. 轉(zhuǎn)向某個(gè)提示頁面
    3. 打印日志

ThreadPoolExecutor中提供了 4 個(gè)公開的靜態(tài)內(nèi)部類:

  • AbortPolicy(默認(rèn)):丟棄任務(wù)并拋出RejectedExecutionException異常
  • DiscardPolicy:丟棄任務(wù)磷瘤,但是不拋出異常,不推薦
  • DiscardOldestPolicy:丟棄隊(duì)列中等待最久的任務(wù)搜变,然后把當(dāng)前任務(wù)加入隊(duì)列中
  • CallerRunsPolicy:調(diào)用任務(wù)的run()方法繞過線程池直接執(zhí)行采缚。

一般使用默認(rèn)的拒絕策略即可,如果需要自定義拒絕策略挠他,可以參考代碼示例

2.3 常見5種線程池

線程池相關(guān)類圖

Executor 接口定義線程池執(zhí)行任務(wù)的方法 execute()扳抽,

Executors 提供創(chuàng)建線程池的各種方式,類似于Collections

ExecutorService 接口定義了管理線程任務(wù)的方法 submit()殖侵、shutdown() 和 invokeAll()

AbstractExecutorService 對(duì) submit()和 invokeAll() 進(jìn)行實(shí)現(xiàn)贸呢。

Parameter FixedThreadPool SingleTExecutor CachedTPool ScheduledTPool
corePoolSize n 1 0 n
maxNumPoolSize n 1 Integer.MAX_VALUE Integer.MAX_VALUE
keepAliveTime 0 s 0 s 60 s 60 s
workQueue LinkedBlockingQueue LinkedBlockingQueue SynchronousQueue DelayedWorkQueue

以下方法都有多種重載方法,可以設(shè)置線程數(shù)和自定義線程工廠拢军,前三個(gè)返回ThreadPoolExecutor 對(duì)象楞陷,第四個(gè)返回 ScheduledThreadPoolExecutor 對(duì)象。

  • Executors#newSingleThreadExecutor() 創(chuàng)建一個(gè)單線程的線程池茉唉,相當(dāng)于單線程串行執(zhí)行所有任務(wù)固蛾。
  • Executors#newFixedThreadPool(n) 創(chuàng)建指定固定線程數(shù)的線程池,core 等于 maxSize度陆,不存在空閑線程艾凯,所以keepAliveTime為 0
  • Executors#newCachedThreadPool() 高度可伸縮線程池,coreSize為0懂傀,任務(wù)隊(duì)列長(zhǎng)度為0趾诗,任務(wù)直接交給線程池創(chuàng)建線程執(zhí)行,線程空閑時(shí)間超過60s會(huì)被回收鸿竖。
  • Executors#newScheduledThreadPool() 適支持定時(shí)及周期性任務(wù)執(zhí)行沧竟,比 Timer 更安全,功能更強(qiáng)大缚忧。是ScheduledThreadPoolExecutor的對(duì)象悟泵。

通過了解以上4種自動(dòng)創(chuàng)建線程池的方法,每種方法都有不恰當(dāng)之處闪水,maxNumPoolSize 設(shè)置過大糕非,workQueue 設(shè)置為無界隊(duì)列,在流量突增時(shí)都會(huì)引發(fā) OOM球榆,所以《阿里巴巴Java編程規(guī)范》中不允許使用 Executors朽肥,推薦使用 ThreadPoolExecutor 的方式創(chuàng)建線程池,這樣的處理方式能更加明確線程池的運(yùn)行規(guī)則持钉,規(guī)避資源耗盡的風(fēng)險(xiǎn)衡招。

  • Executors#newWorkStealingPool() JDK8引入,創(chuàng)建持有足夠線程的線程池每强。從下面源碼中可以看出始腾,該線程池把 CPU 核心數(shù)量設(shè)置為默認(rèn)的并行度。通過線程池類圖可知空执,其他線程池本質(zhì)都是ThreadPoolExecutor浪箭,而Executors#newWorkStealingPool()創(chuàng)建的線程池是ForkJoinPool對(duì)象
// Executors 創(chuàng)建線程池源碼
public static ExecutorService newWorkStealingPool() {
    // 線程數(shù)為 CPU 核心數(shù)
    return new ForkJoinPool
        (Runtime.getRuntime().availableProcessors(),
            ForkJoinPool.defaultForkJoinWorkerThreadFactory,
            null, true);
}

這個(gè)線程池和前面的線程池有所不同,在能產(chǎn)生子任務(wù)的場(chǎng)景才適合使用該線程池辨绊。線程可以竊取(Stealing)其他線程的任務(wù)奶栖,提高并行度。

需要注意的是最好不要加鎖门坷,因?yàn)槿蝿?wù)并行執(zhí)行宣鄙,不加鎖才能發(fā)揮出并行的效果。不保證執(zhí)行順序拜鹤,使用場(chǎng)景有限框冀。例如遞歸情況,可以使用該線程池敏簿。

2.4 手動(dòng)創(chuàng)建線程池

上文中說道明也,手動(dòng)創(chuàng)建線程池優(yōu)于JDK自動(dòng)創(chuàng)建線程池,但是線程池中的線程數(shù)應(yīng)該設(shè)置為多少呢惯裕?

參考文章:創(chuàng)建多少線程才是合適的温数?

  • CPU 密集型:比如加密、計(jì)算 hash 等任務(wù)蜻势,任務(wù)屬于 CPU 密集型撑刺,最佳線程數(shù)為 CPU 核心數(shù)

  • 耗時(shí) IO 型:比如讀寫數(shù)據(jù)庫(kù)握玛、文件和網(wǎng)絡(luò)通信等够傍,CPU一般是不工作的甫菠,外設(shè)的速度遠(yuǎn)遠(yuǎn)慢于 CPU,最佳線程數(shù)可以設(shè)置為 cpu 核心數(shù)的很多倍冕屯。

  • 線程數(shù) = cpu 核心數(shù) * (1+平均等待時(shí)間/平均工作時(shí)間)

手動(dòng)創(chuàng)建線程池寂诱,應(yīng)該注意以下 4 點(diǎn):

  1. 避免誤解隊(duì)列,防止 OOM
  2. 自定義線程工廠安聘,設(shè)置合適的線程名稱痰洒,方便后期調(diào)試
  3. 一般使用默認(rèn)的拒絕策略,也可以自定義拒絕策略
  4. 設(shè)置合適的 corePoolSize 和 maxPoolSize

代碼示例如下所示浴韭,去Github查看詳細(xì)代碼示例

public class UserThreadPool {
    /**
     * 執(zhí)行main方法會(huì)打印如下日志:
     *  UserThreadFactory's 第1機(jī)房-Worker-1       // 線程工程創(chuàng)建線程丘喻,打印線程名稱
     *  UserThreadFactory's 第1機(jī)房-Worker-2
     *  running_0                                 // task內(nèi)容,打印任務(wù)執(zhí)行次數(shù)
     *  running_1
     *  running_2
     *  running_3
     *  you task is rejected. count=158. java.util.concurrent.ThreadPoolExecutor@677327b6[Running, pool size = 2, active threads = 2, queued tasks = 2, completed tasks = 4]
     *      // 打印線程池狀態(tài)念颈,線程池線程數(shù)量為2泉粉,達(dá)到了我們定義的maxPoolSize=2,任務(wù)隊(duì)列數(shù)量為2榴芳,達(dá)到了我們定義的workQueue容量搀继,完成的任務(wù)數(shù)completed tasks有4個(gè)。
     *      // 到最終日志翠语,任務(wù)的執(zhí)行次數(shù) running_和任務(wù)拒絕次數(shù)rejectCount相加等于總?cè)蝿?wù)數(shù)200
     *      // 有時(shí) queued tasks 不一定等于2叽躯,因?yàn)閳?zhí)行拒絕策略時(shí)隊(duì)列元素為2,打印時(shí)隊(duì)列元素可能已經(jīng)被取走執(zhí)行了肌括,復(fù)現(xiàn)時(shí)可以刪除 threadpool.demo.Task 類的 sleep方法点骑。
     *
     */
    public static void main(String[] args) {
        // 1. 任務(wù)隊(duì)列,避免誤無解隊(duì)列谍夭,防止OOM異常
        BlockingQueue workQueue = new LinkedBlockingQueue(2);

        // 2. 線程工廠黑滴,定義合適的線程名稱
        UserThreadFactory f1 = new UserThreadFactory("第1機(jī)房");

        // 3. 拒絕策略
        UserRejectHandler handler = new UserRejectHandler();
        
        // 4.1 創(chuàng)建線程池,使用自定義線程工廠和拒絕策略
        ThreadPoolExecutor threadPool1 = new ThreadPoolExecutor(1, 2, 60, TimeUnit.SECONDS, workQueue, f1, handler);
        // 4.2 使用默認(rèn)的線程工廠和拒絕策略紧索,使用大多數(shù)情況袁辈,這里的拒絕策略時(shí)AbortPolicy,即丟棄任務(wù)并拋出異常
        ThreadPoolExecutor threadPool2 = new ThreadPoolExecutor(1, 2, 60, TimeUnit.SECONDS, workQueue);

        // 使用線程池珠漂,
        Runnable task= new Task();
        for (int i = 0; i < 200; i++) {
            // 使用自定義默認(rèn)策略晚缩,拒絕任務(wù)并打印線程池狀態(tài)
            // 任務(wù)的執(zhí)行次數(shù) running_和任務(wù)拒絕次數(shù)rejectCount之和應(yīng)為總?cè)蝿?wù)數(shù)200
            threadPool1.execute(task);

            //threadPool2.execute(task);    // 使用默認(rèn)的拒絕策略,拒絕任務(wù)媳危,并拋出異常
        }
    }
}

// todo 此處應(yīng)該補(bǔ)充 Spring 中創(chuàng)建池的方法荞彼。

https://mp.weixin.qq.com/s/z3gjfk4l-s8aKD4cvY8CHA
https://mp.weixin.qq.com/s/05Ud0t7ECIYWMePhuOf6tg

// springboot中啟動(dòng)異步任務(wù)@Async ,定時(shí)任務(wù)@Scheduled
https://www.bilibili.com/video/av38657363?p=95

2.5 停止線程池

ThreadPoolExecutor停止線程有 5 個(gè)相關(guān)方法:

  • shutdown() :通知線程池停止待笑,線程池會(huì)等已經(jīng)添加的任務(wù)執(zhí)行結(jié)束后停止
  • isShutdown() :查看線程池是否收到了 shutdown通知鸣皂,若收到,則不能添加新任務(wù)
  • isTerminated() :判斷線程池是否已經(jīng)停止,與shutdown不同寞缝,shutdown是判斷線程池是否開始停止癌压,即是否收到了通知
  • `awaitTermination(long timeout)) :等待一段時(shí)間,檢測(cè)線程池是否已經(jīng)停止荆陆,該方法是一個(gè)死循環(huán)措拇,若線程池停止了,返回true慎宾,若線程池未停止,則一直檢測(cè)直至超時(shí)浅悉,返回false
  • shutdownNow() :強(qiáng)制停止線程池趟据,使用 interrupt 方法通知正在執(zhí)行的線程,并返回任務(wù)隊(duì)列中的任務(wù)集合术健,用于記錄日志或保存到數(shù)據(jù)庫(kù)中汹碱。

去Github查看停止線程池 5 種相關(guān)方法的代碼示例。
這5個(gè)方法的源碼分析見《Java 并發(fā)編程之美 8.3.3》

2.6 鉤子方法與線程池監(jiān)控

2.6.1 鉤子Hook方法

鉤子方法:簡(jiǎn)單的理解就是一個(gè)流程荞估,在一個(gè)方法實(shí)現(xiàn)咳促,流程中存在需要自定義的部分,則抽象出一個(gè)鉤子方法勘伺,相同的部分則在流程中的實(shí)現(xiàn)即可跪腹。查看Java中的鉤子方法

在每個(gè)任務(wù)執(zhí)行前后都會(huì)調(diào)用鉤子方法,可以進(jìn)行日志記錄和統(tǒng)計(jì)等工作飞醉。

ThreadPoolExecutor 類中有三個(gè)個(gè)鉤子方法冲茸,默認(rèn)都是空實(shí)現(xiàn)。其中
beforeExecuteafterExecute缅帘,在每個(gè)任務(wù)執(zhí)行前后都會(huì)調(diào)用這兩個(gè)方法轴术,在1.8源碼分析章節(jié) runWorker 中可以看到這兩個(gè)方法在任務(wù)執(zhí)行前后被調(diào)用;terminated是在線程池被回收前調(diào)用钦无。使用鉤子方法就是繼承 ThreadPoolExecutor 類并重寫這幾個(gè)方法逗栽,然后就可以實(shí)現(xiàn)相關(guān)功能。

在1.8源碼分析章節(jié)可以看到線程池執(zhí)行任務(wù) runWorker 方法前后都會(huì)調(diào)用兩個(gè)鉤子方法失暂。

可以利用鉤子方法實(shí)現(xiàn)線程池的暫停與恢復(fù)彼宠,具體代碼示例見Github。

2.6.2 線程池監(jiān)控

線程池具有非常多的屬性弟塞,比如前面提到的核心線程數(shù)兵志,最大線程數(shù)等,線程池對(duì)于各種屬性提供了訪問接口:

  • getCorePoolSize() 獲取核心線程數(shù) corePoolSize
  • getMaximumPoolSize() 獲取最大線程數(shù) maximumPoolSize
  • getKeepAliveTime() 獲取空閑線程存活時(shí)間
  • getQueue() 獲取任務(wù)隊(duì)列
  • getRejectedExecutionHandler() 獲取拒絕策略對(duì)象
  • getThreadFactory() 獲取線程工廠對(duì)象

前6個(gè)方法是獲取 ThreadPoolExecutor 的構(gòu)造參數(shù)宣肚,下面的方法是獲取線程池的一些其他屬性想罕,這些屬性我們會(huì)在1.8源碼分析章節(jié)經(jīng)常見到。

  • getPoolSize() 獲取線程池中線程個(gè)數(shù)
  • getLargestPoolSize() 獲取線程池中線程數(shù)出現(xiàn)過的最大值,創(chuàng)建工作線程 addWorker 方法會(huì)更新該屬性
  • getActiveCount() 獲取工作線程數(shù)按价,即工作線程集 workers 的大小
  • getTaskCount() 獲取任務(wù)總數(shù)惭适,包括已完成、未完成和正在執(zhí)行的任務(wù)
  • getCompletedTaskCount() 獲取已完成的任務(wù)數(shù)楼镐,在 runWorker 中線程 worker 任務(wù)執(zhí)行完成會(huì)更新該屬性癞志,getCompletedTaskCount就是將線程工作集 workers 中所有線程完成任務(wù)數(shù)相加返回

2.7 線程池的 5 種狀態(tài)

線程池一共有 5 種狀態(tài),狀態(tài)轉(zhuǎn)換如下圖所示:


線程池狀態(tài)轉(zhuǎn)換圖
  • RUNNING:接受新任務(wù)框产,并且處理阻塞隊(duì)列里的任務(wù)
  • SHUTDOWN:拒絕新任務(wù)凄杯,但是處理阻塞隊(duì)列里的任務(wù)
  • STOP:拒絕新任務(wù),并且丟棄阻塞隊(duì)列里的任務(wù)
  • TIDYING:整理狀態(tài)秉宿,所有任務(wù)都執(zhí)行完后(包括任務(wù)隊(duì)列中的任務(wù))戒突,當(dāng)前線程池工作線程為0,將要調(diào)用 terminated 方法
  • TERMINATED:終止?fàn)顟B(tài)描睦,terminated 方法調(diào)用完后的狀態(tài)

線程池狀態(tài)轉(zhuǎn)換:

  • RUNNING -> SHUTDOWN:用戶顯式調(diào)用 shutdown() 方法膊存,或者線程池對(duì)象被回收時(shí)隱式調(diào)用了 finalize() 中的shutdown() 方法。
  • RUNNING / SHUTDOWN -> STOP:顯式調(diào)用 shutdownNow() 方法
  • SHUTDOWN -> TIDYING:當(dāng)線程池和任務(wù)隊(duì)列都為空時(shí)
  • STOP -> TIDYING:當(dāng)線程池為空時(shí)
  • TIDYING -> TERMINATER:當(dāng) terminated() hook方法執(zhí)行完成時(shí)

源碼分析:
在 ThreadPoolExecutor 的屬性定義中頻繁使用位移運(yùn)算來表示線程池狀態(tài)忱叭,位移運(yùn)算是改變當(dāng)前值的一種高效手段隔崎,查看 ThreadPoolExecutor 源碼可知,線程池一共有 5 種狀態(tài):

// ThreadPoolExecutor 屬性韵丑,用來表示線程池狀態(tài)

    // 線程池默認(rèn)為RUNNING狀態(tài)爵卒,線程個(gè)數(shù)為0,合并起來得到線程池的ctl值
    private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

    // int值共有32位撵彻,右邊29位表示工作線程數(shù)技潘,左邊3位表示線程池狀態(tài)。3個(gè)二進(jìn)制位可以表示0-7
    // Integer.SIZE為32千康,COUNT_BITS
    private static final int COUNT_BITS = Integer.SIZE - 3;

    // 000-11111111111111111111111111111,類似于子網(wǎng)掩碼享幽,用于位運(yùn)算,使用見下方方法
    private static final int CAPACITY   = (1 << COUNT_BITS) - 1;


    // 用左邊3位二進(jìn)制拾弃,表示線程池狀態(tài)
    // RUNNING狀態(tài)值桩,表示可以接受新任務(wù),并且處理隊(duì)列里的任務(wù)
    // -1左移29位表示 111_00000000000000000000000000000
    private static final int RUNNING    = -1 << COUNT_BITS;
    
    // SHUTDOWN狀態(tài)豪椿,此狀態(tài)不再接受新任務(wù)奔坟,但可以繼續(xù)執(zhí)行隊(duì)列中的任務(wù)
    // 0左移29位表示 000_00000000000000000000000000000
    private static final int SHUTDOWN   =  0 << COUNT_BITS;
    
    // STOP狀態(tài),此狀態(tài)全面拒絕任務(wù)搭盾,并中斷正在處理的任務(wù)
    // 1左移29位表示 001_00000000000000000000000000000
    private static final int STOP       =  1 << COUNT_BITS;
    
    // TIDYING狀態(tài)咳秉,此狀態(tài)表示所有任務(wù)已經(jīng)被終止
    // 2左移29位表示 010_00000000000000000000000000000
    private static final int TIDYING    =  2 << COUNT_BITS;

    // TERMINATED狀態(tài),此狀態(tài)表示所有任務(wù)已經(jīng)被終止
    // 3左移29位表示 011_00000000000000000000000000000
    private static final int TERMINATED =  3 << COUNT_BITS;


    // 返回線程池狀態(tài)鸯隅,c是線程池狀態(tài)和線程數(shù)的int值澜建,與~CAPACITY做與運(yùn)算向挖,得到左3位二進(jìn)制線程池狀態(tài)
    private static int runStateOf(int c)     { return c & ~CAPACITY; }

    // 返回線程池線程數(shù),與CAPACITY做與運(yùn)算炕舵,得到右29位二進(jìn)制線程數(shù)
    private static int workerCountOf(int c)  { return c & CAPACITY; }

    // 把左邊三位與右邊29位或運(yùn)算何之,合并成一個(gè)值
    private static int ctlOf(int rs, int wc) { return rs | wc; }

線程池的狀態(tài)用ctl整型的左3位表示,五種線程池狀態(tài)的十進(jìn)制值大小依次為:

RUNNING < SHUTDOWN < STOP < TIDYING < TERMINATED

這樣設(shè)計(jì)的好處是很容易通過比較值大小來判斷線程池狀態(tài)咽筋,例如線程池中經(jīng)常需要 isRunning 判斷

    // 判斷線程池是否為Running狀態(tài)
    private static boolean isRunning(int c) {
        return c < SHUTDOWN;
    }

根據(jù)以上可知溶推,線程池調(diào)用 shutdown() 方法后會(huì)從RUNNING狀態(tài)轉(zhuǎn)換為SHUTDOWN狀態(tài),下面 ThreadPoolExecutor 源碼中展示了線程池狀態(tài) RUNNING -> SHUTDOWN -> TIDYING -> TERMINATED 的轉(zhuǎn)換過程奸攻,參考《并發(fā)編程之美8.3.3》鳄哭。

public void shutdown() {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        checkShutdownAccess();
        // 1. 設(shè)置線程池狀態(tài)為SHUTDOWN
        advanceRunState(SHUTDOWN);
        interruptIdleWorkers();
        onShutdown(); // hook for ScheduledThreadPoolExecutor
    } finally {
        mainLock.unlock();
    }
    // 嘗試終止線程池
    tryTerminate();
}
final void tryTerminate() {
    for (;;) {
        int c = ctl.get();
        // ......

        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            // 2. 設(shè)置當(dāng)前線程池狀態(tài)為TIDYING
            if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
                try {
                    // 調(diào)用鉤子方法
                    terminated();
                } finally {
                    // 3. terminated方法調(diào)用完畢后静稻,設(shè)置線程池狀態(tài)為TERMINATED
                    ctl.set(ctlOf(TERMINATED, 0));
                    termination.signalAll();
                }
                return;
            }
        } finally {
            mainLock.unlock();
        }
        // else retry on failed CAS
    }
}

protected void terminated() { }

上面的源碼線程轉(zhuǎn)換過程缺少了STOP狀態(tài)惩阶,下面源碼展示了線程對(duì)象調(diào)用shutdownNow()方法后蜂大,線程狀態(tài) RUNNING -> STOP -> TIDYING -> TERMINATED 的轉(zhuǎn)換過程

public List<Runnable> shutdownNow() {
    List<Runnable> tasks;
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        checkShutdownAccess();
        // 1. 設(shè)置線程池狀態(tài)為STOP
        advanceRunState(STOP);
        interruptWorkers();
        // 將任務(wù)隊(duì)列元素移出疏橄,不執(zhí)行這些任務(wù)
        tasks = drainQueue();
    } finally {
        mainLock.unlock();
    }
    // 2. 設(shè)置線程池狀態(tài)為TIDYING,調(diào)用terminated方法
    // 3. 設(shè)置線程池狀態(tài)為TERMINATED
    tryTerminate();
    return tasks;
}

// 略就?疑問:SHUTDOWN -> TIDYING 要等待線程池和任務(wù)隊(duì)列為空才行捎迫,源碼中并沒有看到這一點(diǎn)。

// 補(bǔ)充:mainLock的作用分析

2.8 實(shí)現(xiàn)原理與源碼分析

ThreadPoolExecutor 源碼有 4 個(gè)重要組成部分:

  1. 線程池的各個(gè)狀態(tài)表牢,源碼分析見線程池狀態(tài)章節(jié)窄绒,
  2. 提交任務(wù)執(zhí)行是 execute() 方法,源碼分析見添加線程規(guī)則章節(jié)
  3. 創(chuàng)建并啟動(dòng)新線程是 addWorker()方法
  4. 使用線程池中線程執(zhí)行任務(wù)是 runWorker()方法

ThreadPoolExecutor 重要方法調(diào)用圖如下所示崔兴,詳細(xì)是嵌套調(diào)用彰导,向右是串行執(zhí)行:


線程池方法時(shí)序圖

上圖中 Worker 是 Runnable 接口實(shí)現(xiàn)類,啟動(dòng)線程需要 new Thread(Runnable worker).start()敲茄,而創(chuàng)建線程工作在 Worker 構(gòu)造方法中使用線程工廠創(chuàng)建位谋,創(chuàng)建的線程保存在 Worker 類的 final Thread thread 屬性中,啟動(dòng)線程也是調(diào)用的 run() 方法就是 Runnable worker 的run() 方法堰燎,調(diào)用了 ThreadPoolExecutor#runWorker方法掏父。這里比較繞,詳細(xì)在 addWorker 與 runWorker 源碼分析章節(jié)介紹秆剪。

2.8.1 創(chuàng)建工作線程 addWorker 源碼分析

execute() 方法中創(chuàng)建線程調(diào)用的是 addWorker()方法赊淑,即新增工作線程 worker,代碼如下:

/**
 * 根據(jù)當(dāng)前線程池狀態(tài)仅讽,檢測(cè)是否可以添加新的工作線程陶缺,如果可以則創(chuàng)建新線程并執(zhí)行任務(wù)
 * 如果一切正常則返回true,返回false有倆種情況:
 * 1. 線程池非RUNNING狀態(tài)
 * 2. 線程工廠創(chuàng)建新線程失敗
 * @param firstTask 用戶需要執(zhí)行的任務(wù)Runnable洁灵,使用該任務(wù)來new 線程Thread饱岸,外部啟動(dòng)線程池時(shí)需要構(gòu)造的第一個(gè)線程,它是線程的母體
 * @param core 新增線程時(shí)的判斷指標(biāo):
 *    true 判斷指標(biāo)為corePoolSize,RUNNING狀態(tài)線程池的線程個(gè)數(shù)小于corePoolSize則可以創(chuàng)建新線程
 *    false 判斷指標(biāo)為maximumPoolSize伶贰,RUNNING狀態(tài)線程池的線程數(shù)小于maximumPoolSize則可以創(chuàng)建新線程
 */
private boolean addWorker(Runnable firstTask, boolean core) {
        retry:
        for (;;) {
            // 獲取線程池的狀態(tài)與線程個(gè)數(shù)組合值c蛛砰,詳細(xì)見線程狀態(tài)章節(jié)
            int c = ctl.get();
            // 獲取線程池狀態(tài)
            int rs = runStateOf(c);

            // 1. 如果為RUNNING狀態(tài),則無法進(jìn)入if語句黍衙,直接進(jìn)行下一步for
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                return false;

            // 2. 循環(huán)CAS增加線程個(gè)數(shù)
            for (;;) {
                // 獲取線程池線程個(gè)數(shù)
                int wc = workerCountOf(c);

                // 2.1 線程數(shù)最大為2^29泥畅,否則將影響左3位的線程池狀態(tài)值
                // 判斷線程個(gè)數(shù)是否大于等于核心數(shù)(最大數(shù))
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                // 2.2 CAS 增加線程數(shù),修改ctl值琅翻,成功后跳出循環(huán)到retry標(biāo)簽
                if (compareAndIncrementWorkerCount(c))
                    break retry;

                // 2.3 如果CAS增加線程數(shù)失敗位仁,則繼續(xù)執(zhí)行內(nèi)層for循環(huán),重新CAS
                // 線程池狀態(tài)線程個(gè)數(shù)值是可變化的方椎,需要獲取最新值
                c = ctl.get();  // Re-read ctl
                // 查看線程池狀態(tài)是否變化聂抢,如果變化則跳出for循環(huán)到retry標(biāo)簽
                if (runStateOf(c) != rs)
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
        }

        // 3. 到這里說明CAS成功了,這里開始創(chuàng)建新的工作線程
        boolean workerStarted = false;      // 標(biāo)記線程是否啟動(dòng)成功
        boolean workerAdded = false;        // 標(biāo)記線程是否新增到workers成功
        // Worker是工作線程棠众,實(shí)現(xiàn)了Runnable接口琳疏,是ThreadPoolExecutor的內(nèi)部類
        Worker w = null;
        try {
            // 3.1 創(chuàng)建新線程,這行代碼已經(jīng)利用Worker構(gòu)造方法中的線程池工廠創(chuàng)建了新線程
            // 并封裝成工作線程Worker對(duì)象闸拿,線程創(chuàng)建時(shí)參數(shù)Runnable為firstTask
            w = new Worker(firstTask);
            
            final Thread t = w.thread;
            if (t != null) {
                final ReentrantLock mainLock = this.mainLock;
                // 3.2 添加獨(dú)占鎖空盼,為了實(shí)現(xiàn)workers同步,因?yàn)榭赡芏鄠€(gè)線程同時(shí)調(diào)用了同一個(gè)線程池的execute方法
                mainLock.lock();
                try {
                    // 3.3 獲取線程池狀態(tài)新荤,避免在獲取鎖前調(diào)用了shutdown方法
                    int rs = runStateOf(ctl.get());

                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        
                        // 如果線程已啟動(dòng)調(diào)用過start方法揽趾,則拋出異常,等價(jià)于普通線程啟動(dòng)狀態(tài)校驗(yàn)
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();

                        // 3.4 添加新創(chuàng)建的工作線程 w 到工作線程集workers
                        workers.add(w);
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }

                // 3.5 添加成功后啟動(dòng)工作線程苛骨,調(diào)用線程start方法篱瞎,這個(gè)工作線程是線程池工廠創(chuàng)建的
                if (workerAdded) {
                    // 啟動(dòng)線程,調(diào)用線程t的run方法痒芝,
                    // 線程t是new Thread(Runnable worker)得到的俐筋,所以會(huì)調(diào)用worker.run()方法
                    t.start();
                    // 線程啟動(dòng)成功
                    workerStarted = true;
                }
            }
        } finally {
            // 3.6 如果線程啟動(dòng)失敗,則把新增的線程從workers中remove掉严衬,然后線程個(gè)數(shù)ctl減1
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    }   

代碼較長(zhǎng)校哎,主要分為兩個(gè)部分:1.雙重循環(huán)通過 CAS 操作增加線程數(shù);2.創(chuàng)建新工作線程瞳步,并線程線程安全的將新線程添加到 workers 中闷哆,并且啟動(dòng)新線程。

首先來分析第一段代碼单起,通過CAS增加線程數(shù)抱怔,代碼1中,下面三種情況會(huì)新增線程失敗并返回 false:

  1. 當(dāng)前線程池狀態(tài)為STOP嘀倒,TIDYING和TERMINATED
  2. 線程池狀態(tài)為SHUTDOWN并且已經(jīng)有了firstTask
  3. 當(dāng)前線程池狀態(tài)為SHUTDOWN并且任務(wù)隊(duì)列為空

內(nèi)層循環(huán)的作用是使用CAS操作設(shè)置線程個(gè)數(shù)屈留,代碼2.1判斷如果線程個(gè)數(shù)超限制則返回 false局冰,否則執(zhí)行代碼2.2CAS操作設(shè)置線程個(gè)數(shù),CAS成功則退出雙新歡灌危,CAS失敗則執(zhí)行代碼2.3看線程狀態(tài)是否變化康二,如果變了,則再次進(jìn)入外層循環(huán)重新獲取線程池狀態(tài)勇蝙,否則依舊在內(nèi)存循環(huán)繼續(xù)CAS嘗試沫勿。

compareAndIncrementWorkerCount() 方法是CAS添加線程個(gè)數(shù),修改表示線程池狀態(tài)和線程個(gè)數(shù)的 ctl 值味混,執(zhí)行失敗概率非常低产雹,類似自旋鎖原理。這里的處理邏輯是線程個(gè)數(shù)先加 1翁锡,如果后面創(chuàng)建線程失敗再減 1蔓挖,這是輕量處理并發(fā)創(chuàng)建線程的方式。如果先創(chuàng)建線程馆衔,成功再加 1瘟判,當(dāng)發(fā)現(xiàn)超出線程數(shù)限制后再銷毀線程,這種處理方式明顯比前者的代價(jià)要大角溃。

第二部分的代碼3說明使用CAS成功的增加了線程個(gè)數(shù)拷获,但是現(xiàn)在新線程還未創(chuàng)建,這里使用全局的獨(dú)占鎖把系新增的線程 worker 添加的工作線程集 workers 中开镣。

代碼3.1使用線程工廠創(chuàng)建了一個(gè)新的工作線程刀诬,代碼3.2獲取了獨(dú)占鎖咽扇,代碼3.3重新檢查線程池狀態(tài)邪财,這是為了避免在獲取鎖前其他線程調(diào)用了shutdown方法關(guān)閉了線程池。如果線程池被關(guān)閉质欲,則直接釋放鎖树埠;否則執(zhí)行代碼3.4添加新工作線程 w 到工作線程集 workers 中,然后釋放鎖嘶伟。代碼3.5判斷如果新增工作線程成功怎憋,則啟動(dòng)新線程。

代碼3.6判斷線程是否啟動(dòng)成功九昧,如果啟動(dòng)會(huì)失敗則從工作線程集 wokers 中移除新線程 w绊袋,并將線程數(shù)減 1。

private void addWorkerFailed(Worker w) {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        if (w != null)
            // 從工作線程集中移除新線程
            workers.remove(w);
        // 將線程個(gè)數(shù)減1铸鹰,修改ctl值癌别,對(duì)應(yīng)compareAndIncrementWorkerCount方法
        decrementWorkerCount();
        tryTerminate();
    } finally {
        mainLock.unlock();
    }
}

用戶提交任務(wù)到線程池后,由工作線程 Worker 來執(zhí)行蹋笼,Worker 的構(gòu)造函數(shù)如下:

// 工作線程Worker實(shí)現(xiàn)了Runnable接口展姐,并把本對(duì)象作為參數(shù)傳入newThread()創(chuàng)建新線程躁垛,
// new Thread(Runnable r)中參數(shù)r就是用戶任務(wù)firstTask
private final class Worker extends AbstractQueuedSynchronizer
    implements Runnable {

    Worker(Runnable firstTask) {
        setState(-1); // 在調(diào)用runWorker前禁止中斷
        this.firstTask = firstTask;
        // 使用線程工廠創(chuàng)建一個(gè)新線程, 并把本對(duì)象作為參數(shù)傳入newThread()創(chuàng)建新線程
        // 線程工廠中new Thread(Runnable worker)參數(shù)就是當(dāng)前worker對(duì)象
        this.thread = getThreadFactory().newThread(this);   
    }

    // 在addWorker中啟動(dòng)線程是Worker.thread.start(),
    // 最終還是調(diào)用worker對(duì)象的run方法圾笨,即調(diào)用runWorker()方法
    @Override
    public void run() {
        // 任務(wù)執(zhí)行邏輯與線程復(fù)用
        runWorker(this);
    }

由上面源碼可知教馆,addWorker 中使用線程工廠創(chuàng)建并啟動(dòng)新線程,最終執(zhí)行任務(wù)調(diào)用的是 Worker#runWorker 方法擂达。

2.8.2 實(shí)現(xiàn)線程復(fù)用 runWorker 源碼分析

上一小節(jié)我們知道 addWorker 創(chuàng)建了工作線程后土铺,調(diào)用 Worker.thread.start() 啟動(dòng)線程,執(zhí)行用戶傳入的任務(wù)則在 Worker#run 方法中谍婉,具體的執(zhí)行任務(wù)邏輯和線程復(fù)用執(zhí)行任務(wù)的實(shí)現(xiàn)在 runWorker 方法中舒憾,源碼如下所示:

final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    // 用戶提交的任務(wù)
    Runnable task = w.firstTask;
    w.firstTask = null;
    w.unlock(); // allow interrupts
    boolean completedAbruptly = true;
    try {
        // getTask從任務(wù)隊(duì)列取出任務(wù),執(zhí)行任務(wù)穗熬,直至任務(wù)隊(duì)列為空
        // 當(dāng)前代碼都在工作線程的run方法中镀迂,循環(huán)調(diào)用任務(wù)的run方法相當(dāng)于在復(fù)用工作線程
        while (task != null || (task = getTask()) != null) {
            w.lock();
            // If pool is stopping, ensure thread is interrupted;
            // if not, ensure thread is not interrupted.  This
            // requires a recheck in second case to deal with
            // shutdownNow race while clearing interrupt
            if ((runStateAtLeast(ctl.get(), STOP) ||
                    (Thread.interrupted() &&
                    runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            try {
                // 1. 調(diào)用前置鉤子方法,在每個(gè)任務(wù)執(zhí)行前調(diào)用
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    // 2. 調(diào)用用戶提交任務(wù)的task的run()方法唤蔗,worker串行執(zhí)行探遵,相當(dāng)于worker線程一直在運(yùn)行用戶的任務(wù)
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    // 3. 調(diào)用后置鉤子方法,在每個(gè)任務(wù)執(zhí)行后調(diào)用  
                    afterExecute(task, thrown);
                }
            } finally {
                task = null;
                // 4. 統(tǒng)計(jì)該worker線程完成任務(wù)的個(gè)數(shù)
                w.completedTasks++;
                w.unlock();
            }
        } // while

        // 如果在執(zhí)行任務(wù)期間未拋出異常妓柜,則可以執(zhí)行這一行代碼
        completedAbruptly = false;
    } finally {
        // 5. 執(zhí)行清理工作箱季,每個(gè)worker執(zhí)行多個(gè)任務(wù),但只進(jìn)行一次清理工作
        // 統(tǒng)計(jì)所有worker線程完成的任務(wù)總數(shù)棍掐,對(duì)當(dāng)前worker進(jìn)行銷毀
        processWorkerExit(w, completedAbruptly);
    }
}

實(shí)現(xiàn)工作線程復(fù)用就是在while循環(huán)中不斷從隊(duì)列中獲取任務(wù)Runnable task藏雏,然后調(diào)用task.run() 來處理用戶任務(wù),即實(shí)現(xiàn)了工作線程復(fù)用作煌。

2.8.3 取出任務(wù)與回收超時(shí)線程 getTask

runWorker 在執(zhí)行任務(wù)時(shí)掘殴,需要不斷從任務(wù)隊(duì)列取出任務(wù),取出任務(wù)的邏輯則在 getTask() 方法中實(shí)現(xiàn)粟誓,getTask() 取出任務(wù)有 3 種情況:

  1. 任務(wù)隊(duì)列不為空奏寨,取出任務(wù)并返回
  2. 任務(wù)隊(duì)列為空,則需要判斷是否需要回收超時(shí)線程鹰服,若需要回收超時(shí)線程病瞳,則該工作線程調(diào)用workQueue.poll(keepAliveTime),等待任務(wù)隊(duì)列 keepAliveTime悲酷,超時(shí)后返回null任務(wù)套菜。
  3. 任務(wù)隊(duì)列為空,若不需要回收超時(shí)線程设易,則該工作線程調(diào)用workQueue.take()進(jìn)入阻塞狀態(tài)逗柴,直至任務(wù)隊(duì)列有新任務(wù)出現(xiàn)。這也告訴了我們當(dāng)線程池沒有任務(wù)時(shí)亡嫌,線程池中 corePoolSize 數(shù)量的線程處于阻塞狀態(tài)嚎于,有了新任務(wù)后喚醒線程到RUNNABLE狀態(tài)掘而,繼續(xù)復(fù)用該線程。

彩蛋:阻塞隊(duì)列出隊(duì)操作 take 與 poll 的區(qū)別如何記憶于购?

poll 中有 O袍睡,就像一個(gè)表在計(jì)時(shí),超時(shí)后就返回null了肋僧,而 take 會(huì)一直等待

getTask() 的源碼分析如下:

private Runnable getTask() {
    // 判斷是否對(duì)超時(shí)空閑線程進(jìn)行回收斑胜,當(dāng)任務(wù)隊(duì)列為空時(shí)設(shè)置timeOut為true
    boolean timedOut = false; // Did the last poll() time out?

    // 從隊(duì)列中取出任務(wù),若隊(duì)列為空嫌吠,則CAS修改ctl變量的工作線程個(gè)數(shù)
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

        // Check if queue empty only if necessary.
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            decrementWorkerCount();
            return null;
        }

        // 工作線程數(shù)
        int wc = workerCountOf(c);

        // 工作線程數(shù)大于核心數(shù)止潘,則需要超時(shí)判斷
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

        // 工作線程數(shù)必須大于1或工作隊(duì)列為空,此時(shí)才考慮回收線程
        // 為什么這里不進(jìn)行線程超時(shí)判斷就減少工作線程辫诅?
        // 超時(shí)判斷keepAliveTime在下面從隊(duì)列取出元素時(shí)進(jìn)行
        if ((wc > maximumPoolSize || (timed && timedOut))
            && (wc > 1 || workQueue.isEmpty())) {
            // ctl 工作線程數(shù)-1凭戴,具體的線程銷毀工作會(huì)在processWorkerExit中執(zhí)行
            if (compareAndDecrementWorkerCount(c))
                // 2. 需要回收超時(shí)線程,超時(shí)未取到任務(wù)返回null
                return null;
            continue;
        }

        try {
            // 工作線程數(shù)若大于corePoolSize炕矮,需要回收超時(shí)線程么夫,三目表達(dá)式
            // 2. poll 是從任務(wù)隊(duì)列中取出隊(duì)首元素,若隊(duì)列為空肤视,則等待keepAliveTime档痪,超時(shí)后返回null
            // 3. take 是從任務(wù)隊(duì)列中取出隊(duì)首元素,若隊(duì)列為空邢滑,則一直阻塞腐螟,直至取出任務(wù)
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                workQueue.take();

            if (r != null)
                // 1. 任務(wù)隊(duì)列不為空,返回任務(wù)
                return r;

            // 任務(wù)隊(duì)列為空困后,keepAliveTime時(shí)間內(nèi)也沒有取出元素乐纸,
            // 則表示需要線程超時(shí)判斷置為true,回收空閑線程
            timedOut = true;
        } catch (InterruptedException retry) {
            timedOut = false;
        }
    }
}
// 工作線程w執(zhí)行完隊(duì)列中所有任務(wù)后操灿,執(zhí)行清理統(tǒng)計(jì)工作
private void processWorkerExit(Worker w, boolean completedAbruptly) {
    // 工作線程執(zhí)行任務(wù)未正確結(jié)束锯仪,如發(fā)生了異常等泵督,需要將工作線程數(shù)-1
    if (completedAbruptly) 
        decrementWorkerCount();

    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        // 1.統(tǒng)計(jì)整個(gè)線程池完成的任務(wù)數(shù)
        completedTaskCount += w.completedTasks;
        // 2.移除工作線程w趾盐,線程數(shù)ctl值的修改在getTask任務(wù)中已經(jīng)完成
        // 這里移除的線程一定是非核心線程,核心線程沒有任務(wù)會(huì)阻塞小腊,不會(huì)進(jìn)行到這一步
        workers.remove(w);
    } finally {
        mainLock.unlock();
    }

    // 3. 設(shè)置線程池狀態(tài)為TIDYING救鲤,調(diào)用鉤子terminated方法
    // 4. 設(shè)置線程池狀態(tài)為 TERMINATED
    tryTerminate();

    // 創(chuàng)建線程至核心數(shù)corePoolSize,存在corePoolSize=0秩冈,出現(xiàn)新任務(wù)的情況
    int c = ctl.get();
    if (runStateLessThan(c, STOP)) {
        if (!completedAbruptly) {
            // 該工作線程成功結(jié)束
            int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
            // 最小線程數(shù)為0且任務(wù)隊(duì)列不為空本缠,說明不合適,需要將最小線程數(shù)設(shè)置為1
            // 隊(duì)列為空則說明最小線程為1也可以入问,則不修改min
            if (min == 0 && ! workQueue.isEmpty())
                min = 1;

            // 工作線程數(shù)大于min丹锹,則返回即可
            if (workerCountOf(c) >= min)
                return; // replacement not needed
        }
        // 到這一步稀颁,說明工作線程數(shù)小于min,需要新增工作線程
        addWorker(null, false);
    }
}

面試題4: 我們知道線程狀態(tài)從RUNNABLE到TERMINATED是不可逆的楣黍,比如一個(gè)線程只能調(diào)用一次start() 方法匾灶,線程生命周期到了TERMINATED就結(jié)束了,無法重新到達(dá)RUNNABLE狀態(tài)租漂。

那么線程池能夠復(fù)用線程是怎么實(shí)現(xiàn)的阶女?

通過 ThreadPoolExecutor#runWorker 源碼可知,一個(gè)工作線程啟動(dòng)后哩治,run 方法中循環(huán)調(diào)用用戶任務(wù) Runnable#run() 方法執(zhí)行任務(wù)秃踩,相當(dāng)于工作線程始終處于 RUNNABLE 狀態(tài),直至所有用戶任務(wù)執(zhí)行完畢业筏,線程才會(huì)退出循環(huán)憔杨,線程到達(dá)TERMINATED狀態(tài)。

面試題5: 前面提到線程池中任務(wù)執(zhí)行完畢后蒜胖,會(huì)回收空閑線程芍秆,保留 corePoolSize 個(gè)線程。如上一問題所說翠勉,任務(wù)執(zhí)行完畢后線程會(huì)到達(dá)TERMINATED狀態(tài)妖啥,那如何保留工作線程呢?

通過 ThreadPoolExecutor#getTask 源碼可知对碌,在工作線程執(zhí)行任務(wù)時(shí)荆虱,會(huì)從任務(wù)隊(duì)列取出任務(wù),當(dāng)任務(wù)隊(duì)列為空時(shí)朽们,會(huì)調(diào)用workQueue.take()怀读,使線程進(jìn)入阻塞狀態(tài),當(dāng)有任務(wù)時(shí)骑脱,會(huì)線性線程到 Runnable 狀態(tài)繼續(xù)復(fù)用線程菜枷。

1.9 線程池設(shè)計(jì)模式

雖然在 Java 語言中創(chuàng)建線程看上去就像創(chuàng)建一個(gè)對(duì)象一樣簡(jiǎn)單,只需要 new Thread() 就可以了叁丧,但實(shí)際上創(chuàng)建線程遠(yuǎn)不是創(chuàng)建一個(gè)對(duì)象那么簡(jiǎn)單啤誊。創(chuàng)建對(duì)象,僅僅是在 JVM 的堆里分配一塊內(nèi)存而已拥娄;而創(chuàng)建一個(gè)線程蚊锹,卻需要調(diào)用操作系統(tǒng)內(nèi)核的 API,然后操作系統(tǒng)要為線程分配一系列的資源稚瘾,這個(gè)成本就很高了牡昆,所以線程是一個(gè)重量級(jí)的對(duì)象,應(yīng)該避免頻繁創(chuàng)建和銷毀摊欠。

線程池和一般意義上的池化資源是不同的丢烘。一般意義上的池化資源如數(shù)據(jù)庫(kù)連接池柱宦,都是下面這樣,當(dāng)你需要資源的時(shí)候就調(diào)用 acquire() 方法來申請(qǐng)資源播瞳,用完之后就調(diào)用 release() 釋放資源捷沸。

class XXXPool{
    // 獲取池化資源
    XXX acquire() {......}
    // 釋放池化資源
    void release(XXX x){......}
}  

目前業(yè)界線程池的設(shè)計(jì),普遍采用的都是生產(chǎn)者 - 消費(fèi)者模式狐史。線程池的使用方是生產(chǎn)者痒给,線程池本身是消費(fèi)者。在下面的示例代碼中骏全,我們創(chuàng)建了一個(gè)非常簡(jiǎn)單的線程池 MyThreadPool苍柏,你可以通過它來理解線程池的工作原理。


//簡(jiǎn)化的線程池姜贡,僅用來說明工作原理
class MyThreadPool {
    //利用阻塞隊(duì)列實(shí)現(xiàn)生產(chǎn)者-消費(fèi)者模式
    BlockingQueue<Runnable> workQueue;
    //保存內(nèi)部工作線程
    List<WorkerThread> threads = new ArrayList<>();

    // 構(gòu)造方法
    MyThreadPool(int poolSize, BlockingQueue<Runnable> workQueue) {
        this.workQueue = workQueue;
        // 創(chuàng)建工作線程
        for (int idx = 0; idx < poolSize; idx++) {
            WorkerThread work = new WorkerThread();
            work.start();
            threads.add(work);
        }
    }

    // 提交任務(wù)
    void execute(Runnable command) {
        workQueue.put(command);
    }

    // 工作線程負(fù)責(zé)消費(fèi)任務(wù)试吁,并執(zhí)行任務(wù)
    class WorkerThread extends Thread {
        public void run() {
            //循環(huán)取任務(wù)并執(zhí)行
            while (true) {
                Runnable task = workQueue.take();
                task.run();
            }
        }
    }
}


    /** 下面是使用示例 **/
    // 創(chuàng)建有界阻塞隊(duì)列
    BlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<>(2);
    // 創(chuàng)建線程池
    MyThreadPool pool = new MyThreadPool(10, workQueue);
    // 提交任務(wù)
    pool.execute(()->{System.out.println("hello");});

1.10 自己動(dòng)手實(shí)現(xiàn)線程池

線程池沒你想的那么簡(jiǎn)單 上 - crossoverJie

線程池沒你想的那么簡(jiǎn)單 下 - crossoverJie

推薦閱讀

  1. Java并發(fā)編程之美 - 翟陸續(xù) 內(nèi)容和慕課網(wǎng)玩轉(zhuǎn)Java并發(fā)類似,可以配合閱讀楼咳,有豐富的源碼分析熄捍,實(shí)踐部分有10個(gè)小案例

  2. Java并發(fā)編程實(shí)戰(zhàn) - 極客時(shí)間 內(nèi)容有深度,并發(fā)設(shè)計(jì)模式母怜,分析了 4 個(gè)并發(fā)應(yīng)用案例 Guava RateLimiter余耽,Netty,Disrupter 和 HiKariCP苹熏,還介紹了 4 種其他類型的并發(fā)模型 Actor碟贾,協(xié)程,CSP等

  3. 精通Java并發(fā)編程 - 哈維爾 非常多的案例轨域,幾乎每個(gè)知識(shí)點(diǎn)和章節(jié)都有案例袱耽,學(xué)習(xí)后能更熟悉Java并發(fā)的應(yīng)用

  4. 傳智播客8天并發(fā) 筆記有并發(fā)案例,CPU原理等筆記干发,非常深入朱巨,后面畫時(shí)間學(xué)習(xí)一下精

參考文檔

  1. 玩轉(zhuǎn)Java并發(fā)工具,精通JUC - 慕課網(wǎng)

  2. Java并發(fā)在線思維導(dǎo)圖 - 慕課網(wǎng)

  3. Java多線程編程實(shí)戰(zhàn) - 汪文君

  4. Java并發(fā)編程實(shí)戰(zhàn) - 極客時(shí)間

  5. Java并發(fā)編程之美 - 翟陸續(xù)

  6. 實(shí)戰(zhàn)Java高并發(fā)程序設(shè)計(jì) - 葛一鳴

  7. 四天學(xué)懂 JUC - 周陽

  8. 小白科普:線程和線程池 - 碼農(nóng)翻身

  9. 利用常見場(chǎng)景詳解java線程池 - CarpenterLee

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末枉长,一起剝皮案震驚了整個(gè)濱河市冀续,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌搀暑,老刑警劉巖沥阳,帶你破解...
    沈念sama閱讀 222,104評(píng)論 6 515
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件跨琳,死亡現(xiàn)場(chǎng)離奇詭異自点,居然都是意外死亡,警方通過查閱死者的電腦和手機(jī)脉让,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 94,816評(píng)論 3 399
  • 文/潘曉璐 我一進(jìn)店門桂敛,熙熙樓的掌柜王于貴愁眉苦臉地迎上來功炮,“玉大人,你說我怎么就攤上這事术唬⌒椒” “怎么了?”我有些...
    開封第一講書人閱讀 168,697評(píng)論 0 360
  • 文/不壞的土叔 我叫張陵粗仓,是天一觀的道長(zhǎng)嫁怀。 經(jīng)常有香客問我,道長(zhǎng)借浊,這世上最難降的妖魔是什么塘淑? 我笑而不...
    開封第一講書人閱讀 59,836評(píng)論 1 298
  • 正文 為了忘掉前任,我火速辦了婚禮蚂斤,結(jié)果婚禮上存捺,老公的妹妹穿的比我還像新娘。我一直安慰自己曙蒸,他們只是感情好捌治,可當(dāng)我...
    茶點(diǎn)故事閱讀 68,851評(píng)論 6 397
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著纽窟,像睡著了一般肖油。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上臂港,一...
    開封第一講書人閱讀 52,441評(píng)論 1 310
  • 那天构韵,我揣著相機(jī)與錄音,去河邊找鬼趋艘。 笑死疲恢,一個(gè)胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的瓷胧。 我是一名探鬼主播显拳,決...
    沈念sama閱讀 40,992評(píng)論 3 421
  • 文/蒼蘭香墨 我猛地睜開眼,長(zhǎng)吁一口氣:“原來是場(chǎng)噩夢(mèng)啊……” “哼搓萧!你這毒婦竟也來了杂数?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 39,899評(píng)論 0 276
  • 序言:老撾萬榮一對(duì)情侶失蹤瘸洛,失蹤者是張志新(化名)和其女友劉穎揍移,沒想到半個(gè)月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體反肋,經(jīng)...
    沈念sama閱讀 46,457評(píng)論 1 318
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡那伐,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 38,529評(píng)論 3 341
  • 正文 我和宋清朗相戀三年,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片罕邀。...
    茶點(diǎn)故事閱讀 40,664評(píng)論 1 352
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡畅形,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出诉探,到底是詐尸還是另有隱情日熬,我是刑警寧澤,帶...
    沈念sama閱讀 36,346評(píng)論 5 350
  • 正文 年R本政府宣布肾胯,位于F島的核電站竖席,受9級(jí)特大地震影響,放射性物質(zhì)發(fā)生泄漏敬肚。R本人自食惡果不足惜怕敬,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 42,025評(píng)論 3 334
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望帘皿。 院中可真熱鬧东跪,春花似錦、人聲如沸鹰溜。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,511評(píng)論 0 24
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽曹动。三九已至斋日,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間墓陈,已是汗流浹背恶守。 一陣腳步聲響...
    開封第一講書人閱讀 33,611評(píng)論 1 272
  • 我被黑心中介騙來泰國(guó)打工, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留贡必,地道東北人兔港。 一個(gè)月前我還...
    沈念sama閱讀 49,081評(píng)論 3 377
  • 正文 我出身青樓,卻偏偏與公主長(zhǎng)得像仔拟,于是被迫代替她去往敵國(guó)和親衫樊。 傳聞我的和親對(duì)象是個(gè)殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 45,675評(píng)論 2 359

推薦閱讀更多精彩內(nèi)容