ThreadPoolExecutor線(xiàn)程池源碼和典型問(wèn)題

思考

  • 你是否有此疑問(wèn):普通線(xiàn)程使用后即銷(xiāo)毀器紧,而對(duì)于線(xiàn)程池中核心線(xiàn)程將一直存在耀销,非核心線(xiàn)程會(huì)銷(xiāo)毀,它是如何做到的铲汪?看了這篇文章熊尉,相信你能夠了解其中緣由!

Executor框架

  • 主要有三個(gè)部分組成
    1. 任務(wù): 包括被執(zhí)行任務(wù)需要實(shí)現(xiàn)的接口:Runnable接口或Callable接口
    2. 任務(wù)的執(zhí)行:任務(wù)執(zhí)行機(jī)制的核心接口Executor,以及繼承自Executor的ExecutorService接口(ThreadPoolExecutor和ScheduledThreadPoolExecutor)
    3. 異步計(jì)算的結(jié)果: 包括接口Future和實(shí)現(xiàn)Future接口的FutureTask類(lèi)
  • 簡(jiǎn)單的連接上述
    1. Executor是一個(gè)接口,是Executor框架的基礎(chǔ),將任務(wù)提交與任務(wù)執(zhí)行分離開(kāi)來(lái)
    2. ThreadPoolExecutor是線(xiàn)程池的核心實(shí)現(xiàn)類(lèi),用來(lái)執(zhí)行被提交的任務(wù)
    3. ScheduledThreadPoolExecutor是一個(gè)實(shí)現(xiàn)類(lèi),在給定的延遲后運(yùn)行命令,或者定期執(zhí)行命令(比Timer更靈活,功能更加強(qiáng)大)
    4. Future接口和實(shí)現(xiàn)它的FutureTask類(lèi),代表異步計(jì)算的結(jié)果
    5. Runnable接口和Callable接口實(shí)現(xiàn)類(lèi)都可以被任務(wù)給執(zhí)行

線(xiàn)程隊(duì)列

  • 主要包括: ArrayBlockingQueue, LinkedBlockingQueue, SynchronousQueue
ArrayBlockingQueue : 基于數(shù)組結(jié)構(gòu)的有界阻塞隊(duì)列(FIFO) 源碼附錄在文章最下
  • 一個(gè)有界緩存隊(duì)列,可以指定緩存隊(duì)列大小,當(dāng)其存儲(chǔ)滿(mǎn)后,加入隊(duì)列失敗,會(huì)開(kāi)啟新線(xiàn)程執(zhí)行,但是當(dāng)線(xiàn)程達(dá)到最大線(xiàn)程數(shù)時(shí),在加入就會(huì)拋出異常RejectExecutorExcation
PriorityBlockingQueue : 基于數(shù)組結(jié)構(gòu)的優(yōu)先級(jí)的無(wú)限阻塞隊(duì)列
  • 擁有優(yōu)先級(jí)的同ArrayBlockIngQueue在offer實(shí)現(xiàn)上可2倍的自動(dòng)擴(kuò)容
  • 不存在超容量情況掌腰,所以不會(huì)像ArrayBlockingQueue一樣拋出異常
  • 通過(guò)構(gòu)造函數(shù)傳入對(duì)象來(lái)判斷優(yōu)先級(jí),所以傳入對(duì)象必須實(shí)現(xiàn)comparable接口
  • 源碼分析在文章最下,在算法中運(yùn)用較多,比如查找鏈表中前K大值等
LinkedBlockingQueue : 基于鏈表結(jié)構(gòu)的阻塞隊(duì)列(FIFO)
  • 一個(gè)無(wú)界緩存等待隊(duì)列,當(dāng)線(xiàn)程數(shù)量達(dá)到核心線(xiàn)程數(shù)時(shí),剩余任務(wù)都會(huì)添加進(jìn)來(lái)等待,即最大線(xiàn)程數(shù)無(wú)效
SynchronousQueue : 不存儲(chǔ)元素的阻塞隊(duì)列(FIFO先進(jìn)先出公平,或LIFO非公平)
  • 沒(méi)有容量,是無(wú)緩存等待隊(duì)列,一個(gè)不存儲(chǔ)任務(wù)的隊(duì)列,會(huì)直接將任務(wù)交給消費(fèi)者,并且必須等隊(duì)列中添加元素被消費(fèi)后才能繼續(xù)添加新元素
  • 擁有公平(FIFO)和非公平策略,非公平會(huì)導(dǎo)致一些數(shù)據(jù)永遠(yuǎn)不會(huì)被消費(fèi)
  • 使用該隊(duì)列一般要求非線(xiàn)程數(shù)設(shè)置為無(wú)界,避免線(xiàn)程拒絕執(zhí)行操作

Executor的運(yùn)用

  1. Executors.callable(Runnable task):將Runnable對(duì)象封裝為一個(gè)Callable對(duì)象
  2. ExecutorService.execute(Runnable task):執(zhí)行一個(gè)沒(méi)有返回值得任務(wù)
  3. ExecutorService.submit(Runnable或Callable task ):返回一個(gè)實(shí)現(xiàn)Future接口的對(duì)象FutureTask,
    1. 主線(xiàn)程可以執(zhí)行FutureTask.get()方法等待任務(wù)執(zhí)行完成獲取接口
    2. 也可以調(diào)用FutureTask.cancel()來(lái)取消此任務(wù)的執(zhí)行
ThreadPoolExecutor源碼分析
  1. 首先創(chuàng)建線(xiàn)程池最終調(diào)取方法是:
        public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             Executors.defaultThreadFactory(), defaultHandler);
    }
    //調(diào)取這個(gè)方法
    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) 
    
    • corePoolSize : 核心線(xiàn)程數(shù),創(chuàng)建線(xiàn)程池后默認(rèn)線(xiàn)程為0,當(dāng)有任務(wù)來(lái)時(shí)才會(huì)創(chuàng)建對(duì)應(yīng)的一個(gè)線(xiàn)程,直到達(dá)到corePoolSize大小,就會(huì)將在到達(dá)的任務(wù)放到緩存隊(duì)列中,除非調(diào)用了預(yù)創(chuàng)建線(xiàn)程才會(huì)在沒(méi)有任務(wù)到達(dá)之前就創(chuàng)建對(duì)應(yīng)的線(xiàn)程數(shù);
    • maximumPoolSize: 最大線(xiàn)程數(shù),線(xiàn)程池最多能創(chuàng)建的線(xiàn)程數(shù),當(dāng)隊(duì)列滿(mǎn)了以后再進(jìn)來(lái)的任務(wù)會(huì)在創(chuàng)建一個(gè)新的線(xiàn)程
    • keepAliveTime : 線(xiàn)程沒(méi)有任務(wù)執(zhí)行時(shí)最多保持多久時(shí)間會(huì)終止.默認(rèn)只有線(xiàn)程數(shù)大于corePoolSize時(shí)才起作用,針對(duì)的是最大線(xiàn)程數(shù)時(shí)創(chuàng)建的線(xiàn)程.有方法讓線(xiàn)程數(shù)在核心線(xiàn)程之內(nèi)也起作用,直到線(xiàn)程數(shù)為0
    • unit : 保持時(shí)間的參數(shù)單位
    • workQueue: 一個(gè)阻塞隊(duì)列,用來(lái)存儲(chǔ)等待執(zhí)行的任務(wù),當(dāng)任務(wù)滿(mǎn)了就會(huì)調(diào)用maximumPoolSize創(chuàng)建新線(xiàn)程
    • threadFactory : 用于設(shè)置創(chuàng)建線(xiàn)程的工廠(chǎng),可以通過(guò)線(xiàn)程工廠(chǎng)給每個(gè)創(chuàng)建出來(lái)的線(xiàn)程命名或者設(shè)置優(yōu)先級(jí)等操作:通過(guò)implements ThreadFactory可以自定義線(xiàn)程工廠(chǎng)
    • handler : 便是當(dāng)拒絕處理任務(wù)時(shí)的策略(當(dāng)阻塞隊(duì)列滿(mǎn)了,且達(dá)到最大線(xiàn)程數(shù)后再來(lái)任務(wù)會(huì)調(diào)取),通過(guò)implements RejectedExecutionHandler可自定義自己的拒絕策略
      1. AbortPolicy : 默認(rèn)方式,直接拋出異常
      2. CallerRunsPolicy : 只用調(diào)用者所在線(xiàn)程來(lái)運(yùn)行任務(wù),異步任務(wù)變成了同步執(zhí)行了,比如主線(xiàn)程調(diào)用的execute方法則拒絕策略會(huì)將線(xiàn)程池拒絕的任務(wù)交給主線(xiàn)程執(zhí)行了
      3. DiscardOldestPolicy: 丟棄隊(duì)列中對(duì)頭的那個(gè)任務(wù)(最早添加進(jìn)來(lái)的)并執(zhí)行當(dāng)前任務(wù)
      4. DiscardPolicy: 不處理,丟棄掉
ThreadPoolExecutor : FixedThreadPool, SingleThreadExecutor, CachedThreadPool
FixedTHreadPool(默認(rèn)阻塞隊(duì)列無(wú)界==> OOM)
  1. 創(chuàng)建一個(gè)固定線(xiàn)程數(shù)量nThreads的線(xiàn)程,核心線(xiàn)程跟最大線(xiàn)程數(shù)量一致都為nThreads
    public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }
    
  2. 使用無(wú)界隊(duì)列LinkedBlockingQueue: 使得最大線(xiàn)程數(shù),跟超時(shí)時(shí)間都是無(wú)意義的,根據(jù)線(xiàn)程池運(yùn)行步驟,隊(duì)列不可能被填滿(mǎn),所以未執(zhí)行shutdown()或shutdownNow()方法的運(yùn)行中的FixedThreadPool不會(huì)拒絕任務(wù)
  3. 使用與為了滿(mǎn)足資源管理的需求而限制當(dāng)前線(xiàn)程數(shù)量,用于負(fù)載比較重的服務(wù)器
  • 使用場(chǎng)景: 適用于處理CPU密集型任務(wù),確保CPU在長(zhǎng)期被工作線(xiàn)程使用下,盡可能少的分配線(xiàn)程,一半是N(CPU) + 1
SingleThreadExecutor(默認(rèn)阻塞隊(duì)列無(wú)界==> OOM)
  • 創(chuàng)建使用單個(gè)線(xiàn)程的SingleThreadExecutor
    public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    }
    
  • 同上FixedThreadPool使用無(wú)限隊(duì)列保存任務(wù),因此最大及核心線(xiàn)程數(shù)都是1
  • 使用與需要保證順序的執(zhí)行各個(gè)人物,并且在任意時(shí)間點(diǎn),不會(huì)有多個(gè)線(xiàn)程時(shí)活動(dòng)的
  • 使用場(chǎng)景: 適用于串行執(zhí)行任務(wù)的場(chǎng)景,每個(gè)任務(wù)必須按順序執(zhí)行,不需要并發(fā)執(zhí)行
