Nacos配置中心

要分析Nacos源碼僻他,好歹我們也通過源碼啟動起來,這樣也方便我們debug代碼腊尚。
注:nacos1.1.3

文章篇幅較長吨拗,一定要有耐心鸠匀;如果有疑問歡迎咨詢討論

1.啟動服務(wù)

源碼下載好了根據(jù)我下面的步驟先啟動起來再說:

注:我們配置中心按照mysql存儲配置冠胯,如果用默認(rèn)derby的話耀态,直接按照第4步修改啟動即可
1.找到config模塊中-resource/META-INF/nacos-db.sql
2.mysql數(shù)據(jù)庫創(chuàng)建nacos庫蔗蹋,然后執(zhí)行上面的nacos-db.sql
3.修改 console模塊的application.properties,加入如下內(nèi)容:

spring.datasource.platform=mysql
db.num=1
db.url.0=jdbc:mysql://127.0.0.1:3306/nacos?characterEncoding=utf8&connectTimeout=1000&socketTimeout=3000&autoReconnect=true
db.user=root
db.password=root

  1. 啟動參數(shù)添加-Dnacos.standalone=true 表示單機啟動
  2. 本地訪問127.0.0.1:8848/nacos/index.html就好了璧亮,默認(rèn)賬號密碼都是nacos

然后我們就看到這樣一個頁面:


console.png

其實這個頁面的對應(yīng)的代碼就是我們的console模塊,
模塊也很簡單刚照,就是基于Spring-Security做校驗伐庭,然后對于這個頁面做的一些CRUD
如下:


console-frame.png

2.配置中心解析

上面說了那么多的啟動內(nèi)容辫塌,終于到了我們這篇文章核心解析點了活鹰。
????簡單的提一下我們服務(wù)端的配置管理方式哈恰,服務(wù)端對于config除了基本的配置存儲;另外還有一個歷史存儲志群,每一次修改都有數(shù)據(jù)存儲着绷,UI界面也是可以查看的,還可以打標(biāo)锌云。解讀源碼前先要對于數(shù)據(jù)模型有個概念荠医,如下:

官網(wǎng)對于模型的描述:
腦袋里一定要構(gòu)建這兩個模型,這兩個模型第一個構(gòu)建數(shù)據(jù)模型的key,另外一個構(gòu)建具體的數(shù)據(jù)內(nèi)容彬向;配置中心的key主要是DataId
????Nacos 數(shù)據(jù)模型 Key 由三元組唯一確定, Namespace默認(rèn)是空串豫喧,公共命名空間(public),分組默認(rèn)是 DEFAULT_GROUP幢泼。


nacos-model.jpeg

????配置領(lǐng)域模型圍繞配置紧显,主要有兩個關(guān)聯(lián)的實體,一個是配置變更歷史缕棵,一個是服務(wù)標(biāo)簽(用于打標(biāo)分類孵班,方便索引),由 ID 關(guān)聯(lián)招驴。


nacos-config-model.jpeg

根據(jù)官方的例子來看配置中心相關(guān)內(nèi)容

public class ConfigExample {

    public static void main(String[] args) throws NacosException, InterruptedException {
        String serverAddr = "localhost";
        String dataId = "dubbo.properties";
        String group = "DEFAULT_GROUP";
        String namespace = "b1092a4a-3b8d-4e33-8874-55cee3839c1f";
        Properties properties = new Properties();
        properties.put(PropertyKeyConst.NAMESPACE, namespace);
        properties.put(PropertyKeyConst.SERVER_ADDR, serverAddr);
         //這里創(chuàng)建了配置服務(wù)篙程,個人猜想這里應(yīng)該是把該初始化的線程服務(wù)等你都啟動好了
        ConfigService configService = NacosFactory.createConfigService(properties);
        //根據(jù)dataId,group獲取配置信息
        String content = configService.getConfig(dataId, group, 5000);
        System.out.println(content);
        //添加監(jiān)聽器
        configService.addListener(dataId, group, new Listener() {
            @Override
            public void receiveConfigInfo(String configInfo) {
                System.out.println("receive:" + configInfo);
            }

            @Override
            public Executor getExecutor() {
                return null;
            }
        });

        boolean isPublishOk = configService.publishConfig(dataId, group, "content");
        System.out.println(isPublishOk);

        Thread.sleep(3000);
        content = configService.getConfig(dataId, group, 5000);
        System.out.println(content);

        boolean isRemoveOk = configService.removeConfig(dataId, group);
        System.out.println(isRemoveOk);
        Thread.sleep(3000);

        content = configService.getConfig(dataId, group, 5000);
        System.out.println(content);
        Thread.sleep(300000);

    }
}

2.1 創(chuàng)建ConfigService

先理解ConfigSerice中干了些什么?

//1.通過源碼可以發(fā)現(xiàn)這里是  NacosConfigService
ConfigService configService = NacosFactory.createConfigService(properties);


//2.如下别厘,這里調(diào)用NacosConfigService的構(gòu)造方法
public static ConfigService createConfigService(Properties properties) throws NacosException {
    try {
        Class<?> driverImplClass = Class.forName("com.alibaba.nacos.client.config.NacosConfigService");
        Constructor constructor = driverImplClass.getConstructor(Properties.class);
        ConfigService vendorImpl = (ConfigService)constructor.newInstance(properties);
        return vendorImpl;
    } catch (Throwable e) {
        throw new NacosException(-400, e.getMessage());
    }
}

//3.NacosConfigService的構(gòu)造方法如下
public NacosConfigService(Properties properties) throws NacosException {
    String encodeTmp = properties.getProperty(PropertyKeyConst.ENCODE);
    if (StringUtils.isBlank(encodeTmp)) {
        encode = Constants.ENCODE;
    } else {
        encode = encodeTmp.trim();
    }
    String namespaceTmp = properties.getProperty(PropertyKeyConst.NAMESPACE);
    if (StringUtils.isBlank(namespaceTmp)) {
        namespace = TenantUtil.getUserTenant();
        properties.put(PropertyKeyConst.NAMESPACE, namespace);
    } else {
        namespace = namespaceTmp;
        properties.put(PropertyKeyConst.NAMESPACE, namespace);
    }
    //這里創(chuàng)建代理連接服務(wù)器
    agent = new ServerHttpAgent(properties);
    //這個其實是針對endpoint設(shè)置才會起作用虱饿,這里簡單說一下;
    //線程異步的通過nameServer命名服務(wù)獲取serverList
    agent.start();
    //這里就是客戶端主要后臺工作
    //configFilterChainManager就是攔截器管理器触趴,持有所有攔截器氮发,我們在客戶端可以配置相應(yīng)的攔截器
    worker = new ClientWorker(agent, configFilterChainManager);
}

我們繼續(xù)往下看,看這個ClientWorker主要干了些什么冗懦?

    public ClientWorker(final HttpAgent agent, final ConfigFilterChainManager configFilterChainManager, final Properties properties) {
        this.agent = agent;
        this.configFilterChainManager = configFilterChainManager;

        // Initialize the timeout parameter

        init(properties);
        //初始化一個客戶端工作線程池爽冕,可以忽略展示不看
        executor = Executors.newScheduledThreadPool(1, new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
                Thread t = new Thread(r);
                t.setName("com.alibaba.nacos.client.Worker." + agent.getName());
                t.setDaemon(true);
                return t;
            }
        });
        //通過下面的命名longPolling,其實就有點來頭了披蕉;后面我們分析颈畸,這也是初識話一個線程池
        executorService = Executors.newScheduledThreadPool(Runtime.getRuntime().availableProcessors(), new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
                Thread t = new Thread(r);
                t.setName("com.alibaba.nacos.client.Worker.longPolling." + agent.getName());
                t.setDaemon(true);
                return t;
            }
        });

        executor.scheduleWithFixedDelay(new Runnable() {
            @Override
            public void run() {
                try {
                   //簡單的通過名稱看,這里就是檢查配置没讲,這是一個定時任務(wù)眯娱,10ms會執(zhí)行一次
                    checkConfigInfo();
                } catch (Throwable e) {
                    LOGGER.error("[" + agent.getName() + "] [sub-check] rotate check error", e);
                }
            }
        }, 1L, 10L, TimeUnit.MILLISECONDS);
    }

  //5.這里分批處理任務(wù),檢查配置信息爬凑,并更新數(shù)據(jù)信息徙缴,暫時先提到這里,后面具體分析贰谣;
public void checkConfigInfo() {
    // 分任務(wù) 娜搂;  cacheMap是一個全局變量和我們添加的Listener有關(guān)迁霎;
    int listenerSize = cacheMap.get().size();
    // 向上取整為批數(shù)    (ParamUtil.getPerTaskConfigSize()默認(rèn)數(shù)量3000)
    int longingTaskCount = (int)Math.ceil(listenerSize / ParamUtil.getPerTaskConfigSize());
    if (longingTaskCount > currentLongingTaskCount) {
        for (int i = (int)currentLongingTaskCount; i < longingTaskCount; i++) {
            // 要判斷任務(wù)是否在執(zhí)行 這塊需要好好想想吱抚。 任務(wù)列表現(xiàn)在是無序的。變化過程可能有問題
            executorService.execute(new LongPullingRunnable(i));
        }
        currentLongingTaskCount = longingTaskCount;
    }
}

總結(jié)一下創(chuàng)建NacosConfigServie做了些什么考廉?

  • 1.初始化我們配置properties秘豹,解析我們一些配置參數(shù),創(chuàng)建我們NacosCofigService服務(wù)用于我們客戶端執(zhí)行操作
  • 2.創(chuàng)建http代理類昌粤,如果我們基于endPoint的命名服務(wù)獲取服務(wù)列表既绕,會有定時線程跑獲取serverList這里不展開啄刹,可以自己閱讀源碼
  • 3.初始化了兩個線程池,一個線程池executor(單線程池)10ms定時執(zhí)行一次檢查配置信息凄贩,檢查配置如果要執(zhí)行又通過另外一個線程池executorService來執(zhí)行
  • 4.具體執(zhí)行又要通過cacheMap(主要功能就是監(jiān)聽器存儲者)的數(shù)量來決定分幾批來執(zhí)行誓军,檢查配置后面在詳細(xì)講解(和長輪訓(xùn)有關(guān)哦)

2.2 獲取配置信息getConfig

下面這一句簡單的代碼,具體有發(fā)生了什么了疲扎,下面我們娓娓道來昵时;
String content = configService.getConfig(dataId, group, 5000);

