引言
JDK中除了上文提到的各種并發(fā)容器蹲蒲,還提供了豐富的阻塞隊(duì)列谦屑。阻塞隊(duì)列統(tǒng)一實(shí)現(xiàn)了BlockingQueue?接口流妻,BlockingQueue?接口在java.util包Queue?接口的基礎(chǔ)上提供了put(e)以及take()兩個(gè)阻塞方法痘拆。他的主要使用場(chǎng)景就是多線程下的生產(chǎn)者消費(fèi)者模式,生產(chǎn)者線程通過(guò)put(e)方法將生產(chǎn)元素喊废,消費(fèi)者線程通過(guò)take()消費(fèi)元素祝高。除了阻塞功能,BlockingQueue?接口還定義了定時(shí)的offer以及poll污筷,以及一次性移除方法drainTo工闺。
//插入元素,隊(duì)列滿(mǎn)后會(huì)拋出異常booleanadd(E e);//移除元素瓣蛀,隊(duì)列為空時(shí)會(huì)拋出異常Eremove();//插入元素陆蟆,成功反會(huì)truebooleanoffer(E e);//移除元素Epoll();//插入元素,隊(duì)列滿(mǎn)后會(huì)阻塞voidput(E e)throwsInterruptedException;//移除元素惋增,隊(duì)列空后會(huì)阻塞Etake()throwsInterruptedException;//限時(shí)插入booleanoffer(E e,longtimeout, TimeUnit unit)//限時(shí)移除Epoll(longtimeout, TimeUnit unit);//獲取所有元素到Collection中intdrainTo(Collection c);
JDK1.8中的阻塞隊(duì)列實(shí)現(xiàn)共有7個(gè)叠殷,分別是ArrayBlockingQueue、LinkedBlockingQueue器腋、PriorityBlockingQueue溪猿、DelayQueue钩杰、SynchronousQueue纫塌、LinkedTransferQueue以及LinkedBlockingDeque,下面就來(lái)一一對(duì)他們進(jìn)行一個(gè)簡(jiǎn)單的分析讲弄。
ArrayBlockingQueue
ArrayBlockingQueue是一個(gè)底層用數(shù)組實(shí)現(xiàn)的有界阻塞隊(duì)列措左,有界是指他的容量大小是固定的,不能擴(kuò)充容量避除,在初始化時(shí)就必須確定隊(duì)列大小怎披。它通過(guò)可重入的獨(dú)占鎖ReentrantLock來(lái)控制并發(fā)胸嘁,Condition來(lái)實(shí)現(xiàn)阻塞。
//通過(guò)數(shù)組來(lái)存儲(chǔ)隊(duì)列中的元素final Object[] items;//初始化一個(gè)固定的數(shù)組大小凉逛,默認(rèn)使用非公平鎖來(lái)控制并發(fā)publicArrayBlockingQueue(intcapacity){this(capacity,false);}//初始化固定的items數(shù)組大小性宏,初始化notEmpty以及notFull兩個(gè)Condition來(lái)控制生產(chǎn)消費(fèi)publicArrayBlockingQueue(intcapacity, boolean fair){if(capacity <=0)thrownewIllegalArgumentException();this.items =newObject[capacity];lock=newReentrantLock(fair);//通過(guò)ReentrantLock來(lái)控制并發(fā)notEmpty =lock.newCondition();? ? notFull =lock.newCondition();}
可以看到ArrayBlockingQueue初始化了一個(gè)ReentrantLock以及兩個(gè)Condition,用來(lái)控制并發(fā)下隊(duì)列的生產(chǎn)消費(fèi)状飞。這里重點(diǎn)看下阻塞的put以及take方法:
//插入元素到隊(duì)列中publicvoidput(E e) throws InterruptedException{? ? checkNotNull(e);? ? final ReentrantLocklock=this.lock;lock.lockInterruptibly();//獲取獨(dú)占鎖try{while(count == items.length)//如果隊(duì)列已滿(mǎn)則通過(guò)await阻塞put方法notFull.await();? ? ? ? enqueue(e);//插入元素}finally{lock.unlock();? ? }}privatevoidenqueue(E x){// assert lock.getHoldCount() == 1;// assert items[putIndex] == null;final Object[] items =this.items;? ? items[putIndex] = x;if(++putIndex == items.length)//插入元素后將putIndex+1毫胜,當(dāng)隊(duì)列使用完后重置為0putIndex =0;? ? count++;? ? notEmpty.signal();//隊(duì)列添加元素后喚醒因notEmpty等待的消費(fèi)線程}//移除隊(duì)列中的元素publicEtake() throws InterruptedException{? ? final ReentrantLocklock=this.lock;lock.lockInterruptibly();//獲取獨(dú)占鎖try{while(count ==0)//如果隊(duì)列已空則通過(guò)await阻塞take方法notEmpty.await();returndequeue();//移除元素}finally{lock.unlock();? ? }}privateEdequeue(){// assert lock.getHoldCount() == 1;// assert items[takeIndex] != null;final Object[] items =this.items;? ? @SuppressWarnings("unchecked")? ? E x = (E) items[takeIndex];? ? items[takeIndex] =null;if(++takeIndex == items.length)//移除元素后將takeIndex+1,當(dāng)隊(duì)列使用完后重置為0takeIndex =0;? ? count--;if(itrs !=null)? ? ? ? itrs.elementDequeued();? ? notFull.signal();//隊(duì)列消費(fèi)元素后喚醒因notFull等待的消費(fèi)線程returnx;}
在隊(duì)列添加和移除元素的過(guò)程中使用putIndex诬辈、takeIndex以及count三個(gè)變量來(lái)控制生產(chǎn)消費(fèi)元素的過(guò)程酵使,putIndex負(fù)責(zé)記錄下一個(gè)可添加元素的下標(biāo),takeIndex負(fù)責(zé)記錄下一個(gè)可移除元素的下標(biāo)焙糟,count記錄了隊(duì)列中的元素總量口渔。隊(duì)列滿(mǎn)后通過(guò)notFull.await()來(lái)阻塞生產(chǎn)者線程,消費(fèi)元素后通過(guò)notFull.signal()來(lái)喚醒阻塞的生產(chǎn)者線程穿撮。隊(duì)列為空后通過(guò)notEmpty.await()來(lái)阻塞消費(fèi)者線程缺脉,生產(chǎn)元素后通過(guò)notEmpty.signal()喚醒阻塞的消費(fèi)者線程。
限時(shí)插入以及移除方法在ArrayBlockingQueue中通過(guò)awaitNanos來(lái)實(shí)現(xiàn)悦穿,在給定的時(shí)間過(guò)后如果線程未被喚醒則直接返回枪向。
publicbooleanoffer(E e,longtimeout, TimeUnit unit)? ? throws InterruptedException{? ? checkNotNull(e);longnanos = unit.toNanos(timeout);//獲取定時(shí)時(shí)長(zhǎng)final ReentrantLocklock=this.lock;lock.lockInterruptibly();try{while(count == items.length) {if(nanos <=0)//指定時(shí)長(zhǎng)過(guò)后,線程仍然未被喚醒則返回falsereturnfalse;? ? ? ? ? ? nanos = notFull.awaitNanos(nanos);//指定時(shí)長(zhǎng)內(nèi)阻塞線程}? ? ? ? enqueue(e);returntrue;? ? }finally{lock.unlock();? ? }}
還有一個(gè)比較重要的方法:drainTo咧党,drainTo方法可以一次性獲取隊(duì)列中所有的元素秘蛔,它減少了鎖定隊(duì)列的次數(shù),使用得當(dāng)在某些場(chǎng)景下對(duì)性能有不錯(cuò)的提升傍衡。
publicintdrainTo(Collection c,intmaxElements){? ? checkNotNull(c);if(c ==this)thrownewIllegalArgumentException();if(maxElements <=0)return0;? ? final Object[] items =this.items;? ? final ReentrantLocklock=this.lock;//僅獲取一次鎖lock.lock();try{intn = Math.min(maxElements, count);//獲取隊(duì)列中所有元素inttake = takeIndex;inti =0;try{while(i < n) {? ? ? ? ? ? ? ? @SuppressWarnings("unchecked")? ? ? ? ? ? ? ? E x = (E) items[take];? ? ? ? ? ? ? ? c.add(x);//循環(huán)插入元素items[take] =null;if(++take == items.length)? ? ? ? ? ? ? ? ? ? take =0;? ? ? ? ? ? ? ? i++;? ? ? ? ? ? }returnn;? ? ? ? }finally{// Restore invariants even if c.add() threwif(i >0) {? ? ? ? ? ? ? ? count -= i;? ? ? ? ? ? ? ? takeIndex = take;if(itrs !=null) {if(count ==0)? ? ? ? ? ? ? ? ? ? ? ? itrs.queueIsEmpty();elseif(i > take)? ? ? ? ? ? ? ? ? ? ? ? itrs.takeIndexWrapped();? ? ? ? ? ? ? ? }for(; i >0&&lock.hasWaiters(notFull); i--)? ? ? ? ? ? ? ? ? ? notFull.signal();//喚醒等待的生產(chǎn)者線程}? ? ? ? }? ? }finally{lock.unlock();? ? }}
LinkedBlockingQueue
LinkedBlockingQueue是一個(gè)底層用單向鏈表實(shí)現(xiàn)的有界阻塞隊(duì)列深员,和ArrayBlockingQueue一樣,采用ReentrantLock來(lái)控制并發(fā)蛙埂,不同的是它使用了兩個(gè)獨(dú)占鎖來(lái)控制消費(fèi)和生產(chǎn)倦畅。put以及take方法源碼如下:
publicvoidput(E e)throwsInterruptedException {intc = -1;? ? Node node =newNode(e);finalReentrantLock putLock =this.putLock;//因?yàn)槭褂昧穗p鎖,需要使用AtomicInteger計(jì)算元素總量绣的,避免并發(fā)計(jì)算不準(zhǔn)確finalAtomicIntegercount=this.count;? ? putLock.lockInterruptibly();try{while(count.get() == capacity) {? ? ? ? ? ? notFull.await();//隊(duì)列已滿(mǎn)叠赐,阻塞生產(chǎn)線程}? ? ? ? enqueue(node);//插入元素到隊(duì)列尾部c =count.getAndIncrement();//count + 1if(c +1< capacity)//如果+1后隊(duì)列還未滿(mǎn),通過(guò)其他生產(chǎn)線程繼續(xù)生產(chǎn)notFull.signal();? ? }finally{? ? ? ? putLock.unlock();? ? }if(c ==0)//只有當(dāng)之前是空時(shí)屡江,消費(fèi)隊(duì)列才會(huì)阻塞芭概,否則是不需要通知的signalNotEmpty(); }privatevoidenqueue(Node node) {//將新元素添加到鏈表末尾,然后將last指向尾部元素last = last.next= node;}publicE take()throwsInterruptedException {? ? E x;intc = -1;finalAtomicIntegercount=this.count;finalReentrantLock takeLock =this.takeLock;? ? takeLock.lockInterruptibly();try{while(count.get() ==0) {? ? ? ? ? ? notEmpty.await();//隊(duì)列為空惩嘉,阻塞消費(fèi)線程}? ? ? ? x = dequeue();//消費(fèi)一個(gè)元素c =count.getAndDecrement();//count - 1if(c >1)// 通知其他等待的消費(fèi)線程繼續(xù)消費(fèi)notEmpty.signal();? ? }finally{? ? ? ? takeLock.unlock();? ? }if(c == capacity)//只有當(dāng)之前是滿(mǎn)的罢洲,生產(chǎn)隊(duì)列才會(huì)阻塞,否則是不需要通知的signalNotFull();returnx;}//消費(fèi)隊(duì)列頭部的下一個(gè)元素文黎,同時(shí)將新頭部置空privateE dequeue() {? ? Node h = head;? ? Node first = h.next;? ? h.next= h;// help GChead = first;? ? E x = first.item;? ? first.item =null;returnx;}
可以看到LinkedBlockingQueue通過(guò)takeLock和putLock兩個(gè)鎖來(lái)控制生產(chǎn)和消費(fèi)惹苗,互不干擾殿较,只要隊(duì)列未滿(mǎn),生產(chǎn)線程可以一直生產(chǎn)桩蓉,只要隊(duì)列不為空淋纲,消費(fèi)線程可以一直消費(fèi),不會(huì)相互因?yàn)楠?dú)占鎖而阻塞院究。
看過(guò)了LinkedBlockingQueue以及ArrayBlockingQueue的底層實(shí)現(xiàn)帚戳,會(huì)發(fā)現(xiàn)一個(gè)問(wèn)題,正常來(lái)說(shuō)消費(fèi)者和生產(chǎn)者可以并發(fā)執(zhí)行對(duì)隊(duì)列的吞吐量會(huì)有比較大的提升儡首,那么為什么ArrayBlockingQueue中不使用雙鎖來(lái)實(shí)現(xiàn)隊(duì)列的生產(chǎn)和消費(fèi)呢片任?我的理解是ArrayBlockingQueue也能使用雙鎖來(lái)實(shí)現(xiàn)功能,但由于它底層使用了數(shù)組這種簡(jiǎn)單結(jié)構(gòu)蔬胯,相當(dāng)于一個(gè)共享變量对供,如果通過(guò)兩個(gè)鎖,需要更加精確的鎖控制氛濒,這也是為什么JDK1.7中的ConcurrentHashMap使用了分段鎖來(lái)實(shí)現(xiàn)产场,將一個(gè)數(shù)組分為多個(gè)數(shù)組來(lái)提高并發(fā)量。LinkedBlockingQueue不存在這個(gè)問(wèn)題舞竿,鏈表這種數(shù)據(jù)結(jié)構(gòu)頭尾節(jié)點(diǎn)都相對(duì)獨(dú)立京景,存儲(chǔ)上也不連續(xù),雙鎖控制不存在復(fù)雜性骗奖。這是我的理解确徙,如果你有更好的結(jié)論,請(qǐng)留言探討执桌。
PriorityBlockingQueue
PriorityBlockingQueue是一個(gè)底層由數(shù)組實(shí)現(xiàn)的無(wú)界隊(duì)列鄙皇,并帶有排序功能,同樣采用ReentrantLock來(lái)控制并發(fā)仰挣。由于是無(wú)界的伴逸,所以插入元素時(shí)不會(huì)阻塞,沒(méi)有隊(duì)列滿(mǎn)的狀態(tài)膘壶,只有隊(duì)列為空的狀態(tài)错蝴。通過(guò)這兩點(diǎn)特征其實(shí)可以猜測(cè)它應(yīng)該是有一個(gè)獨(dú)占鎖(底層數(shù)組)和一個(gè)Condition(只通知消費(fèi))來(lái)實(shí)現(xiàn)的。put以及take方法源碼分析如下:
publicvoidput(E e){? ? offer(e);}publicbooleanoffer(E e){if(e ==null)thrownewNullPointerException();? ? final ReentrantLocklock=this.lock;lock.lock();intn, cap;? ? Object[] array;//無(wú)界隊(duì)列颓芭,隊(duì)列長(zhǎng)度不夠時(shí)會(huì)擴(kuò)容while((n = size) >= (cap = (array = queue).length))? ? ? ? tryGrow(array, cap);try{//通過(guò)comparator來(lái)實(shí)現(xiàn)優(yōu)先級(jí)排序Comparator cmp = comparator;if(cmp ==null)? ? ? ? ? ? siftUpComparable(n, e, array);elsesiftUpUsingComparator(n, e, array, cmp);? ? ? ? size = n +1;? ? ? ? notEmpty.signal();//和ArrayBlockingQueue一樣顷锰,每次添加元素后通知消費(fèi)線程}finally{lock.unlock();? ? }returntrue;}publicEtake() throws InterruptedException{? ? final ReentrantLocklock=this.lock;lock.lockInterruptibly();? ? E result;try{while( (result = dequeue()) ==null)? ? ? ? ? ? notEmpty.await();//隊(duì)列為空,阻塞消費(fèi)線程}finally{lock.unlock();? ? }returnresult;}
DelayQueue
DelayQueue也是一個(gè)無(wú)界隊(duì)列畜伐,它是在PriorityQueue基礎(chǔ)上實(shí)現(xiàn)的馍惹,先按延遲優(yōu)先級(jí)排序躺率,延遲時(shí)間短的排在前面玛界。和PriorityBlockingQueue相似万矾,底層也是數(shù)組,采用一個(gè)ReentrantLock來(lái)控制并發(fā)慎框。由于是無(wú)界的良狈,所以插入元素時(shí)不會(huì)阻塞,沒(méi)有隊(duì)列滿(mǎn)的狀態(tài)笨枯。能想到的最簡(jiǎn)單的使用場(chǎng)景一般有兩個(gè):一個(gè)是緩存過(guò)期薪丁,一個(gè)是定時(shí)執(zhí)行的任務(wù)。但由于是無(wú)界的馅精,緩存過(guò)期上一般使用的并不多严嗜。簡(jiǎn)單來(lái)看下put以及take方法:
privatefinal transient ReentrantLocklock=newReentrantLock();privatefinal PriorityQueue q =newPriorityQueue();//優(yōu)先級(jí)隊(duì)列publicvoidput(E e){? ? offer(e);}publicbooleanoffer(E e){? ? final ReentrantLocklock=this.lock;lock.lock();try{? ? ? ? q.offer(e);//插入元素到優(yōu)先級(jí)隊(duì)列if(q.peek() == e) {//如果插入的元素在隊(duì)列頭部leader =null;? ? ? ? ? ? available.signal();//通知消費(fèi)線程}returntrue;? ? }finally{lock.unlock();? ? }}publicEtake() throws InterruptedException{? ? final ReentrantLocklock=this.lock;lock.lockInterruptibly();try{for(;;) {? ? ? ? ? ? E first = q.peek();//獲取頭部元素if(first ==null)? ? ? ? ? ? ? ? available.await();//空隊(duì)列阻塞else{longdelay = first.getDelay(NANOSECONDS);//檢查元素是否延遲到期if(delay <=0)returnq.poll();//到期則彈出元素first =null;// don't retain ref while waitingif(leader !=null)? ? ? ? ? ? ? ? ? ? available.await();else{? ? ? ? ? ? ? ? ? ? Thread thisThread = Thread.currentThread();? ? ? ? ? ? ? ? ? ? leader = thisThread;try{? ? ? ? ? ? ? ? ? ? ? ? available.awaitNanos(delay);//阻塞未到期的時(shí)間}finally{if(leader == thisThread)? ? ? ? ? ? ? ? ? ? ? ? ? ? leader =null;? ? ? ? ? ? ? ? ? ? }? ? ? ? ? ? ? ? }? ? ? ? ? ? }? ? ? ? }? ? }finally{if(leader ==null&& q.peek() !=null)? ? ? ? ? ? available.signal();lock.unlock();? ? }}
SynchronousQueue
SynchronousQueue相比較之前的4個(gè)隊(duì)列就比較特殊了,它是一個(gè)沒(méi)有容量的隊(duì)列洲敢,也就是說(shuō)它內(nèi)部時(shí)不會(huì)對(duì)數(shù)據(jù)進(jìn)行存儲(chǔ)漫玄,每進(jìn)行一次put之后必須要進(jìn)行一次take,否則相同線程繼續(xù)put會(huì)阻塞压彭。這種特性很適合做一些傳遞性的工作睦优,一個(gè)線程生產(chǎn),一個(gè)線程消費(fèi)壮不。內(nèi)部分為公平和非公平訪問(wèn)兩種模式汗盘,默認(rèn)使用非公平,未使用鎖询一,全部通過(guò)CAS操作來(lái)實(shí)現(xiàn)并發(fā)隐孽,吞吐量非常高。這里只對(duì)它的非公平實(shí)現(xiàn)下的take和put方法做下簡(jiǎn)單分析:
//非公平情況下調(diào)用內(nèi)部類(lèi)TransferStack的transfer方法putpublicvoidput(E e)throwsInterruptedException {if(e ==null)thrownewNullPointerException();if(transferer.transfer(e,false,0) ==null) {? ? ? ? Thread.interrupted();thrownewInterruptedException();? ? }}//非公平情況下調(diào)用內(nèi)部類(lèi)TransferStack的transfer方法takepublicE take()throwsInterruptedException {? ? E e = transferer.transfer(null,false,0);if(e !=null)returne;? ? Thread.interrupted();thrownewInterruptedException();}//具體的put以及take方法健蕊,只有E的區(qū)別缓醋,通過(guò)E來(lái)區(qū)別REQUEST還是DATA模式E transfer(E e,booleantimed,longnanos) {? ? SNode s =null;// constructed/reused as neededintmode = (e ==null) ? REQUEST : DATA;for(;;) {? ? ? ? SNode h = head;//棧無(wú)元素或者元素和插入的元素模式相匹配,也就是說(shuō)都是插入元素if(h ==null|| h.mode == mode) {//有時(shí)間限制并且超時(shí)if(timed && nanos <=0) {if(h !=null&& h.isCancelled())? ? ? ? ? ? ? ? ? ? casHead(h, h.next);// 重新設(shè)置頭節(jié)點(diǎn)elsereturnnull;? ? ? ? ? ? }//未超時(shí)cas操作嘗試設(shè)置頭節(jié)點(diǎn)elseif(casHead(h, s = snode(s, e, h, mode))) {//自旋一段時(shí)間后未消費(fèi)元素則掛起put線程SNode m = awaitFulfill(s, timed, nanos);if(m == s) {// wait was cancelledclean(s);returnnull;? ? ? ? ? ? ? ? }if((h = head) !=null&& h.next== s)? ? ? ? ? ? ? ? ? ? casHead(h, s.next);// help s's fulfillerreturn(E) ((mode == REQUEST) ? m.item : s.item);? ? ? ? ? ? }? ? ? ? }//棧不為空并且和頭節(jié)點(diǎn)模式不匹配绊诲,存在元素則消費(fèi)元素并重新設(shè)置head節(jié)點(diǎn)elseif(!isFulfilling(h.mode)) {// try to fulfillif(h.isCancelled())// already cancelledcasHead(h, h.next);// pop and retryelseif(casHead(h, s=snode(s, e, h, FULFILLING|mode))) {for(;;) {// loop until matched or waiters disappearSNode m = s.next;// m is s's matchif(m ==null) {// all waiters are gonecasHead(s,null);// pop fulfill nodes =null;// use new node next timebreak;// restart main loop}? ? ? ? ? ? ? ? ? ? SNode mn = m.next;if(m.tryMatch(s)) {? ? ? ? ? ? ? ? ? ? ? ? casHead(s, mn);// pop both s and mreturn(E) ((mode == REQUEST) ? m.item : s.item);? ? ? ? ? ? ? ? ? ? }else// lost matchs.casNext(m, mn);// help unlink}? ? ? ? ? ? }? ? ? ? }//節(jié)點(diǎn)正在匹配階段 else{// help a fulfillerSNode m = h.next;// m is h's matchif(m ==null)// waiter is gonecasHead(h,null);// pop fulfilling nodeelse{? ? ? ? ? ? ? ? SNode mn = m.next;if(m.tryMatch(h))// help matchcasHead(h, mn);// pop both h and melse// lost matchh.casNext(m, mn);// help unlink}? ? ? ? }? ? }}//先自旋后掛起的核心方法SNode awaitFulfill(SNode s,booleantimed,longnanos) {finallongdeadline = timed ? System.nanoTime() + nanos :0L;? ? Thread w = Thread.currentThread();//計(jì)算自旋的次數(shù)intspins = (shouldSpin(s) ?? ? ? ? ? ? ? ? ? ? (timed ? maxTimedSpins : maxUntimedSpins) :0);for(;;) {if(w.isInterrupted())? ? ? ? ? ? s.tryCancel();? ? ? ? SNode m = s.match;//匹配成功過(guò)返回節(jié)點(diǎn)if(m !=null)returnm;//超時(shí)控制if(timed) {? ? ? ? ? ? nanos = deadline - System.nanoTime();if(nanos <=0L) {? ? ? ? ? ? ? ? s.tryCancel();continue;? ? ? ? ? ? }? ? ? ? }//自旋檢查送粱,是否進(jìn)行下一次自旋if(spins >0)? ? ? ? ? ? spins = shouldSpin(s) ? (spins-1) :0;elseif(s.waiter ==null)? ? ? ? ? ? s.waiter = w;// establish waiter so can park next iterelseif(!timed)? ? ? ? ? ? LockSupport.park(this);//在這里掛起線程elseif(nanos > spinForTimeoutThreshold)? ? ? ? ? ? LockSupport.parkNanos(this, nanos);? ? }}
代碼非常復(fù)雜,這里說(shuō)下我所理解的核心邏輯掂之。代碼中可以看到put以及take方法都是通過(guò)調(diào)用transfer方法來(lái)實(shí)現(xiàn)的抗俄,然后通過(guò)參數(shù)mode來(lái)區(qū)別,在生產(chǎn)元素時(shí)如果是同一個(gè)線程多次put則會(huì)采取自旋的方式多次嘗試put元素世舰,可能自旋過(guò)程中元素會(huì)被消費(fèi)动雹,這樣可以及時(shí)put,降低線程掛起的性能損耗跟压,高吞吐量的核心也在這里胰蝠,消費(fèi)線程一樣,空棧時(shí)也會(huì)先自旋,自旋失敗然后通過(guò)線程的LockSupport.park方法掛起茸塞。
LinkedTransferQueue
LinkedTransferQueue是一個(gè)無(wú)界的阻塞隊(duì)列躲庄,底層由鏈表實(shí)現(xiàn)。雖然和LinkedBlockingQueue一樣也是鏈表實(shí)現(xiàn)的钾虐,但并發(fā)控制的實(shí)現(xiàn)上卻很不一樣噪窘,和SynchronousQueue類(lèi)似,采用了大量的CAS操作效扫,沒(méi)有使用鎖倔监,由于是無(wú)界的,所以不會(huì)put生產(chǎn)線程不會(huì)阻塞菌仁,只會(huì)在take時(shí)阻塞消費(fèi)線程浩习,消費(fèi)線程掛起時(shí)同樣使用LockSupport.park方法。
LinkedTransferQueue相比于以上的隊(duì)列還提供了一些額外的功能济丘,它實(shí)現(xiàn)了TransferQueue接口瘦锹,有兩個(gè)關(guān)鍵方法transfer(E e)和tryTransfer(E e)方法,transfer在沒(méi)有消費(fèi)時(shí)會(huì)阻塞闪盔,tryTransfer在沒(méi)有消費(fèi)時(shí)不會(huì)插入到隊(duì)列中弯院,也不會(huì)等待,直接返回false泪掀。
privatestaticfinalintNOW? =0;// for untimed poll, tryTransferprivatestaticfinalintASYNC =1;// for offer, put, addprivatestaticfinalintSYNC? =2;// for transfer, takeprivatestaticfinalintTIMED =3;// for timed poll, tryTransfer//通過(guò)SYNC狀態(tài)來(lái)實(shí)現(xiàn)生產(chǎn)阻塞publicvoidtransfer(E e)throwsInterruptedException{if(xfer(e,true, SYNC,0) !=null) {? ? ? ? Thread.interrupted();// failure possible only due to interruptthrownewInterruptedException();? ? }}//通過(guò)NOW狀態(tài)跳過(guò)添加元素以及阻塞publicbooleantryTransfer(E e){returnxfer(e,true, NOW,0) ==null;}//通過(guò)ASYNC狀態(tài)跳過(guò)阻塞publicvoidput(E e){? ? xfer(e,true, ASYNC,0);}//通過(guò)SYNC狀態(tài)來(lái)實(shí)現(xiàn)消費(fèi)阻塞publicEtake()throwsInterruptedException{? ? E e = xfer(null,false, SYNC,0);if(e !=null)returne;? ? Thread.interrupted();thrownewInterruptedException();}//生產(chǎn)消費(fèi)調(diào)用同一個(gè)方法听绳,通過(guò)e是否為空,haveData异赫,how等參數(shù)來(lái)區(qū)分具體邏輯privateExfer(E e,booleanhaveData,inthow,longnanos){if(haveData && (e ==null))thrownewNullPointerException();? ? Node s =null;// the node to append, if neededretry:for(;;) {// restart on append race//找出第一個(gè)可用節(jié)點(diǎn)for(Node h = head, p = h; p !=null;) {// find & match first nodebooleanisData = p.isData;? ? ? ? ? ? Object item = p.item;//隊(duì)列為空時(shí)直接跳過(guò)if(item != p && (item !=null) == isData) {// unmatched//節(jié)點(diǎn)類(lèi)型相同椅挣,跳過(guò)if(isData == haveData)// can't matchbreak;if(p.casItem(item, e)) {// matchfor(Node q = p; q != h;) {? ? ? ? ? ? ? ? ? ? ? ? Node n = q.next;// update by 2 unless singletonif(head == h && casHead(h, n ==null? q : n)) {? ? ? ? ? ? ? ? ? ? ? ? ? ? h.forgetNext();break;? ? ? ? ? ? ? ? ? ? ? ? }// advance and retryif((h = head)? ==null||? ? ? ? ? ? ? ? ? ? ? ? ? ? (q = h.next) ==null|| !q.isMatched())break;// unless slack < 2}? ? ? ? ? ? ? ? ? ? LockSupport.unpark(p.waiter);returnLinkedTransferQueue.cast(item);? ? ? ? ? ? ? ? }? ? ? ? ? ? }? ? ? ? ? ? Node n = p.next;? ? ? ? ? ? p = (p != n) ? n : (h = head);// Use head if p offlist}//插入節(jié)點(diǎn)或移除節(jié)點(diǎn)具體邏輯//tryTransfer方法會(huì)直接跳過(guò)并返回結(jié)果if(how != NOW) {// No matches availableif(s ==null)? ? ? ? ? ? ? ? s =newNode(e, haveData);? ? ? ? ? ? Node pred = tryAppend(s, haveData);//加入節(jié)點(diǎn)if(pred ==null)continueretry;// lost race vs opposite modeif(how != ASYNC)//自旋以及阻塞消費(fèi)線程邏輯,和SynchronousQueue類(lèi)似塔拳,先嘗試自選鼠证,失敗后掛起線程//transfer方法在沒(méi)有消費(fèi)線程時(shí)也會(huì)阻塞在這里returnawaitMatch(s, pred, e, (how == TIMED), nanos);? ? ? ? }returne;// not waiting}}
LinkedBlockingDeque
LinkedBlockingDeque是一個(gè)有界的雙端隊(duì)列,底層采用一個(gè)雙向的鏈表來(lái)實(shí)現(xiàn)靠抑,在LinkedBlockingQeque的Node?實(shí)現(xiàn)多了指向前一個(gè)節(jié)點(diǎn)的變量prev量九。并發(fā)控制上和ArrayBlockingQueue類(lèi)似,采用單個(gè)ReentrantLock來(lái)控制并發(fā)颂碧,這里是因?yàn)殡p端隊(duì)列頭尾都可以消費(fèi)和生產(chǎn)荠列,所以使用了一個(gè)共享鎖。它實(shí)現(xiàn)了BlockingDeque接口载城,繼承自BlockingQueue接口肌似,多了addFirst,addLast诉瓦,offerFirst川队,offerLast力细,peekFirst,peekLast等方法固额,用來(lái)頭尾生產(chǎn)和消費(fèi)眠蚂。LinkedBlockingDeque的實(shí)現(xiàn)代碼比較簡(jiǎn)單,基本就是綜合了LinkedBlockingQeque和ArrayBlockingQueue的代碼邏輯对雪,這里就不做分析了河狐。
總結(jié)
文章對(duì)JDK1.8中的7種阻塞隊(duì)列都做了簡(jiǎn)單分析米绕,幫助大家大致梳理的這7個(gè)隊(duì)列的基本原理瑟捣。總的來(lái)說(shuō)每種阻塞隊(duì)列都有它自己的應(yīng)用場(chǎng)景栅干,使用時(shí)可以先根據(jù)有界還是無(wú)界迈套,然后在根據(jù)各自的特性來(lái)進(jìn)行選擇。
有界阻塞隊(duì)列包括:ArrayBlockingQueue碱鳞、LinkedBlockingQueue以及LinkedBlockingDeque三種桑李,LinkedBlockingDeque應(yīng)用場(chǎng)景很少,一般用在“工作竊取”模式下窿给。ArrayBlockingQueue和LinkedBlockingQueue基本就是數(shù)組和鏈表的區(qū)別贵白。無(wú)界隊(duì)列包括PriorityBlockingQueue、DelayQueue和LinkedTransferQueue崩泡。PriorityBlockingQueue用在需要排序的隊(duì)列中禁荒。DelayQueue可以用來(lái)做一些定時(shí)任務(wù)或者緩存過(guò)期的場(chǎng)景。LinkedTransferQueue則相比較其他隊(duì)列多了transfer功能角撞。最后剩下一個(gè)不存儲(chǔ)元素的隊(duì)列SynchronousQueue呛伴,用來(lái)處理一些高效的傳遞性場(chǎng)景。