Nacos 和 Apollo中的 長輪詢 定時機(jī)制权谁,太好用了

今天這篇文章來介紹一下Nacos配置中心的原理之一:長輪詢機(jī)制的應(yīng)用

為方便理解與表達(dá)剩檀,這里把 Nacos 控制臺和 Nacos 注冊中心稱為 Nacos 服務(wù)器(就是 web 界面那個),我們編寫的業(yè)務(wù)服務(wù)稱為 Nacso 客戶端旺芽;

Nacos 動態(tài)監(jiān)聽的長輪詢機(jī)制原理圖沪猴,本篇將圍繞這張圖剖析長輪詢定時機(jī)制的原理:

image.png

ConfigService 是 Nacos 客戶端提供的用于訪問實現(xiàn)配置中心基本操作的類型,我們將從 ConfigService 的實例化開始長輪詢定時機(jī)制的源碼之旅甥绿;

1. 客戶端的長輪詢定時機(jī)制

我們從
NacosPropertySourceLocator.locate()開始【斷點步入】:

1.1 利用反射機(jī)制實例化 NacosConfigService 對象

客戶端的長輪詢定時任務(wù)是在
NacosFactory.createConfigService() 方法中字币,構(gòu)建 ConfigService 對象是實時啟動的,我們接著 1.1 處的源碼共缕;

進(jìn)入
NacosFactory.createConfigService():

public static ConfigService createConfigService(Properties properties) throws NacosException {
    //【斷點步入】創(chuàng)建 ConfigService
    return ConfigFactory.createConfigService(properties);
}

進(jìn)入
ConfigFactory.createConfigService()洗出,發(fā)現(xiàn)其使用反射機(jī)制實例化 NacosConfigService 對象;

image.png

1.2 NacosConfigService 的構(gòu)造方法里啟動長輪詢定時任務(wù)

進(jìn)入
NacosConfigService.NacosConfigService() 構(gòu)造方法图谷,里面設(shè)置了一些跟遠(yuǎn)程任務(wù)相關(guān)的屬性翩活;

image.png

1.2.1 初始化 HttpAgent

MetricsHttpAgent 類的設(shè)計如下:

image.png

ServerHttpAgent 類的設(shè)計如下:

image.png

1.2.2 初始化 ClientWorker

進(jìn)入 ClientWorker.ClientWorker() 構(gòu)造方法,主要是創(chuàng)建了兩個定時調(diào)度的線程池便贵,并啟動一個定時任務(wù)菠镇;

image.png

進(jìn)入
ClientWorker.checkConfigInfo(),每隔 10s 檢查一次配置是否發(fā)生變化承璃;

  • cacheMap:是一個 AtomicReference<Map<String, CacheData>> 對象利耍,用來存儲監(jiān)聽變更的緩存集合,key 是根據(jù) datalD/group/tenant(租戶)拼接的值盔粹。Value 是對應(yīng)的存儲在 Nacos 服務(wù)器上的配置文件的內(nèi)容隘梨;
  • 長輪詢?nèi)蝿?wù)拆分:默認(rèn)情況下,每個長輪詢 LongPollingRunnable 任務(wù)處理3000個監(jiān)聽配置集舷嗡。如果超過3000個轴猎,則需要啟動多個 LongPollingRunnable 去執(zhí)行;
image.png

1.3 檢查配置變更进萄,讀取變更配置 LongPollingRunnable.run()

因為我們沒有這么多配置項捻脖,debug 不進(jìn)去锐峭,所以直接找到 LongPollingRunnable.run() 方法,該方法的主要邏輯是:

根據(jù) taskld 對 cacheMap 進(jìn)行數(shù)據(jù)分割可婶;

再通過checkLocalConfig() 方法比較本地配置文件(在${user}\nacos\config\ 里)的數(shù)據(jù)是否存在變更沿癞,如果有變更則直接觸發(fā)通知;

