guava cache 源碼之get put

在上一篇文章中介紹了guava cache是怎么構(gòu)造的序无,接下來(lái)cache中兩個(gè)重要的方法帝嗡,put狮辽、get喉脖,我們先來(lái)看看put方法:

@Override
    public void put(K key, V value) {
      localCache.put(key, value);
    }
  /**
   * 代理到Segment的put方法
   * @param key
   * @param value
   * @return
   */
  @Override
  public V put(K key, V value) {
    checkNotNull(key);
    checkNotNull(value);
    int hash = hash(key);
    return segmentFor(hash).put(key, hash, value, false);
  }

@Nullable
    V put(K key, int hash, V value, boolean onlyIfAbsent) {
      //保證線程安全,加鎖
      lock();
      try {
        //獲取當(dāng)前的時(shí)間
        long now = map.ticker.read();
        //清除隊(duì)列中的元素
        preWriteCleanup(now);
        //localCache的Count+1
        int newCount = this.count + 1;
        //擴(kuò)容操作
        if (newCount > this.threshold) { // ensure capacity
          expand();
          newCount = this.count + 1;
        }
        //獲取當(dāng)前Entry中的HashTable的Entry數(shù)組
        AtomicReferenceArray<ReferenceEntry<K, V>> table = this.table;
        //定位
        int index = hash & (table.length() - 1);
        //獲取第一個(gè)元素
        ReferenceEntry<K, V> first = table.get(index);
        //遍歷整個(gè)Entry鏈表
        // Look for an existing entry.
        for (ReferenceEntry<K, V> e = first; e != null; e = e.getNext()) {
          K entryKey = e.getKey();
          if (e.getHash() == hash
              && entryKey != null
              && map.keyEquivalence.equivalent(key, entryKey)) {
            // We found an existing entry.
            //如果找到相應(yīng)的元素
            ValueReference<K, V> valueReference = e.getValueReference();
            //獲取value
            V entryValue = valueReference.get();
            //如果entry的value為null,可能被GC掉了
            if (entryValue == null) {
              ++modCount;
              if (valueReference.isActive()) {
                enqueueNotification( //減小鎖時(shí)間的開(kāi)銷
                    key, hash, entryValue, valueReference.getWeight(), RemovalCause.COLLECTED);
                //利用原來(lái)的key并且刷新value
                setValue(e, key, value, now);//存儲(chǔ)數(shù)據(jù),并且將新增加的元素寫(xiě)入兩個(gè)隊(duì)列中
                newCount = this.count; // count remains unchanged
              } else {
                setValue(e, key, value, now);//存儲(chǔ)數(shù)據(jù)草冈,并且將新增加的元素寫(xiě)入兩個(gè)隊(duì)列中
                newCount = this.count + 1;
              }
              this.count = newCount; // write-volatile疲陕,保證內(nèi)存可見(jiàn)性
              //淘汰緩存
              evictEntries(e);
              return null;
            } else if (onlyIfAbsent) {//原來(lái)的Entry中包含指定key的元素,所以讀取一次诅岩,讀取操作需要更新Access隊(duì)列
              // Mimic
              // "if (!map.containsKey(key)) ...
              // else return map.get(key);
              recordLockedRead(e, now);
              return entryValue;
            } else {
              //如果value不為null,那么更新value
              // clobber existing entry, count remains unchanged
              ++modCount;
              //將replace的Cause添加到隊(duì)列中
              enqueueNotification(
                  key, hash, entryValue, valueReference.getWeight(), RemovalCause.REPLACED);
              setValue(e, key, value, now);//存儲(chǔ)數(shù)據(jù)鸳谜,并且將新增加的元素寫(xiě)入兩個(gè)隊(duì)列中
              //數(shù)據(jù)的淘汰
              evictEntries(e);
              return entryValue;
            }
          }
        }
        //如果目標(biāo)的entry不存在,那么新建entry
        // Create a new entry.
        ++modCount;
        ReferenceEntry<K, V> newEntry = newEntry(key, hash, first);
        setValue(newEntry, key, value, now);
        table.set(index, newEntry);
        newCount = this.count + 1;
        this.count = newCount; // write-volatile
        //淘汰多余的entry
        evictEntries(newEntry);
        return null;
      } finally {
        //解鎖
        unlock();
        //處理剛剛的remove Cause
        postWriteCleanup();
      }
    }

說(shuō)明

  • put方法一開(kāi)始就加鎖是為了保證線程安全蝗肪,這點(diǎn)和ConcurrentHashMap一致
  • preWriteCleanup在每次put之前都會(huì)清理動(dòng)作,清理啥呢豁延?
@GuardedBy("this")
    void preWriteCleanup(long now) {
      runLockedCleanup(now);
    }
    void runLockedCleanup(long now) {
      if (tryLock()) {
        try {
          drainReferenceQueues();
          expireEntries(now); // calls drainRecencyQueue
          readCount.set(0);
        } finally {
          unlock();
        }
      }
    }
    @GuardedBy("this")
    void drainReferenceQueues() {
      if (map.usesKeyReferences()) {
        drainKeyReferenceQueue();
      }
      if (map.usesValueReferences()) {
        drainValueReferenceQueue();
      }
    }
    @GuardedBy("this")
    void drainKeyReferenceQueue() {
      Reference<? extends K> ref;
      int i = 0;
      while ((ref = keyReferenceQueue.poll()) != null) {
        @SuppressWarnings("unchecked")
        ReferenceEntry<K, V> entry = (ReferenceEntry<K, V>) ref;
        map.reclaimKey(entry);
        if (++i == DRAIN_MAX) {
          break;
        }
      }
    }

