raft 結(jié)構(gòu)
網(wǎng)絡(luò)層
首先etcd最外層有一個(gè)網(wǎng)絡(luò)層狰住,負(fù)責(zé)與集群其他節(jié)點(diǎn)通信或者接受客戶(hù)端的請(qǐng)求,這里我們主要學(xué)習(xí)raft模塊不詳細(xì)解讀齿梁,使用就用網(wǎng)絡(luò)層來(lái)代替催植。
node
node負(fù)責(zé)raft于網(wǎng)絡(luò)層的交互肮蛹,交互使用go的chan
propc: make(chan msgWithResult), //接收網(wǎng)絡(luò)層MsgProp類(lèi)型消息
recvc: make(chan pb.Message), //接收網(wǎng)絡(luò)層除MsgProp類(lèi)型以外的消息
confc: make(chan pb.ConfChange),//接收EntryConfChange類(lèi)型消息比如動(dòng)態(tài)添加節(jié)點(diǎn)
confstatec: make(chan pb.ConfState),
readyc: make(chan Ready),//向上層返回 ready
advancec: make(chan struct{}),//上層處理往ready后返回給raft的消息
// make tickc a buffered chan, so raft node can buffer some ticks when the node
// is busy processing raft messages. Raft node will resume process buffered
// ticks when it becomes idle.
tickc: make(chan struct{}, 128),//管理超時(shí)的管道
done: make(chan struct{}),
stop: make(chan struct{}),
status: make(chan chan Status),
可以看到定義來(lái)許多管道來(lái)交互信息
raft
raft主要處理raft算法的實(shí)現(xiàn),和日志的復(fù)制创南。
raft會(huì)存儲(chǔ)當(dāng)前節(jié)點(diǎn)的id伦忠,任期,投票等稿辙。
其中raftlog用來(lái)管理日志昆码,可以看到raftlog下有兩個(gè)類(lèi)。
一個(gè)日志的流程是這樣的:
- 客戶(hù)端發(fā)送請(qǐng)求邻储,生成日志會(huì)先進(jìn)入到unstable模塊赋咽,如名字所表示,它是不安全的吨娜,因?yàn)槿罩具€沒(méi)有進(jìn)行持久化脓匿。
- 將unstable中的日志同步給其他節(jié)點(diǎn),同時(shí)交給上層持久化宦赠。node也負(fù)責(zé)和上層通信陪毡。
- 如果日志已經(jīng)被半數(shù)以上的節(jié)點(diǎn)復(fù)制成功了,那么這部分日志將會(huì)被認(rèn)為提交成功勾扭。raft會(huì)修改raftlog中的committed記錄提交的index毡琉。
- 提交成功后raft會(huì)將提交成功的日志返回給上層,上層會(huì)應(yīng)用日志妙色,然后響應(yīng)給客戶(hù)端成功绊起,同時(shí)raft也會(huì)同步給其他節(jié)點(diǎn)讓他們也應(yīng)用日志,然后修改自己的applied記錄應(yīng)用的index燎斩。
啟動(dòng)node
創(chuàng)建node
創(chuàng)建node之前先會(huì)讀取持久化這磁盤(pán)中的日志虱歪,根據(jù)是否有日志和是否是一個(gè)新集群,有三種情況
- 沒(méi)有日志并且是新集群栅表,直接調(diào)用startNode
- 沒(méi)有日志不是新集群笋鄙,會(huì)先與其他節(jié)點(diǎn)通信然后更新配置調(diào)用straNode
- 有日志,說(shuō)明是節(jié)點(diǎn)重啟怪瓶,讀取日志調(diào)用restartNode
startNode 和 restartNode其實(shí)差別不大萧落,都會(huì)構(gòu)建一個(gè)raft的配置
c := &raft.Config{
ID: uint64(id),//當(dāng)前節(jié)點(diǎn)id
ElectionTick: cfg.ElectionTicks,//選舉用超時(shí)時(shí)間
HeartbeatTick: 1,//心跳間隔
Storage: s,//就是MemorySorage
MaxSizePerMsg: maxSizePerMsg, //每次發(fā)消息的最大size
MaxInflightMsgs: maxInflightMsgs,
CheckQuorum: true,
PreVote: cfg.PreVote,//上文提到的pre模式
}
除了Storage參數(shù),其他都是一些配置的屬性洗贰。在straNode中
Storage是調(diào)用 s = raft.NewMemoryStorage() 來(lái)創(chuàng)建的一個(gè)空的初始對(duì)象找岖。而restartNode中會(huì)用Storage加載從日志中讀取的數(shù)據(jù)。來(lái)還原服務(wù)宕機(jī)前的狀態(tài)敛滋。
s := raft.NewMemoryStorage()
if snapshot != nil {
s.ApplySnapshot(*snapshot)
}
創(chuàng)建raft
接下來(lái)調(diào)用raft.StartNode(c, peers)來(lái)創(chuàng)建raft對(duì)象许布。c就是上面的raft.Config,peers記錄集群中所有節(jié)點(diǎn)的id绎晃。
func StartNode(c *Config, peers []Peer) Node {
r := newRaft(c)
// become the follower at term 1 and apply initial configuration
// entries of term 1
r.becomeFollower(1, None)
for _, peer := range peers {
cc := pb.ConfChange{Type: pb.ConfChangeAddNode, NodeID: peer.ID, Context: peer.Context}
d, err := cc.Marshal()
if err != nil {
panic("unexpected marshal error")
}
e := pb.Entry{Type: pb.EntryConfChange, Term: 1, Index: r.raftLog.lastIndex() + 1, Data: d}
r.raftLog.append(e)
}
// Mark these initial entries as committed.
// TODO(bdarnell): These entries are still unstable; do we need to preserve
// the invariant that committed < unstable?
r.raftLog.committed = r.raftLog.lastIndex()
- 先調(diào)用newRaft創(chuàng)建raft對(duì)象蜜唾,newRaft方法中會(huì)先Storage中讀取信息杂曲。然后創(chuàng)建raft對(duì)象,然后判斷Storage是不是空袁余,如果是重啟節(jié)點(diǎn)擎勘,那這里Storage就會(huì)有數(shù)據(jù),然后更新raft的數(shù)據(jù)颖榜。
if !isHardStateEqual(hs, emptyState) {
r.loadState(hs)
}
if c.Applied > 0 {
raftlog.appliedTo(c.Applied)
}
/-------------------------------------------/
func (r *raft) loadState(state pb.HardState) {
if state.Commit < r.raftLog.committed || state.Commit > r.raftLog.lastIndex() {
r.logger.Panicf("%x state.commit %d is out of range [%d, %d]", r.id, state.Commit, r.raftLog.committed, r.raftLog.lastIndex())
}
r.raftLog.committed = state.Commit
r.Term = state.Term
r.Vote = state.Vote
/------------------------------------------/
func (l *raftLog) appliedTo(i uint64) {
if i == 0 {
return
}
if l.committed < i || i < l.applied {
l.logger.Panicf("applied(%d) is out of range [prevApplied(%d), committed(%d)]", i, l.applied, l.committed)
}
l.applied = i
}
}
r.loadState(hs) 里面會(huì)更新raft記錄的任期和投票,raftLog中的提交index棚饵。
appliedTo 會(huì)一個(gè)raftLog的應(yīng)用index。
之后會(huì)調(diào)用r.becomeFollower(1, None)修改節(jié)點(diǎn)狀態(tài)變成follower
同樣其他狀態(tài)也有相應(yīng)的函數(shù):
func (r *raft) becomeFollower(term uint64, lead uint64)
func (r *raft) becomeCandidate()
func (r *raft) becomePreCandidate()
func (r *raft) becomeLeader()
然后如果是新集群掩完,則會(huì)將集群的節(jié)點(diǎn)信息追加到日志中蟹地,并提交追加的日志,這部分用于重啟時(shí)能從日志中讀取到集群的節(jié)點(diǎn)信息藤为。
最后創(chuàng)建node 開(kāi)啟協(xié)程啟動(dòng)node ,創(chuàng)建node沒(méi)有什么操作就是創(chuàng)建對(duì)象怪与。
n := newNode()
n.logger = c.Logger
go n.run(r)
return &n
run
func (n *node) run(r *raft) {
var propc chan msgWithResult
var readyc chan Ready
var advancec chan struct{}
var prevLastUnstablei, prevLastUnstablet uint64
var havePrevLastUnstablei bool
var prevSnapi uint64
var applyingToI uint64
var rd Ready
lead := None
prevSoftSt := r.softState()
prevHardSt := emptyState
for {
if advancec != nil {
readyc = nil
} else {
rd = newReady(r, prevSoftSt, prevHardSt)
if rd.containsUpdates() {
readyc = n.readyc
} else {
readyc = nil
}
}
if lead != r.lead {
if r.hasLeader() {
if lead == None {
propc = n.propc
} else {
propc = nil
}
lead = r.lead
}
select {
case pm := <-propc:
err := r.Step(m)
case m := <-n.recvc:
r.Step(m)
case cc := <-n.confc:
case <-n.tickc:
r.tick()
case readyc <- rd:
advancec = n.advancec
case <-advancec:
advancec = nil
case c := <-n.status:
c <- getStatus(r)
case <-n.stop:
close(n.done)
return
}
}
}
上面的代碼是刪除來(lái)很多邏輯只保留基本結(jié)構(gòu)的。
主要用于從管道讀取消息然后傳遞給raft缅疟。
這里重點(diǎn)就是分别,raft是不能主動(dòng)給發(fā)消息的,只能是上層應(yīng)用自己來(lái)拉取存淫。
第一次進(jìn)入循環(huán)advancec為空耘斩,會(huì)調(diào)用
newReady(r, prevSoftSt, prevHardSt) 這個(gè)函數(shù)的返回就是raft的狀態(tài)變化和要發(fā)送的消息。
rd := Ready{
Entries: r.raftLog.unstableEntries(),//unstable中的日志交給上層持久化
CommittedEntries: r.raftLog.nextEnts(),//已經(jīng)提交待應(yīng)用的日志桅咆,交給上層應(yīng)用
Messages: r.msgs,//raft要發(fā)送的消息
}
node拿到這個(gè)對(duì)象后會(huì)通過(guò)readyc通道發(fā)送給上層括授,然后記錄這次的狀態(tài)已用于下次調(diào)用Ready的時(shí)候判斷狀態(tài)是否變化。
之后會(huì)等待advancec通道岩饼,如果要消息則說(shuō)明上次的消息已經(jīng)處理完成荚虚,可以修改自己的狀態(tài)了,比如日志應(yīng)用完成修改自己的應(yīng)用index籍茧。
這就是raft啟動(dòng)的過(guò)程版述。下一個(gè)文章講解選舉的流程