前言
最近在看并發(fā)編程藝術(shù)這本書意系,對(duì)看書的一些筆記及個(gè)人工作中的總結(jié)痰催。
什么是阻塞隊(duì)列夸溶?
A java.util.Queue that additionally supports operations that wait for the queue to become non-empty when retrieving an element, and wait for space to become available in the queue when storing an element.
簡(jiǎn)而言之就是當(dāng)隊(duì)列滿時(shí)缝裁,插入阻塞压语;當(dāng)隊(duì)列為空時(shí)胎食,刪除(取出)阻塞厕怜。常用于生產(chǎn)者和消費(fèi)者場(chǎng)景粥航。
自己實(shí)現(xiàn)一個(gè)簡(jiǎn)單的阻塞隊(duì)列递雀,實(shí)現(xiàn)原理就是通知模式缀程,當(dāng)隊(duì)列滿時(shí),添加元素阻塞撩满,當(dāng)隊(duì)列空時(shí)伺帘,刪除阻塞
/**
* 意思就是自定義一個(gè)隊(duì)列窍仰,當(dāng)隊(duì)列中的元素個(gè)數(shù)等于指定的長(zhǎng)度時(shí)礼殊,put方法要想再加入元素,必須等待take從集合取出元素之后才能放入婚陪,這邊涉及到wait和notify泌参,
* 同理沽一,take的時(shí)候如果隊(duì)列中沒有元素(此時(shí)的元素集合長(zhǎng)度為0铣缠,那么也要執(zhí)行等待,等到put進(jìn)元素才能取出捡硅,這邊也涉及到wait和notify)
*
*/
public class MyQueue {
//1 需要一個(gè)承裝元素的集合
private LinkedList<Object> list = new LinkedList<>();
//2 需要一個(gè)計(jì)數(shù)器
private AtomicInteger count = new AtomicInteger(0);
//3 需要制定上限和下限
private final int minSize = 0;
private final int maxSize ;
//4 構(gòu)造方法
public MyQueue(int size){
this.maxSize = size;
}
//5 初始化一個(gè)對(duì)象 用于加鎖
private final Object lock = new Object();
//put(anObject): 把a(bǔ)nObject加到BlockingQueue里,如果BlockQueue沒有空間,則調(diào)用此方法的線程被阻斷,直到BlockingQueue里面有空間再繼續(xù).
public void put(Object obj){
synchronized (lock) {
while(count.get() == this.maxSize){
try {
lock.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
//1 加入元素
list.add(obj);
//2 計(jì)數(shù)器累加
count.incrementAndGet();
//3 通知另外一個(gè)線程(喚醒)
lock.notify();
System.out.println("新加入的元素為:" + obj);
}
}
//take: 取走BlockingQueue里排在首位的對(duì)象,若BlockingQueue為空,阻斷進(jìn)入等待狀態(tài)直到BlockingQueue有新的數(shù)據(jù)被加入.
public Object take(){
Object ret = null;
synchronized (lock) {
while(count.get() == this.minSize){
try {
lock.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
//1 做移除元素操作
ret = list.removeFirst();
//2 計(jì)數(shù)器遞減
count.decrementAndGet();
//3 喚醒另外一個(gè)線程
lock.notify();
}
return ret;
}
public int getSize(){
return this.count.get();
}
public static void main(String[] args) {
//線程run中使用變量必須使用final修飾
final MyQueue mq = new MyQueue(5);
mq.put("a");
mq.put("b");
mq.put("c");
mq.put("d");
mq.put("e");
System.out.println("當(dāng)前容器的長(zhǎng)度:" + mq.getSize());
Thread t1 = new Thread(() -> {
mq.put("f");
mq.put("g");
},"t1");
t1.start();
Thread t2 = new Thread(() -> {
Object o1 = mq.take();
System.out.println("移除的元素為:" + o1);
Object o2 = mq.take();
System.out.println("移除的元素為:" + o2);
},"t2");
try {
//使用此單元執(zhí)行 Thread.sleep.這是將時(shí)間參數(shù)轉(zhuǎn)換為 Thread.sleep 方法所需格式的便捷方法。
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
t2.start();
}
}
jdk并發(fā)包提供的BlockingQueue及其實(shí)現(xiàn)
然后看其比較主要的實(shí)現(xiàn)
ArrayBlockingQueue:一個(gè)由數(shù)組結(jié)構(gòu)組成的有界阻塞隊(duì)列。
LinkedBlockingQueue:一個(gè)由鏈表結(jié)構(gòu)組成的有界阻塞隊(duì)列寄疏。
PriorityBlockingQueue:一個(gè)支持優(yōu)先級(jí)排序的無界阻塞隊(duì)列驳棱。
DelayQueue:一個(gè)使用優(yōu)先級(jí)隊(duì)列實(shí)現(xiàn)的無界阻塞隊(duì)列。
SynchronousQueue:一個(gè)不存儲(chǔ)元素的阻塞隊(duì)列驻债。
LinkedTransferQueue:一個(gè)由鏈表結(jié)構(gòu)組成的無界阻塞隊(duì)列合呐。
LinkedBlockingDeque:一個(gè)由鏈表結(jié)構(gòu)組成的雙向阻塞隊(duì)列淌实。
ArrayBlockingQueue
基于數(shù)組的阻塞隊(duì)列實(shí)現(xiàn),在ArrayBlockingQueue內(nèi)部放坏,維護(hù)了一個(gè)定長(zhǎng)數(shù)組轻姿,以便緩存隊(duì)列中的數(shù)據(jù)對(duì)象,其內(nèi)部沒有實(shí)現(xiàn)讀寫分離余素,長(zhǎng)度是需要定義的威根,按照先進(jìn)先出(FIFO)的原則對(duì)元素進(jìn)行排序洛搀。是有界隊(duì)列(bounded),在很多場(chǎng)合非常適合使用谎砾。
默認(rèn)情況下不保證線程公平的訪問隊(duì)列较雕,所謂公平訪問隊(duì)列是指阻塞的線程亮蒋,可以按照阻塞的先后順序訪問隊(duì)列宛蚓,即先阻塞線程先訪問隊(duì)列远舅。非公平性是對(duì)先等待的線程是非公平的,當(dāng)隊(duì)列可用時(shí)序六,阻塞的線程都可以爭(zhēng)奪訪問隊(duì)列的資格,有可能先阻塞的線程最后才訪問隊(duì)列繁涂。為了保證公平性,通常會(huì)降低吞吐量矿酵。我們可以使用以下代碼創(chuàng)建一個(gè)公平的阻塞隊(duì)列。
可以使用構(gòu)造參數(shù)實(shí)現(xiàn)公平的阻塞隊(duì)列
/**
* ArrayBlockingQueue是有界的阻塞隊(duì)列辜腺,如果插入隊(duì)列中的元素多余自己定義的長(zhǎng)度會(huì)拋出InterruptedException異常
*
*/
public class ArrayBlockingQueueTest {
public static void main(String[] args) throws Exception{
ArrayBlockingQueue<String> array = new ArrayBlockingQueue<>(5);
array.put("a");
array.put("b");
array.add("c");
array.add("d");
// array.add("e");
// array.add("f");
//offer方法表示將元素加到隊(duì)列中的最后哪自,第二個(gè)參數(shù)和第三個(gè)參數(shù)表示如果隊(duì)列滿的話丰包,等待時(shí)間后還是滿的話插入失敗
System.out.println(array.offer("a", 3, TimeUnit.SECONDS));
System.out.println(array);
}
}
LinkedBlockingQueue
是一個(gè)用鏈表實(shí)現(xiàn)的有界阻塞隊(duì)列。此隊(duì)列的默認(rèn)和最大長(zhǎng)度為Integer.MAX_VALUE壤巷。此隊(duì)列按照先進(jìn)先出的原則對(duì)元素進(jìn)行排序邑彪。
Linked queues typically have higher throughput than array-based queues but less predictable performance in most concurrent applications.
鏈表隊(duì)列通常比數(shù)組隊(duì)列有更好的吞吐量,但是在大多數(shù)并發(fā)應(yīng)用中可預(yù)見性低胧华。
/**
* 阻塞隊(duì)列寄症,不指定隊(duì)列長(zhǎng)度,底層是由鏈表實(shí)現(xiàn)的矩动,LinkedBlockingDeque之所以能夠高效的處理并發(fā)數(shù)據(jù)篮迎,
* 是因?yàn)槠鋬?nèi)部實(shí)現(xiàn)采用分離鎖(讀寫分離兩個(gè)鎖)岂傲,是一個(gè)無界隊(duì)列
*/
public class LinkedBlockingQueueTest {
public static void main(String[] args) {
LinkedBlockingQueue<String> q = new LinkedBlockingQueue<>();
//offer將元素添加到隊(duì)列的末尾,如果隊(duì)列不滿镐侯,添加成功返回true,隊(duì)列full則返回false
q.offer("a");
q.offer("b");
q.offer("c");
q.offer("d");
q.offer("e");
q.add("f");
for(Iterator<String> iterator = q.iterator();iterator.hasNext();){
String value = iterator.next();
System.out.println(value);
}
System.out.println(".............................");
q.stream().forEach(str -> System.out.println(str));
System.out.println("..............................");
List<String> list = new ArrayList<>();
//javadoc沒有這方法的說明涕烧,根據(jù)返回的結(jié)果很明顯了憨攒,調(diào)用drainTo方法返回的是向list集合中添加的元素
System.out.println(q.drainTo(list, 3));
System.out.println(list.size());
for (String string : list) {
System.out.println(string);
}
}
}
PriorityBlockingQueue
PriorityBlockingQueue是一個(gè)支持優(yōu)先級(jí)的無界(unbounded)阻塞隊(duì)列。默認(rèn)情況下元素采取自然順序升序排列。也可以自定義類實(shí)現(xiàn)compareTo()方法來指定元素排序規(guī)則瀑罗,或者初始化PriorityBlockingQueue時(shí)摧玫,指定構(gòu)造參數(shù)Comparator來對(duì)元素進(jìn)行排序。需要注意的是不能保證同優(yōu)先級(jí)元素的順序对竣。
//take方法按照規(guī)定的優(yōu)先級(jí)順序取出數(shù)據(jù)
public class PriorityBlockingQueueTest {
public static void main(String[] args) throws Exception{
PriorityBlockingQueue<String> q = new PriorityBlockingQueue<>();
q.add("world");
q.add("welcome");
q.add("aaaaaa");
q.add("hello");
q.stream().forEach(str -> System.out.println(str));
PriorityBlockingQueue<Student> studentPriorityBlockingQueue = new PriorityBlockingQueue<>();
Student student = new Student(1,"miaozhihao");
Student student1 = new Student(2,"zob");
Student student2 = new Student(3,"aaaa");
Student student3 = new Student(2,"tom");
Student student4 = new Student(2,"bbbb");
studentPriorityBlockingQueue.add(student);
studentPriorityBlockingQueue.add(student1);
studentPriorityBlockingQueue.add(student2);
studentPriorityBlockingQueue.add(student3);
studentPriorityBlockingQueue.add(student4);
System.out.println(studentPriorityBlockingQueue.take());
System.out.println(studentPriorityBlockingQueue.take());
System.out.println(studentPriorityBlockingQueue.take());
System.out.println(studentPriorityBlockingQueue.take());
System.out.println(studentPriorityBlockingQueue.take());
}
}
class Student implements Comparable<Student>{
private int id;
private String name;
public int getId() {
return id;
}
public void setId(int id) {
this.id = id;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public Student(int id,String name){
this.id = id;
this.name = name;
}
@Override
public String toString() {
return "Student{" + "id=" + id + ", name='" + name + '\'' + '}';
}
@Override
public int compareTo(Student student) {
return this.id - student.id != 0 ? (this.id - student.id) : (this.name.compareToIgnoreCase(student.name));
}
}
DelayQueue
帶有延遲時(shí)間的Queue,其中的元素只有當(dāng)前指定的延遲時(shí)間到了算灸,才能夠從隊(duì)列中獲取該元素。DelayQueue中的元素必須實(shí)現(xiàn)Delayed接口,DelayQueue是一個(gè)無界隊(duì)列袒哥,應(yīng)用場(chǎng)景很多
·緩存系統(tǒng)的設(shè)計(jì):可以用DelayQueue保存緩存元素的有效期,使用一個(gè)線程循環(huán)查詢DelayQueue,一旦能從DelayQueue中獲取元素時(shí)飘弧,表示緩存有效期到了。
·定時(shí)任務(wù)調(diào)度:使用DelayQueue保存當(dāng)天將會(huì)執(zhí)行的任務(wù)和執(zhí)行時(shí)間卖擅,一旦從DelayQueue中獲取到任務(wù)就開始執(zhí)行,比如TimerQueue就是使用DelayQueue實(shí)現(xiàn)的统刮。
/**
* 局限娃善,當(dāng)隊(duì)列中的head元素時(shí)間到了才能取下一個(gè)元素瘫寝,如果第一個(gè)元素延遲時(shí)間最長(zhǎng),那么等待時(shí)間到了之后外厂,元素中的元素一起取出來了
* 所以察纯,我的demo daqiu1 設(shè)置的延遲時(shí)間是最短的
*
*/
public class DelayQueueTest {
public static void main(String[] args) throws Exception{
Daqiu daqiu1 = new Daqiu("miaozhihao",1 * 1000 +System.currentTimeMillis());
Daqiu daqiu2 = new Daqiu("zhangsan",4 * 1000 +System.currentTimeMillis());
Daqiu daqiu3 = new Daqiu("lisi",2 * 1000 +System.currentTimeMillis());
Daqiu daqiu4 = new Daqiu("wangwu",13 * 1000 +System.currentTimeMillis());
final DelayQueue<Daqiu> daqius = new DelayQueue<>();
Thread th1 = new Thread(() -> {
while(true){
try {
Daqiu daqiu = daqius.take();
System.out.println(daqiu);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
th1.start();
daqius.add(daqiu1);
daqius.add(daqiu2);
daqius.add(daqiu3);
daqius.add(daqiu4);
}
}
//加入到DelayQueue的方法必須要實(shí)現(xiàn)Delayed接口博肋,而Delayed繼承Comparable接口
class Daqiu implements Delayed{
//打球的人
private String name;
//截止時(shí)間
private long endTime;
//定義時(shí)間工具類
private TimeUnit timeUnit = TimeUnit.SECONDS;
public Daqiu(String name,long endTime){
this.name = name;
this.endTime = endTime;
}
@Override
public String toString() {
return "打球的人為:"+this.name;
}
//使用給定的時(shí)間單位返回與當(dāng)前對(duì)象關(guān)聯(lián)的剩余時(shí)間
@Override
public long getDelay(TimeUnit unit) {
return endTime - System.currentTimeMillis();
}
//compareTo是根據(jù)getDelay方法提供指定的順序
@Override
public int compareTo(Delayed delayed) {
Daqiu daqiu = (Daqiu)delayed;
return this.getDelay(this.timeUnit) - daqiu.getDelay(this.timeUnit) > 0 ? 1:0;
}
}
SynchronousQueue
A BlockingQueue blocking queue in which each insert operation must wait for a corresponding remove operation by another thread, and vice versa. A synchronous queue does not have any internal capacity, not even a capacity of one. You cannot peek at a synchronous queue because an element is only present when you try to remove it; you cannot insert an element (using any method) unless another thread is trying to remove it; you cannot iterate as there is nothing to iterate. The head of the queue is the element that the first queued inserting thread is trying to add to the queue; if there is no such queued thread then no element is available for removal and poll() will return null. For purposes of other Collection methods (for example contains), a SynchronousQueue acts as an empty collection. This queue does not permit null elements.
總結(jié)一下:
不存儲(chǔ)元素的阻塞隊(duì)列低斋。每一個(gè)插入(put)操作必須等待一個(gè)刪除(take)操作蜂厅,否則不能繼續(xù)添加元素。阻塞表現(xiàn)在你不能peek一個(gè)synchronous隊(duì)列除非存在另一個(gè)線程存在并且去刪除這個(gè)隊(duì)列拔稳,不能去插入一個(gè)元素除非另外一個(gè)線程去刪除它葛峻。只有另外一個(gè)線程在刪除元素的時(shí)候才能遍歷。
它支持公平訪問隊(duì)列巴比。默認(rèn)情況下線程采用非公平性策略訪問隊(duì)列术奖。使用以下構(gòu)造方法可以創(chuàng)建公平性訪問的SynchronousQueue,如果設(shè)置為true轻绞,則等待的線程會(huì)采用先進(jìn)先出的順序訪問隊(duì)列采记。
SynchronousQueue可以看成是一個(gè)傳球手,負(fù)責(zé)把生產(chǎn)者線程處理的數(shù)據(jù)直接傳遞給消費(fèi)者線程政勃。隊(duì)列本身并不存儲(chǔ)任何元素唧龄,非常適合傳遞性場(chǎng)景。SynchronousQueue的吞吐量高于LinkedBlockingQueue和ArrayBlockingQueue奸远。
/**
* take方法睡眠3s既棺,那么insert操作就阻塞3s等待去take的時(shí)候才能去消費(fèi)它
*/
public class SynchronousQueueTest {
public static void main(String[] args) throws Exception{
final SynchronousQueue<String> q = new SynchronousQueue<>();
Thread t1 = new Thread(() -> {
try {
String value = q.take();
Thread.sleep(3000);
System.out.println(value);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
t1.start();
Thread t2 = new Thread(() -> q.add("hello"));
t2.start();
}
}
其實(shí)看了源碼也是使用LockSupport工具類的加鎖方法實(shí)現(xiàn)阻塞的。
LinkedTransferQueue
其javadoc:
An unbounded {@link TransferQueue} based on linked nodes. This queue orders elements FIFO (first-in-first-out) with respect to any given producer. The head of the queue is that element that has been on the queue the longest time for some producer. The tail of the queue is that element that has been on the queue the shortest time for some producer.
Beware that, unlike in most collections, the size method is NOT a constant-time operation. Because of the asynchronous nature of these queues, determining the current number of elements requires a traversal of the elements, and so may report inaccurate results if this collection is modified during traversal.
Additionally, the bulk operations addAll, removeAll, retainAll, containsAll, equals, and toArray are not guaranteed to be performed atomically. For example, an iterator operating concurrently with an addAll operation might view only some of the added elements.
總結(jié)一下:
LinkedTransferQueue是一個(gè)由鏈表結(jié)構(gòu)組成的無界阻塞TransferQueue隊(duì)列懒叛。先進(jìn)先出丸冕。head元素是隊(duì)列中最長(zhǎng)的生產(chǎn)者,tail是隊(duì)列中最短時(shí)間的生產(chǎn)者薛窥。
不同于其他的集合胖烛,size方法不是恒定不變的,因?yàn)殛?duì)列是異步的诅迷,確定當(dāng)前元素的數(shù)目需要去遍歷該元素佩番,在遍歷的時(shí)候修改集合會(huì)對(duì)結(jié)果產(chǎn)生影響。
相對(duì)于其他阻塞隊(duì)列罢杉,LinkedTransferQueue多了tryTransfer和transfer方法趟畏。
如果當(dāng)前有消費(fèi)者正在等待接收元素(消費(fèi)者使用take()方法或帶時(shí)間限制的poll()方法時(shí)),transfer方法可以把生產(chǎn)者傳入的元素立刻transfer(傳輸)給消費(fèi)者滩租。如果沒有消費(fèi)者在等待接收元素拱镐,transfer方法會(huì)將元素存放在隊(duì)列的tail節(jié)點(diǎn),并等到該元素被消費(fèi)者消費(fèi)了才返回持际。
tryTransfer方法是用來試探生產(chǎn)者傳入的元素是否能直接傳給消費(fèi)者。如果沒有消費(fèi)者等待接收元素哗咆,則返回false蜘欲。和transfer方法的區(qū)別是tryTransfer方法無論消費(fèi)者是否接收,方法立即返回晌柬,而transfer方法是必須等到消費(fèi)者消費(fèi)了才返回姥份。
對(duì)于帶有時(shí)間限制的tryTransfer(E e郭脂,long timeout,TimeUnit unit)方法澈歉,試圖把生產(chǎn)者傳入的元素直接傳給消費(fèi)者展鸡,但是如果沒有消費(fèi)者消費(fèi)該元素則等待指定的時(shí)間再返回,如果超時(shí)還沒消費(fèi)元素埃难,則返回false莹弊,如果在超時(shí)時(shí)間內(nèi)消費(fèi)了元素,則返回true涡尘。
/**
* tryTransfer與transfer的區(qū)別就是一個(gè)有返回值忍弛,前者有線程等待則返回true并且將參數(shù)(元素)直接推送給消費(fèi)端,沒有消費(fèi)者則直接返回false
* 后者不返回這個(gè)標(biāo)志
*/
public class LinkedTransferQueueTest {
public static void main(String[] args) throws Exception{
final LinkedTransferQueue<String> queue = new LinkedTransferQueue<>();
//queue.transfer("bbb");
new Thread(() -> {
try {
System.out.println("--------");
String value = queue.take();
System.out.println(value);
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
Thread.sleep(1000);
//tryTransfer將元素立即傳遞給消費(fèi)者考抄,如果當(dāng)前有消費(fèi)者等待則返回true细疚,如果沒有則返回false
boolean flag = queue.tryTransfer("aaa");
System.out.println(flag);
new Thread(() -> {
System.out.println("---------");
try {
String value = queue.poll(4, TimeUnit.SECONDS);
System.out.println(value);
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
Thread.sleep(3000);
//Thread.sleep(10000);
queue.transfer("bbb");
System.out.println("-----end-------");
}
}
LinkedBlockingDeque
LinkedBlockingDeque是一個(gè)由鏈表結(jié)構(gòu)組成的雙向阻塞隊(duì)列。所謂雙向隊(duì)列指的是可以從隊(duì)列的兩端插入和移出元素川梅。雙向隊(duì)列因?yàn)槎嗔艘粋€(gè)操作隊(duì)列的入口疯兼,在多線程同時(shí)入隊(duì)時(shí),也就減少了一半的競(jìng)爭(zhēng)贫途。相比其他的阻塞隊(duì)列吧彪,LinkedBlockingDeque多了addFirst、addLast潮饱、offerFirst来氧、offerLast、peekFirst和peekLast等方法香拉,以First單詞結(jié)尾的方法啦扬,表示插入、獲荣炻怠(peek)或移除雙端隊(duì)列的第一個(gè)元素扑毡。以Last單詞結(jié)尾的方法,表示插入盛险、獲取或移除雙端隊(duì)列的最后一個(gè)元素瞄摊。
//由鏈表結(jié)構(gòu)組成的雙向阻塞隊(duì)列
public class LinkedBlockingDequeTest {
public static void main(String[] args) {
LinkedBlockingDeque<String> deque = new LinkedBlockingDeque<>();
deque.addFirst("a");
deque.addFirst("b");
deque.addFirst("c");
deque.addLast("e");
deque.addFirst("f");
System.out.println(deque);
System.out.println(deque.getLast());
System.out.println(deque.peekLast());
}
}
以上BlockingQueue的實(shí)現(xiàn)。