最近得空痊远,想寫(xiě)篇文章好好說(shuō)說(shuō) java 線程池問(wèn)題垮抗,我相信很多人都一知半解的,包括我自己在仔仔細(xì)細(xì)看源碼之前碧聪,也有許多的不解冒版,甚至有些地方我一直都沒(méi)有理解到位。
說(shuō)到線程池實(shí)現(xiàn)逞姿,那么就不得不涉及到各種 BlockingQueue 的實(shí)現(xiàn)辞嗡,那么我想就 BlockingQueue 的問(wèn)題和大家分享分享我了解的一些知識(shí)。
本文沒(méi)有像之前分析 AQS 那樣一行一行源碼分析了滞造,不過(guò)還是把其中最重要和最難理解的代碼說(shuō)了一遍续室,所以不免篇幅略長(zhǎng)。本文涉及到比較多的 Doug Lea 對(duì) BlockingQueue 的設(shè)計(jì)思想谒养,希望有心的讀者真的可以有一些收獲挺狰,我覺(jué)得自己還是寫(xiě)了一些干貨的。
本文直接參考 Doug Lea 寫(xiě)的 java doc 和注釋买窟,這也是我們?cè)趯W(xué)習(xí) java 并發(fā)包時(shí)最好的材料了丰泊。希望大家能有所思、有所悟始绍,學(xué)習(xí) Doug Lea 的代碼風(fēng)格趁耗,并將其優(yōu)雅、嚴(yán)謹(jǐn)?shù)淖黠L(fēng)應(yīng)用到我們寫(xiě)的每一行代碼中疆虚。
”我自己是一名從事了十余年的后端的老程序員,辭職后目前在做講師,近期我花了一個(gè)月整理了一份最適合2018年學(xué)習(xí)的JAVA干貨(里面有高可用径簿、高并發(fā)罢屈、高性能及分布式、Jvm性能調(diào)優(yōu)篇亭、Spring源碼缠捌,MyBatis,Netty,Redis,Kafka,Mysql,Zookeeper,Tomcat,Docker,Dubbo,Nginx等多個(gè)知識(shí)點(diǎn)的架構(gòu)資料)從事后端的小伙伴們都可以來(lái)了解一下的译蒂,這里是程序員秘密聚集地曼月,各位還在架構(gòu)師的道路上掙扎的小伙伴們速來(lái)∪嶂纾“
加QQ群:611481448(名額有限哦Q魄邸)
BlockingQueue
開(kāi)篇先介紹下 BlockingQueue 這個(gè)接口的規(guī)則,后面再看其實(shí)現(xiàn)捕透。
首先聪姿,最基本的來(lái)說(shuō), BlockingQueue 是一個(gè)先進(jìn)先出的隊(duì)列(Queue)乙嘀,為什么說(shuō)是阻塞(Blocking)的呢末购?是因?yàn)?BlockingQueue 支持當(dāng)獲取隊(duì)列元素但是隊(duì)列為空時(shí),會(huì)阻塞等待隊(duì)列中有元素再返回虎谢;也支持添加元素時(shí)盟榴,如果隊(duì)列已滿,那么等到隊(duì)列可以放入新元素時(shí)再放入婴噩。
BlockingQueue 是一個(gè)接口擎场,繼承自 Queue,所以其實(shí)現(xiàn)類也可以作為 Queue 的實(shí)現(xiàn)來(lái)使用讳推,而 Queue 又繼承自 Collection 接口顶籽。
BlockingQueue 對(duì)插入操作、移除操作银觅、獲取元素操作提供了四種不同的方法用于不同的場(chǎng)景中使用:1礼饱、拋出異常;2究驴、返回特殊值(null 或 true/false镊绪,取決于具體的操作);3洒忧、阻塞等待此操作蝴韭,直到這個(gè)操作成功;4熙侍、阻塞等待此操作榄鉴,直到成功或者超時(shí)指定時(shí)間履磨。總結(jié)如下:
BlockingQueue 的各個(gè)實(shí)現(xiàn)都遵循了這些規(guī)則庆尘,當(dāng)然我們也不用死記這個(gè)表格剃诅,知道有這么回事,然后寫(xiě)代碼的時(shí)候根據(jù)自己的需要去看方法的注釋來(lái)選取合適的方法即可驶忌。
對(duì)于 BlockingQueue矛辕,我們的關(guān)注點(diǎn)應(yīng)該在 put(e) 和 take() 這兩個(gè)方法,因?yàn)檫@兩個(gè)方法是帶阻塞的付魔。
BlockingQueue 不接受 null 值的插入聊品,相應(yīng)的方法在碰到 null 的插入時(shí)會(huì)拋出 NullPointerException 異常。null 值在這里通常用于作為特殊值返回(表格中的第三列)几苍,代表 poll 失敗翻屈。所以,如果允許插入 null 值的話擦剑,那獲取的時(shí)候妖胀,就不能很好地用 null 來(lái)判斷到底是代表失敗,還是獲取的值就是 null 值惠勒。
一個(gè) BlockingQueue 可能是有界的赚抡,如果在插入的時(shí)候,發(fā)現(xiàn)隊(duì)列滿了纠屋,那么 put 操作將會(huì)阻塞涂臣。通常,在這里我們說(shuō)的無(wú)界隊(duì)列也不是說(shuō)真正的無(wú)界售担,而是它的容量是 Integer.MAX_VALUE(21億多)赁遗。
BlockingQueue 是設(shè)計(jì)用來(lái)實(shí)現(xiàn)生產(chǎn)者-消費(fèi)者隊(duì)列的,當(dāng)然族铆,你也可以將它當(dāng)做普通的 Collection 來(lái)用岩四,前面說(shuō)了,它實(shí)現(xiàn)了 java.util.Collection 接口哥攘。例如剖煌,我們可以用 remove(x) 來(lái)刪除任意一個(gè)元素,但是逝淹,這類操作通常并不高效耕姊,所以盡量只在少數(shù)的場(chǎng)合使用,比如一條消息已經(jīng)入隊(duì)栅葡,但是需要做取消操作的時(shí)候茉兰。
BlockingQueue 的實(shí)現(xiàn)都是線程安全的,但是批量的集合操作如 addAll, containsAll, retainAll 和 removeAll 不一定是原子操作欣簇。如 addAll(c) 有可能在添加了一些元素后中途拋出異常规脸,此時(shí) BlockingQueue 中已經(jīng)添加了部分元素坯约,這個(gè)是允許的,取決于具體的實(shí)現(xiàn)燃辖。
BlockingQueue 不支持 close 或 shutdown 等關(guān)閉操作鬼店,因?yàn)殚_(kāi)發(fā)者可能希望不會(huì)有新的元素添加進(jìn)去,此特性取決于具體的實(shí)現(xiàn)黔龟,不做強(qiáng)制約束。
最后滥玷,BlockingQueue 在生產(chǎn)者-消費(fèi)者的場(chǎng)景中氏身,是支持多消費(fèi)者和多生產(chǎn)者的,說(shuō)的其實(shí)就是線程安全問(wèn)題惑畴。
相信上面說(shuō)的每一句都很清楚了蛋欣,BlockingQueue 是一個(gè)比較簡(jiǎn)單的線程安全容器,下面我會(huì)分析其具體的在 JDK 中的實(shí)現(xiàn)如贷,這里又到了 Doug Lea 表演時(shí)間了陷虎。
BlockingQueue 實(shí)現(xiàn)之 ArrayBlockingQueue
ArrayBlockingQueue 是 BlockingQueue 接口的有界隊(duì)列實(shí)現(xiàn)類,底層采用數(shù)組來(lái)實(shí)現(xiàn)杠袱。
其并發(fā)控制采用可重入鎖來(lái)控制尚猿,不管是插入操作還是讀取操作,都需要獲取到鎖才能進(jìn)行操作楣富。
ArrayBlockingQueue 共有以下幾個(gè)屬性:
// 用于存放元素的數(shù)組final Object[] items;// 下一次讀取操作的位置int takeIndex;// 下一次寫(xiě)入操作的位置int putIndex;// 隊(duì)列中的元素?cái)?shù)量int count; // 以下幾個(gè)就是控制并發(fā)用的同步器final ReentrantLock lock;private final Condition notEmpty;private final Condition notFull;
我們用個(gè)示意圖來(lái)描述其同步機(jī)制:
ArrayBlockingQueue 實(shí)現(xiàn)并發(fā)同步的原理就是凿掂,讀操作和寫(xiě)操作都需要獲取到 AQS 獨(dú)占鎖才能進(jìn)行操作。如果隊(duì)列為空纹蝴,這個(gè)時(shí)候讀操作的線程進(jìn)入到讀線程隊(duì)列排隊(duì)庄萎,等待寫(xiě)線程寫(xiě)入新的元素,然后喚醒讀線程隊(duì)列的第一個(gè)等待線程塘安。如果隊(duì)列已滿糠涛,這個(gè)時(shí)候?qū)懖僮鞯木€程進(jìn)入到寫(xiě)線程隊(duì)列排隊(duì),等待讀線程將隊(duì)列元素移除騰出空間兼犯,然后喚醒寫(xiě)線程隊(duì)列的第一個(gè)等待線程忍捡。
對(duì)于 ArrayBlockingQueue,我們可以在構(gòu)造的時(shí)候指定以下三個(gè)參數(shù):
隊(duì)列容量免都,其限制了隊(duì)列中最多允許的元素個(gè)數(shù)锉罐;
指定獨(dú)占鎖是公平鎖還是非公平鎖。非公平鎖的吞吐量比較高绕娘,公平鎖可以保證每次都是等待最久的線程獲取到鎖脓规;
可以指定用一個(gè)集合來(lái)初始化,將此集合中的元素在構(gòu)造方法期間就先添加到隊(duì)列中险领。
BlockingQueue 實(shí)現(xiàn)之 LinkedBlockingQueue
底層基于單向鏈表實(shí)現(xiàn)的阻塞隊(duì)列侨舆,可以當(dāng)做無(wú)界隊(duì)列也可以當(dāng)做有界隊(duì)列來(lái)使用秒紧。看構(gòu)造方法:
// 傳說(shuō)中的無(wú)界隊(duì)列public LinkedBlockingQueue() { this(Integer.MAX_VALUE);}// 傳說(shuō)中的有界隊(duì)列public LinkedBlockingQueue(int capacity) { if (capacity <= 0) throw new IllegalArgumentException(); this.capacity = capacity; last = head = new Node<E>(null);}
我們看看這個(gè)類有哪些屬性:
// 隊(duì)列容量private final int capacity; // 隊(duì)列中的元素?cái)?shù)量private final AtomicInteger count = new AtomicInteger(0); // 隊(duì)頭private transient Node<E> head; // 隊(duì)尾private transient Node<E> last; // take, poll, peek 等讀操作的方法需要獲取到這個(gè)鎖private final ReentrantLock takeLock = new ReentrantLock(); // 如果讀操作的時(shí)候隊(duì)列是空的挨下,那么等待 notEmpty 條件private final Condition notEmpty = takeLock.newCondition(); // put, offer 等寫(xiě)操作的方法需要獲取到這個(gè)鎖private final ReentrantLock putLock = new ReentrantLock(); // 如果寫(xiě)操作的時(shí)候隊(duì)列是滿的熔恢,那么等待 notFull 條件private final Condition notFull = putLock.newCondition();
這里用了兩個(gè)鎖,兩個(gè) Condition臭笆,簡(jiǎn)單介紹如下:
takeLock 和 notEmpty 怎么搭配:如果要獲刃鹛省(take)一個(gè)元素,需要獲取 takeLock 鎖愁铺,但是獲取了鎖還不夠鹰霍,如果隊(duì)列此時(shí)為空,還需要隊(duì)列不為空(notEmpty)這個(gè)條件(Condition)茵乱。
putLock 需要和 notFull 搭配:如果要插入(put)一個(gè)元素茂洒,需要獲取 putLock 鎖,但是獲取了鎖還不夠瓶竭,如果隊(duì)列此時(shí)已滿督勺,還需要隊(duì)列不是滿的(notFull)這個(gè)條件(Condition)。
首先斤贰,這里用一個(gè)示意圖來(lái)看看 LinkedBlockingQueue 的并發(fā)讀寫(xiě)控制智哀,然后再開(kāi)始分析源碼:
看懂這個(gè)示意圖,源碼也就簡(jiǎn)單了腋舌,讀操作是排好隊(duì)的盏触,寫(xiě)操作也是排好隊(duì)的,唯一的并發(fā)問(wèn)題在于一個(gè)寫(xiě)操作和一個(gè)讀操作同時(shí)進(jìn)行块饺,只要控制好這個(gè)就可以了赞辩。
先上構(gòu)造方法:
public LinkedBlockingQueue(int capacity) { if (capacity <= 0) throw new IllegalArgumentException(); this.capacity = capacity; last = head = new Node<E>(null);}
注意,這里會(huì)初始化一個(gè)空的頭結(jié)點(diǎn)授艰,那么第一個(gè)元素入隊(duì)的時(shí)候辨嗽,隊(duì)列中就會(huì)有兩個(gè)元素。讀取元素時(shí)淮腾,也總是獲取頭節(jié)點(diǎn)后面的一個(gè)節(jié)點(diǎn)糟需。count 的計(jì)數(shù)值不包括這個(gè)頭節(jié)點(diǎn)。
我們來(lái)看下 put 方法是怎么將元素插入到隊(duì)尾的:
public void put(E e) throws InterruptedException { if (e == null) throw new NullPointerException(); // 如果你糾結(jié)這里為什么是 -1谷朝,可以看看 offer 方法洲押。這就是個(gè)標(biāo)識(shí)成功、失敗的標(biāo)志而已圆凰。 int c = -1; Node<E> node = new Node(e); final ReentrantLock putLock = this.putLock; final AtomicInteger count = this.count; // 必須要獲取到 putLock 才可以進(jìn)行插入操作 putLock.lockInterruptibly(); try { // 如果隊(duì)列滿杈帐,等待 notFull 的條件滿足。 while (count.get() == capacity) { notFull.await(); } // 入隊(duì) enqueue(node); // count 原子加 1,c 還是加 1 前的值 c = count.getAndIncrement(); // 如果這個(gè)元素入隊(duì)后挑童,還有至少一個(gè)槽可以使用累铅,調(diào)用 notFull.signal() 喚醒等待線程。 // 哪些線程會(huì)等待在 notFull 這個(gè) Condition 上呢站叼? if (c + 1 < capacity) notFull.signal(); } finally { // 入隊(duì)后娃兽,釋放掉 putLock putLock.unlock(); } // 如果 c == 0,那么代表隊(duì)列在這個(gè)元素入隊(duì)前是空的(不包括head空節(jié)點(diǎn))尽楔, // 那么所有的讀線程都在等待 notEmpty 這個(gè)條件投储,等待喚醒,這里做一次喚醒操作 if (c == 0) signalNotEmpty();} // 入隊(duì)的代碼非常簡(jiǎn)單翔试,就是將 last 屬性指向這個(gè)新元素轻要,并且讓原隊(duì)尾的 next 指向這個(gè)元素// 這里入隊(duì)沒(méi)有并發(fā)問(wèn)題,因?yàn)橹挥蝎@取到 putLock 獨(dú)占鎖以后垦缅,才可以進(jìn)行此操作private void enqueue(Node<E> node) { // assert putLock.isHeldByCurrentThread(); // assert last.next == null; last = last.next = node;} // 元素入隊(duì)后,如果需要驹碍,調(diào)用這個(gè)方法喚醒讀線程來(lái)讀private void signalNotEmpty() { final ReentrantLock takeLock = this.takeLock; takeLock.lock(); try { notEmpty.signal(); } finally { takeLock.unlock(); }}
我們?cè)倏纯?take 方法:
public E take() throws InterruptedException { E x; int c = -1; final AtomicInteger count = this.count; final ReentrantLock takeLock = this.takeLock; // 首先壁涎,需要獲取到 takeLock 才能進(jìn)行出隊(duì)操作 takeLock.lockInterruptibly(); try { // 如果隊(duì)列為空,等待 notEmpty 這個(gè)條件滿足再繼續(xù)執(zhí)行 while (count.get() == 0) { notEmpty.await(); } // 出隊(duì) x = dequeue(); // count 進(jìn)行原子減 1 c = count.getAndDecrement(); // 如果這次出隊(duì)后志秃,隊(duì)列中至少還有一個(gè)元素怔球,那么調(diào)用 notEmpty.signal() 喚醒其他的讀線程 if (c > 1) notEmpty.signal(); } finally { // 出隊(duì)后釋放掉 takeLock takeLock.unlock(); } // 如果 c == capacity,那么說(shuō)明在這個(gè) take 方法發(fā)生的時(shí)候浮还,隊(duì)列是滿的 // 既然出隊(duì)了一個(gè)竟坛,那么意味著隊(duì)列不滿了,喚醒寫(xiě)線程去寫(xiě) if (c == capacity) signalNotFull(); return x;}// 取隊(duì)頭钧舌,出隊(duì)private E dequeue() { // assert takeLock.isHeldByCurrentThread(); // assert head.item == null; // 之前說(shuō)了担汤,頭結(jié)點(diǎn)是空的 Node<E> h = head; Node<E> first = h.next; h.next = h; // help GC // 設(shè)置這個(gè)為新的頭結(jié)點(diǎn) head = first; E x = first.item; first.item = null; return x;}// 元素出隊(duì)后,如果需要洼冻,調(diào)用這個(gè)方法喚醒寫(xiě)線程來(lái)寫(xiě)private void signalNotFull() { final ReentrantLock putLock = this.putLock; putLock.lock(); try { notFull.signal(); } finally { putLock.unlock(); }}
源碼分析就到這里結(jié)束了吧崭歧,畢竟還是比較簡(jiǎn)單的源碼,基本上只要讀者認(rèn)真點(diǎn)都看得懂撞牢。
BlockingQueue 實(shí)現(xiàn)之 SynchronousQueue
它是一個(gè)特殊的隊(duì)列率碾,它的名字其實(shí)就蘊(yùn)含了它的特征 – - 同步的隊(duì)列。為什么說(shuō)是同步的呢屋彪?這里說(shuō)的并不是多線程的并發(fā)問(wèn)題所宰,而是因?yàn)楫?dāng)一個(gè)線程往隊(duì)列中寫(xiě)入一個(gè)元素時(shí),寫(xiě)入操作不會(huì)立即返回畜挥,需要等待另一個(gè)線程來(lái)將這個(gè)元素拿走仔粥;同理,當(dāng)一個(gè)讀線程做讀操作的時(shí)候砰嘁,同樣需要一個(gè)相匹配的寫(xiě)線程的寫(xiě)操作件炉。這里的 Synchronous 指的就是讀線程和寫(xiě)線程需要同步勘究,一個(gè)讀線程匹配一個(gè)寫(xiě)線程。
我們比較少使用到 SynchronousQueue 這個(gè)類斟冕,不過(guò)它在線程池的實(shí)現(xiàn)類 ScheduledThreadPoolExecutor 中得到了應(yīng)用口糕,感興趣的讀者可以在看完這個(gè)后去看看相應(yīng)的使用。
雖然上面我說(shuō)了隊(duì)列磕蛇,但是 SynchronousQueue 的隊(duì)列其實(shí)是虛的景描,其不提供任何空間(一個(gè)都沒(méi)有)來(lái)存儲(chǔ)元素。數(shù)據(jù)必須從某個(gè)寫(xiě)線程交給某個(gè)讀線程秀撇,而不是寫(xiě)到某個(gè)隊(duì)列中等待被消費(fèi)超棺。
你不能在 SynchronousQueue 中使用 peek 方法(在這里這個(gè)方法直接返回 null),peek 方法的語(yǔ)義是只讀取不移除呵燕,顯然棠绘,這個(gè)方法的語(yǔ)義是不符合 SynchronousQueue 的特征的。SynchronousQueue 也不能被迭代再扭,因?yàn)楦揪蜎](méi)有元素可以拿來(lái)迭代的氧苍。雖然 SynchronousQueue 間接地實(shí)現(xiàn)了 Collection 接口,但是如果你將其當(dāng)做 Collection 來(lái)用的話泛范,那么集合是空的让虐。當(dāng)然,這個(gè)類也是不允許傳遞 null 值的(并發(fā)包中的容器類好像都不支持插入 null 值罢荡,因?yàn)?null 值往往用作其他用途赡突,比如用于方法的返回值代表操作失敗)区赵。
接下來(lái)惭缰,我們來(lái)看看具體的源碼實(shí)現(xiàn)吧,它的源碼不是很簡(jiǎn)單的那種惧笛,我們需要先搞清楚它的設(shè)計(jì)思想从媚。
源碼加注釋大概有 1200 行,我們先看大框架:
// 構(gòu)造時(shí)患整,我們可以指定公平模式還是非公平模式拜效,區(qū)別之后再說(shuō)public SynchronousQueue(boolean fair) { transferer = fair ? new TransferQueue() : new TransferStack();}abstract static class Transferer { // 從方法名上大概就知道,這個(gè)方法用于轉(zhuǎn)移元素各谚,從生產(chǎn)者手上轉(zhuǎn)到消費(fèi)者手上 // 也可以被動(dòng)地紧憾,消費(fèi)者調(diào)用這個(gè)方法來(lái)從生產(chǎn)者手上取元素 // 第一個(gè)參數(shù) e 如果不是 null,代表場(chǎng)景為:將元素從生產(chǎn)者轉(zhuǎn)移給消費(fèi)者 // 如果是 null昌渤,代表消費(fèi)者等待生產(chǎn)者提供元素赴穗,然后返回值就是相應(yīng)的生產(chǎn)者提供的元素 // 第二個(gè)參數(shù)代表是否設(shè)置超時(shí),如果設(shè)置超時(shí),超時(shí)時(shí)間是第三個(gè)參數(shù)的值 // 返回值如果是 null般眉,代表超時(shí)了赵,或者中斷。具體是哪個(gè)甸赃,可以通過(guò)檢測(cè)中斷狀態(tài)得到柿汛。 abstract Object transfer(Object e, boolean timed, long nanos);}
Transferer 有兩個(gè)內(nèi)部實(shí)現(xiàn)類,是因?yàn)闃?gòu)造 SynchronousQueue 的時(shí)候埠对,我們可以指定公平策略络断。公平模式意味著,所有的讀寫(xiě)線程都遵守先來(lái)后到项玛,F(xiàn)IFO 嘛貌笨,對(duì)應(yīng) TransferQueue。而非公平模式則對(duì)應(yīng) TransferStack襟沮。
我們先采用公平模式分析源碼锥惋,然后再說(shuō)說(shuō)公平模式和非公平模式的區(qū)別。
接下來(lái)开伏,我們看看 put 方法和 take 方法:
// 寫(xiě)入值public void put(E o) throws InterruptedException { if (o == null) throw new NullPointerException(); if (transferer.transfer(o, false, 0) == null) { // 1 Thread.interrupted(); throw new InterruptedException(); }}// 讀取值并移除public E take() throws InterruptedException { Object e = transferer.transfer(null, false, 0); // 2 if (e != null) return (E)e; Thread.interrupted(); throw new InterruptedException();}
我們看到净刮,寫(xiě)操作 put(E o) 和讀操作 take() 都是調(diào)用 Transferer.transfer(…) 方法,區(qū)別在于第一個(gè)參數(shù)是否為 null 值硅则。
我們來(lái)看看 transfer 的設(shè)計(jì)思路,其基本算法如下:
當(dāng)調(diào)用這個(gè)方法時(shí)株婴,如果隊(duì)列是空的怎虫,或者隊(duì)列中的節(jié)點(diǎn)和當(dāng)前的線程操作類型一致(如當(dāng)前操作是 put 操作,而隊(duì)列中的元素也都是寫(xiě)線程)困介。這種情況下大审,將當(dāng)前線程加入到等待隊(duì)列即可。
如果隊(duì)列中有等待節(jié)點(diǎn)座哩,而且與當(dāng)前操作可以匹配(如隊(duì)列中都是讀操作線程徒扶,當(dāng)前線程是寫(xiě)操作線程,反之亦然)根穷。這種情況下姜骡,匹配等待隊(duì)列的隊(duì)頭,出隊(duì)屿良,返回相應(yīng)數(shù)據(jù)圈澈。
其實(shí)這里有個(gè)隱含的條件被滿足了,隊(duì)列如果不為空尘惧,肯定都是同種類型的節(jié)點(diǎn)康栈,要么都是讀操作,要么都是寫(xiě)操作。這個(gè)就要看到底是讀線程積壓了啥么,還是寫(xiě)線程積壓了登舞。
我們可以假設(shè)出一個(gè)男女配對(duì)的場(chǎng)景:一個(gè)男的過(guò)來(lái),如果一個(gè)人都沒(méi)有悬荣,那么他需要等待菠秒;如果發(fā)現(xiàn)有一堆男的在等待,那么他需要排到隊(duì)列后面隅熙;如果發(fā)現(xiàn)是一堆女的在排隊(duì)稽煤,那么他直接牽走隊(duì)頭的那個(gè)女的。
既然這里說(shuō)到了等待隊(duì)列囚戚,我們先看看其實(shí)現(xiàn)酵熙,也就是 QNode:
static final class QNode { volatile QNode next; // 可以看出來(lái),等待隊(duì)列是單向鏈表 volatile Object item; // CAS'ed to or from null volatile Thread waiter; // 將線程對(duì)象保存在這里驰坊,用于掛起和喚醒 final boolean isData; // 用于判斷是寫(xiě)線程節(jié)點(diǎn)(isData == true)匾二,還是讀線程節(jié)點(diǎn) QNode(Object item, boolean isData) { this.item = item; this.isData = isData; } ......
相信說(shuō)了這么多以后,我們?cè)賮?lái)看 transfer 方法的代碼就輕松多了拳芙。
/** * Puts or takes an item. */Object transfer(Object e, boolean timed, long nanos) { QNode s = null; // constructed/reused as needed boolean isData = (e != null); for (;;) { QNode t = tail; QNode h = head; if (t == null || h == null) // saw uninitialized value continue; // spin // 隊(duì)列空察藐,或隊(duì)列中節(jié)點(diǎn)類型和當(dāng)前節(jié)點(diǎn)一致, // 即我們說(shuō)的第一種情況舟扎,將節(jié)點(diǎn)入隊(duì)即可分飞。讀者要想著這塊 if 里面方法其實(shí)就是入隊(duì) if (h == t || t.isData == isData) { // empty or same-mode QNode tn = t.next; // t != tail 說(shuō)明剛剛有節(jié)點(diǎn)入隊(duì),continue 即可 if (t != tail) // inconsistent read continue; // 有其他節(jié)點(diǎn)入隊(duì)睹限,但是 tail 還是指向原來(lái)的譬猫,此時(shí)設(shè)置 tail 即可 if (tn != null) { // lagging tail // 這個(gè)方法就是:如果 tail 此時(shí)為 t 的話,設(shè)置為 tn advanceTail(t, tn); continue; } // if (timed && nanos <= 0) // can't wait return null; if (s == null) s = new QNode(e, isData); // 將當(dāng)前節(jié)點(diǎn)羡疗,插入到 tail 的后面 if (!t.casNext(null, s)) // failed to link in continue; // 將當(dāng)前節(jié)點(diǎn)設(shè)置為新的 tail advanceTail(t, s); // swing tail and wait // 看到這里染服,請(qǐng)讀者先往下滑到這個(gè)方法,看完了以后再回來(lái)這里叨恨,思路也就不會(huì)斷了 Object x = awaitFulfill(s, e, timed, nanos); // 到這里柳刮,說(shuō)明之前入隊(duì)的線程被喚醒了焙畔,準(zhǔn)備往下執(zhí)行 if (x == s) { // wait was cancelled clean(t, s); return null; } if (!s.isOffList()) { // not already unlinked advanceHead(t, s); // unlink if head if (x != null) // and forget fields s.item = s; s.waiter = null; } return (x != null) ? x : e; // 這里的 else 分支就是上面說(shuō)的第二種情況曾棕,有相應(yīng)的讀或?qū)懴嗥ヅ涞那闆r } else { // complementary-mode QNode m = h.next; // node to fulfill if (t != tail || m == null || h != head) continue; // inconsistent read Object x = m.item; if (isData == (x != null) || // m already fulfilled x == m || // m cancelled !m.casItem(x, e)) { // lost CAS advanceHead(h, m); // dequeue and retry continue; } advanceHead(h, m); // successfully fulfilled LockSupport.unpark(m.waiter); return (x != null) ? x : e; } }} void advanceTail(QNode t, QNode nt) { if (tail == t) UNSAFE.compareAndSwapObject(this, tailOffset, t, nt);}
// 自旋或阻塞丝里,直到滿足條件邦尊,這個(gè)方法返回Object awaitFulfill(QNode s, Object e, boolean timed, long nanos) { long lastTime = timed ? System.nanoTime() : 0; Thread w = Thread.currentThread(); // 判斷需要自旋的次數(shù)铛铁, int spins = ((head.next == s) ? (timed ? maxTimedSpins : maxUntimedSpins) : 0); for (;;) { // 如果被中斷了喧半,那么取消這個(gè)節(jié)點(diǎn) if (w.isInterrupted()) // 就是將當(dāng)前節(jié)點(diǎn) s 中的 item 屬性設(shè)置為 this s.tryCancel(e); Object x = s.item; // 這里是這個(gè)方法的唯一的出口 if (x != e) return x; // 如果需要秧骑,檢測(cè)是否超時(shí) if (timed) { long now = System.nanoTime(); nanos -= now - lastTime; lastTime = now; if (nanos <= 0) { s.tryCancel(e); continue; } } if (spins > 0) --spins; // 如果自旋達(dá)到了最大的次數(shù)霎奢,那么檢測(cè) else if (s.waiter == null) s.waiter = w; // 如果自旋到了最大的次數(shù)益愈,那么線程掛起梢灭,等待喚醒 else if (!timed) LockSupport.park(this); // spinForTimeoutThreshold 這個(gè)之前講 AQS 的時(shí)候其實(shí)也說(shuō)過(guò)夷家,剩余時(shí)間小于這個(gè)閾值的時(shí)候,就 // 不要進(jìn)行掛起了敏释,自旋的性能會(huì)比較好 else if (nanos > spinForTimeoutThreshold) LockSupport.parkNanos(this, nanos); }}
Doug Lea 的巧妙之處在于库快,將各個(gè)代碼湊在了一起,使得代碼非常簡(jiǎn)潔钥顽,當(dāng)然也同時(shí)增加了我們的閱讀負(fù)擔(dān)义屏,看代碼的時(shí)候,還是得仔細(xì)想想各種可能的情況蜂大。
下面闽铐,再說(shuō)說(shuō)前面說(shuō)的公平模式和非公平模式的區(qū)別。
相信大家心里面已經(jīng)有了公平模式的工作流程的概念了奶浦,我就簡(jiǎn)單說(shuō)說(shuō) TransferStack 的算法兄墅,就不分析源碼了。
當(dāng)調(diào)用這個(gè)方法時(shí)澳叉,如果隊(duì)列是空的隙咸,或者隊(duì)列中的節(jié)點(diǎn)和當(dāng)前的線程操作類型一致(如當(dāng)前操作是 put 操作,而棧中的元素也都是寫(xiě)線程)成洗。這種情況下五督,將當(dāng)前線程加入到等待棧中,等待配對(duì)瓶殃。然后返回相應(yīng)的元素充包,或者如果被取消了的話,返回 null遥椿。
如果棧中有等待節(jié)點(diǎn)误证,而且與當(dāng)前操作可以匹配(如棧里面都是讀操作線程,當(dāng)前線程是寫(xiě)操作線程修壕,反之亦然)。將當(dāng)前節(jié)點(diǎn)壓入棧頂遏考,和棧中的節(jié)點(diǎn)進(jìn)行匹配慈鸠,然后將這兩個(gè)節(jié)點(diǎn)出棧。配對(duì)和出棧的動(dòng)作其實(shí)也不是必須的灌具,因?yàn)橄旅娴囊粭l會(huì)執(zhí)行同樣的事情青团。
如果棧頂是進(jìn)行匹配而入棧的節(jié)點(diǎn),幫助其進(jìn)行匹配并出棧咖楣,然后再繼續(xù)操作督笆。
應(yīng)該說(shuō),TransferStack 的源碼要比 TransferQueue 的復(fù)雜一些诱贿,如果讀者感興趣娃肿,請(qǐng)自行進(jìn)行源碼閱讀咕缎。
BlockingQueue 實(shí)現(xiàn)之 PriorityBlockingQueue
帶排序的 BlockingQueue 實(shí)現(xiàn),其并發(fā)控制采用的是 ReentrantLock料扰,隊(duì)列為無(wú)界隊(duì)列(ArrayBlockingQueue 是有界隊(duì)列凭豪,LinkedBlockingQueue 也可以通過(guò)在構(gòu)造函數(shù)中傳入 capacity 指定隊(duì)列最大的容量,但是 PriorityBlockingQueue 只能指定初始的隊(duì)列大小晒杈,后面插入元素的時(shí)候嫂伞,如果空間不夠的話會(huì)自動(dòng)擴(kuò)容)。
簡(jiǎn)單地說(shuō)拯钻,它就是 PriorityQueue 的線程安全版本帖努。不可以插入 null 值,同時(shí)粪般,插入隊(duì)列的對(duì)象必須是可比較大小的(comparable)拼余,否則報(bào) ClassCastException 異常。它的插入操作 put 方法不會(huì) block刊驴,因?yàn)樗菬o(wú)界隊(duì)列(take 方法在隊(duì)列為空的時(shí)候會(huì)阻塞)姿搜。
它的源碼相對(duì)比較簡(jiǎn)單,本節(jié)將介紹其核心源碼部分捆憎。
我們來(lái)看看它有哪些屬性:
// 構(gòu)造方法中舅柜,如果不指定大小的話,默認(rèn)大小為 11private static final int DEFAULT_INITIAL_CAPACITY = 11;// 數(shù)組的最大容量private static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8; // 這個(gè)就是存放數(shù)據(jù)的數(shù)組private transient Object[] queue; // 隊(duì)列當(dāng)前大小private transient int size; // 大小比較器躲惰,如果按照自然序排序致份,那么此屬性可設(shè)置為 nullprivate transient Comparator<? super E> comparator; // 并發(fā)控制所用的鎖,所有的 public 且涉及到線程安全的方法础拨,都必須先獲取到這個(gè)鎖private final ReentrantLock lock; // 這個(gè)很好理解氮块,其實(shí)例由上面的 lock 屬性創(chuàng)建private final Condition notEmpty; // 這個(gè)也是用于鎖,用于數(shù)組擴(kuò)容的時(shí)候诡宗,需要先獲取到這個(gè)鎖滔蝉,才能進(jìn)行擴(kuò)容操作// 其使用 CAS 操作private transient volatile int allocationSpinLock; // 用于序列化和反序列化的時(shí)候用,對(duì)于 PriorityBlockingQueue 我們應(yīng)該比較少使用到序列化private PriorityQueue q;
此類實(shí)現(xiàn)了 Collection 和 Iterator 接口中的所有接口方法塔沃,對(duì)其對(duì)象進(jìn)行迭代并遍歷時(shí)蝠引,不能保證有序性。如果你想要實(shí)現(xiàn)有序遍歷蛀柴,建議采用 Arrays.sort(queue.toArray()) 進(jìn)行處理螃概。PriorityBlockingQueue 提供了 drainTo 方法用于將部分或全部元素有序地填充(準(zhǔn)確說(shuō)是轉(zhuǎn)移,會(huì)刪除原隊(duì)列中的元素)到另一個(gè)集合中鸽疾。還有一個(gè)需要說(shuō)明的是吊洼,如果兩個(gè)對(duì)象的優(yōu)先級(jí)相同(compare 方法返回 0),此隊(duì)列并不保證它們之間的順序制肮。
PriorityBlockingQueue 使用了基于數(shù)組的二叉堆來(lái)存放元素冒窍,所有的 public 方法采用同一個(gè) lock 進(jìn)行并發(fā)控制递沪。
二叉堆:一顆完全二叉樹(shù),它非常適合用數(shù)組進(jìn)行存儲(chǔ)超燃,對(duì)于數(shù)組中的元素 a[i]区拳,其左子節(jié)點(diǎn)為 a[2i+1],其右子節(jié)點(diǎn)為 a[2\i + 2]意乓,其父節(jié)點(diǎn)為 a[(i-1)/2]樱调,其堆序性質(zhì)為,每個(gè)節(jié)點(diǎn)的值都小于其左右子節(jié)點(diǎn)的值届良。二叉堆中最小的值就是根節(jié)點(diǎn)笆凌,但是刪除根節(jié)點(diǎn)是比較麻煩的,因?yàn)樾枰{(diào)整樹(shù)士葫。
簡(jiǎn)單用個(gè)圖解釋一下二叉堆乞而,我就不說(shuō)太多專業(yè)的嚴(yán)謹(jǐn)?shù)男g(shù)語(yǔ)了,這種數(shù)據(jù)結(jié)構(gòu)的優(yōu)點(diǎn)是一目了然的慢显,最小的元素一定是根元素爪模,它是一棵滿的樹(shù),除了最后一層荚藻,最后一層的節(jié)點(diǎn)從左到右緊密排列屋灌。
下面開(kāi)始 PriorityBlockingQueue 的源碼分析,首先我們來(lái)看看構(gòu)造方法:
// 默認(rèn)構(gòu)造方法应狱,采用默認(rèn)值(11)來(lái)進(jìn)行初始化public PriorityBlockingQueue() { this(DEFAULT_INITIAL_CAPACITY, null);}// 指定數(shù)組的初始大小public PriorityBlockingQueue(int initialCapacity) { this(initialCapacity, null);}// 指定比較器public PriorityBlockingQueue(int initialCapacity, Comparator<? super E> comparator) { if (initialCapacity < 1) throw new IllegalArgumentException(); this.lock = new ReentrantLock(); this.notEmpty = lock.newCondition(); this.comparator = comparator; this.queue = new Object[initialCapacity];}// 在構(gòu)造方法中就先填充指定的集合中的元素public PriorityBlockingQueue(Collection<? extends E> c) { this.lock = new ReentrantLock(); this.notEmpty = lock.newCondition(); // boolean heapify = true; // true if not known to be in heap order boolean screen = true; // true if must screen for nulls if (c instanceof SortedSet<?>) { SortedSet<? extends E> ss = (SortedSet<? extends E>) c; this.comparator = (Comparator<? super E>) ss.comparator(); heapify = false; } else if (c instanceof PriorityBlockingQueue<?>) { PriorityBlockingQueue<? extends E> pq = (PriorityBlockingQueue<? extends E>) c; this.comparator = (Comparator<? super E>) pq.comparator(); screen = false; if (pq.getClass() == PriorityBlockingQueue.class) // exact match heapify = false; } Object[] a = c.toArray(); int n = a.length; // If c.toArray incorrectly doesn't return Object[], copy it. if (a.getClass() != Object[].class) a = Arrays.copyOf(a, n, Object[].class); if (screen && (n == 1 || this.comparator != null)) { for (int i = 0; i < n; ++i) if (a[i] == null) throw new NullPointerException(); } this.queue = a; this.size = n; if (heapify) heapify();}
接下來(lái)共郭,我們來(lái)看看其內(nèi)部的自動(dòng)擴(kuò)容實(shí)現(xiàn):
private void tryGrow(Object[] array, int oldCap) { // 這邊做了釋放鎖的操作 lock.unlock(); // must release and then re-acquire main lock Object[] newArray = null; // 用 CAS 操作將 allocationSpinLock 由 0 變?yōu)?1,也算是獲取鎖 if (allocationSpinLock == 0 && UNSAFE.compareAndSwapInt(this, allocationSpinLockOffset, 0, 1)) { try { // 如果節(jié)點(diǎn)個(gè)數(shù)小于 64疾呻,那么增加的 oldCap + 2 的容量 // 如果節(jié)點(diǎn)數(shù)大于等于 64除嘹,那么增加 oldCap 的一半 // 所以節(jié)點(diǎn)數(shù)較小時(shí),增長(zhǎng)得快一些 int newCap = oldCap + ((oldCap < 64) ? (oldCap + 2) : (oldCap >> 1)); // 這里有可能溢出 if (newCap - MAX_ARRAY_SIZE > 0) { // possible overflow int minCap = oldCap + 1; if (minCap < 0 || minCap > MAX_ARRAY_SIZE) throw new OutOfMemoryError(); newCap = MAX_ARRAY_SIZE; } // 如果 queue != array岸蜗,那么說(shuō)明有其他線程給 queue 分配了其他的空間 if (newCap > oldCap && queue == array) // 分配一個(gè)新的大數(shù)組 newArray = new Object[newCap]; } finally { // 重置尉咕,也就是釋放鎖 allocationSpinLock = 0; } } // 如果有其他的線程也在做擴(kuò)容的操作 if (newArray == null) // back off if another thread is allocating Thread.yield(); // 重新獲取鎖 lock.lock(); // 將原來(lái)數(shù)組中的元素復(fù)制到新分配的大數(shù)組中 if (newArray != null && queue == array) { queue = newArray; System.arraycopy(array, 0, newArray, 0, oldCap); }}
擴(kuò)容方法對(duì)并發(fā)的控制也非常的巧妙,釋放了原來(lái)的獨(dú)占鎖 lock璃岳,這樣的話龙考,擴(kuò)容操作和讀操作可以同時(shí)進(jìn)行,提高吞吐量矾睦。
下面,我們來(lái)分析下寫(xiě)操作 put 方法和讀操作 take 方法炎功。
public void put(E e) { // 直接調(diào)用 offer 方法枚冗,因?yàn)榍懊嫖覀円舱f(shuō)了,在這里蛇损,put 方法不會(huì)阻塞 offer(e); }public boolean offer(E e) { if (e == null) throw new NullPointerException(); final ReentrantLock lock = this.lock; // 首先獲取到獨(dú)占鎖 lock.lock(); int n, cap; Object[] array; // 如果當(dāng)前隊(duì)列中的元素個(gè)數(shù) >= 數(shù)組的大小赁温,那么需要擴(kuò)容了 while ((n = size) >= (cap = (array = queue).length)) tryGrow(array, cap); try { Comparator<? super E> cmp = comparator; // 節(jié)點(diǎn)添加到二叉堆中 if (cmp == null) siftUpComparable(n, e, array); else siftUpUsingComparator(n, e, array, cmp); // 更新 size size = n + 1; // 喚醒等待的讀線程 notEmpty.signal(); } finally { lock.unlock(); } return true;}
對(duì)于二叉堆而言坛怪,插入一個(gè)節(jié)點(diǎn)是簡(jiǎn)單的,插入的節(jié)點(diǎn)如果比父節(jié)點(diǎn)小股囊,交換它們袜匿,然后繼續(xù)和父節(jié)點(diǎn)比較。
// 這個(gè)方法就是將數(shù)據(jù) x 插入到數(shù)組 array 的位置 k 處稚疹,然后再調(diào)整樹(shù)private static <T> void siftUpComparable(int k, T x, Object[] array) { Comparable<? super T> key = (Comparable<? super T>) x; while (k > 0) { // 二叉堆中 a[k] 節(jié)點(diǎn)的父節(jié)點(diǎn)位置 int parent = (k - 1) >>> 1; Object e = array[parent]; if (key.compareTo((T) e) >= 0) break; array[k] = e; k = parent; } array[k] = key;}
我們用圖來(lái)示意一下居灯,我們接下來(lái)要將 11 插入到隊(duì)列中,看看 siftUp 是怎么操作的内狗。
我們?cè)倏纯?take 方法:
public E take() throws InterruptedException { final ReentrantLock lock = this.lock; // 獨(dú)占鎖 lock.lockInterruptibly(); E result; try { // dequeue 出隊(duì) while ( (result = dequeue()) == null) notEmpty.await(); } finally { lock.unlock(); } return result;}
private E dequeue() { int n = size - 1; if (n < 0) return null; else { Object[] array = queue; // 隊(duì)頭怪嫌,用于返回 E result = (E) array[0]; // 隊(duì)尾元素先取出 E x = (E) array[n]; // 隊(duì)尾置空 array[n] = null; Comparator<? super E> cmp = comparator; if (cmp == null) siftDownComparable(0, x, array, n); else siftDownUsingComparator(0, x, array, n, cmp); size = n; return result; }}
dequeue 方法返回隊(duì)頭,并調(diào)整二叉堆的樹(shù)柳沙,調(diào)用這個(gè)方法必須先獲取獨(dú)占鎖岩灭。
廢話不多說(shuō),出隊(duì)是非常簡(jiǎn)單的赂鲤,因?yàn)殛?duì)頭就是最小的元素噪径,對(duì)應(yīng)的是數(shù)組的第一個(gè)元素。難點(diǎn)是隊(duì)頭出隊(duì)后数初,需要調(diào)整樹(shù)找爱。
private static <T> void siftDownComparable(int k, T x, Object[] array, int n) { if (n > 0) { Comparable<? super T> key = (Comparable<? super T>)x; // 這里得到的 half 肯定是非葉節(jié)點(diǎn) // a[n] 是最后一個(gè)元素,其父節(jié)點(diǎn)是 a[(n-1)/2]妙真。所以 n >>> 1 代表的節(jié)點(diǎn)肯定不是葉子節(jié)點(diǎn) // 下面缴允,我們結(jié)合圖來(lái)一行行分析,這樣比較直觀簡(jiǎn)單 // 此時(shí) k 為 0, x 為 17珍德,n 為 9 int half = n >>> 1; // 得到 half = 4 while (k < half) { // 先取左子節(jié)點(diǎn) int child = (k << 1) + 1; // 得到 child = 1 Object c = array[child]; // c = 12 int right = child + 1; // right = 2 // 如果右子節(jié)點(diǎn)存在练般,而且比左子節(jié)點(diǎn)小 // 此時(shí) array[right] = 20,所以條件不滿足 if (right < n && ((Comparable<? super T>) c).compareTo((T) array[right]) > 0) c = array[child = right]; // key = 17, c = 12锈候,所以條件不滿足 if (key.compareTo((T) c) <= 0) break; // 把 12 填充到根節(jié)點(diǎn) array[k] = c; // k 賦值后為 1 k = child; // 一輪過(guò)后薄料,我們發(fā)現(xiàn),12 左邊的子樹(shù)和剛剛的差不多泵琳,都是缺少根節(jié)點(diǎn)摄职,接下來(lái)處理就簡(jiǎn)單了 } array[k] = key; }}
記住二叉堆是一棵完全二叉樹(shù),那么根節(jié)點(diǎn) 10 拿掉后获列,最后面的元素 17 必須找到合適的地方放置谷市。首先,17 和 10 不能直接交換击孩,那么先將根節(jié)點(diǎn) 10 的左右子節(jié)點(diǎn)中較小的節(jié)點(diǎn)往上滑迫悠,即 12 往上滑,然后原來(lái) 12 留下了一個(gè)空節(jié)點(diǎn)巩梢,然后再把這個(gè)空節(jié)點(diǎn)的較小的子節(jié)點(diǎn)往上滑创泄,即 13 往上滑艺玲,最后,留出了位子鞠抑,17 補(bǔ)上即可饭聚。
我稍微調(diào)整下這個(gè)樹(shù),以便讀者能更明白:
好了搁拙, PriorityBlockingQueue 我們也說(shuō)完了秒梳。
總結(jié)
我知道本文過(guò)長(zhǎng),相信一字不漏看完的讀者肯定是少數(shù)感混。
ArrayBlockingQueue 底層是數(shù)組端幼,有界隊(duì)列,如果我們要使用生產(chǎn)者-消費(fèi)者模式弧满,這是非常好的選擇婆跑。
LinkedBlockingQueue 底層是鏈表,可以當(dāng)做無(wú)界和有界隊(duì)列來(lái)使用庭呜,所以大家不要以為它就是無(wú)界隊(duì)列滑进。
SynchronousQueue 本身不帶有空間來(lái)存儲(chǔ)任何元素,使用上可以選擇公平模式和非公平模式募谎。
PriorityBlockingQueue 是無(wú)界隊(duì)列扶关,基于數(shù)組,數(shù)據(jù)結(jié)構(gòu)為二叉堆数冬,數(shù)組第一個(gè)也是樹(shù)的根節(jié)點(diǎn)總是最小值节槐。