6-Java并發(fā)容器和框架

1.ConcurrentHashMap

①為什么要使用ConcurrentHashMap

1)線程不安全的HashMap

多線程環(huán)境下,使用HashMap進(jìn)行put操作會引起死循環(huán)蛤吓,導(dǎo)致CPU利用率接近100%撼唾。

以下代碼會引起死循環(huán)(1.8之前)

        final HashMap<String, String> map = new HashMap<>(2);
        Thread t = new Thread(new Runnable() {
            @Override
            public void run() {
                for (int i = 0; i < 10000; i++) {
                    new Thread(new Runnable() {
                        @Override
                        public void run() {
                            map.put(UUID.randomUUID().toString(), "");
                        }
                    }, "ftf" + i).start();
                }
            }
        }, "ftf");
        t.start();
        t.join();
        System.out.println("ok");

HashMap在并發(fā)執(zhí)行put操作時會引起死循環(huán)付鹿,是因為多線程會導(dǎo)致HashMap的Entry鏈表形成環(huán)形數(shù)據(jù)結(jié)構(gòu)涂乌,一單形成環(huán)形數(shù)據(jù)結(jié)構(gòu)催享,Entry的next節(jié)點永遠(yuǎn)不為空靠欢,就會產(chǎn)生死循環(huán)獲取Entry弥姻。

1.8之前HashMap在并發(fā)執(zhí)行put操作時,需要擴(kuò)容的時候會出現(xiàn)鏈表形成環(huán)形數(shù)據(jù)結(jié)構(gòu):1.7擴(kuò)容代碼

    void transfer(Entry[] newTable, boolean rehash) {
        int newCapacity = newTable.length;
        for (Entry<K,V> e : table) {
            while(null != e) {
                Entry<K,V> next = e.next;//這一步被掛起掺涛,就可能出現(xiàn)環(huán)
                if (rehash) {
                    e.hash = null == e.key ? 0 : hash(e.key);
                }
                int i = indexFor(e.hash, newCapacity);
                e.next = newTable[i];
                newTable[i] = e;
                e = next;
            }
        }
    }

假設(shè)原來table的a處,是 k1→k2→null疼进,假設(shè)所有的key計算出在新的table中的位置都是b

線程A:

e=k1薪缆;

next=k1.next=k2;

線程B:

e=k1;

next=k1.next=k2拣帽;k1.next=newTable[b]=null疼电;newTable[b]=k1;e=k2减拭;

next=k2.next=null蔽豺;k2.next=newTable[b]=k1;newTable[b]=k2拧粪;e=null修陡;

//現(xiàn)在順序是:k2→k1→null

線程A:

k1.next=newTable[b]=k2;newTable[b]=k1可霎;e=k2魄鸦;

//現(xiàn)在順序是:k1→k2→k1

next=k2.next=k1;k2.next=newTable[b]=k1癣朗;newTable[b]=k2拾因;e=k1;

next=k1.next=k2旷余;k1.next=newTable[b]=k2绢记;newTable[b]=k1;e=k2正卧;

next=k2.next=k1蠢熄;k2.next=newTable[b]=k1;newTable[b]=k2穗酥;e=k1护赊;

.......//形成死循環(huán)

猜測1.8不會形成死循環(huán),這里并不會出現(xiàn)反向更改Node的next引用的情況

                    else { // preserve order
                        Node<K,V> loHead = null, loTail = null;//原位置
                        Node<K,V> hiHead = null, hiTail = null;//新位置
                        Node<K,V> next;
                        do {
                            next = e.next;
                            if ((e.hash & oldCap) == 0) {//不需要改變位置
                                if (loTail == null)
                                    loHead = e;
                                else
                                    loTail.next = e;
                                loTail = e;
                            }
                            else {//需要改變位置
                                if (hiTail == null)
                                    hiHead = e;
                                else
                                    hiTail.next = e;
                                hiTail = e;
                            }
                        } while ((e = next) != null);
                        if (loTail != null) {
                            loTail.next = null;
                            newTab[j] = loHead;
                        }
                        if (hiTail != null) {
                            hiTail.next = null;
                            newTab[j + oldCap] = hiHead;
                        }
                    }
2)效率低下的HashTable

HashTable容器使用synchronized來保證線程安全砾跃,在線程競爭激烈的情況下HashTable的效率非常低下骏啰。

3)ConcurrentHashMap的鎖分段技術(shù)(1.8之前)可有效提升并發(fā)訪問率

1.8之前:容器中有多把鎖,每一把鎖用于鎖容器其中一部分?jǐn)?shù)據(jù)抽高,那么當(dāng)多線程訪問容器里不同數(shù)據(jù)段的數(shù)據(jù)時判耕,線程間就不會存在鎖競爭,從而可以有效提高并發(fā)訪問率翘骂,這個就是鎖分段技術(shù)壁熄。首先將數(shù)據(jù)分成一段一段地存儲,然后給每一段數(shù)據(jù)配一把鎖碳竟,草丧。

1.8:鎖粒度降低,如果形成鏈表莹桅,以鏈表第一個節(jié)點為synchronized的對象昌执。

②ConcurrentHashMap的結(jié)構(gòu)

在JDK1.7版本中,ConcurrentHashMap的數(shù)據(jù)結(jié)構(gòu)是由一個Segment數(shù)組和多個HashEntry組成:

JDK1.8的實現(xiàn)已經(jīng)摒棄了Segment的概念,而是直接用Node數(shù)組+鏈表+紅黑樹的數(shù)據(jù)結(jié)構(gòu)實現(xiàn)懂拾,并發(fā)控制使用synchronized和CAS來操作煤禽,整個看起來就像是優(yōu)化過且現(xiàn)場安全的HashMap,雖然在JDK1.8中還能看到Segment的數(shù)據(jù)結(jié)構(gòu),但是已經(jīng)簡化了屬性,只是為了兼容舊版本普气。

