raft理論與實(shí)踐[4]-lab2b

準(zhǔn)備工作

執(zhí)行日志

  • 我們需要執(zhí)行日志中的命令峻呛,因此在make函數(shù)中产还,新開一個(gè)協(xié)程:applyLogEntryDaemon()
func Make(peers []*labrpc.ClientEnd, me int,
    persister *Persister, applyCh chan ApplyMsg) *Raft {
    ...
    go rf.applyLogEntryDaemon() // start apply log
    DPrintf("[%d-%s]: newborn election(%s) heartbeat(%s) term(%d) voted(%d)\n",
        rf.me, rf, rf.electionTimeout, rf.heartbeatInterval, rf.CurrentTerm, rf.VotedFor)
    return rf
}

  • 如果rf.lastApplied == rf.commitIndex, 意味著commit log entry命令都已經(jīng)被執(zhí)行了繁堡,這時(shí)用信號(hào)量陷入等待。
  • 一旦收到信號(hào),說明需要執(zhí)行命令。這時(shí)會(huì)把最后執(zhí)行的log entry之后,一直到最后一個(gè)commit log entry的所有l(wèi)og都傳入通道apply中進(jìn)行執(zhí)行诡延。
  • 由于是測(cè)試,處理log的邏輯會(huì)在測(cè)試代碼中古胆,暫時(shí)不用關(guān)心
// applyLogEntryDaemon exit when shutdown channel is closed
func (rf *Raft) applyLogEntryDaemon() {
    for {
        var logs []LogEntry
        // wait
        rf.mu.Lock()
        for rf.lastApplied == rf.commitIndex {
            rf.commitCond.Wait()
            select {
            case <-rf.shutdownCh:
                rf.mu.Unlock()
                DPrintf("[%d-%s]: peer %d is shutting down apply log entry to client daemon.\n", rf.me, rf, rf.me)
                close(rf.applyCh)
                return
            default:
            }
        }
        last, cur := rf.lastApplied, rf.commitIndex
        if last < cur {
            rf.lastApplied = rf.commitIndex
            logs = make([]LogEntry, cur-last)
            copy(logs, rf.Logs[last+1:cur+1])
        }
        rf.mu.Unlock()
        for i := 0; i < cur-last; i++ {
            // current command is replicated, ignore nil command
            reply := ApplyMsg{
                CommandIndex: last + i + 1,
                Command:      logs[i].Command,
                CommandValid: true,
            }
            // reply to outer service
            // DPrintf("[%d-%s]: peer %d apply %v to client.\n", rf.me, rf, rf.me)
            DPrintf("[%d-%s]: peer %d apply to client.\n", rf.me, rf, rf.me)
            // Note: must in the same goroutine, or may result in out of order apply
            rf.applyCh <- reply
        }
    }
}

  • 新增 Start函數(shù)肆良,此函數(shù)為leader執(zhí)行從client發(fā)送過來的命令。
  • 當(dāng)client發(fā)送過來之后赤兴,首先需要做的就是新增entry 到leader的log中妖滔。并且將自身的nextIndex 與matchIndex 更新。
func (rf *Raft) Start(command interface{}) (int, int, bool) {
    index := -1
    term := 0
    isLeader := false

    // Your code here (2B).
    select {
    case <-rf.shutdownCh:
        return -1, 0, false
    default:
        rf.mu.Lock()
        defer rf.mu.Unlock()
        // Your code here (2B).
        if rf.state == Leader {
            log := LogEntry{rf.CurrentTerm, command}
            rf.Logs = append(rf.Logs, log)

            index = len(rf.Logs) - 1
            term = rf.CurrentTerm
            isLeader = true

            //DPrintf("[%d-%s]: client add new entry (%d-%v), logs: %v\n", rf.me, rf, index, command, rf.logs)
            DPrintf("[%d-%s]: client add new entry (%d)\n", rf.me, rf, index)
            //DPrintf("[%d-%s]: client add new entry (%d-%v)\n", rf.me, rf, index, command)

            // only update leader
            rf.nextIndex[rf.me] = index + 1
            rf.matchIndex[rf.me] = index
        }
    }

    return index, term, isLeader
}

  • 接下來最重要的部分涉及到日志同步桶良,這是通過AppendEntries實(shí)現(xiàn)的座舍。我們知道leader會(huì)不時(shí)的調(diào)用consistencyCheck(n)進(jìn)行一致性檢查。
  • 在給第n號(hào)節(jié)點(diǎn)一致性檢查時(shí)陨帆,首先獲取pre = rf.nextIndex曲秉,pre至少要為1,代表要給n節(jié)點(diǎn)發(fā)送的log index疲牵。
  • 因此AppendEntriesArgs參數(shù)中承二,PrevLogIndex 與 prevlogTerm 都為pre - 1位置,
  • 代表leader相信PrevLogIndex及其之前的節(jié)點(diǎn)都是與leader相同的纲爸。
  • 將pre及其之后的entry 加入到AppendEntriesArgs參數(shù)中亥鸠。 這些log entry可能是與leader不相同的,或者是follower根本就沒有的。
