上次的學(xué)習(xí)
1.了解了MessageQueue運(yùn)用場景結(jié)合handler、looper使用。
2.單向鏈表的基本運(yùn)用,MessageQueue中enqueueSyncBarrier和removeSyncBarrier方法有實(shí)際運(yùn)用。
3.同步方式厌衔,同步塊,功能和邏輯分開捍岳,盡量提高性能和可讀性富寿。
4.同步分割欄(消息屏障)睬隶,意識到項(xiàng)目中有很多延遲的處理可以用這個方法取代。
LinkedBlockingQueue 是一個阻塞隊(duì)列页徐,結(jié)合線程池使用
源碼:
//序列號
private static final long serialVersionUID = -6903933977591709194L;
serialVersionUID的作用
簡單來說苏潜,Java的序列化機(jī)制是通過在運(yùn)行時(shí)判斷類的serialVersionUID來驗(yàn)證版本一致性的。
在進(jìn)行反序列化時(shí)变勇,JVM會把傳來的字節(jié)流中的serialVersionUID與本地實(shí)體類的serialVersionUID進(jìn)行比較
serialVersionUID有兩種顯示的生成方式:
1.默認(rèn)的1L恤左,比如:private static final long serialVersionUID = 1L;
2.根據(jù)類名、接口名搀绣、成員方法及屬性等來生成一個64位的哈希字段飞袋,比如:private static final long serialVersionUID = xxxxL;
//所有的元素都通過Node這個靜態(tài)內(nèi)部類來進(jìn)行存儲
static class Node<E> {
E item;
/**
* One of:
* - the real successor Node
* - this Node, meaning the successor is head.next
* - null, meaning there is no successor (this is the last node)
*/
Node<E> next;
Node(E x) { item = x; }
}
//阻塞隊(duì)列所能存儲的容量
private final int capacity;
//LinkedBlockingQueue的入隊(duì)列和出隊(duì)列使用的是兩個不同的lock對象,因此無論是在入隊(duì)列還是出隊(duì)列,都會涉及對元素?cái)?shù)量的并發(fā)修改
private final AtomicInteger count = new AtomicInteger();
transient Node<E> head; //鏈表的頭部
private transient Node<E> last; //鏈表的尾部
//元素出隊(duì)列時(shí)線程所獲取的鎖,當(dāng)執(zhí)行take豌熄、poll等操作時(shí)線程需要獲取的鎖
private final ReentrantLock takeLock = new ReentrantLock();
//當(dāng)隊(duì)列為空時(shí)授嘀,通過該Condition讓從隊(duì)列中獲取元素的線程處于等待狀態(tài)
private final Condition notEmpty = takeLock.newCondition();
//元素入隊(duì)列時(shí)線程所獲取的鎖,當(dāng)執(zhí)行add物咳、put锣险、offer等操作時(shí)線程需要獲取鎖
private final ReentrantLock putLock = new ReentrantLock();
//當(dāng)隊(duì)列的元素已經(jīng)達(dá)到capactiy,通過該Condition讓元素入隊(duì)列的線程處于等待狀態(tài)
private final Condition notFull = putLock.newCondition();
總結(jié):從上面我們可以發(fā)現(xiàn)LinkedBlockingQueue在入隊(duì)列和出隊(duì)列時(shí)使用的不是同一個Lock览闰,這也意味著它們之間的操作不會存在互斥操作芯肤。它們可以做到真正的在同一時(shí)刻既消費(fèi)、又生產(chǎn)压鉴,能夠做到并行處理崖咨。
//通過對應(yīng)的方法喚醒獲取元素的線程
private void signalNotEmpty() {
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try {
notEmpty.signal();
} finally {
takeLock.unlock();
}
}
private void signalNotFull() {
final ReentrantLock putLock = this.putLock;
putLock.lock();
try {
notFull.signal();
} finally {
putLock.unlock();
}
}
//作用傳進(jìn)來的node = 下一個 再等于最后一個 最終放到最后面?
private void enqueue(Node<E> node) {
// assert putLock.isHeldByCurrentThread();
// assert last.next == null;
last = last.next = node;
}
// 讓頭部元素出隊(duì)列的過程,往頭部移油吭,移到頭部置空击蹲,為什么不往尾部移?
private E dequeue() {
// assert takeLock.isHeldByCurrentThread();
// assert head.item == null;
Node<E> h = head;
Node<E> first = h.next;
h.next = h; // help GC
head = first;
E x = first.item;
first.item = null;
return x;
}
//加鎖和解鎖
/**
* Locks to prevent both puts and takes.
*/
void fullyLock() {
putLock.lock();
takeLock.lock();
}
/**
* Unlocks to allow both puts and takes.
*/
void fullyUnlock() {
takeLock.unlock();
putLock.unlock();
}
//設(shè)置默認(rèn)為最大值
public LinkedBlockingQueue() {
this(Integer.MAX_VALUE);
}
//小于等于0報(bào)異常婉宰,頭和尾都置空
public LinkedBlockingQueue(int capacity) {
if (capacity <= 0) throw new IllegalArgumentException();
this.capacity = capacity;
last = head = new Node<E>(null);
}
//在初始化LinkedBlockingQueue的時(shí)候歌豺,還可以直接將一個集合,中的元素全部入隊(duì)列,此時(shí)隊(duì)列最大容量依然是int的最大值心包。
public LinkedBlockingQueue(Collection<? extends E> c) {
this(Integer.MAX_VALUE);
final ReentrantLock putLock = this.putLock;
putLock.lock(); // Never contended, but necessary for visibility
try {
int n = 0;
for (E e : c) {
if (e == null)
throw new NullPointerException();
if (n == capacity)
throw new IllegalStateException("Queue full");
enqueue(new Node<E>(e));
++n;
}
count.set(n);
} finally {
putLock.unlock();
}
}
//將指定元素插入到此隊(duì)列的尾部类咧,如有必要等待空間變得可用。
public void put(E e) throws InterruptedException {
if (e == null) throw new NullPointerException();
// Note: convention in all put/take/etc is to preset local var
// holding count negative to indicate failure unless set.
int c = -1;
Node<E> node = new Node<E>(e);
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
putLock.lockInterruptibly();
try {
/*
* Note that count is used in wait guard even though it is
* not protected by lock. This works because count can
* only decrease at this point (all other puts are shut
* out by lock), and we (or some other waiting put) are
* signalled if it ever changes from capacity. Similarly
* for all other uses of count in other wait guards.
*/
while (count.get() == capacity) {
notFull.await();
}
enqueue(node);
c = count.getAndIncrement();
if (c + 1 < capacity)
notFull.signal();
} finally {
putLock.unlock();
}
if (c == 0)
signalNotEmpty();
}