清理的是keyReferenceQueue和valueReferenceQueue這兩個(gè)隊(duì)列,這兩個(gè)隊(duì)列是引用隊(duì)列胰苏,Guava Cache為了支持弱引用和軟引用法焰,引入了引用清空隊(duì)列,同時(shí)將key和value包裝成了KeyReference 和 ValueReference,在構(gòu)造cache緩存器一文中我們知道在創(chuàng)建Entry的時(shí)候有:

@GuardedBy("this")
    ReferenceEntry<K, V> newEntry(K key, int hash, @Nullable ReferenceEntry<K, V> next) {
      return map.entryFactory.newEntry(this, checkNotNull(key), hash, next);
    }

利用map.entryFactory創(chuàng)建Entry埃仪。Factory的初始化是通過(guò)

entryFactory = EntryFactory.getFactory(keyStrength, usesAccessEntries(), usesWriteEntries());

可選的工廠有:

static final EntryFactory[] factories = {
      STRONG,
      STRONG_ACCESS,
      STRONG_WRITE,
      STRONG_ACCESS_WRITE,
      WEAK,
      WEAK_ACCESS,
      WEAK_WRITE,
      WEAK_ACCESS_WRITE,
    };

通過(guò)相應(yīng)的工廠創(chuàng)建對(duì)應(yīng)的Entry乙濒,這里主要說(shuō)一下WeakEntry:

WEAK {
      @Override
      <K, V> ReferenceEntry<K, V> newEntry(
          Segment<K, V> segment, K key, int hash, @Nullable ReferenceEntry<K, V> next) {
        return new WeakEntry<K, V>(segment.keyReferenceQueue, key, hash, next);
      }
    },
