思考
- 你是否有此疑問(wèn):普通線(xiàn)程使用后即銷(xiāo)毀器紧,而對(duì)于線(xiàn)程池中核心線(xiàn)程將一直存在耀销,非核心線(xiàn)程會(huì)銷(xiāo)毀,它是如何做到的铲汪?看了這篇文章熊尉,相信你能夠了解其中緣由!
Executor框架
- 主要有三個(gè)部分組成
- 任務(wù): 包括被執(zhí)行任務(wù)需要實(shí)現(xiàn)的接口:Runnable接口或Callable接口
- 任務(wù)的執(zhí)行:任務(wù)執(zhí)行機(jī)制的核心接口Executor,以及繼承自Executor的ExecutorService接口(ThreadPoolExecutor和ScheduledThreadPoolExecutor)
- 異步計(jì)算的結(jié)果: 包括接口Future和實(shí)現(xiàn)Future接口的FutureTask類(lèi)
- 簡(jiǎn)單的連接上述
- Executor是一個(gè)接口,是Executor框架的基礎(chǔ),將任務(wù)提交與任務(wù)執(zhí)行分離開(kāi)來(lái)
- ThreadPoolExecutor是線(xiàn)程池的核心實(shí)現(xiàn)類(lèi),用來(lái)執(zhí)行被提交的任務(wù)
- ScheduledThreadPoolExecutor是一個(gè)實(shí)現(xiàn)類(lèi),在給定的延遲后運(yùn)行命令,或者定期執(zhí)行命令(比Timer更靈活,功能更加強(qiáng)大)
- Future接口和實(shí)現(xiàn)它的FutureTask類(lèi),代表異步計(jì)算的結(jié)果
- Runnable接口和Callable接口實(shí)現(xiàn)類(lèi)都可以被任務(wù)給執(zhí)行
線(xiàn)程隊(duì)列
- 主要包括: ArrayBlockingQueue, LinkedBlockingQueue, SynchronousQueue
ArrayBlockingQueue : 基于數(shù)組結(jié)構(gòu)的有界阻塞隊(duì)列(FIFO) 源碼附錄在文章最下
- 一個(gè)有界緩存隊(duì)列,可以指定緩存隊(duì)列大小,當(dāng)其存儲(chǔ)滿(mǎn)后,加入隊(duì)列失敗,會(huì)開(kāi)啟新線(xiàn)程執(zhí)行,但是當(dāng)線(xiàn)程達(dá)到最大線(xiàn)程數(shù)時(shí),在加入就會(huì)拋出異常RejectExecutorExcation
PriorityBlockingQueue : 基于數(shù)組結(jié)構(gòu)的優(yōu)先級(jí)的無(wú)限阻塞隊(duì)列
- 擁有優(yōu)先級(jí)的同ArrayBlockIngQueue在offer實(shí)現(xiàn)上可2倍的自動(dòng)擴(kuò)容
- 不存在超容量情況掌腰,所以不會(huì)像ArrayBlockingQueue一樣拋出異常
- 通過(guò)構(gòu)造函數(shù)傳入對(duì)象來(lái)判斷優(yōu)先級(jí),所以傳入對(duì)象必須實(shí)現(xiàn)comparable接口
- 源碼分析在文章最下,在算法中運(yùn)用較多,比如查找鏈表中前K大值等
LinkedBlockingQueue : 基于鏈表結(jié)構(gòu)的阻塞隊(duì)列(FIFO)
- 一個(gè)無(wú)界緩存等待隊(duì)列,當(dāng)線(xiàn)程數(shù)量達(dá)到核心線(xiàn)程數(shù)時(shí),剩余任務(wù)都會(huì)添加進(jìn)來(lái)等待,即最大線(xiàn)程數(shù)無(wú)效
SynchronousQueue : 不存儲(chǔ)元素的阻塞隊(duì)列(FIFO先進(jìn)先出公平,或LIFO非公平)
- 沒(méi)有容量,是無(wú)緩存等待隊(duì)列,一個(gè)不存儲(chǔ)任務(wù)的隊(duì)列,會(huì)直接將任務(wù)交給消費(fèi)者,并且必須等隊(duì)列中添加元素被消費(fèi)后才能繼續(xù)添加新元素
- 擁有公平(FIFO)和非公平策略,非公平會(huì)導(dǎo)致一些數(shù)據(jù)永遠(yuǎn)不會(huì)被消費(fèi)
- 使用該隊(duì)列一般要求非線(xiàn)程數(shù)設(shè)置為無(wú)界,避免線(xiàn)程拒絕執(zhí)行操作
Executor的運(yùn)用
- Executors.callable(Runnable task):將Runnable對(duì)象封裝為一個(gè)Callable對(duì)象
- ExecutorService.execute(Runnable task):執(zhí)行一個(gè)沒(méi)有返回值得任務(wù)
- ExecutorService.submit(Runnable或Callable task ):返回一個(gè)實(shí)現(xiàn)Future接口的對(duì)象FutureTask,
- 主線(xiàn)程可以執(zhí)行FutureTask.get()方法等待任務(wù)執(zhí)行完成獲取接口
- 也可以調(diào)用FutureTask.cancel()來(lái)取消此任務(wù)的執(zhí)行
ThreadPoolExecutor源碼分析
- 首先創(chuàng)建線(xiàn)程池最終調(diào)取方法是:
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) { this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), defaultHandler); } //調(diào)取這個(gè)方法 public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler)
- corePoolSize : 核心線(xiàn)程數(shù),創(chuàng)建線(xiàn)程池后默認(rèn)線(xiàn)程為0,當(dāng)有任務(wù)來(lái)時(shí)才會(huì)創(chuàng)建對(duì)應(yīng)的一個(gè)線(xiàn)程,直到達(dá)到corePoolSize大小,就會(huì)將在到達(dá)的任務(wù)放到緩存隊(duì)列中,除非調(diào)用了預(yù)創(chuàng)建線(xiàn)程才會(huì)在沒(méi)有任務(wù)到達(dá)之前就創(chuàng)建對(duì)應(yīng)的線(xiàn)程數(shù);
- maximumPoolSize: 最大線(xiàn)程數(shù),線(xiàn)程池最多能創(chuàng)建的線(xiàn)程數(shù),當(dāng)隊(duì)列滿(mǎn)了以后再進(jìn)來(lái)的任務(wù)會(huì)在創(chuàng)建一個(gè)新的線(xiàn)程
- keepAliveTime : 線(xiàn)程沒(méi)有任務(wù)執(zhí)行時(shí)最多保持多久時(shí)間會(huì)終止.默認(rèn)只有線(xiàn)程數(shù)大于corePoolSize時(shí)才起作用,針對(duì)的是最大線(xiàn)程數(shù)時(shí)創(chuàng)建的線(xiàn)程.有方法讓線(xiàn)程數(shù)在核心線(xiàn)程之內(nèi)也起作用,直到線(xiàn)程數(shù)為0
- unit : 保持時(shí)間的參數(shù)單位
- workQueue: 一個(gè)阻塞隊(duì)列,用來(lái)存儲(chǔ)等待執(zhí)行的任務(wù),當(dāng)任務(wù)滿(mǎn)了就會(huì)調(diào)用maximumPoolSize創(chuàng)建新線(xiàn)程
- threadFactory : 用于設(shè)置創(chuàng)建線(xiàn)程的工廠(chǎng),可以通過(guò)線(xiàn)程工廠(chǎng)給每個(gè)創(chuàng)建出來(lái)的線(xiàn)程命名或者設(shè)置優(yōu)先級(jí)等操作:通過(guò)implements ThreadFactory可以自定義線(xiàn)程工廠(chǎng)
- handler : 便是當(dāng)拒絕處理任務(wù)時(shí)的策略(當(dāng)阻塞隊(duì)列滿(mǎn)了,且達(dá)到最大線(xiàn)程數(shù)后再來(lái)任務(wù)會(huì)調(diào)取),通過(guò)implements RejectedExecutionHandler可自定義自己的拒絕策略
- AbortPolicy : 默認(rèn)方式,直接拋出異常
- CallerRunsPolicy : 只用調(diào)用者所在線(xiàn)程來(lái)運(yùn)行任務(wù),異步任務(wù)變成了同步執(zhí)行了,比如主線(xiàn)程調(diào)用的execute方法則拒絕策略會(huì)將線(xiàn)程池拒絕的任務(wù)交給主線(xiàn)程執(zhí)行了
- DiscardOldestPolicy: 丟棄隊(duì)列中對(duì)頭的那個(gè)任務(wù)(最早添加進(jìn)來(lái)的)并執(zhí)行當(dāng)前任務(wù)
- DiscardPolicy: 不處理,丟棄掉
ThreadPoolExecutor : FixedThreadPool, SingleThreadExecutor, CachedThreadPool
FixedTHreadPool(默認(rèn)阻塞隊(duì)列無(wú)界==> OOM)
- 創(chuàng)建一個(gè)固定線(xiàn)程數(shù)量nThreads的線(xiàn)程,核心線(xiàn)程跟最大線(xiàn)程數(shù)量一致都為nThreads
public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); }
- 使用無(wú)界隊(duì)列LinkedBlockingQueue: 使得最大線(xiàn)程數(shù),跟超時(shí)時(shí)間都是無(wú)意義的,根據(jù)線(xiàn)程池運(yùn)行步驟,隊(duì)列不可能被填滿(mǎn),所以未執(zhí)行shutdown()或shutdownNow()方法的運(yùn)行中的FixedThreadPool不會(huì)拒絕任務(wù)
- 使用與為了滿(mǎn)足資源管理的需求而限制當(dāng)前線(xiàn)程數(shù)量,用于負(fù)載比較重的服務(wù)器
- 使用場(chǎng)景: 適用于處理CPU密集型任務(wù),確保CPU在長(zhǎng)期被工作線(xiàn)程使用下,盡可能少的分配線(xiàn)程,一半是N(CPU) + 1
SingleThreadExecutor(默認(rèn)阻塞隊(duì)列無(wú)界==> OOM)
- 創(chuàng)建使用單個(gè)線(xiàn)程的SingleThreadExecutor
public static ExecutorService newSingleThreadExecutor() { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>())); }
- 同上FixedThreadPool使用無(wú)限隊(duì)列保存任務(wù),因此最大及核心線(xiàn)程數(shù)都是1
- 使用與需要保證順序的執(zhí)行各個(gè)人物,并且在任意時(shí)間點(diǎn),不會(huì)有多個(gè)線(xiàn)程時(shí)活動(dòng)的
- 使用場(chǎng)景: 適用于串行執(zhí)行任務(wù)的場(chǎng)景,每個(gè)任務(wù)必須按順序執(zhí)行,不需要并發(fā)執(zhí)行
CachedThreadPool(默認(rèn)核心線(xiàn)程0,最大線(xiàn)程i.MAX,synchronousQueue==>OOM)
- 創(chuàng)建一個(gè)大小無(wú)界的線(xiàn)程池,適用于執(zhí)行很多的短期異步人物的小程序,或者負(fù)載比較輕量的服務(wù)器
public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); }
- 分析: corePoolSize 設(shè)置為0 ,核心線(xiàn)程數(shù)為空,而最大線(xiàn)程數(shù)設(shè)置為Inter.MAX_VALUE,即非核心線(xiàn)程為無(wú)限的,同時(shí)空閑線(xiàn)程等待新任務(wù)最長(zhǎng)為60s后被終止
- 同時(shí): 使用沒(méi)有容量的SynchronousQueue作為線(xiàn)程池的工作隊(duì)列,這意味著如果主線(xiàn)程提交任務(wù)速度高于線(xiàn)程池中線(xiàn)程處理任務(wù)的速度,CachedThreadPool將會(huì)不斷創(chuàng)建新線(xiàn)程,最終耗盡CPU和內(nèi)存資源
-
執(zhí)行步驟:
- 執(zhí)行execute方法,首先執(zhí)行SynchronousQueue的offer方法提交任務(wù),并查詢(xún)線(xiàn)程池中是否有空閑線(xiàn)程來(lái)執(zhí)行其中的poll方法來(lái)移除任務(wù),如果有,則配對(duì)成功,將任務(wù)交給這個(gè)空閑隊(duì)列
- 否則,配對(duì)失敗,將會(huì)創(chuàng)建一個(gè)新線(xiàn)程去處理任務(wù)
- 當(dāng)線(xiàn)程池中線(xiàn)程空閑時(shí),會(huì)執(zhí)行synchronousQueue的poll方法等到其提交新任務(wù),如果超過(guò)60s依然沒(méi)有提交,則這個(gè)線(xiàn)程就會(huì)終止
- 由于非核心線(xiàn)程無(wú)界,所以一旦提交任務(wù)速度 > 線(xiàn)程池處理速度就會(huì)不斷的創(chuàng)建新線(xiàn)程
- 因此:使用與每次提交任務(wù)都會(huì)有線(xiàn)程立刻進(jìn)行處理的,大量,耗時(shí)少的任務(wù),長(zhǎng)時(shí)間保持空閑的CachedThreadPool將不會(huì)使用任何資源
- 其實(shí)就是主線(xiàn)程調(diào)用 offer方法跟沒(méi)有容量的阻塞隊(duì)列的poll方法是否適配,如果適配就使用該空閑線(xiàn)程,如果不適配就從新創(chuàng)建線(xiàn)程執(zhí)行任務(wù)
- 使用場(chǎng)景: 用于并發(fā)執(zhí)行大量短期的小任務(wù)
ThreadPoolExecutor
-
重點(diǎn):重中之重: 線(xiàn)程池工作的過(guò)程:
public void execute(Runnable command) { if (command == null) throw new NullPointerException(); /* * Proceed in 3 steps: * * 1. If fewer than corePoolSize threads are running, try to * start a new thread with the given command as its first * task. The call to addWorker atomically checks runState and * workerCount, and so prevents false alarms that would add * threads when it shouldn't, by returning false. * * 2. If a task can be successfully queued, then we still need * to double-check whether we should have added a thread * (because existing ones died since last checking) or that * the pool shut down since entry into this method. So we * recheck state and if necessary roll back the enqueuing if * stopped, or start a new thread if there are none. * * 3. If we cannot queue task, then we try to add a new * thread. If it fails, we know we are shut down or saturated * and so reject the task. */ int c = ctl.get(); //如果當(dāng)前線(xiàn)程數(shù)小于核心線(xiàn)程數(shù),則創(chuàng)建線(xiàn)程并執(zhí)行當(dāng)前任務(wù) if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) return; c = ctl.get(); } //如果正在運(yùn)行的線(xiàn)程數(shù)量大于或等于 corePoolSize狰住,那么將這個(gè)任務(wù)放入隊(duì)列 if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0) addWorker(null, false); } //如果線(xiàn)程池內(nèi)不處于運(yùn)行或者任務(wù)無(wú)法放入隊(duì)列,并且當(dāng)前線(xiàn)程數(shù)小于最大允許的線(xiàn)程數(shù)量齿梁,則創(chuàng)建一個(gè)線(xiàn)程執(zhí)行任務(wù),非核心線(xiàn)程 else if (!addWorker(command, false)) reject(command); //拋出RejectedExecutionException異常,對(duì)于有界隊(duì)列而言才會(huì)有這個(gè) }
- 如果正在運(yùn)行的線(xiàn)程數(shù)量小于corePoolSize:核心線(xiàn)程數(shù),則馬上創(chuàng)建線(xiàn)程運(yùn)行這個(gè)任務(wù)
- 如果正在運(yùn)行的線(xiàn)程數(shù)量大于或等于核心線(xiàn)程數(shù),馬上將這個(gè)任務(wù)放入隊(duì)列,注意每種線(xiàn)程隊(duì)列實(shí)現(xiàn)的offer方法不同哦!
- 如果隊(duì)列滿(mǎn)了,而且正在運(yùn)行的線(xiàn)程數(shù)量小于最大線(xiàn)程數(shù)量,則會(huì)創(chuàng)建非核心線(xiàn)程立刻運(yùn)行這個(gè)任務(wù)
- 如果隊(duì)列滿(mǎn)了,正在運(yùn)行的線(xiàn)程數(shù)量大于或等于允許的最大線(xiàn)程數(shù)量,那么線(xiàn)程池就會(huì)拋出RejectExecutorException
-
切記:
- 新建完線(xiàn)程則當(dāng)這個(gè)線(xiàn)程完成任務(wù)時(shí),他會(huì)從隊(duì)列中取下一個(gè)任務(wù)來(lái)執(zhí)行;
- 當(dāng)一個(gè)線(xiàn)程無(wú)事可做,即為隊(duì)列為null,超過(guò)一定時(shí)間keepAliveTime自定義超時(shí)回收時(shí)間,當(dāng)設(shè)置為0配合的是無(wú)界隊(duì)列使用,線(xiàn)程池會(huì)判斷,如果當(dāng)前運(yùn)行的線(xiàn)程數(shù)大于核心線(xiàn)程數(shù),那么這個(gè)線(xiàn)程就會(huì)被停掉回收,所以所有任務(wù)完成后,線(xiàn)程池會(huì)收縮到corePoolSize的大小
- 通過(guò)以上分析可以看到他們?cè)创a中都調(diào)用了ThreadPoolExecutor來(lái)創(chuàng)建一個(gè)線(xiàn)程池
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) { this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), defaultHandler); } //實(shí)際調(diào)用對(duì)象 public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler)
- 分析源碼中的參數(shù)
- corePoolSize: 核心線(xiàn)程池大小
- maximumPoolSize: 最大線(xiàn)程池的大小
- BlockingQueue: 用來(lái)暫時(shí)保存任務(wù)的工作隊(duì)列
- RejectedExecutionHandler:當(dāng)ThreadPoolExecutor已經(jīng)關(guān)閉或ThreadPoolExecutor已經(jīng)飽和 時(shí)(達(dá)到了最大線(xiàn)程池大小且工作隊(duì)列已滿(mǎn))转晰,execute()方法將要調(diào)用的Handler
ScheduledThreadPoolExecutor
- 繼承自ThreadPoolExecutor:主要用來(lái)在給定的延遲之后運(yùn)行任務(wù)或者定期執(zhí)行任務(wù)(功能與Timer類(lèi)似)
- 線(xiàn)程池的特點(diǎn):
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) { return new ScheduledThreadPoolExecutor(corePoolSize); } public ScheduledThreadPoolExecutor(int corePoolSize) { super(corePoolSize, Integer.MAX_VALUE, DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS, new DelayedWorkQueue()); } private static final long DEFAULT_KEEPALIVE_MILLIS = 10L;
- 最大線(xiàn)程數(shù)為Integer.MAX_VALUE
- 阻塞隊(duì)列為DelayedWorkQueue
ScheduledThreadPoolExecutor添加任務(wù)的另外兩個(gè)方法:
- scheduleAtFixedRate: 按某種速率周期執(zhí)行
- scheduleWithFixedDelay: 在某個(gè)延遲后執(zhí)行
- 兩種方法的內(nèi)部實(shí)現(xiàn)都是創(chuàng)建了一個(gè)ScheduledFutureTask對(duì)象封裝了任務(wù)的延遲執(zhí)行時(shí)間及執(zhí)行周期,并調(diào)用decorateTask()方法轉(zhuǎn)成RunnableScheduledFuture對(duì)象士飒,然后添加到延遲隊(duì)列中。
- DelayQueue: 封裝了一個(gè)優(yōu)先級(jí)隊(duì)列,會(huì)對(duì)隊(duì)列中的ScheduledFutureTask進(jìn)行排序,兩個(gè)任務(wù)執(zhí)行time不同時(shí),time小的先執(zhí)行,否則比較添加到隊(duì)列中的ScheduledFutureTask的順序號(hào)sequenceNumber,先提交的先執(zhí)行
- 工作機(jī)制為:
- 調(diào)用上面兩個(gè)方法添加一個(gè)任務(wù)
- 線(xiàn)程池中的線(xiàn)程從DelayQueue中取任務(wù)
- 然后執(zhí)行任務(wù)
- 執(zhí)行步驟為:
- 線(xiàn)程從DelayQueue中獲取time大于等于當(dāng)前時(shí)間的ScheduledFutureTask , DelayQueue.take()
- 執(zhí)行完后修改這個(gè)task的time為下次被執(zhí)行時(shí)間
- 然后在把這個(gè)task放回隊(duì)列中DelayQueue.add()
- 使用場(chǎng)景: 用于需要多個(gè)后臺(tái)線(xiàn)程執(zhí)行周期任務(wù),同時(shí)需要限制線(xiàn)程數(shù)量的場(chǎng)景
作業(yè)
- 自定義創(chuàng)建一個(gè)能夠根據(jù)加入順序得到最后一個(gè)數(shù)據(jù)的線(xiàn)程池,前面的沒(méi)有完成就拋棄
- 自定義設(shè)置一個(gè)先達(dá)到最大隊(duì)列工作,后緩存隊(duì)列的線(xiàn)程池
- 1的解題思路
- 由于是順序加入,如果前面有等待任務(wù),則新加入的任務(wù)將會(huì)替換掉之前的任務(wù),我們只要最后一個(gè)任務(wù),則使用線(xiàn)程數(shù)為1,且等待隊(duì)列也為1,同時(shí)拒絕策略為DiscardOldestPolicy()即可
- 當(dāng)?shù)谝淮翁砑尤蝿?wù)執(zhí)行,第二次添加加入隊(duì)列,如果此時(shí)任務(wù)3來(lái)到將會(huì)替換2,直到下一次任務(wù)到來(lái),則最終執(zhí)行的是最后一次添加的任務(wù)
- 2的解題思路:
- 默認(rèn)當(dāng)工作隊(duì)列滿(mǎn)了無(wú)法入隊(duì)才會(huì)擴(kuò)容線(xiàn)程池,我們可以重寫(xiě)隊(duì)列的offer方法,造成隊(duì)列已滿(mǎn)假象
- 在擴(kuò)容達(dá)到最大線(xiàn)程以后會(huì)觸發(fā)拒絕策略,此時(shí)我們可以將任務(wù)真正的插入到緩存隊(duì)列中
- 解答:使用LinkedTransferQueue.tryTransfer() :如果存在一個(gè)消費(fèi)者已經(jīng)等待接收它蔗崎,則立即傳送指定的元素酵幕,否則返回false,并且不進(jìn)入隊(duì)列缓苛。
/** * 設(shè)置一個(gè)由鏈表結(jié)構(gòu)組成的無(wú)界阻塞TransferQueue隊(duì)列芳撒。相對(duì)于其他阻塞隊(duì)列邓深,LinkedTransferQueue多了tryTransfer和transfer方法。 * LinkedTransferQueue采用一種預(yù)占模式笔刹。 * 意思就是消費(fèi)者線(xiàn)程取元素時(shí)芥备,如果隊(duì)列不為空,則直接取走數(shù)據(jù)舌菜,若隊(duì)列為空萌壳,那就生成一個(gè)節(jié)點(diǎn)(節(jié)點(diǎn)元素為null)入隊(duì), * 然后消費(fèi)者線(xiàn)程被等待在這個(gè)節(jié)點(diǎn)上日月,后面生產(chǎn)者線(xiàn)程入隊(duì)時(shí)發(fā)現(xiàn)有一個(gè)元素為null的節(jié)點(diǎn)袱瓮,生產(chǎn)者線(xiàn)程就不入隊(duì)了,直接就將元素填充到該節(jié)點(diǎn)爱咬, * 并喚醒該節(jié)點(diǎn)等待的線(xiàn)程尺借,被喚醒的消費(fèi)者線(xiàn)程取走元素,從調(diào)用的方法返回精拟。我們稱(chēng)這種節(jié)點(diǎn)操作為“匹配”方式燎斩。 */ public class MyLinkedTransferQueue extends LinkedTransferQueue<Runnable> { @Override public boolean offer(Runnable e) { return tryTransfer(e); // 如果存在一個(gè)消費(fèi)者已經(jīng)等待接收它,則立即傳送指定的元素蜂绎,否則返回false栅表,并且不進(jìn)入隊(duì)列。 } } /** * 使用靜態(tài)內(nèi)部類(lèi)創(chuàng)建一個(gè)單例的線(xiàn)程池 */ public class ExecutorFactory { private static class Holder{ private static ThreadPoolExecutor executors = new ThreadPoolExecutor(1, 4 , 60 , TimeUnit.SECONDS , new MyLinkedTransferQueue() , new ThreadNameFactory(), new CustomizeRejectHandler()); } public static ThreadPoolExecutor getInstance(){ return Holder.executors ; } } /** * 自定義拒絕策略,用于當(dāng)前隊(duì)列任務(wù)滿(mǎn)以后的操作 */ public class CustomizeRejectHandler implements RejectedExecutionHandler { @Override public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { //重寫(xiě)一個(gè)DisCardOldestPolicy // if (!executor.isShutdown()){ //當(dāng)線(xiàn)程池沒(méi)有終止時(shí),彈出隊(duì)列中最早添加一個(gè)任務(wù)并執(zhí)行當(dāng)前任務(wù) // executor.getQueue().poll(); // executor.execute(r); // } try { executor.getQueue().put(r); //當(dāng)拒絕是添加到阻塞隊(duì)列中 } catch (InterruptedException e) { Thread.currentThread().interrupt(); } } }
- 單機(jī)上一個(gè)線(xiàn)程正在處理任務(wù),如果突然斷電了怎么辦(正在處理和阻塞隊(duì)列里的請(qǐng)求怎么處理)
答: 可以對(duì)正在處理和阻塞隊(duì)列的任務(wù)做事務(wù)管理或?qū)ψ枞?duì)列中的任務(wù)持久化處理,并且當(dāng)斷電或系統(tǒng)崩潰,操作無(wú)法進(jìn)行時(shí),可以通過(guò)回溯日志的方式來(lái)撤銷(xiāo)正在處理的已經(jīng)執(zhí)行成功的操作,然后重新執(zhí)行整個(gè)阻塞隊(duì)列 - 為什么不建議在代碼中直接使用Executors創(chuàng)建線(xiàn)程池,而是推薦通過(guò)ThreadPoolExecutor方式創(chuàng)建?
- 答:不適用是可以明確的讓我們知道線(xiàn)程池的運(yùn)行規(guī)則,避免使用工具類(lèi)的包裝而不夠直觀內(nèi)部機(jī)制導(dǎo)致潛在難以發(fā)現(xiàn)的問(wèn)題,比如,使用newSingleThreadPool和FixedThreadPool創(chuàng)建線(xiàn)程池由于默認(rèn)最大線(xiàn)程為Max而導(dǎo)致如果處理時(shí)間過(guò)長(zhǎng),任務(wù)過(guò)多而導(dǎo)致的OOM就很難發(fā)現(xiàn)問(wèn)題
多線(xiàn)程中的問(wèn)題
CountDownLatch與CyclicBarrier區(qū)別
- 這兩個(gè)類(lèi)都可以實(shí)現(xiàn)一組線(xiàn)程在到達(dá)某個(gè)條件之前進(jìn)行等待,內(nèi)部都有一個(gè)計(jì)數(shù)器,當(dāng)計(jì)數(shù)器的值不斷減為0時(shí)所有阻塞的線(xiàn)程都將會(huì)被喚醒
- CountDownlatch計(jì)數(shù)器由使用者控制,線(xiàn)程調(diào)用await()只是將自己阻塞而不會(huì)減少計(jì)數(shù)器的值,只有當(dāng)調(diào)用countDown()方法才會(huì)減一,直到為0時(shí),喚醒a(bǔ)wait(),并且只能攔截一輪
- CyclicBarrier : cyclic 循環(huán) , Barrier:柵欄 ,顧名思義表示可以實(shí)現(xiàn)循環(huán)攔截,其中的計(jì)數(shù)器由自己控制,在CyclicBarrier中線(xiàn)程調(diào)用await()不僅會(huì)將自己阻塞還會(huì)降計(jì)數(shù)器減1.直到為0時(shí)喚醒所有的阻塞隊(duì)列,而后會(huì)重置新一輪的攔截
線(xiàn)程池的選擇使用
- 高并發(fā),任務(wù)執(zhí)行時(shí)間短的業(yè)務(wù)怎樣使用線(xiàn)程池?并發(fā)不高,任務(wù)執(zhí)行長(zhǎng)的業(yè)務(wù)怎么使用線(xiàn)程池?并發(fā)高,業(yè)務(wù)執(zhí)行時(shí)間長(zhǎng)的業(yè)務(wù)怎么使用線(xiàn)程池?
- 高并發(fā),任務(wù)執(zhí)行時(shí)間短,線(xiàn)程池線(xiàn)程數(shù)量可以設(shè)置成CPU+1,目的是減少線(xiàn)程上下文切換
- 并發(fā)不高,任務(wù)執(zhí)行時(shí)間長(zhǎng)的業(yè)務(wù)區(qū)別看待:
- 假設(shè)業(yè)務(wù)長(zhǎng)時(shí)間幾種在IO操作,就是IO密集型任務(wù),由于IO操作并不占用CPU資源,這個(gè)時(shí)候就盡可能讓CPU運(yùn)轉(zhuǎn),可以加大線(xiàn)程池中的線(xiàn)程數(shù)量,讓CPU不至于停下來(lái)等待IO操作,從而處理更多的業(yè)務(wù)
- 假設(shè)任務(wù)長(zhǎng)時(shí)間集中在計(jì)算操作上,就是計(jì)算密集型任務(wù),同一一致,線(xiàn)程池的線(xiàn)程數(shù)量設(shè)置少一些,減少線(xiàn)程上下文的切換
- 并發(fā)高,業(yè)務(wù)執(zhí)行時(shí)間長(zhǎng),解決這種類(lèi)型任務(wù)的關(guān)鍵不在于線(xiàn)程池而在于整體架構(gòu)的設(shè)計(jì)荡碾,看看這些業(yè)務(wù)里面某些數(shù)據(jù)是否能做緩存是第一步谨读,增加服務(wù)器是第二步,至于線(xiàn)程池的設(shè)置坛吁,設(shè)置參考其他有關(guān)線(xiàn)程池的文章劳殖。最后,業(yè)務(wù)執(zhí)行時(shí)間長(zhǎng)的問(wèn)題拨脉,也可能需要分析一下哆姻,看看能不能使用中間件對(duì)任務(wù)進(jìn)行拆分和解耦。
jstack追蹤異常代碼
- Windows 下使用 jps,jstack ,及procexp.exe工具
-
JPS常用命令整理
- JPS是1.5提供的一個(gè)顯示當(dāng)前所有Java進(jìn)程pid命令
- jps : 列出pid和Java主類(lèi)名
- jps -l : 列出pid和Java主類(lèi)全名
- jps -lm : 列出pid,主類(lèi)全程和應(yīng)用程序參數(shù)
- jps -v : 列出pid和JVM參數(shù)
-
Jstack常用命令整理
- jstack是JVM自帶的一種堆棧追蹤工具
- jstack pid(通過(guò)jps獲得的pid) 打印線(xiàn)程堆棧
線(xiàn)程中斷機(jī)制
- java的線(xiàn)程中斷機(jī)制是一種協(xié)作機(jī)制,線(xiàn)程會(huì)不時(shí)的檢測(cè)中斷標(biāo)識(shí)位,以判斷是否應(yīng)該被中斷(值是否為true), 主要有三個(gè)方法
- interrupt() : 每個(gè)線(xiàn)程都有個(gè)boolean類(lèi)型的中斷標(biāo)志,當(dāng)使用該方法時(shí)會(huì)被標(biāo)記為true
- isInterrupted() : 判斷線(xiàn)程是否被中斷
- interrupted() : 清除中斷標(biāo)志,并返回原狀態(tài)
- 當(dāng)使用中斷時(shí),被中斷的線(xiàn)程并不會(huì)立刻停止做事,而是在合適的時(shí)機(jī)終止
- 機(jī)制一: 如果該線(xiàn)程處在可中斷狀態(tài)下sleep,join,wait等,則該線(xiàn)程會(huì)被立刻喚醒,同時(shí)收到一個(gè)interruptedException,,如果是IO則資源會(huì)被關(guān)閉
- 機(jī)制二:如果該線(xiàn)程處于不可中斷狀態(tài)下,即沒(méi)有調(diào)用上方機(jī)制一的api,處于運(yùn)行時(shí)的進(jìn)程,則只是設(shè)置一下中斷標(biāo)志位,其他事情都不會(huì)發(fā)生,如果此后線(xiàn)程調(diào)用阻塞api,則會(huì)馬上跳出,并拋出InterruptedExecption異常,接下來(lái)事情就跟一一致了,如果不調(diào)用阻塞api,則線(xiàn)程會(huì)一致運(yùn)行下去.則在運(yùn)行的代碼中可以輪詢(xún)中斷標(biāo)志位,看它是否被請(qǐng)求停止正在做的事情,可以通過(guò)isInterrupted()來(lái)讀取,同時(shí)也可以通過(guò)一個(gè)interrupted()來(lái)讀取和清楚標(biāo)記位
public class InterruptedTest extends Thread{
public static void main(String[] args) {
InterruptedTest interruptedTest = new InterruptedTest();
interruptedTest.start();
}
@Override
public void run() {
super.run();
MyThread myThread = new MyThread();
myThread.start();
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
//開(kāi)始中斷
myThread.interrupt();
}
class MyThread extends Thread{
@Override
public void run() {
super.run();
//判斷是否標(biāo)記為中斷:注意被異常捕獲以后中斷標(biāo)記位會(huì)被重置為false,需要再次拋出方可中斷
while (!Thread.currentThread().isInterrupted()){
System.out.println("我是正在運(yùn)行...");
try {
Thread.sleep(1000);
} catch (InterruptedException e) { //sleep是會(huì)有中斷標(biāo)記的因此會(huì)拋出interruptedException: 中斷標(biāo)志位會(huì)被清除,如果想讓上方while退出,必須再次手動(dòng)設(shè)置標(biāo)志位
e.printStackTrace();
System.out.println("我收到了異常中斷標(biāo)記" + Thread.currentThread().isInterrupted()); // false:已經(jīng)被異常重置了
interrupt(); //再次拋出
System.out.println("我收到了再次拋出中斷標(biāo)記" + Thread.currentThread().isInterrupted()); //true,可以正常中斷
}
}
}
}
}
線(xiàn)程池線(xiàn)程復(fù)用原理
- 我們自己創(chuàng)建的線(xiàn)程都知道,他只能start()執(zhí)行一次,一旦執(zhí)行完畢或被中斷即走terminated終止?fàn)顟B(tài)結(jié)束線(xiàn)程了,那為何身處線(xiàn)程池中的線(xiàn)程卻可以復(fù)用一致執(zhí)行呢?你難道沒(méi)有這樣的疑問(wèn)嗎?
-
通過(guò)上面的分析,我們知道線(xiàn)程池的運(yùn)行流程圖!
- 一旦一個(gè)任務(wù)提交我們就會(huì)判斷其后流程:
- 直接申請(qǐng)線(xiàn)程執(zhí)行任務(wù)
- 加入到緩存隊(duì)列中等待線(xiàn)程執(zhí)行
- 執(zhí)行拒絕策略
- 帶著上方的疑問(wèn),我們根據(jù)源碼詳細(xì)分析線(xiàn)程池運(yùn)行機(jī)制: 如何維護(hù)自身狀態(tài),如何管理任務(wù),如何管理線(xiàn)程!
ThreadPoolExecutor
-
首先看他的繼承關(guān)系圖
- 頂層接口Executor提供了一種思想:將任務(wù)提交和任務(wù)執(zhí)行進(jìn)行解耦玫膀。用戶(hù)無(wú)需關(guān)注如何創(chuàng)建線(xiàn)程矛缨,如何調(diào)度線(xiàn)程來(lái)執(zhí)行任務(wù),用戶(hù)只需提供Runnable對(duì)象帖旨,將任務(wù)的運(yùn)行邏輯提交到執(zhí)行器(Executor)中箕昭,由Executor框架完成線(xiàn)程的調(diào)配和任務(wù)的執(zhí)行部分。
- ExecutorService接口增加了一些能力:(1)擴(kuò)充執(zhí)行任務(wù)的能力解阅,補(bǔ)充可以為一個(gè)或一批異步任務(wù)生成Future的方法落竹;(2)提供了管控線(xiàn)程池的方法,比如停止線(xiàn)程池的運(yùn)行
- AbstractExecutorService則是上層的抽象類(lèi)货抄,將執(zhí)行任務(wù)的流程串聯(lián)了起來(lái)述召,保證下層的實(shí)現(xiàn)只需關(guān)注一個(gè)執(zhí)行任務(wù)的方法即可
- ThreadPoolExecutor實(shí)現(xiàn)最復(fù)雜的運(yùn)行部分朱转,ThreadPoolExecutor將會(huì)一方面維護(hù)自身的生命周期,另一方面同時(shí)管理線(xiàn)程和任務(wù)积暖,使兩者良好的結(jié)合從而執(zhí)行并行任務(wù)
線(xiàn)程池的生命周期管理
- 線(xiàn)程池運(yùn)行的狀態(tài),是伴隨著線(xiàn)程池的運(yùn)行在內(nèi)部維護(hù)的,由一個(gè)變量維護(hù)兩個(gè)狀態(tài)值(很常見(jiàn)): 運(yùn)行狀態(tài)(runState)和線(xiàn)程數(shù)量(workCount)
//使用高3位保存線(xiàn)程池的運(yùn)行狀態(tài)runState(總共5種3位足夠了),低29位保存workerCount有效線(xiàn)程數(shù)量:使用同一位的原子類(lèi)不用對(duì)兩個(gè)變量操作時(shí)需要加鎖操作了,直接使用一個(gè)原子類(lèi)即可
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
//五種線(xiàn)程池狀態(tài)
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
// Packing and unpacking ctl
private static int runStateOf(int c) { return c & ~CAPACITY; } //計(jì)算當(dāng)前運(yùn)行狀態(tài)
private static int workerCountOf(int c) { return c & CAPACITY; } //計(jì)算當(dāng)前線(xiàn)程數(shù)量
private static int ctlOf(int rs, int wc) { return rs | wc; } //通過(guò)狀態(tài)和線(xiàn)程數(shù)生成一個(gè)ctl合成數(shù)量是一個(gè)原子類(lèi)
- 注意:線(xiàn)程池運(yùn)行狀態(tài)同線(xiàn)程運(yùn)行狀態(tài)不同的哦
- Running: 能接受新提交的任務(wù),并且能夠處理阻塞隊(duì)列中的任務(wù)
- Shutdown : 關(guān)閉狀態(tài),不能接受新提交的任務(wù),但卻可以繼續(xù)處理阻塞隊(duì)列中的任務(wù)
- stop : 不能接受新任務(wù),也不處理緩存隊(duì)列中的任務(wù),會(huì)中斷正在處理任務(wù)的線(xiàn)程
- Tidying: 所有任務(wù)都終止了,workerCount有效線(xiàn)程數(shù)為0
-
Terminated: 在terminated()方法執(zhí)行完后進(jìn)入該狀態(tài),終止?fàn)顟B(tài)
任務(wù)調(diào)度
- 當(dāng)用戶(hù)提交一個(gè)任務(wù)會(huì)通過(guò)Executor.execute()方法執(zhí)行,他的步驟上方已經(jīng)總結(jié)過(guò)了,這里在重復(fù)說(shuō)一下:
- 首先檢查線(xiàn)程池運(yùn)行狀態(tài),如果不是Running,直接拒絕,線(xiàn)程池要保證在Running狀態(tài)下執(zhí)行任務(wù)
- 如果workerCount(下面使用wc) < corePoolSize,則創(chuàng)建新線(xiàn)程執(zhí)行提交任務(wù)
- 如果wc >= corePoolSize, 核心線(xiàn)程池滿(mǎn)了,阻塞隊(duì)列未滿(mǎn),則添加到隊(duì)列中
- 如果wc >= corePoolSize && wc < maximumPoolSize ,且緩存隊(duì)列滿(mǎn)了,則直接創(chuàng)建新線(xiàn)程執(zhí)行提交任務(wù)
- 如果wc > maximumPoolSize,并且線(xiàn)程隊(duì)列滿(mǎn)了,則根據(jù)拒絕策略具體執(zhí)行該任務(wù),默認(rèn)是拋異常
private final BlockingQueue<Runnable> workQueue; //線(xiàn)程阻塞隊(duì)列
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) { //計(jì)算當(dāng)前運(yùn)行線(xiàn)程數(shù)小于核心線(xiàn)程數(shù),對(duì)應(yīng)上方2的情況
if (addWorker(command, true)) //創(chuàng)建新的線(xiàn)程執(zhí)行任務(wù)
return;
c = ctl.get();
}
//執(zhí)行這里說(shuō)明過(guò)了2那種情況,就看3的情況,阻塞隊(duì)列可以正常添加任務(wù)
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get(); //添加成功了
if (! isRunning(recheck) && remove(command)) //如果添加以后不再是運(yùn)行狀態(tài),則移除剛才加入隊(duì)列的任務(wù)
reject(command); //執(zhí)行拒絕策略
else if (workerCountOf(recheck) == 0) //線(xiàn)程池運(yùn)行狀態(tài)且wc運(yùn)行線(xiàn)程數(shù)為0時(shí)
addWorker(null, false); //創(chuàng)建新線(xiàn)程執(zhí)行阻塞隊(duì)列中的任務(wù)
}
else if (!addWorker(command, false)) //如果隊(duì)列滿(mǎn)了,可以正常走addWorker創(chuàng)建非核心線(xiàn)程即上方4, 如果false則是5
reject(command); //走拒絕策略
}
- 增加新線(xiàn)程執(zhí)行任務(wù)addWorker()
private final HashSet<Worker> workers = new HashSet<>(); //線(xiàn)程池中的所有工作線(xiàn)程
private boolean addWorker(Runnable firstTask, boolean core) {
//根據(jù)當(dāng)前狀態(tài),判斷是否添加成功,上方執(zhí)行方法中的addWorker兩個(gè)參數(shù)firstTask = null ,core = true /false 具體分析
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c); //獲取運(yùn)行狀態(tài)
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN && //狀態(tài) > shutDown 表示此時(shí)已經(jīng)不再接受任務(wù)
//shutdown狀態(tài)不接受新任務(wù),但可以執(zhí)行已經(jīng)加入隊(duì)列中的任務(wù),所以當(dāng)進(jìn)入shutdown狀態(tài),且傳進(jìn)來(lái)的任務(wù)為null時(shí),并且任務(wù)隊(duì)列不為null時(shí),是允許添加新線(xiàn)程的,把這個(gè)條件取反就是不允許
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) { //使用CAS操作避免加鎖
int wc = workerCountOf(c); //獲取工作線(xiàn)程
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false; //大于線(xiàn)程最大容量2的29次方量(所以newCacheExecutor并不能得到Integer.MAX_Value的),或者大于最大允許線(xiàn)程量則不能添加啦
if (compareAndIncrementWorkerCount(c)) //可添加就CAS操作線(xiàn)程數(shù)+1,成功說(shuō)明可添加
break retry; //break跳出retry對(duì)應(yīng)的循環(huán),執(zhí)行循環(huán)后面的添加worker邏輯
c = ctl.get(); // Re-read ctl 重新讀取狀態(tài)
if (runStateOf(c) != rs)
continue retry; //狀態(tài)改變了,跳到外層循環(huán)繼續(xù)重新執(zhí)行循環(huán)
// else CAS failed due to workerCount change; retry inner loop
//在內(nèi)存層循環(huán)中不停的嘗試CAS操作增加線(xiàn)程數(shù)
}
}
//找了上方break retry可以正常使用CAS新增線(xiàn)程數(shù)
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask); //通過(guò)Worker包裝runnable任務(wù),稍后我們分析
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock(); //加鎖
try {
int rs = runStateOf(ctl.get());
//如果線(xiàn)程池狀態(tài)rs < Shutdown即只能是Running
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) { //或者shutDown狀態(tài)但是沒(méi)有新任務(wù)
if (t.isAlive()) // 線(xiàn)程已經(jīng)啟動(dòng)藤为,并且當(dāng)前沒(méi)有任何異常的話(huà),則是true夺刑,否則為false
throw new IllegalThreadStateException(); //我還沒(méi)有啟動(dòng)呢
workers.add(w); //正常添加到線(xiàn)程池中workers工作線(xiàn)程
int s = workers.size();
if (s > largestPoolSize) //largestPoolSize:記錄著線(xiàn)程池中出現(xiàn)過(guò)最大線(xiàn)程數(shù)量
largestPoolSize = s;
workerAdded = true; //可以正常工作的標(biāo)記
}
} finally {
mainLock.unlock();
}
if (workerAdded) { //如果正常工作,則開(kāi)啟線(xiàn)程任務(wù)
t.start();
workerStarted = true; //開(kāi)始工作標(biāo)記
}
}
} finally {
if (! workerStarted) //該任務(wù)沒(méi)有開(kāi)始,則添加到失敗
addWorkerFailed(w);
}
return workerStarted;
}
- 通過(guò)上面分析,新增線(xiàn)程去完成任務(wù)主要通過(guò)worker類(lèi)去完成的
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable //實(shí)現(xiàn)了Runnable接口,因此t.start()執(zhí)行的就是worker的run方法啊
{
final Thread thread;
Runnable firstTask;
volatile long completedTasks;
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this); //創(chuàng)建thread(this:Worker) ,則t.start()調(diào)用worker的run,同時(shí)原來(lái)的Runnable被封裝為Worker的屬性firstTask
}
/** Delegates main run loop to outer runWorker */
public void run() {
runWorker(this);
}
//getThreadFactory即為T(mén)hreadPoolExecutor創(chuàng)建thread工廠(chǎng)(實(shí)現(xiàn)ThreadFactory)可修改Thread名稱(chēng),優(yōu)先級(jí)等操作實(shí)現(xiàn)的
public ThreadFactory getThreadFactory() {
return threadFactory;
}
- 所以上方addWorker添加工作線(xiàn)程的t.start()方法調(diào)用的就是runWorker
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask; //這個(gè)就是我們執(zhí)行線(xiàn)程池executor.execute()方法時(shí)候的runnable
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
//如果task不為null,并且從workQueue中獲取任務(wù)不為null,則會(huì)一直執(zhí)行下去
while (task != null || (task = getTask()) != null) { //task是需要執(zhí)行的任務(wù),不一定是剛剛添加的那個(gè)了,這樣其實(shí)worker線(xiàn)程并沒(méi)有完成工作,自然也就不會(huì)銷(xiāo)毀了
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
if ((runStateAtLeast(ctl.get(), STOP) || //檢查線(xiàn)程狀態(tài),若線(xiàn)程池處于中斷狀態(tài),調(diào)用interrupt將線(xiàn)程中斷
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt(); //中斷線(xiàn)程
try {
beforeExecute(wt, task); //可以在任務(wù)真正執(zhí)行之前做點(diǎn)啥,空實(shí)現(xiàn)
Throwable thrown = null;
try {
task.run(); //執(zhí)行execute()方法中的run方法,在t.start()線(xiàn)程內(nèi),這只是一個(gè)方法執(zhí)行哈!
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown); //線(xiàn)程之后可以做啥,空實(shí)現(xiàn)
}
} finally {
task = null;
w.completedTasks++; //該線(xiàn)程執(zhí)行完成任務(wù)+1
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
- 重點(diǎn)邏輯是while循環(huán),當(dāng)我們第一次創(chuàng)建worker并執(zhí)行任務(wù)后,并沒(méi)有結(jié)束線(xiàn)程,而是通過(guò)while循環(huán)調(diào)用getTask()方法從阻塞隊(duì)列中去task繼續(xù)調(diào)用task.run()執(zhí)行任務(wù),注意這里run()只是一個(gè)普通的方法調(diào)用,并不是start()哦!運(yùn)行線(xiàn)程就是Worker線(xiàn)程中
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// 對(duì)應(yīng)ShutDown雖然不添加任務(wù),但是可以執(zhí)行阻塞隊(duì)列中的,Stop以后就不能子在執(zhí)行任務(wù)了
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null; //返回null,停止執(zhí)行任務(wù)
}
int wc = workerCountOf(c);
// allowCoreThreadTimeOut 表示是否允許核心線(xiàn)程超時(shí)銷(xiāo)毀,默認(rèn)false不銷(xiāo)毀.若設(shè)置成true,核心線(xiàn)程也會(huì)銷(xiāo)毀的
//只有正在工作的線(xiàn)程數(shù)大于核心線(xiàn)程數(shù)才會(huì)為true,佛足額返回false
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; //
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
//如果timed為true(wx > 核心線(xiàn)程),通過(guò)poll取任務(wù),如果為false,通過(guò)take取任務(wù)
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : //這兩個(gè)參數(shù)就是創(chuàng)建線(xiàn)程池中保存時(shí)間量
workQueue.take();
if (r != null) //如果有任務(wù)就退出死循環(huán),返回任務(wù)交給上方的worker線(xiàn)程運(yùn)行
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
- 通過(guò)以上分析:不改變allowCoreThreadTimeOut默認(rèn)前提下,若wc > 核心線(xiàn)程數(shù),則通過(guò)poll從隊(duì)列中取任務(wù),如果wc <= 核心線(xiàn)程數(shù),則通過(guò)take取任務(wù)
- 則poll()方法同take()區(qū)別是啥呢?
- 他們都是阻塞隊(duì)列中的方法:常用的 ArrayBlockingQueue,LinkedBlockingQueue,PriorityQueue,SynchronizedQueue,中的方法
//ArrayBlockingQueue
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
long nanos = unit.toNanos(timeout);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == 0) { //是否隊(duì)列中的元素個(gè)數(shù)為0,說(shuō)明空隊(duì)列
if (nanos <= 0L) //等待時(shí)間到了,隊(duì)列中還未有數(shù)據(jù)加入,則返回null,
return null;
/**
* 調(diào)用該方法的前提是缅疟,當(dāng)前線(xiàn)程已經(jīng)成功獲得與該條件對(duì)象綁定的重入鎖,否* * 則調(diào)用該方法時(shí)會(huì)拋出IllegalMonitorStateException性誉。
* nanosTimeout指定該方法等待信號(hào)的的最大時(shí)間(單位為納秒)窿吩。若指定時(shí)間* * 內(nèi)收到signal()或signalALL()則返回nanosTimeout減去已經(jīng)等待的時(shí)間;
*若指定時(shí)間內(nèi)有其它線(xiàn)程中斷該線(xiàn)程错览,則拋出InterruptedException并清除當(dāng)前線(xiàn)程的打斷狀態(tài)纫雁;
* 若指定時(shí)間內(nèi)未收到通知,則返回0或負(fù)數(shù)倾哺。
*/
nanos = notEmpty.awaitNanos(nanos); //每次signal喚醒重新等待
}
return dequeue(); //如果有元素取出
} finally {
lock.unlock();
}
}
//如果poll超時(shí)返回null,則回調(diào)到
f ((wc > maximumPoolSize || (timed && timedOut)) //true
&& (wc > 1 || workQueue.isEmpty())) { //隊(duì)列也是空的,走進(jìn)去
if (compareAndDecrementWorkerCount(c)) //CAS可以減少c的個(gè)數(shù)
return null; //返回了null,該線(xiàn)程不能再上方的while循環(huán)中繼續(xù)獲取就結(jié)束線(xiàn)程啦,非核心線(xiàn)程就over啦,嘿嘿!
continue;
}
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == 0) //不能使用if,避免虛假喚醒
notEmpty.await(); //一旦count隊(duì)列為空,會(huì)一致await阻塞在這里的,直到workQueue.offer()添加元素時(shí)喚醒
return dequeue(); //取出隊(duì)頭元素
} finally {
lock.unlock();
}
}
- 綜上: 當(dāng)核心線(xiàn)程在while循環(huán)中運(yùn)行調(diào)用getTask獲取task任務(wù)時(shí),如果此時(shí)隊(duì)列中沒(méi)有數(shù)據(jù)則走緩存阻塞隊(duì)列的take方法,會(huì)被notEmpty.await()阻塞,那這個(gè)阻塞又是何時(shí)被喚醒的呢?
- 當(dāng)然是下一個(gè)任務(wù)達(dá)到的時(shí)候也就是調(diào)用execute的時(shí)候添加一個(gè)新的任務(wù)Task
//這個(gè)就是調(diào)用當(dāng)前核心線(xiàn)程已經(jīng)滿(mǎn)了,則添加到阻塞隊(duì)列中,
//剛剛上方的核心線(xiàn)程在等待任務(wù),添加以后肯定就調(diào)用notEmpty.signal()喚醒等待線(xiàn)程取任務(wù)執(zhí)行啦
if (isRunning(c) && workQueue.offer(command))
- 我們來(lái)驗(yàn)證一下我們的想法:workQueue就是選擇的隊(duì)列,這里看ArrayBlockingQueue,當(dāng)然對(duì)于其他隊(duì)列也是相同的
public boolean offer(E e) {
checkNotNull(e);
final ReentrantLock lock = this.lock; //獲取鎖,跟上方加鎖時(shí)同一把鎖
lock.lock();
try {
if (count == items.length)
return false; //如果當(dāng)前隊(duì)列已滿(mǎn),不能再加入了false
else {
enqueue(e); //正常添加到隊(duì)列中
return true;
}
} finally {
lock.unlock();
}
}
//enqueue添加到數(shù)組循環(huán)隊(duì)列中后調(diào)用notEmpty.signal()喚醒一個(gè)await線(xiàn)程取任務(wù)開(kāi)始工作啦!
private void enqueue(E x) {
// assert lock.getHoldCount() == 1;
// assert items[putIndex] == null;
final Object[] items = this.items;
items[putIndex] = x;
if (++putIndex == items.length)
putIndex = 0;
count++;
notEmpty.signal();
}
- 通過(guò)生成-消費(fèi)者模式,將execute加入隊(duì)列的任務(wù)通知等待的核心線(xiàn)程取阻塞隊(duì)列中的任務(wù)開(kāi)始執(zhí)行!
補(bǔ)充一下阻塞隊(duì)列的源碼分析
ArrayDeque: 底層使用循環(huán)數(shù)組實(shí)現(xiàn)雙向隊(duì)列
- 局部變量
transient Object[] elements; //當(dāng)前存儲(chǔ)數(shù)組 transient int head; //其始頭結(jié)點(diǎn) transient int tail; //尾結(jié)點(diǎn) private static final int MIN_INITIAL_CAPACITY = 8; //默認(rèn)數(shù)組大小為 8
- 注意由于是循環(huán)數(shù)組,所以頭結(jié)點(diǎn)不一定比尾結(jié)點(diǎn)小哦,也可能比尾結(jié)點(diǎn)大有size - 1 -> 0 的位置
- 由于需要判斷數(shù)組中數(shù)據(jù)是否為空,還是已經(jīng)填滿(mǎn),所以會(huì)有一個(gè)節(jié)點(diǎn)浪費(fèi)掉,這是因?yàn)?
- 如果head = tail 無(wú)法判斷他們是空的還是滿(mǎn)的數(shù)組
- 如果空出一位tail表示當(dāng)前需要填充數(shù)據(jù)的位置,則當(dāng)head =tail時(shí)表示空,head = (tail + 1) % size 相等,表示當(dāng)前數(shù)組滿(mǎn)了,需要擴(kuò)容
- 擴(kuò)容方法也參考HashMap的擴(kuò)容以 2的N次方,則上面的 %可以表示為 head = (tail + 1) & (size - 1)性能更好
- 因此: ArrayDeque默認(rèn)數(shù)組大小為 8 ,然后以 2倍擴(kuò)容,不滿(mǎn)會(huì)向上補(bǔ),比如 創(chuàng)建一個(gè)14的數(shù)組實(shí)際創(chuàng)建的是 大小 16的循環(huán)數(shù)組
add方法
- 源碼為:不能存儲(chǔ)null
public void addLast(E e) { if (e == null) throw new NullPointerException(); elements[tail] = e; //注意這里是先添加在判斷是否擴(kuò)容,通過(guò)上面局部變量的分析可知肯定有一個(gè)空余的位置,在這里正好填滿(mǎn)數(shù)組以后再判斷是否需要擴(kuò)容,判斷方法也同上2的情形一致 if ( (tail = (tail + 1) & (elements.length - 1)) == head) doubleCapacity(); //兩倍的擴(kuò)容數(shù)組 } //我們查看擴(kuò)容代碼 private void doubleCapacity() { assert head == tail; //斷言tail增加以后是否跟head一致,數(shù)組完全填滿(mǎn) int p = head; int n = elements.length; int r = n - p; // 分兩部分來(lái)復(fù)制,n分成 從0->p - 1 復(fù)制p個(gè)數(shù)據(jù) 和從p->n 復(fù)制(n - p)個(gè)數(shù)據(jù) int newCapacity = n << 1; //擴(kuò)容兩倍 if (newCapacity < 0) throw new IllegalStateException("Sorry, deque too big"); Object[] a = new Object[newCapacity]; //將原來(lái)數(shù)組的起始位置至數(shù)組末尾全部復(fù)制到新數(shù)組0 -> n - p - 1處,此時(shí)不用管尾結(jié)點(diǎn)的位置,全部復(fù)制,多余的復(fù)制會(huì)存儲(chǔ)成默認(rèn)值的,不用在意(這里不會(huì)出現(xiàn)的,因?yàn)樯戏降臄嘌?數(shù)組數(shù)據(jù)已經(jīng)填滿(mǎn)了) //分開(kāi)copy是為了避免頭尾節(jié)點(diǎn)過(guò)度的問(wèn)題,不管你們?cè)谀膬?我都是從頭復(fù)制到最尾部,在從數(shù)組0位置copy到頭部節(jié)點(diǎn)處即可全部復(fù)制 System.arraycopy(elements, p, a, 0, r); //在賦值從原來(lái)數(shù)組的0->p -1的位置上的數(shù)據(jù)到 新數(shù)組的 n-p -> (n - 1)處,即把原來(lái)舊數(shù)組全部復(fù)制到新數(shù)組從0其實(shí)到n -1結(jié)尾處 System.arraycopy(elements, 0, a, r, p); elements = a; //拷貝后重新賦值給數(shù)組 head = 0; //重新設(shè)置起始頭結(jié)點(diǎn)為 0 ,尾結(jié)點(diǎn)為n處的null tail = n; }
remove 刪除
- remove方法最終調(diào)取pollFirst() 刪除頭部數(shù)據(jù) 或 pollLast() 刪除尾部數(shù)據(jù)
public E pollFirst() { int h = head; @SuppressWarnings("unchecked") E result = (E) elements[h]; if (result == null) return null; // r如果當(dāng)前頭部不為則賦值為null,并將頭head增加 1,且判斷是否越界超過(guò)整個(gè)數(shù)組大小了 elements[h] = null; // Must null out slot head = (h + 1) & (elements.length - 1); return result; } public E pollLast() { //尾結(jié)點(diǎn)刪除就是 -1 是否小于了0,如果tail = 0 ,則t = -1 & 15 (-1的補(bǔ)碼為 32位的1 & 15 = 15設(shè)置成最后一位數(shù)組的值刪除并且尾結(jié)點(diǎn)為null的指向最后一位) int t = (tail - 1) & (elements.length - 1); @SuppressWarnings("unchecked") E result = (E) elements[t]; if (result == null) return null; elements[t] = null; //刪除循環(huán)數(shù)組中最后一位 tail = t; return result; }
修改和查找
- 就是普通的數(shù)組查找,無(wú)非是垮了個(gè)(size -1 ) -> 0而已
PriorityQueue: 使用堆得優(yōu)先級(jí)隊(duì)列
- PriorityQueue一個(gè)基于優(yōu)先級(jí)的無(wú)界隊(duì)列轧邪,優(yōu)先級(jí)隊(duì)列的元素按照其自然順序進(jìn)行排序或者根據(jù)自定義提供的Comparator進(jìn)行排序
- 比如對(duì)于VIP和普通用戶(hù)的請(qǐng)求進(jìn)行優(yōu)先級(jí)排序處理請(qǐng)求等;
- 不允許使用null及不可比較的對(duì)象(沒(méi)有實(shí)現(xiàn)Comparable接口的對(duì)象)
- 非線(xiàn)程安全的,但是可以使用PriorityBlockingQueue用于多線(xiàn)程環(huán)境
- PriorityQueue隊(duì)列頭是排序規(guī)則中最小的那個(gè)元素,如果多個(gè)元素都是最小值則隨機(jī)挑選一個(gè)
- 常用方法及時(shí)間復(fù)雜度(后文分析)
- peek() 返回隊(duì)首元素 O(1)
- element(); //返回隊(duì)頭元素(不刪除) O(1)
- poll() 返回隊(duì)首元素羞海,隊(duì)首元素出隊(duì)列 O(log(N))
- add() 添加元素 O(log(N))
- offer(E e)將指定的元素插入此優(yōu)先級(jí)隊(duì)列忌愚。不能添加null元素。 O(log(N))
- isEmpty() 判斷是否為空
構(gòu)造函數(shù)
- 查看構(gòu)造函數(shù)
//默認(rèn)數(shù)組大小 11 private static final int DEFAULT_INITIAL_CAPACITY = 11; //存儲(chǔ)數(shù)組 transient Object[] queue; // non-private to simplify nested class access //數(shù)組的長(zhǎng)度 private int size = 0; //隊(duì)列比較器却邓,為null使用默認(rèn)比較器 private final Comparator<? super E> comparator; //Fail-Fast標(biāo)記,用于多線(xiàn)程 transient int modCount = 0; // non-private to simplify nested class access
實(shí)現(xiàn)原理
- 通過(guò)二叉小頂堆實(shí)現(xiàn),可以用一顆完全二叉樹(shù)表示(任意一個(gè)非葉子節(jié)點(diǎn)的權(quán)值,都不大于去左右子節(jié)點(diǎn)的權(quán)值),也就意味著可以通過(guò)數(shù)組作為其底層實(shí)現(xiàn)(數(shù)組實(shí)現(xiàn)簡(jiǎn)單,同時(shí)不會(huì)占用無(wú)用內(nèi)存,數(shù)組中間不會(huì)有空余位置)
[圖片上傳失敗...(image-8ca912-1592548755621)] - 觀察上方我們可以得到父子節(jié)點(diǎn)之間的關(guān)系
- 左子點(diǎn)下標(biāo): 當(dāng)前父節(jié)點(diǎn)下標(biāo) * 2 + 1 ;
- 右子點(diǎn)下標(biāo): 當(dāng)前父節(jié)點(diǎn)下標(biāo) * 2 + 2 ;
- 父節(jié)點(diǎn)下標(biāo): (當(dāng)前子節(jié)點(diǎn) - 1) / 2 ; //不論是左子節(jié)點(diǎn)還是右子節(jié)點(diǎn)
- 通過(guò)上方公式,我們可以很方便計(jì)算出某個(gè)節(jié)點(diǎn)的父節(jié)點(diǎn)以及子節(jié)點(diǎn)的下標(biāo),這就是為何使用數(shù)組來(lái)存儲(chǔ)堆的原因
添加 add/offer
- 兩者區(qū)別主要是插入失敗處理不同,add插入失敗拋異常,而offer插入失敗返回false,對(duì)于PriorityQueue兩者沒(méi)啥差別的
[圖片上傳失敗...(image-6e9980-1592548755621)] - 當(dāng)新加入元素破壞了最小頂堆,就需要跟它父節(jié)點(diǎn)/祖父節(jié)點(diǎn)..設(shè)置根節(jié)點(diǎn)做調(diào)整,只是交換子節(jié)點(diǎn)跟父節(jié)點(diǎn)即可
public boolean offer(E e) { if (e == null) throw new NullPointerException(); modCount++; int i = size; if (i >= queue.length) //如果當(dāng)前數(shù)組已經(jīng)滿(mǎn)了,就擴(kuò)容其大小 grow(i + 1); //擴(kuò)容函數(shù)如果數(shù)組長(zhǎng)度小于64就兩倍擴(kuò)容,否則增長(zhǎng)1.5倍 size = i + 1; if (i == 0) //如果當(dāng)前數(shù)組為空,則在第一位添加數(shù)據(jù) queue[0] = e; else siftUp(i, e); //開(kāi)始在葉子結(jié)點(diǎn)添加并調(diào)節(jié)數(shù)據(jù),插入位置是數(shù)組的最后一位,也就是最下方葉子結(jié)點(diǎn)的最右邊的那個(gè)位置 return true; } //grow擴(kuò)容函數(shù) private void grow(int minCapacity) { int oldCapacity = queue.length; // Double size if small; else grow by 50% int newCapacity = oldCapacity + ((oldCapacity < 64) ? (oldCapacity + 2) : (oldCapacity >> 1)); // overflow-conscious code if (newCapacity - MAX_ARRAY_SIZE > 0) newCapacity = hugeCapacity(minCapacity); queue = Arrays.copyOf(queue, newCapacity); //復(fù)制到新的數(shù)組中 } //seftUp()函數(shù) private void siftUp(int k, E x) { if (comparator != null) //自定義的比較器 siftUpUsingComparator(k, x); else //默認(rèn)比較器 siftUpComparable(k, x); } //我們主要看自定義比較器的方法 private void siftUpUsingComparator(int k, E x) { while (k > 0) { //k最開(kāi)始是數(shù)組的最后一位,我們開(kāi)始跟他的父節(jié)點(diǎn)進(jìn)行比較 int parent = (k - 1) >>> 1; //根據(jù)上方公式第三條得到其父節(jié)點(diǎn)下標(biāo) k>0的位置肯定有父節(jié)點(diǎn),不用擔(dān)心下面的數(shù)組越界問(wèn)題 Object e = queue[parent]; //獲取父節(jié)點(diǎn)的數(shù)據(jù) if (comparator.compare(x, (E) e) >= 0) //根據(jù)自定義比較器確定兩者關(guān)系,如果x比父節(jié)點(diǎn)e大,則表明插入位置正確,退出循環(huán) break; queue[k] = e; //否則將父節(jié)點(diǎn)的值賦值給當(dāng)前子節(jié)點(diǎn)的位置,父節(jié)點(diǎn)值并沒(méi)有改變,只是位置信息賦值給了當(dāng)前k,comparator比較的時(shí)候還是拿到插入值x同父節(jié)點(diǎn)值比較,跟當(dāng)前節(jié)點(diǎn)值無(wú)關(guān)的 k = parent; } queue[k] = x; //上方公式獲取到k應(yīng)該插入的位置,對(duì)其賦值即可,k只賦值了這一次哦 }
- 加入元素可能破壞小頂堆性質(zhì),需要進(jìn)行調(diào)整,過(guò)程為: 從K指定的位置開(kāi)始,將x追層與當(dāng)前點(diǎn)的parent進(jìn)行比較并交換,知道滿(mǎn)足x>= queue[parent]:比較可以是元素的自然順序,也可以是自定義比較器
獲得隊(duì)首元素
- element()和peek() : 獲取但不刪除隊(duì)首元素,也就是隊(duì)列中權(quán)值最小的那個(gè)元素,區(qū)別是前者失敗拋出異常,后者返回null,由于采用最小堆,且隊(duì)首元素在下標(biāo)為0處,因此直接返回queue[0]即可
public E peek() { return (size == 0) ? null : (E) queue[0]; }
remove和poll
- 獲取并刪除隊(duì)首元素,區(qū)別是remove失敗拋出異常,poll返回null,由于隊(duì)首刪除會(huì)改變最小堆結(jié)構(gòu),因此需要維護(hù)小頂堆的性質(zhì),需要調(diào)節(jié)堆
[圖片上傳失敗...(image-d67d9e-1592548755621)]public E poll() { if (size == 0) return null; int s = --size; //查找數(shù)組最后一位數(shù)據(jù) x ,對(duì)其調(diào)用siftDown方法 modCount++; E result = (E) queue[0]; E x = (E) queue[s]; queue[s] = null; //刪除以后數(shù)組長(zhǎng)度 -1 ,最后一位設(shè)置成null if (s != 0) //s不是隊(duì)首元素則開(kāi)始調(diào)整堆 siftDown(0, x); return result; } //調(diào)整堆的函數(shù),無(wú)非就是跟其孩子節(jié)點(diǎn)進(jìn)行比較 private void siftDown(int k, E x) { //注意:此時(shí) k 為 0 ,x為當(dāng)前數(shù)組末尾那個(gè)值也就是上圖中的數(shù)字 9 if (comparator != null) //是否存在自定義的比較器 siftDownUsingComparator(k, x); else siftDownComparable(k, x); } //自定義比較器 private void siftDownUsingComparator(int k, E x) { int half = size >>> 1; //這里注意理解: size是減后的數(shù)組大小,由于堆中父節(jié)點(diǎn) < 子節(jié)點(diǎn)特性,因此,對(duì)于最下層,也就是圖片中跟9同級(jí)的節(jié)點(diǎn)是沒(méi)有必要在考慮的,同樣9如果有祖父節(jié)點(diǎn)則右側(cè)叔節(jié)點(diǎn)也是沒(méi)有必要考慮的,對(duì)于上方的b圖 size = 10 ,右移1位為 5就是數(shù)據(jù)15的位置,則15的父節(jié)點(diǎn)必然小于15所以已經(jīng)找到交換過(guò)了,當(dāng)15的父節(jié)點(diǎn)為根節(jié)點(diǎn)時(shí),下方對(duì)左右有比較的,已經(jīng)覆蓋整個(gè)特點(diǎn)了,因此設(shè)置成 size / 2 更加快速合理 while (k < half) { //循環(huán)比較,找到數(shù)組中最小數(shù)字存放到堆頂元素 int child = (k << 1) + 1; //第一次找到k的左子樹(shù) Object c = queue[child]; int right = child + 1; //找到k的右子樹(shù) if (right < size && comparator.compare((E) c, (E) queue[right]) > 0) //首先比較左右子樹(shù)找到他們的最小值,以便下面跟父節(jié)點(diǎn)比較是否交換 c = queue[child = right]; //如果右子樹(shù)更小,設(shè)置給臨時(shí)變量c if (comparator.compare(x, (E) c) <= 0) //比較當(dāng)前最后一位數(shù)據(jù)x,跟他的子樹(shù)中最小的數(shù)據(jù),如果比他小,則找到了這個(gè)存放x的位置,如果比他大,則交換值 break; queue[k] = c; //將比x小的子樹(shù)存放到父節(jié)點(diǎn)中 k = child; //設(shè)置k的下標(biāo)為當(dāng)前孩子節(jié)點(diǎn)位置,注意child已經(jīng)在比較左右子樹(shù)時(shí)根據(jù)他們大小設(shè)置到最小子樹(shù)上了,因此在往下查找 } queue[k] = x; //找到了當(dāng)前合適的父節(jié)點(diǎn)位置K,x的值比他的左右子樹(shù)都大,則這個(gè)位置就是存放x的位置 }
remove(Object o) :刪除指定元素
- 遍歷數(shù)組找到第一個(gè)滿(mǎn)足條件的元素下標(biāo)(如果有多個(gè),只刪除一個(gè)的),刪除會(huì)改變隊(duì)列結(jié)構(gòu),需要進(jìn)行調(diào)整,分為兩種情況:
- 刪除的數(shù)組最后一位元素,則直接刪除即可
- 刪除的不是最后一個(gè)元素,則從刪除點(diǎn)開(kāi)始以最后一個(gè)元素為參考調(diào)用一次siftDown(),可能導(dǎo)致不滿(mǎn)足父節(jié)點(diǎn)小于該刪除節(jié)點(diǎn),還需要上慮調(diào)用添加是的siftUp()將添加走一遍
-
很多人對(duì)第二個(gè)不理解,簡(jiǎn)單分析一下源碼,刪除操作實(shí)際調(diào)用removeAt(int i)
[圖片上傳失敗...(image-53d895-1592548755621)]
private E removeAt(int i) { //i為數(shù)組中從0開(kāi)始查找第一個(gè)滿(mǎn)足.equeal(o)的元素下標(biāo) // assert i >= 0 && i < size; modCount++; int s = --size; //找到數(shù)組末尾數(shù)據(jù),如果是第一種情況直接刪除即可 if (s == i) // removed last element queue[i] = null; else { //如果是第二種情況 E moved = (E) queue[s]; //首先保存移除數(shù)組末尾元素,然后將數(shù)組最后一位置空,即刪除任何數(shù)據(jù),其實(shí)就是刪除最未那個(gè),這點(diǎn)同紅黑樹(shù)刪除一致 queue[s] = null; siftDown(i, moved); //再次調(diào)用siftDown,此時(shí)i是需要?jiǎng)h除的元素下標(biāo),moved是數(shù)組最后一位的數(shù)據(jù),其實(shí)上方的poll彈出堆頂元素,只是一個(gè)特殊的移除下標(biāo)為0的數(shù)據(jù)而已 //對(duì)于上方的圖片只是一種特殊情況4正好是9的祖父節(jié)點(diǎn),如果需要移除的是10也沒(méi)有關(guān)系,我們依然刪除下標(biāo)10的數(shù)據(jù),將數(shù)據(jù)9跟10的兩個(gè)子節(jié)點(diǎn)15 跟11比較,發(fā)現(xiàn)都小于則將9放到下標(biāo)2的位置再將10返回即可,只要滿(mǎn)足子節(jié)點(diǎn)大于父節(jié)點(diǎn)即可 if (queue[i] == moved) { //上方調(diào)整過(guò)的數(shù)據(jù),如果發(fā)現(xiàn)當(dāng)前刪除節(jié)點(diǎn)數(shù)據(jù)直接填充了比如刪除的是11,這個(gè)時(shí)候填充9到下標(biāo)6的位置,你發(fā)現(xiàn) queue[6] = movie = 9下方的都是大于9的,但是父節(jié)點(diǎn)并不能保證小于9哦,這個(gè)時(shí)候就需要向添加時(shí)候一樣,向上過(guò)濾直到滿(mǎn)足所有下方數(shù)據(jù)都小于父節(jié)點(diǎn)即2跟6交換后2位置9,6位置10 siftUp(i, moved); if (queue[i] != moved) return moved; } } return null; }
注意:無(wú)論是隊(duì)列還是棧都是可以用鏈表或者數(shù)組實(shí)現(xiàn)的,基本上所有數(shù)據(jù)都是這兩種形式
- 很多人會(huì)說(shuō),我學(xué)習(xí)ArrayDeque跟PriorityQueue數(shù)據(jù)結(jié)構(gòu)有啥用呢?你大概是忘記了一個(gè)重要的東西線(xiàn)程池
- 線(xiàn)程池中緩存隊(duì)列有主要有四種:
- ArrayBlockingQueue(int i): 它是ArrayDeque的線(xiàn)程安全版本,基本使用一致,無(wú)非就是加了鎖保證線(xiàn)程安全性,還有一個(gè)它是有界的i是數(shù)組的大小,不可更改,ArrayDeque是可變的
- LinkedBlockingQueue(int i): i可要可不要,如果不要就是不固定大小的鏈表,如果有則為指定大小的鏈表,這個(gè)簡(jiǎn)單就不分析了,跟鏈表一致的
- PriorityBlockingQueue(int i): i可有可無(wú),意義同上,是PriorityQueue的線(xiàn)程安全類(lèi),依據(jù)設(shè)置優(yōu)先級(jí)彈出隊(duì)列中的任務(wù)給線(xiàn)程池使用
- SynchronizedQueue() : 沒(méi)有空間的隊(duì)列,不緩存任何數(shù)據(jù)滴,來(lái)多少任務(wù)創(chuàng)建多少線(xiàn)程