Nacos 1.3.0-BETA 即將來襲,這次來波大的坏挠!

概述

本次1.3.0-BETA的改動程度很大芍躏,涉及兩個模塊的修改以及新增一個核心模塊。

  1. nacos-core模塊修改
    a. nacos集群節(jié)點成員尋址模式的統(tǒng)一管理
    b. nacos內(nèi)部事件機制
    c. nacos一致性協(xié)議層
  2. nacos-config模塊修改
    a. 新增內(nèi)嵌分布式數(shù)據(jù)存儲組件
    b. 內(nèi)嵌存儲與外置存儲細分
    c. 內(nèi)嵌存儲簡單運維
  3. nacos-consistency模塊新增
    a. 對于AP協(xié)議以及CP協(xié)議的統(tǒng)一抽象

Nacos的未來整體邏輯架構(gòu)及其組件

1.png

Nacos集群成員節(jié)點尋址模式

在1.3.0-BETA之前降狠,nacos的naming模塊以及config模塊存在各自的集群成員節(jié)點列表管理任務对竣。為了統(tǒng)一nacos集群下成員列表的尋址模式庇楞,將集群節(jié)點管理的實現(xiàn)從naming模塊以及config模塊剝離出來,統(tǒng)一下沉到了core模塊的尋址模式否纬,同時新增命令參數(shù)-Dnacos.member.list進行設(shè)置nacos集群節(jié)點列表吕晌,該參數(shù)可以看作是cluster.conf 文件的一個替代。 前nacos的尋址模式類別如下:

  • a. 單機模式:StandaloneMemberLookup
  • b. 集群模式:
    • i.cluster.conf 件存在:FileConfigMemberLookup
    • ii.nacos.member.discovery==true:DiscoveryMemberLookup
    • iii.cluster.conf 件不存在或者 -Dnacos.member.list沒有設(shè)置:
      AddressServerMemberLookup

邏輯圖如下:


2.png

本次還新增成員節(jié)點元數(shù)據(jù)信息临燃,如site睛驳、raft_port、adweight膜廊、weight乏沸,以支持將來在成員節(jié)點之間做相應的負載均衡或者其他操作,因此cluster.conf 件中配置集群成員節(jié)點列表的格式如下:

1172.20.10.7:7001?raft_port=8001&site=unknown&adweight=0&weight=1

該格式完全兼容原本的cluster.conf格式爪瓜,用戶在使用 1.3.0-BETA版本時蹬跃, 無需改動cluster.conf 文件的內(nèi)容。

尋址模式詳細

接下來介紹除了單機模式下的尋址模式的其他三種尋址模式

FileConfigMemberLookup

該尋址模式是基于cluster.conf文件進行管理的铆铆,每個節(jié)點會讀取各${nacos.home}/conf下的cluster.conf 件內(nèi)的成員節(jié)點列表蝶缀,然后組成一個集群。并且在首次讀取完${nacos.home}/conf下的cluster.conf文件后薄货,會自動向操作系統(tǒng)的inotify機制注冊一個目錄監(jiān)聽器翁都,監(jiān)聽${nacos.home}/conf目錄下的所有文件變動(注意,這里只會監(jiān)聽文件谅猾,對于目錄下的文件變動無法監(jiān)聽),當需要進行集群節(jié)點擴縮容時柄慰,需要手動去修改每個節(jié)點各自${nacos.home}/conf下的cluster.conf的成員節(jié)點列表內(nèi)容。

private FileWatcher watcher = new FileWatcher() {

@Override
public void onChange(FileChangeEvent event) {
    readClusterConfFromDisk();
    }

@Override
public boolean interest(String context) {
    return StringUtils.contains(context, "cluster.conf");
    }
};

@Override
public void run() throws NacosException {
    readClusterConfFromDisk();
   
    if (memberManager.getServerList().isEmpty()) {
        throw new NacosException(NacosException.SERVER_ERROR,
            "Failed to initialize the member node, is empty" );
    }
    
    // Use the inotify mechanism to monitor file changes and automat ically
    // trigger the reading of cluster.conf
    
    try {
        WatchFileCenter.registerWatcher(ApplicationUtils.getConfFile Path(), watcher);
    }
    catch (Throwable e) {
        Loggers.CLUSTER.error("An exception occurred in the launch f ile monitor : {}", e);
    }
}

首次啟動時直接讀取cluster.conf文件內(nèi)的節(jié)點列表信息赊瞬,然后向WatchFileCenter注冊一個目錄監(jiān)聽器先煎,當cluster.conf 文件發(fā)生變動時自動觸發(fā)readClusterConfFromDisk()重新讀取cluster.conf文件。

AddressServerMemberLookup

該尋址模式是基于一個額外的web服務器來管理cluster.conf巧涧,每個節(jié)點定期向該web服務器請求cluster.conf的文件內(nèi)容薯蝎,然后實現(xiàn)集群節(jié)點間的尋址,以及擴縮容谤绳。

當需要進行集群擴縮容時占锯,只需要修改cluster.conf文件即可,然后每個節(jié)點向地址服務器請求時會自動得到最新的cluster.conf文件內(nèi)容缩筛。

public void init(ServerMemberManager memberManager) throws NacosExce ption {
    super.init(memberManager);
    initAddressSys();
    this.maxFailCount =Integer.parseInt(ApplicationUtils.getProperty("maxHealthCheckFailCount", "12"));
}

private void initAddressSys() {

    String envDomainName = System.getenv("address_server_domain");
    if (StringUtils.isBlank(envDomainName)) {
        domainName = System.getProperty("address.server.domain", "jm env.tbsite.net");
    } else {
        domainName = envDomainName;
    }
    String envAddressPort = System.getenv("address_server_port");
    if (StringUtils.isBlank(envAddressPort)) {
        addressPort = System.getProperty("address.server.port", "8080");
    } else {
        addressPort = envAddressPort;
    }
    addressUrl = System.getProperty("address.server.url", memberManager.getContextPath() + "/" + "serverlist");
    addressServerUrl = "http://" + domainName + ":" + addressPort + addressUrl;envIdUrl = "http://" + domainName + ":" + addressPort + "/env";

    Loggers.CORE.info("ServerListService address-server port:" + addressPort);
    Loggers.CORE.info("ADDRESS_SERVER_URL:" + addressServerUrl);
}