/**
   * Used for weakly-referenced keys.
   */
  static class WeakEntry<K, V> extends WeakReference<K> implements ReferenceEntry<K, V> {
    WeakEntry(ReferenceQueue<K> queue, K key, int hash, @Nullable ReferenceEntry<K, V> next) {
      super(key, queue);
      this.hash = hash;
      this.next = next;
    }

    @Override
    public K getKey() {
      return get();
    }

    /*
     * It'd be nice to get these for free from AbstractReferenceEntry, but we're already extending
     * WeakReference<K>.
     */

    // null access

    @Override
    public long getAccessTime() {
      throw new UnsupportedOperationException();
    }

WeakEntry繼承了WeakReference實(shí)現(xiàn)了ReferenceEntry,也就是說(shuō)這個(gè)引用是弱引用卵蛉。WeakEntry引用的key和Value隨時(shí)可能會(huì)被回收颁股。構(gòu)造的時(shí)候參數(shù)里面有ReferenceQueue<K> queue,這個(gè)就是我們上面提到的KeyReferenceQueue,所以在Key被GC掉的時(shí)候甘有,會(huì)自動(dòng)的將引用加入到ReferenceQueue這樣我們就能處理對(duì)應(yīng)的Entry了。Value也是一樣的葡缰。 清理KeyReferenceQueue的過(guò)程:

@GuardedBy("this")
    void drainKeyReferenceQueue() {
      Reference<? extends K> ref;
      int i = 0;
      while ((ref = keyReferenceQueue.poll()) != null) {
        @SuppressWarnings("unchecked")
        ReferenceEntry<K, V> entry = (ReferenceEntry<K, V>) ref;
        map.reclaimKey(entry);
        if (++i == DRAIN_MAX) {
          break;
        }
      }
    }

    void reclaimKey(ReferenceEntry<K, V> entry) {
    int hash = entry.getHash();
    segmentFor(hash).reclaimKey(entry, hash);
  }

    /**
     * Removes an entry whose key has been garbage collected.
     */
    boolean reclaimKey(ReferenceEntry<K, V> entry, int hash) {
      lock();
      try {
        int newCount = count - 1;
        AtomicReferenceArray<ReferenceEntry<K, V>> table = this.table;
        int index = hash & (table.length() - 1);
        ReferenceEntry<K, V> first = table.get(index);

        for (ReferenceEntry<K, V> e = first; e != null; e = e.getNext()) {
          if (e == entry) {
            ++modCount;
            ReferenceEntry<K, V> newFirst =
                removeValueFromChain(
                    first,
                    e,
                    e.getKey(),
                    hash,
                    e.getValueReference().get(),
                    e.getValueReference(),
                    RemovalCause.COLLECTED);
            newCount = this.count - 1;
            table.set(index, newFirst);
            this.count = newCount; // write-volatile
            return true;
          }
        }

        return false;
      } finally {
        unlock();
        postWriteCleanup();
      }
    }

上面就是清理過(guò)程了亏掀,如果發(fā)現(xiàn)key或者value被GC了忱反,那么會(huì)在put的時(shí)候觸發(fā)清理。

接下來(lái)是setvalue方法了滤愕,它做的是將value寫(xiě)入Entry

 /**
     * Sets a new value of an entry. Adds newly created entries at the end of the access queue.
     */
    @GuardedBy("Segment.this")
    void setValue(ReferenceEntry<K, V> entry, K key, V value, long now) {
      ValueReference<K, V> previous = entry.getValueReference();
      int weight = map.weigher.weigh(key, value);
      checkState(weight >= 0, "Weights must be non-negative");

      ValueReference<K, V> valueReference =
          map.valueStrength.referenceValue(this, entry, value, weight);
      entry.setValueReference(valueReference);
      recordWrite(entry, weight, now);
      previous.notifyNewValue(value);
    }
    /**
     * Updates eviction metadata that {@code entry} was just written. This currently amounts to
     * adding {@code entry} to relevant eviction lists.
     */
    @GuardedBy("Segment.this")
    void recordWrite(ReferenceEntry<K, V> entry, int weight, long now) {
      // we are already under lock, so drain the recency queue immediately
      drainRecencyQueue();
      totalWeight += weight;

      if (map.recordsAccess()) {
        entry.setAccessTime(now);
      }
      if (map.recordsWrite()) {
        entry.setWriteTime(now);
      }
      accessQueue.add(entry);
      writeQueue.add(entry);
    }

GuavaCache會(huì)維護(hù)兩個(gè)隊(duì)列温算,一個(gè)Write隊(duì)列、一個(gè)Access隊(duì)列间影,用這兩個(gè)隊(duì)列來(lái)實(shí)現(xiàn)最近讀和最近寫(xiě)的清楚操作注竿,來(lái)看看AccessQueue的實(shí)現(xiàn):

 /**
   * A custom queue for managing access order. Note that this is tightly integrated with
   * {@code ReferenceEntry}, upon which it reliese to perform its linking.
   *
   * <p>Note that this entire implementation makes the assumption that all elements which are in
   * the map are also in this queue, and that all elements not in the queue are not in the map.
   *
   * <p>The benefits of creating our own queue are that (1) we can replace elements in the middle
   * of the queue as part of copyWriteEntry, and (2) the contains method is highly optimized
   * for the current model.
   */
  static final class AccessQueue<K, V> extends AbstractQueue<ReferenceEntry<K, V>> {
    final ReferenceEntry<K, V> head = new AbstractReferenceEntry<K, V>() {

      @Override
      public long getAccessTime() {
        return Long.MAX_VALUE;
      }

      @Override
      public void setAccessTime(long time) {}

      ReferenceEntry<K, V> nextAccess = this;

      @Override
      public ReferenceEntry<K, V> getNextInAccessQueue() {
        return nextAccess;
      }

      @Override
      public void setNextInAccessQueue(ReferenceEntry<K, V> next) {
        this.nextAccess = next;
      }

      ReferenceEntry<K, V> previousAccess = this;

      @Override
      public ReferenceEntry<K, V> getPreviousInAccessQueue() {
        return previousAccess;
      }

      @Override
      public void setPreviousInAccessQueue(ReferenceEntry<K, V> previous) {
        this.previousAccess = previous;
      }
    };

    // implements Queue

    @Override
    public boolean offer(ReferenceEntry<K, V> entry) {
      // unlink
      connectAccessOrder(entry.getPreviousInAccessQueue(), entry.getNextInAccessQueue());

      // add to tail
      connectAccessOrder(head.getPreviousInAccessQueue(), entry);
      connectAccessOrder(entry, head);

      return true;
    }

    @Override
    public ReferenceEntry<K, V> peek() {
      ReferenceEntry<K, V> next = head.getNextInAccessQueue();
      return (next == head) ? null : next;
    }

    @Override
    public ReferenceEntry<K, V> poll() {
      ReferenceEntry<K, V> next = head.getNextInAccessQueue();
      if (next == head) {
        return null;
      }

      remove(next);
      return next;
    }

    @Override
    @SuppressWarnings("unchecked")
    public boolean remove(Object o) {
      ReferenceEntry<K, V> e = (ReferenceEntry) o;
      ReferenceEntry<K, V> previous = e.getPreviousInAccessQueue();
      ReferenceEntry<K, V> next = e.getNextInAccessQueue();
      connectAccessOrder(previous, next);
      nullifyAccessOrder(e);

      return next != NullEntry.INSTANCE;
    }

重點(diǎn)看下offer方法做了啥:
1.將Entry和它的前節(jié)點(diǎn)后節(jié)點(diǎn)的關(guān)聯(lián)斷開(kāi),這樣就需要Entry中維護(hù)它的前向和后向引用魂贬。
2.將新增加的節(jié)點(diǎn)加入到隊(duì)列的尾部巩割,尋找尾節(jié)點(diǎn)用了head.getPreviousInAccessQueue()∷骈伲可以看出來(lái)是個(gè)環(huán)形隊(duì)列喂分。
3.將新增加的節(jié)點(diǎn),或者新調(diào)整出來(lái)的節(jié)點(diǎn)設(shè)為尾部節(jié)點(diǎn)机蔗。

可以得知蒲祈,最近更新的節(jié)點(diǎn)一定是在尾部的,head后面的節(jié)點(diǎn)一定是不活躍的萝嘁,在每一次清除過(guò)期節(jié)點(diǎn)的時(shí)候一定清除head之后的超時(shí)的節(jié)點(diǎn)梆掸。

接下來(lái)看看evictEntry,當(dāng)Cache中設(shè)置了緩存最大條目前提下可能會(huì)觸發(fā):

/**
     * Performs eviction if the segment is full. This should only be called prior to adding a new
     * entry and increasing {@code count}.
     */
    @GuardedBy("Segment.this")
    void evictEntries() {
      if (!map.evictsBySize()) {
        return;
      }

      drainRecencyQueue();
      while (totalWeight > maxSegmentWeight) {
        ReferenceEntry<K, V> e = getNextEvictable();
        if (!removeEntry(e, e.getHash(), RemovalCause.SIZE)) {
          throw new AssertionError();
        }
      }
    }

先判斷是否開(kāi)啟淘汰牙言,之后清理ReferenceQueue酸钦,然后判斷是否超過(guò)權(quán)重,超過(guò)的話就清理最近最少使用的Entry咱枉。

最后再看看在finall執(zhí)行的postWriteCleanup卑硫,回調(diào)相應(yīng)的listener方法。

 void processPendingNotifications() {
    RemovalNotification<K, V> notification;
    while ((notification = removalNotificationQueue.poll()) != null) {
      try {
        removalListener.onRemoval(notification);
      } catch (Throwable e) {
        logger.log(Level.WARNING, "Exception thrown by removal listener", e);
      }
    }
  }

get 方法源碼

get 流程

  1. 獲取對(duì)象引用(引用可能是非alive的蚕断,比如是需要失效的欢伏、比如是loading的);
  2. 判斷對(duì)象引用是否是alive的(如果entry是非法的亿乳、部分回收的硝拧、loading狀態(tài)、需要失效的葛假,則認(rèn)為不是alive)障陶。
  3. 如果對(duì)象是alive的,如果設(shè)置refresh聊训,則異步刷新查詢value抱究,然后等待返回最新value。
  4. 針對(duì)不是alive的带斑,但卻是在loading的媳维,等待loading完成(阻塞等待)酿雪。
  5. 這里如果value還沒(méi)有拿到,則查詢loader方法獲取對(duì)應(yīng)的值(阻塞獲戎豆簟)指黎。
// LoadingCache methods
    //local cache的代理
    @Override
    public V get(K key) throws ExecutionException {
      return localCache.getOrLoad(key);
    }

      /**
   * 根據(jù)key獲取value,如果獲取不到進(jìn)行l(wèi)oad
   * @param key
   * @return
   * @throws ExecutionException
     */
  V getOrLoad(K key) throws ExecutionException {
    return get(key, defaultLoader);
  }

    V get(K key, CacheLoader<? super K, V> loader) throws ExecutionException {
    int hash = hash(checkNotNull(key));//hash——>rehash
    return segmentFor(hash).get(key, hash, loader);
  }

  // loading
    //進(jìn)行指定key對(duì)應(yīng)的value的獲取,讀取不加鎖
    V get(K key, int hash, CacheLoader<? super K, V> loader) throws ExecutionException {
      //保證key-value不為null
      checkNotNull(key);
      checkNotNull(loader);

      try {
        if (count != 0) { // read-volatile  volatile讀會(huì)刷新緩存州丹,盡量保證可見(jiàn)性,如果為0那么直接load
          // don't call getLiveEntry, which would ignore loading values
          ReferenceEntry<K, V> e = getEntry(key, hash);
          //如果對(duì)應(yīng)的Entry不為Null,證明值還在
          if (e != null) {
            long now = map.ticker.read();//獲取當(dāng)前的時(shí)間醋安,根據(jù)當(dāng)前的時(shí)間進(jìn)行Live的數(shù)據(jù)的讀取
            V value = getLiveValue(e, now);  //  判斷是否為alive(此處是懶失效,在每次get時(shí)才檢查是否達(dá)到失效時(shí)機(jī))
            //元素不為null的話可以不刷新
            if (value != null) {
              recordRead(e, now);//為entry增加accessTime,同時(shí)加入recencyQueue
              statsCounter.recordHits(1);//更新當(dāng)前的狀態(tài)墓毒,增加為命中,可以用于計(jì)算命中率
              // 如果指定了定時(shí)刷新 就嘗試刷新value
              return scheduleRefresh(e, key, hash, value, now, loader);
            }
            //value為null,如果此時(shí)value正在刷新吓揪,那么此時(shí)等待刷新結(jié)果
            ValueReference<K, V> valueReference = e.getValueReference();
            if (valueReference.isLoading()) {
// 如果正在加載的,等待加載完成后所计,返回加載的值柠辞。(阻塞,future的get)
              return waitForLoadingValue(e, key, valueReference);
            }
          }
        }
        //如果取不到值主胧,那么進(jìn)行統(tǒng)一的加鎖get
        // at this point e is either null or expired;     此處或者為null叭首,或者已經(jīng)被失效。
        return lockedGetOrLoad(key, hash, loader);
      } catch (ExecutionException ee) {
        Throwable cause = ee.getCause();
        if (cause instanceof Error) {
          throw new ExecutionError((Error) cause);
        } else if (cause instanceof RuntimeException) {
          throw new UncheckedExecutionException(cause);
        }
        throw ee;
      } finally {
        postReadCleanup();//每次Put和get之后都要進(jìn)行一次Clean
      }
    }

get的實(shí)現(xiàn)和JDK1.6的ConcurrentHashMap思想一致踪栋,都是put加鎖焙格,但是get是用volatile保證。這里主要做了:

  1. 首先獲取Entry夷都,Entry不為null獲取對(duì)應(yīng)的Value眷唉,如果Value不為空,那么證明值還在囤官,那么這時(shí)候判斷一下是否要刷新直接返回了冬阳。否則判斷目前引用是否在Loading,如果是就等待Loading結(jié)束党饮。
  2. 如果取不到Entry或者Value為null 并且沒(méi)有在Loading肝陪,那么這時(shí)候進(jìn)行l(wèi)ockedGetOrLoad(),這是一個(gè)大活兒。
V lockedGetOrLoad(K key, int hash, CacheLoader<? super K, V> loader) throws ExecutionException {
      ReferenceEntry<K, V> e;
      ValueReference<K, V> valueReference = null;
      LoadingValueReference<K, V> loadingValueReference = null;
      boolean createNewEntry = true;

      lock();//加鎖劫谅,因?yàn)闀?huì)改變數(shù)據(jù)結(jié)構(gòu)
      try {
        // re-read ticker once inside the lock
        long now = map.ticker.read();
        preWriteCleanup(now);//清除引用隊(duì)列,Acess隊(duì)列和Write隊(duì)列中過(guò)期的數(shù)據(jù),這算是一次put操作

        int newCount = this.count - 1;
        AtomicReferenceArray<ReferenceEntry<K, V>> table = this.table;
        int index = hash & (table.length() - 1);
        ReferenceEntry<K, V> first = table.get(index);
        //定位目標(biāo)元素
        for (e = first; e != null; e = e.getNext()) {
          K entryKey = e.getKey();
          if (e.getHash() == hash
              && entryKey != null
              && map.keyEquivalence.equivalent(key, entryKey)) {
            valueReference = e.getValueReference();
            //如果目前處在loading狀態(tài),不創(chuàng)建新元素
            if (valueReference.isLoading()) {
              createNewEntry = false;
            } else {
              V value = valueReference.get();
              if (value == null) { //可能被GC掉了嚷掠,加入removeListener
                enqueueNotification(
                    entryKey, hash, value, valueReference.getWeight(), RemovalCause.COLLECTED);
              } else if (map.isExpired(e, now)) { //可能過(guò)期了
                // This is a duplicate check, as preWriteCleanup already purged expired
                // entries, but let's accomodate an incorrect expiration queue.
                enqueueNotification(
                    entryKey, hash, value, valueReference.getWeight(), RemovalCause.EXPIRED);
              } else {//目前就已經(jīng)加載過(guò)了捏检,返回
                recordLockedRead(e, now);
                statsCounter.recordHits(1);
                // we were concurrent with loading; don't consider refresh
                return value;
              }
              //刪除在隊(duì)列中相應(yīng)的引用,因?yàn)楹竺嬉聞?chuàng)建
              // immediately reuse invalid entries
              writeQueue.remove(e);
              accessQueue.remove(e);
              this.count = newCount; // write-volatile
            }
            break;
          }
        }
        //創(chuàng)建新的Entry,但是此時(shí)是沒(méi)有值的
        if (createNewEntry) {
          loadingValueReference = new LoadingValueReference<K, V>();

          if (e == null) {
            e = newEntry(key, hash, first);
            e.setValueReference(loadingValueReference);
            table.set(index, e);
          } else {
        // 到此不皆,說(shuō)e找到贯城,但是是非法的,數(shù)據(jù)已被移除霹娄。e放入新建的引用
            e.setValueReference(loadingValueReference);
          }
        }
      } finally {
        unlock();
        postWriteCleanup();
      }
 // 上面加鎖部分建完了新的entry能犯,設(shè)置完valueReference(isAlive為false鲫骗,isLoading 為false),到此踩晶,鎖已經(jīng)被釋放执泰,其他線程可以拿到一個(gè)loading狀態(tài)的引用。這就符合get時(shí)渡蜻,拿到loading狀態(tài)引用后术吝,阻塞等待加載的邏輯了。
      if (createNewEntry) {
        try {
          // Synchronizes on the entry to allow failing fast when a recursive load is
          // detected. This may be circumvented when an entry is copied, but will fail fast most
          // of the time.
      // 這里只對(duì)e加鎖茸苇,而不是segment排苍,允許get操作進(jìn)入。
          synchronized (e) {
            return loadSync(key, hash, loadingValueReference, loader);
          }
        } finally {
          statsCounter.recordMisses(1);
        }
      } else {
        // The entry already exists. Wait for loading.
        return waitForLoadingValue(e, key, valueReference);
      }
    }

說(shuō)明:

  • load算是一個(gè)寫(xiě)操作学密,需要加鎖淘衙,
  • 為了避免緩存被擊穿,每個(gè)線程都去load容易導(dǎo)致數(shù)據(jù)庫(kù)雪崩

因?yàn)槭菍?xiě)所以要進(jìn)行preWriteCleanup腻暮,根據(jù)key定位一下Entry彤守,如果能定位到,那么判斷是否在Loading西壮,如果是的話不創(chuàng)建新的Entry并且等待Loading結(jié)束遗增。如果不是那么判斷value是否為null和是否過(guò)期,如果是的話都要進(jìn)行創(chuàng)建新Entry款青,如果都不是證明value是加載過(guò)了做修,那么更新下Access隊(duì)列然后返回,