CachedThreadPool(默認(rèn)核心線(xiàn)程0,最大線(xiàn)程i.MAX,synchronousQueue==>OOM)
  1. 創(chuàng)建一個(gè)大小無(wú)界的線(xiàn)程池,適用于執(zhí)行很多的短期異步人物的小程序,或者負(fù)載比較輕量的服務(wù)器
    public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }
    
  2. 分析: corePoolSize 設(shè)置為0 ,核心線(xiàn)程數(shù)為空,而最大線(xiàn)程數(shù)設(shè)置為Inter.MAX_VALUE,即非核心線(xiàn)程為無(wú)限的,同時(shí)空閑線(xiàn)程等待新任務(wù)最長(zhǎng)為60s后被終止
  3. 同時(shí): 使用沒(méi)有容量的SynchronousQueue作為線(xiàn)程池的工作隊(duì)列,這意味著如果主線(xiàn)程提交任務(wù)速度高于線(xiàn)程池中線(xiàn)程處理任務(wù)的速度,CachedThreadPool將會(huì)不斷創(chuàng)建新線(xiàn)程,最終耗盡CPU和內(nèi)存資源
  4. 執(zhí)行步驟:
    1. 執(zhí)行execute方法,首先執(zhí)行SynchronousQueue的offer方法提交任務(wù),并查詢(xún)線(xiàn)程池中是否有空閑線(xiàn)程來(lái)執(zhí)行其中的poll方法來(lái)移除任務(wù),如果有,則配對(duì)成功,將任務(wù)交給這個(gè)空閑隊(duì)列
    2. 否則,配對(duì)失敗,將會(huì)創(chuàng)建一個(gè)新線(xiàn)程去處理任務(wù)
    3. 當(dāng)線(xiàn)程池中線(xiàn)程空閑時(shí),會(huì)執(zhí)行synchronousQueue的poll方法等到其提交新任務(wù),如果超過(guò)60s依然沒(méi)有提交,則這個(gè)線(xiàn)程就會(huì)終止
    4. 由于非核心線(xiàn)程無(wú)界,所以一旦提交任務(wù)速度 > 線(xiàn)程池處理速度就會(huì)不斷的創(chuàng)建新線(xiàn)程
    5. 因此:使用與每次提交任務(wù)都會(huì)有線(xiàn)程立刻進(jìn)行處理的,大量,耗時(shí)少的任務(wù),長(zhǎng)時(shí)間保持空閑的CachedThreadPool將不會(huì)使用任何資源
    6. 其實(shí)就是主線(xiàn)程調(diào)用 offer方法跟沒(méi)有容量的阻塞隊(duì)列的poll方法是否適配,如果適配就使用該空閑線(xiàn)程,如果不適配就從新創(chuàng)建線(xiàn)程執(zhí)行任務(wù)
  • 使用場(chǎng)景: 用于并發(fā)執(zhí)行大量短期的小任務(wù)
ThreadPoolExecutor
  • 重點(diǎn):重中之重: 線(xiàn)程池工作的過(guò)程:
        public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        /*
         * Proceed in 3 steps:
         *
         * 1. If fewer than corePoolSize threads are running, try to
         * start a new thread with the given command as its first
         * task.  The call to addWorker atomically checks runState and
         * workerCount, and so prevents false alarms that would add
         * threads when it shouldn't, by returning false.
         *
         * 2. If a task can be successfully queued, then we still need
         * to double-check whether we should have added a thread
         * (because existing ones died since last checking) or that
         * the pool shut down since entry into this method. So we
         * recheck state and if necessary roll back the enqueuing if
         * stopped, or start a new thread if there are none.
         *
         * 3. If we cannot queue task, then we try to add a new
         * thread.  If it fails, we know we are shut down or saturated
         * and so reject the task.
         */
        int c = ctl.get();
        //如果當(dāng)前線(xiàn)程數(shù)小于核心線(xiàn)程數(shù),則創(chuàng)建線(xiàn)程并執(zhí)行當(dāng)前任務(wù)
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        //如果正在運(yùn)行的線(xiàn)程數(shù)量大于或等于 corePoolSize狰住,那么將這個(gè)任務(wù)放入隊(duì)列
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        //如果線(xiàn)程池內(nèi)不處于運(yùn)行或者任務(wù)無(wú)法放入隊(duì)列,并且當(dāng)前線(xiàn)程數(shù)小于最大允許的線(xiàn)程數(shù)量齿梁,則創(chuàng)建一個(gè)線(xiàn)程執(zhí)行任務(wù),非核心線(xiàn)程
        else if (!addWorker(command, false))
            reject(command);  //拋出RejectedExecutionException異常,對(duì)于有界隊(duì)列而言才會(huì)有這個(gè)
    }
    
    1. 如果正在運(yùn)行的線(xiàn)程數(shù)量小于corePoolSize:核心線(xiàn)程數(shù),則馬上創(chuàng)建線(xiàn)程運(yùn)行這個(gè)任務(wù)
    2. 如果正在運(yùn)行的線(xiàn)程數(shù)量大于或等于核心線(xiàn)程數(shù),馬上將這個(gè)任務(wù)放入隊(duì)列,注意每種線(xiàn)程隊(duì)列實(shí)現(xiàn)的offer方法不同哦!
    3. 如果隊(duì)列滿(mǎn)了,而且正在運(yùn)行的線(xiàn)程數(shù)量小于最大線(xiàn)程數(shù)量,則會(huì)創(chuàng)建非核心線(xiàn)程立刻運(yùn)行這個(gè)任務(wù)
    4. 如果隊(duì)列滿(mǎn)了,正在運(yùn)行的線(xiàn)程數(shù)量大于或等于允許的最大線(xiàn)程數(shù)量,那么線(xiàn)程池就會(huì)拋出RejectExecutorException
  • 切記:
    1. 新建完線(xiàn)程則當(dāng)這個(gè)線(xiàn)程完成任務(wù)時(shí),他會(huì)從隊(duì)列中取下一個(gè)任務(wù)來(lái)執(zhí)行;
    2. 當(dāng)一個(gè)線(xiàn)程無(wú)事可做,即為隊(duì)列為null,超過(guò)一定時(shí)間keepAliveTime自定義超時(shí)回收時(shí)間,當(dāng)設(shè)置為0配合的是無(wú)界隊(duì)列使用,線(xiàn)程池會(huì)判斷,如果當(dāng)前運(yùn)行的線(xiàn)程數(shù)大于核心線(xiàn)程數(shù),那么這個(gè)線(xiàn)程就會(huì)被停掉回收,所以所有任務(wù)完成后,線(xiàn)程池會(huì)收縮到corePoolSize的大小
  • 通過(guò)以上分析可以看到他們?cè)创a中都調(diào)用了ThreadPoolExecutor來(lái)創(chuàng)建一個(gè)線(xiàn)程池
     public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             Executors.defaultThreadFactory(), defaultHandler);
    }
    //實(shí)際調(diào)用對(duì)象
    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler)
    
  • 分析源碼中的參數(shù)
    1. corePoolSize: 核心線(xiàn)程池大小
    2. maximumPoolSize: 最大線(xiàn)程池的大小
    3. BlockingQueue: 用來(lái)暫時(shí)保存任務(wù)的工作隊(duì)列
    4. RejectedExecutionHandler:當(dāng)ThreadPoolExecutor已經(jīng)關(guān)閉或ThreadPoolExecutor已經(jīng)飽和 時(shí)(達(dá)到了最大線(xiàn)程池大小且工作隊(duì)列已滿(mǎn))转晰,execute()方法將要調(diào)用的Handler

ScheduledThreadPoolExecutor

  • 繼承自ThreadPoolExecutor:主要用來(lái)在給定的延遲之后運(yùn)行任務(wù)或者定期執(zhí)行任務(wù)(功能與Timer類(lèi)似)
  • 線(xiàn)程池的特點(diǎn):
    public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
    return new ScheduledThreadPoolExecutor(corePoolSize);
    }
    
    public ScheduledThreadPoolExecutor(int corePoolSize) {
        super(corePoolSize, Integer.MAX_VALUE,
              DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS,
              new DelayedWorkQueue());
    }
    
    private static final long DEFAULT_KEEPALIVE_MILLIS = 10L;
    
    • 最大線(xiàn)程數(shù)為Integer.MAX_VALUE
    • 阻塞隊(duì)列為DelayedWorkQueue
      ScheduledThreadPoolExecutor添加任務(wù)的另外兩個(gè)方法:
    1. scheduleAtFixedRate: 按某種速率周期執(zhí)行
    2. scheduleWithFixedDelay: 在某個(gè)延遲后執(zhí)行
    • 兩種方法的內(nèi)部實(shí)現(xiàn)都是創(chuàng)建了一個(gè)ScheduledFutureTask對(duì)象封裝了任務(wù)的延遲執(zhí)行時(shí)間及執(zhí)行周期,并調(diào)用decorateTask()方法轉(zhuǎn)成RunnableScheduledFuture對(duì)象士飒,然后添加到延遲隊(duì)列中。
  1. DelayQueue: 封裝了一個(gè)優(yōu)先級(jí)隊(duì)列,會(huì)對(duì)隊(duì)列中的ScheduledFutureTask進(jìn)行排序,兩個(gè)任務(wù)執(zhí)行time不同時(shí),time小的先執(zhí)行,否則比較添加到隊(duì)列中的ScheduledFutureTask的順序號(hào)sequenceNumber,先提交的先執(zhí)行
  2. 工作機(jī)制為:
    1. 調(diào)用上面兩個(gè)方法添加一個(gè)任務(wù)
    2. 線(xiàn)程池中的線(xiàn)程從DelayQueue中取任務(wù)
    3. 然后執(zhí)行任務(wù)
  3. 執(zhí)行步驟為:
    1. 線(xiàn)程從DelayQueue中獲取time大于等于當(dāng)前時(shí)間的ScheduledFutureTask , DelayQueue.take()
    2. 執(zhí)行完后修改這個(gè)task的time為下次被執(zhí)行時(shí)間
    3. 然后在把這個(gè)task放回隊(duì)列中DelayQueue.add()
  4. 使用場(chǎng)景: 用于需要多個(gè)后臺(tái)線(xiàn)程執(zhí)行周期任務(wù),同時(shí)需要限制線(xiàn)程數(shù)量的場(chǎng)景