常量設(shè)計:

// node數(shù)組最大容量:2^30=1073741824
private static final int MAXIMUM_CAPACITY = 1 << 30;
// 默認(rèn)初始值,必須是2的幕數(shù)
private static final int DEFAULT_CAPACITY = 16;
//數(shù)組可能最大值选脊,需要與toArray()相關(guān)方法關(guān)聯(lián)
static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;
//并發(fā)級別,遺留下來的栗涂,為兼容以前的版本
private static final int DEFAULT_CONCURRENCY_LEVEL = 16;
// 負(fù)載因子
private static final float LOAD_FACTOR = 0.75f;
// 鏈表轉(zhuǎn)紅黑樹閥值,> 8 鏈表轉(zhuǎn)換為紅黑樹
static final int TREEIFY_THRESHOLD = 8;
//樹轉(zhuǎn)鏈表閥值知牌,小于等于6(tranfer時,lc斤程、hc=0兩個計數(shù)器分別++記錄原bin角寸、新binTreeNode數(shù)量,<=UNTREEIFY_THRESHOLD 則untreeify(lo))
static final int UNTREEIFY_THRESHOLD = 6;
static final int MIN_TREEIFY_CAPACITY = 64;
private static final int MIN_TRANSFER_STRIDE = 16;
private static int RESIZE_STAMP_BITS = 16;
// 2^15-1忿墅,help resize的最大線程數(shù)
private static final int MAX_RESIZERS = (1 << (32 - RESIZE_STAMP_BITS)) - 1;
// 32-16=16扁藕,sizeCtl中記錄size大小的偏移量
private static final int RESIZE_STAMP_SHIFT = 32 - RESIZE_STAMP_BITS;
// forwarding nodes的hash值
static final int MOVED     = -1;
// 樹根節(jié)點的hash值
static final int TREEBIN   = -2;
// ReservationNode的hash值
static final int RESERVED  = -3;
// 可用處理器數(shù)量
static final int NCPU = Runtime.getRuntime().availableProcessors();
//存放node的數(shù)組
transient volatile Node<K,V>[] table;
/*控制標(biāo)識符,用來控制table的初始化和擴(kuò)容的操作疚脐,不同的值有不同的含義
 *當(dāng)為負(fù)數(shù)時:-1代表正在初始化亿柑,-N代表有N-1個線程正在 進(jìn)行擴(kuò)容
 *當(dāng)為0時:代表當(dāng)時的table還沒有被初始化
 *當(dāng)為正數(shù)時:表示初始化或者下一次進(jìn)行擴(kuò)容的大小
private transient volatile int sizeCtl;

內(nèi)部一些數(shù)據(jù)結(jié)構(gòu):

Node是ConcurrentHashMap存儲結(jié)構(gòu)的基本單元,用于存儲數(shù)據(jù)棍弄,數(shù)據(jù)結(jié)構(gòu)就是一個鏈表望薄,但只允許對數(shù)據(jù)進(jìn)行查找,不允許進(jìn)行修改:

static class Node<K,V> implements Map.Entry<K,V> {
    //鏈表的數(shù)據(jù)結(jié)構(gòu)
    final int hash;
    final K key;
    //val和next都會在擴(kuò)容時發(fā)生變化呼畸,所以加上volatile來保持可見性和禁止重排序
    volatile V val;
    volatile Node<K,V> next;
    Node(int hash, K key, V val, Node<K,V> next) {
        this.hash = hash;
        this.key = key;
        this.val = val;
        this.next = next;
    }
    public final K getKey()       { return key; }
    public final V getValue()     { return val; }
    public final int hashCode()   { return key.hashCode() ^ val.hashCode(); }
    public final String toString(){ return key + "=" + val; }
    //不允許更新value 
    public final V setValue(V value) {
        throw new UnsupportedOperationException();
    }
    public final boolean equals(Object o) {
        Object k, v, u; Map.Entry<?,?> e;
        return ((o instanceof Map.Entry) &&
                (k = (e = (Map.Entry<?,?>)o).getKey()) != null &&
                (v = e.getValue()) != null &&
                (k == key || k.equals(key)) &&
                (v == (u = val) || v.equals(u)));
    }
    //用于map中的get()方法痕支,子類重寫
    Node<K,V> find(int h, Object k) {
        Node<K,V> e = this;
        if (k != null) {
            do {
                K ek;
                if (e.hash == h &&
                    ((ek = e.key) == k || (ek != null && k.equals(ek))))
                    return e;
            } while ((e = e.next) != null);
        }
        return null;
    }
}

TreeNode繼承與Node,但是數(shù)據(jù)結(jié)構(gòu)換成了二叉樹結(jié)構(gòu)蛮原,它是紅黑樹的數(shù)據(jù)的存儲結(jié)構(gòu)卧须,用于紅黑樹中存儲數(shù)據(jù),當(dāng)鏈表節(jié)點數(shù)大于8時會轉(zhuǎn)換成紅黑樹的結(jié)構(gòu)儒陨,他就是通過TreeNode作為存儲結(jié)構(gòu)代替Node來轉(zhuǎn)換成紅黑樹花嘶。

static final class TreeNode<K,V> extends Node<K,V> {
    //樹形結(jié)構(gòu)的屬性定義
    TreeNode<K,V> parent;  // red-black tree links
    TreeNode<K,V> left;
    TreeNode<K,V> right;
    TreeNode<K,V> prev;    // needed to unlink next upon deletion
    boolean red; //標(biāo)志紅黑樹的紅節(jié)點
    TreeNode(int hash, K key, V val, Node<K,V> next,
             TreeNode<K,V> parent) {
        super(hash, key, val, next);
        this.parent = parent;
    }
    Node<K,V> find(int h, Object k) {
        return findTreeNode(h, k, null);
    }
    //根據(jù)key查找 從根節(jié)點開始找出相應(yīng)的TreeNode,
    final TreeNode<K,V> findTreeNode(int h, Object k, Class<?> kc) {
        if (k != null) {
            TreeNode<K,V> p = this;
            do  {
                int ph, dir; K pk; TreeNode<K,V> q;
                TreeNode<K,V> pl = p.left, pr = p.right;
                if ((ph = p.hash) > h)
                    p = pl;
                else if (ph < h)
                    p = pr;
                else if ((pk = p.key) == k || (pk != null && k.equals(pk)))
                    return p;
                else if (pl == null)
                    p = pr;
                else if (pr == null)
                    p = pl;
                else if ((kc != null ||
                          (kc = comparableClassFor(k)) != null) &&
                         (dir = compareComparables(kc, k, pk)) != 0)
                    p = (dir < 0) ? pl : pr;
                else if ((q = pr.findTreeNode(h, k, kc)) != null)
                    return q;
                else
                    p = pl;
            } while (p != null);
        }
        return null;
    }
}

TreeBin存儲樹形結(jié)構(gòu)的容器蹦漠,樹形結(jié)構(gòu)就是指TreeNode椭员,所以TreeBin就是封裝TreeNode的容器,它提供轉(zhuǎn)換黑紅樹的一些條件和鎖的控制笛园。

static final class TreeBin<K,V> extends Node<K,V> {
    //指向TreeNode列表和根節(jié)點
    TreeNode<K,V> root;
    volatile TreeNode<K,V> first;
    volatile Thread waiter;
    volatile int lockState;
    // 讀寫鎖狀態(tài)
    static final int WRITER = 1; // 獲取寫鎖的狀態(tài)
    static final int WAITER = 2; // 等待寫鎖的狀態(tài)
    static final int READER = 4; // 增加數(shù)據(jù)時讀鎖的狀態(tài)
    /**
     * 初始化紅黑樹
     */
    TreeBin(TreeNode<K,V> b) {
        super(TREEBIN, null, null, null);
        this.first = b;
        TreeNode<K,V> r = null;
        for (TreeNode<K,V> x = b, next; x != null; x = next) {
            next = (TreeNode<K,V>)x.next;
            x.left = x.right = null;
            if (r == null) {
                x.parent = null;
                x.red = false;
                r = x;
            }
            else {
                K k = x.key;
                int h = x.hash;
                Class<?> kc = null;
                for (TreeNode<K,V> p = r;;) {
                    int dir, ph;
                    K pk = p.key;
                    if ((ph = p.hash) > h)
                        dir = -1;
                    else if (ph < h)
                        dir = 1;
                    else if ((kc == null &&
                              (kc = comparableClassFor(k)) == null) ||
                             (dir = compareComparables(kc, k, pk)) == 0)
                        dir = tieBreakOrder(k, pk);
                        TreeNode<K,V> xp = p;
                    if ((p = (dir <= 0) ? p.left : p.right) == null) {
                        x.parent = xp;
                        if (dir <= 0)
                            xp.left = x;
                        else
                            xp.right = x;
                        r = balanceInsertion(r, x);
                        break;
                    }
                }
            }
        }
        this.root = r;
        assert checkInvariants(root);
    }
    ......
}