2.2.1 客戶端分析


  //直接點擊進源碼,來到核心點NacosConfigSercie這個方法
  private String getConfigInner(String tenant, String dataId, String group, long timeoutMs) throws NacosException {
        group = null2defaultGroup(group);
        ParamUtils.checkKeyParam(dataId, group);//判斷不能為空
        ConfigResponse cr = new ConfigResponse();

        cr.setDataId(dataId);
        cr.setTenant(tenant);
        cr.setGroup(group);

        // 優(yōu)先使用本地配置 (這個就是客戶端本地磁盤寫的文件)
        String content = LocalConfigInfoProcessor.getFailover(agent.getName(), dataId, group, tenant);
        if (content != null) {
            LOGGER.warn("[{}] [get-config] get failover ok, dataId={}, group={}, tenant={}, config={}", agent.getName(),
                dataId, group, tenant, ContentUtils.truncateContent(content));
            cr.setContent(content);
            //獲取到配置信息后執(zhí)行過濾器過濾
            configFilterChainManager.doFilter(null, cr);
            content = cr.getContent();
            return content;
        }

        try {
            //通過遠(yuǎn)程server獲取配置內(nèi)容椒丧,我們主要分析這里
            content = worker.getServerConfig(dataId, group, tenant, timeoutMs);

            cr.setContent(content);
          //獲取到配置信息后執(zhí)行過濾器過濾
            configFilterChainManager.doFilter(null, cr);
            content = cr.getContent();
            //直接返回
            return content;
        } catch (NacosException ioe) {
            if (NacosException.NO_RIGHT == ioe.getErrCode()) {
                throw ioe;
            }
            LOGGER.warn("[{}] [get-config] get from server error, dataId={}, group={}, tenant={}, msg={}",
                agent.getName(), dataId, group, tenant, ioe.toString());
        }

        LOGGER.warn("[{}] [get-config] get snapshot ok, dataId={}, group={}, tenant={}, config={}", agent.getName(),
            dataId, group, tenant, ContentUtils.truncateContent(content));
        content = LocalConfigInfoProcessor.getSnapshot(agent.getName(), dataId, group, tenant);
        cr.setContent(content);
        configFilterChainManager.doFilter(null, cr);
        content = cr.getContent();
        return content;
    }

通過上面獲取代碼發(fā)現(xiàn)壹甥,這里委托給ClientWorker去實現(xiàn)遠(yuǎn)程配置信心的拉取,繼續(xù)看源碼壶熏;

    public String getServerConfig(String dataId, String group, String tenant, long readTimeout)
        throws NacosException {
        if (StringUtils.isBlank(group)) {
            group = Constants.DEFAULT_GROUP;
        }

        HttpResult result = null;
        try {
            List<String> params = null;
            if (StringUtils.isBlank(tenant)) {
                params = Arrays.asList("dataId", dataId, "group", group);
            } else {
                params = Arrays.asList("dataId", dataId, "group", group, "tenant", tenant);
            }
            //代理類這里就是我們最開始初始化 new MetricsHttpAgent(new ServerHttpAgent(properties))的代理句柠;這里的路徑是/configs;記住這是要去服務(wù)端看的
            result = agent.httpGet(Constants.CONFIG_CONTROLLER_PATH, null, params, agent.getEncode(), readTimeout);
        } catch (IOException e) {
            String message = String.format(
                "[%s] [sub-server] get server config exception, dataId=%s, group=%s, tenant=%s", agent.getName(),
                dataId, group, tenant);
            LOGGER.error(message, e);
            throw new NacosException(NacosException.SERVER_ERROR, e);
        }

        switch (result.code) {
            case HttpURLConnection.HTTP_OK:
                //沒開啟本地緩存會存儲快照
                LocalConfigInfoProcessor.saveSnapshot(agent.getName(), dataId, group, tenant, result.content);
                return result.content;
            case HttpURLConnection.HTTP_NOT_FOUND:
                LocalConfigInfoProcessor.saveSnapshot(agent.getName(), dataId, group, tenant, null);
                return null;
            
           
        }
    }

其實我們客戶端分析到這里就差不多了棒假,再看一下遠(yuǎn)程怎么獲取業(yè)務(wù)就好了溯职;不過我還是想看一下,在集群模式下帽哑,我們客戶端怎么請求 哪一個服務(wù)缸榄,請求失敗了又怎么處理的;所以我在往下走了一步祝拯。

    @Override
    public HttpResult httpGet(String path, List<String> headers, List<String> paramValues, String encoding,
                              long readTimeoutMs) throws IOException {
        //這個就是請求超時時間甚带,下面一個do循環(huán),超出這個時間則推出遠(yuǎn)程請求
        final long endTime = System.currentTimeMillis() + readTimeoutMs;
        final boolean isSSL = false;
        //這里就是我們需要請求的遠(yuǎn)程url佳头,這里其實是按照權(quán)重選擇的一個鹰贵,特別是在集群中的時候
        String currentServerAddr = serverListMgr.getCurrentServerAddr();
        int maxRetry = this.maxRetry;

        do {
            try {
                List<String> newHeaders = getSpasHeaders(paramValues);
                if (headers != null) {
                    newHeaders.addAll(headers);
                }
                //發(fā)起請求
                HttpResult result = HttpSimpleClient.httpGet(
                    getUrl(currentServerAddr, path), newHeaders, paramValues, encoding,
                    readTimeoutMs, isSSL);
                if (result.code == HttpURLConnection.HTTP_INTERNAL_ERROR
                    || result.code == HttpURLConnection.HTTP_BAD_GATEWAY
                    || result.code == HttpURLConnection.HTTP_UNAVAILABLE) {
                    LOGGER.error("[NACOS ConnectException] currentServerAddr: {}, httpCode: {}",
                        serverListMgr.getCurrentServerAddr(), result.code);
                } else {
                    //有可能有服務(wù)請求失敗,會更新為最新的遠(yuǎn)程server地址
                    // Update the currently available server addr
                    serverListMgr.updateCurrentServerAddr(currentServerAddr);
                    return result;
                }
            } catch (ConnectException ce) {
                LOGGER.error("[NACOS ConnectException httpGet] currentServerAddr:{}, err : {}", serverListMgr.getCurrentServerAddr(), ce.getMessage());
            } catch (SocketTimeoutException stoe) {
                LOGGER.error("[NACOS SocketTimeoutException httpGet] currentServerAddr:{}康嘉, err : {}", serverListMgr.getCurrentServerAddr(), stoe.getMessage());
            } catch (IOException ioe) {
                LOGGER.error("[NACOS IOException httpGet] currentServerAddr: " + serverListMgr.getCurrentServerAddr(), ioe);
                throw ioe;
            }

            if (serverListMgr.getIterator().hasNext()) {
                currentServerAddr = serverListMgr.getIterator().next();
            } else {
                maxRetry--;
                if (maxRetry < 0) {
                    throw new ConnectException("[NACOS HTTP-GET] The maximum number of tolerable server reconnection errors has been reached");
                }
              //serverlist;請求失敗的情況下這個list實現(xiàn)了Iterator功能碉输,通過這個來獲取下一個服務(wù)
                serverListMgr.refreshCurrentServerAddr();
            }

        } while (System.currentTimeMillis() <= endTime);

        LOGGER.error("no available server");
        throw new ConnectException("no available server");
    }

客戶端我們請求的流程就先到這里了,也先來一個小總結(jié):

  • 1.先通過本地緩存文件獲取亭珍,如果存在則直接通過本地文件拉取
  • 2.如果本地文件沒有則通過遠(yuǎn)程服務(wù)拉取敷钾,如果還是沒有在本地緩存沒有開啟的情況下通過本地快照文件拉取
  • 3.我們在請求遠(yuǎn)程服務(wù)端的時候會選擇循環(huán)用某個 服務(wù)請求,其中請求失敗會換一個連接請求肄梨;在超時時間內(nèi)沒有結(jié)果會直接返回

2.2.2 服務(wù)器處理配置請求

如下就是一個簡單的get請求

    @GetMapping
    public void getConfig(HttpServletRequest request, HttpServletResponse response,
                          @RequestParam("dataId") String dataId, @RequestParam("group") String group,
                          @RequestParam(value = "tenant", required = false, defaultValue = StringUtils.EMPTY)
                              String tenant,
                          @RequestParam(value = "tag", required = false) String tag)
        throws IOException, ServletException, NacosException {
        // check params
        ParamUtils.checkParam(dataId, group, "datumId", "content");
        ParamUtils.checkParam(tag);

        final String clientIp = RequestUtil.getRemoteIp(request);
        inner.doGetConfig(request, response, dataId, group, tenant, tag, clientIp);
    }

