5. PhxPaxos源碼分析之狀態(tài)機

目錄
1. PhxPaxos源碼分析之關于PhxPaxos
2. PhxPaxos分析之網(wǎng)絡基礎部件
3. PhxPaxos源碼分析之Proposer、Acceptor
4. PhxPaxos源碼分析之Learner
5. PhxPaxos源碼分析之狀態(tài)機
6. PhxPaxos源碼分析之歸檔機制
7. PhxPaxos源碼分析之整體架構


5.1 基本概念

  • Paxos log
    通過Paxos算法選中(chosen)的笔喉、有序的矫废、一組提案值宜鸯。例如两曼,PhxSQL中使用PhxPaxos確定的有序值為binlog唱歧。
  • 狀態(tài)機
    業(yè)務自定義的朦乏,如何使用Paxos log的數(shù)據(jù)消費邏輯球及。狀態(tài)及的一個特點是:只要初始狀態(tài)一致,輸入一致呻疹,那么引出的最終狀態(tài)也是一致的。在PhxSQL中筹陵,這個狀態(tài)機就是binlog的replay機制刽锤,即在其他節(jié)點上執(zhí)行和主節(jié)點一致的binlog操作,保證各個節(jié)點數(shù)據(jù)的一致性朦佩。

5.2 代碼設計

狀態(tài)機相關類的類圖如下:


狀態(tài)機
  • SMFac
    狀態(tài)機管理類并思,內(nèi)部維護一個狀態(tài)機(StateMachine)列表,對外提供統(tǒng)一的狀態(tài)機訪問接口语稠。
  • StateMachine
    狀態(tài)機抽象類宋彼。允許PhxPaxos使用者繼承該類弄砍,定制自己的業(yè)務狀態(tài)機。并將該狀態(tài)機添加到管理類中输涕。
  • InsideSM
    內(nèi)部狀態(tài)機抽象類音婶。
  • SystemVSM
    系統(tǒng)狀態(tài)機。用于處理集群節(jié)點變更莱坎。
  • MasterStateMachine
    主節(jié)點狀態(tài)機衣式。用于集群選主。

5.3 狀態(tài)機管理(SMFac)

PhxPaxos的每個Group(暫時理解為PhxPaxos)允許同時存在多個狀態(tài)機檐什,但一個paxos log只能被一個狀態(tài)機消費碴卧,不同的狀態(tài)機的之間數(shù)據(jù)互相隔離。但他們共享同一份Group資源:Proposer乃正、Acceptor住册、Learner、InstanceId等等瓮具。其中界弧,SystemVSM,MasterStateMachine這兩個內(nèi)置的狀態(tài)機默認添加到所有的Group之中搭综。

SMFac做為管理類垢箕,除支持添加各種狀態(tài)機,還對外提供了統(tǒng)一的狀態(tài)機執(zhí)行接口兑巾。

    class SMFac
    {
    public:
        //執(zhí)行狀態(tài)機
        bool Execute(const int iGroupIdx, const uint64_t llInstanceID,
                     const std::string& sPaxosValue, SMCtx* poSMCtx);
        //執(zhí)行狀態(tài)機的Checkpoint操作
        bool ExecuteForCheckpoint(const int iGroupIdx, const uint64_t llInstanceID, const std::string& sPaxosValue);
        //打包
        void PackPaxosValue(std::string& sPaxosValue, const int iSMID = 0);
        //添加狀態(tài)機
        void AddSM(StateMachine* poSM);

    public:
        void BeforePropose(const int iGroupIdx, std::string& sValue);
        void BeforeBatchPropose(const int iGroupIdx, std::string& sValue);
        void BeforeProposeCall(const int iGroupIdx, const int iSMID, std::string& sValue, bool& change);

    public:
        const uint64_t GetCheckpointInstanceID(const int iGroupIdx) const;
        //返回狀態(tài)機列表
        std::vector<StateMachine*> GetSMList();
        ......
    };

