Nacos客戶端namespace初始化及注冊(cè)流程

很多人在學(xué)習(xí)開源的時(shí)候椭懊,無從下手,代碼那么多步势,從哪個(gè)地方開始呢氧猬?我們學(xué)習(xí)nacos,首先去到nocas的github源碼的地方鏈接: https://github.com/alibaba/nacos下載源碼到我們的idea坏瘩,打開example項(xiàng)目盅抚,

在這里插入圖片描述

進(jìn)入APP,可以看到如下代碼:

   public static void main(String[] args) throws NacosException {
        Properties properties = new Properties();
        properties.setProperty("serverAddr", "21.34.53.5:8848,21.34.53.6:8848");
        properties.setProperty("namespace", "quickStart");
        NamingService naming = NamingFactory.createNamingService(properties);
        naming.registerInstance("nacos.test.3", "11.11.11.11", 8888, "TEST1");
        naming.registerInstance("nacos.test.3", "2.2.2.2", 9999, "DEFAULT");
        System.out.println(naming.getAllInstances("nacos.test.3"));
    }

這里我們可以看到倔矾,這里構(gòu)建了一個(gè)NamingService實(shí)例妄均,同時(shí)設(shè)置了我們的nacos服務(wù)端的地址和端口,設(shè)置namespace。
我們進(jìn)入createNamingService方法

NamingService

    /**
     * Create a new naming service.
     *
     * @param properties naming service properties
     * @return new naming service
     * @throws NacosException nacos exception
     */
    public static NamingService createNamingService(Properties properties) throws NacosException {
        try {
            Class<?> driverImplClass = Class.forName("com.alibaba.nacos.client.naming.NacosNamingService");
            Constructor constructor = driverImplClass.getConstructor(Properties.class);
            NamingService vendorImpl = (NamingService) constructor.newInstance(properties);
            return vendorImpl;
        } catch (Throwable e) {
            throw new NacosException(NacosException.CLIENT_INVALID_PARAM, e);
        }
    }

這里通過反射創(chuàng)建了一個(gè)NamingService實(shí)例丛晦,實(shí)際的實(shí)現(xiàn)類是在api項(xiàng)目里面的NacosNamingService奕纫,隨后我們進(jìn)入NacosNamingService看看

NacosNamingService

1.先看里面的屬性
namespace 名字空間
endpoint 服務(wù)管理服務(wù)端地址管理服務(wù)器地址,獲取服務(wù)管理服務(wù)端地址(當(dāng) nacos server 集群需要擴(kuò)縮容時(shí)烫沙,客戶端需要有一種能力能夠及時(shí)感知到集群發(fā)生變化,及時(shí)感知到集群的變化是通過 endpoint 來實(shí)現(xiàn)的匹层。也即客戶端會(huì)定時(shí)的向endpoint發(fā)送請(qǐng)求來更新客戶端內(nèi)存中的集群列表。)
serverList 服務(wù)管理服務(wù)端地址锌蓄,可直接配置升筏,或從endpoint獲取
cacheDir 調(diào)用服務(wù)信息本地文件緩存地址
logName 暫未使用
HostReactor 客戶端關(guān)心的服務(wù)的實(shí)例信息,推拉模式的更新,failover服務(wù)實(shí)例信息讀寫管理
BeatReactor 本地實(shí)例信息心跳
EventDispatcher 服務(wù)信息變更監(jiān)聽回調(diào)處理
NamingProxy 服務(wù)管理服務(wù)端地址列表更新管理瘸爽,接口調(diào)用負(fù)載均衡您访,失敗重試

 /**
     * Each Naming service should have different namespace.
     * 名字空間
     */
    private String namespace;

    /**
     * 當(dāng) nacos server 集群需要擴(kuò)縮容時(shí),客戶端需要有一種能力能夠及時(shí)感知到集群發(fā)生變化剪决。
     * 及時(shí)感知到集群的變化是通過 endpoint 來實(shí)現(xiàn)的灵汪。也即客戶端會(huì)定時(shí)的向 endpoint 發(fā)送請(qǐng)求來更新客戶端內(nèi)存中的集群列表。
     * 服務(wù)管理服務(wù)端地址管理服務(wù)器地址柑潦,獲取服務(wù)管理服務(wù)端地址
     */
    private String endpoint;

    /**
     * 服務(wù)管理服務(wù)端地址管理服務(wù)器地址享言,獲取服務(wù)管理服務(wù)端地址
     */
    private String serverList;

    /**
     * 服務(wù)管理服務(wù)端地址管理服務(wù)器地址,獲取服務(wù)管理服務(wù)端地址
     */
    private String cacheDir;

    private String logName;

    /**
     * 客戶端關(guān)心的服務(wù)的實(shí)例信息,推拉模式的更新渗鬼,failover服務(wù)實(shí)例信息讀寫管理
     */
    private HostReactor hostReactor;

    /**
     * 本地實(shí)例信息心跳
     */
    private BeatReactor beatReactor;

    /**
     * 服務(wù)信息變更監(jiān)聽回調(diào)處理
     */
    private EventDispatcher eventDispatcher;

    /**
     * 服務(wù)管理服務(wù)端地址列表更新管理览露,接口調(diào)用負(fù)載均衡,失敗重試
     */
    private NamingProxy serverProxy;

了解了相關(guān)字段的意思我們來看看構(gòu)造方法

 public NacosNamingService(Properties properties) throws NacosException {
        init(properties);
    }