③ConcurrentHashMap的初始化

JDK1.7 ConcurrentHashMap的初始化會通過位與運算來初始化Segment的大小拆撼,用ssize來標(biāo)識容劳,如下:

        int sshift = 0;
        int ssize = 1;
        while (ssize < concurrencyLevel) {
            ++sshift;
            ssize <<= 1;
        }

concurrencyLevel的最大值是65535,這意味著segments數(shù)組的長度最大為65536闸度,對應(yīng)的二進(jìn)制是16位。

④ConcurrentHashMap的操作

參考:https://www.cnblogs.com/study-everyday/p/6430462.html

2.ConcurrentLinkedQueue

并發(fā)編程中蚜印,有時候需要使用線程安全的隊列莺禁。如果要實現(xiàn)一個線程安全的隊列有兩種方式:一種是使用阻塞算法,另一種是使用非阻塞算法窄赋。

使用阻塞算法的隊列哟冬,可以用一個鎖(入隊和出隊用同一把鎖)或兩個鎖(入隊和出隊用不同的鎖)等方式來實現(xiàn)。

非阻塞的實現(xiàn)方式則可以使用循環(huán)CAS的方式來實現(xiàn)忆绰。

ConcurrentLinkedQueue是使用非阻塞的方式來實現(xiàn)線程安全隊列的浩峡。它是一個基于鏈接節(jié)點的無界線程安全隊列,采用先進(jìn)先出的規(guī)則對節(jié)點進(jìn)行排序错敢,當(dāng)我們添加一個元素的時候翰灾,它會添加到隊列的尾部;當(dāng)我們獲取一個元素時稚茅,它會返回隊列頭部的元素纸淮。

①ConcurrentLinkedQueue的結(jié)構(gòu)

②入隊列

