Nacos源碼解析

完整流程

image.png

Nacos服務(wù)注冊(cè)表結(jié)構(gòu):Map<namespace, Map<group::serviceName, Service>>


舉例說(shuō)明:


image.png

一:服務(wù)注冊(cè)

客戶端閱讀入口

<!--nacos客戶端-->
        <dependency>
            <groupId>com.alibaba.cloud</groupId>
            <artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
        </dependency>

1. 查看spring.factories文件中幫我們自動(dòng)裝配的類

image.png


2.查看自動(dòng)裝配的類

com.alibaba.cloud.nacos.NacosDiscoveryAutoConfiguration
  ->com.alibaba.cloud.nacos.NacosDiscoveryAutoConfiguration#nacosAutoServiceRegistration()
  //返回 new NacosAutoServiceRegistration()

3.NacosAutoServiceRegistration里的調(diào)用鏈路

com.alibaba.cloud.nacos.NacosDiscoveryAutoConfiguration
  ->com.alibaba.cloud.nacos.NacosDiscoveryAutoConfiguration#nacosAutoServiceRegistration()
  //返回 new NacosAutoServiceRegistration()
     //監(jiān)聽(tīng)spring的WebServerInitializedEvent啟動(dòng)事件時(shí)
   ->org.springframework.cloud.client.serviceregistry.AbstractAutoServiceRegistration#bind
     ->org.springframework.cloud.client.serviceregistry.AbstractAutoServiceRegistration#start
       ->org.springframework.cloud.client.serviceregistry.AbstractAutoServiceRegistration#register
         ->com.alibaba.cloud.nacos.registry.NacosServiceRegistry#register
           ->com.alibaba.nacos.client.naming.NacosNamingService#registerInstance(String serviceName, Instance instance)
           //做了兩件事
           //1.調(diào)用com.alibaba.nacos.client.naming.beat.BeatReactor#addBeatInfo(String serviceName, BeatInfo beatInfo)添加心跳信息
           //2.調(diào)用代理執(zhí)行注冊(cè)com.alibaba.nacos.client.naming.net.NamingProxy#registerService(String serviceName, String groupName, Instance instance)
             ->com.alibaba.nacos.client.naming.net.NamingProxy#reqAPI()
  

小結(jié):其實(shí)就是使用http請(qǐng)求了服務(wù)端的注冊(cè)接口

--

服務(wù)端閱讀

com.alibaba.nacos.naming.controllers.InstanceController#register
 ->com.alibaba.nacos.naming.core.ServiceManager#registerInstance
  ->com.alibaba.nacos.naming.core.ServiceManager#addInstance
   ->com.alibaba.nacos.naming.consistency.DelegateConsistencyServiceImpl#put
    ->com.alibaba.nacos.naming.consistency.ephemeral.distro.DistroConsistencyServiceImpl#put
     ->com.alibaba.nacos.naming.consistency.ephemeral.distro.DistroConsistencyServiceImpl#onPut
      ->com.alibaba.nacos.naming.consistency.ephemeral.distro.DataStore#put
      ->com.alibaba.nacos.naming.consistency.ephemeral.distro.DistroConsistencyServiceImpl.Notifier#addTask

添加到隊(duì)列里

public class Notifier implements Runnable {
        
        private ConcurrentHashMap<String, String> services = new ConcurrentHashMap<>(10 * 1024);
        
        private BlockingQueue<Pair<String, DataOperation>> tasks = new ArrayBlockingQueue<>(1024 * 1024);
        
        /**
         * Add new notify task to queue.
         *
         * @param datumKey data key
         * @param action   action for data
         */
        public void addTask(String datumKey, DataOperation action) {
            
            if (services.containsKey(datumKey) && action == DataOperation.CHANGE) {
                return;
            }
            if (action == DataOperation.CHANGE) {
                services.put(datumKey, StringUtils.EMPTY);
            }
            tasks.offer(Pair.with(datumKey, action));
        }
        
        public int getTaskSize() {
            return tasks.size();
        }
        
        @Override
        public void run() {
            Loggers.DISTRO.info("distro notifier started");
            
            for (; ; ) {
                try {
                    Pair<String, DataOperation> pair = tasks.take();
                    handle(pair);
                } catch (Throwable e) {
                    Loggers.DISTRO.error("[NACOS-DISTRO] Error while handling notifying task", e);
                }
            }
        }
        