這里其實(shí)就是執(zhí)行init初始化方法

   private void init(Properties properties) throws NacosException {
        ValidatorUtils.checkInitParam(properties); //檢查contextPath格式 可為空
        this.namespace = InitUtils.initNamespaceForNaming(properties); //初始化命名空間
        //子類實(shí)現(xiàn)類中的靜態(tài)代碼串中已經(jīng)向Jackson進(jìn)行了注冊(cè)譬胎,但是由于classloader的原因差牛,只有當(dāng) 該子類被使用的時(shí)候,才會(huì)加載該類堰乔。
        // 這可能會(huì)導(dǎo)致Jackson先進(jìn)性反序列化偏化,再注冊(cè)子類,從而導(dǎo)致 反序列化失敗浩考。
        //所以這里將NoneSelector夹孔、ExpressionSelector這兩個(gè)類進(jìn)行注冊(cè)或者銷毀
        InitUtils.initSerialization();

        //這里進(jìn)行nacos服務(wù)端地址初始化
        //這里面會(huì)涉及到是否啟用endpoint
        initServerAddr(properties);

        //如果應(yīng)用由EDAS部署被盈,則支持阿里云的web上下文
        InitUtils.initWebRootContext();

        //這里初始化本地緩存的路徑及存放的registerInstance的內(nèi)容
        initCacheDir();

        //初始化LogName析孽,未設(shè)置用naming.log
        initLogName(properties);

        /**
         *初始化ExecutorService線程池,創(chuàng)建名字為com.alibaba.nacos.naming.client.listener的daemon線程N(yùn)otifier
         * EventDispatcher中有一個(gè)LinkedBlockingQueue隊(duì)列只怎,放的是ServiceInfo
         * EventDispatcher中有ConcurrentMap<String, List<EventListener>>放入的是EventListener
         *Notifier中run方法解析
         *                  先去隊(duì)列中彈出隊(duì)頂元素(poll方法)
         *                  如果為空進(jìn)行下一次循環(huán)
         *                  如果不為空則去ConcurrentMap取listeners
         *                  去除listener去監(jiān)聽NamingEvent
         *
        */
        this.eventDispatcher = new EventDispatcher();

        /**
         * 初始化服務(wù)代理袜瞬,用戶名密碼服務(wù)地址及initRefreshTask任務(wù)的線程池,創(chuàng)建com.alibaba.nacos.client.naming.updater名字的daemon線程
         */
        this.serverProxy = new NamingProxy(this.namespace, this.endpoint, this.serverList, properties);

        /**
         * initClientBeatThreadCount(properties):Runtime.getRuntime().availableProcessors()返回到Java虛擬機(jī)的可用的處理器數(shù)量
         * 創(chuàng)建一個(gè)此案城池com.alibaba.nacos.naming.beat.sender的daemon線程
         */
        this.beatReactor = new BeatReactor(this.serverProxy, initClientBeatThreadCount(properties));
        /**
         * 同上
         */
        this.hostReactor = new HostReactor(this.eventDispatcher, this.serverProxy, beatReactor, this.cacheDir,
                isLoadCacheAtStart(properties), initPollingThreadCount(properties));
    }

innit方法里面在初始化各個(gè)模塊身堡,具體的步驟是
1.檢查contextPath格式
2.將NoneSelector邓尤、ExpressionSelector這兩個(gè)類進(jìn)行注冊(cè)或者銷毀
3.nacos服務(wù)端地址初始化
4.如果應(yīng)用由EDAS部署,則支持阿里云的web上下文
5.這里初始化本地緩存
6.初始化LogName
7.初始化服務(wù)信息變更監(jiān)聽回調(diào)處理
8.初始化服務(wù)管理服務(wù)端地址列表更新管理,接口調(diào)用負(fù)載均衡汞扎,失敗重試
9.初始化本地實(shí)例信息心跳
10.初始化客戶端關(guān)心的服務(wù)的實(shí)例信息
說明:7-10都會(huì)初始化線程池季稳,創(chuàng)建daemon線程
總的來說,init方法為我們初始化各種本地信息澈魄,下面來看具體初始化方法

ValidatorUtils.checkInitParam(properties)

public static final String CONTEXT_PATH = "contextPath";
   
    private static final Pattern CONTEXT_PATH_MATCH = Pattern.compile("(\\/)\\1+");
    
    public static void checkInitParam(Properties properties) throws NacosException {
        checkContextPath(properties.getProperty(PropertyKeyConst.CONTEXT_PATH));
    }
    
    /**
     * Check context path.
     *
     * @param contextPath context path
     */
    public static void checkContextPath(String contextPath) {
        if (contextPath == null) {
            return;
        }
        Matcher matcher = CONTEXT_PATH_MATCH.matcher(contextPath);
        if (matcher.find()) {
            throw new IllegalArgumentException("Illegal url path expression");
        }
    }
    

這里的代碼比較簡(jiǎn)單景鼠,只是檢查了一下contextPath

