這篇是這個系列的最后一篇了桥狡,整個分析持續(xù)了兩個月多,包括floyd和pink源碼,差不多就每個周日會花幾個小時分析一下满钟,和查些資料嘴拢,畢竟自己也是在學(xué)習(xí)過程中桩盲,可能會有些理解不正確的地方。
在學(xué)習(xí)raft的過程中席吴,看了好些文章和別人的分析赌结,理解起來似乎比較容易捞蛋,但如果是自己從零實現(xiàn)一個可能要花些時間,況且不會靈活的運用到具體項目中柬姚,和結(jié)合現(xiàn)有的開源項目拟杉,比如leveldb和rocksdb等。
這篇結(jié)束后量承,準(zhǔn)備分析下leveldb里的一些實現(xiàn)搬设,可能網(wǎng)上已經(jīng)有很多類型的文章,但看別人分析還不如自己深入源碼宴合,以前也做過類似的事情焕梅,比如redis和leveldb,但時間較久也沒有沉淀下來卦洽,所以也陌生了贞言。計劃是后兩個月分析下libco/pebble里的協(xié)程實現(xiàn),后面大半年就分析單機版的kv存儲如leveldb和消息中間件阀蒂,比較傾向于rocketmq或消息隊列phxqueue该窗。
先列一下floyd框架,再分析下raft節(jié)點增加和減少的情況蚤霞。
floyd是單進程多線程的框架項目酗失,就直接上代碼了,不畫流程圖和類圖了昧绣」骐龋看了下example程序,在main中實例化一個floyd對象夜畴,如下:
368 Status Floyd::Open(const Options& options, Floyd** floyd) {
369 *floyd = NULL;
370 Status s;
371 FloydImpl *impl = new FloydImpl(options);
372 s = impl->Init();
373 if (s.ok()) {
374 *floyd = impl;
375 } else {
376 delete impl;
377 }
378 return s;
379 }
全部功能都放在Init中拖刃,部分代碼如下:
274 Status FloydImpl::Init() {
275 slash::CreatePath(options_.path);
276 if (NewLogger(options_.path + "/LOG", &info_log_) != 0) {
277 return Status::Corruption("Open LOG failed, ", strerror(errno));
278 }
279
280 // TODO(anan) set timeout and retry
281 worker_client_pool_ = new ClientPool(info_log_);
282
283 // Create DB
284 rocksdb::Options options;
285 options.create_if_missing = true;
286 options.write_buffer_size = 1024 * 1024 * 1024;
287 options.max_background_flushes = 8;
288 rocksdb::Status s = rocksdb::DB::Open(options, options_.path + "/db/", &db_);
289 if (!s.ok()) {
291 return Status::Corruption("Open DB failed, " + s.ToString());
292 }
293
294 s = rocksdb::DB::Open(options, options_.path + "/log/", &log_and_meta_);
295 if (!s.ok()) {
297 return Status::Corruption("Open DB log_and_meta failed, " + s.ToString());
298 }
300 // Recover Context
301 raft_log_ = new RaftLog(log_and_meta_, info_log_);
302 raft_meta_ = new RaftMeta(log_and_meta_, info_log_);
303 raft_meta_->Init();
304 context_ = new FloydContext(options_);
305 context_->RecoverInit(raft_meta_);
306
307 // Recover Members when exist
308 std::string mval;
309 Membership db_members;
310 s = db_->Get(rocksdb::ReadOptions(), kMemberConfigKey, &mval);
311 if (s.ok()
312 && db_members.ParseFromString(mval)) {
315 for (int i = 0; i < db_members.nodes_size(); i++) {
316 context_->members.insert(db_members.nodes(i));
317 }
318 } else {
319 BuildMembership(options_.members, &db_members);
320 if(!db_members.SerializeToString(&mval)) {
322 return Status::Corruption("Serialize Membership failed");
323 }
324 s = db_->Put(rocksdb::WriteOptions(), kMemberConfigKey, mval);
325 if (!s.ok()) {
327 return Status::Corruption("Record membership in db failed! error: " + s.ToString());
328 }
330 for (const auto& m : options_.members) {
331 context_->members.insert(m);
332 }
333 }
337 primary_ = new FloydPrimary(context_, &peers_, raft_meta_, options_, info_log_);
340 worker_ = new FloydWorker(options_.local_port, 1000, this);
341 int ret = 0;
342 if ((ret = worker_->Start()) != 0) {
344 return Status::Corruption("failed to start worker, return " + std::to_string(ret));
345 }
347 apply_ = new FloydApply(context_, db_, raft_meta_, raft_log_, this, info_log_);
348
349 InitPeers();
354 if ((ret = primary_->Start()) != 0) {
356 return Status::Corruption("failed to start primary thread, return " + std::to_string(ret));
357 }
358 primary_->AddTask(kCheckLeader);
361 apply_->Start();
365 return Status::OK();
366 }
以上代碼主要工作做了:創(chuàng)始日志,創(chuàng)建客戶端對象池(里面會對每個server addr創(chuàng)建一條連接贪绘,源碼在pink項目中)用于后續(xù)發(fā)送請求兑牡,創(chuàng)建兩個db,用于state machine和log entry税灌;從log中恢復(fù)狀態(tài)數(shù)據(jù)比如term/voteip/voteport/commit_index/last_applied等均函;接著從db中或參數(shù)中恢復(fù)節(jié)點信息;創(chuàng)建FloydPrimary線程菱涤,此時還沒啟動苞也,它的主要工作是心跳,定時檢查leader粘秆,執(zhí)行command墩朦,向其他節(jié)點發(fā)送vote rpc或append log entry rpc,體現(xiàn)在這三個函數(shù)中翻擒,由AddTask分發(fā):
62 static void LaunchHeartBeatWrapper(void *arg);
63 void LaunchHeartBeat();
64 static void LaunchCheckLeaderWrapper(void *arg);
65 void LaunchCheckLeader();
66 static void LaunchNewCommandWrapper(void *arg);
67 void LaunchNewCommand();
接著創(chuàng)建FloydWorker線程并啟動它氓涣,它的工作是處理請求牛哺,比如給節(jié)點線程發(fā)vote rpc/append log entry rpc任務(wù),由DealMessage分發(fā)劳吠,然后根據(jù)request_type具體走不同的邏輯引润,調(diào)用FloydImpl類中的函數(shù);
接著創(chuàng)建FloydApply線程痒玩,它的工作是執(zhí)行command淳附,即把log entry中的command apply到 state machine中,處理成員變更的情況蠢古;
接著根據(jù)節(jié)點個數(shù)奴曙,創(chuàng)建對應(yīng)個數(shù)的節(jié)點線程并啟動,主要工作是向?qū)?yīng)server發(fā)送vote rpc/append log entry rpc請求和處理響應(yīng)草讶,維護狀態(tài)等洽糟;
最后是啟動FloydPrimary并立即發(fā)一個選舉leader的任務(wù)(節(jié)點剛啟動時都為follower),接著啟動FloydApply堕战;
以上線程坤溃,有些創(chuàng)建便啟動,有些等其他線程啟動完后再啟動嘱丢,這里有個順序依賴薪介,主要看誰驅(qū)動誰,比如FloydPrimary線程里會用到節(jié)點線程越驻,就不能以相反的順序start否則可能引起coredump汁政;
大致整個框架差不多分析完了,pink中的相關(guān)源碼沒有在這里列出來缀旁,跳過了记劈。
在成員變更的同時,需要保證安全必一诵棵,即“在任何時候抠蚣,都不會出現(xiàn)雙主祝旷÷陌模”
主要有兩種方法:
One-Server變更:一階段變更,要求每次成員組從G1變成G2時怀跛,G2相比G1加一個成員或者減一個成員距贷。
Joint Consensus:支持任意的變更,即從成員組G1變成G2吻谋,不要求G1和G2有什么關(guān)聯(lián)忠蝗,比如可以完全沒有交集。
第一種比較容易理解漓拾,實現(xiàn)起來也簡單阁最,floyd中也使用的第一種戒祠,這邊大概說一下基本流程吧,至于為什么這種方法可行速种,可以參考下面鏈接的分析姜盈;
以增加成員為例,raft在收到增加server成員請求時配阵,每次只增加一臺server馏颂,后臺程序的Leader執(zhí)行流程如下:
-->AddServer-->BuildAddServerRequest-->DoCommand-->ExecuteCommand-->BuildLogEntry-->Append-->AddTask-->NoticePeerTask-->AddAppendEntriesTask
假設(shè)經(jīng)過大多數(shù)的返回后,leader 把command apply到狀態(tài)機中后棋傍,后續(xù)follower也推進apply id救拉,把這條日志apply 狀態(tài)機中去,此時流程如下:
-->Apply-->MembershipChange-->AddNewPeer
219 void FloydImpl::AddNewPeer(const std::string& server) {
220 if (IsSelf(server)) {
221 return;
222 }
223 // Add Peer
224 auto peers_iter = peers_.find(server);
225 if (peers_iter == peers_.end()) {
226 LOGV(INFO_LEVEL, info_log_, "FloydImpl::ApplyAddMember server %s:%d add new peer thread %s",
227 options_.local_ip.c_str(), options_.local_port, server.c_str());
228 Peer* pt = new Peer(server, &peers_, context_, primary_, raft_meta_, raft_log_,
229 worker_client_pool_, apply_, options_, info_log_);
230 peers_.insert(std::pair<std::string, Peer*>(server, pt));
231 pt->Start();
232 }
233 }
http://loopjump.com/raft_paper_note/
http://loopjump.com/raft_one_server_reconfiguration/