Nacos的集群

nacos的集群分為持久化和非持久化兩種,持久化采用的Raft協(xié)議來實現(xiàn),非持久化使用不同節(jié)點作為節(jié)點分片存儲實現(xiàn),以下主要分析持久化的實現(xiàn)

1.nacos持久化集群

??nacos持久化的集群類似于zookeeper拍顷, 它分為leader角色和follower角色, 那么從這個角色的名字可以看出來塘幅,這個集群存在選舉的機制昔案。 因為如果自己不具備選舉功能,角色的命名可能就是master/slave了

1. 1選舉算法

使用raft算法協(xié)議保證數(shù)據(jù)一致性(和zookeeper一樣弱一致性)电媳,以及相關的leader選舉

raft參考:http://thesecretlivesofdata.com/raft/踏揣;
強烈建議先理解raft協(xié)議思想,下面看代碼無非就是熟悉一下流程而已匾乓;

  • leader

  • follower

  • candidate選舉

選舉有兩個時機

  • 1.服務啟動的時候

  • 2.leader掛了的時候

所有節(jié)點啟動的時候捞稿,都是follower狀態(tài)。 如果在一段時間內如果沒有收到leader的心跳(可能是沒有 leader拼缝,也可能是leader掛了)娱局,那么follower會變成Candidate。然后發(fā)起選舉咧七,選舉之前铃辖,會增加 term,這個term和zookeeper中的epoch的道理是一樣的猪叙。

  • follower會投自己一票娇斩,并且給其他節(jié)點發(fā)送票據(jù)vote,等到其他節(jié)點回復

  • 在這個過程中穴翩,可能出現(xiàn)幾種情況 收到過半的票數(shù)通過犬第,則成為leader 被告知其他節(jié)點已經(jīng)成為leader,則自己切換為follower 一段時間內沒有收到過半的投票芒帕,則重新發(fā)起選舉

  • 約束條件在任一term中歉嗓,單個節(jié)點最多只能投一票

選舉的幾種情況

  • 第一種情況,贏得選舉之后背蟆,leader會給所有節(jié)點發(fā)送消息鉴分,避免其他節(jié)點觸發(fā)新的選舉

  • 第二種情況,比如有三個節(jié)點A B C带膀。A B同時發(fā)起選舉志珍,而A的選舉消息先到達C,C給A投了一 票垛叨,當B的消息到達C時伦糯,已經(jīng)不能滿足上面提到的第一個約束,即C不會給B投票,而A和B顯然都 不會給對方投票敛纲。A勝出之后喂击,會給B,C發(fā)心跳消息,節(jié)點B發(fā)現(xiàn)節(jié)點A的term不低于自己的term淤翔, 知道有已經(jīng)有Leader了翰绊,于是轉換成follower

  • 第三種情況, 沒有任何節(jié)點獲得超過半數(shù)的投票旁壮,可能是平票的情況监嗜。加入總共有四個節(jié)點 (A/B/C/D),Node C寡具、Node D同時成為了candidate,但Node A投了NodeD一票稚补,NodeB投 了Node C一票童叠,這就出現(xiàn)了平票 split vote的情況。這個時候大家都在等啊等课幕,直到超時后重新發(fā) 起選舉厦坛。如果出現(xiàn)平票的情況,那么就延長了系統(tǒng)不可用的時間,因此raft引入了randomized election timeouts來盡量避免平票情況

源碼分析:

Server節(jié)點數(shù)據(jù)獲取是通過ServerListManager從文件和內存數(shù)據(jù)讀取以及比較來的

public class ServerListManager{    
    @PostConstruct
    public void init() {
        //這個獲取server列表乍惊,列表更新也會觸發(fā)對應事件
        GlobalExecutor.registerServerListUpdater(new ServerListUpdater());
        //服務狀態(tài)監(jiān)測
        GlobalExecutor.registerServerStatusReporter(new ServerStatusReporter(), 2000);
    }
}

下面是Raft核心算法

public class RaftCore {
    // 杜秸。。润绎。 
    @PostConstruct
    public void init() throws Exception {

        Loggers.RAFT.info("initializing Raft sub-system");

        executor.submit(notifier);

        long start = System.currentTimeMillis();

        raftStore.loadDatums(notifier, datums);
        //設置term,默認給0撬碟,如果本地文件獲取有term會用數(shù)據(jù)中的term
        setTerm(NumberUtils.toLong(raftStore.loadMeta().getProperty("term"), 0L));
    
        Loggers.RAFT.info("cache loaded, datum count: {}, current term: {}", datums.size(), peers.getTerm());
        //這里不是重點;就是簡單service,instance的任務要保證先完成莉撇,再選舉而已
        while (true) {
            if (notifier.tasks.size() <= 0) {
                break;
            }
            Thread.sleep(1000L);
        }

        initialized = true;
      //這里是一個定時任務負責master的選舉
        GlobalExecutor.registerMasterElection(new MasterElection());
        //這個負責定時心跳處理呢蛤,就是客戶端發(fā)送心跳給服務端
        GlobalExecutor.registerHeartbeat(new HeartBeat());

        Loggers.RAFT.info("timer started: leader timeout ms: {}, heart-beat timeout ms: {}",
            GlobalExecutor.LEADER_TIMEOUT_MS, GlobalExecutor.HEARTBEAT_INTERVAL_MS);
    }
    
}

