Consul Raft協(xié)議源碼分析上篇——日志復(fù)制

背景

前面一篇文章我們描述了raft 協(xié)議的實現(xiàn)數(shù)據(jù)一致性的基礎(chǔ)知識别渔,有了前面的基礎(chǔ)知識背景们衙,能很好的幫助我們理解consul 基于raft算法的實現(xiàn)剃斧,理論指導(dǎo)實踐,永遠(yuǎn)不過時显歧。

我們以consul key value 的一個例子來理清整個流程仪或,以寫一個key value來看,是我們?nèi)粘i_發(fā)中用的最多的一個例子追迟,讓我們來一起看看consul server到底是怎么實現(xiàn)的溶其,背后的邏輯是什么。

Consul Agent 請求

客戶端發(fā)起一個put key value的http請求敦间,由kvs_endpoint.go 的KVSEndpoint func 處理瓶逃,put的方法會路由給KVSPut 處理,除了一些校驗外和請求標(biāo)識廓块,比如是否有獲取鎖acquire或者release厢绝,這里提下一個檢查,就是value的大小檢查带猴,和web 容器一樣檢查防止請求數(shù)據(jù)太大昔汉,可以通過參數(shù)kv_max_value_size 控制,如果超過返回狀態(tài)碼413拴清,標(biāo)準(zhǔn)的http 狀態(tài)碼靶病。

檢查都OK后,consul agent就開始請求consul server了口予,當(dāng)然還是rpc 操作

// Copy the value
buf := bytes.NewBuffer(nil)
// 這里才開始讀請求的數(shù)據(jù)娄周。
if _, err := io.Copy(buf, req.Body); err != nil {
   return nil, err
}
applyReq.DirEnt.Value = buf.Bytes()

// Make the RPC
var out bool
// 開始請求server
if err := s.agent.RPC("KVS.Apply", &applyReq, &out); err != nil {
   return nil, err
}

// Only use the out value if this was a CAS
// 沒有出錯的話,這里就成功返回了
if applyReq.Op == api.KVSet {
        return true, nil
}

請求的是consul 下面的kvs_endpoint.go 下面的Apply 方法沪停,所以我們的重點要來了

Server Apply

consul server的 apply方法煤辨,代碼還是show下,這里還有兩個邏輯說明下木张。

// Apply is used to apply a KVS update request to the data store.
func (k *KVS) Apply(args *structs.KVSRequest, reply *bool) error {
   // 檢查機(jī)房dc是否匹配众辨,不是就轉(zhuǎn)發(fā)到對應(yīng)到dc的server。
   if done, err := k.srv.forward("KVS.Apply", args, args, reply); done {
      return err
   }
   // 中間不重要的去了舷礼,省得太多...
   // 對權(quán)限token 應(yīng)用ACL policy
   ok, err := kvsPreApply(k.logger, k.srv, authz, args.Op, &args.DirEnt)
   if err != nil {
      return err
   }
   if !ok {
      *reply = false
      return nil
   }

   // Apply the update.
   // 這里是開啟raft 算法的之旅的入口鹃彻。
   resp, err := k.srv.raftApply(structs.KVSRequestType, args)
   if err != nil {
      k.logger.Error("Raft apply failed", "error", err)
      return err
   }
   if respErr, ok := resp.(error); ok {
      return respErr
   }

   // Check if the return type is a bool.
   if respBool, ok := resp.(bool); ok {
      *reply = respBool
   }
   return nil
}

在真正開始執(zhí)行raft 算法前,主要做了如下兩件事:

先檢查了dc是否是當(dāng)前dc妻献,如果不是會路由到正確的dc蛛株,這頁是consul 支持多機(jī)房部署的一個很好的特性虚婿,路由很方便,這也是多機(jī)房部署consul是很好的選擇泳挥。
檢查是否啟用了acl策略,如果有至朗,需要檢查屉符,沒有對應(yīng)的token是不能操作的。
上面2件事都沒有問題后锹引,開始執(zhí)行raft apply操作矗钟,我們真正感興趣的就要出來了,下面讓我們開始盤apply嫌变,

經(jīng)過一盤吨艇,在真正執(zhí)行raft前,consul還做了一些加工腾啥,不能蠻搞东涡,是非常嚴(yán)謹(jǐn)?shù)模厦嫱ㄟ^raftApply倘待,經(jīng)過幾跳后疮跑,會執(zhí)行到raftApplyWithEncoder方法,這里做的工作是很重要的凸舵,所以還是拿出來說下祖娘,是漲知識的地方,代碼如下:

// raftApplyWithEncoder is used to encode a message, run it through raft,
// and return the FSM response along with any errors. Unlike raftApply this
// takes the encoder to use as an argument.
func (s *Server) raftApplyWithEncoder(t structs.MessageType, msg interface{}, encoder raftEncoder) (interface{}, error) {
   if encoder == nil {
      return nil, fmt.Errorf("Failed to encode request: nil encoder")
   }
   // 對請求編碼啊奄。
   buf, err := encoder(t, msg)
   if err != nil {
      return nil, fmt.Errorf("Failed to encode request: %v", err)
   }

   // Warn if the command is very large
   if n := len(buf); n > raftWarnSize {
      s.rpcLogger().Warn("Attempting to apply large raft entry", "size_in_bytes", n)
   }

   var chunked bool
   var future raft.ApplyFuture
   switch {
   case len(buf) <= raft.SuggestedMaxDataSize || t != structs.KVSRequestType:
      //請求的數(shù)據(jù)大小如果小于512 * 1024 即512k渐苏,則做一次log執(zhí)行。
      future = s.raft.Apply(buf, enqueueLimit)
   default:
      //超過了512k菇夸,則需要分chunk琼富,每個chunk做為一個log來應(yīng)用。
      chunked = true
      //這里就是每個log一次future峻仇。
      future = raftchunking.ChunkingApply(buf, nil, enqueueLimit, s.raft.ApplyLog)
   }

   //阻塞公黑,等待raft協(xié)議完成。
   if err := future.Error(); err != nil {
      return nil, err
   }

   resp := future.Response()

   //...
   return resp, nil
}

這里通過注釋摄咆,你也可以看出凡蚜,主要關(guān)心4件事情:

  1. 把請求編碼,這個不是我們的重點吭从,后面有時間可以單獨分析朝蜘。
  2. 檢查是否要拆包,是否要拆成多個raft command 來執(zhí)行涩金,這里有個參數(shù)控制谱醇,SuggestedMaxDataSize consul 默認(rèn)設(shè)置是512k暇仲,如果超過這個則拆,否則可以一次raft 協(xié)議搞定副渴。
  3. 有一個超時時間奈附,默認(rèn)是30秒,后面會用到煮剧。
  4. 最后事阻塞等待完成斥滤,是logfuture。

為什么要拆包

這些事raft 算法不會提的勉盅,這個事工程實踐才會有的一些優(yōu)化佑颇,此時你也和我一樣,為啥要做這個優(yōu)化呢草娜,有什么好處挑胸,解決什么問題,這是我們做一個架構(gòu)師必須要有的思考宰闰。

consul的官方就給出了解釋茬贵,所以閱讀優(yōu)秀的代碼就是一種享受,看注釋就能知道為啥這樣做议蟆,下面是他們對SuggestedMaxDataSize的注釋:

// Increasing beyond this risks RPC IO taking too long and preventing
// timely heartbeat signals which are sent in serial in current transports,
// potentially causing leadership instability.
SuggestedMaxDataSize = 512 * 1024

理解就是rpc的請求io 不能太大闷沥,因為還有非常重要的心跳包,如果發(fā)心跳包出現(xiàn)延遲咐容,就而影響leader的穩(wěn)定舆逃,這個事一個非常重要的優(yōu)化措施。

說完了拆包優(yōu)化邏輯后戳粒,我們看下ApplyLog的邏輯路狮,代碼如下:

// ApplyLog performs Apply but takes in a Log directly. The only values
// currently taken from the submitted Log are Data and Extensions.
func (r *Raft) ApplyLog(log Log, timeout time.Duration) ApplyFuture {
   metrics.IncrCounter([]string{"raft", "apply"}, 1)

   var timer <-chan time.Time
   if timeout > 0 {
      timer = time.After(timeout)
   }

   // Create a log future, no index or term yet
   logFuture := &logFuture{
      log: Log{
         Type:       LogCommand,
         Data:       log.Data,
         Extensions: log.Extensions,
      },
   }
   logFuture.init()

   select {
   case <-timer:
      return errorFuture{ErrEnqueueTimeout}
   case <-r.shutdownCh:
      return errorFuture{ErrRaftShutdown}
   case r.applyCh <- logFuture:
      return logFuture
   }
}

這里主要關(guān)心這個applyCh channel,consul 在初始化leader的時候給創(chuàng)建的一個無緩沖區(qū)的通道蔚约,所以如果leader的協(xié)程在干其他的事情奄妨,那這個提交log就阻塞了,時間最長30s苹祟,寫入成功砸抛,就返回了logFuture,也就事前面我們看到future的阻塞树枫。

到這里整個consul leader server的插入請求從接受到阻塞等待的邏輯就完成了直焙,consul server 有個核心的go routine 在watch 這個applyCh,從定義可以看出砂轻,是應(yīng)用raft log的channel奔誓。

分組提交