@SuppressWarnings("PMD.UndefineMagicConstantRule")
@Override
public void run() throws NacosException {
    // With the address server, you need to perform a synchronous me mber node pull at startup
    // Repeat three times, successfully jump out
    boolean success = false;
    Throwable ex = null;
    int maxRetry = ApplicationUtils.getProperty("nacos.core.address-server.retry", Integer.class, 5);
    for (int i = 0; i < maxRetry; i ++) {
        try {
            syncFromAddressUrl();
            success = true;
            break;
        } catch (Throwable e) {
            ex = e;
            Loggers.CLUSTER.error("[serverlist] exception, error : {}", ex);
        }
    }
    if (!success) {
        throw new NacosException(NacosException.SERVER_ERROR, ex);;
    }
    task = new AddressServerSyncTask();
    GlobalExecutor.scheduleSyncJob(task, 5_000L);
}

在初始化時消略,會主動去向地址服務器同步當前的集群成員列表信息,如果失敗則進行重試瞎抛,其最大重試次數(shù)可通過設(shè)置nacos.core.address-server.retry來控制艺演,默認是5次,然后成功之后,將創(chuàng)建定時任務去向地址服務器同步集群成員節(jié)點信息胎撤。

DiscoveryMemberLookup

該尋址模式是新增的集群節(jié)點發(fā)現(xiàn)模式晓殊,該模式需要cluster.conf或者-Dnacos.member.list提供初始化集群節(jié)點列表,假設(shè)已有集群cluster-one中有A伤提、B巫俺、C三個節(jié)點,新節(jié)點D要加集群肿男,那么只需要節(jié)點D在啟動時的集群節(jié)點列表存在A介汹、B、C三個中的一個即可舶沛,然后節(jié)點之間會相互同步各自知道的集群節(jié)點列表嘹承,在一定的是時間內(nèi),A冠王、B赶撰、C舌镶、D四個節(jié)點知道的集群節(jié)點成員列表都會是[A柱彻、B、C餐胀、D]在執(zhí)行集群節(jié)點列表同步時哟楷,會隨機選取K個處于UP狀態(tài)的節(jié)點進行同步。

Collection<Member> members = MemberUtils.kRandom(memberManager, membe r -> {
// local node or node check failed will not perform task processing
    if (memberManager.isSelf(member) || !member.check ())   {
        return false;
    }
    NodeState state = member.getState();
    return !(state == NodeState.DOWN || state == Node State.SUSPICIOUS);
});

通過一個簡單的流程圖看下DiscoveryMemberLookup是怎么工作的

5EB524F1-1688-45D1-AB5A-AA24B02FDE27.png

RPC端口協(xié)商

由于將來Nacos會對整體通信通道做升級否灾,采用GRPC優(yōu)化nacos-server之間卖擅,nacos-client與nacos-server之間的通信,同時為了兼容目前已有的HTTP協(xié)議接口墨技,那么勢必會帶來這個問題惩阶,本機用于RPC協(xié)議的端口如何讓其他節(jié)點知道?這里有兩個解決方案扣汪。

重新設(shè)計cluster.conf

之前的cluster.conf格式

ip[:port]
ip[:port]
ip[:port]

由于nacos默認端口是8848断楷,因此在端口未被修改的情況下,可以直接寫IP列表

新的cluster.conf

ip[:port][:RPC_PORT]
ip[:port][:RPC_PORT]
ip[:port][:RPC_PORT]

對于之前的cluster.conf是完全支持的崭别,因為nacos內(nèi)部可以通過一些計算來約定RPC_PORT的端口值冬筒,也可以通過顯示的設(shè)置來約定。通過計算來約定RPC_PORT的代碼如下:

// member port
int port = Member.getPort();
// Set the default Raft port information for security
int rpcPort = port + 1000 >= 65535 ? port + 1 : port + 1000;

但是這樣會有一個問題茅主,即如果用戶手動設(shè)置了RPC_PORT的話舞痰,那么對于客戶端、服務端來說诀姚,感知新的RPC_PORT就要修改對應的配置文件或者初始化參數(shù)响牛。因此希望說能夠讓用戶無感知的過渡到RPC_PORT通信通道,即用戶需要對RPC協(xié)議使用的端口無需自己在進行設(shè)置。

端口協(xié)商

端口協(xié)商即利用目前已有的HTTP接口呀打,將RPC協(xié)議占用的端口通過HTTP接口進行查詢返回论衍,這樣無論是客戶端還是服務端,都無需修改目前已有的初始化參數(shù)或者cluster.conf文件聚磺,其大致時序圖如下:

3.png

通過一個額外的端口獲取HTTP接口坯台,直接在內(nèi)部實現(xiàn)RPC端口的協(xié)商,并且只會在初始化時進行拉取瘫寝,這樣蜒蕾,將來nacos新增任何一種協(xié)議的端口都無需修改相應的配置信息,自動完成協(xié)議端口的感知焕阿。

Nacos一致性協(xié)議協(xié)議層抽象

