LinkedBlockingQueue是一個(gè)基于鏈表的阻塞隊(duì)列意蛀,實(shí)現(xiàn)了BlockingQueue遍愿、java.io.Serializable接口繼承自AbstractQueue
創(chuàng)建:
//當(dāng)隊(duì)列中沒(méi)有任何元素的時(shí)候,此時(shí)隊(duì)列的頭部就等于隊(duì)列的尾部,指向的是同一個(gè)節(jié)點(diǎn),并且內(nèi)容為null
public LinkedBlockingQueue(int capacity) {
if (capacity <= 0) throw new IllegalArgumentException();
this.capacity = capacity;
last = head = new Node<E>(null);
}
//無(wú)參構(gòu)造型奥,默認(rèn)使用int的最大值作為capacity的值
public LinkedBlockingQueue() {
this(Integer.MAX_VALUE);
}
//將一個(gè)集合中的元素全部入隊(duì)列群凶。
public LinkedBlockingQueue(Collection<? extends E> c) {
this(Integer.MAX_VALUE);
final ReentrantLock putLock = this.putLock;
//獲取鎖
putLock.lock(); // Never contended, but necessary for visibility
try {
//迭代集合中的每一個(gè)元素,讓其入隊(duì)列,并且更新一下當(dāng)前隊(duì)列中的元素?cái)?shù)量
int n = 0;
for (E e : c) {
if (e == null)
throw new NullPointerException();
if (n == capacity)
throw new IllegalStateException("Queue full");
//參考下面的enqueue分析
enqueue(new Node<E>(e));
++n;
}
count.set(n);
} finally {
//釋放鎖
putLock.unlock();
}
}
數(shù)據(jù)元素操作:
//我們看到是一個(gè)鏈表柠偶,里面每個(gè)節(jié)點(diǎn)都是一個(gè)內(nèi)部類Node,所有的元素都通過(guò)Node類來(lái)進(jìn)行存儲(chǔ)
static class Node<E> {
E item;
Node<E> next;
Node(E x) { item = x; }
}
//創(chuàng)建一個(gè)節(jié)點(diǎn)户秤,并加入鏈表尾部螺捐,入隊(duì)
private void enqueue(Node<E> node) {
//把node賦給當(dāng)前的最后一個(gè)節(jié)點(diǎn)的下一個(gè)節(jié)點(diǎn)颠悬,然后在將node設(shè)為最后一個(gè)節(jié)點(diǎn)
last = last.next = node;
}
//出隊(duì)
private E dequeue() {
Node<E> h = head;//獲取頭節(jié)點(diǎn):x==null
Node<E> first = h.next;//將頭節(jié)點(diǎn)的下一個(gè)節(jié)點(diǎn)賦值給first
h.next = h; // 將當(dāng)前將要出隊(duì)的節(jié)點(diǎn)置null(為了使其做head節(jié)點(diǎn)做準(zhǔn)備
head = first;//將當(dāng)前將要出隊(duì)的節(jié)點(diǎn)作為了頭節(jié)點(diǎn)
E x = first.item;//獲取出隊(duì)節(jié)點(diǎn)的值
first.item = null;//將出隊(duì)節(jié)點(diǎn)的值置空
return x;
}
重要參數(shù):
//AtomicInteger 一個(gè)提供原子操作的Integer的類,用來(lái)計(jì)算現(xiàn)在隊(duì)列有多少個(gè)元素
//解決在入隊(duì)或出隊(duì)時(shí)并發(fā)修改元素?cái)?shù)量
private final AtomicInteger count = new AtomicInteger(0);
//元素出隊(duì)入隊(duì)時(shí)線程所獲取的重入鎖
private final ReentrantLock takeLock = new ReentrantLock();
private final ReentrantLock putLock = new ReentrantLock();
//當(dāng)隊(duì)列為空時(shí)定血,讓從隊(duì)列中獲取元素的線程處于掛起狀態(tài)
private final Condition notEmpty = takeLock.newCondition();
//當(dāng)隊(duì)列為空時(shí)赔癌,讓元素入隊(duì)列的的線程處于掛起狀態(tài)
private final Condition notFull = putLock.newCondition();
入隊(duì):
public void put(E e) throws InterruptedException {
if (e == null) throw new NullPointerException();
// Note: convention in all put/take/etc is to preset local var
int c = -1;
Node<E> node = new Node(e);
//當(dāng)成員變量被其他線程改變時(shí),因?yàn)樵诜椒▋?nèi)部重新引用并用final修飾澜沟,保證在一次操作內(nèi)數(shù)據(jù)是一致的灾票?
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
//執(zhí)行可中斷的鎖獲取操作
putLock.lockInterruptibly();
try {
//當(dāng)隊(duì)列的容量到底最大時(shí),此時(shí)線程將處于等待狀態(tài) ?為什么要使用while
while (count.get() == capacity) {
notFull.await();
}
//讓元素排入隊(duì)列的末尾
enqueue(node);
//更新隊(duì)列中的元素個(gè)數(shù).此處的count.getAndIncrement()方法會(huì)更新元素個(gè)數(shù)并返回舊值
c = count.getAndIncrement();
//如果添加元素后的容量茫虽,還小于指定容量刊苍,說(shuō)明至少還可以再插一個(gè)元素
if (c + 1 < capacity)
notFull.signal();
} finally {
//釋放鎖
putLock.unlock();
}
//注意!1粑觥正什!這個(gè)c的值就count容量的舊值,c == 0時(shí)說(shuō)明之前的隊(duì)列是空隊(duì)列号杏,即出隊(duì)列=的線程都處于等待狀態(tài)
//上邊增加了一個(gè)新的元素婴氮,隊(duì)列不為空,就會(huì)喚醒正在等待獲取元素的線程
if (c == 0)
signalNotEmpty();
}
//喚醒正在等待獲取元素的線程
private void signalNotEmpty() {
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try {
//通過(guò)notEmpty喚醒獲取元素的線程
notEmpty.signal();
} finally {
takeLock.unlock();
}
}
出隊(duì):
public E take() throws InterruptedException {
E x;
int c = -1;
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
//獲取鎖
takeLock.lockInterruptibly();
try {
//當(dāng)隊(duì)列為空時(shí)盾致,則讓當(dāng)前線程處于等待
while (count.get() == 0) {
notEmpty.await();
}
//完成元素的出隊(duì)列
x = dequeue();
//更新隊(duì)列中的元素個(gè)數(shù).
c = count.getAndDecrement();
//當(dāng)前隊(duì)列中元素?cái)?shù)量大于1主经,喚醒線程,繼續(xù)執(zhí)行出隊(duì)操作
if (c > 1)
notEmpty.signal();
} finally {
//釋放鎖
takeLock.unlock();
}
//c==capaitcy的時(shí)候庭惜,在執(zhí)行此次出隊(duì)操作完成之前隊(duì)列已經(jīng)滿了罩驻,去喚醒入隊(duì)操作的線程進(jìn)行插入操作
if (c == capacity)
signalNotFull();
return x;
}
private void signalNotFull() {
final ReentrantLock putLock = this.putLock;
putLock.lock();
try {
notFull.signal();
} finally {
putLock.unlock();
}
}
補(bǔ)充:
AtomicInteger
對(duì)于一個(gè)Integer線程安全的操作。
//當(dāng)前值+1蜈块,采用無(wú)限循環(huán)鉴腻,直到+1成功為止
//返回的是舊值
public final int getAndIncrement() {
for (;;) {
int current = get();//獲取當(dāng)前值
int next = current + 1;//當(dāng)前值+1
if (compareAndSet(current, next))//基于CAS賦值
return current;
}
}
//當(dāng)前值-1,采用無(wú)限循環(huán)百揭,直到-1成功為止
//返回舊值
public final int getAndDecrement() {
for (;;) {
int current = get();
int next = current - 1;
if (compareAndSet(current, next))
return current;
}
}
compareAndSet這個(gè)方法多見(jiàn)于并發(fā)控制中爽哎,簡(jiǎn)稱CAS(Compare And Swap),意思是如果valueOffset位置包含的值與expect值相同器一,則更新valueOffset位置的值為update课锌,并返回true,否則不更新,返回false渺贤。
public final boolean compareAndSet(int expect, int update) {
return unsafe.compareAndSwapInt(this, valueOffset, expect, update);
}