// at most one of loadSync/loadAsync may be called for any given LoadingValueReference
    //同步刷新
    V loadSync(
        K key,
        int hash,
        LoadingValueReference<K, V> loadingValueReference,
        CacheLoader<? super K, V> loader)
        throws ExecutionException {
      ListenableFuture<V> loadingFuture = loadingValueReference.loadFuture(key, loader);
      return getAndRecordStats(key, hash, loadingValueReference, loadingFuture);
    }

這里創(chuàng)建了一個(gè)loadingReference抡草,這也就是之前看到的判斷是否在Loading饰及。如果是Loading狀態(tài)那么表明有一個(gè)線程正在更新Cache,其他的線程等待就可以了康震。

從上面我們可以看到燎含,對(duì)于每一次get都會(huì)去進(jìn)行Access隊(duì)列的更新,同時(shí)對(duì)于多線程的更新只會(huì)引起一個(gè)線程去load數(shù)據(jù)腿短,對(duì)于不存在的數(shù)據(jù)屏箍,get時(shí)也會(huì)進(jìn)行一次load操作。同時(shí)通過(guò)同步操作解決了緩存擊穿的問(wèn)題橘忱。

這里講下引用隊(duì)列(Reference Queue)

  • 引用隊(duì)列可以很容易地實(shí)現(xiàn)跟蹤不需要的引用赴魁。
  • 一旦弱引用對(duì)象開(kāi)始返回null,該弱引用指向的對(duì)象就被標(biāo)記成了垃圾钝诚。
  • 當(dāng)你在構(gòu)造WeakReference時(shí)傳入一個(gè)ReferenceQueue對(duì)象颖御,當(dāng)該引用指向的對(duì)象被標(biāo)記為垃圾的時(shí)候,這個(gè)引用對(duì)象會(huì)自動(dòng)地加入到引用隊(duì)列里面凝颇。

