raft 協(xié)議是一個(gè)一致性算法挤渐,解決多臺(tái)機(jī)器之間數(shù)據(jù)一致的問(wèn)題阵翎。raft 聲稱簡(jiǎn)潔明了,可以取代非常復(fù)雜的 PAXOS 算法东囚。然而翻看 raft 的論文后跺嗽,會(huì)發(fā)現(xiàn)即便聲稱簡(jiǎn)潔明了,自己完整地實(shí)現(xiàn) raft 還是很麻煩的。
etcd是一個(gè)分布式的 key-value 存儲(chǔ)組件桨嫁,它通過(guò) raft 算法保證多臺(tái)機(jī)器數(shù)據(jù)的一致性植兰。那么 etcd 中的 raft 算法可以提取出來(lái)用在自己的項(xiàng)目中嗎?
答案是可以的璃吧。etcd 不僅實(shí)現(xiàn)了 raft楣导,還把 raft 解耦得很完美,完全可以獨(dú)立使用畜挨。代碼庫(kù)點(diǎn)這兒:https://github.com/etcd-io/etcd/tree/master/raft爷辙。
美中不足的是,etcd raft的使用文檔寫得很爛朦促,文檔中列的代碼缺了很多關(guān)鍵部分膝晾,是跑不起來(lái)的。按照文檔中的代碼寫务冕,不是報(bào)錯(cuò)就是 go panic血当,要不就是跑起來(lái)后機(jī)器都僵著不選舉。經(jīng)過(guò)筆者的實(shí)踐禀忆,補(bǔ)齊了缺失的代碼臊旭,完成了一個(gè)可以跑起來(lái)的示例,代碼見(jiàn)文章最后箩退。
實(shí)踐過(guò)程中离熏,使用文檔中沒(méi)有提及的幾個(gè)點(diǎn):
文檔說(shuō)
n := raft.StartNode()
就可以啟動(dòng)一個(gè)節(jié)點(diǎn),實(shí)際這樣做會(huì) panic戴涝,要自己額外再封裝一個(gè) struct 滋戳,并且實(shí)現(xiàn)Process()
方法才行(見(jiàn)本文 raft.go里的rNode
)文檔說(shuō)集群中在收到對(duì)方節(jié)點(diǎn)的 RPC 消息時(shí),要調(diào)用
n.Step()
方法:
func recvRaftRPC(ctx context.Context, m raftpb.Message) {
n.Step(ctx, m)
}
但這個(gè)recvRaftRPC()
又在哪調(diào)用呢啥刻?回顧第 1 條不是要自己封裝一個(gè) struct 嗎奸鸯,n.Step()
應(yīng)該寫在這個(gè) struct 的 Process()
方法里,而不是放在什么 recvRaftRPC()
里(見(jiàn)本文 raft.go 里的 rNode
)可帽。raft 算法會(huì)在接收到其他節(jié)點(diǎn)的RPC請(qǐng)求時(shí)調(diào)用 Process()
娄涩,
- 還是
raft.StartNode()
,文檔的這段代碼:
n := raft.StartNode(c, []raft.Peer{{ID: 0x02}, {ID: 0x03}})
意思是三個(gè)節(jié)點(diǎn)的集群映跟,如果當(dāng)前啟動(dòng)節(jié)點(diǎn) ID 是 0x01
蓄拣,那么啟動(dòng)時(shí) peer 列表只傳 0x02, 0x03
,不傳自己努隙,實(shí)際這樣做啟動(dòng)集群后會(huì)僵住不選舉球恤。正確做法是把節(jié)點(diǎn)自己也傳入 peer 列表。
- 文檔中的
for-select
循環(huán)剃法,是要寫在一個(gè) go 協(xié)程里的碎捺。不然啟動(dòng)后集群會(huì)僵住不選舉路鹰。
示例代碼介紹
本文的示例代碼是一個(gè)三節(jié)點(diǎn)的集群贷洲,節(jié)點(diǎn)之前通過(guò) http 交換 raft 報(bào)文收厨。
集群?jiǎn)?dòng)之后,0x01節(jié)點(diǎn)會(huì)每隔 1 秒申請(qǐng)?zhí)岚福ㄒ簿褪菢I(yè)務(wù)數(shù)據(jù)):
for {
log.Printf("Propose on node %v\n", *id)
n.node.Propose(context.TODO(), []byte("hello"))
time.Sleep(time.Second)
}
然后在代碼的 這個(gè)地方:
for _, entry := range rd.CommittedEntries {
switch entry.Type {
case raftpb.EntryNormal:
log.Printf("Receive committed data on node %v: %v\n", rn.id, string(entry.Data))
....
}
集群的每個(gè)節(jié)點(diǎn)都會(huì)收到這個(gè)提案优构,這時(shí)后提案在集群里是一致的了诵叁,可以放心地持久化了。
完整代碼:
main.go
package main
import (
"context"
"flag"
"log"
"time"
)
func main() {
id := flag.Uint64("id", 1, "node id")
flag.Parse()
log.Printf("I'am node %v\n", *id)
cluster := map[uint64]string{
1: "http://127.0.0.1:22210",
2: "http://127.0.0.1:22220",
3: "http://127.0.0.1:22230",
}
n := newRaftNode(*id, cluster)
if *id == 1 {
time.Sleep(5 * time.Second)
for {
log.Printf("Propose on node %v\n", *id)
n.node.Propose(context.TODO(), []byte("hello"))
time.Sleep(time.Second)
}
}
select {}
}
raft.go
package main
import (
"context"
"log"
"net/http"
"strconv"
"strings"
"time"
"go.etcd.io/etcd/etcdserver/api/rafthttp"
stats "go.etcd.io/etcd/etcdserver/api/v2stats"
"go.etcd.io/etcd/pkg/types"
"go.etcd.io/etcd/raft"
"go.etcd.io/etcd/raft/raftpb"
"go.uber.org/zap"
)
type rNode struct {
id uint64
peerMap map[uint64]string
node raft.Node
raftStorage *raft.MemoryStorage
transport *rafthttp.Transport
}
func newRaftNode(id uint64, peerMap map[uint64]string) *rNode {
n := &rNode{
id: id,
peerMap: peerMap,
raftStorage: raft.NewMemoryStorage(),
}
go n.startRaft()
return n
}
func (rn *rNode) startRaft() {
peers := []raft.Peer{}
for i := range rn.peerMap {
peers = append(peers, raft.Peer{ID: uint64(i)})
}
c := &raft.Config{
ID: rn.id,
ElectionTick: 10,
HeartbeatTick: 1,
Storage: rn.raftStorage,
MaxSizePerMsg: 4096,
MaxInflightMsgs: 256,
}
rn.node = raft.StartNode(c, peers)
rn.transport = &rafthttp.Transport{
Logger: zap.NewExample(),
ID: types.ID(rn.id),
ClusterID: 0x1000,
Raft: rn,
ServerStats: stats.NewServerStats("", ""),
LeaderStats: stats.NewLeaderStats(strconv.Itoa(int(rn.id))),
ErrorC: make(chan error),
}
rn.transport.Start()
for peer, addr := range rn.peerMap {
if peer != rn.id {
rn.transport.AddPeer(types.ID(peer), []string{addr})
}
}
go rn.serveRaft()
go rn.serveChannels()
}
func (rn *rNode) serveRaft() {
addr := rn.peerMap[rn.id][strings.LastIndex(rn.peerMap[rn.id], ":"):]
server := http.Server{
Addr: addr,
Handler: rn.transport.Handler(),
}
server.ListenAndServe()
}
func (rn *rNode) serveChannels() {
ticker := time.NewTicker(100 * time.Millisecond)
defer ticker.Stop()
for {
select {
case <-ticker.C:
rn.node.Tick()
case rd := <-rn.node.Ready():
rn.raftStorage.Append(rd.Entries)
rn.transport.Send(rd.Messages)
if !raft.IsEmptySnap(rd.Snapshot) {
rn.raftStorage.ApplySnapshot(rd.Snapshot)
}
for _, entry := range rd.CommittedEntries {
switch entry.Type {
case raftpb.EntryNormal:
log.Printf("Receive committed data on node %v: %v\n", rn.id, string(entry.Data))
case raftpb.EntryConfChange:
var cc raftpb.ConfChange
cc.Unmarshal(entry.Data)
rn.node.ApplyConfChange(cc)
}
}
rn.node.Advance()
case err := <-rn.transport.ErrorC:
log.Fatal(err)
}
}
}
func (rn *rNode) Process(ctx context.Context, m raftpb.Message) error {
return rn.node.Step(ctx, m)
}
func (rn *rNode) IsIDRemoved(id uint64) bool { return false }
func (rn *rNode) ReportUnreachable(id uint64) {}
func (rn *rNode) ReportSnapshot(id uint64, status raft.SnapshotStatus) {}