狀態(tài)機用于消費paxos log条获,即一旦paxos的提案值確定立即交由狀態(tài)機消費。入口邏輯如下:

    int Instance :: ReceiveMsgForLearner(const PaxosMsg& oPaxosMsg)
    {
        //Learner消息處理邏輯
        ......

        //當前Instance Id的提案值已習得
        if (m_oLearner.IsLearned())
        {
            BP->GetInstanceBP()->OnInstanceLearned();

            //獲取狀態(tài)機上下文信息
            SMCtx* poSMCtx = nullptr;
            bool bIsMyCommit = m_oCommitCtx.IsMyCommit(m_oLearner.GetInstanceID(), m_oLearner.GetLearnValue(), poSMCtx);
            if (!bIsMyCommit)
            {
                BP->GetInstanceBP()->OnInstanceLearnedNotMyCommit();
                PLGDebug("this value is not my commit");
            }
            else
            {
                int iUseTimeMs = m_oTimeStat.Point();
                BP->GetInstanceBP()->OnInstanceLearnedIsMyCommit(iUseTimeMs);
                PLGHead("My commit ok, usetime %dms", iUseTimeMs);
            }

            //執(zhí)行狀態(tài)機
            if (!SMExecute(m_oLearner.GetInstanceID(), m_oLearner.GetLearnValue(), bIsMyCommit, poSMCtx))
            {
                BP->GetInstanceBP()->OnInstanceLearnedSMExecuteFail();

                PLGErr("SMExecute fail, instanceid %lu, not increase instanceid", m_oLearner.GetInstanceID());
                m_oCommitCtx.SetResult(PaxosTryCommitRet_ExecuteFail,
                                       m_oLearner.GetInstanceID(), m_oLearner.GetLearnValue());

                m_oProposer.CancelSkipPrepare();

                return -1;
            }
            ......
        }
    }

SMExecute只是一個跳板函數(shù)蒋歌,直接調(diào)用SMFact的Execute方法帅掘。

    bool SMFac :: Execute(const int iGroupIdx, const uint64_t llInstanceID, const std::string& sPaxosValue, SMCtx* poSMCtx)
    {
        if (sPaxosValue.size() < sizeof(int))
        {
            PLG1Err("Value wrong, instanceid %lu size %zu", llInstanceID, sPaxosValue.size());
            //need do nothing, just skip
            return true;
        }
        //必須為有效的SM ID
        int iSMID = 0;
        memcpy(&iSMID, sPaxosValue.data(), sizeof(int));
        if (iSMID == 0)
        {
            PLG1Imp("Value no need to do sm, just skip, instanceid %lu", llInstanceID);
            return true;
        }
        //提前paxos log數(shù)據(jù)
        std::string sBodyValue = string(sPaxosValue.data() + sizeof(int), sPaxosValue.size() - sizeof(int));
        //批量處理
        if (iSMID == BATCH_PROPOSE_SMID)
        {
            BatchSMCtx* poBatchSMCtx = nullptr;
            if (poSMCtx != nullptr && poSMCtx->m_pCtx != nullptr)
            {
                poBatchSMCtx = (BatchSMCtx*)poSMCtx->m_pCtx;
            }

            return BatchExecute(iGroupIdx, llInstanceID, sBodyValue, poBatchSMCtx);
        }
        else
        {
            //指定狀態(tài)機處理
            return DoExecute(iGroupIdx, llInstanceID, sBodyValue, iSMID, poSMCtx);
        }
    }

前面提到一個paxos log只能被一個狀態(tài)機消費,但上面卻提供了一個狀態(tài)機批量處理的邏輯堂油。是否互相矛盾呢修档?來看代碼實現(xiàn):

    bool SMFac :: BatchExecute(const int iGroupIdx, const uint64_t llInstanceID, const std::string& sBodyValue, BatchSMCtx* poBatchSMCtx)
    {
        BatchPaxosValues oBatchValues;
        bool bSucc = oBatchValues.ParseFromArray(sBodyValue.data(), sBodyValue.size());

        if (!bSucc)
        {
            PLG1Err("ParseFromArray fail, valuesize %zu", sBodyValue.size());
            return false;
        }

        if (poBatchSMCtx != nullptr)
        {
            if ((int)poBatchSMCtx->m_vecSMCtxList.size() != oBatchValues.values_size())
            {
                PLG1Err("values size %d not equal to smctx size %zu",
                        oBatchValues.values_size(), poBatchSMCtx->m_vecSMCtxList.size());
                return false;
            }
        }
        //依次處理每條記錄
        for (int i = 0; i < oBatchValues.values_size(); i++)
        {
            const PaxosValue& oValue = oBatchValues.values(i);
            SMCtx* poSMCtx = poBatchSMCtx != nullptr ? poBatchSMCtx->m_vecSMCtxList[i] : nullptr;
            bool bExecuteSucc = DoExecute(iGroupIdx, llInstanceID, oValue.value(), oValue.smid(), poSMCtx);

            if (!bExecuteSucc)
            {
                return false;
            }
        }

        return true;
    }