        private void handle(Pair<String, DataOperation> pair) {
            try {
                String datumKey = pair.getValue0();
                DataOperation action = pair.getValue1();
                
                services.remove(datumKey);
                
                int count = 0;
                
                if (!listeners.containsKey(datumKey)) {
                    return;
                }
                
                for (RecordListener listener : listeners.get(datumKey)) {
                    
                    count++;
                    
                    try {
                        if (action == DataOperation.CHANGE) {
                            listener.onChange(datumKey, dataStore.get(datumKey).value);
                            continue;
                        }
                        
                        if (action == DataOperation.DELETE) {
                            listener.onDelete(datumKey);
                            continue;
                        }
                    } catch (Throwable e) {
                        Loggers.DISTRO.error("[NACOS-DISTRO] error while notifying listener of key: {}", datumKey, e);
                    }
                }
                
                if (Loggers.DISTRO.isDebugEnabled()) {
                    Loggers.DISTRO
                            .debug("[NACOS-DISTRO] datum change notified, key: {}, listener count: {}, action: {}",
                                    datumKey, count, action.name());
                }
            } catch (Throwable e) {
                Loggers.DISTRO.error("[NACOS-DISTRO] Error while handling notifying task", e);
            }
        }
    }

調(diào)用鏈路

com.alibaba.nacos.naming.consistency.ephemeral.distro.DistroConsistencyServiceImpl.Notifier#run
 ->com.alibaba.nacos.naming.consistency.ephemeral.distro.DistroConsistencyServiceImpl.Notifier#handle
  ->com.alibaba.nacos.naming.core.Service#onChange
   ->com.alibaba.nacos.naming.core.Service#updateIPs
    ->com.alibaba.nacos.naming.core.Cluster#updateIps
public void updateIps(List<Instance> ips, boolean ephemeral) {
        
        ....
        
        toUpdateInstances = new HashSet<>(ips);
        
        if (ephemeral) {
            ephemeralInstances = toUpdateInstances;
        } else {
            persistentInstances = toUpdateInstances;
        }
    }

為Cluster類的成員變量

private Set<Instance> ephemeralInstances = new HashSet<>();

run()方法什么時(shí)候觸發(fā)?
可以看到,使用了@PostConstruct注解將notifier提交到了一個(gè)線程池里面

public class DistroConsistencyServiceImpl implements EphemeralConsistencyService, DistroDataProcessor {
    
    ....
    
    private volatile Notifier notifier = new Notifier();
    
    @PostConstruct
    public void init() {
        GlobalExecutor.submitDistroNotifyTask(notifier);
    }
    
    ....
}

小結(jié):使用了阻塞隊(duì)列來(lái)提高了并發(fā)能力,但是否隊(duì)列會(huì)被撐爆?注冊(cè)成功延時(shí)會(huì)有多少?

我們可以看到隊(duì)列大小為1024*1024
同時(shí)并發(fā)注冊(cè)的情況應(yīng)該很小,此外為寫(xiě)內(nèi)存操作,所以從隊(duì)列中獲取內(nèi)容進(jìn)行消費(fèi)應(yīng)該也是很快的

private BlockingQueue<Pair<String, DataOperation>> tasks = new ArrayBlockingQueue<>(1024 * 1024);

那還為注冊(cè)完,消費(fèi)端直接從注冊(cè)中心獲取是否會(huì)讀到臟數(shù)據(jù)?

我們常規(guī)的操作可能是直接加鎖,寫(xiě)完才運(yùn)行讀,但無(wú)疑會(huì)影響吞吐量

我們來(lái)看nacos是怎么處理的?
回到

public void updateIps(List<Instance> ips, boolean ephemeral) {
        //如果為ephemeral 則復(fù)制出一份副本
        Set<Instance> toUpdateInstances = ephemeral ? ephemeralInstances : persistentInstances;
        
        HashMap<String, Instance> oldIpMap = new HashMap<>(toUpdateInstances.size());
        
        //復(fù)制操作
        for (Instance ip : toUpdateInstances) {
            oldIpMap.put(ip.getDatumKey(), ip);
        }
        
        //基于oldIpMap 即我們復(fù)制出來(lái)的 進(jìn)行注冊(cè)操作
        List<Instance> updatedIPs = updatedIps(ips, oldIpMap.values());
        
        ....

        //最終將 toUpdateInstances 賦值給ephemeralInstances 或者 persistentInstances
        toUpdateInstances = new HashSet<>(ips);
        
        if (ephemeral) {
            ephemeralInstances = toUpdateInstances;
        } else {
            persistentInstances = toUpdateInstances;
        }
    }