作業(yè)

  1. 自定義創(chuàng)建一個(gè)能夠根據(jù)加入順序得到最后一個(gè)數(shù)據(jù)的線(xiàn)程池,前面的沒(méi)有完成就拋棄
  2. 自定義設(shè)置一個(gè)先達(dá)到最大隊(duì)列工作,后緩存隊(duì)列的線(xiàn)程池
  • 1的解題思路
    1. 由于是順序加入,如果前面有等待任務(wù),則新加入的任務(wù)將會(huì)替換掉之前的任務(wù),我們只要最后一個(gè)任務(wù),則使用線(xiàn)程數(shù)為1,且等待隊(duì)列也為1,同時(shí)拒絕策略為DiscardOldestPolicy()即可
    2. 當(dāng)?shù)谝淮翁砑尤蝿?wù)執(zhí)行,第二次添加加入隊(duì)列,如果此時(shí)任務(wù)3來(lái)到將會(huì)替換2,直到下一次任務(wù)到來(lái),則最終執(zhí)行的是最后一次添加的任務(wù)
  • 2的解題思路:
    1. 默認(rèn)當(dāng)工作隊(duì)列滿(mǎn)了無(wú)法入隊(duì)才會(huì)擴(kuò)容線(xiàn)程池,我們可以重寫(xiě)隊(duì)列的offer方法,造成隊(duì)列已滿(mǎn)假象
    2. 在擴(kuò)容達(dá)到最大線(xiàn)程以后會(huì)觸發(fā)拒絕策略,此時(shí)我們可以將任務(wù)真正的插入到緩存隊(duì)列中
    • 解答:使用LinkedTransferQueue.tryTransfer() :如果存在一個(gè)消費(fèi)者已經(jīng)等待接收它蔗崎,則立即傳送指定的元素酵幕,否則返回false,并且不進(jìn)入隊(duì)列缓苛。
    /**
     * 設(shè)置一個(gè)由鏈表結(jié)構(gòu)組成的無(wú)界阻塞TransferQueue隊(duì)列芳撒。相對(duì)于其他阻塞隊(duì)列邓深,LinkedTransferQueue多了tryTransfer和transfer方法。
     * LinkedTransferQueue采用一種預(yù)占模式笔刹。
     * 意思就是消費(fèi)者線(xiàn)程取元素時(shí)芥备,如果隊(duì)列不為空,則直接取走數(shù)據(jù)舌菜,若隊(duì)列為空萌壳,那就生成一個(gè)節(jié)點(diǎn)(節(jié)點(diǎn)元素為null)入隊(duì),
     * 然后消費(fèi)者線(xiàn)程被等待在這個(gè)節(jié)點(diǎn)上日月,后面生產(chǎn)者線(xiàn)程入隊(duì)時(shí)發(fā)現(xiàn)有一個(gè)元素為null的節(jié)點(diǎn)袱瓮,生產(chǎn)者線(xiàn)程就不入隊(duì)了,直接就將元素填充到該節(jié)點(diǎn)爱咬,
     * 并喚醒該節(jié)點(diǎn)等待的線(xiàn)程尺借,被喚醒的消費(fèi)者線(xiàn)程取走元素,從調(diào)用的方法返回精拟。我們稱(chēng)這種節(jié)點(diǎn)操作為“匹配”方式燎斩。
     */
    public class MyLinkedTransferQueue extends LinkedTransferQueue<Runnable> {
    
        @Override
        public boolean offer(Runnable e) {
            return tryTransfer(e); // 如果存在一個(gè)消費(fèi)者已經(jīng)等待接收它,則立即傳送指定的元素蜂绎,否則返回false栅表,并且不進(jìn)入隊(duì)列。
        }
    }
    
    /**
     * 使用靜態(tài)內(nèi)部類(lèi)創(chuàng)建一個(gè)單例的線(xiàn)程池
     */
    public class ExecutorFactory {
    
        private static class Holder{
            private static ThreadPoolExecutor executors = new ThreadPoolExecutor(1, 4 , 60 , TimeUnit.SECONDS ,
                    new MyLinkedTransferQueue() ,
                    new ThreadNameFactory(),
                    new CustomizeRejectHandler());
        }
    
    
        public static ThreadPoolExecutor getInstance(){
            return Holder.executors ;
        }
    }
    
    
    
    /**
     * 自定義拒絕策略,用于當(dāng)前隊(duì)列任務(wù)滿(mǎn)以后的操作
     */
    public class CustomizeRejectHandler implements RejectedExecutionHandler {
    
        @Override
        public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { //重寫(xiě)一個(gè)DisCardOldestPolicy
    //        if (!executor.isShutdown()){ //當(dāng)線(xiàn)程池沒(méi)有終止時(shí),彈出隊(duì)列中最早添加一個(gè)任務(wù)并執(zhí)行當(dāng)前任務(wù)
    //            executor.getQueue().poll();
    //            executor.execute(r);
    //        }
    
            try {
                executor.getQueue().put(r); //當(dāng)拒絕是添加到阻塞隊(duì)列中
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }
    
    
  1. 單機(jī)上一個(gè)線(xiàn)程正在處理任務(wù),如果突然斷電了怎么辦(正在處理和阻塞隊(duì)列里的請(qǐng)求怎么處理)
    答: 可以對(duì)正在處理和阻塞隊(duì)列的任務(wù)做事務(wù)管理或?qū)ψ枞?duì)列中的任務(wù)持久化處理,并且當(dāng)斷電或系統(tǒng)崩潰,操作無(wú)法進(jìn)行時(shí),可以通過(guò)回溯日志的方式來(lái)撤銷(xiāo)正在處理的已經(jīng)執(zhí)行成功的操作,然后重新執(zhí)行整個(gè)阻塞隊(duì)列
  2. 為什么不建議在代碼中直接使用Executors創(chuàng)建線(xiàn)程池,而是推薦通過(guò)ThreadPoolExecutor方式創(chuàng)建?
  3. 答:不適用是可以明確的讓我們知道線(xiàn)程池的運(yùn)行規(guī)則,避免使用工具類(lèi)的包裝而不夠直觀內(nèi)部機(jī)制導(dǎo)致潛在難以發(fā)現(xiàn)的問(wèn)題,比如,使用newSingleThreadPool和FixedThreadPool創(chuàng)建線(xiàn)程池由于默認(rèn)最大線(xiàn)程為Max而導(dǎo)致如果處理時(shí)間過(guò)長(zhǎng),任務(wù)過(guò)多而導(dǎo)致的OOM就很難發(fā)現(xiàn)問(wèn)題

多線(xiàn)程中的問(wèn)題

CountDownLatch與CyclicBarrier區(qū)別
  • 這兩個(gè)類(lèi)都可以實(shí)現(xiàn)一組線(xiàn)程在到達(dá)某個(gè)條件之前進(jìn)行等待,內(nèi)部都有一個(gè)計(jì)數(shù)器,當(dāng)計(jì)數(shù)器的值不斷減為0時(shí)所有阻塞的線(xiàn)程都將會(huì)被喚醒
  • CountDownlatch計(jì)數(shù)器由使用者控制,線(xiàn)程調(diào)用await()只是將自己阻塞而不會(huì)減少計(jì)數(shù)器的值,只有當(dāng)調(diào)用countDown()方法才會(huì)減一,直到為0時(shí),喚醒a(bǔ)wait(),并且只能攔截一輪
  • CyclicBarrier : cyclic 循環(huán) , Barrier:柵欄 ,顧名思義表示可以實(shí)現(xiàn)循環(huán)攔截,其中的計(jì)數(shù)器由自己控制,在CyclicBarrier中線(xiàn)程調(diào)用await()不僅會(huì)將自己阻塞還會(huì)降計(jì)數(shù)器減1.直到為0時(shí)喚醒所有的阻塞隊(duì)列,而后會(huì)重置新一輪的攔截
線(xiàn)程池的選擇使用
  • 高并發(fā),任務(wù)執(zhí)行時(shí)間短的業(yè)務(wù)怎樣使用線(xiàn)程池?并發(fā)不高,任務(wù)執(zhí)行長(zhǎng)的業(yè)務(wù)怎么使用線(xiàn)程池?并發(fā)高,業(yè)務(wù)執(zhí)行時(shí)間長(zhǎng)的業(yè)務(wù)怎么使用線(xiàn)程池?
    1. 高并發(fā),任務(wù)執(zhí)行時(shí)間短,線(xiàn)程池線(xiàn)程數(shù)量可以設(shè)置成CPU+1,目的是減少線(xiàn)程上下文切換
    2. 并發(fā)不高,任務(wù)執(zhí)行時(shí)間長(zhǎng)的業(yè)務(wù)區(qū)別看待:
      1. 假設(shè)業(yè)務(wù)長(zhǎng)時(shí)間幾種在IO操作,就是IO密集型任務(wù),由于IO操作并不占用CPU資源,這個(gè)時(shí)候就盡可能讓CPU運(yùn)轉(zhuǎn),可以加大線(xiàn)程池中的線(xiàn)程數(shù)量,讓CPU不至于停下來(lái)等待IO操作,從而處理更多的業(yè)務(wù)
      2. 假設(shè)任務(wù)長(zhǎng)時(shí)間集中在計(jì)算操作上,就是計(jì)算密集型任務(wù),同一一致,線(xiàn)程池的線(xiàn)程數(shù)量設(shè)置少一些,減少線(xiàn)程上下文的切換
    3. 并發(fā)高,業(yè)務(wù)執(zhí)行時(shí)間長(zhǎng),解決這種類(lèi)型任務(wù)的關(guān)鍵不在于線(xiàn)程池而在于整體架構(gòu)的設(shè)計(jì)荡碾,看看這些業(yè)務(wù)里面某些數(shù)據(jù)是否能做緩存是第一步谨读,增加服務(wù)器是第二步,至于線(xiàn)程池的設(shè)置坛吁,設(shè)置參考其他有關(guān)線(xiàn)程池的文章劳殖。最后,業(yè)務(wù)執(zhí)行時(shí)間長(zhǎng)的問(wèn)題拨脉,也可能需要分析一下哆姻,看看能不能使用中間件對(duì)任務(wù)進(jìn)行拆分和解耦。
jstack追蹤異常代碼
  • Windows 下使用 jps,jstack ,及procexp.exe工具
  1. JPS常用命令整理
    • JPS是1.5提供的一個(gè)顯示當(dāng)前所有Java進(jìn)程pid命令
    1. jps : 列出pid和Java主類(lèi)名
    2. jps -l : 列出pid和Java主類(lèi)全名
    3. jps -lm : 列出pid,主類(lèi)全程和應(yīng)用程序參數(shù)
    4. jps -v : 列出pid和JVM參數(shù)
  2. Jstack常用命令整理
    • jstack是JVM自帶的一種堆棧追蹤工具
    • jstack pid(通過(guò)jps獲得的pid) 打印線(xiàn)程堆棧
