概述
本次1.3.0-BETA的改動程度很大芍躏,涉及兩個模塊的修改以及新增一個核心模塊。
- nacos-core模塊修改
a. nacos集群節(jié)點成員尋址模式的統(tǒng)一管理
b. nacos內(nèi)部事件機制
c. nacos一致性協(xié)議層 - nacos-config模塊修改
a. 新增內(nèi)嵌分布式數(shù)據(jù)存儲組件
b. 內(nèi)嵌存儲與外置存儲細分
c. 內(nèi)嵌存儲簡單運維 - nacos-consistency模塊新增
a. 對于AP協(xié)議以及CP協(xié)議的統(tǒng)一抽象
Nacos的未來整體邏輯架構(gòu)及其組件
Nacos集群成員節(jié)點尋址模式
在1.3.0-BETA之前降狠,nacos的naming模塊以及config模塊存在各自的集群成員節(jié)點列表管理任務对竣。為了統(tǒng)一nacos集群下成員列表的尋址模式庇楞,將集群節(jié)點管理的實現(xiàn)從naming模塊以及config模塊剝離出來,統(tǒng)一下沉到了core模塊的尋址模式否纬,同時新增命令參數(shù)-Dnacos.member.list
進行設(shè)置nacos集群節(jié)點列表吕晌,該參數(shù)可以看作是cluster.conf
文件的一個替代。 前nacos的尋址模式類別如下:
- a. 單機模式:StandaloneMemberLookup
- b. 集群模式:
- i.cluster.conf 件存在:FileConfigMemberLookup
- ii.nacos.member.discovery==true:DiscoveryMemberLookup
- iii.cluster.conf 件不存在或者 -Dnacos.member.list沒有設(shè)置:
AddressServerMemberLookup
邏輯圖如下:
本次還新增成員節(jié)點元數(shù)據(jù)信息临燃,如site睛驳、raft_port、adweight膜廊、weight乏沸,以支持將來在成員節(jié)點之間做相應的負載均衡或者其他操作,因此cluster.conf 件中配置集群成員節(jié)點列表的格式如下:
1172.20.10.7:7001?raft_port=8001&site=unknown&adweight=0&weight=1
該格式完全兼容原本的cluster.conf
格式爪瓜,用戶在使用 1.3.0-BETA版本時蹬跃, 無需改動cluster.conf
文件的內(nèi)容。
尋址模式詳細
接下來介紹除了單機模式下的尋址模式的其他三種尋址模式
FileConfigMemberLookup
該尋址模式是基于cluster.conf
文件進行管理的铆铆,每個節(jié)點會讀取各${nacos.home}/conf
下的cluster.conf
件內(nèi)的成員節(jié)點列表蝶缀,然后組成一個集群。并且在首次讀取完${nacos.home}/conf
下的cluster.conf
文件后薄货,會自動向操作系統(tǒng)的inotify機制注冊一個目錄監(jiān)聽器翁都,監(jiān)聽${nacos.home}/conf
目錄下的所有文件變動(注意,這里只會監(jiān)聽文件谅猾,對于目錄下的文件變動無法監(jiān)聽),當需要進行集群節(jié)點擴縮容時柄慰,需要手動去修改每個節(jié)點各自${nacos.home}/conf
下的cluster.conf
的成員節(jié)點列表內(nèi)容。
private FileWatcher watcher = new FileWatcher() {
@Override
public void onChange(FileChangeEvent event) {
readClusterConfFromDisk();
}
@Override
public boolean interest(String context) {
return StringUtils.contains(context, "cluster.conf");
}
};
@Override
public void run() throws NacosException {
readClusterConfFromDisk();
if (memberManager.getServerList().isEmpty()) {
throw new NacosException(NacosException.SERVER_ERROR,
"Failed to initialize the member node, is empty" );
}
// Use the inotify mechanism to monitor file changes and automat ically
// trigger the reading of cluster.conf
try {
WatchFileCenter.registerWatcher(ApplicationUtils.getConfFile Path(), watcher);
}
catch (Throwable e) {
Loggers.CLUSTER.error("An exception occurred in the launch f ile monitor : {}", e);
}
}
首次啟動時直接讀取cluster.conf
文件內(nèi)的節(jié)點列表信息赊瞬,然后向WatchFileCenter
注冊一個目錄監(jiān)聽器先煎,當cluster.conf
文件發(fā)生變動時自動觸發(fā)readClusterConfFromDisk()
重新讀取cluster.conf
文件。
AddressServerMemberLookup
該尋址模式是基于一個額外的web服務器來管理cluster.conf
巧涧,每個節(jié)點定期向該web服務器請求cluster.conf
的文件內(nèi)容薯蝎,然后實現(xiàn)集群節(jié)點間的尋址,以及擴縮容谤绳。
當需要進行集群擴縮容時占锯,只需要修改cluster.conf
文件即可,然后每個節(jié)點向地址服務器請求時會自動得到最新的cluster.conf
文件內(nèi)容缩筛。
public void init(ServerMemberManager memberManager) throws NacosExce ption {
super.init(memberManager);
initAddressSys();
this.maxFailCount =Integer.parseInt(ApplicationUtils.getProperty("maxHealthCheckFailCount", "12"));
}
private void initAddressSys() {
String envDomainName = System.getenv("address_server_domain");
if (StringUtils.isBlank(envDomainName)) {
domainName = System.getProperty("address.server.domain", "jm env.tbsite.net");
} else {
domainName = envDomainName;
}
String envAddressPort = System.getenv("address_server_port");
if (StringUtils.isBlank(envAddressPort)) {
addressPort = System.getProperty("address.server.port", "8080");
} else {
addressPort = envAddressPort;
}
addressUrl = System.getProperty("address.server.url", memberManager.getContextPath() + "/" + "serverlist");
addressServerUrl = "http://" + domainName + ":" + addressPort + addressUrl;envIdUrl = "http://" + domainName + ":" + addressPort + "/env";
Loggers.CORE.info("ServerListService address-server port:" + addressPort);
Loggers.CORE.info("ADDRESS_SERVER_URL:" + addressServerUrl);
}
@SuppressWarnings("PMD.UndefineMagicConstantRule")
@Override
public void run() throws NacosException {
// With the address server, you need to perform a synchronous me mber node pull at startup
// Repeat three times, successfully jump out
boolean success = false;
Throwable ex = null;
int maxRetry = ApplicationUtils.getProperty("nacos.core.address-server.retry", Integer.class, 5);
for (int i = 0; i < maxRetry; i ++) {
try {
syncFromAddressUrl();
success = true;
break;
} catch (Throwable e) {
ex = e;
Loggers.CLUSTER.error("[serverlist] exception, error : {}", ex);
}
}
if (!success) {
throw new NacosException(NacosException.SERVER_ERROR, ex);;
}
task = new AddressServerSyncTask();
GlobalExecutor.scheduleSyncJob(task, 5_000L);
}
在初始化時消略,會主動去向地址服務器同步當前的集群成員列表信息,如果失敗則進行重試瞎抛,其最大重試次數(shù)可通過設(shè)置nacos.core.address-server.retry
來控制艺演,默認是5次,然后成功之后,將創(chuàng)建定時任務去向地址服務器同步集群成員節(jié)點信息胎撤。
DiscoveryMemberLookup
該尋址模式是新增的集群節(jié)點發(fā)現(xiàn)模式晓殊,該模式需要cluster.conf
或者-Dnacos.member.list
提供初始化集群節(jié)點列表,假設(shè)已有集群cluster-one中有A伤提、B巫俺、C三個節(jié)點,新節(jié)點D要加集群肿男,那么只需要節(jié)點D在啟動時的集群節(jié)點列表存在A介汹、B、C三個中的一個即可舶沛,然后節(jié)點之間會相互同步各自知道的集群節(jié)點列表嘹承,在一定的是時間內(nèi),A冠王、B赶撰、C舌镶、D四個節(jié)點知道的集群節(jié)點成員列表都會是[A柱彻、B、C餐胀、D]在執(zhí)行集群節(jié)點列表同步時哟楷,會隨機選取K個處于UP狀態(tài)的節(jié)點進行同步。
Collection<Member> members = MemberUtils.kRandom(memberManager, membe r -> {
// local node or node check failed will not perform task processing
if (memberManager.isSelf(member) || !member.check ()) {
return false;
}
NodeState state = member.getState();
return !(state == NodeState.DOWN || state == Node State.SUSPICIOUS);
});
通過一個簡單的流程圖看下DiscoveryMemberLookup是怎么工作的
RPC端口協(xié)商
由于將來Nacos會對整體通信通道做升級否灾,采用GRPC優(yōu)化nacos-server之間卖擅,nacos-client與nacos-server之間的通信,同時為了兼容目前已有的HTTP協(xié)議接口墨技,那么勢必會帶來這個問題惩阶,本機用于RPC協(xié)議的端口如何讓其他節(jié)點知道?這里有兩個解決方案扣汪。
重新設(shè)計cluster.conf
之前的cluster.conf格式
ip[:port]
ip[:port]
ip[:port]
由于nacos默認端口是8848断楷,因此在端口未被修改的情況下,可以直接寫IP列表
新的cluster.conf
ip[:port][:RPC_PORT]
ip[:port][:RPC_PORT]
ip[:port][:RPC_PORT]
對于之前的cluster.conf
是完全支持的崭别,因為nacos內(nèi)部可以通過一些計算來約定RPC_PORT
的端口值冬筒,也可以通過顯示的設(shè)置來約定。通過計算來約定RPC_PORT的代碼如下:
// member port
int port = Member.getPort();
// Set the default Raft port information for security
int rpcPort = port + 1000 >= 65535 ? port + 1 : port + 1000;
但是這樣會有一個問題茅主,即如果用戶手動設(shè)置了RPC_PORT
的話舞痰,那么對于客戶端、服務端來說诀姚,感知新的RPC_PORT
就要修改對應的配置文件或者初始化參數(shù)响牛。因此希望說能夠讓用戶無感知的過渡到RPC_PORT
通信通道,即用戶需要對RPC協(xié)議使用的端口無需自己在進行設(shè)置。
端口協(xié)商
端口協(xié)商即利用目前已有的HTTP接口呀打,將RPC協(xié)議占用的端口通過HTTP接口進行查詢返回论衍,這樣無論是客戶端還是服務端,都無需修改目前已有的初始化參數(shù)或者cluster.conf
文件聚磺,其大致時序圖如下:
通過一個額外的端口獲取HTTP接口坯台,直接在內(nèi)部實現(xiàn)RPC端口的協(xié)商,并且只會在初始化時進行拉取瘫寝,這樣蜒蕾,將來nacos新增任何一種協(xié)議的端口都無需修改相應的配置信息,自動完成協(xié)議端口的感知焕阿。
Nacos一致性協(xié)議協(xié)議層抽象
從nacos的未來的整體架構(gòu)圖可以看出咪啡,一致性協(xié)議層將是作為nacos的最為核心的模塊,將服務于構(gòu)建在core模塊之上的各個功能模塊暮屡,或者服務與core模塊本身撤摸。而一致性協(xié)議因為分區(qū)容錯性的存在,需要在可用性與一致性之間做選擇褒纲,因此就存在兩大類一致性:最終一致性和強一致性准夷。在nacos中,這兩類致性協(xié)議都是可能用到的莺掠,比如naming模塊衫嵌,對于服務實例的數(shù)據(jù)管理分別用到了AP以及CP,而對于config模塊彻秆,將會涉及使用CP楔绞。同時還有如下幾個功能需求點:
- 目前持久化服務使用了變種版本的raft,并且業(yè)務和raft協(xié)議耦合唇兑,因此需要抽離解耦酒朵,同時是選擇一個標準的Java版Raft實現(xiàn)。
- 對于中小用戶扎附,配置基本不超過100個蔫耽,獨立一個mysql,相對重一些帕棉,需要一個輕量化的存儲方案针肥,并且支持2.0不依賴mysql和3.0依賴mysql可配置能力。
- 由于CP或者AP香伴,其存在多種實現(xiàn)慰枕,如何對一致性協(xié)議層做一次很好的抽象,以便將來可以快速的實現(xiàn)底層一致性協(xié)議具體實現(xiàn)的替換即纲,如Raft協(xié)議具帮,目前nacos的選型是JRaft,不排除將來nacos會自己實現(xiàn)一個標準raft協(xié)議或者實現(xiàn)Paxos協(xié)議。
- 由于Nacos存在多個獨立工作的功能模塊蜂厅,每個功能模塊之間不能出現(xiàn)影響匪凡,比如A模塊處理請求過慢或者出現(xiàn)異常時,不能影響B(tài)模塊的正常工作掘猿,即每個功能模塊在使用一致性協(xié)議時病游,如何將每個模塊的數(shù)據(jù)處理進行隔離?
根據(jù)一致協(xié)議以及上述功能需求點稠通,本次做了一個抽象的一致協(xié)議層以及相關(guān)的接口衬衬。
一致協(xié)議接口:ConsistencyProtocol
所謂一致性,即多個副本之間是否能夠保持一致性的特性改橘,而副本的本質(zhì)就是數(shù)據(jù)滋尉,對數(shù)據(jù)的操作,不是獲取就是修改飞主。同時狮惜,一致協(xié)議其實是針對分布式情況的,而這必然涉及多個節(jié)點碌识,因此碾篡,需要有相應的接口能夠調(diào)整一致性協(xié)議的協(xié)同工作節(jié)點。如果我們要觀察一致性協(xié)議運行的情況丸冕,該怎么辦耽梅?比如Raft協(xié)議,我們希望得知當前集群中的Leader是誰胖烛,任期的情況,當前集群中的成員節(jié)點有誰诅迷?因此佩番,還需要提供一個一致性協(xié)議元數(shù)據(jù)獲取。
綜上所述罢杉,ConsistencyProtcol的大致設(shè)計可以出來了
/**
* Has nothing to do with the specific implementation of the consist ency protocol
* Initialization sequence: init(Config)
*
* <ul>
* <li>{@link Config} : Relevant configuration information requi red by the consistency protocol,
* for example, the Raft protocol needs to set the election time out time, the location where
* the Log is stored, and the snapshot task execution interval</ li>
* <li>{@link ConsistencyProtocol#protocolMetaData()} : Returns metadata information of the consistency
* protocol, such as leader, term, and other metadata informatio n in the Raft protocol</li>
* </ul>
*
* @author <a href="mailto:liaochuntao@live.com">liaochuntao</a>
*/
public interface ConsistencyProtocol<T extends Config> extends CommandOperations {
/**
* Consistency protocol initialization: perform initialization o perations based
on the incoming Config
* 一致性協(xié)議初始化趟畏,根據(jù) Config 實現(xiàn)類
*
* @param config {@link Config}
*/
void init(T config);
/**
* Copy of metadata information for this consensus protocol
* 該一致性協(xié)議的元數(shù)據(jù)信息
*
* @return metaData {@link ProtocolMetaData}
*/
ProtocolMetaData protocolMetaData();
/**
* Obtain data according to the request
* 數(shù)據(jù)獲取操作,根據(jù)GetRequest中的請求上下文進行查詢相應的數(shù)據(jù)
*
* @param request request
* @return data {@link GetRequest}
* @throws Exception
*/
GetResponse getData(GetRequest request) throws Exception;
/**
* Data operation, returning submission results synchronously
* 同步數(shù)據(jù)提交滩租,在 Datum 中已攜帶相應的數(shù)據(jù)操作信息
*
* @param data {@link Log}
* @return submit operation result
* @throws Exception
*/
LogFuture submit(Log data) throws Exception;
/**
* Data submission operation, returning submission results async hronously
* 異步數(shù)據(jù)提交赋秀,在 Datum 中已攜帶相應的數(shù)據(jù)操作信息,返回一個Future律想,自行操作猎莲,提交發(fā) 的異常會在CompleteFuture中
*
* @param data {@link Log}
* @return {@link CompletableFuture<LogFuture>} submit result
* @throws Exception when submit throw Exception
*/
CompletableFuture<LogFuture> submitAsync(Log data);
/**
* New member list
* 新的成員節(jié)點列表,一致性協(xié)議處理相應的成員節(jié)點是加入還是離開
*
* @param addresses [ip:port, ip:port, ...]
*/
void memberChange(Set<String> addresses);
/**
* Consistency agreement service shut down
* 一致性協(xié)議服務關(guān)閉
*/
void shutdown();
}
針對CP協(xié)議技即,由于存在Leader的概念著洼,因此需要提供一個方法用于獲取CP協(xié)議當前的Leader是誰
public interface CPProtocol<C extends Config> extends ConsistencyPro tocol<C> {
/**
* Returns whether this node is a leader node
*
* @param group business module info
* @return is leader
* @throws Exception
*/
boolean isLeader(String group) throws Exception;
}
數(shù)據(jù)操作請求提交對象:Log、GetRequest
上面說到,一致性協(xié)議其實是對于數(shù)據(jù)操作而言的身笤,數(shù)據(jù)操作基本分為兩大類:數(shù)據(jù)查詢以及數(shù)據(jù)修改豹悬,同時還要滿足不同功能模塊之間的數(shù)據(jù)進行隔離。因此這里針對數(shù)據(jù)修改操作以及數(shù)據(jù)查詢操作分別闡述液荸。
1. 數(shù)據(jù)修改
- 數(shù)據(jù)修改操作瞻佛,一定要知道本次請求是屬于哪一個功能模塊的。
- 數(shù)據(jù)修改操作娇钱,首先一定要知道這個數(shù)據(jù)的修改操作具體是哪一種修改操作涤久,方便功能模塊針對真正的數(shù)據(jù)修改操作進行相應的邏輯操作。
- 數(shù)據(jù)修改操作忍弛,一定要知道修改的數(shù)據(jù)是什么响迂,即請求體,為了使得一致性協(xié)議層更為通用细疚,這里對于請求體的數(shù)據(jù)結(jié)構(gòu)蔗彤,選擇了byte[]數(shù)組。
- 數(shù)據(jù)的類型疯兼,由于我們將真正的數(shù)據(jù)序列化為了byte[]數(shù)組然遏,為了能夠正常序列化,我們可能還需要記錄這個數(shù)據(jù)的類型是什么吧彪。
- 本次請求的信息摘要或者標識信息待侵。
- 本次請求的額外信息,用于將來擴展需要傳輸?shù)臄?shù)據(jù)
綜上姨裸,可以得出Log對象的設(shè)計如下:
message Log {
// 功能模塊分組信息
string group = 1;
// 摘要或者標識
string key = 2;
// 具體請求數(shù)據(jù)
bytes data = 3;
// 數(shù)據(jù)類型
string type = 4;
// 更為具體的數(shù)據(jù)操作
string operation = 5;
// 額外信息
map<string, string> extendInfo = 6;
}
2. 數(shù)據(jù)查詢
- 數(shù)據(jù)查詢操作秧倾,一定要知道本次請求是由哪一個功能模塊發(fā)起的。
- 數(shù)據(jù)查詢的條件是什么傀缩,為了兼容各種存儲結(jié)構(gòu)的數(shù)據(jù)查詢操作那先,這byte[]進行存儲。
- 本次請求的額外信息赡艰,用于將來擴展需要傳輸?shù)臄?shù)據(jù)售淡。
綜上,可以得出GetRequest對象的設(shè)計如下
message GetRequest {
// 功能模塊分組信息
string group = 1;
// 具體請求數(shù)據(jù)
bytes data = 2;
// 額外信息
map<string, string> extendInfo = 3;
}
功能模塊使一致性協(xié)議:LogProcessor
當數(shù)據(jù)操作通過一致性協(xié)議進行submit之后慷垮,每個節(jié)點需要去處理這個Log或者GetRequest對象揖闸,因此,我們需要抽象出一個Log料身、GetRequest對象的Processor汤纸,不同的功能模塊通過實現(xiàn)該處理器泉懦,ConsistencyProtocol內(nèi)部會根據(jù)Log拳话、GetRequest的group屬性乐埠,將Log、GetRequest對象路由到具體的Processor妖谴,當然狂魔,Processor也需要表明自己是屬于哪一個功能模塊的起意。
public abstract class LogProcessor {
/**
* get data by key
*
* @param request request {@link GetRequest}
* @return target type data
*/
public abstract GetResponse getData(GetRequest request);
/**
* Process Submitted Log
*
* @param log {@link Log}
* @return {@link boolean}
*/
public abstract LogFuture onApply(Log log);
/**
* Irremediable errors that need to trigger business price cuts
*
* @param error {@link Throwable}
*/
public void onError(Throwable error) {
}
/**
* In order for the state machine that handles the transaction to be able to route
* the Log to the correct LogProcessor, the LogProcessor needs to have an identity
* information
*
* @return Business unique identification name
*/
public abstract String group();
}
針對CP協(xié)議诺凡,比如Raft協(xié)議,存在快照的設(shè)計议惰,因此我們需要針對CP協(xié)議單獨擴展出一個方法慎颗。
public abstract class LogProcessor4CP extends LogProcessor {
/**
* Discovery snapshot handler
* It is up to LogProcessor to decide which SnapshotOperate shou ld be loaded and saved by itself
*
* @return {@link List <SnapshotOperate>}
*/
public List<SnapshotOperation> loadSnapshotOperate() {
return Collections.emptyList();
}
}
我們可以通過一個時序圖看看,一致性協(xié)議層的大致工作流程如下:
Nacos一致性協(xié)議層之CP協(xié)議的實現(xiàn)選擇——JRaft
一致性協(xié)議層抽象好之后言询,剩下就是具體一致性協(xié)議實現(xiàn)的選擇了俯萎,這里我們選擇了螞蟻服開源的JRaft,那么我們?nèi)绾螌Raft作為CP協(xié)議的一個Backend呢运杭?下面的簡單流程圖描述了當JRaft作為CP協(xié)議的一個Backend時的初始化流程夫啊。
JRaftProtocol是當JRaft作為CP協(xié)議的Backend時的一個ConsistencyProtocol的具體實現(xiàn),其內(nèi)部有一個JRaftServer成員屬性辆憔,JRaftServer分裝了JRaft的各種API操作撇眯,比如數(shù)據(jù)操作的提交,數(shù)據(jù)的查詢虱咧,成員節(jié)點的變更熊榛,Leader節(jié)點的查詢等等。
注意事項:JRaft運行期間產(chǎn)生的數(shù)據(jù)在
${nacos.home}/protocol/raft
文件目錄下腕巡。不同的業(yè)務模塊有不同的文件分組玄坦,如果當節(jié)點出現(xiàn)crash或者異常關(guān)閉時,清空該目錄下的文件绘沉,重啟節(jié)點即可煎楣。
由于JRaft實現(xiàn)了raft group的概念,因此梆砸,完全可以利用 raft group的設(shè)計转质,為每個功能模塊單獨創(chuàng)建個raft group。這里給出部分代碼帖世,該代碼體現(xiàn)了如何將LogProcessor嵌入到狀態(tài)機中并為每個LogPrcessor創(chuàng)建一個Raft Group。
synchronized void createMultiRaftGroup(Collection<LogProcessor4CP> processors) {
// There is no reason why the LogProcessor cannot be processed b ecause of the synchronization
if (!this.isStarted) {
this.processors.addAll(processors);
return;
}
final String parentPath = Paths.get(ApplicationUtils.getNacosHome(), "protocol/raft").toString();
for (LogProcessor4CP processor : processors) {
final String groupName = processor.group();
if (alreadyRegisterBiz.contains(groupName)) {
throw new DuplicateRaftGroupException(groupName);
}
alreadyRegisterBiz.add(groupName);
final String logUri = Paths.get(parentPath, groupName, "log").toString();
final String snapshotUri = Paths.get(parentPath, groupName,"snapshot").toString();
final String metaDataUri = Paths.get(parentPath, groupName,"meta-data").toString();
// Initialize the raft file storage path for different services
try {
DiskUtils.forceMkdir(new File(logUri));
DiskUtils.forceMkdir(new File(snapshotUri));
DiskUtils.forceMkdir(new File(metaDataUri));
}
catch (Exception e) {
Loggers.RAFT.error("Init Raft-File dir have some error : {}", e);
throw new RuntimeException(e);
}
// Ensure that each Raft Group has its own configuration and NodeOptions
Configuration configuration = conf.copy();
NodeOptions copy = nodeOptions.copy();
// Here, the LogProcessor is passed into StateMachine, and when the StateMachine
// triggers onApply, the onApply of the LogProcessor is actually called
NacosStateMachine machine = new NacosStateMachine(this, processor);
copy.setLogUri(logUri);
copy.setRaftMetaUri(metaDataUri);
copy.setSnapshotUri(snapshotUri);
copy.setFsm(machine);
copy.setInitialConf(configuration);
// Set snapshot interval, default 1800 seconds
int doSnapshotInterval = ConvertUtils.toInt(raftConfig.getVal(RaftSysConstants.RAFT_SNAPSHOT_INTERVAL_SECS),
RaftSysConstants.DEFAULT_RAFT_SNAPSHOT_INTERVAL_ SECS);
// If the business module does not implement a snapshot processor, cancel the snapshot
doSnapshotInterval = CollectionUtils.isEmpty(processor.loadS napshotOperate()) ? 0 : doSnapshotInterval;
copy.setSnapshotIntervalSecs(doSnapshotInterval);
Loggers.RAFT.info("create raft group : {}", groupName);
RaftGroupService raftGroupService = new RaftGroupService(gro upName, localPeerId, copy, rpcServer, true);
// Because RpcServer has been started before, it is not allo wed to start again here
Node node = raftGroupService.start(false);
machine.setNode(node);
RouteTable.getInstance().updateConfiguration(groupName, conf iguration);
// Turn on the leader auto refresh for this group
Random random = new Random();
long period = nodeOptions.getElectionTimeoutMs() + random.ne xtInt(5 * 1000);
RaftExecutor.scheduleRaftMemberRefreshJob(() -> refreshRoute Table(groupName), period, period, TimeUnit.MILLISECONDS);
// Save the node instance corresponding to the current group
multiRaftGroup.put(groupName, new RaftGroupTuple(node, proce ssor, raftGroupService));
}
}
或許有的人會有疑問沸枯,為什么要創(chuàng)建多個raft group日矫,既然之前已經(jīng)設(shè)計出了LogProcessor,完全可以利用一個Raft
Group绑榴,在狀態(tài)機appl時哪轿,根據(jù)Log的group屬性進行路由到不同的LogProcessor即可,每個功能模塊就創(chuàng)建一個raft group翔怎,不是會消耗大量的資源嗎窃诉?
正如之前所說杨耙,我們希望獨工作的模塊之間相互不存在影響,比如A模塊處理Log因為存在Block操作可能使得apply的速度緩慢飘痛,亦或者可能中途發(fā)生異常珊膜,對于Raft協(xié)議來說,當日志apply失敗時宣脉,狀態(tài)機將不能夠繼續(xù)向前推進车柠,因為如果繼續(xù)向前推進的話,由于上一步的apply失敗塑猖,后面的所有apply都可能失敗竹祷,將會導致這個節(jié)點的數(shù)據(jù)與其他節(jié)點的數(shù)據(jù)永遠不一致。如果說我們將所有獨立工作的模塊羊苟,對于數(shù)據(jù)操作的請求處理放在同一個raft group塑陵,即一個狀態(tài)機中,就不可避免的會出現(xiàn)上述所說的問題蜡励,某個模塊在apply 日志發(fā)生不可控的因素時令花,會影響其他模塊的正常工作。
JRaft運維操作
為了使用者能夠?qū)Raft進行相關(guān)簡單的運維巍虫,如Leader的切換彭则,重置當前Raft集群成員,觸發(fā)某個節(jié)點進行Snapshot操作等等占遥,提供了一個簡單的HTTP接口進行操作俯抖,并且該接口有一定的限制,即每次只會執(zhí)行一條運維指令瓦胎。
1.切換某一個Raft Group的Leader節(jié)點
POST /nacos/v1/core/ops/raft
{
"groupId": "xxx",
"transferLeader": "ip:{raft_port}"
}
2.重置某一個Raft Group的集群成員
POST /nacos/v1/core/ops/raft
{
"groupId": "xxx",
"resetRaftCluster": "ip:{raft_port},ip:{raft_port},ip:{raft_por t},ip:{raft_port}"
}
3.觸發(fā)某一個Raft Group執(zhí)行快照操作
POST /nacos/v1/core/ops/raft
{
"groupId": "xxx",
"doSnapshot": "ip:{raft_port}"
}
JRaft協(xié)議相關(guān)配置參數(shù)
### Sets the Raft cluster election timeout, default value is 5 second
nacos.core.protocol.raft.data.election_timeout_ms=5000
### Sets the amount of time the Raft snapshot will execute periodica lly, default is 30 minute
nacos.core.protocol.raft.data.snapshot_interval_secs=30
### Requested retries, default value is 1
nacos.core.protocol.raft.data.request_failoverRetries=1
### raft internal worker threads
nacos.core.protocol.raft.data.core_thread_num=8
### Number of threads required for raft business request processing
nacos.core.protocol.raft.data.cli_service_thread_num=4
### raft linear read strategy, defaults to index
nacos.core.protocol.raft.data.read_index_type=ReadOnlySafe
### rpc request timeout, default 5 seconds
nacos.core.protocol.raft.data.rpc_request_timeout_ms=5000
### Maximum size of each file RPC (snapshot copy) request between me mbers, default is 128 K
nacos.core.protocol.raft.data.max_byte_count_per_rpc=131072
### Maximum number of logs sent from leader to follower, default is 1024
nacos.core.protocol.raft.data.max_entries_size=1024
### Maximum body size for sending logs from leader to follower, defa ult is 512K
nacos.core.protocol.raft.data.max_body_size=524288
### Maximum log storage buffer size, default 256K
nacos.core.protocol.raft.data.max_append_buffer_size=262144
### Election timer interval will be a random maximum outside the spe cified time, default is 1 second
nacos.core.protocol.raft.data.max_election_delay_ms=1000
### Specify the ratio between election timeout and heartbeat interval. Heartbeat interval is equal to
### electionTimeoutMs/electionHeartbeatFactor芬萍,One tenth by default.
nacos.core.protocol.raft.data.election_heartbeat_factor=10
### The tasks submitted to the leader accumulate the maximum batch s ize of a batch flush log storage. The default is 32 tasks.
nacos.core.protocol.raft.data.apply_batch=32
### Call fsync when necessary when writing logs and meta informatio n, usually should be true
nacos.core.protocol.raft.data.sync=true
### Whether to write snapshot / raft meta-information to call fsync. The default is false. When sync is true, it is preferred to respect sync.
nacos.core.protocol.raft.data.sync_meta=false
### Internal disruptor buffer size. For applications with high write throughput, you need to increase this value. The default value is 16384.
nacos.core.protocol.raft.data.disruptor_buffer_size=16384
### Whether to enable replication of pipeline request optimization, which is enabled by default
nacos.core.protocol.raft.data.replicator_pipeline=true
### Maximum number of in-flight requests with pipeline requests enab led, default is 256
nacos.core.protocol.raft.data.max_replicator_inflight_msgs=256
### Whether to enable LogEntry checksum
nacos.core.protocol.raft.data.enable_log_entry_checksum=false
Nacos內(nèi)嵌分布式ID
nacos內(nèi)嵌的分布式ID為Snakeflower,dataCenterId默認為1搔啊,workerId的值計算方式如下:
InetAddress address;
try {
address = InetAddress.getLocalHost();
} catch (final UnknownHostException e) {
throw new IllegalStateException(
"Cannot get LocalHost InetAddress, please ch eck your network!");
}
byte[] ipAddressByteArray = address.getAddress();
workerId = (((ipAddressByteArray[ipAddressByteArray.length - 2] & 0B 11)
<< Byte.SIZE) + (ipAddressByteArray[ipAddressByteArray.length - 1] & 0xFF));
如果需要手動指定dataCenterId以及workerId柬祠,則在application.properties或者啟動時添加命令 參數(shù)
### set the dataCenterID manually
# nacos.core.snowflake.data-center=
### set the WorkerID manually
# nacos.core.snowflake.worker-id=
Nacos內(nèi)嵌的輕量的基于Derby的分布式關(guān)系型存儲
背景
- 如果配置文件數(shù)量較少,在集群模式下需要高可用數(shù)據(jù)庫集群作為支撐的成本太大负芋,期望有一個輕量的分布式關(guān)系型存儲來解決漫蛔。
- nacos內(nèi)部一些元數(shù)據(jù)信息存儲,比如用戶信息旧蛾,命名空間信息
- 思路來源:https://github.com/rqlite/rqlite
設(shè)計思路
總體
將一次請求操作涉及的SQL上下文按順序保存起來莽龟。然后通過一致協(xié)議層將本次請求涉及的SQL上下 文進行同步,然后每個節(jié)點將其解析并重新按順序在一次數(shù)據(jù)庫會話中執(zhí)行锨天。
誰可以處理請求
當使用者開啟1.3.0-BETA的新特性——內(nèi)嵌分布式關(guān)系型數(shù)據(jù)存儲時毯盈,所有的寫操作請求都將路由到Leader節(jié)點進行處理;但是病袄,由于Raft狀態(tài)機的特性搂赋,當某一個節(jié)點在apply數(shù)據(jù)庫操作請求時發(fā)生非SQL邏輯錯誤引發(fā)的異常時赘阀,將導致狀態(tài)機無法繼續(xù)正常進行工作,此時將會觸發(fā)配置管理模塊的降級操作脑奠。
private void registerSubscribe() {
NotifyCenter.registerSubscribe(new SmartSubscribe() {
@Override
public void onEvent(Event event) {
if (event instanceof RaftDBErrorRecoverEvent) {
downgrading = false;
return;
}
if (event instanceof RaftDBErrorEvent) {
downgrading = true;
}
}
@Override
public boolean canNotify(Event event) {
return (event instanceof RaftDBErrorEvent) || (event instanceof RaftDBErrorRecoverEvent);
}
});
}
因此基公,綜上所述,可以通過活動圖來理解下捺信,什么情況下需要將請求進行轉(zhuǎn)發(fā)呢酌媒?
相關(guān)數(shù)據(jù)承載對象
數(shù)據(jù)庫的DML語句是select、insert迄靠、update秒咨、delete,根據(jù)SQL語句對于數(shù)據(jù)操作的性質(zhì)掌挚,可以分為兩類:query以及update雨席,select語句對應的是數(shù)據(jù)查詢,insert吠式、update陡厘、delete語句對應的是數(shù)據(jù)修改。同時在進行數(shù)據(jù)庫操作時特占,為了避免SQL入注糙置,使用的是PreparedStatement,因此需要SQL語句+參數(shù)是目,因此可以得到兩個關(guān)于數(shù)據(jù)庫操作的Request對象谤饭。
- SelectRequest
public class SelectRequest implements Serializable {
private static final long serialVersionUID = 2212052574976898602L;
// 查詢類別,因為 前使 的是JdbcTemplate懊纳,查詢單個揉抵、查詢多個,是否使 RowM apper轉(zhuǎn)為對象
5private byte queryType;
// sql語句
// select * from config_info where
private String sql;
private Object[] args;
private String className;
}
- ModifyRequest
public class ModifyRequest implements Serializable {
private static final long serialVersionUID = 4548851816596520564L;
private int executeNo;
private String sql;
private Object[] args;
}
配置發(fā)布
配置發(fā)布操作涉及三個事務:
- config_info保存配置信息嗤疯。
- config_tags_relation保存配置與標簽的關(guān)聯(lián)關(guān)系冤今。
- his_config_info保存 條配置操作歷史記錄。
這三個事務都在配置發(fā)布這個大事務下茂缚,如果說我們對每個事務操作進行一個Raft協(xié)議提交戏罢,假設(shè)1、2兩個事務通過Raft提交后都成功Apply了脚囊,第三個事務在進行Raft提交后apply失敗帖汞,那么對于這個配置發(fā)布的大事務來說,是需要整體回滾的凑术,否則就會違反原子性,那么可能需要說將事務回滾操作又進行一次Raft提交所意,那么整體的復雜程度上升淮逊,并且直接引了分布式事務的管理催首,因此為了避免這個問題,我們將這三個事務涉及的SQL上下文進行整合成一個大的SQL上下文泄鹏,對這大的SQL上下文進行Raft協(xié)議提交郎任。保證了三個子事務在同一次數(shù)據(jù)庫會話當中,成功解決原子性的問題备籽,同時由于Raft協(xié)議對于事務日志的處理是串行執(zhí)行的舶治,因此相當于將數(shù)據(jù)庫的事務隔離級別調(diào)整為串行化。
public void addConfigInfo(final String srcIp, final String srcUser,
final ConfigInfo configInfo, final Timestamp time,
final Map<String, Object> configAdvanceInfo, final boolean notify) {
try {
// 同過雪花ID獲取一個ID值
long configId = idGeneratorManager.nextId(configInfoId);
long configHistoryId = idGeneratorManager.nextId(this.configHistoryId);
// 配置插入
addConfigInfoAtomic(configId, srcIp, srcUser, configInfo, time, configAdvanceInfo);
String configTags = configAdvanceInfo == null ? null : (String) configAdvanceInfo.get("config_tags");
// 配置與標簽信息關(guān)聯(lián)操作
addConfigTagsRelation(configId, configTags, configInfo.getD ataId(), configInfo.getGroup(), configInfo.getTenant());
// 配置歷史插入
insertConfigHistoryAtomic(configHistoryId, configInfo, srcI p, srcUser, time, "I");
boolean result = databaseOperate.smartUpdate();
if (!result) {
throw new NacosConfigException("Config add failed");
}
if (notify) {
EventDispatcher.fireEvent(
new ConfigDataChangeEvent(false, configInfo.getDataId(),
configInfo.getGroup(), configInfo.getTenant(), time.getTime()));
}
}
finally {
SqlContextUtils.cleanCurrentSqlContext();
}
}
public long addConfigInfoAtomic(final long id, final String srcIp, final String srcUser, final ConfigInfo configInfo, final Timestamp time, Map<String, Object> configAdvanceInfo) {
...
// 參數(shù)處理
...
final String sql =
"INSERT INTO config_info(id, data_id, group_id, tenant_id, app_name, content, md5, src_ip, src_user, gmt_create,"
+ "gmt_modified, c_desc, c_use, effect, type, c_schema) VALUES(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)";
final Object[] args = new Object[] { id, configInfo.getDataId(),
configInfo.getGroup(), tenantTmp, appNameTmp, configInfo.getContent(),
md5Tmp, srcIp, srcUser, time, time, desc, use, effect, type, schema, };
SqlContextUtils.addSqlContext(sql, args);
return id;
}
public void addConfigTagRelationAtomic(long configId, String tagName, String dataId,
String group, String tenant) {
final String sql = "INSERT INTO config_tags_relation(id,tag_name,tag_type,data_id,group_id,tenant_id) " + "VALUES(?,?,?,?,?,?)";
final Object[] args = new Object[] { configId, tagName, null, d ataId, group, tenant };
SqlContextUtils.addSqlContext(sql, args);
}
public void insertConfigHistoryAtomic(long configHistoryId, ConfigI nfo configInfo, String srcIp, String srcUser, final Timestamp time, String ops) {
...
// 參數(shù)處理
...
final String sql = "INSERT INTO his_config_info (id,data_id,group_id,tenant_id,app_name,content,md5," + "src_ip,src_user,gmt_modified,op_type) VALUES(?,?,?,?,?,?,?,?,?,?,?)";
final Object[] args = new Object[] { configHistoryId, configInf o.getDataId(), configInfo.getGroup(), tenantTmp, appNameTmp, configInfo.getContent(), md5Tmp, srcIp, srcUser, time, ops};
SqlContextUtils.addSqlContext(sql, args);
}
/**
* Temporarily saves all insert, update, and delete statements under
* a transaction in the order in which they occur
*
* @author <a href="mailto:liaochuntao@live.com">liaochuntao</a>
*/
public class SqlContextUtils {
private static final ThreadLocal<ArrayList<ModifyRequest>> SQL_CONTEXT =
ThreadLocal.withInitial(ArrayList::new);
public static void addSqlContext(String sql, Object... args) {
ArrayList<ModifyRequest> requests = SQL_CONTEXT.get();
ModifyRequest context = new ModifyRequest();
context.setExecuteNo(requests.size());
context.setSql(sql);
context.setArgs(args);
requests.add(context);
SQL_CONTEXT.set(requests);
}
public static List<ModifyRequest> getCurrentSqlContext() {
return SQL_CONTEXT.get();
}
public static void cleanCurrentSqlContext() {
SQL_CONTEXT.remove();
}
}
通過一個時序圖來更加直觀的理解
如何使用新特性
#*************** Embed Storage Related Configurations ***************
#
### This value is true in stand-alone mode and false in cluster mode
### If this value is set to true in cluster mode, nacos's distributed storage engine is turned on embeddedStorage=true
是否啟用內(nèi)嵌的分布式關(guān)系型存儲的活動圖
新特性的相關(guān)運維操作
直接查詢每個節(jié)點的derby存儲的數(shù)據(jù)
GET /nacos/v1/cs/ops/derby?sql=select * from config_info
return List<Map<String, Object>>
不足
- 在數(shù)據(jù)庫上層構(gòu)建一層分布式數(shù)據(jù)操作同步層车猬,對數(shù)據(jù)庫的操作存在了限制霉猛,如第一步insert操作,然后select操作珠闰,最后在update操作惜浅,這種在數(shù)據(jù)修改語句中穿插著查詢語句的操作順序是不支持的。
- 限制了數(shù)據(jù)庫的性能伏嗜,由于間接的將數(shù)據(jù)庫事務隔離級別調(diào)整為了串行化坛悉,人為的將并發(fā)能力降低了。
未來演進
將于Apache Derby官方一起嘗試基于Raft實現(xiàn)BingLog的同步復制操作承绸,從底層實現(xiàn)數(shù)據(jù)庫同步能力裸影。