InitUtils.initNamespaceForNaming(properties)

    /**
     * Add a difference to the name naming. This method simply initializes the namespace for Naming. Config
     * initialization is not the same, so it cannot be reused directly.
     *
     * 為名稱命名添加差異。此方法簡(jiǎn)單地初始化命名空間以進(jìn)行命名痹扇。配置初始化不一樣铛漓,所以不能直接重用。
     *
     * @param properties properties
     * @return namespace
     */
    public static String initNamespaceForNaming(Properties properties) {
        String tmpNamespace = null;

        String isUseCloudNamespaceParsing = properties.getProperty(PropertyKeyConst.IS_USE_CLOUD_NAMESPACE_PARSING,
                System.getProperty(SystemPropertyKeyConst.IS_USE_CLOUD_NAMESPACE_PARSING,
                        String.valueOf(Constants.DEFAULT_USE_CLOUD_NAMESPACE_PARSING)));//默認(rèn)是true
        System.out.println("isUseCloudNamespaceParsing:" + isUseCloudNamespaceParsing);
        if (Boolean.parseBoolean(isUseCloudNamespaceParsing)) {

            tmpNamespace = TenantUtil.getUserTenantForAns();//這里是ans鲫构,據(jù)說是注冊(cè)中心浓恶,未設(shè)置tenant.id和ans.namespace 返回為空
            /**
             * 這里檢查是否為空,如果不為空發(fā)返回tmpNamespace结笨,如果為空?qǐng)?zhí)行Callable.call()方法包晰,
             * call()方法里面去取ans.namespace屬性,返回namespace
             */
            tmpNamespace = TemplateUtils.stringEmptyAndThenExecute(tmpNamespace, new Callable<String>() {
                @Override
                public String call() {
                    String namespace = System.getProperty(SystemPropertyKeyConst.ANS_NAMESPACE);
                    LogUtils.NAMING_LOGGER.info("initializer namespace from System Property :" + namespace);

                    return namespace;
                }
            });

            /**
             * 這里檢查是否為空炕吸,如果不為空發(fā)返回tmpNamespace杜窄,如果為空?qǐng)?zhí)行Callable.call()方法,
             * call()方法里面去取ALIBABA_ALIWARE_NAMESPACE環(huán)境變量
             */
            tmpNamespace = TemplateUtils.stringEmptyAndThenExecute(tmpNamespace, new Callable<String>() {
                @Override
                public String call() {
                    String namespace = System.getenv(PropertyKeyConst.SystemEnv.ALIBABA_ALIWARE_NAMESPACE);
                    LogUtils.NAMING_LOGGER.info("initializer namespace from System Environment :" + namespace);
                    return namespace;
                }
            });
        }

        /**
         * 這里檢查是否為空算途,如果不為空發(fā)返回tmpNamespace塞耕,如果為空?qǐng)?zhí)行Callable.call()方法,
         * call()方法里面去取NAMESPACE屬性
         */
        tmpNamespace = TemplateUtils.stringEmptyAndThenExecute(tmpNamespace, new Callable<String>() {
            @Override
            public String call() {
                String namespace = System.getProperty(PropertyKeyConst.NAMESPACE);
                LogUtils.NAMING_LOGGER.info("initializer namespace from System Property :" + namespace);
                return namespace;
            }
        });
        if (StringUtils.isEmpty(tmpNamespace) && properties != null) {
            /**
             * 這里拿到我們外面設(shè)置的namespace
             */
            tmpNamespace = properties.getProperty(PropertyKeyConst.NAMESPACE);
        }
        /**
         * 這里如果前面tmpNamespace都是null嘴瓤,則返回默認(rèn)的NAMESPACE:public
         */
        tmpNamespace = TemplateUtils.stringEmptyAndThenExecute(tmpNamespace, new Callable<String>() {
            @Override
            public String call() {
                return UtilAndComs.DEFAULT_NAMESPACE_ID;
            }
        });
        return tmpNamespace;
    }

這個(gè)方法里面先去會(huì)判斷是否使用isUseCloudNamespaceParsing扫外,默認(rèn)是true,然后回去檢查是否用ans廓脆,ALIBABA_ALIWARE筛谚。同時(shí)會(huì)拿到我們最開始設(shè)置的namespace,如果為設(shè)置停忿,則用默認(rèn)的public驾讲。

InitUtils.initSerialization()

 /**
     * Register subType for serialization.
     *
     * <p>
     * Now these subType implementation class has registered in static code. But there are some problem for classloader.
     * The implementation class will be loaded when they are used, which will make deserialize before register.
     * </p>
     *
     * <p>
     * 子類實(shí)現(xiàn)類中的靜態(tài)代碼串中已經(jīng)向Jackson進(jìn)行了注冊(cè),但是由于classloader的原因席赂,只有當(dāng) 該子類被使用的時(shí)候吮铭,才會(huì)加載該類。這可能會(huì)導(dǎo)致Jackson先進(jìn)性反序列化颅停,再注冊(cè)子類谓晌,從而導(dǎo)致 反序列化失敗。
     * </p>
     */
    public static void initSerialization() {
        // TODO register in implementation class or remove subType
        JacksonUtils.registerSubtype(NoneSelector.class, SelectorType.none.name());
        JacksonUtils.registerSubtype(ExpressionSelector.class, SelectorType.label.name());
    }

這里很簡(jiǎn)單癞揉,主要是為了防止反序列化失敗