線(xiàn)程中斷機(jī)制
  • java的線(xiàn)程中斷機(jī)制是一種協(xié)作機(jī)制,線(xiàn)程會(huì)不時(shí)的檢測(cè)中斷標(biāo)識(shí)位,以判斷是否應(yīng)該被中斷(值是否為true), 主要有三個(gè)方法
    1. interrupt() : 每個(gè)線(xiàn)程都有個(gè)boolean類(lèi)型的中斷標(biāo)志,當(dāng)使用該方法時(shí)會(huì)被標(biāo)記為true
    2. isInterrupted() : 判斷線(xiàn)程是否被中斷
    3. interrupted() : 清除中斷標(biāo)志,并返回原狀態(tài)
  • 當(dāng)使用中斷時(shí),被中斷的線(xiàn)程并不會(huì)立刻停止做事,而是在合適的時(shí)機(jī)終止
    • 機(jī)制一: 如果該線(xiàn)程處在可中斷狀態(tài)下sleep,join,wait等,則該線(xiàn)程會(huì)被立刻喚醒,同時(shí)收到一個(gè)interruptedException,,如果是IO則資源會(huì)被關(guān)閉
    • 機(jī)制二:如果該線(xiàn)程處于不可中斷狀態(tài)下,即沒(méi)有調(diào)用上方機(jī)制一的api,處于運(yùn)行時(shí)的進(jìn)程,則只是設(shè)置一下中斷標(biāo)志位,其他事情都不會(huì)發(fā)生,如果此后線(xiàn)程調(diào)用阻塞api,則會(huì)馬上跳出,并拋出InterruptedExecption異常,接下來(lái)事情就跟一一致了,如果不調(diào)用阻塞api,則線(xiàn)程會(huì)一致運(yùn)行下去.則在運(yùn)行的代碼中可以輪詢(xún)中斷標(biāo)志位,看它是否被請(qǐng)求停止正在做的事情,可以通過(guò)isInterrupted()來(lái)讀取,同時(shí)也可以通過(guò)一個(gè)interrupted()來(lái)讀取和清楚標(biāo)記位
public class InterruptedTest extends Thread{

    public static void main(String[] args) {
        InterruptedTest interruptedTest = new InterruptedTest();
        interruptedTest.start();
    }


    @Override
    public void run() {
        super.run();
        MyThread myThread = new MyThread();

        myThread.start();

        try {
            Thread.sleep(3000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        //開(kāi)始中斷
        myThread.interrupt();


    }


    class MyThread extends Thread{

        @Override
        public void run() {
            super.run();
            //判斷是否標(biāo)記為中斷:注意被異常捕獲以后中斷標(biāo)記位會(huì)被重置為false,需要再次拋出方可中斷
            while (!Thread.currentThread().isInterrupted()){

                System.out.println("我是正在運(yùn)行...");
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) { //sleep是會(huì)有中斷標(biāo)記的因此會(huì)拋出interruptedException: 中斷標(biāo)志位會(huì)被清除,如果想讓上方while退出,必須再次手動(dòng)設(shè)置標(biāo)志位
                    e.printStackTrace();
                    System.out.println("我收到了異常中斷標(biāo)記" + Thread.currentThread().isInterrupted()); // false:已經(jīng)被異常重置了
                    interrupt(); //再次拋出
                    System.out.println("我收到了再次拋出中斷標(biāo)記" + Thread.currentThread().isInterrupted()); //true,可以正常中斷
                }
            }
        }
    }
}

線(xiàn)程池線(xiàn)程復(fù)用原理
  • 我們自己創(chuàng)建的線(xiàn)程都知道,他只能start()執(zhí)行一次,一旦執(zhí)行完畢或被中斷即走terminated終止?fàn)顟B(tài)結(jié)束線(xiàn)程了,那為何身處線(xiàn)程池中的線(xiàn)程卻可以復(fù)用一致執(zhí)行呢?你難道沒(méi)有這樣的疑問(wèn)嗎?
  • 通過(guò)上面的分析,我們知道線(xiàn)程池的運(yùn)行流程圖!


    image
  • 一旦一個(gè)任務(wù)提交我們就會(huì)判斷其后流程:
    1. 直接申請(qǐng)線(xiàn)程執(zhí)行任務(wù)
    2. 加入到緩存隊(duì)列中等待線(xiàn)程執(zhí)行
    3. 執(zhí)行拒絕策略
  • 帶著上方的疑問(wèn),我們根據(jù)源碼詳細(xì)分析線(xiàn)程池運(yùn)行機(jī)制: 如何維護(hù)自身狀態(tài),如何管理任務(wù),如何管理線(xiàn)程!
ThreadPoolExecutor
  • 首先看他的繼承關(guān)系圖


    image
  1. 頂層接口Executor提供了一種思想:將任務(wù)提交和任務(wù)執(zhí)行進(jìn)行解耦玫膀。用戶(hù)無(wú)需關(guān)注如何創(chuàng)建線(xiàn)程矛缨,如何調(diào)度線(xiàn)程來(lái)執(zhí)行任務(wù),用戶(hù)只需提供Runnable對(duì)象帖旨,將任務(wù)的運(yùn)行邏輯提交到執(zhí)行器(Executor)中箕昭,由Executor框架完成線(xiàn)程的調(diào)配和任務(wù)的執(zhí)行部分。
  2. ExecutorService接口增加了一些能力:(1)擴(kuò)充執(zhí)行任務(wù)的能力解阅,補(bǔ)充可以為一個(gè)或一批異步任務(wù)生成Future的方法落竹;(2)提供了管控線(xiàn)程池的方法,比如停止線(xiàn)程池的運(yùn)行
  3. AbstractExecutorService則是上層的抽象類(lèi)货抄,將執(zhí)行任務(wù)的流程串聯(lián)了起來(lái)述召,保證下層的實(shí)現(xiàn)只需關(guān)注一個(gè)執(zhí)行任務(wù)的方法即可
  4. ThreadPoolExecutor實(shí)現(xiàn)最復(fù)雜的運(yùn)行部分朱转,ThreadPoolExecutor將會(huì)一方面維護(hù)自身的生命周期,另一方面同時(shí)管理線(xiàn)程和任務(wù)积暖,使兩者良好的結(jié)合從而執(zhí)行并行任務(wù)
線(xiàn)程池的生命周期管理
  • 線(xiàn)程池運(yùn)行的狀態(tài),是伴隨著線(xiàn)程池的運(yùn)行在內(nèi)部維護(hù)的,由一個(gè)變量維護(hù)兩個(gè)狀態(tài)值(很常見(jiàn)): 運(yùn)行狀態(tài)(runState)和線(xiàn)程數(shù)量(workCount)
//使用高3位保存線(xiàn)程池的運(yùn)行狀態(tài)runState(總共5種3位足夠了),低29位保存workerCount有效線(xiàn)程數(shù)量:使用同一位的原子類(lèi)不用對(duì)兩個(gè)變量操作時(shí)需要加鎖操作了,直接使用一個(gè)原子類(lèi)即可
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
    //五種線(xiàn)程池狀態(tài)
    private static final int RUNNING    = -1 << COUNT_BITS;
    private static final int SHUTDOWN   =  0 << COUNT_BITS;
    private static final int STOP       =  1 << COUNT_BITS;
    private static final int TIDYING    =  2 << COUNT_BITS;
    private static final int TERMINATED =  3 << COUNT_BITS;

    // Packing and unpacking ctl
    private static int runStateOf(int c)     { return c & ~CAPACITY; } //計(jì)算當(dāng)前運(yùn)行狀態(tài)
    private static int workerCountOf(int c)  { return c & CAPACITY; } //計(jì)算當(dāng)前線(xiàn)程數(shù)量
    private static int ctlOf(int rs, int wc) { return rs | wc; } //通過(guò)狀態(tài)和線(xiàn)程數(shù)生成一個(gè)ctl合成數(shù)量是一個(gè)原子類(lèi)
  • 注意:線(xiàn)程池運(yùn)行狀態(tài)同線(xiàn)程運(yùn)行狀態(tài)不同的哦
    • Running: 能接受新提交的任務(wù),并且能夠處理阻塞隊(duì)列中的任務(wù)
    • Shutdown : 關(guān)閉狀態(tài),不能接受新提交的任務(wù),但卻可以繼續(xù)處理阻塞隊(duì)列中的任務(wù)
    • stop : 不能接受新任務(wù),也不處理緩存隊(duì)列中的任務(wù),會(huì)中斷正在處理任務(wù)的線(xiàn)程
    • Tidying: 所有任務(wù)都終止了,workerCount有效線(xiàn)程數(shù)為0
    • Terminated: 在terminated()方法執(zhí)行完后進(jìn)入該狀態(tài),終止?fàn)顟B(tài)


      image
任務(wù)調(diào)度
  • 當(dāng)用戶(hù)提交一個(gè)任務(wù)會(huì)通過(guò)Executor.execute()方法執(zhí)行,他的步驟上方已經(jīng)總結(jié)過(guò)了,這里在重復(fù)說(shuō)一下:
    1. 首先檢查線(xiàn)程池運(yùn)行狀態(tài),如果不是Running,直接拒絕,線(xiàn)程池要保證在Running狀態(tài)下執(zhí)行任務(wù)
    2. 如果workerCount(下面使用wc) < corePoolSize,則創(chuàng)建新線(xiàn)程執(zhí)行提交任務(wù)
    3. 如果wc >= corePoolSize, 核心線(xiàn)程池滿(mǎn)了,阻塞隊(duì)列未滿(mǎn),則添加到隊(duì)列中
    4. 如果wc >= corePoolSize && wc < maximumPoolSize ,且緩存隊(duì)列滿(mǎn)了,則直接創(chuàng)建新線(xiàn)程執(zhí)行提交任務(wù)
    5. 如果wc > maximumPoolSize,并且線(xiàn)程隊(duì)列滿(mǎn)了,則根據(jù)拒絕策略具體執(zhí)行該任務(wù),默認(rèn)是拋異常
  private final BlockingQueue<Runnable> workQueue; //線(xiàn)程阻塞隊(duì)列
  public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        
        int c = ctl.get();
        if (workerCountOf(c) < corePoolSize) { //計(jì)算當(dāng)前運(yùn)行線(xiàn)程數(shù)小于核心線(xiàn)程數(shù),對(duì)應(yīng)上方2的情況
            if (addWorker(command, true)) //創(chuàng)建新的線(xiàn)程執(zhí)行任務(wù)
                return;
            c = ctl.get();
        }
        //執(zhí)行這里說(shuō)明過(guò)了2那種情況,就看3的情況,阻塞隊(duì)列可以正常添加任務(wù)
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get(); //添加成功了
            if (! isRunning(recheck) && remove(command)) //如果添加以后不再是運(yùn)行狀態(tài),則移除剛才加入隊(duì)列的任務(wù)
                reject(command);  //執(zhí)行拒絕策略
            else if (workerCountOf(recheck) == 0) //線(xiàn)程池運(yùn)行狀態(tài)且wc運(yùn)行線(xiàn)程數(shù)為0時(shí)
                addWorker(null, false); //創(chuàng)建新線(xiàn)程執(zhí)行阻塞隊(duì)列中的任務(wù)
        }
        else if (!addWorker(command, false)) //如果隊(duì)列滿(mǎn)了,可以正常走addWorker創(chuàng)建非核心線(xiàn)程即上方4, 如果false則是5
            reject(command); //走拒絕策略
    }
  • 增加新線(xiàn)程執(zhí)行任務(wù)addWorker()
 private final HashSet<Worker> workers = new HashSet<>(); //線(xiàn)程池中的所有工作線(xiàn)程