從nacos的未來的整體架構(gòu)圖可以看出咪啡,一致性協(xié)議層將是作為nacos的最為核心的模塊,將服務于構(gòu)建在core模塊之上的各個功能模塊暮屡,或者服務與core模塊本身撤摸。而一致性協(xié)議因為分區(qū)容錯性的存在,需要在可用性與一致性之間做選擇褒纲,因此就存在兩大類一致性:最終一致性和強一致性准夷。在nacos中,這兩類致性協(xié)議都是可能用到的莺掠,比如naming模塊衫嵌,對于服務實例的數(shù)據(jù)管理分別用到了AP以及CP,而對于config模塊彻秆,將會涉及使用CP楔绞。同時還有如下幾個功能需求點:

  1. 目前持久化服務使用了變種版本的raft,并且業(yè)務和raft協(xié)議耦合唇兑,因此需要抽離解耦酒朵,同時是選擇一個標準的Java版Raft實現(xiàn)。
  2. 對于中小用戶扎附,配置基本不超過100個蔫耽,獨立一個mysql,相對重一些帕棉,需要一個輕量化的存儲方案针肥,并且支持2.0不依賴mysql和3.0依賴mysql可配置能力。
  3. 由于CP或者AP香伴,其存在多種實現(xiàn)慰枕,如何對一致性協(xié)議層做一次很好的抽象,以便將來可以快速的實現(xiàn)底層一致性協(xié)議具體實現(xiàn)的替換即纲,如Raft協(xié)議具帮,目前nacos的選型是JRaft,不排除將來nacos會自己實現(xiàn)一個標準raft協(xié)議或者實現(xiàn)Paxos協(xié)議。
  4. 由于Nacos存在多個獨立工作的功能模塊蜂厅,每個功能模塊之間不能出現(xiàn)影響匪凡,比如A模塊處理請求過慢或者出現(xiàn)異常時,不能影響B(tài)模塊的正常工作掘猿,即每個功能模塊在使用一致性協(xié)議時病游,如何將每個模塊的數(shù)據(jù)處理進行隔離?

根據(jù)一致協(xié)議以及上述功能需求點稠通,本次做了一個抽象的一致協(xié)議層以及相關(guān)的接口衬衬。

一致協(xié)議接口:ConsistencyProtocol

所謂一致性,即多個副本之間是否能夠保持一致性的特性改橘,而副本的本質(zhì)就是數(shù)據(jù)滋尉,對數(shù)據(jù)的操作,不是獲取就是修改飞主。同時狮惜,一致協(xié)議其實是針對分布式情況的,而這必然涉及多個節(jié)點碌识,因此碾篡,需要有相應的接口能夠調(diào)整一致性協(xié)議的協(xié)同工作節(jié)點。如果我們要觀察一致性協(xié)議運行的情況丸冕,該怎么辦耽梅?比如Raft協(xié)議,我們希望得知當前集群中的Leader是誰胖烛,任期的情況,當前集群中的成員節(jié)點有誰诅迷?因此佩番,還需要提供一個一致性協(xié)議元數(shù)據(jù)獲取。

綜上所述罢杉,ConsistencyProtcol的大致設(shè)計可以出來了

/**
* Has nothing to do with the specific implementation of the consist ency protocol
* Initialization sequence: init(Config)
*
* <ul>
*   <li>{@link Config} : Relevant configuration information requi red by the consistency protocol,
*   for example, the Raft protocol needs to set the election time out time, the location where
*   the Log is stored, and the snapshot task execution interval</ li>
*   <li>{@link ConsistencyProtocol#protocolMetaData()} : Returns metadata information of the consistency
*   protocol, such as leader, term, and other metadata informatio n in the Raft protocol</li>
* </ul>
*
* @author <a href="mailto:liaochuntao@live.com">liaochuntao</a>
*/

public interface ConsistencyProtocol<T extends Config> extends CommandOperations {
    
    /**
     * Consistency protocol initialization: perform initialization o perations based
    on the incoming Config
     * 一致性協(xié)議初始化趟畏,根據(jù) Config 實現(xiàn)類
     *
     * @param config {@link Config}
     */
     
     void init(T config);
    
    /**
     * Copy of metadata information for this consensus protocol
     * 該一致性協(xié)議的元數(shù)據(jù)信息
     *
     * @return metaData {@link ProtocolMetaData}
     */
     
    ProtocolMetaData protocolMetaData();
    
    /**
     * Obtain data according to the request
     * 數(shù)據(jù)獲取操作,根據(jù)GetRequest中的請求上下文進行查詢相應的數(shù)據(jù)
     *
     * @param request request
     * @return data {@link GetRequest}
     * @throws Exception
     */
    
    GetResponse getData(GetRequest request) throws Exception;
    
    /** 
     * Data operation, returning submission results synchronously
     * 同步數(shù)據(jù)提交滩租,在 Datum 中已攜帶相應的數(shù)據(jù)操作信息
     *
     * @param data {@link Log}
     * @return submit operation result
     * @throws Exception
     */
    
    LogFuture submit(Log data) throws Exception;
    
    /**
     * Data submission operation, returning submission results async hronously
     * 異步數(shù)據(jù)提交赋秀,在 Datum 中已攜帶相應的數(shù)據(jù)操作信息,返回一個Future律想,自行操作猎莲,提交發(fā) 的異常會在CompleteFuture中
     *
     * @param data {@link Log}
     * @return {@link CompletableFuture<LogFuture>} submit result
     * @throws Exception when submit throw Exception
     */
    
    CompletableFuture<LogFuture> submitAsync(Log data);
    
    /**
     * New member list
     * 新的成員節(jié)點列表,一致性協(xié)議處理相應的成員節(jié)點是加入還是離開
     *
     * @param addresses [ip:port, ip:port, ...]
     */
    
    void memberChange(Set<String> addresses);

    /**
     * Consistency agreement service shut down
     * 一致性協(xié)議服務關(guān)閉
     */

    void shutdown();

}

針對CP協(xié)議技即,由于存在Leader的概念著洼,因此需要提供一個方法用于獲取CP協(xié)議當前的Leader是誰

public interface CPProtocol<C extends Config> extends ConsistencyPro tocol<C> {

    /**
     * Returns whether this node is a leader node
     *
     * @param group business module info
     * @return is leader
     * @throws Exception
     */

    boolean isLeader(String group) throws Exception;

}

數(shù)據(jù)操作請求提交對象:Log、GetRequest

