轉(zhuǎn)載:深入淺出etcd之raft實(shí)現(xiàn)

轉(zhuǎn)載:深入淺出etcd之raft實(shí)現(xiàn)

導(dǎo)語

etcd是coreOS使用golang開發(fā)的分布式威兜,一致性的kv存儲(chǔ)系統(tǒng)募逞,因其易用性和高可靠性被廣泛運(yùn)用于服務(wù)發(fā)現(xiàn)船万、消息發(fā)布和訂閱、分布式鎖和共享配置等方面墩划,也被認(rèn)為是zookeeper的強(qiáng)有力的競爭者春塌。作為分布式kv,其底層使用raft算法實(shí)現(xiàn)多副本數(shù)據(jù)的強(qiáng)一致性簇捍。etcd作為raft開源實(shí)現(xiàn)的標(biāo)桿只壳,在設(shè)計(jì)上,將 raft 算法邏輯和持久化暑塑、網(wǎng)絡(luò)吼句、線程等完全抽離出來單獨(dú)實(shí)現(xiàn),充分解耦事格,在工程上惕艳,實(shí)現(xiàn)了諸多性能優(yōu)化,是 raft 開源實(shí)踐中較早的工業(yè)級(jí)的實(shí)現(xiàn)驹愚,很多后來的 raft 實(shí)踐者都直接或者間接的參考了 ectd-raft 的設(shè)計(jì)和實(shí)現(xiàn)远搪,例如kubernetes,tiDb等逢捺。其廣泛的影響力和優(yōu)雅的golang代碼實(shí)踐也使得ectd成為golang的明星項(xiàng)目谁鳍。在我們實(shí)際的分布式存儲(chǔ)系統(tǒng)的項(xiàng)目開發(fā)中,raft也被應(yīng)用于元信息管理和數(shù)據(jù)存儲(chǔ)等多個(gè)模塊劫瞳,因此熟悉和理解etcd-raft的實(shí)現(xiàn)具有重大意義倘潜,本文從raft的基本原理出發(fā),深入淺出地分析了raft在ectd中的具體實(shí)現(xiàn)志于。

raft原理

架構(gòu)

image

每個(gè)節(jié)點(diǎn)都包含狀態(tài)機(jī)涮因,日志模塊和一致性模塊。功能分別是:

狀態(tài)機(jī):數(shù)據(jù)一致性指的即是狀態(tài)機(jī)的一致性伺绽,從內(nèi)部服務(wù)看表現(xiàn)為狀態(tài)機(jī)中的數(shù)據(jù)都保持一致

log模塊:保存了所有的操作記錄

一致性模塊:一致性模塊算法保證寫入log命令的一致性养泡,是raft的核心內(nèi)容。

實(shí)現(xiàn)一致性的過程可分為Leader選舉(Leader election)奈应,日志同步(Log replication),安全性(safty),日志壓縮(Log compaction)瓤荔,成員變更(membership change)

leader 選舉

競選過程

節(jié)點(diǎn)由Follower變?yōu)镃andidate,同時(shí)設(shè)置當(dāng)前Term钥组。

Candidate給自己投票输硝,帶上termid 和日志序號(hào),同時(shí)向其他節(jié)點(diǎn)發(fā)送拉票請(qǐng)求

等待結(jié)果程梦,成為Leader,follower 或者在選舉未成為產(chǎn)生結(jié)果的情況下節(jié)點(diǎn)狀態(tài)保持為Candidatae点把。

選舉結(jié)果

成功當(dāng)選收到超過半數(shù)的選票時(shí)橘荠,成為Leader,定時(shí)給其他節(jié)點(diǎn)發(fā)送心跳,并帶上任期id,其他節(jié)點(diǎn)發(fā)現(xiàn)當(dāng)前的任期id小于接收到leader發(fā)送過來的id,則將將狀態(tài)切換至follower.

選舉失敗在Candidate狀態(tài)接收到其他節(jié)點(diǎn)發(fā)送的心跳信息郎逃,且心跳中的任期id大于自己哥童,則變?yōu)閒ollower。

未產(chǎn)生結(jié)果沒有一個(gè)Candidate所獲得的選票超過半數(shù)褒翰,未產(chǎn)生leader,則Candidate再進(jìn)入下一輪投票贮懈。為了避免長期沒有l(wèi)eader產(chǎn)生,raft采用如下策略避免:

選舉超時(shí)時(shí)間為隨機(jī)值优训,第一個(gè)超時(shí)的節(jié)點(diǎn)帶著最大的任期id立刻進(jìn)入新一任的選舉

如果存在多個(gè)Candidate同時(shí)競選的情況朵你,發(fā)送拉票請(qǐng)求也是一段隨機(jī)延時(shí)。

日志同步(Log Replication)

image

Leader選出后接受客戶端請(qǐng)求揣非,Leader把請(qǐng)求日志作為日志條目加入到日志中抡医,然后向其他Follower節(jié)點(diǎn)復(fù)制日志,但超過半數(shù)的日志復(fù)制成功早敬,則Leader將日志應(yīng)用到狀態(tài)機(jī)并向客戶端返回執(zhí)行結(jié)果忌傻,同時(shí)Follower也將結(jié)果提交。如果存在Follower沒有成功復(fù)制日志,Leader會(huì)無限重試搞监。

日志同步的關(guān)鍵點(diǎn):

日志由有序編號(hào)的日志條目組成水孩,每條日志包含創(chuàng)建的任期和用于執(zhí)行的命令,日志是保證所有節(jié)點(diǎn)數(shù)據(jù)一致的關(guān)鍵琐驴。

Leader 負(fù)責(zé)一致性檢查荷愕,同時(shí)讓所有的Follower都和自己保持一致。

在Leader發(fā)生切換時(shí)棍矛,如何保證各節(jié)點(diǎn)日志一致安疗。leader為每一個(gè)follower維護(hù)一個(gè)nextIndex,將index和termid信息發(fā)送至follower,從缺失的termid和index 為follow 補(bǔ)齊數(shù)據(jù)够委,直至和leader完全一致荐类。