private boolean addWorker(Runnable firstTask, boolean core) {
//根據(jù)當(dāng)前狀態(tài),判斷是否添加成功,上方執(zhí)行方法中的addWorker兩個(gè)參數(shù)firstTask = null ,core = true /false 具體分析
        retry:
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c); //獲取運(yùn)行狀態(tài)

            // Check if queue empty only if necessary.
            if (rs >= SHUTDOWN && //狀態(tài) > shutDown 表示此時(shí)已經(jīng)不再接受任務(wù)
            //shutdown狀態(tài)不接受新任務(wù),但可以執(zhí)行已經(jīng)加入隊(duì)列中的任務(wù),所以當(dāng)進(jìn)入shutdown狀態(tài),且傳進(jìn)來(lái)的任務(wù)為null時(shí),并且任務(wù)隊(duì)列不為null時(shí),是允許添加新線(xiàn)程的,把這個(gè)條件取反就是不允許
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty())) 
                return false;

            for (;;) { //使用CAS操作避免加鎖
                int wc = workerCountOf(c); //獲取工作線(xiàn)程
                if (wc >= CAPACITY || 
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false; //大于線(xiàn)程最大容量2的29次方量(所以newCacheExecutor并不能得到Integer.MAX_Value的),或者大于最大允許線(xiàn)程量則不能添加啦
                if (compareAndIncrementWorkerCount(c)) //可添加就CAS操作線(xiàn)程數(shù)+1,成功說(shuō)明可添加
                    break retry; //break跳出retry對(duì)應(yīng)的循環(huán),執(zhí)行循環(huán)后面的添加worker邏輯
                c = ctl.get();  // Re-read ctl 重新讀取狀態(tài)
                if (runStateOf(c) != rs) 
                    continue retry; //狀態(tài)改變了,跳到外層循環(huán)繼續(xù)重新執(zhí)行循環(huán)
                // else CAS failed due to workerCount change; retry inner loop
                //在內(nèi)存層循環(huán)中不停的嘗試CAS操作增加線(xiàn)程數(shù)
            }
        }
        //找了上方break retry可以正常使用CAS新增線(xiàn)程數(shù)
        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            w = new Worker(firstTask); //通過(guò)Worker包裝runnable任務(wù),稍后我們分析
            final Thread t = w.thread;
            if (t != null) {
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock(); //加鎖
                try {
                
                    int rs = runStateOf(ctl.get());
                    //如果線(xiàn)程池狀態(tài)rs < Shutdown即只能是Running
                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) { //或者shutDown狀態(tài)但是沒(méi)有新任務(wù)
                        if (t.isAlive()) // 線(xiàn)程已經(jīng)啟動(dòng)藤为,并且當(dāng)前沒(méi)有任何異常的話(huà),則是true夺刑,否則為false
                            throw new IllegalThreadStateException(); //我還沒(méi)有啟動(dòng)呢
                        workers.add(w); //正常添加到線(xiàn)程池中workers工作線(xiàn)程
                        int s = workers.size();
                        if (s > largestPoolSize) //largestPoolSize:記錄著線(xiàn)程池中出現(xiàn)過(guò)最大線(xiàn)程數(shù)量
                            largestPoolSize = s;
                        workerAdded = true; //可以正常工作的標(biāo)記
                    }
                } finally {
                    mainLock.unlock();
                }
                if (workerAdded) { //如果正常工作,則開(kāi)啟線(xiàn)程任務(wù)
                    t.start();
                    workerStarted = true; //開(kāi)始工作標(biāo)記
                }
            }
        } finally {
            if (! workerStarted) //該任務(wù)沒(méi)有開(kāi)始,則添加到失敗
                addWorkerFailed(w); 
        }
        return workerStarted;
    }
  • 通過(guò)上面分析,新增線(xiàn)程去完成任務(wù)主要通過(guò)worker類(lèi)去完成的
private final class Worker
        extends AbstractQueuedSynchronizer
        implements Runnable //實(shí)現(xiàn)了Runnable接口,因此t.start()執(zhí)行的就是worker的run方法啊
         {
     
        final Thread thread;
       
        Runnable firstTask;
        
        volatile long completedTasks;

        Worker(Runnable firstTask) {
            setState(-1); // inhibit interrupts until runWorker
            this.firstTask = firstTask;
            this.thread = getThreadFactory().newThread(this);  //創(chuàng)建thread(this:Worker) ,則t.start()調(diào)用worker的run,同時(shí)原來(lái)的Runnable被封裝為Worker的屬性firstTask
        }

        /** Delegates main run loop to outer runWorker  */
        public void run() {
            runWorker(this);
        }
        
    //getThreadFactory即為T(mén)hreadPoolExecutor創(chuàng)建thread工廠(chǎng)(實(shí)現(xiàn)ThreadFactory)可修改Thread名稱(chēng),優(yōu)先級(jí)等操作實(shí)現(xiàn)的
    public ThreadFactory getThreadFactory() {
        return threadFactory;
    }
  • 所以上方addWorker添加工作線(xiàn)程的t.start()方法調(diào)用的就是runWorker
final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask; //這個(gè)就是我們執(zhí)行線(xiàn)程池executor.execute()方法時(shí)候的runnable
        w.firstTask = null;
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
        //如果task不為null,并且從workQueue中獲取任務(wù)不為null,則會(huì)一直執(zhí)行下去
            while (task != null || (task = getTask()) != null) { //task是需要執(zhí)行的任務(wù),不一定是剛剛添加的那個(gè)了,這樣其實(shí)worker線(xiàn)程并沒(méi)有完成工作,自然也就不會(huì)銷(xiāo)毀了
                w.lock();
                // If pool is stopping, ensure thread is interrupted;
                // if not, ensure thread is not interrupted.  This
                // requires a recheck in second case to deal with
                // shutdownNow race while clearing interrupt
                if ((runStateAtLeast(ctl.get(), STOP) || //檢查線(xiàn)程狀態(tài),若線(xiàn)程池處于中斷狀態(tài),調(diào)用interrupt將線(xiàn)程中斷
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt(); //中斷線(xiàn)程
                try {
                    beforeExecute(wt, task); //可以在任務(wù)真正執(zhí)行之前做點(diǎn)啥,空實(shí)現(xiàn)
                    Throwable thrown = null;
                    try {
                        task.run(); //執(zhí)行execute()方法中的run方法,在t.start()線(xiàn)程內(nèi),這只是一個(gè)方法執(zhí)行哈!
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        afterExecute(task, thrown); //線(xiàn)程之后可以做啥,空實(shí)現(xiàn)
                    }
                } finally {
                    task = null;
                    w.completedTasks++; //該線(xiàn)程執(zhí)行完成任務(wù)+1
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            processWorkerExit(w, completedAbruptly);
        }
    }
  • 重點(diǎn)邏輯是while循環(huán),當(dāng)我們第一次創(chuàng)建worker并執(zhí)行任務(wù)后,并沒(méi)有結(jié)束線(xiàn)程,而是通過(guò)while循環(huán)調(diào)用getTask()方法從阻塞隊(duì)列中去task繼續(xù)調(diào)用task.run()執(zhí)行任務(wù),注意這里run()只是一個(gè)普通的方法調(diào)用,并不是start()哦!運(yùn)行線(xiàn)程就是Worker線(xiàn)程中
private Runnable getTask() {
        boolean timedOut = false; // Did the last poll() time out?

        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            // 對(duì)應(yīng)ShutDown雖然不添加任務(wù),但是可以執(zhí)行阻塞隊(duì)列中的,Stop以后就不能子在執(zhí)行任務(wù)了
            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                decrementWorkerCount();
                return null; //返回null,停止執(zhí)行任務(wù)
            }

            int wc = workerCountOf(c);

            // allowCoreThreadTimeOut 表示是否允許核心線(xiàn)程超時(shí)銷(xiāo)毀,默認(rèn)false不銷(xiāo)毀.若設(shè)置成true,核心線(xiàn)程也會(huì)銷(xiāo)毀的
            //只有正在工作的線(xiàn)程數(shù)大于核心線(xiàn)程數(shù)才會(huì)為true,佛足額返回false
            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; //
        
            if ((wc > maximumPoolSize || (timed && timedOut))
                && (wc > 1 || workQueue.isEmpty())) {
                if (compareAndDecrementWorkerCount(c))
                    return null;
                continue;
            }

            try {
            //如果timed為true(wx > 核心線(xiàn)程),通過(guò)poll取任務(wù),如果為false,通過(guò)take取任務(wù)
                Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : //這兩個(gè)參數(shù)就是創(chuàng)建線(xiàn)程池中保存時(shí)間量
                    workQueue.take();
                if (r != null) //如果有任務(wù)就退出死循環(huán),返回任務(wù)交給上方的worker線(xiàn)程運(yùn)行
                    return r;
                timedOut = true;
            } catch (InterruptedException retry) {
                timedOut = false;
            }
        }
    }
  • 通過(guò)以上分析:不改變allowCoreThreadTimeOut默認(rèn)前提下,若wc > 核心線(xiàn)程數(shù),則通過(guò)poll從隊(duì)列中取任務(wù),如果wc <= 核心線(xiàn)程數(shù),則通過(guò)take取任務(wù)
  • 則poll()方法同take()區(qū)別是啥呢?
    • 他們都是阻塞隊(duì)列中的方法:常用的 ArrayBlockingQueue,LinkedBlockingQueue,PriorityQueue,SynchronizedQueue,中的方法
//ArrayBlockingQueue
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
        long nanos = unit.toNanos(timeout);
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == 0) { //是否隊(duì)列中的元素個(gè)數(shù)為0,說(shuō)明空隊(duì)列
                if (nanos <= 0L) //等待時(shí)間到了,隊(duì)列中還未有數(shù)據(jù)加入,則返回null,
                    return null;
                /**
                * 調(diào)用該方法的前提是缅疟,當(dāng)前線(xiàn)程已經(jīng)成功獲得與該條件對(duì)象綁定的重入鎖,否* * 則調(diào)用該方法時(shí)會(huì)拋出IllegalMonitorStateException性誉。
                * nanosTimeout指定該方法等待信號(hào)的的最大時(shí)間(單位為納秒)窿吩。若指定時(shí)間* * 內(nèi)收到signal()或signalALL()則返回nanosTimeout減去已經(jīng)等待的時(shí)間;
                *若指定時(shí)間內(nèi)有其它線(xiàn)程中斷該線(xiàn)程错览,則拋出InterruptedException并清除當(dāng)前線(xiàn)程的打斷狀態(tài)纫雁;
                * 若指定時(shí)間內(nèi)未收到通知,則返回0或負(fù)數(shù)倾哺。 
                */
                nanos = notEmpty.awaitNanos(nanos);  //每次signal喚醒重新等待
            }
            return dequeue(); //如果有元素取出
        } finally {
            lock.unlock();
        }
    }
