目錄
1. PhxPaxos源碼分析之關(guān)于PhxPaxos
2. PhxPaxos分析之網(wǎng)絡(luò)基礎(chǔ)部件
3. PhxPaxos源碼分析之Proposer婚度、Acceptor
4. PhxPaxos源碼分析之Learner
5. PhxPaxos源碼分析之狀態(tài)機(jī)
6. PhxPaxos源碼分析之歸檔機(jī)制
7. PhxPaxos源碼分析之整體架構(gòu)
4.1 基本概念
在Paxos中Learner角色負(fù)責(zé)向其他節(jié)點(diǎn)學(xué)習(xí)選中的提案值。具體包括如下兩種場景:
- Learner所在節(jié)點(diǎn)參與了提案選舉官卡,Learner需要知道其接受(accept)的提案值是否被選中(chosen)。
- Learner所在節(jié)點(diǎn)已落后于其他節(jié)點(diǎn)醋虏,Learner需要選擇合適的策略快速完成追趕寻咒,并重新參與到提案選舉當(dāng)中。
4.2 選中(chosen)通知
當(dāng)本節(jié)點(diǎn)Proposer的某個(gè)提案被選中(chosen)時(shí)颈嚼,通過(MsgType_PaxosLearner_ProposerSendSuccess)消息通知到各個(gè)節(jié)點(diǎn)毛秘,代碼如下:
void Learner::OnProposerSendSuccess(const PaxosMsg &oPaxosMsg)
{
BP->GetLearnerBP()->OnProposerSendSuccess();
PLGHead("START Msg.InstanceID %lu Now.InstanceID %lu Msg.ProposalID %lu State.AcceptedID %lu "
"State.AcceptedNodeID %lu, Msg.from_nodeid %lu",
oPaxosMsg.instanceid(), GetInstanceID(), oPaxosMsg.proposalid(),
m_poAcceptor->GetAcceptorState()->GetAcceptedBallot().m_llProposalID,
m_poAcceptor->GetAcceptorState()->GetAcceptedBallot().m_llNodeID,
oPaxosMsg.nodeid());
if (oPaxosMsg.instanceid() != GetInstanceID())
{
//Instance id not same, that means not in the same instance, ignord.
PLGDebug("InstanceID not same, skip msg");
return;
}
if (m_poAcceptor->GetAcceptorState()->GetAcceptedBallot().isnull())
{
//Not accept any yet.
BP->GetLearnerBP()->OnProposerSendSuccessNotAcceptYet();
PLGDebug("I haven't accpeted any proposal");
return;
}
BallotNumber oBallot(oPaxosMsg.proposalid(), oPaxosMsg.nodeid());
if (m_poAcceptor->GetAcceptorState()->GetAcceptedBallot() != oBallot)
{
//Proposalid not same, this accept value maybe not chosen value.
PLGDebug("ProposalBallot not same to AcceptedBallot");
BP->GetLearnerBP()->OnProposerSendSuccessBallotNotSame();
return;
}
//learn value.
m_oLearnerState.LearnValueWithoutWrite(
oPaxosMsg.instanceid(),
m_poAcceptor->GetAcceptorState()->GetAcceptedValue(),
m_poAcceptor->GetAcceptorState()->GetChecksum());
BP->GetLearnerBP()->OnProposerSendSuccessSuccessLearn();
PLGHead("END Learn value OK, value %zu", m_poAcceptor->GetAcceptorState()->GetAcceptedValue().size());
TransmitToFollower();
}
正常情況下,所有節(jié)點(diǎn)處于online狀態(tài)阻课,共同參與paxos選舉叫挟。因此,各個(gè)節(jié)點(diǎn)的instance id一致限煞。為了避免沖突抹恳,paxos建議只由主節(jié)點(diǎn)的proposer發(fā)起提案,這樣保證接受提案和習(xí)得提案編號(hào)一致署驻。
此時(shí)奋献,Learn習(xí)得的提案值實(shí)際上就是本節(jié)點(diǎn)Accept的數(shù)據(jù),因此learner只更新內(nèi)存狀態(tài)即可旺上,無需再次落盤(acceptor已落盤)瓶蚂。最后,如果存在follower節(jié)點(diǎn)宣吱,數(shù)據(jù)同步到follower(follower節(jié)點(diǎn)不參與paxos算法窃这,相當(dāng)于某個(gè)paxos節(jié)點(diǎn)的同步備)。
4.3 提案值追趕
一旦節(jié)點(diǎn)處于落后狀態(tài)征候,它無法再參與到paxos提案選舉中來杭攻。這時(shí)需要由learner發(fā)起主動(dòng)學(xué)習(xí)完成追趕洒试。
PhxPaxos啟動(dòng)時(shí),啟動(dòng)learner定時(shí)器朴上。learner定時(shí)發(fā)送learn請求到各個(gè)節(jié)點(diǎn)垒棋,發(fā)送請求攜帶本節(jié)點(diǎn)的Instance ID、Node ID信息痪宰。各節(jié)點(diǎn)收到該請求后叼架,處理邏輯如下:
//本節(jié)點(diǎn)無可學(xué)習(xí)數(shù)據(jù),跳過本次請求
if (oPaxosMsg.instanceid() >= GetInstanceID())
{
return;
}
//本節(jié)點(diǎn)有可學(xué)習(xí)數(shù)據(jù)衣撬,且本節(jié)點(diǎn)保存了此部分的paxos log
if (oPaxosMsg.instanceid() >= m_poCheckpointMgr->GetMinChosenInstanceID())
{
//通知Learner Sender發(fā)送數(shù)據(jù)
if (!m_oLearnerSender.Prepare(oPaxosMsg.instanceid(), oPaxosMsg.nodeid()))
{
BP->GetLearnerBP()->OnAskforLearnGetLockFail();
PLGErr("LearnerSender working for others.");
//如本節(jié)點(diǎn)正在處理其他節(jié)點(diǎn)的learn請求乖订,但發(fā)起者只少了一條記錄,繞過Learner Sender發(fā)送具练。
if (oPaxosMsg.instanceid() == (GetInstanceID() - 1))
{
PLGImp("InstanceID only difference one, just send this value to other.");
//send one value
AcceptorStateData oState;
int ret = m_oPaxosLog.ReadState(m_poConfig->GetMyGroupIdx(), oPaxosMsg.instanceid(), oState);
if (ret == 0)
{
BallotNumber oBallot(oState.acceptedid(), oState.acceptednodeid());
SendLearnValue(oPaxosMsg.nodeid(), oPaxosMsg.instanceid(), oBallot, oState.acceptedvalue(), 0, false);
}
}
//如果Leaner Sender正在處理其他節(jié)點(diǎn)請求乍构,跳過本次請求
return;
}
}
//本節(jié)點(diǎn)有可學(xué)習(xí)數(shù)據(jù),并且本節(jié)點(diǎn)當(dāng)前并未處理其他節(jié)點(diǎn)的Learn請求扛点,發(fā)送本節(jié)點(diǎn)信息給發(fā)起者
SendNowInstanceID(oPaxosMsg.instanceid(), oPaxosMsg.nodeid());
當(dāng)接收節(jié)點(diǎn)有可學(xué)習(xí)數(shù)據(jù)哥遮,并且當(dāng)前并未處理其他節(jié)點(diǎn)的Learn請求,發(fā)送如下信息包括:本節(jié)點(diǎn)的node id陵究、當(dāng)前的instance id眠饮、min chosen instance id、請求的instance id铜邮;除此之外仪召,當(dāng)paxos log差距超過50,那么兩邊的主節(jié)點(diǎn)信息松蒜、集群信息可能不同扔茅,發(fā)送此部分?jǐn)?shù)據(jù)。發(fā)起者可能依次接收到多個(gè)節(jié)點(diǎn)請求秸苗,需要二次確認(rèn):
//當(dāng)初請求的instance id和本節(jié)點(diǎn)已不一致召娜,說明已通過其他方式開始習(xí)得
if (oPaxosMsg.instanceid() != GetInstanceID())
{
PLGErr("Lag msg, skip");
return;
}
//本節(jié)點(diǎn)已經(jīng)追趕甚至超過了接受節(jié)點(diǎn)的paxos log,跳過
if (oPaxosMsg.nowinstanceid() <= GetInstanceID())
{
PLGErr("Lag msg, skip");
return;
}
//本節(jié)點(diǎn)數(shù)據(jù)落后太多难述,接受節(jié)點(diǎn)已無完整的paxos log數(shù)據(jù)萤晴,嘗試進(jìn)入Checkpoint模式
if (oPaxosMsg.minchoseninstanceid() > GetInstanceID())
{
BP->GetCheckpointBP()->NeedAskforCheckpoint();
PLGHead("my instanceid %lu small than other's minchoseninstanceid %lu, other nodeid %lu",
GetInstanceID(), oPaxosMsg.minchoseninstanceid(), oPaxosMsg.nodeid());
AskforCheckpoint(oPaxosMsg.nodeid());
}
else if (!m_bIsIMLearning)
{
//接受節(jié)點(diǎn)有可習(xí)得的完整數(shù)據(jù),向接受節(jié)點(diǎn)確認(rèn)習(xí)得請求
ComfirmAskForLearn(oPaxosMsg.nodeid());
}
其實(shí)上面做了這么多胁后,簡單講就是:如果本節(jié)點(diǎn)在請求這段時(shí)間內(nèi)并未進(jìn)行自主學(xué)習(xí)店读,并且也沒有其他節(jié)點(diǎn)在此之前通知本節(jié)點(diǎn)開始學(xué)習(xí),本節(jié)點(diǎn)發(fā)送確認(rèn)請求到learn接收者攀芯,準(zhǔn)備開始接受learn數(shù)據(jù)屯断。learn接受者收到確認(rèn)請求激活learner sender線程,發(fā)送paxos log數(shù)據(jù)。
4.4 Learner Sender
Learner Sender發(fā)送本節(jié)點(diǎn)數(shù)據(jù)到請求節(jié)點(diǎn)殖演。為了避免learn操作造成網(wǎng)絡(luò)擁塞氧秘,在數(shù)據(jù)發(fā)送前做了限流。
void LearnerSender :: SendLearnedValue(const uint64_t llBeginInstanceID, const nodeid_t iSendToNodeID)
{
PLGHead("BeginInstanceID %lu SendToNodeID %lu", llBeginInstanceID, iSendToNodeID);
uint64_t llSendInstanceID = llBeginInstanceID;
int ret = 0;
uint32_t iLastChecksum = 0;
//control send speed to avoid affecting the network too much.
int iSendQps = LearnerSender_SEND_QPS;
int iSleepMs = iSendQps > 1000 ? 1 : 1000 / iSendQps;
int iSendInterval = iSendQps > 1000 ? iSendQps / 1000 + 1 : 1;
PLGDebug("SendQps %d SleepMs %d SendInterval %d AckLead %d",
iSendQps, iSleepMs, iSendInterval, m_iAckLead);
int iSendCount = 0;
while (llSendInstanceID < m_poLearner->GetInstanceID())
{
ret = SendOne(llSendInstanceID, iSendToNodeID, iLastChecksum);
if (ret != 0)
{
PLGErr("SendOne fail, SendInstanceID %lu SendToNodeID %lu ret %d",
llSendInstanceID, iSendToNodeID, ret);
return;
}
if (!CheckAck(llSendInstanceID))
{
return;
}
iSendCount++;
llSendInstanceID++;
ReleshSending();
if (iSendCount >= iSendInterval)
{
iSendCount = 0;
Time::MsSleep(iSleepMs);
}
}
//succ send, reset ack lead.
m_iAckLead = LearnerSender_ACK_LEAD;
PLGImp("SendDone, SendEndInstanceID %lu", llSendInstanceID);
}
注意趴久,這里的while循環(huán)判定邏輯為while (llSendInstanceID < m_poLearner->GetInstanceID())丸相,即只追趕到當(dāng)前instance id的前一條。這是因?yàn)閘earner當(dāng)前所處的instance id可能尚未完成提案彼棍,但這之前的提案是已選中(chosen)的灭忠。單條paxos log發(fā)送邏輯如下:
int LearnerSender :: SendOne(const uint64_t llSendInstanceID, const nodeid_t iSendToNodeID, uint32_t & iLastChecksum)
{
BP->GetLearnerBP()->SenderSendOnePaxosLog();
AcceptorStateData oState;
int ret = m_poPaxosLog->ReadState(m_poConfig->GetMyGroupIdx(), llSendInstanceID, oState);
if (ret != 0)
{
return ret;
}
BallotNumber oBallot(oState.acceptedid(), oState.acceptednodeid());
ret = m_poLearner->SendLearnValue(iSendToNodeID, llSendInstanceID, oBallot, oState.acceptedvalue(), iLastChecksum);
iLastChecksum = oState.checksum();
return ret;
}
4.5 更快的對齊數(shù)據(jù)
下述文字截取自《微信自研生產(chǎn)級(jí)paxos類庫PhxPaxos實(shí)現(xiàn)原理介紹》。
上文說到當(dāng)各臺(tái)機(jī)器的當(dāng)前運(yùn)行實(shí)例編號(hào)不一致的時(shí)候座硕,就需要Learner介入工作來對齊數(shù)據(jù)了弛作。Learner通過其他機(jī)器拉取到當(dāng)前實(shí)例的chosen value,從而跳轉(zhuǎn)到下一編號(hào)的實(shí)例华匾,如此反復(fù)最終將自己的實(shí)例編號(hào)更新到與其他機(jī)器一致映琳。那么這里學(xué)習(xí)一個(gè)實(shí)例的網(wǎng)絡(luò)延時(shí)代價(jià)是一個(gè)RTT≈├可能這個(gè)延遲看起來還不錯(cuò)萨西,但是當(dāng)新的數(shù)據(jù)仍然通過一個(gè)RTT的代價(jià)不斷寫入的時(shí)候,而落后的機(jī)器仍然以一個(gè)RTT來進(jìn)行學(xué)習(xí)诸尽,這樣會(huì)出現(xiàn)很難追上的情況原杂。
這里需要改進(jìn),我們可以提前獲取差距您机,批量打包進(jìn)行學(xué)習(xí),比如A機(jī)器Learner記錄當(dāng)前實(shí)例編號(hào)是x年局,B機(jī)器是y际看,而x < y,那么B機(jī)器通過通信獲取這個(gè)差距矢否,將(x,y]的chosen value一起打包發(fā)送給A機(jī)器仲闽,A機(jī)器進(jìn)行批量的學(xué)習(xí)。這是一個(gè)很不錯(cuò)的方法僵朗。
但仍然不夠快赖欣,當(dāng)落后的數(shù)據(jù)極大,B機(jī)器發(fā)送數(shù)據(jù)需要的網(wǎng)絡(luò)耗時(shí)也將變大验庙,那么發(fā)送數(shù)據(jù)的過程中顶吮,A機(jī)器處于一種空閑狀態(tài),由于paxos另外一個(gè)瓶頸在于寫盤粪薛,如果不能利用這段時(shí)間來進(jìn)行寫盤悴了,那性能仍然堪憂。我們參考流式傳輸,采用類似的方法實(shí)現(xiàn)Learner的邊發(fā)邊學(xué)湃交,B機(jī)器源源不斷的往A機(jī)器輸送數(shù)據(jù)熟空,而A機(jī)器只需要收到一個(gè)實(shí)例最小單元的包體,即可立即解開進(jìn)行學(xué)習(xí)并完成寫盤搞莺。
具體的實(shí)現(xiàn)大概是先進(jìn)行一對一的協(xié)商息罗,建立一個(gè)Session通道,在Session通道里直接采用直塞的方式無腦發(fā)送數(shù)據(jù)才沧。當(dāng)然也不是完全的無腦迈喉,Session通過心跳機(jī)制進(jìn)行維護(hù),一旦Session斷開即停止發(fā)送糜工。
“我們參考流式傳輸弊添,采用類似的方法實(shí)現(xiàn)Learner的邊發(fā)邊學(xué),B機(jī)器源源不斷的往A機(jī)器輸送數(shù)據(jù)捌木,而A機(jī)器只需要收到一個(gè)實(shí)例最小單元的包體油坝,即可立即解開進(jìn)行學(xué)習(xí)并完成寫盤∨亳桑”澈圈。這部分實(shí)際上封裝在網(wǎng)絡(luò)層,來看如何做到A機(jī)器接收到一個(gè)最小的實(shí)例單元:
int MessageEvent :: OnRead()
{
if (m_iLeftReadLen > 0)
{
return ReadLeft();
}
int iReadLen = m_oSocket.receive(m_sReadHeadBuffer + m_iLastReadHeadPos, sizeof(int) - m_iLastReadHeadPos);
if (iReadLen == 0)
{
BP->GetNetworkBP()->TcpOnReadMessageLenError();
PLErr("read head fail, readlen %d, socket broken", iReadLen);
return -1;
}
m_iLastReadHeadPos += iReadLen;
if (m_iLastReadHeadPos < (int)sizeof(int))
{
PLImp("head read pos %d small than sizeof(int) %zu", m_iLastReadHeadPos, sizeof(int));
return 0;
}
m_iLastReadHeadPos = 0;
int niLen = 0;
int iLen = 0;
memcpy((char *)&niLen, m_sReadHeadBuffer, sizeof(int));
iLen = ntohl(niLen) - 4;
if (iLen < 0 || iLen > MAX_VALUE_SIZE)
{
PLErr("need to read len wrong %d", iLen);
return -2;
}
m_oReadCacheBuffer.Ready(iLen);
m_iLeftReadLen = iLen;
m_iLastReadPos = 0;
//second read maybe no data read, so readlen == 0 is ok.
bool bAgain = false;
iReadLen = m_oSocket.receive(m_oReadCacheBuffer.GetPtr(), iLen, &bAgain);
if (iReadLen == 0)
{
if (!bAgain)
{
PLErr("second read data fail, readlen %d, no again, socket broken", iReadLen);
return -1;
}
else
{
PLErr("second read data, readlen %d need again", iReadLen);
return 0;
}
}
if (iReadLen == iLen)
{
ReadDone(m_oReadCacheBuffer, iLen);
m_iLeftReadLen = 0;
m_iLastReadPos = 0;
}
else if (iReadLen < iLen)
{
m_iLastReadPos = iReadLen;
m_iLeftReadLen = iLen - iReadLen;
PLImp("read buflen %d small than except len %d", iReadLen, iLen);
}
else
{
PLErr("read buflen %d large than except len %d", iReadLen, iLen);
return -2;
}
return 0;
}
OnRead函數(shù)由以下兩部分組成:
- 首先讀取數(shù)據(jù)包大小帆啃,這個(gè)過程可能需要分多次完成瞬女。
- 讀取指定數(shù)據(jù)包大小的數(shù)據(jù),這部分也可能需要分多次完成努潘。
當(dāng)單個(gè)數(shù)據(jù)包讀完诽偷,已獲得完整的“最小實(shí)例單元”,通過ReadDone將數(shù)據(jù)包發(fā)往Node節(jié)點(diǎn)處理疯坤。
至于心跳报慕,其實(shí)就是PhxPaxos中的Ack機(jī)制。在SendLearnedValue中压怠,每發(fā)送一條記錄需要執(zhí)行一次CheckAck眠冈。如果檢查失敗,將終止發(fā)送菌瘫。每條記錄發(fā)送后蜗顽,learner sender要求對端發(fā)送一個(gè)異步的ack請求,這個(gè)過程是異步的雨让。CheckAck邏輯如下:
const bool LearnerSender :: CheckAck(const uint64_t llSendInstanceID)
{
m_oLock.Lock();
if (llSendInstanceID < m_llAckInstanceID)
{
m_iAckLead = LearnerSender_ACK_LEAD;
PLGImp("Already catch up, ack instanceid %lu now send instanceid %lu",
m_llAckInstanceID, llSendInstanceID);
m_oLock.UnLock();
return false;
}
//如果當(dāng)前發(fā)送的instance id和對端已確認(rèn)的instance id差距超過了設(shè)定值
while (llSendInstanceID > m_llAckInstanceID + m_iAckLead)
{
uint64_t llNowTime = Time::GetSteadyClockMS();
uint64_t llPassTime = llNowTime > m_llAbsLastAckTime ? llNowTime - m_llAbsLastAckTime : 0;
//同時(shí)雇盖,超過了設(shè)定的超時(shí)時(shí)間,提前終止
if ((int)llPassTime >= LearnerSender_ACK_TIMEOUT)
{
BP->GetLearnerBP()->SenderAckTimeout();
PLGErr("Ack timeout, last acktime %lu now send instanceid %lu",
m_llAbsLastAckTime, llSendInstanceID);
CutAckLead();
m_oLock.UnLock();
return false;
}
BP->GetLearnerBP()->SenderAckDelay();
//PLGErr("Need sleep to slow down send speed, sendinstaceid %lu ackinstanceid %lu",
//llSendInstanceID, m_llAckInstanceID);
m_oLock.WaitTime(20);
}
m_oLock.UnLock();
return true;
}
4.6 關(guān)于選中(chosen)
在整個(gè)講解learner中宫患,我們一直在強(qiáng)調(diào)選中(chosen)提案或者選中(chosen)值刊懈。這個(gè)過程提出以下疑問这弧,并嘗試解答。
問:learner如何知道某個(gè)提案是否被選中呢虚汛?
答:“4.2 選中通知”是通知各個(gè)learner值是否已被選中的一種常規(guī)方式匾浪。
問:如果某個(gè)值被選中后,提案發(fā)起節(jié)點(diǎn)異常卷哩,選中消息未發(fā)出會(huì)如何蛋辈?
答:重新發(fā)起選舉,新的被選中的提案編號(hào)不同将谊,但提案值保持不變冷溶。
問:“4.2 選中通知”中,只更新了內(nèi)存狀態(tài)尊浓,在持久化的數(shù)據(jù)中逞频,如何區(qū)分一個(gè)值的狀態(tài)是accept還是chosen呢?
答: 如果當(dāng)前的instance id為N栋齿,在N-1之前的所有提案值都是chosen的苗胀。當(dāng)instance id為N的提案值可能是chosen狀態(tài)、也可能是accept狀態(tài)瓦堵。
問:為何非chosen狀態(tài)的數(shù)據(jù)也需要落盤
答:參見P2C的不變性基协,即Prepare和Accept階段做過的承諾、接受過的值即便節(jié)點(diǎn)重啟等異常情況下也需要保持不變菇用。
問:我還有一個(gè)關(guān)于Instance類初始化的問題澜驮。
答:Instance類本章尚未涉及,留在第七章解答吧惋鸥。
4.6 總結(jié)
經(jīng)過第三杂穷、第四兩章的講解,paxos協(xié)議的算法實(shí)現(xiàn)已經(jīng)分析完成卦绣。PhxPaxos并未對paxos做任何變種亭畜,甚至還做了一點(diǎn)簡化。
下一節(jié)迎卤,我們將介紹paxos made simple中另外一個(gè)重要的概念:狀態(tài)機(jī)。當(dāng)然玷坠,這里提到的“簡化”也會(huì)提及蜗搔。
【轉(zhuǎn)載請注明】隨安居士. 4. PhxPaxos源碼分析之Learner. 2017.11.15