服務(wù)端真正處理的實現(xiàn)來了:

    /**
     * 同步配置獲取接口
     */
 public String doGetConfig(HttpServletRequest request, HttpServletResponse response, String dataId, String group,
                              String tenant, String tag, String clientIp) throws IOException, ServletException {
        //通過dataId阻荒,group,tenant拼接為字符串作為group key
        final String groupKey = GroupKey2.getKey(dataId, group, tenant);
        String autoTag = request.getHeader("Vipserver-Tag");
        String requestIpApp = RequestUtil.getAppName(request);
        //這是nacos自己實現(xiàn)的自旋鎖众羡,超級簡單的鎖
        int lockResult = tryConfigReadLock(groupKey);

        final String requestIp = RequestUtil.getRemoteIp(request);
        boolean isBeta = false;
        if (lockResult > 0) {
           //   侨赡。。。會判斷是否開啟beta羊壹,tag走不同的邏輯感覺一大對重復(fù)代碼,這里提取了部分代碼
                          //一般沒有開啟beta,也沒有tag走這里
                            md5 = cacheItem.getMd5();
                            lastModified = cacheItem.getLastModifiedTs();
                            //這里就是單機模式蓖宦,并且沒有使用mysql才會從derby中獲取
                            if (STANDALONE_MODE && !PropertyUtil.isStandaloneUseMysql()) {
                                configInfoBase = persistService.findConfigInfo(dataId, group, tenant);
                            } else {
                                //我們這種恰好是這種,單機+mysql所以會通過磁盤獲取油猫,是不是會很怪稠茂,為啥mysql還要從文件中讀取,那么什么時候會寫呢情妖?是不是好多疑問主慰?
                                file = DiskUtil.targetFile(dataId, group, tenant);
                            }
                            if (configInfoBase == null && fileNotExist(file)) {
                                // FIXME CacheItem
                                // 不存在了無法簡單的計算推送delayed,這里簡單的記做-1
                                ConfigTraceService.logPullEvent(dataId, group, tenant, requestIpApp, -1,
                                    ConfigTraceService.PULL_EVENT_NOTFOUND, -1, requestIp);

                                // pullLog.info("[client-get] clientIp={}, {},
                                // no data",
                                // new Object[]{clientIp, groupKey});
                                //而且文件沒有直接返回文件不存在鲫售,why共螺?小朋友是不是一大堆問好?情竹?藐不??別著急秦效;后面會解析
                                response.setStatus(HttpServletResponse.SC_NOT_FOUND);
                                response.getWriter().println("config data not exist");
                                return HttpServletResponse.SC_NOT_FOUND + "";
                            }
               //文件存在的話會走這里的內(nèi)容雏蛮;返回內(nèi)容以及內(nèi)容的md5,最后一次修改時間等阱州,
                response.setHeader(Constants.CONTENT_MD5, md5);
                /**
                 *  禁用緩存
                 */
                response.setHeader("Pragma", "no-cache");
                response.setDateHeader("Expires", 0);
                response.setHeader("Cache-Control", "no-cache,no-store");
                if (STANDALONE_MODE && !PropertyUtil.isStandaloneUseMysql()) {
                    response.setDateHeader("Last-Modified", lastModified);
                } else {
                    fis = new FileInputStream(file);
                    response.setDateHeader("Last-Modified", file.lastModified());
                }

                if (STANDALONE_MODE && !PropertyUtil.isStandaloneUseMysql()) {
                    out = response.getWriter();
                    out.print(configInfoBase.getContent());
                    out.flush();
                    out.close();
                } else {
                    fis.getChannel().transferTo(0L, fis.getChannel().size(),
                        Channels.newChannel(response.getOutputStream()));
                }

                LogUtil.pullCheckLog.warn("{}|{}|{}|{}", groupKey, requestIp, md5, TimeUtils.getCurrentTimeStr());

                final long delayed = System.currentTimeMillis() - lastModified;

                // TODO distinguish pull-get && push-get
                // 否則無法直接把delayed作為推送延時的依據(jù)挑秉,因為主動get請求的delayed值都很大,發(fā)布事件這里就是記錄了一個traceLog
                ConfigTraceService.logPullEvent(dataId, group, tenant, requestIpApp, lastModified,
                    ConfigTraceService.PULL_EVENT_OK, delayed,
                    requestIp);
         苔货。犀概。。
        }else if (lockResult == 0) { //獲取鎖失敗
            // FIXME CacheItem 不存在了無法簡單的計算推送delayed夜惭,這里簡單的記做-1
            ConfigTraceService.logPullEvent(dataId, group, tenant, requestIpApp, -1,
                ConfigTraceService.PULL_EVENT_NOTFOUND, -1, requestIp);

            response.setStatus(HttpServletResponse.SC_NOT_FOUND);
            response.getWriter().println("config data not exist");
            return HttpServletResponse.SC_NOT_FOUND + "";

        } else {
            pullLog.info("[client-get] clientIp={}, {}, get data during dump", clientIp, groupKey);
            response.setStatus(HttpServletResponse.SC_CONFLICT);
            response.getWriter().println("requested file is being modified, please try later.");
            return HttpServletResponse.SC_CONFLICT + "";
        }

        return HttpServletResponse.SC_OK + "";
    }

服務(wù)端處理獲取配置請求總結(jié)一下:

  • 1.加鎖獲取內(nèi)存中的緩存信息姻灶,根據(jù)是否是beta,是否含有tag等走下面的邏輯
  • 2.判斷單機+不是mysql;讀取數(shù)據(jù)庫數(shù)據(jù)诈茧,否則讀取本地文件緩存數(shù)據(jù)是否存在
  • 3.不存在直接返回文件不存在響應(yīng)产喉,并記錄日志,否則返回內(nèi)容信息

當(dāng)時看到這個有點懵逼了敢会。曾沈。為啥有mysql不讀,反而去讀一個磁盤鸥昏?磁盤的數(shù)據(jù)又是什么時候?qū)懙娜悖縲hy?感覺頓時懷疑人生了互广,別著急敛腌;后面會解析

先簡單的個人分析下:
除了單機+非mysql才查詢sql;而且nacos默認(rèn)的配置存儲默認(rèn)是derby惫皱,還要一個就是mysql的實現(xiàn)像樊;那么結(jié)論就是:

  • 只有單機+derby存儲才會查sql;其他都查sql旅敷,而derby又是nacos內(nèi)置數(shù)據(jù)庫生棍,存儲在本地文件中,說白了就是本地文件媳谁;
    那么這里獲取的信息其實就是本地文件涂滴,就算是mysql也不會從遠(yuǎn)程拉取服務(wù),降低遠(yuǎn)程請求消耗晴音;而且在集群的情況下肯定是本地文件來拉取的柔纵; 個人觀點理解,有誤請歡迎批評指正

上面說了那么多锤躁,其實都是比較簡單的搁料;就是簡單的獲取請求,那么在更新配置后系羞,客戶端的配置怎么更新呢郭计,是客戶端主動拉取,還是服務(wù)端推送呢椒振?這個才是核心關(guān)鍵

2.3 配置更新

我們可以通過某一個客戶端更新配置昭伸,或者UI界面更新配置;
客戶端
boolean isPublishOk = configService.publishConfig(dataId, group, "content");
客戶端這個請求流程和獲取配置一樣澎迎;只是發(fā)起一個Post請求庐杨;

重點看服務(wù)端的配置處理ConfigController

    @PostMapping
    public Boolean publishConfig(HttpServletRequest request, HttpServletResponse response,
                                 @RequestParam("dataId") String dataId, @RequestParam("group") String group,
                                 @RequestParam(value = "tenant", required = false, defaultValue = StringUtils.EMPTY)
                                     String tenant,
                                 @RequestParam("content") String content,
                                 @RequestParam(value = "tag", required = false) String tag,
                                 @RequestParam(value = "appName", required = false) String appName,
                                 @RequestParam(value = "src_user", required = false) String srcUser,
                                 @RequestParam(value = "config_tags", required = false) String configTags,
                                 @RequestParam(value = "desc", required = false) String desc,
                                 @RequestParam(value = "use", required = false) String use,
                                 @RequestParam(value = "effect", required = false) String effect,
                                 @RequestParam(value = "type", required = false) String type,
                                 @RequestParam(value = "schema", required = false) String schema)
        throws NacosException {
        final String srcIp = RequestUtil.getRemoteIp(request);
        String requestIpApp = RequestUtil.getAppName(request);
        ParamUtils.checkParam(dataId, group, "datumId", content);
        ParamUtils.checkParam(tag);

        Map<String, Object> configAdvanceInfo = new HashMap<String, Object>(10);
       //去掉了一些不重要檢查

        if (AggrWhitelist.isAggrDataId(dataId)) {
            log.warn("[aggr-conflict] {} attemp to publish single data, {}, {}",
                RequestUtil.getRemoteIp(request), dataId, group);
            throw new NacosException(NacosException.NO_RIGHT, "dataId:" + dataId + " is aggr");
        }

        final Timestamp time = TimeUtils.getCurrentTime();
        String betaIps = request.getHeader("betaIps");
        ConfigInfo configInfo = new ConfigInfo(dataId, group, tenant, appName, content);
        if (StringUtils.isBlank(betaIps)) {
            if (StringUtils.isBlank(tag)) {
                persistService.insertOrUpdate(srcIp, srcUser, configInfo, time, configAdvanceInfo, false);
                EventDispatcher.fireEvent(new ConfigDataChangeEvent(false, dataId, group, tenant, time.getTime()));
            } else {
                //這里就是簡單的 數(shù)據(jù)庫持久化(derby/mysql)
                persistService.insertOrUpdateTag(configInfo, tag, srcIp, srcUser, time, false);
                //主要是這個事件觸發(fā)器,
                EventDispatcher.fireEvent(new ConfigDataChangeEvent(false, dataId, group, tenant, tag, time.getTime()));
            }
        } else { // beta publish
            persistService.insertOrUpdateBeta(configInfo, betaIps, srcIp, srcUser, time, false);
            EventDispatcher.fireEvent(new ConfigDataChangeEvent(true, dataId, group, tenant, time.getTime()));
        }
        ConfigTraceService.logPersistenceEvent(dataId, group, tenant, requestIpApp, time.getTime(),
            LOCAL_IP, ConfigTraceService.PERSISTENCE_EVENT_PUB, content);

        return true;
    }

重點就是事件觸發(fā):
EventDispatcher.fireEvent(new ConfigDataChangeEvent(false, dataId, group, tenant, tag, time.getTime()));

這個事件觸發(fā)會把監(jiān)聽ConfigDataChangeEvent事件的監(jiān)聽器執(zhí)行了

    /**
     * fire event, notify listeners.
     */
    static public void fireEvent(Event event) {
        if (null == event) {
            throw new IllegalArgumentException();
        }
        //遍歷執(zhí)行即可夹供,遍歷的是直接感興趣的listerner
        for (AbstractEventListener listener : getEntry(event.getClass()).listeners) {
            try {
                listener.onEvent(event);
            } catch (Exception e) {
                log.error(e.toString(), e);
            }
        }
    }

對于AbstractEventListener 我們只有兩個實現(xiàn)辑莫;

  • AsyncNotifyService:通過名字就是一個異步通知服務(wù)
    這個實現(xiàn)是監(jiān)聽的ConfigDataChangeEvent;所以觸發(fā)這個
  • LongPollingService:長輪訓(xùn)服務(wù)罩引,有點意思各吨,
    不過這個感興趣的是LocalDataChangeEvent,所以這個不會觸發(fā)哦
  //這個是AysncNotifyServive實現(xiàn)的接口袁铐,而且創(chuàng)建的時會自動添加到ConfigDataChangeEvent監(jiān)聽列表中
  @Override
    public List<Class<? extends Event>> interest() {
        List<Class<? extends Event>> types = new ArrayList<Class<? extends Event>>();
        // 觸發(fā)配置變更同步通知
        types.add(ConfigDataChangeEvent.class);
        return types;
    }

所以我們先分析到這里會觸發(fā) AysncNotifyService監(jiān)聽器揭蜒,下面繼續(xù)看

2.3.1 AysncNotifyService監(jiān)聽器

AysncNotifyService的事件觸發(fā)后執(zhí)行的邏輯,

