JUC阻塞隊列BlockingQueue竟然有8種類型滞欠?

點贊再看,養(yǎng)成習(xí)慣肆良,搜一搜【一角錢技術(shù)】關(guān)注更多原創(chuàng)技術(shù)文章筛璧。本文 GitHub org_hejianhui/JavaStudy 已收錄,有我的系列文章惹恃。

前言

隊列是一種特殊的線性表夭谤,是一種先進(jìn)先出(FIFO)的數(shù)據(jù)結(jié)構(gòu)。它只允許在表的前端(front)進(jìn)行刪除操作巫糙,而在表的后端(rear)進(jìn)行插入操作朗儒。進(jìn)行插入操作的端稱為隊尾,進(jìn)行刪除操作的端稱為隊頭。隊列中沒有元素時醉锄,稱為空隊列乏悄。

下面是Queue類的繼承關(guān)系圖:


Queue


Queue:隊列的上層接口,提供了插入恳不、刪除檩小、獲取元素這3種類型的方法,而且對每一種類型都提供了兩種方式烟勋,先來看看插入方法:

  • add(E e):插入元素到隊尾规求,插入成功返回true,沒有可用空間拋出異常 IllegalStateException卵惦。
  • offer(E e): 插入元素到隊尾阻肿,插入成功返回true,否則返回false鸵荠。

add和offer作為插入方法的唯一不同就在于隊列滿了之后的處理方式冕茅。add拋出異常,而offer返回false蛹找。

再來看看刪除和獲取元素方法(和插入方法類似):

  • remove():獲取并移除隊首的元素姨伤,該方法和poll方法的不同之處在于,如果隊列為空該方法會拋出異常庸疾,而poll不會乍楚。
  • poll():獲取并移除隊首的元素,如果隊列為空届慈,返回null徒溪。
  • element():獲取隊列首的元素,該方法和peek方法的不同之處在于金顿,如果隊列為空該方法會拋出異常臊泌,而peek不會。
  • peek():獲取隊列首的元素揍拆,如果隊列為空渠概,返回null。

如果隊列是空嫂拴,remove和element方法會拋出異常播揪,而poll和peek返回null。

Queue 是單向隊列筒狠,為了提供更強大的功能猪狈,JDK在1.6的時候新增了一個雙向隊列Deque,用來實現(xiàn)更靈活的隊列操作辩恼。

Deque


Deque在Queue的基礎(chǔ)上雇庙,增加了以下幾個方法:

  • addFirst(E e):在前端插入元素谓形,異常處理和add一樣;
  • addLast(E e):在后端插入元素状共,和add一樣的效果套耕;
  • offerFirst(E e):在前端插入元素,異常處理和offer一樣峡继;
  • offerLast(E e):在后端插入元素冯袍,和offer一樣的效果;
  • removeFirst():移除前端的一個元素碾牌,異常處理和remove一樣康愤;
  • removeLast():移除后端的一個元素,和remove一樣的效果舶吗;
  • pollFirst():移除前端的一個元素征冷,和poll一樣的效果;
  • pollLast():移除后端的一個元素誓琼,異常處理和poll一樣检激;
  • getFirst():獲取前端的一個元素,和element一樣的效果腹侣;
  • getLast():獲取后端的一個元素叔收,異常處理和element一樣;
  • peekFirst():獲取前端的一個元素傲隶,和peek一樣的效果饺律;
  • peekLast():獲取后端的一個元素,異常處理和peek一樣跺株;
  • removeFirstOccurrence(Object o):從前端開始移除第一個是o的元素复濒;
  • removeLastOccurrence(Object o):從后端開始移除第一個是o的元素;
  • push(E e):和addFirst一樣的效果;
  • pop():和removeFirst一樣的效果。

可以發(fā)現(xiàn)距淫,其實很多方法的效果都是一樣的,只不過名字不同洛二。比如Deque為了實現(xiàn)Stack的語義,定義了push和pop兩個方法攻锰。

BlockingQueue阻塞隊列

BlockingQueue(阻塞隊列),在Queue的基礎(chǔ)上實現(xiàn)了阻塞等待的功能妓雾。它是JDK 1.5中加入的接口娶吞,它是指這樣的一個隊列:當(dāng)生產(chǎn)者向隊列添加元素但隊列已滿時,生產(chǎn)者會被阻塞械姻;當(dāng)消費者從隊列移除元素但隊列為空時妒蛇,消費者會被阻塞机断。

