轉(zhuǎn)載:深入淺出etcd之raft實(shí)現(xiàn)
導(dǎo)語
etcd是coreOS使用golang開發(fā)的分布式威兜,一致性的kv存儲(chǔ)系統(tǒng)募逞,因其易用性和高可靠性被廣泛運(yùn)用于服務(wù)發(fā)現(xiàn)船万、消息發(fā)布和訂閱、分布式鎖和共享配置等方面墩划,也被認(rèn)為是zookeeper的強(qiáng)有力的競爭者春塌。作為分布式kv,其底層使用raft算法實(shí)現(xiàn)多副本數(shù)據(jù)的強(qiáng)一致性簇捍。etcd作為raft開源實(shí)現(xiàn)的標(biāo)桿只壳,在設(shè)計(jì)上,將 raft 算法邏輯和持久化暑塑、網(wǎng)絡(luò)吼句、線程等完全抽離出來單獨(dú)實(shí)現(xiàn),充分解耦事格,在工程上惕艳,實(shí)現(xiàn)了諸多性能優(yōu)化,是 raft 開源實(shí)踐中較早的工業(yè)級(jí)的實(shí)現(xiàn)驹愚,很多后來的 raft 實(shí)踐者都直接或者間接的參考了 ectd-raft 的設(shè)計(jì)和實(shí)現(xiàn)远搪,例如kubernetes,tiDb等逢捺。其廣泛的影響力和優(yōu)雅的golang代碼實(shí)踐也使得ectd成為golang的明星項(xiàng)目谁鳍。在我們實(shí)際的分布式存儲(chǔ)系統(tǒng)的項(xiàng)目開發(fā)中,raft也被應(yīng)用于元信息管理和數(shù)據(jù)存儲(chǔ)等多個(gè)模塊劫瞳,因此熟悉和理解etcd-raft的實(shí)現(xiàn)具有重大意義倘潜,本文從raft的基本原理出發(fā),深入淺出地分析了raft在ectd中的具體實(shí)現(xiàn)志于。
raft原理
架構(gòu)
image
每個(gè)節(jié)點(diǎn)都包含狀態(tài)機(jī)涮因,日志模塊和一致性模塊。功能分別是:
狀態(tài)機(jī):數(shù)據(jù)一致性指的即是狀態(tài)機(jī)的一致性伺绽,從內(nèi)部服務(wù)看表現(xiàn)為狀態(tài)機(jī)中的數(shù)據(jù)都保持一致
log模塊:保存了所有的操作記錄
一致性模塊:一致性模塊算法保證寫入log命令的一致性养泡,是raft的核心內(nèi)容。
實(shí)現(xiàn)一致性的過程可分為Leader選舉(Leader election)奈应,日志同步(Log replication),安全性(safty),日志壓縮(Log compaction)瓤荔,成員變更(membership change)
leader 選舉
競選過程
節(jié)點(diǎn)由Follower變?yōu)镃andidate,同時(shí)設(shè)置當(dāng)前Term钥组。
Candidate給自己投票输硝,帶上termid 和日志序號(hào),同時(shí)向其他節(jié)點(diǎn)發(fā)送拉票請(qǐng)求
等待結(jié)果程梦,成為Leader,follower 或者在選舉未成為產(chǎn)生結(jié)果的情況下節(jié)點(diǎn)狀態(tài)保持為Candidatae点把。
選舉結(jié)果
成功當(dāng)選收到超過半數(shù)的選票時(shí)橘荠,成為Leader,定時(shí)給其他節(jié)點(diǎn)發(fā)送心跳,并帶上任期id,其他節(jié)點(diǎn)發(fā)現(xiàn)當(dāng)前的任期id小于接收到leader發(fā)送過來的id,則將將狀態(tài)切換至follower.
選舉失敗在Candidate狀態(tài)接收到其他節(jié)點(diǎn)發(fā)送的心跳信息郎逃,且心跳中的任期id大于自己哥童,則變?yōu)閒ollower。
未產(chǎn)生結(jié)果沒有一個(gè)Candidate所獲得的選票超過半數(shù)褒翰,未產(chǎn)生leader,則Candidate再進(jìn)入下一輪投票贮懈。為了避免長期沒有l(wèi)eader產(chǎn)生,raft采用如下策略避免:
選舉超時(shí)時(shí)間為隨機(jī)值优训,第一個(gè)超時(shí)的節(jié)點(diǎn)帶著最大的任期id立刻進(jìn)入新一任的選舉
如果存在多個(gè)Candidate同時(shí)競選的情況朵你,發(fā)送拉票請(qǐng)求也是一段隨機(jī)延時(shí)。
日志同步(Log Replication)
image
Leader選出后接受客戶端請(qǐng)求揣非,Leader把請(qǐng)求日志作為日志條目加入到日志中抡医,然后向其他Follower節(jié)點(diǎn)復(fù)制日志,但超過半數(shù)的日志復(fù)制成功早敬,則Leader將日志應(yīng)用到狀態(tài)機(jī)并向客戶端返回執(zhí)行結(jié)果忌傻,同時(shí)Follower也將結(jié)果提交。如果存在Follower沒有成功復(fù)制日志,Leader會(huì)無限重試搞监。
日志同步的關(guān)鍵點(diǎn):
日志由有序編號(hào)的日志條目組成水孩,每條日志包含創(chuàng)建的任期和用于執(zhí)行的命令,日志是保證所有節(jié)點(diǎn)數(shù)據(jù)一致的關(guān)鍵琐驴。
Leader 負(fù)責(zé)一致性檢查荷愕,同時(shí)讓所有的Follower都和自己保持一致。
在Leader發(fā)生切換時(shí)棍矛,如何保證各節(jié)點(diǎn)日志一致安疗。leader為每一個(gè)follower維護(hù)一個(gè)nextIndex,將index和termid信息發(fā)送至follower,從缺失的termid和index 為follow 補(bǔ)齊數(shù)據(jù)够委,直至和leader完全一致荐类。
只允許主節(jié)點(diǎn)提交包含當(dāng)前term的日志。否則會(huì)出現(xiàn)已經(jīng)commit的日志出現(xiàn)更改的情況
安全性
安全性的原則是一個(gè)term只有一個(gè)leader茁帽,被提交至狀態(tài)機(jī)的數(shù)據(jù)不能發(fā)生更改玉罐。保證安全性主要通過限制leader的選舉來保證:
Candidate在拉票時(shí)需要攜帶本地已持久化的最新的日志信息,如果投票節(jié)點(diǎn)發(fā)現(xiàn)本地的日志信息比Candidate更新潘拨,則拒絕投票吊输。
只允許Leader提交當(dāng)前Term的日志。
擁有最新的已提交的log entry的Follower才有資格成為Leader铁追。
raft協(xié)議實(shí)現(xiàn)
raft的golang的開源實(shí)現(xiàn)主要包含兩個(gè):coreOS的raft實(shí)現(xiàn) , 使用的項(xiàng)目如tidb和cockroachdb這兩個(gè)經(jīng)典的newsql季蚂。另外一個(gè)是hashicrop的raft實(shí)現(xiàn),使用的項(xiàng)目如服務(wù)發(fā)現(xiàn)解決方案consul和時(shí)序數(shù)據(jù)庫influxdb。對(duì)比二者的實(shí)現(xiàn)主要有如下特點(diǎn):
hashicrop的實(shí)現(xiàn)完整度高扭屁,包含了snapshot,wal,storage等算谈,在集成時(shí)只需要關(guān)注業(yè)務(wù)邏輯
etcd中的raft模塊則是raft協(xié)議的輕量級(jí)實(shí)現(xiàn),對(duì)于上述功能只定義了相關(guān)interface料滥,需要業(yè)務(wù)方去具體實(shí)現(xiàn)然眼,優(yōu)點(diǎn)是增加靈活性,etcdserver就是集成raft算法并實(shí)現(xiàn)snapshot,wal,storage這樣一個(gè)應(yīng)用程序。
etcd/raft 代碼結(jié)構(gòu)
日志持久化storage.go:持久化日志保存模塊葵腹,以interface的方式定義了實(shí)現(xiàn)的方式,并基于內(nèi)存實(shí)現(xiàn)了memoryStorage用于存儲(chǔ)日志數(shù)據(jù)高每。log.go:raft算法日志模塊的邏輯log_unstable.go:raft 算法的日志緩存,日志優(yōu)先寫緩存践宴,待狀態(tài)穩(wěn)定后進(jìn)行持久化
節(jié)點(diǎn)node.go: raft集群節(jié)點(diǎn)行為的實(shí)現(xiàn)鲸匿,定義了各節(jié)點(diǎn)通信方式process.go:從leader的角度,為每個(gè)follower維護(hù)一個(gè)子狀態(tài)機(jī)浴井,根據(jù)狀態(tài)的切換決定leader該發(fā)什么消息給Follower.
Raft算法raft.go:raft算法的具體邏輯實(shí)現(xiàn),每個(gè)節(jié)點(diǎn)都有一個(gè)raft實(shí)例read_only.go: 實(shí)現(xiàn)了線性一致讀(linearizable read)霉撵,線性一致讀要求讀請(qǐng)求讀到最新提交的數(shù)據(jù)磺浙。針對(duì)raft存在的stale read(多l(xiāng)eader場景),此模塊通過ReadIndex的方式保證了一致性徒坡。
etcd/raft的實(shí)現(xiàn)分析
分析raft的實(shí)現(xiàn)流程撕氧,我們可以從raft的幾個(gè)核心問題入手:
如何選舉leader?
如何實(shí)現(xiàn)log的復(fù)制喇完?
如何進(jìn)行l(wèi)eadership的transfer伦泥?
如何實(shí)現(xiàn)線性一致讀?
其中l(wèi)eader的選舉锦溪、log復(fù)制和線性一致讀是raft協(xié)議的最基本要求不脯,而leadership的轉(zhuǎn)移在工程實(shí)踐中有重大意義。
核心數(shù)據(jù)結(jié)構(gòu)
struct node node 中主要定義一系列channel,raft的實(shí)現(xiàn)就是通過channel 傳遞消息刻诊,當(dāng)節(jié)點(diǎn)啟動(dòng)通過select機(jī)制監(jiān)聽上述channel確定相應(yīng)的狀態(tài)切換防楷。
// node is the canonical implementation of the Node interfacetype node struct {propc? ? ? chan msgWithResultrecvc? ? ? chan pb.Messageconfc? ? ? chan pb.ConfChangeconfstatec chan pb.ConfStatereadyc? ? chan Readyadvancec? chan struct{}tickc? ? ? chan struct{}done? ? ? chan struct{}stop? ? ? chan struct{}status? ? chan chan Statuslogger Logger}
interface node定義了node要實(shí)現(xiàn)raft算法必須實(shí)現(xiàn)的方法
type Node interface {Tick() //時(shí)鐘的實(shí)現(xiàn),選舉超時(shí)和心跳超時(shí)基于此實(shí)現(xiàn)Campaign(ctx context.Context) error //參與leader競爭Propose(ctx context.Context, data []byte) error //在日志中追加數(shù)據(jù)则涯,需要實(shí)現(xiàn)方保證數(shù)據(jù)追加的成功ProposeConfChange(ctx context.Context, cc pb.ConfChange) error // 集群配置變更Step(ctx context.Context, msg pb.Message) error //根據(jù)消息變更狀態(tài)機(jī)的狀態(tài)//標(biāo)志某一狀態(tài)的完成复局,收到狀態(tài)變化的節(jié)點(diǎn)必須提交變更Ready() <-chan Ready//進(jìn)行狀態(tài)的提交,收到完成標(biāo)志后粟判,必須提交過后節(jié)點(diǎn)才會(huì)實(shí)際進(jìn)行狀態(tài)機(jī)的更新亿昏。在包含快照的場景,為了避免快照落地帶來的長時(shí)間阻塞档礁,允許繼續(xù)接受和提交其他狀態(tài)角钩,即使之前的快照狀態(tài)變更并沒有完成。Advance()//進(jìn)行集群配置變更ApplyConfChange(cc pb.ConfChange) *pb.ConfState//變更leaderTransferLeadership(ctx context.Context, lead, transferee uint64)//保證線性一致性讀,ReadIndex(ctx context.Context, rctx []byte) error//狀態(tài)機(jī)當(dāng)前的配置Status() Status// ReportUnreachable reports the given node is not reachable for the last send.//上報(bào)節(jié)點(diǎn)的不可達(dá)ReportUnreachable(id uint64)//上報(bào)快照狀態(tài)ReportSnapshot(id uint64, status SnapshotStatus)//停止節(jié)點(diǎn)Stop()}
節(jié)點(diǎn)的啟動(dòng)和運(yùn)行
節(jié)點(diǎn)初始化raft彤断,讀取配置啟動(dòng)各個(gè)各個(gè)節(jié)點(diǎn)野舶,初始化logindex.啟動(dòng)后 以for-loop方式循環(huán)運(yùn)行,用select 機(jī)制監(jiān)聽不同的channel 實(shí)現(xiàn)對(duì)狀態(tài)變化的監(jiān)聽宰衙,并執(zhí)行相應(yīng)動(dòng)作平道。
//啟動(dòng)func StartNode(c *Config, peers []Peer) Node {r := newRaft(c) //初始化raft算法實(shí)例r.becomeFollower(1, None)//將配置中的節(jié)點(diǎn)加入集群for _, peer := range peers {...}//初始化logindexr.raftLog.committed = r.raftLog.lastIndex()for _, peer := range peers {//初始化節(jié)點(diǎn)狀態(tài)機(jī)(progress)r.addNode(peer.ID)}n := newNode()n.logger = c.Loggergo n.run(r)return &n}//運(yùn)行func (n *node) run(r *raft) {...select {//接收到寫消息case pm := <-propc:...//接收到readindex 請(qǐng)求case m := <-n.recvc:...//配置變更c(diǎn)ase cc := <-n.confc:...//超時(shí)時(shí)間到,包括心跳超時(shí)和選舉超時(shí)等case <-n.tickc:...//數(shù)據(jù)readycase readyc <- rd:...//可以進(jìn)行狀態(tài)變更和日志提交case <-advancec:...//節(jié)點(diǎn)狀態(tài)信號(hào)case c := <-n.status:...//收到停止信號(hào)case <-n.stop:...}}}
leader 選舉
初始化node為follower,設(shè)置任期為1,并初始化tickElection函數(shù),這是實(shí)際參與選舉的函數(shù),同時(shí)也初始化step為stepFollower供炼,這是作為follower的核心信息處理函數(shù)一屋,后續(xù)選舉,日志復(fù)制和快照等功能都基于此函數(shù)進(jìn)行:
r := newRaft(c)r.becomeFollower(1, None)
當(dāng)節(jié)點(diǎn)接收leader的heartbeat超時(shí)時(shí)(每個(gè)節(jié)點(diǎn)都有隨機(jī)的超時(shí)時(shí)間)袋哼,會(huì)觸發(fā)run函數(shù)中的tickc這個(gè)channel冀墨。發(fā)送MsgHup消息,并調(diào)用campaign參選涛贯, 將自身設(shè)置為candidate,并遞增currentTerm,向其他節(jié)點(diǎn)發(fā)送競選消息诽嘉。其他節(jié)點(diǎn)通過監(jiān)聽propc channel獲取其他節(jié)點(diǎn)發(fā)送的投票消息,并調(diào)用Step對(duì)消息進(jìn)行判斷弟翘,選擇是否投票虫腋。
其中投票的判斷邏輯主要分兩步:1.如果投票信息中的任期id 是否 小于自身的id,則直接返回nil稀余。2.通過isUpToDate判斷能否投票悦冀,通過和本地已存在的最新log比較,首先要有最大任期id睛琳,如果任期id相同則要求有最大的logindex盒蟆。
candidate節(jié)點(diǎn)收到其他節(jié)點(diǎn)的回復(fù)后,判斷獲取的票數(shù)是否超過半數(shù)师骗,如果是則設(shè)置自身為leader,否則為follower历等。
func (n *node) run(r *raft) {? ? ...? ? for {? ? ? ? select {? ? ? ? ? ? ...? ? ? ? ? ? //觸發(fā)heartbeat 超時(shí)? ? ? ? ? ? case <-n.tickc:? ? r.tick()? ? ? ? ? ? ...? ? ? ? }? ? }}//超時(shí)觸發(fā)選舉func (r *raft) tickElection() {r.electionElapsed++if r.promotable() && r.pastElectionTimeout() {r.electionElapsed = 0r.Step(pb.Message{From: r.id, Type: pb.MsgHup})}}//隨機(jī)超時(shí)時(shí)間func (r *raft) pastElectionTimeout() bool {return r.electionElapsed >= r.randomizedElectionTimeout}func (r *raft) resetRandomizedElectionTimeout() {r.randomizedElectionTimeout = r.electionTimeout + globalRand.Intn(r.electionTimeout)}//參與選舉func (r *raft) campaign(t CampaignType) {var term uint64var voteMsg pb.MessageType//成為candicate,將任期id加1if t == campaignPreElection {r.becomePreCandidate()voteMsg = pb.MsgPreVoteterm = r.Term + 1} else {r.becomeCandidate()voteMsg = pb.MsgVoteterm = r.Term}//判斷獲取的票數(shù)是否超過半數(shù),如果是當(dāng)選為leaderif r.quorum() == r.poll(r.id, voteRespMsgType(voteMsg), true) {if t == campaignPreElection {r.campaign(campaignElection)} else {r.becomeLeader()}return}//向其他節(jié)點(diǎn)發(fā)送競選消息for id := range r.prs {if id == r.id {continue}var ctx []byteif t == campaignTransfer {ctx = []byte(t)}r.send(pb.Message{Term: term, To: id, Type: voteMsg, Index: r.raftLog.lastIndex(), LogTerm: r.raftLog.lastTerm(), Context: ctx})}}//節(jié)點(diǎn)投票過程func (r *raft) Step(m pb.Message) error {...//比較任期idcase m.Term > r.Term:if m.Type == pb.MsgVote || m.Type == pb.MsgPreVote {force := bytes.Equal(m.Context, []byte(campaignTransfer))inLease := r.checkQuorum && r.lead != None && r.electionElapsed < r.electionTimeoutif !force && inLease {return nil}}switch m.Type {case pb.MsgVote, pb.MsgPreVote:...//與本地最新的持久化日志比較if canVote && r.raftLog.isUpToDate(m.Index, m.LogTerm) {//發(fā)送投票信息r.send(pb.Message{To: m.From, Term: m.Term, Type: voteRespMsgType(m.Type)})if m.Type == pb.MsgVote {// Only record real votes.r.electionElapsed = 0r.Vote = m.From}} ...return nil}func (l *raftLog) isUpToDate(lasti, term uint64) bool {return term > l.lastTerm() || (term == l.lastTerm() && lasti >= l.lastIndex())}//投票結(jié)果判斷case myVoteRespType:gr := r.poll(m.From, m.Type, !m.Reject)//計(jì)算票數(shù)是否超過半數(shù)switch r.quorum() {case gr:if r.state == StatePreCandidate {r.campaign(campaignElection)} else {r.becomeLeader()r.bcastAppend()}case len(r.votes) - gr:r.becomeFollower(r.Term, None)}
日志復(fù)制
node節(jié)點(diǎn)為外界提供了日志提交接口 Propose,在ectd的server對(duì)該接口進(jìn)行了封裝辟癌。Propose 內(nèi)部具體調(diào)用stepWithWaitOption實(shí)現(xiàn)日志消息的傳遞募闲,并阻塞/非阻塞地等待結(jié)果的返回。
func (n *node) Propose(ctx context.Context, data []byte) error {return n.stepWait(ctx, pb.Message{Type: pb.MsgProp, Entries: []pb.Entry{{Data: data}}})}func (n *node) stepWithWaitOption(ctx context.Context, m pb.Message, wait bool) error {? ? ...//提交日志數(shù)據(jù)至 node的 propc channel 隊(duì)列select {case ch <- pm:if !wait {? ? //非阻塞直接返回return nil}case <-ctx.Done():return ctx.Err()case <-n.done:return ErrStopped}select {//等待結(jié)果的返回case rsp := <-pm.result:if rsp != nil {return rsp}case <-ctx.Done():return ctx.Err()case <-n.done:return ErrStopped}return nil}
proc消息進(jìn)入stepFollower處理愿待,因?yàn)橹挥衛(wèi)eader才能處理客戶端提交的信息浩螺,因此將消息的接收者設(shè)置為leader后轉(zhuǎn)發(fā)。在stepLeader中調(diào)用appendEntry將消息追到leader的raftLog之中仍侥,但不進(jìn)行數(shù)據(jù)的commit要出。之后調(diào)用bcastAppend 將消息廣播至其他follower節(jié)點(diǎn)。
func stepLeader(r *raft, m pb.Message) error {case pb.MsgProp:...if !r.appendEntry(m.Entries...) {return ErrProposalDropped}r.bcastAppend()...}
follower節(jié)點(diǎn)接收到請(qǐng)求后农渊,調(diào)用handleAppendEntries判斷是否接受leader提交的日志患蹂。判斷邏輯如下:如果leader提交的logindex小于本地已經(jīng)提交的logindex則將本地的logindex回復(fù)給leader或颊。查找追加的日志和本地log的沖突,如果有沖突传于,則先找到?jīng)_突的位置囱挑,用leader的日志從沖突位置開始進(jìn)行覆蓋,日志追加成功后沼溜,返回最新的logindex至leader平挑。如何任期信息不一致,則直接拒絕leader的追加請(qǐng)求系草。
func (r *raft) handleAppendEntries(m pb.Message) {? ? //leader提交的logindex小于本地已經(jīng)提交的logindexif m.Index < r.raftLog.committed {r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: r.raftLog.committed})return}//追加日志通熄,可能存在沖突的情況,需要找到?jīng)_突的位置用leader的日志進(jìn)行覆蓋if mlastIndex, ok := r.raftLog.maybeAppend(m.Index, m.LogTerm, m.Commit, m.Entries...); ok {? ? //mlastIndex表示最佳成功的最新位置r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: mlastIndex})} else {? ? //任期信息不一致找都,拒絕此次追加請(qǐng)求唇辨,并把最新的logindex回復(fù)給leader,便于進(jìn)行追加r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: m.Index, Reject: true, RejectHint: r.raftLog.lastIndex()})}}
leader接收到follower的請(qǐng)求后,針對(duì)拒絕和接收的兩個(gè)場景有不同的處理邏輯能耻,這也是保證follower一致性的關(guān)鍵環(huán)節(jié)赏枚。
follower 正常接收append請(qǐng)求 當(dāng)leader 確認(rèn)follower已經(jīng)接收了append請(qǐng)求后,則調(diào)用maybeCommit進(jìn)行提交晓猛,在提交過程中確認(rèn)各個(gè)節(jié)點(diǎn)的matchindex饿幅,排序后取中間值比較,如果中間值都都比本地的commitindex大鞍帝,就認(rèn)為超過半數(shù)已經(jīng)認(rèn)可此次提交诫睬,可以進(jìn)行commit煞茫,之后調(diào)用sendAppend向所有節(jié)點(diǎn)廣播消息帕涌,follower接收到請(qǐng)求后調(diào)用maybeAppend進(jìn)行日志的提交。值得注意的是续徽,日志的append過程可能由于之前的請(qǐng)求被拒絕蚓曼,等待snapshot或者消息發(fā)送窗口(inflight)已滿導(dǎo)致中止,這時(shí)需要重新向follower節(jié)點(diǎn)發(fā)送最新的append請(qǐng)求钦扭。
? func stepLeader(r *raft, m pb.Message) error {? ? case pb.MsgAppResp:? ? pr.RecentActive = true? ? if m.Reject {...} else {oldPaused := pr.IsPaused()//更新索引信息纫版,更新該follower的match index 和next index.if pr.maybeUpdate(m.Index) {switch {//日志追加成功,狀態(tài)由復(fù)制探測狀態(tài)變成復(fù)制狀態(tài)客情,加快日志的追加case pr.State == ProgressStateProbe:pr.becomeReplicate()case pr.State == ProgressStateSnapshot && pr.needSnapshotAbort():r.logger.Debugf("%x snapshot aborted, resumed sending replication messages to %x [%s]", r.id, m.From, pr)pr.becomeProbe()//pr.ins用于限制消息發(fā)送的速率其弊,用于統(tǒng)計(jì)當(dāng)前處于發(fā)送狀態(tài)的日志數(shù)量case pr.State == ProgressStateReplicate:pr.ins.freeTo(m.Index)}//leader進(jìn)行本地的提交if r.maybeCommit() {//廣播至所有follower 通知進(jìn)行l(wèi)og的提交r.bcastAppend()} else if oldPaused {//append請(qǐng)求被中止,則重新發(fā)送最新的請(qǐng)求r.sendAppend(m.From)}}}}? ? }? ? ? ? func (r *raft) maybeCommit() bool {if cap(r.matchBuf) < len(r.prs) {r.matchBuf = make(uint64Slice, len(r.prs))}mis := r.matchBuf[:len(r.prs)]idx := 0for _, p := range r.prs {mis[idx] = p.Matchidx++}//排序取取中間值sort.Sort(mis)mci := mis[len(mis)-r.quorum()]return r.raftLog.maybeCommit(mci, r.Term)}func (l *raftLog) maybeCommit(maxIndex, term uint64) bool {//match的中間值是否已經(jīng)大于本地已經(jīng)commit的matchindexif maxIndex > l.committed && l.zeroTermOnErrCompacted(l.term(maxIndex)) == term {l.commitTo(maxIndex)return true}return false}
follower拒絕leader的append請(qǐng)求 在異常情況下膀斋,follower會(huì)拒絕leader的append請(qǐng)求梭伐。其判斷邏輯主要位于matchTerm,當(dāng)leader append請(qǐng)求中的logindex在當(dāng)前節(jié)點(diǎn)已提交的日志中到不到對(duì)應(yīng)的任期,或者任期與leader提交的任期不一致時(shí)follower會(huì)拒絕當(dāng)前append請(qǐng)求仰担。leader接收到拒絕請(qǐng)求后會(huì)進(jìn)入探測狀態(tài)糊识,探測follower最新匹配的位置。
? //follower接收leader的請(qǐng)求? func (l *raftLog) maybeAppend(index, logTerm, committed uint64, ents ...pb.Entry) (lastnewi uint64, ok bool) {? ? ? if l.matchTerm(index, logTerm) {? ? ? ...? ? ? }? ? ? //拒絕leader當(dāng)前的append請(qǐng)求? ? ? return 0, false? ? }? //對(duì)leader提交append請(qǐng)求中的logindex和termid進(jìn)行判斷? func (l *raftLog) matchTerm(i, term uint64) bool {? ? ? t, err := l.term(i)? ? ? if err != nil {? ? ? return false? ? ? }? ? ? return t == term? ? }? ? ? ? func stepLeader(r *raft, m pb.Message) error {? ? ? case pb.MsgAppResp:? ? ? pr.RecentActive = true? ? ? ? ? if m.Reject {? ? ? if pr.maybeDecrTo(m.Index, m.RejectHint) {? ? ? //由復(fù)制狀態(tài)進(jìn)入探測狀態(tài),探測follower最新的匹配位置? ? ? if pr.State == ProgressStateReplicate {? ? ? pr.becomeProbe()? ? ? }? ? ? r.sendAppend(m.From)? ? ? }? ? }
下面來分析leader接收到拒絕請(qǐng)求后的處理邏輯赂苗。由于各種原因可能導(dǎo)致follower節(jié)點(diǎn)的日志與leader不一致愉耙,如下圖所示:
日志同步
在raft的論文中提出通過遍歷index和term的方式保證日志的一致性。具體的實(shí)現(xiàn)位于maybeDecrTo拌滋,因?yàn)閒ollower在拒絕請(qǐng)求時(shí)帶上了當(dāng)前最新的logindex朴沿,因此在進(jìn)行日志補(bǔ)推時(shí),直接將next至為follower中最新的logindex 和當(dāng)前index中的最小值鸠真。
func (pr *Progress) maybeDecrTo(rejected, last uint64) bool {? ? ? if pr.State == ProgressStateReplicate {? ? ? if rejected <= pr.Match {? ? ? return false? ? ? }? ? ? // directly decrease next to match + 1? ? ? //復(fù)制狀態(tài)將pr的next置為當(dāng)前匹配位置+1? ? ? pr.Next = pr.Match + 1? ? ? return true? ? ? }? ? ? ? ? if pr.Next-1 != rejected {? ? ? return false? ? ? }? ? ? //如果是探測狀態(tài)悯仙,則將next置為follower中最新的logindex? 和當(dāng)前index中的最小值。? ? ? if pr.Next = min(rejected, last+1); pr.Next < 1 {? ? ? pr.Next = 1? ? ? }? ? ? pr.resume()? ? ? return true? ? }? ? ? 日志推送的具體實(shí)現(xiàn)位于maybeSendAppend.func (r *raft) maybeSendAppend(to uint64, sendIfEmpty bool) bool {pr := r.getProgress(to)if pr.IsPaused() {return false}m := pb.Message{}m.To = to//發(fā)送給follower的最后一條日志對(duì)應(yīng)的任期term, errt := r.raftLog.term(pr.Next - 1)//需要發(fā)送給follower的日志條數(shù)ents, erre := r.raftLog.entries(pr.Next, r.maxMsgSize)if len(ents) == 0 && !sendIfEmpty {return false}if errt != nil || erre != nil { // send snapshot if we failed to get term or entries...} else {m.Type = pb.MsgAppm.Index = pr.Next - 1m.LogTerm = termm.Entries = ents//leader 已經(jīng)提交的最新indexm.Commit = r.raftLog.committedif n := len(m.Entries); n != 0 {switch pr.State {//在日志復(fù)制狀態(tài)吠卷,樂觀地增加next, 加快日志的推送速度case ProgressStateReplicate:last := m.Entries[n-1].Indexpr.optimisticUpdate(last)pr.ins.add(last)case ProgressStateProbe:pr.pause()default:r.logger.Panicf("%x is sending append in unhandled state %s", r.id, pr.State)}}}r.send(m)return true}
至此raft集群的日志復(fù)制基本已經(jīng)完成锡垄,但是僅限于raft協(xié)議層面,日志和快照目前還是保存在Ready結(jié)構(gòu)中祭隔,并放入了readyc隊(duì)列货岭,等待上游的模塊處理。之前提到過etcd-raft 只是協(xié)議層的實(shí)現(xiàn)疾渴,提供了WAL渠退,snapshot和storage等模塊的擴(kuò)展接口休蟹,應(yīng)用層需要實(shí)現(xiàn)上述接口最終實(shí)現(xiàn)的數(shù)據(jù)的落地。
func newReady(r *raft, prevSoftSt *SoftState, prevHardSt pb.HardState) Ready {...//日志數(shù)據(jù)rd := Ready{Entries:? ? ? ? ? r.raftLog.unstableEntries(),CommittedEntries: r.raftLog.nextEnts(),Messages:? ? ? ? r.msgs,}...}
leadership transfer
leadership transfer 指的是leader身份的轉(zhuǎn)換,raft提供接口允許客戶端進(jìn)行l(wèi)eader切換大州,此功能可用來做負(fù)載均衡,讓客戶端有機(jī)會(huì)結(jié)合實(shí)際的機(jī)器和負(fù)載情況去選擇最優(yōu)的leader雷绢;同時(shí)也是multi-raft實(shí)現(xiàn)的基礎(chǔ)争群。下面具體分析transfer的實(shí)現(xiàn)。
raft協(xié)議提供了transferLeaderShip方法供應(yīng)用層使用用于觸發(fā)leader的轉(zhuǎn)換店量,transferLeaderShip會(huì)發(fā)送MsgTransferLeader類型消息至recvc消息隊(duì)列中(channel)芜果。當(dāng)follower收到TransferLeader消息后不處理將消息轉(zhuǎn)發(fā)至leader進(jìn)行處理。
//etcd/raft/raft.go func (n *node) TransferLeadership(ctx context.Context, lead, transferee uint64) {? ? select {? ? //通過recvc發(fā)送MsgTransferLeader消息至集群中節(jié)點(diǎn)? ? case n.recvc <- pb.Message{Type: pb.MsgTransferLeader, From: transferee, To: lead}:? ? case <-n.done:? ? case <-ctx.Done():? ? }? ? }
leader收到transfer消息后融师,如果發(fā)現(xiàn)當(dāng)前正在進(jìn)行l(wèi)eader切換或者不發(fā)生leader變換則直接放棄右钾。一個(gè)節(jié)點(diǎn)要成為leader的要求是有最新的日志數(shù)據(jù)。如果有則立即發(fā)送MsgTimeoutNow消息旱爆,transfee收到消息后立即調(diào)用campaign方法進(jìn)行選擇舀射,而不是像正常leader選舉時(shí)需要等待超時(shí),而且也不需要采用預(yù)投票的方式怀伦,之后的選舉流程與正常選舉過程一致脆烟。如果transfee沒有最新的日志數(shù)據(jù),則leader進(jìn)行日志的同步空镜,當(dāng)同步完成收到回復(fù)且正處在leader transfer的過程中浩淘,發(fā)送MsgTimeoutNow捌朴,之后與上述流程一致。
//etcd/raft/raft.go func stepLeader(r *raft, m pb.Message) error {switch m.Type {...case pb.MsgTransferLeader:if pr.IsLearner {r.logger.Debugf("%x is learner. Ignored transferring leadership", r.id)return nil}leadTransferee := m.FromlastLeadTransferee := r.leadTransferee//上一次transfer正在進(jìn)行if lastLeadTransferee != None {if lastLeadTransferee == leadTransferee {r.logger.Infof("%x [term %d] transfer leadership to %x is in progress, ignores request to same node %x",r.id, r.Term, leadTransferee, leadTransferee)return nil}r.abortLeaderTransfer()r.logger.Infof("%x [term %d] abort previous transferring leadership to %x", r.id, r.Term, lastLeadTransferee)}//transfee和當(dāng)前l(fā)eader相同if leadTransferee == r.id {r.logger.Debugf("%x is already leader. Ignored transferring leadership to self", r.id)return nil}// Transfer leadership to third party.// Transfer leadership should be finished in one electionTimeout, so reset r.electionElapsed.r.electionElapsed = 0r.leadTransferee = leadTransfereeif pr.Match == r.raftLog.lastIndex() {//transfee的日志已經(jīng)是最新和leader保持一致了张抄,則立刻發(fā)送MsgTimeoutNow砂蔽,觸發(fā)選舉r.sendTimeoutNow(leadTransferee)r.logger.Infof("%x sends MsgTimeoutNow to %x immediately as %x already has up-to-date log", r.id, leadTransferee, leadTransferee)} else {//日志非最新進(jìn)行日志的同步r.sendAppend(leadTransferee)}}return nil}
線性一致讀
線性一致性讀是分布式系統(tǒng)的基本要求,在raft中l(wèi)eader和follower都可以接受讀請(qǐng)求署惯,但在以下場景下可能出現(xiàn)數(shù)據(jù)的不一致:
Leader和Follower復(fù)制期間的狀態(tài)不一致
因?yàn)榫W(wǎng)絡(luò)分區(qū)導(dǎo)致多個(gè)leader的存在左驾,不同leader間的狀態(tài)不一致,即腦裂(split-brain)現(xiàn)象极谊。如果請(qǐng)求分別被新舊leader處理诡右,所得的結(jié)果也不一致
為解決raft的線性一致讀問題,etcd-raft提供了兩種實(shí)現(xiàn)方案:
ReadIndex(ReadOnlySafe)轻猖。其原理是接收到客戶端請(qǐng)求后帆吻,向集群發(fā)起ReadIndex請(qǐng)求來讀取commitedIndex,Leader收到請(qǐng)求后向節(jié)點(diǎn)發(fā)送心跳,當(dāng)收到大多數(shù)節(jié)點(diǎn)的確認(rèn)自己仍是leader后咙边,回復(fù)ReadIndex請(qǐng)求并告知最新的commitedIndex猜煮。ReadIndex是etcd-raft的默認(rèn)方案。
Lease read方案(ReadOnlyLeaseBased)败许。其原理是通過維護(hù)leader的租期王带,確認(rèn)leader的唯一性,不需要通過心跳來進(jìn)行l(wèi)eader的確認(rèn)市殷。其風(fēng)險(xiǎn)在于需要全局一直的時(shí)鐘來保證lease機(jī)制的準(zhǔn)確性愕撰。etcd-raft不推薦采用此方案,pingcap開源的分布式數(shù)據(jù)庫tidb中的pd 模塊在實(shí)現(xiàn)TSO(Timestamp Oracle)的前提下醋寝,采用此方案搞挣。
ReadIndex實(shí)現(xiàn)分析
在raft初始化的過程中完成了linearizable read的配置,包括需要采用的方案甥桂。
? func newRaft(c *Config) *raft {? ...? }? r := &raft{? id:? ? ? ? ? ? ? ? ? ? ? ? c.ID,? ...? //初始化readOnly配置? readOnly:? ? ? ? ? ? ? ? ? newReadOnly(c.ReadOnlyOption),? disableProposalForwarding: c.DisableProposalForwarding,? }? }? ? ? const (? //ReadIndex方案? ReadOnlySafe ReadOnlyOption = iota? //leaseRead方案? ReadOnlyLeaseBased? )
阻塞的recvc channel收到ReadIndex請(qǐng)求后柿究,將請(qǐng)求加入隊(duì)列邮旷,初始化ReadIndex狀態(tài)黄选。之后發(fā)送廣播心跳。
? func stepLeader(r *raft, m pb.Message) error {? switch m.Type {? ...? case pb.MsgReadIndex:? switch r.readOnly.option {? case ReadOnlySafe:? //加入請(qǐng)求隊(duì)列? r.readOnly.addRequest(r.raftLog.committed, m)? //廣播心跳消息? r.bcastHeartbeatWithCtx(m.Entries[0].Data)? }? } else {? r.readStates = append(r.readStates, ReadState{Index: r.raftLog.committed, RequestCtx: m.Entries[0].Data})? }? }? }? ? ? func (ro *readOnly) addRequest(index uint64, m pb.Message) {? ctx := string(m.Entries[0].Data)? if _, ok := ro.pendingReadIndex[ctx]; ok {? return? }? //index是當(dāng)前集群的committedIndex,acks 用來收集節(jié)點(diǎn)心跳回復(fù)包? ro.pendingReadIndex[ctx] = &readIndexStatus{index: index, req: m, acks: make(map[uint64]struct{})}? ro.readIndexQueue = append(ro.readIndexQueue, ctx)? }
當(dāng)leader收到心跳回復(fù)后婶肩,對(duì)心跳進(jìn)行統(tǒng)計(jì)办陷,如果是本地請(qǐng)求直接將消息追加到readstatus中,最終會(huì)由newReady函數(shù)將消息發(fā)送到readyc channel,監(jiān)聽ready channel的客戶端會(huì)最終回復(fù)請(qǐng)求律歼。
? func stepLeader(r *raft, m pb.Message) error {? case pb.MsgHeartbeatResp:? ...? }? //統(tǒng)計(jì)回復(fù)結(jié)果民镜,如果未超過半數(shù)則直接返回? ackCount := r.readOnly.recvAck(m)? if ackCount < r.quorum() {? return nil? }? ? ? rss := r.readOnly.advance(m)? for _, rs := range rss {? req := rs.req? //如果是本地的請(qǐng)求? if req.From == None || req.From == r.id { // from local member? r.readStates = append(r.readStates, ReadState{Index: rs.index, RequestCtx: req.Entries[0].Data})? } else {? //如果是來自follower的請(qǐng)求,將結(jié)果返回給follower? r.send(pb.Message{To: req.From, Type: pb.MsgReadIndexResp, Index: rs.index, Entries: req.Entries})? }? }? }? ? ? func newReady(r *raft, prevSoftSt *SoftState, prevHardSt pb.HardState) Ready {? rd := Ready{? Entries:? ? ? ? ? r.raftLog.unstableEntries(),? CommittedEntries: r.raftLog.nextEnts(),? Messages:? ? ? ? r.msgs,? }? ...? //readIndex消息追加? if len(r.readStates) != 0 {? rd.ReadStates = r.readStates? }? rd.MustSync = MustSync(r.hardState(), prevHardSt, len(rd.Entries))? return rd? }? ? ? func (n *node) run(r *raft) {? ....? for {? if advancec != nil {? readyc = nil? } else {? //消息加入readyc隊(duì)列? rd = newReady(r, prevSoftSt, prevHardSt)? if rd.containsUpdates() {? readyc = n.readyc? } else {? readyc = nil? }? }? ....? }
如果是follower接收到ReadIndex請(qǐng)求险毁,直接將消息轉(zhuǎn)發(fā)至leader制圈,leader按上述流程處理们童,follower接收到消息后采用上述類似機(jī)制加入readyc隊(duì)列,異步回復(fù)客戶端鲸鹦。
? func stepFollower(r *raft, m pb.Message) error {? ...? case pb.MsgReadIndex:? if r.lead == None {? r.logger.Infof("%x no leader at term %d; dropping index reading msg", r.id, r.Term)? return nil? }? //將ReadIndex請(qǐng)求轉(zhuǎn)發(fā)給leader? m.To = r.lead? r.send(m)? case pb.MsgReadIndexResp:? if len(m.Entries) != 1 {? r.logger.Errorf("%x invalid format of MsgReadIndexResp from %x, entries count: %d", r.id, m.From, len(m.Entries))? return nil? }? //收到leader回復(fù)后將消息加入readStatus? r.readStates = append(r.readStates, ReadState{Index: m.Index, RequestCtx: m.Entries[0].Data})? ...? }
總結(jié)
本文從raft算法的基本原理出發(fā)慧库,簡單的分析了leader選舉和日志復(fù)制的實(shí)現(xiàn)過程。之后從工程實(shí)踐的角度出發(fā)分析了etcd-raft的代碼實(shí)現(xiàn)馋嗜,重點(diǎn)剖析了leader選舉齐板,日志復(fù)制,leadership transfer和線性一致讀的核心流程葛菇。而raft算法博大精深甘磨,etcd也是工業(yè)級(jí)的完整實(shí)現(xiàn),除了本文介紹的幾個(gè)核心環(huán)節(jié)外眯停,leader的預(yù)選舉(prevote)济舆、節(jié)點(diǎn)成員變更、配置變更和日志的批量追加等也是raft的關(guān)鍵環(huán)節(jié)莺债,因篇幅所限就不再一一介紹吗冤。