1)leader選舉

就是基于本地啟動發(fā)起選舉

 //Master的選舉
 public class MasterElection implements Runnable {
        @Override
        public void run() {
            try {
                //表示peers狀態(tài)好了,這個是通過listerner監(jiān)控獲取的
                if (!peers.isReady()) {
                    return;
                }

                RaftPeer local = peers.local();
                //這個是要等待多久開啟發(fā)起選舉棍郎,
                //1.local.leaderDueMs的值一開始是隨機生成的其障,范圍是[0, 15000),單位是毫秒涂佃,
                //2.此后按照500毫秒一個梯度進行遞減励翼,減少到≤0后,就會觸發(fā)選舉操作辜荠。當然汽抚,選舉之前,把超時時間重置一下伯病。
               //3.resetLeaderDue()方法是把leaderDueMs變量重新賦值殊橙,但是并不是像初始化隨機賦值一樣的邏輯,而是在15000毫秒的基礎上加上了一個隨機值,其隨機值的范圍是[0, 5000)毫秒膨蛮。

                local.leaderDueMs -= GlobalExecutor.TICK_PERIOD_MS;

                if (local.leaderDueMs > 0) {
                    return;
                }

                // reset timeout
                local.resetLeaderDue();
                local.resetHeartbeatDue();
                //發(fā)起選舉
                sendVote();
            } catch (Exception e) {
                Loggers.RAFT.warn("[RAFT] error while master election {}", e);
            }

        }

        public void sendVote() {

            RaftPeer local = peers.get(NetUtils.localServer());
            Loggers.RAFT.info("leader timeout, start voting,leader: {}, term: {}",
                JSON.toJSONString(getLeader()), local.term);

            peers.reset();
            //本地的term增加一
            local.term.incrementAndGet();
            //先投給自己
            local.voteFor = local.ip;
            //更新自己的狀態(tài)
            local.state = RaftPeer.State.CANDIDATE;

            Map<String, String> params = new HashMap<>(1);
            params.put("vote", JSON.toJSONString(local));
            //發(fā)給除了自己所有其他服務節(jié)點
            for (final String server : peers.allServersWithoutMySelf()) {
                final String url = buildURL(server, API_VOTE);
                try {
                    //會請求RaftController的raft/vote接口做 處理叠纹;
                    HttpClient.asyncHttpPost(url, null, params, new AsyncCompletionHandler<Integer>() {
                        @Override
                        public Integer onCompleted(Response response) throws Exception {
                            if (response.getStatusCode() != HttpURLConnection.HTTP_OK) {
                                Loggers.RAFT.error("NACOS-RAFT vote failed: {}, url: {}", response.getResponseBody(), url);
                                return 1;
                            }

                            RaftPeer peer = JSON.parseObject(response.getResponseBody(), RaftPeer.class);

                            Loggers.RAFT.info("received approve from peer: {}", JSON.toJSONString(peer));
                            //拿到投票結果,選舉最終的leader
                            peers.decideLeader(peer);

                            return 0;
                        }
                    });
                } catch (Exception e) {
                    Loggers.RAFT.warn("error while sending vote to server: {}", server);
                }
            }
        }
    }
}
//Raft節(jié)點集
public class RaftPeerSet {
      //投票選出leader,每次投票結果返回都會執(zhí)行這個操作
      public RaftPeer decideLeader(RaftPeer candidate) {
        peers.put(candidate.ip, candidate);

        SortedBag ips = new TreeBag();
        int maxApproveCount = 0;
        String maxApprovePeer = null;
        for (RaftPeer peer : peers.values()) {
            if (StringUtils.isEmpty(peer.voteFor)) {
                continue;
            }

            ips.add(peer.voteFor);
            if (ips.getCount(peer.voteFor) > maxApproveCount) {
                maxApproveCount = ips.getCount(peer.voteFor);
                maxApprovePeer = peer.voteFor;
            }
        }
        //比較最大投票數(shù)并且當投票數(shù)量 總投票節(jié)點 過半敞葛,將該節(jié)點設置為leader
        if (maxApproveCount >= majorityCount()) {
            RaftPeer peer = peers.get(maxApprovePeer);
            peer.state = RaftPeer.State.LEADER;
        
            if (!Objects.equals(leader, peer)) {
                leader = peer;
                //發(fā)布事件誉察,leader已經(jīng)選舉完成了
                applicationContext.publishEvent(new LeaderElectFinishedEvent(this, leader));
                Loggers.RAFT.info("{} has become the LEADER", leader.ip);
            }
        }

        return leader;
    }
    
}

