build創(chuàng)建緩存
static Cache<String, String> cache = CacheBuilder.newBuilder()
.maximumSize(40000)
.expireAfterWrite(3, TimeUnit.SECONDS)
.expireAfterAccess(3, TimeUnit.SECONDS)
.removalListener(notification -> {
System.out.println(Thread.currentThread().getName() + " -> remove : " + notification.getKey());
return;
}).build();
build一個cache實例,實例內(nèi)部是由多個segemnt組成.上面的參數(shù) 會創(chuàng)建4個segment ,并且買個segment最大容量是10000.代碼如下:
//new
public <K1 extends K, V1 extends V> Cache<K1, V1> build() {
checkWeightWithWeigher();
checkNonLoadingCache();
//↓↓↓↓
return new LocalCache.LocalManualCache<K1, V1>(this);
}
//manual cache
LocalManualCache(CacheBuilder<? super K, ? super V> builder) {
//↓↓↓
this(new LocalCache<K, V>(builder, null));
}
//核心創(chuàng)建邏輯
LocalCache(
CacheBuilder<? super K, ? super V> builder, @Nullable CacheLoader<? super K, V> loader) {
concurrencyLevel = Math.min(builder.getConcurrencyLevel(), MAX_SEGMENTS);
keyStrength = builder.getKeyStrength();
valueStrength = builder.getValueStrength();
keyEquivalence = builder.getKeyEquivalence();
valueEquivalence = builder.getValueEquivalence();
maxWeight = builder.getMaximumWeight();
weigher = builder.getWeigher();
expireAfterAccessNanos = builder.getExpireAfterAccessNanos();
expireAfterWriteNanos = builder.getExpireAfterWriteNanos();
refreshNanos = builder.getRefreshNanos();
//創(chuàng)建的listener 此處加入
removalListener = builder.getRemovalListener();
removalNotificationQueue =
(removalListener == NullListener.INSTANCE)
? LocalCache.<RemovalNotification<K, V>>discardingQueue()
: new ConcurrentLinkedQueue<RemovalNotification<K, V>>();
ticker = builder.getTicker(recordsTime());
entryFactory = EntryFactory.getFactory(keyStrength, usesAccessEntries(), usesWriteEntries());
globalStatsCounter = builder.getStatsCounterSupplier().get();
defaultLoader = loader;
int initialCapacity = Math.min(builder.getInitialCapacity(), MAXIMUM_CAPACITY);
if (evictsBySize() && !customWeigher()) {
initialCapacity = Math.min(initialCapacity, (int) maxWeight);
}
// Find the lowest power-of-two segmentCount that exceeds concurrencyLevel, unless
// maximumSize/Weight is specified in which case ensure that each segment gets at least 10
// entries. The special casing for size-based eviction is only necessary because that eviction
// happens per segment instead of globally, so too many segments compared to the maximum size
// will result in random eviction behavior.
int segmentShift = 0;
int segmentCount = 1;
//這里進行segment的計算,最終得到 segment=4
while (segmentCount < concurrencyLevel && (!evictsBySize() || segmentCount * 20 <= maxWeight)) {
++segmentShift;
segmentCount <<= 1;
}
this.segmentShift = 32 - segmentShift;
segmentMask = segmentCount - 1;
//創(chuàng)建segemnt數(shù)組
this.segments = newSegmentArray(segmentCount);
//這里計算segment的size , 總build中分成四份
int segmentCapacity = initialCapacity / segmentCount;
if (segmentCapacity * segmentCount < initialCapacity) {
++segmentCapacity;
}
int segmentSize = 1;
while (segmentSize < segmentCapacity) {
segmentSize <<= 1;
}
//是否根據(jù)最大容量進行逐出 ,本實例是按照容量逐出的
if (evictsBySize()) {
// Ensure sum of segment max weights = overall max weights
long maxSegmentWeight = maxWeight / segmentCount + 1;
long remainder = maxWeight % segmentCount;
for (int i = 0; i < this.segments.length; ++i) {
if (i == remainder) {
maxSegmentWeight--;
}
//對segment中的對象進行初始化 ,
this.segments[i] =
createSegment(segmentSize, maxSegmentWeight, builder.getStatsCounterSupplier().get());
}
} else {
for (int i = 0; i < this.segments.length; ++i) {
//如果不是按照容量進行逐出,則第二個參數(shù)為-1(UNSET_INT) ,表示無上限的限制
this.segments[i] =
createSegment(segmentSize, UNSET_INT, builder.getStatsCounterSupplier().get());
}
}
}
總結(jié): Guava通過將LocalManualCache與LocalLoadingCache設計為LocalCache的靜態(tài)內(nèi)部類吨凑,并在LocalManualCache類中設置一個final修飾的LocalCache成員變量,在緩存器構(gòu)建時完成對LocalCache成員變量的初始化膛堤,這樣不管緩存器是Cache或LoadingCache類型实撒,用戶對緩存器的操作都可以轉(zhuǎn)換為對LocalCache的操作。
put 操作
// put 發(fā)生了哪些動作
cache.put("abc", "value");
put的過程會首先對key進行hash找到他對應的segment,然后對segment進行預清理.然后才執(zhí)行put ,put之后又會執(zhí)行后續(xù)的其他清理.下面根據(jù)代碼一步一步查看
注意put的過程會對當前的segment進行過期key的清理操作
//# LocalManualCache
@Override
public void put(K key, V value) {
localCache.put(key, value);
}
// localcache 中的put
@Override
public V put(K key, V value) {
checkNotNull(key);
checkNotNull(value);
int hash = hash(key);
return segmentFor(hash).put(key, hash, value, false);
}
首先根據(jù)key找到他的segment
/**
* Returns the segment that should be used for a key with the given hash.
*
* @param hash the hash code for the key
* @return the segment
*/
Segment<K, V> segmentFor(int hash) {
// TODO(fry): Lazily create segments?
return segments[(hash >>> segmentShift) & segmentMask];
}
然后對segemnt進行put,segment中就是一個localcache對象 ,下面是他的構(gòu)造函數(shù)
//可以看到 ,segment構(gòu)造的時候需要把localcache傳入構(gòu)造器
Segment(
LocalCache<K, V> map,
int initialCapacity,
long maxSegmentWeight,
StatsCounter statsCounter) {
this.map = map;
this.maxSegmentWeight = maxSegmentWeight;
this.statsCounter = checkNotNull(statsCounter);
initTable(newEntryArray(initialCapacity));
keyReferenceQueue = map.usesKeyReferences() ? new ReferenceQueue<K>() : null;
valueReferenceQueue = map.usesValueReferences() ? new ReferenceQueue<V>() : null;
recencyQueue =
map.usesAccessQueue()
? new ConcurrentLinkedQueue<ReferenceEntry<K, V>>()
: LocalCache.<ReferenceEntry<K, V>>discardingQueue();
writeQueue =
map.usesWriteQueue()
? new WriteQueue<K, V>()
: LocalCache.<ReferenceEntry<K, V>>discardingQueue();
accessQueue =
map.usesAccessQueue()
? new AccessQueue<K, V>()
: LocalCache.<ReferenceEntry<K, V>>discardingQueue();
}
下面是核心部分的put邏輯 ,重點關注注釋的地方: 預清理,末尾通知
// put執(zhí)行過程中包含了 preWriteCleanup 和 finally中的postWriteCleanup
@Nullable
V put(K key, int hash, V value, boolean onlyIfAbsent) {
lock();
try {
long now = map.ticker.read();
//預清理動作,這個操作會清理掉當前segemnt中所有過期的元素
preWriteCleanup(now);
int newCount = this.count + 1;
if (newCount > this.threshold) { // ensure capacity
expand();
newCount = this.count + 1;
}
AtomicReferenceArray<ReferenceEntry<K, V>> table = this.table;
int index = hash & (table.length() - 1);
ReferenceEntry<K, V> first = table.get(index);
// Look for an existing entry.
for (ReferenceEntry<K, V> e = first; e != null; e = e.getNext()) {
K entryKey = e.getKey();
if (e.getHash() == hash
&& entryKey != null
&& map.keyEquivalence.equivalent(key, entryKey)) {
// We found an existing entry.
ValueReference<K, V> valueReference = e.getValueReference();
V entryValue = valueReference.get();
if (entryValue == null) {
++modCount;
if (valueReference.isActive()) {
enqueueNotification(
key, hash, entryValue, valueReference.getWeight(), RemovalCause.COLLECTED);
setValue(e, key, value, now);
newCount = this.count; // count remains unchanged
} else {
setValue(e, key, value, now);
newCount = this.count + 1;
}
this.count = newCount; // write-volatile
evictEntries(e);
return null;
} else if (onlyIfAbsent) {
// Mimic
// "if (!map.containsKey(key)) ...
// else return map.get(key);
recordLockedRead(e, now);
return entryValue;
} else {
// clobber existing entry, count remains unchanged
++modCount;
enqueueNotification(
key, hash, entryValue, valueReference.getWeight(), RemovalCause.REPLACED);
setValue(e, key, value, now);
evictEntries(e);
return entryValue;
}
}
}
// Create a new entry.
++modCount;
ReferenceEntry<K, V> newEntry = newEntry(key, hash, first);
setValue(newEntry, key, value, now);
table.set(index, newEntry);
newCount = this.count + 1;
this.count = newCount; // write-volatile
evictEntries(newEntry);
return null;
} finally {
unlock();
//這里面進行過期元素的通知 ,上面的預清理動作已經(jīng)把過期 的元素加入到了,指定隊列,這里面直接調(diào)用當前隊列的listener.會觸發(fā)用戶的回調(diào)
postWriteCleanup();
}
}
下面著重看下 預清理 和 通知
@GuardedBy("this")
void preWriteCleanup(long now) {
runLockedCleanup(now);
}
//
void runLockedCleanup(long now) {
if (tryLock()) {
try {
drainReferenceQueues();
//這個expire 方法是關鍵
expireEntries(now); // calls drainRecencyQueue
readCount.set(0);
} finally {
unlock();
}
}
}
//
@GuardedBy("this")
void expireEntries(long now) {
drainRecencyQueue();
ReferenceEntry<K, V> e;
//這里面會循環(huán)當前隊列的所有元素,判斷每個元素是否已經(jīng)過期 ,如果過期 會進行相應處理,重點關注他的 remvoeEntry方法
while ((e = writeQueue.peek()) != null && map.isExpired(e, now)) {
if (!removeEntry(e, e.getHash(), RemovalCause.EXPIRED)) {
throw new AssertionError();
}
}
while ((e = accessQueue.peek()) != null && map.isExpired(e, now)) {
if (!removeEntry(e, e.getHash(), RemovalCause.EXPIRED)) {
throw new AssertionError();
}
}
}
//removeentry中重點關注 removeValueFromChain 這里執(zhí)行清理操作
@VisibleForTesting
@GuardedBy("this")
boolean removeEntry(ReferenceEntry<K, V> entry, int hash, RemovalCause cause) {
int newCount = this.count - 1;
AtomicReferenceArray<ReferenceEntry<K, V>> table = this.table;
int index = hash & (table.length() - 1);
ReferenceEntry<K, V> first = table.get(index);
for (ReferenceEntry<K, V> e = first; e != null; e = e.getNext()) {
if (e == entry) {
++modCount;
ReferenceEntry<K, V> newFirst =
removeValueFromChain(
first,
e,
e.getKey(),
hash,
e.getValueReference().get(),
e.getValueReference(),
cause);
newCount = this.count - 1;
table.set(index, newFirst);
this.count = newCount; // write-volatile
return true;
}
}
return false;
}
//重點關注 enqueueNotification
@GuardedBy("this")
@Nullable
ReferenceEntry<K, V> removeValueFromChain(
ReferenceEntry<K, V> first,
ReferenceEntry<K, V> entry,
@Nullable K key,
int hash,
V value,
ValueReference<K, V> valueReference,
RemovalCause cause) {
//這一句 ,會把要清理的元素 包裝成一個通知 ,再put完成之后 ,finally的通知函數(shù)里面 ,進行逐個通知
enqueueNotification(key, hash, value, valueReference.getWeight(), cause);
writeQueue.remove(entry);
accessQueue.remove(entry);
if (valueReference.isLoading()) {
valueReference.notifyNewValue(null);
return first;
} else {
return removeEntryFromChain(first, entry);
}
}
//本方法會把刪除的key生成通知對象放入removalNotificationQueue中 ,通過offer方法
@GuardedBy("this")
void enqueueNotification(
@Nullable K key, int hash, @Nullable V value, int weight, RemovalCause cause) {
totalWeight -= weight;
if (cause.wasEvicted()) {
statsCounter.recordEviction();
}
if (map.removalNotificationQueue != DISCARDING_QUEUE) {
RemovalNotification<K, V> notification = RemovalNotification.create(key, value, cause);
// 生成notifycation , 放入通知隊列,給后續(xù)的回調(diào)方法使用
map.removalNotificationQueue.offer(notification);
}
}
////////////////預清理執(zhí)行完畢/////////////////////////
////// 下面看下 通知邏輯 /////////////////////////////
//put之后的finally中有代碼 `postWriteCleanup`
/**
* Performs routine cleanup following a write.
*/
void postWriteCleanup() {
runUnlockedCleanup();
}
//
void runUnlockedCleanup() {
// locked cleanup may generate notifications we can send unlocked
if (!isHeldByCurrentThread()) {
map.processPendingNotifications();
}
}
}
//removalListener.onRemoval 這里面是回調(diào)用戶穿進來的listenr代碼
void processPendingNotifications() {
RemovalNotification<K, V> notification;
while ((notification = removalNotificationQueue.poll()) != null) {
try {
removalListener.onRemoval(notification);
} catch (Throwable e) {
logger.log(Level.WARNING, "Exception thrown by removal listener", e);
}
}
}
總結(jié) : 在put操作的時候,會進行過期元素的清理動作 .并且這個動作是再同一個線程中執(zhí)行的 .但是清理的不是所有的cache,只是key所在的segemnt中的所有key
get 操作
get()
基本用法如下
//允許傳入一個loader.在緩存中沒有命中的情況下,執(zhí)行l(wèi)oader獲取數(shù)據(jù)
cache.get("adder", () -> {
return "a";
});
get操作的基本流程是: 根據(jù)key首先hash出他對應的segment,然后去對應的segment獲取對應的元素,這其中也會進行清理動作,過期的元素會被直接刪除,并且發(fā)出通知. 代碼如下
//LocalManualCache
@Override
public V get(K key, final Callable<? extends V> valueLoader) throws ExecutionException {
checkNotNull(valueLoader);
// ↓↓↓↓
return localCache.get(
key,
new CacheLoader<Object, V>() {
@Override
public V load(Object key) throws Exception {
return valueLoader.call();
}
});
}
//
V get(K key, int hash, CacheLoader<? super K, V> loader) throws ExecutionException {
checkNotNull(key);
checkNotNull(loader);
try {
if (count != 0) { // read-volatile
// don't call getLiveEntry, which would ignore loading values
ReferenceEntry<K, V> e = getEntry(key, hash);
if (e != null) {
long now = map.ticker.read();
V value = getLiveValue(e, now);
if (value != null) {
recordRead(e, now);
statsCounter.recordHits(1);
return scheduleRefresh(e, key, hash, value, now, loader);
}
ValueReference<K, V> valueReference = e.getValueReference();
if (valueReference.isLoading()) {
return waitForLoadingValue(e, key, valueReference);
}
}
}
//重點邏輯 ,get動作并且執(zhí)行相應的清理
//↓↓↓↓↓↓↓↓↓↓↓↓
//
return lockedGetOrLoad(key, hash, loader);
} catch (ExecutionException ee) {
Throwable cause = ee.getCause();
if (cause instanceof Error) {
throw new ExecutionError((Error) cause);
} else if (cause instanceof RuntimeException) {
throw new UncheckedExecutionException(cause);
}
throw ee;
} finally {
postReadCleanup();
}
}
//
V lockedGetOrLoad(K key, int hash, CacheLoader<? super K, V> loader) throws ExecutionException {
ReferenceEntry<K, V> e;
ValueReference<K, V> valueReference = null;
LoadingValueReference<K, V> loadingValueReference = null;
boolean createNewEntry = true;
lock();
try {
// re-read ticker once inside the lock
long now = map.ticker.read();
//預清理 ,會直接清理過期的元素并且把過期的元素做成通知放入隊列
//等方法執(zhí)行完畢,回調(diào)用戶傳入的listener
preWriteCleanup(now);
int newCount = this.count - 1;
AtomicReferenceArray<ReferenceEntry<K, V>> table = this.table;
int index = hash & (table.length() - 1);
ReferenceEntry<K, V> first = table.get(index);
for (e = first; e != null; e = e.getNext()) {
K entryKey = e.getKey();
if (e.getHash() == hash
&& entryKey != null
&& map.keyEquivalence.equivalent(key, entryKey)) {
valueReference = e.getValueReference();
if (valueReference.isLoading()) {
createNewEntry = false;
} else {
V value = valueReference.get();
if (value == null) {
enqueueNotification(
entryKey, hash, value, valueReference.getWeight(), RemovalCause.COLLECTED);
} else if (map.isExpired(e, now)) {
// This is a duplicate check, as preWriteCleanup already purged expired
// entries, but let's accomodate an incorrect expiration queue.
enqueueNotification(
entryKey, hash, value, valueReference.getWeight(), RemovalCause.EXPIRED);
} else {
recordLockedRead(e, now);
statsCounter.recordHits(1);
// we were concurrent with loading; don't consider refresh
return value;
}
// immediately reuse invalid entries
writeQueue.remove(e);
accessQueue.remove(e);
this.count = newCount; // write-volatile
}
break;
}
}
if (createNewEntry) {
loadingValueReference = new LoadingValueReference<K, V>();
if (e == null) {
e = newEntry(key, hash, first);
e.setValueReference(loadingValueReference);
table.set(index, e);
} else {
e.setValueReference(loadingValueReference);
}
}
} finally {
unlock();
//進行清理之后的通知, 執(zhí)行用戶傳入的listener
postWriteCleanup();
}
if (createNewEntry) {
try {
// Synchronizes on the entry to allow failing fast when a recursive load is
// detected. This may be circumvented when an entry is copied, but will fail fast most
// of the time.
synchronized (e) {
return loadSync(key, hash, loadingValueReference, loader);
}
} finally {
statsCounter.recordMisses(1);
}
} else {
// The entry already exists. Wait for loading.
return waitForLoadingValue(e, key, valueReference);
}
}
總結(jié): 根據(jù)上面的get的核心邏輯 ,重點看下 預清理和通知, 清理和通知的邏輯參看 上文 put的邏輯.根據(jù)這個邏輯可以總結(jié)出: put操作和get操作都會執(zhí)行過期元素的檢查清理和通知,并且是再同一個線程中執(zhí)行相關的操作. 并沒有另外開啟線程.
getIfPresent 操作
本方法的意思是 ,如果緩存中有指定的key就直接返回, 否則返回null.另外清理操作是怎么實現(xiàn)的 ? 和put/get是一樣的嗎 ?
//用法示例
cache.getIfPresent("test");
//如果不存在,返回null
下面看一下他的獲取邏輯,是否和get操作保持一致的 ?
@Override
@Nullable
public V getIfPresent(Object key) {
//↓↓↓↓↓↓↓
return localCache.getIfPresent(key);
}
//可以看到,也是先獲取hash找到對應的段(segment)的位置
@Nullable
public V getIfPresent(Object key) {
int hash = hash(checkNotNull(key));
//↓↓↓↓↓↓↓
V value = segmentFor(hash).get(key, hash);
if (value == null) {
globalStatsCounter.recordMisses(1);
} else {
globalStatsCounter.recordHits(1);
}
return value;
}
//
下面看看 get操作的核心邏輯
//關注 : postReadCleanup ,由于再finally中,表示他肯定會被執(zhí)行
@Nullable
V get(Object key, int hash) {
try {
if (count != 0) { // read-volatile
long now = map.ticker.read();
ReferenceEntry<K, V> e = getLiveEntry(key, hash, now);
if (e == null) {
return null;
}
V value = e.getValueReference().get();
if (value != null) {
recordRead(e, now);
return scheduleRefresh(e, e.getKey(), hash, value, now, map.defaultLoader);
}
tryDrainReferenceQueues();
}
return null;
} finally {
//也執(zhí)行的清理的動作 那是不是表示 每次get也會清理過期的keys?
postReadCleanup();
}
}
postReadCleanup
的清理邏輯
//這里面有一個 readCount,每次get一次,這個值都會增加
//并且這個是每個segment獨有的 .每個segment自己遞增
void postReadCleanup() {
if ((readCount.incrementAndGet() & DRAIN_THRESHOLD) == 0) {
cleanUp();
}
}
//cleanup邏輯
void cleanUp() {
long now = map.ticker.read();
//執(zhí)行清理邏輯
runLockedCleanup(now);
//發(fā)送清理通知
runUnlockedCleanup();
}
從上面的代碼可以看到 雖然每次都執(zhí)行清理,但是有一個前置條件 :
readCount.incrementAndGet() & DRAIN_THRESHOLD == 0
滿足條件才能執(zhí)行清理 .根據(jù)代碼發(fā)現(xiàn)DRAIN_THRESHOLD
為64, 也就是說執(zhí)行64次getifpresent 才會執(zhí)行一次清理 .
總結(jié): 和
put
get
操作不同getIfPresent
并不是每次都執(zhí)行過期key的維護,而是執(zhí)行方法64次才執(zhí)行一次緩存清理.