只允許主節(jié)點(diǎn)提交包含當(dāng)前term的日志。否則會(huì)出現(xiàn)已經(jīng)commit的日志出現(xiàn)更改的情況

安全性

安全性的原則是一個(gè)term只有一個(gè)leader茁帽,被提交至狀態(tài)機(jī)的數(shù)據(jù)不能發(fā)生更改玉罐。保證安全性主要通過限制leader的選舉來保證:

Candidate在拉票時(shí)需要攜帶本地已持久化的最新的日志信息,如果投票節(jié)點(diǎn)發(fā)現(xiàn)本地的日志信息比Candidate更新潘拨,則拒絕投票吊输。

只允許Leader提交當(dāng)前Term的日志。

擁有最新的已提交的log entry的Follower才有資格成為Leader铁追。

raft協(xié)議實(shí)現(xiàn)

raft的golang的開源實(shí)現(xiàn)主要包含兩個(gè):coreOS的raft實(shí)現(xiàn) , 使用的項(xiàng)目如tidb和cockroachdb這兩個(gè)經(jīng)典的newsql季蚂。另外一個(gè)是hashicrop的raft實(shí)現(xiàn),使用的項(xiàng)目如服務(wù)發(fā)現(xiàn)解決方案consul和時(shí)序數(shù)據(jù)庫influxdb。對(duì)比二者的實(shí)現(xiàn)主要有如下特點(diǎn):

hashicrop的實(shí)現(xiàn)完整度高扭屁,包含了snapshot,wal,storage等算谈,在集成時(shí)只需要關(guān)注業(yè)務(wù)邏輯

etcd中的raft模塊則是raft協(xié)議的輕量級(jí)實(shí)現(xiàn),對(duì)于上述功能只定義了相關(guān)interface料滥,需要業(yè)務(wù)方去具體實(shí)現(xiàn)然眼,優(yōu)點(diǎn)是增加靈活性,etcdserver就是集成raft算法并實(shí)現(xiàn)snapshot,wal,storage這樣一個(gè)應(yīng)用程序。

etcd/raft 代碼結(jié)構(gòu)

日志持久化storage.go:持久化日志保存模塊葵腹,以interface的方式定義了實(shí)現(xiàn)的方式,并基于內(nèi)存實(shí)現(xiàn)了memoryStorage用于存儲(chǔ)日志數(shù)據(jù)高每。log.go:raft算法日志模塊的邏輯log_unstable.go:raft 算法的日志緩存,日志優(yōu)先寫緩存践宴,待狀態(tài)穩(wěn)定后進(jìn)行持久化

節(jié)點(diǎn)node.go: raft集群節(jié)點(diǎn)行為的實(shí)現(xiàn)鲸匿,定義了各節(jié)點(diǎn)通信方式process.go:從leader的角度,為每個(gè)follower維護(hù)一個(gè)子狀態(tài)機(jī)浴井,根據(jù)狀態(tài)的切換決定leader該發(fā)什么消息給Follower.

Raft算法raft.go:raft算法的具體邏輯實(shí)現(xiàn),每個(gè)節(jié)點(diǎn)都有一個(gè)raft實(shí)例read_only.go: 實(shí)現(xiàn)了線性一致讀(linearizable read)霉撵,線性一致讀要求讀請(qǐng)求讀到最新提交的數(shù)據(jù)磺浙。針對(duì)raft存在的stale read(多l(xiāng)eader場景),此模塊通過ReadIndex的方式保證了一致性徒坡。

etcd/raft的實(shí)現(xiàn)分析

分析raft的實(shí)現(xiàn)流程撕氧,我們可以從raft的幾個(gè)核心問題入手:

如何選舉leader?

如何實(shí)現(xiàn)log的復(fù)制喇完?

如何進(jìn)行l(wèi)eadership的transfer伦泥?

如何實(shí)現(xiàn)線性一致讀?

其中l(wèi)eader的選舉锦溪、log復(fù)制和線性一致讀是raft協(xié)議的最基本要求不脯,而leadership的轉(zhuǎn)移在工程實(shí)踐中有重大意義。

核心數(shù)據(jù)結(jié)構(gòu)

struct node node 中主要定義一系列channel,raft的實(shí)現(xiàn)就是通過channel 傳遞消息刻诊,當(dāng)節(jié)點(diǎn)啟動(dòng)通過select機(jī)制監(jiān)聽上述channel確定相應(yīng)的狀態(tài)切換防楷。

// node is the canonical implementation of the Node interfacetype node struct {propc? ? ? chan msgWithResultrecvc? ? ? chan pb.Messageconfc? ? ? chan pb.ConfChangeconfstatec chan pb.ConfStatereadyc? ? chan Readyadvancec? chan struct{}tickc? ? ? chan struct{}done? ? ? chan struct{}stop? ? ? chan struct{}status? ? chan chan Statuslogger Logger}

interface node定義了node要實(shí)現(xiàn)raft算法必須實(shí)現(xiàn)的方法