public void run() {
    List<CacheData> cacheDatas = new ArrayList();
    ArrayList inInitializingCacheList = new ArrayList();
    try {
        //遍歷 CacheData扰肌,檢查本地配置
        Iterator var3 = ((Map)ClientWorker.this.cacheMap.get()).values().iterator();
        while(var3.hasNext()) {
            CacheData cacheData = (CacheData)var3.next();
            if (cacheData.getTaskId() == this.taskId) {
                cacheDatas.add(cacheData);
                try {
                    //檢查本地配置
                    ClientWorker.this.checkLocalConfig(cacheData);
                    if (cacheData.isUseLocalConfigInfo()) {
                        cacheData.checkListenerMd5();
                    }
                } catch (Exception var13) {
                    ClientWorker.LOGGER.error("get local config info error", var13);
                }
            }
        }
        //【斷點步入 1.3.1】通過長輪詢請求檢查服務(wù)端對應(yīng)的配置是否發(fā)生變更
        List<String> changedGroupKeys = ClientWorker.this.checkUpdateDataIds(cacheDatas, inInitializingCacheList);
        //遍歷存在變更的 groupKey抛寝,重新加載最新數(shù)據(jù)
        Iterator var16 = changedGroupKeys.iterator();
        while(var16.hasNext()) {
            String groupKey = (String)var16.next();
            String[] key = GroupKey.parseKey(groupKey);
            String dataId = key[0];
            String group = key[1];
            String tenant = null;
            if (key.length == 3) {
                tenant = key[2];
            }
            try {
                //【斷點步入 1.3.2】讀取變更配置,這里的 dataId曙旭、group 和 tenant 是【1.3.1】里獲取的
                String content = ClientWorker.this.getServerConfig(dataId, group, tenant, 3000L);
                CacheData cache = (CacheData)((Map)ClientWorker.this.cacheMap.get()).get(GroupKey.getKeyTenant(dataId, group, tenant));
                cache.setContent(content);
                ClientWorker.LOGGER.info("[{}] [data-received] dataId={}, group={}, tenant={}, md5={}, content={}", new Object[]{ClientWorker.this.agent.getName(), dataId, group, tenant, cache.getMd5(), ContentUtils.truncateContent(content)});
            } catch (NacosException var12) {
                String message = String.format("[%s] [get-update] get changed config exception. dataId=%s, group=%s, tenant=%s", ClientWorker.this.agent.getName(), dataId, group, tenant);
                ClientWorker.LOGGER.error(message, var12);
            }
        }
        //觸發(fā)事件通知
        var16 = cacheDatas.iterator();
        while(true) {
            CacheData cacheDatax;
            do {
                if (!var16.hasNext()) {
                    inInitializingCacheList.clear();
                    //繼續(xù)定時執(zhí)行當(dāng)前線程
                    ClientWorker.this.executorService.execute(this);
                    return;
                }
                cacheDatax = (CacheData)var16.next();
            } while(cacheDatax.isInitializing() && !inInitializingCacheList.contains(GroupKey.getKeyTenant(cacheDatax.dataId, cacheDatax.group, cacheDatax.tenant)));
            cacheDatax.checkListenerMd5();
            cacheDatax.setInitializing(false);
        }
    } catch (Throwable var14) {
        ClientWorker.LOGGER.error("longPolling error : ", var14);
        ClientWorker.this.executorService.schedule(this, (long)ClientWorker.this.taskPenaltyTime, TimeUnit.MILLISECONDS);
    }
}

注意:這里的斷點需要注意 Nacos 服務(wù)器上修改配置(間隔大于 30s),進(jìn)入后才好理解晶府;

1.3.1 檢查配置變更 ClientWorker.checkUpdateDataIds()

我們點進(jìn)
ClientWorker.checkUpdateDataIds() 方法桂躏,發(fā)現(xiàn)其最終調(diào)用的是 ClientWorker.checkUpdateConfigStr() 方法,其實現(xiàn)邏輯與源碼如下:

  • 通過MetricsHttpAgent.httpPost() 方法(上面 1.2.1 有提到)調(diào)用/v1/cs/configs/listener 接口實現(xiàn)長輪詢請求川陆;
  • 輪詢請求在實現(xiàn)層面只是設(shè)置了一個比較長的超時時間剂习,默認(rèn)是 30s;
  • 如果服務(wù)端的數(shù)據(jù)發(fā)生了變更较沪,客戶端會收到一個 HttpResult 鳞绕,服務(wù)端返回的是存在數(shù)據(jù)變更的 Data ID、Group尸曼、Tenant们何;
  • 獲得這些信息之后,在LongPollingRunnable.run() 方法中調(diào)用 getServerConfig() 去 Nacos 服務(wù)器上讀取具體的配置內(nèi)容控轿;
