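This part continues from the previous one and mainly analyzes log replication, safety, and membership changes. As soon as a server becomes leader, it immediately resets the state it keeps for each follower: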
```cpp
void Peer::UpdatePeerInfo() {
  for (auto& pt : (*peers_)) {
    pt.second->set_next_index(raft_log_->GetLastLogIndex() + 1);
    pt.second->set_match_index(0);
  }
}
```
The role of these two fields is explained later. The leader then immediately calls primary_->AddTask(kHeartBeat, false), which schedules the heartbeat task:
```cpp
void FloydPrimary::LaunchHeartBeat() {
  slash::MutexLock l(&context_->global_mu);
  if (context_->role == Role::kLeader) {
    NoticePeerTask(kNewCommand);  // ask every peer to send AppendEntries
    AddTask(kHeartBeat);          // re-schedule the next heartbeat round
  }
}

void FloydPrimary::NoticePeerTask(TaskType type) {
  for (auto& peer : (*peers_)) {
    switch (type) {
      case kHeartBeat:
        // in this task mapping, kHeartBeat triggers a RequestVote task
        peer.second->AddRequestVoteTask();
        break;
      case kNewCommand:
        peer.second->AddAppendEntriesTask();
        break;
      default:
        // (logging lines omitted in this excerpt)
        break;
    }
  }
}
```
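Right after a node becomes leader (normal replication is built up from this point on), it first replicates an empty entry log instead of immediately replicating client requests. The reason for doing this is analyzed later.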
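A minimal sketch of what UpdatePeerInfo() sets up, assuming a hypothetical FollowerProgress struct keyed by a peer address string (neither is Floyd's actual type): next_index starts optimistically at last_log_index + 1 and match_index at 0, so the range of entries to send to each follower is empty. That is why the first AppendEntries from a new leader carries no entries.

```cpp
#include <cassert>
#include <cstdint>
#include <map>
#include <string>

// Hypothetical per-follower state mirroring the two fields reset by
// Peer::UpdatePeerInfo(); not Floyd's actual type.
struct FollowerProgress {
  uint64_t next_index = 1;   // index of the next entry to send to this follower
  uint64_t match_index = 0;  // highest index known to be replicated on it
};

// Run once after winning an election: assume every follower is up to date
// (next = last + 1) but treat nothing as confirmed yet (match = 0).
void ResetProgressOnElection(std::map<std::string, FollowerProgress>* peers,
                             uint64_t last_log_index) {
  for (auto& kv : *peers) {
    kv.second.next_index = last_log_index + 1;
    kv.second.match_index = 0;
  }
}

// Number of entries in [next_index, last_log_index] that the next
// AppendEntries to this follower would carry (before any batch cap).
uint64_t EntriesToSend(const FollowerProgress& p, uint64_t last_log_index) {
  assert(p.next_index >= 1);
  return p.next_index > last_log_index ? 0 : last_log_index - p.next_index + 1;
}
```

Starting optimistically is cheap: if a follower is actually behind, its reply fails and the leader walks next_index back, as the response handler later in this article shows.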
It then starts replicating to each follower. The key code is as follows:
```cpp
220 void Peer::AppendEntriesRPC() {
221   uint64_t prev_log_index = 0;
222   uint64_t num_entries = 0;
223   uint64_t prev_log_term = 0;
224   uint64_t last_log_index = 0;
225   uint64_t current_term = 0;
226   CmdRequest req;
227   CmdRequest_AppendEntries* append_entries = req.mutable_append_entries();
228   {
229     slash::MutexLock l(&context_->global_mu);
230     prev_log_index = next_index_ - 1;
231     last_log_index = raft_log_->GetLastLogIndex();
232     if (next_index_ > last_log_index && peer_last_op_time + options_.heartbeat_us > slash::NowMicros()) {
233       return;  // nothing new to send and no heartbeat due yet
234     }
235     peer_last_op_time = slash::NowMicros();
236
237     if (prev_log_index != 0) {
238       Entry entry;
239       if (raft_log_->GetEntry(prev_log_index, &entry) != 0) {  // not found: prev_log_term stays 0
240       } else {
241         prev_log_term = entry.term();
242       }
243     }
244     current_term = context_->current_term;
246     req.set_type(Type::kAppendEntries);
247     append_entries->set_ip(options_.local_ip);
248     append_entries->set_port(options_.local_port);
249     append_entries->set_term(current_term);
250     append_entries->set_prev_log_index(prev_log_index);
251     append_entries->set_prev_log_term(prev_log_term);
252     append_entries->set_leader_commit(context_->commit_index);
253   }
254
255   Entry *tmp_entry = new Entry();
256   for (uint64_t index = next_index_; index <= last_log_index; index++) {
257     if (raft_log_->GetEntry(index, tmp_entry) == 0) {
258       Entry *entry = append_entries->add_entries();
259       *entry = *tmp_entry;
260     } else {
261       break;
262     }
263
264     num_entries++;
265     if (num_entries >= options_.append_entries_count_once
266         || (uint64_t)append_entries->ByteSize() >= options_.append_entries_size_once) {
267       break;  // batch limit reached (entry count or serialized size)
268     }
269   }
270   delete tmp_entry;
271
272   CmdResponse res;
273   Status result = pool_->SendAndRecv(peer_addr_, req, &res);
```
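For a newly elected leader, pt.second->set_next_index(raft_log_->GetLastLogIndex() + 1) has just been executed, so prev_log_index equals last_log_index. Lines 232-234 skip the RPC entirely when there is nothing new to send and the heartbeat interval has not yet elapsed. Lines 237-243 read the entry at prev_log_index (for a new leader, the latest persisted entry) to obtain its term; the key fields of the request are prev_log_index and prev_log_term. Lines 255-270 then copy the entries from next_index_ to last_log_index into the request, stopping at append_entries_count_once entries or append_entries_size_once bytes. At the moment a node becomes leader there is nothing to copy, so the request carries no entries. Line 273 sends the RPC.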
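The request construction above can be sketched against an in-memory log. This is a simplified illustration, not Floyd's code: LogEntry, AppendRequest and BuildAppendRequest are hypothetical names, the byte-size cap (append_entries_size_once) is left out, and only the entry-count cap is kept as max_entries.

```cpp
#include <cassert>
#include <cstdint>
#include <vector>

// Hypothetical stand-ins for Floyd's Entry and request types.
struct LogEntry {
  uint64_t term;
};

struct AppendRequest {
  uint64_t term = 0;
  uint64_t prev_log_index = 0;
  uint64_t prev_log_term = 0;  // 0 when prev_log_index == 0 (empty prefix)
  uint64_t leader_commit = 0;
  std::vector<LogEntry> entries;
};

// log[i - 1] holds the entry with index i. Returns false when there is nothing
// new to send and the heartbeat interval has not elapsed (no RPC this round).
bool BuildAppendRequest(const std::vector<LogEntry>& log, uint64_t next_index,
                        uint64_t current_term, uint64_t commit_index,
                        uint64_t max_entries, uint64_t last_send_us,
                        uint64_t heartbeat_us, uint64_t now_us,
                        AppendRequest* req) {
  const uint64_t last_log_index = log.size();
  assert(next_index >= 1 && next_index <= last_log_index + 1 && max_entries >= 1);
  if (next_index > last_log_index && last_send_us + heartbeat_us > now_us) {
    return false;
  }
  req->term = current_term;
  req->leader_commit = commit_index;
  req->prev_log_index = next_index - 1;
  req->prev_log_term =
      req->prev_log_index == 0 ? 0 : log[req->prev_log_index - 1].term;
  req->entries.clear();
  // Copy entries from next_index onward, capped at max_entries per request.
  for (uint64_t i = next_index;
       i <= last_log_index && req->entries.size() < max_entries; ++i) {
    req->entries.push_back(log[i - 1]);
  }
  return true;
}
```

With next_index = last_log_index + 1, as right after UpdatePeerInfo(), the request carries only prev_log_index/prev_log_term and no entries, which matches the empty first request described above.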
When a follower receives the AppendEntries RPC, it handles it as follows:
```cpp
789 int FloydImpl::ReplyAppendEntries(const CmdRequest& request, CmdResponse* response) {
790   bool success = false;
791   CmdRequest_AppendEntries append_entries = request.append_entries();
792   slash::MutexLock l(&context_->global_mu);
793   // update last_op_time to avoid another leader election
794   context_->last_op_time = slash::NowMicros();
795   // Ignore stale term
796   // if the append entries leader's term is smaller than my current term, then the caller must be an older leader
797   if (append_entries.term() < context_->current_term) {
798     BuildAppendEntriesResponse(success, context_->current_term, raft_log_->GetLastLogIndex(), response);
799     return -1;
800   } else if ((append_entries.term() > context_->current_term)
801       || (append_entries.term() == context_->current_term &&
802         (context_->role == kCandidate || (context_->role == kFollower && context_->leader_ip == "")))) {
803     context_->BecomeFollower(append_entries.term(), append_entries.ip(), append_entries.port());
804     raft_meta_->SetCurrentTerm(context_->current_term);
805     raft_meta_->SetVotedForIp(context_->voted_for_ip);
806     raft_meta_->SetVotedForPort(context_->voted_for_port);
807   }
808
809   if (append_entries.prev_log_index() > raft_log_->GetLastLogIndex()) {
810     BuildAppendEntriesResponse(success, context_->current_term, raft_log_->GetLastLogIndex(), response);
811     return -1;
812   }
814   // Append entry
815   if (append_entries.prev_log_index() < raft_log_->GetLastLogIndex()) {
816     raft_log_->TruncateSuffix(append_entries.prev_log_index() + 1);
817   }
818
819   // we compare peer's prev index and term with my last log index and term
820   uint64_t my_last_log_term = 0;
821   Entry entry;
822   if (append_entries.prev_log_index() == 0) {
823     my_last_log_term = 0;
824   } else if (raft_log_->GetEntry(append_entries.prev_log_index(), &entry) == 0) {
825     my_last_log_term = entry.term();
826   } else {
827     BuildAppendEntriesResponse(success, context_->current_term, raft_log_->GetLastLogIndex(), response);
828     return -1;
829   }
830
831   if (append_entries.prev_log_term() != my_last_log_term) {
832     raft_log_->TruncateSuffix(append_entries.prev_log_index());
833     BuildAppendEntriesResponse(success, context_->current_term, raft_log_->GetLastLogIndex(), response);
834     return -1;
835   }
837   std::vector<const Entry*> entries;
838   for (int i = 0; i < append_entries.entries().size(); i++) {
839     entries.push_back(&append_entries.entries(i));
840   }
841   if (append_entries.entries().size() > 0) {
842     if (raft_log_->Append(entries) <= 0) {
843       BuildAppendEntriesResponse(success, context_->current_term, raft_log_->GetLastLogIndex(), response);
844       return -1;
845     }
846   } else {  // heartbeat: nothing to append
847   }
848   if (append_entries.leader_commit() != context_->commit_index) {
849     AdvanceFollowerCommitIndex(append_entries.leader_commit());
850     apply_->ScheduleApply();
851   }
852   success = true;
853   // only when follower successfully do appendentries, we will update commit index
854   BuildAppendEntriesResponse(success, context_->current_term, raft_log_->GetLastLogIndex(), response);
855   return 0;
856 }
```
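Case 1) Lines 797-807: when the follower receives the request and the leader's term is smaller than its own, the sender must be a stale leader, typically because of a network partition. For example, with seven nodes a to g split into {a, b, c} and {d, e, f, g}, a is the old leader and e is elected in the majority partition, so d's term is higher than a's; once the partition heals, d rejects a's requests. The same block also fixes up the follower's role: within one leader's tenure the terms are normally equal, but a node that missed heartbeats because of network jitter (and became a candidate, or lost track of the leader) must be turned back into a follower here.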
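Case 2) Lines 809-812: the request carries the index and term of the entry immediately preceding the new entries (prev_log_index and prev_log_term; this entry is not necessarily committed). If the follower's log is shorter than prev_log_index, the follower lags behind the leader and rejects the request, so the leader must first send the missing entries. Only in this way can the whole log chain be guaranteed to be identical.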
Then lines 815-817 delete (truncate suffix) the entries from append_entries.prev_log_index() + 1 to raft_log_->GetLastLogIndex(); why this deletion is allowed is explained later.
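Case 3) Lines 819-835 compare the leader's prev entry with the follower's own entry at the same index. The two may not match: the follower may be unable to read the entry at prev_log_index, or it may hold an entry at that index with a different term, in which case that entry and everything after it are deleted.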
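Case 4) Lines 837-855 write the entries to disk, move the follower's commit index according to the leader's commit index when the two differ, and finally schedule applying to the state machine (ApplyStateMachine()) for as long as last_applied < commit_index.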
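The four cases can be condensed into one sketch. It follows the textbook receiver rules from Figure 2 of the Raft paper rather than Floyd's exact order: it truncates only at the first entry that actually conflicts instead of truncating right after prev_log_index, and it leaves out persistence, role changes and the last_log_index returned in the response. Follower and HandleAppendEntries are hypothetical names.

```cpp
#include <algorithm>
#include <cassert>
#include <cstdint>
#include <vector>

struct LogEntry {
  uint64_t term;
};

// Hypothetical in-memory follower; log[i - 1] holds the entry with index i.
struct Follower {
  uint64_t current_term = 0;
  uint64_t commit_index = 0;
  std::vector<LogEntry> log;
};

// Returns the "success" flag that would go into the response.
bool HandleAppendEntries(Follower* f, uint64_t term, uint64_t prev_index,
                         uint64_t prev_term, const std::vector<LogEntry>& entries,
                         uint64_t leader_commit) {
  assert(f != nullptr);
  // Case 1: stale leader, e.g. the old leader of a minority partition.
  if (term < f->current_term) return false;
  f->current_term = term;  // same or newer term: follow this leader (role change omitted)
  // Case 2: the follower has no entry at prev_index yet; it lags behind.
  if (prev_index > f->log.size()) return false;
  // Case 3: an entry exists at prev_index but with a different term.
  if (prev_index > 0 && f->log[prev_index - 1].term != prev_term) return false;
  // Case 4: append, truncating only at the first entry that really conflicts.
  uint64_t index = prev_index;
  for (const LogEntry& e : entries) {
    ++index;
    if (index <= f->log.size()) {
      if (f->log[index - 1].term == e.term) continue;  // already present
      f->log.resize(index - 1);  // conflict: drop it and everything after it
    }
    f->log.push_back(e);
  }
  // Commit no further than the last entry this request proved we share.
  const uint64_t last_new = prev_index + entries.size();
  if (leader_commit > f->commit_index) {
    f->commit_index = std::max(f->commit_index, std::min(leader_commit, last_new));
  }
  return true;
}
```

Truncating only on a real conflict keeps a delayed, shorter AppendEntries from erasing entries the follower already shares with the leader.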
Back on the leader: when it receives the follower's AppendEntries response, it handles it as follows:
```cpp
297   // here we may get a larger term, and transfer to follower
298   // so we need to judge the role here
299   if (context_->role == Role::kLeader) {
300     /*
301      * receiver has a higher term than mine, so step down from leader to follower
302      */
303     if (res.append_entries_res().term() > context_->current_term) {
304       context_->BecomeFollower(res.append_entries_res().term());
305       raft_meta_->SetCurrentTerm(context_->current_term);
306       raft_meta_->SetVotedForIp(context_->voted_for_ip);
307       raft_meta_->SetVotedForPort(context_->voted_for_port);
308     } else if (res.append_entries_res().success() == true) {
309       if (num_entries > 0) {
310         match_index_ = prev_log_index + num_entries;
311         // only log entries from the leader's current term are committed
312         // by counting replicas
313         if (append_entries->entries(num_entries - 1).term() == context_->current_term) {
314           AdvanceLeaderCommitIndex();
315           apply_->ScheduleApply();
316         }
317         next_index_ = prev_log_index + num_entries + 1;
318       }
319     } else {
320       uint64_t adjust_index = std::min(res.append_entries_res().last_log_index() + 1,
321                                        next_index_ - 1);
322       if (adjust_index > 0) {
323         // prev log doesn't match, so retry from an earlier next_index_
324         next_index_ = adjust_index;
325         AddAppendEntriesTask();
326       }
327     }
328   } else if (context_->role == Role::kFollower) {  // role changed while the RPC was in flight
329   } else if (context_->role == Role::kCandidate) {
330   }
331   return;
332 }
```
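The next_index_/match_index_ bookkeeping in the success and failure branches of the handler above can be condensed into a small sketch. FollowerProgress and OnAppendReply are hypothetical names, the higher-term branch is left out, and the back-off formula min(follower_last_index + 1, next_index - 1) is taken from lines 320-321.

```cpp
#include <algorithm>
#include <cassert>
#include <cstdint>

// Hypothetical per-follower progress kept by the leader.
struct FollowerProgress {
  uint64_t next_index;
  uint64_t match_index;
};

// prev_log_index / num_entries describe the request that was sent;
// follower_last_index is the last_log_index reported in the reply.
void OnAppendReply(FollowerProgress* p, bool success, uint64_t prev_log_index,
                   uint64_t num_entries, uint64_t follower_last_index) {
  assert(p->next_index >= 1);
  if (success) {
    if (num_entries > 0) {  // as in the excerpt, only requests with entries update progress
      p->match_index = prev_log_index + num_entries;
      p->next_index = p->match_index + 1;
    }
    return;
  }
  // Rejected: jump back to just past the follower's last entry when that is
  // lower, otherwise step back one entry; never move below index 1.
  uint64_t adjust = std::min(follower_last_index + 1, p->next_index - 1);
  if (adjust > 0) p->next_index = adjust;
}
```

Using the follower's reported last index lets the leader skip a whole gap in one round trip instead of decrementing next_index one entry at a time.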
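The role may change while a log is being replicated (for example, the node may already have become a follower or even a candidate while the RPC was in flight), so the role is checked again. If the node is still the leader: lines 303-307 handle a response carrying a larger term, in which case the leader steps down to follower; if the response is successful and num_entries is greater than 0, match_index is updated to record how far this follower has replicated. The most important part is: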
```cpp
if (append_entries->entries(num_entries - 1).term() == context_->current_term) {
  AdvanceLeaderCommitIndex();
  apply_->ScheduleApply();
}
```
```cpp
uint64_t Peer::QuorumMatchIndex() {
  std::vector<uint64_t> values;
  std::map<std::string, Peer*>::iterator iter;
  for (iter = peers_->begin(); iter != peers_->end(); iter++) {
    if (iter->first == peer_addr_) {
      values.push_back(match_index_);
      continue;
    }
    values.push_back(iter->second->match_index());
  }
  std::sort(values.begin(), values.end());
  return values.at(values.size() / 2);
}

// only leader will call AdvanceCommitIndex
// follower only need set commit as leader's
void Peer::AdvanceLeaderCommitIndex() {
  Entry entry;
  uint64_t new_commit_index = QuorumMatchIndex();
  if (context_->commit_index < new_commit_index) {
    context_->commit_index = new_commit_index;
    raft_meta_->SetCommitIndex(context_->commit_index);
  }
  return;
}
```
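Only entries from the current term are committed by counting replicas; entries from earlier terms are committed indirectly, never directly. To quote the reason: "Overwriting log entries after commitIndex is allowed: for example, when the leader sends an AppendEntries RPC to a follower, the follower overwrites and corrects its log to stay consistent with the leader.
Overwriting commitIndex and the entries before it is forbidden, because these have already been applied to the state machine; overwriting them would cause inconsistency. The overwrite in the example discussed there is exactly this kind.
The new constraint we learn from this example is: the leader of the current term must not 'directly' commit entries from previous terms; it must wait until an entry of the current term has been replicated on a majority, and then commit the previous-term entries along with it." [https://yq.aliyun.com/articles/62425]. The leader then collects and sorts the match indexes to see whether an index has been replicated on a majority; if so, it advances commit_index and calls ScheduleApply to apply the newly committed entries.
The new commit index is later sent to the followers, which apply those entries as well.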
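The current-term rule can be sketched as follows. It uses the commit rule from the Raft paper (advance to the largest N replicated on a majority, but only if the entry at N has the current term), which differs in form from Floyd's check on the last entry of the batch. AdvanceCommit is a hypothetical helper, and match_all is assumed to include the leader's own last log index; the excerpt above does not show whether Floyd's peers_ contains the local node. The checks reproduce the scenario from the quote: an old-term entry on a majority stays uncommitted until a current-term entry (such as the NOP) reaches a majority.

```cpp
#include <algorithm>
#include <cassert>
#include <cstdint>
#include <functional>
#include <vector>

struct LogEntry {
  uint64_t term;
};

// log[i - 1] holds index i; match_all holds every member's match index,
// including the leader's own last log index (an assumption of this sketch).
uint64_t AdvanceCommit(const std::vector<LogEntry>& log, uint64_t commit_index,
                       uint64_t current_term, std::vector<uint64_t> match_all) {
  assert(!match_all.empty());
  // Descending order: positions 0..n/2 are a majority, so the value at n/2 is
  // the highest index that a majority has replicated.
  std::sort(match_all.begin(), match_all.end(), std::greater<uint64_t>());
  uint64_t n = match_all[match_all.size() / 2];
  assert(n <= log.size());
  // Terms never decrease along the log, so if the entry at n is from an older
  // term, every entry below it is too and nothing may be committed directly.
  if (n > commit_index && log[n - 1].term == current_term) return n;
  return commit_index;
}
```

Once the current-term entry commits, every earlier entry below it is committed with it, which is the "indirect" commit the quote describes.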
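Back to the question left open earlier: why does a server, once it becomes the new leader, replicate a new empty entry (NOP)? For safety, the new leader must commit the previous term's uncommitted entries together with a new entry of its own term, rather than commit the previous term's entries on their own by counting a majority [http://catkang.github.io/2017/11/30/raft-safty.html]. In addition, for membership changes, the code uses the first approach, namely adding or removing only one server at a time. Quoting an explanation of this: "If Raft guarantees that each membership change adds or removes only one server, the dual-leader problem cannot occur. Within a single term the leader can ensure this, but if two membership changes happen concurrently and leadership changes at the same time, the premise may no longer hold.
The problem is that when the new leader receives the client's next membership-change request, the cluster may still contain an uncommitted earlier membership-change entry, and that entry might be committed in the future.
The fix: in Raft, a new leader must first commit a NOP entry before it starts serving requests.
Once the new leader has committed an entry in its own term, it has confirmed the latest membership configuration, and no uncommitted membership-change entry in the cluster can be committed later; starting membership changes only after the NOP is committed therefore preserves the one-server-at-a-time premise." [http://loopjump.com/raft_one_server_reconfiguration/]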
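Why a single-server change cannot produce two leaders comes down to a counting argument: when the old and new configurations differ by one server, any majority of the old one and any majority of the new one must share a server, and that server votes only once per term. A minimal sketch under that assumption (Majority and QuorumsMustOverlap are hypothetical helpers) checks the inequality and shows that adding two servers at once breaks it. The NOP step in the quote addresses a different issue (a pending change left over from an earlier term) and is not modeled here.

```cpp
#include <cassert>

// Votes needed for a majority among n voting members.
int Majority(int n) {
  assert(n >= 1);
  return n / 2 + 1;
}

// Assumes one configuration contains the other (a pure add or a pure remove),
// so the union of both has max(n_old, n_new) servers. Two majorities must
// share a server exactly when their sizes add up to more than that union.
bool QuorumsMustOverlap(int n_old, int n_new) {
  int union_size = n_old > n_new ? n_old : n_new;
  return Majority(n_old) + Majority(n_new) > union_size;
}
```

For 3 to 5 servers the check fails: the old quorum {a, b} and the new quorum {c, d, e} are disjoint, so two leaders could be elected in the same term.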
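That is roughly the end of this analysis. It may be somewhat rough; for more detail, I recommend reading the paper. The final article in this series will walk through the code for adding and removing servers and the problems that can arise, as well as the overall framework of Floyd.
I have been very busy with work lately; later I would like to gradually analyze the implementation in leveldb and the coroutines mentioned earlier.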
http://www.reibang.com/p/10bdc956a305
http://www.reibang.com/p/ee7646c0f4cf#
https://cloud.tencent.com/developer/article/1005803
https://cloud.tencent.com/developer/article/1005804
http://catkang.github.io/2017/06/30/raft-subproblem.html [Raft and its three subproblems]
http://catkang.github.io/2017/11/30/raft-safty.html [Why Raft never commits log entries from previous terms directly]