上面說到,一致性協(xié)議其實是對于數(shù)據(jù)操作而言的身笤,數(shù)據(jù)操作基本分為兩大類:數(shù)據(jù)查詢以及數(shù)據(jù)修改豹悬,同時還要滿足不同功能模塊之間的數(shù)據(jù)進行隔離。因此這里針對數(shù)據(jù)修改操作以及數(shù)據(jù)查詢操作分別闡述液荸。

1. 數(shù)據(jù)修改

  • 數(shù)據(jù)修改操作瞻佛,一定要知道本次請求是屬于哪一個功能模塊的。
  • 數(shù)據(jù)修改操作娇钱,首先一定要知道這個數(shù)據(jù)的修改操作具體是哪一種修改操作涤久,方便功能模塊針對真正的數(shù)據(jù)修改操作進行相應的邏輯操作。
  • 數(shù)據(jù)修改操作忍弛,一定要知道修改的數(shù)據(jù)是什么响迂,即請求體,為了使得一致性協(xié)議層更為通用细疚,這里對于請求體的數(shù)據(jù)結(jié)構(gòu)蔗彤,選擇了byte[]數(shù)組。
  • 數(shù)據(jù)的類型疯兼,由于我們將真正的數(shù)據(jù)序列化為了byte[]數(shù)組然遏,為了能夠正常序列化,我們可能還需要記錄這個數(shù)據(jù)的類型是什么吧彪。
  • 本次請求的信息摘要或者標識信息待侵。
  • 本次請求的額外信息,用于將來擴展需要傳輸?shù)臄?shù)據(jù)

綜上姨裸,可以得出Log對象的設(shè)計如下:

message Log {
    // 功能模塊分組信息
    string group = 1;
    // 摘要或者標識
    string key = 2;
    // 具體請求數(shù)據(jù)
    bytes data = 3;
    // 數(shù)據(jù)類型
    string type = 4;
    // 更為具體的數(shù)據(jù)操作
    string operation = 5;
    // 額外信息
    map<string, string> extendInfo = 6;
}

2. 數(shù)據(jù)查詢

  • 數(shù)據(jù)查詢操作秧倾,一定要知道本次請求是由哪一個功能模塊發(fā)起的。
  • 數(shù)據(jù)查詢的條件是什么傀缩,為了兼容各種存儲結(jié)構(gòu)的數(shù)據(jù)查詢操作那先,這byte[]進行存儲。
  • 本次請求的額外信息赡艰,用于將來擴展需要傳輸?shù)臄?shù)據(jù)售淡。

綜上,可以得出GetRequest對象的設(shè)計如下

message GetRequest {

    // 功能模塊分組信息
    string group = 1;
    // 具體請求數(shù)據(jù)
    bytes data = 2;
    // 額外信息
    map<string, string> extendInfo = 3; 
}

功能模塊使一致性協(xié)議:LogProcessor

當數(shù)據(jù)操作通過一致性協(xié)議進行submit之后慷垮,每個節(jié)點需要去處理這個Log或者GetRequest對象揖闸,因此,我們需要抽象出一個Log料身、GetRequest對象的Processor汤纸,不同的功能模塊通過實現(xiàn)該處理器泉懦,ConsistencyProtocol內(nèi)部會根據(jù)Log拳话、GetRequest的group屬性乐埠,將Log、GetRequest對象路由到具體的Processor妖谴,當然狂魔,Processor也需要表明自己是屬于哪一個功能模塊的起意。

public abstract class LogProcessor {
    /**
     * get data by key
     *
     * @param request request {@link GetRequest}
     * @return target type data
     */

    public abstract GetResponse getData(GetRequest request);
    /**
     * Process Submitted Log
     *
     * @param log {@link Log}
     * @return {@link boolean}
     */

    public abstract LogFuture onApply(Log log);

    /**
     * Irremediable errors that need to trigger business price cuts
     *
     * @param error {@link Throwable}
     */

    public void onError(Throwable error) {
    }

    /** 
     * In order for the state machine that handles the transaction to be able to route
     * the Log to the correct LogProcessor, the LogProcessor needs to have an identity
     * information
     *  
     * @return Business unique identification name
     */ 
     
     public abstract String group();
    
}   

針對CP協(xié)議诺凡,比如Raft協(xié)議,存在快照的設(shè)計议惰,因此我們需要針對CP協(xié)議單獨擴展出一個方法慎颗。

public abstract class LogProcessor4CP extends LogProcessor {
    
    /**
     * Discovery snapshot handler
     * It is up to LogProcessor to decide which SnapshotOperate shou ld be loaded and saved by itself
     *
     * @return {@link List <SnapshotOperate>}
     */
     
    public List<SnapshotOperation> loadSnapshotOperate() {
        return Collections.emptyList();
    }
}

我們可以通過一個時序圖看看,一致性協(xié)議層的大致工作流程如下:


4.png

Nacos一致性協(xié)議層之CP協(xié)議的實現(xiàn)選擇——JRaft

一致性協(xié)議層抽象好之后言询,剩下就是具體一致性協(xié)議實現(xiàn)的選擇了俯萎,這里我們選擇了螞蟻服開源的JRaft,那么我們?nèi)绾螌Raft作為CP協(xié)議的一個Backend呢运杭?下面的簡單流程圖描述了當JRaft作為CP協(xié)議的一個Backend時的初始化流程夫啊。

CA7509BC-9544-4E10-8DC2-045A0BF37DB3.png

JRaftProtocol是當JRaft作為CP協(xié)議的Backend時的一個ConsistencyProtocol的具體實現(xiàn),其內(nèi)部有一個JRaftServer成員屬性辆憔,JRaftServer分裝了JRaft的各種API操作撇眯,比如數(shù)據(jù)操作的提交,數(shù)據(jù)的查詢虱咧,成員節(jié)點的變更熊榛,Leader節(jié)點的查詢等等。

注意事項:JRaft運行期間產(chǎn)生的數(shù)據(jù)在${nacos.home}/protocol/raft文件目錄下腕巡。不同的業(yè)務模塊有不同的文件分組玄坦,如果當節(jié)點出現(xiàn)crash或者異常關(guān)閉時,清空該目錄下的文件绘沉,重啟節(jié)點即可煎楣。

