Floyd&Raft的源碼分析(二)

這部分接著上一部分,主要分析日志的同步和安全性频鉴,以及成員變更等榄檬,在server成為leader后,會立即更新維護follower的信息:

 72 void Peer::UpdatePeerInfo() {
 73   for (auto& pt : (*peers_)) {
 74     pt.second->set_next_index(raft_log_->GetLastLogIndex() + 1);
 75     pt.second->set_match_index(0);
 76   }
 77 }

這兩個字段的作用會在后面講刹悴,然后立即primary_->AddTask(kHeartBeat, false);

 89 void FloydPrimary::LaunchHeartBeat() {
 90   slash::MutexLock l(&context_->global_mu);
 91   if (context_->role == Role::kLeader) {
 92     NoticePeerTask(kNewCommand);
 93     AddTask(kHeartBeat);
 94   }
 95 }
140 void FloydPrimary::NoticePeerTask(TaskType type) {
141   for (auto& peer : (*peers_)) {
142     switch (type) {
143     case kHeartBeat:
146       peer.second->AddRequestVoteTask();
147       break;
148     case kNewCommand:
151       peer.second->AddAppendEntriesTask();
152       break;
153     default:
156     }
157   }
158 }

當新成為一個leader后(正常的同步會在后面完善起來)行楞,會同步一個空entry log而不是立即同步客戶端的請求,為什么這么做土匀,后面會分析子房。
然后開始向各follower同步,重要代碼如下:

220 void Peer::AppendEntriesRPC() {
221   uint64_t prev_log_index = 0;
222   uint64_t num_entries = 0;
223   uint64_t prev_log_term = 0;
224   uint64_t last_log_index = 0;
225   uint64_t current_term = 0;
226   CmdRequest req;
227   CmdRequest_AppendEntries* append_entries = req.mutable_append_entries();
228   {
229   slash::MutexLock l(&context_->global_mu);
230   prev_log_index = next_index_ - 1;
231   last_log_index = raft_log_->GetLastLogIndex();
232   if (next_index_ > last_log_index && peer_last_op_time + options_.heartbeat_us > slash::NowMicros()) {
233     return;
234   }
235   peer_last_op_time = slash::NowMicros();
236 
237   if (prev_log_index != 0) {
238     Entry entry;
239     if (raft_log_->GetEntry(prev_log_index, &entry) != 0) {
240     } else {
241       prev_log_term = entry.term();
242     }
243   }
244   current_term = context_->current_term;
246   req.set_type(Type::kAppendEntries);
247   append_entries->set_ip(options_.local_ip);
248   append_entries->set_port(options_.local_port);
249   append_entries->set_term(current_term);
250   append_entries->set_prev_log_index(prev_log_index);
251   append_entries->set_prev_log_term(prev_log_term);
252   append_entries->set_leader_commit(context_->commit_index);
253   }
254 
255   Entry *tmp_entry = new Entry();
256   for (uint64_t index = next_index_; index <= last_log_index; index++) {
257     if (raft_log_->GetEntry(index, tmp_entry) == 0) {
258       Entry *entry = append_entries->add_entries();
259       *entry = *tmp_entry;
260     } else {
261       break;
262     }
263 
264     num_entries++;
265     if (num_entries >= options_.append_entries_count_once
266         || (uint64_t)append_entries->ByteSize() >= options_.append_entries_size_once) {
267       break;
268     }
269   }
270   delete tmp_entry;
271 
272   CmdResponse res;
273   Status result = pool_->SendAndRecv(peer_addr_, req, &res);

對于新成為的leader就轧,執(zhí)行pt.second->set_next_index(raft_log_->GetLastLogIndex() + 1);证杭,此時prev_log_indexlast_log_index相等;代碼行241?250去取最新一個已持久化的entry log妒御,關鍵的是prev_log_indexprev_log_term解愤,然后同步從next_index_last_log_index處的日志;新成為leader的那刻要同步的日志為空乎莉;代碼行252?258是rpc送讲;
當follower收到append entry log rpc后,如下:

789 int FloydImpl::ReplyAppendEntries(const CmdRequest& request, CmdResponse* response) {
790   bool success = false;
791   CmdRequest_AppendEntries append_entries = request.append_entries();
792   slash::MutexLock l(&context_->global_mu);
793   // update last_op_time to avoid another leader election
794   context_->last_op_time = slash::NowMicros();
795   // Ignore stale term
796   // if the append entries leader's term is smaller than my current term, then the caller must an older leader
797   if (append_entries.term() < context_->current_term) {
798     BuildAppendEntriesResponse(success, context_->current_term, raft_log_->GetLastLogIndex(), response);
799     return -1;
800   } else if ((append_entries.term() > context_->current_term)
801       || (append_entries.term() == context_->current_term &&
802           (context_->role == kCandidate || (context_->role == kFollower && context_->leader_ip == "")))) {
803       context_->BecomeFollower(append_entries.term(),append_entries.ip(), append_entries.port());
804       raft_meta_->SetCurrentTerm(context_->current_term);
805       raft_meta_->SetVotedForIp(context_->voted_for_ip);
806       raft_meta_->SetVotedForPort(context_->voted_for_port);
807   }
808 
809   if (append_entries.prev_log_index() > raft_log_->GetLastLogIndex()) {
810     BuildAppendEntriesResponse(success, context_->current_term, raft_log_->GetLastLogIndex(), response);
811     return -1;
812   }
814   // Append entry
815   if (append_entries.prev_log_index() < raft_log_->GetLastLogIndex()) {
816       raft_log_->TruncateSuffix(append_entries.prev_log_index() + 1);
817   }
818 
819   // we compare peer's prev index and term with my last log index and term
820   uint64_t my_last_log_term = 0;
821   Entry entry;
822   if (append_entries.prev_log_index() == 0) {
823     my_last_log_term = 0;
824   } else if (raft_log_->GetEntry(append_entries.prev_log_index(), &entry) == 0) {
825     my_last_log_term = entry.term();
826   } else {
827     BuildAppendEntriesResponse(success, context_->current_term, raft_log_->GetLastLogIndex(), response);
828     return -1;
829   }
830 
831   if (append_entries.prev_log_term() != my_last_log_term) {
832     raft_log_->TruncateSuffix(append_entries.prev_log_index());
833     BuildAppendEntriesResponse(success, context_->current_term, raft_log_->GetLastLogIndex(), response);
834     return -1;
835   }
837   std::vector<const Entry*> entries;
838   for (int i = 0; i < append_entries.entries().size(); i++) {
839     entries.push_back(&append_entries.entries(i));
840   }
841   if (append_entries.entries().size() > 0) {
842     if (raft_log_->Append(entries) <= 0) {
843       BuildAppendEntriesResponse(success, context_->current_term, raft_log_->GetLastLogIndex(), response);
844       return -1;
845     }
846   } else {
847   }
848   if (append_entries.leader_commit() != context_->commit_index) {
849     AdvanceFollowerCommitIndex(append_entries.leader_commit());
850     apply_->ScheduleApply();
851   }
852   success = true;
853   // only when follower successfully do appendentries, we will update commit index
854   BuildAppendEntriesResponse(success, context_->current_term, raft_log_->GetLastLogIndex(), response);
855   return 0;
856 }

情況一)當follower收到同步日志后梦鉴,行797?814李茫,如果leader的term比自己小,說明出現(xiàn)了分區(qū)的情況(比如a/b/c/d/e/f/g肥橙,此時abcd一個分區(qū),defg為一個分區(qū)魄宏,a為old leader,e為new leader存筏,d的term更高于a的term)宠互;還有處理的是狀態(tài)問題味榛;一般情況在leader任期內(nèi)term是相等的,那種網(wǎng)絡波動收不到心跳轉(zhuǎn)換角色或者其他問題是需要處理的予跌;

