身份越來越多,自己越來越少淘正。 — 《一念天堂》
寫在前面
阻塞隊列常用于生產者和消費者的場景祸穷,生產者就是往隊列中放入元素,消費者就是從隊列中獲取元素停蕉,阻塞隊列就是生產者存放元素的容器愕鼓,而消費者也從該容器中拿元素。
阻塞隊列有兩種常見的阻塞場景慧起,滿足這兩種阻塞場景的隊列就是阻塞隊列菇晃,分別如下:
- 當隊列中沒有元素的情況下,消費者端的所有線程會被自動阻塞蚓挤,直到生產者往隊列中放入元素磺送,線程會被自動喚醒驻子。
- 當隊列中元素填滿的情況下,生產者端的所有線程會被自動阻塞估灿,直到消費者從隊列中獲取元素崇呵,線程會被自動喚醒。
Java中的阻塞隊列
Java中提供了7個阻塞隊列馅袁,分別如下:
- ArrayBlockingQueue:由數(shù)組結構組成的有界阻塞隊列域慷,按照先進先出的原則對元素進行排序,支持公平鎖和非公平鎖汗销。
- LinkedBlockingQueue:由鏈表結構組成的有界阻塞隊列犹褒,按照先進先出的原則對元素進行排序,默認長度為Integer.MAX_VALUE大溜。
- PriorityBlockingQueue:支持優(yōu)先級排序的無界阻塞隊列化漆,默認自然序對元素進行排序,可以自定義實現(xiàn)compareTo()方法指定排序規(guī)則钦奋,不保證同優(yōu)先級元素的順序座云。
- DelayedQueue:使用優(yōu)先級隊列實現(xiàn)的無界阻塞隊列,在創(chuàng)建元素時付材,可以指定多久才能從隊列中獲取元素朦拖,只有延時期滿后才能從隊列中獲取元素。
- SynchronousQueue:不存儲元素的阻塞隊列厌衔,每一個put操作都要等待take操作璧帝,否則不能添加元素,支持公平鎖和非公平鎖富寿。
- LinkedTransferQueue:由鏈表結構組成的無界阻塞隊列睬隶,相當于其他隊列,多了transfer和tryTransfer方法页徐。
- LinkedBlockingDeque:由鏈表結構組成的雙向阻塞隊列苏潜,隊首和隊尾都可以添加和移除元素,多線程并發(fā)時变勇,可以將鎖的競爭最多降到一半恤左。
ArrayBlockingQueue和LinkedBlockingQueue一般為常用的阻塞隊列。
阻塞隊列的使用
接下來通過一個Demo演示阻塞隊列的用法搀绣。
public class MainActivity extends AppCompatActivity {
private static final String TAG = MainActivity.class.getSimpleName();
private ArrayBlockingQueue<String> mBlockingQueue = new ArrayBlockingQueue<>(10);
private Producer mProducer;
private Consumer mConsumer;
@Override
protected void onCreate(Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
setContentView(R.layout.activity_main);
mProducer = new Producer();
mProducer.start();
mConsumer = new Consumer();
mConsumer.start();
}
@Override
protected void onStop() {
super.onStop();
mProducer.stopProduct();
mConsumer.stopConsume();
}
/**
* 生產者
*/
private class Producer extends Thread {
private volatile boolean isStop;
private int event;
public void stopProduct() {
isStop = true;
}
@Override
public void run() {
super.run();
while (!isStop) {
try {
// 事件 - 5 發(fā)送完成后飞袋,睡2秒
if (event == 5) {
Thread.sleep(2000);
}
// 事件 - 10 發(fā)送完成后,調用stopProduct()
if (event == 10) {
stopProduct();
}
event++;
// 隊列中沒有空余位置链患,生產者端的線程進入阻塞狀態(tài)巧鸭,直到消費者端的線程從隊列中拿元素,喚醒生產者端的線程繼續(xù)執(zhí)行
mBlockingQueue.put("事件 - " + event);
Log.d(TAG, "生產者 生產事件 = " + event);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
/**
* 消費者
*/
private class Consumer extends Thread {
private volatile boolean isStop;
public void stopConsume() {
isStop = true;
}
@Override
public void run() {
super.run();
while (!isStop) {
try {
// 拿不到元素消費者端的線程進入阻塞狀態(tài)麻捻,直到生產者端的線程往隊列中放入元素蹄皱,喚醒消費者端的線程繼續(xù)執(zhí)行
String event = mBlockingQueue.take();
Log.d(TAG, "消費者 消費事件 = " + event);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
}
這里維護了一個ArrayBlockingQueue览闰,并指定其大小為10芯肤,創(chuàng)建了一個生產者線程和一個消費者線程巷折,生產者線程在生產5個事件后睡兩秒鐘,消費者線程在消費完“事件 - 5”后由于從隊列中拿不到元素崖咨,就會自動阻塞锻拘,等待生產者往隊列中放入元素,只要隊列中有生產者放入元素击蹲,就會立即喚醒消費者線程繼續(xù)獲取元素署拟,詳見以下Log:
2019-09-02 21:23:58.822 18692-18764/com.chad.blockingqueue D/MainActivity: 生產者 生產事件 = 1
2019-09-02 21:23:58.822 18692-18764/com.chad.blockingqueue D/MainActivity: 生產者 生產事件 = 2
2019-09-02 21:23:58.822 18692-18765/com.chad.blockingqueue D/MainActivity: 消費者 消費事件 = 事件 - 1
2019-09-02 21:23:58.823 18692-18765/com.chad.blockingqueue D/MainActivity: 消費者 消費事件 = 事件 - 2
2019-09-02 21:23:58.823 18692-18764/com.chad.blockingqueue D/MainActivity: 生產者 生產事件 = 3
2019-09-02 21:23:58.823 18692-18765/com.chad.blockingqueue D/MainActivity: 消費者 消費事件 = 事件 - 3
2019-09-02 21:23:58.823 18692-18764/com.chad.blockingqueue D/MainActivity: 生產者 生產事件 = 4
2019-09-02 21:23:58.823 18692-18765/com.chad.blockingqueue D/MainActivity: 消費者 消費事件 = 事件 - 4
2019-09-02 21:23:58.823 18692-18764/com.chad.blockingqueue D/MainActivity: 生產者 生產事件 = 5
2019-09-02 21:23:58.823 18692-18765/com.chad.blockingqueue D/MainActivity: 消費者 消費事件 = 事件 - 5
生產者線程睡2秒,繼續(xù)生產事件
2019-09-02 21:24:00.825 18692-18764/com.chad.blockingqueue D/MainActivity: 生產者 生產事件 = 6
2019-09-02 21:24:00.825 18692-18765/com.chad.blockingqueue D/MainActivity: 消費者 消費事件 = 事件 - 6
2019-09-02 21:24:00.825 18692-18764/com.chad.blockingqueue D/MainActivity: 生產者 生產事件 = 7
2019-09-02 21:24:00.825 18692-18764/com.chad.blockingqueue D/MainActivity: 生產者 生產事件 = 8
2019-09-02 21:24:00.825 18692-18765/com.chad.blockingqueue D/MainActivity: 消費者 消費事件 = 事件 - 7
2019-09-02 21:24:00.825 18692-18764/com.chad.blockingqueue D/MainActivity: 生產者 生產事件 = 9
2019-09-02 21:24:00.826 18692-18765/com.chad.blockingqueue D/MainActivity: 消費者 消費事件 = 事件 - 8
2019-09-02 21:24:00.826 18692-18764/com.chad.blockingqueue D/MainActivity: 生產者 生產事件 = 10
2019-09-02 21:24:00.826 18692-18764/com.chad.blockingqueue D/MainActivity: 生產者 生產事件 = 11
2019-09-02 21:24:00.826 18692-18765/com.chad.blockingqueue D/MainActivity: 消費者 消費事件 = 事件 - 9
2019-09-02 21:24:00.826 18692-18765/com.chad.blockingqueue D/MainActivity: 消費者 消費事件 = 事件 - 10
2019-09-02 21:24:00.826 18692-18765/com.chad.blockingqueue D/MainActivity: 消費者 消費事件 = 事件 - 11
阻塞隊列的原理
下面通過分析ArrayBlockingQueue的原理加深對阻塞隊列的理解歌豺。
public class ArrayBlockingQueue<E> extends AbstractQueue<E>
implements BlockingQueue<E>, java.io.Serializable {
/** 存放元素的數(shù)組 */
final Object[] items;
/** 隊首元素下標 */
int takeIndex;
/** 隊尾元素下標 */
int putIndex;
/** 當前隊列中存放的元素總數(shù) */
int count;
/** 重入鎖 */
final ReentrantLock lock;
/** 等待獲取元素的條件 */
private final Condition notEmpty;
/** 等待放入元素的條件 */
private final Condition notFull;
/** 構造函數(shù)推穷,參數(shù)capacity為該隊列的容量 */
public ArrayBlockingQueue(int capacity) {
this(capacity, false);
}
/** 構造函數(shù),參數(shù)capacity為該隊列的容量类咧,參數(shù)fair為重入鎖是公平鎖還是非公平鎖 */
public ArrayBlockingQueue(int capacity, boolean fair) {
if (capacity <= 0)
throw new IllegalArgumentException();
this.items = new Object[capacity];
lock = new ReentrantLock(fair);
notEmpty = lock.newCondition();
notFull = lock.newCondition();
}
/** 構造函數(shù)馒铃,構造函數(shù),參數(shù)capacity為該隊列的容量痕惋,參數(shù)fair為重入鎖是公平鎖還是非公平鎖区宇,參數(shù)c為初始隊列的元素集合 */
public ArrayBlockingQueue(int capacity, boolean fair,
Collection<? extends E> c) {
this(capacity, fair);
final ReentrantLock lock = this.lock;
// 得到鎖
lock.lock(); // Lock only for visibility, not mutual exclusion
// 遍歷元素集合,初始化元素數(shù)組
try {
int i = 0;
try {
for (E e : c) {
checkNotNull(e);
items[i++] = e;
}
} catch (ArrayIndexOutOfBoundsException ex) {
throw new IllegalArgumentException();
}
count = i;
putIndex = (i == capacity) ? 0 : i;
} finally {
// 釋放鎖
lock.unlock();
}
}
/**
* 放入元素
* 如果元素數(shù)組沒有空余位置值戳,不會阻塞消費者端的線程议谷,直接返回false
* 如果元素數(shù)組還有空余位置,調用enqueue()函數(shù)堕虹,并且返回true
*/
public boolean offer(E e) {
checkNotNull(e);
final ReentrantLock lock = this.lock;
// 得到鎖
lock.lock();
try {
if (count == items.length)
return false;
else {
enqueue(e);
return true;
}
} finally {
// 釋放鎖
lock.unlock();
}
}
/**
* 放入元素
* 如果元素數(shù)組沒有空余位置卧晓,調用notFull.awaitNanos(nanos),會使生產者端的線程進入阻塞狀態(tài)等待一段時間赴捞,
* 當?shù)却瑫r后逼裆,如果元素數(shù)組依然沒有空余位置,直接返回false
* 如果元素數(shù)組還有空余位置螟炫,調用enqueue()函數(shù)波附,并且返回true
*/
public boolean offer(E e, long timeout, TimeUnit unit)
throws InterruptedException {
checkNotNull(e);
long nanos = unit.toNanos(timeout);
final ReentrantLock lock = this.lock;
// 得到可中斷的鎖
lock.lockInterruptibly();
try {
while (count == items.length) {
if (nanos <= 0)
return false;
nanos = notFull.awaitNanos(nanos);
}
enqueue(e);
return true;
} finally {
// 釋放鎖
lock.unlock();
}
}
/**
* 放入元素
* 如果元素數(shù)組沒有空余位置,調用notFull.await()使生產者端的線程進入阻塞狀態(tài)昼钻,
* 直到有消費者從隊列中獲取元素并且會喚醒生產者端進入阻塞狀態(tài)的線程繼續(xù)執(zhí)行
* 如果元素數(shù)組還有空余位置掸屡,調用enqueue()函數(shù)
*/
public void put(E e) throws InterruptedException {
checkNotNull(e);
final ReentrantLock lock = this.lock;
// 得到可中斷的鎖
lock.lockInterruptibly();
try {
while (count == items.length)
// 使生產者端的線程進入阻塞狀態(tài)
notFull.await();
enqueue(e);
} finally {
// 釋放鎖
lock.unlock();
}
}
/**
* 放入元素(核心函數(shù))
*/
private void enqueue(E x) {
// assert lock.getHoldCount() == 1;
// assert items[putIndex] == null;
final Object[] items = this.items;
items[putIndex] = x;
if (++putIndex == items.length)
putIndex = 0;
count++;
// 喚醒消費端因隊列沒有元素獲取而進入阻塞狀態(tài)的線程繼續(xù)執(zhí)行
notEmpty.signal();
}
/**
* 獲取元素
* 如果元素數(shù)組中沒有元素,則直接返回null然评,否則調用dequeue()返回隊首元素
*/
public E poll() {
final ReentrantLock lock = this.lock;
// 得到鎖
lock.lock();
try {
return (count == 0) ? null : dequeue();
} finally {
// 釋放鎖
lock.unlock();
}
}
/**
* 獲取元素
* 如果元素數(shù)組中沒有元素仅财,則調用notEmpty.awaitNanos(nanos)使消費者端線程進入阻塞狀態(tài),
* 直到有生產者往隊列中放入元素并且會喚醒消費者端進入阻塞狀態(tài)的線程繼續(xù)執(zhí)行
* 如果元素數(shù)組中還有元素碗淌,調用dequeue()函數(shù)返回隊首元素
*/
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
long nanos = unit.toNanos(timeout);
final ReentrantLock lock = this.lock;
// 得到可中斷的鎖
lock.lockInterruptibly();
try {
while (count == 0) {
if (nanos <= 0)
return null;
nanos = notEmpty.awaitNanos(nanos);
}
return dequeue();
} finally {
// 釋放鎖
lock.unlock();
}
}
/**
* 獲取元素
* 如果元素數(shù)組沒有元素盏求,調用notEmpty.await()使消費者端的線程進入阻塞狀態(tài)抖锥,
* 直到有生產者往隊列中放入元素并且會喚醒消費者端進入阻塞狀態(tài)的線程繼續(xù)執(zhí)行
* 如果元素數(shù)組還有元素,調用enqueue()函數(shù)返回隊首元素
*/
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
// 得到可中斷的鎖
lock.lockInterruptibly();
try {
while (count == 0)
// 使消費者端的線程進入阻塞狀態(tài)
notEmpty.await();
return dequeue();
} finally {
// 釋放鎖
lock.unlock();
}
}
/**
* 獲取元素(核心函數(shù))
*/
private E dequeue() {
// assert lock.getHoldCount() == 1;
// assert items[takeIndex] != null;
final Object[] items = this.items;
@SuppressWarnings("unchecked")
E x = (E) items[takeIndex];
items[takeIndex] = null;
if (++takeIndex == items.length)
takeIndex = 0;
count--;
if (itrs != null)
itrs.elementDequeued();
// 喚醒生產者端因隊列元素填滿而進入阻塞狀態(tài)的線程繼續(xù)執(zhí)行
notFull.signal();
return x;
}
}
總結
在生產者消費模型中碎罚,生產數(shù)據(jù)和消費數(shù)據(jù)的速率不一致磅废,如果生產數(shù)據(jù)速度快一些,消費不過來荆烈,就會導致數(shù)據(jù)丟失拯勉,這時候我們就可以使用阻塞隊列來解決這個問題。
阻塞隊列是一個隊列憔购,我們使用單線程生產數(shù)據(jù)宫峦,使用多線程消費數(shù)據(jù)。由于阻塞隊列的特點:隊列為空的時候消費者端阻塞玫鸟,隊列滿的時候生產者端阻塞导绷。多線程消費數(shù)據(jù)起到了加速消費的作用,使得生產的數(shù)據(jù)不會在隊列里積壓過多屎飘,而生產的數(shù)據(jù)也不會丟失處理妥曲。