實際上,批量的含義指的是數(shù)據(jù)中包含多個paxos log值府框,而非批量的對多個狀態(tài)機執(zhí)行操作吱窝。依次處理每條paxos log時,調(diào)用的DoExecute攜帶了有效的sm id信息迫靖。DoExecute查找sm id配套的狀態(tài)機院峡,交由狀態(tài)機消費數(shù)據(jù)。

    bool SMFac :: DoExecute(const int iGroupIdx, const uint64_t llInstanceID,
                            const std::string& sBodyValue, const int iSMID, SMCtx* poSMCtx)
    {
        if (iSMID == 0)
        {
            PLG1Imp("Value no need to do sm, just skip, instanceid %lu", llInstanceID);
            return true;
        }

        if (m_vecSMList.size() == 0)
        {
            PLG1Imp("No any sm, need wait sm, instanceid %lu", llInstanceID);
            return false;
        }

        for (auto & poSM : m_vecSMList)
        {
            //查找正確的狀態(tài)機系宜,消費paxos log
            if (poSM->SMID() == iSMID)
            {
                return poSM->Execute(iGroupIdx, llInstanceID, sBodyValue, poSMCtx);
            }
        }

        PLG1Err("Unknown smid %d instanceid %lu", iSMID, llInstanceID);
        return false;
    }

SMFac中的其他邏輯照激,此處不再贅述,有興趣可以參考源碼盹牧。

5.4 狀態(tài)機(StateMachine)

狀態(tài)機抽象類的接口定義如下:

    class StateMachine
    {
    public:
        virtual ~StateMachine() {}

        //狀態(tài)機標識俩垃,需要保證唯一性励幼。
        virtual const int SMID() const = 0;

        //狀態(tài)機執(zhí)行函數(shù),返回true意味著execute執(zhí)行完成口柳,不需要重新執(zhí)行苹粟。
        virtual bool Execute(const int iGroupIdx, const uint64_t llInstanceID,
                             const std::string& sPaxosValue, SMCtx* poSMCtx) = 0;

        //真正發(fā)起Propose之前,調(diào)用狀態(tài)機中該函數(shù)啄清,修改請求數(shù)據(jù)或做其他處理
        virtual void BeforePropose(const int iGroupIdx, std::string& sValue);

        //是否需要調(diào)用BeforePropose六水,默認為false
        virtual const bool NeedCallBeforePropose();

        //--------------------------------------Checkpoint機制相關---------------------------------------------------
        //Checkpoint機制執(zhí)行函數(shù)
        virtual bool ExecuteForCheckpoint(const int iGroupIdx, const uint64_t llInstanceID,
                                          const std::string& sPaxosValue);

        //返回checkpoint已執(zhí)行的、最大的instance id辣卒;PhxPaxos將頻繁調(diào)用此接口掷贾。
        virtual const uint64_t GetCheckpointInstanceID(const int iGroupIdx) const;

        //調(diào)用此接口鎖定Checkpoint文件,后續(xù)調(diào)用GetCheckpointState獲取的文件不允許有任何變更
        virtual int LockCheckpointState();

        //返回Checkpoint的文件路徑及文件列表
        virtual int GetCheckpointState(const int iGroupIdx, std::string& sDirPath,
                                       std::vector<std::string>& vecFileList);

        //Checkpoint文件使用完畢荣茫,解鎖Checkpoint文件
        virtual void UnLockCheckpointState();

        //使用指定路徑的Checkpoint文件想帅,PhxPaxos在執(zhí)行完該函數(shù)后將重啟當前進程。
        virtual int LoadCheckpointState(const int iGroupIdx, const std::string& sCheckpointTmpFileDirPath,
                                        const std::vector<std::string>& vecFileList, const uint64_t llCheckpointInstanceID);
    };

按功能劃分為如下兩類:

  1. 狀態(tài)機職責函數(shù)啡莉。包括狀態(tài)機標識(SMID)港准、狀態(tài)遷移(Execute)、BeforePropose咧欣、NeedCallBeforePropose浅缸。
  2. Checkpoint相關函數(shù)。上述代碼中由“Checkpoint機制相關”分割部分邏輯魄咕,這部分“Checkpoint機制”一章中還會提到衩椒。

