背景
前面一篇文章我們描述了raft 協(xié)議的實現(xiàn)數(shù)據(jù)一致性的基礎(chǔ)知識别渔,有了前面的基礎(chǔ)知識背景们衙,能很好的幫助我們理解consul 基于raft算法的實現(xiàn)剃斧,理論指導(dǎo)實踐,永遠(yuǎn)不過時显歧。
我們以consul key value 的一個例子來理清整個流程仪或,以寫一個key value來看,是我們?nèi)粘i_發(fā)中用的最多的一個例子追迟,讓我們來一起看看consul server到底是怎么實現(xiàn)的溶其,背后的邏輯是什么。
Consul Agent 請求
客戶端發(fā)起一個put key value的http請求敦间,由kvs_endpoint.go 的KVSEndpoint func 處理瓶逃,put的方法會路由給KVSPut 處理,除了一些校驗外和請求標(biāo)識廓块,比如是否有獲取鎖acquire或者release厢绝,這里提下一個檢查,就是value的大小檢查带猴,和web 容器一樣檢查防止請求數(shù)據(jù)太大昔汉,可以通過參數(shù)kv_max_value_size 控制,如果超過返回狀態(tài)碼413拴清,標(biāo)準(zhǔn)的http 狀態(tài)碼靶病。
檢查都OK后,consul agent就開始請求consul server了口予,當(dāng)然還是rpc 操作
// Copy the value
buf := bytes.NewBuffer(nil)
// 這里才開始讀請求的數(shù)據(jù)娄周。
if _, err := io.Copy(buf, req.Body); err != nil {
return nil, err
}
applyReq.DirEnt.Value = buf.Bytes()
// Make the RPC
var out bool
// 開始請求server
if err := s.agent.RPC("KVS.Apply", &applyReq, &out); err != nil {
return nil, err
}
// Only use the out value if this was a CAS
// 沒有出錯的話,這里就成功返回了
if applyReq.Op == api.KVSet {
return true, nil
}
請求的是consul 下面的kvs_endpoint.go 下面的Apply 方法沪停,所以我們的重點要來了
Server Apply
consul server的 apply方法煤辨,代碼還是show下,這里還有兩個邏輯說明下木张。
// Apply is used to apply a KVS update request to the data store.
func (k *KVS) Apply(args *structs.KVSRequest, reply *bool) error {
// 檢查機(jī)房dc是否匹配众辨,不是就轉(zhuǎn)發(fā)到對應(yīng)到dc的server。
if done, err := k.srv.forward("KVS.Apply", args, args, reply); done {
return err
}
// 中間不重要的去了舷礼,省得太多...
// 對權(quán)限token 應(yīng)用ACL policy
ok, err := kvsPreApply(k.logger, k.srv, authz, args.Op, &args.DirEnt)
if err != nil {
return err
}
if !ok {
*reply = false
return nil
}
// Apply the update.
// 這里是開啟raft 算法的之旅的入口鹃彻。
resp, err := k.srv.raftApply(structs.KVSRequestType, args)
if err != nil {
k.logger.Error("Raft apply failed", "error", err)
return err
}
if respErr, ok := resp.(error); ok {
return respErr
}
// Check if the return type is a bool.
if respBool, ok := resp.(bool); ok {
*reply = respBool
}
return nil
}
在真正開始執(zhí)行raft 算法前,主要做了如下兩件事:
先檢查了dc是否是當(dāng)前dc妻献,如果不是會路由到正確的dc蛛株,這頁是consul 支持多機(jī)房部署的一個很好的特性虚婿,路由很方便,這也是多機(jī)房部署consul是很好的選擇泳挥。
檢查是否啟用了acl策略,如果有至朗,需要檢查屉符,沒有對應(yīng)的token是不能操作的。
上面2件事都沒有問題后锹引,開始執(zhí)行raft apply操作矗钟,我們真正感興趣的就要出來了,下面讓我們開始盤apply嫌变,
經(jīng)過一盤吨艇,在真正執(zhí)行raft前,consul還做了一些加工腾啥,不能蠻搞东涡,是非常嚴(yán)謹(jǐn)?shù)模厦嫱ㄟ^raftApply倘待,經(jīng)過幾跳后疮跑,會執(zhí)行到raftApplyWithEncoder方法,這里做的工作是很重要的凸舵,所以還是拿出來說下祖娘,是漲知識的地方,代碼如下:
// raftApplyWithEncoder is used to encode a message, run it through raft,
// and return the FSM response along with any errors. Unlike raftApply this
// takes the encoder to use as an argument.
func (s *Server) raftApplyWithEncoder(t structs.MessageType, msg interface{}, encoder raftEncoder) (interface{}, error) {
if encoder == nil {
return nil, fmt.Errorf("Failed to encode request: nil encoder")
}
// 對請求編碼啊奄。
buf, err := encoder(t, msg)
if err != nil {
return nil, fmt.Errorf("Failed to encode request: %v", err)
}
// Warn if the command is very large
if n := len(buf); n > raftWarnSize {
s.rpcLogger().Warn("Attempting to apply large raft entry", "size_in_bytes", n)
}
var chunked bool
var future raft.ApplyFuture
switch {
case len(buf) <= raft.SuggestedMaxDataSize || t != structs.KVSRequestType:
//請求的數(shù)據(jù)大小如果小于512 * 1024 即512k渐苏,則做一次log執(zhí)行。
future = s.raft.Apply(buf, enqueueLimit)
default:
//超過了512k菇夸,則需要分chunk琼富,每個chunk做為一個log來應(yīng)用。
chunked = true
//這里就是每個log一次future峻仇。
future = raftchunking.ChunkingApply(buf, nil, enqueueLimit, s.raft.ApplyLog)
}
//阻塞公黑,等待raft協(xié)議完成。
if err := future.Error(); err != nil {
return nil, err
}
resp := future.Response()
//...
return resp, nil
}
這里通過注釋摄咆,你也可以看出凡蚜,主要關(guān)心4件事情:
- 把請求編碼,這個不是我們的重點吭从,后面有時間可以單獨分析朝蜘。
- 檢查是否要拆包,是否要拆成多個raft command 來執(zhí)行涩金,這里有個參數(shù)控制谱醇,SuggestedMaxDataSize consul 默認(rèn)設(shè)置是512k暇仲,如果超過這個則拆,否則可以一次raft 協(xié)議搞定副渴。
- 有一個超時時間奈附,默認(rèn)是30秒,后面會用到煮剧。
- 最后事阻塞等待完成斥滤,是logfuture。
為什么要拆包
這些事raft 算法不會提的勉盅,這個事工程實踐才會有的一些優(yōu)化佑颇,此時你也和我一樣,為啥要做這個優(yōu)化呢草娜,有什么好處挑胸,解決什么問題,這是我們做一個架構(gòu)師必須要有的思考宰闰。
consul的官方就給出了解釋茬贵,所以閱讀優(yōu)秀的代碼就是一種享受,看注釋就能知道為啥這樣做议蟆,下面是他們對SuggestedMaxDataSize的注釋:
// Increasing beyond this risks RPC IO taking too long and preventing
// timely heartbeat signals which are sent in serial in current transports,
// potentially causing leadership instability.
SuggestedMaxDataSize = 512 * 1024
理解就是rpc的請求io 不能太大闷沥,因為還有非常重要的心跳包,如果發(fā)心跳包出現(xiàn)延遲咐容,就而影響leader的穩(wěn)定舆逃,這個事一個非常重要的優(yōu)化措施。
說完了拆包優(yōu)化邏輯后戳粒,我們看下ApplyLog的邏輯路狮,代碼如下:
// ApplyLog performs Apply but takes in a Log directly. The only values
// currently taken from the submitted Log are Data and Extensions.
func (r *Raft) ApplyLog(log Log, timeout time.Duration) ApplyFuture {
metrics.IncrCounter([]string{"raft", "apply"}, 1)
var timer <-chan time.Time
if timeout > 0 {
timer = time.After(timeout)
}
// Create a log future, no index or term yet
logFuture := &logFuture{
log: Log{
Type: LogCommand,
Data: log.Data,
Extensions: log.Extensions,
},
}
logFuture.init()
select {
case <-timer:
return errorFuture{ErrEnqueueTimeout}
case <-r.shutdownCh:
return errorFuture{ErrRaftShutdown}
case r.applyCh <- logFuture:
return logFuture
}
}
這里主要關(guān)心這個applyCh channel,consul 在初始化leader的時候給創(chuàng)建的一個無緩沖區(qū)的通道蔚约,所以如果leader的協(xié)程在干其他的事情奄妨,那這個提交log就阻塞了,時間最長30s苹祟,寫入成功砸抛,就返回了logFuture,也就事前面我們看到future的阻塞树枫。
到這里整個consul leader server的插入請求從接受到阻塞等待的邏輯就完成了直焙,consul server 有個核心的go routine 在watch 這個applyCh,從定義可以看出砂轻,是應(yīng)用raft log的channel奔誓。
分組提交
consul leader 在初始化完成后,會啟動一個核心的go routine搔涝,執(zhí)行rpc厨喂,leader 驗證和措,這個我們前面分析過,還有一個最重要的就事raft log應(yīng)用了蜕煌,代碼如下:
case newLog := <-r.applyCh://這個是前面我們提交log future的
if r.getLeadershipTransferInProgress() {
r.logger.Debug(ErrLeadershipTransferInProgress.Error())
newLog.respond(ErrLeadershipTransferInProgress)
continue
}
// Group commit, gather all the ready commits
ready := []*logFuture{newLog}
GROUP_COMMIT_LOOP:
for i := 0; i < r.conf.MaxAppendEntries; i++ {
select {
case newLog := <-r.applyCh:
ready = append(ready, newLog)
default:
break GROUP_COMMIT_LOOP
}
}
// Dispatch the logs
if stepDown {
// we're in the process of stepping down as leader, don't process anything new
//如果發(fā)現(xiàn)我們不是leader了派阱,直接響應(yīng)失敗
for i := range ready {
ready[i].respond(ErrNotLeader)
}
} else {
r.dispatchLogs(ready)
}
這里的一個重要的點就是組提交,我們在基礎(chǔ)篇提過斜纪,這里就是實現(xiàn)了颁褂,就是讀applyCh的log,這個里做了組提交的優(yōu)化傀广,最多一次發(fā)送MaxAppendEntries個,默認(rèn)位64個彩届,如果并發(fā)高的情況下伪冰,這里是能讀到一個batch的,或者沒有了樟蠕,就不等了贮聂,這里是不能等的,因為raft算法要保證順序寨辩,這里是單線程出來的吓懈,下面就開始dispatch log了,代碼如下:
// dispatchLog is called on the leader to push a log to disk, mark it
// as inflight and begin replication of it.
func (r *Raft) dispatchLogs(applyLogs []*logFuture) {
now := time.Now()
defer metrics.MeasureSince([]string{"raft", "leader", "dispatchLog"}, now)
//獲取當(dāng)前l(fā)eader的任期編號靡狞,這個不會重復(fù)是遞增的耻警,如果有心的leaer了,會比這個大甸怕。
term := r.getCurrentTerm()
//log 編號甘穿,寫一個加1
lastIndex := r.getLastIndex()
n := len(applyLogs)
logs := make([]*Log, n)
metrics.SetGauge([]string{"raft", "leader", "dispatchNumLogs"}, float32(n))
//設(shè)置每個log的編號和任期
for idx, applyLog := range applyLogs {
applyLog.dispatch = now
lastIndex++
applyLog.log.Index = lastIndex
applyLog.log.Term = term
logs[idx] = &applyLog.log
r.leaderState.inflight.PushBack(applyLog)
}
// Write the log entry locally
// log先寫入本地持久化,consul大部分的版本底層用的是boltdb梢杭,boltdb
// 是一個支持事物的數(shù)據(jù)庫温兼,非常方便,這里會涉及io操作武契。
if err := r.logs.StoreLogs(logs); err != nil {
r.logger.Error("failed to commit logs", "error", err)
//如果寫失敗募判,則直接響應(yīng),前面的future阻塞就會喚醒咒唆。
for _, applyLog := range applyLogs {
applyLog.respond(err)
}
//更新自己為follower
r.setState(Follower)
return
}
//這里很重要届垫,好就才看明白,這個是log 復(fù)制成功后钧排,最終應(yīng)用到狀態(tài)機(jī)的一個機(jī)制
//這里是記錄下leader自己的結(jié)果敦腔,因為過半leader也算一份。
r.leaderState.commitment.match(r.localID, lastIndex)
// Update the last log since it's on disk now
// 更新最新log entry的編號恨溜,寫到這里了符衔。
r.setLastLog(lastIndex, term)
// Notify the replicators of the new log
// 開始異步發(fā)送給所有的follower找前,這個leader主go routine的活就干完了。
for _, f := range r.leaderState.replState {
asyncNotifyCh(f.triggerCh)
}
}
這個dispatchlog的邏輯注釋里基本寫清楚了判族,核心的go routine 經(jīng)過一頓操作后躺盛,最主要就是兩點:
本地持久化log
記錄自己寫成功,因為計算過半時形帮,leader自己這一份也算在里面槽惫,這個很重要。
又異步交給了replicate go routine來處理辩撑,他就去繼續(xù)去分組提交了界斜,大概率如此循環(huán)往復(fù),不知疲倦的給replication routine 派活合冀。
復(fù)制GoRoutine
replication routine 會監(jiān)聽triggerCh channel各薇,接受領(lǐng)導(dǎo)的任務(wù),這個比較簡單君躺,就開始真正發(fā)給各自的follower了峭判,代碼如下:
case <-s.triggerCh:
lastLogIdx, _ := r.getLastLog()
//這個后面沒有異步了,就是這個rpc調(diào)用棕叫,判斷
shouldStop = r.replicateTo(s, lastLogIdx)
replicateTo 就是rpc調(diào)研林螃,真正遠(yuǎn)程rpc給follower,等待響應(yīng)俺泣。對于響應(yīng)的結(jié)果怎么處理疗认,怎么真正應(yīng)用到本地,還沒有分析伏钠,帶下一篇提交篇侮邀,因為插入請求還wait在哪里呢是不是。
總結(jié)
寫著寫著文章又很長了贝润,如果你讀到了這里绊茧,就給我點個贊,關(guān)注下打掘,我會馬不停蹄的開始下一篇华畏。
文本注意從consul leader server接受請求,做一些檢查尊蚁,token校驗亡笑,分配發(fā)送,然后異步交給了leader的核心goroutine横朋,核心go routine通過分組合并仑乌,計算好log 編號和任期term。就交給了replication routine,replication routine 把log 先本地持久化晰甚,然后異步發(fā)給所有的follower衙传,等待他們的結(jié)果,到底是commit 應(yīng)用到本地狀態(tài)機(jī)怎么實現(xiàn)的厕九,下面一篇見蓖捶,歡迎關(guān)注和轉(zhuǎn)發(fā)。