本文作者:王一飛沛膳,叩丁狼高級(jí)講師于置。原創(chuàng)文章八毯,轉(zhuǎn)載請(qǐng)注明出處话速。
概念
LinkedBlockingQueue按照api解釋:一個(gè)基于鏈表而實(shí)現(xiàn)的有界阻塞隊(duì)列。遵循先進(jìn)先出原則廓俭,由隊(duì)頭入列研乒,再?gòu)年?duì)尾出列雹熬。具體操作上跟ArrayBlockingQueue類似,區(qū)別在于底層維護(hù)數(shù)據(jù)上烈菌,LinkedBlockingQueue底層是一個(gè)鏈接僧界,而ArrayBlockingQueue是一個(gè)數(shù)組捂襟。
內(nèi)部結(jié)構(gòu)
public class LinkedBlockingQueue<E> extends AbstractQueue<E>
implements BlockingQueue<E>, java.io.Serializable {
private final AtomicInteger count = new AtomicInteger(); //隊(duì)列元素個(gè)數(shù)
private final int capacity; //隊(duì)列容器
transient Node<E> head; //隊(duì)頭
private transient Node<E> last; //隊(duì)尾
//出列入列過(guò)程中維護(hù)現(xiàn)場(chǎng)安全的各類鎖
private final ReentrantLock takeLock = new ReentrantLock();
private final Condition notEmpty = takeLock.newCondition();
private final ReentrantLock putLock = new ReentrantLock();
private final Condition notFull = putLock.newCondition();
//隊(duì)列數(shù)據(jù)節(jié)點(diǎn)
static class Node<E> {
E item;
Node<E> next;
Node(E x) { item = x; }
}
}
基本操作
public class App {
public static void main(String[] args) throws InterruptedException {
LinkedBlockingQueue<String> queue = new LinkedBlockingQueue(5);
//入列
queue.add("a"); //隊(duì)列滿后拋異常
queue.put("b");//隊(duì)列滿后阻塞
queue.offer("c"); //入列失敗返回false
System.out.println(queue);
queue.put("a");
queue.put("b");
queue.put("c");
queue.put("d");
queue.put("e");
//出列
queue.remove("a"); //刪除指定元素
queue.poll(); //出列宠漩,如果隊(duì)列為空返回null
queue.take(); //隊(duì)列為空扒吁,阻塞等待
System.out.println(queue);
}
}
一般推薦使用put入列魁索, take出列
源碼解析
構(gòu)造-LinkedBlockingQueue提供了3個(gè)構(gòu)造器粗蔚,無(wú)參鹏控, 帶有容量,帶集合數(shù)據(jù)
//無(wú)參數(shù)時(shí)瀑构,默認(rèn)容量為int的最大值
public LinkedBlockingQueue() {
this(Integer.MAX_VALUE);
}
//帶容量參數(shù)【推薦】
public LinkedBlockingQueue(int capacity) {
if (capacity <= 0) throw new IllegalArgumentException();
this.capacity = capacity;
//初始化隊(duì)頭世吨,隊(duì)尾
last = head = new Node<E>(null);
}
//帶數(shù)據(jù)集合
public LinkedBlockingQueue(Collection<? extends E> c) {
this(Integer.MAX_VALUE); //容量為int最大值
final ReentrantLock putLock = this.putLock;
putLock.lock(); //謹(jǐn)慎起見(jiàn)也加鎖罢浇,需要將傳入集合逐一入列
try {
int n = 0;
for (E e : c) {
if (e == null)
throw new NullPointerException();
if (n == capacity)
throw new IllegalStateException("Queue full");
enqueue(new Node<E>(e));
++n;
}
count.set(n);
} finally {
putLock.unlock();
}
}
LinkedBlockingQueue 3個(gè)構(gòu)造器,實(shí)際使用中更推薦使用指定容量的隊(duì)列胞锰。
在繼續(xù)看源碼前嗅榕,先了解一個(gè)原子操作類:AtomicInteger
//int 類型的原子操作兼雄,不指定初始為0
//底層維護(hù)了一個(gè)volatile 修飾變量 value = 0
AtomicInteger atomicInteger = new AtomicInteger();
//獲壬饫摺:value = 0
System.out.println(atomicInteger.get()); //0
//返回value值0, 然后value 值加一【這里也是原子操作】
System.out.println(atomicInteger.getAndIncrement()); //0
//返回value值1恕稠, 然后value 值加一【這里也是原子操作】
System.out.println(atomicInteger.getAndIncrement()); //1
//獲取:value=2
System.out.println(atomicInteger.get()); //2
//int 類型的原子操作骆捧,不指定初始為0
//底層維護(hù)了一個(gè)volatile 修飾變量 value = 0
AtomicInteger atomicInteger = new AtomicInteger();
//獲取:value = 0
System.out.println(atomicInteger.get()); //0
//返回value值0, 然后value 值減一【這里也是原子操作】
System.out.println(atomicInteger.getAndDecrement()); //0
//返回value值1来涨, 然后value 值減一【這里也是原子操作】
System.out.println(atomicInteger.getAndDecrement()); //-1
//返回value值1
System.out.println(atomicInteger.get()); //-2
getAndIncrement : 返回未操作前value 的值, 然后加1
getAndDecrement : 返回未操作前value 的值, 然后減1
入列-put:將入列數(shù)據(jù)添加到隊(duì)尾社裆,如果隊(duì)列滿了时呀,阻塞等待。
public void put(E e) throws InterruptedException {
//入列元素不允許為null
if (e == null) throw new NullPointerException();
//隊(duì)列臨時(shí)容量緩存趴梢,作為執(zhí)行喚醒/阻塞線程操作標(biāo)記
int c = -1;
Node<E> node = new Node<E>(e);
//入列鎖
final ReentrantLock putLock = this.putLock;
//隊(duì)列元素個(gè)數(shù)
final AtomicInteger count = this.count;
putLock.lockInterruptibly(); //入列前加鎖蝴悉,可中斷鎖
try {
//自旋排除硬件加鎖延時(shí)問(wèn)題
//如果隊(duì)列已滿尿这,線程阻塞等待
while (count.get() == capacity) {
notFull.await();
}
//數(shù)據(jù)入列
enqueue(node);
//原子操作射众,入列后再獲取隊(duì)列元素個(gè)數(shù)叨橱,并+1,確保當(dāng)前操作隊(duì)列元素個(gè)數(shù)最新
c = count.getAndIncrement();
//c + 1 < capacity 表示隊(duì)列未滿,仍可添加,喚醒未持鎖而等待的入列線程
if (c + 1 < capacity)
notFull.signal();
} finally {
putLock.unlock(); //釋放鎖
}
//c == 0 說(shuō)明隊(duì)列為空,喚醒入列線程入列
if (c == 0)
signalNotEmpty();
}
從上面源碼上看put方法其實(shí)做5件事:
1>判斷元素是否null, 為null 拋異常
2>判斷是否滿列陕靠,滿列則等待垄开,此處需要留意while這個(gè)操作益兄。
理想請(qǐng)求下疑枯,if即可荆永,但是有種情況,如果jvm執(zhí)行指令傳到cpu到程序時(shí)間片執(zhí)行存在一點(diǎn)的時(shí)間延時(shí),while 重復(fù)執(zhí)行桃漾,可以減少延時(shí)影響敦迄。
3>數(shù)據(jù)入列
4>入列后需要喚醒未持鎖而等待的入列線程
5>c==0的判斷苦囱, c的值是入列前容量值,如果為0說(shuō)明入列前蚀狰,隊(duì)列為空,可以存在出列等待線程,所以在c==0的時(shí)候,且已經(jīng)入列成功深夯,可以喚醒出列等待線程,讓其順利出列。
出列-take : 出列,從隊(duì)頭彈出元素模暗, 如果隊(duì)列個(gè)數(shù)為0, 阻塞等待念祭。
public E take() throws InterruptedException {
E x;
//隊(duì)列臨時(shí)容量緩存兑宇,作為執(zhí)行喚醒/阻塞線程操作標(biāo)記
int c = -1;
//隊(duì)列元素個(gè)數(shù)
final AtomicInteger count = this.count;
//出列鎖
final ReentrantLock takeLock = this.takeLock;
takeLock.lockInterruptibly();//出列前加鎖,可中斷鎖
try {
//自旋排除硬件加鎖延時(shí)問(wèn)題
//如果隊(duì)列已空粱坤,線程阻塞等待
while (count.get() == 0) {
notEmpty.await();
}
x = dequeue(); //數(shù)據(jù)出列
//原子操作隶糕,出列后再獲取隊(duì)列元素個(gè)數(shù),并-1站玄,確保當(dāng)前操作隊(duì)列元素個(gè)數(shù)最新
c = count.getAndDecrement();
//c > 1 表示隊(duì)列未空若厚,仍可出列,喚醒未持鎖而等待的出列線程
if (c > 1)
notEmpty.signal();
} finally {
takeLock.unlock();//釋放鎖
}
//c == capacity 說(shuō)明隊(duì)列已滿蜒什,喚醒出列線程出列
if (c == capacity)
signalNotFull();
return x;
}
上面源碼看出,take操作跟put原理差不多疤估,執(zhí)行的是反向操作而已灾常。需要注意的是take方法喚醒線程條件c 變量值判斷條件霎冯。
1> 出列成功之后, c > 1 表示出列前隊(duì)列至少有2個(gè)元素钞瀑,所以出列成功后沈撞,喚醒未持鎖而等待的出列線程
2>c == capacity 滿足這個(gè)條件, 表示出列前雕什,隊(duì)列是滿的缠俺,可能存在入列等待線程,出列成功之后贷岸,解除等待壹士,喚醒入列等待線程。
刪除-remove : 根據(jù)指定元素刪除隊(duì)列中的元素偿警,如果有刪除躏救,如果沒(méi)有返回false
public boolean remove(Object o) {
if (o == null) return false; //參數(shù)為null 直接返回
fullyLock(); //為保證入列出列線程安全,加雙鎖
try {
//循環(huán)遍歷列表螟蒸,刪除指定元素
for (Node<E> trail = head, p = trail.next;
p != null;
trail = p, p = p.next) {
if (o.equals(p.item)) {
unlink(p, trail); //元素刪除
return true;
}
}
return false;
} finally {
fullyUnlock(); //是否雙鎖
}
}
void fullyLock() {
putLock.lock();
takeLock.lock();
}
void unlink(Node<E> p, Node<E> trail) {
p.item = null;
trail.next = p.next;
if (last == p)
last = trail;
//刪除元素后盒使,隊(duì)列元素個(gè)數(shù)-1,喚醒入列等待線程
if (count.getAndDecrement() == capacity)
notFull.signal();
}
void fullyUnlock() {
takeLock.unlock();
putLock.unlock();
}
LinkedBlockingQueue 在執(zhí)行刪除操作七嫌,需要對(duì)takeLock 跟 putLock同時(shí)加鎖少办,其目標(biāo)確保在刪除期間,其他線程無(wú)法操作隊(duì)列诵原,進(jìn)而保證刪除操作的線程安全英妓。
總結(jié)
LinkedBlockingQueue 使用2把鎖確保線程安全,入列時(shí)使用putLock皮假,出列時(shí)使用takeLock鞋拟,這種鎖分離操作機(jī)制,在一定層面上提高隊(duì)列的吞吐量惹资,在高并發(fā)的情況下生產(chǎn)者(入列線程)和消費(fèi)者(出列線程)可以并行地操作隊(duì)列中的數(shù)據(jù)贺纲,以此來(lái)提高整個(gè)隊(duì)列的并發(fā)性能。
想獲取更多技術(shù)干貨褪测,請(qǐng)前往叩丁狼官網(wǎng):http://www.wolfcode.cn/all_article.html