由于JRaft實現(xiàn)了raft group的概念,因此梆砸,完全可以利用 raft group的設(shè)計转质,為每個功能模塊單獨創(chuàng)建個raft group。這里給出部分代碼帖世,該代碼體現(xiàn)了如何將LogProcessor嵌入到狀態(tài)機中并為每個LogPrcessor創(chuàng)建一個Raft Group。

synchronized void createMultiRaftGroup(Collection<LogProcessor4CP> processors) {
    
    // There is no reason why the LogProcessor cannot be processed b ecause of the synchronization
    if (!this.isStarted) {
        this.processors.addAll(processors);
        return;
    }
    
    final String parentPath = Paths.get(ApplicationUtils.getNacosHome(), "protocol/raft").toString();

    for (LogProcessor4CP processor : processors) {
        final String groupName = processor.group();
        if (alreadyRegisterBiz.contains(groupName)) {
            throw new DuplicateRaftGroupException(groupName);
        }
        alreadyRegisterBiz.add(groupName);
        final String logUri = Paths.get(parentPath, groupName, "log").toString();
        final String snapshotUri = Paths.get(parentPath, groupName,"snapshot").toString();
        final String metaDataUri = Paths.get(parentPath, groupName,"meta-data").toString();
        
    // Initialize the raft file storage path for different services 
    try {
        DiskUtils.forceMkdir(new File(logUri));
        DiskUtils.forceMkdir(new File(snapshotUri));
        DiskUtils.forceMkdir(new File(metaDataUri));
    }
    catch (Exception e) {
        Loggers.RAFT.error("Init Raft-File dir have some error : {}", e);   
        throw new RuntimeException(e);
    }
    
    // Ensure that each Raft Group has its own configuration and NodeOptions
    Configuration configuration = conf.copy();
    NodeOptions copy = nodeOptions.copy();
    // Here, the LogProcessor is passed into StateMachine, and when the StateMachine
    // triggers onApply, the onApply of the LogProcessor is actually called
    NacosStateMachine machine = new NacosStateMachine(this, processor); 

    copy.setLogUri(logUri);
    copy.setRaftMetaUri(metaDataUri);
    copy.setSnapshotUri(snapshotUri);
    copy.setFsm(machine);
    copy.setInitialConf(configuration);
    
    // Set snapshot interval, default 1800 seconds
    int doSnapshotInterval = ConvertUtils.toInt(raftConfig.getVal(RaftSysConstants.RAFT_SNAPSHOT_INTERVAL_SECS),
        RaftSysConstants.DEFAULT_RAFT_SNAPSHOT_INTERVAL_ SECS);
    // If the business module does not implement a snapshot processor, cancel the snapshot
        doSnapshotInterval = CollectionUtils.isEmpty(processor.loadS napshotOperate()) ? 0 : doSnapshotInterval;

    copy.setSnapshotIntervalSecs(doSnapshotInterval);
    Loggers.RAFT.info("create raft group : {}", groupName);
    RaftGroupService raftGroupService = new RaftGroupService(gro upName, localPeerId, copy, rpcServer, true);
    
    // Because RpcServer has been started before, it is not allo wed to start again here
        Node node = raftGroupService.start(false);
        machine.setNode(node);
        RouteTable.getInstance().updateConfiguration(groupName, conf iguration);
    
    // Turn on the leader auto refresh for this group
    Random random = new Random();
    long period = nodeOptions.getElectionTimeoutMs() + random.ne xtInt(5 * 1000);
    RaftExecutor.scheduleRaftMemberRefreshJob(() -> refreshRoute Table(groupName), period, period, TimeUnit.MILLISECONDS);
    
    // Save the node instance corresponding to the current group
    multiRaftGroup.put(groupName, new RaftGroupTuple(node, proce ssor, raftGroupService));
    }
}

或許有的人會有疑問沸枯,為什么要創(chuàng)建多個raft group日矫,既然之前已經(jīng)設(shè)計出了LogProcessor,完全可以利用一個Raft
Group绑榴,在狀態(tài)機appl時哪轿,根據(jù)Log的group屬性進行路由到不同的LogProcessor即可,每個功能模塊就創(chuàng)建一個raft group翔怎,不是會消耗大量的資源嗎窃诉?

正如之前所說杨耙,我們希望獨工作的模塊之間相互不存在影響,比如A模塊處理Log因為存在Block操作可能使得apply的速度緩慢飘痛,亦或者可能中途發(fā)生異常珊膜,對于Raft協(xié)議來說,當日志apply失敗時宣脉,狀態(tài)機將不能夠繼續(xù)向前推進车柠,因為如果繼續(xù)向前推進的話,由于上一步的apply失敗塑猖,后面的所有apply都可能失敗竹祷,將會導致這個節(jié)點的數(shù)據(jù)與其他節(jié)點的數(shù)據(jù)永遠不一致。如果說我們將所有獨立工作的模塊羊苟,對于數(shù)據(jù)操作的請求處理放在同一個raft group塑陵,即一個狀態(tài)機中,就不可避免的會出現(xiàn)上述所說的問題蜡励,某個模塊在apply 日志發(fā)生不可控的因素時令花,會影響其他模塊的正常工作。

JRaft運維操作

為了使用者能夠?qū)Raft進行相關(guān)簡單的運維巍虫,如Leader的切換彭则,重置當前Raft集群成員,觸發(fā)某個節(jié)點進行Snapshot操作等等占遥,提供了一個簡單的HTTP接口進行操作俯抖,并且該接口有一定的限制,即每次只會執(zhí)行一條運維指令瓦胎。

1.切換某一個Raft Group的Leader節(jié)點

POST /nacos/v1/core/ops/raft
{
    "groupId": "xxx",
    "transferLeader": "ip:{raft_port}"
}

2.重置某一個Raft Group的集群成員