@Override
    public void onEvent(Event event) {

        // 并發(fā)產(chǎn)生 ConfigDataChangeEvent
        if (event instanceof ConfigDataChangeEvent) {
            ConfigDataChangeEvent evt = (ConfigDataChangeEvent) event;
            long dumpTs = evt.lastModifiedTs;
            String dataId = evt.dataId;
            String group = evt.group;
            String tenant = evt.tenant;
            String tag = evt.tag;
            List<?> ipList = serverListService.getServerList();
            
            // 其實這里任何類型隊列都可以
            Queue<NotifySingleTask> queue = new LinkedList<NotifySingleTask>();
            //這里會根據(jù)server集群的個數(shù)添加幾個task
            for (int i = 0; i < ipList.size(); i++) {
                //這里記得看一下url構(gòu)成會有 /communication/dataChange
                queue.add(new NotifySingleTask(dataId, group, tenant, tag, dumpTs, (String) ipList.get(i), evt.isBeta));
            }
          //線程異步執(zhí)行剔桨,AsyncTask任務(wù)
            EXECUTOR.execute(new AsyncTask(httpclient, queue));
        }
    }

具體異步任務(wù)執(zhí)行邏輯

    class AsyncTask implements Runnable {

        public AsyncTask(CloseableHttpAsyncClient httpclient, Queue<NotifySingleTask> queue) {
            this.httpclient = httpclient;
            this.queue = queue;
        }

        @Override
        public void run() {
            executeAsyncInvoke();
        }

        private void executeAsyncInvoke() {
            while (!queue.isEmpty()) {
                NotifySingleTask task = queue.poll();
                String targetIp = task.getTargetIP();
                if (serverListService.getServerList().contains(
                    targetIp)) {
                    // 啟動健康檢查且有不監(jiān)控的ip則直接把放到通知隊列屉更,否則通知
                    if (serverListService.isHealthCheck()
                        && ServerListService.getServerListUnhealth().contains(targetIp)) {
                        // target ip 不健康,則放入通知列表中
                        ConfigTraceService.logNotifyEvent(task.getDataId(), task.getGroup(), task.getTenant(), null,
                            task.getLastModified(),
                            LOCAL_IP, ConfigTraceService.NOTIFY_EVENT_UNHEALTH, 0, task.target);
                        // get delay time and set fail count to the task,這會重試
                        asyncTaskExecute(task);
                    } else {
                        HttpGet request = new HttpGet(task.url);
                        request.setHeader(NotifyService.NOTIFY_HEADER_LAST_MODIFIED,
                            String.valueOf(task.getLastModified()));
                        request.setHeader(NotifyService.NOTIFY_HEADER_OP_HANDLE_IP, LOCAL_IP);
                        if (task.isBeta) {
                            request.setHeader("isBeta", "true");
                        }
                        //發(fā)送通知洒缀,通知的是各個服務(wù)這個接口瑰谜,communication/dataChange
                        //這里注意有一個回調(diào)函數(shù)欺冀,作用很簡單如果服務(wù)端通知失敗,會做一個次記錄萨脑,另外這里設(shè)置了有一個重試最多次數(shù)隐轩,還有時長組將增大,避免無限重試增大服務(wù)器開銷
                        httpclient.execute(request, new AsyncNotifyCallBack(httpclient, task));
                    }
                }
            }
        }

        private Queue<NotifySingleTask> queue;
        private CloseableHttpAsyncClient httpclient;

    }

異步通知其實又發(fā)起來一個廣播通知/ communication/dataChange接口調(diào)用渤早,所以輾轉(zhuǎn)到CommnicationController了职车,這也是集群下通知所有的服務(wù)器的流程

 /**
     * 通知配置信息改變
     */
    @GetMapping("/dataChange")
    public Boolean notifyConfigInfo(HttpServletRequest request,
                                    @RequestParam("dataId") String dataId, @RequestParam("group") String group,
                                    @RequestParam(value = "tenant", required = false, defaultValue = StringUtils.EMPTY)
                                        String tenant,
                                    @RequestParam(value = "tag", required = false) String tag) {
        dataId = dataId.trim();
        group = group.trim();
        String lastModified = request.getHeader(NotifyService.NOTIFY_HEADER_LAST_MODIFIED);
        long lastModifiedTs = StringUtils.isEmpty(lastModified) ? -1 : Long.parseLong(lastModified);
        String handleIp = request.getHeader(NotifyService.NOTIFY_HEADER_OP_HANDLE_IP);
        String isBetaStr = request.getHeader("isBeta");
        if (StringUtils.isNotBlank(isBetaStr) && trueStr.equals(isBetaStr)) {
            //這里就是主要的執(zhí)行邏輯了,主要是異步處理
            dumpService.dump(dataId, group, tenant, lastModifiedTs, handleIp, true);
        } else {
            dumpService.dump(dataId, group, tenant, tag, lastModifiedTs, handleIp);
        }
        //返回true表示執(zhí)行成功了
        return true;
    }

最后我們進源碼發(fā)現(xiàn),只是投建了一個DumpTask放到taskManager的隊列中

public void dump(String dataId, String group, String tenant, String tag, long lastModified, String handleIp,
                     boolean isBeta) {
        String groupKey = GroupKey2.getKey(dataId, group, tenant);
        //這里使用的是dumpTaskMgr鹊杖,所以默認(rèn)的處理器就是DumpProcessor
        dumpTaskMgr.addTask(groupKey, new DumpTask(groupKey, tag, lastModified, handleIp, isBeta));
    }

/**
 * 用于處理一定要執(zhí)行成功的任務(wù) 單線程的方式處理任務(wù)悴灵,保證任務(wù)一定被成功處理
 *
 * @author huali
 */
public final class TaskManager implements TaskManagerMBean {
 /**
     * 將任務(wù)加入到任務(wù)Map中
     *
     * @param type
     * @param task
     */
    public void addTask(String type, AbstractTask task) {
        this.lock.lock();
        try {
            //放入隊列中
            AbstractTask oldTask = tasks.put(type, task);
            MetricsMonitor.getDumpTaskMonitor().set(tasks.size());
            if (null != oldTask) {
                task.merge(oldTask);
            }
        } finally {
            this.lock.unlock();
        }
    }

}

上面可以發(fā)現(xiàn)放入隊列中就直接返回結(jié)果了;這里放入隊列了不用猜就知道肯定有一個后臺線程來執(zhí)行隊列骂蓖;
TaskManager

  //服務(wù)啟動會創(chuàng)建這個后臺線程跑服務(wù)
    class ProcessRunnable implements Runnable {

        @Override
        public void run() {
            while (!TaskManager.this.closed.get()) {
                try {
                    //直接100ms一次的執(zhí)行一次
                    Thread.sleep(100);
                    TaskManager.this.process();
                } catch (Throwable e) {
                }
            }

        }

    }
 protected void process() {
        for (Map.Entry<String, AbstractTask> entry : this.tasks.entrySet()) {
            AbstractTask task = null;
            this.lock.lock();
            try {
                // 獲取任務(wù)
                task = entry.getValue();
                if (null != task) {
                    if (!task.shouldProcess()) {
                        // 任務(wù)當(dāng)前不需要被執(zhí)行积瞒,直接跳過
                        continue;
                    }
                    // 先將任務(wù)從任務(wù)Map中刪除
                    this.tasks.remove(entry.getKey());
                    MetricsMonitor.getDumpTaskMonitor().set(tasks.size());
                }
            } finally {
                this.lock.unlock();
            }

            if (null != task) {
                // 獲取任務(wù)處理器
                TaskProcessor processor = this.taskProcessors.get(entry.getKey());
                if (null == processor) {
                    // 如果沒有根據(jù)任務(wù)類型設(shè)置的處理器,使用默認(rèn)處理器
                    processor = this.getDefaultTaskProcessor();
                }
                if (null != processor) {
                    boolean result = false;
                    try {
                        // 處理任務(wù)登下,
                        result = processor.process(entry.getKey(), task);
                    } catch (Throwable t) {
                        log.error("task_fail", "處理task失敗", t);
                    }
                    if (!result) {
                        // 任務(wù)處理失敗赡鲜,設(shè)置處理時間
                        task.setLastProcessTime(System.currentTimeMillis());

                        // 失敗了,將任務(wù)重新加入到任務(wù)Map中庐船,下次再次存儲
                        this.addTask(entry.getKey(), task);
                    }
                }
            }
        }

        if (tasks.isEmpty()) {
            this.lock.lock();
            try {
                this.notEmpty.signalAll();
            } finally {
                this.lock.unlock();
            }
        }
    }

