大飛老師帶你看線程(并發(fā)容器—PriorityBlockingQueue)

本文作者:王一飛,叩丁狼高級講師芙沥。原創(chuàng)文章优俘,轉(zhuǎn)載請注明出處京办。

概述

按api上的解釋,PriorityBlockingQueue 是有一個帶有優(yōu)先級級別的無界阻塞隊列帆焕,不支持null元素入列惭婿,并且要求隊列對象必須為可以比較對象。這點跟PriorityQueue類 類似叶雹,區(qū)別是PriorityBlockingQueue 帶有阻塞功能财饥。

PriorityBlockingQueue 出列具有優(yōu)先級之分,每次出列返回優(yōu)先級最高的元素浑娜。其底層通過是二叉樹最小堆實現(xiàn)佑力,這也導(dǎo)致了遍歷隊列時,獲取到的元素是無序的筋遭。

外部結(jié)構(gòu)

內(nèi)部結(jié)構(gòu):

public class PriorityBlockingQueue<E> extends AbstractQueue<E>
    implements BlockingQueue<E>, java.io.Serializable {
    private transient Object[] queue;   //存數(shù)據(jù)
    private transient int size;  //隊列大小
    private transient Comparator<? super E> comparator;  //隊列比較器
    private final ReentrantLock lock;  //線程安全鎖
    private final Condition notEmpty;  //鎖條件
    ...
}
基本使用

需求:學(xué)號越大打颤,優(yōu)先級越高

public class Student implements Comparable<Student>{
    private int num;  //學(xué)號
    private String name;

    public Student(int num, String name){
       this.num = num;
       this.name = name;
    }

    public int compareTo(Student o) {
        return o.num - this.num;
    }

    public String toString() {
        return "Student{num=" + num +", name='" + name +"}";
    }
}
public class App {

    public static void  main(String[] args) throws InterruptedException {
        //初始化隊列,容量為3
        PriorityBlockingQueue<Student> queue = new PriorityBlockingQueue<Student>(3);

        queue.offer(new Student(1, "zhangsan"));
        queue.offer(new Student(4, "zhaoliu"));
        queue.offer(new Student(3, "wangwu"));
        queue.offer(new Student(2, "lisi"));  //超過3漓滔,自動拓展

        System.out.println(queue.take());
        System.out.println(queue.take());
        System.out.println(queue.take());
        System.out.println(queue.take());

        System.out.println(queue.take()); //超過容量编饺,阻塞
    }
}

上面代碼可以大體看出PriorityBlockingQueue的特性,阻塞响驴,無界透且,帶優(yōu)先級的隊列。

源碼解析

構(gòu)造器

    //空參數(shù),使用默認(rèn)的初始容量: 11
    public PriorityBlockingQueue() {
        this(DEFAULT_INITIAL_CAPACITY, null);
    }
    //單參數(shù)秽誊, 指定初始容量
    public PriorityBlockingQueue(int initialCapacity) {
        this(initialCapacity, null);
    }
    //2參數(shù)數(shù)鲸沮, 指定初始容量與額外的比較器
    public PriorityBlockingQueue(int initialCapacity,
                                 Comparator<? super E> comparator) {
        if (initialCapacity < 1)
            throw new IllegalArgumentException();
        this.lock = new ReentrantLock();
        this.notEmpty = lock.newCondition();
        this.comparator = comparator;
        this.queue = new Object[initialCapacity];
    }

入列-offer
PriorityBlockingQueue 是一個無界的隊列,所以隨便插入不需要進(jìn)行邊界限制锅论,一直返回true