5.5 集群變更狀態(tài)機(SystemVSM)

SystemVSM(System Variable State Machine)用于PhxPaxos集群成員變更,這是對外提供的一組功能接口哮兰,PhxPaxos本身不會主動觸發(fā)集群變更毛萌。對外接口定義在Node接口類中,如下:

class Node
{
    public:
        virtual int ShowMembership(const int iGroupIdx, NodeInfoList& vecNodeInfoList) = 0;
        virtual int AddMember(const int iGroupIdx, const NodeInfo& oNode) = 0;
        virtual int RemoveMember(const int iGroupIdx, const NodeInfo& oNode) = 0;
        virtual int ChangeMember(const int iGroupIdx, const NodeInfo& oFromNode, const NodeInfo& oToNode) = 0;
}

注意:如需要啟用集群成員變更功能喝滞,需要設置Options中bUseMembership配置項為true阁将。

集群的初始節(jié)點信息通過Options::vecNodeInfoList配置項傳遞給PhxPaxos。如果啟用了節(jié)點變更功能右遭,將在首次發(fā)送消息前同步集群狀態(tài)做盅。

    void Instance :: CheckNewValue()
    {
        ......

        //啟用系統(tǒng)變更 and (首個提案實例 or 當前集群狀態(tài)需要更新)
        if (m_poConfig->GetIsUseMembership()
            && (m_oProposer.GetInstanceID() == 0 || m_poConfig->GetGid() == 0))
        {
            //Init system variables.
            PLGHead("Need to init system variables, Now.InstanceID %lu Now.Gid %lu",
                    m_oProposer.GetInstanceID(), m_poConfig->GetGid());

            uint64_t llGid = OtherUtils::GenGid(m_poConfig->GetMyNodeID());
            string sInitSVOpValue;
            int ret = m_poConfig->GetSystemVSM()->CreateGid_OPValue(llGid, sInitSVOpValue);
            assert(ret == 0);

            //發(fā)起定向由SystemVSM處理的paxos消息
            m_oSMFac.PackPaxosValue(sInitSVOpValue, m_poConfig->GetSystemVSM()->SMID());
            m_oProposer.NewValue(sInitSVOpValue);
        }
    }

因為集群節(jié)點可能發(fā)生變更,因此集群信息不能再以配置文件中的Options::vecNodeInfoList為準狸演,需要單獨做持久化言蛇。這部分邏輯在狀態(tài)機的Execute中觸發(fā),調(diào)用UpdateSystemVariables完成宵距。

    int SystemVSM :: UpdateSystemVariables(const SystemVariables& oVariables)
    {
        WriteOptions oWriteOptions;
        oWriteOptions.bSync = true;
        //以選中的提案值為準,持久化到數(shù)據(jù)庫
        int ret = m_oSystemVStore.Write(oWriteOptions, m_iMyGroupIdx, oVariables);
        if (ret != 0)
        {
            PLG1Err("SystemVStore::Write fail, ret %d", ret);
            return -1;
        }
        //以選中的提案值為準
        m_oSystemVariables = oVariables;
        RefleshNodeID();

        return 0;
    }

集群節(jié)點變更包括新增吨拗、修改满哪、刪除婿斥,處理邏輯如下(引自《如何進行成員變更》):

在已有集群新增,替換機器

  • 準備好新的機器哨鸭,新的機器以空成員啟動PhxPaxos(Options::vecNodeInfoList設置為空)民宿。
  • 在任意一個已有的集群成員節(jié)點中調(diào)用Node::AddMember或Node::ChangeMember

在已有集群刪除機器

  • 在任意一個已有的成員節(jié)點中調(diào)用Node::RemoveMember

如果所有節(jié)點都希望立即獲得這個成員變更操作的通知,可通過options.h設置Options::pMembershipChangeCallback回調(diào)函數(shù)像鸡。