List<String> checkUpdateConfigStr(String probeUpdateString, boolean isInitializingCacheList) throws IOException {
    List<String> params = Arrays.asList("Listening-Configs", probeUpdateString);
    List<String> headers = new ArrayList(2);
    headers.add("Long-Pulling-Timeout");
    headers.add("" + this.timeout);
    if (isInitializingCacheList) {
        headers.add("Long-Pulling-Timeout-No-Hangup");
        headers.add("true");
    }
    if (StringUtils.isBlank(probeUpdateString)) {
        return Collections.emptyList();
    } else {
        try {
            //調(diào)用 /v1/cs/configs/listener 接口實現(xiàn)長輪詢請求冤竹,返回的 HttpResult 里包含存在數(shù)據(jù)變更的 Data ID、Group茬射、Tenant
            HttpResult result = this.agent.httpPost("/v1/cs/configs/listener", headers, params, this.agent.getEncode(), this.timeout);
            if (200 == result.code) {
                this.setHealthServer(true);
                //
                return this.parseUpdateDataIdResponse(result.content);
            }
            this.setHealthServer(false);
            LOGGER.error("[{}] [check-update] get changed dataId error, code: {}", this.agent.getName(), result.code);
        } catch (IOException var6) {
            this.setHealthServer(false);
            LOGGER.error("[" + this.agent.getName() + "] [check-update] get changed dataId exception", var6);
            throw var6;
        }
        return Collections.emptyList();
    }
}

1.3.2 讀取變更配置 ClientWorker.getServerConfig()

進(jìn)入
ClientWorker.getServerConfig() 方法鹦蠕;讀取服務(wù)器上的變更配置;最終調(diào)用的是 MetricsHttpAgent.httpGet() 方法(上面 1.2.1 有提到)在抛,調(diào)用 /v1/cs/configs 接口獲取配置钟病;然后通過調(diào)用 LocalConfigInfoProcessor.saveSnapshot() 將變更的配置保存到本地;

image.png
image.png

2. 服務(wù)端的長輪詢定時機(jī)制

2.1 服務(wù)器接收請求 ConfigController.listener()

Nacos客戶端 通過 HTTP 協(xié)議與服務(wù)器通信刚梭,那么在服務(wù)器源碼里必然有對應(yīng)接口的實現(xiàn)肠阱;

在 nacos-config 模塊下的 controller 包,提供了個 ConfigController 類來處理請求望浩,其中有個 /listener 接口辖所,是客戶端發(fā)起數(shù)據(jù)監(jiān)聽的接口,其主要邏輯和源碼如下:

  • 獲取客戶端需要監(jiān)聽的可能發(fā)生變化的配置磨德,并計算 MD5 值缘回;
  • ConfigServletInner.doPollingConfig() 開始執(zhí)行長輪詢請求吆视;

2.2 執(zhí)行長輪詢請求 ConfigSer

image.png

vletInner.doPollingConfig()

進(jìn)入
ConfigServletInner.doPollingConfig() 方法,該方法封裝了長輪詢的實現(xiàn)邏輯酥宴,同時兼容短輪詢邏輯啦吧;

image.png

進(jìn)入
LongPollingService.addLongPollingClient() 方法,里面是長輪詢的核心處理邏輯拙寡,主要作用是把客戶端的長輪詢請求封裝成 ClientPolling 交給 scheduler 執(zhí)行授滓;

image.png

2.3 創(chuàng)建線程執(zhí)行定時任務(wù) ClientLongPolling.run()

我們找到 ClientLongPolling.run() 方法,這里可以體現(xiàn)長輪詢定時機(jī)制的核心原理肆糕,通俗來說般堆,就是:

服務(wù)端收到請求之后,不立即返回诚啃,沒有變更則在延后 (30-0.5)s 把請求結(jié)果返回給客戶端淮摔;