情況二)行817?822搏色,同步當前日志,帶上上一條已commit的日志term和index券册,如果follower的日志落后leader的频轿,需要同步這些,只有這樣烁焙,才能保證這整個log鏈是完全相同的航邢;

然后行826?831要刪除(truncate suffix)append_entries.prev_log_index() + 1raft_log_->GetLastLogIndex()處的日志,為什么刪除骄蝇,后面會說膳殷;

情況三)行835?858用leader的prev log和自己最新的entry log比較,會存在不匹配的情況九火,比如獲取不到prev index所對應的日志赚窃,再或者同樣index的日志,但term不一致需要刪除岔激;

情況四)行861?893開始寫log到磁盤勒极,并根據(jù)leader的commit index值選擇性移動commit index的值,最后apply于狀態(tài)機ApplyStateMachine()鹦倚,last_applied < commit_index河质;

回到上面冀惭,當leader收到follower的append entry log response后震叙,如下:

297   // here we may get a larger term, and transfer to follower
298   // so we need to judge the role here
299   if (context_->role == Role::kLeader) {
300     /*
301      * receiver has higer term than myself, so turn from candidate to follower
302      */
303     if (res.append_entries_res().term() > context_->current_term) {
304       context_->BecomeFollower(res.append_entries_res().term());
305       raft_meta_->SetCurrentTerm(context_->current_term);
306       raft_meta_->SetVotedForIp(context_->voted_for_ip);
307       raft_meta_->SetVotedForPort(context_->voted_for_port);
308     } else if (res.append_entries_res().success() == true) {
309       if (num_entries > 0) {
310         match_index_ = prev_log_index + num_entries;
311         // only log entries from the leader's current term are committed
312         // by counting replicas
313         if (append_entries->entries(num_entries - 1).term() == context_->current_term) {
314           AdvanceLeaderCommitIndex();
315           apply_->ScheduleApply();
316         } 
317         next_index_ = prev_log_index + num_entries + 1;
318       } 
319     } else {
320       uint64_t adjust_index = std::min(res.append_entries_res().last_log_index() + 1,
321                                        next_index_ - 1);
322       if (adjust_index > 0) {
323         // Prev log don't match, so we retry with more prev one according to
324         next_index_ = adjust_index;
325         AddAppendEntriesTask();
326       }
327     }
328   } else if (context_->role == Role::kFollower) {
329   } else if (context_->role == Role::kCandidate) {
330   }
331   return;
332 }

