點贊再看硼婿,養(yǎng)成習(xí)慣辜御,公眾號搜一搜【一角錢技術(shù)】關(guān)注更多原創(chuàng)技術(shù)文章撒蟀。本文 GitHub org_hejianhui/JavaStudy 已收錄元践,有我的系列文章揩懒。
前言
LinkedTransferQueue 是一個由鏈表結(jié)構(gòu)組成的無界阻塞傳輸隊列汹忠,它是一個很多隊列的結(jié)合體(ConcurrentLinkedQueue佳窑,LinkedBlockingQueue制恍,SynchronousQueue),在除了有基本阻塞隊列的功能(但是這個阻塞隊列沒有使用鎖)之外神凑;隊列實現(xiàn)了TransferQueue接口重寫了transfer 和 tryTransfer 方法净神,這組方法和SynchronousQueue公平模式的隊列類似,具有匹配的功能耙厚。
LinkedTransferQueue是LinkedBlockingQueue强挫、SynchronousQueue(公平模式)岔霸、ConcurrentLinkedQueue三者的集合體薛躬,它綜合了這三者的方法虫溜,并且提供了更加高效的實現(xiàn)方式饲化。
隊列創(chuàng)建
TransferQueue<String> queue = new LinkedTransferQueue<String>();
應(yīng)用場景
LinkedTransferQueue采用的一種預(yù)占模式翰意。意思就是消費者線程取元素時梦湘,如果隊列為空崎淳,那就生成一個節(jié)點(節(jié)點元素為null)入隊翼雀,然后消費者線程park住徒探,后面生產(chǎn)者線程入隊時發(fā)現(xiàn)有一個元素為null的節(jié)點葬荷,生產(chǎn)者線程就不入隊了坑夯,直接就將元素填充到該節(jié)點岖寞,喚醒該節(jié)點上park住線程,被喚醒的消費者線程拿貨走人柜蜈。這就是預(yù)占的意思:有就拿貨走人仗谆,沒有就占個位置等著,等到或超時淑履。
我們來看一個例子:
package com.niuh.queue.transfer;
import java.util.Random;
import java.util.concurrent.LinkedTransferQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TransferQueue;
public class TestLinkedTransferQueue {
public static void main(String[] args) {
TransferQueue<String> queue = new LinkedTransferQueue<String>();
Thread producer = new Thread(new Producer(queue));
producer.setDaemon(true); // 設(shè)置為守護(hù)進(jìn)程使得線程執(zhí)行結(jié)束后程序自動結(jié)束運行
producer.start();
for (int i = 0; i < 10; i++) {
Thread consumer = new Thread(new Consumer(queue));
consumer.setDaemon(true);
consumer.start();
try {
// 消費者進(jìn)程休眠一秒鐘隶垮,以便以便生產(chǎn)者獲得CPU,從而生產(chǎn)產(chǎn)品
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
/**
* 模擬生產(chǎn)者
*/
class Producer implements Runnable {
private final TransferQueue<String> queue;
public Producer(TransferQueue<String> queue) {
this.queue = queue;
}
private String produce() {
return " number " + (new Random().nextInt(100));
}
@Override
public void run() {
try {
while (true) {
if (queue.hasWaitingConsumer()) {
queue.transfer(produce());
}
TimeUnit.SECONDS.sleep(1);// 生產(chǎn)者睡眠一秒鐘,這樣可以看出程序的執(zhí)行過程
}
} catch (InterruptedException e) {
}
}
}
/**
* 模擬消費者
*/
class Consumer implements Runnable {
private final TransferQueue<String> queue;
public Consumer(TransferQueue<String> queue) {
this.queue = queue;
}
@Override
public void run() {
try {
System.out.println(" Consumer " + Thread.currentThread().getName() + queue.take());
} catch (InterruptedException e) {
}
}
}
工作原理
LinkedTransferQueue 使用了一個叫做 dual data structure
的數(shù)據(jù)結(jié)構(gòu)秘噪,或者叫做 dual queue
狸吞,翻譯為雙重數(shù)據(jù)結(jié)構(gòu)或者雙重隊列。
雙重隊列是指:放取元素使用同一個隊列指煎,隊列中的節(jié)點具有兩種模式蹋偏,一種是數(shù)據(jù)節(jié)點,一種是非數(shù)據(jù)節(jié)點至壤。
- 放元素時先跟隊列頭節(jié)點對比暖侨,如果頭節(jié)點是非數(shù)據(jù)節(jié)點,就讓它們匹配崇渗,如果頭節(jié)點是數(shù)據(jù)節(jié)點字逗,就生產(chǎn)一個數(shù)據(jù)節(jié)點放在隊列尾端(入隊)京郑。
- 取元素時也是先跟隊列頭節(jié)點對比,如果頭節(jié)點是數(shù)據(jù)節(jié)點葫掉,就讓它們匹配些举,如果頭節(jié)點是非數(shù)據(jù)節(jié)點,就生產(chǎn)一個非數(shù)據(jù)節(jié)點放在隊列尾端(入隊)俭厚。
用圖來來表示如下:
不管是放元素還是取元素户魏,都先跟頭節(jié)點對比,如果二者模式不一樣就匹配它們挪挤,如果二者模式一樣叼丑,就入隊。
源碼分析
定義
LinkedTransferQueue的類繼承關(guān)系如下:
LinkedTransferQueue實現(xiàn)了TransferQueue接口扛门,而TransferQueue接口是繼承自BlockingQueue的鸠信,所以LinkedTransferQueue也是一個阻塞隊列。
其包含的方法定義如下:
TransferQueue接口
對比前面的阻塞隊列论寨,會發(fā)現(xiàn)LinkedTransferQueue 的繼承體系有特殊之處星立。前面的阻塞隊列都直接實現(xiàn)的BlockingQueue接口,在LinkedTransferQueue 卻多了一個TransferQueue 接口葬凳,而該接口繼承至BlockingQueue绰垂。
BlockingQueue 接口代表的是普通的阻塞隊列,TransferQueue 則代表的是另一種特殊阻塞隊列火焰,它是指這樣的一個隊列:當(dāng)生產(chǎn)者向隊列添加元素但隊列已滿時劲装,生產(chǎn)者會被阻塞;當(dāng)消費者從隊列移除元素但隊列為空時昌简,消費者會被阻塞占业。
前面我們分析的SynchronousQueue 不就是有這種特性嗎,但是SynchronousQueue 并沒有實現(xiàn)TransferQueue 接口江场,原因就在于TransferQueue 接口也是在jdk 1.7才出現(xiàn)的纺酸,應(yīng)該是為了和前面的阻塞隊列進(jìn)行區(qū)分,同時為了后面擴充這種特殊的阻塞隊列址否,才加入了TransferQueue 餐蔬,這樣功能才不至于混亂(單一職能原則)。
public interface TransferQueue<E> extends BlockingQueue<E> {
//立即轉(zhuǎn)交一個元素給消費者佑附,如果沒有等待的消費者樊诺,則返回false(元素不入隊)
boolean tryTransfer(E e);
//轉(zhuǎn)交一個元素給消費者,如果沒有等待的消費者音同,則阻塞直到消費者到來词爬,或者發(fā)生異常
void transfer(E e) throws InterruptedException;
//轉(zhuǎn)交一個元素給消費者,如果沒有等待的消費者权均,則阻塞直到超時
boolean tryTransfer(E e, long timeout, TimeUnit unit)throws InterruptedException;
//是否存在等待的消費者
boolean hasWaitingConsumer();
//返回等待的消費者的個數(shù)
int getWaitingConsumerCount();
}
在SynchronousQueue 中也有類似的方法顿膨,當(dāng)然沒有這么多锅锨,只是以內(nèi)部類的形式存在(TransferQueue、TransferStack)恋沃,而在LinkedTransferQueue 則把這種阻塞操作抽成了接口必搞。
成員屬性
// 判斷是否為多核
private static final boolean MP =Runtime.getRuntime().availableProcessors() > 1;
// 自旋次數(shù)
private static final int FRONT_SPINS = 1 << 7;
// 前驅(qū)節(jié)點正在處理,當(dāng)前節(jié)點需要自旋的次數(shù)
private static final int CHAINED_SPINS = FRONT_SPINS >>> 1;
// 容忍清除節(jié)點失敗次數(shù)的閾值
static final int SWEEP_THRESHOLD = 32;
/** 頭節(jié)點 */
transient volatile Node head;
/** 尾節(jié)點 */
private transient volatile Node tail;
/*
* 放取元素的幾種方式囊咏,調(diào)用xfer()方法時需要傳入,區(qū)分不同處理恕洲,
* xfer()方法是LinkedTransferQueue的最核心的方法
*/
// 立即返回,用于非超時的 poll() 和 tryTransfer() 方法中
private static final int NOW = 0; // for untimed poll, tryTransfer
// 異步梅割,不會阻塞霜第,用于放元素時,因為內(nèi)部使用無界單鏈表存儲元素户辞,不會阻塞放元素的過程
private static final int ASYNC = 1; // for offer, put, add
// 同步泌类,調(diào)用的時候如果沒有匹配到會阻塞直到匹配為止
private static final int SYNC = 2; // for transfer, take
// 超時,用于有超時的poll() 和 tryTransfer() 方法中
private static final int TIMED = 3; // for timed poll, tryTransfer
注意:xfer
者幾個參數(shù)很重要咆课。
-
NOW
:表示的是立即末誓,不需要等待的意思扯俱,用于poll和tryTransfer方法书蚪,poll 隊列為空返回,tryTransfer隊列沒有消費者迅栅,構(gòu)造一個空的節(jié)點直接返回殊校,都是不等待的。 -
ASYNC
:異步读存,offer, put, add等入隊方法为流,由于是無界隊列,所以不會阻塞让簿。 -
SYNC
:同步表示會阻塞敬察,take一個元素,沒有就會阻塞尔当,transfer傳輸莲祸,必須等待消費者來消費。 -
TIMED
:帶超時時間的now椭迎,會等待一定的時間后返回锐帜。
主要內(nèi)部類
static final class Node {
// 是否是數(shù)據(jù)節(jié)點(也就標(biāo)識了是生產(chǎn)者還是消費者)
final boolean isData; // false if this is a request node
// 元素的值
volatile Object item; // initially non-null if isData; CASed to match
// 下一個節(jié)點
volatile Node next;
// 持有元素的線程
volatile Thread waiter; // null until waiting
}
典型的單鏈表結(jié)構(gòu),內(nèi)部除了存儲元素的值和下一個節(jié)點的指針外畜号,還包含了是否為數(shù)據(jù)節(jié)點和持有元素的線程缴阎。內(nèi)部通過 isData 區(qū)分是生產(chǎn)者還是消費者。
構(gòu)造函數(shù)
public LinkedTransferQueue() {
}
public LinkedTransferQueue(Collection<? extends E> c) {
this();
addAll(c);
}
只有這兩個構(gòu)造方法简软,且沒有初始容量蛮拔,所以是無界的一個阻塞隊列述暂。
入隊方法
LinkedTransferQueue提供了add、put建炫、offer三類方法贸典,用于將元素放到隊列中。其中三類(4個)方法都是一樣的踱卵,使用異步的方式調(diào)用 xfer() 方法廊驼,傳入的參數(shù)都一模一樣。
注意:我們這里所說的入隊操作是指add,put,offer這幾個方法惋砂,而不是指真正的把節(jié)點入隊的操作妒挎,因為LinkedTransferQueue 中針對的不是數(shù)據(jù),而是操作西饵,操作可能需要入隊酝掩,而這個操作可能是放數(shù)據(jù)操作,也可能是取數(shù)據(jù)操作眷柔,這里注意區(qū)分一下期虾,不要搞混了。
public void put(E e) {
// 異步模式驯嘱,不會阻塞镶苞,不會超時
// 因為是放元素,單鏈表存儲鞠评,會一直往后加
xfer(e, true, ASYNC, 0);
}
public boolean offer(E e, long timeout, TimeUnit unit) {
xfer(e, true, ASYNC, 0);
return true;
}
public boolean offer(E e) {
xfer(e, true, ASYNC, 0);
return true;
}
public boolean add(E e) {
xfer(e, true, ASYNC, 0);
return true;
}
LinkedTransferQueue 是一個由鏈表組成的無界隊列茂蚓,因此不會有容量限制(一定范圍內(nèi)),因此這里入隊的操作都不會阻塞(因此超時入隊方法實際也沒有用)剃幌,也就是說聋涨,入隊后線程會立即返回负乡,這個是參數(shù)ASYNC的作用牍白。
xfer(E e, boolean haveData, int how, long nanos)的參數(shù)分別是:
- e表示元素;
- haveData表示是否是數(shù)據(jù)節(jié)點抖棘,
- how表示放取元素的方式茂腥,上面提到的四種,NOW钉答、ASYNC础芍、SYNC、TIMED数尿;
- nanos表示超時時間仑性;
xfer()方法
在看 xfer 方法之前,我們先來了解以下大致流程右蹦,來幫助我們理解诊杆。
LinkedTransferQueue 和 SynchronousQueue 是一樣的歼捐,隊列中主要的不是針對數(shù)據(jù),而是操作(put或take晨汹,注意這里put豹储、take指的是放入數(shù)據(jù)和取數(shù)據(jù)),隊列中即可以存儲入隊操作淘这,也可以存儲出隊操作剥扣,當(dāng)隊列為空時,如果有線程進(jìn)行出隊操作铝穷,那么這個時候隊列是沒有數(shù)據(jù)的钠怯,那么這個操作就會被入隊,同時線程也會阻塞曙聂,直到數(shù)據(jù)的到來(或出現(xiàn)異常)晦炊,如果最開始隊列為空,放入數(shù)據(jù)的操作到來宁脊,那么數(shù)據(jù)就會被放到隊列中断国,此后如果取數(shù)據(jù)操作到來,那么就會從隊列中取出數(shù)據(jù)榆苞,因此可以知道隊列中存放的都是一系列相同的操作(put-放數(shù)據(jù)操作 或 take-取數(shù)據(jù)操作)稳衬。
接下來我們先說 放數(shù)據(jù)操作,那么如果隊列為空语稠,那么直接將數(shù)據(jù)入隊即可宋彼,同時因為是無界隊列弄砍,線程不會阻塞仙畦,直接返回,如果隊列不為空音婶,那么隊列里面可能有兩種情況:
- 存放的都是數(shù)據(jù)慨畸。那么本次操作的和隊列中的節(jié)點操作是一樣的,因此直接把數(shù)據(jù)放到隊列末尾衣式,線程返回寸士。
- 存放的都是取數(shù)據(jù)操作。那么本次操作和隊列中的節(jié)點操作是不一樣的(也就是匹配的碴卧,放入數(shù)據(jù)操作和取數(shù)據(jù)操作是匹配的弱卡,也就是不同的操作是匹配的,相同的操作是不匹配的)住册,那么就把對頭的節(jié)點出隊婶博,就把本次的數(shù)據(jù)給對頭節(jié)點,同時喚醒該節(jié)點的線程荧飞。
private E xfer(E e, boolean haveData, int how, long nanos) {
// 不允許放入空元素
if (haveData && (e == null))
throw new NullPointerException();
Node s = null; // the node to append, if needed
// 外層循環(huán)凡人,自旋名党,失敗就重試
retry:
for (;;) { // restart on append race
/**
* 下面這個for循環(huán)用于控制匹配的過程
* 同一時刻隊列中只會存儲一種類型的節(jié)點
* 從頭節(jié)點開始嘗試匹配,如果頭節(jié)點被其它線程先一步匹配了挠轴,
* 就再嘗試其下一個传睹,直到匹配為止,或者到隊列中沒有元素為止
*/
for (Node h = head, p = h; p != null;) { // find & match first node
// p節(jié)點的模式
boolean isData = p.isData;
// p節(jié)點的值
Object item = p.item;
// p沒有被匹配到
if (item != p && (item != null) == isData) { // unmatched
// 如果兩者模式一樣岸晦,則不能匹配欧啤,跳出循環(huán)后嘗試入隊
if (isData == haveData) // can't match
break;
// 如果兩者模式不一樣,則嘗試匹配
// 把p的值設(shè)置為e(如果是取元素則e是null启上,如果是放元素則e是元素值)
if (p.casItem(item, e)) { // match
// 匹配成功
// for里面的邏輯比較復(fù)雜堂油,用于控制多線程同時放取元素時出現(xiàn)競爭的情況的
// 看不懂可以直接跳過
for (Node q = p; q != h;) {
// 進(jìn)入到這里可能是頭節(jié)點已經(jīng)被匹配,然后p會變成h的下一個節(jié)點
Node n = q.next; // update by 2 unless singleton
// 如果head還沒變碧绞,就把它更新成新的節(jié)點
// 并把它刪除(forgetNext()會把它的next設(shè)為自己府框,也就是從單鏈表中刪除了)
// 這時為什么要把head設(shè)為n呢?因為到這里了讥邻,肯定head本身已經(jīng)被匹配掉了
// 而上面的p.casItem()又成功了迫靖,說明p也被當(dāng)前這個元素給匹配掉了
// 所以需要把它們倆都出隊列,讓其它線程可以從真正的頭開始兴使,不用重復(fù)檢查了
if (head == h && casHead(h, n == null ? q : n)) {
h.forgetNext();
break;
} // advance and retry
// 如果新的頭節(jié)點為空系宜,或者其next為空,或者其next未匹配发魄,就重試
if ((h = head) == null ||
(q = h.next) == null || !q.isMatched())
break; // unless slack < 2
}
// 喚醒p中等待的線程
LockSupport.unpark(p.waiter);
// 并返回匹配到的元素
return LinkedTransferQueue.<E>cast(item);
}
}
// p已經(jīng)被匹配了或者嘗試匹配的時候失敗了
// 也就是其它線程先一步匹配了p
// 這時候又分兩種情況盹牧,p的next還沒來得及修改,p的next指向了自己
// 如果p的next已經(jīng)指向了自己励幼,就重新取head重試汰寓,否則就取其next重試
Node n = p.next;
p = (p != n) ? n : (h = head); // Use head if p offlist
}
// 到這里肯定是隊列中存儲的節(jié)點類型和自己一樣 或者 隊列中沒有元素了,就入隊(不管放元素還是取元素都得入隊)
// 入隊又分成四種情況:
// NOW苹粟,立即返回有滑,沒有匹配到立即返回,不做入隊操作
// ASYNC嵌削,異步毛好,元素入隊但當(dāng)前線程不會阻塞(相當(dāng)于無界LinkedBlockingQueue的元素入隊)
// SYNC,同步苛秕,元素入隊后當(dāng)前線程阻塞肌访,等待被匹配到
// TIMED,有超時艇劫,元素入隊后等待一段時間被匹配吼驶,時間到了還沒匹配到就返回元素本身
// 如果不是立即返回
if (how != NOW) { // No matches available
// 新建s節(jié)點
if (s == null)
s = new Node(e, haveData);
// 嘗試入隊
Node pred = tryAppend(s, haveData);
// 入隊失敗,重試
if (pred == null)
continue retry; // lost race vs opposite mode
// 如果不是異步(同步或者有超時)
// 就等待被匹配
if (how != ASYNC)
return awaitMatch(s, pred, e, (how == TIMED), nanos);
}
return e; // not waiting
}
}
對于這里的操作 (放入數(shù)據(jù)) ,尋找匹配節(jié)點:
- 如果找到了旨剥,就設(shè)置item值咧欣,然后unpark匹配節(jié)點的waiter線程,返回(其實就是看看隊列里面的操作是不是取數(shù)據(jù)操作)轨帜,否則就入隊(NOW直接返回)魄咕;
- 如果沒有找到匹配節(jié)點,則根據(jù)傳入的how來處理蚌父,NOW直接返回哮兰,其余三種先入隊,入隊后如果是ASYNC則返回苟弛,SYNC和TIMED則會阻塞等待匹配喝滞。
我們再看看里面的部分方法:
/**
* Returns true if this node has been matched, including the
* case of artificial matches due to cancellation.
*/
final boolean isMatched() {
Object x = item;
return (x == this) || ((x == null) == isData);
}
如果操作節(jié)點已經(jīng)被匹配了,那么item會被改變膏秫,對于取數(shù)據(jù)操作右遭,那么item會被設(shè)置成數(shù)據(jù),如果操作被取消了缤削,那么會設(shè)置item為this窘哈。
/**
* Links node to itself to avoid garbage retention. Called
* only after CASing head field, so uses relaxed write.
*/
final void forgetNext() {
UNSAFE.putObject(this, nextOffset, this);
}
forgetNext 設(shè)置為 next 為自身,也就是脫離鏈表亭敢,同時方便gc回收自己滚婉。
接下來我們再看看入隊調(diào)用的 tryAppend 方法:
private Node tryAppend(Node s, boolean haveData) {
// 從tail開始遍歷,把s放到鏈表尾端
for (Node t = tail, p = t;;) { // move p to last node and append
Node n, u; // temps for reads of next & tail
// 如果首尾都是null帅刀,說明鏈表中還沒有元素
if (p == null && (p = head) == null) {
// 就讓首節(jié)點指向s
// 注意让腹,這里插入第一個元素的時候tail指針并沒有指向s
if (casHead(null, s))
return s; // initialize
}
else if (p.cannotPrecede(haveData))
// 如果p無法處理,則返回null
// 這里無法處理的意思是扣溺,p和s節(jié)點的類型不一樣骇窍,不允許s入隊
// 比如,其它線程先入隊了一個數(shù)據(jù)節(jié)點娇妓,這時候要入隊一個非數(shù)據(jù)節(jié)點像鸡,就不允許,
// 隊列中所有的元素都要保證是同一種類型的節(jié)點
// 返回null后外面的方法會重新嘗試匹配重新入隊等
return null; // lost race vs opposite mode
else if ((n = p.next) != null) // not last; keep traversing
// 如果p的next不為空哈恰,說明不是最后一個節(jié)點
// 則讓p重新指向最后一個節(jié)點
p = p != t && t != (u = tail) ? (t = u) : // stale tail
(p != n) ? n : null; // restart if off list
else if (!p.casNext(null, s))
// 如果CAS更新s為p的next失敗
// 則說明有其它線程先一步更新到p的next了
// 就讓p指向p的next,重新嘗試讓s入隊
p = p.next; // re-read on CAS failure
else {
// 到這里說明s成功入隊了
// 如果p不等于t志群,就更新tail指針
// 還記得上面插入第一個元素時tail指針并沒有指向新元素嗎着绷?
// 這里就是用來更新tail指針的
if (p != t) { // update if slack now >= 2
while ((tail != t || !casTail(t, s)) &&
(t = tail) != null &&
(s = t.next) != null && // advance and retry
(s = s.next) != null && s != t);
}
// 返回p,即s的前一個元素
return p;
}
}
}
這個操作入隊(把節(jié)點鏈接接到鏈表末尾)看上去有點復(fù)雜锌云,主要原因還是沒有使用鎖荠医,存在很多并發(fā)情況下,有可能自己在添加節(jié)點入隊的時候,其它線程已經(jīng)把隊列改變了彬向,那么這個時候就需要重新找到隊尾兼贡,進(jìn)行提那件操作,添加成功后娃胆,也需要設(shè)置隊尾指針遍希,這個時候隊尾指針可能也被其它線程設(shè)置了,那么這個時候自己也要保證隊尾指針是正確的(遍歷驗證)里烦。
在上面看到有個方法:p.cannotPrecede(haveData)凿蒜,如果數(shù)據(jù)不符合要求,那么是不會入隊的胁黑。
/**
* Returns true if a node with the given mode cannot be
* appended to this node because this node is unmatched and
* has opposite data mode.
*/
final boolean cannotPrecede(boolean haveData) {
boolean d = isData;
Object x;
return d != haveData && (x = item) != this && (x != null) == d;
}
這個就是驗證操作和其它數(shù)據(jù)節(jié)點的數(shù)據(jù)是否吻合的废封。
awaitMatch 這個我們在出隊來分析,因為放數(shù)據(jù)的過程是不會阻塞的丧蘸,當(dāng)然也更不會執(zhí)行該方法漂洋。
出隊方法
LinkedTransferQueue提供了poll、take力喷、remove方法用于出列元素氮发,出隊的三類(4個)方法也是直接或間接的調(diào)用xfer()方法,放取元素的方式和超時規(guī)則略微不同冗懦,本質(zhì)沒有大的區(qū)別爽冕。
注意:同上披蕉,這里的出隊操作,指的是poll眯娱、take方法,而不是真正指的是出隊操作徙缴,因為poll嘁信、take操作也可能會入隊(隊列針對的是操作于样,不是數(shù)據(jù))。
public E remove() {
E x = poll();
if (x != null)
return x;
else
throw new NoSuchElementException();
}
public E take() throws InterruptedException {
// 同步模式潘靖,會阻塞直到取到元素
E e = xfer(null, false, SYNC, 0);
if (e != null)
return e;
Thread.interrupted();
throw new InterruptedException();
}
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
// 有超時時間
E e = xfer(null, false, TIMED, unit.toNanos(timeout));
if (e != null || !Thread.interrupted())
return e;
throw new InterruptedException();
}
public E poll() {
// 立即返回穿剖,沒取到元素返回null
return xfer(null, false, NOW, 0);
}
通過上面一系列的方法,我們看到卦溢,其實poll糊余、take內(nèi)部調(diào)用的仍然是xfer 方法秀又,因為是取數(shù)據(jù),因此參數(shù)部分發(fā)生了變化贬芥,這個注意一下吐辙。
xfer()方法
private E xfer(E e, boolean haveData, int how, long nanos) {
if (haveData && (e == null))
throw new NullPointerException();
Node s = null; // the node to append, if needed
retry:
for (;;) { // restart on append race
//這里是取數(shù)據(jù)操作,那么遍歷隊列看看有沒有匹配的操作(即放數(shù)據(jù)操作)
for (Node h = head, p = h; p != null;) { // find & match first node
boolean isData = p.isData;
Object item = p.item;
if (item != p && (item != null) == isData) { // unmatched
if (isData == haveData) // can't match
break;
/**
* 隊列里面確實都是放數(shù)據(jù)的操作蘸劈,則和當(dāng)前操作是匹配的
* 設(shè)置匹配操作節(jié)點的item域為null (e為null昏苏,原本item 域是數(shù)據(jù))
*/
if (p.casItem(item, e)) { // match
// 協(xié)助推進(jìn)head,這個和上面是一樣的
for (Node q = p; q != h;) {
Node n = q.next; // update by 2 unless singleton
if (head == h && casHead(h, n == null ? q : n)) {
h.forgetNext();
break;
} // advance and retry
if ((h = head) == null ||
(q = h.next) == null || !q.isMatched())
break; // unless slack < 2
}
// 喚醒阻塞線程(實際這里p.waiter是為null的,因為放數(shù)據(jù)操作是非阻塞的)
LockSupport.unpark(p.waiter);
// item線程是數(shù)據(jù)昵时,本次操作是取數(shù)據(jù)操作捷雕,因此返回數(shù)據(jù)
return LinkedTransferQueue.<E>cast(item);
}
}
Node n = p.next;
p = (p != n) ? n : (h = head); // Use head if p offlist
}
// 如果參數(shù)指定為NOW,那么就算沒有被匹配壹甥,那么還是不入隊救巷,直接返回
if (how != NOW) { // No matches available
if (s == null)
s = new Node(e, haveData);
// 添加節(jié)點
Node pred = tryAppend(s, haveData);
if (pred == null)
continue retry; // lost race vs opposite mode
/**
* 如果參數(shù)不是ASYC的這種,這可能需要阻塞等待
* 取數(shù)據(jù)操作其參數(shù)都不是ASYNC句柠,因此如果沒有取到數(shù)據(jù)(被匹配)浦译,那么就可能進(jìn)行阻塞等待
*/
if (how != ASYNC)
return awaitMatch(s, pred, e, (how == TIMED), nanos);
}
return e; // not waiting
}
}
在這里我們分析的是 取數(shù)據(jù)操作 ,因為有了前面 放數(shù)據(jù)操作 的分析溯职,這里應(yīng)該還是很好理解精盅,取數(shù)據(jù)和放數(shù)據(jù)都是差不多的,都是和隊列里面的操作進(jìn)行匹配谜酒,如果隊列里面的操作是取數(shù)據(jù)操作僻族,本次操作是取數(shù)據(jù)操作,那么此時是不匹配的蝌数,需要把本次操作入隊(參數(shù):NOW顶伞、ASYNC唆貌、SYNC挠锥、TIMED 不一樣)侨赡,如果隊列的操作都是放數(shù)據(jù)操作蓖宦,本次操作是取數(shù)據(jù)操作稠茂,那么這個是匹配的睬关,就把對頭的數(shù)據(jù)取出來毡证,返回即可料睛。
下面我們來看看 awaitMatch 方法:
private E awaitMatch(Node s, Node pred, E e, boolean timed, long nanos) {
// 如果是有超時的恤煞,計算其超時時間
final long deadline = timed ? System.nanoTime() + nanos : 0L;
// 當(dāng)前線程
Thread w = Thread.currentThread();
// 自旋次數(shù)
int spins = -1; // initialized after first item and cancel checks
// 隨機數(shù)居扒,隨機讓一些自旋的線程讓出CPU
ThreadLocalRandom randomYields = null; // bound if needed
for (;;) {
Object item = s.item;
// 如果s元素的值不等于e喜喂,說明它被匹配到了
if (item != e) { // matched
// assert item != s;
// 把s的item更新為s本身
// 并把s中的waiter置為空
s.forgetContents(); // avoid garbage
// 返回匹配到的元素
return LinkedTransferQueue.<E>cast(item);
}
// 如果當(dāng)前線程中斷了姻灶,或者有超時的到期了
// 就更新s的元素值指向s本身
if ((w.isInterrupted() || (timed && nanos <= 0)) &&
s.casItem(e, s)) { // cancel
// 嘗試解除s與其前一個節(jié)點的關(guān)系
// 也就是刪除s節(jié)點
unsplice(pred, s);
// 返回元素的值本身产喉,說明沒匹配到
return e;
}
// 如果自旋次數(shù)小于0曾沈,就計算自旋次數(shù)
if (spins < 0) { // establish spins at/near front
// spinsFor()計算自旋次數(shù)
// 如果前面有節(jié)點未被匹配就返回0
// 如果前面有節(jié)點且正在匹配中就返回一定的次數(shù)塞俱,等待
if ((spins = spinsFor(pred, s.isData)) > 0)
// 初始化隨機數(shù)
randomYields = ThreadLocalRandom.current();
}
else if (spins > 0) { // spin
// 還有自旋次數(shù)就減1
--spins;
// 并隨機讓出CPU
if (randomYields.nextInt(CHAINED_SPINS) == 0)
Thread.yield(); // occasionally yield
}
else if (s.waiter == null) {
// 更新s的waiter為當(dāng)前線程
s.waiter = w; // request unpark then recheck
}
else if (timed) {
// 如果有超時障涯,計算超時時間唯蝶,并阻塞一定時間
nanos = deadline - System.nanoTime();
if (nanos > 0L)
LockSupport.parkNanos(this, nanos);
}
else {
// 不是超時的,直接阻塞鼓蜒,等待被喚醒
// 喚醒后進(jìn)入下一次循環(huán)都弹,走第一個if的邏輯就返回匹配的元素了
LockSupport.park(this);
}
}
}
這里和 SynchronousQueue 的 awaitFulfill 差不多畅厢,主要進(jìn)行了自旋或详,如果自旋后霸琴,仍然沒有被匹配或者取消昭伸,則進(jìn)行阻塞(如果設(shè)置了超時阻塞庐杨,則進(jìn)行一段時間的阻塞)灵份,如果發(fā)生了中斷異常,會取消該操作弦聂,改變item的值莺葫,匹配成功后也會更改item的值捺檬,因此如果item和原來的值不想等時堡纬,則說明發(fā)生了改變,返回即可饺饭。
在 awaitMatch 過程中职车,如果線程被中斷了悴灵,或者超時了則會調(diào)用 unsplice() 方法去除該節(jié)點。
final void unsplice(Node pred, Node s) {
//清除s的部分?jǐn)?shù)據(jù)
s.forgetContents(); // forget unneeded fields
if (pred != null && pred != s && pred.next == s) {
Node n = s.next;
if (n == null ||
(n != s && pred.casNext(s, n) && pred.isMatched())) {
/**
*這個for循環(huán)登下,用于推進(jìn)head,如果head已經(jīng)被匹配了被芳,則需要更新head
*/
for (;;) { // check if at, or could be, head
Node h = head;
if (h == pred || h == s || h == null)
return; // at head or list empty
//h 沒有被匹配剩晴,跳出循環(huán)侵状,否則可能需要更新head
if (!h.isMatched())
break;
Node hn = h.next;
//遍歷結(jié)束了绽左,退出循環(huán)
if (hn == null)
return; // now empty
//head 被匹配了拼窥,重新設(shè)置head
if (hn != h && casHead(h, hn))
h.forgetNext(); // advance head
}
//s節(jié)點被移除后闯团,需要記錄刪除的操作次數(shù)房交,如果超過閥值候味,則需要清理隊列
if (pred.next != pred && s.next != s) { // recheck if offlist
for (;;) { // sweep now if enough votes
int v = sweepVotes;
//沒超過閥值,則遞增記錄值
if (v < SWEEP_THRESHOLD) {
if (casSweepVotes(v, v + 1))
break;
}
else if (casSweepVotes(v, 0)) {
//重新設(shè)置記錄數(shù)尚胞,并清理隊列
sweep();
break;
}
}
}
}
}
}
private void sweep() {
for (Node p = head, s, n; p != null && (s = p.next) != null; ) {
if (!s.isMatched()) // s節(jié)點未被匹配,則繼續(xù)向后遍歷
// Unmatched nodes are never self-linked
p = s;
else if ((n = s.next) == null) //s節(jié)點被匹配粱玲,但是是尾節(jié)點抽减,則退出循環(huán)
//s為尾結(jié)點卵沉,則可能其它線程剛好匹配完,所有這里不移除s琼掠,讓其它匹配線程操作
break;
else if (s == n) // stale s節(jié)點已經(jīng)脫離了隊列了眉枕,重頭開始遍歷
// No need to also check for p == s, since that implies s == n
p = head;
else
p.casNext(s, n); //移除s節(jié)點
}
}
看看這個移除操作也挺復(fù)雜的速挑,這里并沒有簡單的就將節(jié)點移除就ok姥宝,同時還檢查了隊列 head 的有效性腊满,如果 head 被匹配了碳蛋,則會推薦 head肃弟,保持隊列 head 是有效的穷缤。如果移除節(jié)點的前驅(qū)節(jié)點也失效了津肛,說明其它線程在操作身坐,這里就不操作了掀亥,當(dāng)移除了節(jié)點后,需要記錄移除節(jié)點的操作次數(shù) sweepVotes嘹害,如果這個值超過了閥值笔呀,則會對隊列進(jìn)行清理(移除那些失效的節(jié)點)许师。
移交元素的方法
請注意第二個參數(shù)微渠,都是true逞盆,也就是這三個方法其實也是放元素的方法
// 立即轉(zhuǎn)交一個元素給消費者云芦,如果此時隊列沒有消費者贸桶,那就false
public boolean tryTransfer(E e) {
// 立即返回
return xfer(e, true, NOW, 0) == null;
}
// 轉(zhuǎn)交一個元素給消費者,如果此時隊列沒有消費者琉历,那就阻塞
public void transfer(E e) throws InterruptedException {
// 同步模式
if (xfer(e, true, SYNC, 0) != null) {
Thread.interrupted(); // failure possible only due to interrupt
throw new InterruptedException();
}
}
public boolean tryTransfer(E e, long timeout, TimeUnit unit)
throws InterruptedException {
// 有超時時間
if (xfer(e, true, TIMED, unit.toNanos(timeout)) == null)
return true;
if (!Thread.interrupted())
return false;
throw new InterruptedException();
}
獲取隊列首個有效操作
在SynchronousQueue 中其隊列是無法遍歷的灼捂,而且也無法獲取對頭信息悉稠,但是在 LinkedTransferQueue 卻不一樣,LinkedTransferQueue 可以獲取隊頭艘包,也可以進(jìn)行遍歷 的猛。
peek() 方法
public E peek() {
return firstDataItem();
}
private E firstDataItem() {
//遍歷隊列,查找第一個有效的操作節(jié)點
for (Node p = head; p != null; p = succ(p)) {
Object item = p.item;
//如果該節(jié)點是數(shù)據(jù)節(jié)點想虎,同時沒有被取消卦尊,則返回數(shù)據(jù)
if (p.isData) {
if (item != null && item != p)
return LinkedTransferQueue.<E>cast(item);
}
else if (item == null)// 非數(shù)據(jù)節(jié)點返回null,這里注意
return null;
}
return null;
}
final Node succ(Node p) { //如果節(jié)點p 失效則返回head,否則返回p的后繼
Node next = p.next;
return (p == next) ? head : next;
}
這個 peek() 方法返回的是隊列的第一個有效的節(jié)點舌厨,而這個節(jié)點可能是數(shù)據(jù)節(jié)點岂却,也可能是取數(shù)據(jù)的操作節(jié)點,那么peek可能返回數(shù)據(jù),也可能返回null,但是返回null正驻,并不一定是隊列為空氓栈,也可能是隊列里面都是取數(shù)據(jù)的操作節(jié)點提完,這個需要注意一下。
總結(jié)
- LinkedTransferQueue 可以看在是 LinkedBlockingQueue挪捕、SynchronousQueue(公平模式)奏纪、ConcurrentLinkedQeueue 三者的集合體兔簇;
- LinkedTransferQueue 的實現(xiàn)方式使用一種叫做 “雙重隊列” 的數(shù)據(jù)結(jié)構(gòu)此虑;
- 不管是取元素還是放元素都會入隊韭寸;
- 先嘗試跟頭節(jié)點比較椰拒,如果二者模式不一樣,就匹配它們,組成CP践啄,然后返回對方的值聂儒;
- 如果二者模式一樣非春,就入隊储耐,并自旋或阻塞等待被喚醒闽撤;
- 至于是否入隊及阻塞有四種方式:NOW闸餐、ASYNC场勤、SYNC、TIMED留瞳;
- LinkedTransferQueue 全程都沒有使用 synchronized硬梁、重入鎖等比較中的鎖跃巡,基本是通過 自旋 + CAS 實現(xiàn)苍狰;
- 對于入隊后胡野,先自旋一定次數(shù)后再調(diào)用 LockSupport.park() 或 LockSupport.parkNanos() 阻塞。
LinkedTransferQueue和SynchronousQueue(公平模式)區(qū)別
- LinkedTransferQueue 和SynchronousQueue 其實基本是差不多的递览,兩者都是無鎖帶阻塞功能的隊列,都是使用的雙重隊列痹栖;
- SynchronousQueue 通過內(nèi)部類Transferer 來實現(xiàn)公平和非公平隊列南捂,在LinkedTransferQueue 中沒有公平與非公平的區(qū)分溺健;
- LinkedTransferQueue 實現(xiàn)了TransferQueue接口岭辣,該接口定義的是帶阻塞操作的操作躏精,相比SynchronousQueue 中的Transferer 功能更豐富瞭吃。
- SynchronousQueue 中放數(shù)據(jù)操作和取數(shù)據(jù)操作都是阻塞的抠刺,當(dāng)隊列中的操作和本次操作不匹配時,線程會阻塞罕容,直到匹配的操作到來妨马。LinkedTransferQueue 是無界隊列,放數(shù)據(jù)操作不會阻塞杀赢,取數(shù)據(jù)操作如果沒有匹配操作可能會阻塞,通過參數(shù)決定是否阻塞(ASYNC,SYNC,NOW,TIMED)湘纵。
PS:以上代碼提交在 Github :https://github.com/Niuh-Study/niuh-juc-final.git
文章持續(xù)更新脂崔,可以公眾號搜一搜「 一角錢技術(shù) 」第一時間閱讀, 本文 GitHub org_hejianhui/JavaStudy 已經(jīng)收錄梧喷,歡迎 Star砌左。