1. 比賽攻略 (Rapids團(tuán)隊(duì) - C++實(shí)現(xiàn))
- 注意:詳細(xì)代碼和答辯PPT下載請(qǐng)查看Github倉庫RapidsAtHKUST/EngineRaceRapids。
- Rapids團(tuán)隊(duì)成員: CheYulin, SunShixuan, WangLipeng。
1.1. 賽題介紹
評(píng)測(cè)程序分為2個(gè)階段:
1)Recover正確性評(píng)測(cè): 此階段評(píng)測(cè)程序會(huì)并發(fā)寫入特定數(shù)據(jù)(key 8B、value 4KB)
同時(shí)進(jìn)行任意次kill -9來模擬進(jìn)程意外退出(參賽引擎需要保證進(jìn)程意外退出時(shí)數(shù)據(jù)持久化不丟失)逆瑞,接著重新打開DB,調(diào)用Read、Range接口來進(jìn)行正確性校驗(yàn)亮垫。
2)性能評(píng)測(cè)
- 隨機(jī)寫入:64個(gè)線程并發(fā)隨機(jī)寫入羔沙,每個(gè)線程使用Write各寫100萬次隨機(jī)數(shù)據(jù)(key 8B躺涝、value 4KB)
- 隨機(jī)讀取:64個(gè)線程并發(fā)隨機(jī)讀取扼雏,每個(gè)線程各使用Read讀取100萬次隨機(jī)數(shù)據(jù)
- 順序讀燃崾取:64個(gè)線程并發(fā)順序讀取,每個(gè)線程各使用Range有序(增序)遍歷全量數(shù)據(jù)2次
注:
2.2階段會(huì)對(duì)所有讀取的kv校驗(yàn)是否匹配诗充,如不通過則終止苍蔬,評(píng)測(cè)失敗蝴蜓;
2.3階段除了對(duì)迭代出來每條的kv校驗(yàn)是否匹配外碟绑,還會(huì)額外校驗(yàn)是否嚴(yán)格遞增,如不通過則終止茎匠,評(píng)測(cè)失敗蜈敢。
1.2. 最終線上效果
- 進(jìn)程耗時(shí)
子階段 | 進(jìn)程最佳占用時(shí)間 | 階段IO吞吐量波動(dòng)范圍 |
---|---|---|
隨機(jī)寫入 | 114.1 seconds左右 |
2176.27-2195.34 MB/s |
隨機(jī)讀取 | 105.9 seconds左右 (包括0.2 seconds 索引構(gòu)建) |
2287.23-2291.56 MB/s |
順序讀取 | 192.1 seconds左右 (包括0.2 seconds 索引構(gòu)建) |
2596.04-2601.45 MB/s |
順序讀取中,在內(nèi)存并發(fā)讀取visit總時(shí)間大概在105
-110
秒汽抚,吞吐量: 284.1
-296.2
GB/s 抓狭。
- 進(jìn)程啟動(dòng)間隔(波動(dòng)不大,取其中一次作為樣本)
階段之間間隔 | 耗時(shí) |
---|---|
隨機(jī)寫入啟動(dòng)的間隔 |
0.1 seconds 左右 |
隨機(jī)寫入到讀取的間隔 |
0.35 seconds 左右 |
隨機(jī)讀取到順序讀取的間隔 |
0.45 seconds 左右 |
- 最優(yōu)成績
最優(yōu)成績 | 總時(shí)間 |
---|---|
理論歷史最佳成績 | 114.1+105.9+192.1+0.1+0.35+0.45=413.00 seconds |
歷史最佳成績 | 413.69 seconds |
- 歷史最佳成績離理論歷史最佳成績累加差了
0.69 seconds
造烁;
這應(yīng)該是與機(jī)器讀寫性能波動(dòng)有關(guān)否过,其中寫性能波動(dòng)最大。
2. 賽題背景分析及理解
2.1. 賽題注意點(diǎn)
- Recover正確性評(píng)測(cè)要求: 每次Write調(diào)用都要將數(shù)據(jù)至少寫入到page-cache惭蟋。
- 每個(gè)階段開始苗桂,DB Engine都會(huì)被重新打開;
每個(gè)階段進(jìn)行中告组, 只有讀取或者寫入中的一種情況發(fā)生煤伟;
每一階段結(jié)束,page cache會(huì)被清空(清空page cache的時(shí)間占用總時(shí)間)木缝。 - 對(duì)于復(fù)賽的順序讀取便锨,recovery階段使用單線程,并且只遍歷全量數(shù)據(jù)1次我碟,選手需要做相應(yīng)處理放案。
- 評(píng)測(cè)中, Key-Value大小都是固定的矫俺, Key可用64bit無符號(hào)整數(shù)表示吱殉,并且分布均勻掸冤。
- 評(píng)測(cè)中,允許使用的最大內(nèi)存
2GB
(不計(jì)評(píng)測(cè)程序內(nèi)存占用)友雳;各子階段會(huì)有固定的64線程隨機(jī)寫入或隨機(jī)讀取或順序讀取稿湿。
2.2. 賽題分析
-
傲騰存儲(chǔ)特征
- 為達(dá)到磁盤的峰值吞吐量,
需要保證iostat
中的兩個(gè)關(guān)鍵參數(shù)avgrq-sz
(扇區(qū)個(gè)數(shù)押赊,一般每個(gè)扇區(qū)512B
)和avgqu-sz
(IO隊(duì)列長度) 處于合適大小缎罢。
并且每個(gè)request有最大大小,不能設(shè)置太大考杉,具體可以查看blockdev --getmaxsect /dev/sda
(/dev/sda
為查看的設(shè)備)策精。 - 隨機(jī)讀寫只要有合適大小塊(
avgrq-sz
),保證足夠IO隊(duì)列長度(avgqu-sz
)就可以達(dá)到接近順序讀寫的效果崇棠。 - 順序讀取通過
128KB
大小請(qǐng)求,QD=8
可以達(dá)到2595-2605 MB/s
左右咽袜。 - 隨機(jī)讀取
4KB
大小請(qǐng)求,QD=20+
可以達(dá)到2290 MB/s
左右,波動(dòng)很小枕稀。 - 隨機(jī)寫入
16KB
大小請(qǐng)求,QD=20+
可以達(dá)到2180-2200 MB/s
询刹。
- 為達(dá)到磁盤的峰值吞吐量,
-
注意點(diǎn)
- 隨機(jī)寫入:保證
iostat
中的合適大小的avgrq-sz
(通過實(shí)驗(yàn)測(cè)得 mmap buffer 16KB比較優(yōu)),
保證avgqu-sz
(handle tail threads)萎坷,至少需要QD=8
(實(shí)驗(yàn)中測(cè)得8,16,32都差不多)凹联,通過寫文件時(shí)候系統(tǒng)的同步(鎖)。 - 隨機(jī)讀榷叩怠:保證
iostat
中的合適大小的avgqu-sz
(handle tail threads)蔽挠。 - 順序讀取:保證
iostat
中的合適avgrq-sz
和avgqu-sz
瓜浸。做好充分的overlap-io和內(nèi)存訪問(100-110秒左右)澳淑。 - 三階段是獨(dú)立進(jìn)行的(無混合的讀寫操作),因此評(píng)測(cè)不要求索引支持
O(logn)
或更低復(fù)雜度的動(dòng)態(tài)插入插佛。
- 隨機(jī)寫入:保證
3. 核心思路
每一階段結(jié)束杠巡,page cache會(huì)被清空。
因此雇寇,整體設(shè)計(jì)中除了內(nèi)存映射文件(meta-file, key/val buffer files),
其他文件操作都通過DirectIO氢拥。
3.1. 存儲(chǔ)和索引設(shè)計(jì)
-
文件設(shè)計(jì)
- Key和Value都使用write-ahead-log方式,順序append到相應(yīng)位置锨侯,并通過一個(gè)meta文件記錄相應(yīng)記錄個(gè)數(shù)嫩海。
- Value大小固定為
4KB
,因此在設(shè)計(jì)索引時(shí)候可以不存Value長度识腿,
并且可以將偏移量直接除以4KB
來減少空間占用出革。 - Key大小固定為
8B
, 根據(jù)PolarString
定義, Key可以被轉(zhuǎn)化為64bit-Big-Endian無符號(hào)整數(shù)表示渡讼,
Key分布比較隨機(jī)骂束,因此可按Key分Bucket來設(shè)計(jì)存儲(chǔ)結(jié)構(gòu), 來支持并發(fā)恢復(fù)索引。 - 可以將Key-Value在寫入時(shí)候一一對(duì)應(yīng)成箫,這樣就不用寫偏移量展箱,而通過順序遍歷write-ahead-log恢復(fù)出來。
-
索引設(shè)計(jì)
- 評(píng)測(cè)中不要求索引支持
O(logn)
或更低復(fù)雜度的動(dòng)態(tài)插入蹬昌,因此采用 bucket + sorted array 的方案混驰;
按照Key的Big-Endian無符號(hào)整數(shù)高位分bucket,每個(gè)Bucekt內(nèi)采用根據(jù)Bucket內(nèi)偏移比較的sorted array皂贩;
通過計(jì)算bucket id 和 branchless-lower-bound加跳過重復(fù)支持value in-bucket offset查詢栖榨。 - 并行索引構(gòu)建(0.2秒,
throughput = 488.28125MB/0.2s = 2441.4 MB/s
)明刷,每個(gè)線程分配一些buckets婴栽,
因?yàn)槊總€(gè)bucket的大小均勻所以workload balanced,多bucket使得sort時(shí)間可以忽略辈末,
此外sort還和io overlap在一起愚争。
- 評(píng)測(cè)中不要求索引支持
-
寫入相關(guān)的文件設(shè)計(jì)
- 寫入buffer必須使用文件作為backend的(通過mmap),來保證正確性挤聘。
- 考慮到range時(shí)候順序讀盤更快轰枝,寫入時(shí)候同一bucket寫在同一區(qū)域。
3.2. 文件設(shè)計(jì)
- key/value write-ahead logs 各32個(gè)(一個(gè)文件里面分bucket, bucket id相鄰的在文件中相鄰)组去。
- meta-file, mmap key buffer file, mmap value buffer file各1個(gè), slice成
BUCKET_NUM
的views (e.g. 1024)鞍陨。
文件整體設(shè)計(jì)分為三部分:
(1) K-V-Log文件, (2) Meta-Count文件, (3) Key/Value Buffer文件。
3.2.1. K-V Log文件
- key-value的對(duì)應(yīng):
邏輯上, key-value被寫到一個(gè)的相同bucket, 對(duì)應(yīng)到相同的in-bucket offset,
通過write-ahead追加到對(duì)應(yīng)的log文件从隆。
我們把8-byte-key
通過big-endian轉(zhuǎn)化出uint64_t
類型的整數(shù)key
湾戳。
對(duì)應(yīng)從key
到bucket_id
的計(jì)算如下代碼所示:
inline uint32_t get_par_bucket_id(uint64_t key) {
return static_cast<uint32_t >((key >> (64 - BUCKET_DIGITS)) & 0xffffffu);
}
- 邏輯上的bucket到實(shí)際中的文件, 通過下面的函數(shù)算出,
在設(shè)計(jì)中,
我們讓相鄰的value buckets被group到同一個(gè)value log file,
來為range查詢順序讀服務(wù)。
inline pair<uint32_t, uint64_t> get_key_fid_foff(uint32_t bucket_id, uint32_t bucket_off) {
constexpr uint32_t BUCKET_NUM_PER_FILE = (BUCKET_NUM / KEY_FILE_NUM);
uint32_t fid = bucket_id / BUCKET_NUM_PER_FILE;
uint64_t foff = MAX_KEY_BUCKET_SIZE * (bucket_id % BUCKET_NUM_PER_FILE) + bucket_off;
return make_pair(fid, foff * sizeof(uint64_t));
}
inline pair<uint32_t, uint64_t> get_value_fid_foff(uint32_t bucket_id, uint32_t bucket_off) {
// Buckets 0,1,2,3... grouped together.
constexpr uint32_t BUCKET_NUM_PER_FILE = (BUCKET_NUM / VAL_FILE_NUM);
uint32_t fid = bucket_id / BUCKET_NUM_PER_FILE;
uint64_t foff = MAX_VAL_BUCKET_SIZE * (bucket_id % BUCKET_NUM_PER_FILE) + bucket_off;
return make_pair(fid, foff * VALUE_SIZE);
}
- 最終設(shè)計(jì)中, 我們采用了32個(gè)value文件和32個(gè)key文件广料。
這是因?yàn)槎嗑€程寫入同一文件時(shí)候可以有一定的同步作用(有寫入鎖的存在),
來緩解最后剩下tail threads打不滿IO的情況砾脑。
此外,文件過多容易觸發(fā)Linux操作系統(tǒng)的bug艾杏,從DirectIO進(jìn)入BufferIO, 即使已經(jīng)標(biāo)志flag設(shè)置了O_DIRECT
.
3.2.2. Meta-Count 文件
- meta-count文件用來記錄每個(gè)bucket中現(xiàn)在write-ahead進(jìn)行到第幾個(gè)in-bucket位置了,
該文件通過內(nèi)存映射的方式, 來通過操作對(duì)應(yīng)數(shù)組mmap_meta_cnt_
韧衣,
記錄每個(gè)bucket寫入write-ahead-entry個(gè)數(shù)。
// Meta.
meta_cnt_file_dp_ = open(meta_file_path.c_str(), O_RDWR | O_CREAT, FILE_PRIVILEGE);
ftruncate(meta_cnt_file_dp_, sizeof(uint32_t) * BUCKET_NUM);
mmap_meta_cnt_ = (uint32_t *) mmap(nullptr, sizeof(uint32_t) * (BUCKET_NUM),
PROT_READ | PROT_WRITE, MAP_SHARED, meta_cnt_file_dp_, 0);
memset(mmap_meta_cnt_, 0, sizeof(uint32_t) * (BUCKET_NUM));
3.2.3. Key/Value Buffer 文件
buffer files用來在寫入時(shí)候進(jìn)行對(duì)每個(gè)bucket-entries的buffer
(通過內(nèi)存映射文件得到aligned buffer, 來具備kill-9
容錯(cuò)功能).
一個(gè)bucket對(duì)應(yīng)相應(yīng)的key-buffer和value-buffer;
所有的key-buffers從一個(gè)key-buffer文件內(nèi)存映射出來;
同理所有的val-buffers從一個(gè)val-buffer文件映射出來购桑。
我們給出一個(gè)Value Buffer文件的示例, Key Buffer文件相關(guān)的設(shè)計(jì)與之類似畅铭。
// Value Buffers. (To be sliced into BUCKET_NUM slices)
value_buffer_file_dp_ = open(value_buffers_file_path.c_str(), O_RDWR | O_CREAT, FILE_PRIVILEGE);
if (value_buffer_file_dp_ < 0) {
log_info("valbuf err info: %s", strerror(errno));
exit(-1);
}
ftruncate(value_buffer_file_dp_, tmp_buffer_value_file_size);
mmap_value_aligned_buffer_ = (char *) mmap(nullptr, tmp_buffer_value_file_size, \
PROT_READ | PROT_WRITE, MAP_SHARED, value_buffer_file_dp_, 0);
for (int i = 0; i < BUCKET_NUM; i++) {
mmap_value_aligned_buffer_view_[i] =
mmap_value_aligned_buffer_ + VALUE_SIZE * TMP_VALUE_BUFFER_SIZE * i;
}
- 在設(shè)計(jì)中, 我們使用了
16KB
value buffer 和4KB
key buffer,
分別整除VALUE_SIZE
和sizeof(uint64_t)
。
我們選擇較小的buffer是為了讓IO盡可能快地均衡地被打出去(不要有很少的線程最后還在打IO以致于打不滿),
value buffer不選擇更小是為了防止sys-cpu過高影響性能勃蜘,并且我們對(duì)磁盤的benchmark顯示16KB
是一個(gè)比較優(yōu)的值硕噩。
3.3. 寫入階段思路
- 清空page cache占用總時(shí)間,傲騰存儲(chǔ)iops和throughput都高缭贡,
因此使用DirectIO繞過page cache手動(dòng)管理緩沖和寫盤炉擅。 - 寫入時(shí)候先對(duì)bucket加鎖辉懒,將Key/Value一一對(duì)應(yīng),分別寫入這個(gè)bucket對(duì)應(yīng)的key-mmap-buffer和value-mmap-buffer谍失,
在buffer滿的時(shí)候?qū)懭雔og文件眶俩。
3.4. 隨機(jī)和順序讀取階段設(shè)計(jì)思路
-
隨機(jī)讀取
- 做最小粒度的同步。奇數(shù)和偶數(shù)線程同步快鱼,來保證64線程間較小的讀取進(jìn)度差別和穩(wěn)定的queue-depth(24左右)颠印。
- 嘗試了讀
8KB
,并進(jìn)行緩存置換抹竹,命中率不高:2%线罕;這說明通過讀更大block-size來優(yōu)化隨機(jī)讀取不可行。
-
順序讀取
- 流水線設(shè)計(jì):io部分(io協(xié)調(diào)者窃判, io線程)钞楼,通過set-affinity減少numa結(jié)點(diǎn)間的切換;內(nèi)存讀取部分(64threads)同步讀一個(gè)bucket與下一塊bucket的io overlap在一起兢孝。
- 每塊buffer為一個(gè)讀取單位窿凤,io協(xié)調(diào)者通過提交任務(wù)給io線程打到合適
avgrq-sz
和avgqu-sz
,從而打滿IO throughput跨蟹,詳見io協(xié)調(diào)者雳殊。 - 流水線使用2塊buffer滾動(dòng)。
- 在多次全量遍歷中窗轩,偶數(shù)次次遍歷重用奇數(shù)次的前幾塊buffer(塊數(shù)通過程序中的
KEEP_REUSE_BUFFER_NUM
指定夯秃,作為cache)。
3.5. 容錯(cuò)(K/V Buffer Files Flush)的思路
大體思路: 我們通過
ParallelFlushTmp
并行flush key, value buffers;
該函數(shù)在寫入階段的析構(gòu)函數(shù)調(diào)用(如果進(jìn)行到對(duì)應(yīng)代碼),
否則在讀取階段構(gòu)建index前會(huì)調(diào)用痢艺。優(yōu)化: 我們通過
ftruncate
對(duì)應(yīng)文件長度為0表示所有buckets對(duì)應(yīng)的需要flush的buffers已經(jīng)Flush出去了,
避免重復(fù)的Flush. 相應(yīng)邏輯在FlushTmpFiles
函數(shù)中仓洼。
4. 關(guān)鍵代碼
4.1. 隨機(jī)寫入
4.1.1. 實(shí)現(xiàn)邏輯
通過鎖一個(gè)bucket使得key-value在bucket中一一對(duì)應(yīng),
并且使得bucket的meta-count被正確地更改;
寫入之前先寫bucket對(duì)應(yīng)buffer, buffer滿了之后進(jìn)行阻塞的pwrite
系統(tǒng)調(diào)用堤舒。
大體邏輯如下代碼所示:
{
unique_lock<mutex> lock(bucket_mtx_[bucket_id]);
// Write value to the value file, with a tmp file as value_buffer.
uint32_t val_buffer_offset = (mmap_meta_cnt_[bucket_id] % TMP_VALUE_BUFFER_SIZE) * VALUE_SIZE;
char *value_buffer = mmap_value_aligned_buffer_view_[bucket_id];
memcpy(value_buffer + val_buffer_offset, value.data(), VALUE_SIZE);
// Write value to the value file.
if ((mmap_meta_cnt_[bucket_id] + 1) % TMP_VALUE_BUFFER_SIZE == 0) {
uint32_t in_bucket_id = mmap_meta_cnt_[bucket_id] - (TMP_VALUE_BUFFER_SIZE - 1);
uint32_t fid;
uint64_t foff;
tie(fid, foff) = get_value_fid_foff(bucket_id, in_bucket_id);
pwrite(value_file_dp_[fid], value_buffer, VALUE_SIZE * TMP_VALUE_BUFFER_SIZE, foff);
}
// Write key to the key file.
uint32_t key_buffer_offset = (mmap_meta_cnt_[bucket_id] % TMP_KEY_BUFFER_SIZE);
uint64_t *key_buffer = mmap_key_aligned_buffer_view_[bucket_id];
key_buffer[key_buffer_offset] = key_int_big_endian;
if (((mmap_meta_cnt_[bucket_id] + 1) % TMP_KEY_BUFFER_SIZE) == 0) {
uint32_t in_bucket_id = (mmap_meta_cnt_[bucket_id] - (TMP_KEY_BUFFER_SIZE - 1));
uint32_t fid;
uint64_t foff;
tie(fid, foff) = get_key_fid_foff(bucket_id, in_bucket_id);
pwrite(key_file_dp_[fid], key_buffer, sizeof(uint64_t) * TMP_KEY_BUFFER_SIZE, foff);
}
// Update the meta data.
mmap_meta_cnt_[bucket_id]++;
}
4.1.2. 優(yōu)化: 調(diào)整文件個(gè)數(shù)為32
調(diào)整文件個(gè)數(shù)為32后色建,
可以利用不同線程寫同一文件時(shí)候的阻塞,取得一定程度上的同步效果舌缤。
使得QueueDepth在基本所有時(shí)刻(包括最終快結(jié)束時(shí)刻)還處于大于8的水平箕戳,
來應(yīng)對(duì)tail threads queue-depth打不高的挑戰(zhàn)。
4.2. 并行索引構(gòu)建設(shè)計(jì)
思路:
對(duì)每個(gè)Bucket構(gòu)建SortedArray作為Index国撵。回顧:
文件設(shè)計(jì)中統(tǒng)一bucket的key-value對(duì)應(yīng)起來了,
那么在構(gòu)建中key的in-bucket offset和value的in-bucket offset是一樣的陵吸。
每個(gè)worker處理對(duì)應(yīng)的buckets, 邏輯上的buckets可以通過之前講的K-V Log文件設(shè)計(jì)對(duì)應(yīng)過去。
在整個(gè)數(shù)組被填充好了之后可以根據(jù)下面這個(gè)comparator函數(shù)對(duì)象進(jìn)行排序.
[](KeyEntry l, KeyEntry r) {
if (l.key_ == r.key_) {
return l.value_offset_ > r.value_offset_;
} else {
return l.key_ < r.key_;
}
具體邏輯:
每個(gè)線程分配到的一個(gè)任務(wù)分成兩部分:1. 讀取填充in-bucket-offset, 2. 排序介牙。
1024 buckets被均勻地分到64個(gè)線程(key大致均勻地分布到每個(gè)bucket)壮虫,
構(gòu)建過程中排序和磁盤IO是overlap在一起的。這個(gè)階段主要時(shí)間開銷在于讀key-logs文件(sort開銷可以忽略不計(jì)), 總開銷大概
0.2 seconds
左右环础。詳細(xì)代碼如下:
vector <thread> workers(NUM_READ_KEY_THREADS);
for (uint32_t tid = 0; tid < NUM_READ_KEY_THREADS; ++tid) {
workers[tid] = thread([tid, local_buffers_g, this]() {
uint64_t *local_buffer = local_buffers_g[tid];
uint32_t avg = BUCKET_NUM / NUM_READ_KEY_THREADS;
for (uint32_t bucket_id = tid * avg; bucket_id < (tid + 1) * avg; bucket_id++) {
uint32_t entry_count = mmap_meta_cnt_[bucket_id];
if (entry_count > 0) {
uint32_t passes = entry_count / KEY_READ_BLOCK_COUNT;
uint32_t remain_entries_count = entry_count - passes * KEY_READ_BLOCK_COUNT;
uint32_t file_offset = 0;
auto fid_foff = get_key_fid_foff(bucket_id, 0);
uint32_t key_fid = fid_foff.first;
size_t read_offset = fid_foff.second;
for (uint32_t j = 0; j < passes; ++j) {
auto ret = pread(key_file_dp_[key_fid], local_buffer,
KEY_READ_BLOCK_COUNT * sizeof(uint64_t), read_offset);
if (ret != KEY_READ_BLOCK_COUNT * sizeof(uint64_t)) {
log_info("ret: %d, err: %s", ret, strerror(errno));
}
for (uint32_t k = 0; k < KEY_READ_BLOCK_COUNT; k++) {
index_[bucket_id][file_offset].key_ = local_buffer[k];
index_[bucket_id][file_offset].value_offset_ = file_offset;
file_offset++;
}
read_offset += KEY_READ_BLOCK_COUNT * sizeof(uint64_t);
}
if (remain_entries_count != 0) {
size_t num_bytes = (remain_entries_count * sizeof(uint64_t) + FILESYSTEM_BLOCK_SIZE - 1) /
FILESYSTEM_BLOCK_SIZE * FILESYSTEM_BLOCK_SIZE;
auto ret = pread(key_file_dp_[key_fid], local_buffer, num_bytes, read_offset);
if (ret < static_cast<ssize_t>(remain_entries_count * sizeof(uint64_t))) {
log_info("ret: %d, err: %s, fid:%zu off: %zu", ret, strerror(errno), key_fid,
read_offset);
}
for (uint32_t k = 0; k < remain_entries_count; k++) {
index_[bucket_id][file_offset].key_ = local_buffer[k];
index_[bucket_id][file_offset].value_offset_ = file_offset;
file_offset++;
}
}
sort(index_[bucket_id], index_[bucket_id] + entry_count, [](KeyEntry l, KeyEntry r) {
if (l.key_ == r.key_) {
return l.value_offset_ > r.value_offset_;
} else {
return l.key_ < r.key_;
}
});
}
}
});
}
4.3. 隨機(jī)讀取
4.3.1. 實(shí)現(xiàn)邏輯
隨機(jī)讀取基本邏輯就是:查詢index, 如果是key-not-found就返回; 否則讀文件囚似。
查詢index代碼如下剩拢,其中主要用了帶prefetch優(yōu)化的二分查找:
uint64_t big_endian_key_uint = bswap_64(TO_UINT64(key.data()));
KeyEntry tmp{};
tmp.key_ = big_endian_key_uint;
auto bucket_id = get_par_bucket_id(big_endian_key_uint);
auto it = index_[bucket_id] + branchfree_search(index_[bucket_id], mmap_meta_cnt_[bucket_id], tmp);
- 剩余的key-not-found判斷和讀value邏輯如下所示:
if (it == index_[bucket_id] + mmap_meta_cnt_[bucket_id] || it->key_ != big_endian_key_uint) {
NotifyRandomReader(local_block_offset, tid);
return kNotFound;
}
uint32_t fid;
uint64_t foff;
std::tie(fid, foff) = get_value_fid_foff(bucket_id, it->value_offset_);
pread(value_file_dp_[fid], value_buffer, VALUE_SIZE, foff);
NotifyRandomReader(local_block_offset, tid);
value->assign(value_buffer, VALUE_SIZE);
4.3.2. 優(yōu)化:細(xì)粒度同步
思路: 我們?cè)O(shè)計(jì)了同步策略來保證足夠的queue-depth (25-30之間)的同時(shí),
又使得不同線程可以盡量同時(shí)退出, 盡量避免少queue-depth打IO情況的出現(xiàn)谆构。
實(shí)現(xiàn)細(xì)節(jié): 我們引入了4個(gè)blocking queues
notify_queues_
來作為
偶數(shù)和奇數(shù)線程的 當(dāng)前和下一輪讀取的同步通信工具 (tid%2==0
與tid%2==1
線程互相通知)裸扶。實(shí)現(xiàn)細(xì)節(jié)1 (初始化邏輯): 初始化時(shí)候放入偶數(shù)線程的blocking queue來讓他們啟動(dòng)起來框都。
if (local_block_offset == 0) {
if (tid == 0) {
notify_queues_.resize(4);
for (auto i = 0; i < 4; i++) {
// Even-0,1 Odd-2,3
notify_queues_[i] = new moodycamel::BlockingConcurrentQueue<int32_t>(NUM_THREADS);
}
for (uint32_t i = 0; i < NUM_THREADS / 2; i++) {
notify_queues_[0]->enqueue(1);
}
}
read_barrier_.Wait();
}
- 實(shí)現(xiàn)細(xì)節(jié)2 (等待邏輯): 每一round的開始的時(shí)候會(huì)有一個(gè)等待搬素。
uint32_t current_round = local_block_offset - 1;
if ((current_round % SHRINK_SYNC_FACTOR) == 0) {
uint32_t notify_big_round_idx = get_notify_big_round(current_round);
if (tid % 2 == 0) {
notify_queues_[notify_big_round_idx % 2]->wait_dequeue(tmp_val);
} else {
notify_queues_[notify_big_round_idx % 2 + 2]->wait_dequeue(tmp_val);
}
}
- 實(shí)現(xiàn)細(xì)節(jié)3 (通知邏輯): 偶數(shù)線程通知奇數(shù)線程當(dāng)前round, 奇數(shù)線程通知偶數(shù)線程下一round。
void EngineRace::NotifyRandomReader(uint32_t local_block_offset, int64_t tid) {
uint32_t current_round = local_block_offset - 1;
if ((current_round % SHRINK_SYNC_FACTOR) == SHRINK_SYNC_FACTOR - 1) {
uint32_t notify_big_round_idx = get_notify_big_round(current_round);
if (tid % 2 == 0) {
notify_queues_[(notify_big_round_idx) % 2 + 2]->enqueue(1); // Notify This Round
} else {
notify_queues_[(notify_big_round_idx + 1) % 2]->enqueue(1); // Notify Next Round
}
}
}
4.4. 順序讀取和并發(fā)全量遍歷
4.4.1. 內(nèi)存訪問和磁盤IO流水線設(shè)計(jì)
主體邏輯: 單個(gè)IO協(xié)調(diào)線程一直發(fā)任務(wù)讓IO線程打IO, 其他線程消費(fèi)內(nèi)存,
每進(jìn)行一個(gè)bucket進(jìn)行一次barrier來防止visit內(nèi)存占用太多資源魏保。實(shí)現(xiàn)細(xì)節(jié)1 (IO協(xié)調(diào)線程通知memory visit 線程 value buffer結(jié)果ready 的同步):
通過使用promise和future進(jìn)行 (promises_
,futures_
).
每個(gè)bucket會(huì)對(duì)應(yīng)一個(gè)promise, 來表示一個(gè)未來的獲取到的返回結(jié)果(也就是讀取完的buffer),
這個(gè)promise對(duì)應(yīng)了一個(gè)shared_future
, 使得所有visitors可以等待該返回結(jié)果熬尺。實(shí)現(xiàn)細(xì)節(jié)2 (通知IO協(xié)調(diào)線程free buffers已經(jīng)有了):
通過一個(gè)blocking queuefree_buffers_
來記錄free buffers,
visitor線程push buffer進(jìn)入free_buffers_
, IO協(xié)調(diào)者從中pop buffer。
4.4.2. 具體實(shí)現(xiàn)
具體實(shí)現(xiàn)分為三個(gè)部分:IO協(xié)調(diào)者谓罗,IO線程粱哼,以及內(nèi)存vistor線程。
- 對(duì)應(yīng)的IO協(xié)調(diào)thread邏輯如下 (其中最重要的
ReadBucketToBuffer
通過保證request-size = 128KB
檩咱,和queue-depth=8
來打滿IO):
single_range_io_worker_ = new thread([this]() {
// Odd Round.
log_info("In Range IO");
for (uint32_t next_bucket_idx = 0; next_bucket_idx < BUCKET_NUM; next_bucket_idx++) {
// 1st: Pop Buffer.
auto range_clock_beg = high_resolution_clock::now();
char *buffer = free_buffers_->pop(total_io_sleep_time_);
auto range_clock_end = high_resolution_clock::now();
double elapsed_time =
duration_cast<nanoseconds>(range_clock_end - range_clock_beg).count() /
static_cast<double>(1000000000);
total_blocking_queue_time_ += elapsed_time;
// 2nd: Read
ReadBucketToBuffer(next_bucket_idx, buffer);
promises_[next_bucket_idx].set_value(buffer);
}
log_info("In Range IO, Finish Odd Round");
// Even Round.
for (uint32_t next_bucket_idx = 0; next_bucket_idx < BUCKET_NUM; next_bucket_idx++) {
uint32_t future_id = next_bucket_idx + BUCKET_NUM;
char *buffer;
if (next_bucket_idx >= KEEP_REUSE_BUFFER_NUM) {
// 1st: Pop Buffer.
auto range_clock_beg = high_resolution_clock::now();
buffer = free_buffers_->pop(total_io_sleep_time_);
auto range_clock_end = high_resolution_clock::now();
double elapsed_time =
duration_cast<nanoseconds>(range_clock_end - range_clock_beg).count() /
static_cast<double>(1000000000);
total_blocking_queue_time_ += elapsed_time;
// 2nd: Read
ReadBucketToBuffer(next_bucket_idx, buffer);
} else {
buffer = cached_front_buffers_[next_bucket_idx];
}
promises_[future_id].set_value(buffer);
}
log_info("In Range IO, Finish Even Round");
});
- 其中IO協(xié)調(diào)thread具體的submit讀單個(gè)bucket任務(wù)的ReadBucketToBufferh函數(shù)揭措,
通過保證request-size = 128KB
,和queue-depth=8
來打滿IO, 詳細(xì)邏輯如下:
void EngineRace::ReadBucketToBuffer(uint32_t bucket_id, char *value_buffer) {
auto range_clock_beg = high_resolution_clock::now();
if (value_buffer == nullptr) {
return;
}
// Get fid, and off.
uint32_t fid;
uint64_t foff;
std::tie(fid, foff) = get_value_fid_foff(bucket_id, 0);
uint32_t value_num = mmap_meta_cnt_[bucket_id];
uint32_t remain_value_num = value_num % VAL_AGG_NUM;
uint32_t total_block_num = (remain_value_num == 0 ? (value_num / VAL_AGG_NUM) :
(value_num / VAL_AGG_NUM + 1));
uint32_t completed_block_num = 0;
uint32_t last_block_size = (remain_value_num == 0 ? (VALUE_SIZE * VAL_AGG_NUM) :
(remain_value_num * VALUE_SIZE));
uint32_t submitted_block_num = 0;
// Submit to Maintain Queue Depth.
while (completed_block_num < total_block_num) {
for (uint32_t io_id = 0; io_id < RANGE_QUEUE_DEPTH; io_id++) {
// Peek Completions If Possible.
if (range_worker_status_tls_[io_id] == WORKER_COMPLETED) {
completed_block_num++;
range_worker_status_tls_[io_id] = WORKER_IDLE;
}
// Submit If Possible.
if (submitted_block_num < total_block_num && range_worker_status_tls_[io_id] == WORKER_IDLE) {
size_t offset = submitted_block_num * (size_t) VAL_AGG_NUM * VALUE_SIZE;
uint32_t size = (submitted_block_num == (total_block_num - 1) ?
last_block_size : (VAL_AGG_NUM * VALUE_SIZE));
range_worker_status_tls_[io_id] = WORKER_SUBMITTED;
range_worker_task_tls_[io_id]->enqueue(
UserIOCB(value_buffer + offset, value_file_dp_[fid], size, offset + foff));
submitted_block_num++;
}
}
}
auto range_clock_end = high_resolution_clock::now();
double elapsed_time = duration_cast<nanoseconds>(range_clock_end - range_clock_beg).count() /
static_cast<double>(1000000000);
total_time_ += elapsed_time;
}
- 對(duì)應(yīng)的IO線程邏輯如下:
void EngineRace::InitPoolingContext() {
io_threads_ = vector<thread>(RANGE_QUEUE_DEPTH);
range_worker_task_tls_.resize(RANGE_QUEUE_DEPTH);
range_worker_status_tls_ = new atomic_int[RANGE_QUEUE_DEPTH];
for (uint32_t io_id = 0; io_id < RANGE_QUEUE_DEPTH; io_id++) {
range_worker_task_tls_[io_id] = new moodycamel::BlockingConcurrentQueue<UserIOCB>();
range_worker_status_tls_[io_id] = WORKER_IDLE;
io_threads_[io_id] = thread([this, io_id]() {
UserIOCB user_iocb;
#ifdef IO_AFFINITY_EXP
setThreadSelfAffinity(io_id);
#endif
double wait_time = 0;
for (;;) {
range_worker_task_tls_[io_id]->wait_dequeue(user_iocb);
if (user_iocb.fd_ == FD_FINISHED) {
log_info("yes! notified, %d", io_id);
break;
} else {
pread(user_iocb.fd_, user_iocb.buffer_, user_iocb.size_, user_iocb.offset_);
range_worker_status_tls_[io_id] = WORKER_COMPLETED;
}
}
});
}
}
- 對(duì)應(yīng)的內(nèi)存visitor線程的邏輯如下:
其中每個(gè)bucket開始有個(gè)barrier過程, 在每次結(jié)束的時(shí)候會(huì)更新free_buffers_
刻蚯。
更新buffers的邏輯就是最后一個(gè)線程將使用完的buffer放入blocking queue绊含。
// End of inner loop, Submit IO Jobs.
int32_t my_order = ++bucket_consumed_num_[future_id];
if (my_order == total_range_num_threads_) {
if ((future_id % (2 * BUCKET_NUM)) < KEEP_REUSE_BUFFER_NUM) {
cached_front_buffers_[future_id] = shared_buffer;
} else {
free_buffers_->push(shared_buffer);
}
}
static thread_local uint32_t bucket_future_id_beg = 0;
uint32_t lower_key_par_id = 0;
uint32_t upper_key_par_id = BUCKET_NUM - 1;
for (uint32_t bucket_id = lower_key_par_id; bucket_id < upper_key_par_id + 1; bucket_id++) {
range_barrier_ptr_->Wait();
uint32_t future_id = bucket_id + bucket_future_id_beg;
char *shared_buffer;
uint32_t relative_id = future_id % (2 * BUCKET_NUM);
if (relative_id >= BUCKET_NUM && relative_id < BUCKET_NUM + KEEP_REUSE_BUFFER_NUM) {
shared_buffer = cached_front_buffers_[relative_id - BUCKET_NUM];
} else {
if (tid == 0) {
auto wait_start_clock = high_resolutIOn_clock::now();
shared_buffer = futures_[future_id].get();
auto wait_end_clock = high_resolutIOn_clock::now();
double elapsed_time = duratIOn_cast<nanoseconds>(wait_end_clock - wait_start_clock).count() /
static_cast<double>(1000000000);
wait_get_time_ += elapsed_time;
} else {
shared_buffer = futures_[future_id].get();
}
}
uint32_t in_par_id_beg = 0;
uint32_t in_par_id_end = mmap_meta_cnt_[bucket_id];
uint64_t prev_key = 0;
for (uint32_t in_par_id = in_par_id_beg; in_par_id < in_par_id_end; in_par_id++) {
// Skip the equalities.
uint64_t big_endian_key = index_[bucket_id][in_par_id].key_;
if (in_par_id != in_par_id_beg) {
if (big_endian_key == prev_key) {
continue;
}
}
prev_key = big_endian_key;
// Key (to little endian first).
(*(uint64_t *) polar_key_ptr_->data()) = bswap_64(big_endian_key);
// Value.
uint64_t val_id = index_[bucket_id][in_par_id].value_offset_;
polar_val_ptr_ = PolarString(shared_buffer + val_id * VALUE_SIZE, VALUE_SIZE);
// Visit Key/Value.
visitor.Visit(*polar_key_ptr_, polar_val_ptr_);
}
// End of inner loop, Submit IO Jobs.
int32_t my_order = ++bucket_consumed_num_[future_id];
if (my_order == total_range_num_threads_) {
if ((future_id % (2 * BUCKET_NUM)) < KEEP_REUSE_BUFFER_NUM) {
cached_front_buffers_[future_id] = shared_buffer;
} else {
free_buffers_->push(shared_buffer);
}
}
}
bucket_future_id_beg += BUCKET_NUM;
4.4.3. 優(yōu)化1: 增大IO和內(nèi)存訪問的overlap區(qū)域
- 每次處理兩輪的任務(wù), 來最小化沒有overlapped的IO和內(nèi)存訪問的時(shí)間。
詳細(xì)可見IO線程的邏輯(Odd Round, Even Round)炊汹。
4.4.4. 優(yōu)化2: 減少IO數(shù)量 (充分利用剩余的內(nèi)存)
利用cache一些buffers減少IO數(shù)量: cache前幾塊buffer來進(jìn)行第二次range的優(yōu)化, 減少IO數(shù)量躬充。
我們?cè)O(shè)計(jì)了free_buffers_
的邏輯來精確地控制IO buffers和cache的使用。
詳細(xì)實(shí)現(xiàn)可見內(nèi)存visitor線程收尾階段讨便,核心代碼如下:
// End of inner loop, Submit IO Jobs.
int32_t my_order = ++bucket_consumed_num_[future_id];
if (my_order == total_range_num_threads_) {
if ((future_id % (2 * BUCKET_NUM)) < KEEP_REUSE_BUFFER_NUM) {
cached_front_buffers_[future_id] = shared_buffer;
} else {
free_buffers_->push(shared_buffer);
}
}
4.4.5. 優(yōu)化3: 第一次IO前Populate value-buffer內(nèi)存
- 通過耗時(shí)
0.06
seconds 左右的內(nèi)存populate充甚,來使得buffer在第一次使用時(shí)候也能達(dá)到磁盤讀取峰值性能(減少0.1秒)。
if (is_first && tid < MAX_TOTAL_BUFFER_NUM) {
// Really populate the physical memory.
log_info("Tid: %d, Load Physical Mem %d", tid, tid);
for (uint32_t off = 0; off < val_buffer_max_size_; off += FILESYSTEM_BLOCK_SIZE) {
value_shared_buffers_[tid][off] = -1;
}
is_first = false;
}
4.4.6. 優(yōu)化4: 設(shè)置affinity
- 通過set-affinity減少numa之間的切換(io線程綁定到 core-id
0-7
)霸褒,(減少0.2秒)伴找。
可參考文檔:https://blogs.igalia.com/dpino/2015/10/15/multicore-architectures-and-cpu-affinity/ 。
If a process is running on a core which heavily interacts with an I/O device
belonging to different NUMA node, performance degradation issues may appear.
NUMA considerably benefits from the data locality principle,
so devices and processes operating on the same data should run within the same NUMA node.
5. 比賽經(jīng)驗(yàn)總結(jié)和感想
在Key-Value存儲(chǔ)引擎比賽中废菱,我們學(xué)習(xí)了POLARDB數(shù)據(jù)庫存儲(chǔ)引擎相關(guān)技術(shù)技矮,
設(shè)計(jì)了相應(yīng)的文件結(jié)構(gòu)和并發(fā)查詢算法,來充分榨干傲騰存儲(chǔ)的IO能力昙啄。
阿里巴巴一向是一個(gè)注重開放穆役,使用和創(chuàng)新技術(shù)的公司,感謝貴司舉辦的活動(dòng)梳凛,
讓我們了解最新的存儲(chǔ)硬件耿币,業(yè)界需求以及和其他高手互相切磋,共同進(jìn)步
6. 附錄
6.1. 代碼中的一些常量
6.1.1. 存儲(chǔ)和索引設(shè)計(jì)相關(guān)
// Buffers.
#define TMP_KEY_BUFFER_SIZE (512)
#define TMP_VALUE_BUFFER_SIZE (4)
// Key/Value Files.
#define VALUE_SIZE (4096)
// Buckets.
#define BUCKET_DIGITS (10) // k-v-buckets must be the same for the range query
#define BUCKET_NUM (1 << BUCKET_DIGITS)
// Max Bucket Size * BUCKET_NUM.
#define MAX_TOTAL_SIZE (68 * 1024 * 1024)
#define KEY_FILE_DIGITS (5) // must make sure same bucket in the same file
#define KEY_FILE_NUM (1 << KEY_FILE_DIGITS)
#define MAX_KEY_BUCKET_SIZE (MAX_TOTAL_SIZE / BUCKET_NUM / FILESYSTEM_BLOCK_SIZE * FILESYSTEM_BLOCK_SIZE)
#define VAL_FILE_DIGITS (5)
#define VAL_FILE_NUM (1 << VAL_FILE_DIGITS) // must make sure same bucket in the same file
#define MAX_VAL_BUCKET_SIZE (MAX_TOTAL_SIZE / BUCKET_NUM / FILESYSTEM_BLOCK_SIZE * FILESYSTEM_BLOCK_SIZE)
6.1.2. 三個(gè)階段邏輯相關(guān)
// Write.
#define WRITE_BARRIER_NUM (16)
// Read.
#define NUM_READ_KEY_THREADS (NUM_THREADS)
#define NUM_FLUSH_TMP_THREADS (32u)
#define KEY_READ_BLOCK_COUNT (8192u)
// Range.
#define RECYCLE_BUFFER_NUM (2u)
#define KEEP_REUSE_BUFFER_NUM (3u)
#define MAX_TOTAL_BUFFER_NUM (RECYCLE_BUFFER_NUM + KEEP_REUSE_BUFFER_NUM)
#define SHRINK_SYNC_FACTOR (2) // should be divided
// Range Thread Pool.
#define RANGE_QUEUE_DEPTH (8u)
#define VAL_AGG_NUM (32)
#define WORKER_IDLE (0)
#define WORKER_SUBMITTED (1)
#define WORKER_COMPLETED (2)
#define FD_FINISHED (-2)