//如果poll超時(shí)返回null,則回調(diào)到
f ((wc > maximumPoolSize || (timed && timedOut)) //true
                && (wc > 1 || workQueue.isEmpty())) { //隊(duì)列也是空的,走進(jìn)去
                if (compareAndDecrementWorkerCount(c)) //CAS可以減少c的個(gè)數(shù)
                    return null; //返回了null,該線(xiàn)程不能再上方的while循環(huán)中繼續(xù)獲取就結(jié)束線(xiàn)程啦,非核心線(xiàn)程就over啦,嘿嘿!
                continue;
            }


public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == 0)  //不能使用if,避免虛假喚醒
                notEmpty.await();  //一旦count隊(duì)列為空,會(huì)一致await阻塞在這里的,直到workQueue.offer()添加元素時(shí)喚醒
            return dequeue(); //取出隊(duì)頭元素
        } finally {
            lock.unlock();
        }
    }
  • 綜上: 當(dāng)核心線(xiàn)程在while循環(huán)中運(yùn)行調(diào)用getTask獲取task任務(wù)時(shí),如果此時(shí)隊(duì)列中沒(méi)有數(shù)據(jù)則走緩存阻塞隊(duì)列的take方法,會(huì)被notEmpty.await()阻塞,那這個(gè)阻塞又是何時(shí)被喚醒的呢?
  • 當(dāng)然是下一個(gè)任務(wù)達(dá)到的時(shí)候也就是調(diào)用execute的時(shí)候添加一個(gè)新的任務(wù)Task
//這個(gè)就是調(diào)用當(dāng)前核心線(xiàn)程已經(jīng)滿(mǎn)了,則添加到阻塞隊(duì)列中,
//剛剛上方的核心線(xiàn)程在等待任務(wù),添加以后肯定就調(diào)用notEmpty.signal()喚醒等待線(xiàn)程取任務(wù)執(zhí)行啦
if (isRunning(c) && workQueue.offer(command)) 
  • 我們來(lái)驗(yàn)證一下我們的想法:workQueue就是選擇的隊(duì)列,這里看ArrayBlockingQueue,當(dāng)然對(duì)于其他隊(duì)列也是相同的
 public boolean offer(E e) {
        checkNotNull(e);
        final ReentrantLock lock = this.lock; //獲取鎖,跟上方加鎖時(shí)同一把鎖
        lock.lock();
        try {
            if (count == items.length)
                return false; //如果當(dāng)前隊(duì)列已滿(mǎn),不能再加入了false
            else {
                enqueue(e); //正常添加到隊(duì)列中
                return true;
            }
        } finally {
            lock.unlock();
        }
    }
   
//enqueue添加到數(shù)組循環(huán)隊(duì)列中后調(diào)用notEmpty.signal()喚醒一個(gè)await線(xiàn)程取任務(wù)開(kāi)始工作啦!
private void enqueue(E x) {
        // assert lock.getHoldCount() == 1;
        // assert items[putIndex] == null;
        final Object[] items = this.items;
        items[putIndex] = x;
        if (++putIndex == items.length)
            putIndex = 0;
        count++;
        notEmpty.signal();
    }
  • 通過(guò)生成-消費(fèi)者模式,將execute加入隊(duì)列的任務(wù)通知等待的核心線(xiàn)程取阻塞隊(duì)列中的任務(wù)開(kāi)始執(zhí)行!

補(bǔ)充一下阻塞隊(duì)列的源碼分析

ArrayDeque: 底層使用循環(huán)數(shù)組實(shí)現(xiàn)雙向隊(duì)列

  • 局部變量
    transient Object[] elements; //當(dāng)前存儲(chǔ)數(shù)組
    transient int head; //其始頭結(jié)點(diǎn)
    transient int tail; //尾結(jié)點(diǎn) 
    private static final int MIN_INITIAL_CAPACITY = 8; //默認(rèn)數(shù)組大小為 8
    
    • 注意由于是循環(huán)數(shù)組,所以頭結(jié)點(diǎn)不一定比尾結(jié)點(diǎn)小哦,也可能比尾結(jié)點(diǎn)大有size - 1 -> 0 的位置
    • 由于需要判斷數(shù)組中數(shù)據(jù)是否為空,還是已經(jīng)填滿(mǎn),所以會(huì)有一個(gè)節(jié)點(diǎn)浪費(fèi)掉,這是因?yàn)?
    1. 如果head = tail 無(wú)法判斷他們是空的還是滿(mǎn)的數(shù)組
    2. 如果空出一位tail表示當(dāng)前需要填充數(shù)據(jù)的位置,則當(dāng)head =tail時(shí)表示空,head = (tail + 1) % size 相等,表示當(dāng)前數(shù)組滿(mǎn)了,需要擴(kuò)容
    • 擴(kuò)容方法也參考HashMap的擴(kuò)容以 2的N次方,則上面的 %可以表示為 head = (tail + 1) & (size - 1)性能更好
  • 因此: ArrayDeque默認(rèn)數(shù)組大小為 8 ,然后以 2倍擴(kuò)容,不滿(mǎn)會(huì)向上補(bǔ),比如 創(chuàng)建一個(gè)14的數(shù)組實(shí)際創(chuàng)建的是 大小 16的循環(huán)數(shù)組
add方法
  • 源碼為:不能存儲(chǔ)null
    public void addLast(E e) {
        if (e == null)
            throw new NullPointerException();
        elements[tail] = e; //注意這里是先添加在判斷是否擴(kuò)容,通過(guò)上面局部變量的分析可知肯定有一個(gè)空余的位置,在這里正好填滿(mǎn)數(shù)組以后再判斷是否需要擴(kuò)容,判斷方法也同上2的情形一致
        if ( (tail = (tail + 1) & (elements.length - 1)) == head)
            doubleCapacity(); //兩倍的擴(kuò)容數(shù)組
    }
    
    //我們查看擴(kuò)容代碼
    private void doubleCapacity() {
        assert head == tail; //斷言tail增加以后是否跟head一致,數(shù)組完全填滿(mǎn)
        int p = head; 
        int n = elements.length;
        int r = n - p; // 分兩部分來(lái)復(fù)制,n分成 從0->p - 1 復(fù)制p個(gè)數(shù)據(jù) 和從p->n 復(fù)制(n - p)個(gè)數(shù)據(jù)
        int newCapacity = n << 1; //擴(kuò)容兩倍
        if (newCapacity < 0)
            throw new IllegalStateException("Sorry, deque too big");
        Object[] a = new Object[newCapacity];
        //將原來(lái)數(shù)組的起始位置至數(shù)組末尾全部復(fù)制到新數(shù)組0 -> n - p - 1處,此時(shí)不用管尾結(jié)點(diǎn)的位置,全部復(fù)制,多余的復(fù)制會(huì)存儲(chǔ)成默認(rèn)值的,不用在意(這里不會(huì)出現(xiàn)的,因?yàn)樯戏降臄嘌?數(shù)組數(shù)據(jù)已經(jīng)填滿(mǎn)了)
        //分開(kāi)copy是為了避免頭尾節(jié)點(diǎn)過(guò)度的問(wèn)題,不管你們?cè)谀膬?我都是從頭復(fù)制到最尾部,在從數(shù)組0位置copy到頭部節(jié)點(diǎn)處即可全部復(fù)制
        System.arraycopy(elements, p, a, 0, r); 
        //在賦值從原來(lái)數(shù)組的0->p -1的位置上的數(shù)據(jù)到 新數(shù)組的 n-p -> (n - 1)處,即把原來(lái)舊數(shù)組全部復(fù)制到新數(shù)組從0其實(shí)到n -1結(jié)尾處
        System.arraycopy(elements, 0, a, r, p);
        elements = a; //拷貝后重新賦值給數(shù)組
        head = 0; //重新設(shè)置起始頭結(jié)點(diǎn)為 0 ,尾結(jié)點(diǎn)為n處的null
        tail = n;
    }
    
remove 刪除
  • remove方法最終調(diào)取pollFirst() 刪除頭部數(shù)據(jù) 或 pollLast() 刪除尾部數(shù)據(jù)
    
     public E pollFirst() {
        int h = head;
        @SuppressWarnings("unchecked")
        E result = (E) elements[h];
        
        if (result == null)
            return null;
        // r如果當(dāng)前頭部不為則賦值為null,并將頭head增加 1,且判斷是否越界超過(guò)整個(gè)數(shù)組大小了
        elements[h] = null;     // Must null out slot
        head = (h + 1) & (elements.length - 1);
        return result;
    }
    
    public E pollLast() {
    //尾結(jié)點(diǎn)刪除就是 -1 是否小于了0,如果tail = 0 ,則t = -1 & 15 (-1的補(bǔ)碼為 32位的1 & 15 = 15設(shè)置成最后一位數(shù)組的值刪除并且尾結(jié)點(diǎn)為null的指向最后一位)
        int t = (tail - 1) & (elements.length - 1);
        @SuppressWarnings("unchecked")
        E result = (E) elements[t];
        if (result == null)
            return null;
        elements[t] = null; //刪除循環(huán)數(shù)組中最后一位
        tail = t;
        return result;
    }
    
