要分析Nacos源碼僻他,好歹我們也通過源碼啟動起來,這樣也方便我們debug代碼腊尚。
注:nacos1.1.3
文章篇幅較長吨拗,一定要有耐心鸠匀;如果有疑問歡迎咨詢討論
1.啟動服務(wù)
源碼下載好了根據(jù)我下面的步驟先啟動起來再說:
注:我們配置中心按照mysql存儲配置冠胯,如果用默認(rèn)derby的話耀态,直接按照第4步修改啟動即可
1.找到config模塊中-resource/META-INF/nacos-db.sql
2.mysql數(shù)據(jù)庫創(chuàng)建nacos庫蔗蹋,然后執(zhí)行上面的nacos-db.sql
3.修改 console模塊的application.properties,加入如下內(nèi)容:spring.datasource.platform=mysql
db.num=1
db.url.0=jdbc:mysql://127.0.0.1:3306/nacos?characterEncoding=utf8&connectTimeout=1000&socketTimeout=3000&autoReconnect=true
db.user=root
db.password=root
- 啟動參數(shù)添加
-Dnacos.standalone=true
表示單機啟動- 本地訪問127.0.0.1:8848/nacos/index.html就好了璧亮,默認(rèn)賬號密碼都是nacos
然后我們就看到這樣一個頁面:
其實這個頁面的對應(yīng)的代碼就是我們的console模塊,
模塊也很簡單刚照,就是基于Spring-Security做校驗伐庭,然后對于這個頁面做的一些CRUD
如下:
2.配置中心解析
上面說了那么多的啟動內(nèi)容辫塌,終于到了我們這篇文章核心解析點了活鹰。
????簡單的提一下我們服務(wù)端的配置管理方式哈恰,服務(wù)端對于config除了基本的配置存儲;另外還有一個歷史存儲志群,每一次修改都有數(shù)據(jù)存儲着绷,UI界面也是可以查看的,還可以打標(biāo)锌云。解讀源碼前先要對于數(shù)據(jù)模型有個概念荠医,如下:
官網(wǎng)對于模型的描述:
腦袋里一定要構(gòu)建這兩個模型,這兩個模型第一個構(gòu)建數(shù)據(jù)模型的key,另外一個構(gòu)建具體的數(shù)據(jù)內(nèi)容彬向;配置中心的key主要是DataId
????Nacos 數(shù)據(jù)模型 Key 由三元組唯一確定, Namespace默認(rèn)是空串豫喧,公共命名空間(public),分組默認(rèn)是 DEFAULT_GROUP幢泼。
????配置領(lǐng)域模型圍繞配置紧显,主要有兩個關(guān)聯(lián)的實體,一個是配置變更歷史缕棵,一個是服務(wù)標(biāo)簽(用于打標(biāo)分類孵班,方便索引),由 ID 關(guān)聯(lián)招驴。
根據(jù)官方的例子來看配置中心相關(guān)內(nèi)容
public class ConfigExample {
public static void main(String[] args) throws NacosException, InterruptedException {
String serverAddr = "localhost";
String dataId = "dubbo.properties";
String group = "DEFAULT_GROUP";
String namespace = "b1092a4a-3b8d-4e33-8874-55cee3839c1f";
Properties properties = new Properties();
properties.put(PropertyKeyConst.NAMESPACE, namespace);
properties.put(PropertyKeyConst.SERVER_ADDR, serverAddr);
//這里創(chuàng)建了配置服務(wù)篙程,個人猜想這里應(yīng)該是把該初始化的線程服務(wù)等你都啟動好了
ConfigService configService = NacosFactory.createConfigService(properties);
//根據(jù)dataId,group獲取配置信息
String content = configService.getConfig(dataId, group, 5000);
System.out.println(content);
//添加監(jiān)聽器
configService.addListener(dataId, group, new Listener() {
@Override
public void receiveConfigInfo(String configInfo) {
System.out.println("receive:" + configInfo);
}
@Override
public Executor getExecutor() {
return null;
}
});
boolean isPublishOk = configService.publishConfig(dataId, group, "content");
System.out.println(isPublishOk);
Thread.sleep(3000);
content = configService.getConfig(dataId, group, 5000);
System.out.println(content);
boolean isRemoveOk = configService.removeConfig(dataId, group);
System.out.println(isRemoveOk);
Thread.sleep(3000);
content = configService.getConfig(dataId, group, 5000);
System.out.println(content);
Thread.sleep(300000);
}
}
2.1 創(chuàng)建ConfigService
先理解ConfigSerice中干了些什么?
//1.通過源碼可以發(fā)現(xiàn)這里是 NacosConfigService
ConfigService configService = NacosFactory.createConfigService(properties);
//2.如下别厘,這里調(diào)用NacosConfigService的構(gòu)造方法
public static ConfigService createConfigService(Properties properties) throws NacosException {
try {
Class<?> driverImplClass = Class.forName("com.alibaba.nacos.client.config.NacosConfigService");
Constructor constructor = driverImplClass.getConstructor(Properties.class);
ConfigService vendorImpl = (ConfigService)constructor.newInstance(properties);
return vendorImpl;
} catch (Throwable e) {
throw new NacosException(-400, e.getMessage());
}
}
//3.NacosConfigService的構(gòu)造方法如下
public NacosConfigService(Properties properties) throws NacosException {
String encodeTmp = properties.getProperty(PropertyKeyConst.ENCODE);
if (StringUtils.isBlank(encodeTmp)) {
encode = Constants.ENCODE;
} else {
encode = encodeTmp.trim();
}
String namespaceTmp = properties.getProperty(PropertyKeyConst.NAMESPACE);
if (StringUtils.isBlank(namespaceTmp)) {
namespace = TenantUtil.getUserTenant();
properties.put(PropertyKeyConst.NAMESPACE, namespace);
} else {
namespace = namespaceTmp;
properties.put(PropertyKeyConst.NAMESPACE, namespace);
}
//這里創(chuàng)建代理連接服務(wù)器
agent = new ServerHttpAgent(properties);
//這個其實是針對endpoint設(shè)置才會起作用虱饿,這里簡單說一下;
//線程異步的通過nameServer命名服務(wù)獲取serverList
agent.start();
//這里就是客戶端主要后臺工作
//configFilterChainManager就是攔截器管理器触趴,持有所有攔截器氮发,我們在客戶端可以配置相應(yīng)的攔截器
worker = new ClientWorker(agent, configFilterChainManager);
}
我們繼續(xù)往下看,看這個ClientWorker主要干了些什么冗懦?
public ClientWorker(final HttpAgent agent, final ConfigFilterChainManager configFilterChainManager, final Properties properties) {
this.agent = agent;
this.configFilterChainManager = configFilterChainManager;
// Initialize the timeout parameter
init(properties);
//初始化一個客戶端工作線程池爽冕,可以忽略展示不看
executor = Executors.newScheduledThreadPool(1, new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r);
t.setName("com.alibaba.nacos.client.Worker." + agent.getName());
t.setDaemon(true);
return t;
}
});
//通過下面的命名longPolling,其實就有點來頭了披蕉;后面我們分析颈畸,這也是初識話一個線程池
executorService = Executors.newScheduledThreadPool(Runtime.getRuntime().availableProcessors(), new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r);
t.setName("com.alibaba.nacos.client.Worker.longPolling." + agent.getName());
t.setDaemon(true);
return t;
}
});
executor.scheduleWithFixedDelay(new Runnable() {
@Override
public void run() {
try {
//簡單的通過名稱看,這里就是檢查配置没讲,這是一個定時任務(wù)眯娱,10ms會執(zhí)行一次
checkConfigInfo();
} catch (Throwable e) {
LOGGER.error("[" + agent.getName() + "] [sub-check] rotate check error", e);
}
}
}, 1L, 10L, TimeUnit.MILLISECONDS);
}
//5.這里分批處理任務(wù),檢查配置信息爬凑,并更新數(shù)據(jù)信息徙缴,暫時先提到這里,后面具體分析贰谣;
public void checkConfigInfo() {
// 分任務(wù) 娜搂; cacheMap是一個全局變量和我們添加的Listener有關(guān)迁霎;
int listenerSize = cacheMap.get().size();
// 向上取整為批數(shù) (ParamUtil.getPerTaskConfigSize()默認(rèn)數(shù)量3000)
int longingTaskCount = (int)Math.ceil(listenerSize / ParamUtil.getPerTaskConfigSize());
if (longingTaskCount > currentLongingTaskCount) {
for (int i = (int)currentLongingTaskCount; i < longingTaskCount; i++) {
// 要判斷任務(wù)是否在執(zhí)行 這塊需要好好想想吱抚。 任務(wù)列表現(xiàn)在是無序的。變化過程可能有問題
executorService.execute(new LongPullingRunnable(i));
}
currentLongingTaskCount = longingTaskCount;
}
}
總結(jié)一下創(chuàng)建NacosConfigServie做了些什么考廉?
- 1.初始化我們配置properties秘豹,解析我們一些配置參數(shù),創(chuàng)建我們NacosCofigService服務(wù)用于我們客戶端執(zhí)行操作
- 2.創(chuàng)建http代理類昌粤,如果我們基于endPoint的命名服務(wù)獲取服務(wù)列表既绕,會有定時線程跑獲取serverList這里不展開啄刹,可以自己閱讀源碼
- 3.初始化了兩個線程池,一個線程池executor(單線程池)10ms定時執(zhí)行一次檢查配置信息凄贩,檢查配置如果要執(zhí)行又通過另外一個線程池executorService來執(zhí)行
- 4.具體執(zhí)行又要通過cacheMap(主要功能就是監(jiān)聽器存儲者)的數(shù)量來決定分幾批來執(zhí)行誓军,檢查配置后面在詳細(xì)講解(和長輪訓(xùn)有關(guān)哦)
2.2 獲取配置信息getConfig
下面這一句簡單的代碼,具體有發(fā)生了什么了疲扎,下面我們娓娓道來昵时;
String content = configService.getConfig(dataId, group, 5000);
2.2.1 客戶端分析
//直接點擊進源碼,來到核心點NacosConfigSercie這個方法
private String getConfigInner(String tenant, String dataId, String group, long timeoutMs) throws NacosException {
group = null2defaultGroup(group);
ParamUtils.checkKeyParam(dataId, group);//判斷不能為空
ConfigResponse cr = new ConfigResponse();
cr.setDataId(dataId);
cr.setTenant(tenant);
cr.setGroup(group);
// 優(yōu)先使用本地配置 (這個就是客戶端本地磁盤寫的文件)
String content = LocalConfigInfoProcessor.getFailover(agent.getName(), dataId, group, tenant);
if (content != null) {
LOGGER.warn("[{}] [get-config] get failover ok, dataId={}, group={}, tenant={}, config={}", agent.getName(),
dataId, group, tenant, ContentUtils.truncateContent(content));
cr.setContent(content);
//獲取到配置信息后執(zhí)行過濾器過濾
configFilterChainManager.doFilter(null, cr);
content = cr.getContent();
return content;
}
try {
//通過遠(yuǎn)程server獲取配置內(nèi)容椒丧,我們主要分析這里
content = worker.getServerConfig(dataId, group, tenant, timeoutMs);
cr.setContent(content);
//獲取到配置信息后執(zhí)行過濾器過濾
configFilterChainManager.doFilter(null, cr);
content = cr.getContent();
//直接返回
return content;
} catch (NacosException ioe) {
if (NacosException.NO_RIGHT == ioe.getErrCode()) {
throw ioe;
}
LOGGER.warn("[{}] [get-config] get from server error, dataId={}, group={}, tenant={}, msg={}",
agent.getName(), dataId, group, tenant, ioe.toString());
}
LOGGER.warn("[{}] [get-config] get snapshot ok, dataId={}, group={}, tenant={}, config={}", agent.getName(),
dataId, group, tenant, ContentUtils.truncateContent(content));
content = LocalConfigInfoProcessor.getSnapshot(agent.getName(), dataId, group, tenant);
cr.setContent(content);
configFilterChainManager.doFilter(null, cr);
content = cr.getContent();
return content;
}
通過上面獲取代碼發(fā)現(xiàn)壹甥,這里委托給ClientWorker去實現(xiàn)遠(yuǎn)程配置信心的拉取,繼續(xù)看源碼壶熏;
public String getServerConfig(String dataId, String group, String tenant, long readTimeout)
throws NacosException {
if (StringUtils.isBlank(group)) {
group = Constants.DEFAULT_GROUP;
}
HttpResult result = null;
try {
List<String> params = null;
if (StringUtils.isBlank(tenant)) {
params = Arrays.asList("dataId", dataId, "group", group);
} else {
params = Arrays.asList("dataId", dataId, "group", group, "tenant", tenant);
}
//代理類這里就是我們最開始初始化 new MetricsHttpAgent(new ServerHttpAgent(properties))的代理句柠;這里的路徑是/configs;記住這是要去服務(wù)端看的
result = agent.httpGet(Constants.CONFIG_CONTROLLER_PATH, null, params, agent.getEncode(), readTimeout);
} catch (IOException e) {
String message = String.format(
"[%s] [sub-server] get server config exception, dataId=%s, group=%s, tenant=%s", agent.getName(),
dataId, group, tenant);
LOGGER.error(message, e);
throw new NacosException(NacosException.SERVER_ERROR, e);
}
switch (result.code) {
case HttpURLConnection.HTTP_OK:
//沒開啟本地緩存會存儲快照
LocalConfigInfoProcessor.saveSnapshot(agent.getName(), dataId, group, tenant, result.content);
return result.content;
case HttpURLConnection.HTTP_NOT_FOUND:
LocalConfigInfoProcessor.saveSnapshot(agent.getName(), dataId, group, tenant, null);
return null;
}
}
其實我們客戶端分析到這里就差不多了棒假,再看一下遠(yuǎn)程怎么獲取業(yè)務(wù)就好了溯职;不過我還是想看一下,在集群模式下帽哑,我們客戶端怎么請求 哪一個服務(wù)缸榄,請求失敗了又怎么處理的;所以我在往下走了一步祝拯。
@Override
public HttpResult httpGet(String path, List<String> headers, List<String> paramValues, String encoding,
long readTimeoutMs) throws IOException {
//這個就是請求超時時間甚带,下面一個do循環(huán),超出這個時間則推出遠(yuǎn)程請求
final long endTime = System.currentTimeMillis() + readTimeoutMs;
final boolean isSSL = false;
//這里就是我們需要請求的遠(yuǎn)程url佳头,這里其實是按照權(quán)重選擇的一個鹰贵,特別是在集群中的時候
String currentServerAddr = serverListMgr.getCurrentServerAddr();
int maxRetry = this.maxRetry;
do {
try {
List<String> newHeaders = getSpasHeaders(paramValues);
if (headers != null) {
newHeaders.addAll(headers);
}
//發(fā)起請求
HttpResult result = HttpSimpleClient.httpGet(
getUrl(currentServerAddr, path), newHeaders, paramValues, encoding,
readTimeoutMs, isSSL);
if (result.code == HttpURLConnection.HTTP_INTERNAL_ERROR
|| result.code == HttpURLConnection.HTTP_BAD_GATEWAY
|| result.code == HttpURLConnection.HTTP_UNAVAILABLE) {
LOGGER.error("[NACOS ConnectException] currentServerAddr: {}, httpCode: {}",
serverListMgr.getCurrentServerAddr(), result.code);
} else {
//有可能有服務(wù)請求失敗,會更新為最新的遠(yuǎn)程server地址
// Update the currently available server addr
serverListMgr.updateCurrentServerAddr(currentServerAddr);
return result;
}
} catch (ConnectException ce) {
LOGGER.error("[NACOS ConnectException httpGet] currentServerAddr:{}, err : {}", serverListMgr.getCurrentServerAddr(), ce.getMessage());
} catch (SocketTimeoutException stoe) {
LOGGER.error("[NACOS SocketTimeoutException httpGet] currentServerAddr:{}康嘉, err : {}", serverListMgr.getCurrentServerAddr(), stoe.getMessage());
} catch (IOException ioe) {
LOGGER.error("[NACOS IOException httpGet] currentServerAddr: " + serverListMgr.getCurrentServerAddr(), ioe);
throw ioe;
}
if (serverListMgr.getIterator().hasNext()) {
currentServerAddr = serverListMgr.getIterator().next();
} else {
maxRetry--;
if (maxRetry < 0) {
throw new ConnectException("[NACOS HTTP-GET] The maximum number of tolerable server reconnection errors has been reached");
}
//serverlist;請求失敗的情況下這個list實現(xiàn)了Iterator功能碉输,通過這個來獲取下一個服務(wù)
serverListMgr.refreshCurrentServerAddr();
}
} while (System.currentTimeMillis() <= endTime);
LOGGER.error("no available server");
throw new ConnectException("no available server");
}
客戶端我們請求的流程就先到這里了,也先來一個小總結(jié):
- 1.先通過本地緩存文件獲取亭珍,如果存在則直接通過本地文件拉取
- 2.如果本地文件沒有則通過遠(yuǎn)程服務(wù)拉取敷钾,如果還是沒有在本地緩存沒有開啟的情況下通過本地快照文件拉取
- 3.我們在請求遠(yuǎn)程服務(wù)端的時候會選擇循環(huán)用某個 服務(wù)請求,其中請求失敗會換一個連接請求肄梨;在超時時間內(nèi)沒有結(jié)果會直接返回
2.2.2 服務(wù)器處理配置請求
如下就是一個簡單的get請求
@GetMapping
public void getConfig(HttpServletRequest request, HttpServletResponse response,
@RequestParam("dataId") String dataId, @RequestParam("group") String group,
@RequestParam(value = "tenant", required = false, defaultValue = StringUtils.EMPTY)
String tenant,
@RequestParam(value = "tag", required = false) String tag)
throws IOException, ServletException, NacosException {
// check params
ParamUtils.checkParam(dataId, group, "datumId", "content");
ParamUtils.checkParam(tag);
final String clientIp = RequestUtil.getRemoteIp(request);
inner.doGetConfig(request, response, dataId, group, tenant, tag, clientIp);
}
服務(wù)端真正處理的實現(xiàn)來了:
/**
* 同步配置獲取接口
*/
public String doGetConfig(HttpServletRequest request, HttpServletResponse response, String dataId, String group,
String tenant, String tag, String clientIp) throws IOException, ServletException {
//通過dataId阻荒,group,tenant拼接為字符串作為group key
final String groupKey = GroupKey2.getKey(dataId, group, tenant);
String autoTag = request.getHeader("Vipserver-Tag");
String requestIpApp = RequestUtil.getAppName(request);
//這是nacos自己實現(xiàn)的自旋鎖众羡,超級簡單的鎖
int lockResult = tryConfigReadLock(groupKey);
final String requestIp = RequestUtil.getRemoteIp(request);
boolean isBeta = false;
if (lockResult > 0) {
// 侨赡。。。會判斷是否開啟beta羊壹,tag走不同的邏輯感覺一大對重復(fù)代碼,這里提取了部分代碼
//一般沒有開啟beta,也沒有tag走這里
md5 = cacheItem.getMd5();
lastModified = cacheItem.getLastModifiedTs();
//這里就是單機模式蓖宦,并且沒有使用mysql才會從derby中獲取
if (STANDALONE_MODE && !PropertyUtil.isStandaloneUseMysql()) {
configInfoBase = persistService.findConfigInfo(dataId, group, tenant);
} else {
//我們這種恰好是這種,單機+mysql所以會通過磁盤獲取油猫,是不是會很怪稠茂,為啥mysql還要從文件中讀取,那么什么時候會寫呢情妖?是不是好多疑問主慰?
file = DiskUtil.targetFile(dataId, group, tenant);
}
if (configInfoBase == null && fileNotExist(file)) {
// FIXME CacheItem
// 不存在了無法簡單的計算推送delayed,這里簡單的記做-1
ConfigTraceService.logPullEvent(dataId, group, tenant, requestIpApp, -1,
ConfigTraceService.PULL_EVENT_NOTFOUND, -1, requestIp);
// pullLog.info("[client-get] clientIp={}, {},
// no data",
// new Object[]{clientIp, groupKey});
//而且文件沒有直接返回文件不存在鲫售,why共螺?小朋友是不是一大堆問好?情竹?藐不??別著急秦效;后面會解析
response.setStatus(HttpServletResponse.SC_NOT_FOUND);
response.getWriter().println("config data not exist");
return HttpServletResponse.SC_NOT_FOUND + "";
}
//文件存在的話會走這里的內(nèi)容雏蛮;返回內(nèi)容以及內(nèi)容的md5,最后一次修改時間等阱州,
response.setHeader(Constants.CONTENT_MD5, md5);
/**
* 禁用緩存
*/
response.setHeader("Pragma", "no-cache");
response.setDateHeader("Expires", 0);
response.setHeader("Cache-Control", "no-cache,no-store");
if (STANDALONE_MODE && !PropertyUtil.isStandaloneUseMysql()) {
response.setDateHeader("Last-Modified", lastModified);
} else {
fis = new FileInputStream(file);
response.setDateHeader("Last-Modified", file.lastModified());
}
if (STANDALONE_MODE && !PropertyUtil.isStandaloneUseMysql()) {
out = response.getWriter();
out.print(configInfoBase.getContent());
out.flush();
out.close();
} else {
fis.getChannel().transferTo(0L, fis.getChannel().size(),
Channels.newChannel(response.getOutputStream()));
}
LogUtil.pullCheckLog.warn("{}|{}|{}|{}", groupKey, requestIp, md5, TimeUtils.getCurrentTimeStr());
final long delayed = System.currentTimeMillis() - lastModified;
// TODO distinguish pull-get && push-get
// 否則無法直接把delayed作為推送延時的依據(jù)挑秉,因為主動get請求的delayed值都很大,發(fā)布事件這里就是記錄了一個traceLog
ConfigTraceService.logPullEvent(dataId, group, tenant, requestIpApp, lastModified,
ConfigTraceService.PULL_EVENT_OK, delayed,
requestIp);
苔货。犀概。。
}else if (lockResult == 0) { //獲取鎖失敗
// FIXME CacheItem 不存在了無法簡單的計算推送delayed夜惭,這里簡單的記做-1
ConfigTraceService.logPullEvent(dataId, group, tenant, requestIpApp, -1,
ConfigTraceService.PULL_EVENT_NOTFOUND, -1, requestIp);
response.setStatus(HttpServletResponse.SC_NOT_FOUND);
response.getWriter().println("config data not exist");
return HttpServletResponse.SC_NOT_FOUND + "";
} else {
pullLog.info("[client-get] clientIp={}, {}, get data during dump", clientIp, groupKey);
response.setStatus(HttpServletResponse.SC_CONFLICT);
response.getWriter().println("requested file is being modified, please try later.");
return HttpServletResponse.SC_CONFLICT + "";
}
return HttpServletResponse.SC_OK + "";
}
服務(wù)端處理獲取配置請求總結(jié)一下:
- 1.加鎖獲取內(nèi)存中的緩存信息姻灶,根據(jù)是否是beta,是否含有tag等走下面的邏輯
- 2.判斷單機+不是mysql;讀取數(shù)據(jù)庫數(shù)據(jù)诈茧,否則讀取本地文件緩存數(shù)據(jù)是否存在
- 3.不存在直接返回文件不存在響應(yīng)产喉,并記錄日志,否則返回內(nèi)容信息
當(dāng)時看到這個有點懵逼了敢会。曾沈。為啥有mysql不讀,反而去讀一個磁盤鸥昏?磁盤的數(shù)據(jù)又是什么時候?qū)懙娜悖縲hy?感覺頓時懷疑人生了互广,別著急敛腌;后面會解析
先簡單的個人分析下:
除了單機+非mysql才查詢sql;而且nacos默認(rèn)的配置存儲默認(rèn)是derby惫皱,還要一個就是mysql的實現(xiàn)像樊;那么結(jié)論就是:
- 只有單機+derby存儲才會查sql;其他都查sql旅敷,而derby又是nacos內(nèi)置數(shù)據(jù)庫生棍,存儲在本地文件中,說白了就是本地文件媳谁;
那么這里獲取的信息其實就是本地文件涂滴,就算是mysql也不會從遠(yuǎn)程拉取服務(wù),降低遠(yuǎn)程請求消耗晴音;而且在集群的情況下肯定是本地文件來拉取的柔纵; 個人觀點理解,有誤請歡迎批評指正
上面說了那么多锤躁,其實都是比較簡單的搁料;就是簡單的獲取請求,那么在更新配置后系羞,客戶端的配置怎么更新呢郭计,是客戶端主動拉取,還是服務(wù)端推送呢椒振?這個才是核心關(guān)鍵
2.3 配置更新
我們可以通過某一個客戶端更新配置昭伸,或者UI界面更新配置;
客戶端
boolean isPublishOk = configService.publishConfig(dataId, group, "content");
客戶端這個請求流程和獲取配置一樣澎迎;只是發(fā)起一個Post請求庐杨;
重點看服務(wù)端的配置處理ConfigController
@PostMapping
public Boolean publishConfig(HttpServletRequest request, HttpServletResponse response,
@RequestParam("dataId") String dataId, @RequestParam("group") String group,
@RequestParam(value = "tenant", required = false, defaultValue = StringUtils.EMPTY)
String tenant,
@RequestParam("content") String content,
@RequestParam(value = "tag", required = false) String tag,
@RequestParam(value = "appName", required = false) String appName,
@RequestParam(value = "src_user", required = false) String srcUser,
@RequestParam(value = "config_tags", required = false) String configTags,
@RequestParam(value = "desc", required = false) String desc,
@RequestParam(value = "use", required = false) String use,
@RequestParam(value = "effect", required = false) String effect,
@RequestParam(value = "type", required = false) String type,
@RequestParam(value = "schema", required = false) String schema)
throws NacosException {
final String srcIp = RequestUtil.getRemoteIp(request);
String requestIpApp = RequestUtil.getAppName(request);
ParamUtils.checkParam(dataId, group, "datumId", content);
ParamUtils.checkParam(tag);
Map<String, Object> configAdvanceInfo = new HashMap<String, Object>(10);
//去掉了一些不重要檢查
if (AggrWhitelist.isAggrDataId(dataId)) {
log.warn("[aggr-conflict] {} attemp to publish single data, {}, {}",
RequestUtil.getRemoteIp(request), dataId, group);
throw new NacosException(NacosException.NO_RIGHT, "dataId:" + dataId + " is aggr");
}
final Timestamp time = TimeUtils.getCurrentTime();
String betaIps = request.getHeader("betaIps");
ConfigInfo configInfo = new ConfigInfo(dataId, group, tenant, appName, content);
if (StringUtils.isBlank(betaIps)) {
if (StringUtils.isBlank(tag)) {
persistService.insertOrUpdate(srcIp, srcUser, configInfo, time, configAdvanceInfo, false);
EventDispatcher.fireEvent(new ConfigDataChangeEvent(false, dataId, group, tenant, time.getTime()));
} else {
//這里就是簡單的 數(shù)據(jù)庫持久化(derby/mysql)
persistService.insertOrUpdateTag(configInfo, tag, srcIp, srcUser, time, false);
//主要是這個事件觸發(fā)器,
EventDispatcher.fireEvent(new ConfigDataChangeEvent(false, dataId, group, tenant, tag, time.getTime()));
}
} else { // beta publish
persistService.insertOrUpdateBeta(configInfo, betaIps, srcIp, srcUser, time, false);
EventDispatcher.fireEvent(new ConfigDataChangeEvent(true, dataId, group, tenant, time.getTime()));
}
ConfigTraceService.logPersistenceEvent(dataId, group, tenant, requestIpApp, time.getTime(),
LOCAL_IP, ConfigTraceService.PERSISTENCE_EVENT_PUB, content);
return true;
}
重點就是事件觸發(fā):
EventDispatcher.fireEvent(new ConfigDataChangeEvent(false, dataId, group, tenant, tag, time.getTime()));
這個事件觸發(fā)會把監(jiān)聽ConfigDataChangeEvent事件的監(jiān)聽器執(zhí)行了
/**
* fire event, notify listeners.
*/
static public void fireEvent(Event event) {
if (null == event) {
throw new IllegalArgumentException();
}
//遍歷執(zhí)行即可夹供,遍歷的是直接感興趣的listerner
for (AbstractEventListener listener : getEntry(event.getClass()).listeners) {
try {
listener.onEvent(event);
} catch (Exception e) {
log.error(e.toString(), e);
}
}
}
對于AbstractEventListener 我們只有兩個實現(xiàn)辑莫;
- AsyncNotifyService:通過名字就是一個異步通知服務(wù)
這個實現(xiàn)是監(jiān)聽的ConfigDataChangeEvent;所以觸發(fā)這個 - LongPollingService:長輪訓(xùn)服務(wù)罩引,有點意思各吨,
不過這個感興趣的是LocalDataChangeEvent,所以這個不會觸發(fā)哦
//這個是AysncNotifyServive實現(xiàn)的接口袁铐,而且創(chuàng)建的時會自動添加到ConfigDataChangeEvent監(jiān)聽列表中
@Override
public List<Class<? extends Event>> interest() {
List<Class<? extends Event>> types = new ArrayList<Class<? extends Event>>();
// 觸發(fā)配置變更同步通知
types.add(ConfigDataChangeEvent.class);
return types;
}
所以我們先分析到這里會觸發(fā) AysncNotifyService監(jiān)聽器揭蜒,下面繼續(xù)看
2.3.1 AysncNotifyService監(jiān)聽器
AysncNotifyService的事件觸發(fā)后執(zhí)行的邏輯,
@Override
public void onEvent(Event event) {
// 并發(fā)產(chǎn)生 ConfigDataChangeEvent
if (event instanceof ConfigDataChangeEvent) {
ConfigDataChangeEvent evt = (ConfigDataChangeEvent) event;
long dumpTs = evt.lastModifiedTs;
String dataId = evt.dataId;
String group = evt.group;
String tenant = evt.tenant;
String tag = evt.tag;
List<?> ipList = serverListService.getServerList();
// 其實這里任何類型隊列都可以
Queue<NotifySingleTask> queue = new LinkedList<NotifySingleTask>();
//這里會根據(jù)server集群的個數(shù)添加幾個task
for (int i = 0; i < ipList.size(); i++) {
//這里記得看一下url構(gòu)成會有 /communication/dataChange
queue.add(new NotifySingleTask(dataId, group, tenant, tag, dumpTs, (String) ipList.get(i), evt.isBeta));
}
//線程異步執(zhí)行剔桨,AsyncTask任務(wù)
EXECUTOR.execute(new AsyncTask(httpclient, queue));
}
}
具體異步任務(wù)執(zhí)行邏輯
class AsyncTask implements Runnable {
public AsyncTask(CloseableHttpAsyncClient httpclient, Queue<NotifySingleTask> queue) {
this.httpclient = httpclient;
this.queue = queue;
}
@Override
public void run() {
executeAsyncInvoke();
}
private void executeAsyncInvoke() {
while (!queue.isEmpty()) {
NotifySingleTask task = queue.poll();
String targetIp = task.getTargetIP();
if (serverListService.getServerList().contains(
targetIp)) {
// 啟動健康檢查且有不監(jiān)控的ip則直接把放到通知隊列屉更,否則通知
if (serverListService.isHealthCheck()
&& ServerListService.getServerListUnhealth().contains(targetIp)) {
// target ip 不健康,則放入通知列表中
ConfigTraceService.logNotifyEvent(task.getDataId(), task.getGroup(), task.getTenant(), null,
task.getLastModified(),
LOCAL_IP, ConfigTraceService.NOTIFY_EVENT_UNHEALTH, 0, task.target);
// get delay time and set fail count to the task,這會重試
asyncTaskExecute(task);
} else {
HttpGet request = new HttpGet(task.url);
request.setHeader(NotifyService.NOTIFY_HEADER_LAST_MODIFIED,
String.valueOf(task.getLastModified()));
request.setHeader(NotifyService.NOTIFY_HEADER_OP_HANDLE_IP, LOCAL_IP);
if (task.isBeta) {
request.setHeader("isBeta", "true");
}
//發(fā)送通知洒缀,通知的是各個服務(wù)這個接口瑰谜,communication/dataChange
//這里注意有一個回調(diào)函數(shù)欺冀,作用很簡單如果服務(wù)端通知失敗,會做一個次記錄萨脑,另外這里設(shè)置了有一個重試最多次數(shù)隐轩,還有時長組將增大,避免無限重試增大服務(wù)器開銷
httpclient.execute(request, new AsyncNotifyCallBack(httpclient, task));
}
}
}
}
private Queue<NotifySingleTask> queue;
private CloseableHttpAsyncClient httpclient;
}
異步通知其實又發(fā)起來一個廣播通知/ communication/dataChange接口調(diào)用渤早,所以輾轉(zhuǎn)到CommnicationController
了职车,這也是集群下通知所有的服務(wù)器的流程
/**
* 通知配置信息改變
*/
@GetMapping("/dataChange")
public Boolean notifyConfigInfo(HttpServletRequest request,
@RequestParam("dataId") String dataId, @RequestParam("group") String group,
@RequestParam(value = "tenant", required = false, defaultValue = StringUtils.EMPTY)
String tenant,
@RequestParam(value = "tag", required = false) String tag) {
dataId = dataId.trim();
group = group.trim();
String lastModified = request.getHeader(NotifyService.NOTIFY_HEADER_LAST_MODIFIED);
long lastModifiedTs = StringUtils.isEmpty(lastModified) ? -1 : Long.parseLong(lastModified);
String handleIp = request.getHeader(NotifyService.NOTIFY_HEADER_OP_HANDLE_IP);
String isBetaStr = request.getHeader("isBeta");
if (StringUtils.isNotBlank(isBetaStr) && trueStr.equals(isBetaStr)) {
//這里就是主要的執(zhí)行邏輯了,主要是異步處理
dumpService.dump(dataId, group, tenant, lastModifiedTs, handleIp, true);
} else {
dumpService.dump(dataId, group, tenant, tag, lastModifiedTs, handleIp);
}
//返回true表示執(zhí)行成功了
return true;
}
最后我們進源碼發(fā)現(xiàn),只是投建了一個DumpTask放到taskManager的隊列中
public void dump(String dataId, String group, String tenant, String tag, long lastModified, String handleIp,
boolean isBeta) {
String groupKey = GroupKey2.getKey(dataId, group, tenant);
//這里使用的是dumpTaskMgr鹊杖,所以默認(rèn)的處理器就是DumpProcessor
dumpTaskMgr.addTask(groupKey, new DumpTask(groupKey, tag, lastModified, handleIp, isBeta));
}
/**
* 用于處理一定要執(zhí)行成功的任務(wù) 單線程的方式處理任務(wù)悴灵,保證任務(wù)一定被成功處理
*
* @author huali
*/
public final class TaskManager implements TaskManagerMBean {
/**
* 將任務(wù)加入到任務(wù)Map中
*
* @param type
* @param task
*/
public void addTask(String type, AbstractTask task) {
this.lock.lock();
try {
//放入隊列中
AbstractTask oldTask = tasks.put(type, task);
MetricsMonitor.getDumpTaskMonitor().set(tasks.size());
if (null != oldTask) {
task.merge(oldTask);
}
} finally {
this.lock.unlock();
}
}
}
上面可以發(fā)現(xiàn)放入隊列中就直接返回結(jié)果了;這里放入隊列了不用猜就知道肯定有一個后臺線程來執(zhí)行隊列骂蓖;
TaskManager
//服務(wù)啟動會創(chuàng)建這個后臺線程跑服務(wù)
class ProcessRunnable implements Runnable {
@Override
public void run() {
while (!TaskManager.this.closed.get()) {
try {
//直接100ms一次的執(zhí)行一次
Thread.sleep(100);
TaskManager.this.process();
} catch (Throwable e) {
}
}
}
}
protected void process() {
for (Map.Entry<String, AbstractTask> entry : this.tasks.entrySet()) {
AbstractTask task = null;
this.lock.lock();
try {
// 獲取任務(wù)
task = entry.getValue();
if (null != task) {
if (!task.shouldProcess()) {
// 任務(wù)當(dāng)前不需要被執(zhí)行积瞒,直接跳過
continue;
}
// 先將任務(wù)從任務(wù)Map中刪除
this.tasks.remove(entry.getKey());
MetricsMonitor.getDumpTaskMonitor().set(tasks.size());
}
} finally {
this.lock.unlock();
}
if (null != task) {
// 獲取任務(wù)處理器
TaskProcessor processor = this.taskProcessors.get(entry.getKey());
if (null == processor) {
// 如果沒有根據(jù)任務(wù)類型設(shè)置的處理器,使用默認(rèn)處理器
processor = this.getDefaultTaskProcessor();
}
if (null != processor) {
boolean result = false;
try {
// 處理任務(wù)登下,
result = processor.process(entry.getKey(), task);
} catch (Throwable t) {
log.error("task_fail", "處理task失敗", t);
}
if (!result) {
// 任務(wù)處理失敗赡鲜,設(shè)置處理時間
task.setLastProcessTime(System.currentTimeMillis());
// 失敗了,將任務(wù)重新加入到任務(wù)Map中庐船,下次再次存儲
this.addTask(entry.getKey(), task);
}
}
}
}
if (tasks.isEmpty()) {
this.lock.lock();
try {
this.notEmpty.signalAll();
} finally {
this.lock.unlock();
}
}
}
上面的任務(wù)處理又會交給DumpProcessor去處理
@Override
public boolean process(String taskType, AbstractTask task) {
DumpTask dumpTask = (DumpTask)task;
String[] pair = GroupKey2.parseKey(dumpTask.groupKey);
String dataId = pair[0];
String group = pair[1];
String tenant = pair[2];
long lastModified = dumpTask.lastModified;
String handleIp = dumpTask.handleIp;
boolean isBeta = dumpTask.isBeta;
String tag = dumpTask.tag;
//省略了一些判斷邏輯银酬,這里是主要的數(shù)據(jù)處理
if (StringUtils.isBlank(tag)) {
//通過數(shù)據(jù)庫獲取配置信息,這個數(shù)據(jù)庫肯定是最新的
ConfigInfo cf = dumpService.persistService.findConfigInfo(dataId, group, tenant);
//重新加載聚合白名單,先不管
if (dataId.equals(AggrWhitelist.AGGRIDS_METADATA)) {
if (null != cf) {
AggrWhitelist.load(cf.getContent());
} else {
AggrWhitelist.load(null);
}
}
//加載客戶端ip白名單,也不管
if (dataId.equals(ClientIpWhiteList.CLIENT_IP_WHITELIST_METADATA)) {
if (null != cf) {
ClientIpWhiteList.load(cf.getContent());
} else {
ClientIpWhiteList.load(null);
}
}
if (dataId.equals(SwitchService.SWITCH_META_DATAID)) {
if (null != cf) {
SwitchService.load(cf.getContent());
} else {
SwitchService.load(null);
}
}
boolean result;
if (null != cf) {
//主要是這一步筐钟,這一步主要做了些什么內(nèi)容
//1.更新內(nèi)存中的緩存
//2.dump磁盤更新
result = ConfigService.dump(dataId, group, tenant, cf.getContent(), lastModified);
if (result) {
ConfigTraceService.logDumpEvent(dataId, group, tenant, null, lastModified, handleIp,
ConfigTraceService.DUMP_EVENT_OK, System.currentTimeMillis() - lastModified,
cf.getContent().length());
}
} else {
result = ConfigService.remove(dataId, group, tenant);
if (result) {
ConfigTraceService.logDumpEvent(dataId, group, tenant, null, lastModified, handleIp,
ConfigTraceService.DUMP_EVENT_REMOVE_OK, System.currentTimeMillis() - lastModified, 0);
}
}
return result;
}
上面使用ConfigService.dump執(zhí)行的任務(wù)才是重點揩瞪,這里一共做了幾件事
- 如果不是單機,或者使用了mysql篓冲,那么更新本地文件緩存
- 構(gòu)建CacheItem緩存在內(nèi)存中
- 發(fā)布一個LocalDataChangeEvent事件李破;
/**
* 保存配置文件,并緩存md5.
*/
static public boolean dump(String dataId, String group, String tenant, String content, long lastModifiedTs) {
String groupKey = GroupKey2.getKey(dataId, group, tenant);
makeSure(groupKey);
final int lockResult = tryWriteLock(groupKey);
assert (lockResult != 0);
if (lockResult < 0) {
dumpLog.warn("[dump-error] write lock failed. {}", groupKey);
return false;
}
try {
final String md5 = MD5.getInstance().getMD5String(content);
if (md5.equals(ConfigService.getContentMd5(groupKey))) {
dumpLog.warn(
"[dump-ignore] ignore to save cache file. groupKey={}, md5={}, lastModifiedOld={}, "
+ "lastModifiedNew={}",
groupKey, md5, ConfigService.getLastModifiedTs(groupKey), lastModifiedTs);
} else if (!STANDALONE_MODE || PropertyUtil.isStandaloneUseMysql()) {
//其實這里也是解密了壹将;會把更新的內(nèi)容使用本地文件緩存起來嗤攻;
//這也是我們在get配置信息的時候直接從文件獲取的地方
DiskUtil.saveToDisk(dataId, group, tenant, content);
}
updateMd5(groupKey, md5, lastModifiedTs);
return true;
} catch (IOException ioe) {
dumpLog.error("[dump-exception] save disk error. " + groupKey + ", " + ioe.toString(), ioe);
if (ioe.getMessage() != null) {
String errMsg = ioe.getMessage();
if (NO_SPACE_CN.equals(errMsg) || NO_SPACE_EN.equals(errMsg) || errMsg.contains(DISK_QUATA_CN)
|| errMsg.contains(DISK_QUATA_EN)) {
// 磁盤寫滿保護代碼
fatalLog.error("磁盤滿自殺退出", ioe);
System.exit(0);
}
}
return false;
} finally {
releaseWriteLock(groupKey);
}
}
//更新md5
public static void updateMd5(String groupKey, String md5, long lastModifiedTs) {
CacheItem cache = makeSure(groupKey);
if (cache.md5 == null || !cache.md5.equals(md5)) {
cache.md5 = md5;
cache.lastModifiedTs = lastModifiedTs;
//這里觸發(fā)LocalDataChangeEvent事件,涉及到我們之前所有的長輪訓(xùn)監(jiān)聽器诽俯,哈哈妇菱,是不是有點通了;
EventDispatcher.fireEvent(new LocalDataChangeEvent(groupKey));
}
}
基本上這里可以再次總結(jié)下服務(wù)端配置更新后流程:
- 1.某一個服務(wù)器收到更新請求后暴区;先自己更新本地數(shù)據(jù)庫的數(shù)據(jù)闯团;然后發(fā)布一個ConfigDataChangeEvent事件
- 2.該事件會讓每一個服務(wù)收到某一個配置更改了,然后每一個服務(wù)開始執(zhí)行流程
- 3.每一個服務(wù)會新建一個task任務(wù)放入自己的任務(wù)隊列中
- 4.每一個服務(wù)后臺的線程會從隊列中執(zhí)行該任務(wù)
- 任務(wù)執(zhí)行包含仙粱,如果非單機或者mysql會刷新本地緩存文件房交;這個也是我們前面分析的獲取配置的文件
- 會更新內(nèi)存中CacheItem緩存內(nèi)容信息
- 然后觸發(fā) LocalDataChangeEvent 這個涉及長輪訓(xùn)情況,下面解析
注意一點配置中心數(shù)據(jù)一致性的問題:
- nacos的配置中心在集群條件下配置數(shù)據(jù)依賴于第三方mysql做數(shù)據(jù)庫存儲伐割,因為默認(rèn)的derby是服務(wù)內(nèi)置的存儲候味,難以滿足集群條件共享
- 數(shù)據(jù)變更后刃唤,變更節(jié)點更新mysql后會廣播的消息給其他節(jié)點,失敗后也只會重試幾次白群,多次失敗了就沒有做其他處理了不過是由日志記錄的尚胞;節(jié)點收到廣播消息會添加到自己的隊列里,不斷的處理川抡,失敗了在添加會隊列中即可辐真。而且沒有服務(wù)器沒有其他定時任務(wù)去比較服務(wù)器配置內(nèi)容
所以配置中心應(yīng)該屬于采用去中心化的思想設(shè)計的须尚。
疑問崖堤??耐床?如果服務(wù)器發(fā)布數(shù)據(jù)密幔,其他節(jié)點更新沒有成功怎么辦?肯定有解決方案撩轰,后文揭秘
其實到這里內(nèi)容基本上都差不多了胯甩,內(nèi)容篇幅過長,可以休息下分批閱讀堪嫂;
2.3.2 LongPollingService長輪訓(xùn)服務(wù)
上面我們解析到服務(wù)端配置變化后會觸發(fā)LocalDataChangeEvent事件偎箫,也就是LongPollingService的onEvent方法;那么具體整個流程會是怎么樣的呢皆串;我們得結(jié)合客戶端請求來看淹办;
之前給出一個疑問:配置更新后,我們是客戶端主動去拉還是服務(wù)端推送恶复?
1)客戶端分析
先從客戶端分析怜森,我們還記得我們創(chuàng)建NacosConfigService會開啟后臺線程檢查配置更新;ClientWorker谤牡,先回憶下代碼
public void checkConfigInfo() {
// 分任務(wù)副硅,這里就是監(jiān)聽器的個數(shù)
int listenerSize = cacheMap.get().size();
// 向上取整為批數(shù) (perTaskCofigSize 默認(rèn)是3000)
int longingTaskCount = (int) Math.ceil(listenerSize / ParamUtil.getPerTaskConfigSize());
if (longingTaskCount > currentLongingTaskCount) {
for (int i = (int) currentLongingTaskCount; i < longingTaskCount; i++) {
// 要判斷任務(wù)是否在執(zhí)行 這塊需要好好想想。 任務(wù)列表現(xiàn)在是無序的翅萤。變化過程可能有問題
executorService.execute(new LongPollingRunnable(i));
}
currentLongingTaskCount = longingTaskCount;
}
}
第一步先分析這里如果longingTaskCount <=currentLongingTaskCount是不會執(zhí)行LongPollingRunnable的恐疲;
所以需要知道
- currentLongingTaskCount(初始化是0)在執(zhí)行后會更新為longTaskCount的值
- longingTaskCount 是我們本地監(jiān)聽器的數(shù)量除以3000向上取整;所以如果沒有監(jiān)聽器套么,我覺得這里的長輪訓(xùn)根本不會執(zhí)行流纹;而且都是3000個監(jiān)聽器由一個線程來輪訓(xùn);
在看LongPollingRunnable具體執(zhí)行邏輯
class LongPollingRunnable implements Runnable {
private int taskId;
public LongPollingRunnable(int taskId) {
this.taskId = taskId;
}
@Override
public void run() {
List<CacheData> cacheDatas = new ArrayList<CacheData>();
List<String> inInitializingCacheList = new ArrayList<String>();
try {
// check failover config
for (CacheData cacheData : cacheMap.get().values()) {
if (cacheData.getTaskId() == taskId) {
cacheDatas.add(cacheData);
try {
//本地配置檢查违诗,這個可以自己查看
checkLocalConfig(cacheData);
if (cacheData.isUseLocalConfigInfo()) {
cacheData.checkListenerMd5();
}
} catch (Exception e) {
LOGGER.error("get local config info error", e);
}
}
}
// check server config 這里只返回更改后的dataId漱凝,group,tenant(namespace)具體內(nèi)容還要去重新拉取一次 請求路徑 configs/listener
List<String> changedGroupKeys = checkUpdateDataIds(cacheDatas, inInitializingCacheList);
for (String groupKey : changedGroupKeys) {
String[] key = GroupKey.parseKey(groupKey);
String dataId = key[0];
String group = key[1];
String tenant = null;
if (key.length == 3) {
tenant = key[2];
}
try {
//重新拉取代碼更新
String content = getServerConfig(dataId, group, tenant, 3000L);
CacheData cache = cacheMap.get().get(GroupKey.getKeyTenant(dataId, group, tenant));
cache.setContent(content);
LOGGER.info("[{}] [data-received] dataId={}, group={}, tenant={}, md5={}, content={}",
agent.getName(), dataId, group, tenant, cache.getMd5(),
ContentUtils.truncateContent(content));
} catch (NacosException ioe) {
String message = String.format(
"[%s] [get-update] get changed config exception. dataId=%s, group=%s, tenant=%s",
agent.getName(), dataId, group, tenant);
LOGGER.error(message, ioe);
}
}
for (CacheData cacheData : cacheDatas) {
if (!cacheData.isInitializing() || inInitializingCacheList
.contains(GroupKey.getKeyTenant(cacheData.dataId, cacheData.group, cacheData.tenant))) {
//檢查本地MD5,如果更新會觸發(fā)本地監(jiān)聽器
cacheData.checkListenerMd5();
cacheData.setInitializing(false);
}
}
inInitializingCacheList.clear();
//執(zhí)行完了會再次執(zhí)行該流程
executorService.execute(this);
} catch (Throwable e) {
// If the rotation training task is abnormal, the next execution time of the task will be punished
LOGGER.error("longPolling error : ", e);
executorService.schedule(this, taskPenaltyTime, TimeUnit.MILLISECONDS);
}
}
}
2) 服務(wù)端處理輪訓(xùn)
客戶端一次會把少于3000個實例data發(fā)送給服務(wù)端檢查MD5
請求路徑 configs/listener诸迟;實例ConfigController;
/**
* 比較MD5
*/
@PostMapping("/listener")
public void listener(HttpServletRequest request, HttpServletResponse response)
throws ServletException, IOException {
request.setAttribute("org.apache.catalina.ASYNC_SUPPORTED", true);
String probeModify = request.getParameter("Listening-Configs");
if (StringUtils.isBlank(probeModify)) {
throw new IllegalArgumentException("invalid probeModify");
}
probeModify = URLDecoder.decode(probeModify, Constants.ENCODE);
Map<String, String> clientMd5Map;
try {
clientMd5Map = MD5Util.getClientMd5Map(probeModify);
} catch (Throwable e) {
throw new IllegalArgumentException("invalid probeModify");
}
// do long-polling
inner.doPollingConfig(request, response, clientMd5Map, probeModify.length());
}
ConfigServletInner執(zhí)行
/**
* 輪詢接口
*/
public String doPollingConfig(HttpServletRequest request, HttpServletResponse response,
Map<String, String> clientMd5Map, int probeRequestSize)
throws IOException {
// 長輪詢
if (LongPollingService.isSupportLongPolling(request)) {
//重點就是這個方法執(zhí)行了
longPollingService.addLongPollingClient(request, response, clientMd5Map, probeRequestSize);
return HttpServletResponse.SC_OK + "";
}
// else 兼容短輪詢邏輯
List<String> changedGroups = MD5Util.compareMd5(request, response, clientMd5Map);
// 兼容短輪詢result
String oldResult = MD5Util.compareMd5OldResult(changedGroups);
String newResult = MD5Util.compareMd5ResultString(changedGroups);
茸炒。愕乎。。壁公。
// 禁用緩存
response.setHeader("Pragma", "no-cache");
response.setDateHeader("Expires", 0);
response.setHeader("Cache-Control", "no-cache,no-store");
response.setStatus(HttpServletResponse.SC_OK);
return HttpServletResponse.SC_OK + "";
}
LongPollingService這里添加客戶端長輪訓(xùn)任務(wù)感论,有服務(wù)端通過線程池持有
public void addLongPollingClient(HttpServletRequest req, HttpServletResponse rsp, Map<String, String> clientMd5Map,
int probeRequestSize) {
String str = req.getHeader(LongPollingService.LONG_POLLING_HEADER);
String noHangUpFlag = req.getHeader(LongPollingService.LONG_POLLING_NO_HANG_UP_HEADER);
String appName = req.getHeader(RequestUtil.CLIENT_APPNAME_HEADER);
String tag = req.getHeader("Vipserver-Tag");
int delayTime = SwitchService.getSwitchInteger(SwitchService.FIXED_DELAY_TIME, 500);
/**
* 提前500ms返回響應(yīng),為避免客戶端超時 @qiaoyi.dingqy 2013.10.22改動 add delay time for LoadBalance
*/
//這里就是服務(wù)端hang住的時間紊册,這里是30s-500ms=29.5s
long timeout = Math.max(10000, Long.parseLong(str) - delayTime);
if (isFixedPolling()) {
timeout = Math.max(10000, getFixedPollingInterval());
// do nothing but set fix polling timeout
} else {
long start = System.currentTimeMillis();
List<String> changedGroups = MD5Util.compareMd5(req, rsp, clientMd5Map);
if (changedGroups.size() > 0) {
generateResponse(req, rsp, changedGroups);
LogUtil.clientLog.info("{}|{}|{}|{}|{}|{}|{}",
System.currentTimeMillis() - start, "instant", RequestUtil.getRemoteIp(req), "polling",
clientMd5Map.size(), probeRequestSize, changedGroups.size());
return;
} else if (noHangUpFlag != null && noHangUpFlag.equalsIgnoreCase(TRUE_STR)) {
LogUtil.clientLog.info("{}|{}|{}|{}|{}|{}|{}", System.currentTimeMillis() - start, "nohangup",
RequestUtil.getRemoteIp(req), "polling", clientMd5Map.size(), probeRequestSize,
changedGroups.size());
return;
}
}
String ip = RequestUtil.getRemoteIp(req);
// 一定要由HTTP線程調(diào)用比肄,否則離開后容器會立即發(fā)送響應(yīng)
final AsyncContext asyncContext = req.startAsync();
// AsyncContext.setTimeout()的超時時間不準(zhǔn),所以只能自己控制
asyncContext.setTimeout(0L);
//這里就是執(zhí)行的延遲timeOut的任務(wù)返回當(dāng)前的長連接結(jié)果
scheduler.execute(
new ClientLongPolling(asyncContext, clientMd5Map, ip, probeRequestSize, timeout, appName, tag));
}
任務(wù)具體內(nèi)容囊陡,就是檢查服務(wù)端的md5是否修改芳绩,修改了就結(jié)果;
class ClientLongPolling implements Runnable {
@Override
public void run() {
asyncTimeoutFuture = scheduler.schedule(new Runnable() {
@Override
public void run() {
try {
getRetainIps().put(ClientLongPolling.this.ip, System.currentTimeMillis());
/**
* 刪除訂閱關(guān)系
*/
allSubs.remove(ClientLongPolling.this);
if (isFixedPolling()) {
LogUtil.clientLog.info("{}|{}|{}|{}|{}|{}",
(System.currentTimeMillis() - createTime),
"fix", RequestUtil.getRemoteIp((HttpServletRequest)asyncContext.getRequest()),
"polling",
clientMd5Map.size(), probeRequestSize);
//MD5比較撞反,
List<String> changedGroups = MD5Util.compareMd5(
(HttpServletRequest)asyncContext.getRequest(),
(HttpServletResponse)asyncContext.getResponse(), clientMd5Map);
if (changedGroups.size() > 0) {
sendResponse(changedGroups);
} else {
sendResponse(null);
}
} else {
LogUtil.clientLog.info("{}|{}|{}|{}|{}|{}",
(System.currentTimeMillis() - createTime),
"timeout", RequestUtil.getRemoteIp((HttpServletRequest)asyncContext.getRequest()),
"polling",
clientMd5Map.size(), probeRequestSize);
sendResponse(null);
}
} catch (Throwable t) {
LogUtil.defaultLog.error("long polling error:" + t.getMessage(), t.getCause());
}
}
}, timeoutTime, TimeUnit.MILLISECONDS);
//這里會把當(dāng)前放在隊列中妥色,以備后用
allSubs.add(this);
}
這里長輪訓(xùn)就是這樣實現(xiàn)的,客戶端請求后被服務(wù)端給hang住29.5s后返回具體結(jié)果遏片;但是如果中途有數(shù)據(jù)更改了嘹害,真的會等待29.5后返回結(jié)果嗎;
還記得我們之前數(shù)據(jù)更改了觸發(fā)了一個LocalDataChangeEvent事件嗎吮便,而這個事件正好是LongPollingService來處理
public void onEvent(Event event) {
if (isFixedPolling()) {
// ignore
} else {
//這里處理dataChangeTask
if (event instanceof LocalDataChangeEvent) {
LocalDataChangeEvent evt = (LocalDataChangeEvent)event;
scheduler.execute(new DataChangeTask(evt.groupKey, evt.isBeta, evt.betaIps));
}
}
}
最終本地發(fā)送配置變化會直接通過如下方式解決
/**
* 長輪詢訂閱關(guān)系
*/
final Queue<ClientLongPolling> allSubs;
// =================
class DataChangeTask implements Runnable {
@Override
public void run() {
try {
ConfigService.getContentBetaMd5(groupKey);
//遍歷客戶端包含這個group的請求
for (Iterator<ClientLongPolling> iter = allSubs.iterator(); iter.hasNext(); ) {
ClientLongPolling clientSub = iter.next();
if (clientSub.clientMd5Map.containsKey(groupKey)) {
// 如果beta發(fā)布且不在beta列表直接跳過
if (isBeta && !betaIps.contains(clientSub.ip)) {
continue;
}
// 如果tag發(fā)布且不在tag列表直接跳過
if (StringUtils.isNotBlank(tag) && !tag.equals(clientSub.tag)) {
continue;
}
getRetainIps().put(clientSub.ip, System.currentTimeMillis());
iter.remove(); // 刪除訂閱關(guān)系
LogUtil.clientLog.info("{}|{}|{}|{}|{}|{}|{}",
(System.currentTimeMillis() - changeTime),
"in-advance",
RequestUtil.getRemoteIp((HttpServletRequest)clientSub.asyncContext.getRequest()),
"polling",
clientSub.clientMd5Map.size(), clientSub.probeRequestSize, groupKey);
//這里直接拿到在服務(wù)端hang住的請求返回更改的配置groupKey
clientSub.sendResponse(Arrays.asList(groupKey));
}
}
} catch (Throwable t) {
LogUtil.defaultLog.error("data change error:" + t.getMessage(), t.getCause());
}
}
到這里也是基本都是完成這個配置的分析笔呀;不過這里也留給你一個問題:
分析途中有一個判定isFixedPolling()
這個影響和目前是否即可返回結(jié)果還是保持服務(wù)端hang任務(wù);
2.4 服務(wù)啟動數(shù)據(jù)加載
上面我們也是發(fā)現(xiàn)了髓需,集群模式下服務(wù)器之間的數(shù)據(jù)是沒有做數(shù)據(jù)定時檢查的许师,那么啟動的數(shù)據(jù)肯定要從最新的數(shù)據(jù)去獲取,也就是我們的mysql拿授账,然后dump到服務(wù)器本地中枯跑,后面客戶端請求就從本地獲取了。白热。具體我們通過代碼來驗證
@Service
public class DumpService {
@Autowired
private Environment env;
@Autowired
PersistService persistService;
//啟動初始化方法敛助,作為服務(wù)器對于配置文件的處理
@PostConstruct
public void init() {
LogUtil.defaultLog.warn("DumpService start");
//構(gòu)建不同的處理器
DumpProcessor processor = new DumpProcessor(this);
DumpAllProcessor dumpAllProcessor = new DumpAllProcessor(this);
DumpAllBetaProcessor dumpAllBetaProcessor = new DumpAllBetaProcessor(this);
DumpAllTagProcessor dumpAllTagProcessor = new DumpAllTagProcessor(this);
dumpTaskMgr = new TaskManager("com.alibaba.nacos.server.DumpTaskManager");
dumpTaskMgr.setDefaultTaskProcessor(processor);
dumpAllTaskMgr = new TaskManager("com.alibaba.nacos.server.DumpAllTaskManager");
dumpAllTaskMgr.setDefaultTaskProcessor(dumpAllProcessor);
Runnable dumpAll = () -> dumpAllTaskMgr.addTask(DumpAllTask.TASK_ID, new DumpAllTask());
Runnable dumpAllBeta = () -> dumpAllTaskMgr.addTask(DumpAllBetaTask.TASK_ID, new DumpAllBetaTask());
//清除超過30天的歷史配置
Runnable clearConfigHistory = () -> {
log.warn("clearConfigHistory start");
if (ServerListService.isFirstIp()) {
try {
Timestamp startTime = getBeforeStamp(TimeUtils.getCurrentTime(), 24 * getRetentionDays());
int totalCount = persistService.findConfigHistoryCountByTime(startTime);
if (totalCount > 0) {
int pageSize = 1000;
int removeTime = (totalCount + pageSize - 1) / pageSize;
log.warn("clearConfigHistory, getBeforeStamp:{}, totalCount:{}, pageSize:{}, removeTime:{}",
new Object[] {startTime, totalCount, pageSize, removeTime});
while (removeTime > 0) {
// 分頁刪除,以免批量太大報錯
persistService.removeConfigHistory(startTime, pageSize);
removeTime--;
}
}
} catch (Throwable e) {
log.error("clearConfigHistory error", e);
}
}
};
try {
//啟動的時候屋确,更新本地磁盤為最新的數(shù)據(jù)纳击。
dumpConfigInfo(dumpAllProcessor);
// 更新beta緩存
LogUtil.defaultLog.info("start clear all config-info-beta.");
DiskUtil.clearAllBeta();
if (persistService.isExistTable(BETA_TABLE_NAME)) {
dumpAllBetaProcessor.process(DumpAllBetaTask.TASK_ID, new DumpAllBetaTask());
}
// 更新Tag緩存
LogUtil.defaultLog.info("start clear all config-info-tag.");
DiskUtil.clearAllTag();
if (persistService.isExistTable(TAG_TABLE_NAME)) {
dumpAllTagProcessor.process(DumpAllTagTask.TASK_ID, new DumpAllTagTask());
}
// add to dump aggr
List<ConfigInfoChanged> configList = persistService.findAllAggrGroup();
if (configList != null && !configList.isEmpty()) {
total = configList.size();
List<List<ConfigInfoChanged>> splitList = splitList(configList, INIT_THREAD_COUNT);
for (List<ConfigInfoChanged> list : splitList) {
MergeAllDataWorker work = new MergeAllDataWorker(list);
work.start();
}
log.info("server start, schedule merge end.");
}
} catch (Exception e) {
LogUtil.fatalLog.error(
"Nacos Server did not start because dumpservice bean construction failure :\n" + e.getMessage(),
e.getCause());
throw new RuntimeException(
"Nacos Server did not start because dumpservice bean construction failure :\n" + e.getMessage());
}
//集群條件
if (!STANDALONE_MODE) {
Runnable heartbeat = () -> {
String heartBeatTime = TimeUtils.getCurrentTime().toString();
// write disk
try {
DiskUtil.saveHeartBeatToDisk(heartBeatTime);
} catch (IOException e) {
LogUtil.fatalLog.error("save heartbeat fail" + e.getMessage());
}
};
//周期性心跳檢查,日志記錄心跳時間
TimerTaskService.scheduleWithFixedDelay(heartbeat, 0, 10, TimeUnit.SECONDS);
long initialDelay = new Random().nextInt(INITIAL_DELAY_IN_MINUTE) + 10;
LogUtil.defaultLog.warn("initialDelay:{}", initialDelay);
//周期性任務(wù) 全量dump所有配置信息攻臀,時間間隔是6*60min=6h
TimerTaskService.scheduleWithFixedDelay(dumpAll, initialDelay, DUMP_ALL_INTERVAL_IN_MINUTE,
TimeUnit.MINUTES);
TimerTaskService.scheduleWithFixedDelay(dumpAllBeta, initialDelay, DUMP_ALL_INTERVAL_IN_MINUTE,
TimeUnit.MINUTES);
}
//定時任務(wù)執(zhí)行清理
TimerTaskService.scheduleWithFixedDelay(clearConfigHistory, 10, 10, TimeUnit.MINUTES);
}
首次dump本地文件
private void dumpConfigInfo(DumpAllProcessor dumpAllProcessor)
throws IOException {
int timeStep = 6;
Boolean isAllDump = true;
// initial dump all
FileInputStream fis = null;
Timestamp heartheatLastStamp = null;
try {
//默認(rèn)我們沒有配置快速啟動焕数,這里可以自己分析下,看起來也是通過心跳文件讀取刨啸,如果最后心跳的時間和當(dāng)前時間不超過6小時就不加載數(shù)據(jù)庫文件做持久化
if (isQuickStart()) { //是否快速啟動堡赔,默認(rèn)不是
File heartbeatFile = DiskUtil.heartBeatFile();
if (heartbeatFile.exists()) {
fis = new FileInputStream(heartbeatFile);
String heartheatTempLast = IoUtils.toString(fis, Constants.ENCODE);
heartheatLastStamp = Timestamp.valueOf(heartheatTempLast);
if (TimeUtils.getCurrentTime().getTime()
- heartheatLastStamp.getTime() < timeStep * 60 * 60 * 1000) {
isAllDump = false;
}
}
}
if (isAllDump) {
//如果啟動全量dump,也是默認(rèn)就直接服務(wù)數(shù)據(jù)庫的數(shù)據(jù)设联,全部給dump存儲在服務(wù)器磁盤中
LogUtil.defaultLog.info("start clear all config-info.");
DiskUtil.clearAll();
dumpAllProcessor.process(DumpAllTask.TASK_ID, new DumpAllTask());
} else {
//如果是快速啟動滿足小于6h善已,那么部分檢查文件md5然后更新
Timestamp beforeTimeStamp = getBeforeStamp(heartheatLastStamp,
timeStep);
DumpChangeProcessor dumpChangeProcessor = new DumpChangeProcessor(
this, beforeTimeStamp, TimeUtils.getCurrentTime());
dumpChangeProcessor.process(DumpChangeTask.TASK_ID, new DumpChangeTask());
Runnable checkMd5Task = () -> {
LogUtil.defaultLog.error("start checkMd5Task");
List<String> diffList = ConfigService.checkMd5();
for (String groupKey : diffList) {
String[] dg = GroupKey.parseKey(groupKey);
String dataId = dg[0];
String group = dg[1];
String tenant = dg[2];
ConfigInfoWrapper configInfo = persistService.queryConfigInfo(dataId, group, tenant);
ConfigService.dumpChange(dataId, group, tenant, configInfo.getContent(),
configInfo.getLastModified());
}
LogUtil.defaultLog.error("end checkMd5Task");
};
TimerTaskService.scheduleWithFixedDelay(checkMd5Task, 0, 12,
TimeUnit.HOURS);
}
} catch (IOException e) {
LogUtil.fatalLog.error("dump config fail" + e.getMessage());
throw e;
} finally {
if (null != fis) {
try {
fis.close();
} catch (IOException e) {
LogUtil.defaultLog.warn("close file failed");
}
}
}
}
從類灼捂,方法的命名也應(yīng)該很容易猜測出來,我們直接看dumpAllProcesso處理流程
6h執(zhí)行一次的全量更新
class DumpAllProcessor implements TaskProcessor {
DumpAllProcessor(DumpService dumpService) {
this.dumpService = dumpService;
this.persistService = dumpService.persistService;
}
@Override
public boolean process(String taskType, AbstractTask task) {
long currentMaxId = persistService.findConfigMaxId();
long lastMaxId = 0;
//基于上次id來查找數(shù)據(jù)换团,優(yōu)化數(shù)據(jù)分頁
while (lastMaxId < currentMaxId) {
//分頁查出配置信息數(shù)據(jù)悉稠,默認(rèn)1000條
Page<PersistService.ConfigInfoWrapper> page = persistService.findAllConfigInfoFragment(lastMaxId,
PAGE_SIZE);
if (page != null && page.getPageItems() != null && !page.getPageItems().isEmpty()) {
for (PersistService.ConfigInfoWrapper cf : page.getPageItems()) {
long id = cf.getId();
lastMaxId = id > lastMaxId ? id : lastMaxId;
if (cf.getDataId().equals(AggrWhitelist.AGGRIDS_METADATA)) {
AggrWhitelist.load(cf.getContent());
}
if (cf.getDataId().equals(ClientIpWhiteList.CLIENT_IP_WHITELIST_METADATA)) {
ClientIpWhiteList.load(cf.getContent());
}
if (cf.getDataId().equals(SwitchService.SWITCH_META_DATAID)) {
SwitchService.load(cf.getContent());
}
//dump到本地服務(wù)器磁盤上
boolean result = ConfigService.dump(cf.getDataId(), cf.getGroup(), cf.getTenant(), cf.getContent(),
cf.getLastModified(), cf.getType());
final String content = cf.getContent();
final String md5 = Md5Utils.getMD5(content, Constants.ENCODE);
LogUtil.dumpLog.info("[dump-all-ok] {}, {}, length={}, md5={}",
GroupKey2.getKey(cf.getDataId(), cf.getGroup()), cf.getLastModified(), content.length(), md5);
}
defaultLog.info("[all-dump] {} / {}", lastMaxId, currentMaxId);
} else {
lastMaxId += PAGE_SIZE;
}
}
return true;
}
static final int PAGE_SIZE = 1000;
final DumpService dumpService;
final PersistService persistService;
}
簡單的總結(jié):
- 1.默認(rèn)會把數(shù)據(jù)庫的配置提取出來緩存到服務(wù)器磁盤文件中
- 2.集群模式下DumpService在啟動初始化的時候啟動周期任務(wù)做心跳日志团滥,主要是為下次啟動是否全量dump做記錄
- 3.集群模式下默認(rèn)6小時執(zhí)行從數(shù)據(jù)庫中執(zhí)行一起全量配置dump到本地磁盤
- 4.周期檢查歷史配置缩宜,超過30天的做刪除
- ...
之前的數(shù)據(jù)性一致性問題:如果數(shù)據(jù)更新的時候更新失敗,會在6h后定時任務(wù)從數(shù)據(jù)庫中拉取數(shù)據(jù)然后更新本地文件氛雪。
2.5 服務(wù)之間健康檢查
???集群中服務(wù)肯定要做一個健康檢查想虎,如果有服務(wù)不可用要做相關(guān)的處理服務(wù)主要看 ServerListService類
???同樣的道理在類初始化會解析集群下的節(jié)點信息卦尊,如果開啟了地址服務(wù)器還會定時從遠(yuǎn)程服務(wù)器更新拿到server列表信息。
@PostConstruct
public void init() {
String envDomainName = System.getenv("address_server_domain");
if (StringUtils.isBlank(envDomainName)) {
domainName = System.getProperty("address.server.domain", "jmenv.tbsite.net");
} else {
domainName = envDomainName;
}
String envAddressPort = System.getenv("address_server_port");
if (StringUtils.isBlank(envAddressPort)) {
addressPort = System.getProperty("address.server.port", "8080");
} else {
addressPort = envAddressPort;
}
addressUrl = System.getProperty("address.server.url",
servletContext.getContextPath() + "/" + RunningConfigUtils.getClusterName());
addressServerUrl = "http://" + domainName + ":" + addressPort + addressUrl;
envIdUrl = "http://" + domainName + ":" + addressPort + "/env";
defaultLog.info("ServerListService address-server port:" + addressPort);
defaultLog.info("ADDRESS_SERVER_URL:" + addressServerUrl);
isHealthCheck = PropertyUtil.isHealthCheck();
maxFailCount = PropertyUtil.getMaxHealthCheckFailCount();
fatalLog.warn("useAddressServer:{}", isUseAddressServer);
GetServerListTask task = new GetServerListTask();
task.run();
if (CollectionUtils.isEmpty(serverList)) {
fatalLog.error("########## cannot get serverlist, so exit.");
throw new RuntimeException("cannot get serverlist, so exit.");
} else {
TimerTaskService.scheduleWithFixedDelay(task, 0L, 5L, TimeUnit.SECONDS);
}
}
在web容器初始化完成后觸發(fā)事件
@Override
public void onApplicationEvent(WebServerInitializedEvent event) {
if (port == 0) {
port = event.getWebServer().getPort();
List<String> newList = new ArrayList<String>();
for (String serverAddrTmp : serverList) {
newList.add(getFormatServerAddr(serverAddrTmp));
}
setServerList(new ArrayList<String>(newList));
}
httpclient.start();
//很容易發(fā)現(xiàn)我們有一個周期任務(wù)5s檢查一次服務(wù)器健康磷醋;
CheckServerHealthTask checkServerHealthTask = new CheckServerHealthTask();
TimerTaskService.scheduleWithFixedDelay(checkServerHealthTask, 0L, 5L, TimeUnit.SECONDS);
}
繼續(xù)看CheckServerHealthTask執(zhí)行邏輯
private void checkServerHealth() {
long startCheckTime = System.currentTimeMillis();
for (String serverIp : serverList) {
//請求其他節(jié)點節(jié)點
// Compatible with old codes,use status.taobao
String url = "http://" + serverIp + servletContext.getContextPath() + Constants.HEALTH_CONTROLLER_PATH;
// 路徑url:"/nacos/health";
HttpGet request = new HttpGet(url);
//我們也有回調(diào)函數(shù)猫牡,如果節(jié)點信息返回成功就從不健康列表移除胡诗,否則需要添加上
httpclient.execute(request, new AsyncCheckServerHealthCallBack(serverIp));
}
long endCheckTime = System.currentTimeMillis();
long cost = endCheckTime - startCheckTime;
defaultLog.debug("checkServerHealth cost: {}", cost);
}
看看其他節(jié)點怎么處理的
@GetMapping
public String getHealth() {
// TODO UP DOWN WARN
StringBuilder sb = new StringBuilder();
//數(shù)據(jù)庫的監(jiān)控 信息
String dbStatus = dataSourceService.getHealth();
//如果有遠(yuǎn)程地址服務(wù)器邓线,還有遠(yuǎn)程地址服務(wù)器是否健康信息
if (dbStatus.contains(heathUpStr) && ServerListService.isAddressServerHealth() && ServerListService
.isInIpList()) {
sb.append(heathUpStr);
} else if (dbStatus.contains(heathWarnStr) && ServerListService.isAddressServerHealth() && ServerListService
.isInIpList()) {
sb.append("WARN:");
sb.append("slave db (").append(dbStatus.split(":")[1]).append(") down. ");
} else {
sb.append("DOWN:");
//對于配置中心有主db判別
if (dbStatus.contains(heathDownStr)) {
sb.append("master db (").append(dbStatus.split(":")[1]).append(") down. ");
}
if (!ServerListService.isAddressServerHealth()) {
sb.append("address server down. ");
}
if (!ServerListService.isInIpList()) {
sb.append("server ip ").append(LOCAL_IP).append(" is not in the serverList of address server. ");
}
}
//返回數(shù)據(jù)
return sb.toString();
}
服務(wù)器節(jié)點間的監(jiān)控信息比較簡單,就周期發(fā)送心跳檢測即可煌恢。
配置中心集群大總結(jié):
- 1.配置中心依賴第三方數(shù)據(jù)庫mysql做存儲
- 數(shù)據(jù)更新數(shù)據(jù)庫骇陈,然后廣播通知其他節(jié)點更新,使用了多個異步操作事件通知完成更新
- 3.所有的客戶端獲取的數(shù)據(jù)都是服務(wù)節(jié)點本地緩存信息獲取的瑰抵,不會去數(shù)據(jù)庫拉數(shù)據(jù)
- 配置中心一致性的問題也是依賴6h一次全量從數(shù)據(jù)庫同步到服務(wù)器磁盤緩存文件中做到的
- 5.客戶端監(jiān)聽的配置會使用長輪訓(xùn)去拉取服務(wù)器配置變化你雌,如果沒有變化服務(wù)端會hang住29.5s的時間避免服務(wù)器壓力。
-...