使用到了寫(xiě)時(shí)復(fù)制,即讀寫(xiě)分離的思想
**那會(huì)不會(huì)出現(xiàn)多個(gè)實(shí)例同時(shí)寫(xiě),然后出現(xiàn)覆蓋的問(wèn)題?

我們回顧之前的初始化邏輯,只會(huì)初始化一次,所以這里是一個(gè)單線程不斷從隊(duì)列里面拿然后執(zhí)行注冊(cè)邏輯,所以不會(huì)出現(xiàn)覆蓋寫(xiě)的問(wèn)題

@PostConstruct
    public void init() {
        GlobalExecutor.submitDistroNotifyTask(notifier);
    }

寫(xiě)時(shí)復(fù)制會(huì)不會(huì)占用很多內(nèi)存空間?

我們可以看到其實(shí)只是復(fù)制了Cluster里面的Set<Instance>集合(詳細(xì)的可能看下面的nacos的完整存儲(chǔ)模型),而不是復(fù)制了整個(gè)注冊(cè)表,所以我們使用寫(xiě)時(shí)復(fù)制時(shí)需要考慮復(fù)制的粒度問(wèn)題


我們?cè)賮?lái)看下nacos底層的存儲(chǔ)模型是什么樣的?

/**
 * Core manager storing all services in Nacos.
 *
 * @author nkorange
 */
@Component
public class ServiceManager implements RecordListener<Service> {
    
    /**
     * Map(namespace, Map(group::serviceName, Service)).
     */
    private final Map<String, Map<String, Service>> serviceMap = new ConcurrentHashMap<>();
}

Service
我們可以看到 'service --> cluster --> instance' model, in which service stores a list of clusters, which contain a list of instances.
一個(gè)sercie可能部署了一個(gè)集群,一個(gè)集群可能會(huì)有多個(gè)實(shí)例

/**
 * Service of Nacos server side
 *
 * <p>We introduce a 'service --> cluster --> instance' model, in which service stores a list of clusters, which
 * contain
 * a list of instances.
 *
 * <p>his class inherits from Service in API module and stores some fields that do not have to expose to client.
 *
 * @author nkorange
 */
public class Service extends com.alibaba.nacos.api.naming.pojo.Service implements Record, RecordListener<Instances> {
private Map<String, Cluster> clusterMap = new HashMap<>();
}

Cluster
就包含了我們?cè)趗pdateIps()方法內(nèi)最終更新的persistentInstances 和ephemeralInstances

public class Cluster extends com.alibaba.nacos.api.naming.pojo.Cluster implements Cloneable {
    @JsonIgnore
    private Set<Instance> persistentInstances = new HashSet<>();
    
    @JsonIgnore
    private Set<Instance> ephemeralInstances = new HashSet<>();
}

小結(jié):它的數(shù)據(jù)模型就是一個(gè)Namespace下面可能會(huì)有一個(gè)服務(wù)分組,一個(gè)服務(wù)分組下面可能會(huì)有多個(gè)服務(wù),一個(gè)服務(wù)可能會(huì)有一個(gè)集群,一個(gè)集群可能會(huì)有多個(gè)實(shí)例


心跳發(fā)送

com.alibaba.nacos.client.naming.NacosNamingService#registerInstance()
  ->com.alibaba.nacos.client.naming.beat.BeatReactor#addBeatInfo

addBeatInfo()方法

class BeatTask implements Runnable {
        
        BeatInfo beatInfo;
        
        public BeatTask(BeatInfo beatInfo) {
            this.beatInfo = beatInfo;
        }
        
        @Override
        public void run() {
            if (beatInfo.isStopped()) {
                return;
            }
            long nextTime = beatInfo.getPeriod();
            ...
            //1.發(fā)送心跳
            JsonNode result = serverProxy.sendBeat(beatInfo, BeatReactor.this.lightBeatEnabled);

            ...
            //繼續(xù)嵌套調(diào)用
            executorService.schedule(new BeatTask(beatInfo), nextTime, TimeUnit.MILLISECONDS);
        }
    }

即客戶端最終使用Http請(qǐng)求服務(wù)端接口/instance/beat


服務(wù)端心跳檢查

在服務(wù)注冊(cè)的Init方法中com.alibaba.nacos.naming.core.Service#init
開(kāi)啟了心跳檢查
com.alibaba.nacos.naming.healthcheck.HealthCheckReactor#scheduleCheck(com.alibaba.nacos.naming.healthcheck.ClientBeatCheckTask)