ReferenceEntry

  • Cache中的所有Entry都是基于ReferenceEntry的實(shí)現(xiàn)潘拱。
  • 信息包括:自身hash值疹鳄,寫(xiě)入時(shí)間,讀取時(shí)間芦岂。每次寫(xiě)入和讀取的隊(duì)列瘪弓。以及鏈表指針。
  • 每個(gè)Entry中均包含一個(gè)ValueReference類型來(lái)表示值盔腔。

ValueReference的類圖

  • 對(duì)于ValueReference杠茬,有三個(gè)實(shí)現(xiàn)類:StrongValueReference、SoftValueReference弛随、WeakValueReference瓢喉。為了支持動(dòng)態(tài)加載機(jī)制,它還有一個(gè)LoadingValueReference舀透,在需要?jiǎng)討B(tài)加載一個(gè)key的值時(shí)栓票,先把該值封裝在LoadingValueReference中,以表達(dá)該key對(duì)應(yīng)的值已經(jīng)在加載了愕够,如果其他線程也要查詢?cè)搆ey對(duì)應(yīng)的值走贪,就能得到該引用,并且等待改值加載完成惑芭,從而保證該值只被加載一次(可以在evict以后重新加載)坠狡。在該值加載完成后,將LoadingValueReference替換成其他ValueReference類型遂跟。

  • 每個(gè)ValueReference都紀(jì)錄了weight值逃沿,所謂weight從字面上理解是“該值的重量”,它由Weighter接口計(jì)算而得幻锁。

  • 還定義了Stength枚舉類型作為ValueReference的factory類凯亮,它有三個(gè)枚舉值:Strong、Soft哄尔、Weak假消,這三個(gè)枚舉值分別創(chuàng)建各自的ValueReference。