上面的任務(wù)處理又會交給DumpProcessor去處理

 @Override
    public boolean process(String taskType, AbstractTask task) {
        DumpTask dumpTask = (DumpTask)task;
        String[] pair = GroupKey2.parseKey(dumpTask.groupKey);
        String dataId = pair[0];
        String group = pair[1];
        String tenant = pair[2];
        long lastModified = dumpTask.lastModified;
        String handleIp = dumpTask.handleIp;
        boolean isBeta = dumpTask.isBeta;
        String tag = dumpTask.tag;

          //省略了一些判斷邏輯银酬,這里是主要的數(shù)據(jù)處理
          if (StringUtils.isBlank(tag)) {
               //通過數(shù)據(jù)庫獲取配置信息,這個數(shù)據(jù)庫肯定是最新的
                ConfigInfo cf = dumpService.persistService.findConfigInfo(dataId, group, tenant);
              //重新加載聚合白名單,先不管
                if (dataId.equals(AggrWhitelist.AGGRIDS_METADATA)) {
                    if (null != cf) {
                        AggrWhitelist.load(cf.getContent());
                    } else {
                        AggrWhitelist.load(null);
                    }
                }
                //加載客戶端ip白名單,也不管
                if (dataId.equals(ClientIpWhiteList.CLIENT_IP_WHITELIST_METADATA)) {
                    if (null != cf) {
                        ClientIpWhiteList.load(cf.getContent());
                    } else {
                        ClientIpWhiteList.load(null);
                    }
                }
                
                if (dataId.equals(SwitchService.SWITCH_META_DATAID)) {
                    if (null != cf) {
                        SwitchService.load(cf.getContent());
                    } else {
                        SwitchService.load(null);
                    }
                }

                boolean result;
                if (null != cf) {
                     //主要是這一步筐钟,這一步主要做了些什么內(nèi)容
                    //1.更新內(nèi)存中的緩存
                    //2.dump磁盤更新
                    result = ConfigService.dump(dataId, group, tenant, cf.getContent(), lastModified);

                    if (result) {
                        ConfigTraceService.logDumpEvent(dataId, group, tenant, null, lastModified, handleIp,
                            ConfigTraceService.DUMP_EVENT_OK, System.currentTimeMillis() - lastModified,
                            cf.getContent().length());
                    }
                } else {
                    result = ConfigService.remove(dataId, group, tenant);

                    if (result) {
                        ConfigTraceService.logDumpEvent(dataId, group, tenant, null, lastModified, handleIp,
                            ConfigTraceService.DUMP_EVENT_REMOVE_OK, System.currentTimeMillis() - lastModified, 0);
                    }
                }
                return result;

}

上面使用ConfigService.dump執(zhí)行的任務(wù)才是重點揩瞪,這里一共做了幾件事

  • 如果不是單機,或者使用了mysql篓冲,那么更新本地文件緩存
  • 構(gòu)建CacheItem緩存在內(nèi)存中
  • 發(fā)布一個LocalDataChangeEvent事件李破;
    /**
     * 保存配置文件,并緩存md5.
     */
    static public boolean dump(String dataId, String group, String tenant, String content, long lastModifiedTs) {
        String groupKey = GroupKey2.getKey(dataId, group, tenant);
        makeSure(groupKey);
        final int lockResult = tryWriteLock(groupKey);
        assert (lockResult != 0);

        if (lockResult < 0) {
            dumpLog.warn("[dump-error] write lock failed. {}", groupKey);
            return false;
        }

        try {
            final String md5 = MD5.getInstance().getMD5String(content);
            if (md5.equals(ConfigService.getContentMd5(groupKey))) {
                dumpLog.warn(
                    "[dump-ignore] ignore to save cache file. groupKey={}, md5={}, lastModifiedOld={}, "
                        + "lastModifiedNew={}",
                    groupKey, md5, ConfigService.getLastModifiedTs(groupKey), lastModifiedTs);
            } else if (!STANDALONE_MODE || PropertyUtil.isStandaloneUseMysql()) {
                //其實這里也是解密了壹将;會把更新的內(nèi)容使用本地文件緩存起來嗤攻;
                //這也是我們在get配置信息的時候直接從文件獲取的地方
                DiskUtil.saveToDisk(dataId, group, tenant, content);
            }
            updateMd5(groupKey, md5, lastModifiedTs);
            return true;
        } catch (IOException ioe) {
            dumpLog.error("[dump-exception] save disk error. " + groupKey + ", " + ioe.toString(), ioe);
            if (ioe.getMessage() != null) {
                String errMsg = ioe.getMessage();
                if (NO_SPACE_CN.equals(errMsg) || NO_SPACE_EN.equals(errMsg) || errMsg.contains(DISK_QUATA_CN)
                    || errMsg.contains(DISK_QUATA_EN)) {
                    // 磁盤寫滿保護代碼
                    fatalLog.error("磁盤滿自殺退出", ioe);
                    System.exit(0);
                }
            }
            return false;
        } finally {
            releaseWriteLock(groupKey);
        }
    }

    //更新md5
    public static void updateMd5(String groupKey, String md5, long lastModifiedTs) {
        CacheItem cache = makeSure(groupKey);
        if (cache.md5 == null || !cache.md5.equals(md5)) {
            cache.md5 = md5;
            cache.lastModifiedTs = lastModifiedTs;
            //這里觸發(fā)LocalDataChangeEvent事件,涉及到我們之前所有的長輪訓(xùn)監(jiān)聽器诽俯,哈哈妇菱,是不是有點通了;
            EventDispatcher.fireEvent(new LocalDataChangeEvent(groupKey));
        }
    }

基本上這里可以再次總結(jié)下服務(wù)端配置更新后流程:

  • 1.某一個服務(wù)器收到更新請求后暴区;先自己更新本地數(shù)據(jù)庫的數(shù)據(jù)闯团;然后發(fā)布一個ConfigDataChangeEvent事件
  • 2.該事件會讓每一個服務(wù)收到某一個配置更改了,然后每一個服務(wù)開始執(zhí)行流程
  • 3.每一個服務(wù)會新建一個task任務(wù)放入自己的任務(wù)隊列中
  • 4.每一個服務(wù)后臺的線程會從隊列中執(zhí)行該任務(wù)
    • 任務(wù)執(zhí)行包含仙粱,如果非單機或者mysql會刷新本地緩存文件房交;這個也是我們前面分析的獲取配置的文件
    • 會更新內(nèi)存中CacheItem緩存內(nèi)容信息
    • 然后觸發(fā) LocalDataChangeEvent 這個涉及長輪訓(xùn)情況,下面解析

注意一點配置中心數(shù)據(jù)一致性的問題:

  • nacos的配置中心在集群條件下配置數(shù)據(jù)依賴于第三方mysql做數(shù)據(jù)庫存儲伐割,因為默認(rèn)的derby是服務(wù)內(nèi)置的存儲候味,難以滿足集群條件共享
  • 數(shù)據(jù)變更后刃唤,變更節(jié)點更新mysql后會廣播的消息給其他節(jié)點,失敗后也只會重試幾次白群,多次失敗了就沒有做其他處理了不過是由日志記錄的尚胞;節(jié)點收到廣播消息會添加到自己的隊列里,不斷的處理川抡,失敗了在添加會隊列中即可辐真。而且沒有服務(wù)器沒有其他定時任務(wù)去比較服務(wù)器配置內(nèi)容
    所以配置中心應(yīng)該屬于采用去中心化的思想設(shè)計的须尚。

疑問崖堤??耐床?如果服務(wù)器發(fā)布數(shù)據(jù)密幔,其他節(jié)點更新沒有成功怎么辦?肯定有解決方案撩轰,后文揭秘

其實到這里內(nèi)容基本上都差不多了胯甩,內(nèi)容篇幅過長,可以休息下分批閱讀堪嫂;

2.3.2 LongPollingService長輪訓(xùn)服務(wù)

上面我們解析到服務(wù)端配置變化后會觸發(fā)LocalDataChangeEvent事件偎箫,也就是LongPollingService的onEvent方法;那么具體整個流程會是怎么樣的呢皆串;我們得結(jié)合客戶端請求來看淹办;
之前給出一個疑問:配置更新后,我們是客戶端主動去拉還是服務(wù)端推送恶复?

1)客戶端分析

先從客戶端分析怜森,我們還記得我們創(chuàng)建NacosConfigService會開啟后臺線程檢查配置更新;ClientWorker谤牡,先回憶下代碼

public void checkConfigInfo() {
        // 分任務(wù)副硅,這里就是監(jiān)聽器的個數(shù)
        int listenerSize = cacheMap.get().size();
        // 向上取整為批數(shù)      (perTaskCofigSize 默認(rèn)是3000)
        int longingTaskCount = (int) Math.ceil(listenerSize / ParamUtil.getPerTaskConfigSize());
        if (longingTaskCount > currentLongingTaskCount) {
            for (int i = (int) currentLongingTaskCount; i < longingTaskCount; i++) {
                // 要判斷任務(wù)是否在執(zhí)行 這塊需要好好想想。 任務(wù)列表現(xiàn)在是無序的翅萤。變化過程可能有問題
                executorService.execute(new LongPollingRunnable(i));
            }
            currentLongingTaskCount = longingTaskCount;
        }
    }

第一步先分析這里如果longingTaskCount <=currentLongingTaskCount是不會執(zhí)行LongPollingRunnable的恐疲;
所以需要知道

  • currentLongingTaskCount(初始化是0)在執(zhí)行后會更新為longTaskCount的值
  • longingTaskCount 是我們本地監(jiān)聽器的數(shù)量除以3000向上取整;所以如果沒有監(jiān)聽器套么,我覺得這里的長輪訓(xùn)根本不會執(zhí)行流纹;而且都是3000個監(jiān)聽器由一個線程來輪訓(xùn);

在看LongPollingRunnable具體執(zhí)行邏輯

 class LongPollingRunnable implements Runnable {
        private int taskId;

        public LongPollingRunnable(int taskId) {
            this.taskId = taskId;
        }

        @Override
        public void run() {

            List<CacheData> cacheDatas = new ArrayList<CacheData>();
            List<String> inInitializingCacheList = new ArrayList<String>();
            try {
                // check failover config
                for (CacheData cacheData : cacheMap.get().values()) {
                    if (cacheData.getTaskId() == taskId) {
                        cacheDatas.add(cacheData);
                        try {
                            //本地配置檢查违诗,這個可以自己查看
                            checkLocalConfig(cacheData);
                            if (cacheData.isUseLocalConfigInfo()) {
                                cacheData.checkListenerMd5();
                            }
                        } catch (Exception e) {
                            LOGGER.error("get local config info error", e);
                        }
                    }
                }

                // check server config 這里只返回更改后的dataId漱凝,group,tenant(namespace)具體內(nèi)容還要去重新拉取一次 請求路徑 configs/listener
                List<String> changedGroupKeys = checkUpdateDataIds(cacheDatas, inInitializingCacheList);

                for (String groupKey : changedGroupKeys) {
                    String[] key = GroupKey.parseKey(groupKey);
                    String dataId = key[0];
                    String group = key[1];
                    String tenant = null;
                    if (key.length == 3) {
                        tenant = key[2];
                    }
                    try {
                        //重新拉取代碼更新
                        String content = getServerConfig(dataId, group, tenant, 3000L);
                        CacheData cache = cacheMap.get().get(GroupKey.getKeyTenant(dataId, group, tenant));
                        cache.setContent(content);
                        LOGGER.info("[{}] [data-received] dataId={}, group={}, tenant={}, md5={}, content={}",
                            agent.getName(), dataId, group, tenant, cache.getMd5(),
                            ContentUtils.truncateContent(content));
                    } catch (NacosException ioe) {
                        String message = String.format(
                            "[%s] [get-update] get changed config exception. dataId=%s, group=%s, tenant=%s",
                            agent.getName(), dataId, group, tenant);
                        LOGGER.error(message, ioe);
                    }
                }
                for (CacheData cacheData : cacheDatas) {
                    if (!cacheData.isInitializing() || inInitializingCacheList
                        .contains(GroupKey.getKeyTenant(cacheData.dataId, cacheData.group, cacheData.tenant))) {
                        //檢查本地MD5,如果更新會觸發(fā)本地監(jiān)聽器
                        cacheData.checkListenerMd5();
                        cacheData.setInitializing(false);
                    }
                }
                inInitializingCacheList.clear();
                //執(zhí)行完了會再次執(zhí)行該流程
                executorService.execute(this);

            } catch (Throwable e) {

                // If the rotation training task is abnormal, the next execution time of the task will be punished
                LOGGER.error("longPolling error : ", e);
                executorService.schedule(this, taskPenaltyTime, TimeUnit.MILLISECONDS);
            }
        }
    }

