阻塞隊列 — LinkedTransferQueue源碼分析

點贊再看硼婿,養(yǎng)成習(xí)慣辜御,公眾號搜一搜【一角錢技術(shù)】關(guān)注更多原創(chuàng)技術(shù)文章撒蟀。本文 GitHub org_hejianhui/JavaStudy 已收錄元践,有我的系列文章揩懒。

前言

LinkedTransferQueue 是一個由鏈表結(jié)構(gòu)組成的無界阻塞傳輸隊列汹忠,它是一個很多隊列的結(jié)合體(ConcurrentLinkedQueue佳窑,LinkedBlockingQueue制恍,SynchronousQueue),在除了有基本阻塞隊列的功能(但是這個阻塞隊列沒有使用鎖)之外神凑;隊列實現(xiàn)了TransferQueue接口重寫了transfer 和 tryTransfer 方法净神,這組方法和SynchronousQueue公平模式的隊列類似,具有匹配的功能耙厚。

LinkedTransferQueue是LinkedBlockingQueue强挫、SynchronousQueue(公平模式)岔霸、ConcurrentLinkedQueue三者的集合體薛躬,它綜合了這三者的方法虫溜,并且提供了更加高效的實現(xiàn)方式饲化。

隊列創(chuàng)建

TransferQueue<String> queue = new LinkedTransferQueue<String>();

應(yīng)用場景

LinkedTransferQueue采用的一種預(yù)占模式翰意。意思就是消費者線程取元素時梦湘,如果隊列為空崎淳,那就生成一個節(jié)點(節(jié)點元素為null)入隊翼雀,然后消費者線程park住徒探,后面生產(chǎn)者線程入隊時發(fā)現(xiàn)有一個元素為null的節(jié)點葬荷,生產(chǎn)者線程就不入隊了坑夯,直接就將元素填充到該節(jié)點岖寞,喚醒該節(jié)點上park住線程,被喚醒的消費者線程拿貨走人柜蜈。這就是預(yù)占的意思:有就拿貨走人仗谆,沒有就占個位置等著,等到或超時淑履。

我們來看一個例子:

package com.niuh.queue.transfer;


import java.util.Random;
import java.util.concurrent.LinkedTransferQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TransferQueue;