consul leader 在初始化完成后,會啟動一個核心的go routine搔涝,執(zhí)行rpc厨喂,leader 驗證和措,這個我們前面分析過,還有一個最重要的就事raft log應(yīng)用了蜕煌,代碼如下:

case newLog := <-r.applyCh://這個是前面我們提交log future的
   if r.getLeadershipTransferInProgress() {
      r.logger.Debug(ErrLeadershipTransferInProgress.Error())
      newLog.respond(ErrLeadershipTransferInProgress)
      continue
   }
   // Group commit, gather all the ready commits
   ready := []*logFuture{newLog}
GROUP_COMMIT_LOOP:
   for i := 0; i < r.conf.MaxAppendEntries; i++ {
      select {
      case newLog := <-r.applyCh:
         ready = append(ready, newLog)
      default:
         break GROUP_COMMIT_LOOP
      }
   }

   // Dispatch the logs
   if stepDown {
      // we're in the process of stepping down as leader, don't process anything new
     //如果發(fā)現(xiàn)我們不是leader了派阱,直接響應(yīng)失敗 
     for i := range ready {
         ready[i].respond(ErrNotLeader)
      }
   } else {
      r.dispatchLogs(ready)
   }

這里的一個重要的點就是組提交,我們在基礎(chǔ)篇提過斜纪,這里就是實現(xiàn)了颁褂,就是讀applyCh的log,這個里做了組提交的優(yōu)化傀广,最多一次發(fā)送MaxAppendEntries個,默認(rèn)位64個彩届,如果并發(fā)高的情況下伪冰,這里是能讀到一個batch的,或者沒有了樟蠕,就不等了贮聂,這里是不能等的,因為raft算法要保證順序寨辩,這里是單線程出來的吓懈,下面就開始dispatch log了,代碼如下:

// dispatchLog is called on the leader to push a log to disk, mark it
// as inflight and begin replication of it.
func (r *Raft) dispatchLogs(applyLogs []*logFuture) {
   now := time.Now()
   defer metrics.MeasureSince([]string{"raft", "leader", "dispatchLog"}, now)

   //獲取當(dāng)前l(fā)eader的任期編號靡狞,這個不會重復(fù)是遞增的耻警,如果有心的leaer了,會比這個大甸怕。
   term := r.getCurrentTerm()
   //log 編號甘穿,寫一個加1
   lastIndex := r.getLastIndex()

   n := len(applyLogs)
   logs := make([]*Log, n)
   metrics.SetGauge([]string{"raft", "leader", "dispatchNumLogs"}, float32(n))

   //設(shè)置每個log的編號和任期
   for idx, applyLog := range applyLogs {
      applyLog.dispatch = now
      lastIndex++
      applyLog.log.Index = lastIndex
      applyLog.log.Term = term
      logs[idx] = &applyLog.log
      r.leaderState.inflight.PushBack(applyLog)
   }

   // Write the log entry locally
   // log先寫入本地持久化,consul大部分的版本底層用的是boltdb梢杭,boltdb
   // 是一個支持事物的數(shù)據(jù)庫温兼,非常方便,這里會涉及io操作武契。
   if err := r.logs.StoreLogs(logs); err != nil {
      r.logger.Error("failed to commit logs", "error", err)
      //如果寫失敗募判,則直接響應(yīng),前面的future阻塞就會喚醒咒唆。
      for _, applyLog := range applyLogs {
         applyLog.respond(err)
      }
      //更新自己為follower
      r.setState(Follower)
      return
   }
   //這里很重要届垫,好就才看明白,這個是log 復(fù)制成功后钧排,最終應(yīng)用到狀態(tài)機(jī)的一個機(jī)制
     //這里是記錄下leader自己的結(jié)果敦腔,因為過半leader也算一份。
   r.leaderState.commitment.match(r.localID, lastIndex)

   // Update the last log since it's on disk now
   // 更新最新log entry的編號恨溜,寫到這里了符衔。
   r.setLastLog(lastIndex, term)

   // Notify the replicators of the new log
   // 開始異步發(fā)送給所有的follower找前,這個leader主go routine的活就干完了。
   for _, f := range r.leaderState.replState {
      asyncNotifyCh(f.triggerCh)
   }
}

這個dispatchlog的邏輯注釋里基本寫清楚了判族,核心的go routine 經(jīng)過一頓操作后躺盛,最主要就是兩點:

本地持久化log

記錄自己寫成功,因為計算過半時形帮,leader自己這一份也算在里面槽惫,這個很重要。
又異步交給了replicate go routine來處理辩撑,他就去繼續(xù)去分組提交了界斜,大概率如此循環(huán)往復(fù),不知疲倦的給replication routine 派活合冀。

復(fù)制GoRoutine