initServerAddr(properties)

   private void initServerAddr(Properties properties) {
        //這里拿到我們前面填寫的nacos服務(wù)端地址
        serverList = properties.getProperty(PropertyKeyConst.SERVER_ADDR);

        endpoint = InitUtils.initEndpoint(properties);
        if (StringUtils.isNotEmpty(endpoint)) {
            serverList = "";
        }
    }
  /**
     * Init end point.
     *
     * @param properties properties
     * @return end point
     */
    public static String initEndpoint(final Properties properties) {
        if (properties == null) {

            return "";
        }
        // Whether to enable domain name resolution rules 是否啟用域名解析規(guī)則
        /**
         * 這里是去取end point的解析規(guī)則纸肉,即對(duì)傳入的endpoint參數(shù)規(guī)則解析的能力可以是一個(gè)具體的值溺欧,也可以是一個(gè)占位符的形式
         * 1.endpoint.options 是一個(gè)具體的變量。支持從系統(tǒng)屬性柏肪,系統(tǒng)環(huán)境變量中讀取姐刁。
         * 2.defaultValue 是給出的一個(gè)默認(rèn)值。當(dāng)從具體的變量中沒有被正確初始化時(shí)烦味,會(huì)使用給出的默認(rèn)值來初始化绢淀。
         *
         */
        String isUseEndpointRuleParsing = properties.getProperty(PropertyKeyConst.IS_USE_ENDPOINT_PARSING_RULE,
                System.getProperty(SystemPropertyKeyConst.IS_USE_ENDPOINT_PARSING_RULE,
                        String.valueOf(ParamUtil.USE_ENDPOINT_PARSING_RULE_DEFAULT_VALUE)));

        //isUseEndpointParsingRule的值決定是否啟用endpoint解析規(guī)則
        boolean isUseEndpointParsingRule = Boolean.parseBoolean(isUseEndpointRuleParsing);
        String endpointUrl;
        if (isUseEndpointParsingRule) {//如果啟用解析規(guī)則
            // Get the set domain name information
            endpointUrl = ParamUtil.parsingEndpointRule(properties.getProperty(PropertyKeyConst.ENDPOINT));
            if (StringUtils.isBlank(endpointUrl)) {
                return "";
            }
        } else {//不啟用
            endpointUrl = properties.getProperty(PropertyKeyConst.ENDPOINT);
        }

        if (StringUtils.isBlank(endpointUrl)) {
            return "";
        }

        String endpointPort = TemplateUtils
                .stringEmptyAndThenExecute(System.getenv(PropertyKeyConst.SystemEnv.ALIBABA_ALIWARE_ENDPOINT_PORT),
                        new Callable<String>() {
                            @Override
                            public String call() {

                                return properties.getProperty(PropertyKeyConst.ENDPOINT_PORT);
                            }
                        });

        endpointPort = TemplateUtils.stringEmptyAndThenExecute(endpointPort, new Callable<String>() {
            @Override
            public String call() {
                return "8080";
            }
        });

        return endpointUrl + ":" + endpointPort;
    }

第一部分是設(shè)置serverList為我們最開始設(shè)置的服務(wù)端地址
第二部分設(shè)置我們的endpoint規(guī)則

InitUtils.initWebRootContext()

   /**
     * Init web root context.
     */
    public static void initWebRootContext() {
        // support the web context with ali-yun if the app deploy by EDAS
        final String webContext = System.getProperty(SystemPropertyKeyConst.NAMING_WEB_CONTEXT);
        TemplateUtils.stringNotEmptyAndThenExecute(webContext, new Runnable() {
            @Override
            public void run() {
                UtilAndComs.webContext = webContext.indexOf("/") > -1 ? webContext : "/" + webContext;

                UtilAndComs.nacosUrlBase = UtilAndComs.webContext + "/v1/ns";
                UtilAndComs.nacosUrlInstance = UtilAndComs.nacosUrlBase + "/instance";
            }
        });
    }

這里如果應(yīng)用由EDAS部署矛缨,則支持阿里云的web上下文

initCacheDir()

private void initCacheDir() {
    cacheDir = System.getProperty("com.alibaba.nacos.naming.cache.dir");
    if (StringUtils.isEmpty(cacheDir)) {
        cacheDir = System.getProperty("user.home") + "/nacos/naming/" + namespace;
    }
}

這里初始化本地實(shí)例信息八回,在本地你會(huì)看到這樣的文件
C:\Users\nacos\naming\quickStart
[圖片上傳失敗...(image-bbdedd-1600251861321)]

initLogName(properties)

    private void initLogName(Properties properties) {
        logName = System.getProperty(UtilAndComs.NACOS_NAMING_LOG_NAME);
        if (StringUtils.isEmpty(logName)) {

            if (properties != null && StringUtils
                    .isNotEmpty(properties.getProperty(UtilAndComs.NACOS_NAMING_LOG_NAME))) {
                logName = properties.getProperty(UtilAndComs.NACOS_NAMING_LOG_NAME);
            } else {
                logName = "naming.log";
            }
        }
    }

這里設(shè)置logname萄焦,目前沒看到哪里用

new EventDispatcher()

    public EventDispatcher() {
        
        this.executor = Executors.newSingleThreadExecutor(new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
                Thread thread = new Thread(r, "com.alibaba.nacos.naming.client.listener");
                thread.setDaemon(true);
                
                return thread;
            }
        });
        
        this.executor.execute(new Notifier());
    }

我們看到構(gòu)造方法里面初始化了一個(gè)線程池,并且加入了Notifier這個(gè)線程凤瘦,我們來看看Notifier的run方法

@Override
        public void run() {
            while (!closed) {
                
                ServiceInfo serviceInfo = null;
                try {
                    serviceInfo = changedServices.poll(5, TimeUnit.MINUTES);
                } catch (Exception ignore) {
                }
                
                if (serviceInfo == null) {
                    continue;
                }
                
                try {
                    List<EventListener> listeners = observerMap.get(serviceInfo.getKey());
                    
                    if (!CollectionUtils.isEmpty(listeners)) {
                        for (EventListener listener : listeners) {
                            List<Instance> hosts = Collections.unmodifiableList(serviceInfo.getHosts());
                            listener.onEvent(new NamingEvent(serviceInfo.getName(), serviceInfo.getGroupName(),
                                    serviceInfo.getClusters(), hosts));
                        }
                    }
                    
                } catch (Exception e) {
                    NAMING_LOGGER.error("[NA] notify error for service: " + serviceInfo.getName() + ", clusters: "
                            + serviceInfo.getClusters(), e);
                }
            }
        }

