引入
caffenie作為目前本地緩存的首選谆棺,其內(nèi)部設計思想有很多值得我們學習的地方宅广。緩存中最主要的數(shù)據(jù)競爭源于讀取數(shù)據(jù)的同時普舆,也會伴隨著對數(shù)據(jù)狀態(tài)的寫入操作帐萎;寫入數(shù)據(jù)的同時,也會伴隨著數(shù)據(jù)狀態(tài)的讀取操作敞曹。譬如账月,讀取時要同時更新數(shù)據(jù)的最近訪問時間和訪問計數(shù)器的狀態(tài)(當然caffeine為了追求高效,不會記錄時間和次數(shù)澳迫,譬如通過調(diào)整鏈表順序來表達時間先后局齿、通過 Sketch 結(jié)構(gòu)來表達熱度高低),以實現(xiàn)緩存的淘汰策略橄登;又或者讀取時要同時判斷數(shù)據(jù)的超期時間等信息抓歼,以實現(xiàn)失效重加載等其他擴展功能。對以上伴隨讀寫操作而來的狀態(tài)維護拢锹,有兩種可選擇的處理思路谣妻,一種是以 Guava Cache 為代表的同步處理機制,即在訪問數(shù)據(jù)時一并完成緩存淘汰卒稳、統(tǒng)計蹋半、失效等狀態(tài)變更操作,通過分段加鎖等優(yōu)化手段來盡量減少競爭充坑。另一種是以 Caffeine 為代表的異步日志提交機制湃窍,這種機制參考了經(jīng)典的數(shù)據(jù)庫設計理論闻蛀,將對數(shù)據(jù)的讀、寫過程看作是日志(即對數(shù)據(jù)的操作指令)的提交過程您市。盡管日志也涉及到寫入操作觉痛,有并發(fā)的數(shù)據(jù)變更就必然面臨鎖競爭,但異步提交的日志已經(jīng)將原本在 Map 內(nèi)的鎖轉(zhuǎn)移到日志的追加寫操作上茵休,日志里騰挪優(yōu)化的余地就比在 Map 中要大得多薪棒。
我們上文提到,對緩存的維護可以分為兩個場景榕莺,一個是讀取緩存時俐芯,另一個是向緩存寫入數(shù)據(jù)時,這兩個場景钉鸯,caffeine都是提交日志異步處理的方式維護緩存的淘汰吧史、統(tǒng)計、失效等唠雕。讀取緩存和寫入緩存兩個場景還是有差異贸营,本文是針對第一個場景讀取緩存時異步去維護緩存的設計剖析。
問題
在異步維護(淘汰岩睁、統(tǒng)計钞脂、失效等)緩存過程中,需要先向一個中間隊列去存儲key的信息捕儒,然后再由消費者去讀取隊列中的元素冰啃,繼而去維護(淘汰、統(tǒng)計刘莹、失效等)緩存的信息阎毅。那么,對這個中間隊列的性能要求就會近乎苛刻点弯,因為緩存意味著高并發(fā)扇调,所以隊列的并發(fā)寫入速度要足夠快。需要解決的問題大致體現(xiàn)在以下幾個方面
1蒲拉、因為隊列要求寫入速度肃拜,隊列中的元素不斷創(chuàng)建痴腌,可能會導致gc壓力過大
2雌团、因為隊列存在多線程并發(fā)寫入的情況,又要足夠快士聪,所以對隊列的并發(fā)競爭會很大锦援,我們需要想辦法解決并發(fā)競爭的問題。
解決思路:
正常隊列的數(shù)據(jù)結(jié)構(gòu)就是數(shù)組剥悟,或者鏈表灵寺。java的jdk中曼库,我們常用的隊列有ArrayBlockingQueue、LinkedBlockingQueue略板、ConcurrentLinkedQueue毁枯。
1、針對第二個方面叮称,我們需要解決并發(fā)競爭的問題种玛,常規(guī)的解決思路有兩條,一條是加鎖保證臨界資源的安全瓤檐,還有一個就是原生的cas操作可以保證赂韵。加鎖并不是最優(yōu)的解決方案,在真正的高并發(fā)情況下挠蛉,鎖會帶來性能的開銷祭示,所以我們其實是希望能不用鎖的情況下就不用鎖的,如果能用原生的cas操作是最好的谴古。但是同樣质涛,用原生的cas操作,我們也需要解決cas操作在高并發(fā)下的惡性空自旋的問題讥电,這個我們根據(jù)具體情況看是否可以采用分離熱點的方式(具體可以參考高并發(fā)下的設計思想--分離熱點)
這么看的話蹂窖,ArrayBlockingQueue、LinkedBlockingQueue都是內(nèi)部都是采用了lock鎖的方式恩敌,并不合適瞬测。
2、再看ConcurrentLinkedQueue是無鎖的纠炮,內(nèi)部也是用cas操作保證的多線程安全月趟。但是針對第一點,ConcurrentLinkedQueue 內(nèi)部是鏈表實現(xiàn)恢口,高并發(fā)情況下孝宗,鏈表的節(jié)點會被頻繁的創(chuàng)建,這會給gc帶來壓力耕肩。那有沒有一種數(shù)據(jù)結(jié)構(gòu)因妇,是以數(shù)組為基礎,但是內(nèi)部的內(nèi)存空間又可以重復利用的呢猿诸?答案是環(huán)形隊列婚被,內(nèi)部用數(shù)組實現(xiàn)就可以了;并且環(huán)形隊列的好處不止于此梳虽,下文詳細描述址芯。
環(huán)形隊列
環(huán)形隊列是在實際編程極為有用的數(shù)據(jù)結(jié)構(gòu),它采用數(shù)組的線性空間,數(shù)據(jù)組織簡單谷炸,能很快知道隊列是否滿或空北专,能以很快速度的來存取數(shù)據(jù)。
從順時針看旬陡,環(huán)形隊列 有隊頭 head 和隊尾 tail拓颓。
- 生產(chǎn)的流程是:
生產(chǎn)者順時針向隊尾 tail 插入元素,這會導致 head 位置不變描孟,tail 位置在后移录粱; - 消費的流程是:
消費者則從隊頭 head 開始消費,這會導致 head 向后移動画拾,而tail 位置不變啥繁,如果隊列滿了就不能寫入。 - 環(huán)形隊列的特點:
隊頭 head 和隊尾 tail 的位置是不定的青抛,位置一直在循環(huán)流動旗闽,空間就被重復利用起來了。因為有簡單高效的原因蜜另,甚至在硬件都實現(xiàn)了環(huán)形隊列.适室。環(huán)形隊列廣泛用于網(wǎng)絡數(shù)據(jù)收發(fā),和不同程序間數(shù)據(jù)交換(比如內(nèi)核與應用程序大量交換數(shù)據(jù)举瑰,從硬件接收大量數(shù)據(jù))均使用了環(huán)形隊列捣辆。
環(huán)形核心的結(jié)構(gòu)和流程說明
- 約定head指向隊列的第一個元素,
也就是說data[head]就是隊頭數(shù)據(jù)此迅,head初始值為0汽畴。 - 約定tail指向隊列的最后一個元素的后一個位置,
也就是說data[tail-1]就是隊尾數(shù)據(jù)耸序,tail初始值為0忍些。 - 隊列滿的條件是:
( tail+1 )% maxSize == head - 隊列空的條件是:
tail == head - 隊列中的元素個數(shù)為:
( tail + maxsize - head) % maxSize - 有效數(shù)據(jù)只有maxSize-1個
因為tail指向隊尾的后面一個位置,這個位置就不能存數(shù)據(jù)坎怪,因此有效數(shù)據(jù)只有maxSize-1個
至此罢坝,我們清楚我們的數(shù)據(jù)結(jié)構(gòu)主體雛形--環(huán)形的數(shù)組隊列,同時我們需要將隊列采用分離熱點的思想搅窿,將隊列分散到一個數(shù)組中嘁酿,數(shù)組可以存多個環(huán)形隊列,將每個線程id對數(shù)組長度取模男应,然后映射到數(shù)組中的下標位置中闹司,這樣每個線程都有自己對應的環(huán)形數(shù)組,就可以極大地降低競爭殉了。同樣在消費隊列元素時开仰,消費數(shù)組中的多個環(huán)形隊列中節(jié)點元素拟枚。
caffeine的設計
通過以上分析薪铜,再結(jié)合caffeine中對讀緩異步隊列的設計众弓,我們可以看到,后者的設計思想和我們分析的基本一致隔箍,下面是其數(shù)據(jù)結(jié)構(gòu)圖:
在源碼中的體現(xiàn)上谓娃,table[]數(shù)組在父類StripedBuffer里,父類提供入隊和出隊的鉤子方法以及table數(shù)組初始化和擴容的具體實現(xiàn)蜒滩。RingBuffer繼承自StripedBuffer滨达,其中包含了環(huán)形隊列中節(jié)點元素值數(shù)組 AtomicReferenceArray,以及頭結(jié)點和尾節(jié)點俯艰;提供了入隊的具體實現(xiàn)捡遍,以及消費時批量出隊的具體實現(xiàn)方法drainTo。
StripedBuffer數(shù)據(jù)結(jié)構(gòu):
//table數(shù)組的最大長度
static final int MAXIMUM_TABLE_SIZE
//buffer數(shù)組竹握,數(shù)組里的元素是ringBuffer
transient volatile Buffer<E> @Nullable[] table
//擴容或者初始化數(shù)組的鎖標識
transient volatile int tableBusy
需要注意的是画株,什么時候會進入擴容、初始化數(shù)組的操作中啦辐。在追加到RingBuffer如果失敗了(存在競爭)谓传,是會進行擴容或者初始化邏輯的。并且在擴容初始化的邏輯中芹关,會在擴容后再次嘗試追加续挟,如果連續(xù)三次都失敗的話,那么這次寫入就是失敗的侥衬。所以ringbuffer的寫入會存在失敗的情況诗祸,不能保證強成功。不過在讀緩存后異步入隊列的場景中轴总,這種偶發(fā)的失敗是可以接受的贬媒。
/**
* 入隊邏輯
*/
public int offer(E e) {
int mask;
int result = 0;
Buffer<E> buffer;
//是否不存在競爭
boolean uncontended = true;
Buffer<E>[] buffers = table;
//是否已經(jīng)初始化
if ((buffers == null)
|| (mask = buffers.length - 1) < 0
////用thread的隨機值作為hash值,得到對應位置的RingBuffer
|| (buffer = buffers[getProbe() & mask]) == null
//檢查追加到RingBuffer是否成功,如果失敗肘习,會進行擴容初始化邏輯
|| !(uncontended = ((result = buffer.offer(e)) != Buffer.FAILED))) {
expandOrRetry(e, uncontended);
}
return result;
}
/**
* 批量出隊邏輯
*/
public void drainTo(Consumer<E> consumer) {
Buffer<E>[] buffers = table;
if (buffers == null) {
return;
}
for (Buffer<E> buffer : buffers) {
if (buffer != null) {
//調(diào)用ringBuffer的出隊方法
buffer.drainTo(consumer);
}
}
}
/**
* table數(shù)組擴容际乘、初始化邏輯
*/
final void expandOrRetry(E e, boolean wasUncontended) {
int h;
if ((h = getProbe()) == 0) {
ThreadLocalRandom.current(); // force initialization
h = getProbe();
wasUncontended = true;
}
boolean collide = false; // True if last slot nonempty
//默認重試3次,允許丟失
for (int attempt = 0; attempt < ATTEMPTS; attempt++) {
Buffer<E>[] buffers;
Buffer<E> buffer;
int n;
//buffer數(shù)組不為空的情況
if (((buffers = table) != null) && ((n = buffers.length) > 0)) {
//對應數(shù)組下標元素為空
if ((buffer = buffers[(n - 1) & h]) == null) {
if ((tableBusy == 0) && casTableBusy()) { // Try to attach new Buffer
boolean created = false;
try { // Recheck under lock
Buffer<E>[] rs;
int mask, j;
if (((rs = table) != null) && ((mask = rs.length) > 0)
&& (rs[j = (mask - 1) & h] == null)) {
rs[j] = create(e);
created = true;
}
} finally {
tableBusy = 0;
}
if (created) {
break;
}
continue; // Slot is now non-empty
}
collide = false;
} else if (!wasUncontended) { // CAS already known to fail
wasUncontended = true; // Continue after rehash
}
//環(huán)形隊列里追加元素是否成功漂佩,這里的offer是ringbuffer環(huán)形隊列的入隊
else if (buffer.offer(e) != Buffer.FAILED) {
break;
}
//數(shù)組長度是否大于最大長度脖含,最大長度= 4*2的N次方(N=cpu核心數(shù))
else if (n >= MAXIMUM_TABLE_SIZE || table != buffers) {
collide = false; // At max size or stale
} else if (!collide) {
collide = true;
}
//真正的擴容邏輯(走到這步說明環(huán)形隊列中添加元素失敗,并發(fā)太高投蝉,此時正有其他線程也在這個buffer環(huán)形隊列中添加元素养葵,所以考慮擴容table數(shù)組的長度)
else if (tableBusy == 0 && casTableBusy()) {
try {
if (table == buffers) { // Expand table unless stale
table = Arrays.copyOf(buffers, n << 1);
}
} finally {
tableBusy = 0;
}
collide = false;
continue; // Retry with expanded table
}
h = advanceProbe(h);
}
//buffer數(shù)組為空則初始化數(shù)組
else if ((tableBusy == 0) && (table == buffers) && casTableBusy()) {
boolean init = false;
try { // Initialize table
if (table == buffers) {
@SuppressWarnings({"unchecked", "rawtypes"})
Buffer<E>[] rs = new Buffer[1];
rs[0] = create(e);
table = rs;
init = true;
}
} finally {
tableBusy = 0;
}
if (init) {
break;
}
}
}
}
RingBuffer的數(shù)據(jù)結(jié)構(gòu),這里需要注意的是瘩缆,為了方便理解关拒,羅列了部分主要屬性,實際readCounter和writeCounter在計算時,是相對于各自原始位置的偏移量着绊。
final class BoundedBuffer<E> extends StripedBuffer<E>{
static final class RingBuffer<E> extends BBHeader.ReadAndWriteCounterRef{
//引用類型原子數(shù)組(存放環(huán)形隊列中的節(jié)點值)
final AtomicReferenceArray<E> buffer;
//讀指針:相當于head
volatile long readCounter;
//寫指針:相當于tail
volatile long writeCounter;
public RingBuffer(E e) {
//BUFFER_SIZE = 16
buffer = new AtomicReferenceArray<>(BUFFER_SIZE);
buffer.lazySet(0, e);
}
/**
* 入隊
*/
public int offer(E e) {
long head = readCounter;
long tail = relaxedWriteCounter();
long size = (tail - head);
//判斷環(huán)形隊列是否已滿谐算,這里之所以可以用tail-head>=buffer_size 來判斷是否隊列已滿,
//是因為這里的head和tail是相對于各自原始位置的偏移量
if (size >= BUFFER_SIZE) {
return Buffer.FULL;
}
//將tail指針向后移動一格
if (casWriteCounter(tail, tail + 1)) {
int index = (int) (tail & MASK);
//設置環(huán)形數(shù)組中響應位置元素
buffer.lazySet(index, e);
return Buffer.SUCCESS;
}
return Buffer.FAILED;
}
/**
* 批量出隊
*/
public void drainTo(Consumer<E> consumer) {
long head = readCounter;
long tail = relaxedWriteCounter();
long size = (tail - head);
//環(huán)形隊列是否已空
if (size == 0) {
return;
}
do {
int index = (int) (head & MASK);
E e = buffer.get(index);
if (e == null) {
// not published yet
break;
}
//元素出隊归露,相應位置置為空
buffer.lazySet(index, null);
consumer.accept(e);
//head指針向后移動一格
head++;
} while (head != tail);
//批量設置head指針的偏移量
lazySetReadCounter(head);
}
}
}
總結(jié):
caffeine在緩存讀日志(讀操作) 高并發(fā)寫入隊列場景中洲脂,使用了環(huán)形隊列可以有效避免高并發(fā)場景下節(jié)點頻繁創(chuàng)建帶來的gc壓力,同時可以邊寫邊讀(消費)剧包,提高了隊列的吞吐量恐锦。并且采用原生cas操作和分離熱點的方法,將線程對隊列并發(fā)寫的競爭分散到數(shù)組中的每一個隊列中疆液,有效降低了并發(fā)的競爭一铅。