本文作者:王一飛,叩丁狼高級講師纹份。原創(chuàng)文章文黎,轉(zhuǎn)載請注明出處侵俗。
概述
SynchronousQueue 是一個(gè)特殊的阻塞BlockingQueue隊(duì)列(實(shí)現(xiàn)類), 但是它跟BlockingQueue又有顯著不同:
1>SynchronousQueue沒有容量,算是一個(gè)不存儲元素的BlockingQueue背镇。每次put操作之后,當(dāng)前線程(生產(chǎn)者)會掛起,等待另外一個(gè)線程(消費(fèi)者)執(zhí)行take操作后,才會喚醒掛起線程(生產(chǎn)者)執(zhí)行,否則阻塞,不運(yùn)行添加元素.反之亦然泽裳。
2>SynchronousQueue 因?yàn)闆]有容量,所以遍歷,size, isEmpty 這些常規(guī)具有遍歷性質(zhì)的方法就沒意義啦
3>SynchronousQueue分公平隊(duì)列和非公平隊(duì)列瞒斩,默認(rèn)是false,非公平隊(duì)列. 當(dāng)然,我也也可以使用SynchronousQueue(boolean fair) 構(gòu)造器額外指定.
外部結(jié)構(gòu):
內(nèi)部結(jié)構(gòu):(有點(diǎn)復(fù)雜)
構(gòu)造器:
public SynchronousQueue() {
this(false);
}
public SynchronousQueue(boolean fair) {
transferer = fair ? new TransferQueue<E>() : new TransferStack<E>();
}
帶boolean類型參數(shù)的構(gòu)造器, fair 為true時(shí), 是公平策略, 使用TransferQueue 對象實(shí)現(xiàn), fairl為fasle時(shí), 非公平策略隊(duì)列所, 使用TransferStack 對象實(shí)現(xiàn).
//內(nèi)部api: TransferQueue 跟 TransferStack 類的父類
//目的:規(guī)范跟統(tǒng)一SynchronousQueue 隊(duì)列公平與非公平策略操作api
abstract static class Transferer<E> {
//take 跟 put 操作
abstract E transfer(E e, boolean timed, long nanos);
}
TransferStack: 非公平隊(duì)列
static final class TransferStack<E> extends Transferer<E> {
static final int REQUEST = 0; //當(dāng)前線程是消費(fèi)者
static final int DATA = 1; //當(dāng)前線程是生成者
static final int FULFILLING = 2; //其他情況
//使用SNode對象輔助完成隊(duì)列實(shí)現(xiàn)
static final class SNode {
volatile SNode next;
volatile SNode match;
volatile Thread waiter;
Object item;
int mode;
//.....................
}
E transfer(E e, boolean timed, long nanos) {
//.....
}
}
SynchronousQueue隊(duì)列實(shí)現(xiàn)非公平策略核心類:TransferStack, 內(nèi)部定義SNode作為容器內(nèi)容節(jié)點(diǎn).
TransferQueue: 公平隊(duì)列
static final class TransferQueue<E> extends Transferer<E> {
static final class QNode {
volatile QNode next;
volatile Object item;
volatile Thread waiter;
final boolean isData; //標(biāo)記生產(chǎn)者還是消費(fèi)者
//..................
}
E transfer(E e, boolean timed, long nanos) {
//......................
}
}
SynchronousQueue隊(duì)列實(shí)現(xiàn)公平策略核心類:TransferQueue, 內(nèi)部定義QNode 作為容器內(nèi)容節(jié)點(diǎn).
基本使用
1:take/put 操作線程阻塞,等待配對的put/take
public class App {
public static void main(String[] args) throws InterruptedException {
final SynchronousQueue<String> queue = new SynchronousQueue<String>();
queue.put("a");
queue.take();
System.out.println("end....");
}
}
運(yùn)行后, 并控制臺中并沒有打印出"end.....", 前面提到了, SynchronousQueue 沒有容量的概念, 生產(chǎn)線程put元素之后,線程會自動掛起, 等待消費(fèi)線程take喚醒.執(zhí)行put方法之后, main線程會掛起, 所以執(zhí)行不了end....., 執(zhí)行take操作也一樣. 結(jié)論:SynchronousQueue take跟put 必須是配對的, 否則總一個(gè)線程被掛起.
2: 查詢性質(zhì)方法無意義, 必須等待take 與put 配對之后線程才可以繼續(xù)執(zhí)行
public class App {
public static void main(String[] args) throws InterruptedException {
final SynchronousQueue<String> queue = new SynchronousQueue<String>();
queue.put("a"); //線程被掛起
System.out.println("大小:" + queue.size());
System.out.println("為空:" + queue.isEmpty());
}
}
3: SynchronousQueue 執(zhí)行規(guī)則: 一個(gè)put 對應(yīng)一個(gè)take,反之亦然
public class App {
public static void main(String[] args) throws InterruptedException {
final SynchronousQueue<String> queue = new SynchronousQueue<String>();
new Thread(new Runnable() {
@Override
public void run() {
try {
System.out.println("t1 begin...");
queue.put("a");
System.out.println("t1 end...");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();
Thread.sleep(1000);
new Thread(new Runnable() {
@Override
public void run() {
try {
System.out.println("t2 begin...");
System.out.println(queue.take());
System.out.println("t2 end....");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();
}
}
輸出結(jié)果:
t1 begin...
t2 begin...
a
t2 end....
t1 end...
多執(zhí)行幾次, 結(jié)果差不多, 但每次a的輸出必須是在t2 begin... 執(zhí)行之后, 原因: t1 線程添加a元素之后馬上掛起, 等t2線程執(zhí)行take消費(fèi)并喚醒之后, t1才有機(jī)會繼續(xù)執(zhí)行.
4:SynchronousQueue分公平隊(duì)列和非公平隊(duì)列
公平隊(duì)列: put線程入隊(duì)時(shí), 會依次掛起. 當(dāng)執(zhí)行take線程時(shí),掛起的put線程按FIFO原則,誰先掛起,誰先喚醒.
非公平隊(duì)列: put線程入隊(duì)時(shí), 會依次掛起. 當(dāng)執(zhí)行take線程時(shí), 隨機(jī)喚醒掛起的put線程.
public class App{
public static void main(String[] args) throws InterruptedException {
//非公平
final SynchronousQueue<String> queue = new SynchronousQueue<String>(false);
//公平
//final SynchronousQueue<String> queue = new SynchronousQueue<String>(true);
//創(chuàng)建5個(gè)偶數(shù)put線程
for (int i = 0; i < 10; i+= 2){
new Thread(new Runnable() {
@Override
public void run() {
try {
System.out.println(Thread.currentThread().getName() + " begin...");
//用于識別隊(duì)列中元素由哪個(gè)線程入列的
queue.put("put:" + Thread.currentThread().getName());
System.out.println(Thread.currentThread().getName() + " end...");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, "t" + i).start();
}
//休眠4s保證 put線程全部掛起等待
Thread.sleep(4000);
//創(chuàng)建5個(gè)奇數(shù)take線程
for (int i = 1; i < 10; i+=2){
new Thread(new Runnable() {
@Override
public void run() {
try {
System.out.println(Thread.currentThread().getName() + " begin...");
//查看take 與 put 線程配對
System.out.println(queue.take());
System.out.println(Thread.currentThread().getName() + " end...");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
},"t" + i).start();
}
}
}
結(jié)果分析:
//公平
final SynchronousQueue<String> queue = new SynchronousQueue<String>(true);
-------------------------------------------------
t2 begin...
t6 begin...
t0 begin...
t4 begin...
t8 begin...
t1 begin...
put:t2
t1 end...
t3 begin...
put:t6
t3 end...
t9 begin...
put:t0
t9 end...
t0 end...
t7 begin...
put:t4
t7 end...
t5 begin...
t4 end...
t2 end...
t8 end...
t6 end...
put:t8
t5 end...
將SynchronousQueue 切換成公平鎖策略時(shí), put線程掛起順序依次是: t2 t6 t0 t4 t8, 4s之后, take線程執(zhí)行后的配對的順序是: put:t2 put:t6 put:t0 put:t4 put:t8 , put的順序跟take的順序完全一致.
//非公平
final SynchronousQueue<String> queue = new SynchronousQueue<String>(false);
-------------------------------------------------
t2 begin...
t6 begin...
t0 begin...
t4 begin...
t8 begin...
t1 begin...
t3 begin...
t7 begin...
put:t0
t7 end...
t0 end...
put:t4
t1 end...
put:t8
t3 end...
t4 end...
t8 end...
t5 begin...
t9 begin...
put:t2
t9 end...
t6 end...
put:t6
t5 end...
t2 end...
SynchronousQueue 采用非公平鎖策略時(shí), put線程掛起順序依次是: t2 t6 t0 t4 t8, 4s之后, take線程執(zhí)行后的配對的順序是: put:t0 put:t4 put:t8 put:t2 put:t6 , 很明顯put的順序跟take的順序不一致.
源碼分析
1:公平鎖策略- put / take
public void put(E e) throws InterruptedException {
if (e == null) throw new NullPointerException();
if (transferer.transfer(e, false, 0) == null) {
Thread.interrupted();
throw new InterruptedException();
}
}
public E take() throws InterruptedException {
E e = transferer.transfer(null, false, 0);
if (e != null)
return e;
Thread.interrupted();
throw new InterruptedException();
}
put 跟 take 方法有都調(diào)用:
調(diào)用transfer 方法:
put : transferer.transfer(e, false, 0)
take: transferer.transfer(null, false, 0);
從前面內(nèi)部結(jié)構(gòu)看: SynchronousQueue 公平策略底層實(shí)際上委托給TransferQueue 鏈表隊(duì)列實(shí)現(xiàn), 而內(nèi)部節(jié)點(diǎn)QNode, 就是鏈表節(jié)點(diǎn)啦, 來看下節(jié)點(diǎn)源碼:
static final class QNode {
volatile QNode next; //下一個(gè)節(jié)點(diǎn)
volatile Object item; //節(jié)點(diǎn)內(nèi)容
volatile Thread waiter; //線程掛起與喚醒控制: park /unpark
//模式控制變量:
//true: 數(shù)據(jù)模式, put操作為true
//false: 請求模式, take操作時(shí)為false
//隊(duì)列隊(duì)列的take操作與put操作, 使用同一個(gè) transfer 方法 , 所以isData變量來區(qū)分是take操作還是put操作
final boolean isData;
//構(gòu)造器
QNode(Object item, boolean isData) {...}
//cas原子操作, 設(shè)置next節(jié)點(diǎn)
boolean casNext(QNode cmp, QNode val) {...}
//cas 原子操作, 設(shè)置及節(jié)點(diǎn)內(nèi)容
boolean casItem(Object cmp, Object val) {...}
//取消節(jié)點(diǎn), 將節(jié)點(diǎn)內(nèi)容設(shè)置為自己
void tryCancel(Object cmp) {...}
//判斷是否操作結(jié)束
boolean isCancelled() {...}
}
此處研究的是公平鎖策略, 所以, 此時(shí)的transfer變量執(zhí)項(xiàng)的是: TransferQueue 類的實(shí)例
E transfer(E e, boolean timed, long nanos) {
QNode s = null;
//e != null, isData 為true , 表示數(shù)據(jù)模式
boolean isData = (e != null);
for (;;) { //非鎖并發(fā)中, 自旋到滿足條件為止
//TransferQueue 初始化時(shí), tail 跟 head 變量執(zhí)行同一個(gè)節(jié)點(diǎn): h
//QNode tail, head --> new QNode(null, false);
QNode t = tail; //尾節(jié)點(diǎn)
QNode h = head; //頭節(jié)點(diǎn)
if (t == null || h == null) //初始化出問題, 跳過
continue;
//h==t 為true時(shí), 表示SynchronousQueue隊(duì)列此時(shí)沒數(shù)據(jù), 可以添加
//t.isData == isData h != t時(shí), 隊(duì)列不為空,需要進(jìn)行判斷此時(shí)為take操作還是put操作
if (h == t || t.isData == isData) {
//進(jìn)入表示put操作 : 隊(duì)列此時(shí)可能為空, 也可能不為空
QNode tn = t.next;
//前面剛設(shè)置t=tail, 此時(shí)若不等,表示其他線程修改tail值, 放棄此次操作,進(jìn)入下次循環(huán)
if (t != tail)
continue;
if (tn != null) {
// tn != null, 表示此時(shí)隊(duì)列已有值, 嘗試推進(jìn)節(jié)點(diǎn)后移
//advanceTail 方法內(nèi)部做了限制, 當(dāng) t == tail 才進(jìn)行
advanceTail(t, tn);
continue;
}
if (timed && nanos <= 0) //過期,沒必要操作
return null;
if (s == null) //執(zhí)行到這, 表示所有操作合法
//創(chuàng)建節(jié)點(diǎn)
s = new QNode(e, isData);
if (!t.casNext(null, s)) //在鏈表上掛載節(jié)點(diǎn), 若失敗,重來
continue;
//推進(jìn)節(jié)點(diǎn)后移, 嘗試將s設(shè)置為尾節(jié)點(diǎn)
//注意:但且僅當(dāng)t == tail才執(zhí)行
//一旦執(zhí)行, 表示s節(jié)點(diǎn)入列
advanceTail(t, s);
//advanceTail 操作成功之后, awaitFulfill
//awaitFulfill : 此用于自旋/阻塞節(jié)點(diǎn),直到節(jié)點(diǎn)被匹配返回或者取消涮总、中斷胸囱。
Object x = awaitFulfill(s, e, timed, nanos);
//awaitFulfill 方法只有一個(gè)出口: s.item == e, 而出現(xiàn)這情況,
//前提必須是s.tryCancel 方法 ,2種情形調(diào)用: 1>等待時(shí)間到 2>線程中斷
//執(zhí)行到這步, 有幾種可能: 1>線程被中斷 2>等待時(shí)間到 3>有消費(fèi)/生產(chǎn)線程配對了
//當(dāng)x == s 表示線程中斷或等待時(shí)間到
if (x == s) {
//清除s節(jié)點(diǎn)
clean(t, s);
return null;
}
//到這: 表示有消費(fèi)/生產(chǎn)線程配對了
//用于判斷節(jié)點(diǎn)是否已經(jīng)從隊(duì)列中離開了
if (!s.isOffList()) {
//嘗試將s節(jié)點(diǎn)設(shè)置為head,移出t
advanceHead(t, s);
if (x != null)
s.item = s;
// 釋放線程
s.waiter = null;
}
//返回:
return (x != null) ? (E)x : e;
} else {
//能進(jìn)入到這個(gè)分支, 表示等待隊(duì)列中有等待線程
//可能是take1 然后在等put1 也可能是put1在等take1
QNode m = h.next;
//當(dāng)?shù)却?duì)列中有掛起線程, m != null
if (t != tail || m == null || h != head)
continue;
Object x = m.item; //拿到第一個(gè)節(jié)點(diǎn)元素
if (isData == (x != null) ||
x == m || //如果相等表示已經(jīng)有線程跟掛起的線程配置了
!m.casItem(x, e)) { //交換item的內(nèi)容
advanceHead(h, m); //推動頭節(jié)點(diǎn)移動
continue;
}
//推動頭節(jié)點(diǎn)移動
//這里注意: advanceHead 有個(gè)動作 h.next = h
//節(jié)點(diǎn)被匹配之后, next節(jié)點(diǎn)指向自己, 這種節(jié)點(diǎn)會在clean 方法中被刪除
advanceHead(h, m);
//喚醒等待隊(duì)列中被配對的線程
LockSupport.unpark(m.waiter);
//返回
return (x != null) ? (E)x : e;
}
}
}
公平策略源碼操作流程簡化:
總結(jié):
結(jié)合代碼: transferer執(zhí)行流程可歸納為以下:
1: transferer調(diào)用transfer方法實(shí)現(xiàn)SynchronousQueue 公平隊(duì)列的take跟put操作
2:為區(qū)分take與put操作瀑梗, 設(shè)計(jì)控制變量 isData區(qū)分 true:put操作烹笔, fasle:take操作
3:如果隊(duì)列為null或者isData一致(為true), 隊(duì)列嘗試將節(jié)點(diǎn)添加到等待隊(duì)列中, 知道被其他線程匹配, 超時(shí) 或者取消.
4:如果隊(duì)列不為null, 隊(duì)列嘗試配對, 一旦配對成功, 按順序喚醒掛起的線程, 調(diào)用clean方法清除配對節(jié)點(diǎn).
想獲取更多技術(shù)干貨,請前往叩丁狼官網(wǎng):http://www.wolfcode.cn/all_article.html