代碼實現(xiàn)上活鹰,增、刪只估、改處理邏輯類似志群,以AddMember為例。

    int PNode :: AddMember(const int iGroupIdx, const NodeInfo& oNode)
    {
        if (!CheckGroupID(iGroupIdx))
        {
            return Paxos_GroupIdxWrong;
        }

        SystemVSM* poSystemVSM = m_vecGroupList[iGroupIdx]->GetConfig()->GetSystemVSM();

        if (poSystemVSM->GetGid() == 0)
        {
            return Paxos_MembershipOp_NoGid;
        }
        //獲取當前集群已有的節(jié)點列表
        uint64_t llVersion = 0;
        NodeInfoList vecNodeInfoList;
        poSystemVSM->GetMembership(vecNodeInfoList, llVersion);

        for (auto & oNodeInfo : vecNodeInfoList)
        {
            if (oNodeInfo.GetNodeID() == oNode.GetNodeID())
            {
                return Paxos_MembershipOp_Add_NodeExist;
            }
        }
        //補充需要新增的節(jié)點信息
        vecNodeInfoList.push_back(oNode);
        //以完整的集群信息發(fā)起提案蛔钙,即將該信息同步到各個節(jié)點
        return ProposalMembership(poSystemVSM, iGroupIdx, vecNodeInfoList, llVersion);
    }

5.6 選主服務狀態(tài)機(MasterStateMachine)

MasterStateMachine稱為主節(jié)點狀態(tài)機锌云,用于和選主及租約相關操作的狀態(tài)維護、更新吁脱。除MasterStateMachine之外桑涎,另外一個和選主服務相關的類為MasterMgr,PhxPaxos為它啟動了一個單獨的線程兼贡。這次我們從MasterMgr開始說起攻冷。

MasterMgr內(nèi)部組合了MasterStateMachine,也是MasterStateMachine的唯一實例點遍希。

    MasterStateMachine* MasterMgr :: GetMasterSM()
    {
        return &m_oDefaultMasterSM;
    }

在PhxPaxos節(jié)點啟動時等曼,為每個Group創(chuàng)一個選主服務對象,啟動選主服務(MasterMgr)孵班。選主服務線程定期檢測主節(jié)點狀態(tài)涉兽、執(zhí)行續(xù)約等操作。

    void MasterMgr :: run()
    {
        m_bIsStarted = true;

        while (true)
        {
            if (m_bIsEnd)
            {
                return;
            }
            //租約時長篙程,單位毫秒
            int iLeaseTime = m_iLeaseTime;

            uint64_t llBeginTime = Time::GetSteadyClockMS();
            //嘗試選主
            TryBeMaster(iLeaseTime);
            //計算下次選主操作的時間間隔
            int iContinueLeaseTimeout = (iLeaseTime - 100) / 4;
            iContinueLeaseTimeout = iContinueLeaseTimeout / 2 + OtherUtils::FastRand() % iContinueLeaseTimeout;
            //如果需要主動觸發(fā)重新選主枷畏,將下次選主周期設置為:租約*2
            if (m_bNeedDropMaster)
            {
                BP->GetMasterBP()->DropMaster();
                m_bNeedDropMaster = false;
                iContinueLeaseTimeout = iLeaseTime * 2;
                PLG1Imp("Need drop master, this round wait time %dms", iContinueLeaseTimeout);
            }
           //實際間隔 = 計算間隔 - 本次操作運行時間
            uint64_t llEndTime = Time::GetSteadyClockMS();
            int iRunTime = llEndTime > llBeginTime ? llEndTime - llBeginTime : 0;
            int iNeedSleepTime = iContinueLeaseTimeout > iRunTime ? iContinueLeaseTimeout - iRunTime : 0;

            PLG1Imp("TryBeMaster, sleep time %dms", iNeedSleepTime);
            Time::MsSleep(iNeedSleepTime);
        }
    }

假設租約時長為5秒,每次選主操作耗時500ms虱饿,那么選主間隔為:

step1: (5000 - 100) / 4 = 1225
step2: 1225 / 2 + rand() % 1225 = 612 + 600(0-1225的任意值) = 1212
step3: 1212 - 500 = 712 ms

