etcd 中 raft 算法的使用方法

raft 協(xié)議是一個(gè)一致性算法挤渐,解決多臺(tái)機(jī)器之間數(shù)據(jù)一致的問(wèn)題阵翎。raft 聲稱簡(jiǎn)潔明了,可以取代非常復(fù)雜的 PAXOS 算法东囚。然而翻看 raft 的論文后跺嗽,會(huì)發(fā)現(xiàn)即便聲稱簡(jiǎn)潔明了,自己完整地實(shí)現(xiàn) raft 還是很麻煩的。

etcd是一個(gè)分布式的 key-value 存儲(chǔ)組件桨嫁,它通過(guò) raft 算法保證多臺(tái)機(jī)器數(shù)據(jù)的一致性植兰。那么 etcd 中的 raft 算法可以提取出來(lái)用在自己的項(xiàng)目中嗎?

答案是可以的璃吧。etcd 不僅實(shí)現(xiàn)了 raft楣导,還把 raft 解耦得很完美,完全可以獨(dú)立使用畜挨。代碼庫(kù)點(diǎn)這兒:https://github.com/etcd-io/etcd/tree/master/raft爷辙。

美中不足的是,etcd raft的使用文檔寫得很爛朦促,文檔中列的代碼缺了很多關(guān)鍵部分膝晾,是跑不起來(lái)的。按照文檔中的代碼寫务冕,不是報(bào)錯(cuò)就是 go panic血当,要不就是跑起來(lái)后機(jī)器都僵著不選舉。經(jīng)過(guò)筆者的實(shí)踐禀忆,補(bǔ)齊了缺失的代碼臊旭,完成了一個(gè)可以跑起來(lái)的示例,代碼見(jiàn)文章最后箩退。

實(shí)踐過(guò)程中离熏,使用文檔中沒(méi)有提及的幾個(gè)點(diǎn):

  1. 文檔說(shuō) n := raft.StartNode() 就可以啟動(dòng)一個(gè)節(jié)點(diǎn),實(shí)際這樣做會(huì) panic戴涝,要自己額外再封裝一個(gè) struct 滋戳,并且實(shí)現(xiàn) Process() 方法才行(見(jiàn)本文 raft.go里的 rNode

  2. 文檔說(shuō)集群中在收到對(duì)方節(jié)點(diǎn)的 RPC 消息時(shí),要調(diào)用 n.Step() 方法:

func recvRaftRPC(ctx context.Context, m raftpb.Message) {
    n.Step(ctx, m)
}

但這個(gè)recvRaftRPC() 又在哪調(diào)用呢啥刻?回顧第 1 條不是要自己封裝一個(gè) struct 嗎奸鸯,n.Step() 應(yīng)該寫在這個(gè) struct 的 Process() 方法里,而不是放在什么 recvRaftRPC() 里(見(jiàn)本文 raft.go 里的 rNode)可帽。raft 算法會(huì)在接收到其他節(jié)點(diǎn)的RPC請(qǐng)求時(shí)調(diào)用 Process()娄涩,

  1. 還是 raft.StartNode() ,文檔的這段代碼:
n := raft.StartNode(c, []raft.Peer{{ID: 0x02}, {ID: 0x03}})

意思是三個(gè)節(jié)點(diǎn)的集群映跟,如果當(dāng)前啟動(dòng)節(jié)點(diǎn) ID 是 0x01蓄拣,那么啟動(dòng)時(shí) peer 列表只傳 0x02, 0x03,不傳自己努隙,實(shí)際這樣做啟動(dòng)集群后會(huì)僵住不選舉球恤。正確做法是把節(jié)點(diǎn)自己也傳入 peer 列表。

  1. 文檔中的 for-select 循環(huán)剃法,是要寫在一個(gè) go 協(xié)程里的碎捺。不然啟動(dòng)后集群會(huì)僵住不選舉路鹰。

示例代碼介紹

本文的示例代碼是一個(gè)三節(jié)點(diǎn)的集群贷洲,節(jié)點(diǎn)之前通過(guò) http 交換 raft 報(bào)文收厨。

集群?jiǎn)?dòng)之后,0x01節(jié)點(diǎn)會(huì)每隔 1 秒申請(qǐng)?zhí)岚福ㄒ簿褪菢I(yè)務(wù)數(shù)據(jù)):

for {
    log.Printf("Propose on node %v\n", *id)
    n.node.Propose(context.TODO(), []byte("hello"))
    time.Sleep(time.Second)
}

然后在代碼的 這個(gè)地方:

for _, entry := range rd.CommittedEntries {
    switch entry.Type {
    case raftpb.EntryNormal:
       log.Printf("Receive committed data on node %v: %v\n", rn.id, string(entry.Data))
    ....
}

集群的每個(gè)節(jié)點(diǎn)都會(huì)收到這個(gè)提案优构,這時(shí)后提案在集群里是一致的了诵叁,可以放心地持久化了。

完整代碼:

main.go

package main

import (
    "context"
    "flag"
    "log"
    "time"
)

func main() {
    id := flag.Uint64("id", 1, "node id")
    flag.Parse()
    log.Printf("I'am node %v\n", *id)

    cluster := map[uint64]string{
        1: "http://127.0.0.1:22210",
        2: "http://127.0.0.1:22220",
        3: "http://127.0.0.1:22230",
    }
    n := newRaftNode(*id, cluster)

    if *id == 1 {
        time.Sleep(5 * time.Second)
        for {
            log.Printf("Propose on node %v\n", *id)
            n.node.Propose(context.TODO(), []byte("hello"))
            time.Sleep(time.Second)
        }

    }

    select {}

}

raft.go

package main

import (
    "context"
    "log"
    "net/http"
    "strconv"
    "strings"
    "time"

    "go.etcd.io/etcd/etcdserver/api/rafthttp"
    stats "go.etcd.io/etcd/etcdserver/api/v2stats"
    "go.etcd.io/etcd/pkg/types"
    "go.etcd.io/etcd/raft"
    "go.etcd.io/etcd/raft/raftpb"
    "go.uber.org/zap"
)

type rNode struct {
    id      uint64
    peerMap map[uint64]string

    node        raft.Node
    raftStorage *raft.MemoryStorage

    transport *rafthttp.Transport
}

func newRaftNode(id uint64, peerMap map[uint64]string) *rNode {
    n := &rNode{
        id:          id,
        peerMap:     peerMap,
        raftStorage: raft.NewMemoryStorage(),
    }
    go n.startRaft()
    return n
}

func (rn *rNode) startRaft() {
    peers := []raft.Peer{}
    for i := range rn.peerMap {
        peers = append(peers, raft.Peer{ID: uint64(i)})
    }
    c := &raft.Config{
        ID:              rn.id,
        ElectionTick:    10,
        HeartbeatTick:   1,
        Storage:         rn.raftStorage,
        MaxSizePerMsg:   4096,
        MaxInflightMsgs: 256,
    }
    rn.node = raft.StartNode(c, peers)
    rn.transport = &rafthttp.Transport{
        Logger:      zap.NewExample(),
        ID:          types.ID(rn.id),
        ClusterID:   0x1000,
        Raft:        rn,
        ServerStats: stats.NewServerStats("", ""),
        LeaderStats: stats.NewLeaderStats(strconv.Itoa(int(rn.id))),
        ErrorC:      make(chan error),
    }
    rn.transport.Start()
    for peer, addr := range rn.peerMap {
        if peer != rn.id {
            rn.transport.AddPeer(types.ID(peer), []string{addr})
        }
    }
    go rn.serveRaft()
    go rn.serveChannels()
}

func (rn *rNode) serveRaft() {
    addr := rn.peerMap[rn.id][strings.LastIndex(rn.peerMap[rn.id], ":"):]
    server := http.Server{
        Addr:    addr,
        Handler: rn.transport.Handler(),
    }
    server.ListenAndServe()
}

func (rn *rNode) serveChannels() {

    ticker := time.NewTicker(100 * time.Millisecond)
    defer ticker.Stop()

    for {
        select {
        case <-ticker.C:
            rn.node.Tick()
        case rd := <-rn.node.Ready():
            rn.raftStorage.Append(rd.Entries)
            rn.transport.Send(rd.Messages)
            if !raft.IsEmptySnap(rd.Snapshot) {
                rn.raftStorage.ApplySnapshot(rd.Snapshot)
            }
            for _, entry := range rd.CommittedEntries {
                switch entry.Type {
                case raftpb.EntryNormal:
                    log.Printf("Receive committed data on node %v: %v\n", rn.id, string(entry.Data))
                case raftpb.EntryConfChange:
                    var cc raftpb.ConfChange
                    cc.Unmarshal(entry.Data)
                    rn.node.ApplyConfChange(cc)
                }
            }
            rn.node.Advance()
        case err := <-rn.transport.ErrorC:
            log.Fatal(err)
        }
    }

}

func (rn *rNode) Process(ctx context.Context, m raftpb.Message) error {
    return rn.node.Step(ctx, m)
}
func (rn *rNode) IsIDRemoved(id uint64) bool                           { return false }
func (rn *rNode) ReportUnreachable(id uint64)                          {}
func (rn *rNode) ReportSnapshot(id uint64, status raft.SnapshotStatus) {}

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末钦椭,一起剝皮案震驚了整個(gè)濱河市拧额,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌彪腔,老刑警劉巖侥锦,帶你破解...
    沈念sama閱讀 216,496評(píng)論 6 501
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場(chǎng)離奇詭異德挣,居然都是意外死亡恭垦,警方通過(guò)查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,407評(píng)論 3 392
  • 文/潘曉璐 我一進(jìn)店門格嗅,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái)番挺,“玉大人,你說(shuō)我怎么就攤上這事屯掖⌒兀” “怎么了?”我有些...
    開(kāi)封第一講書人閱讀 162,632評(píng)論 0 353
  • 文/不壞的土叔 我叫張陵贴铜,是天一觀的道長(zhǎng)粪摘。 經(jīng)常有香客問(wèn)我,道長(zhǎng)绍坝,這世上最難降的妖魔是什么赶熟? 我笑而不...
    開(kāi)封第一講書人閱讀 58,180評(píng)論 1 292
  • 正文 為了忘掉前任,我火速辦了婚禮陷嘴,結(jié)果婚禮上映砖,老公的妹妹穿的比我還像新娘。我一直安慰自己灾挨,他們只是感情好邑退,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,198評(píng)論 6 388
  • 文/花漫 我一把揭開(kāi)白布。 她就那樣靜靜地躺著劳澄,像睡著了一般地技。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上秒拔,一...
    開(kāi)封第一講書人閱讀 51,165評(píng)論 1 299
  • 那天莫矗,我揣著相機(jī)與錄音,去河邊找鬼。 笑死作谚,一個(gè)胖子當(dāng)著我的面吹牛三娩,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播妹懒,決...
    沈念sama閱讀 40,052評(píng)論 3 418
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼雀监,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼!你這毒婦竟也來(lái)了眨唬?” 一聲冷哼從身側(cè)響起会前,我...
    開(kāi)封第一講書人閱讀 38,910評(píng)論 0 274
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤,失蹤者是張志新(化名)和其女友劉穎匾竿,沒(méi)想到半個(gè)月后瓦宜,有當(dāng)?shù)厝嗽跇?shù)林里發(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 45,324評(píng)論 1 310
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡岭妖,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,542評(píng)論 2 332
  • 正文 我和宋清朗相戀三年临庇,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片区转。...
    茶點(diǎn)故事閱讀 39,711評(píng)論 1 348
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡苔巨,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出废离,到底是詐尸還是另有隱情侄泽,我是刑警寧澤,帶...
    沈念sama閱讀 35,424評(píng)論 5 343
  • 正文 年R本政府宣布蜻韭,位于F島的核電站悼尾,受9級(jí)特大地震影響,放射性物質(zhì)發(fā)生泄漏肖方。R本人自食惡果不足惜闺魏,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,017評(píng)論 3 326
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望俯画。 院中可真熱鬧析桥,春花似錦、人聲如沸艰垂。這莊子的主人今日做“春日...
    開(kāi)封第一講書人閱讀 31,668評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)猜憎。三九已至娩怎,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間胰柑,已是汗流浹背截亦。 一陣腳步聲響...
    開(kāi)封第一講書人閱讀 32,823評(píng)論 1 269
  • 我被黑心中介騙來(lái)泰國(guó)打工爬泥, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人崩瓤。 一個(gè)月前我還...
    沈念sama閱讀 47,722評(píng)論 2 368
  • 正文 我出身青樓袍啡,卻偏偏與公主長(zhǎng)得像,于是被迫代替她去往敵國(guó)和親谷遂。 傳聞我的和親對(duì)象是個(gè)殘疾皇子葬馋,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,611評(píng)論 2 353