RaftPeer收到投票做處理

public class RaftController {
   @NeedAuth
    @PostMapping("/vote")
    public JSONObject vote(HttpServletRequest request, HttpServletResponse response) throws Exception {

        RaftPeer peer = raftCore.receivedVote(
            JSON.parseObject(WebUtils.required(request, "vote"), RaftPeer.class));

        return JSON.parseObject(JSON.toJSONString(peer));
    }
}
 //RaftCore
  public synchronized RaftPeer receivedVote(RaftPeer remote) {
        if (!peers.contains(remote)) {
            throw new IllegalStateException("can not find peer: " + remote.ip);
        }

        RaftPeer local = peers.get(NetUtils.localServer());
        //如果當前的term大于遠程的term,返回本地的給遠端作為投票返回
        if (remote.term.get() <= local.term.get()) {
            String msg = "received illegitimate vote" +
                ", voter-term:" + remote.term + ", votee-term:" + local.term;

            Loggers.RAFT.info(msg);
            if (StringUtils.isEmpty(local.voteFor)) {
                local.voteFor = local.ip;
            }

            return local;
        }
        //第一個收到就返回先收到的投票
        local.resetLeaderDue();

        local.state = RaftPeer.State.FOLLOWER;
        local.voteFor = remote.ip;
        local.term.set(remote.term.get());

        Loggers.RAFT.info("vote {} as leader, term: {}", remote.ip, remote.term);

        return local;
    }

總結一下上面的leader節(jié)點選舉
腦子里一定要考慮到幾個節(jié)點在投票的情景惹谐,比如下面這個:

vote-myself6.png

  • 1.啟動后會先判定服務是否已經(jīng)啟動好持偏,啟動好后每一個服務有一個初始化leader選舉到期時間,會不停的減去500ms氨肌,直到小于等于0才執(zhí)行選舉

  • 2.選舉的時候term增加1鸿秆;狀態(tài)設置為candidate,并把自己節(jié)點作為投票節(jié)點向其他所有節(jié)點發(fā)送投票選舉;

  • 3.節(jié)點拿到選舉投票信息后怎囚,如果如果遠端的節(jié)點term不能大于本地節(jié)點信息就返回本地投票結果給對方

  • 4.選舉獲取投票結果后會加入本次 統(tǒng)計本地所有的投票信息卿叽,判定如果最多投票節(jié)點數(shù)過半了,就選出leader了恳守;

2) 心跳檢測leader健康