先去隊(duì)列中彈出隊(duì)頂元素(poll方法)
如果為空進(jìn)行下一次循環(huán)
如果不為空則去ConcurrentMap取listeners
取出listener去監(jiān)聽NamingEvent

new NamingProxy(this.namespace, this.endpoint, this.serverList, properties)

    public NamingProxy(String namespaceId, String endpoint, String serverList, Properties properties) {
        
        this.securityProxy = new SecurityProxy(properties, nacosRestTemplate);
        this.properties = properties;
        this.setServerPort(DEFAULT_SERVER_PORT);
        this.namespaceId = namespaceId;
        this.endpoint = endpoint;
        if (StringUtils.isNotEmpty(serverList)) {
            this.serverList = Arrays.asList(serverList.split(","));
            if (this.serverList.size() == 1) {
                this.nacosDomain = serverList;
            }
        }
        this.initRefreshTask();
    }

這里的初始化動(dòng)作很多 我們一個(gè)一個(gè)看
首先看new SecurityProxy(properties, nacosRestTemplate)

    /**
     * Construct from properties, keeping flexibility.
     *
     * @param properties a bunch of properties to read
     */
    public SecurityProxy(Properties properties, NacosRestTemplate nacosRestTemplate) {
        username = properties.getProperty(PropertyKeyConst.USERNAME, StringUtils.EMPTY);
        password = properties.getProperty(PropertyKeyConst.PASSWORD, StringUtils.EMPTY);
        contextPath = properties.getProperty(PropertyKeyConst.CONTEXT_PATH, "/nacos");
        contextPath = contextPath.startsWith("/") ? contextPath : "/" + contextPath;
        this.nacosRestTemplate = nacosRestTemplate;
    }

這里設(shè)置了用戶名和密碼同時(shí)初始化了nacosRestTemplate宿礁,nacosRestTemplate是客戶端發(fā)送信息到服務(wù)端的類,里面用HttpClient實(shí)現(xiàn)蔬芥,有興趣的可以去看看
再看看initRefreshTask方法

private void initRefreshTask() {
        
        this.executorService = new ScheduledThreadPoolExecutor(2, new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
                Thread t = new Thread(r);
                t.setName("com.alibaba.nacos.client.naming.updater");
                t.setDaemon(true);
                return t;
            }
        });
        
        this.executorService.scheduleWithFixedDelay(new Runnable() {
            @Override
            public void run() {
                refreshSrvIfNeed();
            }
        }, 0, vipSrvRefInterMillis, TimeUnit.MILLISECONDS);
        
        this.executorService.scheduleWithFixedDelay(new Runnable() {
            @Override
            public void run() {
                securityProxy.login(getServerList());
            }
        }, 0, securityInfoRefreshIntervalMills, TimeUnit.MILLISECONDS);
        
        refreshSrvIfNeed();
        this.securityProxy.login(getServerList());
    }

首先初始化一個(gè)線程池梆靖,同時(shí)refreshSrvIfNeed去拿服務(wù)端serverList,同時(shí)securityProxy.login登陸到拿到的服務(wù)端列表笔诵。

new BeatReactor(this.serverProxy, initClientBeatThreadCount(properties))

我們先看看initClientBeatThreadCount方法

    public static final int DEFAULT_CLIENT_BEAT_THREAD_COUNT =
            Runtime.getRuntime().availableProcessors() > 1 ? Runtime.getRuntime().availableProcessors() / 2 : 1;
   private int initClientBeatThreadCount(Properties properties) {
        if (properties == null) {
            return UtilAndComs.DEFAULT_CLIENT_BEAT_THREAD_COUNT;
        }

        return ConvertUtils.toInt(properties.getProperty(PropertyKeyConst.NAMING_CLIENT_BEAT_THREAD_COUNT),
                UtilAndComs.DEFAULT_CLIENT_BEAT_THREAD_COUNT);
    }

通過 Runtime.getRuntime().availableProcessors()方法拿到Java虛擬機(jī)的可用的處理器數(shù)量返吻,下面我們看看構(gòu)造方法

    public BeatReactor(NamingProxy serverProxy, int threadCount) {
        this.serverProxy = serverProxy;
        this.executorService = new ScheduledThreadPoolExecutor(threadCount, new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
                Thread thread = new Thread(r);
                thread.setDaemon(true);
                thread.setName("com.alibaba.nacos.naming.beat.sender");
                return thread;
            }
        });
    }

這里只是初始化了線程池,本身這個(gè)BeatReactor有一個(gè)內(nèi)部類BeatTask執(zhí)行本地實(shí)例注冊(cè)到服務(wù)端做心跳檢測(cè)

new HostReactor(this.eventDispatcher, this.serverProxy, beatReactor, this.cacheDir, isLoadCacheAtStart(properties), initPollingThreadCount(properties))

這里先看看isLoadCacheAtStart

   private boolean isLoadCacheAtStart(Properties properties) {
        boolean loadCacheAtStart = false;
        if (properties != null && StringUtils
                .isNotEmpty(properties.getProperty(PropertyKeyConst.NAMING_LOAD_CACHE_AT_START))) {
            loadCacheAtStart = ConvertUtils
                    .toBoolean(properties.getProperty(PropertyKeyConst.NAMING_LOAD_CACHE_AT_START));
        }

        return loadCacheAtStart;
    }