修改和查找
  • 就是普通的數(shù)組查找,無(wú)非是垮了個(gè)(size -1 ) -> 0而已

PriorityQueue: 使用堆得優(yōu)先級(jí)隊(duì)列

  • PriorityQueue一個(gè)基于優(yōu)先級(jí)的無(wú)界隊(duì)列轧邪,優(yōu)先級(jí)隊(duì)列的元素按照其自然順序進(jìn)行排序或者根據(jù)自定義提供的Comparator進(jìn)行排序
    • 比如對(duì)于VIP和普通用戶(hù)的請(qǐng)求進(jìn)行優(yōu)先級(jí)排序處理請(qǐng)求等;
  • 不允許使用null及不可比較的對(duì)象(沒(méi)有實(shí)現(xiàn)Comparable接口的對(duì)象)
  • 非線(xiàn)程安全的,但是可以使用PriorityBlockingQueue用于多線(xiàn)程環(huán)境
  • PriorityQueue隊(duì)列頭是排序規(guī)則中最小的那個(gè)元素,如果多個(gè)元素都是最小值則隨機(jī)挑選一個(gè)
  • 常用方法及時(shí)間復(fù)雜度(后文分析)
    • peek() 返回隊(duì)首元素 O(1)
    • element(); //返回隊(duì)頭元素(不刪除) O(1)
    • poll() 返回隊(duì)首元素羞海,隊(duì)首元素出隊(duì)列 O(log(N))
    • add() 添加元素 O(log(N))
    • offer(E e)將指定的元素插入此優(yōu)先級(jí)隊(duì)列忌愚。不能添加null元素。 O(log(N))
    • isEmpty() 判斷是否為空
構(gòu)造函數(shù)
  • 查看構(gòu)造函數(shù)
    //默認(rèn)數(shù)組大小 11
     private static final int DEFAULT_INITIAL_CAPACITY = 11;
    
    //存儲(chǔ)數(shù)組
    transient Object[] queue; // non-private to simplify nested class access
    
    //數(shù)組的長(zhǎng)度
    private int size = 0;
    
    //隊(duì)列比較器却邓,為null使用默認(rèn)比較器
    private final Comparator<? super E> comparator;
    
    //Fail-Fast標(biāo)記,用于多線(xiàn)程      
    transient int modCount = 0; // non-private to simplify nested class access
    
    
實(shí)現(xiàn)原理
  • 通過(guò)二叉小頂堆實(shí)現(xiàn),可以用一顆完全二叉樹(shù)表示(任意一個(gè)非葉子節(jié)點(diǎn)的權(quán)值,都不大于去左右子節(jié)點(diǎn)的權(quán)值),也就意味著可以通過(guò)數(shù)組作為其底層實(shí)現(xiàn)(數(shù)組實(shí)現(xiàn)簡(jiǎn)單,同時(shí)不會(huì)占用無(wú)用內(nèi)存,數(shù)組中間不會(huì)有空余位置)
    [圖片上傳失敗...(image-8ca912-1592548755621)]
  • 觀察上方我們可以得到父子節(jié)點(diǎn)之間的關(guān)系
    1. 左子點(diǎn)下標(biāo): 當(dāng)前父節(jié)點(diǎn)下標(biāo) * 2 + 1 ;
    2. 右子點(diǎn)下標(biāo): 當(dāng)前父節(jié)點(diǎn)下標(biāo) * 2 + 2 ;
    3. 父節(jié)點(diǎn)下標(biāo): (當(dāng)前子節(jié)點(diǎn) - 1) / 2 ; //不論是左子節(jié)點(diǎn)還是右子節(jié)點(diǎn)
  • 通過(guò)上方公式,我們可以很方便計(jì)算出某個(gè)節(jié)點(diǎn)的父節(jié)點(diǎn)以及子節(jié)點(diǎn)的下標(biāo),這就是為何使用數(shù)組來(lái)存儲(chǔ)堆的原因
添加 add/offer
  • 兩者區(qū)別主要是插入失敗處理不同,add插入失敗拋異常,而offer插入失敗返回false,對(duì)于PriorityQueue兩者沒(méi)啥差別的
    [圖片上傳失敗...(image-6e9980-1592548755621)]
  • 當(dāng)新加入元素破壞了最小頂堆,就需要跟它父節(jié)點(diǎn)/祖父節(jié)點(diǎn)..設(shè)置根節(jié)點(diǎn)做調(diào)整,只是交換子節(jié)點(diǎn)跟父節(jié)點(diǎn)即可
    public boolean offer(E e) {
        if (e == null)
            throw new NullPointerException();
        modCount++;
        int i = size;
        if (i >= queue.length) //如果當(dāng)前數(shù)組已經(jīng)滿(mǎn)了,就擴(kuò)容其大小
            grow(i + 1); //擴(kuò)容函數(shù)如果數(shù)組長(zhǎng)度小于64就兩倍擴(kuò)容,否則增長(zhǎng)1.5倍
        size = i + 1;
        if (i == 0) //如果當(dāng)前數(shù)組為空,則在第一位添加數(shù)據(jù)
            queue[0] = e;
        else
            siftUp(i, e); //開(kāi)始在葉子結(jié)點(diǎn)添加并調(diào)節(jié)數(shù)據(jù),插入位置是數(shù)組的最后一位,也就是最下方葉子結(jié)點(diǎn)的最右邊的那個(gè)位置
        return true;
    }
    
    
    //grow擴(kuò)容函數(shù)
    private void grow(int minCapacity) {
        int oldCapacity = queue.length;
        // Double size if small; else grow by 50%
        int newCapacity = oldCapacity + ((oldCapacity < 64) ? 
                                         (oldCapacity + 2) :
                                         (oldCapacity >> 1));
        // overflow-conscious code
        if (newCapacity - MAX_ARRAY_SIZE > 0)
            newCapacity = hugeCapacity(minCapacity);
        queue = Arrays.copyOf(queue, newCapacity); //復(fù)制到新的數(shù)組中
    }
    
    
    //seftUp()函數(shù)
    private void siftUp(int k, E x) {
        if (comparator != null) //自定義的比較器
            siftUpUsingComparator(k, x);
        else //默認(rèn)比較器
            siftUpComparable(k, x);
    }
    
    //我們主要看自定義比較器的方法
    private void siftUpUsingComparator(int k, E x) {
        while (k > 0) { //k最開(kāi)始是數(shù)組的最后一位,我們開(kāi)始跟他的父節(jié)點(diǎn)進(jìn)行比較
            int parent = (k - 1) >>> 1; //根據(jù)上方公式第三條得到其父節(jié)點(diǎn)下標(biāo) k>0的位置肯定有父節(jié)點(diǎn),不用擔(dān)心下面的數(shù)組越界問(wèn)題
            Object e = queue[parent]; //獲取父節(jié)點(diǎn)的數(shù)據(jù)
            if (comparator.compare(x, (E) e) >= 0) //根據(jù)自定義比較器確定兩者關(guān)系,如果x比父節(jié)點(diǎn)e大,則表明插入位置正確,退出循環(huán)
                break;
            queue[k] = e; //否則將父節(jié)點(diǎn)的值賦值給當(dāng)前子節(jié)點(diǎn)的位置,父節(jié)點(diǎn)值并沒(méi)有改變,只是位置信息賦值給了當(dāng)前k,comparator比較的時(shí)候還是拿到插入值x同父節(jié)點(diǎn)值比較,跟當(dāng)前節(jié)點(diǎn)值無(wú)關(guān)的
            k = parent;
        }
        queue[k] = x; //上方公式獲取到k應(yīng)該插入的位置,對(duì)其賦值即可,k只賦值了這一次哦
    }
    
    
    • 加入元素可能破壞小頂堆性質(zhì),需要進(jìn)行調(diào)整,過(guò)程為: 從K指定的位置開(kāi)始,將x追層與當(dāng)前點(diǎn)的parent進(jìn)行比較并交換,知道滿(mǎn)足x>= queue[parent]:比較可以是元素的自然順序,也可以是自定義比較器
獲得隊(duì)首元素
  • element()和peek() : 獲取但不刪除隊(duì)首元素,也就是隊(duì)列中權(quán)值最小的那個(gè)元素,區(qū)別是前者失敗拋出異常,后者返回null,由于采用最小堆,且隊(duì)首元素在下標(biāo)為0處,因此直接返回queue[0]即可
    public E peek() {
        return (size == 0) ? null : (E) queue[0];
    }
    
remove和poll
  • 獲取并刪除隊(duì)首元素,區(qū)別是remove失敗拋出異常,poll返回null,由于隊(duì)首刪除會(huì)改變最小堆結(jié)構(gòu),因此需要維護(hù)小頂堆的性質(zhì),需要調(diào)節(jié)堆
    [圖片上傳失敗...(image-d67d9e-1592548755621)]
    public E poll() {
        if (size == 0)
            return null;
        int s = --size; //查找數(shù)組最后一位數(shù)據(jù) x ,對(duì)其調(diào)用siftDown方法
        modCount++;
        E result = (E) queue[0];
        E x = (E) queue[s];
        queue[s] = null; //刪除以后數(shù)組長(zhǎng)度 -1 ,最后一位設(shè)置成null
        if (s != 0)  //s不是隊(duì)首元素則開(kāi)始調(diào)整堆
            siftDown(0, x);
        return result;
    }
    
    //調(diào)整堆的函數(shù),無(wú)非就是跟其孩子節(jié)點(diǎn)進(jìn)行比較
    private void siftDown(int k, E x) { //注意:此時(shí) k 為 0 ,x為當(dāng)前數(shù)組末尾那個(gè)值也就是上圖中的數(shù)字 9 
        if (comparator != null) //是否存在自定義的比較器
            siftDownUsingComparator(k, x);
        else
            siftDownComparable(k, x);
    }
    
    //自定義比較器
    private void siftDownUsingComparator(int k, E x) {
        int half = size >>> 1; //這里注意理解: size是減后的數(shù)組大小,由于堆中父節(jié)點(diǎn) < 子節(jié)點(diǎn)特性,因此,對(duì)于最下層,也就是圖片中跟9同級(jí)的節(jié)點(diǎn)是沒(méi)有必要在考慮的,同樣9如果有祖父節(jié)點(diǎn)則右側(cè)叔節(jié)點(diǎn)也是沒(méi)有必要考慮的,對(duì)于上方的b圖 size = 10 ,右移1位為 5就是數(shù)據(jù)15的位置,則15的父節(jié)點(diǎn)必然小于15所以已經(jīng)找到交換過(guò)了,當(dāng)15的父節(jié)點(diǎn)為根節(jié)點(diǎn)時(shí),下方對(duì)左右有比較的,已經(jīng)覆蓋整個(gè)特點(diǎn)了,因此設(shè)置成 size / 2 更加快速合理
        while (k < half) {  //循環(huán)比較,找到數(shù)組中最小數(shù)字存放到堆頂元素
            int child = (k << 1) + 1; //第一次找到k的左子樹(shù)
            Object c = queue[child];
            int right = child + 1; //找到k的右子樹(shù)
            if (right < size &&
                comparator.compare((E) c, (E) queue[right]) > 0) //首先比較左右子樹(shù)找到他們的最小值,以便下面跟父節(jié)點(diǎn)比較是否交換
                c = queue[child = right]; //如果右子樹(shù)更小,設(shè)置給臨時(shí)變量c
            if (comparator.compare(x, (E) c) <= 0) //比較當(dāng)前最后一位數(shù)據(jù)x,跟他的子樹(shù)中最小的數(shù)據(jù),如果比他小,則找到了這個(gè)存放x的位置,如果比他大,則交換值
                break;
            queue[k] = c; //將比x小的子樹(shù)存放到父節(jié)點(diǎn)中
            k = child; //設(shè)置k的下標(biāo)為當(dāng)前孩子節(jié)點(diǎn)位置,注意child已經(jīng)在比較左右子樹(shù)時(shí)根據(jù)他們大小設(shè)置到最小子樹(shù)上了,因此在往下查找
        }
        queue[k] = x; //找到了當(dāng)前合適的父節(jié)點(diǎn)位置K,x的值比他的左右子樹(shù)都大,則這個(gè)位置就是存放x的位置
    }
    
    
    