POST /nacos/v1/core/ops/raft
{
    "groupId": "xxx",
    "resetRaftCluster": "ip:{raft_port},ip:{raft_port},ip:{raft_por t},ip:{raft_port}"
}

3.觸發(fā)某一個Raft Group執(zhí)行快照操作

POST /nacos/v1/core/ops/raft
{
    "groupId": "xxx",
    "doSnapshot": "ip:{raft_port}"
}

JRaft協(xié)議相關(guān)配置參數(shù)

### Sets the Raft cluster election timeout, default value is 5 second

nacos.core.protocol.raft.data.election_timeout_ms=5000

### Sets the amount of time the Raft snapshot will execute periodica lly, default is 30 minute

nacos.core.protocol.raft.data.snapshot_interval_secs=30

### Requested retries, default value is 1

nacos.core.protocol.raft.data.request_failoverRetries=1

### raft internal worker threads

nacos.core.protocol.raft.data.core_thread_num=8

### Number of threads required for raft business request processing

nacos.core.protocol.raft.data.cli_service_thread_num=4

### raft linear read strategy, defaults to index

nacos.core.protocol.raft.data.read_index_type=ReadOnlySafe

### rpc request timeout, default 5 seconds

nacos.core.protocol.raft.data.rpc_request_timeout_ms=5000

### Maximum size of each file RPC (snapshot copy) request between me mbers, default is 128 K

nacos.core.protocol.raft.data.max_byte_count_per_rpc=131072

### Maximum number of logs sent from leader to follower, default is 1024

nacos.core.protocol.raft.data.max_entries_size=1024

### Maximum body size for sending logs from leader to follower, defa ult is 512K

nacos.core.protocol.raft.data.max_body_size=524288

### Maximum log storage buffer size, default 256K

nacos.core.protocol.raft.data.max_append_buffer_size=262144

### Election timer interval will be a random maximum outside the spe cified time, default is 1 second

nacos.core.protocol.raft.data.max_election_delay_ms=1000

### Specify the ratio between election timeout and heartbeat interval. Heartbeat interval is equal to

### electionTimeoutMs/electionHeartbeatFactor芬萍,One tenth by default.

nacos.core.protocol.raft.data.election_heartbeat_factor=10

### The tasks submitted to the leader accumulate the maximum batch s ize of a batch flush log storage. The default is 32 tasks.

nacos.core.protocol.raft.data.apply_batch=32

### Call fsync when necessary when writing logs and meta informatio n, usually should be true

nacos.core.protocol.raft.data.sync=true

### Whether to write snapshot / raft meta-information to call fsync. The default is false. When sync is true, it is preferred to respect sync.

nacos.core.protocol.raft.data.sync_meta=false

### Internal disruptor buffer size. For applications with high write throughput, you need to increase this value. The default value is 16384.

nacos.core.protocol.raft.data.disruptor_buffer_size=16384

### Whether to enable replication of pipeline request optimization, which is enabled by default

nacos.core.protocol.raft.data.replicator_pipeline=true

### Maximum number of in-flight requests with pipeline requests enab led, default is 256

nacos.core.protocol.raft.data.max_replicator_inflight_msgs=256

### Whether to enable LogEntry checksum

nacos.core.protocol.raft.data.enable_log_entry_checksum=false

Nacos內(nèi)嵌分布式ID

nacos內(nèi)嵌的分布式ID為Snakeflower,dataCenterId默認為1搔啊,workerId的值計算方式如下:

InetAddress address;
try {
    address = InetAddress.getLocalHost(); 
} catch (final UnknownHostException e) {
    throw new IllegalStateException(
            "Cannot get LocalHost InetAddress, please ch eck your network!");
}
byte[] ipAddressByteArray = address.getAddress();
workerId = (((ipAddressByteArray[ipAddressByteArray.length - 2] & 0B 11)
    << Byte.SIZE) + (ipAddressByteArray[ipAddressByteArray.length - 1] & 0xFF));

如果需要手動指定dataCenterId以及workerId柬祠,則在application.properties或者啟動時添加命令 參數(shù)

### set the dataCenterID manually

# nacos.core.snowflake.data-center=

### set the WorkerID manually

# nacos.core.snowflake.worker-id=

Nacos內(nèi)嵌的輕量的基于Derby的分布式關(guān)系型存儲

背景

  • 如果配置文件數(shù)量較少,在集群模式下需要高可用數(shù)據(jù)庫集群作為支撐的成本太大负芋,期望有一個輕量的分布式關(guān)系型存儲來解決漫蛔。
  • nacos內(nèi)部一些元數(shù)據(jù)信息存儲,比如用戶信息旧蛾,命名空間信息
  • 思路來源:https://github.com/rqlite/rqlite

設(shè)計思路

總體

將一次請求操作涉及的SQL上下文按順序保存起來莽龟。然后通過一致協(xié)議層將本次請求涉及的SQL上下 文進行同步,然后每個節(jié)點將其解析并重新按順序在一次數(shù)據(jù)庫會話中執(zhí)行锨天。

5.png
誰可以處理請求

當使用者開啟1.3.0-BETA的新特性——內(nèi)嵌分布式關(guān)系型數(shù)據(jù)存儲時毯盈,所有的寫操作請求都將路由到Leader節(jié)點進行處理;但是病袄,由于Raft狀態(tài)機的特性搂赋,當某一個節(jié)點在apply數(shù)據(jù)庫操作請求時發(fā)生非SQL邏輯錯誤引發(fā)的異常時赘阀,將導致狀態(tài)機無法繼續(xù)正常進行工作,此時將會觸發(fā)配置管理模塊的降級操作脑奠。

private void registerSubscribe() {
    NotifyCenter.registerSubscribe(new SmartSubscribe() {
    
    @Override
    public void onEvent(Event event) {
        if (event instanceof RaftDBErrorRecoverEvent) {
            downgrading = false;
            return;
        }
        if (event instanceof RaftDBErrorEvent) {
            downgrading = true;
        }
    }

    @Override
    public boolean canNotify(Event event) {
    return (event instanceof RaftDBErrorEvent) || (event instanceof RaftDBErrorRecoverEvent);
    }
  });
}

