ringbuffer(環(huán)形隊列)在caffeine中的設計

引入

caffenie作為目前本地緩存的首選谆棺,其內(nèi)部設計思想有很多值得我們學習的地方宅广。緩存中最主要的數(shù)據(jù)競爭源于讀取數(shù)據(jù)的同時普舆,也會伴隨著對數(shù)據(jù)狀態(tài)的寫入操作帐萎;寫入數(shù)據(jù)的同時,也會伴隨著數(shù)據(jù)狀態(tài)的讀取操作敞曹。譬如账月,讀取時要同時更新數(shù)據(jù)的最近訪問時間和訪問計數(shù)器的狀態(tài)(當然caffeine為了追求高效,不會記錄時間和次數(shù)澳迫,譬如通過調(diào)整鏈表順序來表達時間先后局齿、通過 Sketch 結(jié)構(gòu)來表達熱度高低),以實現(xiàn)緩存的淘汰策略橄登;又或者讀取時要同時判斷數(shù)據(jù)的超期時間等信息抓歼,以實現(xiàn)失效重加載等其他擴展功能。對以上伴隨讀寫操作而來的狀態(tài)維護拢锹,有兩種可選擇的處理思路谣妻,一種是以 Guava Cache 為代表的同步處理機制,即在訪問數(shù)據(jù)時一并完成緩存淘汰卒稳、統(tǒng)計蹋半、失效等狀態(tài)變更操作,通過分段加鎖等優(yōu)化手段來盡量減少競爭充坑。另一種是以 Caffeine 為代表的異步日志提交機制湃窍,這種機制參考了經(jīng)典的數(shù)據(jù)庫設計理論闻蛀,將對數(shù)據(jù)的讀、寫過程看作是日志(即對數(shù)據(jù)的操作指令)的提交過程您市。盡管日志也涉及到寫入操作觉痛,有并發(fā)的數(shù)據(jù)變更就必然面臨鎖競爭,但異步提交的日志已經(jīng)將原本在 Map 內(nèi)的鎖轉(zhuǎn)移到日志的追加寫操作上茵休,日志里騰挪優(yōu)化的余地就比在 Map 中要大得多薪棒。
我們上文提到,對緩存的維護可以分為兩個場景榕莺,一個是讀取緩存時俐芯,另一個是向緩存寫入數(shù)據(jù)時,這兩個場景钉鸯,caffeine都是提交日志異步處理的方式維護緩存的淘汰吧史、統(tǒng)計、失效等唠雕。讀取緩存和寫入緩存兩個場景還是有差異贸营,本文是針對第一個場景讀取緩存時異步去維護緩存的設計剖析。

問題

在異步維護(淘汰岩睁、統(tǒng)計钞脂、失效等)緩存過程中,需要先向一個中間隊列去存儲key的信息捕儒,然后再由消費者去讀取隊列中的元素冰啃,繼而去維護(淘汰、統(tǒng)計刘莹、失效等)緩存的信息阎毅。那么,對這個中間隊列的性能要求就會近乎苛刻点弯,因為緩存意味著高并發(fā)扇调,所以隊列的并發(fā)寫入速度要足夠快。需要解決的問題大致體現(xiàn)在以下幾個方面
1蒲拉、因為隊列要求寫入速度肃拜,隊列中的元素不斷創(chuàng)建痴腌,可能會導致gc壓力過大
2雌团、因為隊列存在多線程并發(fā)寫入的情況,又要足夠快士聪,所以對隊列的并發(fā)競爭會很大锦援,我們需要想辦法解決并發(fā)競爭的問題。

解決思路:

正常隊列的數(shù)據(jù)結(jié)構(gòu)就是數(shù)組剥悟,或者鏈表灵寺。java的jdk中曼库,我們常用的隊列有ArrayBlockingQueue、LinkedBlockingQueue略板、ConcurrentLinkedQueue毁枯。
1、針對第二個方面叮称,我們需要解決并發(fā)競爭的問題种玛,常規(guī)的解決思路有兩條,一條是加鎖保證臨界資源的安全瓤檐,還有一個就是原生的cas操作可以保證赂韵。加鎖并不是最優(yōu)的解決方案,在真正的高并發(fā)情況下挠蛉,鎖會帶來性能的開銷祭示,所以我們其實是希望能不用鎖的情況下就不用鎖的,如果能用原生的cas操作是最好的谴古。但是同樣质涛,用原生的cas操作,我們也需要解決cas操作在高并發(fā)下的惡性空自旋的問題讥电,這個我們根據(jù)具體情況看是否可以采用分離熱點的方式(具體可以參考高并發(fā)下的設計思想--分離熱點
這么看的話蹂窖,ArrayBlockingQueue、LinkedBlockingQueue都是內(nèi)部都是采用了lock鎖的方式恩敌,并不合適瞬测。
2、再看ConcurrentLinkedQueue是無鎖的纠炮,內(nèi)部也是用cas操作保證的多線程安全月趟。但是針對第一點,ConcurrentLinkedQueue 內(nèi)部是鏈表實現(xiàn)恢口,高并發(fā)情況下孝宗,鏈表的節(jié)點會被頻繁的創(chuàng)建,這會給gc帶來壓力耕肩。那有沒有一種數(shù)據(jù)結(jié)構(gòu)因妇,是以數(shù)組為基礎,但是內(nèi)部的內(nèi)存空間又可以重復利用的呢猿诸?答案是環(huán)形隊列婚被,內(nèi)部用數(shù)組實現(xiàn)就可以了;并且環(huán)形隊列的好處不止于此梳虽,下文詳細描述址芯。

環(huán)形隊列

環(huán)形隊列是在實際編程極為有用的數(shù)據(jù)結(jié)構(gòu),它采用數(shù)組的線性空間,數(shù)據(jù)組織簡單谷炸,能很快知道隊列是否滿或空北专,能以很快速度的來存取數(shù)據(jù)。
從順時針看旬陡,環(huán)形隊列 有隊頭 head 和隊尾 tail拓颓。


環(huán)形隊列
  • 生產(chǎn)的流程是:
    生產(chǎn)者順時針向隊尾 tail 插入元素,這會導致 head 位置不變描孟,tail 位置在后移录粱;
  • 消費的流程是:
    消費者則從隊頭 head 開始消費,這會導致 head 向后移動画拾,而tail 位置不變啥繁,如果隊列滿了就不能寫入。
  • 環(huán)形隊列的特點:
    隊頭 head 和隊尾 tail 的位置是不定的青抛,位置一直在循環(huán)流動旗闽,空間就被重復利用起來了。因為有簡單高效的原因蜜另,甚至在硬件都實現(xiàn)了環(huán)形隊列.适室。環(huán)形隊列廣泛用于網(wǎng)絡數(shù)據(jù)收發(fā),和不同程序間數(shù)據(jù)交換(比如內(nèi)核與應用程序大量交換數(shù)據(jù)举瑰,從硬件接收大量數(shù)據(jù))均使用了環(huán)形隊列捣辆。

環(huán)形核心的結(jié)構(gòu)和流程說明

  1. 約定head指向隊列的第一個元素,
    也就是說data[head]就是隊頭數(shù)據(jù)此迅,head初始值為0汽畴。
  2. 約定tail指向隊列的最后一個元素的后一個位置,
    也就是說data[tail-1]就是隊尾數(shù)據(jù)耸序,tail初始值為0忍些。
  3. 隊列滿的條件是:
    ( tail+1 )% maxSize == head
  4. 隊列空的條件是:
    tail == head
  5. 隊列中的元素個數(shù)為:
    ( tail + maxsize - head) % maxSize
  6. 有效數(shù)據(jù)只有maxSize-1個
    因為tail指向隊尾的后面一個位置,這個位置就不能存數(shù)據(jù)坎怪,因此有效數(shù)據(jù)只有maxSize-1個

至此罢坝,我們清楚我們的數(shù)據(jù)結(jié)構(gòu)主體雛形--環(huán)形的數(shù)組隊列,同時我們需要將隊列采用分離熱點的思想搅窿,將隊列分散到一個數(shù)組中嘁酿,數(shù)組可以存多個環(huán)形隊列,將每個線程id對數(shù)組長度取模男应,然后映射到數(shù)組中的下標位置中闹司,這樣每個線程都有自己對應的環(huán)形數(shù)組,就可以極大地降低競爭殉了。同樣在消費隊列元素時开仰,消費數(shù)組中的多個環(huán)形隊列中節(jié)點元素拟枚。

caffeine的設計

通過以上分析薪铜,再結(jié)合caffeine中對讀緩異步隊列的設計众弓,我們可以看到,后者的設計思想和我們分析的基本一致隔箍,下面是其數(shù)據(jù)結(jié)構(gòu)圖:

環(huán)形隊列

在源碼中的體現(xiàn)上谓娃,table[]數(shù)組在父類StripedBuffer里,父類提供入隊和出隊的鉤子方法以及table數(shù)組初始化和擴容的具體實現(xiàn)蜒滩。RingBuffer繼承自StripedBuffer滨达,其中包含了環(huán)形隊列中節(jié)點元素值數(shù)組 AtomicReferenceArray,以及頭結(jié)點和尾節(jié)點俯艰;提供了入隊的具體實現(xiàn)捡遍,以及消費時批量出隊的具體實現(xiàn)方法drainTo。

StripedBuffer數(shù)據(jù)結(jié)構(gòu):

//table數(shù)組的最大長度
static final int MAXIMUM_TABLE_SIZE
//buffer數(shù)組竹握,數(shù)組里的元素是ringBuffer
transient volatile Buffer<E> @Nullable[] table
//擴容或者初始化數(shù)組的鎖標識
transient volatile int tableBusy

需要注意的是画株,什么時候會進入擴容、初始化數(shù)組的操作中啦辐。在追加到RingBuffer如果失敗了(存在競爭)谓传,是會進行擴容或者初始化邏輯的。并且在擴容初始化的邏輯中芹关,會在擴容后再次嘗試追加续挟,如果連續(xù)三次都失敗的話,那么這次寫入就是失敗的侥衬。所以ringbuffer的寫入會存在失敗的情況诗祸,不能保證強成功。不過在讀緩存后異步入隊列的場景中轴总,這種偶發(fā)的失敗是可以接受的贬媒。

   /**
    * 入隊邏輯
    */
  public int offer(E e) {
    int mask;
    int result = 0;
    Buffer<E> buffer;
    //是否不存在競爭
    boolean uncontended = true;
    Buffer<E>[] buffers = table;
    //是否已經(jīng)初始化
    if ((buffers == null)
        || (mask = buffers.length - 1) < 0
        ////用thread的隨機值作為hash值,得到對應位置的RingBuffer
        || (buffer = buffers[getProbe() & mask]) == null
        //檢查追加到RingBuffer是否成功,如果失敗肘习,會進行擴容初始化邏輯
        || !(uncontended = ((result = buffer.offer(e)) != Buffer.FAILED))) {
      expandOrRetry(e, uncontended);
    }
    return result;
  }

  /**
    * 批量出隊邏輯
    */
  public void drainTo(Consumer<E> consumer) {
    Buffer<E>[] buffers = table;
    if (buffers == null) {
      return;
    }
    for (Buffer<E> buffer : buffers) {
      if (buffer != null) {
        //調(diào)用ringBuffer的出隊方法
        buffer.drainTo(consumer);
      }
    }
  }

  /**
    * table數(shù)組擴容际乘、初始化邏輯
    */
  final void expandOrRetry(E e, boolean wasUncontended) {
    int h;
    if ((h = getProbe()) == 0) {
      ThreadLocalRandom.current(); // force initialization
      h = getProbe();
      wasUncontended = true;
    }
    boolean collide = false; // True if last slot nonempty
    //默認重試3次,允許丟失
    for (int attempt = 0; attempt < ATTEMPTS; attempt++) {
      Buffer<E>[] buffers;
      Buffer<E> buffer;
      int n;
      //buffer數(shù)組不為空的情況
      if (((buffers = table) != null) && ((n = buffers.length) > 0)) {
        //對應數(shù)組下標元素為空
        if ((buffer = buffers[(n - 1) & h]) == null) {
          if ((tableBusy == 0) && casTableBusy()) { // Try to attach new Buffer
            boolean created = false;
            try { // Recheck under lock
              Buffer<E>[] rs;
              int mask, j;
              if (((rs = table) != null) && ((mask = rs.length) > 0)
                  && (rs[j = (mask - 1) & h] == null)) {
                rs[j] = create(e);
                created = true;
              }
            } finally {
              tableBusy = 0;
            }
            if (created) {
              break;
            }
            continue; // Slot is now non-empty
          }
          collide = false;
        } else if (!wasUncontended) { // CAS already known to fail
          wasUncontended = true;      // Continue after rehash
        } 
        //環(huán)形隊列里追加元素是否成功漂佩,這里的offer是ringbuffer環(huán)形隊列的入隊
        else if (buffer.offer(e) != Buffer.FAILED) {
          break;
        } 
        //數(shù)組長度是否大于最大長度脖含,最大長度= 4*2的N次方(N=cpu核心數(shù))
        else if (n >= MAXIMUM_TABLE_SIZE || table != buffers) {
          collide = false; // At max size or stale
        } else if (!collide) {
          collide = true;
        } 
        //真正的擴容邏輯(走到這步說明環(huán)形隊列中添加元素失敗,并發(fā)太高投蝉,此時正有其他線程也在這個buffer環(huán)形隊列中添加元素养葵,所以考慮擴容table數(shù)組的長度)
        else if (tableBusy == 0 && casTableBusy()) {
          try {
            if (table == buffers) { // Expand table unless stale
              table = Arrays.copyOf(buffers, n << 1);
            }
          } finally {
            tableBusy = 0;
          }
          collide = false;
          continue; // Retry with expanded table
        }
        h = advanceProbe(h);
      } 
      //buffer數(shù)組為空則初始化數(shù)組
      else if ((tableBusy == 0) && (table == buffers) && casTableBusy()) {
        boolean init = false;
        try { // Initialize table
          if (table == buffers) {
            @SuppressWarnings({"unchecked", "rawtypes"})
            Buffer<E>[] rs = new Buffer[1];
            rs[0] = create(e);
            table = rs;
            init = true;
          }
        } finally {
          tableBusy = 0;
        }
        if (init) {
          break;
        }
      }
    }
  }

RingBuffer的數(shù)據(jù)結(jié)構(gòu),這里需要注意的是瘩缆,為了方便理解关拒,羅列了部分主要屬性,實際readCounter和writeCounter在計算時,是相對于各自原始位置的偏移量着绊。

final class BoundedBuffer<E> extends StripedBuffer<E>{

    static final class RingBuffer<E> extends BBHeader.ReadAndWriteCounterRef{
        //引用類型原子數(shù)組(存放環(huán)形隊列中的節(jié)點值)
        final AtomicReferenceArray<E> buffer;
        //讀指針:相當于head
        volatile long readCounter;
        //寫指針:相當于tail
        volatile long writeCounter;

        public RingBuffer(E e) {
             //BUFFER_SIZE = 16
             buffer = new AtomicReferenceArray<>(BUFFER_SIZE);
             buffer.lazySet(0, e);
        }

        /**
         * 入隊
        */
        public int offer(E e) {
          long head = readCounter;
          long tail = relaxedWriteCounter();
          long size = (tail - head);
          //判斷環(huán)形隊列是否已滿谐算,這里之所以可以用tail-head>=buffer_size 來判斷是否隊列已滿,
          //是因為這里的head和tail是相對于各自原始位置的偏移量
          if (size >= BUFFER_SIZE) {
            return Buffer.FULL;
          }
          //將tail指針向后移動一格
          if (casWriteCounter(tail, tail + 1)) {
            int index = (int) (tail & MASK);
            //設置環(huán)形數(shù)組中響應位置元素
            buffer.lazySet(index, e);
            return Buffer.SUCCESS;
          }
          return Buffer.FAILED;
        }
        
        /**
          * 批量出隊
         */
        public void drainTo(Consumer<E> consumer) {
           long head = readCounter;
           long tail = relaxedWriteCounter();
           long size = (tail - head);
           //環(huán)形隊列是否已空
           if (size == 0) {
             return;
           }
           do {
             int index = (int) (head & MASK);
             E e = buffer.get(index);
             if (e == null) {
               // not published yet
               break;
             }
             //元素出隊归露,相應位置置為空
             buffer.lazySet(index, null);
             consumer.accept(e);
             //head指針向后移動一格
             head++;
           } while (head != tail);
           //批量設置head指針的偏移量
           lazySetReadCounter(head);
         }
    }


}

總結(jié):

caffeine在緩存讀日志(讀操作) 高并發(fā)寫入隊列場景中洲脂,使用了環(huán)形隊列可以有效避免高并發(fā)場景下節(jié)點頻繁創(chuàng)建帶來的gc壓力,同時可以邊寫邊讀(消費)剧包,提高了隊列的吞吐量恐锦。并且采用原生cas操作和分離熱點的方法,將線程對隊列并發(fā)寫的競爭分散到數(shù)組中的每一個隊列中疆液,有效降低了并發(fā)的競爭一铅。

引用
鳳凰架構(gòu)-服務器端緩存
caffeine源碼

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市堕油,隨后出現(xiàn)的幾起案子馅闽,更是在濱河造成了極大的恐慌,老刑警劉巖馍迄,帶你破解...
    沈念sama閱讀 223,002評論 6 519
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件福也,死亡現(xiàn)場離奇詭異,居然都是意外死亡攀圈,警方通過查閱死者的電腦和手機暴凑,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 95,357評論 3 400
  • 文/潘曉璐 我一進店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來赘来,“玉大人现喳,你說我怎么就攤上這事∪剑” “怎么了嗦篱?”我有些...
    開封第一講書人閱讀 169,787評論 0 365
  • 文/不壞的土叔 我叫張陵,是天一觀的道長幌缝。 經(jīng)常有香客問我灸促,道長,這世上最難降的妖魔是什么涵卵? 我笑而不...
    開封第一講書人閱讀 60,237評論 1 300
  • 正文 為了忘掉前任浴栽,我火速辦了婚禮,結(jié)果婚禮上轿偎,老公的妹妹穿的比我還像新娘典鸡。我一直安慰自己,他們只是感情好坏晦,可當我...
    茶點故事閱讀 69,237評論 6 398
  • 文/花漫 我一把揭開白布萝玷。 她就那樣靜靜地躺著嫁乘,像睡著了一般。 火紅的嫁衣襯著肌膚如雪球碉。 梳的紋絲不亂的頭發(fā)上蜓斧,一...
    開封第一講書人閱讀 52,821評論 1 314
  • 那天,我揣著相機與錄音汁尺,去河邊找鬼。 笑死多律,一個胖子當著我的面吹牛痴突,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播狼荞,決...
    沈念sama閱讀 41,236評論 3 424
  • 文/蒼蘭香墨 我猛地睜開眼辽装,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了相味?” 一聲冷哼從身側(cè)響起拾积,我...
    開封第一講書人閱讀 40,196評論 0 277
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎丰涉,沒想到半個月后拓巧,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 46,716評論 1 320
  • 正文 獨居荒郊野嶺守林人離奇死亡一死,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 38,794評論 3 343
  • 正文 我和宋清朗相戀三年肛度,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片投慈。...
    茶點故事閱讀 40,928評論 1 353
  • 序言:一個原本活蹦亂跳的男人離奇死亡承耿,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出伪煤,到底是詐尸還是另有隱情加袋,我是刑警寧澤,帶...
    沈念sama閱讀 36,583評論 5 351
  • 正文 年R本政府宣布抱既,位于F島的核電站职烧,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏防泵。R本人自食惡果不足惜阳堕,卻給世界環(huán)境...
    茶點故事閱讀 42,264評論 3 336
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望择克。 院中可真熱鬧恬总,春花似錦、人聲如沸肚邢。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,755評論 0 25
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至贱纠,卻和暖如春峻厚,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背谆焊。 一陣腳步聲響...
    開封第一講書人閱讀 33,869評論 1 274
  • 我被黑心中介騙來泰國打工惠桃, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留,地道東北人辖试。 一個月前我還...
    沈念sama閱讀 49,378評論 3 379
  • 正文 我出身青樓辜王,卻偏偏與公主長得像,于是被迫代替她去往敵國和親罐孝。 傳聞我的和親對象是個殘疾皇子呐馆,可洞房花燭夜當晚...
    茶點故事閱讀 45,937評論 2 361

推薦閱讀更多精彩內(nèi)容