public class TestLinkedTransferQueue {
    public static void main(String[] args) {
        TransferQueue<String> queue = new LinkedTransferQueue<String>();
        Thread producer = new Thread(new Producer(queue));
        producer.setDaemon(true); // 設(shè)置為守護(hù)進(jìn)程使得線程執(zhí)行結(jié)束后程序自動結(jié)束運行
        producer.start();
        for (int i = 0; i < 10; i++) {
            Thread consumer = new Thread(new Consumer(queue));
            consumer.setDaemon(true);
            consumer.start();
            try {
                // 消費者進(jìn)程休眠一秒鐘隶垮,以便以便生產(chǎn)者獲得CPU,從而生產(chǎn)產(chǎn)品
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

/**
 * 模擬生產(chǎn)者
 */
class Producer implements Runnable {
    private final TransferQueue<String> queue;

    public Producer(TransferQueue<String> queue) {
        this.queue = queue;
    }

    private String produce() {
        return " number " + (new Random().nextInt(100));
    }

    @Override
    public void run() {
        try {
            while (true) {
                if (queue.hasWaitingConsumer()) {
                    queue.transfer(produce());
                }
                TimeUnit.SECONDS.sleep(1);// 生產(chǎn)者睡眠一秒鐘,這樣可以看出程序的執(zhí)行過程
            }
        } catch (InterruptedException e) {
        }
    }
}

/**
 * 模擬消費者
 */
class Consumer implements Runnable {
    private final TransferQueue<String> queue;

    public Consumer(TransferQueue<String> queue) {
        this.queue = queue;
    }

    @Override
    public void run() {
        try {
            System.out.println(" Consumer " + Thread.currentThread().getName() + queue.take());
        } catch (InterruptedException e) {
        }
    }
}

工作原理

LinkedTransferQueue 使用了一個叫做 dual data structure 的數(shù)據(jù)結(jié)構(gòu)秘噪,或者叫做 dual queue 狸吞,翻譯為雙重數(shù)據(jù)結(jié)構(gòu)或者雙重隊列。

雙重隊列是指:放取元素使用同一個隊列指煎,隊列中的節(jié)點具有兩種模式蹋偏,一種是數(shù)據(jù)節(jié)點,一種是非數(shù)據(jù)節(jié)點至壤。

  • 放元素時先跟隊列頭節(jié)點對比暖侨,如果頭節(jié)點是非數(shù)據(jù)節(jié)點,就讓它們匹配崇渗,如果頭節(jié)點是數(shù)據(jù)節(jié)點字逗,就生產(chǎn)一個數(shù)據(jù)節(jié)點放在隊列尾端(入隊)京郑。
  • 取元素時也是先跟隊列頭節(jié)點對比,如果頭節(jié)點是數(shù)據(jù)節(jié)點葫掉,就讓它們匹配些举,如果頭節(jié)點是非數(shù)據(jù)節(jié)點,就生產(chǎn)一個非數(shù)據(jù)節(jié)點放在隊列尾端(入隊)俭厚。

用圖來來表示如下:



不管是放元素還是取元素户魏,都先跟頭節(jié)點對比,如果二者模式不一樣就匹配它們挪挤,如果二者模式一樣叼丑,就入隊。

源碼分析

定義

LinkedTransferQueue的類繼承關(guān)系如下:



LinkedTransferQueue實現(xiàn)了TransferQueue接口扛门,而TransferQueue接口是繼承自BlockingQueue的鸠信,所以LinkedTransferQueue也是一個阻塞隊列。

其包含的方法定義如下:


TransferQueue接口

對比前面的阻塞隊列论寨,會發(fā)現(xiàn)LinkedTransferQueue 的繼承體系有特殊之處星立。前面的阻塞隊列都直接實現(xiàn)的BlockingQueue接口,在LinkedTransferQueue 卻多了一個TransferQueue 接口葬凳,而該接口繼承至BlockingQueue绰垂。


BlockingQueue 接口代表的是普通的阻塞隊列,TransferQueue 則代表的是另一種特殊阻塞隊列火焰,它是指這樣的一個隊列:當(dāng)生產(chǎn)者向隊列添加元素但隊列已滿時劲装,生產(chǎn)者會被阻塞;當(dāng)消費者從隊列移除元素但隊列為空時昌简,消費者會被阻塞占业。

前面我們分析的SynchronousQueue 不就是有這種特性嗎,但是SynchronousQueue 并沒有實現(xiàn)TransferQueue 接口江场,原因就在于TransferQueue 接口也是在jdk 1.7才出現(xiàn)的纺酸,應(yīng)該是為了和前面的阻塞隊列進(jìn)行區(qū)分,同時為了后面擴充這種特殊的阻塞隊列址否,才加入了TransferQueue 餐蔬,這樣功能才不至于混亂(單一職能原則)。

public interface TransferQueue<E> extends BlockingQueue<E> {
    //立即轉(zhuǎn)交一個元素給消費者佑附,如果沒有等待的消費者樊诺,則返回false(元素不入隊)
    boolean tryTransfer(E e);

    //轉(zhuǎn)交一個元素給消費者,如果沒有等待的消費者音同,則阻塞直到消費者到來词爬,或者發(fā)生異常
    void transfer(E e) throws InterruptedException;

    //轉(zhuǎn)交一個元素給消費者,如果沒有等待的消費者权均,則阻塞直到超時
    boolean tryTransfer(E e, long timeout, TimeUnit unit)throws InterruptedException;

    //是否存在等待的消費者
    boolean hasWaitingConsumer();

    //返回等待的消費者的個數(shù)
    int getWaitingConsumerCount();
}

在SynchronousQueue 中也有類似的方法顿膨,當(dāng)然沒有這么多锅锨,只是以內(nèi)部類的形式存在(TransferQueue、TransferStack)恋沃,而在LinkedTransferQueue 則把這種阻塞操作抽成了接口必搞。

成員屬性

// 判斷是否為多核
private static final boolean MP =Runtime.getRuntime().availableProcessors() > 1;
// 自旋次數(shù)
private static final int FRONT_SPINS   = 1 << 7;
// 前驅(qū)節(jié)點正在處理,當(dāng)前節(jié)點需要自旋的次數(shù)
private static final int CHAINED_SPINS = FRONT_SPINS >>> 1;
// 容忍清除節(jié)點失敗次數(shù)的閾值
static final int SWEEP_THRESHOLD = 32;

/** 頭節(jié)點 */
transient volatile Node head;
/** 尾節(jié)點 */
private transient volatile Node tail;

/*
*  放取元素的幾種方式囊咏,調(diào)用xfer()方法時需要傳入,區(qū)分不同處理恕洲,
* xfer()方法是LinkedTransferQueue的最核心的方法
*/
// 立即返回,用于非超時的 poll() 和 tryTransfer() 方法中
private static final int NOW   = 0; // for untimed poll, tryTransfer
// 異步梅割,不會阻塞霜第,用于放元素時,因為內(nèi)部使用無界單鏈表存儲元素户辞,不會阻塞放元素的過程
private static final int ASYNC = 1; // for offer, put, add
// 同步泌类,調(diào)用的時候如果沒有匹配到會阻塞直到匹配為止
private static final int SYNC  = 2; // for transfer, take
// 超時,用于有超時的poll() 和 tryTransfer() 方法中
private static final int TIMED = 3; // for timed poll, tryTransfer

注意xfer 者幾個參數(shù)很重要咆课。

  • NOW :表示的是立即末誓,不需要等待的意思扯俱,用于poll和tryTransfer方法书蚪,poll 隊列為空返回,tryTransfer隊列沒有消費者迅栅,構(gòu)造一個空的節(jié)點直接返回殊校,都是不等待的。
  • ASYNC :異步读存,offer, put, add等入隊方法为流,由于是無界隊列,所以不會阻塞让簿。
  • SYNC :同步表示會阻塞敬察,take一個元素,沒有就會阻塞尔当,transfer傳輸莲祸,必須等待消費者來消費。
  • TIMED :帶超時時間的now椭迎,會等待一定的時間后返回锐帜。

主要內(nèi)部類

static final class Node {
    // 是否是數(shù)據(jù)節(jié)點(也就標(biāo)識了是生產(chǎn)者還是消費者)
    final boolean isData;   // false if this is a request node
    // 元素的值
    volatile Object item;   // initially non-null if isData; CASed to match
    // 下一個節(jié)點
    volatile Node next;
    // 持有元素的線程
    volatile Thread waiter; // null until waiting
}

典型的單鏈表結(jié)構(gòu),內(nèi)部除了存儲元素的值和下一個節(jié)點的指針外畜号,還包含了是否為數(shù)據(jù)節(jié)點和持有元素的線程缴阎。內(nèi)部通過 isData 區(qū)分是生產(chǎn)者還是消費者。

構(gòu)造函數(shù)

public LinkedTransferQueue() {
}

public LinkedTransferQueue(Collection<? extends E> c) {
    this();
    addAll(c);
}

只有這兩個構(gòu)造方法简软,且沒有初始容量蛮拔,所以是無界的一個阻塞隊列述暂。

入隊方法

LinkedTransferQueue提供了add、put建炫、offer三類方法贸典,用于將元素放到隊列中。其中三類(4個)方法都是一樣的踱卵,使用異步的方式調(diào)用 xfer() 方法廊驼,傳入的參數(shù)都一模一樣。

注意:我們這里所說的入隊操作是指add,put,offer這幾個方法惋砂,而不是指真正的把節(jié)點入隊的操作妒挎,因為LinkedTransferQueue 中針對的不是數(shù)據(jù),而是操作西饵,操作可能需要入隊酝掩,而這個操作可能是放數(shù)據(jù)操作,也可能是取數(shù)據(jù)操作眷柔,這里注意區(qū)分一下期虾,不要搞混了。

public void put(E e) {
    // 異步模式驯嘱,不會阻塞镶苞,不會超時
    // 因為是放元素,單鏈表存儲鞠评,會一直往后加
    xfer(e, true, ASYNC, 0);
}

public boolean offer(E e, long timeout, TimeUnit unit) {
    xfer(e, true, ASYNC, 0);
    return true;
}

public boolean offer(E e) {
    xfer(e, true, ASYNC, 0);
    return true;
}

public boolean add(E e) {
    xfer(e, true, ASYNC, 0);
    return true;
}

LinkedTransferQueue 是一個由鏈表組成的無界隊列茂蚓,因此不會有容量限制(一定范圍內(nèi)),因此這里入隊的操作都不會阻塞(因此超時入隊方法實際也沒有用)剃幌,也就是說聋涨,入隊后線程會立即返回负乡,這個是參數(shù)ASYNC的作用牍白。

xfer(E e, boolean haveData, int how, long nanos)的參數(shù)分別是:

  1. e表示元素;
  2. haveData表示是否是數(shù)據(jù)節(jié)點抖棘,
  3. how表示放取元素的方式茂腥,上面提到的四種,NOW钉答、ASYNC础芍、SYNC、TIMED数尿;
  4. nanos表示超時時間仑性;

xfer()方法

在看 xfer 方法之前,我們先來了解以下大致流程右蹦,來幫助我們理解诊杆。

LinkedTransferQueue 和 SynchronousQueue 是一樣的歼捐,隊列中主要的不是針對數(shù)據(jù),而是操作(put或take晨汹,注意這里put豹储、take指的是放入數(shù)據(jù)和取數(shù)據(jù)),隊列中即可以存儲入隊操作淘这,也可以存儲出隊操作剥扣,當(dāng)隊列為空時,如果有線程進(jìn)行出隊操作铝穷,那么這個時候隊列是沒有數(shù)據(jù)的钠怯,那么這個操作就會被入隊,同時線程也會阻塞曙聂,直到數(shù)據(jù)的到來(或出現(xiàn)異常)晦炊,如果最開始隊列為空,放入數(shù)據(jù)的操作到來宁脊,那么數(shù)據(jù)就會被放到隊列中断国,此后如果取數(shù)據(jù)操作到來,那么就會從隊列中取出數(shù)據(jù)榆苞,因此可以知道隊列中存放的都是一系列相同的操作(put-放數(shù)據(jù)操作 或 take-取數(shù)據(jù)操作)稳衬。

接下來我們先說 放數(shù)據(jù)操作,那么如果隊列為空语稠,那么直接將數(shù)據(jù)入隊即可宋彼,同時因為是無界隊列弄砍,線程不會阻塞仙畦,直接返回,如果隊列不為空音婶,那么隊列里面可能有兩種情況:

  1. 存放的都是數(shù)據(jù)慨畸。那么本次操作的和隊列中的節(jié)點操作是一樣的,因此直接把數(shù)據(jù)放到隊列末尾衣式,線程返回寸士。
  2. 存放的都是取數(shù)據(jù)操作。那么本次操作和隊列中的節(jié)點操作是不一樣的(也就是匹配的碴卧,放入數(shù)據(jù)操作和取數(shù)據(jù)操作是匹配的弱卡,也就是不同的操作是匹配的,相同的操作是不匹配的)住册,那么就把對頭的節(jié)點出隊婶博,就把本次的數(shù)據(jù)給對頭節(jié)點,同時喚醒該節(jié)點的線程荧飞。
private E xfer(E e, boolean haveData, int how, long nanos) {
    // 不允許放入空元素
    if (haveData && (e == null))
        throw new NullPointerException();
    Node s = null;                        // the node to append, if needed
    // 外層循環(huán)凡人,自旋名党,失敗就重試
    retry:
    for (;;) {                            // restart on append race

        /**
         * 下面這個for循環(huán)用于控制匹配的過程
         * 同一時刻隊列中只會存儲一種類型的節(jié)點
         * 從頭節(jié)點開始嘗試匹配,如果頭節(jié)點被其它線程先一步匹配了挠轴,
         * 就再嘗試其下一個传睹,直到匹配為止,或者到隊列中沒有元素為止
         */
        for (Node h = head, p = h; p != null;) { // find & match first node
            // p節(jié)點的模式
            boolean isData = p.isData;
            // p節(jié)點的值
            Object item = p.item;
            // p沒有被匹配到
            if (item != p && (item != null) == isData) { // unmatched
                // 如果兩者模式一樣岸晦,則不能匹配欧啤,跳出循環(huán)后嘗試入隊
                if (isData == haveData)   // can't match
                    break;
                // 如果兩者模式不一樣,則嘗試匹配
                // 把p的值設(shè)置為e(如果是取元素則e是null启上,如果是放元素則e是元素值)
                if (p.casItem(item, e)) { // match
                    // 匹配成功
                    // for里面的邏輯比較復(fù)雜堂油,用于控制多線程同時放取元素時出現(xiàn)競爭的情況的
                    // 看不懂可以直接跳過
                    for (Node q = p; q != h;) {
                        // 進(jìn)入到這里可能是頭節(jié)點已經(jīng)被匹配,然后p會變成h的下一個節(jié)點 
                        Node n = q.next;  // update by 2 unless singleton
                        // 如果head還沒變碧绞,就把它更新成新的節(jié)點
                        // 并把它刪除(forgetNext()會把它的next設(shè)為自己府框,也就是從單鏈表中刪除了)
                        // 這時為什么要把head設(shè)為n呢?因為到這里了讥邻,肯定head本身已經(jīng)被匹配掉了
                        // 而上面的p.casItem()又成功了迫靖,說明p也被當(dāng)前這個元素給匹配掉了
                        // 所以需要把它們倆都出隊列,讓其它線程可以從真正的頭開始兴使,不用重復(fù)檢查了
                        if (head == h && casHead(h, n == null ? q : n)) {
                            h.forgetNext();
                            break;
                        }                 // advance and retry
                        // 如果新的頭節(jié)點為空系宜,或者其next為空,或者其next未匹配发魄,就重試
                        if ((h = head)   == null ||
                            (q = h.next) == null || !q.isMatched())
                            break;        // unless slack < 2
                    }
                    // 喚醒p中等待的線程
                    LockSupport.unpark(p.waiter);
                    // 并返回匹配到的元素
                    return LinkedTransferQueue.<E>cast(item);
                }
            }
            // p已經(jīng)被匹配了或者嘗試匹配的時候失敗了
            // 也就是其它線程先一步匹配了p
            // 這時候又分兩種情況盹牧,p的next還沒來得及修改,p的next指向了自己
            // 如果p的next已經(jīng)指向了自己励幼,就重新取head重試汰寓,否則就取其next重試
            Node n = p.next;
            p = (p != n) ? n : (h = head); // Use head if p offlist
        }

        // 到這里肯定是隊列中存儲的節(jié)點類型和自己一樣 或者 隊列中沒有元素了,就入隊(不管放元素還是取元素都得入隊)
        // 入隊又分成四種情況:
        // NOW苹粟,立即返回有滑,沒有匹配到立即返回,不做入隊操作
        // ASYNC嵌削,異步毛好,元素入隊但當(dāng)前線程不會阻塞(相當(dāng)于無界LinkedBlockingQueue的元素入隊)
        // SYNC,同步苛秕,元素入隊后當(dāng)前線程阻塞肌访,等待被匹配到
        // TIMED,有超時艇劫,元素入隊后等待一段時間被匹配吼驶,時間到了還沒匹配到就返回元素本身

        // 如果不是立即返回
        if (how != NOW) {                 // No matches available
            // 新建s節(jié)點
            if (s == null)
                s = new Node(e, haveData);
            // 嘗試入隊
            Node pred = tryAppend(s, haveData);
            // 入隊失敗,重試
            if (pred == null)
                continue retry;           // lost race vs opposite mode
            // 如果不是異步(同步或者有超時)
            // 就等待被匹配
            if (how != ASYNC)
                return awaitMatch(s, pred, e, (how == TIMED), nanos);
        }
        return e; // not waiting
    }
}

對于這里的操作 (放入數(shù)據(jù)) ,尋找匹配節(jié)點:

  • 如果找到了旨剥,就設(shè)置item值咧欣,然后unpark匹配節(jié)點的waiter線程,返回(其實就是看看隊列里面的操作是不是取數(shù)據(jù)操作)轨帜,否則就入隊(NOW直接返回)魄咕;
  • 如果沒有找到匹配節(jié)點,則根據(jù)傳入的how來處理蚌父,NOW直接返回哮兰,其余三種先入隊,入隊后如果是ASYNC則返回苟弛,SYNC和TIMED則會阻塞等待匹配喝滞。

我們再看看里面的部分方法:

 /**
  * Returns true if this node has been matched, including the
  * case of artificial matches due to cancellation.
  */
 final boolean isMatched() {
     Object x = item;
     return (x == this) || ((x == null) == isData);
 }

如果操作節(jié)點已經(jīng)被匹配了,那么item會被改變膏秫,對于取數(shù)據(jù)操作右遭,那么item會被設(shè)置成數(shù)據(jù),如果操作被取消了缤削,那么會設(shè)置item為this窘哈。

/**
 * Links node to itself to avoid garbage retention.  Called
 * only after CASing head field, so uses relaxed write.
 */
final void forgetNext() {
    UNSAFE.putObject(this, nextOffset, this);
}

forgetNext 設(shè)置為 next 為自身,也就是脫離鏈表亭敢,同時方便gc回收自己滚婉。

接下來我們再看看入隊調(diào)用的 tryAppend 方法:

private Node tryAppend(Node s, boolean haveData) {
    // 從tail開始遍歷,把s放到鏈表尾端
    for (Node t = tail, p = t;;) {        // move p to last node and append
        Node n, u;                        // temps for reads of next & tail
        // 如果首尾都是null帅刀,說明鏈表中還沒有元素
        if (p == null && (p = head) == null) {
            // 就讓首節(jié)點指向s
            // 注意让腹,這里插入第一個元素的時候tail指針并沒有指向s
            if (casHead(null, s))
                return s;                 // initialize
        }
        else if (p.cannotPrecede(haveData))
            // 如果p無法處理,則返回null
            // 這里無法處理的意思是扣溺,p和s節(jié)點的類型不一樣骇窍,不允許s入隊
            // 比如,其它線程先入隊了一個數(shù)據(jù)節(jié)點娇妓,這時候要入隊一個非數(shù)據(jù)節(jié)點像鸡,就不允許,
            // 隊列中所有的元素都要保證是同一種類型的節(jié)點
            // 返回null后外面的方法會重新嘗試匹配重新入隊等
            return null;                  // lost race vs opposite mode
        else if ((n = p.next) != null)    // not last; keep traversing
            // 如果p的next不為空哈恰,說明不是最后一個節(jié)點
            // 則讓p重新指向最后一個節(jié)點
            p = p != t && t != (u = tail) ? (t = u) : // stale tail
                (p != n) ? n : null;      // restart if off list
        else if (!p.casNext(null, s))
            // 如果CAS更新s為p的next失敗
            // 則說明有其它線程先一步更新到p的next了
            // 就讓p指向p的next,重新嘗試讓s入隊
            p = p.next;                   // re-read on CAS failure
        else {
            // 到這里說明s成功入隊了
            // 如果p不等于t志群,就更新tail指針
            // 還記得上面插入第一個元素時tail指針并沒有指向新元素嗎着绷?
            // 這里就是用來更新tail指針的
            if (p != t) {                 // update if slack now >= 2
                while ((tail != t || !casTail(t, s)) &&
                       (t = tail)   != null &&
                       (s = t.next) != null && // advance and retry
                       (s = s.next) != null && s != t);
            }
            // 返回p,即s的前一個元素
            return p;
        }
    }
}

這個操作入隊(把節(jié)點鏈接接到鏈表末尾)看上去有點復(fù)雜锌云,主要原因還是沒有使用鎖荠医,存在很多并發(fā)情況下,有可能自己在添加節(jié)點入隊的時候,其它線程已經(jīng)把隊列改變了彬向,那么這個時候就需要重新找到隊尾兼贡,進(jìn)行提那件操作,添加成功后娃胆,也需要設(shè)置隊尾指針遍希,這個時候隊尾指針可能也被其它線程設(shè)置了,那么這個時候自己也要保證隊尾指針是正確的(遍歷驗證)里烦。

在上面看到有個方法:p.cannotPrecede(haveData)凿蒜,如果數(shù)據(jù)不符合要求,那么是不會入隊的胁黑。

/**
 * Returns true if a node with the given mode cannot be
 * appended to this node because this node is unmatched and
 * has opposite data mode.
 */
final boolean cannotPrecede(boolean haveData) {
    boolean d = isData;
    Object x;
    return d != haveData && (x = item) != this && (x != null) == d;
}

這個就是驗證操作和其它數(shù)據(jù)節(jié)點的數(shù)據(jù)是否吻合的废封。

awaitMatch 這個我們在出隊來分析,因為放數(shù)據(jù)的過程是不會阻塞的丧蘸,當(dāng)然也更不會執(zhí)行該方法漂洋。

出隊方法

LinkedTransferQueue提供了poll、take力喷、remove方法用于出列元素氮发,出隊的三類(4個)方法也是直接或間接的調(diào)用xfer()方法,放取元素的方式和超時規(guī)則略微不同冗懦,本質(zhì)沒有大的區(qū)別爽冕。

注意:同上披蕉,這里的出隊操作,指的是poll眯娱、take方法,而不是真正指的是出隊操作徙缴,因為poll嘁信、take操作也可能會入隊(隊列針對的是操作于样,不是數(shù)據(jù))。

public E remove() {
    E x = poll();
    if (x != null)
        return x;
    else
        throw new NoSuchElementException();
}
public E take() throws InterruptedException {
    // 同步模式潘靖,會阻塞直到取到元素
    E e = xfer(null, false, SYNC, 0);
    if (e != null)
        return e;
    Thread.interrupted();
    throw new InterruptedException();
}

public E poll(long timeout, TimeUnit unit) throws InterruptedException {
    // 有超時時間
    E e = xfer(null, false, TIMED, unit.toNanos(timeout));
    if (e != null || !Thread.interrupted())
        return e;
    throw new InterruptedException();
}

public E poll() {
    // 立即返回穿剖,沒取到元素返回null
    return xfer(null, false, NOW, 0);
}

通過上面一系列的方法,我們看到卦溢,其實poll糊余、take內(nèi)部調(diào)用的仍然是xfer 方法秀又,因為是取數(shù)據(jù),因此參數(shù)部分發(fā)生了變化贬芥,這個注意一下吐辙。

xfer()方法

private E xfer(E e, boolean haveData, int how, long nanos) {
     if (haveData && (e == null))
         throw new NullPointerException();
     Node s = null;                        // the node to append, if needed
     retry:
     for (;;) {                            // restart on append race
         //這里是取數(shù)據(jù)操作,那么遍歷隊列看看有沒有匹配的操作(即放數(shù)據(jù)操作)
         for (Node h = head, p = h; p != null;) { // find & match first node
             boolean isData = p.isData;
             Object item = p.item; 
             if (item != p && (item != null) == isData) { // unmatched
                 if (isData == haveData)   // can't match
                     break;
                 /**
                 * 隊列里面確實都是放數(shù)據(jù)的操作蘸劈,則和當(dāng)前操作是匹配的
                 * 設(shè)置匹配操作節(jié)點的item域為null (e為null昏苏,原本item 域是數(shù)據(jù))
                 */
                 if (p.casItem(item, e)) { // match
                     // 協(xié)助推進(jìn)head,這個和上面是一樣的
                     for (Node q = p; q != h;) {
                         Node n = q.next;  // update by 2 unless singleton
                         if (head == h && casHead(h, n == null ? q : n)) {
                             h.forgetNext();
                             break;
                         }                 // advance and retry
                         if ((h = head)   == null ||
                             (q = h.next) == null || !q.isMatched())
                             break;        // unless slack < 2
                     }
                     // 喚醒阻塞線程(實際這里p.waiter是為null的,因為放數(shù)據(jù)操作是非阻塞的)
                     LockSupport.unpark(p.waiter);
                     // item線程是數(shù)據(jù)昵时,本次操作是取數(shù)據(jù)操作捷雕,因此返回數(shù)據(jù)
                     return LinkedTransferQueue.<E>cast(item);
                 }
             }
             Node n = p.next;
             p = (p != n) ? n : (h = head); // Use head if p offlist
         }
         // 如果參數(shù)指定為NOW,那么就算沒有被匹配壹甥,那么還是不入隊救巷,直接返回
         if (how != NOW) {                 // No matches available
             if (s == null)
                 s = new Node(e, haveData);
             // 添加節(jié)點    
             Node pred = tryAppend(s, haveData);
             if (pred == null)
                 continue retry;           // lost race vs opposite mode
             /**
             * 如果參數(shù)不是ASYC的這種,這可能需要阻塞等待
             * 取數(shù)據(jù)操作其參數(shù)都不是ASYNC句柠,因此如果沒有取到數(shù)據(jù)(被匹配)浦译,那么就可能進(jìn)行阻塞等待
             */    
             if (how != ASYNC)
                 return awaitMatch(s, pred, e, (how == TIMED), nanos);
         }
         return e; // not waiting
     }
 }

在這里我們分析的是 取數(shù)據(jù)操作 ,因為有了前面 放數(shù)據(jù)操作 的分析溯职,這里應(yīng)該還是很好理解精盅,取數(shù)據(jù)和放數(shù)據(jù)都是差不多的,都是和隊列里面的操作進(jìn)行匹配谜酒,如果隊列里面的操作是取數(shù)據(jù)操作僻族,本次操作是取數(shù)據(jù)操作,那么此時是不匹配的蝌数,需要把本次操作入隊(參數(shù):NOW顶伞、ASYNC唆貌、SYNC挠锥、TIMED 不一樣)侨赡,如果隊列的操作都是放數(shù)據(jù)操作蓖宦,本次操作是取數(shù)據(jù)操作稠茂,那么這個是匹配的睬关,就把對頭的數(shù)據(jù)取出來毡证,返回即可料睛。

下面我們來看看 awaitMatch 方法:

private E awaitMatch(Node s, Node pred, E e, boolean timed, long nanos) {
    // 如果是有超時的恤煞,計算其超時時間
    final long deadline = timed ? System.nanoTime() + nanos : 0L;
    // 當(dāng)前線程
    Thread w = Thread.currentThread();
    // 自旋次數(shù)
    int spins = -1; // initialized after first item and cancel checks
    // 隨機數(shù)居扒,隨機讓一些自旋的線程讓出CPU
    ThreadLocalRandom randomYields = null; // bound if needed

    for (;;) {
        Object item = s.item;
        // 如果s元素的值不等于e喜喂,說明它被匹配到了
        if (item != e) {                  // matched
            // assert item != s;
            // 把s的item更新為s本身
            // 并把s中的waiter置為空
            s.forgetContents();           // avoid garbage
            // 返回匹配到的元素
            return LinkedTransferQueue.<E>cast(item);
        }
        // 如果當(dāng)前線程中斷了姻灶,或者有超時的到期了
        // 就更新s的元素值指向s本身
        if ((w.isInterrupted() || (timed && nanos <= 0)) &&
                s.casItem(e, s)) {        // cancel
            // 嘗試解除s與其前一個節(jié)點的關(guān)系
            // 也就是刪除s節(jié)點
            unsplice(pred, s);
            // 返回元素的值本身产喉,說明沒匹配到
            return e;
        }
        
        // 如果自旋次數(shù)小于0曾沈,就計算自旋次數(shù)
        if (spins < 0) {                  // establish spins at/near front
            // spinsFor()計算自旋次數(shù)
            // 如果前面有節(jié)點未被匹配就返回0
            // 如果前面有節(jié)點且正在匹配中就返回一定的次數(shù)塞俱,等待
            if ((spins = spinsFor(pred, s.isData)) > 0)
                // 初始化隨機數(shù)
                randomYields = ThreadLocalRandom.current();
        }
        else if (spins > 0) {             // spin
            // 還有自旋次數(shù)就減1
            --spins;
            // 并隨機讓出CPU
            if (randomYields.nextInt(CHAINED_SPINS) == 0)
                Thread.yield();           // occasionally yield
        }
        else if (s.waiter == null) {
            // 更新s的waiter為當(dāng)前線程
            s.waiter = w;                 // request unpark then recheck
        }
        else if (timed) {
            // 如果有超時障涯,計算超時時間唯蝶,并阻塞一定時間
            nanos = deadline - System.nanoTime();
            if (nanos > 0L)
                LockSupport.parkNanos(this, nanos);
        }
        else {
            // 不是超時的,直接阻塞鼓蜒,等待被喚醒
            // 喚醒后進(jìn)入下一次循環(huán)都弹,走第一個if的邏輯就返回匹配的元素了
            LockSupport.park(this);
        }
    }
}

這里和 SynchronousQueue 的 awaitFulfill 差不多畅厢,主要進(jìn)行了自旋或详,如果自旋后霸琴,仍然沒有被匹配或者取消昭伸,則進(jìn)行阻塞(如果設(shè)置了超時阻塞庐杨,則進(jìn)行一段時間的阻塞)灵份,如果發(fā)生了中斷異常,會取消該操作弦聂,改變item的值莺葫,匹配成功后也會更改item的值捺檬,因此如果item和原來的值不想等時堡纬,則說明發(fā)生了改變,返回即可饺饭。

在 awaitMatch 過程中职车,如果線程被中斷了悴灵,或者超時了則會調(diào)用 unsplice() 方法去除該節(jié)點。

final void unsplice(Node pred, Node s) {
    //清除s的部分?jǐn)?shù)據(jù)
    s.forgetContents(); // forget unneeded fields

    if (pred != null && pred != s && pred.next == s) {
        Node n = s.next;
        if (n == null ||
            (n != s && pred.casNext(s, n) && pred.isMatched())) {
            /**
            *這個for循環(huán)登下,用于推進(jìn)head,如果head已經(jīng)被匹配了被芳,則需要更新head
            */
            for (;;) {               // check if at, or could be, head
                Node h = head;
                if (h == pred || h == s || h == null)
                    return;          // at head or list empty
                 //h 沒有被匹配剩晴,跳出循環(huán)侵状,否則可能需要更新head  
                if (!h.isMatched())
                    break;
                Node hn = h.next;
                //遍歷結(jié)束了绽左,退出循環(huán)
                if (hn == null)
                    return;          // now empty
                //head 被匹配了拼窥,重新設(shè)置head    
                if (hn != h && casHead(h, hn))
                    h.forgetNext();  // advance head
            }
            //s節(jié)點被移除后闯团,需要記錄刪除的操作次數(shù)房交,如果超過閥值候味,則需要清理隊列
            if (pred.next != pred && s.next != s) { // recheck if offlist
                for (;;) {           // sweep now if enough votes
                    int v = sweepVotes;
                    //沒超過閥值,則遞增記錄值
                    if (v < SWEEP_THRESHOLD) {
                        if (casSweepVotes(v, v + 1))
                            break;
                    }
                    else if (casSweepVotes(v, 0)) {
                        //重新設(shè)置記錄數(shù)尚胞,并清理隊列
                        sweep();
                        break;
                    }
                }
            }
        }
    }
}
private void sweep() {
    for (Node p = head, s, n; p != null && (s = p.next) != null; ) {
        if (!s.isMatched()) // s節(jié)點未被匹配,則繼續(xù)向后遍歷
            // Unmatched nodes are never self-linked
            p = s;
        else if ((n = s.next) == null) //s節(jié)點被匹配粱玲,但是是尾節(jié)點抽减,則退出循環(huán)
            //s為尾結(jié)點卵沉,則可能其它線程剛好匹配完,所有這里不移除s琼掠,讓其它匹配線程操作
            break;
        else if (s == n)    // stale s節(jié)點已經(jīng)脫離了隊列了眉枕,重頭開始遍歷
            // No need to also check for p == s, since that implies s == n
            p = head;
        else
            p.casNext(s, n); //移除s節(jié)點
    }
}

看看這個移除操作也挺復(fù)雜的速挑,這里并沒有簡單的就將節(jié)點移除就ok姥宝,同時還檢查了隊列 head 的有效性腊满,如果 head 被匹配了碳蛋,則會推薦 head肃弟,保持隊列 head 是有效的穷缤。如果移除節(jié)點的前驅(qū)節(jié)點也失效了津肛,說明其它線程在操作身坐,這里就不操作了掀亥,當(dāng)移除了節(jié)點后,需要記錄移除節(jié)點的操作次數(shù) sweepVotes嘹害,如果這個值超過了閥值笔呀,則會對隊列進(jìn)行清理(移除那些失效的節(jié)點)许师。

移交元素的方法

請注意第二個參數(shù)微渠,都是true逞盆,也就是這三個方法其實也是放元素的方法

// 立即轉(zhuǎn)交一個元素給消費者云芦,如果此時隊列沒有消費者贸桶,那就false
public boolean tryTransfer(E e) {
    // 立即返回
    return xfer(e, true, NOW, 0) == null;
}

// 轉(zhuǎn)交一個元素給消費者,如果此時隊列沒有消費者琉历,那就阻塞
public void transfer(E e) throws InterruptedException {
    // 同步模式
    if (xfer(e, true, SYNC, 0) != null) {
        Thread.interrupted(); // failure possible only due to interrupt
        throw new InterruptedException();
    }
}

public boolean tryTransfer(E e, long timeout, TimeUnit unit)
    throws InterruptedException {
    // 有超時時間
    if (xfer(e, true, TIMED, unit.toNanos(timeout)) == null)
        return true;
    if (!Thread.interrupted())
        return false;
    throw new InterruptedException();
}

獲取隊列首個有效操作

在SynchronousQueue 中其隊列是無法遍歷的灼捂,而且也無法獲取對頭信息悉稠,但是在 LinkedTransferQueue 卻不一樣,LinkedTransferQueue 可以獲取隊頭艘包,也可以進(jìn)行遍歷 的猛。

peek() 方法

public E peek() {
    return firstDataItem();
}
private E firstDataItem() {
    //遍歷隊列,查找第一個有效的操作節(jié)點
    for (Node p = head; p != null; p = succ(p)) {
        Object item = p.item;
        //如果該節(jié)點是數(shù)據(jù)節(jié)點想虎,同時沒有被取消卦尊,則返回數(shù)據(jù)
        if (p.isData) {
            if (item != null && item != p)
                return LinkedTransferQueue.<E>cast(item);
        }
        else if (item == null)// 非數(shù)據(jù)節(jié)點返回null,這里注意
            return null;
    }
    return null;
}
final Node succ(Node p) {  //如果節(jié)點p 失效則返回head,否則返回p的后繼
    Node next = p.next;
    return (p == next) ? head : next;
}

這個 peek() 方法返回的是隊列的第一個有效的節(jié)點舌厨,而這個節(jié)點可能是數(shù)據(jù)節(jié)點岂却,也可能是取數(shù)據(jù)的操作節(jié)點,那么peek可能返回數(shù)據(jù),也可能返回null,但是返回null正驻,并不一定是隊列為空氓栈,也可能是隊列里面都是取數(shù)據(jù)的操作節(jié)點提完,這個需要注意一下。

總結(jié)

  1. LinkedTransferQueue 可以看在是 LinkedBlockingQueue挪捕、SynchronousQueue(公平模式)奏纪、ConcurrentLinkedQeueue 三者的集合體兔簇;
  2. LinkedTransferQueue 的實現(xiàn)方式使用一種叫做 “雙重隊列” 的數(shù)據(jù)結(jié)構(gòu)此虑;
  3. 不管是取元素還是放元素都會入隊韭寸;
  4. 先嘗試跟頭節(jié)點比較椰拒,如果二者模式不一樣,就匹配它們,組成CP践啄,然后返回對方的值聂儒;
  5. 如果二者模式一樣非春,就入隊储耐,并自旋或阻塞等待被喚醒闽撤;
  6. 至于是否入隊及阻塞有四種方式:NOW闸餐、ASYNC场勤、SYNC、TIMED留瞳;
  7. LinkedTransferQueue 全程都沒有使用 synchronized硬梁、重入鎖等比較中的鎖跃巡,基本是通過 自旋 + CAS 實現(xiàn)苍狰;
  8. 對于入隊后胡野,先自旋一定次數(shù)后再調(diào)用 LockSupport.park() 或 LockSupport.parkNanos() 阻塞。

LinkedTransferQueue和SynchronousQueue(公平模式)區(qū)別

  • LinkedTransferQueue 和SynchronousQueue 其實基本是差不多的递览,兩者都是無鎖帶阻塞功能的隊列,都是使用的雙重隊列痹栖;
  • SynchronousQueue 通過內(nèi)部類Transferer 來實現(xiàn)公平和非公平隊列南捂,在LinkedTransferQueue 中沒有公平與非公平的區(qū)分溺健;
  • LinkedTransferQueue 實現(xiàn)了TransferQueue接口岭辣,該接口定義的是帶阻塞操作的操作躏精,相比SynchronousQueue 中的Transferer 功能更豐富瞭吃。
  • SynchronousQueue 中放數(shù)據(jù)操作和取數(shù)據(jù)操作都是阻塞的抠刺,當(dāng)隊列中的操作和本次操作不匹配時,線程會阻塞罕容,直到匹配的操作到來妨马。LinkedTransferQueue 是無界隊列,放數(shù)據(jù)操作不會阻塞杀赢,取數(shù)據(jù)操作如果沒有匹配操作可能會阻塞,通過參數(shù)決定是否阻塞(ASYNC,SYNC,NOW,TIMED)湘纵。

PS:以上代碼提交在 Githubhttps://github.com/Niuh-Study/niuh-juc-final.git

文章持續(xù)更新脂崔,可以公眾號搜一搜「 一角錢技術(shù) 」第一時間閱讀, 本文 GitHub org_hejianhui/JavaStudy 已經(jīng)收錄梧喷,歡迎 Star砌左。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末脖咐,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子汇歹,更是在濱河造成了極大的恐慌屁擅,老刑警劉巖,帶你破解...
    沈念sama閱讀 216,744評論 6 502
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件产弹,死亡現(xiàn)場離奇詭異派歌,居然都是意外死亡,警方通過查閱死者的電腦和手機痰哨,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,505評論 3 392
  • 文/潘曉璐 我一進(jìn)店門胶果,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人斤斧,你說我怎么就攤上這事早抠。” “怎么了撬讽?”我有些...
    開封第一講書人閱讀 163,105評論 0 353
  • 文/不壞的土叔 我叫張陵蕊连,是天一觀的道長。 經(jīng)常有香客問我游昼,道長甘苍,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 58,242評論 1 292
  • 正文 為了忘掉前任酱床,我火速辦了婚禮羊赵,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘扇谣。我一直安慰自己昧捷,他們只是感情好,可當(dāng)我...
    茶點故事閱讀 67,269評論 6 389
  • 文/花漫 我一把揭開白布罐寨。 她就那樣靜靜地躺著靡挥,像睡著了一般。 火紅的嫁衣襯著肌膚如雪鸯绿。 梳的紋絲不亂的頭發(fā)上魄藕,一...
    開封第一講書人閱讀 51,215評論 1 299
  • 那天,我揣著相機與錄音愚争,去河邊找鬼渊跋。 笑死,一個胖子當(dāng)著我的面吹牛舷手,可吹牛的內(nèi)容都是我干的拧簸。 我是一名探鬼主播,決...
    沈念sama閱讀 40,096評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼男窟,長吁一口氣:“原來是場噩夢啊……” “哼盆赤!你這毒婦竟也來了贾富?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 38,939評論 0 274
  • 序言:老撾萬榮一對情侶失蹤牺六,失蹤者是張志新(化名)和其女友劉穎颤枪,沒想到半個月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體淑际,經(jīng)...
    沈念sama閱讀 45,354評論 1 311
  • 正文 獨居荒郊野嶺守林人離奇死亡畏纲,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,573評論 2 333
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了庸追。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片霍骄。...
    茶點故事閱讀 39,745評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖淡溯,靈堂內(nèi)的尸體忽然破棺而出读整,到底是詐尸還是另有隱情,我是刑警寧澤咱娶,帶...
    沈念sama閱讀 35,448評論 5 344
  • 正文 年R本政府宣布米间,位于F島的核電站,受9級特大地震影響膘侮,放射性物質(zhì)發(fā)生泄漏屈糊。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 41,048評論 3 327
  • 文/蒙蒙 一琼了、第九天 我趴在偏房一處隱蔽的房頂上張望逻锐。 院中可真熱鬧,春花似錦雕薪、人聲如沸昧诱。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,683評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽盏档。三九已至,卻和暖如春燥爷,著一層夾襖步出監(jiān)牢的瞬間蜈亩,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 32,838評論 1 269
  • 我被黑心中介騙來泰國打工前翎, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留稚配,地道東北人。 一個月前我還...
    沈念sama閱讀 47,776評論 2 369
  • 正文 我出身青樓港华,卻偏偏與公主長得像药有,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 44,652評論 2 354