也就是說拥诡,選主和續(xù)約周期大約為租約的1/4時間。
來看TryBeMaster:

    void MasterMgr :: TryBeMaster(const int iLeaseTime)
    {
        nodeid_t iMasterNodeID = nullnode;
        uint64_t llMasterVersion = 0;
         //獲取當前的主節(jié)點信息
        //step 1 check exist master and get version
        m_oDefaultMasterSM.SafeGetMaster(iMasterNodeID, llMasterVersion);

        //1. 如果當前已經(jīng)存在主節(jié)點氮发,那么需要執(zhí)行續(xù)約動作渴肉,該動作只能由主節(jié)點發(fā)起
        //2. 如果當前無主節(jié)點,需要執(zhí)行選主動作爽冕,任意節(jié)點發(fā)起
        if (iMasterNodeID != nullnode && (iMasterNodeID != m_poPaxosNode->GetMyNodeID()))
        {
            PLG1Imp("Ohter as master, can't try be master, masterid %lu myid %lu",
                    iMasterNodeID, m_poPaxosNode->GetMyNodeID());
            return;
        }

        BP->GetMasterBP()->TryBeMaster();

        //step 2 try be master
        std::string sPaxosValue;

        //構建選主仇祭、續(xù)約數(shù)據(jù)(node id、租約時長等)
        if (!MasterStateMachine::MakeOpValue(
                m_poPaxosNode->GetMyNodeID(),
                llMasterVersion,
                iLeaseTime,
                MasterOperatorType_Complete,
                sPaxosValue))
        {
            PLG1Err("Make paxos value fail");
            return;
        }

        const int iMasterLeaseTimeout = iLeaseTime - 100;

        uint64_t llAbsMasterTimeout = Time::GetSteadyClockMS() + iMasterLeaseTimeout;
        uint64_t llCommitInstanceID = 0;

        SMCtx oCtx;
        oCtx.m_iSMID = MASTER_V_SMID;
        oCtx.m_pCtx = (void*)&llAbsMasterTimeout;

        //各節(jié)點間同步選主颈畸、續(xù)約數(shù)據(jù)
        int ret = m_poPaxosNode->Propose(m_iMyGroupIdx, sPaxosValue, llCommitInstanceID, &oCtx);

        if (ret != 0)
        {
            BP->GetMasterBP()->TryBeMasterProposeFail();
        }
    }

一旦節(jié)點發(fā)起了選主乌奇、續(xù)約操作到各個節(jié)點没讲,各節(jié)點習得該數(shù)據(jù)后執(zhí)行MasterStateMachine的Execute操作。Execute做了基本檢查后調(diào)用LearnMaster礁苗,直接看該函數(shù)爬凑。

    int MasterStateMachine :: LearnMaster(
        const uint64_t llInstanceID,
        const MasterOperator& oMasterOper,
        const uint64_t llAbsMasterTimeout)
    {
        std::lock_guard<std::mutex> oLockGuard(m_oMutex);

        PLG1Debug("my last version %lu other last version %lu this version %lu instanceid %lu",
                  m_llMasterVersion, oMasterOper.lastversion(), oMasterOper.version(), llInstanceID);

        //master version不一致,且由其他節(jié)點發(fā)送來的master version更新试伙,嘗試使用新的master
        if (oMasterOper.lastversion() != 0
            && llInstanceID > m_llMasterVersion
            && oMasterOper.lastversion() != m_llMasterVersion)
        {
            BP->GetMasterBP()->MasterSMInconsistent();
            PLG1Err("other last version %lu not same to my last version %lu, instanceid %lu",
                    oMasterOper.lastversion(), m_llMasterVersion, llInstanceID);
            //隨機修復?
            if (OtherUtils::FastRand() % 100 < 50)
            {
                //try to fix online
                PLG1Err("try to fix, set my master version %lu as other last version %lu, instanceid %lu",
                        m_llMasterVersion, oMasterOper.lastversion(), llInstanceID);
                m_llMasterVersion = oMasterOper.lastversion();
            }
        }

        //即便經(jīng)過上面的修復嘗試嘁信,版本依舊不一致
        if (oMasterOper.version() != m_llMasterVersion)
        {
            PLG1Debug("version conflit, op version %lu now master version %lu",
                      oMasterOper.version(), m_llMasterVersion);
            return 0;
        }

        //將新主、續(xù)約時間等更新到數(shù)據(jù)庫中
        int ret = UpdateMasterToStore(oMasterOper.nodeid(), llInstanceID, oMasterOper.timeout());

        if (ret != 0)
        {
            PLG1Err("UpdateMasterToStore fail, ret %d", ret);
            return -1;
        }

        //內(nèi)存數(shù)據(jù)更新
        m_iMasterNodeID = oMasterOper.nodeid();

        if (m_iMasterNodeID == m_iMyNodeID)
        {
            //self be master
            //use local abstimeout
            m_llAbsExpireTime = llAbsMasterTimeout;

            BP->GetMasterBP()->SuccessBeMaster();
            PLG1Head("Be master success, absexpiretime %lu", m_llAbsExpireTime);
        }
        else
        {
            //other be master
            //use new start timeout
            m_llAbsExpireTime = Time::GetSteadyClockMS() + oMasterOper.timeout();

            BP->GetMasterBP()->OtherBeMaster();
            PLG1Head("Ohter be master, absexpiretime %lu", m_llAbsExpireTime);
        }

        m_iLeaseTime = oMasterOper.timeout();
        m_llMasterVersion = llInstanceID;

        PLG1Imp("OK, masternodeid %lu version %lu abstimeout %lu",
                m_iMasterNodeID, m_llMasterVersion, m_llAbsExpireTime);

        return 0;
    }