func (rf *Raft) consistencyCheck(n int) {
    rf.mu.Lock()
    defer rf.mu.Unlock()
    pre := max(1,rf.nextIndex[n])
    var args = AppendEntriesArgs{
        Term:         rf.CurrentTerm,
        LeaderID:     rf.me,
        PrevLogIndex: pre - 1,
        PrevLogTerm:  rf.Logs[pre - 1].Term,
        Entries:      nil,
        LeaderCommit: rf.commitIndex,
    }

    if rf.nextIndex[n] < len(rf.Logs){
        args.Entries = append(args.Entries, rf.Logs[pre:]...)
    }

    go func() {
        DPrintf("[%d-%s]: consistency Check to peer %d.\n", rf.me, rf, n)
        var reply AppendEntriesReply
        if rf.sendAppendEntries(n, &args, &reply) {
            rf.consistencyCheckReplyHandler(n, &reply)
        }
    }()
}

  • 接下來查看follower執(zhí)行AppendEntries時(shí)的反應(yīng)负蚊。

  • AppendEntries會(huì)新增兩個(gè)返回參數(shù):

  • ConflictTerm代表可能發(fā)生沖突的term

  • FirstIndex 代表可能發(fā)生沖突的第一個(gè)index神妹。

type AppendEntriesReply struct {
    CurrentTerm int  // currentTerm, for leader to update itself
    Success     bool // true if follower contained entry matching prevLogIndex and prevLogTerm
    // extra info for heartbeat from follower
    ConflictTerm int // term of the conflicting entry
    FirstIndex   int // the first index it stores for ConflictTerm
}

  • 如果args.PrevLogIndex < len(rf.Logs), 表明至少當(dāng)前節(jié)點(diǎn)的log長(zhǎng)度是合理的。

  • 令preLogIdx 與 args.PrevLogIndex相等家妆。prelogTerm為當(dāng)前follower節(jié)點(diǎn)preLogIdx位置的term鸵荠。

  • 如果擁有相同的term,說明follower與leader 在preLogIdx之前的log entry都是相同的伤极。因此請(qǐng)求是成功的蛹找。

  • 此時(shí)會(huì)截?cái)鄁ollower的log,將傳遞過來的entry加入到follower的log之后哨坪,執(zhí)行此步驟后庸疾,強(qiáng)制要求與leader的log相同了。

  • 請(qǐng)求成功后当编,reply的ConflictTerm為最后一個(gè)log entry的term,reply的FirstIndex為最后一個(gè)log entry的index彼硫。

  • 否則說明leader與follower的日志是有沖突的,沖突的原因可能是:

  • leader認(rèn)為的match log entry超出了follower的log個(gè)數(shù)凌箕,或者follower 還沒有任何log entry(除了index為0的entry是每一個(gè)節(jié)點(diǎn)都有的)。

  • log在相同的index下词渤,leader的term 與follower的term確是不同的牵舱。

  • 這時(shí)找到follower沖突的term即為ConflictTerm。

  • 獲取此term的第一個(gè)entry的index即為FirstIndex缺虐。

  • 所以最后芜壁,AppendEntries會(huì)返回沖突的term以及第一個(gè)可能沖突的index。