推薦閱讀更多精彩內(nèi)容

  • 提到etcd很多人第一反應(yīng)就是一個(gè)鍵值存儲(chǔ)倉(cāng)庫(kù)卖鲤。不過(guò)etcd官方文檔的定義卻是這樣的: A highly-avai...
    神奇的考拉閱讀 6,270評(píng)論 1 19
  • 尋找一種易于理解的一致性算法(擴(kuò)展版) 摘要 Raft 是一種為了管理復(fù)制日志的一致性算法肾扰。它提供了和 Paxos...
    枝葉君閱讀 2,643評(píng)論 0 15
  • 因?yàn)楣ぷ餍枨螅拘枰褂肊TCD來(lái)做gRPC服務(wù)的負(fù)載均衡蛋逾,以及集群管理集晚,所以對(duì)etcd做了一些研究,希望能給大...
    Jay_Guo閱讀 46,585評(píng)論 8 47
  • 1. 分布式系統(tǒng)核心問(wèn)題 參考書籍:《區(qū)塊鏈原理区匣、設(shè)計(jì)與應(yīng)用》 一致性問(wèn)題例子:兩個(gè)不同的電影院買同一種電影票偷拔,如...
    molscar閱讀 910評(píng)論 0 0
  • 所有的關(guān)系 與伴侶,與父母亏钩,與孩子莲绰,與合作伙伴,與朋友…… 都會(huì)經(jīng)歷從好姑丑,到不好蛤签,到丑惡,最后走向真實(shí) 在面對(duì)關(guān)系...
    鐘麗麗_覺(jué)醒閱讀 333評(píng)論 0 0