這就使得客戶端和服務(wù)端之間在 30s 之內(nèi)數(shù)據(jù)沒有發(fā)生變化的情況下一直處于連接狀態(tài);

image.png

2.4 監(jiān)聽配置變更事件

2.4.1 監(jiān)聽 LocalDataChangeEvent 事件的實現(xiàn)

當(dāng)我們在 Nacos 服務(wù)器或通過 API 方式變更配置后始赎,會發(fā)布一個 LocalDataChangeEvent 事件和橙,該事件會被 LongPollingService 監(jiān)聽;

這里 LongPollingService 為什么具有監(jiān)聽功能在 1.3.1 版本后有些變化:

  • 1.3.1 前:LongPollingService.onEvent()造垛;
  • 1.3.1 后:Subscriber.onEvent()魔招;

在 Nacos 1.3.1 版本之前,通過 LongPollingService 繼承 AbstractEventListener 實現(xiàn)監(jiān)聽五辽,覆蓋 onEvent() 方法办斑;


image.png

而在 1.3.2 版本之后,通過構(gòu)造訂閱者實現(xiàn)

image.png

效果是一樣的奔脐,實現(xiàn)了對 LocalDataChangeEvent 事件的監(jiān)聽俄周,并通過通過線程池執(zhí)行 DataChangeTask 任務(wù);

2.4.2 監(jiān)聽事件后的處理邏輯 DataChangeTask.run()

我們找到 DataChangeTask.run() 方法髓迎,這個線程任務(wù)實現(xiàn)了

image.png

3. 源碼結(jié)構(gòu)圖小結(jié)

3.1 客戶端的長輪詢定時機(jī)制

  • NacosPropertySourceLocator.locate() :初始化 ConfigService 對象峦朗,定位配置;
  • NacosConfigService.NacosConfigService():NacosConfigService 的構(gòu)造方法排龄;
  • Executors.newScheduledThreadPool():創(chuàng)建 executor 線程池波势;
  • Executors.newScheduledThreadPool():創(chuàng)建 executorService 線程池;
  • ClientWorker.checkConfigInfo():使用 executor 線程池檢查配置是否發(fā)生變化橄维;
  • ClientWorker.checkLocalConfig():檢查本地配置尺铣;
  • ClientWorker.checkUpdateDataIds():檢查服務(wù)端對應(yīng)的配置是否發(fā)生變更;
  • ClientWorker.getServerConfig():讀取變更配置
  • MetricsHttpAgent.httpPost():調(diào)用 /v1/cs/configs/listener 接口實現(xiàn)長輪詢請求争舞;
  • ClientWorker.checkUpdateConfigStr():檢查服務(wù)端對應(yīng)的配置是否發(fā)生變更凛忿;
  • MetricsHttpAgent.httpGet():調(diào)用 /v1/cs/configs 接口獲取配置;
  • LongPollingRunnable.run():運行長輪詢定時線程竞川;
  • MetricsHttpAgent.MetricsHttpAgent():初始化 HttpAgent店溢;
  • ClientWorker.ClientWorker():初始化 ClientWorker叁熔;
  • NacosFactory.createConfigService():創(chuàng)建配置服務(wù)器;
  • ConfigFactory.createConfigService():利用反射機(jī)制創(chuàng)建配置服務(wù)器床牧;

3.2 服務(wù)端的長輪詢定時機(jī)制

  • ConfigController.listener() :服務(wù)器接收請求荣回;
  • LongPollingService.addLongPollingClient():長輪詢的核心處理邏輯,提前 500ms 返回響應(yīng)戈咳;
  • ClientLongPolling.run():長輪詢定時機(jī)制的實現(xiàn)邏輯心软;
  • Map.put():將 ClientLongPolling 實例本身添加到 allSubs 隊列中;
  • Queue.remove():把 ClientLongPolling 實例本身從 allSubs 隊列中移除著蛙;
  • MD5Util.compareMd5():比較數(shù)據(jù)的 MD5 值删铃;
  • LongPollingService.sendResponse():將變更的結(jié)果通過 response 返回給客戶端;
  • ConfigExecutor.scheduleLongPolling():啟動定時任務(wù)册踩,延時時間為 29.5s泳姐;
  • HttpServletRequest.getHeader():獲取客戶端設(shè)置的請求超時時間;
  • MD5Util.compareMd5():和服務(wù)端的數(shù)據(jù)進(jìn)行 MD5 對比暂吉;
  • ConfigExecutor.executeLongPolling():創(chuàng)建 ClientLongPolling 線程執(zhí)行定時任務(wù);
  • MD5Util.getClientMd5Map():計算 MD5 值缎患;
  • ConfigServletInner.doPollingConfig():執(zhí)行長輪詢請求慕的;

