背景
raft協(xié)議是維持整個consul生態(tài)中最重要的一個協(xié)議矫渔,它負責維護consul server之間的強一致性
作為基于 Paxos 的一種變種算法规个,它簡化了狀態(tài)岭皂,通過加入時間差存捺、領(lǐng)導選舉等概念使一致性算法變得更簡單贼邓、易懂
相關(guān)名詞:
- Leader election: 領(lǐng)導選舉抖甘,是目前落地的一致性算法不可避免的一個步驟热鞍,為了達到一致性,必須要已一個共同認可的節(jié)點做出所有操作。而raft協(xié)議的節(jié)點都會處于三種狀態(tài)之一:
- Leader: 操作的真正處理者薇宠,它負責將改動確認生效偷办,并將其同步到其他節(jié)點。
- Follower: 負責將來自 Leader 的改動請求寫入本地 Log澄港,返回成功椒涯。
- Candidate: 如果 Leader 發(fā)生了故障,沒有收到心跳的 Followers 們會進入 Candidate 狀態(tài)回梧,并開始選主废岂,并保持該狀態(tài)直到選主結(jié)束。
- Log Replication: 當 Leader 被選舉出來以后漂辐,所有的寫請求都必須要到 Leader 執(zhí)行泪喊,Leader 會將其作為 Log entry 追加到日志中,然后給其他 Follower 發(fā)送請求髓涯,當絕大部分的 ((N/2)+1)的 Follower replicated 成功后袒啼,就代表該寫入事件成功了,就會返回結(jié)果狀態(tài)到客戶端纬纪。
- Log Compaction: 在實際的應(yīng)用場景中蚓再,因為磁盤空間是有限的,日志不能無限地增長包各,否則即使系統(tǒng)需要重啟也需要耗費大量的時間摘仅。所以, raft 會對節(jié)點進行 snapshot 操作问畅,執(zhí)行成功后娃属,會將 snapshot 之前的日志丟棄掉。
更詳細的raft工作流程參見這個動畫
關(guān)鍵數(shù)據(jù)結(jié)構(gòu)
Raft node結(jié)構(gòu)體
type Raft struct {
raftState //保存狀態(tài)變量結(jié)構(gòu)體
protocolVersion ProtocolVersion
applyCh chan *logFuture //負責異步發(fā)送logs到主線程去提交以及應(yīng)用到有限狀態(tài)機
conf Config //提供Raft初始化所需配置信息
fsm FSM //負責客戶端執(zhí)行命令的有限狀態(tài)機
fsmMutateCh chan interface{} //負責發(fā)送狀態(tài)改變更新到有限狀態(tài)機
fsmSnapshotCh chan *reqSnapshotFuture //負責觸發(fā)一個新的快照
lastContact time.Time //最后一次和主通信時間护姆,用來計算主是否還可用
lastContactLock sync.RWMutex
leader ServerAddress //當前的主的地址
leaderLock sync.RWMutex
leaderCh chan bool //負責通知主的改變
leaderState leaderState //只有狀態(tài)為leader的時候使用該字段
localID ServerID //儲存自己的server ID矾端,避免發(fā)送RPC請求給自己
localAddr ServerAddress //儲存自己的地址
logger *log.Logger //日志
logs LogStore //可靠地日志存儲
configurationChangeCh chan *configurationChangeFuture //負責通知leader做配置更替
configurations configurations //從log/snapshot里記錄最新的配置和最新的committed配置
rpcCh <-chan RPC //傳輸層RPC通道
shutdown bool
shutdownCh chan struct{} //負責退出
shutdownLock sync.Mutex //并發(fā)鎖
snapshots SnapshotStore //負責存儲和讀取snapshots
userSnapshotCh chan *userSnapshotFuture //負責響應(yīng)用戶發(fā)起的快照操作
userRestoreCh chan *userRestoreFuture //負責響應(yīng)用戶發(fā)起的恢復操作
stable StableStore //狀態(tài)持久化存儲,負責持久化raftState的部分字段
trans Transport //傳輸層
verifyCh chan *verifyFuture //發(fā)送異步確認消息到主線程卵皂,確認當前仍然是主
configurationsCh chan *configurationsFuture //安全的獲取主線程以外發(fā)送過來的配置數(shù)據(jù)
bootstrapCh chan *bootstrapFuture //用于嘗試從主線程以外觸發(fā)初始化操作
observersLock sync.RWMutex //保護觀察者的鎖
observers map[uint64]*Observer //觀察者列表
}
raftState 結(jié)構(gòu)體
type raftState struct {
currentTerm uint64 //當前項秩铆,StableStore的緩存
commitIndex uint64 //最高的已提交的日志入口
lastApplied uint64 //最新以應(yīng)用到有限狀態(tài)機的日志
lastLock sync.Mutex //保護接下來的四個字段的鎖
lastSnapshotIndex uint64 //緩存的最新的快照索引
lastSnapshotTerm uint64 //緩存的快照項
lastLogIndex uint64 //LogStore中緩存的最新的日志索引
lastLogTerm uint64 //LogStore中緩存的最新的日志項
routinesGroup sync.WaitGroup //記錄運行中的goroutines
state RaftState //當前的RaftState(0-Follower 1-Candidate 2-Leader 3-Shutdown)
}
出生點
raft作為Consul server之間的共識機制,在agent創(chuàng)建時就應(yīng)該初始化灯变,因此以agent.go為入口:
- 在agent的啟動方法中:
func (a *Agent) Start() error {}
根據(jù)agent的運行時配置(RuntimeConfig)中的ServerMode
參數(shù)殴玛,若該字段為true則
- 創(chuàng)建新的Consul server
server, err := consul.NewServerLogger(consulCfg, a.logger, a.tokens)
- 該函數(shù)會根據(jù)傳入配置構(gòu)造一個新的Consul server,內(nèi)部會初始化當前server的raft server:
if err := s.setupRaft(); err != nil {
s.Shutdown()
return nil, fmt.Errorf("Failed to start Raft: %v", err)
}
- 在setupRaft()方法中做了一系列的raft相關(guān)操作:
- 創(chuàng)建一個有限狀態(tài)機
- 創(chuàng)建一個raft的傳輸層
- 根據(jù)情況創(chuàng)建后端(all in-memory或者full disk-based)
- 根據(jù)情況初始化(bootstrap字段置為true或者為dev模式)
- 創(chuàng)建一個channel可靠的傳輸leader的通知
- 將前面創(chuàng)建的狀態(tài)機添祸,日志滚粟,存儲后端,傳輸層等組裝為server的raft node刃泌,并返回坦刀,完成raft的初始化創(chuàng)建:
s.raft, err = raft.NewRaft(s.config.RaftConfig, s.fsm, log, stable, snap, trans)
if err != nil {
return err
}
return nil
- NewRaft()函數(shù)負責創(chuàng)建raft愧沟,并運行:
- 嘗試恢復老的狀態(tài),如果存在(snapshots, logs, peers)
- 確保server地址和ID有效
- 創(chuàng)建Raft node 結(jié)構(gòu)體實例
- 初始化為一個follower
- 嘗試從集群中恢復數(shù)據(jù)(term鲤遥,log沐寺,snapshot)
- 設(shè)置傳輸層心跳handler
- 開啟后臺工作,并返回盖奈。goFunc()方法開啟一個goroutine并適當?shù)奶幚硪粋€routine的開啟混坞,增加,存在钢坦,減少之間的競爭究孕。
r.goFunc(r.run)
r.goFunc(r.runFSM)
r.goFunc(r.runSnapshots)
return r, nil
運行流程
r.run()
- r.run()方法,開啟一個長期運行的goroutine跑Raft有限狀態(tài)機:
- 在收到退出信號之前爹凹,一直在Follower厨诸,Candidate,Leader三種狀態(tài)之間轉(zhuǎn)換
若當前作為Follower
1. 創(chuàng)建一個定時器設(shè)定超時時間為配置的HeartbeatTimeout參數(shù)的1~2倍里面隨機的值`heartbeatTimer := randomTimeout(r.conf.HeartbeatTimeout)`
1. Follower可以處理RPC請求
1. Follower拒絕configurationChangeCh禾酱、applyCh微酬、verifyCh、userRestoreCh通道傳來的請求颤陶,(這些操作只能是Leader來完成)颗管,統(tǒng)一返回`v.respond(ErrNotLeader)`
1. Follower可以處理配置查詢請求:
```
case c := <-r.configurationsCh:
c.configurations = r.configurations.Clone()
c.respond(nil)
```
5. Follower可以處理liveBootStrap請求:
```
case b := <-r.bootstrapCh:
b.respond(r.liveBootstrap(b.configuration))
```
6. 若定時器到期,則重置定時器滓走,之后比較最后一次與Leader通信的時間到現(xiàn)在垦江,是否超出定時器的時間
1. 若沒有超出,則繼續(xù)等待
1. 若超出搅方,則清除當前raft節(jié)點的Leader信息比吭,進入Candidate狀態(tài)
若當前作為Candidate
- 創(chuàng)建一個選舉channel,投自己一票姨涡,同時發(fā)送rpc請求給其他的server請求給自己投票
- 創(chuàng)建一個定時器設(shè)定超時時間為配置的HeartbeatTimeout參數(shù)的1~2倍里面隨機的值
heartbeatTimer := randomTimeout(r.conf.HeartbeatTimeout)
- 如果從vote channel傳回消息:
- 如果當前的任期值大于自己的任期值衩藤,則放棄Candiate,回到Follower绣溜,并返回
- 如果是一張有效的投票,則自己的票數(shù)+1
- 如果自己的總票數(shù)大于需要的票數(shù)(所有能投票的voters數(shù)/2 + 1)娄蔼,則自己當選Leader怖喻,并返回
- Follower拒絕configurationChangeCh、applyCh岁诉、verifyCh锚沸、userRestoreCh通道傳來的請求,(這些操作只能是Leader來完成)涕癣,統(tǒng)一返回
v.respond(ErrNotLeader)
- Follower可以處理配置查詢請求:
case c := <-r.configurationsCh:
c.configurations = r.configurations.Clone()
c.respond(nil)
- 如果定時器超時哗蜈,則返回,重新發(fā)起選舉流程
若當前作為Leader
- 通知其他節(jié)點,我現(xiàn)在是Leader
- 初始化leaderState
- 為每一個節(jié)點創(chuàng)建一個channel距潘,負責異步的復制replication到其他的節(jié)點
- 進入leader loop
- 創(chuàng)建一個租期通道炼列,時間為LeaderLeaseTimeout字段配置
- 如果自己的狀態(tài)為Leader則處理rpcCh、commitCh音比、verifyCh俭尖、userRestoreCh、configurationsCh洞翩、configurationChangeChIfStable稽犁、bootstrapCh、applyCh通道的請求
- 如果租期到了骚亿,check是否在最近的lease周期里面已亥,能夠和半數(shù)以上的節(jié)點通信,如果不能来屠,讓出Leader位置虑椎,如果能夠,則按一定算法續(xù)一個租期
r.runFSM()
- r.runFSM()方法是一個長期運行的goroutine的妖,負責應(yīng)用logs到有限狀態(tài)機
- 根據(jù)請求的不同分別做commit绣檬、restore、snapshot操作:
for {
select {
case ptr := <-r.fsmMutateCh:
switch req := ptr.(type) {
case *commitTuple:
commit(req)
case *restoreFuture:
restore(req)
default:
panic(fmt.Errorf("bad type passed to fsmMutateCh: %#v", ptr))
}
case req := <-r.fsmSnapshotCh:
snapshot(req)
case <-r.shutdownCh:
return
}
}
r.runSnapshots()
- r.runSnapshots()方法是一個長期運行的goroutine嫂粟,負責管理給有限狀態(tài)機創(chuàng)建快照
- 根據(jù)配置的快照間隔時間娇未,創(chuàng)建一個定時器,時間為間隔時間的1~2倍中的任意數(shù)值星虹,若定時器到時間零抬,則判斷是否需要做快照,若需要宽涌,則觸發(fā)快照操作
- 用戶可以手動觸發(fā)快照操作