上偏文章我們分析了consul 一致性default 和stale 兩種模式的一致性讀的實(shí)現(xiàn)原理,讓我們回憶下,
Stale模式
鏈接任何一個(gè)server節(jié)點(diǎn)都可以讀滑臊,容忍過(guò)期的數(shù)據(jù)癞埠,
Default 模式
這個(gè)是我們大多數(shù)人用的模式,需要從leader返回?cái)?shù)據(jù)挚赊,如果agent鏈接到server是follower節(jié)點(diǎn),則需要轉(zhuǎn)發(fā)給leader來(lái)處理捂襟。
Consistent 模式
而且我們也說(shuō)了Consistent 一致性讀比default模式還要嚴(yán)格咬腕,除了需要leader返回?cái)?shù)據(jù)外,還需要確認(rèn)當(dāng)前l(fā)eader是否唯一葬荷,怎么確認(rèn)涨共,就需要和follower 通信來(lái)確認(rèn),就是問(wèn)下幾個(gè)從節(jié)點(diǎn)宠漩,嘿举反,我還是leader嗎,如果超過(guò)半數(shù)的人都響應(yīng)是扒吁,則恭喜你火鼻,還是leader,就可以返回?cái)?shù)據(jù)了雕崩。否則返回失敗魁索。
下面我們分析下原理是怎么實(shí)現(xiàn)的,初步摸排了下盼铁,這個(gè)過(guò)程還挺復(fù)雜的粗蔚,所以需要單獨(dú)再開(kāi)一篇文章來(lái)說(shuō)明。
檢查入口
consul server 出來(lái)所有的請(qǐng)求基本上都支持阻塞查詢(xún)饶火,即blockingQuery方法鹏控,consul 在計(jì)算好超時(shí)時(shí)間后,會(huì)做是否為Consistent模式的讀肤寝。
// Validate
// If the read must be consistent we verify that we are still the leader.
// queryOpts 這里檢查是否一致性讀
if queryOpts.GetRequireConsistent() {
//如果是則通過(guò)consistentRead來(lái)實(shí)現(xiàn)
if err := s.consistentRead(); err != nil {
return err
}
}
//不是本地之間返回
我們來(lái)分析consistentRead的邏輯, 關(guān)鍵就時(shí)發(fā)起一個(gè)leader驗(yàn)證的請(qǐng)求当辐,然后等待結(jié)果,代碼如下:
// consistentRead is used to ensure we do not perform a stale
// read. This is done by verifying leadership before the read.
func (s *Server) consistentRead() error {
defer metrics.MeasureSince([]string{"rpc", "consistentRead"}, time.Now())
future := s.raft.VerifyLeader()
//查詢(xún)請(qǐng)求會(huì)阻塞在這里鲤看,即future.Error()缘揪,需要等過(guò)半驗(yàn)證成功才返回。
if err := future.Error(); err != nil {
return err //fail fast if leader verification fails
}
// poll consistent read readiness, wait for up to RPCHoldTimeout milliseconds
if s.isReadyForConsistentReads() {
return nil
}
jitter := lib.RandomStagger(s.config.RPCHoldTimeout / jitterFraction)
deadline := time.Now().Add(s.config.RPCHoldTimeout)
for time.Now().Before(deadline) {
select {
case <-time.After(jitter):
// Drop through and check before we loop again.
case <-s.shutdownCh:
return fmt.Errorf("shutdown waiting for leader")
}
if s.isReadyForConsistentReads() {
return nil
}
}
return structs.ErrNotReadyForConsistentReads
}
我們的查詢(xún)就阻塞在future.Error() 這里,沒(méi)有驗(yàn)證完寺晌,這里是不會(huì)響應(yīng)的世吨。
那關(guān)鍵就在s.raft.VerifyLeader()這里,我們繼續(xù)往下挖呻征,下面是實(shí)現(xiàn):
// VerifyLeader is used to ensure the current node is still
// the leader. This can be done to prevent stale reads when a
// new leader has potentially been elected.
func (r *Raft) VerifyLeader() Future {
metrics.IncrCounter([]string{"raft", "verify_leader"}, 1)
verifyFuture := &verifyFuture{}
//init 會(huì)初始化errCh耘婚,頁(yè)就是前面error會(huì)阻塞在這個(gè)channel上。
verifyFuture.init()
select {
case <-r.shutdownCh:
return errorFuture{ErrRaftShutdown}
case r.verifyCh <- verifyFuture:
//寫(xiě)一個(gè)future到verifyCh陆赋,consul leader主協(xié)程會(huì)watch這個(gè)verifyCh沐祷,會(huì)觸發(fā)驗(yàn)證的邏輯
return verifyFuture
}
}
老外寫(xiě)的代碼還是很友好的,注釋寫(xiě)的很清楚攒岛,看代碼的同時(shí)還能看寫(xiě)的注釋?zhuān)?yè)大概明白這個(gè)方法的意圖赖临,這個(gè)VerifyLeader 的核心邏輯就是創(chuàng)建一個(gè)verifyFuture,這個(gè)future很關(guān)鍵灾锯,關(guān)鍵指標(biāo)都在這里兢榨,定義如下:
/ verifyFuture is used to verify the current node is still
// the leader. This is to prevent a stale read.
type verifyFuture struct {
deferError
notifyCh chan *verifyFuture
quorumSize int //過(guò)半大小,默認(rèn)是0
votes int //驗(yàn)證leader時(shí)follower響應(yīng)ok顺饮,時(shí)votes會(huì)+1吵聪,如果超過(guò)quorumSize,則認(rèn)為還是leader兼雄。
voteLock sync.Mutex
}
votes 是每個(gè)follower返回驗(yàn)證成功時(shí)吟逝,會(huì)對(duì)votes+1,然后判斷是否大于過(guò)半quorumSize赦肋,默認(rèn)是0块攒,如果是過(guò)半,那恭喜你佃乘,目前你還是leader囱井,可以執(zhí)行當(dāng)前這次讀請(qǐng)求。
下面我們要看consul是怎么給follower發(fā)送驗(yàn)證請(qǐng)求的趣避,通過(guò)上面的代碼琅绅,可以看出,是向leader的verifyCh 寫(xiě)了一個(gè)future鹅巍,所以肯定有一個(gè)go routing 會(huì)阻塞在這個(gè)verifyCh channel上,這就是異步通知, 下面就是leader的主協(xié)程登場(chǎng)了料祠。
Leader 主協(xié)程
leader 完成初始化后骆捧,最后會(huì)啟動(dòng)一個(gè)循環(huán)函數(shù),先看下定義髓绽,同樣注釋也說(shuō)明的很清楚敛苇。
// leaderLoop is the hot loop for a leader. It is invoked
// after all the various leader setup is done.
func (r *Raft) leaderLoop() {
...
這個(gè)leaderLoop會(huì)監(jiān)聽(tīng)很多channel,比如rpc請(qǐng)求,commit等枫攀,其中一個(gè)就有verifyCh的channel括饶,代碼如下:
case v := <-r.verifyCh:
if v.quorumSize == 0 {
// Just dispatched, start the verification
r.verifyLeader(v)
} else if v.votes < v.quorumSize {
// Early return, means there must be a new leader
r.logger.Warn("new leader elected, stepping down")
r.setState(Follower)
delete(r.leaderState.notify, v)
for _, repl := range r.leaderState.replState {
repl.cleanNotify(v)
}
v.respond(ErrNotLeader)
} else {
// Quorum of members agree, we are still leader
delete(r.leaderState.notify, v)
for _, repl := range r.leaderState.replState {
repl.cleanNotify(v)
}
v.respond(nil)
}
我們前面查詢(xún)的goroutine 通過(guò)發(fā)了一個(gè)verifyFuture 給verifyCh,leader 的main goroutine 就監(jiān)聽(tīng)在這里来涨,我們前面也說(shuō)了图焰,quorumSize 默認(rèn)是0,所以第一次是會(huì)觸發(fā)verifyLeader的邏輯蹦掐,什么時(shí)候觸發(fā)另外兩個(gè)的邏輯呢技羔,要等驗(yàn)證超過(guò)一半的請(qǐng)求返還了,就會(huì)再給verifyCh發(fā)一個(gè)消息卧抗,這時(shí)候藤滥,正常情況下就是找v.respond的邏輯,最終通知最前面的query go routine 阻塞就會(huì)被喚醒社裆。
verifyLeader的核心邏輯就是初始化了quorumSize拙绊,另外就是對(duì)follower循環(huán),consul 有個(gè) replicate go routine 會(huì)和follower發(fā)心跳信息泳秀,每個(gè)follower一個(gè)标沪,除了定時(shí)發(fā)心跳外,還支持實(shí)時(shí)觸發(fā)心態(tài)晶默,也就是監(jiān)聽(tīng)notifyCh 這個(gè)channel谨娜,這個(gè)leader的go routine會(huì)發(fā)一個(gè)空的struct給這個(gè)channel來(lái)觸發(fā),會(huì)給每個(gè)follower都發(fā)一個(gè)類(lèi)型為rpcAppendEntries的消息磺陡,核心代碼如下:
// Trigger immediate heartbeats
for _, repl := range r.leaderState.replState {
repl.notifyLock.Lock()
repl.notify[v] = struct{}{}
repl.notifyLock.Unlock()
//通知主動(dòng)發(fā)一個(gè)心調(diào)到follower server
asyncNotifyCh(repl.notifyCh)
}
consul leader 對(duì)每個(gè)follower 維持一個(gè)heartbeat ,核心代碼如下:
for {
// Wait for the next heartbeat interval or forced notify
select {
case <-s.notifyCh://通知即刻執(zhí)行
case <-randomTimeout(r.conf.HeartbeatTimeout / 10)://定時(shí)執(zhí)行
case <-stopCh:
return
}
start := time.Now()
if err := r.trans.AppendEntries(s.peer.ID, s.peer.Address, &req, &resp); err != nil {
r.logger.Error("failed to heartbeat to", "peer", s.peer.Address, "error", err)
failures++
select {
case <-time.After(backoff(failureWait, failures, maxFailureScale)):
case <-stopCh:
}
} else {
//更新時(shí)間
s.setLastContact()
failures = 0
metrics.MeasureSince([]string{"raft", "replication", "heartbeat", string(s.peer.ID)}, start)
s.notifyAll(resp.Success)
}
}
繞了這么大半圈趴梢,真正驗(yàn)證的關(guān)鍵代碼總算出來(lái)了,也就是 r.trans.AppendEntries執(zhí)行的币他,這里就不用分析了坞靶,就是發(fā)一個(gè)rpc請(qǐng)求給follower。如果成功蝴悉。通過(guò)s.notifyAll(resp.Success) 來(lái)通知前面的future彰阴,核心代碼如下:
// vote is used to respond to a verifyFuture.
// This may block when responding on the notifyCh.
func (v *verifyFuture) vote(leader bool) {
v.voteLock.Lock()
defer v.voteLock.Unlock()
// Guard against having notified already
if v.notifyCh == nil {
return
}
if leader {
v.votes++
//防止一個(gè)follower響應(yīng)就通知了leader,比如5臺(tái)的時(shí)候拍冠,一臺(tái)響應(yīng)了+自己也就是2
if v.votes >= v.quorumSize {
v.notifyCh <- v
v.notifyCh = nil
}
} else {
v.notifyCh <- v
v.notifyCh = nil
}
}
如果follower響應(yīng)成功尿这,也就是認(rèn)為你還是leader,則對(duì)votes加1庆杜,v.notifyCh 這里其實(shí)就是我們前面leader 的verifych, 通過(guò)幻想我們前面的查詢(xún)請(qǐng)求阻塞在future.error哪里射众,整個(gè)經(jīng)過(guò)這么一個(gè)復(fù)雜的流程才能完成一次正常的讀請(qǐng)求,如果請(qǐng)求follower超時(shí)晃财,則會(huì)等待一定的時(shí)間叨橱。繼續(xù)請(qǐng)求。
總結(jié)
consul 官方文檔說(shuō)Consistent 模式讀,為了實(shí)現(xiàn)這個(gè)強(qiáng)一致性讀罗洗,consul 在背后做了這么多的事情愉舔,詳細(xì)你看了這篇文章,以及前面的一篇文章伙菜,對(duì)consul的三種模式的讀轩缤,應(yīng)該有了一個(gè)全面的了解,在用的時(shí)候也能根據(jù)你的業(yè)務(wù)場(chǎng)景做出正確的選擇仇让,對(duì)consul 感興趣的同學(xué)可以點(diǎn)關(guān)注典奉,后面再繼續(xù)分享consul的文章,帶你看明白consul的世界丧叽。