etcd學(xué)習(xí)筆記(三): Propose

我們已經(jīng)在raft-example看過(guò)了對(duì)Propose的簡(jiǎn)單處理了,但是真正的etcd對(duì)Propose的處理更加復(fù)雜尾抑。主要是有如下幾個(gè)點(diǎn):

  1. consistent index。用于處理boltdb和raftlog之間的冪等性逝慧。
  2. 同步返回截驮。由于raft的log復(fù)制是異步的,如何做到同步返回結(jié)果快集。

當(dāng)blotdb用作狀態(tài)機(jī)的時(shí)候贡羔,wal和blotdb作為兩個(gè)不同的實(shí)體,很有可能存在不一致的情況个初。所以etcd在blotdb中存儲(chǔ)一條記錄consistent-index乖寒,來(lái)代表已經(jīng)apply到blot-db上成功的log index,這樣當(dāng)根據(jù)wal恢復(fù)blot-db的時(shí)候院溺,就可以判斷l(xiāng)og index是不是已經(jīng)被apply過(guò)楣嘁。

處理過(guò)程

在etcd-server中,一條propose的處理過(guò)程:
首先是為每一條請(qǐng)求注冊(cè)一個(gè)唯一requestID覆获,然后register并等待requestID處理完成

func (s *EtcdServer) processInternalRaftRequestOnce(ctx context.Context, r pb.InternalRaftRequest) (*applyResult, error) {

    ai := s.getAppliedIndex()
    ci := s.getCommittedIndex()
    if ci > ai+maxGapBetweenApplyAndCommitIndex {
        return nil, ErrTooManyRequests
    }

    r.Header = &pb.RequestHeader{
        ID: s.reqIDGen.Next(),
    }

    id := r.ID
    if id == 0 {
        id = r.Header.ID
    }
    ch := s.w.Register(id)

    cctx, cancel := context.WithTimeout(ctx, s.Cfg.ReqTimeout())
    defer cancel()

    start := time.Now()
    err = s.r.Propose(cctx, data)


    select {
    case x := <-ch:
        return x.(*applyResult), nil
    case <-cctx.Done():
        proposalsFailed.Inc()
        s.w.Trigger(id, nil) // GC wait
        return nil, s.parseProposeCtxErr(cctx.Err(), start)
    case <-s.done:
        return nil, ErrStopped
    }
}

請(qǐng)求轉(zhuǎn)發(fā)到raft-node處理

根據(jù)之前的知識(shí)马澈,raft-node經(jīng)過(guò)一番處理之后,交給上層的ready結(jié)構(gòu)來(lái)處理弄息,首先msg轉(zhuǎn)發(fā)到leader痊班,然后leader調(diào)用processMsg,這里主要的操作是copy log到follower摹量。


                ap := apply{
                    entries:  rd.CommittedEntries,
                    snapshot: rd.Snapshot,
                    notifyc:  notifyc,
                }

                updateCommittedIndex(&ap, rh)

                select {
                case r.applyc <- ap:
                case <-r.stopped:
                    return
                }

                if islead {
                    r.transport.Send(r.processMessages(rd.Messages))
                }

                // Must save the snapshot file and WAL snapshot entry before saving any other entries or hardstate to
                // ensure that recovery after a snapshot restore is possible.
                if !raft.IsEmptySnap(rd.Snapshot) {
                    if err := r.storage.SaveSnap(rd.Snapshot); err != nil {
                        r.lg.Fatal("failed to save Raft snapshot", zap.Error(err))
                    }
                }

                if err := r.storage.Save(rd.HardState, rd.Entries); err != nil {
                    r.lg.Fatal("failed to save Raft hard state and entries", zap.Error(err))
                }
                if !raft.IsEmptyHardState(rd.HardState) {
                    proposalsCommitted.Set(float64(rd.HardState.Commit))
                }

                if !raft.IsEmptySnap(rd.Snapshot) {
                    // Force WAL to fsync its hard state before Release() releases
                    // old data from the WAL. Otherwise could get an error like:
                    // panic: tocommit(107) is out of range [lastIndex(84)]. Was the raft log corrupted, truncated, or lost?
                    // See https://github.com/etcd-io/etcd/issues/10219 for more details.
                    if err := r.storage.Sync(); err != nil {
                        r.lg.Fatal("failed to sync Raft snapshot", zap.Error(err))
                    }

                    // etcdserver now claim the snapshot has been persisted onto the disk
                    notifyc <- struct{}{}

                    // gofail: var raftBeforeApplySnap struct{}
                    r.raftStorage.ApplySnapshot(rd.Snapshot)
                    r.lg.Info("applied incoming Raft snapshot", zap.Uint64("snapshot-index", rd.Snapshot.Metadata.Index))
                    // gofail: var raftAfterApplySnap struct{}

                    if err := r.storage.Release(rd.Snapshot); err != nil {
                        r.lg.Fatal("failed to release Raft wal", zap.Error(err))
                    }
                    // gofail: var raftAfterWALRelease struct{}
                }

                r.raftStorage.Append(rd.Entries)

                if !islead {
                    msgs := r.processMessages(rd.Messages)
                    notifyc <- struct{}{}
                    r.transport.Send(msgs)
                } else {
                    // leader already processed 'MsgSnap' and signaled
                    notifyc <- struct{}{}
                }

                r.Advance()

最終調(diào)用的是apply

        case ap := <-s.r.apply():
            f := func(context.Context) { s.applyAll(&ep, &ap) }
            sched.Schedule(f)


