這篇博文借助360開源的Floyd項目和百度開源的Braft項目來分析Raft的實現,至於Raft的理論(概念)、原理及一些問題點,可以從文末的參考鏈接中獲取。分析它純屬興趣,利用週末時間來延伸自己的知識點,如有說得不正確的地方,請指教哈。這篇估計要寫一段時間,平時比較忙。
Floyd裡有關多線程的東西使用了Pink實現,但在分析Raft的時候會簡化它,不然東西比較多。我會按照網上的說明,把它至少分為三個部分來分析:第一部分是Leader選舉,第二部分是日誌複製,第三部分是安全性,可能會涉及比較多的關鍵代碼以及每個字段的作用。最後再分析如何啟動(框架)、有幾個線程,以及各自承擔的職責。另外,Paxos挺複雜的,我也不準備去分析;微信團隊也開源了一個Paxos的實現PhxPaxos,有興趣的可自己分析下。
幾個關鍵字段的作用結合代碼來分析比較好。每個節點使用FloydContext來描述該時刻的狀態和一些數據:
34 struct FloydContext {  // Per-node Raft state; accessed from several threads (see the global_mu comment below).
35 // Role related
36 explicit FloydContext(const Options& _options)
37 : options(_options),
38 voted_for_ip(""),
39 voted_for_port(0),
40 leader_ip(""),
41 leader_port(0),
42 vote_quorum(0),
43 commit_index(0),
44 last_applied(0),
45 last_op_time(0),
46 apply_cond(&apply_mu) {}  // NOTE(review): current_term and role are not initialized here; RecoverInit assigns both.
47
48 void RecoverInit(RaftMeta *raft);  // Loads term/vote/commit_index/last_applied from RaftMeta and starts as follower.
49 void BecomeFollower(uint64_t new_iterm,
50 const std::string leader_ip = "", int port = 0);  // Empty defaults: no known leader (and, see BecomeFollower, no vote).
51 void BecomeCandidate();
52 void BecomeLeader();
53
54 Options options;  // Copy of the construction options; local_ip/local_port are read from it.
55 // Role related
56 uint64_t current_term;
58 Role role;  // kFollower / kCandidate / kLeader. (Source line 57 is omitted in this excerpt.)
59 std::string voted_for_ip;  // Candidate voted for in current_term ("" = none).
60 int voted_for_port;
61 std::string leader_ip;  // Known leader of current_term ("" = unknown).
62 int leader_port;
63 uint32_t vote_quorum;  // Votes collected as candidate, including our own (BecomeCandidate sets it to 1).
64
65 uint64_t commit_index;
66 std::atomic<uint64_t> last_applied;  // Atomic, presumably because the apply side reads it without global_mu (TODO confirm).
67 uint64_t last_op_time;  // slash::NowMicros() timestamp of the last leader/vote activity; drives the election timeout.
68
69 std::set<std::string> members;
70
71 // mutex protect commit_index
72 // used in floyd_apply thread and floyd_peer thread
73 slash::Mutex global_mu;  // In the excerpts below it is also held around role/term/vote changes, not only commit_index.
74 slash::Mutex apply_mu;
75 slash::CondVar apply_cond;  // Bound to apply_mu in the constructor.
76 };
current_term表示當前任期,role表示節點在該任期處於什麼狀態,其他成員變量的作用從命名大概可以知道,不具體分析了。注意構造函數並沒有初始化current_term和role,它們要到RecoverInit中才被賦值。
19 void FloydContext::RecoverInit(RaftMeta *raft_meta) {  // Restore persisted Raft state from RaftMeta at startup.
20 current_term = raft_meta->GetCurrentTerm();
21 voted_for_ip = raft_meta->GetVotedForIp();
22 voted_for_port = raft_meta->GetVotedForPort();
23 commit_index = raft_meta->GetCommitIndex();
24 last_applied = raft_meta->GetLastApplied();
25 role = Role::kFollower;  // Every node (re)starts as a follower.
26 }
28 void FloydContext::BecomeFollower(uint64_t new_term,  // Step down to follower in new_term, recording the leader if known.
29 const std::string _leader_ip, int _leader_port) {
30 // when requestvote receive a large term, then we transfer from candidate to follower
31 // then we should set voted_for_ip to the leader_ip
32 // if (current_term < new_term) {
33 voted_for_ip = _leader_ip;  // NOTE(review): unconditional; with the default args this clears the vote to "" / 0.
34 voted_for_port = _leader_port;
35 // }
36 current_term = new_term;  // NOTE(review): not guarded by new_term >= current_term.
37 leader_ip = _leader_ip;
38 leader_port = _leader_port;
39 role = Role::kFollower;
40 }
42 void FloydContext::BecomeCandidate() {  // Start a new election: bump the term, forget the leader, vote for ourselves.
43 current_term++;
44 role = Role::kCandidate;
45 leader_ip.clear();
46 leader_port = 0;
47 voted_for_ip = options.local_ip;
48 voted_for_port = options.local_port;
49 vote_quorum = 1;  // Our own vote.
50 }
52 void FloydContext::BecomeLeader() {  // Record ourselves as leader; current_term and the vote are left unchanged.
53 role = Role::kLeader;
54 leader_ip = options.local_ip;
55 leader_port = options.local_port;
56 }
每個節點在同一時刻只會處於一種狀態(Leader/Follower/Candidate)。初始啟動時,集群中的節點都為Follower,像current_term等是在RecoverInit中從rocksdb數據庫中恢復(初始化)的。每次狀態變更都會涉及到成員變量的修改。至於如何從一個狀態轉變到另一個狀態,先引用下轉換圖(圖略),然後從代碼層面分析。簡單來說:Follower選舉超時後變為Candidate;Candidate獲得多數票後變為Leader;任何角色發現更大的term都會退回Follower。
40 class FloydPrimary {  // Owns bg_thread_, which runs the periodic election check and the leader heartbeat.
41 public:
42 FloydPrimary(FloydContext* context, PeersSet* peers, RaftMeta* raft_meta,
43 const Options& options, Logger* info_log);
44 virtual ~FloydPrimary();
45
46 int Start();
47 int Stop();
48 void AddTask(TaskType type, bool is_delay = true);  // Schedule one run of a cron task, delayed by its period unless is_delay is false.
49 private:
50 FloydContext* const context_;  // Raw pointers; ownership is not visible in this excerpt.
51 PeersSet* const peers_;
52 RaftMeta* const raft_meta_;
53 Options options_;
54 Logger* const info_log_;
55
56 std::atomic<uint64_t> reset_elect_leader_time_;
57 std::atomic<uint64_t> reset_leader_heartbeat_time_;
58 pink::BGThread bg_thread_;  // Executes the Launch* tasks scheduled by AddTask.
59
60 // The Launch* work is done by floyd_peer_thread  -- NOTE(review): Launch* run on bg_thread_ (see AddTask); they hand RPC work to the peers.
61 // Cron task
62 static void LaunchHeartBeatWrapper(void *arg);
63 void LaunchHeartBeat();
64 static void LaunchCheckLeaderWrapper(void *arg);
65 void LaunchCheckLeader();
66 static void LaunchNewCommandWrapper(void *arg);
67 void LaunchNewCommand();
68
69 void NoticePeerTask(TaskType type);  // Fan a task out to every peer in peers_.
74 };  // (Source lines 70-73 are omitted in this excerpt.)
FloydPrimary通過bg_thread_這個線程主要做些心跳機制、狀態轉換等工作。Floyd啟動的時候會先塞一個任務primary_->AddTask(kCheckLeader),主要用於選舉Leader;所有節點初始狀態為Follower。
52 void FloydPrimary::AddTask(TaskType type, bool is_delay) {  // Schedule one run of the Launch* task for `type` on bg_thread_.
59 switch (type) {  // (Source lines 53-58 are omitted in this excerpt.)
60 case kHeartBeat:
61 if (is_delay) {
62 uint64_t timeout = options_.heartbeat_us;
63 bg_thread_.DelaySchedule(timeout / 1000LL, LaunchHeartBeatWrapper, this);  // us -> ms; presumably DelaySchedule takes milliseconds (TODO confirm).
64 } else {
65 bg_thread_.Schedule(LaunchHeartBeatWrapper, this);  // Run as soon as possible.
66 }
67 break;
68 case kCheckLeader:
69 if (is_delay) {
70 uint64_t timeout = options_.check_leader_us;
71 bg_thread_.DelaySchedule(timeout / 1000LL, LaunchCheckLeaderWrapper, this);
72 } else {
73 bg_thread_.Schedule(LaunchCheckLeaderWrapper, this);
74 }
75 break;
76 case kNewCommand:
77 bg_thread_.Schedule(LaunchNewCommandWrapper, this);  // is_delay is ignored: always scheduled immediately.
78 break;
79 default:
81 break;
82 }
83 }
85 void FloydPrimary::LaunchHeartBeatWrapper(void *arg) {  // C-style trampoline: arg is the FloydPrimary passed to Schedule in AddTask.
86 reinterpret_cast<FloydPrimary *>(arg)->LaunchHeartBeat();
87 }
88
89 void FloydPrimary::LaunchHeartBeat() {  // Leader-only periodic task; re-arms itself only while we are still leader.
90 slash::MutexLock l(&context_->global_mu);
91 if (context_->role == Role::kLeader) {
92 NoticePeerTask(kNewCommand);  // The leader's heartbeat is an AppendEntries round to every peer.
93 AddTask(kHeartBeat);  // Next round after heartbeat_us.
94 }
95 }
96
97 void FloydPrimary::LaunchCheckLeaderWrapper(void *arg) {  // C-style trampoline: arg is the FloydPrimary passed to Schedule in AddTask.
98 reinterpret_cast<FloydPrimary *>(arg)->LaunchCheckLeader();
99 }
101 void FloydPrimary::LaunchCheckLeader() {  // Election-timeout check; re-arms itself unconditionally.
102 slash::MutexLock l(&context_->global_mu);
103 if (context_->role == Role::kFollower || context_->role == Role::kCandidate) {
104 if (options_.single_mode) {
105 //code...
111 } else if (context_->last_op_time + options_.check_leader_us < slash::NowMicros()) {  // No leader/vote activity for check_leader_us: start an election.
112 context_->BecomeCandidate();
116 raft_meta_->SetCurrentTerm(context_->current_term);  // Save the new term and self-vote before asking for votes (lines 113-115 omitted).
117 raft_meta_->SetVotedForIp(context_->voted_for_ip);
118 raft_meta_->SetVotedForPort(context_->voted_for_port);
119 NoticePeerTask(kHeartBeat);  // kHeartBeat maps to AddRequestVoteTask in NoticePeerTask.
120 }
121 }
122 AddTask(kCheckLeader);  // Next check after check_leader_us, whatever the role.
123 }
124
125 void FloydPrimary::LaunchNewCommandWrapper(void *arg) {  // C-style trampoline for bg_thread_: arg is the FloydPrimary passed to Schedule in AddTask.
126 reinterpret_cast<FloydPrimary *>(arg)->LaunchNewCommand();
127 }
128
129 void FloydPrimary::LaunchNewCommand() {  // Leader-only: trigger an AppendEntries round now. (Lines 130/132 omitted; whether global_mu is held can't be seen here.)
131 if (context_->role != Role::kLeader) {
133 return;
134 }
135 NoticePeerTask(kNewCommand);
136 }
140 void FloydPrimary::NoticePeerTask(TaskType type) {  // Queue the RPC task for `type` on every peer.
141 for (auto& peer : (*peers_)) {
142 switch (type) {
143 case kHeartBeat:
146 peer.second->AddRequestVoteTask();  // Despite the name, kHeartBeat here means "request votes" (issued right after BecomeCandidate).
147 break;
148 case kNewCommand:
151 peer.second->AddAppendEntriesTask();  // Leader heartbeat / log replication.
152 break;
153 default:
156 }  // (Lines 144-145, 149-150 and 154-155 are omitted in this excerpt.)
157 }
158 }
其中心跳檢查也做了些投票邏輯(沒有分開):NoticePeerTask中的kHeartBeat實際對應AddRequestVoteTask(請求投票),而Leader的心跳是通過kNewCommand(AddAppendEntriesTask)發出的。具體代碼如上,沒有多少難點,能做什麼取決於當前處於什麼狀態。
其中check_leader_us這個值在每個節點是不相同的:check_leader_us = std::rand() % 2000000 + check_leader_us。引用一段話:“服務器啟動時初始狀態都是follower,如果在超時時間內沒有收到leader發送的心跳包,則進入candidate狀態進行選舉,服務器啟動時和leader掛掉時處理一樣。為了避免選票瓜分的情況,raft利用隨機超時機制避免選票瓜分情況。選舉超時時間從一個固定的區間隨機選擇,由於每個服務器的超時時間不同,則leader掛掉後,超時時間最短且擁有最多日誌的follower最先開始選主,並成為leader。一旦candidate成為leader,就會向其他服務器發送心跳包阻止新一輪的選舉開始。”簡單來說,就是隨機化超時檢查的時間:check_leader_us以微秒為單位,std::rand() % 2000000會在基準值上隨機增加[0, 2s)。若基準值為3s,檢查時間大概就在[3s, 5s)。
代碼行101~123就是判斷有沒有超時(last_op_time + check_leader_us < 當前時間)。超時的話,從kFollower(或kCandidate)轉換為kCandidate。BecomeCandidate函數要做的事就是增加當前的任期數,並給自己投一票(voted_for設為自己,vote_quorum置為1)。隨後把新的term和投票信息寫入raft_meta_,再通過心跳機制的方式(NoticePeerTask(kHeartBeat))向其他節點請求投票,並再次AddTask(kCheckLeader),在超時後進行下一輪檢查。下面來看一下AddRequestVoteTask主要做了什麼,以及其他節點收到投票請求後的處理。這裡為了使分析更簡單,假設有A/B兩個節點,A向B請求投票(AddRequestVoteTask),此時A的某個工作線程執行了RequestVoteRPC[不考慮異步回調的事件]:
93 void Peer::RequestVoteRPC() {  // Candidate side: build a RequestVote for this peer (excerpt stops at source line 107).
94 uint64_t last_log_term;
95 uint64_t last_log_index;
96 CmdRequest req;
97 {
98 slash::MutexLock l(&context_->global_mu);  // Term and last log position are read under global_mu.
99 raft_log_->GetLastLogTermAndIndex(&last_log_term, &last_log_index);
100
101 req.set_type(Type::kRequestVote);
102 CmdRequest_RequestVote* request_vote = req.mutable_request_vote();
103 request_vote->set_ip(options_.local_ip);
104 request_vote->set_port(options_.local_port);
105 request_vote->set_term(context_->current_term);
106 request_vote->set_last_log_term(last_log_term);  // Lets the voter check that our log is at least as up-to-date as its own.
107 request_vote->set_last_log_index(last_log_index);
消息協議使用了Google的protobuf(pb)。這個線程會把自己的當前任期數、ip/port,以及last_log_term和last_log_index,通過SendAndRecv發給其他節點(上面只摘錄了構造請求的部分)[後面兩個字段的作用和何時修改,會在後面分析];
B節點的工作線程收到請求後,進行了如下處理:
89 case Type::kRequestVote:  // Server-side dispatch on B: delegate to FloydImpl::ReplyRequestVote.
90 response_.set_type(Type::kRequestVote);
91 floyd_->ReplyRequestVote(request_, &response_);  // Return value ignored; grant/deny travels inside response_.
92 response_.set_code(StatusCode::kOk);  // kOk even when the vote is refused.
93 break;
714 int FloydImpl::ReplyRequestVote(const CmdRequest& request, CmdResponse* response) {  // Voter side of RequestVote; returns 0 if the vote is granted, -1 otherwise.
715 slash::MutexLock l(&context_->global_mu);
716 bool granted = false;
717 CmdRequest_RequestVote request_vote = request.request_vote();  // NOTE(review): copies the message; a const reference would do.
718 /*
719 * If RPC request or response contains term T > currentTerm: set currentTerm = T, convert to follower (5.1)
720 */
721 if (request_vote.term() > context_->current_term) {
722 context_->BecomeFollower(request_vote.term());
723 raft_meta_->SetCurrentTerm(context_->current_term);
724 }
725 // if caller's term smaller than my term, then I will notice him
726 if (request_vote.term() < context_->current_term) {
727 BuildRequestVoteResponse(context_->current_term, granted, response);  // Reply with our newer term so the candidate can step down.
728 return -1;
729 }
730 uint64_t my_last_log_term = 0;
731 uint64_t my_last_log_index = 0;
732 raft_log_->GetLastLogTermAndIndex(&my_last_log_term, &my_last_log_index);
733 // if votedfor is null or candidateId, and candidated's log is at least as up-to-date
734 // as receiver's log, grant vote
735 if ((request_vote.last_log_term() < my_last_log_term) ||
736 ((request_vote.last_log_term() == my_last_log_term) && (request_vote.last_log_index() < my_last_log_index))) {  // Candidate's log is older than ours: refuse (5.4.1).
737 BuildRequestVoteResponse(context_->current_term, granted, response);
738 return -1;
739 }
740
741 if (vote_for_.find(request_vote.term()) != vote_for_.end()
742 && vote_for_[request_vote.term()] != std::make_pair(request_vote.ip(), request_vote.port())) {  // Already voted for another node this term: refuse. A repeat request from the same candidate is granted again.
743 BuildRequestVoteResponse(context_->current_term, granted, response);
744 return -1;
745 }
746 vote_for_[request_vote.term()] = std::make_pair(request_vote.ip(), request_vote.port());  // Remember whom we voted for in this term.
747 context_->BecomeFollower(request_vote.term());  // NOTE(review): default args reset voted_for_ip/port to ""/0, so the next lines write an empty vote to raft_meta_, not the candidate; check whether GrantVote records it.
748 raft_meta_->SetCurrentTerm(context_->current_term);
749 raft_meta_->SetVotedForIp(context_->voted_for_ip);
750 raft_meta_->SetVotedForPort(context_->voted_for_port);
751 // Got my vote
752 GrantVote(request_vote.term(), request_vote.ip(), request_vote.port());
753 granted = true;
754 context_->last_op_time = slash::NowMicros();  // Granting a vote postpones our own election timeout (see LaunchCheckLeader).
755 BuildRequestVoteResponse(context_->current_term, granted, response);
756 return 0;
757 }
以上就是收到投票請求的節點的處理,分為以下幾種情況:
1)如果B的任期數小於A,則B變為kFollower(並把current_term更新為A的任期);如果B的任期數大於A,則拒絕投票,因為只有任期最新的Candidate才有可能成為Leader;
2)如果request_vote.last_log_term() < my_last_log_term,或者(request_vote.last_log_term() == my_last_log_term && request_vote.last_log_index() < my_last_log_index),即A的日誌沒有B新,則拒絕投票;
3)然後判斷該term內是否已經投票給了其他節點。如果已投給別的節點則拒絕;同一個Candidate重複請求時仍會同意,從而保證每個任期最多只投給一個節點;
代碼行746~755進行投票,並修改狀態:記錄vote_for_,變為Follower並寫入raft_meta_,同時把last_op_time刷新為當前時間,從而推遲自己的選舉超時;
my_last_log_index和my_last_log_term表示節點最新日誌條目的索引值和該條目的任期值,主要是出於安全性考慮。如果沒有這兩個字段參與條件判斷,成為Leader的那個節點可能沒有最新的日誌,term最大並不能解決這個問題。只有日誌至少和投票者一樣新的Candidate才能拿到選票,因此贏得多數票的Leader一定包含所有已提交的日誌。比較日誌新舊時,先比最後一條日誌的任期號,任期號相同再比索引。
A收到投票響應後,進行了如下處理:
128 if (res.request_vote_res().term() > context_->current_term) {  // Candidate side: handle a RequestVote reply (function header not shown). Newer term seen: step down.
129 context_->BecomeFollower(res.request_vote_res().term());
130 raft_meta_->SetCurrentTerm(context_->current_term);
131 raft_meta_->SetVotedForIp(context_->voted_for_ip);
132 raft_meta_->SetVotedForPort(context_->voted_for_port);
133 return;
134 }
135 if (context_->role == Role::kCandidate) {  // Replies arrive asynchronously; act only if we are still a candidate.
136 if (res.request_vote_res().vote_granted() == true) { // granted
137 if (CheckAndVote(res.request_vote_res().term())) {  // Counts this vote; true once a strict majority is reached.
138 context_->BecomeLeader();
139 UpdatePeerInfo();
140 primary_->AddTask(kHeartBeat, false);  // First leader heartbeat immediately, not after heartbeat_us.
141 }
142 } else {
143 context_->BecomeFollower(res.request_vote_res().term());  // NOTE(review): one refusal makes the candidate step down; since BecomeFollower sets current_term unconditionally, a late refusal from an earlier round would move the term backwards.
144 raft_meta_->SetCurrentTerm(context_->current_term);
145 raft_meta_->SetVotedForIp(context_->voted_for_ip);
146 raft_meta_->SetVotedForPort(context_->voted_for_port);
147 }
148 } else if (context_->role == Role::kFollower) {
149 } else if (context_->role == Role::kLeader) {  // Replies that arrive after winning or stepping down are ignored.
150 }
151 }
152 return;
代碼行128~134判斷自己的任期數是否小於對端的,是的話則自己變為kFollower。因為是異步請求投票,所以收到某個節點的投票響應時,節點的狀態可能已經發生變化了,故在檢查時需要結合節點當前的狀態。如果仍是kCandidate且投票被同意,就通過CheckAndVote檢查當前任期內的得票數是否超過成員數的一半。如果滿足則變為kLeader,並調用UpdatePeerInfo重置每個peer的next_index和match_index:
72 void Peer::UpdatePeerInfo() {  // Called right after BecomeLeader: reset replication progress for every peer.
73 for (auto& pt : (*peers_)) {
74 pt.second->set_next_index(raft_log_->GetLastLogIndex() + 1);  // Start by offering the entry right after our last one.
75 pt.second->set_match_index(0);  // Nothing is known to be replicated on the peer yet.
76 }
77 }
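然後立即(不延遲地)添加心跳任務AddTask(kHeartBeat, false),由LaunchHeartBeat通過NoticePeerTask(kNewCommand)對每個follower進行AddAppendEntriesTask,具體做了什麼下次分析。得票數的判斷在CheckAndVote中: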
65 bool Peer::CheckAndVote(uint64_t vote_term) {  // Count one granted vote; true once votes exceed half of options_.members.
66 if (context_->current_term != vote_term) {  // Ignore votes granted for a different term.
67 return false;
68 }
69 return (++context_->vote_quorum) > (options_.members.size() / 2);  // Strict majority, e.g. 2 of 3 or 3 of 4.
70 }
如果投票被拒絕,Candidate會直接退回kFollower;如果當前已經是kFollower或者kLeader,則啥也不做。
補充一下,見代碼行135~140:Server成為新Leader後,會立刻AddTask,寫一條NOP日誌,之後才能提供服務(上面的摘錄中只能看到AddTask(kHeartBeat, false),NOP日誌的寫入未展示)。這主要是出於安全性考慮:Leader不能僅憑副本數來提交之前任期的日誌,需要先提交一條本任期的日誌。下一篇會把這個坑填上。
下面幾個鏈接是我在分析源碼時所做的一些參考。最後三個是一些相關的應用,包括優化方案,因為原始Raft論文裡的實現性能並不怎麼好。
參考:
https://github.com/maemual/raft-zh_cn/blob/master/raft-zh_cn.md
https://www.cnblogs.com/katsura/p/6549344.html
https://yq.aliyun.com/articles/62425
http://www.reibang.com/p/4711c4c32aab
http://www.reibang.com/p/2a2ba021f721
http://www.reibang.com/p/ee7646c0f4cf
https://yq.aliyun.com/articles/398232?spm=a2c4e.11153940.blogcont62425.18.1e944bd0b1UuMv
https://yq.aliyun.com/articles/398237?spm=a2c4e.11153940.blogcont62425.19.1e944bd0b1UuMv
https://pingcap.com/blog-cn/#Raft
https://www.zhihu.com/question/54997169
https://zhuanlan.zhihu.com/p/25735592