2) 服務(wù)端處理輪訓(xùn)

客戶端一次會把少于3000個實例data發(fā)送給服務(wù)端檢查MD5
請求路徑 configs/listener诸迟;實例ConfigController;

    /**
     * 比較MD5
     */
    @PostMapping("/listener")
    public void listener(HttpServletRequest request, HttpServletResponse response)
        throws ServletException, IOException {
        request.setAttribute("org.apache.catalina.ASYNC_SUPPORTED", true);
        String probeModify = request.getParameter("Listening-Configs");
        if (StringUtils.isBlank(probeModify)) {
            throw new IllegalArgumentException("invalid probeModify");
        }

        probeModify = URLDecoder.decode(probeModify, Constants.ENCODE);

        Map<String, String> clientMd5Map;
        try {
            clientMd5Map = MD5Util.getClientMd5Map(probeModify);
        } catch (Throwable e) {
            throw new IllegalArgumentException("invalid probeModify");
        }

        // do long-polling
        inner.doPollingConfig(request, response, clientMd5Map, probeModify.length());
    }

ConfigServletInner執(zhí)行

 /**
     * 輪詢接口
     */
    public String doPollingConfig(HttpServletRequest request, HttpServletResponse response,
                                  Map<String, String> clientMd5Map, int probeRequestSize)
        throws IOException {

        // 長輪詢
        if (LongPollingService.isSupportLongPolling(request)) {
            //重點就是這個方法執(zhí)行了
            longPollingService.addLongPollingClient(request, response, clientMd5Map, probeRequestSize);
            return HttpServletResponse.SC_OK + "";
        }

        // else 兼容短輪詢邏輯
        List<String> changedGroups = MD5Util.compareMd5(request, response, clientMd5Map);

        // 兼容短輪詢result
        String oldResult = MD5Util.compareMd5OldResult(changedGroups);
        String newResult = MD5Util.compareMd5ResultString(changedGroups);
    茸炒。愕乎。。壁公。

        // 禁用緩存
        response.setHeader("Pragma", "no-cache");
        response.setDateHeader("Expires", 0);
        response.setHeader("Cache-Control", "no-cache,no-store");
        response.setStatus(HttpServletResponse.SC_OK);
        return HttpServletResponse.SC_OK + "";
    }

LongPollingService這里添加客戶端長輪訓(xùn)任務(wù)感论,有服務(wù)端通過線程池持有

    public void addLongPollingClient(HttpServletRequest req, HttpServletResponse rsp, Map<String, String> clientMd5Map,
                                     int probeRequestSize) {

        String str = req.getHeader(LongPollingService.LONG_POLLING_HEADER);
        String noHangUpFlag = req.getHeader(LongPollingService.LONG_POLLING_NO_HANG_UP_HEADER);
        String appName = req.getHeader(RequestUtil.CLIENT_APPNAME_HEADER);
        String tag = req.getHeader("Vipserver-Tag");
        int delayTime = SwitchService.getSwitchInteger(SwitchService.FIXED_DELAY_TIME, 500);
        /**
         * 提前500ms返回響應(yīng),為避免客戶端超時 @qiaoyi.dingqy 2013.10.22改動  add delay time for LoadBalance
         */
        //這里就是服務(wù)端hang住的時間紊册,這里是30s-500ms=29.5s
        long timeout = Math.max(10000, Long.parseLong(str) - delayTime);
        if (isFixedPolling()) {
            timeout = Math.max(10000, getFixedPollingInterval());
            // do nothing but set fix polling timeout
        } else {
            long start = System.currentTimeMillis();
            List<String> changedGroups = MD5Util.compareMd5(req, rsp, clientMd5Map);
            if (changedGroups.size() > 0) {
                generateResponse(req, rsp, changedGroups);
                LogUtil.clientLog.info("{}|{}|{}|{}|{}|{}|{}",
                    System.currentTimeMillis() - start, "instant", RequestUtil.getRemoteIp(req), "polling",
                    clientMd5Map.size(), probeRequestSize, changedGroups.size());
                return;
            } else if (noHangUpFlag != null && noHangUpFlag.equalsIgnoreCase(TRUE_STR)) {
                LogUtil.clientLog.info("{}|{}|{}|{}|{}|{}|{}", System.currentTimeMillis() - start, "nohangup",
                    RequestUtil.getRemoteIp(req), "polling", clientMd5Map.size(), probeRequestSize,
                    changedGroups.size());
                return;
            }
        }
        String ip = RequestUtil.getRemoteIp(req);
        // 一定要由HTTP線程調(diào)用比肄,否則離開后容器會立即發(fā)送響應(yīng)
        final AsyncContext asyncContext = req.startAsync();
        // AsyncContext.setTimeout()的超時時間不準(zhǔn),所以只能自己控制
        asyncContext.setTimeout(0L);
      
          //這里就是執(zhí)行的延遲timeOut的任務(wù)返回當(dāng)前的長連接結(jié)果
        scheduler.execute(
            new ClientLongPolling(asyncContext, clientMd5Map, ip, probeRequestSize, timeout, appName, tag));
    }

