一走趋、Guava的設(shè)計(jì)思想###
之前一篇短文衅金,簡要的概括了一下GuavaCache具有的一些特性。例如像緩存淘汰簿煌、刪除監(jiān)聽和緩存刷新等氮唯。這次主要寫一些Guava Cache是怎樣實(shí)現(xiàn)這些特性的。
GuavaCache的源碼在 https://github.com/google/guava
GuavaCache的設(shè)計(jì)是類似與ConcurrentHashMap的姨伟,主要靠鎖的細(xì)化惩琉,來減小并發(fā),同時通過Hash算法來加快檢索速度夺荒。但是GuavaCahce和ConcurrentHash不同的是GuavaCache要支持很多的Cache特性瞒渠,所以設(shè)計(jì)上還是很比較復(fù)雜的。
二技扼、源碼的分析###
這里我們主要以LoadingCache為例子來分析GuavaCache的結(jié)構(gòu)和實(shí)現(xiàn)伍玖,首先Wiki的例子是:
LoadingCache<Key, Graph> graphs = CacheBuilder.newBuilder()
.maximumSize(1000)
.expireAfterWrite(10, TimeUnit.MINUTES)
.removalListener(MY_LISTENER)
.build(
new CacheLoader<Key, Graph>() {
public Graph load(Key key) throws AnyException {
return createExpensiveGraph(key);
}
});
這里GuavaCache主要采用builder的模式,CacheBuilder的每一個方法都返回這個CacheBuilder知道build方法的調(diào)用剿吻。
那么我們先看一下CacheBuilder的各個方法:
/**
*
* 指定一個Cahce的大小上限窍箍,當(dāng)Cache中的數(shù)據(jù)將要達(dá)到上限的時候淘汰掉不常用的。
* Specifies the maximum number of entries the cache may contain. Note that the cache <b>may evict
* an entry before this limit is exceeded</b>. As the cache size grows close to the maximum, the
* cache evicts entries that are less likely to be used again. For example, the cache may evict an
* entry because it hasn't been used recently or very often.
*
* <p>When {@code size} is zero, elements will be evicted immediately after being loaded into the
* cache. This can be useful in testing, or to disable caching temporarily without a code change.
*
* <p>This feature cannot be used in conjunction with {@link #maximumWeight}.
*
* @param size the maximum size of the cache
* @return this {@code CacheBuilder} instance (for chaining)
* @throws IllegalArgumentException if {@code size} is negative
* @throws IllegalStateException if a maximum size or weight was already set
*/
public CacheBuilder<K, V> maximumSize(long size) {
checkState(
this.maximumSize == UNSET_INT, "maximum size was already set to %s", this.maximumSize);
checkState(
this.maximumWeight == UNSET_INT,
"maximum weight was already set to %s",
this.maximumWeight);
checkState(this.weigher == null, "maximum size can not be combined with weigher");
checkArgument(size >= 0, "maximum size must not be negative");
this.maximumSize = size;
return this;
狀態(tài)檢測之后就是執(zhí)行了一個賦值操作。
同理
public CacheBuilder<K, V> expireAfterWrite(long duration, TimeUnit unit) {
checkState(
expireAfterWriteNanos == UNSET_INT,
"expireAfterWrite was already set to %s ns",
expireAfterWriteNanos);
checkArgument(duration >= 0, "duration cannot be negative: %s %s", duration, unit);
this.expireAfterWriteNanos = unit.toNanos(duration);
return this;
}
public <K1 extends K, V1 extends V> CacheBuilder<K1, V1> removalListener(
RemovalListener<? super K1, ? super V1> listener) {
checkState(this.removalListener == null);
// safely limiting the kinds of caches this can produce
@SuppressWarnings("unchecked")
CacheBuilder<K1, V1> me = (CacheBuilder<K1, V1>) this;
me.removalListener = checkNotNull(listener);
return me;
}
執(zhí)行build方法:
public <K1 extends K, V1 extends V> LoadingCache<K1, V1> build(
CacheLoader<? super K1, V1> loader) {
checkWeightWithWeigher();
return new LocalCache.LocalLoadingCache<K1, V1>(this, loader);
}
這里主要返回一個LocalCache.LocalLoadingCache仔燕,這是LocalCache的一個內(nèi)部類,到這里GuavaCahce真正的存儲結(jié)構(gòu)出現(xiàn)了魔招,LocalLoadingCache繼承了LocalManualCache實(shí)現(xiàn)了LoadingCache接口晰搀。實(shí)例化的時候,根據(jù)CacheBuilder構(gòu)建了一個LocalCache办斑,而LoadingCache和LocalManualCache只是在LocalCache上做了代理外恕。
LocalLoadingCache( CacheBuilder<? super K, ? super V> builder, CacheLoader<? super K, V> loader) { super(new LocalCache<K, V>(builder, checkNotNull(loader)));}
private LocalManualCache(LocalCache<K, V> localCache) { this.localCache = localCache;}
那么LocalCache的構(gòu)建是什么樣的呢?
LocalCache(
CacheBuilder<? super K, ? super V> builder, @Nullable CacheLoader<? super K, V> loader) {
//并發(fā)度,seg的個數(shù)
concurrencyLevel = Math.min(builder.getConcurrencyLevel(), MAX_SEGMENTS);
//key強(qiáng)弱關(guān)系
keyStrength = builder.getKeyStrength();
//value的強(qiáng)弱關(guān)系
valueStrength = builder.getValueStrength();
//比較器乡翅,類似于Object.equal
keyEquivalence = builder.getKeyEquivalence();
valueEquivalence = builder.getValueEquivalence();
//最大權(quán)重,weigher為null那么maxWeight=maxsize
maxWeight = builder.getMaximumWeight();
//entry的權(quán)重鳞疲,用于淘汰策略
weigher = builder.getWeigher();
//lastAccess之后多長時間刪除
expireAfterAccessNanos = builder.getExpireAfterAccessNanos();
//在寫入后長時間之后刪除
expireAfterWriteNanos = builder.getExpireAfterWriteNanos();
//刷新的時間間隔
refreshNanos = builder.getRefreshNanos();
//entry刪除之后的Listener
removalListener = builder.getRemovalListener();
//刪除監(jiān)聽的隊(duì)列
removalNotificationQueue =
(removalListener == NullListener.INSTANCE)
? LocalCache.<RemovalNotification<K, V>>discardingQueue()
: new ConcurrentLinkedQueue<RemovalNotification<K, V>>();
//時鐘
ticker = builder.getTicker(recordsTime());
//創(chuàng)建Entry的Factory
entryFactory = EntryFactory.getFactory(keyStrength, usesAccessEntries(), usesWriteEntries());
//緩存的狀態(tài)統(tǒng)計(jì)器,用于統(tǒng)計(jì)緩存命中率等
globalStatsCounter = builder.getStatsCounterSupplier().get();
//加載數(shù)據(jù)的Loader
defaultLoader = loader;
//初始化HashTable的容量
int initialCapacity = Math.min(builder.getInitialCapacity(), MAXIMUM_CAPACITY);
//沒有設(shè)置權(quán)重設(shè)置但是有maxsize的設(shè)置蠕蚜,那么需要減小容量的設(shè)置
if (evictsBySize() && !customWeigher()) {
initialCapacity = Math.min(initialCapacity, (int) maxWeight);
}
// Find the lowest power-of-two segmentCount that exceeds concurrencyLevel, unless
// maximumSize/Weight is specified in which case ensure that each segment gets at least 10
// entries. The special casing for size-based eviction is only necessary because that eviction
// happens per segment instead of globally, so too many segments compared to the maximum size
// will result in random eviction behavior.
//類似于ConcurentHashMap
int segmentShift = 0;//seg的掩碼
int segmentCount = 1;//seg的個數(shù)
//如果seg的個數(shù)事故小于并發(fā)度的
//初始化并發(fā)度為4,默認(rèn)的maxWeight是-1尚洽,默認(rèn)是不驅(qū)逐
while (segmentCount < concurrencyLevel && (!evictsBySize() || segmentCount * 20 <= maxWeight)) {
++segmentShift;
segmentCount <<= 1;
}
this.segmentShift = 32 - segmentShift;
segmentMask = segmentCount - 1;
this.segments = newSegmentArray(segmentCount);
int segmentCapacity = initialCapacity / segmentCount;
if (segmentCapacity * segmentCount < initialCapacity) {
++segmentCapacity;
}
int segmentSize = 1;
while (segmentSize < segmentCapacity) {
segmentSize <<= 1;
}
//默認(rèn)不驅(qū)逐
if (evictsBySize()) {
// Ensure sum of segment max weights = overall max weights
long maxSegmentWeight = maxWeight / segmentCount + 1;
long remainder = maxWeight % segmentCount;
for (int i = 0; i < this.segments.length; ++i) {
if (i == remainder) {
maxSegmentWeight--;
}
this.segments[i] =
createSegment(segmentSize, maxSegmentWeight, builder.getStatsCounterSupplier().get());
}
} else {
//為每一個Segment進(jìn)行初始化
for (int i = 0; i < this.segments.length; ++i) {
this.segments[i] =
createSegment(segmentSize, UNSET_INT, builder.getStatsCounterSupplier().get());
}
}
}
初始化的時候初始化一些配置等,可以看到和ConcurrentHashMap基本一致靶累,但是引入了一些其他的概念腺毫。
那么回過頭看一下,最關(guān)鍵的兩個方法挣柬,首先是put方法:
@Override
public void put(K key, V value) {
localCache.put(key, value);
}
/**
* 代理到Segment的put方法
* @param key
* @param value
* @return
*/
@Override
public V put(K key, V value) {
checkNotNull(key);
checkNotNull(value);
int hash = hash(key);
return segmentFor(hash).put(key, hash, value, false);
}
@Nullable
V put(K key, int hash, V value, boolean onlyIfAbsent) {
//保證線程安全潮酒,加鎖
lock();
try {
//獲取當(dāng)前的時間
long now = map.ticker.read();
//清除隊(duì)列中的元素
preWriteCleanup(now);
//localCache的Count+1
int newCount = this.count + 1;
//擴(kuò)容操作
if (newCount > this.threshold) { // ensure capacity
expand();
newCount = this.count + 1;
}
//獲取當(dāng)前Entry中的HashTable的Entry數(shù)組
AtomicReferenceArray<ReferenceEntry<K, V>> table = this.table;
//定位
int index = hash & (table.length() - 1);
//獲取第一個元素
ReferenceEntry<K, V> first = table.get(index);
//遍歷整個Entry鏈表
// Look for an existing entry.
for (ReferenceEntry<K, V> e = first; e != null; e = e.getNext()) {
K entryKey = e.getKey();
if (e.getHash() == hash
&& entryKey != null
&& map.keyEquivalence.equivalent(key, entryKey)) {
// We found an existing entry.
//如果找到相應(yīng)的元素
ValueReference<K, V> valueReference = e.getValueReference();
//獲取value
V entryValue = valueReference.get();
//如果entry的value為null,可能被GC掉了
if (entryValue == null) {
++modCount;
if (valueReference.isActive()) {
enqueueNotification( //減小鎖時間的開銷
key, hash, entryValue, valueReference.getWeight(), RemovalCause.COLLECTED);
//利用原來的key并且刷新value
setValue(e, key, value, now);//存儲數(shù)據(jù)邪蛔,并且將新增加的元素寫入兩個隊(duì)列中
newCount = this.count; // count remains unchanged
} else {
setValue(e, key, value, now);//存儲數(shù)據(jù)急黎,并且將新增加的元素寫入兩個隊(duì)列中
newCount = this.count + 1;
}
this.count = newCount; // write-volatile,保證內(nèi)存可見性
//淘汰緩存
evictEntries(e);
return null;
} else if (onlyIfAbsent) {//原來的Entry中包含指定key的元素侧到,所以讀取一次勃教,讀取操作需要更新Access隊(duì)列
// Mimic
// "if (!map.containsKey(key)) ...
// else return map.get(key);
recordLockedRead(e, now);
return entryValue;
} else {
//如果value不為null,那么更新value
// clobber existing entry, count remains unchanged
++modCount;
//將replace的Cause添加到隊(duì)列中
enqueueNotification(
key, hash, entryValue, valueReference.getWeight(), RemovalCause.REPLACED);
setValue(e, key, value, now);//存儲數(shù)據(jù),并且將新增加的元素寫入兩個隊(duì)列中
//數(shù)據(jù)的淘汰
evictEntries(e);
return entryValue;
}
}
}
//如果目標(biāo)的entry不存在床牧,那么新建entry
// Create a new entry.
++modCount;
ReferenceEntry<K, V> newEntry = newEntry(key, hash, first);
setValue(newEntry, key, value, now);
table.set(index, newEntry);
newCount = this.count + 1;
this.count = newCount; // write-volatile
//淘汰多余的entry
evictEntries(newEntry);
return null;
} finally {
//解鎖
unlock();
//處理剛剛的remove Cause
postWriteCleanup();
}
}
代碼比較長荣回,看上去是比較惡心的,注釋寫了一些戈咳,那么重點(diǎn)說幾個注意的點(diǎn):
- 加鎖,和ConcurrentHashMap一樣心软,加鎖是為了保證線程安全。
- preWriteCleanup:在每一次做put之前都要清理一下著蛙,清理什么删铃?看下代碼:
@GuardedBy("this")
void preWriteCleanup(long now) {
runLockedCleanup(now);
}
void runLockedCleanup(long now) {
if (tryLock()) {
try {
drainReferenceQueues();
expireEntries(now); // calls drainRecencyQueue
readCount.set(0);
} finally {
unlock();
}
}
}
@GuardedBy("this")
void drainReferenceQueues() {
if (map.usesKeyReferences()) {
drainKeyReferenceQueue();
}
if (map.usesValueReferences()) {
drainValueReferenceQueue();
}
}
@GuardedBy("this")
void drainKeyReferenceQueue() {
Reference<? extends K> ref;
int i = 0;
while ((ref = keyReferenceQueue.poll()) != null) {
@SuppressWarnings("unchecked")
ReferenceEntry<K, V> entry = (ReferenceEntry<K, V>) ref;
map.reclaimKey(entry);
if (++i == DRAIN_MAX) {
break;
}
}
}
看上去可能有點(diǎn)懵,其實(shí)它要做的就是清空兩個隊(duì)列keyReferenceQueue和valueReferenceQueue踏堡,這兩個隊(duì)列是什么東西猎唁?其實(shí)是引用使用隊(duì)列。
GuavaCache為了支持弱引用和軟引用顷蟆,引入了引用清空隊(duì)列诫隅。同時將key和Value包裝成了keyReference和valueReference腐魂。
在創(chuàng)建Entry的時候:
@GuardedBy("this")
ReferenceEntry<K, V> newEntry(K key, int hash, @Nullable ReferenceEntry<K, V> next) {
return map.entryFactory.newEntry(this, checkNotNull(key), hash, next);
}
利用map.entryFactory創(chuàng)建Entry。Factory的初始化是通過
entryFactory = EntryFactory.getFactory(keyStrength, usesAccessEntries(), usesWriteEntries());
實(shí)現(xiàn)的逐纬。keyStrength是我們在初始化時指定的引用強(qiáng)度蛔屹。可選的有工廠有:
static final EntryFactory[] factories = {
STRONG,
STRONG_ACCESS,
STRONG_WRITE,
STRONG_ACCESS_WRITE,
WEAK,
WEAK_ACCESS,
WEAK_WRITE,
WEAK_ACCESS_WRITE,
};
通過相應(yīng)的工廠創(chuàng)建對應(yīng)的Entry豁生,這里主要說一下WeakEntry:
WEAK {
@Override
<K, V> ReferenceEntry<K, V> newEntry(
Segment<K, V> segment, K key, int hash, @Nullable ReferenceEntry<K, V> next) {
return new WeakEntry<K, V>(segment.keyReferenceQueue, key, hash, next);
}
},
/**
* Used for weakly-referenced keys.
*/
static class WeakEntry<K, V> extends WeakReference<K> implements ReferenceEntry<K, V> {
WeakEntry(ReferenceQueue<K> queue, K key, int hash, @Nullable ReferenceEntry<K, V> next) {
super(key, queue);
this.hash = hash;
this.next = next;
}
@Override
public K getKey() {
return get();
}
/*
* It'd be nice to get these for free from AbstractReferenceEntry, but we're already extending
* WeakReference<K>.
*/
// null access
@Override
public long getAccessTime() {
throw new UnsupportedOperationException();
}
@Override
public void setAccessTime(long time) {
throw new UnsupportedOperationException();
}
@Override
public ReferenceEntry<K, V> getNextInAccessQueue() {
throw new UnsupportedOperationException();
}
@Override
public void setNextInAccessQueue(ReferenceEntry<K, V> next) {
throw new UnsupportedOperationException();
}
@Override
public ReferenceEntry<K, V> getPreviousInAccessQueue() {
throw new UnsupportedOperationException();
}
@Override
public void setPreviousInAccessQueue(ReferenceEntry<K, V> previous) {
throw new UnsupportedOperationException();
}
// null write
@Override
public long getWriteTime() {
throw new UnsupportedOperationException();
}
@Override
public void setWriteTime(long time) {
throw new UnsupportedOperationException();
}
@Override
public ReferenceEntry<K, V> getNextInWriteQueue() {
throw new UnsupportedOperationException();
}
@Override
public void setNextInWriteQueue(ReferenceEntry<K, V> next) {
throw new UnsupportedOperationException();
}
@Override
public ReferenceEntry<K, V> getPreviousInWriteQueue() {
throw new UnsupportedOperationException();
}
@Override
public void setPreviousInWriteQueue(ReferenceEntry<K, V> previous) {
throw new UnsupportedOperationException();
}
// The code below is exactly the same for each entry type.
final int hash;
final ReferenceEntry<K, V> next;
volatile ValueReference<K, V> valueReference = unset();
@Override
public ValueReference<K, V> getValueReference() {
return valueReference;
}
@Override
public void setValueReference(ValueReference<K, V> valueReference) {
this.valueReference = valueReference;
}
@Override
public int getHash() {
return hash;
}
@Override
public ReferenceEntry<K, V> getNext() {
return next;
}
}
WeakEntry繼承了WeakReference實(shí)現(xiàn)了ReferenceEntry兔毒,也就是說這個引用是弱引用。WeakEntry引用的key和Value隨時可能會被回收甸箱。構(gòu)造的時候參數(shù)里面有ReferenceQueue<K> queue育叁,這個就是我們上面提到的KeyReferenceQueue,所以在Key被GC掉的時候芍殖,會自動的將引用加入到ReferenceQueue這樣我們就能處理對應(yīng)的Entry了豪嗽。Value也是一樣的。是不是覺得十分牛逼豌骏?
回到正題清理KeyReferenceQueue:
@GuardedBy("this")
void drainKeyReferenceQueue() {
Reference<? extends K> ref;
int i = 0;
while ((ref = keyReferenceQueue.poll()) != null) {
@SuppressWarnings("unchecked")
ReferenceEntry<K, V> entry = (ReferenceEntry<K, V>) ref;
map.reclaimKey(entry);
if (++i == DRAIN_MAX) {
break;
}
}
}
void reclaimKey(ReferenceEntry<K, V> entry) {
int hash = entry.getHash();
segmentFor(hash).reclaimKey(entry, hash);
}
/**
* Removes an entry whose key has been garbage collected.
*/
boolean reclaimKey(ReferenceEntry<K, V> entry, int hash) {
lock();
try {
int newCount = count - 1;
AtomicReferenceArray<ReferenceEntry<K, V>> table = this.table;
int index = hash & (table.length() - 1);
ReferenceEntry<K, V> first = table.get(index);
for (ReferenceEntry<K, V> e = first; e != null; e = e.getNext()) {
if (e == entry) {
++modCount;
ReferenceEntry<K, V> newFirst =
removeValueFromChain(
first,
e,
e.getKey(),
hash,
e.getValueReference().get(),
e.getValueReference(),
RemovalCause.COLLECTED);
newCount = this.count - 1;
table.set(index, newFirst);
this.count = newCount; // write-volatile
return true;
}
}
return false;
} finally {
unlock();
postWriteCleanup();
}
}
上面就是清理過程了昵骤,如果發(fā)現(xiàn)key或者value被GC了,那么會在put的時候觸發(fā)清理肯适。
3.setValue都干了什么变秦?setValue其實(shí)是將value寫入Entry,但是這是一個寫操作框舔,所以會刷新上一次寫的時間蹦玫,但是這是根據(jù)什么維護(hù)的呢?
/**
* Sets a new value of an entry. Adds newly created entries at the end of the access queue.
*/
@GuardedBy("this")
void setValue(ReferenceEntry<K, V> entry, K key, V value, long now) {
ValueReference<K, V> previous = entry.getValueReference();
int weight = map.weigher.weigh(key, value);
checkState(weight >= 0, "Weights must be non-negative");
ValueReference<K, V> valueReference =
map.valueStrength.referenceValue(this, entry, value, weight);
entry.setValueReference(valueReference);
//寫入隊(duì)列
recordWrite(entry, weight, now);
previous.notifyNewValue(value);
}
/**
* Updates eviction metadata that {@code entry} was just written. This currently amounts to
* adding {@code entry} to relevant eviction lists.
*/
@GuardedBy("this")
void recordWrite(ReferenceEntry<K, V> entry, int weight, long now) {
// we are already under lock, so drain the recency queue immediately
drainRecencyQueue();
totalWeight += weight;
if (map.recordsAccess()) {
entry.setAccessTime(now);
}
if (map.recordsWrite()) {
entry.setWriteTime(now);
}
accessQueue.add(entry);
writeQueue.add(entry);
}
其實(shí)GuavaCache會維護(hù)兩個隊(duì)列一個Write隊(duì)列和一個Access隊(duì)列刘绣,用這兩個隊(duì)列來實(shí)現(xiàn)最近讀和最近寫的清除操作樱溉,我們可以猜測這兩個隊(duì)列需要有序,同時也需要能快速定位元素纬凤。以Access隊(duì)列為例:
/**
* A custom queue for managing access order. Note that this is tightly integrated with
* {@code ReferenceEntry}, upon which it reliese to perform its linking.
*
* <p>Note that this entire implementation makes the assumption that all elements which are in the
* map are also in this queue, and that all elements not in the queue are not in the map.
*
* <p>The benefits of creating our own queue are that (1) we can replace elements in the middle of
* the queue as part of copyWriteEntry, and (2) the contains method is highly optimized for the
* current model.
*/
static final class AccessQueue<K, V> extends AbstractQueue<ReferenceEntry<K, V>> {
final ReferenceEntry<K, V> head =
new AbstractReferenceEntry<K, V>() {
@Override
public long getAccessTime() {
return Long.MAX_VALUE;
}
@Override
public void setAccessTime(long time) {}
ReferenceEntry<K, V> nextAccess = this;
@Override
public ReferenceEntry<K, V> getNextInAccessQueue() {
return nextAccess;
}
@Override
public void setNextInAccessQueue(ReferenceEntry<K, V> next) {
this.nextAccess = next;
}
ReferenceEntry<K, V> previousAccess = this;
@Override
public ReferenceEntry<K, V> getPreviousInAccessQueue() {
return previousAccess;
}
@Override
public void setPreviousInAccessQueue(ReferenceEntry<K, V> previous) {
this.previousAccess = previous;
}
};
// implements Queue
@Override
public boolean offer(ReferenceEntry<K, V> entry) {
// unlink
connectAccessOrder(entry.getPreviousInAccessQueue(), entry.getNextInAccessQueue());
// add to tail
connectAccessOrder(head.getPreviousInAccessQueue(), entry);
connectAccessOrder(entry, head);
return true;
}
@Override
public ReferenceEntry<K, V> peek() {
ReferenceEntry<K, V> next = head.getNextInAccessQueue();
return (next == head) ? null : next;
}
@Override
public ReferenceEntry<K, V> poll() {
ReferenceEntry<K, V> next = head.getNextInAccessQueue();
if (next == head) {
return null;
}
remove(next);
return next;
}
head.setNextInAccessQueue(head);
head.setPreviousInAccessQueue(head);
}
}
}
重點(diǎn)關(guān)注幾個點(diǎn):offer方法福贞,offer主要做了幾個事情:
1.將Entry和它的前節(jié)點(diǎn)后節(jié)點(diǎn)的關(guān)聯(lián)斷開,這樣就需要Entry中維護(hù)它的前向和后向引用停士。
2.將新增加的節(jié)點(diǎn)加入到隊(duì)列的尾部挖帘,尋找尾節(jié)點(diǎn)用了head.getPreviousInAccessQueue()×导迹可以看出來是個環(huán)形隊(duì)列拇舀。
3.將新增加的節(jié)點(diǎn),或者新調(diào)整出來的節(jié)點(diǎn)設(shè)為尾部節(jié)點(diǎn)蜻底。
通過這幾點(diǎn)骄崩,可以得知,最近更新的節(jié)點(diǎn)一定是在尾部的,head后面的節(jié)點(diǎn)一定是不活躍的要拂,在每一次清除過期節(jié)點(diǎn)的時候一定清除head之后的超時的節(jié)點(diǎn)抠璃,這點(diǎn)可以通過poll進(jìn)行驗(yàn)證。
Write隊(duì)列也是同理脱惰。也就是每次寫入操作都會更新元素的引用和寫入的時間哗伯,并且更新元素在讀寫隊(duì)列中的位置姊舵。我又一次感覺它挺牛逼的卡儒。
4.evictEntries(e)殿遂,item的淘汰背伴,這個操作是在設(shè)置了Cache中能緩存最大條目的前提下觸發(fā)的:
/**
* Performs eviction if the segment is over capacity. Avoids flushing the entire cache if the
* newest entry exceeds the maximum weight all on its own.
*
* @param newest the most recently added entry
*/
@GuardedBy("this")
void evictEntries(ReferenceEntry<K, V> newest) {
if (!map.evictsBySize()) {
return;
}
drainRecencyQueue();
// If the newest entry by itself is too heavy for the segment, don't bother evicting
// anything else, just that
if (newest.getValueReference().getWeight() > maxSegmentWeight) {
if (!removeEntry(newest, newest.getHash(), RemovalCause.SIZE)) {
throw new AssertionError();
}
}
while (totalWeight > maxSegmentWeight) {
ReferenceEntry<K, V> e = getNextEvictable();
if (!removeEntry(e, e.getHash(), RemovalCause.SIZE)) {
throw new AssertionError();
}
}
}
這里主要做了幾件事椭赋,首先判斷是否開啟淘汰茧痒,之后呢清理RecencyQueue选侨,然后判斷新增加的元素是否有很大的權(quán)重良蛮,如果是那么直接刪掉抽碌,因?yàn)樗亓恕W詈笈袛嗍欠駲?quán)重已經(jīng)大于上限决瞳,如果是的話那么我們就清除最近最少有使用的Entry货徙,直到Weight小于上限。
// TODO(fry): instead implement this with an eviction head
@GuardedBy("this")
ReferenceEntry<K, V> getNextEvictable() {
for (ReferenceEntry<K, V> e : accessQueue) {
int weight = e.getValueReference().getWeight();
if (weight > 0) {
return e;
}
}
throw new AssertionError();
}
這里比較容易疑惑的是:Weight是啥皮胡?其實(shí)如果不做設(shè)置Weight都是1痴颊,Weight上限就是maxSize。但是Guava允許自己定義Weight屡贺,那么上限就是maxWeight了蠢棱。這部分可以看上面初始化部分。
5.removeListener:removeListener可以看到甩栈,在元素被覆蓋的時候后注冊了一個事件泻仙,同時在finnally里面進(jìn)行了一次清理:
/**
* Notifies listeners that an entry has been automatically removed due to expiration, eviction, or
* eligibility for garbage collection. This should be called every time expireEntries or
* evictEntry is called (once the lock is released).
*/
void processPendingNotifications() {
RemovalNotification<K, V> notification;
while ((notification = removalNotificationQueue.poll()) != null) {
try {
removalListener.onRemoval(notification);
} catch (Throwable e) {
logger.log(Level.WARNING, "Exception thrown by removal listener", e);
}
}
}
可以看到為了減小put的開銷,這里做了一個類似于異步的操作量没,并且在解鎖之后做這樣的操作來避免阻塞其他的put玉转。
關(guān)于Guava的Put操作就分析完了,的確是夠復(fù)雜的殴蹄。下面看一下get部分:
// LoadingCache methods
//local cache的代理
@Override
public V get(K key) throws ExecutionException {
return localCache.getOrLoad(key);
}
/**
* 根據(jù)key獲取value,如果獲取不到進(jìn)行l(wèi)oad
* @param key
* @return
* @throws ExecutionException
*/
V getOrLoad(K key) throws ExecutionException {
return get(key, defaultLoader);
}
V get(K key, CacheLoader<? super K, V> loader) throws ExecutionException {
int hash = hash(checkNotNull(key));//hash——>rehash
return segmentFor(hash).get(key, hash, loader);
}
// loading
//進(jìn)行指定key對應(yīng)的value的獲取究抓,讀取不加鎖
V get(K key, int hash, CacheLoader<? super K, V> loader) throws ExecutionException {
//保證key-value不為null
checkNotNull(key);
checkNotNull(loader);
try {
if (count != 0) { // read-volatile volatile讀會刷新緩存,盡量保證可見性,如果為0那么直接load
// don't call getLiveEntry, which would ignore loading values
ReferenceEntry<K, V> e = getEntry(key, hash);
//如果對應(yīng)的Entry不為Null,證明值還在
if (e != null) {
long now = map.ticker.read();//獲取當(dāng)前的時間袭灯,根據(jù)當(dāng)前的時間進(jìn)行Live的數(shù)據(jù)的讀取
V value = getLiveValue(e, now);
//元素不為null的話可以不刷新
if (value != null) {
recordRead(e, now);//為entry增加accessTime,同時加入recencyQueue
statsCounter.recordHits(1);//更新當(dāng)前的狀態(tài)漩蟆,增加為命中,可以用于計(jì)算命中率
//判斷當(dāng)前有沒有到刷新的時機(jī),如果沒有的話那么返回原值妓蛮。否則進(jìn)行刷新
return scheduleRefresh(e, key, hash, value, now, loader);
}
//value為null,如果此時value正在刷新怠李,那么此時等待刷新結(jié)果
ValueReference<K, V> valueReference = e.getValueReference();
if (valueReference.isLoading()) {
return waitForLoadingValue(e, key, valueReference);
}
}
}
//如果取不到值,那么進(jìn)行統(tǒng)一的加鎖get
// at this point e is either null or expired;
return lockedGetOrLoad(key, hash, loader);
} catch (ExecutionException ee) {
Throwable cause = ee.getCause();
if (cause instanceof Error) {
throw new ExecutionError((Error) cause);
} else if (cause instanceof RuntimeException) {
throw new UncheckedExecutionException(cause);
}
throw ee;
} finally {
postReadCleanup();//每次Put和get之后都要進(jìn)行一次Clean
}
}
get的實(shí)現(xiàn)和JDK1.6的ConcurrentHashMap思想一致,都是put加鎖捺癞,但是get是用volatile保證夷蚊。
這里主要做了幾件事:
- 首先獲取Entry,Entry不為null獲取對應(yīng)的Value髓介,如果Value不為空惕鼓,那么證明值還在,那么這時候判斷一下是否要刷新直接返回了唐础。否則判斷目前引用是否在Loading箱歧,如果是就等待Loading結(jié)束。
- 如果取不到Entry或者Value為null 并且沒有在Loading一膨,那么這時候進(jìn)行l(wèi)ockedGetOrLoad(),這是一個大活兒呀邢。
V lockedGetOrLoad(K key, int hash, CacheLoader<? super K, V> loader) throws ExecutionException {
ReferenceEntry<K, V> e;
ValueReference<K, V> valueReference = null;
LoadingValueReference<K, V> loadingValueReference = null;
boolean createNewEntry = true;
lock();//加鎖,因?yàn)闀淖償?shù)據(jù)結(jié)構(gòu)
try {
// re-read ticker once inside the lock
long now = map.ticker.read();
preWriteCleanup(now);//清除引用隊(duì)列,Acess隊(duì)列和Write隊(duì)列中過期的數(shù)據(jù),這算是一次put操作
int newCount = this.count - 1;
AtomicReferenceArray<ReferenceEntry<K, V>> table = this.table;
int index = hash & (table.length() - 1);
ReferenceEntry<K, V> first = table.get(index);
//定位目標(biāo)元素
for (e = first; e != null; e = e.getNext()) {
K entryKey = e.getKey();
if (e.getHash() == hash
&& entryKey != null
&& map.keyEquivalence.equivalent(key, entryKey)) {
valueReference = e.getValueReference();
//如果目前處在loading狀態(tài)豹绪,不創(chuàng)建新元素
if (valueReference.isLoading()) {
createNewEntry = false;
} else {
V value = valueReference.get();
if (value == null) { //可能被GC掉了价淌,加入removeListener
enqueueNotification(
entryKey, hash, value, valueReference.getWeight(), RemovalCause.COLLECTED);
} else if (map.isExpired(e, now)) { //可能過期了
// This is a duplicate check, as preWriteCleanup already purged expired
// entries, but let's accomodate an incorrect expiration queue.
enqueueNotification(
entryKey, hash, value, valueReference.getWeight(), RemovalCause.EXPIRED);
} else {//目前就已經(jīng)加載過了,返回
recordLockedRead(e, now);
statsCounter.recordHits(1);
// we were concurrent with loading; don't consider refresh
return value;
}
//刪除在隊(duì)列中相應(yīng)的引用瞒津,因?yàn)楹竺嬉聞?chuàng)建
// immediately reuse invalid entries
writeQueue.remove(e);
accessQueue.remove(e);
this.count = newCount; // write-volatile
}
break;
}
}
//創(chuàng)建新的Entry,但是此時是沒有值的
if (createNewEntry) {
loadingValueReference = new LoadingValueReference<K, V>();
if (e == null) {
e = newEntry(key, hash, first);
e.setValueReference(loadingValueReference);
table.set(index, e);
} else {
e.setValueReference(loadingValueReference);
}
}
} finally {
unlock();
postWriteCleanup();
}
if (createNewEntry) {
try {
// Synchronizes on the entry to allow failing fast when a recursive load is
// detected. This may be circumvented when an entry is copied, but will fail fast most
// of the time.
synchronized (e) {
return loadSync(key, hash, loadingValueReference, loader);
}
} finally {
statsCounter.recordMisses(1);
}
} else {
// The entry already exists. Wait for loading.
return waitForLoadingValue(e, key, valueReference);
}
}
首先說一下為什么加鎖蝉衣,加鎖的原因有兩個:
- load算是一個寫操作,改變數(shù)據(jù)結(jié)構(gòu)巷蚪,需要加鎖病毡。
- 為了避免緩存擊穿,加鎖一個防止緩存擊穿的發(fā)生屁柏,當(dāng)然是JVm級別的不是分布式級別的剪验。
因?yàn)槭菍懰砸M(jìn)行preWriteCleanup,根據(jù)key定位一下Entry前联,如果能定位到功戚,那么判斷是否在Loading,如果是的話不創(chuàng)建新的Entry并且等待Loading結(jié)束似嗤。如果不是那么判斷value是否為null和是否過期啸臀,如果是的話都要進(jìn)行創(chuàng)建新Entry,如果都不是證明value是加載過了烁落,那么更新下Access隊(duì)列然后返回乘粒。
接下來清除一下Access和Write隊(duì)列的元素,創(chuàng)建新的Entry伤塌。這里比較有意思:
// at most one of loadSync/loadAsync may be called for any given LoadingValueReference
//同步刷新
V loadSync(
K key,
int hash,
LoadingValueReference<K, V> loadingValueReference,
CacheLoader<? super K, V> loader)
throws ExecutionException {
ListenableFuture<V> loadingFuture = loadingValueReference.loadFuture(key, loader);
return getAndRecordStats(key, hash, loadingValueReference, loadingFuture);
}
這里創(chuàng)建了一個loadingReference灯萍,這也就是之前看到的判斷是否在Loading。如果是Loading狀態(tài)那么表面有一個線程正在更新Cache每聪,其他的線程等待就可以了旦棉。
這里可以看到其實(shí)也支持異步的刷新:
ListenableFuture<V> loadAsync(
final K key,
final int hash,
final LoadingValueReference<K, V> loadingValueReference,
CacheLoader<? super K, V> loader) {
final ListenableFuture<V> loadingFuture = loadingValueReference.loadFuture(key, loader);
loadingFuture.addListener(
new Runnable() {
@Override
public void run() {
try {
getAndRecordStats(key, hash, loadingValueReference, loadingFuture);
} catch (Throwable t) {
logger.log(Level.WARNING, "Exception thrown during refresh", t);
loadingValueReference.setException(t);
}
}
},
directExecutor());
return loadingFuture;
}
后面更新的邏輯就不貼了齿风。
從上面我們可以看到,對于每一次get都會去進(jìn)行Access隊(duì)列的更新绑洛,同時對于多線程的更新只會引起一個線程去load數(shù)據(jù)救斑,對于不存在的數(shù)據(jù),get時也會進(jìn)行一次load操作真屯。同時通過同步操作解決了緩存擊穿的問題脸候。不得不說GuavaCache設(shè)計(jì)的很巧妙。
其實(shí)Guava還有一個比較好玩的東西绑蔫,asMap(),我們感覺GuavaCache像Map运沦,但是還不完全是Map,那么就提供了一個方法以Map的視圖去展現(xiàn)配深。
看下asMap()
@Override
public ConcurrentMap<K, V> asMap() {
return localCache;
}
其實(shí)就是localCache返回了携添,返回類型是ConcurrentMap,那么我們看看localCache的繼承結(jié)構(gòu):
@GwtCompatible(emulated = true)
class LocalCache<K, V> extends AbstractMap<K, V> implements ConcurrentMap<K, V> {
果然和Map關(guān)系大大的凉馆,也就是說,LocalCache本身是個ConcurrentMap亡资,但是對于LocalCache的這些map方法我們是調(diào)用不到的澜共,因?yàn)槲覀冎荒苡肔oadingCache嘛。通過asMap我們能得到LocalCache锥腻,但是我們不能使用除了Map接口之外的方法嗦董,也就是說我們不能使用自動加載等一系列的功能。
正如官方Wiki說的:
至此所有的核心源碼分析完了瘦黑,覺得有點(diǎn)惡心京革,源碼這東西就要靜下來細(xì)細(xì)的看,收獲會很大幸斥。
由于文章比較長匹摇,如果有什么問題還請賜教。最后甲葬,祝自己這個苦逼碼農(nóng)圣誕快樂廊勃。