public boolean offer(E e) {
        //不允許null入列
        if (e == null)
            throw new NullPointerException();
        final ReentrantLock lock = this.lock; 
        lock.lock();  //加鎖
        int n, cap;  //操作臨時變量
        Object[] array;
        //判斷當(dāng)前隊列元素是否超過隊列容量讼溺,超過擴(kuò)容
        while ((n = size) >= (cap = (array = queue).length))
            tryGrow(array, cap);  //嘗試擴(kuò)容,可能會失敗
        try {
            Comparator<? super E> cmp = comparator;
            if (cmp == null)
                //無比較器使用自然排序
                siftUpComparable(n, e, array); //根據(jù)優(yōu)先級插入數(shù)據(jù)
            else
                //使用指定的比較器
                siftUpUsingComparator(n, e, array, cmp);//根據(jù)優(yōu)先級插入數(shù)據(jù)
            size = n + 1;
            notEmpty.signal();  //隊列非空最易,喚醒出列阻塞線程
        } finally {
            lock.unlock(); //釋放鎖
        }
        //入列成功返回true
        return true;
    }

入列一個核心點隊列擴(kuò)容: tryGrow:

private void tryGrow(Object[] array, int oldCap) {
        lock.unlock(); //出于性能考慮先釋放鎖
        Object[] newArray = null;  //擴(kuò)容后的數(shù)組
        //allocationSpinLock  自旋鎖變量怒坯,默認(rèn)為0,需要經(jīng)過原子cas判斷之后才會改值此處控制單線程進(jìn)入if語句
        if (allocationSpinLock == 0 &&
            UNSAFE.compareAndSwapInt(this, allocationSpinLockOffset,
                                     0, 1)) {
            try {
                //擴(kuò)容大小藻懒,<64, +2, 如果大于64剔猿, + 50%, 封頂為int最大值-8
                int newCap = oldCap + ((oldCap < 64) ?
                                       (oldCap + 2) : // grow faster if small
                                       (oldCap >> 1));
                if (newCap - MAX_ARRAY_SIZE > 0) {    // possible overflow
                    int minCap = oldCap + 1;
                    if (minCap < 0 || minCap > MAX_ARRAY_SIZE)
                        throw new OutOfMemoryError();
                    newCap = MAX_ARRAY_SIZE;
                }
                //擴(kuò)容出新數(shù)組嬉荆,此處需要queue==array, 原因:已有線程擴(kuò)容成功不必再擴(kuò)容
                if (newCap > oldCap && queue == array)
                    newArray = new Object[newCap];
            } finally {
                allocationSpinLock = 0;
            }
        }
        //線程不滿足allocationSpinLock 自旋鎖變量的if判斷归敬,表示擴(kuò)容失敗,讓出cpu
        if (newArray == null) 
            Thread.yield();
        //數(shù)據(jù)拷貝员寇,不允許同時出列入列弄慰,需要獲取鎖
        lock.lock();
        //queue==array 如果已經(jīng)擴(kuò)容, 此處擴(kuò)容放棄
        if (newArray != null && queue == array) {
            queue = newArray;
            System.arraycopy(array, 0, newArray, 0, oldCap);
        }
    }

總結(jié)一下上代碼蝶锋, tryGrow的最終的目的是進(jìn)行擴(kuò)容,先釋放鎖是出于性能考慮什往,比較擴(kuò)容操作需要消耗一定時間扳缕,而期間無法進(jìn)行入列出列,那隊列的并發(fā)性就大大打折扣了别威。但是躯舔,如果釋放鎖之后,那么擴(kuò)容的安全就需要另辟蹊徑了省古。

所以tryGrow方法采用了cas原子操作方式實現(xiàn):

if (allocationSpinLock == 0 &&
            UNSAFE.compareAndSwapInt(this, allocationSpinLockOffset, 0, 1)) {
    .....
}

allocationSpinLock 變量最初為0粥庄, UNSAFE.compareAndSwapInt方法比較與置換allocationSpinLock =1, 因為compareAndSwapInt原子操作性豺妓,導(dǎo)致同一時刻只有一個線程進(jìn)入執(zhí)行if語句塊惜互。這樣也可以達(dá)到加鎖目的。