WeakEntry為例子

在cache的put操作和帶CacheBuilder中的都有newEntry的操作岭接。newEntry根據(jù)cache builder的配置生成不用級(jí)別的引用富拗,比如put操作:

// Create a new entry.
++modCount;
// 新建一個(gè)entry
ReferenceEntry<K, V> newEntry = newEntry(key, hash, first);
// 設(shè)置值,也就是valueRerence
setValue(newEntry, key, value, now);

newEntry方法
根據(jù)cache創(chuàng)建時(shí)的配置(代碼中生成的工廠)鸣戴,生成不同的Entry啃沪。

ReferenceEntry<K, V> newEntry(K key, int hash, @Nullable ReferenceEntry<K, V> next) {
  return map.entryFactory.newEntry(this, checkNotNull(key), hash, next);
}

調(diào)用WEAK的newEntry,其中segment.keyReferenceQueue是key的引用隊(duì)列葵擎。還有一個(gè)value的引用隊(duì)列谅阿,valueReferenceQueue一會(huì)會(huì)出現(xiàn)半哟。

WEAK {
  @Override
  <K, V> ReferenceEntry<K, V> newEntry(
      Segment<K, V> segment, K key, int hash, @Nullable ReferenceEntry<K, V> next) {
    return new WeakEntry<K, V>(segment.keyReferenceQueue, key, hash, next);
  }
},

setValue方法
首先要生成一個(gè)valueReference酬滤,然后set到entry中签餐。

ValueReference<K, V> valueReference =
      map.valueStrength.referenceValue(this, entry, value, weight);
entry.setValueReference(valueReference);

Value的WEAK跟key的WEAK形式很像盯串。只不過(guò)氯檐,增加了weight值(cachebuilder復(fù)寫(xiě)不同k,v對(duì)應(yīng)的權(quán)重)和value的比較方法体捏。

WEAK {
  @Override
  <K, V> ValueReference<K, V> referenceValue(
      Segment<K, V> segment, ReferenceEntry<K, V> entry, V value, int weight) {
    return (weight == 1)
        ? new WeakValueReference<K, V>(segment.valueReferenceQueue, value, entry)
        : new WeightedWeakValueReference<K, V>(
            segment.valueReferenceQueue, value, entry, weight);
  }

  @Override
  Equivalence<Object> defaultEquivalence() {
    return Equivalence.identity();
  }
};

cache如何基于引用做清理

如果key或者value的引用不是Strong類型冠摄,那么它們必然會(huì)被gc回收掉〖哥裕回收掉后河泳,引用對(duì)象依然存在,只是值為null了年栓,這也是上文提到從entry中得到的ValueReference要判斷的原因了拆挥。

**
 * Drain the key and value reference queues, cleaning up internal entries containing garbage
 * collected keys or values.
 */
@GuardedBy("this")
void drainReferenceQueues() {
  if (map.usesKeyReferences()) {
    drainKeyReferenceQueue();
  }
  if (map.usesValueReferences()) {
    drainValueReferenceQueue();
  }
}

如何失效,因?yàn)閗和v的失效方法基本一樣某抓,只舉例drainValueReferenceQueue纸兔。(執(zhí)行前都會(huì)tryLock,執(zhí)行時(shí)保證有鎖)

void drainValueReferenceQueue() {
  Reference<? extends V> ref;
  int i = 0;
  while ((ref = valueReferenceQueue.poll()) != null) {
    @SuppressWarnings("unchecked")
    ValueReference<K, V> valueReference = (ValueReference<K, V>) ref;
    // 回收
    map.reclaimValue(valueReference);
    if (++i == DRAIN_MAX) {
      break;
    }
  }
}

如何回收呢?

  1. map是segment維護(hù)的cache的引用否副,再次hash到segment汉矿。
  2. 找到segment后,加鎖备禀,hash找到entry table洲拇。遍歷鏈表,根據(jù)key找到一個(gè)entry痹届。
  3. 如果找到呻待,且跟入?yún)⒌膙alueReference==比較相等,執(zhí)行removeValueFromChain
  4. 如果沒(méi)找到队腐,返回false蚕捉。
  5. 如果找到,不等柴淘,返回false迫淹。