3.3 Nacos 服務(wù)器配置變更的事件監(jiān)聽

Nacos 服務(wù)器上的配置發(fā)生變更后,發(fā)布一個 LocalDataChangeEvent 事件挤渔;

Subscriber.onEvent() :監(jiān)聽 LocalDataChangeEvent 事件(1.3.2 版本后)肮街;

  • DataChangeTask.run():根據(jù) groupKey 返回配置;
  • ConfigExecutor.executeLongPolling():通過線程池執(zhí)行 DataChangeTask 任務(wù)判导;

原文鏈接:
https://developer.51cto.com/article/713995.html

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末嫉父,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子眼刃,更是在濱河造成了極大的恐慌绕辖,老刑警劉巖,帶你破解...
    沈念sama閱讀 217,277評論 6 503
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件擂红,死亡現(xiàn)場離奇詭異仪际,居然都是意外死亡,警方通過查閱死者的電腦和手機(jī)昵骤,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,689評論 3 393
  • 文/潘曉璐 我一進(jìn)店門树碱,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人变秦,你說我怎么就攤上這事成榜。” “怎么了蹦玫?”我有些...
    開封第一講書人閱讀 163,624評論 0 353
  • 文/不壞的土叔 我叫張陵赎婚,是天一觀的道長刘绣。 經(jīng)常有香客問我,道長惑淳,這世上最難降的妖魔是什么额港? 我笑而不...
    開封第一講書人閱讀 58,356評論 1 293
  • 正文 為了忘掉前任,我火速辦了婚禮歧焦,結(jié)果婚禮上移斩,老公的妹妹穿的比我還像新娘。我一直安慰自己绢馍,他們只是感情好向瓷,可當(dāng)我...
    茶點故事閱讀 67,402評論 6 392
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著舰涌,像睡著了一般猖任。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上瓷耙,一...
    開封第一講書人閱讀 51,292評論 1 301
  • 那天朱躺,我揣著相機(jī)與錄音,去河邊找鬼搁痛。 笑死长搀,一個胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的鸡典。 我是一名探鬼主播源请,決...
    沈念sama閱讀 40,135評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼彻况!你這毒婦竟也來了谁尸?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 38,992評論 0 275
  • 序言:老撾萬榮一對情侶失蹤纽甘,失蹤者是張志新(化名)和其女友劉穎良蛮,沒想到半個月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體贷腕,經(jīng)...
    沈念sama閱讀 45,429評論 1 314
  • 正文 獨居荒郊野嶺守林人離奇死亡背镇,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,636評論 3 334
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了泽裳。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片瞒斩。...
    茶點故事閱讀 39,785評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖涮总,靈堂內(nèi)的尸體忽然破棺而出胸囱,到底是詐尸還是另有隱情,我是刑警寧澤瀑梗,帶...
    沈念sama閱讀 35,492評論 5 345
  • 正文 年R本政府宣布烹笔,位于F島的核電站裳扯,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏谤职。R本人自食惡果不足惜饰豺,卻給世界環(huán)境...
    茶點故事閱讀 41,092評論 3 328
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望允蜈。 院中可真熱鬧冤吨,春花似錦、人聲如沸饶套。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,723評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽妓蛮。三九已至怠李,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間蛤克,已是汗流浹背捺癞。 一陣腳步聲響...
    開封第一講書人閱讀 32,858評論 1 269
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機(jī)就差點兒被人妖公主榨干…… 1. 我叫王不留构挤,地道東北人翘簇。 一個月前我還...
    沈念sama閱讀 47,891評論 2 370
  • 正文 我出身青樓,卻偏偏與公主長得像儿倒,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子呜笑,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 44,713評論 2 354

推薦閱讀更多精彩內(nèi)容