總結如下:

  1. 每個Group啟動一個選主服務線程疏叨,定期觸發(fā)選主潘靖、續(xù)約操作。
  2. 通過Propose發(fā)起選主考廉,選主成功后主節(jié)點信息持久化到數(shù)據(jù)庫秘豹。
  3. 主節(jié)點定時發(fā)起續(xù)約保活昌粤,周期約為租約的1/4既绕。
  4. 節(jié)點重啟,優(yōu)先沿用原有的主節(jié)點信息涮坐。

都已經(jīng)到這了凄贩,但有一件事一直沒提。選了Master又能怎么樣呢袱讹?PhxPaxos在哪些場景下使用了Master這個身份疲扎?
答:PhxPaxos對Master角色沒有任何依賴,即便沒有Master捷雕,PhxPaxos依舊可以正常工作椒丧。Master是提供給外部業(yè)務使用的。

5.7 業(yè)務狀態(tài)機

業(yè)務狀態(tài)機即由業(yè)務實現(xiàn)的狀態(tài)機救巷,負責業(yè)務自身邏輯。業(yè)務狀態(tài)機繼承自StateMachine并通過Node的AddStateMachine添加到PhxPaxos浦译。

class Node
{
public:
        //為所有Group添加狀態(tài)機
        virtual void AddStateMachine(StateMachine* poSM) = 0;
        //為指定Group添加狀態(tài)機
        virtual void AddStateMachine(const int iGroupIdx, StateMachine* poSM) = 0;
        ......
}

那么如何使用狀態(tài)機呢棒假?也非常簡單,在Node的Propose方法中指定SMCtx(State Machine Context)精盅,指明本次Propose配套的狀態(tài)機帽哑。

    class SMCtx
    {
    public:
        SMCtx();
        SMCtx(const int iSMID, void* pCtx);
        //狀態(tài)機ID
        int m_iSMID;
        //自定義數(shù)據(jù)
        void* m_pCtx;
    };

class Node
{
public:
        //Base function.
        virtual int Propose(const int iGroupIdx, const std::string& sValue, uint64_t& llInstanceID) = 0;
        virtual int Propose(const int iGroupIdx, const std::string& sValue, uint64_t& llInstanceID, SMCtx* poSMCtx) = 0; 0;
        ......
}

5.8 總結

本章講解了PhxPaxos中的狀態(tài)機、狀態(tài)機管理機制以及兩個內(nèi)置的狀態(tài)機叹俏。每個節(jié)點的paxos log只會被一個狀態(tài)機消費妻枕,因此保存的paxos log中記錄了本條數(shù)據(jù)所屬的狀態(tài)機。

  • 內(nèi)置的SystemVSM用于處理集群節(jié)點變更崔列,變更操作包括成員的新增、修改只壳、刪除救氯。它通過Propose方式完成集群間節(jié)點狀態(tài)變更。
  • 內(nèi)置的MasterStateMachine用于選舉服務赤拒,它配套的MasterMgr定時觸發(fā)選舉或者續(xù)約行為,保證整個集群有一個主節(jié)點。
  • 業(yè)務狀態(tài)機通過Node提供的AddStateMachine接口添加到PhxPaxos亭珍,并通過Propose時指定處理狀態(tài)機。

上一節(jié)我們提到:“PhxPaxos并未對paxos做任何變種枝哄,甚至還做了一點簡化肄梨。”挠锥≈谙郏看過狀態(tài)機的相關邏輯后,不知道各位是否能回答這個問題蓖租?所指的是《Paxos Made Simple》下面這部分邏輯并未實現(xiàn):

In general, suppose a leader can get α commands ahead—that is, it can propose commands i + 1 through i +α after commands 1 through i are chosen. A gap of up to α?1 commands could then arise.