public void init() {
        HealthCheckReactor.scheduleCheck(clientBeatCheckTask);
        for (Map.Entry<String, Cluster> entry : clusterMap.entrySet()) {
            entry.getValue().setService(this);
            entry.getValue().init();
        }
    }

默認(rèn)的心跳超時(shí)時(shí)間
默認(rèn)15秒

ClientBeatCheckTask.run()中instance.getInstanceHeartBeatTimeOut()

默認(rèn)的delete時(shí)間
默認(rèn)30秒

ClientBeatCheckTask.run()中instance.getIpDeleteTimeout()
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個(gè)濱河市茁彭,隨后出現(xiàn)的幾起案子总寒,更是在濱河造成了極大的恐慌,老刑警劉巖理肺,帶你破解...
    沈念sama閱讀 206,311評(píng)論 6 481
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件摄闸,死亡現(xiàn)場(chǎng)離奇詭異,居然都是意外死亡妹萨,警方通過(guò)查閱死者的電腦和手機(jī)年枕,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 88,339評(píng)論 2 382
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái)乎完,“玉大人熏兄,你說(shuō)我怎么就攤上這事∈饕蹋” “怎么了摩桶?”我有些...
    開(kāi)封第一講書(shū)人閱讀 152,671評(píng)論 0 342
  • 文/不壞的土叔 我叫張陵,是天一觀的道長(zhǎng)帽揪。 經(jīng)常有香客問(wèn)我硝清,道長(zhǎng),這世上最難降的妖魔是什么台丛? 我笑而不...
    開(kāi)封第一講書(shū)人閱讀 55,252評(píng)論 1 279
  • 正文 為了忘掉前任耍缴,我火速辦了婚禮砾肺,結(jié)果婚禮上挽霉,老公的妹妹穿的比我還像新娘。我一直安慰自己变汪,他們只是感情好侠坎,可當(dāng)我...
    茶點(diǎn)故事閱讀 64,253評(píng)論 5 371
  • 文/花漫 我一把揭開(kāi)白布。 她就那樣靜靜地躺著裙盾,像睡著了一般实胸。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上番官,一...
    開(kāi)封第一講書(shū)人閱讀 49,031評(píng)論 1 285
  • 那天庐完,我揣著相機(jī)與錄音,去河邊找鬼徘熔。 笑死门躯,一個(gè)胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的酷师。 我是一名探鬼主播讶凉,決...
    沈念sama閱讀 38,340評(píng)論 3 399
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼染乌,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼!你這毒婦竟也來(lái)了懂讯?” 一聲冷哼從身側(cè)響起荷憋,我...
    開(kāi)封第一講書(shū)人閱讀 36,973評(píng)論 0 259
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤,失蹤者是張志新(化名)和其女友劉穎褐望,沒(méi)想到半個(gè)月后勒庄,有當(dāng)?shù)厝嗽跇?shù)林里發(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 43,466評(píng)論 1 300
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡譬挚,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 35,937評(píng)論 2 323
  • 正文 我和宋清朗相戀三年锅铅,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片减宣。...
    茶點(diǎn)故事閱讀 38,039評(píng)論 1 333
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡盐须,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出漆腌,到底是詐尸還是另有隱情贼邓,我是刑警寧澤,帶...
    沈念sama閱讀 33,701評(píng)論 4 323
  • 正文 年R本政府宣布闷尿,位于F島的核電站塑径,受9級(jí)特大地震影響,放射性物質(zhì)發(fā)生泄漏填具。R本人自食惡果不足惜统舀,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 39,254評(píng)論 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望劳景。 院中可真熱鬧誉简,春花似錦、人聲如沸盟广。這莊子的主人今日做“春日...
    開(kāi)封第一講書(shū)人閱讀 30,259評(píng)論 0 19
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)筋量。三九已至烹吵,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間桨武,已是汗流浹背肋拔。 一陣腳步聲響...
    開(kāi)封第一講書(shū)人閱讀 31,485評(píng)論 1 262
  • 我被黑心中介騙來(lái)泰國(guó)打工, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留呀酸,地道東北人凉蜂。 一個(gè)月前我還...
    沈念sama閱讀 45,497評(píng)論 2 354
  • 正文 我出身青樓,卻偏偏與公主長(zhǎng)得像,于是被迫代替她去往敵國(guó)和親跃惫。 傳聞我的和親對(duì)象是個(gè)殘疾皇子叮叹,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 42,786評(píng)論 2 345

推薦閱讀更多精彩內(nèi)容