阻塞隊列
阻塞隊列(BlockingQueue)是一個支持兩個附加操作的隊列。這兩個附加的操作是:在隊列為空時屡萤,獲取元素的線程會等待隊列變?yōu)榉强照浣!.旉犃袧M時,存儲元素的線程會等待隊列可用死陆。阻塞隊列常用于生產(chǎn)者和消費者的場景招拙,生產(chǎn)者是往隊列里添加元素的線程,消費者是從隊列里拿元素的線程措译。阻塞隊列就是生產(chǎn)者存放元素的容器别凤,而消費者也只從容器里拿元素。下面是 java 常見的阻塞隊列领虹。
ArrayBlockingQueue :一個由數(shù)組結(jié)構(gòu)組成的有界阻塞隊列规哪。
LinkedBlockingQueue :一個由鏈表結(jié)構(gòu)組成的有界阻塞隊列。
PriorityBlockingQueue :一個支持優(yōu)先級排序的無界阻塞隊列塌衰。
DelayQueue:一個使用優(yōu)先級隊列實現(xiàn)的無界阻塞隊列诉稍。
SynchronousQueue:一個不存儲元素的阻塞隊列。
LinkedTransferQueue:一個由鏈表結(jié)構(gòu)組成的無界阻塞隊列最疆。
LinkedBlockingDeque:一個由鏈表結(jié)構(gòu)組成的雙向阻塞隊列杯巨。
基本簡介
DelayQueue是一個無界阻塞隊列然爆,只有在延遲期滿時才能從中提取元素座菠。該隊列的頭部是延遲期滿后保存時間最長的Delayed 元素像屋。
DelayQueue是一個用來延時處理的隊列佑惠,所謂延時處理就是說可以為隊列中元素設(shè)定一個過期時間,相關(guān)的操作受到這個設(shè)定時間的控制劳澄。
使用場景
a) 關(guān)閉空閑連接童本。服務器中摄职,有很多客戶端的連接烙荷,空閑一段時間之后需要關(guān)閉之镜会。
b) 緩存。緩存中的對象终抽,超過了空閑時間,需要從緩存中移出。
c) 任務超時處理昼伴。在網(wǎng)絡(luò)協(xié)議滑動窗口請求應答式交互時匾旭,處理超時未響應的請求。
如果不使用DelayQueue圃郊,那么常規(guī)的解決辦法就是:使用一個后臺線程价涝,遍歷所有對象,挨個檢查持舆。這種笨笨的辦法簡單好用色瘩,但是對象數(shù)量過多時,可能存在性能問題逸寓,檢查間隔時間不好設(shè)置居兆,間隔時間過大,影響精確度竹伸,過小則存在效率問題泥栖。而且做不到按超時的時間順序處理。
基本原理
- 首先勋篓,這種隊列中只能存放實現(xiàn)Delayed接口的對象吧享,而此接口有兩個需要實現(xiàn)的方法。最重要的就是getDelay譬嚣,這個方法需要返回對象過期前的時間钢颂。簡單說,隊列在某些方法處理前拜银,會調(diào)用此方法來判斷對象有沒有超時殊鞭。
- 其次,DelayQueue是一個BlockingQueue盐股,其特化的參數(shù)是Delayed钱豁。(不了解BlockingQueue的同學,先去了解BlockingQueue再看本文)
- Delayed擴展了Comparable接口疯汁,比較的基準為延時的時間值牲尺,Delayed接口的實現(xiàn)類getDelay的返回值應為固定值(final)。DelayQueue內(nèi)部是使用PriorityQueue實現(xiàn)的幌蚊。
總結(jié)谤碳,DelayQueue的關(guān)鍵元素BlockingQueue、PriorityQueue溢豆、Delayed蜒简。可以這么說漩仙,DelayQueue是一個使用優(yōu)先隊列(PriorityQueue)實現(xiàn)的BlockingQueue搓茬,優(yōu)先隊列的比較基準值是時間犹赖。本質(zhì)上即:
DelayQueue = BlockingQueue +PriorityQueue + Delayed
他們的基本定義如下
public interface Comparable<T> {
public int compareTo(T o);
}
public interface Delayed extends Comparable<Delayed> {
long getDelay(TimeUnit unit);
}
public class DelayQueue<E extends Delayed> implements BlockingQueue<E> {
private final PriorityQueue<E> q = new PriorityQueue<E>();
}
基本用法
/**
* 延遲隊列示例
*/
public class DelayQueueTester {
private static DelayQueue<DelayTask> delayQueue = new DelayQueue<>();
static class DelayTask implements Delayed {
// 延遲時間
private final long delay;
// 到期時間
private final long expire;
// 數(shù)據(jù)
private final String msg;
// 創(chuàng)建時間
private final long now;
/**
* 初始化 DelayTask 對象
*
* @param delay 延遲時間 單位:微妙
* @param msg 業(yè)務信息
*/
DelayTask(long delay, String msg) {
this.delay = delay; // 延遲時間
this.msg = msg; // 業(yè)務信息
this.now = Instant.now().toEpochMilli();
this.expire = now + delay; // 到期時間 = 當前時間+延遲時間
}
/**
* 獲取延遲時間
*
* @param unit 單位對象
* @return
*/
@Override
public long getDelay(TimeUnit unit) {
return unit.convert(expire - Instant.now().toEpochMilli(), TimeUnit.MILLISECONDS);
}
/**
* 比較器
* 比較規(guī)則:延遲時間越長的對象越靠后
*
* @param o
* @return
*/
@Override
public int compareTo(Delayed o) {
if (o == this) // compare zero ONLY if same object
return 0;
return (int) (this.getDelay(TimeUnit.MILLISECONDS) - o.getDelay(TimeUnit.MILLISECONDS));
}
@Override
public String toString() {
return "DelayTask{" +
"delay=" + delay +
", expire=" + expire +
", msg='" + msg + '\'' +
", now=" + now +
'}';
}
}
/**
* 生產(chǎn)者線程
*
* @param args
*/
public static void main(String[] args) {
initConsumer();
try {
// 等待消費者初始化完畢
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
e.printStackTrace();
}
delayQueue.add(new DelayTask(1000, "Task1"));
delayQueue.add(new DelayTask(2000, "Task2"));
delayQueue.add(new DelayTask(3000, "Task3"));
delayQueue.add(new DelayTask(4000, "Task4"));
delayQueue.add(new DelayTask(5000, "Task5"));
}
/**
* 初始化消費者線程
*/
private static void initConsumer() {
Runnable task = () -> {
while (true) {
try {
System.out.println("嘗試獲取延遲隊列中的任務。" + LocalDateTime.now());
System.out.println(delayQueue.take());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
};
Thread consumer = new Thread(task);
consumer.start();
}
}
---
嘗試獲取延遲隊列中的任務卷仑。2017-04-05T18:28:03.282
DelayTask{delay=1000, expire=1491388087234, msg='Task1', now=1491388086234}
嘗試獲取延遲隊列中的任務峻村。2017-04-05T18:28:07.235
DelayTask{delay=2000, expire=1491388088235, msg='Task2', now=1491388086235}
嘗試獲取延遲隊列中的任務。2017-04-05T18:28:08.237
DelayTask{delay=3000, expire=1491388089235, msg='Task3', now=1491388086235}
嘗試獲取延遲隊列中的任務锡凝。2017-04-05T18:28:09.237
DelayTask{delay=4000, expire=1491388090235, msg='Task4', now=1491388086235}
嘗試獲取延遲隊列中的任務粘昨。2017-04-05T18:28:10.240
DelayTask{delay=5000, expire=1491388091235, msg='Task5', now=1491388086235}
嘗試獲取延遲隊列中的任務。2017-04-05T18:28:11.240
DelayQueue 實現(xiàn)原理
主要屬性
// 可以看看AbstractQueue 窜锯,實現(xiàn)了阻塞Queue接口
public class DelayQueue<E extends Delayed> extends AbstractQueue<E>
implements BlockingQueue<E> {
// 阻塞等待使用了可重入鎖张肾,只有一把
private final transient ReentrantLock lock = new ReentrantLock();
// 優(yōu)先隊列,用來對不同延遲任務的排序
private final PriorityQueue<E> q = new PriorityQueue<E>();
/**
* Thread designated to wait for the element at the head of
* the queue. This variant of the Leader-Follower pattern
* (http://www.cs.wustl.edu/~schmidt/POSA/POSA2/) serves to
* minimize unnecessary timed waiting. When a thread becomes
* the leader, it waits only for the next delay to elapse, but
* other threads await indefinitely. The leader thread must
* signal some other thread before returning from take() or
* poll(...), unless some other thread becomes leader in the
* interim. Whenever the head of the queue is replaced with
* an element with an earlier expiration time, the leader
* field is invalidated by being reset to null, and some
* waiting thread, but not necessarily the current leader, is
* signalled. So waiting threads must be prepared to acquire
* and lose leadership while waiting.
*/
// 這個Leader 有意思锚扎,解決了隊列頭的數(shù)據(jù)和線程的關(guān)聯(lián)
// 同時解決了其他線程由誰喚醒
private Thread leader = null;
/**
* Condition signalled when a newer element becomes available
* at the head of the queue or a new thread may need to
* become leader.
*/
// 與Leader Thread配合 喚醒等待的Leader或者新Leader替換
private final Condition available = lock.newCondition();
DelayQueue的take方法吞瞪,把優(yōu)先隊列q的first拿出來(peek),如果沒有達到延時閥值工秩,則進行await處理尸饺。
如下:
public E take() throws InterruptedException {
// 獲取鎖。每個延遲隊列內(nèi)聚了一個重入鎖助币。
final ReentrantLock lock = this.lock;
// 獲取可中斷的鎖浪听。
lock.lockInterruptibly();
try {
for (;;) {
// 嘗試從優(yōu)先級隊列中獲取隊列頭部元素
E first = q.peek();
if (first == null)
// 無元素,當前線程節(jié)點加入等待隊列眉菱,并阻塞當前線程
available.await();
else {
// 通過延遲任務的 getDelay 方法獲取延遲時間
long delay = first.getDelay(NANOSECONDS);
if (delay <= 0)
// 延遲時間到期迹栓,獲取并刪除頭部元素。
return q.poll();
first = null; // don't retain ref while waiting
// 存在leader線程俭缓,則其他的線程進入時克伊,直接進入等待
if (leader != null)
available.await();
else {
// 獲取當前線程 說明線程變了
Thread thisThread = Thread.currentThread();
leader = thisThread;
try {
// 線程節(jié)點進入等待隊列 x 納秒。
available.awaitNanos(delay);
} finally {
// 等待完了华坦,該線程則設(shè)置為null
if (leader == thisThread)
leader = null;
}
}
}
}
} finally {
// 若還存在元素的話愿吹,則將等待隊列頭節(jié)點中的線程節(jié)點移動到同步隊列中。
if (leader == null && q.peek() != null)
available.signal();
lock.unlock();
}
}
Add
public boolean add(E e) {
return offer(e);
}
/**
* Inserts the specified element into this delay queue.
*
* @param e the element to add
* @return {@code true}
* @throws NullPointerException if the specified element is null
*/
public boolean offer(E e) {
// 獲取到重入鎖
final ReentrantLock lock = this.lock;
lock.lock();
try {
q.offer(e);
if (q.peek() == e) {
// 剛添加的元素成為頭節(jié)點
// 那之前的頭結(jié)點就直接廢掉
leader = null;
// 喚醒take等待的線程惜姐,重新走查一遍
available.signal();
}
return true;
} finally {
lock.unlock();
}
}
Ref:
http://blog.csdn.net/kobejayandy/article/details/46833623