阻塞隊列
BlockingQueue
隊列主要有兩種:FIFO(先進先出)唱逢、LIFO(后進先出)。
再多線程環(huán)境中,隊列很容實現(xiàn)數(shù)據(jù)共享迂求,我們常用的"生產(chǎn)者"、"消費者"模型就可以通過隊列來傳遞數(shù)據(jù)達到數(shù)據(jù)共享晃跺。但是現(xiàn)實中揩局,大多數(shù)情況都是生產(chǎn)者產(chǎn)生消息的速度和消費的速度是不匹配的,就需要相應的對生產(chǎn)或者消費進行阻塞掀虎。當生產(chǎn)的消息積累到一定程度時凌盯,就需要對生產(chǎn)者就行阻塞,以便消費者將積累的消息進行消費烹玉。在concurrent包發(fā)布以前驰怎,在多線程環(huán)境下,我們每個程序員都必須去自己控制這些細節(jié)二打,尤其還要兼顧效率和線程安全县忌。BlockingQueue釋放了我們的雙手,他讓我們不用關系什么時候去阻塞继效,什么時候去喚醒線程症杏。
? | 拋異常 | 返回false | 阻塞 | 超時,拋異常 |
---|---|---|---|---|
插入 | add | offer | put | offer(timeout) |
移除 | take | remove | ? | poll(timeout) |
檢查 | ? | contains | ? | ? |
操作方法:
//--------添加 ----------
boolean add(E e); //添加元素瑞信,加不了拋異常
boolean offer(E e); //添加元素厉颤,加不了返回false
void put(E e) throws InterruptedException; //添加元素,加不了一直阻塞
boolean offer(E e, long timeout, TimeUnit unit)
throws InterruptedException; //添加元素喧伞,達到指定時間沒有加入拋異常
// -------移除-----------
boolean remove(Object o);
E poll(long timeout, TimeUnit unit)
throws InterruptedException;
E take() throws InterruptedException;
// ------
常見的BlockingQueue
? | 有界性 | 鎖 | 數(shù)據(jù)結(jié)構 |
---|---|---|---|
ArrayBlockingQueue | bounded | 加鎖 | ArrayList |
LinkedBlockingQueue | optionally-bounded | 加鎖 | LinkedList |
DelayQueue | unbounded | 加鎖 | heap |
PriorityBlockingQueue | unbounded | 加鎖 | heap |
SynchronousQueue | bounded | 加鎖 | 無 |
1. ArrayBlockingQueue
基于數(shù)組實現(xiàn)的有界阻塞安全線程隊列走芋。
構造函數(shù)
public ArrayBlockingQueue(int capacity) {
this(capacity, false);
}
public ArrayBlockingQueue(int capacity, boolean fair) {
if (capacity <= 0)
throw new IllegalArgumentException();
this.items = new Object[capacity];
//初始化重入鎖
lock = new ReentrantLock(fair);
//初始化讀、寫等待隊列
notEmpty = lock.newCondition();
notFull = lock.newCondition();
}
public ArrayBlockingQueue(int capacity, boolean fair,
Collection<? extends E> c) {
//初始化構造器
this(capacity, fair);
//獲取重入鎖
final ReentrantLock lock = this.lock;
lock.lock(); // Lock only for visibility, not mutual exclusion
try {
int i = 0;
try {
for (E e : c) {
checkNotNull(e);
items[i++] = e;
}
} catch (ArrayIndexOutOfBoundsException ex) {
throw new IllegalArgumentException();
}
//初始化元素中的個數(shù)
count = i;
//插入元素的下標索引
putIndex = (i == capacity) ? 0 : i;
} finally {
//釋放鎖
lock.unlock();
}
}
相關屬性
final Object[] items; //存放元素的數(shù)組
int takeIndex; //取元素的下標索引
int putIndex; //存元素的下標索引
int count; //數(shù)組中的元素個數(shù)
final ReentrantLock lock; //數(shù)據(jù)讀取的可重入鎖
private final Condition notEmpty; //讀等待的隊列
private final Condition notFull; //寫等待的隊列
核心函數(shù)
put
public void put(E e) throws InterruptedException {
checkNotNull(e);
final ReentrantLock lock = this.lock;
//如果當前線程沒有中斷潘鲫,就將當前線程鎖定
lock.lockInterruptibly();
try {
//當前隊列已經(jīng)滿了就一直等待
while (count == items.length)
notFull.await();
//插入元素
enqueue(e);
} finally {
//釋放鎖
lock.unlock();
}
}
take
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == 0)
notEmpty.await();
return dequeue();
} finally {
lock.unlock();
}
}
2. LinkedBlockingQueue
基于鏈表實現(xiàn)的阻塞隊列
構造函數(shù)
public LinkedBlockingQueue() {
this(Integer.MAX_VALUE);
}
public LinkedBlockingQueue(int capacity) {
if (capacity <= 0) throw new IllegalArgumentException();
this.capacity = capacity;
last = head = new Node<E>(null);
}
public LinkedBlockingQueue(Collection<? extends E> c) {
this(Integer.MAX_VALUE);
final ReentrantLock putLock = this.putLock;
putLock.lock(); // Never contended, but necessary for visibility
try {
int n = 0;
for (E e : c) {
if (e == null)
throw new NullPointerException();
if (n == capacity)
throw new IllegalStateException("Queue full");
enqueue(new Node<E>(e));
++n;
}
count.set(n);
} finally {
putLock.unlock();
}
}
相關屬性
private final int capacity; //元素個數(shù)
private final AtomicInteger count = new AtomicInteger(); //
transient Node<E> head; //頭節(jié)點
private transient Node<E> last; //尾節(jié)點
private final ReentrantLock takeLock = new ReentrantLock(); //讀可重入鎖
private final Condition notEmpty = takeLock.newCondition(); //讀等待隊列
private final ReentrantLock putLock = new ReentrantLock(); //寫可重入鎖
private final Condition notFull = putLock.newCondition(); //寫隊列
核心函數(shù)
put
public void put(E e) throws InterruptedException {
if (e == null) throw new NullPointerException();
// Note: convention in all put/take/etc is to preset local var
// holding count negative to indicate failure unless set.
int c = -1;
Node<E> node = new Node<E>(e);
//獲取寫的可重入鎖
final ReentrantLock putLock = this.putLock;
//線程安全的原子操作類
final AtomicInteger count = this.count;
//判斷線程是否中斷
putLock.lockInterruptibly();
try {
/*
* Note that count is used in wait guard even though it is
* not protected by lock. This works because count can
* only decrease at this point (all other puts are shut
* out by lock), and we (or some other waiting put) are
* signalled if it ever changes from capacity. Similarly
* for all other uses of count in other wait guards.
*/
while (count.get() == capacity) {
notFull.await();
}
enqueue(node);
c = count.getAndIncrement();
if (c + 1 < capacity)
//如果加1后還小于當前容量翁逞,則喚醒一個等待的線程
notFull.signal();
} finally {
//釋放鎖
putLock.unlock();
}
if (c == 0)
signalNotEmpty();
}
3. DelayQueue
DelayQueue每次都是將元素加入排序隊列,以delay/過期時間為排序因素溉仑,將快過期的元素放在隊首挖函,取數(shù)據(jù)的時候每次都是先取快過期的元素。
構造方法
public DelayQueue() {}
public DelayQueue(Collection<? extends E> c) {
this.addAll(c);
}
相關屬性
private final transient ReentrantLock lock = new ReentrantLock();
private final PriorityQueue<E> q = new PriorityQueue<E>(); //根據(jù)隊列里某些元素排序的有序隊列
private final Condition available = lock.newCondition();
private Thread leader = null;
核心函數(shù)
offer
public boolean offer(E e) {
//獲取可重入鎖
final ReentrantLock lock = this.lock;
//加鎖
lock.lock();
try {
//將元素加入優(yōu)先級隊列中
q.offer(e);
//如果當前元素為隊首浊竟,將leader=null怨喘,喚起其他線程
if (q.peek() == e) {
leader = null;
available.signal();
}
return true;
} finally {
//釋放鎖
lock.unlock();
}
}
take
public E take() throws InterruptedException {
//獲取可重入鎖
final ReentrantLock lock = this.lock;
//判斷當前線程是否中斷,沒有中斷就將當前線程鎖定
lock.lockInterruptibly();
try {
//循環(huán)執(zhí)行
for (;;) {
E first = q.peek();
//如果隊首為空振定,阻塞當前線程
if (first == null)
available.await();
else {
//獲取當前元素過期時間
long delay = first.getDelay(NANOSECONDS);
//小于等于0 直接彈出
if (delay <= 0)
return q.poll();
//將first 只為null必怜,避免內(nèi)存泄漏
first = null; // don't retain ref while waiting
if (leader != null)
//阻塞當前線程
available.await();
else {
//將當前線程賦值給leader,然后阻塞delay時間后频,等待隊首元素達到可出隊時間
Thread thisThread = Thread.currentThread();
leader = thisThread;
try {
available.awaitNanos(delay);
} finally {
//釋放leader引用
if (leader == thisThread)
leader = null;
}
}
}
}
} finally {
//如果leader元素為空梳庆,優(yōu)先級隊列不為空喚起其他線程
if (leader == null && q.peek() != null)
available.signal();
//釋放鎖
lock.unlock();
}
}
4. PriorityBlockingQueue
無界優(yōu)先隊列
構造函數(shù)
public PriorityBlockingQueue() {
this(DEFAULT_INITIAL_CAPACITY, null);
}
public PriorityBlockingQueue(int initialCapacity) {
this(initialCapacity, null);
}
public PriorityBlockingQueue(int initialCapacity,
Comparator<? super E> comparator) {
if (initialCapacity < 1)
throw new IllegalArgumentException();
this.lock = new ReentrantLock();
this.notEmpty = lock.newCondition();
this.comparator = comparator;
this.queue = new Object[initialCapacity];
}
public PriorityBlockingQueue(Collection<? extends E> c) {
this.lock = new ReentrantLock();
this.notEmpty = lock.newCondition();
boolean heapify = true; // true if not known to be in heap order
boolean screen = true; // true if must screen for nulls
if (c instanceof SortedSet<?>) {
SortedSet<? extends E> ss = (SortedSet<? extends E>) c;
this.comparator = (Comparator<? super E>) ss.comparator();
heapify = false;
}
else if (c instanceof PriorityBlockingQueue<?>) {
PriorityBlockingQueue<? extends E> pq =
(PriorityBlockingQueue<? extends E>) c;
this.comparator = (Comparator<? super E>) pq.comparator();
screen = false;
if (pq.getClass() == PriorityBlockingQueue.class) // exact match
heapify = false;
}
Object[] a = c.toArray();
int n = a.length;
// If c.toArray incorrectly doesn't return Object[], copy it.
if (a.getClass() != Object[].class)
a = Arrays.copyOf(a, n, Object[].class);
if (screen && (n == 1 || this.comparator != null)) {
for (int i = 0; i < n; ++i)
if (a[i] == null)
throw new NullPointerException();
}
this.queue = a;
this.size = n;
if (heapify)
heapify();
}
相關屬性
private static final int DEFAULT_INITIAL_CAPACITY = 11; //默認容量
private static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8; //最大容量
private transient Object[] queue; //存放元素數(shù)組
private transient int size; //元素個數(shù)
private transient Comparator<? super E> comparator; //比較器
private final ReentrantLock lock; //可重入鎖
private final Condition notEmpty; //非空條件
private transient volatile int allocationSpinLock; //擴容時暖途,CAS更新這個值誰更新成功誰執(zhí)行
private PriorityQueue<E> q;//不阻塞的優(yōu)先隊列,用于序列化/反序列化
核心函數(shù)
offer
public boolean offer(E e) {
if (e == null)
throw new NullPointerException();
final ReentrantLock lock = this.lock;
lock.lock();
int n, cap;
Object[] array;
//判斷是否需要擴容
while ((n = size) >= (cap = (array = queue).length))
tryGrow(array, cap);
try {
//根據(jù)是否有比較器選擇不同方法
Comparator<? super E> cmp = comparator;
if (cmp == null)
siftUpComparable(n, e, array);
else
siftUpUsingComparator(n, e, array, cmp);
size = n + 1;
//喚醒notEmpty條件
notEmpty.signal();
} finally {
//釋放鎖
lock.unlock();
}
return true;
}
private static <T> void siftUpComparable(int k, T x, Object[] array) {
Comparable<? super T> key = (Comparable<? super T>) x;
while (k > 0) {
// 取父節(jié)點
int parent = (k - 1) >>> 1;
// 父節(jié)點的元素值
Object e = array[parent];
// 如果key大于父節(jié)點膏执,堆化結(jié)束
if (key.compareTo((T) e) >= 0)
break;
// 否則驻售,交換二者的位置,繼續(xù)下一輪比較
array[k] = e;
k = parent;
}
// 找到了應該放的位置更米,放入元素
array[k] = key;
}
take
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
// 加鎖
lock.lockInterruptibly();
E result;
try {
// 隊列沒有元素欺栗,就阻塞在notEmpty條件上
// 出隊成功,就跳出這個循環(huán)
while ( (result = dequeue()) == null)
notEmpty.await();
} finally {
// 解鎖
lock.unlock();
}
// 返回出隊的元素
return result;
}
private E dequeue() {
// 元素個數(shù)減1
int n = size - 1;
if (n < 0)
// 數(shù)組元素不足征峦,返回null
return null;
else {
Object[] array = queue;
// 彈出堆頂元素
E result = (E) array[0];
// 把堆尾元素拿到堆頂
E x = (E) array[n];
array[n] = null;
Comparator<? super E> cmp = comparator;
// 并做自上而下的堆化
if (cmp == null)
siftDownComparable(0, x, array, n);
else
siftDownUsingComparator(0, x, array, n, cmp);
// 修改size
size = n;
// 返回出隊的元素
return result;
}
}
private static <T> void siftDownComparable(int k, T x, Object[] array,
int n) {
if (n > 0) {
Comparable<? super T> key = (Comparable<? super T>)x;
int half = n >>> 1; // loop while a non-leaf
// 只需要遍歷到葉子節(jié)點就夠了
while (k < half) {
// 左子節(jié)點
int child = (k << 1) + 1; // assume left child is least
// 左子節(jié)點的值
Object c = array[child];
// 右子節(jié)點
int right = child + 1;
// 取左右子節(jié)點中最小的值
if (right < n &&
((Comparable<? super T>) c).compareTo((T) array[right]) > 0)
c = array[child = right];
// key如果比左右子節(jié)點都小迟几,則堆化結(jié)束
if (key.compareTo((T) c) <= 0)
break;
// 否則,交換key與左右子節(jié)點中最小的節(jié)點的位置
array[k] = c;
k = child;
}
// 找到了放元素的位置眶痰,放置元素
array[k] = key;
}
}
5. SynchronousQueue
雙棧雙隊列算法瘤旨,一個寫SynchronousQueue需要和一個讀SynchronousQueue組隊出現(xiàn)
構造方法
public SynchronousQueue() {
this(false);
}
public SynchronousQueue(boolean fair) {
transferer = fair ? new TransferQueue<E>() : new TransferStack<E>();
}
相關屬性
static final int NCPUS = Runtime.getRuntime().availableProcessors();
static final int maxTimedSpins = (NCPUS < 2) ? 0 : 32;
static final int maxUntimedSpins = maxTimedSpins * 16;
static final long spinForTimeoutThreshold = 1000L;
private ReentrantLock qlock;
private WaitQueue waitingProducers;
private WaitQueue waitingConsumers;
核心方法
put
public void put(E e) throws InterruptedException {
if (e == null) throw new NullPointerException();
if (transferer.transfer(e, false, 0) == null) {
Thread.interrupted();
throw new InterruptedException();
}
}
E transfer(E e, boolean timed, long nanos) {
/* Basic algorithm is to loop trying to take either of
* two actions:
*
* 1. If queue apparently empty or holding same-mode nodes,
* try to add node to queue of waiters, wait to be
* fulfilled (or cancelled) and return matching item.
*
* 2. If queue apparently contains waiting items, and this
* call is of complementary mode, try to fulfill by CAS'ing
* item field of waiting node and dequeuing it, and then
* returning matching item.
*
* In each case, along the way, check for and try to help
* advance head and tail on behalf of other stalled/slow
* threads.
*
* The loop starts off with a null check guarding against
* seeing uninitialized head or tail values. This never
* happens in current SynchronousQueue, but could if
* callers held non-volatile/final ref to the
* transferer. The check is here anyway because it places
* null checks at top of loop, which is usually faster
* than having them implicitly interspersed.
*/
QNode s = null; // constructed/reused as needed
boolean isData = (e != null);
for (;;) {
QNode t = tail;
QNode h = head;
if (t == null || h == null) // saw uninitialized value
continue; // spin
if (h == t || t.isData == isData) { // empty or same-mode
QNode tn = t.next;
if (t != tail) // inconsistent read
continue;
if (tn != null) { // lagging tail
advanceTail(t, tn);
continue;
}
if (timed && nanos <= 0) // can't wait
return null;
if (s == null)
s = new QNode(e, isData);
if (!t.casNext(null, s)) // failed to link in
continue;
advanceTail(t, s); // swing tail and wait
Object x = awaitFulfill(s, e, timed, nanos);
if (x == s) { // wait was cancelled
clean(t, s);
return null;
}
if (!s.isOffList()) { // not already unlinked
advanceHead(t, s); // unlink if head
if (x != null) // and forget fields
s.item = s;
s.waiter = null;
}
return (x != null) ? (E)x : e;
} else { // complementary-mode
QNode m = h.next; // node to fulfill
if (t != tail || m == null || h != head)
continue; // inconsistent read
Object x = m.item;
if (isData == (x != null) || // m already fulfilled
x == m || // m cancelled
!m.casItem(x, e)) { // lost CAS
advanceHead(h, m); // dequeue and retry
continue;
}
advanceHead(h, m); // successfully fulfilled
LockSupport.unpark(m.waiter);
return (x != null) ? (E)x : e;
}
}
}
take
public E take() throws InterruptedException {
E e = transferer.transfer(null, false, 0);
if (e != null)
return e;
Thread.interrupted();
throw new InterruptedException();
}