replication routine 會監(jiān)聽triggerCh channel各薇,接受領(lǐng)導(dǎo)的任務(wù),這個比較簡單君躺,就開始真正發(fā)給各自的follower了峭判,代碼如下:

case <-s.triggerCh:
   lastLogIdx, _ := r.getLastLog()
   //這個后面沒有異步了,就是這個rpc調(diào)用棕叫,判斷
   shouldStop = r.replicateTo(s, lastLogIdx)

replicateTo 就是rpc調(diào)研林螃,真正遠(yuǎn)程rpc給follower,等待響應(yīng)俺泣。對于響應(yīng)的結(jié)果怎么處理疗认,怎么真正應(yīng)用到本地,還沒有分析伏钠,帶下一篇提交篇侮邀,因為插入請求還wait在哪里呢是不是。

總結(jié)

寫著寫著文章又很長了贝润,如果你讀到了這里绊茧,就給我點個贊,關(guān)注下打掘,我會馬不停蹄的開始下一篇华畏。

文本注意從consul leader server接受請求,做一些檢查尊蚁,token校驗亡笑,分配發(fā)送,然后異步交給了leader的核心goroutine横朋,核心go routine通過分組合并仑乌,計算好log 編號和任期term。就交給了replication routine,replication routine 把log 先本地持久化晰甚,然后異步發(fā)給所有的follower衙传,等待他們的結(jié)果,到底是commit 應(yīng)用到本地狀態(tài)機(jī)怎么實現(xiàn)的厕九,下面一篇見蓖捶,歡迎關(guān)注和轉(zhuǎn)發(fā)。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末扁远,一起剝皮案震驚了整個濱河市俊鱼,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌畅买,老刑警劉巖并闲,帶你破解...
    沈念sama閱讀 216,402評論 6 499
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異谷羞,居然都是意外死亡焙蚓,警方通過查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,377評論 3 392
  • 文/潘曉璐 我一進(jìn)店門洒宝,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人萌京,你說我怎么就攤上這事雁歌。” “怎么了知残?”我有些...
    開封第一講書人閱讀 162,483評論 0 353
  • 文/不壞的土叔 我叫張陵靠瞎,是天一觀的道長。 經(jīng)常有香客問我求妹,道長乏盐,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 58,165評論 1 292
  • 正文 為了忘掉前任制恍,我火速辦了婚禮父能,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘净神。我一直安慰自己何吝,他們只是感情好,可當(dāng)我...
    茶點故事閱讀 67,176評論 6 388
  • 文/花漫 我一把揭開白布鹃唯。 她就那樣靜靜地躺著,像睡著了一般。 火紅的嫁衣襯著肌膚如雪鸽粉。 梳的紋絲不亂的頭發(fā)上挚赊,一...
    開封第一講書人閱讀 51,146評論 1 297
  • 那天,我揣著相機(jī)與錄音,去河邊找鬼跪者。 笑死棵帽,一個胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的坑夯。 我是一名探鬼主播岖寞,決...
    沈念sama閱讀 40,032評論 3 417
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼柜蜈!你這毒婦竟也來了仗谆?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 38,896評論 0 274
  • 序言:老撾萬榮一對情侶失蹤淑履,失蹤者是張志新(化名)和其女友劉穎隶垮,沒想到半個月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體秘噪,經(jīng)...
    沈念sama閱讀 45,311評論 1 310
  • 正文 獨居荒郊野嶺守林人離奇死亡狸吞,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,536評論 2 332
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了指煎。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片蹋偏。...
    茶點故事閱讀 39,696評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖至壤,靈堂內(nèi)的尸體忽然破棺而出威始,到底是詐尸還是另有隱情,我是刑警寧澤像街,帶...
    沈念sama閱讀 35,413評論 5 343
  • 正文 年R本政府宣布黎棠,位于F島的核電站,受9級特大地震影響镰绎,放射性物質(zhì)發(fā)生泄漏脓斩。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 41,008評論 3 325
  • 文/蒙蒙 一畴栖、第九天 我趴在偏房一處隱蔽的房頂上張望随静。 院中可真熱鬧,春花似錦吗讶、人聲如沸挪挤。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,659評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽扛门。三九已至,卻和暖如春纵寝,著一層夾襖步出監(jiān)牢的瞬間论寨,已是汗流浹背星立。 一陣腳步聲響...
    開封第一講書人閱讀 32,815評論 1 269
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機(jī)就差點兒被人妖公主榨干…… 1. 我叫王不留葬凳,地道東北人绰垂。 一個月前我還...
    沈念sama閱讀 47,698評論 2 368
  • 正文 我出身青樓,卻偏偏與公主長得像火焰,于是被迫代替她去往敵國和親劲装。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 44,592評論 2 353

推薦閱讀更多精彩內(nèi)容