入隊列的過程

    public boolean offer(E e) {
        checkNotNull(e);
        //入隊前,創(chuàng)建一個入隊節(jié)點
        final Node<E> newNode = new Node<E>(e);
        //死循環(huán)亚享,入隊不成功反復(fù)入隊
        for (Node<E> t = tail, p = t;;) {
        //創(chuàng)建一個指向tail節(jié)點的引用咽块,用p來標(biāo)識隊列的尾節(jié)點,默認(rèn)情況下等于tail節(jié)點欺税。
            Node<E> q = p.next;//獲取p節(jié)點的下一個節(jié)點
            if (q == null) {//p節(jié)點是尾節(jié)點
                if (p.casNext(null, newNode)) {//設(shè)置p節(jié)點的next節(jié)點為入隊節(jié)點
                    if (p != t) // t已經(jīng)不是尾節(jié)點了侈沪,t=tail,p經(jīng)過循環(huán)后已經(jīng)改變晚凿。
                        casTail(t, newNode);  // 更新tail節(jié)點亭罪,允許失敗
                    return true;
                }
                //另一個線程CAS成功了,重新讀取下一個
            }
            else if (p == q)
                //我們已經(jīng)脫離了隊列晃虫。如果tail沒有變化皆撩,它也已經(jīng)脫離了隊列,在這種情況下哲银,我們需要跳到頭部扛吞,否則跳到尾部
                p = (t != (t = tail)) ? t : head;
            else
                //檢查tail更新
                p = (p != t && t != (t = tail)) ? t : q;
        }
    }

入隊列就是將入隊節(jié)點添加到隊列的尾部。tail節(jié)點不總是尾節(jié)點荆责。入隊方法永遠(yuǎn)返回true滥比,所以不要通過返回值判斷入隊是否成功。

在一個隊列中依次插入4個節(jié)點做院,示例:

  • 添加元素1盲泛。隊列更新head節(jié)點的next節(jié)點為元素1節(jié)點濒持。又因為tail節(jié)點默認(rèn)情況下等于head節(jié)點,所以它們的next節(jié)點都指向元素1節(jié)點寺滚。
  • 添加元素2柑营。隊列首先設(shè)置元素1節(jié)點的next節(jié)點為元素2節(jié)點,然后更新tail節(jié)點指向元素2節(jié)點村视。(會跳過1節(jié)點)官套。
  • 添加元素3。設(shè)置tail節(jié)點的next節(jié)點為元素3節(jié)點蚁孔。
  • 添加元素4奶赔。設(shè)置元素3的next節(jié)點為元素4節(jié)點,然后將tail節(jié)點指向元素4節(jié)點杠氢。

③出隊列

    public E poll() {
        restartFromHead:
        for (;;) {
            for (Node<E> h = head, p = h, q;;) {
                E item = p.item;//獲取p節(jié)點的元素
                //p節(jié)點的元素不為空站刑,使用CAS設(shè)置p節(jié)點引用的元素為null,如果CAS成功鼻百,返回p節(jié)點的元素
                if (item != null && p.casItem(item, null)) {
                    if (p != h) // 一次跳躍兩個節(jié)點
                        updateHead(h, ((q = p.next) != null) ? q : p);
                    return item;
                }
                else if ((q = p.next) == null) {//如果p的下一個節(jié)點也為空绞旅,說明這個隊列已經(jīng)空了。
                    updateHead(h, p);
                    return null;
                }
                else if (p == q)
                    continue restartFromHead;
                else
                    p = q;
            }
        }
    }

出隊列的就是從隊列里返回一個幾點元素愕宋,并清空該節(jié)點對元素的引用玻靡。

3.Java中的阻塞隊列

①什么是阻塞隊列

阻塞隊列(BlockingQueue)是一個支持兩個附加操作的隊列。這兩個附加的操作支持阻塞的插入和移除方法中贝。

1)支持阻塞的插入方法:意思是當(dāng)隊列滿時囤捻,隊列會阻塞插入元素的線程,知道隊列不滿邻寿。

2)支持阻塞的移除方法:意思是在隊列為空時蝎土,獲取元素的線程會等待隊列變?yōu)榉强铡?/p>

阻塞隊列常用于生產(chǎn)者和消費者的場景,生產(chǎn)者是向隊列里添加元素的線程绣否,消費者是從隊列里取元素的線程誊涯。阻塞隊列就是生產(chǎn)者用來存放元素、消費者用來獲取元素的容器蒜撮。

在阻塞隊列不可用時暴构,這兩個附加操作提供了4種處理方式:

  • 拋出異常:當(dāng)隊列滿時,如果再往隊列里插入元素段磨,會拋出IllegalStateException(“Queue full”)異常取逾。當(dāng)隊列空時,從隊列里獲取元素會拋出NoSuchElementException異常苹支。(AbstractQueue)
  • 返回特殊值:往隊列里插入元素時砾隅,成功返回true。移除方法取出一個元素债蜜,如果沒有則返回null晴埂。
  • 一直阻塞:當(dāng)阻塞隊列滿時究反,如果生產(chǎn)者線程往隊列里put元素,隊列會一直阻塞生產(chǎn)者線程儒洛,知道隊列可用或者響應(yīng)中斷退出精耐。當(dāng)隊列空時,如果消費者線程從隊列里take元素晶丘,隊列會阻塞住消費者線程黍氮,直到隊列不為空。
  • 超時退出:當(dāng)阻塞隊列滿時浅浮,如果生產(chǎn)者線程往隊列里插入元素,隊列會阻塞生產(chǎn)者線程一點時間捷枯,如果超過了指定的時間滚秩,生產(chǎn)者線程就會退出。

注意:無界阻塞隊列淮捆,隊列不可能出現(xiàn)滿的情況郁油。

