前言
java提供了很多阻塞隊(duì)列,在平時(shí)的開發(fā)中也會使用到,所以在此對java提供的阻塞隊(duì)列進(jìn)行一個(gè)了解總結(jié)
首先
java的阻塞隊(duì)列都繼承與BlockingQueue,其有共同的方法
boolean offer(Object o);//將數(shù)據(jù)o加入隊(duì)列中,加入成功返回true,失敗則為false,此方法不阻塞
boolean offer(Object o,long timeout,TimeUnit unit);//將o加入隊(duì)列中,若timeout過后為未加入成功返回false,否則返回true,此方法會阻塞等待,unit為時(shí)間單位
put(Object o);//將數(shù)據(jù)o加入隊(duì)列中,若隊(duì)列沒有空間則會阻塞當(dāng)前的線程進(jìn)行等待
Object poll();//取走隊(duì)列頭部的數(shù)據(jù),如取不出則返回null
Object poll(long timeout,TimeUnit unit);//取走隊(duì)列頭部的數(shù)據(jù),若取不出則等待timeout,等待后取不出返回null
Object take();//取出隊(duì)列首部數(shù)據(jù),若取不出則阻塞直到取出來
int drainTo(Collection c);//取出隊(duì)列中所有數(shù)據(jù)放到容器c中,其中的排序?yàn)殛?duì)列中的排序,返回取出的數(shù)量
int drainTo(Collection c,int maxLength);//取出隊(duì)列的數(shù)據(jù),最大數(shù)量為maxLength,返回實(shí)際獲取得數(shù)量
其他隊(duì)列基本都有實(shí)現(xiàn)以下的方法:
peek()與poll()功能一樣
隊(duì)列有add()方法,其內(nèi)部實(shí)現(xiàn)與put()基本都是一樣的.而且還有許多方法與列表方法一樣用的,如size(),remove(),clear(),遍歷時(shí)使用迭代器遍歷.其中remove()為刪除隊(duì)頭
隊(duì)列的有界無界表明其是否指定或限定隊(duì)列大小
ArrayBlockingQueue
用數(shù)組實(shí)現(xiàn)的有界阻塞隊(duì)列,按照先進(jìn)先出的原則.
public class ArrayBlockingQueueTest {
ArrayBlockingQueue<String> arrayBlockingQueue = new ArrayBlockingQueue<>(5,true);//定義隊(duì)列大小為5,隊(duì)列為先進(jìn)先出順序,false為未指定順序
ExecutorService executorService = Executors.newSingleThreadExecutor();//單線程的線程池
public ArrayBlockingQueueTest() {
for (int i = 0;i<6;i++){
put("數(shù)據(jù):"+i);//放入數(shù)據(jù)
}
}
public void put(String data){
try {
arrayBlockingQueue.put(data);//向隊(duì)列中放入數(shù)據(jù)
} catch (InterruptedException e) {
e.printStackTrace();
}
handle();//開啟線程處理
}
public void handle(){
executorService.submit(new Runnable() {
@Override
public void run() {
System.out.println("處理中...");
try {
sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
String data = null;
try {
data = arrayBlockingQueue.poll(1, TimeUnit.SECONDS);//取出數(shù)據(jù),1秒內(nèi)取不出返回null
} catch (InterruptedException e) {
e.printStackTrace();
}
if ( data != null) {
System.out.println(data+"處理結(jié)束...");
}else {
System.out.println("無數(shù)據(jù)處理...");
}
}
});
}
}
LinkedBlockingQueue
基于鏈表的阻塞隊(duì)列,按照先進(jìn)先出,其加入隊(duì)列與取出隊(duì)列的線程使用獨(dú)立的鎖來控制同步,所以其有更高的并發(fā)效率,需要注意的是在初始化時(shí)如果不指定長度會默認(rèn)為無限長,這有可能會占用較多的資源,使用方法與ArrayBlockingQueue一致.
PriorityBlockingQueue
無界優(yōu)先級隊(duì)列,默認(rèn)為升序,與Arrays.sort()方法排序類似,在初始化時(shí)可以指定其初始長度,默認(rèn)為11,增長數(shù)量為當(dāng)大于等于64長度時(shí),為原長度的1.5被,當(dāng)小于64的時(shí)候?yàn)樵L度的2倍+2.初始化時(shí)可設(shè)置其排序的比較規(guī)則Comparator(),也可重寫其compareTo方法.其無法排序同優(yōu)先級
public class PriorityBlockingQueueTest {
PriorityBlockingQueue<String> priorityBlockingQueue;
String[] strings;
Comparator<String> mComparator = new Comparator<String>() {
@Override
public int compare(String o1, String o2) {
return o2.length() - o1.length();
}
@Override
public boolean equals(Object obj) {
return false;
}
};
public PriorityBlockingQueueTest() {
strings = new String[]{"666", "6626", "6645457645661234566", "6612423564566", "6644564564564564564564566", "664566", "664564566", "664566"};
priorityBlockingQueue = new PriorityBlockingQueue<>(11,mComparator);
priorityBlockingQueue.put("666");
priorityBlockingQueue.put("6626");
priorityBlockingQueue.put("6645457645661234566");
priorityBlockingQueue.put("6612423564566");
priorityBlockingQueue.put("6644564564564564564564566");
priorityBlockingQueue.put("664566");
priorityBlockingQueue.put("664564566");
priorityBlockingQueue.put("664566");
Iterator<String> iterator = priorityBlockingQueue.iterator();
while (iterator.hasNext()) {
System.out.println(iterator.next());
}
System.out.println("-----------------------------------------");
Arrays.sort(strings,mComparator);
for (String data : strings){
System.out.println(data);
}
}
}
運(yùn)行結(jié)果
經(jīng)試驗(yàn)驗(yàn)證,其內(nèi)部的排序使用二分法排序,如下代碼,結(jié)果與想象的不一樣,具體原因還有待研究
private static <T> void siftUpUsingComparator(int k, T x, Object[] array,
Comparator<? super T> cmp) {
while (k > 0) {
int parent = (k - 1) >>> 1;//取中間值比較
Object e = array[parent];
if (cmp.compare(x, (T) e) >= 0)//此處遇到返回大于等于0的就退出,退出操作有點(diǎn)問題
break;
array[k] = e;
k = parent;
}
array[k] = x;
}
DelayQueue
延時(shí)取出的無界隊(duì)列,基于PriorityQueue實(shí)現(xiàn)的,加入的元素需要實(shí)現(xiàn)Delayed接口,目前并不會使用
SynchronousQueue
不存儲元素的隊(duì)列,當(dāng)插入的一個(gè)元素,必須等待其他線程移除才可繼續(xù)運(yùn)行,當(dāng)移除一個(gè)元素,如使用remove且無插入在等待會報(bào)錯(cuò),使用take()會等到有元素插入了才返回,否則阻塞,其他的有就有沒有就返回null.
public class SynchronoutsQueueTest {
SynchronousQueue<String> stringSynchronousQueue = new SynchronousQueue<>();
public SynchronoutsQueueTest() {
new MyThread().start();
try {
stringSynchronousQueue.put("666");
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("----------putEndMain----------");
try {
System.out.println("----------poll:"+stringSynchronousQueue.take()+"Main----------");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
class MyThread extends Thread{
@Override
public void run() {
try {
sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("----------poll:"+stringSynchronousQueue.poll()+"Thread----------");
try {
sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
try {
stringSynchronousQueue.put("666");
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("----------putEndThread----------");
}
}
}
運(yùn)行結(jié)果
LinkedTransferQueue
無界的鏈表隊(duì)列,用于生產(chǎn)者等待消費(fèi)者的需求,其實(shí)現(xiàn)了TransferQueue接口,其中有幾個(gè)重要方法
transfer(Object o);//加入隊(duì)列,如加入時(shí)沒有取出隊(duì)列操作,會阻塞等待取出.
boolean tryTransfer(Object o);//加入隊(duì)列,如有取出的操作在等待則加入并返回true,否則不加入并返回false
boolean tryTransfer(Object o,long timeout,TimeUnit unit);//有超時(shí)機(jī)制的tryTransfer
public class LinkedTransferQueueTest {
LinkedTransferQueue<String> linkedTransferQueue = new LinkedTransferQueue<>();
public LinkedTransferQueueTest() {
new TestThread().start();
while (true) {
try {
Thread.sleep(3000);
System.out.println("消費(fèi):" + linkedTransferQueue.take());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
class TestThread extends Thread{
@Override
public void run() {
while (true) {
try {
System.out.println("出產(chǎn)...");
linkedTransferQueue.transfer("34636");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
}
運(yùn)行結(jié)果
在每次出產(chǎn)后都會等待消費(fèi)后再進(jìn)行下一次的出產(chǎn)
LinedBlockingDeque
雙向阻塞隊(duì)列,在初始化時(shí)可設(shè)置其最大容量,默認(rèn)為int的最大值,隊(duì)列提供了addFirst、addLast全释、offerFirst装处、offerLast、peekFirst浸船、peekLast等方法進(jìn)行對隊(duì)頭和隊(duì)尾的操作妄迁。
小結(jié)
java提供的阻塞隊(duì)列使用起來也是挺便利的,雖然有些可能寫錯(cuò)了,希望大家指出糾正.
個(gè)人博客:https://lewis-v.github.io/
公眾號</article>