任務(wù)具體內(nèi)容囊陡,就是檢查服務(wù)端的md5是否修改芳绩,修改了就結(jié)果;

 class ClientLongPolling implements Runnable {

        @Override
        public void run() {
            asyncTimeoutFuture = scheduler.schedule(new Runnable() {
                @Override
                public void run() {
                    try {
                        getRetainIps().put(ClientLongPolling.this.ip, System.currentTimeMillis());
                        /**
                         * 刪除訂閱關(guān)系
                         */
                        allSubs.remove(ClientLongPolling.this);

                        if (isFixedPolling()) {
                            LogUtil.clientLog.info("{}|{}|{}|{}|{}|{}",
                                (System.currentTimeMillis() - createTime),
                                "fix", RequestUtil.getRemoteIp((HttpServletRequest)asyncContext.getRequest()),
                                "polling",
                                clientMd5Map.size(), probeRequestSize);
                            //MD5比較撞反,
                            List<String> changedGroups = MD5Util.compareMd5(
                                (HttpServletRequest)asyncContext.getRequest(),
                                (HttpServletResponse)asyncContext.getResponse(), clientMd5Map);
                            if (changedGroups.size() > 0) {
                                sendResponse(changedGroups);
                            } else {
                                sendResponse(null);
                            }
                        } else {
                            LogUtil.clientLog.info("{}|{}|{}|{}|{}|{}",
                                (System.currentTimeMillis() - createTime),
                                "timeout", RequestUtil.getRemoteIp((HttpServletRequest)asyncContext.getRequest()),
                                "polling",
                                clientMd5Map.size(), probeRequestSize);
                            sendResponse(null);
                        }
                    } catch (Throwable t) {
                        LogUtil.defaultLog.error("long polling error:" + t.getMessage(), t.getCause());
                    }

                }

            }, timeoutTime, TimeUnit.MILLISECONDS);
            //這里會把當(dāng)前放在隊列中妥色,以備后用
            allSubs.add(this);
        }

這里長輪訓(xùn)就是這樣實現(xiàn)的,客戶端請求后被服務(wù)端給hang住29.5s后返回具體結(jié)果遏片;但是如果中途有數(shù)據(jù)更改了嘹害,真的會等待29.5后返回結(jié)果嗎;

還記得我們之前數(shù)據(jù)更改了觸發(fā)了一個LocalDataChangeEvent事件嗎吮便,而這個事件正好是LongPollingService來處理

    public void onEvent(Event event) {
        if (isFixedPolling()) {
            // ignore
        } else {
            //這里處理dataChangeTask
            if (event instanceof LocalDataChangeEvent) {
                LocalDataChangeEvent evt = (LocalDataChangeEvent)event;
                scheduler.execute(new DataChangeTask(evt.groupKey, evt.isBeta, evt.betaIps));
            }
        }
    }

最終本地發(fā)送配置變化會直接通過如下方式解決

  /**
     * 長輪詢訂閱關(guān)系
     */
    final Queue<ClientLongPolling> allSubs;

    // =================

    class DataChangeTask implements Runnable {
        @Override
        public void run() {
            try {
                ConfigService.getContentBetaMd5(groupKey);
                //遍歷客戶端包含這個group的請求
                for (Iterator<ClientLongPolling> iter = allSubs.iterator(); iter.hasNext(); ) {
                    ClientLongPolling clientSub = iter.next();
                    if (clientSub.clientMd5Map.containsKey(groupKey)) {
                        // 如果beta發(fā)布且不在beta列表直接跳過
                        if (isBeta && !betaIps.contains(clientSub.ip)) {
                            continue;
                        }

                        // 如果tag發(fā)布且不在tag列表直接跳過
                        if (StringUtils.isNotBlank(tag) && !tag.equals(clientSub.tag)) {
                            continue;
                        }

                        getRetainIps().put(clientSub.ip, System.currentTimeMillis());
                        iter.remove(); // 刪除訂閱關(guān)系
                        LogUtil.clientLog.info("{}|{}|{}|{}|{}|{}|{}",
                            (System.currentTimeMillis() - changeTime),
                            "in-advance",
                            RequestUtil.getRemoteIp((HttpServletRequest)clientSub.asyncContext.getRequest()),
                            "polling",
                            clientSub.clientMd5Map.size(), clientSub.probeRequestSize, groupKey);
                        //這里直接拿到在服務(wù)端hang住的請求返回更改的配置groupKey
                        clientSub.sendResponse(Arrays.asList(groupKey));
                    }
                }
            } catch (Throwable t) {
                LogUtil.defaultLog.error("data change error:" + t.getMessage(), t.getCause());
            }
        }

到這里也是基本都是完成這個配置的分析笔呀;不過這里也留給你一個問題:
分析途中有一個判定isFixedPolling()這個影響和目前是否即可返回結(jié)果還是保持服務(wù)端hang任務(wù);

2.4 服務(wù)啟動數(shù)據(jù)加載

上面我們也是發(fā)現(xiàn)了髓需,集群模式下服務(wù)器之間的數(shù)據(jù)是沒有做數(shù)據(jù)定時檢查的许师,那么啟動的數(shù)據(jù)肯定要從最新的數(shù)據(jù)去獲取,也就是我們的mysql拿授账,然后dump到服務(wù)器本地中枯跑,后面客戶端請求就從本地獲取了。白热。具體我們通過代碼來驗證

@Service
public class DumpService {

    @Autowired
    private Environment env;

    @Autowired
    PersistService persistService;
    //啟動初始化方法敛助,作為服務(wù)器對于配置文件的處理
    @PostConstruct
    public void init() {
        LogUtil.defaultLog.warn("DumpService start");
        //構(gòu)建不同的處理器
        DumpProcessor processor = new DumpProcessor(this);
        DumpAllProcessor dumpAllProcessor = new DumpAllProcessor(this);
        DumpAllBetaProcessor dumpAllBetaProcessor = new DumpAllBetaProcessor(this);
        DumpAllTagProcessor dumpAllTagProcessor = new DumpAllTagProcessor(this);

        dumpTaskMgr = new TaskManager("com.alibaba.nacos.server.DumpTaskManager");
        dumpTaskMgr.setDefaultTaskProcessor(processor);

        dumpAllTaskMgr = new TaskManager("com.alibaba.nacos.server.DumpAllTaskManager");
        dumpAllTaskMgr.setDefaultTaskProcessor(dumpAllProcessor);

        Runnable dumpAll = () -> dumpAllTaskMgr.addTask(DumpAllTask.TASK_ID, new DumpAllTask());
    
        Runnable dumpAllBeta = () -> dumpAllTaskMgr.addTask(DumpAllBetaTask.TASK_ID, new DumpAllBetaTask());
        //清除超過30天的歷史配置
        Runnable clearConfigHistory = () -> {
            log.warn("clearConfigHistory start");
            if (ServerListService.isFirstIp()) {
                try {
                    Timestamp startTime = getBeforeStamp(TimeUtils.getCurrentTime(), 24 * getRetentionDays());
                    int totalCount = persistService.findConfigHistoryCountByTime(startTime);
                    if (totalCount > 0) {
                        int pageSize = 1000;
                        int removeTime = (totalCount + pageSize - 1) / pageSize;
                        log.warn("clearConfigHistory, getBeforeStamp:{}, totalCount:{}, pageSize:{}, removeTime:{}",
                            new Object[] {startTime, totalCount, pageSize, removeTime});
                        while (removeTime > 0) {
                            // 分頁刪除,以免批量太大報錯
                            persistService.removeConfigHistory(startTime, pageSize);
                            removeTime--;
                        }
                    }
                } catch (Throwable e) {
                    log.error("clearConfigHistory error", e);
                }
            }
        };

        try {
            //啟動的時候屋确,更新本地磁盤為最新的數(shù)據(jù)纳击。
            dumpConfigInfo(dumpAllProcessor);

            // 更新beta緩存
            LogUtil.defaultLog.info("start clear all config-info-beta.");
            DiskUtil.clearAllBeta();
            if (persistService.isExistTable(BETA_TABLE_NAME)) {
                dumpAllBetaProcessor.process(DumpAllBetaTask.TASK_ID, new DumpAllBetaTask());
            }
            // 更新Tag緩存
            LogUtil.defaultLog.info("start clear all config-info-tag.");
            DiskUtil.clearAllTag();
            if (persistService.isExistTable(TAG_TABLE_NAME)) {
                dumpAllTagProcessor.process(DumpAllTagTask.TASK_ID, new DumpAllTagTask());
            }

            // add to dump aggr
            List<ConfigInfoChanged> configList = persistService.findAllAggrGroup();
            if (configList != null && !configList.isEmpty()) {
                total = configList.size();
                List<List<ConfigInfoChanged>> splitList = splitList(configList, INIT_THREAD_COUNT);
                for (List<ConfigInfoChanged> list : splitList) {
                    MergeAllDataWorker work = new MergeAllDataWorker(list);
                    work.start();
                }
                log.info("server start, schedule merge end.");
            }
        } catch (Exception e) {
            LogUtil.fatalLog.error(
                "Nacos Server did not start because dumpservice bean construction failure :\n" + e.getMessage(),
                e.getCause());
            throw new RuntimeException(
                "Nacos Server did not start because dumpservice bean construction failure :\n" + e.getMessage());
        }
        //集群條件
        if (!STANDALONE_MODE) {
            Runnable heartbeat = () -> {
                String heartBeatTime = TimeUtils.getCurrentTime().toString();
                // write disk
                try {
                    DiskUtil.saveHeartBeatToDisk(heartBeatTime);
                } catch (IOException e) {
                    LogUtil.fatalLog.error("save heartbeat fail" + e.getMessage());
                }
            };
            //周期性心跳檢查,日志記錄心跳時間
            TimerTaskService.scheduleWithFixedDelay(heartbeat, 0, 10, TimeUnit.SECONDS);

            long initialDelay = new Random().nextInt(INITIAL_DELAY_IN_MINUTE) + 10;
            LogUtil.defaultLog.warn("initialDelay:{}", initialDelay);
            //周期性任務(wù) 全量dump所有配置信息攻臀,時間間隔是6*60min=6h
            TimerTaskService.scheduleWithFixedDelay(dumpAll, initialDelay, DUMP_ALL_INTERVAL_IN_MINUTE,
                TimeUnit.MINUTES);

            TimerTaskService.scheduleWithFixedDelay(dumpAllBeta, initialDelay, DUMP_ALL_INTERVAL_IN_MINUTE,
                TimeUnit.MINUTES);
        }
        //定時任務(wù)執(zhí)行清理
        TimerTaskService.scheduleWithFixedDelay(clearConfigHistory, 10, 10, TimeUnit.MINUTES);

    }

首次dump本地文件

    private void dumpConfigInfo(DumpAllProcessor dumpAllProcessor)
        throws IOException {
        int timeStep = 6;
        Boolean isAllDump = true;
        // initial dump all
        FileInputStream fis = null;
        Timestamp heartheatLastStamp = null;
        try {
            //默認(rèn)我們沒有配置快速啟動焕数,這里可以自己分析下,看起來也是通過心跳文件讀取刨啸,如果最后心跳的時間和當(dāng)前時間不超過6小時就不加載數(shù)據(jù)庫文件做持久化
            if (isQuickStart()) { //是否快速啟動堡赔,默認(rèn)不是
                File heartbeatFile = DiskUtil.heartBeatFile();
                if (heartbeatFile.exists()) {
                    fis = new FileInputStream(heartbeatFile);
                    String heartheatTempLast = IoUtils.toString(fis, Constants.ENCODE);
                    heartheatLastStamp = Timestamp.valueOf(heartheatTempLast);
                    if (TimeUtils.getCurrentTime().getTime()
                        - heartheatLastStamp.getTime() < timeStep * 60 * 60 * 1000) {
                        isAllDump = false;
                    }
                }
            }
            if (isAllDump) {
                //如果啟動全量dump,也是默認(rèn)就直接服務(wù)數(shù)據(jù)庫的數(shù)據(jù)设联,全部給dump存儲在服務(wù)器磁盤中
                LogUtil.defaultLog.info("start clear all config-info.");
                DiskUtil.clearAll();
                dumpAllProcessor.process(DumpAllTask.TASK_ID, new DumpAllTask());
            } else {
                //如果是快速啟動滿足小于6h善已,那么部分檢查文件md5然后更新
                Timestamp beforeTimeStamp = getBeforeStamp(heartheatLastStamp,
                    timeStep);
                DumpChangeProcessor dumpChangeProcessor = new DumpChangeProcessor(
                    this, beforeTimeStamp, TimeUtils.getCurrentTime());
                dumpChangeProcessor.process(DumpChangeTask.TASK_ID, new DumpChangeTask());
                Runnable checkMd5Task = () -> {
                    LogUtil.defaultLog.error("start checkMd5Task");
                    List<String> diffList = ConfigService.checkMd5();
                    for (String groupKey : diffList) {
                        String[] dg = GroupKey.parseKey(groupKey);
                        String dataId = dg[0];
                        String group = dg[1];
                        String tenant = dg[2];
                        ConfigInfoWrapper configInfo = persistService.queryConfigInfo(dataId, group, tenant);
                        ConfigService.dumpChange(dataId, group, tenant, configInfo.getContent(),
                            configInfo.getLastModified());
                    }
                    LogUtil.defaultLog.error("end checkMd5Task");
                };
                TimerTaskService.scheduleWithFixedDelay(checkMd5Task, 0, 12,
                    TimeUnit.HOURS);
            }
        } catch (IOException e) {
            LogUtil.fatalLog.error("dump config fail" + e.getMessage());
            throw e;
        } finally {
            if (null != fis) {
                try {
                    fis.close();
                } catch (IOException e) {
                    LogUtil.defaultLog.warn("close file failed");
                }
            }
        }
    }

從類灼捂,方法的命名也應(yīng)該很容易猜測出來,我們直接看dumpAllProcesso處理流程
6h執(zhí)行一次的全量更新

class DumpAllProcessor implements TaskProcessor {

    DumpAllProcessor(DumpService dumpService) {
        this.dumpService = dumpService;
        this.persistService = dumpService.persistService;
    }

    @Override
    public boolean process(String taskType, AbstractTask task) {
        long currentMaxId = persistService.findConfigMaxId();
        long lastMaxId = 0;
        //基于上次id來查找數(shù)據(jù)换团,優(yōu)化數(shù)據(jù)分頁
        while (lastMaxId < currentMaxId) {
            //分頁查出配置信息數(shù)據(jù)悉稠,默認(rèn)1000條
            Page<PersistService.ConfigInfoWrapper> page = persistService.findAllConfigInfoFragment(lastMaxId,
                PAGE_SIZE);
            if (page != null && page.getPageItems() != null && !page.getPageItems().isEmpty()) {
                for (PersistService.ConfigInfoWrapper cf : page.getPageItems()) {
                    long id = cf.getId();
                    lastMaxId = id > lastMaxId ? id : lastMaxId;
                    if (cf.getDataId().equals(AggrWhitelist.AGGRIDS_METADATA)) {
                        AggrWhitelist.load(cf.getContent());
                    }

                    if (cf.getDataId().equals(ClientIpWhiteList.CLIENT_IP_WHITELIST_METADATA)) {
                        ClientIpWhiteList.load(cf.getContent());
                    }

                    if (cf.getDataId().equals(SwitchService.SWITCH_META_DATAID)) {
                        SwitchService.load(cf.getContent());
                    }
                    //dump到本地服務(wù)器磁盤上
                    boolean result = ConfigService.dump(cf.getDataId(), cf.getGroup(), cf.getTenant(), cf.getContent(),
                        cf.getLastModified(), cf.getType());

                    final String content = cf.getContent();
                    final String md5 = Md5Utils.getMD5(content, Constants.ENCODE);
                    LogUtil.dumpLog.info("[dump-all-ok] {}, {}, length={}, md5={}",
                        GroupKey2.getKey(cf.getDataId(), cf.getGroup()), cf.getLastModified(), content.length(), md5);
                }
                defaultLog.info("[all-dump] {} / {}", lastMaxId, currentMaxId);
            } else {
                lastMaxId += PAGE_SIZE;
            }
        }
        return true;
    }

    static final int PAGE_SIZE = 1000;

    final DumpService dumpService;
    final PersistService persistService;
}

簡單的總結(jié):

  • 1.默認(rèn)會把數(shù)據(jù)庫的配置提取出來緩存到服務(wù)器磁盤文件中
  • 2.集群模式下DumpService在啟動初始化的時候啟動周期任務(wù)做心跳日志团滥,主要是為下次啟動是否全量dump做記錄
  • 3.集群模式下默認(rèn)6小時執(zhí)行從數(shù)據(jù)庫中執(zhí)行一起全量配置dump到本地磁盤
  • 4.周期檢查歷史配置缩宜,超過30天的做刪除
  • ...

之前的數(shù)據(jù)性一致性問題:如果數(shù)據(jù)更新的時候更新失敗,會在6h后定時任務(wù)從數(shù)據(jù)庫中拉取數(shù)據(jù)然后更新本地文件氛雪。

2.5 服務(wù)之間健康檢查

???集群中服務(wù)肯定要做一個健康檢查想虎,如果有服務(wù)不可用要做相關(guān)的處理服務(wù)主要看 ServerListService類

???同樣的道理在類初始化會解析集群下的節(jié)點信息卦尊,如果開啟了地址服務(wù)器還會定時從遠(yuǎn)程服務(wù)器更新拿到server列表信息。

    @PostConstruct
    public void init() {
        String envDomainName = System.getenv("address_server_domain");
        if (StringUtils.isBlank(envDomainName)) {
            domainName = System.getProperty("address.server.domain", "jmenv.tbsite.net");
        } else {
            domainName = envDomainName;
        }
        String envAddressPort = System.getenv("address_server_port");
        if (StringUtils.isBlank(envAddressPort)) {
            addressPort = System.getProperty("address.server.port", "8080");
        } else {
            addressPort = envAddressPort;
        }
        addressUrl = System.getProperty("address.server.url",
            servletContext.getContextPath() + "/" + RunningConfigUtils.getClusterName());
        addressServerUrl = "http://" + domainName + ":" + addressPort + addressUrl;
        envIdUrl = "http://" + domainName + ":" + addressPort + "/env";

        defaultLog.info("ServerListService address-server port:" + addressPort);
        defaultLog.info("ADDRESS_SERVER_URL:" + addressServerUrl);
        isHealthCheck = PropertyUtil.isHealthCheck();
        maxFailCount = PropertyUtil.getMaxHealthCheckFailCount();
        fatalLog.warn("useAddressServer:{}", isUseAddressServer);
        GetServerListTask task = new GetServerListTask();
        task.run();
        if (CollectionUtils.isEmpty(serverList)) {
            fatalLog.error("########## cannot get serverlist, so exit.");
            throw new RuntimeException("cannot get serverlist, so exit.");
        } else {
            TimerTaskService.scheduleWithFixedDelay(task, 0L, 5L, TimeUnit.SECONDS);
        }

    }

在web容器初始化完成后觸發(fā)事件

    @Override
    public void onApplicationEvent(WebServerInitializedEvent event) {
        if (port == 0) {
            port = event.getWebServer().getPort();
            List<String> newList = new ArrayList<String>();
            for (String serverAddrTmp : serverList) {
                newList.add(getFormatServerAddr(serverAddrTmp));
            }
            setServerList(new ArrayList<String>(newList));
        }
        httpclient.start();
        //很容易發(fā)現(xiàn)我們有一個周期任務(wù)5s檢查一次服務(wù)器健康磷醋;
        CheckServerHealthTask checkServerHealthTask = new CheckServerHealthTask();
        TimerTaskService.scheduleWithFixedDelay(checkServerHealthTask, 0L, 5L, TimeUnit.SECONDS);

    }

繼續(xù)看CheckServerHealthTask執(zhí)行邏輯

  private void checkServerHealth() {
        long startCheckTime = System.currentTimeMillis();
        for (String serverIp : serverList) {
            //請求其他節(jié)點節(jié)點
            // Compatible with old codes,use status.taobao
            String url = "http://" + serverIp + servletContext.getContextPath() + Constants.HEALTH_CONTROLLER_PATH;
            // 路徑url:"/nacos/health";
            HttpGet request = new HttpGet(url);
            //我們也有回調(diào)函數(shù)猫牡,如果節(jié)點信息返回成功就從不健康列表移除胡诗,否則需要添加上
            httpclient.execute(request, new AsyncCheckServerHealthCallBack(serverIp));
        }
        long endCheckTime = System.currentTimeMillis();
        long cost = endCheckTime - startCheckTime;
        defaultLog.debug("checkServerHealth cost: {}", cost);
    }

看看其他節(jié)點怎么處理的

    @GetMapping
    public String getHealth() {
        // TODO UP DOWN WARN
        StringBuilder sb = new StringBuilder();
        //數(shù)據(jù)庫的監(jiān)控 信息
        String dbStatus = dataSourceService.getHealth();
         //如果有遠(yuǎn)程地址服務(wù)器邓线,還有遠(yuǎn)程地址服務(wù)器是否健康信息
        if (dbStatus.contains(heathUpStr) && ServerListService.isAddressServerHealth() && ServerListService
            .isInIpList()) {
            sb.append(heathUpStr);
        } else if (dbStatus.contains(heathWarnStr) && ServerListService.isAddressServerHealth() && ServerListService
            .isInIpList()) {
            sb.append("WARN:");
            sb.append("slave db (").append(dbStatus.split(":")[1]).append(") down. ");
        } else {
            sb.append("DOWN:");
            //對于配置中心有主db判別
            if (dbStatus.contains(heathDownStr)) {
                sb.append("master db (").append(dbStatus.split(":")[1]).append(") down. ");
            }
            if (!ServerListService.isAddressServerHealth()) {
                sb.append("address server down. ");
            }
            if (!ServerListService.isInIpList()) {
                sb.append("server ip ").append(LOCAL_IP).append(" is not in the serverList of address server. ");
            }
        }
        //返回數(shù)據(jù)
        return sb.toString();
    }

服務(wù)器節(jié)點間的監(jiān)控信息比較簡單,就周期發(fā)送心跳檢測即可煌恢。

配置中心集群大總結(jié):

  • 1.配置中心依賴第三方數(shù)據(jù)庫mysql做存儲
    1. 數(shù)據(jù)更新數(shù)據(jù)庫骇陈,然后廣播通知其他節(jié)點更新,使用了多個異步操作事件通知完成更新
  • 3.所有的客戶端獲取的數(shù)據(jù)都是服務(wù)節(jié)點本地緩存信息獲取的瑰抵,不會去數(shù)據(jù)庫拉數(shù)據(jù)
    1. 配置中心一致性的問題也是依賴6h一次全量從數(shù)據(jù)庫同步到服務(wù)器磁盤緩存文件中做到的
  • 5.客戶端監(jiān)聽的配置會使用長輪訓(xùn)去拉取服務(wù)器配置變化你雌,如果沒有變化服務(wù)端會hang住29.5s的時間避免服務(wù)器壓力。
    -...
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末二汛,一起剝皮案震驚了整個濱河市婿崭,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌肴颊,老刑警劉巖氓栈,帶你破解...
    沈念sama閱讀 211,743評論 6 492
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異婿着,居然都是意外死亡授瘦,警方通過查閱死者的電腦和手機,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,296評論 3 385
  • 文/潘曉璐 我一進店門竟宋,熙熙樓的掌柜王于貴愁眉苦臉地迎上來提完,“玉大人,你說我怎么就攤上這事丘侠⊥叫溃” “怎么了?”我有些...
    開封第一講書人閱讀 157,285評論 0 348
  • 文/不壞的土叔 我叫張陵蜗字,是天一觀的道長打肝。 經(jīng)常有香客問我官研,道長,這世上最難降的妖魔是什么闯睹? 我笑而不...
    開封第一講書人閱讀 56,485評論 1 283
  • 正文 為了忘掉前任戏羽,我火速辦了婚禮,結(jié)果婚禮上楼吃,老公的妹妹穿的比我還像新娘始花。我一直安慰自己,他們只是感情好孩锡,可當(dāng)我...
    茶點故事閱讀 65,581評論 6 386
  • 文/花漫 我一把揭開白布酷宵。 她就那樣靜靜地躺著,像睡著了一般躬窜。 火紅的嫁衣襯著肌膚如雪浇垦。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 49,821評論 1 290
  • 那天荣挨,我揣著相機與錄音男韧,去河邊找鬼。 笑死默垄,一個胖子當(dāng)著我的面吹牛此虑,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播口锭,決...
    沈念sama閱讀 38,960評論 3 408
  • 文/蒼蘭香墨 我猛地睜開眼朦前,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了鹃操?” 一聲冷哼從身側(cè)響起韭寸,我...
    開封第一講書人閱讀 37,719評論 0 266
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎荆隘,沒想到半個月后恩伺,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 44,186評論 1 303
  • 正文 獨居荒郊野嶺守林人離奇死亡臭胜,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 36,516評論 2 327
  • 正文 我和宋清朗相戀三年莫其,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片耸三。...
    茶點故事閱讀 38,650評論 1 340
  • 序言:一個原本活蹦亂跳的男人離奇死亡乱陡,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出仪壮,到底是詐尸還是另有隱情憨颠,我是刑警寧澤,帶...
    沈念sama閱讀 34,329評論 4 330
  • 正文 年R本政府宣布,位于F島的核電站爽彤,受9級特大地震影響养盗,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜适篙,卻給世界環(huán)境...
    茶點故事閱讀 39,936評論 3 313
  • 文/蒙蒙 一往核、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧嚷节,春花似錦聂儒、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,757評論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至效斑,卻和暖如春非春,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背缓屠。 一陣腳步聲響...
    開封第一講書人閱讀 31,991評論 1 266
  • 我被黑心中介騙來泰國打工奇昙, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留,地道東北人藏研。 一個月前我還...
    沈念sama閱讀 46,370評論 2 360
  • 正文 我出身青樓敬矩,卻偏偏與公主長得像概行,于是被迫代替她去往敵國和親蠢挡。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 43,527評論 2 349

推薦閱讀更多精彩內(nèi)容