②Java里的阻塞隊列

  • ArrayBlockingQueue:一個由數(shù)組結(jié)構(gòu)組成的有界阻塞隊列。
  • LinkedBlockingQueue:一個由鏈表結(jié)構(gòu)組成的有界阻塞隊列攀痊。
  • PriorityBlockingQueue:一個支持優(yōu)先級排序的無界阻塞隊列。
  • DelayQueue:一個使用優(yōu)先級隊列實現(xiàn)的支持延時獲取元素的無界阻塞隊列苟径。
  • SynchronousQueue:一個不存儲元素的阻塞隊列。
  • LinkedTransferQueue:一個由鏈表結(jié)構(gòu)組成的無界阻塞隊列蟆盐。
  • LinkedBlockingDeque:一個由鏈表結(jié)構(gòu)組成的雙向阻塞隊列。
1)ArrayBlockingQueue

是一個用數(shù)組實現(xiàn)的有界阻塞隊列遭殉。此隊列按照先進(jìn)先出(FIFO)的原則對元素進(jìn)行排序石挂。用重入鎖ReentrantLock來實現(xiàn)線程訪問隊列的公平性。

    public ArrayBlockingQueue(int capacity, boolean fair) {
        if (capacity <= 0)
            throw new IllegalArgumentException();
        this.items = new Object[capacity];
        lock = new ReentrantLock(fair);
        notEmpty = lock.newCondition();
        notFull =  lock.newCondition();
    }
2)LinkedBlockingQueue

一個用來鏈表實現(xiàn)的有界阻塞隊列险污。此隊列的默認(rèn)和最大長度為Integer.MAX_VALUE痹愚。

3)PriorityBlockingQueue

一個支持優(yōu)先級的無界阻塞隊列。默認(rèn)情況下元素采取自然順序升序排列蛔糯。也可以自定義類實現(xiàn)compareTo()方法來指定元素排序規(guī)則拯腮,或者初始化PriorityBlockingQueue時,指定構(gòu)造參數(shù)Comparator來對元素進(jìn)行排序渤闷。不能保證同優(yōu)先級元素的順序疾瓮。

4)DelayQueue

一個支持延時獲取元素的無界阻塞隊列。隊列使用PriorityQueue來實現(xiàn)飒箭。隊列中的元素必須實現(xiàn)Delayed接口狼电,在創(chuàng)建元素時可以指定多久才能從隊列中獲取當(dāng)前元素蜒灰。只有在延遲期滿時才能從隊列中提取元素。

適用于以下場景:

  • 緩存系統(tǒng)的設(shè)計:可以保存緩存元素的有效期肩碟,使用一個線程循環(huán)查詢DelayQueue强窖,一旦能從DelayQueue中獲取元素時,表示緩存有效期到了削祈。
  • 定時任務(wù)調(diào)度:使用DelayQueue保存當(dāng)天將會執(zhí)行的任務(wù)和執(zhí)行時間翅溺,一旦從DelayQueue中獲取到任務(wù)就開始執(zhí)行,比如TimerQueue就是使用DelayQueue實現(xiàn)的髓抑。

a.如何實現(xiàn)Delayed接口

參考java.util.concurrent.ScheduledThreadPoolExecutor.ScheduledFutureTask類的實現(xiàn)咙崎,一共有三步:

  • 在對象創(chuàng)建的時候,初始化基本數(shù)據(jù)吨拍。使用time記錄當(dāng)前對象延遲到什么時候可以使用褪猛,使用sequenceNumber來標(biāo)識元素在隊列中的先后順序伊滋。

            private static final AtomicLong sequencer = new AtomicLong(0);
            ScheduledFutureTask(Runnable r, V result, long ns, long period) {
                super(r, result);
                this.time = ns;
                this.period = period;
                this.sequenceNumber = sequencer.getAndIncrement();
            }
    

    ?

  • 實現(xiàn)getDelay方法笑旺,該方法返回當(dāng)前元素還需要延時多長時間,單位是納秒物舒。當(dāng)time小于當(dāng)前時間時冠胯,getDelay會返回負(fù)數(shù)荠察。

            public long getDelay(TimeUnit unit) {
                return unit.convert(time - now(), TimeUnit.NANOSECONDS);
            }
    
  • 實現(xiàn)compareTo方法來指定元素的順序悉盆。例如,讓延時時間最長的放在隊列的末尾脚翘。

            public int compareTo(Delayed other) {
                if (other == this) // compare zero ONLY if same object
                    return 0;
                if (other instanceof ScheduledFutureTask) {
                    ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other;
                    long diff = time - x.time;
                    if (diff < 0)
                        return -1;
                    else if (diff > 0)
                        return 1;
                    else if (sequenceNumber < x.sequenceNumber)
                        return -1;
                    else
                        return 1;
                }
                long d = (getDelay(TimeUnit.NANOSECONDS) -
                          other.getDelay(TimeUnit.NANOSECONDS));
                return (d == 0) ? 0 : ((d < 0) ? -1 : 1);
            }
    

b.如何實現(xiàn)延時阻塞隊列

當(dāng)消費者從隊列里獲取元素時来农,如果元素沒有達(dá)到延時時間涩咖,就阻塞當(dāng)前線程檩互。

    public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            for (;;) {
                E first = q.peek();
                if (first == null)
                    available.await();
                else {
                    long delay = first.getDelay(TimeUnit.NANOSECONDS);
                    if (delay <= 0) //到時間了
                        return q.poll();
                    else if (leader != null) //leader是等待獲取隊列頭部元素的線程不為空,表示已經(jīng)有線程在等待獲取隊列的頭元素。
                        available.await();
                    else {
                        Thread thisThread = Thread.currentThread();
                        leader = thisThread;//將當(dāng)前線程設(shè)置成leader
                        try {
                            available.awaitNanos(delay);//等待delay時間
                        } finally {
                            if (leader == thisThread)
                                leader = null;
                        }
                    }
                }
            }
        } finally {
            if (leader == null && q.peek() != null)
                available.signal();
            lock.unlock();
        }
    }