removeValueFromChain

ReferenceEntry<K, V> removeValueFromChain(ReferenceEntry<K, V> first,
    ReferenceEntry<K, V> entry, @Nullable K key, int hash, ValueReference<K, V> valueReference,
    RemovalCause cause) {
  enqueueNotification(key, hash, valueReference, cause);
  writeQueue.remove(entry);
  accessQueue.remove(entry);

  if (valueReference.isLoading()) {
    valueReference.notifyNewValue(null);
    return first;
  } else {
    return removeEntryFromChain(first, entry);
  }
}
  1. 需要執(zhí)行remove的通知,入隊(duì)列为严。
  2. 針對(duì)LoadingValueReference敛熬,直接返回。
  3. 非loading執(zhí)行移除第股。
    具體如何執(zhí)行remove呢应民?removeEntryFromChain
ReferenceEntry<K, V> removeEntryFromChain(ReferenceEntry<K, V> first,
                                          ReferenceEntry<K, V> entry) {
    int newCount = count;
    ReferenceEntry<K, V> newFirst = entry.getNext();
    for (ReferenceEntry<K, V> e = first; e != entry; e = e.getNext()) {
        // 這個(gè)方法是copy當(dāng)前結(jié)點(diǎn)(e),然后將新的結(jié)點(diǎn)指向newFirst,返回copy得到的結(jié)點(diǎn)(next)诲锹。
        // 如果改entry是需要回收的繁仁,那么該方法返回null。
        ReferenceEntry<K, V> next = copyEntry(e, newFirst);
        if (next != null) {
            newFirst = next;
        } else {
             // 如果偶遇k或者v已經(jīng)回收了的entry归园,進(jìn)入需要通知的隊(duì)列黄虱。
            removeCollectedEntry(e);
            newCount--;
        }
    }
    this.count = newCount;
    return newFirst;
}

這段邏輯是,從first開(kāi)始庸诱,一直到要remove結(jié)點(diǎn)(entry)的next(newFirst)捻浦,依次copy每個(gè)結(jié)點(diǎn),指向newFirst桥爽,然后將newFirst變成自身朱灿。最后這條鏈表的頭就變成,最后copy的那個(gè)結(jié)點(diǎn)钠四,也就是entry的上一個(gè)結(jié)點(diǎn)母剥。

Cache的失效和回調(diào)

基于讀寫(xiě)時(shí)間失效
失效邏輯和過(guò)程:

  • Entry在進(jìn)行一次讀寫(xiě)操作后,會(huì)標(biāo)識(shí)accessTime和writeTime形导。
f (map.recordsAccess()) {
      entry.setAccessTime(now);
  }
  if (map.recordsWrite()) {
      entry.setWriteTime(now);
  }
  • accessQueue和writeQueue分別會(huì)在讀寫(xiě)操作適時(shí)的添加环疼。
  accessQueue.add(entry);
  writeQueue.add(entry);
  • 遍歷accessQueue和writeQueue
void expireEntries(long now) {
    drainRecencyQueue();

    ReferenceEntry<K, V> e;
    // 取出entry,判斷是否失效
    while ((e = writeQueue.peek()) != null && map.isExpired(e, now)) {
      if (!removeEntry(e, e.getHash(), RemovalCause.EXPIRED)) {
        throw new AssertionError();
      }
    }
    while ((e = accessQueue.peek()) != null && map.isExpired(e, now)) {
      if (!removeEntry(e, e.getHash(), RemovalCause.EXPIRED)) {
        throw new AssertionError();
      }
    }
  }
  • 判斷是否失效
if (expiresAfterAccess()
      && (now - entry.getAccessTime() >= expireAfterAccessNanos)) {
      return true;
  }  
  • removeEntry就是調(diào)用上文的removeValueFromChain朵耕。

write鏈(writeQueue)和access鏈(accessQueue)

這兩條鏈都是一個(gè)雙向鏈表炫隶,通過(guò)ReferenceEntry中的previousInWriteQueue、nextInWriteQueue和previousInAccessQueue阎曹、nextInAccessQueue鏈接而成伪阶,但是以Queue的形式表達(dá)。

WriteQueue和AccessQueue都是自定義了offer处嫌、add(直接調(diào)用offer)栅贴、remove、poll等操作的邏輯熏迹。

  • 對(duì)于offer(add)操作檐薯,如果是新加的節(jié)點(diǎn),則直接加入到該鏈的隊(duì)首注暗,如果是已存在的節(jié)點(diǎn)坛缕,則將該節(jié)點(diǎn)鏈接的鏈?zhǔn)住#╤ead始終保持在隊(duì)首捆昏,新節(jié)點(diǎn)不斷插入到隊(duì)首赚楚。邏輯上最先插入的結(jié)點(diǎn)保持在,允許訪問(wèn)的頭部)
  • 對(duì)remove操作骗卜,直接從該鏈中移除該節(jié)點(diǎn)宠页;
  • 對(duì)poll操作左胞,將頭節(jié)點(diǎn)的下一個(gè)節(jié)點(diǎn)移除,并返回举户。
@Override
public boolean offer(ReferenceEntry<K, V> entry) {
  // unlink
  connectAccessOrder(entry.getPreviousInAccessQueue(), entry.getNextInAccessQueue());

  // add to tail
  connectAccessOrder(head.getPreviousInAccessQueue(), entry);
  connectAccessOrder(entry, head);

  return true;
}

失效的通知回調(diào)

