下面是ArrayBlockingQueue的部分實現(xiàn)源碼:
public ArrayBlockingQueue(int capacity, boolean fair) {
if (capacity <= 0)
throw new IllegalArgumentException();
//創(chuàng)建數(shù)組
this.items = new Object[capacity];
//創(chuàng)建鎖和阻塞條件
lock = new ReentrantLock(fair);
notEmpty = lock.newCondition();
notFull = lock.newCondition();
}
//添加元素的方法
public void put(E e) throws InterruptedException {
checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == items.length)
notFull.await();
//如果隊列不滿就入隊
enqueue(e);
} finally {
lock.unlock();
}
}
//入隊的方法
private void enqueue(E x) {
final Object[] items = this.items;
items[putIndex] = x;
if (++putIndex == items.length)
putIndex = 0;
count++;
notEmpty.signal();
}
//移除元素的方法
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == 0)
notEmpty.await();
return dequeue();
} finally {
lock.unlock();
}
}
//出隊的方法
private E dequeue() {
final Object[] items = this.items;
@SuppressWarnings("unchecked")
E x = (E) items[takeIndex];
items[takeIndex] = null;
if (++takeIndex == items.length)
takeIndex = 0;
count--;
if (itrs != null)
itrs.elementDequeued();
notFull.signal();
return x;
可以看到
//創(chuàng)建鎖和阻塞條件
lock = new ReentrantLock(fair);
notEmpty = lock.newCondition();
notFull = lock.newCondition();
我們這里主要使用了Lock機制,通過Condition的'await()' 和'signal()'方法來實現(xiàn)線程的喚醒和阻塞倒得。當(dāng)我們調(diào)用隊列的take()方法進行出隊操作時固阁,首先判斷隊列中的個數(shù)是否為0,如果等于0贡茅,則調(diào)用notEmpty.await()方法阻塞當(dāng)前出隊操作的線程秘蛇,所以出隊操作就會在沒有任務(wù)時一直阻塞其做,知道我們在put()方法中放入新的任務(wù)到隊列中,通過notEmpty.signal()方法進行出隊阻塞的喚醒赁还,這就是阻塞隊列的實現(xiàn)原理妖泄,當(dāng)然我們也可以使用Object 的wait和notify實現(xiàn)同步的,當(dāng)沒有任務(wù)時object.wait(),新的任務(wù)入隊是object.notify()喚醒操作艘策。