BlockingQueue ,是java.util.concurrent 包提供的用于解決并發(fā) 生產(chǎn)者 — 消費者 問題的最有用的類绣夺,很好的解決了多線程中吏奸,如何高效安全“傳輸”數(shù)據(jù)的問題。它的特性是在任意時刻只有一個線程可以進(jìn)行take或者put操作陶耍,并且 BlockingQueue 提供類超時 return null 的機制奋蔚,在許多生產(chǎn)場景里都可以看到這個工具的身影。

總體認(rèn)識

一般我們用到的阻塞隊列有哪些烈钞?可以通過下面一個類圖來總體看下:



可以看到 BlockingQueue 是一個接口泊碑,繼承它的另外還有兩個接口 BlockingDeque(雙端隊列)、TransferQueue(兩個線程之間傳遞元素)毯欣。

阻塞隊列的成員如下:

隊列 有界性 數(shù)據(jù)結(jié)構(gòu)
ArrayBlockingQueue bounded(有界) 加鎖(公平鎖/非公平鎖馒过、全局鎖) 數(shù)組
LinkedBlockingQueue optionally-bounded 加鎖(添加和獲取獨立的鎖) 單鏈表
PriorityBlockingQueue unbounded 加鎖(只有一個鎖,入隊永遠(yuǎn)成功酗钞,出隊會阻塞) 數(shù)組(默認(rèn)長度11腹忽,可擴(kuò)容),底層采用的堆結(jié)構(gòu)實現(xiàn)(二叉堆)
DelayQueue unbounded 加鎖 數(shù)組(可擴(kuò)容)
SynchronousQueue bounded 無鎖(CAS) 隊列(公平策略)砚作、棧(非公平策略)
LinkedTransferQueue unbounded 無鎖(自旋+CAS) 雙重數(shù)據(jù)結(jié)構(gòu)或雙重隊列
LinkedBlockingDeque unbounded 加鎖 雙向鏈表
DelayWorkQueue unbounded 加鎖 數(shù)組(初始長度16窘奏,可擴(kuò)容),底層采用的堆結(jié)構(gòu)實現(xiàn)(二叉堆)

隊列類型

  1. 無限隊列(unbounded queue)— 幾乎可以無限增長
  2. 有限隊列(bounded queue)— 定義了最大容量

隊列數(shù)據(jù)結(jié)構(gòu)

隊列實質(zhì)就是一種存儲數(shù)據(jù)的結(jié)構(gòu)

  • 通常用鏈表或者數(shù)組實現(xiàn)
  • 一般而言隊列具備FIFO先進(jìn)先出的特性偎巢,當(dāng)然也有雙端隊列(Deque)優(yōu)先級隊列
  • 主要操作:入隊(Enqueue)與 出對(Dequeue)

常見的5種阻塞隊列

  • ArrayBlockingQueue:一個由數(shù)組結(jié)構(gòu)組成的有界阻塞隊列蔼夜。
  • LinkedBlockingQueue:一個由鏈表結(jié)構(gòu)組成的有界阻塞隊列。
  • PriorityBlockingQueue:一個支持優(yōu)先級排序的無界阻塞隊列压昼。
  • SynchronousQueue:一個不存儲元素的阻塞隊列求冷。
  • DelayQueue:一個使用優(yōu)先級隊列實現(xiàn)的無界阻塞隊列。

BlockingQueue API

BlockingQueue的核心方法

public interface BlockingQueue<E> extends Queue<E> {

    //將給定元素設(shè)置到隊列中窍霞,如果設(shè)置成功返回true, 否則返回false匠题。如果是往限定了長度的隊列中設(shè)置值,推薦使用offer()方法但金。
    boolean add(E e);

    //將給定的元素設(shè)置到隊列中韭山,如果設(shè)置成功返回true, 否則返回false. e的值不能為空,否則拋出空指針異常冷溃。
    boolean offer(E e);

    //將元素設(shè)置到隊列中钱磅,如果隊列中沒有多余的空間,該方法會一直阻塞似枕,直到隊列中有多余的空間盖淡。
    void put(E e) throws InterruptedException;