無法進(jìn)入if語句塊的線程琳拭,則執(zhí)行Thread.yield();語句训堆,讓cpu, 別占著茅坑不拉屎白嘁。

 if (newArray == null) 
            Thread.yield();

當(dāng)幸運(yùn)線程執(zhí)行if語句塊之后坑鱼,新隊列空間已經(jīng)開辟好了, 接下來就是數(shù)據(jù)拷貝絮缅,這里就又得注意的鲁沥,為防止數(shù)據(jù)拷貝出亂子呼股,又得爭奪鎖,保證隊列數(shù)據(jù)安全画恰,所有需要重新加鎖彭谁。

lock.lock();
if (newArray != null && queue == array) {
    queue = newArray;
    System.arraycopy(array, 0, newArray, 0, oldCap);
}

入列另外一個核心點,數(shù)據(jù)存儲:siftUpComparable / siftUpUsingComparator

    private static <T> void siftUpComparable(int k, T x, Object[] array) {
        Comparable<? super T> key = (Comparable<? super T>) x;
        while (k > 0) {
            int parent = (k - 1) >>> 1;
            Object e = array[parent];
            if (key.compareTo((T) e) >= 0)
                break;
            array[k] = e;
            k = parent;
        }
        array[k] = key;
    }
    private static <T> void siftUpUsingComparator(int k, T x, Object[] array,
                                       Comparator<? super T> cmp) {
        while (k > 0) {
            int parent = (k - 1) >>> 1;
            Object e = array[parent];
            if (cmp.compare(x, (T) e) >= 0)
                break;
            array[k] = e;
            k = parent;
        }
        array[k] = x;
    }

PriorityBlockingQueue使用的是二叉樹最小堆的方式進(jìn)行存儲數(shù)據(jù)阐枣。
要想理解上面代碼马靠, 先得明白二叉樹最小堆操作概念:
1>二叉樹最小堆:是一種經(jīng)過排序的完全二叉樹,其中任一非終端節(jié)點的數(shù)據(jù)值均不大于其左子節(jié)點和右子節(jié)點的值蔼两。
2>使用數(shù)組來實現(xiàn)二叉堆最小堆甩鳄,如果把最新入列元素下標(biāo)設(shè)置 i,那么該節(jié)點的父節(jié)點是i/2额划。
3>如果父節(jié)點的值大于子節(jié)點子需要進(jìn)行交互妙啃。
有這個概念之后,我們來看 siftUpComparable代碼俊戳,進(jìn)入siftUpComparable方法揖赴,先算出父parent的索引,再判斷父節(jié)點跟入列數(shù)據(jù)比對抑胎,如果父節(jié)點數(shù)據(jù)小于入列數(shù)據(jù)跳過燥滑,尋找上一個父節(jié)點。再重復(fù)剛剛判斷即可阿逃,如果父節(jié)點數(shù)據(jù)大于入列數(shù)據(jù)铭拧,則交換。

出列-take

    public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly(); //加鎖
        E result;
        try {
            //出列失敗恃锉,等待
            while ( (result = dequeue()) == null)
                notEmpty.await();
        } finally {
            lock.unlock();
        }
        return result;
    }

出列核心方法dequeue:

    private E dequeue() {
        int n = size - 1;
        if (n < 0)
            return null;
        else {
            Object[] array = queue;
            E result = (E) array[0];  //出列剔除數(shù)組第一個搀菩,也是最小堆的樹根
            E x = (E) array[n];
            array[n] = null;
            Comparator<? super E> cmp = comparator;
            //樹根出列之后,需要向上調(diào)整堆結(jié)構(gòu)破托,弄出新的最小堆肪跋。
            if (cmp == null)
                siftDownComparable(0, x, array, n);
            else
                siftDownUsingComparator(0, x, array, n, cmp);
            size = n;
            return result;
        }
    }
總結(jié)

