導(dǎo)讀
我們還是按照上一篇博文的那三個(gè)函數(shù)順序往下走:Put
,Get
和Delete
,以自上而下的視角一點(diǎn)一點(diǎn)剖析leveldb。那么本篇就主要講Put的實(shí)現(xiàn)划咐。Put的實(shí)現(xiàn)講的還是偏上層的,因?yàn)镻ut可能引發(fā)compaction钧萍,而compaction等細(xì)節(jié)內(nèi)容還是需要專門一篇文章才能描述清楚褐缠。那么我們開始吧!
流程
還記得我們上一篇講解的Put的流程是咋樣的嗎风瘦?
Put操作首先將操作記錄寫入log文件队魏,然后寫入memtable,返回寫成功万搔。整體來看是這樣胡桨,但是會引發(fā)下面的問題:
1. 寫log的時(shí)候是實(shí)時(shí)刷到磁盤的嗎?
2. 寫入的時(shí)候memtable過大了咋辦瞬雹?
3. 同時(shí)多個(gè)線程并發(fā)寫咋辦昧谊?
......
下面的分析中就會面對這些問題,有些答案很清晰酗捌,有些涉及到超級多細(xì)節(jié)呢诬。
step by step
在開始之前,我們知道Write操作是要記錄到log文件中的胖缤。那么一個(gè)記錄它的格式是怎樣的呢尚镰?看圖:
這里特別解釋一下,Delete操作也是通過Put實(shí)現(xiàn)的哪廓,只是圖中的類型字段是0狗唉,而正常Write的操作類型是1,由此區(qū)分寫操作和刪除操作涡真!
step 1
很簡單吧分俯。這里講解一下那三個(gè)參數(shù):
- WriteOptions:提供一些寫操作的配置項(xiàng),例如要不要寫log的時(shí)候馬上flush磁盤
- key和value就是對應(yīng)的keyValue综膀,slice只是作者自己封裝的char數(shù)組存儲數(shù)據(jù)而已澳迫。<b>大牛喜歡把所有東西都封裝一下局齿,賦予數(shù)據(jù)結(jié)構(gòu)意義剧劝!</b>這在大的工程里面是很有意義的,既方便操作抓歼,也方便思考(這樣就不用思考底層的真實(shí)的char數(shù)組還是啥)讥此。
step 2
這里的WriteBatch的意思就是多個(gè)寫并起來操作的意思拢锹。這里可以學(xué)習(xí)一個(gè)思維,支持多key寫萄喳,單key寫就是key為1的特例卒稳。底層完全按照WriteBatch的實(shí)現(xiàn)方式
step 3
下面是真實(shí)的各種復(fù)雜邏輯的操作了。下面的代碼有點(diǎn)長他巨,所以基本都在代碼中注釋講解了充坑。部分調(diào)用函數(shù)的細(xì)節(jié)下面會繼續(xù),請耐心欣賞染突,畢竟人家也不是幾分鐘就寫出來了捻爷,哈哈:
Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) {
Writer w(&mutex_);
w.batch = my_batch;
w.sync = options.sync; //log是否馬上刷到磁盤,如果false會有數(shù)據(jù)丟失的風(fēng)險(xiǎn)
w.done = false; //標(biāo)記寫入是否完成
//串行化writer,如果有其他writer在執(zhí)行則進(jìn)入隊(duì)列等待被喚醒執(zhí)行
MutexLock l(&mutex_);
writers_.push_back(&w);
while (!w.done && &w != writers_.front()) {
w.cv.Wait();
}
//writer的任務(wù)可能被其他writer幫忙執(zhí)行了
//如果是則直接返回
if (w.done) {
return w.status;
}
// May temporarily unlock and wait.
Status status = MakeRoomForWrite(my_batch == NULL); //寫入前的各種檢查
//是否該停寫,是否該切memtable,是否該compact
//獲取本次寫入的版本號,其實(shí)就是個(gè)uint64
uint64_t last_sequence = versions_->LastSequence();
Writer* last_writer = &w;
//這里writer還是隊(duì)列中第一個(gè),由于下面會隊(duì)列前面的writers也可能合并起來,所以last_writer指針會指向被合并的最后一個(gè)writer
if (status.ok() && my_batch != NULL) { // NULL batch is for compactions
WriteBatch* updates = BuildBatchGroup(&last_writer); //這里會把writers隊(duì)列中的其他適合的寫操作一起執(zhí)行
WriteBatchInternal::SetSequence(updates, last_sequence + 1); //把版本號寫入batch中
last_sequence += WriteBatchInternal::Count(updates); //updates如果合并了n條操作,版本號也會向前跳躍n
// Add to log and apply to memtable. We can release the lock
// during this phase since &w is currently responsible for logging
// and protects against concurrent loggers and concurrent writes
// into mem_.
{
mutex_.Unlock();
status = log_->AddRecord(WriteBatchInternal::Contents(updates)); //寫log了,不容易啊,等那么久
bool sync_error = false;
if (status.ok() && options.sync) { //馬上要flush到磁盤
status = logfile_->Sync();
if (!status.ok()) {
sync_error = true;
}
}
if (status.ok()) {
status = WriteBatchInternal::InsertInto(updates, mem_); //恩,是時(shí)候插入memtable中了
}
mutex_.Lock();
if (sync_error) {
// The state of the log file is indeterminate: the log record we
// just added may or may not show up when the DB is re-opened.
// So we force the DB into a mode where all future writes fail.
RecordBackgroundError(status);
}
}
if (updates == tmp_batch_) tmp_batch_->Clear();
versions_->SetLastSequence(last_sequence);
}
while (true) { //在這里喚醒已經(jīng)幫它干完活的writer線程,讓它早早回家,別傻傻等了
Writer* ready = writers_.front();
writers_.pop_front();
if (ready != &w) {
ready->status = status;
ready->done = true;
ready->cv.Signal();
}
if (ready == last_writer) break;
}
// Notify new head of write queue
if (!writers_.empty()) { //喚醒還沒干活的等待的第一位,叫醒它自己去干活,老子干完了,哈哈,典型FIFO,公平
writers_.front()->cv.Signal();
}
return status;
}
嗯,核心的write代碼已經(jīng)在這里欣賞完了份企∫查基本邏輯分為五步:
- 隊(duì)列化請求
- 寫入的前期檢查和保證
- 按格式組裝數(shù)據(jù)為二進(jìn)制
- 寫入log文件和memtable
- 喚醒隊(duì)列的其他人去干活,自己返回
這個(gè)函數(shù)的內(nèi)容就這么多司志,但是有些內(nèi)容調(diào)用我們還沒有講甜紫,下面會選擇部分重中之重的講。例如添加前的檢查保證骂远,compaction囚霸。
step 4 MakeRoomForWrite函數(shù)
我們在step 3的時(shí)候知道了MakeRoomForWrite函數(shù)的存在,就是寫入前的各種檢查激才,例如:
- memtable滿了達(dá)到大小的限制了沒邮辽?沒有直接可以插入
- memtable達(dá)到太大之后需要切換為Imuable memtable,這時(shí)候需要檢查舊的Imuable memtable已經(jīng)load到磁盤沒
- 由于Imuable需要load入磁盤贸营,level 0的文件數(shù)超過限制沒
吨述。。钞脂。揣云。。冰啃。
在進(jìn)入邏輯代碼之前需要解釋一個(gè)概念:<b>compaction</b>
<b>compaction</b>是指:Imuable memtablecompact到level 0 或者level n compact 到level n + 1邓夕, 就是數(shù)據(jù)向搞level流動的操作,這個(gè)操作會保證數(shù)據(jù)在level中是全局有序的阎毅,除了level 0.
這里只是簡單列舉幾個(gè)需要考慮的邏輯焚刚,具體檢查看下面MakeRoomForWrite的實(shí)現(xiàn):
Status DBImpl::MakeRoomForWrite(bool force) { // force代表需要強(qiáng)制compaction與否
mutex_.AssertHeld(); //保證進(jìn)入該函數(shù)前已經(jīng)加鎖
assert(!writers_.empty());
bool allow_delay = !force; //allow_delay代表compaction可以延后
Status s;
while (true) {
if (!bg_error_.ok()) { //如果后臺任務(wù)已經(jīng)出錯,直接返回錯誤
// Yield previous error
s = bg_error_;
break;
} else if (//level0的文件數(shù)限制超過8,睡眠1ms,簡單等待后臺任務(wù)執(zhí)行
allow_delay &&
versions_->NumLevelFiles(0) >= config::kL0_SlowdownWritesTrigger) { //level 0超過8個(gè)sst文件了
// We are getting close to hitting a hard limit on the number of
// L0 files. Rather than delaying a single write by several
// seconds when we hit the hard limit, start delaying each
// individual write by 1ms to reduce latency variance. Also,
// this delay hands over some CPU to the compaction thread in
// case it is sharing the same core as the writer.
mutex_.Unlock();
env_->SleepForMicroseconds(1000);
allow_delay = false; // Do not delay a single write more than once
mutex_.Lock();
} else if (!force &&
(mem_->ApproximateMemoryUsage() <= options_.write_buffer_size)) { //memtable的大小限制,default為4MB
// There is room in current memtable
break;
} else if (imm_ != NULL) {
// We have filled up the current memtable, but the previous
// one is still being compacted, so we wait.
Log(options_.info_log, "Current memtable full; waiting...\n"); //等待之前的imuable memtable完成compact到level0
bg_cv_.Wait(); // bg_cv_為后臺程序的條件變量,后臺程序就是做compact的
} else if (versions_->NumLevelFiles(0) >= config::kL0_StopWritesTrigger) { //level0的文件數(shù)超過12,強(qiáng)制等待
// There are too many level-0 files.
Log(options_.info_log, "Too many L0 files; waiting...\n");
bg_cv_.Wait();
} else { //下面是切換到新的memtable和觸發(fā)舊的進(jìn)行compaction
// Attempt to switch to a new memtable and trigger compaction of old
assert(versions_->PrevLogNumber() == 0);
uint64_t new_log_number = versions_->NewFileNumber();
WritableFile* lfile = NULL;
s = env_->NewWritableFile(LogFileName(dbname_, new_log_number), &lfile); //生成新的log文件
if (!s.ok()) {
// Avoid chewing through file number space in a tight loop.
versions_->ReuseFileNumber(new_log_number);
break;
}
//刪除舊的log對象分配新的
delete log_;
delete logfile_;
logfile_ = lfile;
logfile_number_ = new_log_number;
log_ = new log::Writer(lfile);
imm_ = mem_; //切換memtable到Imuable memtable
has_imm_.Release_Store(imm_);
mem_ = new MemTable(internal_comparator_);
mem_->Ref();
force = false; // Do not force another compaction if have room
MaybeScheduleCompaction(); //如果需要進(jìn)行compaction,后臺執(zhí)行
}
}
return s;
}
由上面的代碼可知,這是一個(gè)while循環(huán)扇调,直到確保數(shù)據(jù)庫可以寫入了才會返回矿咕。
流程大概是:
1. 后臺任務(wù)有無出現(xiàn)錯誤?出現(xiàn)錯誤直接返回錯誤。(compaction是后臺線程執(zhí)行的)
2. level 0 的文件數(shù)超過8個(gè)碳柱,則等待1s再繼續(xù)執(zhí)行捡絮,因?yàn)閘evel 0的文件數(shù)目需要嚴(yán)格控制
3. 如果memtable的大小小于4MB(默認(rèn)值,可以修改)莲镣,直接返回可以插入福稳;
4. 到達(dá)4說明memtable已經(jīng)滿了,這時(shí)候需要切換為Imuable memtable瑞侮。所以這時(shí)候需要等待舊的Imuable memtable compact到level 0的圆,進(jìn)入等待
5. 到達(dá)5說明舊的Imuable memtable已經(jīng)compact到level 0了,這時(shí)候假如level 0的文件數(shù)目到達(dá)了12個(gè)半火,也需要等待
6. 到達(dá)6說明舊的Imuable memtable已經(jīng)compact到磁盤了略板,level 0的文件數(shù)目也符合要求,這時(shí)候就可以生成新的memtable用于數(shù)據(jù)的寫入了慈缔。
由此我們知道這里并沒有涉及到具體的compaction是如何進(jìn)行的叮称,只是保證memtable可以寫入和可能觸發(fā)后臺的compaction。至于compaction的邏輯藐鹤,后面需要專門的一篇文章才能說明清楚瓤檐。
好了,MakeRoomForWrite函數(shù)就說這么多娱节。
step 5 BuildBatchGroup函數(shù)
還有一個(gè)函數(shù)我覺得可以說一下挠蛉,那就是BuildBatchGroup函數(shù)。前面我們說到了writer會被隊(duì)列化肄满,而只有排在第一位的writer才會往下執(zhí)行谴古。而writer都是樂于助人的,它有可能會把排在它后面的writer的活也拿過來一起干了稠歉。那么這段幫忙的邏輯就是在BuildBatchGroup函數(shù)中掰担。
請看這個(gè)函數(shù)做了寫啥:
WriteBatch* DBImpl::BuildBatchGroup(Writer** last_writer) {
assert(!writers_.empty());
Writer* first = writers_.front(); //當(dāng)前執(zhí)行的writer兄弟
WriteBatch* result = first->batch;
assert(result != NULL);
size_t size = WriteBatchInternal::ByteSize(first->batch); // 計(jì)算要自己要寫入的byte大小
// Allow the group to grow up to a maximum size, but if the
// original write is small, limit the growth so we do not slow
// down the small write too much.
//計(jì)算maxsize,避免自己幫太多忙了導(dǎo)致寫入數(shù)據(jù)過大
size_t max_size = 1 << 20;
if (size <= (128<<10)) {
max_size = size + (128<<10);
}
*last_writer = first;
std::deque<Writer*>::iterator iter = writers_.begin();
++iter; // Advance past "first"
for (; iter != writers_.end(); ++iter) {
Writer* w = *iter;
if (w->sync && !first->sync) { //sync類型不同,你的活我不幫你了
// Do not include a sync write into a batch handled by a non-sync write.
break;
}
if (w->batch != NULL) {
size += WriteBatchInternal::ByteSize(w->batch);
if (size > max_size) { //這個(gè)幫忙數(shù)據(jù)量過大了,我也不幫忙了
// Do not make batch too big
break;
}
// Append to *result
if (result == first->batch) {
// Switch to temporary batch instead of disturbing caller's batch
result = tmp_batch_;
assert(WriteBatchInternal::Count(result) == 0);
WriteBatchInternal::Append(result, first->batch); //來吧,你的活我可以幫你干了
}
WriteBatchInternal::Append(result, w->batch);
}
*last_writer = w;
}
return result;
}
看上面代碼,這段代碼的邏輯比較簡單怒炸。就是遍歷隊(duì)列后面的writer們带饱,一直到遇到幫不了忙的writer就返回了。那么幫不幫忙的標(biāo)準(zhǔn)是啥阅羹?
兩個(gè)標(biāo)準(zhǔn):
- sync類型是否一樣(我不需要馬上flush到磁盤而你要勺疼,你的活還是自己干吧)
- 寫入的數(shù)據(jù)量是不是過大了?(避免單次寫入數(shù)據(jù)量太大)
只要沒有符合這兩個(gè)限制條件捏鱼,就可以幫忙执庐,合并多條write操作為一條操作!
總結(jié)
哇嗚导梆,看了這么多代碼轨淌,是不是對level db的write操作有了一定的了解了迂烁。其實(shí)就是文章開始說的,
- 先寫入log文件
- 再寫入memtable
這兩個(gè)邏輯而已猿诸。然后在寫入之前需要保證可以寫入啊婚被,不然內(nèi)存會跪的狡忙。然后這個(gè)檢查會導(dǎo)致后臺一系列的compaction可能梳虽。
至于什么多個(gè)操作合并為一個(gè)操作,完全是優(yōu)化灾茁,為了更高的性能窜觉!
記住了,我們講完了Put操作北专,其實(shí)Delete操作也已經(jīng)講完了禀挫。為什么?不知道的好好看上面的正文內(nèi)容吧拓颓。下一篇文章语婴,我們將迎來Get函數(shù),我們會一起進(jìn)入level db的查找數(shù)據(jù)之旅驶睦!