func (s *EtcdServer) apply(
    es []raftpb.Entry,
    confState *raftpb.ConfState,
) (appliedt uint64, appliedi uint64, shouldStop bool) {
    s.lg.Debug("Applying entries", zap.Int("num-entries", len(es)))
    for i := range es {
        e := es[i]

        switch e.Type {
        case raftpb.EntryNormal:
            s.applyEntryNormal(&e)
            s.setAppliedIndex(e.Index)
            s.setTerm(e.Term)

        case raftpb.EntryConfChange:
            // We need to apply all WAL entries on top of v2store
            // and only 'unapplied' (e.Index>backend.ConsistentIndex) on the backend.
            shouldApplyV3 := membership.ApplyV2storeOnly

            // set the consistent index of current executing entry
            if e.Index > s.consistIndex.ConsistentIndex() {
                s.consistIndex.SetConsistentIndex(e.Index, e.Term)
                shouldApplyV3 = membership.ApplyBoth
            }

            var cc raftpb.ConfChange
            pbutil.MustUnmarshal(&cc, e.Data)
            removedSelf, err := s.applyConfChange(cc, confState, shouldApplyV3)
            s.setAppliedIndex(e.Index)
            s.setTerm(e.Term)
            shouldStop = shouldStop || removedSelf
            s.w.Trigger(cc.ID, &confChangeResponse{s.cluster.Members(), err})

        }
        appliedi, appliedt = e.Index, e.Term
    }
    return appliedt, appliedi, shouldStop
}
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末涤伐,一起剝皮案震驚了整個(gè)濱河市,隨后出現(xiàn)的幾起案子缨称,更是在濱河造成了極大的恐慌凝果,老刑警劉巖,帶你破解...
    沈念sama閱讀 212,332評(píng)論 6 493
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件睦尽,死亡現(xiàn)場(chǎng)離奇詭異器净,居然都是意外死亡,警方通過(guò)查閱死者的電腦和手機(jī)当凡,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,508評(píng)論 3 385
  • 文/潘曉璐 我一進(jìn)店門(mén)山害,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái),“玉大人沿量,你說(shuō)我怎么就攤上這事浪慌。” “怎么了朴则?”我有些...
    開(kāi)封第一講書(shū)人閱讀 157,812評(píng)論 0 348
  • 文/不壞的土叔 我叫張陵权纤,是天一觀的道長(zhǎng)。 經(jīng)常有香客問(wèn)我,道長(zhǎng)汹想,這世上最難降的妖魔是什么外邓? 我笑而不...
    開(kāi)封第一講書(shū)人閱讀 56,607評(píng)論 1 284
  • 正文 為了忘掉前任,我火速辦了婚禮古掏,結(jié)果婚禮上坐榆,老公的妹妹穿的比我還像新娘。我一直安慰自己冗茸,他們只是感情好席镀,可當(dāng)我...
    茶點(diǎn)故事閱讀 65,728評(píng)論 6 386
  • 文/花漫 我一把揭開(kāi)白布。 她就那樣靜靜地躺著夏漱,像睡著了一般豪诲。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上挂绰,一...
    開(kāi)封第一講書(shū)人閱讀 49,919評(píng)論 1 290
  • 那天屎篱,我揣著相機(jī)與錄音,去河邊找鬼葵蒂。 笑死交播,一個(gè)胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的践付。 我是一名探鬼主播秦士,決...
    沈念sama閱讀 39,071評(píng)論 3 410
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼永高!你這毒婦竟也來(lái)了隧土?” 一聲冷哼從身側(cè)響起,我...
    開(kāi)封第一講書(shū)人閱讀 37,802評(píng)論 0 268
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤命爬,失蹤者是張志新(化名)和其女友劉穎曹傀,沒(méi)想到半個(gè)月后,有當(dāng)?shù)厝嗽跇?shù)林里發(fā)現(xiàn)了一具尸體饲宛,經(jīng)...
    沈念sama閱讀 44,256評(píng)論 1 303
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡皆愉,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 36,576評(píng)論 2 327
  • 正文 我和宋清朗相戀三年,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了艇抠。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片幕庐。...
    茶點(diǎn)故事閱讀 38,712評(píng)論 1 341
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖练链,靈堂內(nèi)的尸體忽然破棺而出翔脱,到底是詐尸還是另有隱情奴拦,我是刑警寧澤媒鼓,帶...
    沈念sama閱讀 34,389評(píng)論 4 332
  • 正文 年R本政府宣布,位于F島的核電站,受9級(jí)特大地震影響绿鸣,放射性物質(zhì)發(fā)生泄漏疚沐。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 40,032評(píng)論 3 316
  • 文/蒙蒙 一潮模、第九天 我趴在偏房一處隱蔽的房頂上張望亮蛔。 院中可真熱鬧,春花似錦擎厢、人聲如沸究流。這莊子的主人今日做“春日...
    開(kāi)封第一講書(shū)人閱讀 30,798評(píng)論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)芬探。三九已至,卻和暖如春厘惦,著一層夾襖步出監(jiān)牢的瞬間偷仿,已是汗流浹背。 一陣腳步聲響...
    開(kāi)封第一講書(shū)人閱讀 32,026評(píng)論 1 266
  • 我被黑心中介騙來(lái)泰國(guó)打工宵蕉, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留酝静,地道東北人。 一個(gè)月前我還...
    沈念sama閱讀 46,473評(píng)論 2 360
  • 正文 我出身青樓羡玛,卻偏偏與公主長(zhǎng)得像别智,于是被迫代替她去往敵國(guó)和親。 傳聞我的和親對(duì)象是個(gè)殘疾皇子稼稿,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 43,606評(píng)論 2 350

推薦閱讀更多精彩內(nèi)容