PriorityBlockingQueue是一個無界阻塞隊列,出隊的元素是優(yōu)先級最高的元素土砂,而優(yōu)先級的規(guī)則可以自己指定州既,如果沒指定默認(rèn)使用自然趴下規(guī)則。
PriorityBlockingQueue內(nèi)部通過使用一個二叉樹最小堆算法來維護(hù)內(nèi)部數(shù)組瘟芝,這個數(shù)組是可擴(kuò)容的易桃,當(dāng)前元素個數(shù)>=最大容量時候會通過算法(小于64+2 大于64 + 50%)擴(kuò)容。

想獲取更多技術(shù)干貨锌俱,請前往叩丁狼官網(wǎng):http://www.wolfcode.cn/all_article.html

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末晤郑,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌造寝,老刑警劉巖磕洪,帶你破解...
    沈念sama閱讀 217,907評論 6 506
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異诫龙,居然都是意外死亡析显,警方通過查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,987評論 3 395
  • 文/潘曉璐 我一進(jìn)店門签赃,熙熙樓的掌柜王于貴愁眉苦臉地迎上來谷异,“玉大人,你說我怎么就攤上這事锦聊〈踵冢” “怎么了?”我有些...
    開封第一講書人閱讀 164,298評論 0 354
  • 文/不壞的土叔 我叫張陵孔庭,是天一觀的道長尺上。 經(jīng)常有香客問我,道長圆到,這世上最難降的妖魔是什么怎抛? 我笑而不...
    開封第一講書人閱讀 58,586評論 1 293
  • 正文 為了忘掉前任,我火速辦了婚禮芽淡,結(jié)果婚禮上马绝,老公的妹妹穿的比我還像新娘。我一直安慰自己挣菲,他們只是感情好迹淌,可當(dāng)我...
    茶點故事閱讀 67,633評論 6 392
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著己单,像睡著了一般。 火紅的嫁衣襯著肌膚如雪耙饰。 梳的紋絲不亂的頭發(fā)上纹笼,一...
    開封第一講書人閱讀 51,488評論 1 302
  • 那天,我揣著相機(jī)與錄音苟跪,去河邊找鬼廷痘。 笑死,一個胖子當(dāng)著我的面吹牛件已,可吹牛的內(nèi)容都是我干的笋额。 我是一名探鬼主播,決...
    沈念sama閱讀 40,275評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼篷扩,長吁一口氣:“原來是場噩夢啊……” “哼兄猩!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 39,176評論 0 276
  • 序言:老撾萬榮一對情侶失蹤枢冤,失蹤者是張志新(化名)和其女友劉穎鸠姨,沒想到半個月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體淹真,經(jīng)...
    沈念sama閱讀 45,619評論 1 314
  • 正文 獨居荒郊野嶺守林人離奇死亡讶迁,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,819評論 3 336
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了核蘸。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片巍糯。...
    茶點故事閱讀 39,932評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖客扎,靈堂內(nèi)的尸體忽然破棺而出祟峦,到底是詐尸還是另有隱情,我是刑警寧澤虐唠,帶...
    沈念sama閱讀 35,655評論 5 346
  • 正文 年R本政府宣布搀愧,位于F島的核電站,受9級特大地震影響疆偿,放射性物質(zhì)發(fā)生泄漏咱筛。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 41,265評論 3 329
  • 文/蒙蒙 一杆故、第九天 我趴在偏房一處隱蔽的房頂上張望迅箩。 院中可真熱鬧,春花似錦处铛、人聲如沸饲趋。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,871評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽奕塑。三九已至,卻和暖如春家肯,著一層夾襖步出監(jiān)牢的瞬間龄砰,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 32,994評論 1 269
  • 我被黑心中介騙來泰國打工讨衣, 沒想到剛下飛機(jī)就差點兒被人妖公主榨干…… 1. 我叫王不留换棚,地道東北人。 一個月前我還...
    沈念sama閱讀 48,095評論 3 370
  • 正文 我出身青樓反镇,卻偏偏與公主長得像固蚤,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子歹茶,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 44,884評論 2 354

推薦閱讀更多精彩內(nèi)容