因為在同步log的時候,是有可能發(fā)現(xiàn)狀態(tài)變更的散休,比如從leader轉(zhuǎn)變?yōu)閏andidate或follower媒楼;如果還是leader的話,行305?313因為收到更大term戚丸,從leader轉(zhuǎn)變?yōu)閒ollower划址;如果收到true了,如果num_entries大于0限府,更新match_index的值夺颤,表示已同步到哪兒;其中最重要的:

318         if (append_entries->entries(num_entries - 1).term() == context_->current_term) {
319           AdvanceLeaderCommitIndex();
320           apply_->ScheduleApply();
321         }

178 uint64_t Peer::QuorumMatchIndex() {
179   std::vector<uint64_t> values;
180   std::map<std::string, Peer*>::iterator iter;
181   for (iter = peers_->begin(); iter != peers_->end(); iter++) {
182     if (iter->first == peer_addr_) {
183       values.push_back(match_index_);
184       continue;
185     }
186     values.push_back(iter->second->match_index());
187   }
190   std::sort(values.begin(), values.end());
191   return values.at(values.size() / 2);
192 }
193 
194 // only leader will call AdvanceCommitIndex
195 // follower only need set commit as leader's
196 void Peer::AdvanceLeaderCommitIndex() {
197   Entry entry;
198   uint64_t new_commit_index = QuorumMatchIndex();
199   if (context_->commit_index < new_commit_index) {
200     context_->commit_index = new_commit_index;
201     raft_meta_->SetCommitIndex(context_->commit_index);
202   }
203   return;
204 }