因此基公,綜上所述,可以通過活動圖來理解下捺信,什么情況下需要將請求進行轉(zhuǎn)發(fā)呢酌媒?

6.png
相關(guān)數(shù)據(jù)承載對象

數(shù)據(jù)庫的DML語句是select、insert迄靠、update秒咨、delete,根據(jù)SQL語句對于數(shù)據(jù)操作的性質(zhì)掌挚,可以分為兩類:query以及update雨席,select語句對應的是數(shù)據(jù)查詢,insert吠式、update陡厘、delete語句對應的是數(shù)據(jù)修改。同時在進行數(shù)據(jù)庫操作時特占,為了避免SQL入注糙置,使用的是PreparedStatement,因此需要SQL語句+參數(shù)是目,因此可以得到兩個關(guān)于數(shù)據(jù)庫操作的Request對象谤饭。

  1. SelectRequest
public class SelectRequest implements Serializable {

    private static final long serialVersionUID = 2212052574976898602L;

    // 查詢類別,因為 前使 的是JdbcTemplate懊纳,查詢單個揉抵、查詢多個,是否使 RowM apper轉(zhuǎn)為對象
5private byte queryType;

    // sql語句
    // select * from config_info where

    private String sql;
    private Object[] args;
    private String className;

}
  1. ModifyRequest
public class ModifyRequest implements Serializable {

    private static final long serialVersionUID = 4548851816596520564L;

    private int executeNo;
    private String sql;
    private Object[] args;

}
配置發(fā)布

配置發(fā)布操作涉及三個事務:

  • config_info保存配置信息嗤疯。
  • config_tags_relation保存配置與標簽的關(guān)聯(lián)關(guān)系冤今。
  • his_config_info保存 條配置操作歷史記錄。

這三個事務都在配置發(fā)布這個大事務下茂缚,如果說我們對每個事務操作進行一個Raft協(xié)議提交戏罢,假設(shè)1、2兩個事務通過Raft提交后都成功Apply了脚囊,第三個事務在進行Raft提交后apply失敗帖汞,那么對于這個配置發(fā)布的大事務來說,是需要整體回滾的凑术,否則就會違反原子性,那么可能需要說將事務回滾操作又進行一次Raft提交所意,那么整體的復雜程度上升淮逊,并且直接引了分布式事務的管理催首,因此為了避免這個問題,我們將這三個事務涉及的SQL上下文進行整合成一個大的SQL上下文泄鹏,對這大的SQL上下文進行Raft協(xié)議提交郎任。保證了三個子事務在同一次數(shù)據(jù)庫會話當中,成功解決原子性的問題备籽,同時由于Raft協(xié)議對于事務日志的處理是串行執(zhí)行的舶治,因此相當于將數(shù)據(jù)庫的事務隔離級別調(diào)整為串行化。

public  void  addConfigInfo(final String srcIp, final String srcUser,
    final ConfigInfo configInfo, final Timestamp time,
    final Map<String, Object> configAdvanceInfo, final boolean notify)  {
    
    try {
        // 同過雪花ID獲取一個ID值
        long configId = idGeneratorManager.nextId(configInfoId);
        long configHistoryId = idGeneratorManager.nextId(this.configHistoryId);

        // 配置插入
        addConfigInfoAtomic(configId, srcIp, srcUser, configInfo, time, configAdvanceInfo);
        String configTags = configAdvanceInfo == null ? null : (String) configAdvanceInfo.get("config_tags");

        // 配置與標簽信息關(guān)聯(lián)操作
        addConfigTagsRelation(configId, configTags, configInfo.getD ataId(), configInfo.getGroup(), configInfo.getTenant());
        
        // 配置歷史插入
        insertConfigHistoryAtomic(configHistoryId, configInfo, srcI p, srcUser, time, "I");

        boolean result = databaseOperate.smartUpdate();
        if (!result) {
            throw new NacosConfigException("Config add failed");
        }

        if (notify) {
            EventDispatcher.fireEvent(
                new ConfigDataChangeEvent(false, configInfo.getDataId(),
                configInfo.getGroup(), configInfo.getTenant(), time.getTime()));
        }

    }

    finally {

        SqlContextUtils.cleanCurrentSqlContext();

    }

}

public  long  addConfigInfoAtomic(final long id, final String srcIp, final String srcUser, final ConfigInfo configInfo, final Timestamp time, Map<String, Object> configAdvanceInfo)    {   
    ...
    // 參數(shù)處理
    ...
    final String sql =
        "INSERT INTO config_info(id, data_id, group_id, tenant_id, app_name, content, md5, src_ip, src_user, gmt_create,"
        + "gmt_modified, c_desc, c_use, effect, type, c_schema) VALUES(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)";

    final Object[] args = new Object[] { id, configInfo.getDataId(),

        configInfo.getGroup(), tenantTmp, appNameTmp, configInfo.getContent(),
        md5Tmp, srcIp, srcUser, time, time, desc, use, effect, type, schema, };
        SqlContextUtils.addSqlContext(sql, args);
        return id;
    }
    
public void addConfigTagRelationAtomic(long configId, String tagName, String dataId,
    String group, String tenant) {
        final String sql = "INSERT INTO config_tags_relation(id,tag_name,tag_type,data_id,group_id,tenant_id) " + "VALUES(?,?,?,?,?,?)";
        
        final Object[] args = new Object[] { configId, tagName, null, d ataId, group, tenant };
        
        SqlContextUtils.addSqlContext(sql, args);
}

public void insertConfigHistoryAtomic(long configHistoryId, ConfigI nfo configInfo, String srcIp, String srcUser, final Timestamp time, String ops) {

    ...
    // 參數(shù)處理
    ...
    final String sql = "INSERT INTO his_config_info (id,data_id,group_id,tenant_id,app_name,content,md5," + "src_ip,src_user,gmt_modified,op_type) VALUES(?,?,?,?,?,?,?,?,?,?,?)";

    final Object[] args = new Object[] { configHistoryId, configInf o.getDataId(), configInfo.getGroup(), tenantTmp, appNameTmp, configInfo.getContent(),   md5Tmp, srcIp, srcUser, time, ops};

    SqlContextUtils.addSqlContext(sql, args);

}