5)SynchronousQueue

一個不存儲元素的阻塞隊列村刨。每一個put操作必須等待一個take操作嵌牺,否則不能繼續(xù)添加元素逆粹。默認(rèn)采用非公平性策略訪問隊列僻弹。

    public SynchronousQueue(boolean fair) {//true則等待的線程會采用先進(jìn)先出的順序訪問隊列
        transferer = fair ? new TransferQueue() : new TransferStack();
    }

SynchronousQueue負(fù)責(zé)把生產(chǎn)者線程處理的數(shù)據(jù)直接傳遞給消費者線程。隊列本身不存儲元素卸耘,適合傳遞性場景蚣抗。吞吐量高于LinkedBlockingQueue和ArrayBlockingQueue翰铡。

6)LinkedTransferQueue

一個由鏈表結(jié)構(gòu)組成的無界阻塞TransferQueue隊列网梢。相對于其他阻塞隊列战虏,多了tryTransfer和transfer方法烦感。

a.transfer方法

  • 當(dāng)前有消費者正在等待接收元素(消費者使用take方法或帶時間限制的poll方法時)手趣,transfer方法可以把生產(chǎn)者傳入的元素liketransfer(傳輸)給消費者。
  • 沒有消費者在等待接收元素中符,transfer方法會將元素采暖費在隊列的tail節(jié)點淀散,并等到該元素被消費者消費了才返回。

關(guān)鍵代碼如下:

Node pred = tryAppend(s, haveData);//試圖把存放當(dāng)前元素的s節(jié)點作為tail節(jié)點亚再。
return awaitMatch(s, pred, e, (how == TIMED), nanos);//讓CPU自旋等待消費者消費元素饲鄙。因為自旋會消耗CPU,所以自旋一定次數(shù)后使用Thread.yield()方法來暫停當(dāng)前正在執(zhí)行的線程轴咱,并執(zhí)行其他線程窖剑。

b.tryTransfer方法

用來試探生產(chǎn)者傳入的元素是否能直接傳遞給消費者西土。如果沒有消費者等待接收元素,則返回false肋乍。

和transfer方法的區(qū)別是無論消費者是否接收,方法立即返回锚烦。

tryTransfer(E e, long timeout, TimeUnit unit)谱煤,試圖把生產(chǎn)者傳入的元素直接傳遞給消費者。但如果沒有消費者消費該元素睹栖,則等待指定的時間再返回,如果超時還沒有消費元素曼氛,返回false舀患,超時時間內(nèi)消費了元素聊浅,返回true。

7)LinkedBlockingDeque

一個由鏈表結(jié)構(gòu)組成的雙向阻塞隊列顽冶∑劭梗可以從隊列的兩端插入和移出元素,多線程同時入隊時强重,減少了一半訂單競爭绞呈。相比其他阻塞隊列,多了addFirst竿屹、addLast报强、offerFirst、offerLast拱燃、peekFirst召嘶、peekLast等方法。

以first結(jié)尾的方法非竿,表示插入韧骗、獲取(peek)或移除雙端隊列的第一個元素。以last結(jié)尾的方法表示插入芦缰、獲扔伞(peek)或移除雙端隊列的最后一個元素栖疑。

另外锻霎,插入方法add等同于addLast昂勒,remove等效于removeFirst刁岸。

初始化時可以設(shè)置容量防止其過度膨脹合蔽。另外牲证,雙向阻塞隊列可以運用在“工作竊取”模式中包颁。

③阻塞隊列的實現(xiàn)原理

1)使用通知模式實現(xiàn)漠其。

當(dāng)生產(chǎn)者往滿的隊列里添加元素時會阻塞住生產(chǎn)者,當(dāng)消費者消費了一個隊列中的元素后萄涯,會通知生產(chǎn)者當(dāng)前隊列可用枣察。

    private final Condition notEmpty;
    private final Condition notFull;
    public ArrayBlockingQueue(int capacity, boolean fair) {
        if (capacity <= 0)
            throw new IllegalArgumentException();
        this.items = new Object[capacity];
        lock = new ReentrantLock(fair);
        notEmpty = lock.newCondition();
        notFull =  lock.newCondition();
    }
    public void put(E e) throws InterruptedException {
        checkNotNull(e);
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == items.length)
                notFull.await();
            enqueue(e);
        } finally {
            lock.unlock();
        }
    }
    public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == 0)
                notEmpty.await();
            return dequeue();
        } finally {
            lock.unlock();
        }
    }
    private void enqueue(E x) {
        final Object[] items = this.items;
        items[putIndex] = x;
        if (++putIndex == items.length)
            putIndex = 0;
        count++;
        notEmpty.signal();
    }

4.Fork/Join框架

①什么是Fork/Join框架

Fork/Join框架是Java 7 提供的一個用于并行執(zhí)行任務(wù)的框架,是一個把大任務(wù)分割成若干個小任務(wù)蕴掏,最終匯總每個小任務(wù)結(jié)果后得到大任務(wù)結(jié)果的框架即供。

②工作竊取算法

工作竊取(work-stealing)算法是指某個線程從其他隊列里竊取任務(wù)來執(zhí)行于微。

每個線程一個隊列逗嫡,當(dāng)前線程將任務(wù)執(zhí)行完,而其他線程對應(yīng)的隊列里還有任務(wù)等待處理株依。干完活的線程與其等著驱证,不如去幫其他線程干活。為了減少竊取任務(wù)線程和被竊取任務(wù)線程之間的競爭恋腕,通常會使用雙端隊列抹锄,被竊取任務(wù)線程永遠(yuǎn)從雙端隊列的頭部拿任務(wù)執(zhí)行,而竊取任務(wù)的線程永遠(yuǎn)從雙端隊列的尾部拿任務(wù)執(zhí)行吗坚。