    //將給定元素在給定的時間內(nèi)設(shè)置到隊列中,如果設(shè)置成功返回true, 否則返回false.
    boolean offer(E e, long timeout, TimeUnit unit)
        throws InterruptedException;

    //從隊列中獲取值凿歼,如果隊列中沒有值褪迟,線程會一直阻塞冗恨,直到隊列中有值,并且該方法取得了該值味赃。
    E take() throws InterruptedException;

    //在給定的時間里掀抹,從隊列中獲取值,時間到了直接調(diào)用普通的poll方法心俗,為null則直接返回null傲武。
    E poll(long timeout, TimeUnit unit)
        throws InterruptedException;

    //獲取隊列中剩余的空間。
    int remainingCapacity();

    //從隊列中移除指定的值另凌。
    boolean remove(Object o);

    //判斷隊列中是否擁有該值谱轨。
    public boolean contains(Object o);

    //將隊列中值,全部移除吠谢,并發(fā)設(shè)置到給定的集合中土童。
    int drainTo(Collection<? super E> c);

    //指定最多數(shù)量限制將隊列中值,全部移除工坊,并發(fā)設(shè)置到給定的集合中献汗。
    int drainTo(Collection<? super E> c, int maxElements);
}

BlockingQueue 接口的所有方法可以分為兩大類:負(fù)責(zé)向隊列添加元素的方法檢索這些元素的方法。在隊列滿/空的情況下王污,來自這兩個組的每個方法的行為都不同罢吃。

添加元素

方法 說明
add() 如果插入成功則返回 true,否則拋出 IllegalStateException 異常
put() 將指定的元素插入隊列昭齐,如果隊列滿了尿招,那么會阻塞直到有空間插入
offer() 如果插入成功則返回 true,否則返回 false
offer(E e, long timeout, TimeUnit unit) 嘗試將元素插入隊列阱驾,如果隊列已滿就谜,那么會阻塞直到有空間插入

檢索元素

方法 說明
take() 獲取隊列的頭部元素并將其刪除,如果隊列為空里覆,則阻塞并等待元素變?yōu)榭捎?/td>
poll(long timeout, TimeUnit unit) 檢索并刪除隊列的頭部丧荐,如有必要虹统,等待指定的等待時間以使元素可用车荔,如果超時夸赫,則返回 null

BlockingQueue最重要的也就是關(guān)于阻塞等待的幾個方法茬腿,而這幾個方法正好可以用來實現(xiàn)生產(chǎn)-消費的模型宜雀。

ArrayBlockingQueue

ArrayBlockingQueue 由數(shù)組支持的有界阻塞隊列切平,隊列基于數(shù)組實現(xiàn),容量大小在創(chuàng)建 ArrayBlockingQueue 對象時已經(jīng)定義好辐董。 此隊列按照先進(jìn)先出(FIFO)的原則對元素進(jìn)行排序悴品。支持公平鎖和非公平鎖,默認(rèn)采用非公平鎖简烘。

ArrayBlockingQueue 內(nèi)部由ReentrantLock來實現(xiàn)線程安全苔严,由Condition的await和signal來實現(xiàn)等待喚醒的功能。它的數(shù)據(jù)結(jié)構(gòu)是數(shù)組孤澎,準(zhǔn)確的說是一個循環(huán)數(shù)組(可以類比一個圓環(huán))届氢,所有的下標(biāo)在到達(dá)最大長度時自動從0繼續(xù)開始。

深入理解ArrayBlockingQueue可以閱讀《阻塞隊列 — ArrayBlockingQueue源碼分析》

LinkedBlockingQueue

LinkedBlockingQueue 由鏈表節(jié)點支持的可選有界隊列覆旭,是一個基于鏈表的無界隊列(理論上有界)退子,隊列按照先進(jìn)先出的順序進(jìn)行排序寂祥。LinkedBlockingQueue不同于ArrayBlockingQueue,它如果不指定容量惜犀,默認(rèn)為 Integer.MAX_VALUE,也就是無界隊列。所以為了避免隊列過大造成機器負(fù)載或者內(nèi)存爆滿的情況出現(xiàn)颈将,我們在使用的時候建議手動傳一個隊列的大小噪奄。