這個(gè)方法比較簡(jiǎn)單乎婿,只是設(shè)置了是否加載本地緩存测僵,下面我們看看構(gòu)造方法

   public HostReactor(EventDispatcher eventDispatcher, NamingProxy serverProxy, BeatReactor beatReactor,
            String cacheDir, boolean loadCacheAtStart, int pollingThreadCount) {
        // init executorService
        this.executor = new ScheduledThreadPoolExecutor(pollingThreadCount, new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
                Thread thread = new Thread(r);
                thread.setDaemon(true);
                thread.setName("com.alibaba.nacos.client.naming.updater");
                return thread;
            }
        });
        this.eventDispatcher = eventDispatcher;
        this.beatReactor = beatReactor;
        this.serverProxy = serverProxy;
        this.cacheDir = cacheDir;
        if (loadCacheAtStart) {
            this.serviceInfoMap = new ConcurrentHashMap<String, ServiceInfo>(DiskCache.read(this.cacheDir));
        } else {
            this.serviceInfoMap = new ConcurrentHashMap<String, ServiceInfo>(16);
        }
        
        this.updatingMap = new ConcurrentHashMap<String, Object>();
        this.failoverReactor = new FailoverReactor(this, cacheDir);
        this.pushReceiver = new PushReceiver(this);
    }
    

這里初始化了一些本地緩存的內(nèi)容,我們主要看看FailoverReactor和PushReceiver

FailoverReactor

    public FailoverReactor(HostReactor hostReactor, String cacheDir) {
        this.hostReactor = hostReactor;
        this.failoverDir = cacheDir + "/failover";
        // init executorService
        this.executorService = new ScheduledThreadPoolExecutor(1, new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
                Thread thread = new Thread(r);
                thread.setDaemon(true);
                thread.setName("com.alibaba.nacos.naming.failover");
                return thread;
            }
        });
        this.init();
    }
    
  public void init() {
        
        executorService.scheduleWithFixedDelay(new SwitchRefresher(), 0L, 5000L, TimeUnit.MILLISECONDS);
        
        executorService.scheduleWithFixedDelay(new DiskFileWriter(), 30, DAY_PERIOD_MINUTES, TimeUnit.MINUTES);
        
        // backup file on startup if failover directory is empty.
        executorService.schedule(new Runnable() {
            @Override
            public void run() {
                try {
                    File cacheDir = new File(failoverDir);
                    
                    if (!cacheDir.exists() && !cacheDir.mkdirs()) {
                        throw new IllegalStateException("failed to create cache dir: " + failoverDir);
                    }
                    
                    File[] files = cacheDir.listFiles();
                    if (files == null || files.length <= 0) {
                        new DiskFileWriter().run();
                    }
                } catch (Throwable e) {
                    NAMING_LOGGER.error("[NA] failed to backup file on startup.", e);
                }
                
            }
        }, 10000L, TimeUnit.MILLISECONDS);
    }

這里是操作本地實(shí)例信息的一些線程谢翎,F(xiàn)ailoverReactor通過一個(gè)文件配置激活failover模式捍靠。該模式下,會(huì)從本地文件中讀取服務(wù) 列表信息森逮。

PushReceiver

這里主要看run方法

 @Override
    public void run() {
        while (!closed) {
            try {
                
                // byte[] is initialized with 0 full filled by default
                byte[] buffer = new byte[UDP_MSS];
                DatagramPacket packet = new DatagramPacket(buffer, buffer.length);
                
                udpSocket.receive(packet);
                
                String json = new String(IoUtils.tryDecompress(packet.getData()), UTF_8).trim();
                NAMING_LOGGER.info("received push data: " + json + " from " + packet.getAddress().toString());
                
                PushPacket pushPacket = JacksonUtils.toObj(json, PushPacket.class);
                String ack;
                if ("dom".equals(pushPacket.type) || "service".equals(pushPacket.type)) {
                    hostReactor.processServiceJson(pushPacket.data);
                    
                    // send ack to server
                    ack = "{\"type\": \"push-ack\"" + ", \"lastRefTime\":\"" + pushPacket.lastRefTime + "\", \"data\":"
                            + "\"\"}";
                } else if ("dump".equals(pushPacket.type)) {
                    // dump data to server
                    ack = "{\"type\": \"dump-ack\"" + ", \"lastRefTime\": \"" + pushPacket.lastRefTime + "\", \"data\":"
                            + "\"" + StringUtils.escapeJavaScript(JacksonUtils.toJson(hostReactor.getServiceInfoMap()))
                            + "\"}";
                } else {
                    // do nothing send ack only
                    ack = "{\"type\": \"unknown-ack\"" + ", \"lastRefTime\":\"" + pushPacket.lastRefTime
                            + "\", \"data\":" + "\"\"}";
                }
                
                udpSocket.send(new DatagramPacket(ack.getBytes(UTF_8), ack.getBytes(UTF_8).length,
                        packet.getSocketAddress()));
            } catch (Exception e) {
                NAMING_LOGGER.error("[NA] error while receiving push data", e);
            }
        }
    }

run方法使用while true循環(huán)來執(zhí)行udpSocket.receive(packet)榨婆,之后將接收到的數(shù)據(jù)解析為PushPacket,然后根據(jù)不同pushPacket.type做不同處理
當(dāng)pushPacket.type為dom或者service的時(shí)候會(huì)調(diào)用hostReactor.processServiceJSON(pushPacket.data)褒侧;當(dāng)pushPacket.type為dump的時(shí)候會(huì)將hostReactor.getServiceInfoMap()序列化到ack中良风,最后將ack返回回去
至此,初始化工作就完成了闷供,下面我們來看看如何注冊(cè)namespace

