LinkedBlockingQueue和ConcurrentLinkedQueue的選擇問題

隊列是JAVA開發(fā)過程中一種非常常見的數據結構,尤其是需要再使用生產者-消費者的業(yè)務模型中颗味,Queue常常作為多線程執(zhí)行任務的數據交界點直奋,從而保證生產者產生的數據能夠依次被消費诗越。

阻塞隊列的選擇

阻塞隊列的實現包括ArrayBlockingQueue與LinkedBlockingQueue厘熟。相同點不做贅述,區(qū)別有以下幾點:
1.初始化時嘀趟,ArrayBlockingQueue必須指定隊列最大容量脐区,LinkedBlockingQueue不強制指定,若不指定她按,默認Interger.Max為最大容量牛隅。
2.ArrayBlockingQueue內部數據結構是數組:Element[],通過putIndex和takeIndex下標的循環(huán)移動控制隊首和隊尾酌泰;LinkedBlockingQueue內部結構是鏈表:Node<Element>媒佣,通過head 和 tail節(jié)點控制隊首和隊尾。
3.ArrayBlockingQueue生產與消費之間共用一把鎖陵刹,而LinkedBlockingQueue生產與消費時用不同的鎖競爭默伍。

對于阻塞隊列的選擇,一方面考慮吞吐性能,另一方面考慮內存占用也糊。

我們可以看到上面說的第三點炼蹦,可以確定的是,在多生產者與多消費者的情況下狸剃,LinkedBlockingQueue的吞吐性能肯定是要更高的掐隐,而且ArrayBlockingQueue在初始化時直接就申請了一片連續(xù)的內存空間。所以在實際生產使用環(huán)境中钞馁,沒有特殊限制考慮瑟枫,我們在使用阻塞隊列時往往用LinkedBlockingQueue。

那什么場景下我們會偏向于使用ArrayBlockingQueue呢指攒?
  • 生產者與消費者之間沒有太大競爭,傾向于單消費者僻焚,單生產者允悦,且兩者之間沖突較小,這種情況下數組尋址是明顯要比鏈表去指向next的操作要更快的
  • 基本可以確定隊列大小虑啤,且隊列大小穩(wěn)定在一定的數量隙弛,這個時候數組占用內存是比鏈表小的

阻塞隊列與非阻塞隊列的選擇

首先,ConcurrentLinkedQueue相對阻塞隊列來說狞山,采用的是CAS無鎖操作全闷,沒有take和put方法,主用poll與offer萍启,無界总珠。有人說,既然此隊列內部進隊和出隊操作采用的是無鎖勘纯,那性能肯定比有鎖的BlockingQueue強局服,那BlockingQueue還有啥用武之地,其實不然驳遵,有些時候我們就需要線程進入阻塞狀態(tài)而非不斷自旋消耗CPU淫奔,我們可以歸類以下場景:

  • 數據入隊速度過快,出隊速度過慢堤结,這個時候ConcurrentLinkedQueue如果不借助其他限制手段唆迁,隨著時間的推移,JVM必然會進行頻繁的FULL GC ,嚴重的情況下甚至會發(fā)生OOM竞穷。使用BlockingQueue可以更好的控制內存的狀況唐责。
  • 數據入隊速度過慢,出隊速度過快来庭,這個時候消費者線程如果一定想要拿到數據而不進行阻塞妒蔚,將進入大量時間的自旋狀態(tài),白白浪費CPU資源。
  • 入隊與出隊速度相仿肴盏。
    這時候要考慮速度科盛,有多少個線程在同時做操作,線程操作的頻率如何菜皂?
    在大部分場景下贞绵,ConcurrentLinkedQueue的性能是要比BlockingQueue要好的,注意是大部分恍飘,如果線程之間的競爭足夠又高又快榨崩,CAS操作的CPU消耗以及線程操作的成功率是極低的,這個時候是會反而不如用鎖競爭控制效率來的高章母。
    我們寫了個測試類可以大致看下觀感下母蛛,在同樣的環(huán)境下,消費者與生產者在不斷對隊列進行操作乳怎,然后不斷增加消費者與生產者內部線程的數量彩郊。
package algorithm;

import java.util.LinkedList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.LongAdder;

