Netty中Queue的實(shí)現(xiàn)

概述

最近在看Netty的源碼吆视,關(guān)注了下其隊(duì)列的實(shí)現(xiàn)情妖;Netty中基于不同的IO模型瀑志,提供了不同的線程實(shí)現(xiàn):

  1. BIO:ThreadPerChannelEventLoop
    每個(gè)Channel一個(gè)線程贩猎,采用的隊(duì)列為LinkedBlockingQueue
  2. NIO:NioEventLoop(水平觸發(fā))
    每個(gè)線程一個(gè)Selector蜗顽,可以注冊多個(gè)Channel,采用的隊(duì)列為MpscChunkedArrayQueue或MpscLinkedAtomicQueue
  3. Epoll:EpollEventLoop(邊緣觸發(fā))
    和2相同
    那為什么要采用不同的Queue實(shí)現(xiàn)呢血崭?下面看看不同Queue的具體實(shí)現(xiàn);

LinkedBlockingQueue

LinkedBlockingQueue是JDK提供的卧惜,采用鏈表存儲數(shù)據(jù),通過ReentrantLock和Condition來解決競爭和支持堵塞夹纫;

既然采用鏈表咽瓷,鐵定要定義一個(gè)新的節(jié)點(diǎn)類,在LinkedBlockingQueue中這個(gè)節(jié)點(diǎn)類為:

static class Node<E> {
    E item;
    Node<E> next;

    Node(E x) { item = x;}
}

可以看到實(shí)現(xiàn)很簡單舰讹,采用單向鏈接茅姜,通過next指向下一個(gè)節(jié)點(diǎn),如果next為null月匣,表示該節(jié)點(diǎn)為尾節(jié)點(diǎn)钻洒;

LinkedBlockingQueue的成員變量為:

//容量,隊(duì)列是和ArrayList不同锄开,有容量限制
private final int capacity;
//當(dāng)前節(jié)點(diǎn)數(shù)量
private final AtomicInteger count = new AtomicInteger(0);
//頭節(jié)點(diǎn)
private transient Node<E> head;
//尾節(jié)點(diǎn)
private transient Node<E> last;
//出列鎖素标,當(dāng)從隊(duì)列取數(shù)據(jù)時(shí),要先獲取該鎖
private final ReentrantLock takeLock = new ReentrantLock();
//隊(duì)列非空條件變量萍悴,當(dāng)隊(duì)列為空時(shí)头遭,出列線程要等待該條件變量
private final Condition notEmpty = takeLock.newCondition();
//入列鎖寓免,當(dāng)往隊(duì)列添加數(shù)據(jù)時(shí),要先獲取該鎖
private final ReentrantLock putLock = new ReentrantLock();
//隊(duì)列容量未滿條件變量计维,當(dāng)隊(duì)列滿了袜香,入列線程要等待該條件變量
private final Condition notFull = putLock.newCondition();

從上面的成員變量大概可以看出:

  1. 可以設(shè)置容量,但未提供初始容量鲫惶、最大容量之類的特性蜈首;
  2. 先入先出隊(duì)列,入列和出列都要獲取鎖欠母,因此是線程安全的疾就;
  3. 入列和出列分為兩個(gè)鎖;

以其中的入列offer方法為例(由于netty中使用的是Queue而不是BlockingQueue,因此此處分析的都是非堵塞的方法):

public boolean offer(E e) {
    if (e == null) throw new NullPointerException();//參數(shù)非空
    final AtomicInteger count = this.count;//隊(duì)列元素?cái)?shù)量
    if (count.get() == capacity)//隊(duì)列已滿艺蝴,無法添加猬腰,返回false
        return false;
    int c = -1;
    Node<E> node = new Node(e);//將元素封裝為節(jié)點(diǎn)
    final ReentrantLock putLock = this.putLock;
    putLock.lock();//獲取鎖,所有入列操作共有同一個(gè)鎖
    try {
        if (count.get() < capacity) {//只有隊(duì)列不滿猜敢,才能添加
            enqueue(node);//入列
            c = count.getAndIncrement();
            if (c + 1 < capacity)//如果添加元素之后姑荷,隊(duì)列仍然不滿,notFull條件變量滿足條件缩擂,通知排隊(duì)等待的線程
                notFull.signal();
        }
    } finally {
        putLock.unlock();//釋放鎖
    }
    if (c == 0)
        signalNotEmpty();//說明之前隊(duì)列為空鼠冕,因此需要出發(fā)非空條件變量
    return c >= 0;
}

ArrayBlockingQueue

顧名思義,ArrayBlockingQueue是采用數(shù)組存儲數(shù)據(jù)的胯盯;它的成員變量如下:

//數(shù)組懈费,用于存儲數(shù)據(jù)
final Object[] items;
//ArrayBlockingQueue維護(hù)了兩個(gè)索引,一個(gè)用于出列博脑,一個(gè)用于入列
int takeIndex;
int putIndex;
//當(dāng)前隊(duì)列的元素?cái)?shù)量
int count;
//可重入鎖
final ReentrantLock lock;
//隊(duì)列容量非空條件變量憎乙,當(dāng)隊(duì)列空了,出列線程要等待該條件變量
private final Condition notEmpty;
//隊(duì)列容量未滿條件變量叉趣,當(dāng)隊(duì)列滿了泞边,入列線程要等待該條件變量
private final Condition notFull;

從上面可出:

  1. 入列和出列采用同一個(gè)鎖,也就是說入列和出列會彼此競爭鎖疗杉;
  2. 采用索引來記錄當(dāng)前出列和入列的位置阵谚,避免了移動(dòng)數(shù)組元素;
  3. 基于以上2點(diǎn)烟具,在高并發(fā)的情況下梢什,由于鎖競爭,性能應(yīng)該比不上鏈表的實(shí)現(xiàn)朝聋;

MpscChunkedArrayQueue

MpscChunkedArrayQueue也是采用數(shù)組來實(shí)現(xiàn)的嗡午,從名字上可以看出它是支持多生產(chǎn)者單消費(fèi)者( Multi Producer Single Consumer),和前面的兩種隊(duì)列使用場景有些差異;但恰好符合netty的使用場景玖翅;它對特定場景進(jìn)行了優(yōu)化:

  1. CacheLine Padding
    LinkedBlockingQueue的head和last是相鄰的翼馆,ArrayBlockingQueue的takeIndex和putIndex是相鄰的;而我們都知道CPU將數(shù)據(jù)加載到緩存實(shí)際上是按照緩存行加載的,因此可能出現(xiàn)明明沒有修改last金度,但由于出列操作修改了head应媚,導(dǎo)致整個(gè)緩存行失效,需要重新進(jìn)行加載猜极;
//此處我將多個(gè)類中的變量合并到了一起中姜,便于查看
long p01, p02, p03, p04, p05, p06, p07;
long p10, p11, p12, p13, p14, p15, p16, p17;
protected long producerIndex;
long p01, p02, p03, p04, p05, p06, p07;
long p10, p11, p12, p13, p14, p15, p16, p17;
protected long maxQueueCapacity;
protected long producerMask;
protected E[] producerBuffer;
protected volatile long producerLimit;
protected boolean isFixedChunkSize = false;
long p0, p1, p2, p3, p4, p5, p6, p7;
long p10, p11, p12, p13, p14, p15, p16, p17;
protected long consumerMask;
protected E[] consumerBuffer;
protected long consumerIndex;

可以看到生產(chǎn)者索引和消費(fèi)者索引中間padding了18個(gè)long變量,18*8=144跟伏,而一般操作系統(tǒng)的cacheline為64,可以通過如下方式查看緩存行大小:

cat /sys/devices/system/cpu/cpu0/cache/index0/coherency_line_size
  1. 減少鎖的使用,使用CAS+自旋:
    由于使用鎖會造成線程切換丢胚,消耗資源;因此MpscChunkedArrayQueue并未使用鎖受扳,而是使用自旋携龟;和Disruptor的BusySpinWaitStrategy比較類似,如果系統(tǒng)比較繁忙勘高,自旋效率會很適合峡蟋;當(dāng)然它也會造成CPU使用率比較高,所以建議使用時(shí)將這些線程綁定到特定的CPU;

  2. 支持?jǐn)U容;
    MpscChunkedArrayQueue采用數(shù)組作為內(nèi)部存儲結(jié)構(gòu)华望,那么它是如何實(shí)現(xiàn)擴(kuò)容的呢蕊蝗?可能大家第一反應(yīng)想到的是創(chuàng)建新數(shù)組,然后將老數(shù)據(jù)挪到新數(shù)組中去赖舟;但MpscChunkedArrayQueue采用了一種獨(dú)特的方式蓬戚,避免了數(shù)組的復(fù)制;
    舉例說明:
    假設(shè)隊(duì)列的初始化大小為4宾抓,則初始的buffer數(shù)組為4+1子漩;為什么要+1呢?因?yàn)樽詈笠粋€(gè)元素需要存儲下一個(gè)buffer的指針石洗;假設(shè)隊(duì)列中存儲了8個(gè)元素痛单,則數(shù)組的內(nèi)容如下:

  • buffer
數(shù)組下標(biāo) 0 1 2 3 4
內(nèi)容 e0 e1 e2 JUMP next[5]
  • next
數(shù)組下標(biāo) 5 6 7 8 9
內(nèi)容 e4 e5 JUMP e3 next