在我看來粱侣,PhxPaxos這么做應該是出于以下幾點考慮:

  • 預取并發(fā)確定多個提案值,將導致程序復雜度指數(shù)上升蓖宦。
  • 通過支持BatchPropose同樣可以同時確定多個值齐婴。

至此,《Paxos Made Simple》中介紹的主要概念都已經(jīng)講解完畢稠茂。但有一個問題:如果paxos持續(xù)運行柠偶,將產(chǎn)生大量的paxos log(選中的提案值)。存儲介質(zhì)不可能支持無限量的數(shù)據(jù)睬关,但同時新節(jié)點加入需要習得全部paxos log诱担。paxos協(xié)議并沒有給出解決方案,那PhxPaxos又給出了什么方案呢电爹?來看下一節(jié)蔫仙。


【轉(zhuǎn)載請注明】隨安居士. 5. PhxPaxos源碼分析之狀態(tài)機. 2017.11.16

最后編輯于
?著作權歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市藐不,隨后出現(xiàn)的幾起案子匀哄,更是在濱河造成了極大的恐慌,老刑警劉巖雏蛮,帶你破解...
    沈念sama閱讀 216,843評論 6 502
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件涎嚼,死亡現(xiàn)場離奇詭異,居然都是意外死亡挑秉,警方通過查閱死者的電腦和手機法梯,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,538評論 3 392
  • 文/潘曉璐 我一進店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人立哑,你說我怎么就攤上這事夜惭。” “怎么了铛绰?”我有些...
    開封第一講書人閱讀 163,187評論 0 353
  • 文/不壞的土叔 我叫張陵诈茧,是天一觀的道長。 經(jīng)常有香客問我捂掰,道長敢会,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 58,264評論 1 292
  • 正文 為了忘掉前任这嚣,我火速辦了婚禮鸥昏,結果婚禮上,老公的妹妹穿的比我還像新娘姐帚。我一直安慰自己吏垮,他們只是感情好,可當我...
    茶點故事閱讀 67,289評論 6 390
  • 文/花漫 我一把揭開白布罐旗。 她就那樣靜靜地躺著膳汪,像睡著了一般。 火紅的嫁衣襯著肌膚如雪尤莺。 梳的紋絲不亂的頭發(fā)上旅敷,一...
    開封第一講書人閱讀 51,231評論 1 299
  • 那天,我揣著相機與錄音颤霎,去河邊找鬼媳谁。 笑死,一個胖子當著我的面吹牛友酱,可吹牛的內(nèi)容都是我干的晴音。 我是一名探鬼主播,決...
    沈念sama閱讀 40,116評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼缔杉,長吁一口氣:“原來是場噩夢啊……” “哼锤躁!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起或详,我...
    開封第一講書人閱讀 38,945評論 0 275
  • 序言:老撾萬榮一對情侶失蹤系羞,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后霸琴,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體椒振,經(jīng)...
    沈念sama閱讀 45,367評論 1 313
  • 正文 獨居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,581評論 2 333
  • 正文 我和宋清朗相戀三年梧乘,在試婚紗的時候發(fā)現(xiàn)自己被綠了澎迎。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片庐杨。...
    茶點故事閱讀 39,754評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖夹供,靈堂內(nèi)的尸體忽然破棺而出灵份,到底是詐尸還是另有隱情,我是刑警寧澤哮洽,帶...
    沈念sama閱讀 35,458評論 5 344
  • 正文 年R本政府宣布填渠,位于F島的核電站,受9級特大地震影響袁铐,放射性物質(zhì)發(fā)生泄漏揭蜒。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 41,068評論 3 327
  • 文/蒙蒙 一剔桨、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧徙融,春花似錦洒缀、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,692評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至隐轩,卻和暖如春饺饭,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背职车。 一陣腳步聲響...
    開封第一講書人閱讀 32,842評論 1 269
  • 我被黑心中介騙來泰國打工瘫俊, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留,地道東北人悴灵。 一個月前我還...
    沈念sama閱讀 47,797評論 2 369
  • 正文 我出身青樓扛芽,卻偏偏與公主長得像,于是被迫代替她去往敵國和親积瞒。 傳聞我的和親對象是個殘疾皇子川尖,可洞房花燭夜當晚...
    茶點故事閱讀 44,654評論 2 354

推薦閱讀更多精彩內(nèi)容