void enqueueNotification(@Nullable K key, int hash, ValueReference<K, V> valueReference,
                         RemovalCause cause) {
    totalWeight -= valueReference.getWeight();
    if (cause.wasEvicted()) {
        statsCounter.recordEviction();
    }
    if (map.removalNotificationQueue != DISCARDING_QUEUE) {
        V value = valueReference.get();
        RemovalNotification<K, V> notification = new RemovalNotification<K, V>(key, value, cause);
        map.removalNotificationQueue.offer(notification);
    }
}
  1. 首先判斷移除的原因RemovalCause:EXPLICIT(remove罩句、clear等用戶有預(yù)期的操作),REPLACED(put敛摘、replace),COLLECTED乳愉,EXPIRED兄淫,SIZE。RemovalCause有個(gè)方法wasEvicted蔓姚,表示是否是被驅(qū)逐的捕虽。前兩種是false,后三種是true坡脐。

  2. 生成一個(gè)notification對(duì)象泄私,入隊(duì)列。

removalNotificationQueue何時(shí)被清理
在讀寫(xiě)操作的finally階段备闲,都會(huì)執(zhí)行晌端。

 void processPendingNotifications() {
  RemovalNotification<K, V> notification;
  while ((notification = removalNotificationQueue.poll()) != null) {
    try {
      // 這里就是回調(diào)構(gòu)造cache時(shí)注冊(cè)的listener了
      removalListener.onRemoval(notification);
    } catch (Throwable e) {
      logger.log(Level.WARNING, "Exception thrown by removal listener", e);
    }
  }
}

至此,guava cache的核心源碼分析完了恬砂,感覺(jué)挺惡心的咧纠,,挺復(fù)雜的泻骤,還需要多復(fù)習(xí)鞏固才能真正掌握漆羔。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個(gè)濱河市狱掂,隨后出現(xiàn)的幾起案子演痒,更是在濱河造成了極大的恐慌,老刑警劉巖趋惨,帶你破解...
    沈念sama閱讀 218,546評(píng)論 6 507
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件鸟顺,死亡現(xiàn)場(chǎng)離奇詭異,居然都是意外死亡器虾,警方通過(guò)查閱死者的電腦和手機(jī)诊沪,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,224評(píng)論 3 395
  • 文/潘曉璐 我一進(jìn)店門(mén),熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái)曾撤,“玉大人端姚,你說(shuō)我怎么就攤上這事〖废ぃ” “怎么了渐裸?”我有些...
    開(kāi)封第一講書(shū)人閱讀 164,911評(píng)論 0 354
  • 文/不壞的土叔 我叫張陵巫湘,是天一觀的道長(zhǎng)。 經(jīng)常有香客問(wèn)我昏鹃,道長(zhǎng)尚氛,這世上最難降的妖魔是什么? 我笑而不...
    開(kāi)封第一講書(shū)人閱讀 58,737評(píng)論 1 294
  • 正文 為了忘掉前任洞渤,我火速辦了婚禮阅嘶,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘载迄。我一直安慰自己讯柔,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,753評(píng)論 6 392
  • 文/花漫 我一把揭開(kāi)白布护昧。 她就那樣靜靜地躺著魂迄,像睡著了一般。 火紅的嫁衣襯著肌膚如雪惋耙。 梳的紋絲不亂的頭發(fā)上捣炬,一...
    開(kāi)封第一講書(shū)人閱讀 51,598評(píng)論 1 305
  • 那天,我揣著相機(jī)與錄音绽榛,去河邊找鬼湿酸。 笑死,一個(gè)胖子當(dāng)著我的面吹牛灭美,可吹牛的內(nèi)容都是我干的稿械。 我是一名探鬼主播,決...
    沈念sama閱讀 40,338評(píng)論 3 418
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼冲粤,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼美莫!你這毒婦竟也來(lái)了?” 一聲冷哼從身側(cè)響起梯捕,我...
    開(kāi)封第一講書(shū)人閱讀 39,249評(píng)論 0 276
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤厢呵,失蹤者是張志新(化名)和其女友劉穎,沒(méi)想到半個(gè)月后傀顾,有當(dāng)?shù)厝嗽跇?shù)林里發(fā)現(xiàn)了一具尸體襟铭,經(jīng)...
    沈念sama閱讀 45,696評(píng)論 1 314
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,888評(píng)論 3 336
  • 正文 我和宋清朗相戀三年短曾,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了寒砖。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 40,013評(píng)論 1 348
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡嫉拐,死狀恐怖哩都,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情婉徘,我是刑警寧澤漠嵌,帶...
    沈念sama閱讀 35,731評(píng)論 5 346
  • 正文 年R本政府宣布咐汞,位于F島的核電站,受9級(jí)特大地震影響儒鹿,放射性物質(zhì)發(fā)生泄漏化撕。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,348評(píng)論 3 330
  • 文/蒙蒙 一约炎、第九天 我趴在偏房一處隱蔽的房頂上張望植阴。 院中可真熱鬧,春花似錦圾浅、人聲如沸掠手。這莊子的主人今日做“春日...
    開(kāi)封第一講書(shū)人閱讀 31,929評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)。三九已至伊脓,卻和暖如春府寒,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背报腔。 一陣腳步聲響...
    開(kāi)封第一講書(shū)人閱讀 33,048評(píng)論 1 270
  • 我被黑心中介騙來(lái)泰國(guó)打工株搔, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人纯蛾。 一個(gè)月前我還...
    沈念sama閱讀 48,203評(píng)論 3 370
  • 正文 我出身青樓纤房,卻偏偏與公主長(zhǎng)得像,于是被迫代替她去往敵國(guó)和親翻诉。 傳聞我的和親對(duì)象是個(gè)殘疾皇子炮姨,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,960評(píng)論 2 355