可以看到,每個(gè)buffer數(shù)組的大小都是固定的(之前的版本支持固定大小和非固定大芯⑼取)旭绒,也就是initialCapacity指定的大小焦人;每個(gè)數(shù)組的最后一個(gè)實(shí)際保存的是個(gè)指針挥吵,指向下一個(gè)數(shù)組;讀取數(shù)據(jù)時(shí)花椭,如果遇到JUMP表示要從下一個(gè)buffer數(shù)組讀取數(shù)據(jù)忽匈;

public E poll() {//消費(fèi)隊(duì)列元素
    final E[] buffer = consumerBuffer;
    final long index = consumerIndex;
    final long mask = consumerMask;
    //通過Unsafe.getObjectVolatile(E[] buffer, long offset)獲取數(shù)組元素
    //因此需要根據(jù)數(shù)組索引,計(jì)算出在內(nèi)存中的偏移量
    final long offset = modifiedCalcElementOffset(index, mask);
    Object e = lvElement(buffer, offset);
    if (e == null) {
        //e==null并不一定表示隊(duì)列為空,因?yàn)槿肓械臅r(shí)候是先更新producerIndex,后更新數(shù)組元素矿辽,因此需要判斷producerIndex
        if (index != lvProducerIndex()) {
            //采用自旋丹允,直到獲取到數(shù)據(jù)
            do {
                e = lvElement(buffer, offset);
            } while (e == null);
        }
        else {
            return null;
        }
    }
    if (e == JUMP) {//跳轉(zhuǎn)到新的buff尋找
        final E[] nextBuffer = getNextBuffer(buffer, mask);
        return newBufferPoll(nextBuffer, index);
    }
    //從隊(duì)列中取出數(shù)據(jù)之后郭厌,將數(shù)組對應(yīng)位置元素清除
    soElement(buffer, offset, null);
    soConsumerIndex(index + 2);
    return (E) e;
}

性能對比

從網(wǎng)上找了一份測試代碼,稍做修改:

public class TestQueue {
    private static int PRD_THREAD_NUM;
    private static int C_THREAD_NUM=1;

    private static int N = 1<<20;
    private static ExecutorService executor;

    public static void main(String[] args) throws Exception {
        System.out.println("Producer\tConsumer\tcapacity \t LinkedBlockingQueue \t ArrayBlockingQueue \t MpscLinkedAtomicQueue \t MpscChunkedArrayQueue \t MpscArrayQueue");

        for (int j = 1; j < 8; j++) {
            PRD_THREAD_NUM = (int) Math.pow(2, j);
            executor = Executors.newFixedThreadPool(PRD_THREAD_NUM * 2);

            for (int i = 9; i < 12; i++) {
                int length = 1<< i;
                System.out.print(PRD_THREAD_NUM + "\t\t");
                System.out.print(C_THREAD_NUM + "\t\t");
                System.out.print(length + "\t\t");
                System.out.print(doTest2(new LinkedBlockingQueue<Integer>(length), N) + "/s\t\t");
                System.out.print(doTest2(new ArrayBlockingQueue<Integer>(length), N) + "/s\t\t");
                System.out.print(doTest2(new MpscLinkedAtomicQueue<Integer>(), N) + "/s\t\t");
                System.out.print(doTest2(new MpscChunkedArrayQueue<Integer>(length), N) + "/s\t\t");
                System.out.print(doTest2(new MpscArrayQueue<Integer>(length), N) + "/s");
                System.out.println();
            }

            executor.shutdown();
        }
    }

    private static class Producer implements Runnable {
        int n;
        Queue<Integer> q;

        public Producer(int initN, Queue<Integer> initQ) {
            n = initN;
            q = initQ;
        }

        public void run() {
            while (n > 0) {
                if (q.offer(n)) {
                    n--;
                }
            }
        }
    }

    private static class Consumer implements Callable<Long> {
        int n;
        Queue<Integer> q;

        public Consumer(int initN, Queue<Integer> initQ) {
            n = initN;
            q = initQ;
        }

        public Long call() {
            long sum = 0;
            Integer e = null;
            while (n > 0) {
                if ((e = q.poll()) != null) {
                    sum += e;
                    n--;
                }

            }
            return sum;
        }
    }

    private static long doTest2(final Queue<Integer> q, final int n)
            throws Exception {
        CompletionService<Long> completionServ = new ExecutorCompletionService<>(executor);

        long t = System.nanoTime();
        for (int i = 0; i < PRD_THREAD_NUM; i++) {
            executor.submit(new Producer(n / PRD_THREAD_NUM, q));
        }
        for (int i = 0; i < C_THREAD_NUM; i++) {
            completionServ.submit(new Consumer(n / C_THREAD_NUM, q));
        }

        for (int i = 0; i < 1; i++) {
            completionServ.take().get();
        }

        t = System.nanoTime() - t;
        return (long) (1000000000.0 * N / t); // Throughput, items/sec
    }
}

