nacos的集群分為持久化和非持久化兩種,持久化采用的Raft協(xié)議來實現(xiàn),非持久化使用不同節(jié)點作為節(jié)點分片存儲實現(xiàn),以下主要分析持久化的實現(xiàn)
1.nacos持久化集群
??nacos持久化的集群類似于zookeeper拍顷, 它分為leader角色和follower角色, 那么從這個角色的名字可以看出來塘幅,這個集群存在選舉的機制昔案。 因為如果自己不具備選舉功能,角色的命名可能就是master/slave了
1. 1選舉算法
使用raft算法協(xié)議保證數(shù)據(jù)一致性(和zookeeper一樣弱一致性)电媳,以及相關的leader選舉
raft參考:http://thesecretlivesofdata.com/raft/踏揣;
強烈建議先理解raft協(xié)議思想,下面看代碼無非就是熟悉一下流程而已匾乓;
leader
follower
candidate選舉
選舉有兩個時機
1.服務啟動的時候
2.leader掛了的時候
所有節(jié)點啟動的時候捞稿,都是follower狀態(tài)。 如果在一段時間內如果沒有收到leader的心跳(可能是沒有 leader拼缝,也可能是leader掛了)娱局,那么follower會變成Candidate。然后發(fā)起選舉咧七,選舉之前铃辖,會增加 term,這個term和zookeeper中的epoch的道理是一樣的猪叙。
follower會投自己一票娇斩,并且給其他節(jié)點發(fā)送票據(jù)vote,等到其他節(jié)點回復
在這個過程中穴翩,可能出現(xiàn)幾種情況 收到過半的票數(shù)通過犬第,則成為leader 被告知其他節(jié)點已經(jīng)成為leader,則自己切換為follower 一段時間內沒有收到過半的投票芒帕,則重新發(fā)起選舉
約束條件在任一term中歉嗓,單個節(jié)點最多只能投一票
選舉的幾種情況
第一種情況,贏得選舉之后背蟆,leader會給所有節(jié)點發(fā)送消息鉴分,避免其他節(jié)點觸發(fā)新的選舉
第二種情況,比如有三個節(jié)點A B C带膀。A B同時發(fā)起選舉志珍,而A的選舉消息先到達C,C給A投了一 票垛叨,當B的消息到達C時伦糯,已經(jīng)不能滿足上面提到的第一個約束,即C不會給B投票,而A和B顯然都 不會給對方投票敛纲。A勝出之后喂击,會給B,C發(fā)心跳消息,節(jié)點B發(fā)現(xiàn)節(jié)點A的term不低于自己的term淤翔, 知道有已經(jīng)有Leader了翰绊,于是轉換成follower
第三種情況, 沒有任何節(jié)點獲得超過半數(shù)的投票旁壮,可能是平票的情況监嗜。加入總共有四個節(jié)點 (A/B/C/D),Node C寡具、Node D同時成為了candidate,但Node A投了NodeD一票稚补,NodeB投 了Node C一票童叠,這就出現(xiàn)了平票 split vote的情況。這個時候大家都在等啊等课幕,直到超時后重新發(fā) 起選舉厦坛。如果出現(xiàn)平票的情況,那么就延長了系統(tǒng)不可用的時間,因此raft引入了randomized election timeouts來盡量避免平票情況
源碼分析:
Server節(jié)點數(shù)據(jù)獲取是通過ServerListManager從文件和內存數(shù)據(jù)讀取以及比較來的
public class ServerListManager{
@PostConstruct
public void init() {
//這個獲取server列表乍惊,列表更新也會觸發(fā)對應事件
GlobalExecutor.registerServerListUpdater(new ServerListUpdater());
//服務狀態(tài)監(jiān)測
GlobalExecutor.registerServerStatusReporter(new ServerStatusReporter(), 2000);
}
}
下面是Raft核心算法
public class RaftCore {
// 杜秸。。润绎。
@PostConstruct
public void init() throws Exception {
Loggers.RAFT.info("initializing Raft sub-system");
executor.submit(notifier);
long start = System.currentTimeMillis();
raftStore.loadDatums(notifier, datums);
//設置term,默認給0撬碟,如果本地文件獲取有term會用數(shù)據(jù)中的term
setTerm(NumberUtils.toLong(raftStore.loadMeta().getProperty("term"), 0L));
Loggers.RAFT.info("cache loaded, datum count: {}, current term: {}", datums.size(), peers.getTerm());
//這里不是重點;就是簡單service,instance的任務要保證先完成莉撇,再選舉而已
while (true) {
if (notifier.tasks.size() <= 0) {
break;
}
Thread.sleep(1000L);
}
initialized = true;
//這里是一個定時任務負責master的選舉
GlobalExecutor.registerMasterElection(new MasterElection());
//這個負責定時心跳處理呢蛤,就是客戶端發(fā)送心跳給服務端
GlobalExecutor.registerHeartbeat(new HeartBeat());
Loggers.RAFT.info("timer started: leader timeout ms: {}, heart-beat timeout ms: {}",
GlobalExecutor.LEADER_TIMEOUT_MS, GlobalExecutor.HEARTBEAT_INTERVAL_MS);
}
}
1)leader選舉
就是基于本地啟動發(fā)起選舉
//Master的選舉
public class MasterElection implements Runnable {
@Override
public void run() {
try {
//表示peers狀態(tài)好了,這個是通過listerner監(jiān)控獲取的
if (!peers.isReady()) {
return;
}
RaftPeer local = peers.local();
//這個是要等待多久開啟發(fā)起選舉棍郎,
//1.local.leaderDueMs的值一開始是隨機生成的其障,范圍是[0, 15000),單位是毫秒涂佃,
//2.此后按照500毫秒一個梯度進行遞減励翼,減少到≤0后,就會觸發(fā)選舉操作辜荠。當然汽抚,選舉之前,把超時時間重置一下伯病。
//3.resetLeaderDue()方法是把leaderDueMs變量重新賦值殊橙,但是并不是像初始化隨機賦值一樣的邏輯,而是在15000毫秒的基礎上加上了一個隨機值,其隨機值的范圍是[0, 5000)毫秒膨蛮。
local.leaderDueMs -= GlobalExecutor.TICK_PERIOD_MS;
if (local.leaderDueMs > 0) {
return;
}
// reset timeout
local.resetLeaderDue();
local.resetHeartbeatDue();
//發(fā)起選舉
sendVote();
} catch (Exception e) {
Loggers.RAFT.warn("[RAFT] error while master election {}", e);
}
}
public void sendVote() {
RaftPeer local = peers.get(NetUtils.localServer());
Loggers.RAFT.info("leader timeout, start voting,leader: {}, term: {}",
JSON.toJSONString(getLeader()), local.term);
peers.reset();
//本地的term增加一
local.term.incrementAndGet();
//先投給自己
local.voteFor = local.ip;
//更新自己的狀態(tài)
local.state = RaftPeer.State.CANDIDATE;
Map<String, String> params = new HashMap<>(1);
params.put("vote", JSON.toJSONString(local));
//發(fā)給除了自己所有其他服務節(jié)點
for (final String server : peers.allServersWithoutMySelf()) {
final String url = buildURL(server, API_VOTE);
try {
//會請求RaftController的raft/vote接口做 處理叠纹;
HttpClient.asyncHttpPost(url, null, params, new AsyncCompletionHandler<Integer>() {
@Override
public Integer onCompleted(Response response) throws Exception {
if (response.getStatusCode() != HttpURLConnection.HTTP_OK) {
Loggers.RAFT.error("NACOS-RAFT vote failed: {}, url: {}", response.getResponseBody(), url);
return 1;
}
RaftPeer peer = JSON.parseObject(response.getResponseBody(), RaftPeer.class);
Loggers.RAFT.info("received approve from peer: {}", JSON.toJSONString(peer));
//拿到投票結果,選舉最終的leader
peers.decideLeader(peer);
return 0;
}
});
} catch (Exception e) {
Loggers.RAFT.warn("error while sending vote to server: {}", server);
}
}
}
}
}
//Raft節(jié)點集
public class RaftPeerSet {
//投票選出leader,每次投票結果返回都會執(zhí)行這個操作
public RaftPeer decideLeader(RaftPeer candidate) {
peers.put(candidate.ip, candidate);
SortedBag ips = new TreeBag();
int maxApproveCount = 0;
String maxApprovePeer = null;
for (RaftPeer peer : peers.values()) {
if (StringUtils.isEmpty(peer.voteFor)) {
continue;
}
ips.add(peer.voteFor);
if (ips.getCount(peer.voteFor) > maxApproveCount) {
maxApproveCount = ips.getCount(peer.voteFor);
maxApprovePeer = peer.voteFor;
}
}
//比較最大投票數(shù)并且當投票數(shù)量 總投票節(jié)點 過半敞葛,將該節(jié)點設置為leader
if (maxApproveCount >= majorityCount()) {
RaftPeer peer = peers.get(maxApprovePeer);
peer.state = RaftPeer.State.LEADER;
if (!Objects.equals(leader, peer)) {
leader = peer;
//發(fā)布事件誉察,leader已經(jīng)選舉完成了
applicationContext.publishEvent(new LeaderElectFinishedEvent(this, leader));
Loggers.RAFT.info("{} has become the LEADER", leader.ip);
}
}
return leader;
}
}
RaftPeer收到投票做處理
public class RaftController {
@NeedAuth
@PostMapping("/vote")
public JSONObject vote(HttpServletRequest request, HttpServletResponse response) throws Exception {
RaftPeer peer = raftCore.receivedVote(
JSON.parseObject(WebUtils.required(request, "vote"), RaftPeer.class));
return JSON.parseObject(JSON.toJSONString(peer));
}
}
//RaftCore
public synchronized RaftPeer receivedVote(RaftPeer remote) {
if (!peers.contains(remote)) {
throw new IllegalStateException("can not find peer: " + remote.ip);
}
RaftPeer local = peers.get(NetUtils.localServer());
//如果當前的term大于遠程的term,返回本地的給遠端作為投票返回
if (remote.term.get() <= local.term.get()) {
String msg = "received illegitimate vote" +
", voter-term:" + remote.term + ", votee-term:" + local.term;
Loggers.RAFT.info(msg);
if (StringUtils.isEmpty(local.voteFor)) {
local.voteFor = local.ip;
}
return local;
}
//第一個收到就返回先收到的投票
local.resetLeaderDue();
local.state = RaftPeer.State.FOLLOWER;
local.voteFor = remote.ip;
local.term.set(remote.term.get());
Loggers.RAFT.info("vote {} as leader, term: {}", remote.ip, remote.term);
return local;
}
總結一下上面的leader節(jié)點選舉:
腦子里一定要考慮到幾個節(jié)點在投票的情景惹谐,比如下面這個:
1.啟動后會先判定服務是否已經(jīng)啟動好持偏,啟動好后每一個服務有一個初始化leader選舉到期時間,會不停的減去500ms氨肌,直到小于等于0才執(zhí)行選舉
2.選舉的時候term增加1鸿秆;狀態(tài)設置為candidate,并把自己節(jié)點作為投票節(jié)點向其他所有節(jié)點發(fā)送投票選舉;
3.節(jié)點拿到選舉投票信息后怎囚,如果如果遠端的節(jié)點term不能大于本地節(jié)點信息就返回本地投票結果給對方
4.選舉獲取投票結果后會加入本次 統(tǒng)計本地所有的投票信息卿叽,判定如果最多投票節(jié)點數(shù)過半了,就選出leader了恳守;
2) 心跳檢測leader健康
心跳檢測 leader是否掛了考婴,如果掛了是要重新選舉的;
這個是leader主動發(fā)起心跳給每一個follower節(jié)點催烘,重置服務端leader的選舉發(fā)生
上面我們初始化了一個啟動RaftCore的時候創(chuàng)建了
GlobalExecutor.registerHeartbeat(new HeartBeat());
我們看這個心跳節(jié)點信息
public class HeartBeat implements Runnable {
@Override
public void run() {
try {
if (!peers.isReady()) {
return;
}
//和上面的道理相同沥阱,有一個心跳延時緩沖
RaftPeer local = peers.local();
local.heartbeatDueMs -= GlobalExecutor.TICK_PERIOD_MS;
if (local.heartbeatDueMs > 0) {
return;
}
local.resetHeartbeatDue();
//發(fā)起心跳
sendBeat();
} catch (Exception e) {
Loggers.RAFT.warn("[RAFT] error while sending beat {}", e);
}
public void sendBeat() throws IOException, InterruptedException {
RaftPeer local = peers.local();
//這里表示在集群中,不是leader就不執(zhí)行了
if (local.state != RaftPeer.State.LEADER && !STANDALONE_MODE) {
return;
}
if (Loggers.RAFT.isDebugEnabled()) {
Loggers.RAFT.debug("[RAFT] send beat with {} keys.", datums.size());
}
//同樣重置leader選舉時間
local.resetLeaderDue();
// build data
JSONObject packet = new JSONObject();
packet.put("peer", local);
JSONArray array = new JSONArray();
if (switchDomain.isSendBeatOnly()) {
Loggers.RAFT.info("[SEND-BEAT-ONLY] {}", String.valueOf(switchDomain.isSendBeatOnly()));
}
//這個和命名服務service伊群、instance有關考杉,表示把當前節(jié)點的servive,instance打包發(fā)送給從節(jié)點判定變化情況
//默認也會走這個邏輯,除非手動給關閉了
if (!switchDomain.isSendBeatOnly()) {
for (Datum datum : datums.values()) {
JSONObject element = new JSONObject();
if (KeyBuilder.matchServiceMetaKey(datum.key)) {
element.put("key", KeyBuilder.briefServiceMetaKey(datum.key));
} else if (KeyBuilder.matchInstanceListKey(datum.key)) {
element.put("key", KeyBuilder.briefInstanceListkey(datum.key));
}
element.put("timestamp", datum.timestamp);
array.add(element);
}
}
packet.put("datums", array);
// broadcast
Map<String, String> params = new HashMap<String, String>(1);
params.put("beat", JSON.toJSONString(packet));
String content = JSON.toJSONString(params);
//gizp壓縮,如果帶有service,instance避免文件過大
ByteArrayOutputStream out = new ByteArrayOutputStream();
GZIPOutputStream gzip = new GZIPOutputStream(out);
gzip.write(content.getBytes(StandardCharsets.UTF_8));
gzip.close();
byte[] compressedBytes = out.toByteArray();
String compressedContent = new String(compressedBytes, StandardCharsets.UTF_8);
if (Loggers.RAFT.isDebugEnabled()) {
Loggers.RAFT.debug("raw beat data size: {}, size of compressed data: {}",
content.length(), compressedContent.length());
}
//給除了自己的所有節(jié)點發(fā)送心跳信息
for (final String server : peers.allServersWithoutMySelf()) {
try {
final String url = buildURL(server, API_BEAT);
if (Loggers.RAFT.isDebugEnabled()) {
Loggers.RAFT.debug("send beat to server " + server);
}
//異步發(fā)送url=vote/beat
HttpClient.asyncHttpPostLarge(url, null, compressedBytes, new AsyncCompletionHandler<Integer>() {
@Override
public Integer onCompleted(Response response) throws Exception {
if (response.getStatusCode() != HttpURLConnection.HTTP_OK) {
Loggers.RAFT.error("NACOS-RAFT beat failed: {}, peer: {}",
response.getResponseBody(), server);
MetricsMonitor.getLeaderSendBeatFailedException().increment();
return 1;
}
//成功返回結果更新本地ip舰始,peer信息
peers.update(JSON.parseObject(response.getResponseBody(), RaftPeer.class));
if (Loggers.RAFT.isDebugEnabled()) {
Loggers.RAFT.debug("receive beat response from: {}", url);
}
return 0;
}
@Override
public void onThrowable(Throwable t) {
Loggers.RAFT.error("NACOS-RAFT error while sending heart-beat to peer: {} {}", server, t);
MetricsMonitor.getLeaderSendBeatFailedException().increment();
}
});
} catch (Exception e) {
Loggers.RAFT.error("error while sending heart-beat to peer: {} {}", server, e);
MetricsMonitor.getLeaderSendBeatFailedException().increment();
}
}
}
}
上面就是簡單遍歷異步發(fā)送給除自己之外的所有節(jié)點自己的信息奔则;
其他節(jié)點拿到請求處理
RaftController
@PostMapping("/beat")
public JSONObject beat(HttpServletRequest request, HttpServletResponse response) throws Exception {
String entity = new String(IoUtils.tryDecompress(request.getInputStream()), StandardCharsets.UTF_8);
String value = URLDecoder.decode(entity, "UTF-8");
value = URLDecoder.decode(value, "UTF-8");
JSONObject json = JSON.parseObject(value);
JSONObject beat = JSON.parseObject(json.getString("beat"));
//這里處理
RaftPeer peer = raftCore.receivedBeat(beat);
return JSON.parseObject(JSON.toJSONString(peer));
}
//收到心跳信息的處理
public RaftPeer receivedBeat(JSONObject beat) throws Exception {
final RaftPeer local = peers.local();
final RaftPeer remote = new RaftPeer();
remote.ip = beat.getJSONObject("peer").getString("ip");
remote.state = RaftPeer.State.valueOf(beat.getJSONObject("peer").getString("state"));
remote.term.set(beat.getJSONObject("peer").getLongValue("term"));
remote.heartbeatDueMs = beat.getJSONObject("peer").getLongValue("heartbeatDueMs");
remote.leaderDueMs = beat.getJSONObject("peer").getLongValue("leaderDueMs");
remote.voteFor = beat.getJSONObject("peer").getString("voteFor");
if (remote.state != RaftPeer.State.LEADER) {
Loggers.RAFT.info("[RAFT] invalid state from master, state: {}, remote peer: {}",
remote.state, JSON.toJSONString(remote));
throw new IllegalArgumentException("invalid state from master, state: " + remote.state);
}
if (local.term.get() > remote.term.get()) {
Loggers.RAFT.info("[RAFT] out of date beat, beat-from-term: {}, beat-to-term: {}, remote peer: {}, and leaderDueMs: {}"
, remote.term.get(), local.term.get(), JSON.toJSONString(remote), local.leaderDueMs);
throw new IllegalArgumentException("out of date beat, beat-from-term: " + remote.term.get()
+ ", beat-to-term: " + local.term.get());
}
if (local.state != RaftPeer.State.FOLLOWER) {
Loggers.RAFT.info("[RAFT] make remote as leader, remote peer: {}", JSON.toJSONString(remote));
// mk follower,如果當前節(jié)點不是follower節(jié)點,強制設置為follower節(jié)點
local.state = RaftPeer.State.FOLLOWER;
local.voteFor = remote.ip;
}
final JSONArray beatDatums = beat.getJSONArray("datums");
//重置本地leader的選舉過期蔽午,
local.resetLeaderDue();
//重置心跳時間
local.resetHeartbeatDue();
//1.這個表示獲取到的是leader會把本地節(jié)點的leader應用更新為當前的遠程主節(jié)點
//2.遍歷自己所有的peer易茬,將原來存儲老舊的leader節(jié)點 狀態(tài)由leader->follower狀態(tài)
peers.makeLeader(remote);
//這個表示除了發(fā)送心跳還有其他信息(service,instance)發(fā)送,這下面也主要是解析這些信息
if (!switchDomain.isSendBeatOnly()) {
Map<String, Integer> receivedKeysMap = new HashMap<>(datums.size());
//本地key記錄及老,如果有遠程沒有傳遞過來該key;就表示檔期key為失效的key了抽莱,當前節(jié)點會刪除key的所有信息
for (Map.Entry<String, Datum> entry : datums.entrySet()) {
receivedKeysMap.put(entry.getKey(), 0);
}
// now check datums,這個表示會批量執(zhí)行本地更新的datum數(shù)據(jù)
List<String> batch = new ArrayList<>();
int processedCount = 0;
if (Loggers.RAFT.isDebugEnabled()) {
Loggers.RAFT.debug("[RAFT] received beat with {} keys, RaftCore.datums' size is {}, remote server: {}, term: {}, local term: {}",
beatDatums.size(), datums.size(), remote.ip, remote.term, local.term);
}
for (Object object : beatDatums) {
processedCount = processedCount + 1;
JSONObject entry = (JSONObject) object;
String key = entry.getString("key");
final String datumKey;
if (KeyBuilder.matchServiceMetaKey(key)) {
datumKey = KeyBuilder.detailServiceMetaKey(key);
} else if (KeyBuilder.matchInstanceListKey(key)) {
datumKey = KeyBuilder.detailInstanceListkey(key);
} else {
// ignore corrupted key:
continue;
}
long timestamp = entry.getLong("timestamp");
receivedKeysMap.put(datumKey, 1);
try {
if (datums.containsKey(datumKey) && datums.get(datumKey).timestamp.get() >= timestamp && processedCount < beatDatums.size()) {
continue;
}
//如果當前內存不存在該key或者時間戳小于 遠程記錄時間搓骄恶,就會記錄起來后面批量執(zhí)行更新
if (!(datums.containsKey(datumKey) && datums.get(datumKey).timestamp.get() >= timestamp)) {
batch.add(datumKey);
}
//表示batch批量執(zhí)行個數(shù) 大于50食铐;或者最后一個處理了就會往下執(zhí)行了
if (batch.size() < 50 && processedCount < beatDatums.size()) {
continue;
}
String keys = StringUtils.join(batch, ",");
if (batch.size() <= 0) {
continue;
}
Loggers.RAFT.info("get datums from leader: {}, batch size is {}, processedCount is {}, datums' size is {}, RaftCore.datums' size is {}"
, getLeader().ip, batch.size(), processedCount, beatDatums.size(), datums.size());
// update datum entry;從遠程主節(jié)點再次拉取datum信息(service/instance)
String url = buildURL(remote.ip, API_GET) + "?keys=" + URLEncoder.encode(keys, "UTF-8");
HttpClient.asyncHttpGet(url, null, null, new AsyncCompletionHandler<Integer>() {
@Override
public Integer onCompleted(Response response) throws Exception {
if (response.getStatusCode() != HttpURLConnection.HTTP_OK) {
return 1;
}
List<JSONObject> datumList = JSON.parseObject(response.getResponseBody(), new TypeReference<List<JSONObject>>() {
});
//對于遠程的datum數(shù)據(jù)處理
for (JSONObject datumJson : datumList) {
OPERATE_LOCK.lock();
Datum newDatum = null;
try {
//獲取本地舊的datum信息
Datum oldDatum = getDatum(datumJson.getString("key"));
if (oldDatum != null && datumJson.getLongValue("timestamp") <= oldDatum.timestamp.get()) {
Loggers.RAFT.info("[NACOS-RAFT] timestamp is smaller than that of mine, key: {}, remote: {}, local: {}",
datumJson.getString("key"), datumJson.getLongValue("timestamp"), oldDatum.timestamp);
continue;
}
//滿足service時僧鲁,解析為servive的Datum
if (KeyBuilder.matchServiceMetaKey(datumJson.getString("key"))) {
Datum<Service> serviceDatum = new Datum<>();
serviceDatum.key = datumJson.getString("key");
serviceDatum.timestamp.set(datumJson.getLongValue("timestamp"));
serviceDatum.value =
JSON.parseObject(JSON.toJSONString(datumJson.getJSONObject("value")), Service.class);
newDatum = serviceDatum;
}
//滿足instance時虐呻,解析為instance的Datum
if (KeyBuilder.matchInstanceListKey(datumJson.getString("key"))) {
Datum<Instances> instancesDatum = new Datum<>();
instancesDatum.key = datumJson.getString("key");
instancesDatum.timestamp.set(datumJson.getLongValue("timestamp"));
instancesDatum.value =
JSON.parseObject(JSON.toJSONString(datumJson.getJSONObject("value")), Instances.class);
newDatum = instancesDatum;
}
if (newDatum == null || newDatum.value == null) {
Loggers.RAFT.error("receive null datum: {}", datumJson);
continue;
}
//對于變化的 service,instance寫磁盤
raftStore.write(newDatum);
//更新內存中datum
datums.put(newDatum.key, newDatum);
//發(fā)布事件象泵,服務信息變化了
notifier.addTask(newDatum.key, ApplyAction.CHANGE);
//重置本地leader的過期時間
local.resetLeaderDue();
//這里本地的term加上100大于遠程的term值會更新本地leader的term值,并保持自己local的term一致
//為什么會+100斟叼;原因是每次service偶惠、instance更新會term的值都會遞增100
if (local.term.get() + 100 > remote.term.get()) {
getLeader().term.set(remote.term.get());
local.term.set(getLeader().term.get());
} else {
local.term.addAndGet(100);
}
//跟新本地元信息meta.properties文件內容
raftStore.updateTerm(local.term.get());
Loggers.RAFT.info("data updated, key: {}, timestamp: {}, from {}, local term: {}",
newDatum.key, newDatum.timestamp, JSON.toJSONString(remote), local.term);
} catch (Throwable e) {
Loggers.RAFT.error("[RAFT-BEAT] failed to sync datum from leader, datum: {}", newDatum, e);
} finally {
OPERATE_LOCK.unlock();
}
}
TimeUnit.MILLISECONDS.sleep(200);
return 0;
}
});
batch.clear();
} catch (Exception e) {
Loggers.RAFT.error("[NACOS-RAFT] failed to handle beat entry, key: {}", datumKey);
}
}
//選出dead失效的datum信息,并且從內存和磁盤刪除掉
List<String> deadKeys = new ArrayList<>();
for (Map.Entry<String, Integer> entry : receivedKeysMap.entrySet()) {
if (entry.getValue() == 0) {
deadKeys.add(entry.getKey());
}
}
for (String deadKey : deadKeys) {
try {
deleteDatum(deadKey);
} catch (Exception e) {
Loggers.RAFT.error("[NACOS-RAFT] failed to remove entry, key={} {}", deadKey, e);
}
}
}
return local;
}
總結下上面心跳信息處理
- leader節(jié)點將自己節(jié)點信息以及自己本地的所有的datum打包發(fā)送到follower節(jié)點
- follower節(jié)點收到信息后馬上重置leaderDue時間朗涩,避免follower節(jié)點發(fā)現(xiàn)leader掛了發(fā)生下一次選舉
- 另外的就是會用leader的datum和本地的datum進行比對忽孽,刪除失效的datum并且并且更新datum信息,更新的時候回對于term作100判斷谢床,添加100兄一;
注:datum就是注冊中心的服務service和實例instance信息的包裝
2.數(shù)據(jù)處理
對于事務操作,請求會轉發(fā)給leader识腿; 非事務操作出革,可以任意一個節(jié)點來處理 在發(fā)布內容的時候,做了兩個事情
如果當前的節(jié)點不是leader渡讼,則轉發(fā)給leader節(jié)點處理
如果是骂束,則向所有節(jié)點發(fā)送onPublish,分析服務注冊會使用到
RaftCore
public void signalPublish(String key, Record value) throws Exception {
if (!isLeader()) {
JSONObject params = new JSONObject();
params.put("key", key);
params.put("value", value);
Map<String, String> parameters = new HashMap<>(1);
parameters.put("key", key);
//leader代理操作
raftProxy.proxyPostLarge(getLeader().ip, API_PUB, params.toJSONString(), parameters);
return;
}
try {
OPERATE_LOCK.lock();
long start = System.currentTimeMillis();
final Datum datum = new Datum();
datum.key = key;
datum.value = value;
if (getDatum(key) == null) {
datum.timestamp.set(1L);
} else {
datum.timestamp.set(getDatum(key).timestamp.incrementAndGet());
}
JSONObject json = new JSONObject();
json.put("datum", datum);
json.put("source", peers.local());
//這里會更新本地datum和文件緩存硝全;另外term值會向上遞增100
onPublish(datum, peers.local());
final String content = JSON.toJSONString(json);
//過半策略
final CountDownLatch latch = new CountDownLatch(peers.majorityCount());
for (final String server : peers.allServersIncludeMyself()) {
if (isLeader(server)) {
latch.countDown();
continue;
}
final String url = buildURL(server, API_ON_PUB);
HttpClient.asyncHttpPostLarge(url, Arrays.asList("key=" + key), content, new AsyncCompletionHandler<Integer>() {
@Override
public Integer onCompleted(Response response) throws Exception {
if (response.getStatusCode() != HttpURLConnection.HTTP_OK) {
Loggers.RAFT.warn("[RAFT] failed to publish data to peer, datumId={}, peer={}, http code={}",
datum.key, server, response.getStatusCode());
return 1;
}
latch.countDown();
return 0;
}
@Override
public STATE onContentWriteCompleted() {
return STATE.CONTINUE;
}
});
}
if (!latch.await(UtilsAndCommons.RAFT_PUBLISH_TIMEOUT, TimeUnit.MILLISECONDS)) {
// only majority servers return success can we consider this update success
Loggers.RAFT.error("data publish failed, caused failed to notify majority, key={}", key);
throw new IllegalStateException("data publish failed, caused failed to notify majority, key=" + key);
}
long end = System.currentTimeMillis();
Loggers.RAFT.info("signalPublish cost {} ms, key: {}", (end - start), key);
} finally {
OPERATE_LOCK.unlock();
}
}
public void onPublish(Datum datum, RaftPeer source) throws Exception {
RaftPeer local = peers.local();
if (datum.value == null) {
Loggers.RAFT.warn("received empty datum");
throw new IllegalStateException("received empty datum");
}
if (!peers.isLeader(source.ip)) {
Loggers.RAFT.warn("peer {} tried to publish data but wasn't leader, leader: {}",
JSON.toJSONString(source), JSON.toJSONString(getLeader()));
throw new IllegalStateException("peer(" + source.ip + ") tried to publish " +
"data but wasn't leader");
}
if (source.term.get() < local.term.get()) {
Loggers.RAFT.warn("out of date publish, pub-term: {}, cur-term: {}",
JSON.toJSONString(source), JSON.toJSONString(local));
throw new IllegalStateException("out of date publish, pub-term:"
+ source.term.get() + ", cur-term: " + local.term.get());
}
local.resetLeaderDue();
// if data should be persistent, usually this is always true:
if (KeyBuilder.matchPersistentKey(datum.key)) {
raftStore.write(datum);
}
//更新內存中的值
datums.put(datum.key, datum);
//term值遞增100
if (isLeader()) {
local.term.addAndGet(PUBLISH_TERM_INCREASE_COUNT);
} else {
if (local.term.get() + PUBLISH_TERM_INCREASE_COUNT > source.term.get()) {
//set leader term:
getLeader().term.set(source.term.get());
local.term.set(getLeader().term.get());
} else {
local.term.addAndGet(PUBLISH_TERM_INCREASE_COUNT);
}
}
raftStore.updateTerm(local.term.get());
notifier.addTask(datum.key, ApplyAction.CHANGE);
Loggers.RAFT.info("data added/updated, key={}, term={}", datum.key, local.term);
}