LinkedBlockingQueue 內(nèi)部由單鏈表實現(xiàn)都毒,只能從head取元素戳护,從tail添加元素。添加元素和獲取元素都有獨立的鎖,也就是說LinkedBlockingQueue是讀寫分離的柄粹,讀寫操作可以并行執(zhí)行。LinkedBlockingQueue采用可重入鎖(ReentrantLock)來保證在并發(fā)情況下的線程安全。

向無限隊列添加元素的所有操作都將永遠(yuǎn)不會阻塞,[注意這里不是說不會加鎖保證線程安全]爬迟,因此它可以增長到非常大的容量跌捆。

使用無限 BlockingQueue 設(shè)計生產(chǎn)者 - 消費者模型時最重要的是 消費者應(yīng)該能夠像生產(chǎn)者向隊列添加消息一樣快地消費消息。否則潮瓶,內(nèi)存可能會填滿,然后就會得到一個 OutOfMemory 異常。

深入理解LinkedBlockingQueue可以閱讀《阻塞隊列 — LinkedBlockingQueue源碼分析》

PriorityBlockingQueue

PriorityBlockingQueue 優(yōu)先級隊列媚媒,線程安全(添加栈顷、讀取都進(jìn)行了加鎖)靡努、無界、讀阻塞的隊列,底層采用的堆結(jié)構(gòu)實現(xiàn)(二叉樹)梁肿,默認(rèn)是小根堆缔莲,最小的或者最大的元素會一直置頂厌秒,每次獲取都取最頂端的數(shù)據(jù)。可以實現(xiàn)優(yōu)先出隊篡石。最特別的是它只有一個鎖械馆,入隊操作永遠(yuǎn)成功珊搀,而出隊只有在空隊列的時候才會進(jìn)行線程阻塞错沽。可以說有一定的應(yīng)用場景吧谒臼,比如:有任務(wù)要執(zhí)行冯挎,可以對任務(wù)加一個優(yōu)先級的權(quán)重续滋,這樣隊列會識別出來了袁,對該任務(wù)優(yōu)先進(jìn)行出隊僻肖。

深入理解PriorityBlockingQueue可以閱讀《阻塞隊列 —PriorityBlockingQueue源碼分析》

DelayQueue

DelayQueue 由優(yōu)先級支持的冀自、基于時間的調(diào)度隊列余境,內(nèi)部使用非線程安全的優(yōu)先隊列(PriorityQueue)實現(xiàn),而無界隊列基于數(shù)組的擴(kuò)容實現(xiàn)佣盒。在創(chuàng)建元素時紊搪,可以指定多久才能從隊列中獲取當(dāng)前元素牵囤。只有延時期滿后才能從隊列中獲取元素。

深入理解DelayQueue可以閱讀《阻塞隊列 — DelayQueue源碼分析》

SynchronousQueue

SynchronousQueue 一個不存儲元素的阻塞隊列,每一個 put 操作必須等待 take 操作,否則不能繼續(xù)添加元素脆霎。支持公平鎖和非公平鎖2種策略來訪問隊列胧谈。默認(rèn)是采用非公平性策略訪問隊列。公平性策略底層使用了類似隊列的數(shù)據(jù)結(jié)構(gòu)场仲,而非公平策略底層使用了類似棧的數(shù)據(jù)結(jié)構(gòu)。SynchronousQueue的吞吐量高于LinkedBlockingQueue和ArrayBlockingQueue。

深入理解SynchronousQueue可以閱讀《阻塞隊列 — SynchronousQueue源碼分析》

LinkedTransferQueue

LinkedTransferQueue 是一個由鏈表結(jié)構(gòu)組成的無界阻塞傳輸隊列,它是一個很多隊列的結(jié)合體(ConcurrentLinkedQueue,LinkedBlockingQueue刹帕,SynchronousQueue),在除了有基本阻塞隊列的功能(但是這個阻塞隊列沒有使用鎖)之外侦另;隊列實現(xiàn)了TransferQueue接口重寫了transfer 和 tryTransfer 方法袄友,這組方法和SynchronousQueue公平模式的隊列類似支竹,具有匹配的功能待诅。

深入理解LinkedTransferQueue可以閱讀《阻塞隊列 — LinkedTransferQueue源碼分析》

LinkedBlockingDeque

LinkedBlockingDeque 一個由于鏈表結(jié)構(gòu)組成的雙向阻塞隊列测蹲,隊列頭部和尾部都可以添加和移除元素,多線程并發(fā)時,可以將鎖的競爭對多降到一半。