chart.png

從上面可以看到:

  1. Mpsc*Queue表現(xiàn)最好,而且性能表現(xiàn)也最穩(wěn)定雕蔽;
  2. 并發(fā)數(shù)較低的時(shí)候,基于數(shù)組的隊(duì)列比基于鏈表的隊(duì)列表現(xiàn)要好折柠,,推測有可能是因?yàn)閿?shù)組在內(nèi)存中是連續(xù)分配的批狐,因此加載的時(shí)候可以有效利用緩存行扇售,減少讀的次數(shù);而鏈表在內(nèi)存的地址不是連續(xù)的嚣艇,隨機(jī)讀代價(jià)比較大承冰;
  3. 并發(fā)數(shù)較高的時(shí)候,基于鏈表的隊(duì)列比基于數(shù)組的隊(duì)列表現(xiàn)要好食零;LinkedBlockingQueue因?yàn)槿肓泻统隽胁捎貌煌逆i困乒,因此鎖競爭應(yīng)該比ArrayBlockingQueue小贰谣;而MpscLinkedAtomicQueue沒有容量限制顶燕,使用AtomicReference提供的XCHG功能修改鏈接即可達(dá)到出列和入列的目的,效率特別高冈爹;
  4. MpscChunkedArrayQueue相對于MpscArrayQueue涌攻,提供了動(dòng)態(tài)擴(kuò)容大能力型酥;
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末乌企,一起剝皮案震驚了整個(gè)濱河市,隨后出現(xiàn)的幾起案子瓢对,更是在濱河造成了極大的恐慌憋肖,老刑警劉巖因痛,帶你破解...
    沈念sama閱讀 211,194評論 6 490
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異岸更,居然都是意外死亡鸵膏,警方通過查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,058評論 2 385
  • 文/潘曉璐 我一進(jìn)店門怎炊,熙熙樓的掌柜王于貴愁眉苦臉地迎上來谭企,“玉大人,你說我怎么就攤上這事评肆≌椋” “怎么了?”我有些...
    開封第一講書人閱讀 156,780評論 0 346
  • 文/不壞的土叔 我叫張陵瓜挽,是天一觀的道長盹廷。 經(jīng)常有香客問我,道長久橙,這世上最難降的妖魔是什么俄占? 我笑而不...
    開封第一講書人閱讀 56,388評論 1 283
  • 正文 為了忘掉前任管怠,我火速辦了婚禮,結(jié)果婚禮上缸榄,老公的妹妹穿的比我還像新娘渤弛。我一直安慰自己,他們只是感情好碰凶,可當(dāng)我...
    茶點(diǎn)故事閱讀 65,430評論 5 384
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著鹿驼,像睡著了一般欲低。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上畜晰,一...
    開封第一講書人閱讀 49,764評論 1 290
  • 那天砾莱,我揣著相機(jī)與錄音,去河邊找鬼凄鼻。 笑死腊瑟,一個(gè)胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的块蚌。 我是一名探鬼主播闰非,決...
    沈念sama閱讀 38,907評論 3 406
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼峭范!你這毒婦竟也來了财松?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 37,679評論 0 266
  • 序言:老撾萬榮一對情侶失蹤纱控,失蹤者是張志新(化名)和其女友劉穎辆毡,沒想到半個(gè)月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體甜害,經(jīng)...
    沈念sama閱讀 44,122評論 1 303
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡舶掖,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 36,459評論 2 325
  • 正文 我和宋清朗相戀三年,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了尔店。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片眨攘。...
    茶點(diǎn)故事閱讀 38,605評論 1 340
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖嚣州,靈堂內(nèi)的尸體忽然破棺而出期犬,到底是詐尸還是另有隱情,我是刑警寧澤避诽,帶...
    沈念sama閱讀 34,270評論 4 329
  • 正文 年R本政府宣布龟虎,位于F島的核電站,受9級特大地震影響沙庐,放射性物質(zhì)發(fā)生泄漏鲤妥。R本人自食惡果不足惜佳吞,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 39,867評論 3 312
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望棉安。 院中可真熱鬧底扳,春花似錦、人聲如沸贡耽。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,734評論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽蒲赂。三九已至阱冶,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間滥嘴,已是汗流浹背木蹬。 一陣腳步聲響...
    開封第一講書人閱讀 31,961評論 1 265
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留若皱,地道東北人镊叁。 一個(gè)月前我還...
    沈念sama閱讀 46,297評論 2 360
  • 正文 我出身青樓,卻偏偏與公主長得像走触,于是被迫代替她去往敵國和親晦譬。 傳聞我的和親對象是個(gè)殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 43,472評論 2 348

推薦閱讀更多精彩內(nèi)容