// AppendEntries handler, including heartbeat, must backup quickly
func (rf *Raft) AppendEntries(args *AppendEntriesArgs, reply *AppendEntriesReply) {
    ...
    preLogIdx, preLogTerm := 0, 0
    if args.PrevLogIndex < len(rf.Logs) {
        preLogIdx = args.PrevLogIndex
        preLogTerm = rf.Logs[preLogIdx].Term
    }

    // last log is match
    if preLogIdx == args.PrevLogIndex && preLogTerm == args.PrevLogTerm {
        reply.Success = true
        // truncate to known match
        rf.Logs = rf.Logs[:preLogIdx+1]
        rf.Logs = append(rf.Logs, args.Entries...)
        var last = len(rf.Logs) - 1

        // min(leaderCommit, index of last new entry)
        if args.LeaderCommit > rf.commitIndex {
            rf.commitIndex = min(args.LeaderCommit, last)
            // signal possible update commit index
            go func() { rf.commitCond.Broadcast() }()
        }
        // tell leader to update matched index
        reply.ConflictTerm = rf.Logs[last].Term
        reply.FirstIndex = last

        if len(args.Entries) > 0 {
            DPrintf("[%d-%s]: AE success from leader %d (%d cmd @ %d), commit index: l->%d, f->%d.\n",
                rf.me, rf, args.LeaderID, len(args.Entries), preLogIdx+1, args.LeaderCommit, rf.commitIndex)
        } else {
            DPrintf("[%d-%s]: <heartbeat> current logs: %v\n", rf.me, rf, rf.Logs)
        }
    } else {
        reply.Success = false

        // extra info for restore missing entries quickly: from original paper and lecture note
        // if follower rejects, includes this in reply:
        //
        // the follower's term in the conflicting entry
        // the index of follower's first entry with that term
        //
        // if leader knows about the conflicting term:
        //      move nextIndex[i] back to leader's last entry for the conflicting term
        // else:
        //      move nextIndex[i] back to follower's first index
        var first = 1
        reply.ConflictTerm = preLogTerm
        if reply.ConflictTerm == 0 {
            // which means leader has more logs or follower has no log at all
            first = len(rf.Logs)
            reply.ConflictTerm = rf.Logs[first-1].Term
        } else {
            i := preLogIdx
            // term的第一個(gè)log entry
            for ; i > 0; i-- {
                if rf.Logs[i].Term != preLogTerm {
                    first = i + 1
                    break
                }
            }
        }
        reply.FirstIndex = first
        if len(rf.Logs) <= args.PrevLogIndex {
            DPrintf("[%d-%s]: AE failed from leader %d, leader has more logs (%d > %d), reply: %d - %d.\n",
                rf.me, rf, args.LeaderID, args.PrevLogIndex, len(rf.Logs)-1, reply.ConflictTerm,
                reply.FirstIndex)
        } else {
            DPrintf("[%d-%s]: AE failed from leader %d, pre idx/term mismatch (%d != %d, %d != %d).\n",
                rf.me, rf, args.LeaderID, args.PrevLogIndex, preLogIdx, args.PrevLogTerm, preLogTerm)
        }
    }
}

  • leader調(diào)用AppendEntries后高氮,會(huì)執(zhí)行回調(diào)函數(shù)consistencyCheckReplyHandler慧妄。
  • 如果調(diào)用是成功的,那么正常的更新matchIndex剪芍,nextIndex即下一個(gè)要發(fā)送的index應(yīng)該為matchIndex + 1塞淹。
  • 如果調(diào)用失敗,說明有沖突罪裹。
  • 如果confiicting term等于0饱普,說明了leader認(rèn)為的match log entry超出了follower的log個(gè)數(shù),或者follower 還沒有任何log entry(除了index為0的entry是每一個(gè)節(jié)點(diǎn)都有的)状共。
  • 此時(shí)簡(jiǎn)單的讓nextIndex 為reply.FirstIndex即可套耕。
  • 如果conficting term不為0,獲取leader節(jié)點(diǎn)confiicting term 的最后一個(gè)log index峡继,此時(shí)nextIndex 應(yīng)該為此index與reply.FirstIndex的最小值冯袍。
  • 檢查最小值是必須的:
  • 假設(shè)
  • s1: 0-0 1-1 1-2 1-3 1-4 1-5
  • s2: 0-0 1-1 1-2 1-3 1-4 1-5
  • s3: 0-0 1-1
  • 此時(shí)s1為leader,并一致性檢查s3, 從1-5開始檢查,此時(shí)由于leader有更多的log康愤,因此檢查不成功儡循,返回confict term 1, firstindex:2
  • 如果只是獲取confiicting term 的最后一個(gè)log index翘瓮,那么nextIndex又是1-5贮折,陷入了死循環(huán)。
