??本文主要講解了RocksDB中二階段提交的實(shí)現(xiàn)。本文總結(jié)一下共有如下幾個(gè)要點(diǎn):
- Modification of the WAL format
- Extension of the existing transaction API
- Modification of the write path
- Modification of the recovery path
- Integration with MyRocks
1玻募、 Modification of WAL Format
??WAL包含一個(gè)或多個(gè)log文件,每個(gè)log的內(nèi)容都是序列化后的WriteBatches疑俭,在執(zhí)行recovery 時(shí),WriteBatches 可以從logs種重建出來(lái)崖面。要修改WAL的格式或者擴(kuò)展其功能侄柔,只需要關(guān)注WriteBatch即可既们。
??WriteBatch就是Records的有序集合,這些Record主要包括Put(k,v), Merge(k,v), Delete(k), SingleDelete(k)不铆,每一個(gè)都代表了RocksDB的一種寫操作蝌焚。每一個(gè)Record都有一個(gè)二進(jìn)制的字符串表示。當(dāng)Records 添加到WriteBatch時(shí)誓斥,他們的二進(jìn)制表示也被append到WriteBatch的二進(jìn)制字符串表示中只洒。WriteBatch的二進(jìn)制字符串前綴是其起始的序列號(hào)以及batch中的record 個(gè)數(shù)。每個(gè)record都會(huì)有一個(gè)column family modifier record(如果column family是default的話劳坑,可以省略)毕谴。
??可以通過(guò)擴(kuò)展WriteBatch::Handler來(lái)遍歷WriteBatch并執(zhí)行一些操作。MemTableInserter 就是WriteBatch::Handler的擴(kuò)展,其功能就是將WriteBatch中的操作寫入到對(duì)應(yīng)的 column family的MemTable中涝开。
?? WriteBatch的邏輯形式有可能是這樣:
Sequence(0);NumRecords(3);Put(a,1);Merge(a,1);Delete(a);
??2PC的WriteBatch format還包括另外四條Records
- Prepare(xid)
- EndPrepare()
- Commit(xid)
- Rollback(xid)
??一個(gè)可以2PC的WriteBatch可能類似下面的邏輯:
Sequence(0);NumRecords(6);Prepare(foo);Put(a,b);Put(x,y);EndPrepare();Put(j,k);Commit(foo);
??Prepare(foo)和EndPrepare()之間的記錄是transaction (ID='foo')的操作循帐。Commit(foo)表示提交這個(gè)transaction,Rollback(foo)表示回滾這個(gè)transaction舀武。
Sequence ID Distribution
??當(dāng)WriteBatch通過(guò)MemTableInserter被寫入到memtable時(shí)拄养,WriteBatch中的每一個(gè)operation的sequence ID加上這個(gè)WriteBatch中的Oprator的index。但是银舱,在2PC的WriteBatch中并沒有繼續(xù)保持這種sequence id的映射方法瘪匿。Operations contained within a Prepare() enclosure will consume sequence IDs as if they were inserted starting at the location of their relative Commit() marker. This Commit() marker may be in a different WriteBatch or log from the prepared operations to which it applies.
Backwards Compatibility
??WAL format并沒有版本話,所以我們需要注意后相兼容纵朋。當(dāng)前版本的RocksDB不能從一個(gè)包含2PC 標(biāo)記的WAL 文件中recovery柿顶。在實(shí)際recover時(shí)茄袖,遇到不能識(shí)別的Record會(huì)打印fatal 信息操软。有點(diǎn)麻煩,但是開發(fā)者可以對(duì)當(dāng)前的RocksDB版本打patch以便能夠跳過(guò)prepared sections和不能識(shí)別的markers宪祥,這樣就可以從新版本的WAL format 恢復(fù)數(shù)據(jù)聂薪。
2、Extension of Transaction API
??當(dāng)前我們只focus到樂(lè)觀事物的2PC蝗羊。client必須提前聲明是否使用二階段提交藏澳,例如以下代碼:
TransactionDB* db;
TransactionDB::Open(Options(), TransactionDBOptions(), "foodb", &db);
TransactionOptions txn_options;
txn_options.two_phase_commit = tr
txn_options.xid = "12345";
Transaction* txn = db->BeginTransaction(write_options, txn_options);
txn->Put(...);
txn->Prepare();
txn->Commit();
transaction狀態(tài)有:
enum ExecutionStatus {
STARTED = 0,
AWAITING_PREPARE = 1,
PREPARED = 2,
AWAITING_COMMIT = 3,
COMMITED = 4,
AWAITING_ROLLBACK = 5,
ROLLEDBACK = 6,
LOCKS_STOLEN = 7,
};
??transaction API會(huì)調(diào)用一個(gè)Prepare()函數(shù)。Prepare函數(shù)會(huì)通過(guò)一個(gè)context調(diào)用WriteImpl耀找,通過(guò)context翔悠,WriteImpl和WriteThread可以訪問(wèn)ExcutionStatus、XID和WriteBatch野芒。WriteBatch會(huì)先寫入一個(gè)Prepare(xid)標(biāo)記蓄愁,然后寫入WriteBatch的內(nèi)容,再寫入EndPrepare()標(biāo)記狞悲。這期間并沒有memtable的寫入撮抓。當(dāng)transaction執(zhí)行了commit時(shí),會(huì)再次調(diào)用WriteImpl摇锋。此時(shí)丹拯,Commit()標(biāo)記會(huì)寫入WAL,WriteBatch的內(nèi)容會(huì)寫入相應(yīng)的memtable荸恕。當(dāng)transaction調(diào)用Rollback()時(shí)乖酬,transaction內(nèi)容會(huì)被清除,然后調(diào)用WriteImpl融求,寫入Rollback(xid)標(biāo)記(如果當(dāng)前事物處于Prepare狀態(tài))咬像。
??這些所謂的"meta markers"(Prepare(xid), EndPrepare(), Commit(xid), Rollback(xid))不會(huì)直接寫入到write batch中。write path (WriteImpl())會(huì)持有正在寫的事物的context,并使用這個(gè)context將相關(guān)的markers寫入到WAL(所以這些標(biāo)記在寫入到WAL之前先寫入到聚合后的WriteBatch)施掏。在recovery時(shí)钮惠,這些標(biāo)記會(huì)被MemTableInserter 用來(lái)重建prepared transactions。
Transaction Wallclock Expiration
??在transaction 提交時(shí)七芭,會(huì)有一個(gè)callback素挽,這個(gè)callback在transaction過(guò)期后會(huì)fail掉整個(gè)寫操作。如果transaction過(guò)期了狸驳,那么鎖很容易被其他transction搶占预明。如果一個(gè)transaction在prepare階段沒有過(guò)期的話,那么也不可能在commit階段過(guò)期耙箍。
TransactionDB Modification
??使用transaction前撰糠,client必須打開一個(gè)TransactionDB。這個(gè)TransactionDB 實(shí)例接下來(lái)就可以創(chuàng)建Transactions辩昆。TransactionDB 會(huì)持有一個(gè)映射(from XID to 其創(chuàng)建的所有兩階段的Transaction)阅酪。當(dāng)Transaction被刪除或者Rollback時(shí),就會(huì)從mapping中刪除掉汁针。RocksDB提供API來(lái)查詢所有正在進(jìn)行中的處于Prepare狀態(tài)的transaction术辐。
??TransactionDB 記錄著一個(gè)min heap(所有包含prepared section的log numbers)。當(dāng)transaction處于prepared狀態(tài)時(shí)施无,WriteBatch也會(huì)寫入log辉词,這個(gè)log number就會(huì)存儲(chǔ)在transaction 對(duì)象中,隨后存入到min heap猾骡。當(dāng)transaction commit時(shí)瑞躺,log number就會(huì)從min heap中刪除,但是log number并不會(huì)用于被遺忘掉兴想。接下來(lái)幢哨,就是各個(gè)memtable來(lái)記錄the oldest log,直到memtable flush到L0為止襟企。
3嘱么、Modification of the Write Path
??write path可以被拆解為兩個(gè)主要點(diǎn):DBImpl::WriteImpl(...) and the MemTableInserter。多個(gè)client線程都會(huì)調(diào)用WriteImpl顽悼。第一個(gè)線程會(huì)被設(shè)定角色為 leader曼振,剩余的線程會(huì)被設(shè)定為follower。leader和followers會(huì)被group到一起蔚龙,成為一個(gè)邏輯上的write group冰评。leader負(fù)責(zé)取出writegroup中的所有WriteBatches,聚合在一起木羹,然后將blob寫入到WAL甲雅。結(jié)合writegroup的大小和當(dāng)前內(nèi)存表對(duì)并行寫的支持解孙,leader可以將所有WriteBatches寫入到memtable,也可以由各個(gè)線程寫入線程自己負(fù)責(zé)的WrtieBatches到內(nèi)存表中抛人。
??所有的memtable inserts都是由MemTableInserter負(fù)責(zé)弛姜。 a WriteBatch iterator handler也是WriteBatch::Handler的一種實(shí)現(xiàn)。這個(gè)handler遍歷WriteBatch中的所有元素(Put, Delete, Merge)妖枚,將每個(gè)call寫入到對(duì)應(yīng)的MemTable廷臼。MemTableInserter 也會(huì)處理已就緒的merges, deletes and updates。
??Modification of the write path需要傳入一個(gè)參數(shù)到DBImpl::WriteImpl绝页,這個(gè)參數(shù)是一個(gè)指針荠商,指向一個(gè)2PC的transaction實(shí)例。通過(guò)這個(gè)實(shí)例续誉,可以查詢到二階段transaction的當(dāng)前狀態(tài)莱没。一個(gè)2PC transaction會(huì)在preparation、commit和roll-back時(shí)各調(diào)用一次WriteImpl 酷鸦。
Status DBImpl::WriteImpl(
const WriteOptions& write_options,
WriteBatch* my_batch,
WriteCallback* callback,
Transaction* txn
) {
WriteThread::Writer w;
//...
w.txn = txn; // writethreads also have txn context for memtable insert
// we are now the group leader
int total_count = 0;
uint64_t total_byte_size = 0;
for (auto writer : write_group) {
if (writer->CheckCallback(this)) {
if (writer->ShouldWriteToMem())
total_count += WriteBatchInternal::Count(writer->batch)
}
}
const SequenceNumber current_sequence = last_sequence + 1;
last_sequence += total_count;
// now we produce the WAL entry from our write group
for (auto writer : write_group) {
// currently only optimistic transactions use callbacks
// and optimistic transaction do not support 2pc
if (writer->CallbackFailed()) {
continue;
} else if (writer->IsCommitPhase()) {
WriteBatchInternal::MarkCommit(merged_batch, writer->txn->XID_);
} else if (writer->IsRollbackPhase()) {
WriteBatchInternal::MarkRollback(merged_batch, writer->txn->XID_);
} else if (writer->IsPreparePhase()) {
WriteBatchInternal::MarkBeginPrepare(merged_batch, writer->txn->XID_);
WriteBatchInternal::Append(merged_batch, writer->batch);
WriteBatchInternal::MarkEndPrepare(merged_batch);
writer->txn->log_number_ = logfile_number_;
} else {
assert(writer->ShouldWriteToMem());
WriteBatchInternal::Append(merged_batch, writer->batch);
}
}
//now do MemTable Inserts for WriteGroup
}
WriteBatchInternal::InsertInto也可以調(diào)整為只遍歷沒有相關(guān)聯(lián)的Transaction 或處于COMMIT狀態(tài)的寫饰躲。由上述代碼可以看出,當(dāng)transaction處于prepared狀態(tài)時(shí)井佑,transaction會(huì)記錄log num属铁。在insert時(shí)眠寿,每個(gè)Memtable都會(huì)記錄最小的log number躬翁。
4、Modification of Recovery Path
??當(dāng)前的recovery path已經(jīng)很好地適配了兩階段提交盯拱,按照順序盒发,依次遍歷log中的所有batches,按照l(shuí)og number 依次feed到MemTableInserter狡逢。MemTableInserter 會(huì)遍歷所有的batches宁舰,然后將值寫入到正確的MemTable中∩莼耄基于當(dāng)前的log number蛮艰,每個(gè)MemTable知道該忽略掉哪些values。
??要想recovery 時(shí)可以處理2PC的一些操作雀彼,我們需要擴(kuò)展MemTableInserter 壤蚜,使其感知到4個(gè)新的meta markers。
??需要記住的是:當(dāng)2PC transaction commit時(shí)徊哑,就會(huì)包含一些操作在多個(gè)CF上的insertions袜刷。這些MemTable是在不同的時(shí)間點(diǎn)上執(zhí)行flush。我們?nèi)匀豢梢允褂肅F的log number莺丑,在recovered, two phase, committed transaction時(shí)避免重復(fù)寫入著蟹。
1墩蔓、Two Phase Transactions TXN inserts into CFA and CFB
2、TXN prepared to LOG 1
3萧豆、TXN marked as COMMITTED in LOG 2
4奸披、TXN is inserted into MemTables
5、CFA is flushed to L0
6涮雷、CFA log_number is now LOG 3
7源内、CFB has not been flushed and it still referencing LOG 1 prep section
8、CRASH RECOVERY
9份殿、LOG 1 is still around because CFB was referencing LOG 1 prep section
10膜钓、Iterate over logs starting at LOG 1
11、CFB has prepared values reinserted into mem, again referencing LOG 1 prep section
12卿嘲、CFA skips insertion from commit marker in LOG 2 because it is 13颂斜、consistent to LOG 3
13、CFB is flushed to L0 and is now consistent to LOG 3
14拾枣、LOG 1, LOG 2 can now be released
Rebuilding Transactions
??如上所述沃疮,modification of the recovery path只需修改MemTableInserter ,使其可以handle 新的meta-markers即可梅肤。在recovery時(shí)司蔬,我們不能訪問(wèn)TransactionDB的實(shí)例,我們必須重建一個(gè)hollow ‘shill’的transaction姨蝴。這就是所有recovered prepared transactions的衣蛾mapping(XID → (WriteBatch, log_number))俊啼。當(dāng)遇到一個(gè)Commit(xid) marker時(shí),就會(huì)嘗試查找對(duì)應(yīng)xid的shill transaction左医,然后寫入到Mem授帕。如果遇到一個(gè)rollback(xid) marker,我們就會(huì)delete 這個(gè)shill transaction浮梢。recovery末期跛十,以shill的形式剩下一個(gè)所有處于Prepared狀態(tài)的transaction。
log lifespan
??要想知道最小的log秕硝,我們必須找到每個(gè)CF的最小的log number芥映。我們也需要考慮TransactionDB的prepared sections heap中的最小value。這代表了最早的log(包含一個(gè)還沒有提交的prepared section)远豺。我們也需要考慮all MemTables和沒有flush的ImmutableMemTables 的最小prep section奈偏。這三種value的最小值就是含有數(shù)據(jù)但是還沒有flush到L0的最早的log。