工作竊取算法的優(yōu)點:充分利用線程進(jìn)行并行計算祈远,減少了線程間的競爭。

工作竊取算法的缺點:在某些情況下還是存在競爭商源,比如雙端隊列里只有一個任務(wù)時车份。并且該算法會消耗更多的系統(tǒng)資源,比如創(chuàng)建多個線程和多個雙端隊列牡彻。

③Fork/Join框架的設(shè)計

  • 分割任務(wù)扫沼。需要有一個fork類來把大任務(wù)分割成子任務(wù),有可能子任務(wù)還是很大庄吼,所以還需要不停地分割缎除,直到分割出的子任務(wù)足夠小。
  • 執(zhí)行任務(wù)并合并結(jié)果总寻。分割的子任務(wù)分別放在雙端隊列里器罐,然后幾個啟動線程分別從雙端隊列里獲取任務(wù)執(zhí)行。子任務(wù)執(zhí)行完的結(jié)果都統(tǒng)一放在一個隊列里渐行,啟動一個線程從隊列里拿數(shù)據(jù)轰坊,然后合并這些數(shù)據(jù)铸董。

Fork/Join框架使用兩個類來完成以上兩件事情:

1)ForkJoinTask:使用Fork/Join框架,先創(chuàng)建一個ForkJoin任務(wù)肴沫。它提供在任務(wù)中執(zhí)行fork和join操作的機(jī)制粟害。一般我們不需要直接繼承ForkJoinTask類,只需要繼承它的子類颤芬,F(xiàn)ork/Join框架提供了以下子類:

  • RecursiveAction:用于沒有返回結(jié)果的任務(wù)悲幅。
  • RecursiveTask:用于有返回結(jié)果的任務(wù)。

2)ForkJoinPool:ForkJoinTask需要通過ForkJoinPool來執(zhí)行站蝠。

任務(wù)分割出的子任務(wù)會添加到當(dāng)前工作線程所維護(hù)的雙端隊列中汰具,進(jìn)入隊列的頭部。當(dāng)一個工作線程的隊列里暫時沒有任務(wù)時沉衣,它會隨機(jī)從其他工作線程的隊列的尾部獲取一個任務(wù)郁副。

④使用Fork/Join框架

示例(計算1+2+3+4):

public class CountTask extends RecursiveTask<Integer> {
    private static final int THRESHOLD = 2;//閾值
    private int start;
    private int end;
    public CountTask(int start, int end) {
        this.start = start;
        this.end = end;
    }
    @Override
    protected Integer compute() {
        int sum = 0;
        //如果任務(wù)足夠小就計算任務(wù)
        boolean canCompute = (end - start) <= THRESHOLD;
        if (canCompute) {
            for (int i = start; i <= end; i++) {
                sum += i;
            }
        } else {
            //如果任務(wù)大于閾值,就分裂長兩個子任務(wù)計算
            int middle = (start + end) /2;
            CountTask leftTask = new CountTask(start, middle);
            CountTask rightTask = new CountTask(middle + 1, end);
            //執(zhí)行子任務(wù)
            leftTask.fork();//執(zhí)行fork方法時豌习,又會進(jìn)入compute方法
            rightTask.fork();
            //等待子任務(wù)執(zhí)行完存谎,并得到其結(jié)果
            Integer leftResult = leftTask.join();//join方法會等待子任務(wù)執(zhí)行完并得到其結(jié)果
            Integer rightResult = rightTask.join();
            //合并子任務(wù)
            sum = leftResult + rightResult;
        }
        return sum;
    }

    public static void main(String[] args) {
        ForkJoinPool forkJoinPool = new ForkJoinPool();
        //生成一個計算任務(wù),負(fù)責(zé)計算
        CountTask task = new CountTask(1,4);
        //執(zhí)行一個任務(wù)
        ForkJoinTask<Integer> result = forkJoinPool.submit(task);
        try {
            System.out.println(result.get());
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        }
    }
}

⑤Fork/Join框架的異常處理

ForkJoinTask在執(zhí)行的時候可能會拋出異常肥隆,但我們在主線程里沒辦法直接捕獲異常既荚,所以ForkJoinTask提供了isCompletedAbnormally()方法來檢查任務(wù)是否已經(jīng)拋出異常或已經(jīng)被取消了栋艳,并且可以通過ForkJoinTask的getException()方法獲取異常恰聘。

getException()返回Throwable對象,如果任務(wù)被取消了則返回CancellationException吸占。如果任務(wù)沒有完成或者沒有拋出異常則返回null晴叨。

        if (result.isCompletedAbnormally()) {
            System.out.println(result.getException());
        }

⑥Fork/Join框架的實現(xiàn)原理(1.7)

ForkJoinPool由ForkJoinTask數(shù)組和ForkJoinWorkerThread數(shù)組組成,F(xiàn)orkJoinTask數(shù)組負(fù)責(zé)將存放程序提交給ForkJoinPool的任務(wù)矾屯,而ForkJoinWorkerThread數(shù)組負(fù)責(zé)執(zhí)行這些任務(wù)兼蕊。