func (rf *Raft) consistencyCheckReplyHandler(n int, reply *AppendEntriesReply) {
    rf.mu.Lock()
    defer rf.mu.Unlock()

    if rf.state != Leader {
        return
    }
    if reply.Success {
        // RPC and consistency check successful
        rf.matchIndex[n] = reply.FirstIndex
        rf.nextIndex[n] = rf.matchIndex[n] + 1
        rf.updateCommitIndex() // try to update commitIndex
    } else {
        // found a new leader? turn to follower
        if rf.state == Leader && reply.CurrentTerm > rf.CurrentTerm {
            rf.turnToFollow()
            rf.resetTimer <- struct{}{}
            DPrintf("[%d-%s]: leader %d found new term (heartbeat resp from peer %d), turn to follower.",
                rf.me, rf, rf.me, n)
            return
        }

        // Does leader know conflicting term?
        var know, lastIndex = false, 0
        if reply.ConflictTerm != 0 {
            for i := len(rf.Logs) - 1; i > 0; i-- {
                if rf.Logs[i].Term == reply.ConflictTerm {
                    know = true
                    lastIndex = i
                    DPrintf("[%d-%s]: leader %d have entry %d is the last entry in term %d.",
                        rf.me, rf, rf.me, i, reply.ConflictTerm)
                    break
                }
            }
            if know {
                rf.nextIndex[n] = min(lastIndex, reply.FirstIndex)
            } else {
                rf.nextIndex[n] = reply.FirstIndex
            }
        } else {
            rf.nextIndex[n] = reply.FirstIndex
        }
        rf.nextIndex[n] = min(rf.nextIndex[n], len(rf.Logs))
        DPrintf("[%d-%s]: nextIndex for peer %d  => %d.\n",
            rf.me, rf, n, rf.nextIndex[n])
    }
}

  • 當(dāng)調(diào)用AppendEntry成功后资盅,說明follower與leader的log是匹配的调榄。此時(shí)leader會(huì)找到commited的log并且執(zhí)行其命令。
  • 這里有一個(gè)比較巧妙的方法呵扛,對(duì)matchIndex排序后取最中間的數(shù)每庆。
  • 由于matchIndex代表follower有多少log與leader的log匹配,因此中間的log index意味著其得到了大部分節(jié)點(diǎn)的認(rèn)可今穿。
  • 因此會(huì)將此中間的index之前的所有l(wèi)og entry都執(zhí)行了缤灵。
  • rf.Logs[target].Term == rf.CurrentTerm 是必要的:
  • 這是由于當(dāng)一個(gè)entry出現(xiàn)在大多數(shù)節(jié)點(diǎn)的log中,并不意味著其一定會(huì)成為commit蓝晒∪觯考慮下面的情況:
  S1: 1 2     1 2 4
  S2: 1 2     1 2
  S3: 1   --> 1 2
  S4: 1       1
  S5: 1       1 3

  • s1在term2成為leader,只有s1芝薇,s2添加了entry2.
  • s5變成了term3的leader胚嘲,之后s1變?yōu)榱藅erm4的leader,接著繼續(xù)發(fā)送entry2到s3中洛二。
  • 此時(shí)馋劈,如果s5再次變?yōu)榱薼eader,那么即便沒有S1的支持晾嘶,S5任然變?yōu)榱薼eader妓雾,并且應(yīng)用entry3,覆蓋掉entry2垒迂。
  • 所以一個(gè)entry要變?yōu)閏ommit械姻,必須:
  • 1、在其term周期內(nèi)机断,就復(fù)制到大多數(shù)策添。
  • 2、如果隨后的entry被提交毫缆。在上例中唯竹,如果s1持續(xù)成為term4的leader,那么entry2就會(huì)成為commit苦丁。
  • 這是由于以下原因造成的:更高任期為最新的投票規(guī)則浸颓,以及l(fā)eader將其日志強(qiáng)加給follower。
// updateCommitIndex find new commit id, must be called when hold lock
func (rf *Raft) updateCommitIndex() {
    match := make([]int, len(rf.matchIndex))
    copy(match, rf.matchIndex)
    sort.Ints(match)

    DPrintf("[%d-%s]: leader %d try to update commit index: %v @ term %d.\n",
        rf.me, rf, rf.me, rf.matchIndex, rf.CurrentTerm)

    target := match[len(rf.peers)/2]
    if rf.commitIndex < target {
        //fmt.Println("target:",target,match)
        if rf.Logs[target].Term == rf.CurrentTerm {
            //DPrintf("[%d-%s]: leader %d update commit index %d -> %d @ term %d command:%v\n",
            //  rf.me, rf, rf.me, rf.commitIndex, target, rf.CurrentTerm,rf.Logs[target].Command)

            DPrintf("[%d-%s]: leader %d update commit index %d -> %d @ term %d\n",
                rf.me, rf, rf.me, rf.commitIndex, target, rf.CurrentTerm)

            rf.commitIndex = target
            go func() { rf.commitCond.Broadcast() }()
        } else {
            DPrintf("[%d-%s]: leader %d update commit index %d failed (log term %d != current Term %d)\n",
                rf.me, rf, rf.me, rf.commitIndex, rf.Logs[target].Term, rf.CurrentTerm)
        }
    }
}