type Node interface {Tick() //時(shí)鐘的實(shí)現(xiàn),選舉超時(shí)和心跳超時(shí)基于此實(shí)現(xiàn)Campaign(ctx context.Context) error //參與leader競爭Propose(ctx context.Context, data []byte) error //在日志中追加數(shù)據(jù)则涯,需要實(shí)現(xiàn)方保證數(shù)據(jù)追加的成功ProposeConfChange(ctx context.Context, cc pb.ConfChange) error // 集群配置變更Step(ctx context.Context, msg pb.Message) error //根據(jù)消息變更狀態(tài)機(jī)的狀態(tài)//標(biāo)志某一狀態(tài)的完成复局,收到狀態(tài)變化的節(jié)點(diǎn)必須提交變更Ready() <-chan Ready//進(jìn)行狀態(tài)的提交,收到完成標(biāo)志后粟判,必須提交過后節(jié)點(diǎn)才會(huì)實(shí)際進(jìn)行狀態(tài)機(jī)的更新亿昏。在包含快照的場景,為了避免快照落地帶來的長時(shí)間阻塞档礁,允許繼續(xù)接受和提交其他狀態(tài)角钩,即使之前的快照狀態(tài)變更并沒有完成。Advance()//進(jìn)行集群配置變更ApplyConfChange(cc pb.ConfChange) *pb.ConfState//變更leaderTransferLeadership(ctx context.Context, lead, transferee uint64)//保證線性一致性讀,ReadIndex(ctx context.Context, rctx []byte) error//狀態(tài)機(jī)當(dāng)前的配置Status() Status// ReportUnreachable reports the given node is not reachable for the last send.//上報(bào)節(jié)點(diǎn)的不可達(dá)ReportUnreachable(id uint64)//上報(bào)快照狀態(tài)ReportSnapshot(id uint64, status SnapshotStatus)//停止節(jié)點(diǎn)Stop()}

節(jié)點(diǎn)的啟動(dòng)和運(yùn)行

節(jié)點(diǎn)初始化raft彤断,讀取配置啟動(dòng)各個(gè)各個(gè)節(jié)點(diǎn)野舶,初始化logindex.啟動(dòng)后 以for-loop方式循環(huán)運(yùn)行,用select 機(jī)制監(jiān)聽不同的channel 實(shí)現(xiàn)對(duì)狀態(tài)變化的監(jiān)聽宰衙,并執(zhí)行相應(yīng)動(dòng)作平道。

//啟動(dòng)func StartNode(c *Config, peers []Peer) Node {r := newRaft(c) //初始化raft算法實(shí)例r.becomeFollower(1, None)//將配置中的節(jié)點(diǎn)加入集群for _, peer := range peers {...}//初始化logindexr.raftLog.committed = r.raftLog.lastIndex()for _, peer := range peers {//初始化節(jié)點(diǎn)狀態(tài)機(jī)(progress)r.addNode(peer.ID)}n := newNode()n.logger = c.Loggergo n.run(r)return &n}//運(yùn)行func (n *node) run(r *raft) {...select {//接收到寫消息case pm := <-propc:...//接收到readindex 請(qǐng)求case m := <-n.recvc:...//配置變更c(diǎn)ase cc := <-n.confc:...//超時(shí)時(shí)間到,包括心跳超時(shí)和選舉超時(shí)等case <-n.tickc:...//數(shù)據(jù)readycase readyc <- rd:...//可以進(jìn)行狀態(tài)變更和日志提交case <-advancec:...//節(jié)點(diǎn)狀態(tài)信號(hào)case c := <-n.status:...//收到停止信號(hào)case <-n.stop:...}}}

leader 選舉

初始化node為follower,設(shè)置任期為1,并初始化tickElection函數(shù),這是實(shí)際參與選舉的函數(shù),同時(shí)也初始化step為stepFollower供炼,這是作為follower的核心信息處理函數(shù)一屋,后續(xù)選舉,日志復(fù)制和快照等功能都基于此函數(shù)進(jìn)行:

r := newRaft(c)r.becomeFollower(1, None)

當(dāng)節(jié)點(diǎn)接收leader的heartbeat超時(shí)時(shí)(每個(gè)節(jié)點(diǎn)都有隨機(jī)的超時(shí)時(shí)間)袋哼,會(huì)觸發(fā)run函數(shù)中的tickc這個(gè)channel冀墨。發(fā)送MsgHup消息,并調(diào)用campaign參選涛贯, 將自身設(shè)置為candidate,并遞增currentTerm,向其他節(jié)點(diǎn)發(fā)送競選消息诽嘉。其他節(jié)點(diǎn)通過監(jiān)聽propc channel獲取其他節(jié)點(diǎn)發(fā)送的投票消息,并調(diào)用Step對(duì)消息進(jìn)行判斷弟翘,選擇是否投票虫腋。

其中投票的判斷邏輯主要分兩步:1.如果投票信息中的任期id 是否 小于自身的id,則直接返回nil稀余。2.通過isUpToDate判斷能否投票悦冀,通過和本地已存在的最新log比較,首先要有最大任期id睛琳,如果任期id相同則要求有最大的logindex盒蟆。

candidate節(jié)點(diǎn)收到其他節(jié)點(diǎn)的回復(fù)后,判斷獲取的票數(shù)是否超過半數(shù)师骗,如果是則設(shè)置自身為leader,否則為follower历等。

func (n *node) run(r *raft) {? ? ...? ? for {? ? ? ? select {? ? ? ? ? ? ...? ? ? ? ? ? //觸發(fā)heartbeat 超時(shí)? ? ? ? ? ? case <-n.tickc:? ? r.tick()? ? ? ? ? ? ...? ? ? ? }? ? }}//超時(shí)觸發(fā)選舉func (r *raft) tickElection() {r.electionElapsed++if r.promotable() && r.pastElectionTimeout() {r.electionElapsed = 0r.Step(pb.Message{From: r.id, Type: pb.MsgHup})}}//隨機(jī)超時(shí)時(shí)間func (r *raft) pastElectionTimeout() bool {return r.electionElapsed >= r.randomizedElectionTimeout}func (r *raft) resetRandomizedElectionTimeout() {r.randomizedElectionTimeout = r.electionTimeout + globalRand.Intn(r.electionTimeout)}//參與選舉func (r *raft) campaign(t CampaignType) {var term uint64var voteMsg pb.MessageType//成為candicate,將任期id加1if t == campaignPreElection {r.becomePreCandidate()voteMsg = pb.MsgPreVoteterm = r.Term + 1} else {r.becomeCandidate()voteMsg = pb.MsgVoteterm = r.Term}//判斷獲取的票數(shù)是否超過半數(shù),如果是當(dāng)選為leaderif r.quorum() == r.poll(r.id, voteRespMsgType(voteMsg), true) {if t == campaignPreElection {r.campaign(campaignElection)} else {r.becomeLeader()}return}//向其他節(jié)點(diǎn)發(fā)送競選消息for id := range r.prs {if id == r.id {continue}var ctx []byteif t == campaignTransfer {ctx = []byte(t)}r.send(pb.Message{Term: term, To: id, Type: voteMsg, Index: r.raftLog.lastIndex(), LogTerm: r.raftLog.lastTerm(), Context: ctx})}}//節(jié)點(diǎn)投票過程func (r *raft) Step(m pb.Message) error {...//比較任期idcase m.Term > r.Term:if m.Type == pb.MsgVote || m.Type == pb.MsgPreVote {force := bytes.Equal(m.Context, []byte(campaignTransfer))inLease := r.checkQuorum && r.lead != None && r.electionElapsed < r.electionTimeoutif !force && inLease {return nil}}switch m.Type {case pb.MsgVote, pb.MsgPreVote:...//與本地最新的持久化日志比較if canVote && r.raftLog.isUpToDate(m.Index, m.LogTerm) {//發(fā)送投票信息r.send(pb.Message{To: m.From, Term: m.Term, Type: voteRespMsgType(m.Type)})if m.Type == pb.MsgVote {// Only record real votes.r.electionElapsed = 0r.Vote = m.From}} ...return nil}func (l *raftLog) isUpToDate(lasti, term uint64) bool {return term > l.lastTerm() || (term == l.lastTerm() && lasti >= l.lastIndex())}//投票結(jié)果判斷case myVoteRespType:gr := r.poll(m.From, m.Type, !m.Reject)//計(jì)算票數(shù)是否超過半數(shù)switch r.quorum() {case gr:if r.state == StatePreCandidate {r.campaign(campaignElection)} else {r.becomeLeader()r.bcastAppend()}case len(r.votes) - gr:r.becomeFollower(r.Term, None)}

日志復(fù)制

node節(jié)點(diǎn)為外界提供了日志提交接口 Propose,在ectd的server對(duì)該接口進(jìn)行了封裝辟癌。Propose 內(nèi)部具體調(diào)用stepWithWaitOption實(shí)現(xiàn)日志消息的傳遞募闲,并阻塞/非阻塞地等待結(jié)果的返回。

func (n *node) Propose(ctx context.Context, data []byte) error {return n.stepWait(ctx, pb.Message{Type: pb.MsgProp, Entries: []pb.Entry{{Data: data}}})}func (n *node) stepWithWaitOption(ctx context.Context, m pb.Message, wait bool) error {? ? ...//提交日志數(shù)據(jù)至 node的 propc channel 隊(duì)列select {case ch <- pm:if !wait {? ? //非阻塞直接返回return nil}case <-ctx.Done():return ctx.Err()case <-n.done:return ErrStopped}select {//等待結(jié)果的返回case rsp := <-pm.result:if rsp != nil {return rsp}case <-ctx.Done():return ctx.Err()case <-n.done:return ErrStopped}return nil}

proc消息進(jìn)入stepFollower處理愿待,因?yàn)橹挥衛(wèi)eader才能處理客戶端提交的信息浩螺,因此將消息的接收者設(shè)置為leader后轉(zhuǎn)發(fā)。在stepLeader中調(diào)用appendEntry將消息追到leader的raftLog之中仍侥,但不進(jìn)行數(shù)據(jù)的commit要出。之后調(diào)用bcastAppend 將消息廣播至其他follower節(jié)點(diǎn)。

func stepLeader(r *raft, m pb.Message) error {case pb.MsgProp:...if !r.appendEntry(m.Entries...) {return ErrProposalDropped}r.bcastAppend()...}

follower節(jié)點(diǎn)接收到請(qǐng)求后农渊,調(diào)用handleAppendEntries判斷是否接受leader提交的日志患蹂。判斷邏輯如下:如果leader提交的logindex小于本地已經(jīng)提交的logindex則將本地的logindex回復(fù)給leader或颊。查找追加的日志和本地log的沖突,如果有沖突传于,則先找到?jīng)_突的位置囱挑,用leader的日志從沖突位置開始進(jìn)行覆蓋,日志追加成功后沼溜,返回最新的logindex至leader平挑。如何任期信息不一致,則直接拒絕leader的追加請(qǐng)求系草。

func (r *raft) handleAppendEntries(m pb.Message) {? ? //leader提交的logindex小于本地已經(jīng)提交的logindexif m.Index < r.raftLog.committed {r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: r.raftLog.committed})return}//追加日志通熄,可能存在沖突的情況,需要找到?jīng)_突的位置用leader的日志進(jìn)行覆蓋if mlastIndex, ok := r.raftLog.maybeAppend(m.Index, m.LogTerm, m.Commit, m.Entries...); ok {? ? //mlastIndex表示最佳成功的最新位置r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: mlastIndex})} else {? ? //任期信息不一致找都,拒絕此次追加請(qǐng)求唇辨,并把最新的logindex回復(fù)給leader,便于進(jìn)行追加r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: m.Index, Reject: true, RejectHint: r.raftLog.lastIndex()})}}

leader接收到follower的請(qǐng)求后,針對(duì)拒絕和接收的兩個(gè)場景有不同的處理邏輯能耻,這也是保證follower一致性的關(guān)鍵環(huán)節(jié)赏枚。

follower 正常接收append請(qǐng)求 當(dāng)leader 確認(rèn)follower已經(jīng)接收了append請(qǐng)求后,則調(diào)用maybeCommit進(jìn)行提交晓猛,在提交過程中確認(rèn)各個(gè)節(jié)點(diǎn)的matchindex饿幅,排序后取中間值比較,如果中間值都都比本地的commitindex大鞍帝,就認(rèn)為超過半數(shù)已經(jīng)認(rèn)可此次提交诫睬,可以進(jìn)行commit煞茫,之后調(diào)用sendAppend向所有節(jié)點(diǎn)廣播消息帕涌,follower接收到請(qǐng)求后調(diào)用maybeAppend進(jìn)行日志的提交。值得注意的是续徽,日志的append過程可能由于之前的請(qǐng)求被拒絕蚓曼,等待snapshot或者消息發(fā)送窗口(inflight)已滿導(dǎo)致中止,這時(shí)需要重新向follower節(jié)點(diǎn)發(fā)送最新的append請(qǐng)求钦扭。

? func stepLeader(r *raft, m pb.Message) error {? ? case pb.MsgAppResp:? ? pr.RecentActive = true? ? if m.Reject {...} else {oldPaused := pr.IsPaused()//更新索引信息纫版,更新該follower的match index 和next index.if pr.maybeUpdate(m.Index) {switch {//日志追加成功,狀態(tài)由復(fù)制探測狀態(tài)變成復(fù)制狀態(tài)客情,加快日志的追加case pr.State == ProgressStateProbe:pr.becomeReplicate()case pr.State == ProgressStateSnapshot && pr.needSnapshotAbort():r.logger.Debugf("%x snapshot aborted, resumed sending replication messages to %x [%s]", r.id, m.From, pr)pr.becomeProbe()//pr.ins用于限制消息發(fā)送的速率其弊,用于統(tǒng)計(jì)當(dāng)前處于發(fā)送狀態(tài)的日志數(shù)量case pr.State == ProgressStateReplicate:pr.ins.freeTo(m.Index)}//leader進(jìn)行本地的提交if r.maybeCommit() {//廣播至所有follower 通知進(jìn)行l(wèi)og的提交r.bcastAppend()} else if oldPaused {//append請(qǐng)求被中止,則重新發(fā)送最新的請(qǐng)求r.sendAppend(m.From)}}}}? ? }? ? ? ? func (r *raft) maybeCommit() bool {if cap(r.matchBuf) < len(r.prs) {r.matchBuf = make(uint64Slice, len(r.prs))}mis := r.matchBuf[:len(r.prs)]idx := 0for _, p := range r.prs {mis[idx] = p.Matchidx++}//排序取取中間值sort.Sort(mis)mci := mis[len(mis)-r.quorum()]return r.raftLog.maybeCommit(mci, r.Term)}func (l *raftLog) maybeCommit(maxIndex, term uint64) bool {//match的中間值是否已經(jīng)大于本地已經(jīng)commit的matchindexif maxIndex > l.committed && l.zeroTermOnErrCompacted(l.term(maxIndex)) == term {l.commitTo(maxIndex)return true}return false}

follower拒絕leader的append請(qǐng)求 在異常情況下膀斋,follower會(huì)拒絕leader的append請(qǐng)求梭伐。其判斷邏輯主要位于matchTerm,當(dāng)leader append請(qǐng)求中的logindex在當(dāng)前節(jié)點(diǎn)已提交的日志中到不到對(duì)應(yīng)的任期,或者任期與leader提交的任期不一致時(shí)follower會(huì)拒絕當(dāng)前append請(qǐng)求仰担。leader接收到拒絕請(qǐng)求后會(huì)進(jìn)入探測狀態(tài)糊识,探測follower最新匹配的位置。

? //follower接收leader的請(qǐng)求? func (l *raftLog) maybeAppend(index, logTerm, committed uint64, ents ...pb.Entry) (lastnewi uint64, ok bool) {? ? ? if l.matchTerm(index, logTerm) {? ? ? ...? ? ? }? ? ? //拒絕leader當(dāng)前的append請(qǐng)求? ? ? return 0, false? ? }? //對(duì)leader提交append請(qǐng)求中的logindex和termid進(jìn)行判斷? func (l *raftLog) matchTerm(i, term uint64) bool {? ? ? t, err := l.term(i)? ? ? if err != nil {? ? ? return false? ? ? }? ? ? return t == term? ? }? ? ? ? func stepLeader(r *raft, m pb.Message) error {? ? ? case pb.MsgAppResp:? ? ? pr.RecentActive = true? ? ? ? ? if m.Reject {? ? ? if pr.maybeDecrTo(m.Index, m.RejectHint) {? ? ? //由復(fù)制狀態(tài)進(jìn)入探測狀態(tài),探測follower最新的匹配位置? ? ? if pr.State == ProgressStateReplicate {? ? ? pr.becomeProbe()? ? ? }? ? ? r.sendAppend(m.From)? ? ? }? ? }

下面來分析leader接收到拒絕請(qǐng)求后的處理邏輯赂苗。由于各種原因可能導(dǎo)致follower節(jié)點(diǎn)的日志與leader不一致愉耙,如下圖所示:

日志同步

在raft的論文中提出通過遍歷index和term的方式保證日志的一致性。具體的實(shí)現(xiàn)位于maybeDecrTo拌滋,因?yàn)閒ollower在拒絕請(qǐng)求時(shí)帶上了當(dāng)前最新的logindex朴沿,因此在進(jìn)行日志補(bǔ)推時(shí),直接將next至為follower中最新的logindex 和當(dāng)前index中的最小值鸠真。

func (pr *Progress) maybeDecrTo(rejected, last uint64) bool {? ? ? if pr.State == ProgressStateReplicate {? ? ? if rejected <= pr.Match {? ? ? return false? ? ? }? ? ? // directly decrease next to match + 1? ? ? //復(fù)制狀態(tài)將pr的next置為當(dāng)前匹配位置+1? ? ? pr.Next = pr.Match + 1? ? ? return true? ? ? }? ? ? ? ? if pr.Next-1 != rejected {? ? ? return false? ? ? }? ? ? //如果是探測狀態(tài)悯仙,則將next置為follower中最新的logindex? 和當(dāng)前index中的最小值。? ? ? if pr.Next = min(rejected, last+1); pr.Next < 1 {? ? ? pr.Next = 1? ? ? }? ? ? pr.resume()? ? ? return true? ? }? ? ? 日志推送的具體實(shí)現(xiàn)位于maybeSendAppend.func (r *raft) maybeSendAppend(to uint64, sendIfEmpty bool) bool {pr := r.getProgress(to)if pr.IsPaused() {return false}m := pb.Message{}m.To = to//發(fā)送給follower的最后一條日志對(duì)應(yīng)的任期term, errt := r.raftLog.term(pr.Next - 1)//需要發(fā)送給follower的日志條數(shù)ents, erre := r.raftLog.entries(pr.Next, r.maxMsgSize)if len(ents) == 0 && !sendIfEmpty {return false}if errt != nil || erre != nil { // send snapshot if we failed to get term or entries...} else {m.Type = pb.MsgAppm.Index = pr.Next - 1m.LogTerm = termm.Entries = ents//leader 已經(jīng)提交的最新indexm.Commit = r.raftLog.committedif n := len(m.Entries); n != 0 {switch pr.State {//在日志復(fù)制狀態(tài)吠卷,樂觀地增加next, 加快日志的推送速度case ProgressStateReplicate:last := m.Entries[n-1].Indexpr.optimisticUpdate(last)pr.ins.add(last)case ProgressStateProbe:pr.pause()default:r.logger.Panicf("%x is sending append in unhandled state %s", r.id, pr.State)}}}r.send(m)return true}

至此raft集群的日志復(fù)制基本已經(jīng)完成锡垄,但是僅限于raft協(xié)議層面,日志和快照目前還是保存在Ready結(jié)構(gòu)中祭隔,并放入了readyc隊(duì)列货岭,等待上游的模塊處理。之前提到過etcd-raft 只是協(xié)議層的實(shí)現(xiàn)疾渴,提供了WAL渠退,snapshot和storage等模塊的擴(kuò)展接口休蟹,應(yīng)用層需要實(shí)現(xiàn)上述接口最終實(shí)現(xiàn)的數(shù)據(jù)的落地。

func newReady(r *raft, prevSoftSt *SoftState, prevHardSt pb.HardState) Ready {...//日志數(shù)據(jù)rd := Ready{Entries:? ? ? ? ? r.raftLog.unstableEntries(),CommittedEntries: r.raftLog.nextEnts(),Messages:? ? ? ? r.msgs,}...}

leadership transfer

leadership transfer 指的是leader身份的轉(zhuǎn)換,raft提供接口允許客戶端進(jìn)行l(wèi)eader切換大州,此功能可用來做負(fù)載均衡,讓客戶端有機(jī)會(huì)結(jié)合實(shí)際的機(jī)器和負(fù)載情況去選擇最優(yōu)的leader雷绢;同時(shí)也是multi-raft實(shí)現(xiàn)的基礎(chǔ)争群。下面具體分析transfer的實(shí)現(xiàn)。

raft協(xié)議提供了transferLeaderShip方法供應(yīng)用層使用用于觸發(fā)leader的轉(zhuǎn)換店量,transferLeaderShip會(huì)發(fā)送MsgTransferLeader類型消息至recvc消息隊(duì)列中(channel)芜果。當(dāng)follower收到TransferLeader消息后不處理將消息轉(zhuǎn)發(fā)至leader進(jìn)行處理。

//etcd/raft/raft.go func (n *node) TransferLeadership(ctx context.Context, lead, transferee uint64) {? ? select {? ? //通過recvc發(fā)送MsgTransferLeader消息至集群中節(jié)點(diǎn)? ? case n.recvc <- pb.Message{Type: pb.MsgTransferLeader, From: transferee, To: lead}:? ? case <-n.done:? ? case <-ctx.Done():? ? }? ? }

leader收到transfer消息后融师,如果發(fā)現(xiàn)當(dāng)前正在進(jìn)行l(wèi)eader切換或者不發(fā)生leader變換則直接放棄右钾。一個(gè)節(jié)點(diǎn)要成為leader的要求是有最新的日志數(shù)據(jù)。如果有則立即發(fā)送MsgTimeoutNow消息旱爆,transfee收到消息后立即調(diào)用campaign方法進(jìn)行選擇舀射,而不是像正常leader選舉時(shí)需要等待超時(shí),而且也不需要采用預(yù)投票的方式怀伦,之后的選舉流程與正常選舉過程一致脆烟。如果transfee沒有最新的日志數(shù)據(jù),則leader進(jìn)行日志的同步空镜,當(dāng)同步完成收到回復(fù)且正處在leader transfer的過程中浩淘,發(fā)送MsgTimeoutNow捌朴,之后與上述流程一致。

//etcd/raft/raft.go func stepLeader(r *raft, m pb.Message) error {switch m.Type {...case pb.MsgTransferLeader:if pr.IsLearner {r.logger.Debugf("%x is learner. Ignored transferring leadership", r.id)return nil}leadTransferee := m.FromlastLeadTransferee := r.leadTransferee//上一次transfer正在進(jìn)行if lastLeadTransferee != None {if lastLeadTransferee == leadTransferee {r.logger.Infof("%x [term %d] transfer leadership to %x is in progress, ignores request to same node %x",r.id, r.Term, leadTransferee, leadTransferee)return nil}r.abortLeaderTransfer()r.logger.Infof("%x [term %d] abort previous transferring leadership to %x", r.id, r.Term, lastLeadTransferee)}//transfee和當(dāng)前l(fā)eader相同if leadTransferee == r.id {r.logger.Debugf("%x is already leader. Ignored transferring leadership to self", r.id)return nil}// Transfer leadership to third party.// Transfer leadership should be finished in one electionTimeout, so reset r.electionElapsed.r.electionElapsed = 0r.leadTransferee = leadTransfereeif pr.Match == r.raftLog.lastIndex() {//transfee的日志已經(jīng)是最新和leader保持一致了张抄,則立刻發(fā)送MsgTimeoutNow砂蔽,觸發(fā)選舉r.sendTimeoutNow(leadTransferee)r.logger.Infof("%x sends MsgTimeoutNow to %x immediately as %x already has up-to-date log", r.id, leadTransferee, leadTransferee)} else {//日志非最新進(jìn)行日志的同步r.sendAppend(leadTransferee)}}return nil}

線性一致讀

線性一致性讀是分布式系統(tǒng)的基本要求,在raft中l(wèi)eader和follower都可以接受讀請(qǐng)求署惯,但在以下場景下可能出現(xiàn)數(shù)據(jù)的不一致:

Leader和Follower復(fù)制期間的狀態(tài)不一致

因?yàn)榫W(wǎng)絡(luò)分區(qū)導(dǎo)致多個(gè)leader的存在左驾,不同leader間的狀態(tài)不一致,即腦裂(split-brain)現(xiàn)象极谊。如果請(qǐng)求分別被新舊leader處理诡右,所得的結(jié)果也不一致

為解決raft的線性一致讀問題,etcd-raft提供了兩種實(shí)現(xiàn)方案:

ReadIndex(ReadOnlySafe)轻猖。其原理是接收到客戶端請(qǐng)求后帆吻,向集群發(fā)起ReadIndex請(qǐng)求來讀取commitedIndex,Leader收到請(qǐng)求后向節(jié)點(diǎn)發(fā)送心跳,當(dāng)收到大多數(shù)節(jié)點(diǎn)的確認(rèn)自己仍是leader后咙边,回復(fù)ReadIndex請(qǐng)求并告知最新的commitedIndex猜煮。ReadIndex是etcd-raft的默認(rèn)方案。

Lease read方案(ReadOnlyLeaseBased)败许。其原理是通過維護(hù)leader的租期王带,確認(rèn)leader的唯一性,不需要通過心跳來進(jìn)行l(wèi)eader的確認(rèn)市殷。其風(fēng)險(xiǎn)在于需要全局一直的時(shí)鐘來保證lease機(jī)制的準(zhǔn)確性愕撰。etcd-raft不推薦采用此方案,pingcap開源的分布式數(shù)據(jù)庫tidb中的pd 模塊在實(shí)現(xiàn)TSO(Timestamp Oracle)的前提下醋寝,采用此方案搞挣。

ReadIndex實(shí)現(xiàn)分析

在raft初始化的過程中完成了linearizable read的配置,包括需要采用的方案甥桂。

? func newRaft(c *Config) *raft {? ...? }? r := &raft{? id:? ? ? ? ? ? ? ? ? ? ? ? c.ID,? ...? //初始化readOnly配置? readOnly:? ? ? ? ? ? ? ? ? newReadOnly(c.ReadOnlyOption),? disableProposalForwarding: c.DisableProposalForwarding,? }? }? ? ? const (? //ReadIndex方案? ReadOnlySafe ReadOnlyOption = iota? //leaseRead方案? ReadOnlyLeaseBased? )

阻塞的recvc channel收到ReadIndex請(qǐng)求后柿究,將請(qǐng)求加入隊(duì)列邮旷,初始化ReadIndex狀態(tài)黄选。之后發(fā)送廣播心跳。

? func stepLeader(r *raft, m pb.Message) error {? switch m.Type {? ...? case pb.MsgReadIndex:? switch r.readOnly.option {? case ReadOnlySafe:? //加入請(qǐng)求隊(duì)列? r.readOnly.addRequest(r.raftLog.committed, m)? //廣播心跳消息? r.bcastHeartbeatWithCtx(m.Entries[0].Data)? }? } else {? r.readStates = append(r.readStates, ReadState{Index: r.raftLog.committed, RequestCtx: m.Entries[0].Data})? }? }? }? ? ? func (ro *readOnly) addRequest(index uint64, m pb.Message) {? ctx := string(m.Entries[0].Data)? if _, ok := ro.pendingReadIndex[ctx]; ok {? return? }? //index是當(dāng)前集群的committedIndex,acks 用來收集節(jié)點(diǎn)心跳回復(fù)包? ro.pendingReadIndex[ctx] = &readIndexStatus{index: index, req: m, acks: make(map[uint64]struct{})}? ro.readIndexQueue = append(ro.readIndexQueue, ctx)? }

當(dāng)leader收到心跳回復(fù)后婶肩,對(duì)心跳進(jìn)行統(tǒng)計(jì)办陷,如果是本地請(qǐng)求直接將消息追加到readstatus中,最終會(huì)由newReady函數(shù)將消息發(fā)送到readyc channel,監(jiān)聽ready channel的客戶端會(huì)最終回復(fù)請(qǐng)求律歼。

? func stepLeader(r *raft, m pb.Message) error {? case pb.MsgHeartbeatResp:? ...? }? //統(tǒng)計(jì)回復(fù)結(jié)果民镜,如果未超過半數(shù)則直接返回? ackCount := r.readOnly.recvAck(m)? if ackCount < r.quorum() {? return nil? }? ? ? rss := r.readOnly.advance(m)? for _, rs := range rss {? req := rs.req? //如果是本地的請(qǐng)求? if req.From == None || req.From == r.id { // from local member? r.readStates = append(r.readStates, ReadState{Index: rs.index, RequestCtx: req.Entries[0].Data})? } else {? //如果是來自follower的請(qǐng)求,將結(jié)果返回給follower? r.send(pb.Message{To: req.From, Type: pb.MsgReadIndexResp, Index: rs.index, Entries: req.Entries})? }? }? }? ? ? func newReady(r *raft, prevSoftSt *SoftState, prevHardSt pb.HardState) Ready {? rd := Ready{? Entries:? ? ? ? ? r.raftLog.unstableEntries(),? CommittedEntries: r.raftLog.nextEnts(),? Messages:? ? ? ? r.msgs,? }? ...? //readIndex消息追加? if len(r.readStates) != 0 {? rd.ReadStates = r.readStates? }? rd.MustSync = MustSync(r.hardState(), prevHardSt, len(rd.Entries))? return rd? }? ? ? func (n *node) run(r *raft) {? ....? for {? if advancec != nil {? readyc = nil? } else {? //消息加入readyc隊(duì)列? rd = newReady(r, prevSoftSt, prevHardSt)? if rd.containsUpdates() {? readyc = n.readyc? } else {? readyc = nil? }? }? ....? }

如果是follower接收到ReadIndex請(qǐng)求险毁,直接將消息轉(zhuǎn)發(fā)至leader制圈,leader按上述流程處理们童,follower接收到消息后采用上述類似機(jī)制加入readyc隊(duì)列,異步回復(fù)客戶端鲸鹦。

? func stepFollower(r *raft, m pb.Message) error {? ...? case pb.MsgReadIndex:? if r.lead == None {? r.logger.Infof("%x no leader at term %d; dropping index reading msg", r.id, r.Term)? return nil? }? //將ReadIndex請(qǐng)求轉(zhuǎn)發(fā)給leader? m.To = r.lead? r.send(m)? case pb.MsgReadIndexResp:? if len(m.Entries) != 1 {? r.logger.Errorf("%x invalid format of MsgReadIndexResp from %x, entries count: %d", r.id, m.From, len(m.Entries))? return nil? }? //收到leader回復(fù)后將消息加入readStatus? r.readStates = append(r.readStates, ReadState{Index: m.Index, RequestCtx: m.Entries[0].Data})? ...? }

總結(jié)

本文從raft算法的基本原理出發(fā)慧库,簡單的分析了leader選舉和日志復(fù)制的實(shí)現(xiàn)過程。之后從工程實(shí)踐的角度出發(fā)分析了etcd-raft的代碼實(shí)現(xiàn)馋嗜,重點(diǎn)剖析了leader選舉齐板,日志復(fù)制,leadership transfer和線性一致讀的核心流程葛菇。而raft算法博大精深甘磨,etcd也是工業(yè)級(jí)的完整實(shí)現(xiàn),除了本文介紹的幾個(gè)核心環(huán)節(jié)外眯停,leader的預(yù)選舉(prevote)济舆、節(jié)點(diǎn)成員變更、配置變更和日志的批量追加等也是raft的關(guān)鍵環(huán)節(jié)莺债,因篇幅所限就不再一一介紹吗冤。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個(gè)濱河市九府,隨后出現(xiàn)的幾起案子椎瘟,更是在濱河造成了極大的恐慌,老刑警劉巖侄旬,帶你破解...
    沈念sama閱讀 212,686評(píng)論 6 492
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件肺蔚,死亡現(xiàn)場離奇詭異,居然都是意外死亡儡羔,警方通過查閱死者的電腦和手機(jī)宣羊,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,668評(píng)論 3 385
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來汰蜘,“玉大人仇冯,你說我怎么就攤上這事∽宀伲” “怎么了苛坚?”我有些...
    開封第一講書人閱讀 158,160評(píng)論 0 348
  • 文/不壞的土叔 我叫張陵,是天一觀的道長色难。 經(jīng)常有香客問我泼舱,道長,這世上最難降的妖魔是什么枷莉? 我笑而不...
    開封第一講書人閱讀 56,736評(píng)論 1 284
  • 正文 為了忘掉前任娇昙,我火速辦了婚禮,結(jié)果婚禮上笤妙,老公的妹妹穿的比我還像新娘冒掌。我一直安慰自己噪裕,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 65,847評(píng)論 6 386
  • 文/花漫 我一把揭開白布股毫。 她就那樣靜靜地躺著州疾,像睡著了一般。 火紅的嫁衣襯著肌膚如雪皇拣。 梳的紋絲不亂的頭發(fā)上严蓖,一...
    開封第一講書人閱讀 50,043評(píng)論 1 291
  • 那天,我揣著相機(jī)與錄音氧急,去河邊找鬼颗胡。 笑死,一個(gè)胖子當(dāng)著我的面吹牛吩坝,可吹牛的內(nèi)容都是我干的毒姨。 我是一名探鬼主播,決...
    沈念sama閱讀 39,129評(píng)論 3 410
  • 文/蒼蘭香墨 我猛地睜開眼钉寝,長吁一口氣:“原來是場噩夢啊……” “哼弧呐!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起嵌纲,我...
    開封第一講書人閱讀 37,872評(píng)論 0 268
  • 序言:老撾萬榮一對(duì)情侶失蹤俘枫,失蹤者是張志新(化名)和其女友劉穎,沒想到半個(gè)月后逮走,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體鸠蚪,經(jīng)...
    沈念sama閱讀 44,318評(píng)論 1 303
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 36,645評(píng)論 2 327
  • 正文 我和宋清朗相戀三年师溅,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了茅信。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 38,777評(píng)論 1 341
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡墓臭,死狀恐怖蘸鲸,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情窿锉,我是刑警寧澤酌摇,帶...
    沈念sama閱讀 34,470評(píng)論 4 333
  • 正文 年R本政府宣布,位于F島的核電站榆综,受9級(jí)特大地震影響妙痹,放射性物質(zhì)發(fā)生泄漏铸史。R本人自食惡果不足惜鼻疮,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 40,126評(píng)論 3 317
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望琳轿。 院中可真熱鬧判沟,春花似錦耿芹、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,861評(píng)論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至迹炼,卻和暖如春砸彬,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背斯入。 一陣腳步聲響...
    開封第一講書人閱讀 32,095評(píng)論 1 267
  • 我被黑心中介騙來泰國打工砂碉, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人刻两。 一個(gè)月前我還...
    沈念sama閱讀 46,589評(píng)論 2 362
  • 正文 我出身青樓增蹭,卻偏偏與公主長得像,于是被迫代替她去往敵國和親磅摹。 傳聞我的和親對(duì)象是個(gè)殘疾皇子滋迈,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 43,687評(píng)論 2 351

推薦閱讀更多精彩內(nèi)容