只commit當前任期內(nèi)的log或間接提交之前term的log胁勺,而不能直接提交世澜,引用一下原因“commitIndex之后的log覆蓋:是允許的,如leader發(fā)送AppendEntries RPC請求給follower署穗,follower都會進行覆蓋糾正寥裂,以保持和leader一致嵌洼。
commitIndex及其之前的log覆蓋:是禁止的,因為這些已經(jīng)被應用到狀態(tài)機中了封恰,一旦再覆蓋就出現(xiàn)了不一致性麻养。而上述案例中的覆蓋就是指這種情況的覆蓋。
從這個案例中我們得到的一個新約束就是:當前term的leader不能“直接”提交之前term的entries必須要等到當前term有entry過半了诺舔,才順便一起將之前term的entries進行提交”[https://yq.aliyun.com/articles/62425]鳖昌,然后統(tǒng)計并排序,看是否過半低飒,如果是的話則把最新的entry log進行ScheduleApply遗遵,后面再同步給follower進行apply;

回到上面留下的坑逸嘀,為什么server成為新的leader后會同步一條新的空entry呢(NOP)车要?出于安全性,必須要確保新的leader提交新的entry時把上一個term未提交的entry一起提交崭倘,而不是通過大多數(shù)去(直接)單獨提交上一個term的entry[http://catkang.github.io/2017/11/30/raft-safty.html]翼岁;另外,在對成員變更的情況下司光,由于代碼中使用的是第一種方式琅坡,即每次只增加/減少一個server,引用如下鏈接中的說明“ Raft中成員變更如果保證是只變一個server這個前提残家,就不會出現(xiàn)雙主問題榆俺。如果在同一term下,可以由leader確定這件事坞淮,但是如果有兩個并發(fā)的成員變更茴晋,并且同時發(fā)生了切主,就有可能保證不了這一前提回窘。
問題在于新主接收到客戶端下一次成員變更請求的時候诺擅,可能集群中還可能有未確認的之前的成員變更日志,這個日志在將來可能會確認啡直。
修復方案:Raft中烁涌,新主上任前要先確認一條NOP日志,然后才開始對外提供服務酒觅。
一旦新主在本term內(nèi)確認了一條日志撮执,那么說明新主已經(jīng)確認了最新的成員組,并且集群中不會出現(xiàn)未確認的成員變更日志將來會被確認的情況了舷丹,在NOP確認后再開啟成員變更就能保證每次只變一個server的前提抒钱。”[http://loopjump.com/raft_one_server_reconfiguration/]

從其他文章中復制兩張ppt:


圖一
圖二

大概分析就到這里了,可能說的比較粗糙继效,更詳細的建議看論文症杏,最后一篇想結合代碼分析下server的增加及減少的過程及可能出現(xiàn)的問題,及整個Floyd框架性的東西瑞信。

最近因工作非常忙厉颤,后面想慢慢的分析下leveldb里的實現(xiàn)和前面說的協(xié)程。

http://www.reibang.com/p/10bdc956a305
http://www.reibang.com/p/ee7646c0f4cf#
https://cloud.tencent.com/developer/article/1005803
https://cloud.tencent.com/developer/article/1005804

http://catkang.github.io/2017/06/30/raft-subproblem.html [Raft和它的三個子問題]
http://catkang.github.io/2017/11/30/raft-safty.html [Why Raft never commits log entries from previous terms directly]

最后編輯于
?著作權歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末凡简,一起剝皮案震驚了整個濱河市逼友,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌秤涩,老刑警劉巖帜乞,帶你破解...
    沈念sama閱讀 216,544評論 6 501
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異筐眷,居然都是意外死亡黎烈,警方通過查閱死者的電腦和手機,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,430評論 3 392
  • 文/潘曉璐 我一進店門匀谣,熙熙樓的掌柜王于貴愁眉苦臉地迎上來照棋,“玉大人,你說我怎么就攤上這事武翎×姨浚” “怎么了?”我有些...
    開封第一講書人閱讀 162,764評論 0 353
  • 文/不壞的土叔 我叫張陵宝恶,是天一觀的道長符隙。 經(jīng)常有香客問我,道長垫毙,這世上最難降的妖魔是什么霹疫? 我笑而不...
    開封第一講書人閱讀 58,193評論 1 292
  • 正文 為了忘掉前任,我火速辦了婚禮露久,結果婚禮上更米,老公的妹妹穿的比我還像新娘欺栗。我一直安慰自己毫痕,他們只是感情好,可當我...
    茶點故事閱讀 67,216評論 6 388
  • 文/花漫 我一把揭開白布迟几。 她就那樣靜靜地躺著消请,像睡著了一般。 火紅的嫁衣襯著肌膚如雪类腮。 梳的紋絲不亂的頭發(fā)上臊泰,一...
    開封第一講書人閱讀 51,182評論 1 299
  • 那天,我揣著相機與錄音蚜枢,去河邊找鬼缸逃。 笑死针饥,一個胖子當著我的面吹牛,可吹牛的內(nèi)容都是我干的需频。 我是一名探鬼主播丁眼,決...
    沈念sama閱讀 40,063評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼昭殉!你這毒婦竟也來了苞七?” 一聲冷哼從身側響起,我...
    開封第一講書人閱讀 38,917評論 0 274
  • 序言:老撾萬榮一對情侶失蹤挪丢,失蹤者是張志新(化名)和其女友劉穎蹂风,沒想到半個月后,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體乾蓬,經(jīng)...
    沈念sama閱讀 45,329評論 1 310
  • 正文 獨居荒郊野嶺守林人離奇死亡惠啄,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,543評論 2 332
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了任内。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片礁阁。...
    茶點故事閱讀 39,722評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖族奢,靈堂內(nèi)的尸體忽然破棺而出姥闭,到底是詐尸還是另有隱情,我是刑警寧澤越走,帶...
    沈念sama閱讀 35,425評論 5 343
  • 正文 年R本政府宣布棚品,位于F島的核電站,受9級特大地震影響廊敌,放射性物質(zhì)發(fā)生泄漏铜跑。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 41,019評論 3 326
  • 文/蒙蒙 一骡澈、第九天 我趴在偏房一處隱蔽的房頂上張望锅纺。 院中可真熱鬧,春花似錦肋殴、人聲如沸囤锉。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,671評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽官地。三九已至,卻和暖如春烙懦,著一層夾襖步出監(jiān)牢的瞬間驱入,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 32,825評論 1 269
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留亏较,地道東北人莺褒。 一個月前我還...
    沈念sama閱讀 47,729評論 2 368
  • 正文 我出身青樓,卻偏偏與公主長得像雪情,于是被迫代替她去往敵國和親癣朗。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當晚...
    茶點故事閱讀 44,614評論 2 353

推薦閱讀更多精彩內(nèi)容