深入理解LinkedBlockingDeque可以閱讀《阻塞隊列 — LinkedBlockingDeque源碼分析》

DelayedWorkQueue

DelayedWorkQueue 也是一種設(shè)計為定時任務(wù)的延遲隊列纱耻,其實現(xiàn)原理和DelayQueue 基本一樣,核心數(shù)據(jù)結(jié)構(gòu)是二叉最小堆的優(yōu)先隊列,隊列滿時會自動擴(kuò)容卖漫,不過是將優(yōu)先級隊列和DelayQueue的實現(xiàn)過程遷移到本身方法體中查描,從而可以在該過程當(dāng)中靈活的加入定時任務(wù)特有的方法調(diào)用。

深入理解DelayedWorkQueue可以閱讀《阻塞隊列 — DelayedWorkQueue源碼分析》

對比分析

LinkedBlockingQueue與ArrayBlockingQueue區(qū)別

  • 隊列大小有所不同,ArrayBlockingQueue是有界的初始化必須指定大小,而LinkedBlockingQueue可以是有界的也可以是無界的(Integer.MAX_VALUE),對于后者而言贫奠,當(dāng)添加速度大于移除速度時脖律,在無界的情況下兜挨,可能會造成內(nèi)存溢出等問題弊决。
  • 數(shù)據(jù)存儲容器不同,ArrayBlockingQueue采用的是數(shù)組作為數(shù)據(jù)存儲容器,而LinkedBlockingQueue采用的則是以Node節(jié)點作為連接對象的鏈表净响。
  • 由于ArrayBlockingQueue采用的是數(shù)組的存儲容器配乓,因此在插入或刪除元素時不會產(chǎn)生或銷毀任何額外的對象實例,而LinkedBlockingQueue則會生成一個額外的Node對象洞豁。這可能在長時間內(nèi)需要高效并發(fā)地處理大批量數(shù)據(jù)的時曙咽,對于GC可能存在較大影響。
  • 兩者的實現(xiàn)隊列添加或移除的鎖不一樣,ArrayBlockingQueue實現(xiàn)的隊列中的鎖是沒有分離的渔隶,即添加操作和移除操作采用的同一個ReenterLock鎖羔挡,而LinkedBlockingQueue實現(xiàn)的隊列中的鎖是分離的,其添加采用的是putLock间唉,移除采用的則是takeLock绞灼,這樣能大大提高隊列的吞吐量,也意味著在高并發(fā)的情況下生產(chǎn)者和消費者可以并行地操作隊列中的數(shù)據(jù),以此來提高整個隊列的并發(fā)性能幔戏。

LinkedTransferQueue和SynchronousQueue(公平模式)區(qū)別

  • LinkedTransferQueue 和SynchronousQueue 其實基本是差不多的合愈,兩者都是無鎖帶阻塞功能的隊列桃纯,都是使用的雙重隊列壮锻;
  • SynchronousQueue 通過內(nèi)部類Transferer 來實現(xiàn)公平和非公平隊列,在LinkedTransferQueue 中沒有公平與非公平的區(qū)分歧蕉;
  • LinkedTransferQueue 實現(xiàn)了TransferQueue接口匿沛,該接口定義的是帶阻塞操作的操作平匈,相比SynchronousQueue 中的Transferer 功能更豐富。
  • SynchronousQueue 中放數(shù)據(jù)操作和取數(shù)據(jù)操作都是阻塞的吞加,當(dāng)隊列中的操作和本次操作不匹配時揖盘,線程會阻塞伐庭,直到匹配的操作到來箫攀。LinkedTransferQueue 是無界隊列绝葡,放數(shù)據(jù)操作不會阻塞,取數(shù)據(jù)操作如果沒有匹配操作可能會阻塞,通過參數(shù)決定是否阻塞(ASYNC,SYNC,NOW,TIMED)。

LinkedBlockingDeque與LinkedList區(qū)別

package com.niuh.deque;

import java.util.Iterator;
import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.LinkedBlockingDeque;

/*
 *   LinkedBlockingDeque是“線程安全”的隊列,而LinkedList是非線程安全的。
 *
 *   下面是“多個線程同時操作并且遍歷queue”的示例
 *   (1) 當(dāng)queue是LinkedBlockingDeque對象時压状,程序能正常運行。
 *   (2) 當(dāng)queue是LinkedList對象時橱鹏,程序會產(chǎn)生ConcurrentModificationException異常红碑。
 *
 */