心跳檢測 leader是否掛了考婴,如果掛了是要重新選舉的;
這個是leader主動發(fā)起心跳給每一個follower節(jié)點催烘,重置服務端leader的選舉發(fā)生
上面我們初始化了一個啟動RaftCore的時候創(chuàng)建了
GlobalExecutor.registerHeartbeat(new HeartBeat());
我們看這個心跳節(jié)點信息

  public class HeartBeat implements Runnable {
        @Override
        public void run() {
            try {

                if (!peers.isReady()) {
                    return;
                }
                //和上面的道理相同沥阱,有一個心跳延時緩沖
                RaftPeer local = peers.local();
                local.heartbeatDueMs -= GlobalExecutor.TICK_PERIOD_MS;
                if (local.heartbeatDueMs > 0) {
                    return;
                }

                local.resetHeartbeatDue();
                //發(fā)起心跳
                sendBeat();
            } catch (Exception e) {
                Loggers.RAFT.warn("[RAFT] error while sending beat {}", e);
            }

          public void sendBeat() throws IOException, InterruptedException {
            RaftPeer local = peers.local();
            //這里表示在集群中,不是leader就不執(zhí)行了
            if (local.state != RaftPeer.State.LEADER && !STANDALONE_MODE) {
                return;
            }

            if (Loggers.RAFT.isDebugEnabled()) {
                Loggers.RAFT.debug("[RAFT] send beat with {} keys.", datums.size());
            }
            //同樣重置leader選舉時間
            local.resetLeaderDue();

            // build data
            JSONObject packet = new JSONObject();
            packet.put("peer", local);

            JSONArray array = new JSONArray();

            if (switchDomain.isSendBeatOnly()) {
                Loggers.RAFT.info("[SEND-BEAT-ONLY] {}", String.valueOf(switchDomain.isSendBeatOnly()));
            }
            //這個和命名服務service伊群、instance有關考杉,表示把當前節(jié)點的servive,instance打包發(fā)送給從節(jié)點判定變化情況
            //默認也會走這個邏輯,除非手動給關閉了
            if (!switchDomain.isSendBeatOnly()) {
                for (Datum datum : datums.values()) {

                    JSONObject element = new JSONObject();

                    if (KeyBuilder.matchServiceMetaKey(datum.key)) {
                        element.put("key", KeyBuilder.briefServiceMetaKey(datum.key));
                    } else if (KeyBuilder.matchInstanceListKey(datum.key)) {
                        element.put("key", KeyBuilder.briefInstanceListkey(datum.key));
                    }
                    element.put("timestamp", datum.timestamp);

                    array.add(element);
                }
            }

            packet.put("datums", array);
            // broadcast
            Map<String, String> params = new HashMap<String, String>(1);
            params.put("beat", JSON.toJSONString(packet));

            String content = JSON.toJSONString(params);
            //gizp壓縮,如果帶有service,instance避免文件過大
            ByteArrayOutputStream out = new ByteArrayOutputStream();
            GZIPOutputStream gzip = new GZIPOutputStream(out);
            gzip.write(content.getBytes(StandardCharsets.UTF_8));
            gzip.close();

            byte[] compressedBytes = out.toByteArray();
            String compressedContent = new String(compressedBytes, StandardCharsets.UTF_8);

            if (Loggers.RAFT.isDebugEnabled()) {
                Loggers.RAFT.debug("raw beat data size: {}, size of compressed data: {}",
                    content.length(), compressedContent.length());
            }
            //給除了自己的所有節(jié)點發(fā)送心跳信息
            for (final String server : peers.allServersWithoutMySelf()) {
                try {
                    final String url = buildURL(server, API_BEAT);
                    if (Loggers.RAFT.isDebugEnabled()) {
                        Loggers.RAFT.debug("send beat to server " + server);
                    }
                    //異步發(fā)送url=vote/beat
                    HttpClient.asyncHttpPostLarge(url, null, compressedBytes, new AsyncCompletionHandler<Integer>() {
                        @Override
                        public Integer onCompleted(Response response) throws Exception {
                            if (response.getStatusCode() != HttpURLConnection.HTTP_OK) {
                                Loggers.RAFT.error("NACOS-RAFT beat failed: {}, peer: {}",
                                    response.getResponseBody(), server);
                                MetricsMonitor.getLeaderSendBeatFailedException().increment();
                                return 1;
                            }
                            //成功返回結果更新本地ip舰始,peer信息
                            peers.update(JSON.parseObject(response.getResponseBody(), RaftPeer.class));
                            if (Loggers.RAFT.isDebugEnabled()) {
                                Loggers.RAFT.debug("receive beat response from: {}", url);
                            }
                            return 0;
                        }

                        @Override
                        public void onThrowable(Throwable t) {
                            Loggers.RAFT.error("NACOS-RAFT error while sending heart-beat to peer: {} {}", server, t);
                            MetricsMonitor.getLeaderSendBeatFailedException().increment();
                        }
                    });
                } catch (Exception e) {
                    Loggers.RAFT.error("error while sending heart-beat to peer: {} {}", server, e);
                    MetricsMonitor.getLeaderSendBeatFailedException().increment();
                }
            }

        }

        }

上面就是簡單遍歷異步發(fā)送給除自己之外的所有節(jié)點自己的信息奔则;

其他節(jié)點拿到請求處理
RaftController

   @PostMapping("/beat")
    public JSONObject beat(HttpServletRequest request, HttpServletResponse response) throws Exception {

        String entity = new String(IoUtils.tryDecompress(request.getInputStream()), StandardCharsets.UTF_8);
        String value = URLDecoder.decode(entity, "UTF-8");
        value = URLDecoder.decode(value, "UTF-8");

        JSONObject json = JSON.parseObject(value);
        JSONObject beat = JSON.parseObject(json.getString("beat"));
        //這里處理
        RaftPeer peer = raftCore.receivedBeat(beat);

        return JSON.parseObject(JSON.toJSONString(peer));
    }

    //收到心跳信息的處理
    public RaftPeer receivedBeat(JSONObject beat) throws Exception {
        final RaftPeer local = peers.local();
        final RaftPeer remote = new RaftPeer();
        remote.ip = beat.getJSONObject("peer").getString("ip");
        remote.state = RaftPeer.State.valueOf(beat.getJSONObject("peer").getString("state"));
        remote.term.set(beat.getJSONObject("peer").getLongValue("term"));
        remote.heartbeatDueMs = beat.getJSONObject("peer").getLongValue("heartbeatDueMs");
        remote.leaderDueMs = beat.getJSONObject("peer").getLongValue("leaderDueMs");
        remote.voteFor = beat.getJSONObject("peer").getString("voteFor");

        if (remote.state != RaftPeer.State.LEADER) {
            Loggers.RAFT.info("[RAFT] invalid state from master, state: {}, remote peer: {}",
                remote.state, JSON.toJSONString(remote));
            throw new IllegalArgumentException("invalid state from master, state: " + remote.state);
        }

        if (local.term.get() > remote.term.get()) {
            Loggers.RAFT.info("[RAFT] out of date beat, beat-from-term: {}, beat-to-term: {}, remote peer: {}, and leaderDueMs: {}"
                , remote.term.get(), local.term.get(), JSON.toJSONString(remote), local.leaderDueMs);
            throw new IllegalArgumentException("out of date beat, beat-from-term: " + remote.term.get()
                + ", beat-to-term: " + local.term.get());
        }

        if (local.state != RaftPeer.State.FOLLOWER) {

            Loggers.RAFT.info("[RAFT] make remote as leader, remote peer: {}", JSON.toJSONString(remote));
            // mk follower,如果當前節(jié)點不是follower節(jié)點,強制設置為follower節(jié)點
            local.state = RaftPeer.State.FOLLOWER;
            local.voteFor = remote.ip;
        }

        final JSONArray beatDatums = beat.getJSONArray("datums");
        //重置本地leader的選舉過期蔽午,
        local.resetLeaderDue();
        //重置心跳時間
        local.resetHeartbeatDue();
        //1.這個表示獲取到的是leader會把本地節(jié)點的leader應用更新為當前的遠程主節(jié)點
        //2.遍歷自己所有的peer易茬,將原來存儲老舊的leader節(jié)點 狀態(tài)由leader->follower狀態(tài)
        peers.makeLeader(remote);
        //這個表示除了發(fā)送心跳還有其他信息(service,instance)發(fā)送,這下面也主要是解析這些信息
        if (!switchDomain.isSendBeatOnly()) {

            Map<String, Integer> receivedKeysMap = new HashMap<>(datums.size());
            //本地key記錄及老,如果有遠程沒有傳遞過來該key;就表示檔期key為失效的key了抽莱,當前節(jié)點會刪除key的所有信息
            for (Map.Entry<String, Datum> entry : datums.entrySet()) {
                receivedKeysMap.put(entry.getKey(), 0);
            }

            // now check datums,這個表示會批量執(zhí)行本地更新的datum數(shù)據(jù)
            List<String> batch = new ArrayList<>();

            int processedCount = 0;
            if (Loggers.RAFT.isDebugEnabled()) {
                Loggers.RAFT.debug("[RAFT] received beat with {} keys, RaftCore.datums' size is {}, remote server: {}, term: {}, local term: {}",
                    beatDatums.size(), datums.size(), remote.ip, remote.term, local.term);
            }
            for (Object object : beatDatums) {
                processedCount = processedCount + 1;

                JSONObject entry = (JSONObject) object;
                String key = entry.getString("key");
                final String datumKey;

                if (KeyBuilder.matchServiceMetaKey(key)) {
                    datumKey = KeyBuilder.detailServiceMetaKey(key);
                } else if (KeyBuilder.matchInstanceListKey(key)) {
                    datumKey = KeyBuilder.detailInstanceListkey(key);
                } else {
                    // ignore corrupted key:
                    continue;
                }

                long timestamp = entry.getLong("timestamp");

                receivedKeysMap.put(datumKey, 1);

                try {
                    if (datums.containsKey(datumKey) && datums.get(datumKey).timestamp.get() >= timestamp && processedCount < beatDatums.size()) {
                        continue;
                    }
                    //如果當前內存不存在該key或者時間戳小于 遠程記錄時間搓骄恶,就會記錄起來后面批量執(zhí)行更新
                    if (!(datums.containsKey(datumKey) && datums.get(datumKey).timestamp.get() >= timestamp)) {
                        batch.add(datumKey);
                    }
                    //表示batch批量執(zhí)行個數(shù) 大于50食铐;或者最后一個處理了就會往下執(zhí)行了
                    if (batch.size() < 50 && processedCount < beatDatums.size()) {
                        continue;
                    }

                    String keys = StringUtils.join(batch, ",");

                    if (batch.size() <= 0) {
                        continue;
                    }

                    Loggers.RAFT.info("get datums from leader: {}, batch size is {}, processedCount is {}, datums' size is {}, RaftCore.datums' size is {}"
                        , getLeader().ip, batch.size(), processedCount, beatDatums.size(), datums.size());

                    // update datum entry;從遠程主節(jié)點再次拉取datum信息(service/instance)
                    String url = buildURL(remote.ip, API_GET) + "?keys=" + URLEncoder.encode(keys, "UTF-8");
                    HttpClient.asyncHttpGet(url, null, null, new AsyncCompletionHandler<Integer>() {
                        @Override
                        public Integer onCompleted(Response response) throws Exception {
                            if (response.getStatusCode() != HttpURLConnection.HTTP_OK) {
                                return 1;
                            }

                            List<JSONObject> datumList = JSON.parseObject(response.getResponseBody(), new TypeReference<List<JSONObject>>() {
                            });
                            //對于遠程的datum數(shù)據(jù)處理
                            for (JSONObject datumJson : datumList) {
                                OPERATE_LOCK.lock();
                                Datum newDatum = null;
                                try {
                                    //獲取本地舊的datum信息
                                    Datum oldDatum = getDatum(datumJson.getString("key"));

                                    if (oldDatum != null && datumJson.getLongValue("timestamp") <= oldDatum.timestamp.get()) {
                                        Loggers.RAFT.info("[NACOS-RAFT] timestamp is smaller than that of mine, key: {}, remote: {}, local: {}",
                                            datumJson.getString("key"), datumJson.getLongValue("timestamp"), oldDatum.timestamp);
                                        continue;
                                    }
                                    //滿足service時僧鲁,解析為servive的Datum
                                    if (KeyBuilder.matchServiceMetaKey(datumJson.getString("key"))) {
                                        Datum<Service> serviceDatum = new Datum<>();
                                        serviceDatum.key = datumJson.getString("key");
                                        serviceDatum.timestamp.set(datumJson.getLongValue("timestamp"));
                                        serviceDatum.value =
                                            JSON.parseObject(JSON.toJSONString(datumJson.getJSONObject("value")), Service.class);
                                        newDatum = serviceDatum;
                                    }
                                    //滿足instance時虐呻,解析為instance的Datum
                                    if (KeyBuilder.matchInstanceListKey(datumJson.getString("key"))) {
                                        Datum<Instances> instancesDatum = new Datum<>();
                                        instancesDatum.key = datumJson.getString("key");
                                        instancesDatum.timestamp.set(datumJson.getLongValue("timestamp"));
                                        instancesDatum.value =
                                            JSON.parseObject(JSON.toJSONString(datumJson.getJSONObject("value")), Instances.class);
                                        newDatum = instancesDatum;
                                    }

                                    if (newDatum == null || newDatum.value == null) {
                                        Loggers.RAFT.error("receive null datum: {}", datumJson);
                                        continue;
                                    }
                                    //對于變化的 service,instance寫磁盤
                                    raftStore.write(newDatum);
                                    //更新內存中datum
                                    datums.put(newDatum.key, newDatum);
                                    //發(fā)布事件象泵,服務信息變化了
                                    notifier.addTask(newDatum.key, ApplyAction.CHANGE);
                                    //重置本地leader的過期時間
                                    local.resetLeaderDue();
                                    //這里本地的term加上100大于遠程的term值會更新本地leader的term值,并保持自己local的term一致
                                    //為什么會+100斟叼;原因是每次service偶惠、instance更新會term的值都會遞增100
                                    if (local.term.get() + 100 > remote.term.get()) {
                                        getLeader().term.set(remote.term.get());
                                        local.term.set(getLeader().term.get());
                                    } else {
                                        local.term.addAndGet(100);
                                    }
                                    //跟新本地元信息meta.properties文件內容
                                    raftStore.updateTerm(local.term.get());

                                    Loggers.RAFT.info("data updated, key: {}, timestamp: {}, from {}, local term: {}",
                                        newDatum.key, newDatum.timestamp, JSON.toJSONString(remote), local.term);

                                } catch (Throwable e) {
                                    Loggers.RAFT.error("[RAFT-BEAT] failed to sync datum from leader, datum: {}", newDatum, e);
                                } finally {
                                    OPERATE_LOCK.unlock();
                                }
                            }
                            TimeUnit.MILLISECONDS.sleep(200);
                            return 0;
                        }
                    });

                    batch.clear();

                } catch (Exception e) {
                    Loggers.RAFT.error("[NACOS-RAFT] failed to handle beat entry, key: {}", datumKey);
                }

            }

            //選出dead失效的datum信息,并且從內存和磁盤刪除掉
            List<String> deadKeys = new ArrayList<>();
            for (Map.Entry<String, Integer> entry : receivedKeysMap.entrySet()) {
                if (entry.getValue() == 0) {
                    deadKeys.add(entry.getKey());
                }
            }

            for (String deadKey : deadKeys) {
                try {
                    deleteDatum(deadKey);
                } catch (Exception e) {
                    Loggers.RAFT.error("[NACOS-RAFT] failed to remove entry, key={} {}", deadKey, e);
                }
            }

        }

        return local;
    }

總結下上面心跳信息處理

  • leader節(jié)點將自己節(jié)點信息以及自己本地的所有的datum打包發(fā)送到follower節(jié)點
  • follower節(jié)點收到信息后馬上重置leaderDue時間朗涩,避免follower節(jié)點發(fā)現(xiàn)leader掛了發(fā)生下一次選舉
  • 另外的就是會用leader的datum和本地的datum進行比對忽孽,刪除失效的datum并且并且更新datum信息,更新的時候回對于term作100判斷谢床,添加100兄一;

注:datum就是注冊中心的服務service和實例instance信息的包裝

2.數(shù)據(jù)處理

對于事務操作,請求會轉發(fā)給leader识腿; 非事務操作出革,可以任意一個節(jié)點來處理 在發(fā)布內容的時候,做了兩個事情

  • 如果當前的節(jié)點不是leader渡讼,則轉發(fā)給leader節(jié)點處理

  • 如果是骂束,則向所有節(jié)點發(fā)送onPublish,分析服務注冊會使用到

RaftCore

    public void signalPublish(String key, Record value) throws Exception {

        if (!isLeader()) {
            JSONObject params = new JSONObject();
            params.put("key", key);
            params.put("value", value);
            Map<String, String> parameters = new HashMap<>(1);
            parameters.put("key", key);
            //leader代理操作
            raftProxy.proxyPostLarge(getLeader().ip, API_PUB, params.toJSONString(), parameters);
            return;
        }

        try {
            OPERATE_LOCK.lock();
            long start = System.currentTimeMillis();
            final Datum datum = new Datum();
            datum.key = key;
            datum.value = value;
            if (getDatum(key) == null) {
                datum.timestamp.set(1L);
            } else {
                datum.timestamp.set(getDatum(key).timestamp.incrementAndGet());
            }

            JSONObject json = new JSONObject();
            json.put("datum", datum);
            json.put("source", peers.local());
            //這里會更新本地datum和文件緩存硝全;另外term值會向上遞增100
            onPublish(datum, peers.local());

            final String content = JSON.toJSONString(json);
            //過半策略
            final CountDownLatch latch = new CountDownLatch(peers.majorityCount());
            for (final String server : peers.allServersIncludeMyself()) {
                if (isLeader(server)) {
                    latch.countDown();
                    continue;
                }
                final String url = buildURL(server, API_ON_PUB);
                HttpClient.asyncHttpPostLarge(url, Arrays.asList("key=" + key), content, new AsyncCompletionHandler<Integer>() {
                    @Override
                    public Integer onCompleted(Response response) throws Exception {
                        if (response.getStatusCode() != HttpURLConnection.HTTP_OK) {
                            Loggers.RAFT.warn("[RAFT] failed to publish data to peer, datumId={}, peer={}, http code={}",
                                datum.key, server, response.getStatusCode());
                            return 1;
                        }
                        latch.countDown();
                        return 0;
                    }

                    @Override
                    public STATE onContentWriteCompleted() {
                        return STATE.CONTINUE;
                    }
                });

            }

            if (!latch.await(UtilsAndCommons.RAFT_PUBLISH_TIMEOUT, TimeUnit.MILLISECONDS)) {
                // only majority servers return success can we consider this update success
                Loggers.RAFT.error("data publish failed, caused failed to notify majority, key={}", key);
                throw new IllegalStateException("data publish failed, caused failed to notify majority, key=" + key);
            }

            long end = System.currentTimeMillis();
            Loggers.RAFT.info("signalPublish cost {} ms, key: {}", (end - start), key);
        } finally {
            OPERATE_LOCK.unlock();
        }
    }


    public void onPublish(Datum datum, RaftPeer source) throws Exception {
        RaftPeer local = peers.local();
        if (datum.value == null) {
            Loggers.RAFT.warn("received empty datum");
            throw new IllegalStateException("received empty datum");
        }

        if (!peers.isLeader(source.ip)) {
            Loggers.RAFT.warn("peer {} tried to publish data but wasn't leader, leader: {}",
                JSON.toJSONString(source), JSON.toJSONString(getLeader()));
            throw new IllegalStateException("peer(" + source.ip + ") tried to publish " +
                "data but wasn't leader");
        }

        if (source.term.get() < local.term.get()) {
            Loggers.RAFT.warn("out of date publish, pub-term: {}, cur-term: {}",
                JSON.toJSONString(source), JSON.toJSONString(local));
            throw new IllegalStateException("out of date publish, pub-term:"
                + source.term.get() + ", cur-term: " + local.term.get());
        }

        local.resetLeaderDue();

        // if data should be persistent, usually this is always true:
        if (KeyBuilder.matchPersistentKey(datum.key)) {
            raftStore.write(datum);
        }
        //更新內存中的值
        datums.put(datum.key, datum);
        //term值遞增100
        if (isLeader()) {
            local.term.addAndGet(PUBLISH_TERM_INCREASE_COUNT);
        } else {
            if (local.term.get() + PUBLISH_TERM_INCREASE_COUNT > source.term.get()) {
                //set leader term:
                getLeader().term.set(source.term.get());
                local.term.set(getLeader().term.get());
            } else {
                local.term.addAndGet(PUBLISH_TERM_INCREASE_COUNT);
            }
        }
        raftStore.updateTerm(local.term.get());

        notifier.addTask(datum.key, ApplyAction.CHANGE);

        Loggers.RAFT.info("data added/updated, key={}, term={}", datum.key, local.term);
    }
最后編輯于
?著作權歸作者所有,轉載或內容合作請聯(lián)系作者
  • 序言:七十年代末栖雾,一起剝皮案震驚了整個濱河市楞抡,隨后出現(xiàn)的幾起案子伟众,更是在濱河造成了極大的恐慌,老刑警劉巖召廷,帶你破解...
    沈念sama閱讀 222,378評論 6 516
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件凳厢,死亡現(xiàn)場離奇詭異,居然都是意外死亡竞慢,警方通過查閱死者的電腦和手機先紫,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 94,970評論 3 399
  • 文/潘曉璐 我一進店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來筹煮,“玉大人遮精,你說我怎么就攤上這事“芰剩” “怎么了本冲?”我有些...
    開封第一講書人閱讀 168,983評論 0 362
  • 文/不壞的土叔 我叫張陵,是天一觀的道長劫扒。 經(jīng)常有香客問我檬洞,道長,這世上最難降的妖魔是什么沟饥? 我笑而不...
    開封第一講書人閱讀 59,938評論 1 299
  • 正文 為了忘掉前任添怔,我火速辦了婚禮湾戳,結果婚禮上,老公的妹妹穿的比我還像新娘广料。我一直安慰自己砾脑,他們只是感情好,可當我...
    茶點故事閱讀 68,955評論 6 398
  • 文/花漫 我一把揭開白布性昭。 她就那樣靜靜地躺著拦止,像睡著了一般。 火紅的嫁衣襯著肌膚如雪糜颠。 梳的紋絲不亂的頭發(fā)上汹族,一...
    開封第一講書人閱讀 52,549評論 1 312
  • 那天,我揣著相機與錄音其兴,去河邊找鬼顶瞒。 笑死,一個胖子當著我的面吹牛元旬,可吹牛的內容都是我干的榴徐。 我是一名探鬼主播,決...
    沈念sama閱讀 41,063評論 3 422
  • 文/蒼蘭香墨 我猛地睜開眼匀归,長吁一口氣:“原來是場噩夢啊……” “哼坑资!你這毒婦竟也來了?” 一聲冷哼從身側響起穆端,我...
    開封第一講書人閱讀 39,991評論 0 277
  • 序言:老撾萬榮一對情侶失蹤袱贮,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后体啰,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體攒巍,經(jīng)...
    沈念sama閱讀 46,522評論 1 319
  • 正文 獨居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內容為張勛視角 年9月15日...
    茶點故事閱讀 38,604評論 3 342
  • 正文 我和宋清朗相戀三年荒勇,在試婚紗的時候發(fā)現(xiàn)自己被綠了柒莉。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點故事閱讀 40,742評論 1 353
  • 序言:一個原本活蹦亂跳的男人離奇死亡沽翔,死狀恐怖兢孝,靈堂內的尸體忽然破棺而出,到底是詐尸還是另有隱情仅偎,我是刑警寧澤跨蟹,帶...
    沈念sama閱讀 36,413評論 5 351
  • 正文 年R本政府宣布,位于F島的核電站哨颂,受9級特大地震影響喷市,放射性物質發(fā)生泄漏。R本人自食惡果不足惜威恼,卻給世界環(huán)境...
    茶點故事閱讀 42,094評論 3 335
  • 文/蒙蒙 一品姓、第九天 我趴在偏房一處隱蔽的房頂上張望寝并。 院中可真熱鬧,春花似錦腹备、人聲如沸衬潦。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,572評論 0 25
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽镀岛。三九已至,卻和暖如春友驮,著一層夾襖步出監(jiān)牢的瞬間漂羊,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 33,671評論 1 274
  • 我被黑心中介騙來泰國打工卸留, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留走越,地道東北人。 一個月前我還...
    沈念sama閱讀 49,159評論 3 378
  • 正文 我出身青樓耻瑟,卻偏偏與公主長得像旨指,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子喳整,可洞房花燭夜當晚...
    茶點故事閱讀 45,747評論 2 361

推薦閱讀更多精彩內容