remove(Object o) :刪除指定元素
  • 遍歷數(shù)組找到第一個(gè)滿(mǎn)足條件的元素下標(biāo)(如果有多個(gè),只刪除一個(gè)的),刪除會(huì)改變隊(duì)列結(jié)構(gòu),需要進(jìn)行調(diào)整,分為兩種情況:
    1. 刪除的數(shù)組最后一位元素,則直接刪除即可
    2. 刪除的不是最后一個(gè)元素,則從刪除點(diǎn)開(kāi)始以最后一個(gè)元素為參考調(diào)用一次siftDown(),可能導(dǎo)致不滿(mǎn)足父節(jié)點(diǎn)小于該刪除節(jié)點(diǎn),還需要上慮調(diào)用添加是的siftUp()將添加走一遍
  • 很多人對(duì)第二個(gè)不理解,簡(jiǎn)單分析一下源碼,刪除操作實(shí)際調(diào)用removeAt(int i)
    [圖片上傳失敗...(image-53d895-1592548755621)]


    image
      private E removeAt(int i) { //i為數(shù)組中從0開(kāi)始查找第一個(gè)滿(mǎn)足.equeal(o)的元素下標(biāo)
        // assert i >= 0 && i < size;
        modCount++;
        int s = --size; //找到數(shù)組末尾數(shù)據(jù),如果是第一種情況直接刪除即可
        if (s == i) // removed last element
            queue[i] = null;
        else {  //如果是第二種情況
            E moved = (E) queue[s]; //首先保存移除數(shù)組末尾元素,然后將數(shù)組最后一位置空,即刪除任何數(shù)據(jù),其實(shí)就是刪除最未那個(gè),這點(diǎn)同紅黑樹(shù)刪除一致
            queue[s] = null;
            siftDown(i, moved); //再次調(diào)用siftDown,此時(shí)i是需要?jiǎng)h除的元素下標(biāo),moved是數(shù)組最后一位的數(shù)據(jù),其實(shí)上方的poll彈出堆頂元素,只是一個(gè)特殊的移除下標(biāo)為0的數(shù)據(jù)而已
            //對(duì)于上方的圖片只是一種特殊情況4正好是9的祖父節(jié)點(diǎn),如果需要移除的是10也沒(méi)有關(guān)系,我們依然刪除下標(biāo)10的數(shù)據(jù),將數(shù)據(jù)9跟10的兩個(gè)子節(jié)點(diǎn)15 跟11比較,發(fā)現(xiàn)都小于則將9放到下標(biāo)2的位置再將10返回即可,只要滿(mǎn)足子節(jié)點(diǎn)大于父節(jié)點(diǎn)即可
            if (queue[i] == moved) {  //上方調(diào)整過(guò)的數(shù)據(jù),如果發(fā)現(xiàn)當(dāng)前刪除節(jié)點(diǎn)數(shù)據(jù)直接填充了比如刪除的是11,這個(gè)時(shí)候填充9到下標(biāo)6的位置,你發(fā)現(xiàn) queue[6] = movie = 9下方的都是大于9的,但是父節(jié)點(diǎn)并不能保證小于9哦,這個(gè)時(shí)候就需要向添加時(shí)候一樣,向上過(guò)濾直到滿(mǎn)足所有下方數(shù)據(jù)都小于父節(jié)點(diǎn)即2跟6交換后2位置9,6位置10
                siftUp(i, moved);
                if (queue[i] != moved)
                    return moved;
            }
        }
        return null;
    }
    
注意:無(wú)論是隊(duì)列還是棧都是可以用鏈表或者數(shù)組實(shí)現(xiàn)的,基本上所有數(shù)據(jù)都是這兩種形式
  • 很多人會(huì)說(shuō),我學(xué)習(xí)ArrayDeque跟PriorityQueue數(shù)據(jù)結(jié)構(gòu)有啥用呢?你大概是忘記了一個(gè)重要的東西線(xiàn)程池
  • 線(xiàn)程池中緩存隊(duì)列有主要有四種:
    1. ArrayBlockingQueue(int i): 它是ArrayDeque的線(xiàn)程安全版本,基本使用一致,無(wú)非就是加了鎖保證線(xiàn)程安全性,還有一個(gè)它是有界的i是數(shù)組的大小,不可更改,ArrayDeque是可變的
    2. LinkedBlockingQueue(int i): i可要可不要,如果不要就是不固定大小的鏈表,如果有則為指定大小的鏈表,這個(gè)簡(jiǎn)單就不分析了,跟鏈表一致的
    3. PriorityBlockingQueue(int i): i可有可無(wú),意義同上,是PriorityQueue的線(xiàn)程安全類(lèi),依據(jù)設(shè)置優(yōu)先級(jí)彈出隊(duì)列中的任務(wù)給線(xiàn)程池使用
    4. SynchronizedQueue() : 沒(méi)有空間的隊(duì)列,不緩存任何數(shù)據(jù)滴,來(lái)多少任務(wù)創(chuàng)建多少線(xiàn)程
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末硕糊,一起剝皮案震驚了整個(gè)濱河市,隨后出現(xiàn)的幾起案子腊徙,更是在濱河造成了極大的恐慌简十,老刑警劉巖,帶你破解...
    沈念sama閱讀 216,402評(píng)論 6 499
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件撬腾,死亡現(xiàn)場(chǎng)離奇詭異螟蝙,居然都是意外死亡,警方通過(guò)查閱死者的電腦和手機(jī)民傻,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,377評(píng)論 3 392
  • 文/潘曉璐 我一進(jìn)店門(mén)胰默,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái),“玉大人漓踢,你說(shuō)我怎么就攤上這事牵署。” “怎么了喧半?”我有些...
    開(kāi)封第一講書(shū)人閱讀 162,483評(píng)論 0 353
  • 文/不壞的土叔 我叫張陵碟刺,是天一觀的道長(zhǎng)。 經(jīng)常有香客問(wèn)我薯酝,道長(zhǎng)半沽,這世上最難降的妖魔是什么? 我笑而不...
    開(kāi)封第一講書(shū)人閱讀 58,165評(píng)論 1 292
  • 正文 為了忘掉前任吴菠,我火速辦了婚禮者填,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘做葵。我一直安慰自己占哟,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,176評(píng)論 6 388
  • 文/花漫 我一把揭開(kāi)白布酿矢。 她就那樣靜靜地躺著榨乎,像睡著了一般。 火紅的嫁衣襯著肌膚如雪瘫筐。 梳的紋絲不亂的頭發(fā)上蜜暑,一...
    開(kāi)封第一講書(shū)人閱讀 51,146評(píng)論 1 297
  • 那天,我揣著相機(jī)與錄音策肝,去河邊找鬼肛捍。 笑死,一個(gè)胖子當(dāng)著我的面吹牛之众,可吹牛的內(nèi)容都是我干的拙毫。 我是一名探鬼主播,決...
    沈念sama閱讀 40,032評(píng)論 3 417
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼棺禾,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼缀蹄!你這毒婦竟也來(lái)了?” 一聲冷哼從身側(cè)響起膘婶,我...
    開(kāi)封第一講書(shū)人閱讀 38,896評(píng)論 0 274
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤缺前,失蹤者是張志新(化名)和其女友劉穎,沒(méi)想到半個(gè)月后竣付,有當(dāng)?shù)厝嗽跇?shù)林里發(fā)現(xiàn)了一具尸體诡延,經(jīng)...
    沈念sama閱讀 45,311評(píng)論 1 310
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,536評(píng)論 2 332
  • 正文 我和宋清朗相戀三年古胆,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了肆良。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 39,696評(píng)論 1 348
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡逸绎,死狀恐怖惹恃,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情棺牧,我是刑警寧澤巫糙,帶...
    沈念sama閱讀 35,413評(píng)論 5 343
  • 正文 年R本政府宣布,位于F島的核電站颊乘,受9級(jí)特大地震影響参淹,放射性物質(zhì)發(fā)生泄漏醉锄。R本人自食惡果不足惜昔汉,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,008評(píng)論 3 325
  • 文/蒙蒙 一锨阿、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧鳄炉,春花似錦开呐、人聲如沸烟勋。這莊子的主人今日做“春日...
    開(kāi)封第一講書(shū)人閱讀 31,659評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)卵惦。三九已至,卻和暖如春瓦戚,著一層夾襖步出監(jiān)牢的瞬間沮尿,已是汗流浹背。 一陣腳步聲響...
    開(kāi)封第一講書(shū)人閱讀 32,815評(píng)論 1 269
  • 我被黑心中介騙來(lái)泰國(guó)打工伤极, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留蛹找,地道東北人。 一個(gè)月前我還...
    沈念sama閱讀 47,698評(píng)論 2 368
  • 正文 我出身青樓哨坪,卻偏偏與公主長(zhǎng)得像庸疾,于是被迫代替她去往敵國(guó)和親。 傳聞我的和親對(duì)象是個(gè)殘疾皇子当编,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,592評(píng)論 2 353