registerInstance

    public void registerInstance(String serviceName, String groupName, Instance instance) throws NacosException {
        String groupedServiceName = NamingUtils.getGroupedName(serviceName, groupName);
        /**
         *ephemeral
         *短暫的
         */
        if (instance.isEphemeral()) {
            BeatInfo beatInfo = beatReactor.buildBeatInfo(groupedServiceName, instance);
            /**
             * BeatTask加入到線程池中烟央,線程池中線程注冊(cè)實(shí)例到服務(wù)端
             *
             * 利用prometheus監(jiān)控
             */
            beatReactor.addBeatInfo(groupedServiceName, beatInfo);
        }
        //這里也是注冊(cè)實(shí)例到服務(wù)端,beatReactor里面也是本地實(shí)例心跳
        serverProxy.registerService(groupedServiceName, groupName, instance);
    }

registerInstance方法第一步獲取GroupedName这吻,然后看instance是否短暫的吊档,如果是執(zhí)行beatReactor.addBeatInfo方法篙议,注冊(cè)及監(jiān)控唾糯,最后也是通過serverProxy注冊(cè)namespace
我們直接來看addBeatInfo方法

    /**
     * Add beat information.
     *
     * @param serviceName service name
     * @param beatInfo    beat information
     */
    public void addBeatInfo(String serviceName, BeatInfo beatInfo) {
        NAMING_LOGGER.info("[BEAT] adding beat: {} to beat map.", beatInfo);
        String key = buildKey(serviceName, beatInfo.getIp(), beatInfo.getPort());
        BeatInfo existBeat = null;
        //fix #1733
        if ((existBeat = dom2Beat.remove(key)) != null) {
            existBeat.setStopped(true);
        }
        dom2Beat.put(key, beatInfo);
        executorService.schedule(new BeatTask(beatInfo), beatInfo.getPeriod(), TimeUnit.MILLISECONDS);
        MetricsMonitor.getDom2BeatSizeMonitor().set(dom2Beat.size());
    }

先是buildKey生成key怠硼,查看beatinfo是否存在,不存在put移怯;然后執(zhí)行BeatTask,我們來看看BeatTask的run方法

public void run() {
            if (beatInfo.isStopped()) {
                return;
            }
            long nextTime = beatInfo.getPeriod();
            try {
                JsonNode result = serverProxy.sendBeat(beatInfo, BeatReactor.this.lightBeatEnabled);
                long interval = result.get("clientBeatInterval").asLong();
                boolean lightBeatEnabled = false;
                if (result.has(CommonParams.LIGHT_BEAT_ENABLED)) {
                    lightBeatEnabled = result.get(CommonParams.LIGHT_BEAT_ENABLED).asBoolean();
                }
                BeatReactor.this.lightBeatEnabled = lightBeatEnabled;
                if (interval > 0) {
                    nextTime = interval;
                }
                int code = NamingResponseCode.OK;
                if (result.has(CommonParams.CODE)) {
                    code = result.get(CommonParams.CODE).asInt();
                }
                if (code == NamingResponseCode.RESOURCE_NOT_FOUND) {
                    Instance instance = new Instance();
                    instance.setPort(beatInfo.getPort());
                    instance.setIp(beatInfo.getIp());
                    instance.setWeight(beatInfo.getWeight());
                    instance.setMetadata(beatInfo.getMetadata());
                    instance.setClusterName(beatInfo.getCluster());
                    instance.setServiceName(beatInfo.getServiceName());
                    instance.setInstanceId(instance.getInstanceId());
                    instance.setEphemeral(true);
                    try {
                        /**
                         * 注冊(cè)實(shí)例到服務(wù)端  reqApi(UtilAndComs.nacosUrlInstance, params, HttpMethod.POST);
                         */
                        serverProxy.registerService(beatInfo.getServiceName(),
                                NamingUtils.getGroupName(beatInfo.getServiceName()), instance);
                    } catch (Exception ignore) {
                    }
                }
            } catch (NacosException ex) {
                NAMING_LOGGER.error("[CLIENT-BEAT] failed to send beat: {}, code: {}, msg: {}",
                        JacksonUtils.toJson(beatInfo), ex.getErrCode(), ex.getErrMsg());

            }
            executorService.schedule(new BeatTask(beatInfo), nextTime, TimeUnit.MILLISECONDS);
        }

這個(gè)方法里面最重要兩步就是第一步sendBeat發(fā)送心跳香璃,第二步通過reqApi注冊(cè)實(shí)例到服務(wù)端
然后看看registerService方法

 /**
     * register a instance to service with specified instance properties.
     *
     * @param serviceName name of service
     * @param groupName   group of service
     * @param instance    instance to register
     * @throws NacosException nacos exception
     */
    public void registerService(String serviceName, String groupName, Instance instance) throws NacosException {
        
        NAMING_LOGGER.info("[REGISTER-SERVICE] {} registering service {} with instance: {}", namespaceId, serviceName,
                instance);
        
        final Map<String, String> params = new HashMap<String, String>(16);
        params.put(CommonParams.NAMESPACE_ID, namespaceId);
        params.put(CommonParams.SERVICE_NAME, serviceName);
        params.put(CommonParams.GROUP_NAME, groupName);
        params.put(CommonParams.CLUSTER_NAME, instance.getClusterName());
        params.put("ip", instance.getIp());
        params.put("port", String.valueOf(instance.getPort()));
        params.put("weight", String.valueOf(instance.getWeight()));
        params.put("enable", String.valueOf(instance.isEnabled()));
        params.put("healthy", String.valueOf(instance.isHealthy()));
        params.put("ephemeral", String.valueOf(instance.isEphemeral()));
        params.put("metadata", JacksonUtils.toJson(instance.getMetadata()));
        
        reqApi(UtilAndComs.nacosUrlInstance, params, HttpMethod.POST);
        
    }

這里主要配置參數(shù),然后通過reqApi注冊(cè)實(shí)例到服務(wù)端舟误,注冊(cè)實(shí)例到服務(wù)端代碼如下

