我們都知道Queue是一種操作線性表漾肮,可以用數(shù)組厂抖、鏈表來實現(xiàn)。
Queue的特點是先進先出克懊。
存在問題:
如果我們使用普通的數(shù)組來實現(xiàn)Queue忱辅,一定會存在一個問題,每次出對的時候谭溉,從數(shù)組的第一位開始取墙懂,那么只要出對完成,就會涉及到整個數(shù)組搬遷扮念,會導(dǎo)致時間最差復(fù)雜度成為O(n) 损搬。單純從設(shè)計Queue來講,我覺得是應(yīng)該要避免這樣的搬遷操作。
循環(huán)數(shù)組
我們都知道數(shù)組是一塊連續(xù)的內(nèi)存空間巧勤。但是循環(huán)數(shù)組嵌灰,確實不多見。如何理解颅悉?
如上圖沽瞭,是一個大神的畫的,看著挺復(fù)雜的剩瓶,實現(xiàn)起來并沒有那么復(fù)雜驹溃,只要抓住核心tail和head 即可,還有一個很重要的一個算法 (tail + 1) % n 儒搭,如下代碼吠架。
public class CircularQueue {
// 數(shù)組:items,數(shù)組大新辍:n
private String[] items;
private int n = 0;
// head表示隊頭下標(biāo)傍药,tail表示隊尾下標(biāo)
private int head = 0;
private int tail = 0;
// 申請一個大小為capacity的數(shù)組
public CircularQueue(int capacity) {
items = new String[capacity];
n = capacity;
}
// 入隊
public boolean enqueue(String item) {
// 隊列滿了
if ((tail + 1) % n == head) return false;
items[tail] = item;
tail = (tail + 1) % n;
return true;
}
// 出隊
public String dequeue() {
// 如果head == tail 表示隊列為空
if (head == tail) return null;
String ret = items[head];
head = (head + 1) % n;
return ret;
}
}
這是一段使用循環(huán)數(shù)組實現(xiàn)簡單Queue的代碼,這樣實現(xiàn)完全解決了上面的問題魂仍,出對的時候時間復(fù)雜度從O(n) 降成O(1)拐辽。
如果仔細看代碼你會發(fā)現(xiàn),在執(zhí)行到臨界點的時候擦酌,即隊列滿了俱诸,會存在有一個位置為空,內(nèi)存如此寶貴怎么能忍赊舶。
有個簡單操作睁搭,show you code。
public class CircularQueue {
// 數(shù)組:items笼平,數(shù)組大性奥妗:n
private String[] items;
private int n = 0;
// 定義數(shù)組的長度
private int size = 0;
// head表示隊頭下標(biāo),tail表示隊尾下標(biāo)
private int head = 0;
private int tail = 0;
// 申請一個大小為capacity的數(shù)組
public CircularQueue(int capacity) {
items = new String[capacity];
n = capacity;
}
// 入隊
public boolean enqueue(String item) {
if (size == n) {
return false;
}
// 隊列滿了
items[tail] = item;
tail = (tail + 1) % n;
size++;
return true;
}
// 出隊
public String dequeue() {
// 如果head == tail 表示隊列為空
if (size == 0) {
return null;
}
String ret = items[head];
head = (head + 1) % n;
size--;
return ret;
}
}
上述簡單介紹了循環(huán)隊列寓调,設(shè)計確認比較巧妙锌唾,對于一些底層的編碼,什么都要節(jié)省夺英,如果有方式方法能節(jié)省就節(jié)省吧晌涕,畢竟現(xiàn)在我們都是天天把節(jié)省成本掛在嘴邊。
重點看下ArrayBlockingQueue
之前一直用ArrayBlockingQueue痛悯,也看過幾次源碼余黎,看了也忘記了。一直沒有抓住核心點载萌。這次看到了循環(huán)數(shù)組突然對ArrayBlockingQueue有些好奇驯耻。
是不是也是循環(huán)數(shù)組的方式來實現(xiàn)Queue
我們看下ArrayBlockingQueue源碼亲族,其他方法可以先不著急去看,先去看核心的方法enqueue和dequeue
/**
* Inserts element at current put position, advances, and signals.
* Call only when holding lock.
*/
private void enqueue(E x) {
// assert lock.getHoldCount() == 1;
// assert items[putIndex] == null;
final Object[] items = this.items;
items[putIndex] = x;
if (++putIndex == items.length)
putIndex = 0;
count++;
notEmpty.signal();
}
/**
* Extracts element at current take position, advances, and signals.
* Call only when holding lock.
*/
private E dequeue() {
// assert lock.getHoldCount() == 1;
// assert items[takeIndex] != null;
final Object[] items = this.items;
@SuppressWarnings("unchecked")
E x = (E) items[takeIndex];
items[takeIndex] = null;
if (++takeIndex == items.length)
takeIndex = 0;
count--;
if (itrs != null)
itrs.elementDequeued();
notFull.signal();
return x;
}
從上述代碼來看可缚,涉及到幾個成員變量和ArrayBlockingQueue構(gòu)造函數(shù)
/** The queued items */
final Object[] items;
/** items index for next take, poll, peek or remove */
int takeIndex;
/** items index for next put, offer, or add */
int putIndex;
/** Number of elements in the queue */
int count;
/*
* Concurrency control uses the classic two-condition algorithm
* found in any textbook.
*/
/** Main lock guarding all access */
final ReentrantLock lock;
/** Condition for waiting takes */
private final Condition notEmpty;
/** Condition for waiting puts */
private final Condition notFull;
/**
* Creates an {@code ArrayBlockingQueue} with the given (fixed)
* capacity and the specified access policy.
*
* @param capacity the capacity of this queue
* @param fair if {@code true} then queue accesses for threads blocked
* on insertion or removal, are processed in FIFO order;
* if {@code false} the access order is unspecified.
* @throws IllegalArgumentException if {@code capacity < 1}
*/
public ArrayBlockingQueue(int capacity, boolean fair) {
if (capacity <= 0)
throw new IllegalArgumentException();
this.items = new Object[capacity];
lock = new ReentrantLock(fair);
notEmpty = lock.newCondition();
notFull = lock.newCondition();
}
其中takeIndex霎迫、putIndex、count 都是int 類型帘靡,默認為0知给。
notEmpty 表示隊列為空的時候,出對操作的線程狀態(tài)變成等待狀態(tài)描姚,并釋放鎖涩赢,入隊操作的時候,會通知喚醒等待時間最長的線程轩勘。
notFull 表示隊列為滿的時候筒扒,入隊操作線程狀態(tài)變成等待狀態(tài),并釋放鎖绊寻,出對操作的時候花墩,會通知喚醒等待時間最長的線程。
從ArrayBlockingQueue 中enqueue 和dequeue 源碼看很容易理解使用的就是循環(huán)數(shù)組來實現(xiàn)的澄步,如下代碼
if (++putIndex == items.length)
putIndex = 0;
if (++takeIndex == items.length)
takeIndex = 0;
光看enqueue 和dequeue 源碼發(fā)現(xiàn)其實整個邏輯非常不嚴謹冰蘑,不嚴謹也沒關(guān)系,主要看他們倆都是私有方法村缸。更何況有個核心成員變量 count還沒有涉及到祠肥。
ArrayBlockingQueue 對外提供的出對和入對操作都是成對,我們一一看來
/**
* Inserts the specified element at the tail of this queue if it is
* possible to do so immediately without exceeding the queue's capacity,
* returning {@code true} upon success and {@code false} if this queue
* is full. This method is generally preferable to method {@link #add},
* which can fail to insert an element only by throwing an exception.
*
* @throws NullPointerException if the specified element is null
*/
public boolean offer(E e) {
checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lock();
try {
if (count == items.length)
return false;
else {
enqueue(e);
return true;
}
} finally {
lock.unlock();
}
}
public E poll() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return (count == 0) ? null : dequeue();
} finally {
lock.unlock();
}
}
offer和poll方法中 通過count 來 判斷隊列是否滿了梯皿,或者是否為空隊列仇箱,前提都加了Lock,為了保證線程安全东羹,出入對都統(tǒng)一鎖了剂桥。
到這里可能會有疑問阻塞去哪了?
我們再看另外一對的出入對方法
/**
* Inserts the specified element at the tail of this queue, waiting
* for space to become available if the queue is full.
*
* @throws InterruptedException {@inheritDoc}
* @throws NullPointerException {@inheritDoc}
*/
public void put(E e) throws InterruptedException {
checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == items.length)
notFull.await();
enqueue(e);
} finally {
lock.unlock();
}
}
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == 0)
notEmpty.await();
return dequeue();
} finally {
lock.unlock();
}
}
notFull.await() 和 notEmpty.await() 使用上了百姓,表示這倆個操作是阻塞的,很容易理解况木。
這里面有一個細節(jié)點需要注意
while (count == items.length)
notFull.await();
while (count == 0)
notEmpty.await();
我剛開始看的時候垒拢,我也有疑問為什么使用while而非if判斷,其實是做了雙重保證火惊,防止過早或者意外通知喚醒求类。
再看下最后一對
/**
* Inserts the specified element at the tail of this queue, waiting
* up to the specified wait time for space to become available if
* the queue is full.
*
* @throws InterruptedException {@inheritDoc}
* @throws NullPointerException {@inheritDoc}
*/
public boolean offer(E e, long timeout, TimeUnit unit)
throws InterruptedException {
checkNotNull(e);
long nanos = unit.toNanos(timeout);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == items.length) {
if (nanos <= 0)
return false;
nanos = notFull.awaitNanos(nanos);
}
enqueue(e);
return true;
} finally {
lock.unlock();
}
}
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
long nanos = unit.toNanos(timeout);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == 0) {
if (nanos <= 0)
return null;
nanos = notEmpty.awaitNanos(nanos);
}
return dequeue();
} finally {
lock.unlock();
}
}
理解了上面?zhèn)z對出入對的方法,看到這對也是比較容易理解屹耐,加了阻塞時間而已尸疆。
總結(jié)
- 在我們經(jīng)常使用基于數(shù)組的隊列,里面也是使用循環(huán)數(shù)組的思想
- 循環(huán)數(shù)組的核心,就是一個圓環(huán)寿弱,通過tail 和head指針來定義出入對的索引犯眠。
- 循環(huán)數(shù)組的實現(xiàn),有很多種症革,只要理解了思想筐咧,怎么實現(xiàn)就都可以。
- ArrayBlockingQueue 是一種阻塞隊列噪矛,也提供非阻塞的線程安全出入對方法量蕊。
- ArrayBlockingQueue 阻塞實現(xiàn)是通過lock Condition來實現(xiàn)的。
今天是辛丑年最后一天艇挨,除夕残炮,回不了自己的家鄉(xiāng),只能待在杭城缩滨。
在此祝所有的親朋好友除夕安康势就,來年虎虎生威!