public class TestBlockingQueue {

    BlockingQueue<Data> linkedBlockingQueue = new LinkedBlockingQueue<>(100);
//    BlockingQueue<Data> linkedBlockingQueue = new ArrayBlockingQueue<>(100);
    LongAdder longAdder = new LongAdder();
    Producer producer;
    Consumer consumer;

    static class Data {
        String msg;

        public Data(String msg) {
            this.msg = msg;
        }
    }

    TestBlockingQueue(int size) {
        producer = new Producer(size);
        consumer = new Consumer(size);
    }

    public void startTest() {
        producer.startProduce();
        consumer.startConsume();
    }

    public long stopTestAndReturn() {
        producer.stopProduce();
        consumer.stopConsume();
        return longAdder.longValue();
    }


    class Producer{

        ExecutorService service;
        List<ProduceWorker> workers = new LinkedList<>();
        Producer(int concurrentNum) {
            service = Executors.newFixedThreadPool(concurrentNum);
            for(int i = 0; i < concurrentNum; i++) {
                workers.add(new ProduceWorker());
            }
        }

        public void startProduce() {
            workers.forEach(worker -> service.execute(worker));
        }

        public void stopProduce() {
            service.shutdownNow();
        }

        class ProduceWorker implements Runnable {

            @Override
            public void run() {
                for(;;) {
                    if(!Thread.currentThread().isInterrupted()) {
                        try {
                            linkedBlockingQueue.put(new Data(randomString(10)));
                        } catch (InterruptedException e) {
//                        e.printStackTrace();
                        }
                    } else {
                        break;
                    }
                }
            }
        }

    }

    class Consumer{
        ExecutorService service;

        List<ConsumeWork> workers = new LinkedList<>();

        Consumer(int concurrentNum) {
            service = Executors.newFixedThreadPool(concurrentNum);
            for(int i = 0; i < concurrentNum; i++) {
                workers.add(new ConsumeWork());
            }
        }

        class ConsumeWork implements Runnable {

            @Override
            public void run() {
                for(;;) {
                    if(!Thread.currentThread().isInterrupted()) {
                        try {
                            Data data = linkedBlockingQueue.take();
                            if (null != data) {
                                longAdder.increment();
                            }
                        } catch (InterruptedException e) {
//                        e.printStackTrace();
                        }
                    } else {
                        break;
                    }
                }
            }
        }

        public void startConsume() {
            workers.forEach(worker -> service.execute(worker));
        }

        public void stopConsume() {
            service.shutdownNow();
        }
    }


    public static void main(String[] args) {
        long timeStart = System.currentTimeMillis();
        System.out.println(timeStart);
        TestBlockingQueue testQueue = new TestBlockingQueue(4);
        testQueue.startTest();
        try {
            Thread.currentThread().sleep(5000L);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        long result = testQueue.stopTestAndReturn();
        System.out.println("最終結果為; " + result);
        long timeEnd = System.currentTimeMillis();
        System.out.println(timeEnd);
        //計算每s吞吐
        double average = (result / (timeEnd -timeStart)) * 1000;
        System.out.println("1每秒吞吐: " + average);
    }

    static String randomString(int strLength) {
        Random rnd = ThreadLocalRandom.current();
        StringBuilder ret = new StringBuilder();
        for (int i = 0; i < strLength; i++) {
            boolean isChar = (rnd.nextInt(2) % 2 == 0);// 輸出字母還是數字
            if (isChar) { // 字符串
                int choice = rnd.nextInt(2) % 2 == 0 ? 65 : 97; // 取得大寫字母還是小寫字母
                ret.append((char) (choice + rnd.nextInt(26)));
            } else { // 數字
                ret.append(Integer.toString(rnd.nextInt(10)));
            }
        }
        return ret.toString();
    }
}

ConcurrentQueue無法用put和take方法,需要用poll和offer,其他代碼一致蚪缀,不同的地方在于