/**
     * Call server.
     *
     * @param api       api
     * @param params    parameters
     * @param body      body
     * @param curServer ?
     * @param method    http method
     * @return result
     * @throws NacosException nacos exception
     */
    public String callServer(String api, Map<String, String> params, Map<String, String> body, String curServer,
            String method) throws NacosException {
        long start = System.currentTimeMillis();
        long end = 0;
        injectSecurityInfo(params);
        Header header = builderHeader();
        
        String url;
        if (curServer.startsWith(UtilAndComs.HTTPS) || curServer.startsWith(UtilAndComs.HTTP)) {
            url = curServer + api;
        } else {
            if (!curServer.contains(UtilAndComs.SERVER_ADDR_IP_SPLITER)) {
                curServer = curServer + UtilAndComs.SERVER_ADDR_IP_SPLITER + serverPort;
            }
            url = NamingHttpClientManager.getInstance().getPrefix() + curServer + api;
        }
        
        try {
            HttpRestResult<String> restResult = nacosRestTemplate
                    .exchangeForm(url, header, Query.newInstance().initParams(params), body, method, String.class);
            end = System.currentTimeMillis();
            
            MetricsMonitor.getNamingRequestMonitor(method, url, String.valueOf(restResult.getCode()))
                    .observe(end - start);
            
            if (restResult.ok()) {
                return restResult.getData();
            }
            if (HttpStatus.SC_NOT_MODIFIED == restResult.getCode()) {
                return StringUtils.EMPTY;
            }
            throw new NacosException(restResult.getCode(), restResult.getMessage());
        } catch (Exception e) {
            NAMING_LOGGER.error("[NA] failed to request", e);
            throw new NacosException(NacosException.SERVER_ERROR, e);
        }
    }

我們看到葡秒,這里是通過最開始初始化的nacosRestTemplate發(fā)送的。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末嵌溢,一起剝皮案震驚了整個(gè)濱河市眯牧,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌赖草,老刑警劉巖学少,帶你破解...
    沈念sama閱讀 221,576評(píng)論 6 515
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場(chǎng)離奇詭異秧骑,居然都是意外死亡版确,警方通過查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 94,515評(píng)論 3 399
  • 文/潘曉璐 我一進(jìn)店門乎折,熙熙樓的掌柜王于貴愁眉苦臉地迎上來绒疗,“玉大人,你說我怎么就攤上這事骂澄∠拍ⅲ” “怎么了?”我有些...
    開封第一講書人閱讀 168,017評(píng)論 0 360
  • 文/不壞的土叔 我叫張陵坟冲,是天一觀的道長(zhǎng)士修。 經(jīng)常有香客問我,道長(zhǎng)樱衷,這世上最難降的妖魔是什么棋嘲? 我笑而不...
    開封第一講書人閱讀 59,626評(píng)論 1 296
  • 正文 為了忘掉前任,我火速辦了婚禮矩桂,結(jié)果婚禮上沸移,老公的妹妹穿的比我還像新娘。我一直安慰自己侄榴,他們只是感情好雹锣,可當(dāng)我...
    茶點(diǎn)故事閱讀 68,625評(píng)論 6 397
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著癞蚕,像睡著了一般蕊爵。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上桦山,一...
    開封第一講書人閱讀 52,255評(píng)論 1 308
  • 那天会放,我揣著相機(jī)與錄音咧最,去河邊找鬼矢沿。 笑死捣鲸,一個(gè)胖子當(dāng)著我的面吹牛摄狱,可吹牛的內(nèi)容都是我干的祝谚。 我是一名探鬼主播交惯,決...
    沈念sama閱讀 40,825評(píng)論 3 421
  • 文/蒼蘭香墨 我猛地睜開眼穿仪,長(zhǎng)吁一口氣:“原來是場(chǎng)噩夢(mèng)啊……” “哼啊片!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起齐饮,我...
    開封第一講書人閱讀 39,729評(píng)論 0 276
  • 序言:老撾萬榮一對(duì)情侶失蹤祖驱,失蹤者是張志新(化名)和其女友劉穎捺僻,沒想到半個(gè)月后束昵,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體妻怎,經(jīng)...
    沈念sama閱讀 46,271評(píng)論 1 320
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡匿辩,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 38,363評(píng)論 3 340
  • 正文 我和宋清朗相戀三年挺庞,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了选侨。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 40,498評(píng)論 1 352
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡然走,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出芍瑞,到底是詐尸還是另有隱情,我是刑警寧澤拆檬,帶...
    沈念sama閱讀 36,183評(píng)論 5 350
  • 正文 年R本政府宣布,位于F島的核電站竟贯,受9級(jí)特大地震影響答捕,放射性物質(zhì)發(fā)生泄漏屑那。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,867評(píng)論 3 333
  • 文/蒙蒙 一痢站、第九天 我趴在偏房一處隱蔽的房頂上張望芒填。 院中可真熱鬧呜叫,春花似錦盛泡、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,338評(píng)論 0 24
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至箱硕,卻和暖如春拴竹,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背剧罩。 一陣腳步聲響...
    開封第一講書人閱讀 33,458評(píng)論 1 272
  • 我被黑心中介騙來泰國(guó)打工栓拜, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人惠昔。 一個(gè)月前我還...
    沈念sama閱讀 48,906評(píng)論 3 376
  • 正文 我出身青樓幕与,卻偏偏與公主長(zhǎng)得像,于是被迫代替她去往敵國(guó)和親镇防。 傳聞我的和親對(duì)象是個(gè)殘疾皇子啦鸣,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 45,507評(píng)論 2 359