public class LinkedBlockingDequeRunner {

    // TODO: queue是LinkedList對象時品山,程序會出錯咙好。
    // private static Queue<String> queue = new LinkedList<String>();
    private static Queue<String> queue = new LinkedBlockingDeque<String>();

    public static void main(String[] args) {

        // 同時啟動兩個線程對queue進(jìn)行操作窗慎!
        new MyThread("A").start();
        new MyThread("B").start();
    }

    private static void printAll() {
        String value;
        Iterator iter = queue.iterator();
        while (iter.hasNext()) {
            value = (String) iter.next();
            System.out.print(value + ", ");
        }
        System.out.println();
    }

    private static class MyThread extends Thread {
        MyThread(String name) {
            super(name);
        }

        @Override
        public void run() {
            int i = 0;
            while (i++ < 6) {
                // “線程名” + "-" + "序號"
                String val = Thread.currentThread().getName() + i;
                queue.add(val);
                // 通過“Iterator”遍歷queue整吆。
                printAll();
            }
        }
    }
}

輸出結(jié)果

A1, 
A1, A2, 
A1, A2, A3, 
A1, A2, A3, A4, 
A1, A2, A3, A4, A5, 
A1, A2, A3, A4, A5, A6, 
A1, A2, A3, A4, A5, A6, B1, 
A1, A2, A3, A4, A5, A6, B1, B2, 
A1, A2, A3, A4, A5, A6, B1, B2, B3, 
A1, A2, A3, A4, A5, A6, B1, B2, B3, B4, 
A1, A2, A3, A4, A5, A6, B1, B2, B3, B4, B5, 
A1, A2, A3, A4, A5, A6, B1, B2, B3, B4, B5, B6, 

結(jié)果說明:示例程序中,啟動兩個線程(線程A和線程B)分別對LinkedBlockingDeque進(jìn)行操作:

  • 以線程A而言澡腾,它會先獲取“線程名”+“序號”沸伏,然后將該字符串添加到LinkedBlockingDeque中;
  • 接著动分,遍歷并輸出LinkedBlockingDeque中的全部元素毅糟。
  • 線程B的操作和線程A一樣,只不過線程B的名字和線程A的名字不同澜公。
  • 當(dāng)queue是LinkedBlockingDeque對象時姆另,程序能正常運行。
  • 如果將queue改為LinkedList時坟乾,程序會產(chǎn)生ConcurrentModificationException異常迹辐。

BlockingQueue應(yīng)用

多線程生產(chǎn)者-消費者示例

接下來我們創(chuàng)建一個由兩部分組成的程序: 生產(chǎn)者 ( Producer ) 和消費者 ( Consumer ) 。

生產(chǎn)者(Producer)

生產(chǎn)者將生成一個 0 到 100 的隨機數(shù)(十全大補丸的編號)甚侣,并將該數(shù)字放在 BlockingQueue 中明吩。我們將創(chuàng)建 16 個線程(潘金蓮)用于生成隨機數(shù)并使用 put() 方法阻塞,直到隊列中有可用空間殷费。

需要記住的重要一點是印荔,我們需要阻止我們的消費者線程無限期地等待元素出現(xiàn)在隊列中低葫。

從生產(chǎn)者(潘金蓮)向消費者(武大郎)發(fā)出信號的好方法是,不需要處理消息躏鱼,而是發(fā)送稱為毒 ( poison ) 丸 ( pill ) 的特殊消息氮采。 我們需要發(fā)送盡可能多的毒 ( poison ) 丸 ( pill ) ,因為我們有消費者(武大郎)染苛。然后當(dāng)消費者從隊列中獲取特殊的毒 ( poison ) 丸 ( pill )消息時鹊漠,它將優(yōu)雅地完成執(zhí)行。

以下生產(chǎn)者的代碼:

package com.niuh.queue;

import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ThreadLocalRandom;

/**
 * 生產(chǎn)者(Producer)
 **/
@Slf4j
public class NumbersProducer implements Runnable {
    private BlockingQueue<Integer> numbersQueue;
    private final int poisonPill;
    private final int poisonPillPerProducer;

