前言
因?yàn)楣咀罱?xiàng)目原因正好用到了《分布式任務(wù)調(diào)度平臺(tái)XXL-JOB》诅岩,項(xiàng)目結(jié)束打算看看他的源碼,發(fā)現(xiàn)他還依賴于 《分布式服務(wù)框架XXL-RPC》萍倡,于是我決定先看XXL-RPC挂据。但當(dāng)我正準(zhǔn)備看的時(shí)候,我發(fā)現(xiàn)XXL-RPC依賴于 《分布式服務(wù)注冊(cè)中心XXL-REGISTRY》凸舵,于是我決定先看XXL-REGISTRY
讓我們先看看它自己怎么吹自己的
[1.1 概述]
XXL-REGISTRY 是一個(gè)輕量級(jí)分布式服務(wù)注冊(cè)中心,擁有"輕量級(jí)失尖、秒級(jí)注冊(cè)上線啊奄、多環(huán)境渐苏、跨語言、跨機(jī)房"等特性」娇洌現(xiàn)已開放源代碼琼富,開箱即用。
[1.2 特性]
- 1峻仇、輕量級(jí):基于DB與磁盤文件公黑,只需要提供一個(gè)DB實(shí)例即可,無第三方依賴摄咆;
- 2、實(shí)時(shí)性:借助內(nèi)部廣播機(jī)制人断,新服務(wù)上線吭从、下線,可以在1s內(nèi)推送給客戶端恶迈;
- 3涩金、數(shù)據(jù)同步:注冊(cè)中心會(huì)定期全量同步數(shù)據(jù)至磁盤文件,清理無效服務(wù)暇仲,確保服務(wù)數(shù)據(jù)實(shí)時(shí)可用步做;
- 4、性能:服務(wù)發(fā)現(xiàn)時(shí)僅讀磁盤文件奈附,性能非常高全度;服務(wù)注冊(cè)、摘除時(shí)通過磁盤文件校驗(yàn)斥滤,防止重復(fù)注冊(cè)操作将鸵;
- 5、擴(kuò)展性:可方便佑颇、快速的橫向擴(kuò)展顶掉,只需保證服務(wù)注冊(cè)中心配置一致即可,可借助負(fù)載均衡組件如Nginx快速集群部署挑胸;
- 6痒筒、多狀態(tài):服務(wù)內(nèi)置三種狀態(tài):
- 正常狀態(tài)=支持動(dòng)態(tài)注冊(cè)、發(fā)現(xiàn)茬贵,服務(wù)注冊(cè)信息實(shí)時(shí)更新簿透;
- 鎖定狀態(tài)=人工維護(hù)注冊(cè)信息,服務(wù)注冊(cè)信息固定不變闷沥;
- 禁用狀態(tài)=禁止使用萎战,服務(wù)注冊(cè)信息固定為空;
- 7舆逃、跨語言:注冊(cè)中心提供HTTP接口(RESTFUL 格式)供客戶端實(shí)用蚂维,語言無關(guān)戳粒,通用性更強(qiáng);
- 8虫啥、兼容性:項(xiàng)目立項(xiàng)之初是為XXL-RPC量身設(shè)計(jì)蔚约,但是不限于XXL-RPC使用。兼容支持任何服務(wù)框架服務(wù)注冊(cè)實(shí)用涂籽,如dubbo苹祟、springboot等;
- 9评雌、跨機(jī)房:得益于服務(wù)注冊(cè)中心集群關(guān)系對(duì)等特性树枫,集群各節(jié)點(diǎn)提供冪等的配置服務(wù);因此景东,異地跨機(jī)房部署時(shí)砂轻,只需要請(qǐng)求本機(jī)房服務(wù)注冊(cè)中心即可,實(shí)現(xiàn)異地多活斤吐;
- 10搔涝、容器化:提供官方docker鏡像,并實(shí)時(shí)更新推送dockerhub和措,進(jìn)一步實(shí)現(xiàn) "服務(wù)注冊(cè)中心" 產(chǎn)品開箱即用庄呈;
- 11、訪問令牌(accessToken):為提升系統(tǒng)安全性派阱,注冊(cè)中心和客戶端進(jìn)行安全性校驗(yàn)诬留,雙方AccessToken匹配才允許通訊;
屁話不多颁褂,先把源碼下下來故响,創(chuàng)建數(shù)據(jù)庫(kù),跑起來看看
界面簡(jiǎn)介颁独,貌似可以手動(dòng)注冊(cè)服務(wù)彩届,先注冊(cè)一個(gè)玩一下。隨便填寫一下信息誓酒,保存
可以查看剛剛的注冊(cè)信息
點(diǎn)擊運(yùn)行報(bào)表tab看到我剛剛注冊(cè)的服務(wù)信息樟蠕。
回到服務(wù)注冊(cè),再點(diǎn)擊查看靠柑,發(fā)現(xiàn)我剛剛注冊(cè)的地址沒了寨辩,做下猜測(cè):
因?yàn)槲业淖?cè)服務(wù)之后沒有和注冊(cè)中心有任何互動(dòng),被注冊(cè)中心自動(dòng)下線了
界面大概就是這樣歼冰,我們看看官方的使用說明
客戶端maven依賴地址:
<dependency>
<groupId>com.xuxueli</groupId>
<artifactId>xxl-registry-client</artifactId>
<version>${最新穩(wěn)定版}</version>
</dependency>
客戶端API實(shí)用示例代碼如下:
// 注冊(cè)中心客戶端(基礎(chǔ)類)
XxlRegistryBaseClient registryClient = new XxlRegistryBaseClient("http://localhost:8080/xxl-registry-admin/", null, "xxl-rpc", "test");
// 注冊(cè)中心客戶端(增強(qiáng)類)
XxlRegistryClient registryClient = new XxlRegistryClient("http://localhost:8080/xxl-registry-admin/", null, "xxl-rpc", "test");
// 服務(wù)注冊(cè) & 續(xù)約:
List<XxlRegistryDataParamVO> registryDataList = new ArrayList<>();
registryDataList.add(new XxlRegistryDataParamVO("service01", "address01"));
registryDataList.add(new XxlRegistryDataParamVO("service02", "address02"));
registryClient.registry(registryDataList);
// 服務(wù)摘除:
List<XxlRegistryDataParamVO> registryDataList = new ArrayList<>();
registryDataList.add(new XxlRegistryDataParamVO("service01", "address01"));
registryDataList.add(new XxlRegistryDataParamVO("service02", "address02"));
registryClient.remove(registryDataList);
// 服務(wù)發(fā)現(xiàn):
Set<String> keys = new TreeSet<>();
keys.add("service01");
keys.add("service02");
Map<String, TreeSet<String>> serviceData = registryClient.discovery(keys);
// 服務(wù)監(jiān)控:
Set<String> keys = new TreeSet<>();
keys.add("service01");
keys.add("service02");
registryClient.monitor(keys);
ok靡狞,那就從客戶端代碼入手。
首先需要?jiǎng)?chuàng)建一個(gè)registryClient
,先從XxlRegistryBaseClient
開始隔嫡〉榕拢看看他的構(gòu)造函數(shù)甘穿。
public XxlRegistryBaseClient(String adminAddress, String accessToken, String biz, String env) {
this.adminAddress = adminAddress;
this.accessToken = accessToken;
this.biz = biz;
this.env = env;
// valid
if (adminAddress==null || adminAddress.trim().length()==0) {
throw new RuntimeException("xxl-registry adminAddress empty");
}
if (biz==null || biz.trim().length()<4 || biz.trim().length()>255) {
throw new RuntimeException("xxl-registry biz empty Invalid[4~255]");
}
if (env==null || env.trim().length()<2 || env.trim().length()>255) {
throw new RuntimeException("xxl-registry biz env Invalid[2~255]");
}
// parse
adminAddressArr = new ArrayList<>();
if (adminAddress.contains(",")) {
adminAddressArr.addAll(Arrays.asList(adminAddress.split(",")));
} else {
adminAddressArr.add(adminAddress);
}
}
就是一些簡(jiǎn)單的校驗(yàn)和設(shè)值,沒什么看頭梢杭。接下來看看服務(wù)的注冊(cè)和續(xù)約温兼。
/**
* registry
*
* @param registryDataList
* @return
*/
public boolean registry(List<XxlRegistryDataParamVO> registryDataList){
// valid
if (registryDataList==null || registryDataList.size()==0) {
throw new RuntimeException("xxl-registry registryDataList empty");
}
for (XxlRegistryDataParamVO registryParam: registryDataList) {
if (registryParam.getKey()==null || registryParam.getKey().trim().length()<4 || registryParam.getKey().trim().length()>255) {
throw new RuntimeException("xxl-registry registryDataList#key Invalid[4~255]");
}
if (registryParam.getValue()==null || registryParam.getValue().trim().length()<4 || registryParam.getValue().trim().length()>255) {
throw new RuntimeException("xxl-registry registryDataList#value Invalid[4~255]");
}
}
// pathUrl
String pathUrl = "/api/registry";
// param
XxlRegistryParamVO registryParamVO = new XxlRegistryParamVO();
registryParamVO.setAccessToken(this.accessToken);
registryParamVO.setBiz(this.biz);
registryParamVO.setEnv(this.env);
registryParamVO.setRegistryDataList(registryDataList);
String paramsJson = BasicJson.toJson(registryParamVO);
// result
Map<String, Object> respObj = requestAndValid(pathUrl, paramsJson, 5);
return respObj!=null?true:false;
}
根據(jù)下方requestAndValid代碼可知,這是一個(gè)發(fā)送http請(qǐng)求的代碼武契,那么registry
方法募判,除了一些校驗(yàn),其實(shí)就是向服務(wù)器發(fā)送了一個(gè)httppost請(qǐng)求咒唆。
private Map<String, Object> requestAndValid(String pathUrl, String requestBody, int timeout){
for (String adminAddressUrl: adminAddressArr) {
String finalUrl = adminAddressUrl + pathUrl;
// request
String responseData = BasicHttpUtil.postBody(finalUrl, requestBody, timeout);
if (responseData == null) {
return null;
}
// parse resopnse
Map<String, Object> resopnseMap = null;
try {
resopnseMap = BasicJson.parseMap(responseData);
} catch (Exception e) { }
// valid resopnse
if (resopnseMap==null
|| !resopnseMap.containsKey("code")
|| !"200".equals(String.valueOf(resopnseMap.get("code")))
) {
logger.warn("XxlRegistryBaseClient response fail, responseData={}", responseData);
return null;
}
return resopnseMap;
}
return null;
}
那么直接看看注冊(cè)中心源碼的這個(gè) /api/registry
是干什么的吧
跳過controller的一些校驗(yàn)届垫,直接進(jìn)入service
@Override
public ReturnT<String> registry(String accessToken, String biz, String env, List<XxlRegistryData> registryDataList) {
// valid
if (this.accessToken!=null && this.accessToken.trim().length()>0 && !this.accessToken.equals(accessToken)) {
return new ReturnT<String>(ReturnT.FAIL_CODE, "AccessToken Invalid");
}
if (biz==null || biz.trim().length()<4 || biz.trim().length()>255) {
return new ReturnT<String>(ReturnT.FAIL_CODE, "Biz Invalid[4~255]");
}
if (env==null || env.trim().length()<2 || env.trim().length()>255) {
return new ReturnT<String>(ReturnT.FAIL_CODE, "Env Invalid[2~255]");
}
if (registryDataList==null || registryDataList.size()==0) {
return new ReturnT<String>(ReturnT.FAIL_CODE, "Registry DataList Invalid");
}
for (XxlRegistryData registryData: registryDataList) {
if (registryData.getKey()==null || registryData.getKey().trim().length()<4 || registryData.getKey().trim().length()>255) {
return new ReturnT<String>(ReturnT.FAIL_CODE, "Registry Key Invalid[4~255]");
}
if (registryData.getValue()==null || registryData.getValue().trim().length()<4 || registryData.getValue().trim().length()>255) {
return new ReturnT<String>(ReturnT.FAIL_CODE, "Registry Value Invalid[4~255]");
}
}
// fill + add queue
for (XxlRegistryData registryData: registryDataList) {
registryData.setBiz(biz);
registryData.setEnv(env);
}
registryQueue.addAll(registryDataList);
return ReturnT.SUCCESS;
}
除了一些校驗(yàn),最關(guān)鍵的出現(xiàn)了钧排!
registryQueue.addAll(registryDataList);
注冊(cè)數(shù)據(jù)被塞進(jìn)了一個(gè)隊(duì)列里面敦腔。看看這個(gè)隊(duì)列的定義:
private volatile LinkedBlockingQueue<XxlRegistryData> registryQueue = new LinkedBlockingQueue<XxlRegistryData>();
也就是說,我們注冊(cè)(續(xù)約)服務(wù)的時(shí)候,只是把信息放入了一個(gè)隊(duì)列峻呛。那么稍微動(dòng)動(dòng) 腦子就知道调俘,當(dāng)出隊(duì)的時(shí)候,就是真正處理數(shù)據(jù)的時(shí)候躺盛。我們用IDE搜一搜這個(gè)隊(duì)列的相關(guān)操作代碼看看项戴。
搜索結(jié)果激動(dòng)人心,只有一個(gè)地方用了take
槽惫,一個(gè)地方用了addAll
(就是上面提到的)周叮,其他沒有任何調(diào)用。
那就直接看看take出來數(shù)據(jù)做了什么事情吧界斜。
ps:為了方便理解代碼仿耽,先分看下數(shù)據(jù)庫(kù)
在xxl_registry表中,biz各薇,env项贺,key一起構(gòu)成了唯一主鍵,所有服務(wù)的值在data中維護(hù)成數(shù)組的形式存在
xxl_registry
在xxl_registry_data表中峭判,biz
,env
,key
,value
構(gòu)成了唯一主鍵开缎,每條數(shù)據(jù)代表了一個(gè)服務(wù)的注冊(cè)信息,updateTime表示服務(wù)注冊(cè)的時(shí)間林螃。
xxl_registry_data
消息表奕删,記錄服務(wù)變化信息
xxl_registry_message
經(jīng)驗(yàn)老道的程序員已經(jīng)發(fā)現(xiàn)afterPropertiesSet
這個(gè)方法。
在afterPropertiesSet
中開了10個(gè)線程疗认,每個(gè)線程做的事情就是:
- 從
registryQueue
里面取出一條服務(wù)注冊(cè)數(shù)據(jù) - 向數(shù)據(jù)庫(kù)更新或新增一條
xxl_registry_data
數(shù)據(jù)(這個(gè)表主要用來記錄某個(gè)服務(wù)最后的注冊(cè)(續(xù)約)時(shí)間) - 從磁盤讀取記錄該服務(wù)的文件數(shù)據(jù)
3.1. 如果磁盤讀取不到相應(yīng)文件完残,則進(jìn)入checkRegistryDataAndSendMessage
方法伏钠,把xxl_registry_data
中所有的value組成json array,對(duì)比xxl_registry
中的address
看是否一致坏怪。
3.1.1.xxl_registry
中不存在相應(yīng)服務(wù)信息贝润,則新增一條服務(wù)數(shù)據(jù)
3.1.2.xxl_registry
中存在相應(yīng)服務(wù),但服務(wù)address和jsonArray不一致铝宵,則更新xxl_registry
數(shù)據(jù)使其和xxl_registry_data
中的數(shù)據(jù)一致
3.1.3. 當(dāng)數(shù)據(jù)不一致時(shí)打掘,會(huì)向xxl_registry_message
中插入一條消息數(shù)據(jù)
3.2. 讀取到了相應(yīng)服務(wù)的信息,但status
不是0鹏秋,則不做任何操作
3.3. 讀取到了相應(yīng)服務(wù)的信息尊蚁,且包含了剛剛?cè)〕鰜淼姆?wù)注冊(cè)數(shù)據(jù),則不做任何操作
@Override
public void afterPropertiesSet() throws Exception {
// valid
if (registryDataFilePath==null || registryDataFilePath.trim().length()==0) {
throw new RuntimeException("xxl-registry, registryDataFilePath empty.");
}
/**
* registry registry data (client-num/10 s)
*/
for (int i = 0; i < 10; i++) {
executorService.execute(new Runnable() {
@Override
public void run() {
while (!executorStoped) {
try {
XxlRegistryData xxlRegistryData = registryQueue.take();
if (xxlRegistryData !=null) {
// refresh or add
int ret = xxlRegistryDataDao.refresh(xxlRegistryData);
if (ret == 0) {
xxlRegistryDataDao.add(xxlRegistryData);
}
// valid file status
XxlRegistry fileXxlRegistry = getFileRegistryData(xxlRegistryData);
if (fileXxlRegistry == null) {
// go on
} else if (fileXxlRegistry.getStatus() != 0) {
continue; // "Status limited."
} else {
if (fileXxlRegistry.getDataList().contains(xxlRegistryData.getValue())) {
continue; // "Repeated limited."
}
}
// checkRegistryDataAndSendMessage
checkRegistryDataAndSendMessage(xxlRegistryData);
}
} catch (Exception e) {
if (!executorStoped) {
logger.error(e.getMessage(), e);
}
}
}
}
});
}
}
/**
* update Registry And Message
*/
private void checkRegistryDataAndSendMessage(XxlRegistryData xxlRegistryData){
// data json
List<XxlRegistryData> xxlRegistryDataList = xxlRegistryDataDao.findData(xxlRegistryData.getBiz(), xxlRegistryData.getEnv(), xxlRegistryData.getKey());
List<String> valueList = new ArrayList<>();
if (xxlRegistryDataList!=null && xxlRegistryDataList.size()>0) {
for (XxlRegistryData dataItem: xxlRegistryDataList) {
valueList.add(dataItem.getValue());
}
}
String dataJson = JacksonUtil.writeValueAsString(valueList);
// update registry and message
XxlRegistry xxlRegistry = xxlRegistryDao.load(xxlRegistryData.getBiz(), xxlRegistryData.getEnv(), xxlRegistryData.getKey());
boolean needMessage = false;
if (xxlRegistry == null) {
xxlRegistry = new XxlRegistry();
xxlRegistry.setBiz(xxlRegistryData.getBiz());
xxlRegistry.setEnv(xxlRegistryData.getEnv());
xxlRegistry.setKey(xxlRegistryData.getKey());
xxlRegistry.setData(dataJson);
xxlRegistryDao.add(xxlRegistry);
needMessage = true;
} else {
// check status, locked and disabled not use
if (xxlRegistry.getStatus() != 0) {
return;
}
if (!xxlRegistry.getData().equals(dataJson)) {
xxlRegistry.setData(dataJson);
xxlRegistryDao.update(xxlRegistry);
needMessage = true;
}
}
if (needMessage) {
// sendRegistryDataUpdateMessage (registry update)
sendRegistryDataUpdateMessage(xxlRegistry);
}
}
總結(jié)一下侣夷,服務(wù)注冊(cè)的大致流程:
- 通過httppost請(qǐng)求横朋,客戶端把服務(wù)注冊(cè)信息塞入注冊(cè)中心的注冊(cè)隊(duì)列
- 注冊(cè)中心后臺(tái)有10個(gè)線程會(huì)從注冊(cè)隊(duì)列取出注冊(cè)數(shù)據(jù)
- 同步注冊(cè)信息到xxl_registry
- 發(fā)消息
嗯?是不是覺得很奇怪百拓。注冊(cè)只干這些事琴锭?不是磁盤操作嗎?xxl_registry_data中的數(shù)據(jù)一直留著嗎衙传?消息什么時(shí)候處理决帖?太久服務(wù)沒有發(fā)出服務(wù)續(xù)約/心跳,服務(wù)不會(huì)自動(dòng)下線嗎蓖捶?
這些問題先放一放地回,先看看其他幾個(gè)接口吧
既然有服務(wù)注冊(cè),那就有服務(wù)移除俊鱼,先看移除接口刻像。
ps:重復(fù)的邏輯就略過了
@Override
public ReturnT<String> remove(String accessToken, String biz, String env, List<XxlRegistryData> registryDataList) {
// valid
if (this.accessToken!=null && this.accessToken.trim().length()>0 && !this.accessToken.equals(accessToken)) {
return new ReturnT<String>(ReturnT.FAIL_CODE, "AccessToken Invalid");
}
if (biz==null || biz.trim().length()<4 || biz.trim().length()>255) {
return new ReturnT<String>(ReturnT.FAIL_CODE, "Biz Invalid[4~255]");
}
if (env==null || env.trim().length()<2 || env.trim().length()>255) {
return new ReturnT<String>(ReturnT.FAIL_CODE, "Env Invalid[2~255]");
}
if (registryDataList==null || registryDataList.size()==0) {
return new ReturnT<String>(ReturnT.FAIL_CODE, "Registry DataList Invalid");
}
for (XxlRegistryData registryData: registryDataList) {
if (registryData.getKey()==null || registryData.getKey().trim().length()<4 || registryData.getKey().trim().length()>255) {
return new ReturnT<String>(ReturnT.FAIL_CODE, "Registry Key Invalid[4~255]");
}
if (registryData.getValue()==null || registryData.getValue().trim().length()<4 || registryData.getValue().trim().length()>255) {
return new ReturnT<String>(ReturnT.FAIL_CODE, "Registry Value Invalid[4~255]");
}
}
// fill + add queue
for (XxlRegistryData registryData: registryDataList) {
registryData.setBiz(biz);
registryData.setEnv(env);
}
removeQueue.addAll(registryDataList);
return ReturnT.SUCCESS;
}
這是一段似曾相識(shí)的代碼,只是把服務(wù)信息放進(jìn)了另一個(gè)隊(duì)列并闲,removeQueue
private volatile LinkedBlockingQueue<XxlRegistryData> registryQueue = new LinkedBlockingQueue<XxlRegistryData>();
ok细睡,按照注冊(cè)的思路,我們看看針對(duì)removeQueue是不是也有10個(gè)線程從中反復(fù)取數(shù)據(jù)呢焙蚓?
/**
* remove registry data (client-num/start-interval s)
*/
for (int i = 0; i < 10; i++) {
executorService.execute(new Runnable() {
@Override
public void run() {
while (!executorStoped) {
try {
XxlRegistryData xxlRegistryData = removeQueue.take();
if (xxlRegistryData != null) {
// delete
xxlRegistryDataDao.deleteDataValue(xxlRegistryData.getBiz(), xxlRegistryData.getEnv(), xxlRegistryData.getKey(), xxlRegistryData.getValue());
// valid file status
XxlRegistry fileXxlRegistry = getFileRegistryData(xxlRegistryData);
if (fileXxlRegistry == null) {
// go on
} else if (fileXxlRegistry.getStatus() != 0) {
continue; // "Status limited."
} else {
if (!fileXxlRegistry.getDataList().contains(xxlRegistryData.getValue())) {
continue; // "Repeated limited."
}
}
// checkRegistryDataAndSendMessage
checkRegistryDataAndSendMessage(xxlRegistryData);
}
} catch (Exception e) {
if (!executorStoped) {
logger.error(e.getMessage(), e);
}
}
}
}
});
}
果不其然纹冤,代碼幾乎和注冊(cè)時(shí)候的一一對(duì)應(yīng),只是把向xxl_registry_data
中插入數(shù)據(jù)變成了刪除數(shù)據(jù)购公。
那么萌京,服務(wù)注冊(cè)相關(guān)的接口已經(jīng)看完了。(沒錯(cuò)宏浩,只有兩個(gè))
既然有服務(wù)注冊(cè)知残,就有服務(wù)發(fā)現(xiàn),有提供方比庄,有消費(fèi)方才是個(gè)正常的系統(tǒng)求妹!
那接下來就看看怎么發(fā)現(xiàn)已經(jīng)注冊(cè)的服務(wù)乏盐!
先看客戶端代碼
// 服務(wù)發(fā)現(xiàn):
Set<String> keys = new TreeSet<>();
keys.add("service01");
keys.add("service02");
Map<String, TreeSet<String>> serviceData = registryClient.discovery(keys);
很簡(jiǎn)單,就是把想要發(fā)現(xiàn)的服務(wù)的key加進(jìn)了參數(shù)里面
那再看看注冊(cè)中心的代碼
@Override
public ReturnT<Map<String, List<String>>> discovery(String accessToken, String biz, String env, List<String> keys) {
// valid
if (this.accessToken!=null && this.accessToken.trim().length()>0 && !this.accessToken.equals(accessToken)) {
return new ReturnT<>(ReturnT.FAIL_CODE, "AccessToken Invalid");
}
if (biz==null || biz.trim().length()<2 || biz.trim().length()>255) {
return new ReturnT<>(ReturnT.FAIL_CODE, "Biz Invalid[2~255]");
}
if (env==null || env.trim().length()<2 || env.trim().length()>255) {
return new ReturnT<>(ReturnT.FAIL_CODE, "Env Invalid[2~255]");
}
if (keys==null || keys.size()==0) {
return new ReturnT<>(ReturnT.FAIL_CODE, "keys Invalid.");
}
for (String key: keys) {
if (key==null || key.trim().length()<4 || key.trim().length()>255) {
return new ReturnT<>(ReturnT.FAIL_CODE, "Key Invalid[4~255]");
}
}
Map<String, List<String>> result = new HashMap<String, List<String>>();
for (String key: keys) {
XxlRegistryData xxlRegistryData = new XxlRegistryData();
xxlRegistryData.setBiz(biz);
xxlRegistryData.setEnv(env);
xxlRegistryData.setKey(key);
List<String> dataList = new ArrayList<String>();
XxlRegistry fileXxlRegistry = getFileRegistryData(xxlRegistryData);
if (fileXxlRegistry!=null) {
dataList = fileXxlRegistry.getDataList();
}
result.put(key, dataList);
}
return new ReturnT<Map<String, List<String>>>(result);
}
// get
public XxlRegistry getFileRegistryData(XxlRegistryData xxlRegistryData){
// fileName
String fileName = parseRegistryDataFileName(xxlRegistryData.getBiz(), xxlRegistryData.getEnv(), xxlRegistryData.getKey());
// read
Properties prop = PropUtil.loadProp(fileName);
if (prop!=null) {
XxlRegistry fileXxlRegistry = new XxlRegistry();
fileXxlRegistry.setData(prop.getProperty("data"));
fileXxlRegistry.setStatus(Integer.valueOf(prop.getProperty("status")));
fileXxlRegistry.setDataList(JacksonUtil.readValue(fileXxlRegistry.getData(), List.class));
return fileXxlRegistry;
}
return null;
}
仔細(xì)看看代碼非常簡(jiǎn)單制恍,就是從注冊(cè)中心的本地磁盤讀取服務(wù)信息父能,然后直接返回。也就是說净神,服務(wù)發(fā)現(xiàn)只會(huì)和磁盤產(chǎn)生交互何吝,和數(shù)據(jù)庫(kù)無關(guān)。和前面它自己吹噓的一樣鹃唯。
那么問題又來了爱榕,注冊(cè)中心是怎么保證磁盤文件的實(shí)時(shí)性和一直性的呢?還是老樣子坡慌,這個(gè)問題放一放黔酥,先把最后一個(gè)客戶端接口看完!
// 服務(wù)監(jiān)控:
Set<String> keys = new TreeSet<>();
keys.add("service01");
keys.add("service02");
registryClient.monitor(keys);
去注冊(cè)中心看看
@Override
public DeferredResult<ReturnT<String>> monitor(String accessToken, String biz, String env, List<String> keys) {
// init
DeferredResult deferredResult = new DeferredResult(30 * 1000L, new ReturnT<>(ReturnT.SUCCESS_CODE, "Monitor timeout, no key updated."));
// valid
if (this.accessToken!=null && this.accessToken.trim().length()>0 && !this.accessToken.equals(accessToken)) {
deferredResult.setResult(new ReturnT<>(ReturnT.FAIL_CODE, "AccessToken Invalid"));
return deferredResult;
}
if (biz==null || biz.trim().length()<4 || biz.trim().length()>255) {
deferredResult.setResult(new ReturnT<>(ReturnT.FAIL_CODE, "Biz Invalid[4~255]"));
return deferredResult;
}
if (env==null || env.trim().length()<2 || env.trim().length()>255) {
deferredResult.setResult(new ReturnT<>(ReturnT.FAIL_CODE, "Env Invalid[2~255]"));
return deferredResult;
}
if (keys==null || keys.size()==0) {
deferredResult.setResult(new ReturnT<>(ReturnT.FAIL_CODE, "keys Invalid."));
return deferredResult;
}
for (String key: keys) {
if (key==null || key.trim().length()<4 || key.trim().length()>255) {
deferredResult.setResult(new ReturnT<>(ReturnT.FAIL_CODE, "Key Invalid[4~255]"));
return deferredResult;
}
}
// monitor by client
for (String key: keys) {
String fileName = parseRegistryDataFileName(biz, env, key);
List<DeferredResult> deferredResultList = registryDeferredResultMap.get(fileName);
if (deferredResultList == null) {
deferredResultList = new ArrayList<>();
registryDeferredResultMap.put(fileName, deferredResultList);
}
deferredResultList.add(deferredResult);
}
return deferredResult;
}
看看代碼洪橘,初始化了一個(gè)DeferredResult
DeferredResult deferredResult = new DeferredResult(30 * 1000L, new ReturnT<>(ReturnT.SUCCESS_CODE, "Monitor timeout, no key updated."));
DeferredResult
是spring-web包提供的一個(gè)異步響應(yīng)對(duì)象跪者,下面給出一些點(diǎn)單的sample
@GetMapping("deferredResultTest")
@ResponseBody
public DeferredResult<String> test(@RequestParam("timeout") final Long timeout) throws InterruptedException {
final DeferredResult<String> result = new DeferredResult<>(5L * 1000, "timeout");
new Thread(new Runnable() {
@Override
public void run() {
System.out.println("process start.....");
try {
TimeUnit.SECONDS.sleep(timeout);
} catch (InterruptedException e) {
e.printStackTrace();
}
result.setResult("done");
System.out.println("process dnoe......");
}
}).start();
result.onTimeout(new Runnable() {
@Override
public void run() {
System.out.println("timeout");
result.setResult("timeout");
}
});
System.out.println("return result");
return result;
}
請(qǐng)求 http://localhost:8080/xxl-registry-admin/registry/deferredResultTest?timeout=3 時(shí),返回done熄求,控制臺(tái)打印
process start.....
return result
process dnoe......
請(qǐng)求 http://localhost:8080/xxl-registry-admin/registry/deferredResultTest?timeout=8 時(shí)坑夯,返回timeout,控制臺(tái)打印
return result
process start.....
timeout
process dnoe......
回到原來話題抡四,monitor初始化了一個(gè)超時(shí)時(shí)間30秒的DeferredResult
,超時(shí)就返回一個(gè)no key update的response對(duì)象
private Map<String, List<DeferredResult>> registryDeferredResultMap = new ConcurrentHashMap<>();
然后去registryDeferredResultMap
這個(gè)map里找注冊(cè)信息仗谆,把剛剛初始化的DeferredResult加入這個(gè)map中指巡,然后就直接return了這個(gè)DeferredResult對(duì)象。
根據(jù)我們對(duì)DeferredResult對(duì)象的理解隶垮,這個(gè)對(duì)象在30秒內(nèi)沒處理完則會(huì)返回一個(gè)success對(duì)象藻雪,那么30秒內(nèi)處理玩會(huì)發(fā)生什么呢?讓我們搜一搜registryDeferredResultMap
狸吞,肯定在其他地方做了處理勉耀!
搜索結(jié)果表示只有一處
// set
public String setFileRegistryData(XxlRegistry xxlRegistry){
// fileName
String fileName = parseRegistryDataFileName(xxlRegistry.getBiz(), xxlRegistry.getEnv(), xxlRegistry.getKey());
// valid repeat update
Properties existProp = PropUtil.loadProp(fileName);
if (existProp != null
&& existProp.getProperty("data").equals(xxlRegistry.getData())
&& existProp.getProperty("status").equals(String.valueOf(xxlRegistry.getStatus()))
) {
return new File(fileName).getPath();
}
// write
Properties prop = new Properties();
prop.setProperty("data", xxlRegistry.getData());
prop.setProperty("status", String.valueOf(xxlRegistry.getStatus()));
PropUtil.writeProp(prop, fileName);
logger.info(">>>>>>>>>>> xxl-registry, setFileRegistryData: biz={}, env={}, key={}, data={}"
, xxlRegistry.getBiz(), xxlRegistry.getEnv(), xxlRegistry.getKey(), xxlRegistry.getData());
// brocast monitor client
List<DeferredResult> deferredResultList = registryDeferredResultMap.get(fileName);
if (deferredResultList != null) {
registryDeferredResultMap.remove(fileName);
for (DeferredResult deferredResult: deferredResultList) {
deferredResult.setResult(new ReturnT<>(ReturnT.SUCCESS_CODE, "Monitor key update."));
}
}
return new File(fileName).getPath();
}
看完代碼可知,在調(diào)用setFileRegistryData
方法之后蹋偏,通過和磁盤里注冊(cè)數(shù)據(jù)的對(duì)比便斥,如果有變化,則DeferredResult
就會(huì)被setResult
威始。那問題又來了枢纠,setFileRegistryData
這個(gè)方法是什么時(shí)候被調(diào)用呢?讓我們?cè)偎岩凰牙杼模∮质撬?code>afterPropertiesSet晋渺!總過有兩個(gè)地方用到了setFileRegistryData
镰绎,先看第一個(gè)線程。這個(gè)線程每秒鐘會(huì)運(yùn)行一次while里面的代碼木西。
/**
* broadcase new one registry-data-file (1/1s)
*
* clean old message (1/10s)
*/
executorService.execute(new Runnable() {
@Override
public void run() {
while (!executorStoped) {
try {
// new message, filter readed
List<XxlRegistryMessage> messageList = xxlRegistryMessageDao.findMessage(readedMessageIds);
if (messageList!=null && messageList.size()>0) {
for (XxlRegistryMessage message: messageList) {
readedMessageIds.add(message.getId());
if (message.getType() == 0) { // from registry畴栖、add、update八千、deelete吗讶,ne need sync from db, only write
XxlRegistry xxlRegistry = JacksonUtil.readValue(message.getData(), XxlRegistry.class);
// process data by status
if (xxlRegistry.getStatus() == 1) {
// locked, not updated
} else if (xxlRegistry.getStatus() == 2) {
// disabled, write empty
xxlRegistry.setData(JacksonUtil.writeValueAsString(new ArrayList<String>()));
} else {
// default, sync from db (aready sync before message, only write)
}
// sync file
setFileRegistryData(xxlRegistry);
}
}
}
// clean old message;
if ( (System.currentTimeMillis()/1000) % registryBeatTime ==0) {
xxlRegistryMessageDao.cleanMessage(registryBeatTime);
readedMessageIds.clear();
}
} catch (Exception e) {
if (!executorStoped) {
logger.error(e.getMessage(), e);
}
}
try {
TimeUnit.SECONDS.sleep(1);
} catch (Exception e) {
if (!executorStoped) {
logger.error(e.getMessage(), e);
}
}
}
}
});
先去xxl_registry_message
表里找沒被消費(fèi)的消息
注意,這里代碼有個(gè)坑叼丑,這個(gè)
readedMessageIds
這個(gè)參數(shù)最后在sql里面的體現(xiàn)是not in
关翎,所以是,找到?jīng)]有被消費(fèi)過的message
如果有沒有被消費(fèi)鸠信,則做下面處理:
- 把消息id加入到已消費(fèi)list
readedMessageIds
里 - 如果服務(wù)被禁用纵寝,把服務(wù)的注冊(cè)data數(shù)據(jù)清空
- 調(diào)用
setFileRegistryData
方法同步數(shù)據(jù)到磁盤,并響應(yīng)給監(jiān)聽中的客戶端 - 每隔BeatTime(10秒)星立,從xxl_registry_message刪除BeatTime(10秒)之前的數(shù)據(jù)爽茴。清空readedMessageIds列表
那么在看第二個(gè)線程,第二個(gè)線程是BeatTime(10秒)運(yùn)行一次while里面的代碼
/**
* clean old registry-data (1/10s)
*
* sync total registry-data db + file (1+N/10s)
*
* clean old registry-data file
*/
executorService.execute(new Runnable() {
@Override
public void run() {
while (!executorStoped) {
// align to beattime
try {
long sleepSecond = registryBeatTime - (System.currentTimeMillis()/1000)%registryBeatTime;
if (sleepSecond>0 && sleepSecond<registryBeatTime) {
TimeUnit.SECONDS.sleep(sleepSecond);
}
} catch (Exception e) {
if (!executorStoped) {
logger.error(e.getMessage(), e);
}
}
try {
// clean old registry-data in db
xxlRegistryDataDao.cleanData(registryBeatTime * 3);
// sync registry-data, db + file
int offset = 0;
int pagesize = 1000;
List<String> registryDataFileList = new ArrayList<>();
List<XxlRegistry> registryList = xxlRegistryDao.pageList(offset, pagesize, null, null, null);
while (registryList!=null && registryList.size()>0) {
for (XxlRegistry registryItem: registryList) {
// process data by status
if (registryItem.getStatus() == 1) {
// locked, not updated
} else if (registryItem.getStatus() == 2) {
// disabled, write empty
String dataJson = JacksonUtil.writeValueAsString(new ArrayList<String>());
registryItem.setData(dataJson);
} else {
// default, sync from db
List<XxlRegistryData> xxlRegistryDataList = xxlRegistryDataDao.findData(registryItem.getBiz(), registryItem.getEnv(), registryItem.getKey());
List<String> valueList = new ArrayList<String>();
if (xxlRegistryDataList!=null && xxlRegistryDataList.size()>0) {
for (XxlRegistryData dataItem: xxlRegistryDataList) {
valueList.add(dataItem.getValue());
}
}
String dataJson = JacksonUtil.writeValueAsString(valueList);
// check update, sync db
if (!registryItem.getData().equals(dataJson)) {
registryItem.setData(dataJson);
xxlRegistryDao.update(registryItem);
}
}
// sync file
String registryDataFile = setFileRegistryData(registryItem);
// collect registryDataFile
registryDataFileList.add(registryDataFile);
}
offset += 1000;
registryList = xxlRegistryDao.pageList(offset, pagesize, null, null, null);
}
// clean old registry-data file
cleanFileRegistryData(registryDataFileList);
} catch (Exception e) {
if (!executorStoped) {
logger.error(e.getMessage(), e);
}
}
try {
TimeUnit.SECONDS.sleep(registryBeatTime);
} catch (Exception e) {
if (!executorStoped) {
logger.error(e.getMessage(), e);
}
}
}
}
});
}
- 一開始做了一下時(shí)間對(duì)其到BeatTime(10秒)的操作(比如現(xiàn)在是10:10:03绰垂,那么會(huì)休眠7秒室奏,對(duì)其到10:10:10)
- 清除
xxl_registry_data
表中30秒前的數(shù)據(jù) - 從
xxl_registry
中循環(huán)取1000條數(shù)據(jù),
3.1. 如果服務(wù)被鎖定劲装,則什么都不做
3.2. 如果服務(wù)被禁用胧沫,則設(shè)置data為空數(shù)組
3.3. 如果正常,則從xxl_registry_data
表取出相對(duì)服務(wù)的所有value組成list占业,更新回xxl_registry
表
3.4. 同步數(shù)據(jù)到磁盤绒怨,并把文件路徑添加到registryDataFileList
里面 - 清除不在
registryDataFileList
里面的文件
看完這段代碼就知道,這就是前面所說的谦疾,10秒鐘全量同步數(shù)據(jù)南蹂。
看到這里,monitor的大致流程已經(jīng)比較清晰
- 設(shè)置DeferredResult念恍,
- 等待30秒六剥,30秒內(nèi)服務(wù)沒有變化,則在30秒的時(shí)候返回success峰伙。
- 30秒內(nèi)疗疟,如果【每秒檢查message消息的線程】發(fā)現(xiàn)有變化,則會(huì)立即返回success
- 30秒內(nèi)词爬,如果【每10秒全量同步線程】發(fā)現(xiàn)有變化秃嗜,則會(huì)立即返回success
所以,客戶端的視角就是,服務(wù)信息有變化锅锨,則離開拿到monitor方法的返回叽赊,沒變化則30秒的時(shí)候拿到返回值。
既然服務(wù)消費(fèi)者的代碼已經(jīng)比較清楚必搞,那再回頭看看剛剛服務(wù)生產(chǎn)者的代碼留下了的疑問必指。
注冊(cè)只干這些事?不是磁盤操作嗎恕洲?
消費(fèi)者確實(shí)是磁盤操作塔橡,服務(wù)生產(chǎn)者會(huì)同事維護(hù)db和磁盤數(shù)據(jù)
xxl_registry_data
中的數(shù)據(jù)一直留著嗎?
3倍心跳之前的數(shù)據(jù)霜第,會(huì)在【每10秒全量同步線程】中葛家,被刪除
消息什么時(shí)候處理?
【每秒檢查message消息的線程】
太久服務(wù)沒有發(fā)出服務(wù)續(xù)約/心跳泌类,服務(wù)不會(huì)自動(dòng)下線嗎癞谒?
會(huì)在【每10秒全量同步線程】中data被清空,所以需要服務(wù)生產(chǎn)者每隔一段時(shí)間就注冊(cè)(續(xù)約)一次(就是重新調(diào)一次registry方法)刃榨。XxlRegistryClient
提供了后臺(tái)線程自動(dòng)注冊(cè)(續(xù)約)
是個(gè)比較簡(jiǎn)單的生產(chǎn)者消費(fèi)者模型弹砚,而且大多數(shù)的疑問都被解決了。
那么我們重新梳理一遍這個(gè)服務(wù)中心的工作流程:
- 服務(wù)生產(chǎn)者通過registry方法向注冊(cè)中心注冊(cè)(向
registryQueue
隊(duì)列添加數(shù)據(jù))- 后臺(tái)線程會(huì)定時(shí)掃描
registryQueue
枢希,并更新數(shù)據(jù)到db桌吃,發(fā)送消息(消息會(huì)被【每秒檢查message消息的線程】消費(fèi),然后同步數(shù)據(jù)到磁盤)- 服務(wù)生產(chǎn)者通過remove方法向注冊(cè)中心移除服務(wù)(邏輯和注冊(cè)一致)
- 服務(wù)消費(fèi)者從磁盤讀取服務(wù)生產(chǎn)者數(shù)據(jù)
- 10秒鐘會(huì)有一次全量數(shù)據(jù)同步
附上官方架構(gòu)圖一張
讓我們?cè)诨仡^看看他吹噓的功能
1苞轿、輕量級(jí):基于DB與磁盤文件茅诱,只需要提供一個(gè)DB實(shí)例即可,無第三方依賴搬卒;
2让簿、實(shí)時(shí)性:借助內(nèi)部廣播機(jī)制,新服務(wù)上線秀睛、下線,可以在1s內(nèi)推送給客戶端莲祸;
3蹂安、數(shù)據(jù)同步:注冊(cè)中心會(huì)定期全量同步數(shù)據(jù)至磁盤文件,清理無效服務(wù)锐帜,確保服務(wù)數(shù)據(jù)實(shí)時(shí)可用田盈;
4、性能:服務(wù)發(fā)現(xiàn)時(shí)僅讀磁盤文件缴阎,性能非常高允瞧;服務(wù)注冊(cè)、摘除時(shí)通過磁盤文件校驗(yàn),防止重復(fù)注冊(cè)操作述暂;
5痹升、擴(kuò)展性:可方便、快速的橫向擴(kuò)展畦韭,只需保證服務(wù)注冊(cè)中心配置一致即可疼蛾,可借助負(fù)載均衡組件如Nginx快速集群部署;
6艺配、多狀態(tài):服務(wù)內(nèi)置三種狀態(tài):
正常狀態(tài)=支持動(dòng)態(tài)注冊(cè)察郁、發(fā)現(xiàn),服務(wù)注冊(cè)信息實(shí)時(shí)更新转唉;
鎖定狀態(tài)=人工維護(hù)注冊(cè)信息皮钠,服務(wù)注冊(cè)信息固定不變;
禁用狀態(tài)=禁止使用赠法,服務(wù)注冊(cè)信息固定為空麦轰;
7、跨語言:注冊(cè)中心提供HTTP接口(RESTFUL 格式)供客戶端實(shí)用期虾,語言無關(guān)原朝,通用性更強(qiáng);
8镶苞、兼容性:項(xiàng)目立項(xiàng)之初是為XXL-RPC量身設(shè)計(jì)喳坠,但是不限于XXL-RPC使用。兼容支持任何服務(wù)框架服務(wù)注冊(cè)實(shí)用茂蚓,如dubbo壕鹉、springboot等;
9聋涨、跨機(jī)房:得益于服務(wù)注冊(cè)中心集群關(guān)系對(duì)等特性晾浴,集群各節(jié)點(diǎn)提供冪等的配置服務(wù);因此牍白,異地跨機(jī)房部署時(shí)脊凰,只需要請(qǐng)求本機(jī)房服務(wù)注冊(cè)中心即可,實(shí)現(xiàn)異地多活茂腥;
10狸涌、容器化:提供官方docker鏡像,并實(shí)時(shí)更新推送dockerhub最岗,進(jìn)一步實(shí)現(xiàn) "服務(wù)注冊(cè)中心" 產(chǎn)品開箱即用帕胆;
11、訪問令牌(accessToken):為提升系統(tǒng)安全性般渡,注冊(cè)中心和客戶端進(jìn)行安全性校驗(yàn)懒豹,雙方AccessToken匹配才允許通訊芙盘;
除了5,8脸秽,9儒老,10,11豹储,其他的已經(jīng)在剛才的代碼閱讀中得到驗(yàn)證贷盲。
8,10剥扣,11略過巩剖,不是這次閱讀源碼的重點(diǎn)。
我們還剩下最后一個(gè)疑惑钠怯,也就是多服務(wù)中心的時(shí)候怎么所有服務(wù)中心的數(shù)據(jù)一致佳魔。
先看看官方怎么說的
服務(wù)注冊(cè)中心集群(可選)
服務(wù)注冊(cè)中心支持集群部署,提升消息系統(tǒng)容災(zāi)和可用性晦炊。
集群部署時(shí)鞠鲜,幾點(diǎn)要求和建議:
- DB配置保持一致;
- 登陸賬號(hào)配置保持一致断国;
- 建議:推薦通過nginx為集群做負(fù)載均衡贤姆,分配域名。訪問稳衬、客戶端使用等操作均通過該域名進(jìn)行霞捡。
4.3 跨機(jī)房(異地多活)
得益于服務(wù)注冊(cè)中心集群關(guān)系對(duì)等特性,集群各節(jié)點(diǎn)提供冪等的服務(wù)注冊(cè)服務(wù)薄疚;因此碧信,異地跨機(jī)房部署時(shí),> 只需要請(qǐng)求本機(jī)房服務(wù)注冊(cè)中心即可街夭,實(shí)現(xiàn)異地多活砰碴;
舉個(gè)例子:比如機(jī)房A、B 內(nèi)分別部署服務(wù)注冊(cè)中心集群節(jié)點(diǎn)板丽。即機(jī)房A部署 a1呈枉、a2 兩個(gè)服務(wù)注冊(cè)中心服務(wù)節(jié)點(diǎn),機(jī)房B部署 b1埃碱、b2 兩個(gè)服務(wù)注冊(cè)中心服務(wù)節(jié)點(diǎn)碴卧;
那么各機(jī)房?jī)?nèi)應(yīng)用只需要請(qǐng)求本機(jī)房?jī)?nèi)部署的服務(wù)注冊(cè)中心節(jié)點(diǎn)即可,不需要跨機(jī)房調(diào)用乃正。即機(jī)房A內(nèi)業(yè)務(wù)應(yīng)用請(qǐng)求 a1、a2 獲取配置婶博、機(jī)房B內(nèi)業(yè)務(wù)應(yīng)用 b1瓮具、b2 獲取配置。
這種跨機(jī)房部署方式實(shí)現(xiàn)了配置服務(wù)的 "異地多活",擁有以下幾點(diǎn)好處:
- 1名党、注冊(cè)服務(wù)響應(yīng)更快:注冊(cè)請(qǐng)求本機(jī)房?jī)?nèi)搞定叹阔;
- 2、注冊(cè)服務(wù)更穩(wěn)定:注冊(cè)請(qǐng)求不需要跨機(jī)房传睹,不需要考慮復(fù)雜的網(wǎng)絡(luò)情況耳幢,更加穩(wěn)定;
- 2欧啤、容災(zāi)性:即使一個(gè)機(jī)房?jī)?nèi)服務(wù)注冊(cè)中心全部宕機(jī)睛藻,僅會(huì)影響到本機(jī)房?jī)?nèi)應(yīng)用加載服務(wù),其他機(jī)房不會(huì)受到影響邢隧。
4.4 一致性
類似 Raft 方案店印,更輕量級(jí)、穩(wěn)定倒慧;
- Raft:Leader統(tǒng)一處理變更操作請(qǐng)求按摘,一致性協(xié)議的作用具化為保證節(jié)點(diǎn)間操作日志副本(log replication)一致,以term作為邏輯時(shí)鐘(logical clock)保證時(shí)序纫谅,節(jié)點(diǎn)運(yùn)行相同狀態(tài)機(jī)(state machine)得到一致結(jié)果炫贤。
- xxl-registry:
- Leader(統(tǒng)一處理分發(fā)變更請(qǐng)求):DB消息表(僅變更時(shí)產(chǎn)生消息,消息量較小付秕,而且消息輪訓(xùn)存在間隔兰珍,因此消息表壓力不會(huì)太大;)盹牧;
- state machine(順序操作日志副本并保證結(jié)果一直):順序消費(fèi)消息俩垃,保證本地?cái)?shù)據(jù)一致,并通過周期全量同步進(jìn)一步保證一致性汰寓;
嚇得我趕緊看了下Raft方案口柳。看完之后有滑,我就像跃闹,這系統(tǒng)用到Raft方案了嗎?
看起來非常高大上毛好,異地多活望艺,跨機(jī)房,容災(zāi)肌访,其實(shí)通過代碼閱讀找默,就可以很明顯看出來:這個(gè)系統(tǒng)是通過DB實(shí)例以及10秒一次的全量同步來保證一致性的。不管數(shù)據(jù)怎么變吼驶,數(shù)據(jù)庫(kù)中的xxl_registry_data
表才是真正的注冊(cè)數(shù)據(jù)惩激。只要隔一段時(shí)間同步這個(gè)表中的數(shù)據(jù)店煞,就行了。當(dāng)然风钻,前提是只有一個(gè)DB實(shí)例(其實(shí)不是也可以)
那么顷蟀,這個(gè)注冊(cè)中心的大致編程思想通過源碼閱讀結(jié)合官方文檔,已經(jīng)基本了解骡技。
下一篇鸣个,我們來看看《分布式服務(wù)框架XXL-RPC》吧