/**
 * Temporarily saves all insert, update, and delete statements under
 * a transaction in the order in which they occur
 *
 * @author <a href="mailto:liaochuntao@live.com">liaochuntao</a>
 */

public class SqlContextUtils {

    private static final ThreadLocal<ArrayList<ModifyRequest>> SQL_CONTEXT =
        ThreadLocal.withInitial(ArrayList::new);

    public static void addSqlContext(String sql, Object... args) {
        ArrayList<ModifyRequest> requests = SQL_CONTEXT.get();
        ModifyRequest context = new ModifyRequest();
        context.setExecuteNo(requests.size());
        context.setSql(sql);
        context.setArgs(args);
        requests.add(context);
        SQL_CONTEXT.set(requests);
    }

    public static List<ModifyRequest> getCurrentSqlContext() {
        return SQL_CONTEXT.get();
    }
    
    public static void cleanCurrentSqlContext() {
        SQL_CONTEXT.remove();
    }
}

通過一個時序圖來更加直觀的理解

7.png

如何使用新特性

#*************** Embed Storage Related Configurations ***************
#
### This value is true in stand-alone mode and false in cluster mode
### If this value is set to true in cluster mode, nacos's distributed storage engine is turned on embeddedStorage=true

是否啟用內(nèi)嵌的分布式關(guān)系型存儲的活動圖

8.png

新特性的相關(guān)運維操作

直接查詢每個節(jié)點的derby存儲的數(shù)據(jù)

GET /nacos/v1/cs/ops/derby?sql=select * from config_info

    return List<Map<String, Object>>

不足

  • 在數(shù)據(jù)庫上層構(gòu)建一層分布式數(shù)據(jù)操作同步層车猬,對數(shù)據(jù)庫的操作存在了限制霉猛,如第一步insert操作,然后select操作珠闰,最后在update操作惜浅,這種在數(shù)據(jù)修改語句中穿插著查詢語句的操作順序是不支持的。
  • 限制了數(shù)據(jù)庫的性能伏嗜,由于間接的將數(shù)據(jù)庫事務隔離級別調(diào)整為了串行化坛悉,人為的將并發(fā)能力降低了。

未來演進

將于Apache Derby官方一起嘗試基于Raft實現(xiàn)BingLog的同步復制操作承绸,從底層實現(xiàn)數(shù)據(jù)庫同步能力裸影。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市军熏,隨后出現(xiàn)的幾起案子轩猩,更是在濱河造成了極大的恐慌,老刑警劉巖羞迷,帶你破解...
    沈念sama閱讀 217,542評論 6 504
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件界轩,死亡現(xiàn)場離奇詭異,居然都是意外死亡衔瓮,警方通過查閱死者的電腦和手機浊猾,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,822評論 3 394
  • 文/潘曉璐 我一進店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來热鞍,“玉大人葫慎,你說我怎么就攤上這事∞背瑁” “怎么了偷办?”我有些...
    開封第一講書人閱讀 163,912評論 0 354
  • 文/不壞的土叔 我叫張陵,是天一觀的道長澄港。 經(jīng)常有香客問我椒涯,道長,這世上最難降的妖魔是什么回梧? 我笑而不...
    開封第一講書人閱讀 58,449評論 1 293
  • 正文 為了忘掉前任废岂,我火速辦了婚禮祖搓,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘湖苞。我一直安慰自己拯欧,他們只是感情好,可當我...
    茶點故事閱讀 67,500評論 6 392
  • 文/花漫 我一把揭開白布财骨。 她就那樣靜靜地躺著镐作,像睡著了一般。 火紅的嫁衣襯著肌膚如雪隆箩。 梳的紋絲不亂的頭發(fā)上该贾,一...
    開封第一講書人閱讀 51,370評論 1 302
  • 那天,我揣著相機與錄音摘仅,去河邊找鬼靶庙。 笑死,一個胖子當著我的面吹牛娃属,可吹牛的內(nèi)容都是我干的六荒。 我是一名探鬼主播,決...
    沈念sama閱讀 40,193評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼矾端,長吁一口氣:“原來是場噩夢啊……” “哼掏击!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起秩铆,我...
    開封第一講書人閱讀 39,074評論 0 276
  • 序言:老撾萬榮一對情侶失蹤砚亭,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后殴玛,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體捅膘,經(jīng)...
    沈念sama閱讀 45,505評論 1 314
  • 正文 獨居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,722評論 3 335
  • 正文 我和宋清朗相戀三年滚粟,在試婚紗的時候發(fā)現(xiàn)自己被綠了寻仗。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點故事閱讀 39,841評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡凡壤,死狀恐怖署尤,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情亚侠,我是刑警寧澤曹体,帶...
    沈念sama閱讀 35,569評論 5 345
  • 正文 年R本政府宣布,位于F島的核電站硝烂,受9級特大地震影響箕别,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 41,168評論 3 328
  • 文/蒙蒙 一究孕、第九天 我趴在偏房一處隱蔽的房頂上張望啥酱。 院中可真熱鬧,春花似錦厨诸、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,783評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至颤陶,卻和暖如春颗管,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背滓走。 一陣腳步聲響...
    開封第一講書人閱讀 32,918評論 1 269
  • 我被黑心中介騙來泰國打工垦江, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留,地道東北人搅方。 一個月前我還...
    沈念sama閱讀 47,962評論 2 370
  • 正文 我出身青樓比吭,卻偏偏與公主長得像,于是被迫代替她去往敵國和親姨涡。 傳聞我的和親對象是個殘疾皇子衩藤,可洞房花燭夜當晚...
    茶點故事閱讀 44,781評論 2 354

推薦閱讀更多精彩內(nèi)容