參考

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個(gè)濱河市产上,隨后出現(xiàn)的幾起案子棵磷,更是在濱河造成了極大的恐慌,老刑警劉巖晋涣,帶你破解...
    沈念sama閱讀 222,627評(píng)論 6 517
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件仪媒,死亡現(xiàn)場(chǎng)離奇詭異,居然都是意外死亡谢鹊,警方通過查閱死者的電腦和手機(jī)算吩,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 95,180評(píng)論 3 399
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來佃扼,“玉大人偎巢,你說我怎么就攤上這事〖嬉” “怎么了压昼?”我有些...
    開封第一講書人閱讀 169,346評(píng)論 0 362
  • 文/不壞的土叔 我叫張陵,是天一觀的道長(zhǎng)瘤运。 經(jīng)常有香客問我窍霞,道長(zhǎng),這世上最難降的妖魔是什么拯坟? 我笑而不...
    開封第一講書人閱讀 60,097評(píng)論 1 300
  • 正文 為了忘掉前任官撼,我火速辦了婚禮,結(jié)果婚禮上似谁,老公的妹妹穿的比我還像新娘。我一直安慰自己掠哥,他們只是感情好巩踏,可當(dāng)我...
    茶點(diǎn)故事閱讀 69,100評(píng)論 6 398
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著续搀,像睡著了一般塞琼。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上禁舷,一...
    開封第一講書人閱讀 52,696評(píng)論 1 312
  • 那天彪杉,我揣著相機(jī)與錄音,去河邊找鬼牵咙。 笑死派近,一個(gè)胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的洁桌。 我是一名探鬼主播渴丸,決...
    沈念sama閱讀 41,165評(píng)論 3 422
  • 文/蒼蘭香墨 我猛地睜開眼,長(zhǎng)吁一口氣:“原來是場(chǎng)噩夢(mèng)啊……” “哼!你這毒婦竟也來了谱轨?” 一聲冷哼從身側(cè)響起戒幔,我...
    開封第一講書人閱讀 40,108評(píng)論 0 277
  • 序言:老撾萬榮一對(duì)情侶失蹤,失蹤者是張志新(化名)和其女友劉穎土童,沒想到半個(gè)月后诗茎,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 46,646評(píng)論 1 319
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡献汗,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 38,709評(píng)論 3 342
  • 正文 我和宋清朗相戀三年敢订,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片雀瓢。...
    茶點(diǎn)故事閱讀 40,861評(píng)論 1 353
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡枢析,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出刃麸,到底是詐尸還是另有隱情醒叁,我是刑警寧澤,帶...
    沈念sama閱讀 36,527評(píng)論 5 351
  • 正文 年R本政府宣布泊业,位于F島的核電站把沼,受9級(jí)特大地震影響,放射性物質(zhì)發(fā)生泄漏吁伺。R本人自食惡果不足惜饮睬,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 42,196評(píng)論 3 336
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望篮奄。 院中可真熱鬧捆愁,春花似錦、人聲如沸窟却。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,698評(píng)論 0 25
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽夸赫。三九已至菩帝,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間茬腿,已是汗流浹背啥箭。 一陣腳步聲響...
    開封第一講書人閱讀 33,804評(píng)論 1 274
  • 我被黑心中介騙來泰國(guó)打工睬澡, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留固该,地道東北人点楼。 一個(gè)月前我還...
    沈念sama閱讀 49,287評(píng)論 3 379
  • 正文 我出身青樓,卻偏偏與公主長(zhǎng)得像悴品,于是被迫代替她去往敵國(guó)和親弓候。 傳聞我的和親對(duì)象是個(gè)殘疾皇子郎哭,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 45,860評(píng)論 2 361

推薦閱讀更多精彩內(nèi)容