問題
(1)DelayQueue是阻塞隊列嗎脆栋?
(2)DelayQueue的實現(xiàn)方式倦卖?
(3)DelayQueue主要用于什么場景?
簡介
DelayQueue是java并發(fā)包下的延時阻塞隊列椿争,常用于實現(xiàn)定時任務(wù)怕膛。
繼承體系
從繼承體系可以看到,DelayQueue實現(xiàn)了BlockingQueue秦踪,所以它是一個阻塞隊列褐捻。
另外椅邓,DelayQueue還組合了一個叫做Delayed的接口板壮,DelayQueue中存儲的所有元素必須實現(xiàn)Delayed接口绰精。
那么卿樱,Delayed是什么呢繁调?
publicinterfaceDelayedextendsComparable{longgetDelay(TimeUnit unit);}
Delayed是一個繼承自Comparable的接口,并且定義了一個getDelay()方法烤送,用于表示還有多少時間到期帮坚,到期了應(yīng)返回小于等于0的數(shù)值。
源碼分析
主要屬性
// 用于控制并發(fā)的鎖privatefinaltransientReentrantLock lock =newReentrantLock();// 優(yōu)先級隊列privatefinalPriorityQueue q =newPriorityQueue();// 用于標(biāo)記當(dāng)前是否有線程在排隊(僅用于取元素時)privateThread leader =null;// 條件阅悍,用于表示現(xiàn)在是否有可取的元素privatefinalCondition available = lock.newCondition();
從屬性我們可以知道,延時隊列主要使用優(yōu)先級隊列來實現(xiàn)寻行,并輔以重入鎖和條件來控制并發(fā)安全拌蜘。
因為優(yōu)先級隊列是無界的,所以這里只需要一個條件就可以了举娩。
還記得優(yōu)先級隊列嗎?點(diǎn)擊鏈接直達(dá)【死磕 java集合之PriorityQueue源碼分析】
主要構(gòu)造方法
publicDelayQueue(){}publicDelayQueue(Collection<? extends E> c){this.addAll(c);}
構(gòu)造方法比較簡單骄噪,一個默認(rèn)構(gòu)造方法,一個初始化添加集合c中所有元素的構(gòu)造方法滔韵。
入隊
因為DelayQueue是阻塞隊列陪蜻,且優(yōu)先級隊列是無界的,所以入隊不會阻塞不會超時症昏,因此它的四個入隊方法是一樣的肝谭。
publicbooleanadd(E e){returnoffer(e);}publicvoidput(E e){? ? offer(e);}publicbooleanoffer(E e,longtimeout, TimeUnit unit){returnoffer(e);}publicbooleanoffer(E e){finalReentrantLock lock =this.lock;? ? lock.lock();try{? ? ? ? q.offer(e);if(q.peek() == e) {? ? ? ? ? ? leader =null;? ? ? ? ? ? available.signal();? ? ? ? }returntrue;? ? }finally{? ? ? ? lock.unlock();? ? }}
入隊方法比較簡單:
(1)加鎖;
(2)添加元素到優(yōu)先級隊列中医寿;
(3)如果添加的元素是堆頂元素,就把leader置為空沟突,并喚醒等待在條件available上的線程惠拭;
(4)解鎖棒呛;
出隊
因為DelayQueue是阻塞隊列,所以它的出隊有四個不同的方法趋观,有拋出異常的皱坛,有阻塞的,有不阻塞的抹沪,有超時的融欧。
我們這里主要分析兩個,poll()和take()方法欠肾。
publicEpoll(){finalReentrantLock lock =this.lock;? ? lock.lock();try{? ? ? ? E first = q.peek();if(first ==null|| first.getDelay(NANOSECONDS) >0)returnnull;elsereturnq.poll();? ? }finally{? ? ? ? lock.unlock();? ? }}
poll()方法比較簡單:
(1)加鎖;
(2)檢查第一個元素瑟慈,如果為空或者還沒到期葛碧,就返回null;
(3)如果第一個元素到期了就調(diào)用優(yōu)先級隊列的poll()彈出第一個元素乳绕;
(4)解鎖翩隧。
publicEtake()throwsInterruptedException{finalReentrantLock lock =this.lock;? ? lock.lockInterruptibly();try{for(;;) {// 堆頂元素E first = q.peek();// 如果堆頂元素為空,說明隊列中還沒有元素雷酪,直接阻塞等待if(first ==null)? ? ? ? ? ? ? ? available.await();else{// 堆頂元素的到期時間longdelay = first.getDelay(NANOSECONDS);// 如果小于0說明已到期,直接調(diào)用poll()方法彈出堆頂元素if(delay <=0)returnq.poll();// 如果delay大于0 吩跋,則下面要阻塞了// 將first置為空方便gc锌钮,因為有可能其它元素彈出了這個元素// 這里還持有著引用不會被清理first =null;// don't retain ref while waiting// 如果前面有其它線程在等待,直接進(jìn)入等待if(leader !=null)? ? ? ? ? ? ? ? ? ? available.await();else{// 如果leader為null旺韭,把當(dāng)前線程賦值給它Thread thisThread = Thread.currentThread();? ? ? ? ? ? ? ? ? ? leader = thisThread;try{// 等待delay時間后自動醒過來// 醒過來后把leader置空并重新進(jìn)入循環(huán)判斷堆頂元素是否到期// 這里即使醒過來后也不一定能獲取到元素// 因為有可能其它線程先一步獲取了鎖并彈出了堆頂元素// 條件鎖的喚醒分成兩步值漫,先從Condition的隊列里出隊// 再入隊到AQS的隊列中,當(dāng)其它線程調(diào)用LockSupport.unpark(t)的時候才會真正喚醒// 關(guān)于AQS我們后面會講的^^available.awaitNanos(delay);? ? ? ? ? ? ? ? ? ? }finally{// 如果leader還是當(dāng)前線程就把它置為空悔政,讓其它線程有機(jī)會獲取元素if(leader == thisThread)? ? ? ? ? ? ? ? ? ? ? ? ? ? leader =null;? ? ? ? ? ? ? ? ? ? }? ? ? ? ? ? ? ? }? ? ? ? ? ? }? ? ? ? }? ? }finally{// 成功出隊后槽地,如果leader為空且堆頂還有元素集畅,就喚醒下一個等待的線程if(leader ==null&& q.peek() !=null)// signal()只是把等待的線程放到AQS的隊列里面,并不是真正的喚醒a(bǔ)vailable.signal();// 解鎖赦颇,這才是真正的喚醒lock.unlock();? ? }}
take()方法稍微要復(fù)雜一些:
(1)加鎖媒怯;
(2)判斷堆頂元素是否為空,為空的話直接阻塞等待鳖敷;
(3)判斷堆頂元素是否到期定踱,到期了直接調(diào)用優(yōu)先級隊列的poll()彈出元素;
(4)沒到期至扰,再判斷前面是否有其它線程在等待敢课,有則直接等待;
(5)前面沒有其它線程在等待鞭盟,則把自己當(dāng)作第一個線程等待delay時間后喚醒筝野,再嘗試獲取元素;
(6)獲取到元素之后再喚醒下一個等待的線程歇竟;
(7)解鎖焕议;
使用方法
說了那么多唤锉,是不是還是不知道怎么用呢?那怎么能行,請看下面的案例:
publicclassDelayQueueTest{publicstaticvoidmain(String[] args){? ? ? ? DelayQueue queue =newDelayQueue<>();longnow = System.currentTimeMillis();// 啟動一個線程從隊列中取元素newThread(()->{while(true) {try{// 將依次打印1000籽慢,2000,5000届惋,7000脑豹,8000System.out.println(queue.take().deadline - now);? ? ? ? ? ? ? ? }catch(InterruptedException e) {? ? ? ? ? ? ? ? ? ? e.printStackTrace();? ? ? ? ? ? ? ? }? ? ? ? ? ? }? ? ? ? }).start();// 添加5個元素到隊列中queue.add(newMessage(now +5000));? ? ? ? queue.add(newMessage(now +8000));? ? ? ? queue.add(newMessage(now +2000));? ? ? ? queue.add(newMessage(now +1000));? ? ? ? queue.add(newMessage(now +7000));? ? }}classMessageimplementsDelayed{longdeadline;publicMessage(longdeadline){this.deadline = deadline;? ? }@OverridepubliclonggetDelay(TimeUnit unit){returndeadline - System.currentTimeMillis();? ? }@OverridepublicintcompareTo(Delayed o){return(int) (getDelay(TimeUnit.MILLISECONDS) - o.getDelay(TimeUnit.MILLISECONDS));? ? }@OverridepublicStringtoString(){returnString.valueOf(deadline);? ? }}
是不是很簡單拌牲,越早到期的元素越先出隊拍埠。
總結(jié)
(1)DelayQueue是阻塞隊列嬉探;
(2)DelayQueue內(nèi)部存儲結(jié)構(gòu)使用優(yōu)先級隊列甲馋;
(3)DelayQueue使用重入鎖和條件來控制并發(fā)安全;
(4)DelayQueue常用于定時任務(wù)痊远;
寫在最后:
碼字不易看到最后了碧聪,那就點(diǎn)個關(guān)注唄,只收藏不點(diǎn)關(guān)注的都是在耍流氓滞造!關(guān)注并私信我“架構(gòu)”,免費(fèi)送一些Java架構(gòu)資料买窟,先到先得!