raftexample簡介
本文中raft的實(shí)現(xiàn)以etcd中raftexample實(shí)現(xiàn)為例襟士。
raftexample的例子中,啟動(dòng)了一個(gè)kv數(shù)據(jù)庫作為raft的state machine柿赊,另外啟動(dòng)了一個(gè)http數(shù)據(jù)庫來響應(yīng)客戶端的請求嫂丙。其中proposeC以及commitC,errorC主要用來交互憾儒,proposeC為由客戶端發(fā)送過來的條目询兴,通過httpserver流向共識部分raftNode;commitC為經(jīng)共識的條目起趾,由raftNode流向kv數(shù)據(jù)庫蕉朵。
/*
github.com/coreos/etcd/contrib/raftexample/main.go
main()
*/
var kvs *kvstore
getSnapshot := func() ([]byte, error) { return kvs.getSnapshot() }
//啟動(dòng)raft節(jié)點(diǎn)
commitC, errorC, snapshotterReady := newRaftNode(*id, strings.Split(*cluster, ","), *join, getSnapshot, proposeC, confChangeC)
//啟動(dòng)kv數(shù)據(jù)庫作為state machine
kvs = newKVStore(<-snapshotterReady, proposeC, commitC, errorC)
// the key-value http handler will propose updates to raft
serveHttpKVAPI(kvs, *kvport, confChangeC, errorC)
raftexample中的存儲(chǔ)
raftexample中的存儲(chǔ)分為以下幾部分
- kv數(shù)據(jù)庫, 存儲(chǔ)state machine的狀態(tài)阳掐。raftexample例子中是個(gè)簡單的map始衅。
- 內(nèi)存數(shù)據(jù)中的log,raft共識用到的日志條目缭保,一般是一段較新的日志汛闸,可能包含一部分已共識的日志和一些尚未共識的日志條目。由于是內(nèi)存維護(hù)艺骂,可以靈活的重寫替換诸老。該部分內(nèi)容中位于commitIndex之前的部分是已共識的部分,內(nèi)容不會(huì)被替換钳恕,commitIndex到lastIndex之間的內(nèi)容是尚未共識的部分别伏,index對應(yīng)的條目內(nèi)容可能被替換蹄衷。該部分在raft.MemoryStorage實(shí)現(xiàn)。
- wal部分厘肮,raft共識所用的日志文件愧口。該文件只會(huì)追加,不會(huì)重寫和覆蓋类茂。raft共識過程中收到的日志條目耍属,都會(huì)記錄在wal日志文件中。即可能在wal日志文件中看見同index不同term的日志條目巩检。主要作用是在節(jié)點(diǎn)崩潰后可以通過wal部分重新啟動(dòng)厚骗。
- snapshot文件部分,類似于存檔點(diǎn)的概念兢哭,已經(jīng)過共識的部分领舰,可以在一定時(shí)間或者一點(diǎn)量消息后,生成snapshot迟螺,snapshot文件一般保存當(dāng)前state machine的狀態(tài)提揍,及集群的相關(guān)配置等數(shù)據(jù)。snapshot可以幫助節(jié)點(diǎn)快速啟動(dòng)煮仇,以及新節(jié)點(diǎn)加入時(shí)的快速同步劳跃。
幾個(gè)重要的組件
etcd中raft的實(shí)現(xiàn)了很好的模塊化,其中raft共識模塊主要是實(shí)現(xiàn)了算法的邏輯浙垫,而系統(tǒng)需要用到的存儲(chǔ)刨仑、通訊等模塊都從共識模塊中很好的剝離了出來是單獨(dú)模塊的實(shí)現(xiàn)。這樣做可以很方便對raft的共識模塊進(jìn)行移植夹姥,也可以很方便地支持多種底層的存儲(chǔ)或者通訊方式杉武。raftexample中的組件大概可以分為以下幾部分。
- 應(yīng)用層部分:對應(yīng)其raftNode組件辙售,應(yīng)用層可以根據(jù)自己的需要實(shí)現(xiàn)自己的raftNode轻抱。
- 共識部分(github.com/coreos/etcd/raft):對應(yīng)其raft組件。該部分實(shí)現(xiàn)了raft核心算法部分旦部。其邏輯主要在raft結(jié)構(gòu)體實(shí)現(xiàn)的方法中祈搜,其通過raftLog結(jié)構(gòu)體以及progress結(jié)構(gòu)體,實(shí)現(xiàn)raft算法中l(wèi)og部分的管理和節(jié)點(diǎn)對集群中其他節(jié)點(diǎn)信息的管理士八。并且通過node結(jié)構(gòu)體提供給了應(yīng)用層與共識部分溝通的渠道容燕。
- 通訊部分(github.com/coreos/etcd/rafthttp):對應(yīng)其Transport組件。使用http協(xié)議完成節(jié)點(diǎn)間的相互通信婚度。
- 存儲(chǔ)部分(github.com/coreos/etcd/raftsnap蘸秘、github.com/coreos/etcd/wal):對應(yīng)其snapshotter組件與wal組件。通過這兩個(gè)組件分別對快照和日志條目進(jìn)行持久化的存儲(chǔ)。
raftNode組件:主要處理應(yīng)用層服務(wù)
raftNode結(jié)構(gòu)體醋虏,實(shí)際上是應(yīng)用層的包裝防症,真正的raft共識部分在其中的node(raft.Node)中娇豫。RaftNode結(jié)構(gòu)體中可以放應(yīng)用層需要的一些東西渐溶。大致有以下一些東西
- 協(xié)程交互用的通道漏益,包括proposeC,confChangeC粘舟,commitC,errorC佩研。
- raft節(jié)點(diǎn)的基本配置柑肴,包括id,peers旬薯,join晰骑,waldir,snapdir绊序。
- raft節(jié)點(diǎn)狀態(tài)硕舆,包括lastIndex,confState骤公,snapshotIndex抚官,appliedIndex等。
- raft共識部分的組件阶捆,包括node其是共識的關(guān)鍵部分凌节,raftStorage是日志條目在內(nèi)存中存儲(chǔ)的實(shí)現(xiàn),wal是日志條目存儲(chǔ)在文件中的實(shí)現(xiàn)洒试,snapshotter及其相關(guān)配置是snapshot處理的相關(guān)實(shí)現(xiàn)倍奢,transport是raft節(jié)點(diǎn)間通訊的組件。
/*
github.com/coreos/etcd/contrib/raftexample/raft.go
*/
// A key-value stream backed by raft
type raftNode struct {
proposeC <-chan string // proposed messages (k,v)
confChangeC <-chan raftpb.ConfChange // proposed cluster config changes
commitC chan<- *string // entries committed to log (k,v)
errorC chan<- error // errors from raft session
id int // client ID for raft session
peers []string // raft peer URLs
join bool // node is joining an existing cluster
waldir string // path to WAL directory
snapdir string // path to snapshot directory
getSnapshot func() ([]byte, error)
lastIndex uint64 // index of log at start
confState raftpb.ConfState
snapshotIndex uint64 //snapshotIndex
appliedIndex uint64 //同論文中的appliedIndex垒棋,用于記錄最新的已提交state machine執(zhí)行的日志的索引
// raft backing for the commit/error channel
node raft.Node //真正的共識部分的node
raftStorage *raft.MemoryStorage //raft中內(nèi)存存儲(chǔ)日志的部分
wal *wal.WAL //wal文件部分
snapshotter *raftsnap.Snapshotter
snapshotterReady chan *raftsnap.Snapshotter // signals when snapshotter is ready
snapCount uint64
transport *rafthttp.Transport
stopc chan struct{} // signals proposal channel closed
httpstopc chan struct{} // signals http server to shutdown
httpdonec chan struct{} // signals http server shutdown complete
}
大致有以下幾種函數(shù)
- 啟動(dòng)raft函數(shù):startRaft
- snapshot相關(guān)的函數(shù)卒煞,主要處理snapshot的觸發(fā)、將什么內(nèi)容保存到snapshot叼架、保存snapshot畔裕、加載snapshot等事項(xiàng),包括:maybeTriggerSnapshot乖订,publishSnapshot柴钻,saveSnap,loadSnapshot
- 日志條目相關(guān)的函數(shù),主要將條目提交給state machine去執(zhí)行垢粮,包括:publishEntries贴届,entriesToApply
- 重新加載和重放WAL,包括:openWAL,replayWAL
- 響應(yīng)Raft和客戶端請求的函數(shù):serverRaft毫蚓,serveChannels
node組件占键,應(yīng)用層與共識模塊的溝通者
node結(jié)構(gòu)體的主要作用是應(yīng)用層和共識模塊的銜接。將應(yīng)用層的消息傳遞給底層共識模塊元潘,并將底層共識模塊共識后的結(jié)果反饋給應(yīng)用層畔乙。結(jié)構(gòu)體的主要成員都是用來作消息傳遞用處的通道。
type node struct {
propc chan pb.Message
recvc chan pb.Message
confc chan pb.ConfChange
confstatec chan pb.ConfState
readyc chan Ready
advancec chan struct{}
tickc chan struct{}
done chan struct{}
stop chan struct{}
status chan chan Status
logger Logger
}
大致函數(shù)有幾種:
- 啟動(dòng)翩概、新建牲距、停止:StartNode,RestartNode钥庇,newNode,Stop牍鞠。
- 發(fā)送命令給共識模塊的函數(shù):Tick,Campaign,Propose,Step,ProposeConfChange,ApplyConfChange评姨,TransferLeadership难述。
- 反饋給應(yīng)用層的函數(shù)Ready,Advance,,Status,ReportUnreachable,ReadIndex。
raft組件:共識組件
raft結(jié)構(gòu)體吐句,共識組件結(jié)構(gòu)體胁后。
首先放一下論文中需要維護(hù)的state來對比
currentTerm //當(dāng)前任期
votedFor //當(dāng)前任期的候選者編號,無則為null
log[] //日志條目
//Volatile state on all servers,所有服務(wù)器上維護(hù)
commitIndex //已知的最高的可被提交的日志條目的索引嗦枢,初始為0
lastApplied //當(dāng)前已提交給state machine執(zhí)行的條目的索引攀芯,初始為0
//Volatile state on leaders:(Reinitialized after election),只在leader節(jié)點(diǎn)上維護(hù)
nextIndex[] //對于每一臺(tái)服務(wù)器文虏,下一條將要發(fā)給該服務(wù)器的條目的索引敲才,初始為leader最后一條條目索引+1
matchIndex[] //每一個(gè)服務(wù)器已知的最高的已復(fù)制的條目的索引,初始為0
etcd中raft實(shí)現(xiàn)中raft結(jié)構(gòu)體內(nèi)容择葡,可以看出其主要包含一下幾部分
- state的基本項(xiàng)紧武,包括:id,Term(論文中currentTerm)敏储,vote(論文中voteFor)阻星,raftLog(論文中l(wèi)og[],并且維護(hù)了comitIndex和lastApplied),prs及l(fā)earnerPrs(論文中的matchIndex與nextIndex以及一些其他關(guān)于對應(yīng)節(jié)點(diǎn)狀態(tài)的信息)已添。
- 一些其他狀態(tài)項(xiàng)妥箕,包括state用于判別當(dāng)前節(jié)點(diǎn)狀態(tài)、isLearner判別當(dāng)前節(jié)點(diǎn)是否是learner更舞、lead表示當(dāng)前l(fā)eaderid畦幢、以及l(fā)eadTransferee、pendingConfIndex缆蝉、readStates宇葱、checkQuorum瘦真、preVote等。
- 一些其他配置項(xiàng)黍瞧,包括maxInflight诸尽,maxMsgSize,heartbeatTimeout印颤,electionTimeout您机,randomizedElectionTimeout,disableProposalForwarding等年局。
- 重要的函數(shù)际看,tick用來計(jì)時(shí),step用來處理各種RPC矢否。
type raft struct {
//同論文中的state的一些基本項(xiàng)
id uint64 //節(jié)點(diǎn)id
Term uint64 //節(jié)點(diǎn)當(dāng)前所處任期仲闽,即currentTerm
Vote uint64 // 即votedFor
raftLog *raftLog //節(jié)點(diǎn)的日志,即log[]
prs map[uint64]*Progress //progress中存儲(chǔ)了對應(yīng)節(jié)點(diǎn)的matchIndex和nextIndex
learnerPrs map[uint64]*Progress
//一些其他狀態(tài)
readStates []ReadState
state StateType
// isLearner is true if the local raft node is a learner.
isLearner bool
votes map[uint64]bool
msgs []pb.Message
readOnly *readOnly
lead uint64 // the leader id
// leadTransferee is id of the leader transfer target when its value is not zero.
// Follow the procedure defined in raft thesis 3.10.
leadTransferee uint64
pendingConfIndex uint64
checkQuorum bool
preVote bool
//一些其他配置項(xiàng)
maxInflight int
maxMsgSize uint64
electionElapsed int
heartbeatElapsed int
heartbeatTimeout int
electionTimeout int
randomizedElectionTimeout int
disableProposalForwarding bool
tick func()
step stepFunc //按照leader兴喂,follower蔼囊,candidate角色不同焚志,有三個(gè)不同的函數(shù)
logger Logger
}
raft結(jié)構(gòu)體及其方法是算法中的主體部分衣迷。大致有以下幾種函數(shù):
- 新建并啟動(dòng):newRaft
- 查詢或者獲取狀態(tài)類:包括hasLeader,softState,hardState,quorum,nodes,learnerNodes,getProgress酱酬,maybeCommit壶谒,pastElectionTimeout,checkQuorumActive
- 控制類函數(shù):主要作用是根據(jù)消息類型調(diào)用響應(yīng)的函數(shù)膳沽,包括:Step汗菜,stepLeader,stepCandidate挑社,stepFollower
- 發(fā)送消息類:包括send陨界,sendAppend,sendHeartbeat痛阻,sendTimeoutNow菌瘪,bcastAppend,bcastHearbeat阱当,bcastHearbeatWithCtx俏扩。其中幾個(gè)bcast打頭函數(shù)表示向集群中的每個(gè)節(jié)點(diǎn)發(fā)送。
- 處理消息類:包括handleAppendEntries,handleHeartbeat,handleSnapshot,
- 處理集群成員變更類:addNode弊添,addLearner录淡,addNodeOrLearnerNode,removeNode油坝,restoreNode嫉戚,setProgress刨裆,delProgress
- 實(shí)際操作類:包括appendEntry,
- tick函數(shù)類:tickElection彼水,tickHeartbeat
- 角色轉(zhuǎn)化類:becomeFollower崔拥,becomeCandidate,becomePreCandidate凤覆,becomeLeader
- 輔助函數(shù)類:reset链瓦,restore,resetRandomizedElectionTimeout,loadState盯桦,campaign慈俯,poll
Progress:維護(hù)集群中的節(jié)點(diǎn)狀態(tài)
Progress結(jié)構(gòu)體,該結(jié)構(gòu)體主要是在每個(gè)節(jié)點(diǎn)維護(hù)集群中其他節(jié)點(diǎn)的狀態(tài)用拥峦。主要包括
- 論文中用以共識用的Match(論文中的matchIndex)贴膘,next(論文中的nextIndex)。
- 包括一些節(jié)點(diǎn)的是否正常運(yùn)行的狀態(tài)RecentActive略号、Paused刑峡。
- 節(jié)點(diǎn)的身份狀態(tài)State及isLearner等。
type Progress struct {
Match, Next uint64
State ProgressStateType
Paused bool
PendingSnapshot uint64
RecentActive bool
ins *inflights
IsLearner bool
}
大致有以下幾種函數(shù):
- 控制類:pause玄柠、resume
- 狀態(tài)轉(zhuǎn)變類函數(shù): resetState, becomeProbe, becomeReplicate, becomeSnapshot
- Match和Next維護(hù)類函數(shù):
maybeUpdate, optimisticUpdate, maybeDecrTo, - - 狀態(tài)查詢類函數(shù):IsPaused
- snapshot相關(guān): snapshotFailure,needSnapshotAbort
raftLog:raft中的日志管理者
raftLog結(jié)構(gòu)體突梦,raft中的日志條目管理的結(jié)構(gòu)體,前文中介紹raft中的日志存儲(chǔ)包括文件存儲(chǔ)和內(nèi)存存儲(chǔ)兩部分羽利。該結(jié)構(gòu)體中storage即為內(nèi)存存儲(chǔ)的日志條目部分宫患。unstable是在日志尚未存儲(chǔ)到日志文件時(shí)的狀態(tài)。commited和applied是論文中commitIndex和lastApplied的概念这弧,及最新的已共識可提交的日志條目索引娃闲,和已提交給state machine執(zhí)行的最新條目的索引。
type raftLog struct {
// storage contains all stable entries since the last snapshot.
storage Storage
// unstable contains all unstable entries and snapshot.
// they will be saved into storage.
unstable unstable
// committed is the highest log position that is known to be in
// stable storage on a quorum of nodes.
committed uint64
// applied is the highest log position that the application has
// been instructed to apply to its state machine.
// Invariant: applied <= committed
applied uint64
logger Logger
}
大致有以下幾種函數(shù)
- 新建: newLog
- 查詢狀態(tài)類:獲取索引匾浪、快照或者條目信息等皇帮,包括:unstableEntries,
nextEnts蛋辈, hasNextEnts属拾, snapshot, firstIndex梯浪, lastIndex捌年, lastTerm , entries挂洛, allEntries礼预。 - 狀態(tài)更新類:更新commited或者applied的值,包括:commitTo虏劲,
appliedTo托酸, stableTo褒颈, stableSnapTo 。 - 條目更新類:append
- 判斷狀態(tài)類:判斷條目是否可以被追加到當(dāng)前励堡,是否是當(dāng)前term谷丸,找出沖突等。包括:maybeAppend应结, findConflict刨疼, isUpToDate matchTerm, maybeCommit鹅龄, mustCheckOutOfBounds
- 實(shí)際操作類:
- 輔助類:term揩慕, slice,zeroTermOnErrCompacted
Snapshotter組件:快照管理者
type Snapshotter struct {
dir string
}
大致包含保存快照SaveSnap扮休,讀取快照Read迎卤,加載快照Load等。
WAL組件:日志文件管理者
WAL結(jié)構(gòu)體玷坠,用來管理日志文件蜗搔。
type WAL struct {
dir string // the living directory of the underlay files
// dirFile is a fd for the wal directory for syncing on Rename
dirFile *os.File
metadata []byte // metadata recorded at the head of each WAL
state raftpb.HardState // hardstate recorded at the head of WAL
start walpb.Snapshot // snapshot to start reading
decoder *decoder // decoder to decode records
readClose func() error // closer for decode reader
mu sync.Mutex
enti uint64 // index of the last entry saved to the wal
encoder *encoder // encoder to encode records
locks []*fileutil.LockedFile // the locked files the WAL holds (the name is increasing)
fp *filePipeline
}
大致有以下幾種函數(shù):
- 文件讀取類:Open,OpenAtIndex八堡,OpenForRead樟凄,ReadAll,
- 文件寫入類:saveEntry秕重,saveState不同,Save厉膀,SaveSnapshot溶耘,saveCrc
- 其他文件操作類:Create,renameWal服鹅,cut凳兵,Close
Transport組件:消息傳輸者
Transport組件是負(fù)責(zé)消息通信的,目前的實(shí)現(xiàn)方式是http的實(shí)現(xiàn)企软。
type Transport struct {
DialTimeout time.Duration // maximum duration before timing out dial of the request
// DialRetryFrequency defines the frequency of streamReader dial retrial attempts;
// a distinct rate limiter is created per every peer (default value: 10 events/sec)
DialRetryFrequency rate.Limit
TLSInfo transport.TLSInfo // TLS information used when creating connection
ID types.ID // local member ID
URLs types.URLs // local peer URLs
ClusterID types.ID // raft cluster ID for request validation
Raft Raft // raft state machine, to which the Transport forwards received messages and reports status
Snapshotter *raftsnap.Snapshotter
ServerStats *stats.ServerStats // used to record general transportation statistics
// used to record transportation statistics with followers when
// performing as leader in raft protocol
LeaderStats *stats.LeaderStats
// ErrorC is used to report detected critical errors, e.g.,
// the member has been permanently removed from the cluster
// When an error is received from ErrorC, user should stop raft state
// machine and thus stop the Transport.
ErrorC chan error
streamRt http.RoundTripper // roundTripper used by streams
pipelineRt http.RoundTripper // roundTripper used by pipelines
mu sync.RWMutex // protect the remote and peer map
remotes map[types.ID]*remote // remotes map that helps newly joined member to catch up
peers map[types.ID]Peer // peers map
prober probing.Prober
}
大致包含以下幾種函數(shù):
- 啟動(dòng)和停止:Start庐扫,Stop
- 消息發(fā)送和處理:Handler,Get仗哨,Send形庭,SendSnapshot
- 集群鏈接的維護(hù):CutPeer,MendPeer厌漂,AddRemote萨醒,AddPeer,RemovePeer苇倡,RemoveAllPeers富纸,UpdatePeer囤踩,ActiveSince,ActivePeers