    public NumbersProducer(BlockingQueue<Integer> numbersQueue, int poisonPill, int poisonPillPerProducer) {
        this.numbersQueue = numbersQueue;
        this.poisonPill = poisonPill;
        this.poisonPillPerProducer = poisonPillPerProducer;
    }

    public void run() {
        try {
            generateNumbers();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    private void generateNumbers() throws InterruptedException {
        for (int i = 0; i < 100; i++) {
            numbersQueue.put(ThreadLocalRandom.current().nextInt(100));
            log.info("潘金蓮-{}號,給武大郎的泡藥茶行!", Thread.currentThread().getId());
        }

        /*while (true) {
            numbersQueue.put(ThreadLocalRandom.current().nextInt(100));
            if (false) {
                break;
            }
        }*/

        for (int j = 0; j < poisonPillPerProducer; j++) {
            numbersQueue.put(poisonPill);
            log.info("潘金蓮-{}號,往武大郎的藥里放入第{}顆毒丸躯概!", Thread.currentThread().getId(), j + 1);
        }
    }
}

我們的生成器構(gòu)造函數(shù)將 BlockingQueue 作為參數(shù),用于協(xié)調(diào)生產(chǎn)者和使用者之間的處理畔师,我們看到方法generateNumbers() 將 100 個元素(生產(chǎn)100副藥給武大郎吃)放入隊列中娶靡。它還需要有毒 ( poison ) 丸 ( pill ) (潘金蓮給武大郎下毒)消息,以便知道在執(zhí)行完成時放入隊列的消息類型看锉。該消息需要將 poisonPillPerProducer 次放入隊列中姿锭。

消費者(Consumer)

每個消費者將使用 take() 方法從 BlockingQueue 獲取一個元素,因此它將阻塞伯铣,直到隊列中有一個元素呻此。從隊列中取出一個 Integer 后,它會檢查該消息是否是毒 ( poison ) 丸 ( pill )(武大郎看潘金蓮有沒有下毒) 腔寡,如果是焚鲜,則完成一個線程的執(zhí)行。否則放前,它將在標(biāo)準(zhǔn)輸出上打印出結(jié)果以及當(dāng)前線程的名稱忿磅。

package com.niuh.queue;

import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.BlockingQueue;

/**
 * 消費者(Consumer)
 **/
@Slf4j
public class NumbersConsumer implements Runnable {
    private BlockingQueue<Integer> queue;
    private final int poisonPill;

    public NumbersConsumer(BlockingQueue<Integer> queue, int poisonPill) {
        this.queue = queue;
        this.poisonPill = poisonPill;
    }

    public void run() {
        try {
            while (true) {
                Integer number = queue.take();
                if (number.equals(poisonPill)) {
                    return;
                }
                log.info("武大郎-{}號,喝藥-編號:{}", Thread.currentThread().getId(), number);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}

需要注意的重要事項是隊列的使用。與生成器構(gòu)造函數(shù)中的相同凭语,隊列作為參數(shù)傳遞葱她。我們可以這樣做,是因為 BlockingQueue 可以在線程之間共享而無需任何顯示同步似扔。

驗證測試

既然我們有生產(chǎn)者和消費者吨些,我們就可以開始我們的計劃。我們需要定義隊列的容量虫几,并將其設(shè)置為 10個元素锤灿。
我們創(chuàng)建4 個生產(chǎn)者線程,并且創(chuàng)建等于可用處理器數(shù)量的消費者線程:

package com.niuh.queue;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

/**
 * 多線程生產(chǎn)者-消費者示例
 **/
public class Main {

    public static void main(String[] args) {
        int BOUND = 10;
        int N_PRODUCERS = 16;
        int N_CONSUMERS = Runtime.getRuntime().availableProcessors(); //=8
        int poisonPill = Integer.MAX_VALUE;
        int poisonPillPerProducer = N_CONSUMERS / N_PRODUCERS; // =0
        int mod = N_CONSUMERS % N_PRODUCERS;//0+8=8

        BlockingQueue<Integer> queue = new ArrayBlockingQueue<Integer>(BOUND);

        //潘金蓮給武大郎熬藥
        for (int i = 1; i < N_PRODUCERS; i++) {
            new Thread(new NumbersProducer(queue, poisonPill, poisonPillPerProducer)).start();
        }

        //武大郎開始喝藥
        for (int j = 0; j < N_CONSUMERS; j++) {
            new Thread(new NumbersConsumer(queue, poisonPill)).start();
        }

        try {
            Thread.sleep(5000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        //潘金蓮開始投毒辆脸,武大郎喝完毒藥GG
        new Thread(new NumbersProducer(queue, poisonPill, poisonPillPerProducer + mod)).start();
    }

}

BlockingQueue 是使用具有容量的構(gòu)造創(chuàng)建的醇王。我們正在創(chuàng)造 4 個生產(chǎn)者和 N 個消費者(武大郎)前翎。我們將我們的毒 ( poison ) 丸 ( pill )消息指定為 Integer.MAX_VALUE盯质,因為我們的生產(chǎn)者在正常工作條件下永遠(yuǎn)不會發(fā)送這樣的值拓型。這里要注意的最重要的事情是 BlockingQueue 用于協(xié)調(diào)它們之間的工作。

相關(guān)文章

PS:以上代碼提交在 Githubhttps://github.com/Niuh-Study/niuh-juc-final.git

文章持續(xù)更新,可以搜一搜「 一角錢技術(shù) 」第一時間閱讀, 本文 GitHub org_hejianhui/JavaStudy 已經(jīng)收錄,歡迎 Star袭艺。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市叨粘,隨后出現(xiàn)的幾起案子猾编,更是在濱河造成了極大的恐慌,老刑警劉巖升敲,帶你破解...
    沈念sama閱讀 218,755評論 6 507
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件答倡,死亡現(xiàn)場離奇詭異,居然都是意外死亡驴党,警方通過查閱死者的電腦和手機瘪撇,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,305評論 3 395
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來港庄,“玉大人倔既,你說我怎么就攤上這事∨粞酰” “怎么了渤涌?”我有些...
    開封第一講書人閱讀 165,138評論 0 355
  • 文/不壞的土叔 我叫張陵,是天一觀的道長度帮。 經(jīng)常有香客問我歼捏,道長稿存,這世上最難降的妖魔是什么笨篷? 我笑而不...
    開封第一講書人閱讀 58,791評論 1 295
  • 正文 為了忘掉前任,我火速辦了婚禮瓣履,結(jié)果婚禮上率翅,老公的妹妹穿的比我還像新娘。我一直安慰自己袖迎,他們只是感情好冕臭,可當(dāng)我...
    茶點故事閱讀 67,794評論 6 392
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著燕锥,像睡著了一般辜贵。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上归形,一...
    開封第一講書人閱讀 51,631評論 1 305
  • 那天托慨,我揣著相機與錄音,去河邊找鬼暇榴。 笑死厚棵,一個胖子當(dāng)著我的面吹牛蕉世,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播婆硬,決...
    沈念sama閱讀 40,362評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼狠轻,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了彬犯?” 一聲冷哼從身側(cè)響起向楼,我...
    開封第一講書人閱讀 39,264評論 0 276
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎谐区,沒想到半個月后蜜自,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 45,724評論 1 315
  • 正文 獨居荒郊野嶺守林人離奇死亡卢佣,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,900評論 3 336
  • 正文 我和宋清朗相戀三年重荠,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片虚茶。...
    茶點故事閱讀 40,040評論 1 350
  • 序言:一個原本活蹦亂跳的男人離奇死亡戈鲁,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出嘹叫,到底是詐尸還是另有隱情婆殿,我是刑警寧澤,帶...
    沈念sama閱讀 35,742評論 5 346
  • 正文 年R本政府宣布罩扇,位于F島的核電站婆芦,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏喂饥。R本人自食惡果不足惜消约,卻給世界環(huán)境...
    茶點故事閱讀 41,364評論 3 330
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望员帮。 院中可真熱鬧或粮,春花似錦、人聲如沸捞高。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,944評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽硝岗。三九已至氢哮,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間型檀,已是汗流浹背冗尤。 一陣腳步聲響...
    開封第一講書人閱讀 33,060評論 1 270
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留,地道東北人生闲。 一個月前我還...
    沈念sama閱讀 48,247評論 3 371
  • 正文 我出身青樓媳溺,卻偏偏與公主長得像,于是被迫代替她去往敵國和親碍讯。 傳聞我的和親對象是個殘疾皇子悬蔽,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 44,979評論 2 355