在上一篇文章中介紹了guava cache是怎么構(gòu)造的序无,接下來(lái)cache中兩個(gè)重要的方法帝嗡,put狮辽、get喉脖,我們先來(lái)看看put方法:
@Override
public void put(K key, V value) {
localCache.put(key, value);
}
/**
* 代理到Segment的put方法
* @param key
* @param value
* @return
*/
@Override
public V put(K key, V value) {
checkNotNull(key);
checkNotNull(value);
int hash = hash(key);
return segmentFor(hash).put(key, hash, value, false);
}
@Nullable
V put(K key, int hash, V value, boolean onlyIfAbsent) {
//保證線程安全,加鎖
lock();
try {
//獲取當(dāng)前的時(shí)間
long now = map.ticker.read();
//清除隊(duì)列中的元素
preWriteCleanup(now);
//localCache的Count+1
int newCount = this.count + 1;
//擴(kuò)容操作
if (newCount > this.threshold) { // ensure capacity
expand();
newCount = this.count + 1;
}
//獲取當(dāng)前Entry中的HashTable的Entry數(shù)組
AtomicReferenceArray<ReferenceEntry<K, V>> table = this.table;
//定位
int index = hash & (table.length() - 1);
//獲取第一個(gè)元素
ReferenceEntry<K, V> first = table.get(index);
//遍歷整個(gè)Entry鏈表
// Look for an existing entry.
for (ReferenceEntry<K, V> e = first; e != null; e = e.getNext()) {
K entryKey = e.getKey();
if (e.getHash() == hash
&& entryKey != null
&& map.keyEquivalence.equivalent(key, entryKey)) {
// We found an existing entry.
//如果找到相應(yīng)的元素
ValueReference<K, V> valueReference = e.getValueReference();
//獲取value
V entryValue = valueReference.get();
//如果entry的value為null,可能被GC掉了
if (entryValue == null) {
++modCount;
if (valueReference.isActive()) {
enqueueNotification( //減小鎖時(shí)間的開(kāi)銷
key, hash, entryValue, valueReference.getWeight(), RemovalCause.COLLECTED);
//利用原來(lái)的key并且刷新value
setValue(e, key, value, now);//存儲(chǔ)數(shù)據(jù),并且將新增加的元素寫(xiě)入兩個(gè)隊(duì)列中
newCount = this.count; // count remains unchanged
} else {
setValue(e, key, value, now);//存儲(chǔ)數(shù)據(jù)草冈,并且將新增加的元素寫(xiě)入兩個(gè)隊(duì)列中
newCount = this.count + 1;
}
this.count = newCount; // write-volatile疲陕,保證內(nèi)存可見(jiàn)性
//淘汰緩存
evictEntries(e);
return null;
} else if (onlyIfAbsent) {//原來(lái)的Entry中包含指定key的元素,所以讀取一次诅岩,讀取操作需要更新Access隊(duì)列
// Mimic
// "if (!map.containsKey(key)) ...
// else return map.get(key);
recordLockedRead(e, now);
return entryValue;
} else {
//如果value不為null,那么更新value
// clobber existing entry, count remains unchanged
++modCount;
//將replace的Cause添加到隊(duì)列中
enqueueNotification(
key, hash, entryValue, valueReference.getWeight(), RemovalCause.REPLACED);
setValue(e, key, value, now);//存儲(chǔ)數(shù)據(jù)鸳谜,并且將新增加的元素寫(xiě)入兩個(gè)隊(duì)列中
//數(shù)據(jù)的淘汰
evictEntries(e);
return entryValue;
}
}
}
//如果目標(biāo)的entry不存在,那么新建entry
// Create a new entry.
++modCount;
ReferenceEntry<K, V> newEntry = newEntry(key, hash, first);
setValue(newEntry, key, value, now);
table.set(index, newEntry);
newCount = this.count + 1;
this.count = newCount; // write-volatile
//淘汰多余的entry
evictEntries(newEntry);
return null;
} finally {
//解鎖
unlock();
//處理剛剛的remove Cause
postWriteCleanup();
}
}
說(shuō)明
- put方法一開(kāi)始就加鎖是為了保證線程安全蝗肪,這點(diǎn)和ConcurrentHashMap一致
- preWriteCleanup在每次put之前都會(huì)清理動(dòng)作,清理啥呢豁延?
@GuardedBy("this")
void preWriteCleanup(long now) {
runLockedCleanup(now);
}
void runLockedCleanup(long now) {
if (tryLock()) {
try {
drainReferenceQueues();
expireEntries(now); // calls drainRecencyQueue
readCount.set(0);
} finally {
unlock();
}
}
}
@GuardedBy("this")
void drainReferenceQueues() {
if (map.usesKeyReferences()) {
drainKeyReferenceQueue();
}
if (map.usesValueReferences()) {
drainValueReferenceQueue();
}
}
@GuardedBy("this")
void drainKeyReferenceQueue() {
Reference<? extends K> ref;
int i = 0;
while ((ref = keyReferenceQueue.poll()) != null) {
@SuppressWarnings("unchecked")
ReferenceEntry<K, V> entry = (ReferenceEntry<K, V>) ref;
map.reclaimKey(entry);
if (++i == DRAIN_MAX) {
break;
}
}
}
清理的是keyReferenceQueue和valueReferenceQueue這兩個(gè)隊(duì)列,這兩個(gè)隊(duì)列是引用隊(duì)列胰苏,Guava Cache為了支持弱引用和軟引用法焰,引入了引用清空隊(duì)列,同時(shí)將key和value包裝成了KeyReference 和 ValueReference,在構(gòu)造cache緩存器一文中我們知道在創(chuàng)建Entry的時(shí)候有:
@GuardedBy("this")
ReferenceEntry<K, V> newEntry(K key, int hash, @Nullable ReferenceEntry<K, V> next) {
return map.entryFactory.newEntry(this, checkNotNull(key), hash, next);
}
利用map.entryFactory創(chuàng)建Entry埃仪。Factory的初始化是通過(guò)
entryFactory = EntryFactory.getFactory(keyStrength, usesAccessEntries(), usesWriteEntries());
可選的工廠有:
static final EntryFactory[] factories = {
STRONG,
STRONG_ACCESS,
STRONG_WRITE,
STRONG_ACCESS_WRITE,
WEAK,
WEAK_ACCESS,
WEAK_WRITE,
WEAK_ACCESS_WRITE,
};
通過(guò)相應(yīng)的工廠創(chuàng)建對(duì)應(yīng)的Entry乙濒,這里主要說(shuō)一下WeakEntry:
WEAK {
@Override
<K, V> ReferenceEntry<K, V> newEntry(
Segment<K, V> segment, K key, int hash, @Nullable ReferenceEntry<K, V> next) {
return new WeakEntry<K, V>(segment.keyReferenceQueue, key, hash, next);
}
},
/**
* Used for weakly-referenced keys.
*/
static class WeakEntry<K, V> extends WeakReference<K> implements ReferenceEntry<K, V> {
WeakEntry(ReferenceQueue<K> queue, K key, int hash, @Nullable ReferenceEntry<K, V> next) {
super(key, queue);
this.hash = hash;
this.next = next;
}
@Override
public K getKey() {
return get();
}
/*
* It'd be nice to get these for free from AbstractReferenceEntry, but we're already extending
* WeakReference<K>.
*/
// null access
@Override
public long getAccessTime() {
throw new UnsupportedOperationException();
}
WeakEntry繼承了WeakReference實(shí)現(xiàn)了ReferenceEntry,也就是說(shuō)這個(gè)引用是弱引用卵蛉。WeakEntry引用的key和Value隨時(shí)可能會(huì)被回收颁股。構(gòu)造的時(shí)候參數(shù)里面有ReferenceQueue<K> queue,這個(gè)就是我們上面提到的KeyReferenceQueue,所以在Key被GC掉的時(shí)候甘有,會(huì)自動(dòng)的將引用加入到ReferenceQueue這樣我們就能處理對(duì)應(yīng)的Entry了。Value也是一樣的葡缰。 清理KeyReferenceQueue的過(guò)程:
@GuardedBy("this")
void drainKeyReferenceQueue() {
Reference<? extends K> ref;
int i = 0;
while ((ref = keyReferenceQueue.poll()) != null) {
@SuppressWarnings("unchecked")
ReferenceEntry<K, V> entry = (ReferenceEntry<K, V>) ref;
map.reclaimKey(entry);
if (++i == DRAIN_MAX) {
break;
}
}
}
void reclaimKey(ReferenceEntry<K, V> entry) {
int hash = entry.getHash();
segmentFor(hash).reclaimKey(entry, hash);
}
/**
* Removes an entry whose key has been garbage collected.
*/
boolean reclaimKey(ReferenceEntry<K, V> entry, int hash) {
lock();
try {
int newCount = count - 1;
AtomicReferenceArray<ReferenceEntry<K, V>> table = this.table;
int index = hash & (table.length() - 1);
ReferenceEntry<K, V> first = table.get(index);
for (ReferenceEntry<K, V> e = first; e != null; e = e.getNext()) {
if (e == entry) {
++modCount;
ReferenceEntry<K, V> newFirst =
removeValueFromChain(
first,
e,
e.getKey(),
hash,
e.getValueReference().get(),
e.getValueReference(),
RemovalCause.COLLECTED);
newCount = this.count - 1;
table.set(index, newFirst);
this.count = newCount; // write-volatile
return true;
}
}
return false;
} finally {
unlock();
postWriteCleanup();
}
}
上面就是清理過(guò)程了亏掀,如果發(fā)現(xiàn)key或者value被GC了忱反,那么會(huì)在put的時(shí)候觸發(fā)清理。
接下來(lái)是setvalue方法了滤愕,它做的是將value寫(xiě)入Entry
/**
* Sets a new value of an entry. Adds newly created entries at the end of the access queue.
*/
@GuardedBy("Segment.this")
void setValue(ReferenceEntry<K, V> entry, K key, V value, long now) {
ValueReference<K, V> previous = entry.getValueReference();
int weight = map.weigher.weigh(key, value);
checkState(weight >= 0, "Weights must be non-negative");
ValueReference<K, V> valueReference =
map.valueStrength.referenceValue(this, entry, value, weight);
entry.setValueReference(valueReference);
recordWrite(entry, weight, now);
previous.notifyNewValue(value);
}
/**
* Updates eviction metadata that {@code entry} was just written. This currently amounts to
* adding {@code entry} to relevant eviction lists.
*/
@GuardedBy("Segment.this")
void recordWrite(ReferenceEntry<K, V> entry, int weight, long now) {
// we are already under lock, so drain the recency queue immediately
drainRecencyQueue();
totalWeight += weight;
if (map.recordsAccess()) {
entry.setAccessTime(now);
}
if (map.recordsWrite()) {
entry.setWriteTime(now);
}
accessQueue.add(entry);
writeQueue.add(entry);
}
GuavaCache會(huì)維護(hù)兩個(gè)隊(duì)列温算,一個(gè)Write隊(duì)列、一個(gè)Access隊(duì)列间影,用這兩個(gè)隊(duì)列來(lái)實(shí)現(xiàn)最近讀和最近寫(xiě)的清楚操作注竿,來(lái)看看AccessQueue的實(shí)現(xiàn):
/**
* A custom queue for managing access order. Note that this is tightly integrated with
* {@code ReferenceEntry}, upon which it reliese to perform its linking.
*
* <p>Note that this entire implementation makes the assumption that all elements which are in
* the map are also in this queue, and that all elements not in the queue are not in the map.
*
* <p>The benefits of creating our own queue are that (1) we can replace elements in the middle
* of the queue as part of copyWriteEntry, and (2) the contains method is highly optimized
* for the current model.
*/
static final class AccessQueue<K, V> extends AbstractQueue<ReferenceEntry<K, V>> {
final ReferenceEntry<K, V> head = new AbstractReferenceEntry<K, V>() {
@Override
public long getAccessTime() {
return Long.MAX_VALUE;
}
@Override
public void setAccessTime(long time) {}
ReferenceEntry<K, V> nextAccess = this;
@Override
public ReferenceEntry<K, V> getNextInAccessQueue() {
return nextAccess;
}
@Override
public void setNextInAccessQueue(ReferenceEntry<K, V> next) {
this.nextAccess = next;
}
ReferenceEntry<K, V> previousAccess = this;
@Override
public ReferenceEntry<K, V> getPreviousInAccessQueue() {
return previousAccess;
}
@Override
public void setPreviousInAccessQueue(ReferenceEntry<K, V> previous) {
this.previousAccess = previous;
}
};
// implements Queue
@Override
public boolean offer(ReferenceEntry<K, V> entry) {
// unlink
connectAccessOrder(entry.getPreviousInAccessQueue(), entry.getNextInAccessQueue());
// add to tail
connectAccessOrder(head.getPreviousInAccessQueue(), entry);
connectAccessOrder(entry, head);
return true;
}
@Override
public ReferenceEntry<K, V> peek() {
ReferenceEntry<K, V> next = head.getNextInAccessQueue();
return (next == head) ? null : next;
}
@Override
public ReferenceEntry<K, V> poll() {
ReferenceEntry<K, V> next = head.getNextInAccessQueue();
if (next == head) {
return null;
}
remove(next);
return next;
}
@Override
@SuppressWarnings("unchecked")
public boolean remove(Object o) {
ReferenceEntry<K, V> e = (ReferenceEntry) o;
ReferenceEntry<K, V> previous = e.getPreviousInAccessQueue();
ReferenceEntry<K, V> next = e.getNextInAccessQueue();
connectAccessOrder(previous, next);
nullifyAccessOrder(e);
return next != NullEntry.INSTANCE;
}
重點(diǎn)看下offer方法做了啥:
1.將Entry和它的前節(jié)點(diǎn)后節(jié)點(diǎn)的關(guān)聯(lián)斷開(kāi),這樣就需要Entry中維護(hù)它的前向和后向引用魂贬。
2.將新增加的節(jié)點(diǎn)加入到隊(duì)列的尾部巩割,尋找尾節(jié)點(diǎn)用了head.getPreviousInAccessQueue()∷骈伲可以看出來(lái)是個(gè)環(huán)形隊(duì)列喂分。
3.將新增加的節(jié)點(diǎn),或者新調(diào)整出來(lái)的節(jié)點(diǎn)設(shè)為尾部節(jié)點(diǎn)机蔗。
可以得知蒲祈,最近更新的節(jié)點(diǎn)一定是在尾部的,head后面的節(jié)點(diǎn)一定是不活躍的萝嘁,在每一次清除過(guò)期節(jié)點(diǎn)的時(shí)候一定清除head之后的超時(shí)的節(jié)點(diǎn)梆掸。
接下來(lái)看看evictEntry,當(dāng)Cache中設(shè)置了緩存最大條目前提下可能會(huì)觸發(fā):
/**
* Performs eviction if the segment is full. This should only be called prior to adding a new
* entry and increasing {@code count}.
*/
@GuardedBy("Segment.this")
void evictEntries() {
if (!map.evictsBySize()) {
return;
}
drainRecencyQueue();
while (totalWeight > maxSegmentWeight) {
ReferenceEntry<K, V> e = getNextEvictable();
if (!removeEntry(e, e.getHash(), RemovalCause.SIZE)) {
throw new AssertionError();
}
}
}
先判斷是否開(kāi)啟淘汰牙言,之后清理ReferenceQueue酸钦,然后判斷是否超過(guò)權(quán)重,超過(guò)的話就清理最近最少使用的Entry咱枉。
最后再看看在finall執(zhí)行的postWriteCleanup卑硫,回調(diào)相應(yīng)的listener方法。
void processPendingNotifications() {
RemovalNotification<K, V> notification;
while ((notification = removalNotificationQueue.poll()) != null) {
try {
removalListener.onRemoval(notification);
} catch (Throwable e) {
logger.log(Level.WARNING, "Exception thrown by removal listener", e);
}
}
}
get 方法源碼
get 流程
:
- 獲取對(duì)象引用(引用可能是非alive的蚕断,比如是需要失效的欢伏、比如是loading的);
- 判斷對(duì)象引用是否是alive的(如果entry是非法的亿乳、部分回收的硝拧、loading狀態(tài)、需要失效的葛假,則認(rèn)為不是alive)障陶。
- 如果對(duì)象是alive的,如果設(shè)置refresh聊训,則異步刷新查詢value抱究,然后等待返回最新value。
- 針對(duì)不是alive的带斑,但卻是在loading的媳维,等待loading完成(阻塞等待)酿雪。
- 這里如果value還沒(méi)有拿到,則查詢loader方法獲取對(duì)應(yīng)的值(阻塞獲戎豆簟)指黎。
// LoadingCache methods
//local cache的代理
@Override
public V get(K key) throws ExecutionException {
return localCache.getOrLoad(key);
}
/**
* 根據(jù)key獲取value,如果獲取不到進(jìn)行l(wèi)oad
* @param key
* @return
* @throws ExecutionException
*/
V getOrLoad(K key) throws ExecutionException {
return get(key, defaultLoader);
}
V get(K key, CacheLoader<? super K, V> loader) throws ExecutionException {
int hash = hash(checkNotNull(key));//hash——>rehash
return segmentFor(hash).get(key, hash, loader);
}
// loading
//進(jìn)行指定key對(duì)應(yīng)的value的獲取,讀取不加鎖
V get(K key, int hash, CacheLoader<? super K, V> loader) throws ExecutionException {
//保證key-value不為null
checkNotNull(key);
checkNotNull(loader);
try {
if (count != 0) { // read-volatile volatile讀會(huì)刷新緩存州丹,盡量保證可見(jiàn)性,如果為0那么直接load
// don't call getLiveEntry, which would ignore loading values
ReferenceEntry<K, V> e = getEntry(key, hash);
//如果對(duì)應(yīng)的Entry不為Null,證明值還在
if (e != null) {
long now = map.ticker.read();//獲取當(dāng)前的時(shí)間醋安,根據(jù)當(dāng)前的時(shí)間進(jìn)行Live的數(shù)據(jù)的讀取
V value = getLiveValue(e, now); // 判斷是否為alive(此處是懶失效,在每次get時(shí)才檢查是否達(dá)到失效時(shí)機(jī))
//元素不為null的話可以不刷新
if (value != null) {
recordRead(e, now);//為entry增加accessTime,同時(shí)加入recencyQueue
statsCounter.recordHits(1);//更新當(dāng)前的狀態(tài)墓毒,增加為命中,可以用于計(jì)算命中率
// 如果指定了定時(shí)刷新 就嘗試刷新value
return scheduleRefresh(e, key, hash, value, now, loader);
}
//value為null,如果此時(shí)value正在刷新吓揪,那么此時(shí)等待刷新結(jié)果
ValueReference<K, V> valueReference = e.getValueReference();
if (valueReference.isLoading()) {
// 如果正在加載的,等待加載完成后所计,返回加載的值柠辞。(阻塞,future的get)
return waitForLoadingValue(e, key, valueReference);
}
}
}
//如果取不到值主胧,那么進(jìn)行統(tǒng)一的加鎖get
// at this point e is either null or expired; 此處或者為null叭首,或者已經(jīng)被失效。
return lockedGetOrLoad(key, hash, loader);
} catch (ExecutionException ee) {
Throwable cause = ee.getCause();
if (cause instanceof Error) {
throw new ExecutionError((Error) cause);
} else if (cause instanceof RuntimeException) {
throw new UncheckedExecutionException(cause);
}
throw ee;
} finally {
postReadCleanup();//每次Put和get之后都要進(jìn)行一次Clean
}
}
get的實(shí)現(xiàn)和JDK1.6的ConcurrentHashMap思想一致踪栋,都是put加鎖焙格,但是get是用volatile保證。這里主要做了:
- 首先獲取Entry夷都,Entry不為null獲取對(duì)應(yīng)的Value眷唉,如果Value不為空,那么證明值還在囤官,那么這時(shí)候判斷一下是否要刷新直接返回了冬阳。否則判斷目前引用是否在Loading,如果是就等待Loading結(jié)束党饮。
- 如果取不到Entry或者Value為null 并且沒(méi)有在Loading肝陪,那么這時(shí)候進(jìn)行l(wèi)ockedGetOrLoad(),這是一個(gè)大活兒。
V lockedGetOrLoad(K key, int hash, CacheLoader<? super K, V> loader) throws ExecutionException {
ReferenceEntry<K, V> e;
ValueReference<K, V> valueReference = null;
LoadingValueReference<K, V> loadingValueReference = null;
boolean createNewEntry = true;
lock();//加鎖劫谅,因?yàn)闀?huì)改變數(shù)據(jù)結(jié)構(gòu)
try {
// re-read ticker once inside the lock
long now = map.ticker.read();
preWriteCleanup(now);//清除引用隊(duì)列,Acess隊(duì)列和Write隊(duì)列中過(guò)期的數(shù)據(jù),這算是一次put操作
int newCount = this.count - 1;
AtomicReferenceArray<ReferenceEntry<K, V>> table = this.table;
int index = hash & (table.length() - 1);
ReferenceEntry<K, V> first = table.get(index);
//定位目標(biāo)元素
for (e = first; e != null; e = e.getNext()) {
K entryKey = e.getKey();
if (e.getHash() == hash
&& entryKey != null
&& map.keyEquivalence.equivalent(key, entryKey)) {
valueReference = e.getValueReference();
//如果目前處在loading狀態(tài),不創(chuàng)建新元素
if (valueReference.isLoading()) {
createNewEntry = false;
} else {
V value = valueReference.get();
if (value == null) { //可能被GC掉了嚷掠,加入removeListener
enqueueNotification(
entryKey, hash, value, valueReference.getWeight(), RemovalCause.COLLECTED);
} else if (map.isExpired(e, now)) { //可能過(guò)期了
// This is a duplicate check, as preWriteCleanup already purged expired
// entries, but let's accomodate an incorrect expiration queue.
enqueueNotification(
entryKey, hash, value, valueReference.getWeight(), RemovalCause.EXPIRED);
} else {//目前就已經(jīng)加載過(guò)了捏检,返回
recordLockedRead(e, now);
statsCounter.recordHits(1);
// we were concurrent with loading; don't consider refresh
return value;
}
//刪除在隊(duì)列中相應(yīng)的引用,因?yàn)楹竺嬉聞?chuàng)建
// immediately reuse invalid entries
writeQueue.remove(e);
accessQueue.remove(e);
this.count = newCount; // write-volatile
}
break;
}
}
//創(chuàng)建新的Entry,但是此時(shí)是沒(méi)有值的
if (createNewEntry) {
loadingValueReference = new LoadingValueReference<K, V>();
if (e == null) {
e = newEntry(key, hash, first);
e.setValueReference(loadingValueReference);
table.set(index, e);
} else {
// 到此不皆,說(shuō)e找到贯城,但是是非法的,數(shù)據(jù)已被移除霹娄。e放入新建的引用
e.setValueReference(loadingValueReference);
}
}
} finally {
unlock();
postWriteCleanup();
}
// 上面加鎖部分建完了新的entry能犯,設(shè)置完valueReference(isAlive為false鲫骗,isLoading 為false),到此踩晶,鎖已經(jīng)被釋放执泰,其他線程可以拿到一個(gè)loading狀態(tài)的引用。這就符合get時(shí)渡蜻,拿到loading狀態(tài)引用后术吝,阻塞等待加載的邏輯了。
if (createNewEntry) {
try {
// Synchronizes on the entry to allow failing fast when a recursive load is
// detected. This may be circumvented when an entry is copied, but will fail fast most
// of the time.
// 這里只對(duì)e加鎖茸苇,而不是segment排苍,允許get操作進(jìn)入。
synchronized (e) {
return loadSync(key, hash, loadingValueReference, loader);
}
} finally {
statsCounter.recordMisses(1);
}
} else {
// The entry already exists. Wait for loading.
return waitForLoadingValue(e, key, valueReference);
}
}
說(shuō)明:
- load算是一個(gè)寫(xiě)操作学密,需要加鎖淘衙,
- 為了避免緩存被擊穿,每個(gè)線程都去load容易導(dǎo)致數(shù)據(jù)庫(kù)雪崩
因?yàn)槭菍?xiě)所以要進(jìn)行preWriteCleanup腻暮,根據(jù)key定位一下Entry彤守,如果能定位到,那么判斷是否在Loading西壮,如果是的話不創(chuàng)建新的Entry并且等待Loading結(jié)束遗增。如果不是那么判斷value是否為null和是否過(guò)期,如果是的話都要進(jìn)行創(chuàng)建新Entry款青,如果都不是證明value是加載過(guò)了做修,那么更新下Access隊(duì)列然后返回,
// at most one of loadSync/loadAsync may be called for any given LoadingValueReference
//同步刷新
V loadSync(
K key,
int hash,
LoadingValueReference<K, V> loadingValueReference,
CacheLoader<? super K, V> loader)
throws ExecutionException {
ListenableFuture<V> loadingFuture = loadingValueReference.loadFuture(key, loader);
return getAndRecordStats(key, hash, loadingValueReference, loadingFuture);
}
這里創(chuàng)建了一個(gè)loadingReference抡草,這也就是之前看到的判斷是否在Loading饰及。如果是Loading狀態(tài)那么表明有一個(gè)線程正在更新Cache,其他的線程等待就可以了康震。
從上面我們可以看到燎含,對(duì)于每一次get都會(huì)去進(jìn)行Access隊(duì)列的更新,同時(shí)對(duì)于多線程的更新只會(huì)引起一個(gè)線程去load數(shù)據(jù)腿短,對(duì)于不存在的數(shù)據(jù)屏箍,get時(shí)也會(huì)進(jìn)行一次load操作。同時(shí)通過(guò)同步操作解決了緩存擊穿的問(wèn)題橘忱。
這里講下引用隊(duì)列(Reference Queue)
- 引用隊(duì)列可以很容易地實(shí)現(xiàn)跟蹤不需要的引用赴魁。
- 一旦弱引用對(duì)象開(kāi)始返回null,該弱引用指向的對(duì)象就被標(biāo)記成了垃圾钝诚。
- 當(dāng)你在構(gòu)造WeakReference時(shí)傳入一個(gè)ReferenceQueue對(duì)象颖御,當(dāng)該引用指向的對(duì)象被標(biāo)記為垃圾的時(shí)候,這個(gè)引用對(duì)象會(huì)自動(dòng)地加入到引用隊(duì)列里面凝颇。
ReferenceEntry
- Cache中的所有Entry都是基于ReferenceEntry的實(shí)現(xiàn)潘拱。
- 信息包括:自身hash值疹鳄,寫(xiě)入時(shí)間,讀取時(shí)間芦岂。每次寫(xiě)入和讀取的隊(duì)列瘪弓。以及鏈表指針。
- 每個(gè)Entry中均包含一個(gè)ValueReference類型來(lái)表示值盔腔。
ValueReference的類圖
對(duì)于ValueReference杠茬,有三個(gè)實(shí)現(xiàn)類:StrongValueReference、SoftValueReference弛随、WeakValueReference瓢喉。為了支持動(dòng)態(tài)加載機(jī)制,它還有一個(gè)LoadingValueReference舀透,在需要?jiǎng)討B(tài)加載一個(gè)key的值時(shí)栓票,先把該值封裝在LoadingValueReference中,以表達(dá)該key對(duì)應(yīng)的值已經(jīng)在加載了愕够,如果其他線程也要查詢?cè)搆ey對(duì)應(yīng)的值走贪,就能得到該引用,并且等待改值加載完成惑芭,從而保證該值只被加載一次(可以在evict以后重新加載)坠狡。在該值加載完成后,將LoadingValueReference替換成其他ValueReference類型遂跟。
每個(gè)ValueReference都紀(jì)錄了weight值逃沿,所謂weight從字面上理解是“該值的重量”,它由Weighter接口計(jì)算而得幻锁。
還定義了Stength枚舉類型作為ValueReference的factory類凯亮,它有三個(gè)枚舉值:Strong、Soft哄尔、Weak假消,這三個(gè)枚舉值分別創(chuàng)建各自的ValueReference。
WeakEntry為例子
在cache的put操作和帶CacheBuilder中的都有newEntry的操作岭接。newEntry根據(jù)cache builder的配置生成不用級(jí)別的引用富拗,比如put操作:
// Create a new entry.
++modCount;
// 新建一個(gè)entry
ReferenceEntry<K, V> newEntry = newEntry(key, hash, first);
// 設(shè)置值,也就是valueRerence
setValue(newEntry, key, value, now);
newEntry方法
根據(jù)cache創(chuàng)建時(shí)的配置(代碼中生成的工廠)鸣戴,生成不同的Entry啃沪。
ReferenceEntry<K, V> newEntry(K key, int hash, @Nullable ReferenceEntry<K, V> next) {
return map.entryFactory.newEntry(this, checkNotNull(key), hash, next);
}
調(diào)用WEAK的newEntry,其中segment.keyReferenceQueue是key的引用隊(duì)列葵擎。還有一個(gè)value的引用隊(duì)列谅阿,valueReferenceQueue一會(huì)會(huì)出現(xiàn)半哟。
WEAK {
@Override
<K, V> ReferenceEntry<K, V> newEntry(
Segment<K, V> segment, K key, int hash, @Nullable ReferenceEntry<K, V> next) {
return new WeakEntry<K, V>(segment.keyReferenceQueue, key, hash, next);
}
},
setValue方法
首先要生成一個(gè)valueReference酬滤,然后set到entry中签餐。
ValueReference<K, V> valueReference =
map.valueStrength.referenceValue(this, entry, value, weight);
entry.setValueReference(valueReference);
Value的WEAK跟key的WEAK形式很像盯串。只不過(guò)氯檐,增加了weight值(cachebuilder復(fù)寫(xiě)不同k,v對(duì)應(yīng)的權(quán)重)和value的比較方法体捏。
WEAK {
@Override
<K, V> ValueReference<K, V> referenceValue(
Segment<K, V> segment, ReferenceEntry<K, V> entry, V value, int weight) {
return (weight == 1)
? new WeakValueReference<K, V>(segment.valueReferenceQueue, value, entry)
: new WeightedWeakValueReference<K, V>(
segment.valueReferenceQueue, value, entry, weight);
}
@Override
Equivalence<Object> defaultEquivalence() {
return Equivalence.identity();
}
};
cache如何基于引用做清理
如果key或者value的引用不是Strong類型冠摄,那么它們必然會(huì)被gc回收掉〖哥裕回收掉后河泳,引用對(duì)象依然存在,只是值為null了年栓,這也是上文提到從entry中得到的ValueReference要判斷的原因了拆挥。
**
* Drain the key and value reference queues, cleaning up internal entries containing garbage
* collected keys or values.
*/
@GuardedBy("this")
void drainReferenceQueues() {
if (map.usesKeyReferences()) {
drainKeyReferenceQueue();
}
if (map.usesValueReferences()) {
drainValueReferenceQueue();
}
}
如何失效,因?yàn)閗和v的失效方法基本一樣某抓,只舉例drainValueReferenceQueue纸兔。(執(zhí)行前都會(huì)tryLock,執(zhí)行時(shí)保證有鎖)
void drainValueReferenceQueue() {
Reference<? extends V> ref;
int i = 0;
while ((ref = valueReferenceQueue.poll()) != null) {
@SuppressWarnings("unchecked")
ValueReference<K, V> valueReference = (ValueReference<K, V>) ref;
// 回收
map.reclaimValue(valueReference);
if (++i == DRAIN_MAX) {
break;
}
}
}
如何回收呢?
- map是segment維護(hù)的cache的引用否副,再次hash到segment汉矿。
- 找到segment后,加鎖备禀,hash找到entry table洲拇。遍歷鏈表,根據(jù)key找到一個(gè)entry痹届。
- 如果找到呻待,且跟入?yún)⒌膙alueReference==比較相等,執(zhí)行removeValueFromChain
- 如果沒(méi)找到队腐,返回false蚕捉。
- 如果找到,不等柴淘,返回false迫淹。
removeValueFromChain
ReferenceEntry<K, V> removeValueFromChain(ReferenceEntry<K, V> first,
ReferenceEntry<K, V> entry, @Nullable K key, int hash, ValueReference<K, V> valueReference,
RemovalCause cause) {
enqueueNotification(key, hash, valueReference, cause);
writeQueue.remove(entry);
accessQueue.remove(entry);
if (valueReference.isLoading()) {
valueReference.notifyNewValue(null);
return first;
} else {
return removeEntryFromChain(first, entry);
}
}
- 需要執(zhí)行remove的通知,入隊(duì)列为严。
- 針對(duì)LoadingValueReference敛熬,直接返回。
- 非loading執(zhí)行移除第股。
具體如何執(zhí)行remove呢应民?removeEntryFromChain
ReferenceEntry<K, V> removeEntryFromChain(ReferenceEntry<K, V> first,
ReferenceEntry<K, V> entry) {
int newCount = count;
ReferenceEntry<K, V> newFirst = entry.getNext();
for (ReferenceEntry<K, V> e = first; e != entry; e = e.getNext()) {
// 這個(gè)方法是copy當(dāng)前結(jié)點(diǎn)(e),然后將新的結(jié)點(diǎn)指向newFirst,返回copy得到的結(jié)點(diǎn)(next)诲锹。
// 如果改entry是需要回收的繁仁,那么該方法返回null。
ReferenceEntry<K, V> next = copyEntry(e, newFirst);
if (next != null) {
newFirst = next;
} else {
// 如果偶遇k或者v已經(jīng)回收了的entry归园,進(jìn)入需要通知的隊(duì)列黄虱。
removeCollectedEntry(e);
newCount--;
}
}
this.count = newCount;
return newFirst;
}
這段邏輯是,從first開(kāi)始庸诱,一直到要remove結(jié)點(diǎn)(entry)的next(newFirst)捻浦,依次copy每個(gè)結(jié)點(diǎn),指向newFirst桥爽,然后將newFirst變成自身朱灿。最后這條鏈表的頭就變成,最后copy的那個(gè)結(jié)點(diǎn)钠四,也就是entry的上一個(gè)結(jié)點(diǎn)母剥。
Cache的失效和回調(diào)
基于讀寫(xiě)時(shí)間失效
失效邏輯和過(guò)程:
- Entry在進(jìn)行一次讀寫(xiě)操作后,會(huì)標(biāo)識(shí)accessTime和writeTime形导。
f (map.recordsAccess()) {
entry.setAccessTime(now);
}
if (map.recordsWrite()) {
entry.setWriteTime(now);
}
- accessQueue和writeQueue分別會(huì)在讀寫(xiě)操作適時(shí)的添加环疼。
accessQueue.add(entry);
writeQueue.add(entry);
- 遍歷accessQueue和writeQueue
void expireEntries(long now) {
drainRecencyQueue();
ReferenceEntry<K, V> e;
// 取出entry,判斷是否失效
while ((e = writeQueue.peek()) != null && map.isExpired(e, now)) {
if (!removeEntry(e, e.getHash(), RemovalCause.EXPIRED)) {
throw new AssertionError();
}
}
while ((e = accessQueue.peek()) != null && map.isExpired(e, now)) {
if (!removeEntry(e, e.getHash(), RemovalCause.EXPIRED)) {
throw new AssertionError();
}
}
}
- 判斷是否失效
if (expiresAfterAccess()
&& (now - entry.getAccessTime() >= expireAfterAccessNanos)) {
return true;
}
- removeEntry就是調(diào)用上文的removeValueFromChain朵耕。
write鏈(writeQueue)和access鏈(accessQueue)
這兩條鏈都是一個(gè)雙向鏈表炫隶,通過(guò)ReferenceEntry中的previousInWriteQueue、nextInWriteQueue和previousInAccessQueue阎曹、nextInAccessQueue鏈接而成伪阶,但是以Queue的形式表達(dá)。
WriteQueue和AccessQueue都是自定義了offer处嫌、add(直接調(diào)用offer)栅贴、remove、poll等操作的邏輯熏迹。
- 對(duì)于offer(add)操作檐薯,如果是新加的節(jié)點(diǎn),則直接加入到該鏈的隊(duì)首注暗,如果是已存在的節(jié)點(diǎn)坛缕,則將該節(jié)點(diǎn)鏈接的鏈?zhǔn)住#╤ead始終保持在隊(duì)首捆昏,新節(jié)點(diǎn)不斷插入到隊(duì)首赚楚。邏輯上最先插入的結(jié)點(diǎn)保持在,允許訪問(wèn)的頭部)
- 對(duì)remove操作骗卜,直接從該鏈中移除該節(jié)點(diǎn)宠页;
- 對(duì)poll操作左胞,將頭節(jié)點(diǎn)的下一個(gè)節(jié)點(diǎn)移除,并返回举户。
@Override
public boolean offer(ReferenceEntry<K, V> entry) {
// unlink
connectAccessOrder(entry.getPreviousInAccessQueue(), entry.getNextInAccessQueue());
// add to tail
connectAccessOrder(head.getPreviousInAccessQueue(), entry);
connectAccessOrder(entry, head);
return true;
}
失效的通知回調(diào)
void enqueueNotification(@Nullable K key, int hash, ValueReference<K, V> valueReference,
RemovalCause cause) {
totalWeight -= valueReference.getWeight();
if (cause.wasEvicted()) {
statsCounter.recordEviction();
}
if (map.removalNotificationQueue != DISCARDING_QUEUE) {
V value = valueReference.get();
RemovalNotification<K, V> notification = new RemovalNotification<K, V>(key, value, cause);
map.removalNotificationQueue.offer(notification);
}
}
首先判斷移除的原因RemovalCause:EXPLICIT(remove罩句、clear等用戶有預(yù)期的操作),REPLACED(put敛摘、replace),COLLECTED乳愉,EXPIRED兄淫,SIZE。RemovalCause有個(gè)方法wasEvicted蔓姚,表示是否是被驅(qū)逐的捕虽。前兩種是false,后三種是true坡脐。
生成一個(gè)notification對(duì)象泄私,入隊(duì)列。
removalNotificationQueue何時(shí)被清理
在讀寫(xiě)操作的finally階段备闲,都會(huì)執(zhí)行晌端。
void processPendingNotifications() {
RemovalNotification<K, V> notification;
while ((notification = removalNotificationQueue.poll()) != null) {
try {
// 這里就是回調(diào)構(gòu)造cache時(shí)注冊(cè)的listener了
removalListener.onRemoval(notification);
} catch (Throwable e) {
logger.log(Level.WARNING, "Exception thrown by removal listener", e);
}
}
}
至此,guava cache的核心源碼分析完了恬砂,感覺(jué)挺惡心的咧纠,,挺復(fù)雜的泻骤,還需要多復(fù)習(xí)鞏固才能真正掌握漆羔。