1)ForkJoinTask的fork方法實現(xiàn)原理

    public final ForkJoinTask<V> fork() {
        ((ForkJoinWorkerThread) Thread.currentThread())
            .pushTask(this);//將當(dāng)前任務(wù)存放在ForkJoinTask數(shù)組隊列里。
        return this;
    }
    final void pushTask(ForkJoinTask<?> t) {
        ForkJoinTask<?>[] q; int s, m;
        if ((q = queue) != null) {    // ignore if queue removed
            long u = (((s = queueTop) & (m = q.length - 1)) << ASHIFT) + ABASE;
            UNSAFE.putOrderedObject(q, u, t);
            queueTop = s + 1;         // or use putOrderedInt
            if ((s -= queueBase) <= 2)
                pool.signalWork();//喚醒或創(chuàng)建一個工作線程來執(zhí)行任務(wù)件蚕。
            else if (s == m)
                growQueue();
        }
    }

2)ForkJoinTask的join方法實現(xiàn)原理

join方法主要作用是阻塞當(dāng)前線程并等待獲取結(jié)果孙技。

    public final V join() {
        //通過doJoin方法得到當(dāng)前任務(wù)的狀態(tài)
        //任務(wù)狀態(tài)有4種:已完成(NORMAL)、被取消(CANCELLED)排作、信號(SIGNAL)牵啦、出現(xiàn)異常(EXCEPTIONAL)
        if (doJoin() != NORMAL) 
            return reportResult();
        else
            return getRawResult(); //任務(wù)狀態(tài)是已完成,直接返回任務(wù)結(jié)果
    }
    private V reportResult() {
        int s; Throwable ex;
        if ((s = status) == CANCELLED)  //任務(wù)狀態(tài)是被取消妄痪,拋出CancellationException
            throw new CancellationException();
        if (s == EXCEPTIONAL && (ex = getThrowableException()) != null) //任務(wù)狀態(tài)是拋出異常哈雏,則直接拋出對應(yīng)的異常
            UNSAFE.throwException(ex);
        return getRawResult();
    }
    private int doJoin() {
        Thread t; ForkJoinWorkerThread w; int s; boolean completed;
        if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) {
            if ((s = status) < 0)
                return s;//任務(wù)執(zhí)行完成,直接返回任務(wù)狀態(tài)
            if ((w = (ForkJoinWorkerThread)t).unpushTask(this)) {//從任務(wù)數(shù)組里取出任務(wù)
                try {
                    completed = exec();//執(zhí)行任務(wù)
                } catch (Throwable rex) {
                    return setExceptionalCompletion(rex);//執(zhí)行出現(xiàn)異常,記錄異常裳瘪,并將任務(wù)狀態(tài)設(shè)置為EXCEPTIONAL
                }
                if (completed)
                    return setCompletion(NORMAL);//任務(wù)順利執(zhí)行完成履因,設(shè)置任務(wù)狀態(tài)為NORMAL
            }
            return w.joinTask(this);
        }
        else
            return externalAwaitDone();
    }
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市盹愚,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌站故,老刑警劉巖皆怕,帶你破解...
    沈念sama閱讀 222,104評論 6 515
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異西篓,居然都是意外死亡愈腾,警方通過查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 94,816評論 3 399
  • 文/潘曉璐 我一進(jìn)店門岂津,熙熙樓的掌柜王于貴愁眉苦臉地迎上來虱黄,“玉大人,你說我怎么就攤上這事吮成〕髀遥” “怎么了?”我有些...
    開封第一講書人閱讀 168,697評論 0 360
  • 文/不壞的土叔 我叫張陵粱甫,是天一觀的道長泳叠。 經(jīng)常有香客問我,道長茶宵,這世上最難降的妖魔是什么危纫? 我笑而不...
    開封第一講書人閱讀 59,836評論 1 298
  • 正文 為了忘掉前任,我火速辦了婚禮乌庶,結(jié)果婚禮上种蝶,老公的妹妹穿的比我還像新娘。我一直安慰自己瞒大,他們只是感情好螃征,可當(dāng)我...
    茶點故事閱讀 68,851評論 6 397
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著糠赦,像睡著了一般会傲。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上拙泽,一...
    開封第一講書人閱讀 52,441評論 1 310
  • 那天淌山,我揣著相機(jī)與錄音,去河邊找鬼顾瞻。 笑死泼疑,一個胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的荷荤。 我是一名探鬼主播退渗,決...
    沈念sama閱讀 40,992評論 3 421
  • 文/蒼蘭香墨 我猛地睜開眼移稳,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了会油?” 一聲冷哼從身側(cè)響起个粱,我...
    開封第一講書人閱讀 39,899評論 0 276
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎翻翩,沒想到半個月后都许,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 46,457評論 1 318
  • 正文 獨居荒郊野嶺守林人離奇死亡嫂冻,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 38,529評論 3 341
  • 正文 我和宋清朗相戀三年胶征,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片桨仿。...
    茶點故事閱讀 40,664評論 1 352
  • 序言:一個原本活蹦亂跳的男人離奇死亡睛低,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出服傍,到底是詐尸還是另有隱情钱雷,我是刑警寧澤,帶...
    沈念sama閱讀 36,346評論 5 350
  • 正文 年R本政府宣布吹零,位于F島的核電站急波,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏瘪校。R本人自食惡果不足惜澄暮,卻給世界環(huán)境...
    茶點故事閱讀 42,025評論 3 334
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望阱扬。 院中可真熱鬧泣懊,春花似錦、人聲如沸麻惶。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,511評論 0 24
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽窃蹋。三九已至卡啰,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間警没,已是汗流浹背匈辱。 一陣腳步聲響...
    開封第一講書人閱讀 33,611評論 1 272
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機(jī)就差點兒被人妖公主榨干…… 1. 我叫王不留杀迹,地道東北人亡脸。 一個月前我還...
    沈念sama閱讀 49,081評論 3 377
  • 正文 我出身青樓,卻偏偏與公主長得像,于是被迫代替她去往敵國和親浅碾。 傳聞我的和親對象是個殘疾皇子大州,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 45,675評論 2 359