                        Data data = concurrentLinkedQueue.poll();
                        if (null != data) {
                            longAdder.increment();
                        }
concurrentLinkedQueue.offer(new Data(randomString(10)));

可以先試試同時有4個生產者和消費者在不斷進行隊列操作秫逝,然后再試試1000個生產者與消費者在不斷進行隊列操作。

BlockingQueue-4

![ConcurrentQueue-4](https://upload-images.jianshu.io/upload_images/14745967-9f2a71o z66c7c3f219.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)
這個情況下可以看出無鎖操作是遠高于有鎖操作的
BlockingQueue-100

ConcurrentQueue-100

負載過高的情況下询枚,CAS效率低下违帆,反而不如有鎖操作

最后編輯于
?著作權歸作者所有,轉載或內容合作請聯系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市金蜀,隨后出現的幾起案子刷后,更是在濱河造成了極大的恐慌,老刑警劉巖廉油,帶你破解...
    沈念sama閱讀 206,839評論 6 482
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件惠险,死亡現場離奇詭異,居然都是意外死亡抒线,警方通過查閱死者的電腦和手機班巩,發(fā)現死者居然都...
    沈念sama閱讀 88,543評論 2 382
  • 文/潘曉璐 我一進店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來嘶炭,“玉大人抱慌,你說我怎么就攤上這事≌A裕” “怎么了抑进?”我有些...
    開封第一講書人閱讀 153,116評論 0 344
  • 文/不壞的土叔 我叫張陵,是天一觀的道長睡陪。 經常有香客問我寺渗,道長匿情,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 55,371評論 1 279
  • 正文 為了忘掉前任信殊,我火速辦了婚禮炬称,結果婚禮上,老公的妹妹穿的比我還像新娘涡拘。我一直安慰自己玲躯,他們只是感情好,可當我...
    茶點故事閱讀 64,384評論 5 374
  • 文/花漫 我一把揭開白布鳄乏。 她就那樣靜靜地躺著跷车,像睡著了一般。 火紅的嫁衣襯著肌膚如雪橱野。 梳的紋絲不亂的頭發(fā)上朽缴,一...
    開封第一講書人閱讀 49,111評論 1 285
  • 那天,我揣著相機與錄音水援,去河邊找鬼不铆。 笑死,一個胖子當著我的面吹牛裹唆,可吹牛的內容都是我干的。 我是一名探鬼主播只洒,決...
    沈念sama閱讀 38,416評論 3 400
  • 文/蒼蘭香墨 我猛地睜開眼许帐,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了毕谴?” 一聲冷哼從身側響起成畦,我...
    開封第一講書人閱讀 37,053評論 0 259
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎涝开,沒想到半個月后循帐,有當地人在樹林里發(fā)現了一具尸體,經...
    沈念sama閱讀 43,558評論 1 300
  • 正文 獨居荒郊野嶺守林人離奇死亡舀武,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內容為張勛視角 年9月15日...
    茶點故事閱讀 36,007評論 2 325
  • 正文 我和宋清朗相戀三年拄养,在試婚紗的時候發(fā)現自己被綠了。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片银舱。...
    茶點故事閱讀 38,117評論 1 334
  • 序言:一個原本活蹦亂跳的男人離奇死亡瘪匿,死狀恐怖,靈堂內的尸體忽然破棺而出寻馏,到底是詐尸還是另有隱情棋弥,我是刑警寧澤,帶...
    沈念sama閱讀 33,756評論 4 324
  • 正文 年R本政府宣布诚欠,位于F島的核電站顽染,受9級特大地震影響漾岳,放射性物質發(fā)生泄漏。R本人自食惡果不足惜粉寞,卻給世界環(huán)境...
    茶點故事閱讀 39,324評論 3 307
  • 文/蒙蒙 一尼荆、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧仁锯,春花似錦耀找、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,315評論 0 19
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至双炕,卻和暖如春狞悲,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背妇斤。 一陣腳步聲響...
    開封第一講書人閱讀 31,539評論 1 262
  • 我被黑心中介騙來泰國打工摇锋, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留,地道東北人站超。 一個月前我還...
    沈念sama閱讀 45,578評論 2 355
  • 正文 我出身青樓荸恕,卻偏偏與公主長得像,于是被迫代替她去往敵國和親死相。 傳聞我的和親對象是個殘疾皇子融求,可洞房花燭夜當晚...
    茶點故事閱讀 42,877評論 2 345

推薦閱讀更多精彩內容