Nacos源碼分析-服務注冊

零屯掖、本文綱要

  • 一、源碼準備
  • 二襟衰、了解服務注冊-客戶端
    1贴铜、Nacos的服務注冊表結構
    2、查看Nacos的服務注冊源碼
    3瀑晒、跟蹤Nacos的服務注冊流程
    4绍坝、客戶端注冊的流程圖
  • 三、了解服務注冊-服務端
    1苔悦、確定模塊
    2轩褐、跟蹤Nacos接收處理服務注冊源碼
    3、服務端注冊的流程圖
  • 四间坐、Nacos服務注冊部分總結

tips:Ctrl + F快速定位所需內容閱讀吧。

一邑退、源碼準備

1竹宋、下載Nacos源碼

官方下載連接:Release 1.4.2 (Apr 29th, 2021) · alibaba/nacos · GitHub

下載Nacos源碼.png

2地技、解壓導入源碼

導入IDEA蜈七,此處省略步驟。

3莫矗、proto編譯

Nacos底層的數據通信會基于protobuf對數據做序列化和反序列化飒硅,需要先將proto文件編譯為對應的Java代碼砂缩。

proto編譯.png
  • ① 安裝protoc

下載protoc:Releases · protocolbuffers/protobuf · GitHub

image.png
  • ② 解壓&配置環(huán)境變量
復制目錄.png
配置系統變量.png
配置環(huán)境變量.png

4三娩、編譯proto

  • ① 進入目標目錄nacos-1.4.2\consistency\src\main
進入目標目錄.png
  • ② 打開cmd窗口進行編譯

編譯consistency.proto到java目錄庵芭,如下:

protoc --java_out=./java ./proto/consistency.proto

編譯Data.proto到java目錄,如下:

protoc --java_out=./java ./proto/Data.proto

5雀监、啟動Nacos配置

Nacos控制臺啟動類.png
啟動Nacos配置.png

6双吆、測試

訪問控制臺.png

二、了解服務注冊-客戶端

1会前、Nacos的服務注冊表結構

  • ① 環(huán)境隔離:namespace
  • ② 服務分組:group
  • ③ 服務集群:service cluster
  • ④ 服務實例:service instance
image.png

2好乐、查看Nacos的服務注冊源碼

  • ① 定位依賴

服務注冊與Nacos的依賴有關,所以查看spring-cloud-starter-alibaba-nacos-discovery依賴瓦宜,如下:

<dependency>
    <groupId>com.alibaba.cloud</groupId>
    <artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
</dependency>
  • ② 在依賴中查看自動裝配文件spring.factories
查看自動裝配文件.png
  • ③ 定位Nacos服務注冊自動配置類
定位Nacos服務注冊自動配置類.png
  • ④ 查看NacosServiceRegistryAutoConfiguration類源碼
image.png

3蔚万、跟蹤Nacos的服務注冊流程

由上述內容我們可以知道,Nacos服務自動注冊是從NacosServiceRegistryAutoConfiguration類開始的临庇,自動注冊涉及到NacosAutoServiceRegistration類反璃。

  • ① NacosAutoServiceRegistration類

NacosServiceRegistryAutoConfiguration類最后返回了new出來的NacosAutoServiceRegistration類對象,所以我們繼續(xù)跟蹤該類的構造方法苔巨,如下:

NacosAutoServiceRegistration類.png

該類構造方法中初始化其父類第献,所以我們繼續(xù)跟蹤父類乾巧。

  • ② AbstractAutoServiceRegistration類

該類實現了ApplicationListener接口,監(jiān)聽Spring容器啟動過程中的WebServerInitializedEvent事件,如下:

AbstractAutoServiceRegistration類.png

在監(jiān)聽到WebServerInitializedEvent(web服務初始化完成)的事件后描验,執(zhí)行了bind 方法,如下:

監(jiān)聽到WebServerInitializedEvent事件.png

AbstractAutoServiceRegistration#bind(row91-102)方法牌废,如下:

@Deprecated
public void bind(WebServerInitializedEvent event) {
  // 獲取 ApplicationContext 對象
  ApplicationContext context = event.getApplicationContext();
  // 判斷服務的 Namespace章贞,一般為 null
  if (context instanceof ConfigurableWebServerApplicationContext) {
    if ("management".equals(((ConfigurableWebServerApplicationContext) context)
        .getServerNamespace())) {
      return;
    }
  }
  // 記錄當前 web 服務的端口
  this.port.compareAndSet(0, event.getWebServer().getPort());
  // 啟動當前服務注冊流程
  this.start();
}

AbstractAutoServiceRegistration#start(row125-147)方法,如下:

public void start() {
  if (!isEnabled()) {
    if (logger.isDebugEnabled()) {
      logger.debug("Discovery Lifecycle disabled. Not starting");
    }
    return;
  }

  // only initialize if nonSecurePort is greater than 0 and it isn't already running
  // because of containerPortInitializer below
  // 當前服務處于未運行狀態(tài)時闺魏,才進行初始化
  if (!this.running.get()) {
    // 發(fā)布服務開始注冊的事件
    this.context.publishEvent(
        new InstancePreRegisteredEvent(this, getRegistration()));
    // 【關鍵】開始服務注冊
    register();
    if (shouldRegisterManagement()) {
      registerManagement();
    }
    // 發(fā)布注冊完成事件
    this.context.publishEvent(
        new InstanceRegisteredEvent<>(this, getConfiguration()));
    // 服務狀態(tài)設置為運行狀態(tài)未状,基于AtomicBoolean#compareAndSet(row98-102)
    this.running.compareAndSet(false, true);
  }

}

AbstractAutoServiceRegistration#register(row238-240)方法,如下:

ServiceRegistry的register方法.png
NacosServiceRegistry實現類.png
  • ③ NacosServiceRegistry類

NacosServiceRegistry#register(row59-89)方法析桥,如下:

@Override
public void register(Registration registration) {
  // 判斷 ServiceId 是否為空司草,spring.applicaion.name 不能為空
  if (StringUtils.isEmpty(registration.getServiceId())) {
    log.warn("No service to register for nacos client...");
    return;
  }
  // 獲取 Nacos 的命名服務,就是注冊中心服務
  NamingService namingService = namingService();
  // 獲取 serviceId泡仗、group
  String serviceId = registration.getServiceId();
  String group = nacosDiscoveryProperties.getGroup();
  // 封裝服務實例埋虹,包含:Ip、Port娩怎、Weight搔课、ClusterName、Ephemeral等
  Instance instance = getNacosInstanceFromRegistration(registration);

  try {
    // 開始注冊服務
    namingService.registerInstance(serviceId, group, instance);
    log.info("nacos registry, {} {} {}:{} register finished", group, serviceId,
        instance.getIp(), instance.getPort());
  }
  catch (Exception e) {
    if (nacosDiscoveryProperties.isFailFast()) {
      log.error("nacos registry, {} register failed...{},", serviceId,
          registration.toString(), e);
      rethrowRuntimeException(e);
    }
    else {
      log.warn("Failfast is false. {} register failed...{},", serviceId,
          registration.toString(), e);
    }
  }
}

可以看到方法中最終是調用NamingService的registerInstance方法實現注冊的截亦,如下:

NamingService的registerInstance方法.png

NamingService的實現類NacosNamingService爬泥,如下:

image.png
  • ④ NacosNamingService類

NacosNamingService#registerInstance(row204-213)方法柬讨,如下:

@Override
public void registerInstance(String serviceName, String groupName, Instance instance) throws NacosException {
    // 檢查超時參數是否異常,心跳超時時間(默認15秒)必須大于心跳周期(默認5秒)
    NamingUtils.checkInstanceIsLegal(instance);
    // 拼接得到新的服務名袍啡,格式:groupName@@serviceName
    String groupedServiceName = NamingUtils.getGroupedName(serviceName, groupName);
    // 判斷是否為臨時實例踩官,默認為 true
    if (instance.isEphemeral()) {
        // 是臨時實例,需要定時向 Nacos 服務發(fā)送心跳
        BeatInfo beatInfo = beatReactor.buildBeatInfo(groupedServiceName, instance);
        beatReactor.addBeatInfo(groupedServiceName, beatInfo);
    }
    // 【關鍵】發(fā)送注冊服務實例的請求
    serverProxy.registerService(groupedServiceName, groupName, instance);
}

可以看到registerService最后是由NamingProxy的實現的葬馋,如下:

registerService.png

補充:com.alibaba.nacos.api.common.Constants(row167-171)卖鲤,如下:

// 心跳超時時間,15s
public static final long DEFAULT_HEART_BEAT_TIMEOUT = TimeUnit.SECONDS.toMillis(15);
// IP刪除超時時間畴嘶,30s
public static final long DEFAULT_IP_DELETE_TIMEOUT = TimeUnit.SECONDS.toMillis(30);
// 心跳周期蛋逾,5s
public static final long DEFAULT_HEART_BEAT_INTERVAL = TimeUnit.SECONDS.toMillis(5);
  • ⑤ NamingProxy類

NamingProxy#registerService(row220-248)方法,如下:

public void registerService(String serviceName, String groupName, Instance instance) throws NacosException {
    
    NAMING_LOGGER.info("[REGISTER-SERVICE] {} registering service {} with instance: {}", namespaceId, serviceName,
            instance);
    // 組織請求參數    
    final Map<String, String> params = new HashMap<String, String>(16);
    params.put(CommonParams.NAMESPACE_ID, namespaceId);
    params.put(CommonParams.SERVICE_NAME, serviceName);
    params.put(CommonParams.GROUP_NAME, groupName);
    params.put(CommonParams.CLUSTER_NAME, instance.getClusterName());
    params.put("ip", instance.getIp());
    params.put("port", String.valueOf(instance.getPort()));
    params.put("weight", String.valueOf(instance.getWeight()));
    params.put("enable", String.valueOf(instance.isEnabled()));
    params.put("healthy", String.valueOf(instance.isHealthy()));
    params.put("ephemeral", String.valueOf(instance.isEphemeral()));
    params.put("metadata", JacksonUtils.toJson(instance.getMetadata()));
    // 通過 POST 請求窗悯,將上述參數發(fā)送到:/nacos/v1/ns/instance
    reqApi(UtilAndComs.nacosUrlInstance, params, HttpMethod.POST);
    
}

這里提交的信息就是Nacos服務注冊接口需要的完整參數区匣,核心參數有:

Ⅰ NAMESPACE_ID:環(huán)境;
Ⅱ SERVICE_NAME:服務名稱蒋院;
Ⅲ GROUP_NAME:分組名稱亏钩;
Ⅳ CLUSTER_NAME:集群名稱;
Ⅴ ip:當前實例的IP地址欺旧;
Ⅵ port:當前實例的端口姑丑。

補充:com.alibaba.nacos.client.naming.utils.UtilAndComs(row30-34),如下:

public static String webContext = "/nacos";
public static String nacosUrlBase = webContext + "/v1/ns";
public static String nacosUrlInstance = nacosUrlBase + "/instance";

4辞友、客戶端注冊的流程圖

客戶端注冊的流程圖.png

三栅哀、了解服務注冊-服務端

經過以上了解,我們知道最后客戶端注冊服務實例是通過 POST 請求称龙,將注冊參數發(fā)送到:/nacos/v1/ns/instance留拾。因此,我們從對應接收此請求的Controller開始鲫尊。

1痴柔、確定模塊

  • ① nacos-concle模塊

我們啟動Nacos服務會使用Nacos-concle模塊的啟動類,該模塊中引用的nacos-naming模塊就是我們服務注冊相關的模塊疫向。

nacos-naming模塊.png
  • ② nacos-naming模塊

可以看到InstanceController類的請求路由即是我們POST請求的路由的部分咳蔚,如下:

image.png

因此,我們從InstanceController開始研究接收請求處理服務注冊的源碼搔驼。

2谈火、跟蹤Nacos接收處理服務注冊源碼

  • ① InstanceController類

InstanceController#register方法,如下:

@CanDistro
@PostMapping
@Secured(parser = NamingResourceParser.class, action = ActionTypes.WRITE)
public String register(HttpServletRequest request) throws Exception {
    // 從 request 獲取 namespaceId匙奴,沒有則為默認 public
    final String namespaceId = WebUtils
            .optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID);

    // 獲取服務名稱 serviceName = "group@@serviceName"
    final String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);
    NamingUtils.checkServiceNameFormat(serviceName);

    // 把 request 中的參數封裝為 Instance 對象
    final Instance instance = parseInstance(request);

    // 【關鍵】注冊實例
    serviceManager.registerInstance(namespaceId, serviceName, instance);
    return "ok";
}

此處會進入ServiceManager的注冊方法堆巧,如下:

注冊實例.png
  • ② ServiceManager類

Ⅰ ServiceManager#serviceMap屬性:Map(namespace, Map(group::serviceName, Service))妄荔,里面注冊著各個服務實例泼菌,如下:

Nacos服務注冊表.png

Ⅱ ServiceManager#registerInstance方法谍肤,如下:

public void registerInstance(String namespaceId, String serviceName, Instance instance) throws NacosException {
    // 如果是第一次,則創(chuàng)建空的服務哗伯,放入注冊表
    createEmptyService(namespaceId, serviceName, instance.isEphemeral());

    // 從注冊表中拿到 service
    Service service = getService(namespaceId, serviceName);

    if (service == null) {
        throw new NacosException(NacosException.INVALID_PARAM,
                "service not found, namespace: " + namespaceId + ", service: " + serviceName);
    }
    // 【關鍵】添加實例到 service 當中
    addInstance(namespaceId, serviceName, instance.isEphemeral(), instance);
}

Ⅲ ServiceManager#addInstance方法荒揣,如下:

public void addInstance(String namespaceId, String serviceName, boolean ephemeral, Instance... ips)
        throws NacosException {
    // 給當前服務生成一個唯一標識,可以理解為 serviceId
    // 臨時:com.alibaba.nacos.naming.iplist.ephemeral. + namespaceId + ## + serviceName
    // 永久:com.alibaba.nacos.naming.iplist. + namespaceId + ## + serviceName
    String key = KeyBuilder.buildInstanceListKey(namespaceId, serviceName, ephemeral);

    // 從注冊表中拿到 service
    Service service = getService(namespaceId, serviceName);

    // 以 service 為鎖對象焊刹,同一個服務的多個實例系任,只能串行來完成注冊(不能并發(fā)修改)
    synchronized (service) {
        // 【重點】拷貝注冊表中 舊的實例列表,在此結合新注冊的實例虐块,得到最終的實例列表 COPY ON WRITE
        List<Instance> instanceList = addIpAddresses(service, ephemeral, ips);

        // 封裝實例列表到 Instances 對象中
        Instances instances = new Instances();
        instances.setInstanceList(instanceList);

        // 更新注冊表(更新本地注冊表俩滥、數據同步給 Nacos 集群中的其他節(jié)點)
        consistencyService.put(key, instances);
    }
}

該方法中對修改服務列表的動作加鎖處理,確保線程安全贺奠。而在同步代碼塊中霜旧,包含下面幾步:

  • 1)先獲取要更新的實例列表,addIpAddresses(service, ephemeral, ips);
  • 2)然后將更新后的數據封裝到Instances對象中儡率,后面更新注冊表時使用
  • 3)最后挂据,調用consistencyService.put()方法完成Nacos集群的數據同步,保證集群一致性儿普。

注意:在第1步的addIPAddress中崎逃,會拷貝舊的實例列表,添加新實例到列表中眉孩。在第3步中个绍,完成對實例狀態(tài)更新后,則會用新列表直接覆蓋舊實例列表勺像。而在更新過程中障贸,舊實例列表不受影響,用戶依然可以讀取吟宦。

COPY ON WRITE:在更新列表狀態(tài)過程中篮洁,無需阻塞用戶的讀操作,也不會導致用戶讀取到臟數據殃姓,性能比較好袁波。

【A、更新服務列表List<Instance> instanceList = addIpAddresses(service, ephemeral, ips);

整個過程:

a蜗侈、獲取舊的實例列表篷牌,對比新的與舊的;
b踏幻、添加新的實例枷颊,舊的實例同步id;
c、返回最新的實例列表夭苗。

具體源碼如下:

ServiceManager#addIpAddresses方法信卡,如下:

private List<Instance> addIpAddresses(Service service, boolean ephemeral, Instance... ips) throws NacosException {
    return updateIpAddresses(service, UtilsAndCommons.UPDATE_INSTANCE_ACTION_ADD, ephemeral, ips);
}

ServiceManager#updateIpAddresses方法,如下:

public List<Instance> updateIpAddresses(Service service, String action, boolean ephemeral, Instance... ips)
        throws NacosException {
    // 從 DataStore 中獲取實例列表题造,可以理解為 Nacos 集群同步來的實例列表
    Datum datum = consistencyService
            .get(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), ephemeral));

    // 從本地注冊表中傍菇,獲取實例列表
    List<Instance> currentIPs = service.allIPs(ephemeral);
    Map<String, Instance> currentInstances = new HashMap<>(currentIPs.size());
    Set<String> currentInstanceIds = Sets.newHashSet();
    // 封裝本地注冊表中實例列表
    for (Instance instance : currentIPs) {
        currentInstances.put(instance.toIpAddr(), instance);
        currentInstanceIds.add(instance.getInstanceId());
    }

    // 合并與拷貝,舊實例列表
    Map<String, Instance> instanceMap;
    if (datum != null && null != datum.value) {
        // 如果集群同步列表中有數據界赔,則將本地注冊列表和 datum 中的列表做合并
        instanceMap = setValid(((Instances) datum.value).getInstanceList(), currentInstances);
    } else {
        instanceMap = new HashMap<>(ips.length);
    }

    // 遍歷新實例列表
    for (Instance instance : ips) {
        if (!service.getClusterMap().containsKey(instance.getClusterName())) {
            Cluster cluster = new Cluster(instance.getClusterName(), service);
            cluster.init();
            service.getClusterMap().put(instance.getClusterName(), cluster);
            Loggers.SRV_LOG
                    .warn("cluster: {} not found, ip: {}, will create new cluster with default configuration.",
                            instance.getClusterName(), instance.toJson());
        }

        if (UtilsAndCommons.UPDATE_INSTANCE_ACTION_REMOVE.equals(action)) {
            instanceMap.remove(instance.getDatumKey());
        } else {
            // 嘗試獲取與當前實例ip丢习、端口一致的舊實例
            Instance oldInstance = instanceMap.get(instance.getDatumKey());
            if (oldInstance != null) {
                // 如果存在,則把舊的 instanceId 賦值作為新的 instanceId
                instance.setInstanceId(oldInstance.getInstanceId());
            } else {
                // 如果不存在淮悼,證明是一個全新實例咐低,則生成id
                instance.setInstanceId(instance.generateInstanceId(currentInstanceIds));
            }
            instanceMap.put(instance.getDatumKey(), instance);
        }

    }

    if (instanceMap.size() <= 0 && UtilsAndCommons.UPDATE_INSTANCE_ACTION_ADD.equals(action)) {
        throw new IllegalArgumentException(
                "ip list can not be empty, service: " + service.getName() + ", ip list: " + JacksonUtils
                        .toJson(instanceMap.values()));
    }
    // 返回實例列表
    return new ArrayList<>(instanceMap.values());
}

【B、Nacos集群一致性consistencyService.put(key, instances);

Nacos集群一致性.png

ServiceManager#consistencyService屬性袜腥,如下:

consistencyService屬性.png

可以看到渊鞋,此處的put方法正是采用DelegateConsistencyServiceImpl的put方法。

DelegateConsistencyServiceImpl#put方法瞧挤,如下:

@Override
public void put(String key, Record value) throws NacosException {
    // 根據實例是否是臨時實例锡宋,判斷委托對象
    mapConsistencyService(key).put(key, value);
}

DelegateConsistencyServiceImpl#mapConsistencyService方法,如下:

private ConsistencyService mapConsistencyService(String key) {
    // 判斷是否是臨時實例:
    // 是特恬,選擇 ephemeralConsistencyService执俩,也就是 DistroConsistencyServiceImpl類
    // 否,選擇 persistentConsistencyService癌刽,也就是 PersistentConsistencyServiceDelegateImpl
    return KeyBuilder.matchEphemeralKey(key) ? ephemeralConsistencyService : persistentConsistencyService;
}

默認情況下役首,所有實例都是臨時實例,下面則關注DistroConsistencyServiceImpl類显拜。

  • ③ DistroConsistencyServiceImpl類

DistroConsistencyServiceImpl#put方法衡奥,如下:

@Override
public void put(String key, Record value) throws NacosException {
    // 異步,更新本地注冊表
    onPut(key, value);
    // 異步远荠,將數據同步給 Nacos 集群中的其他節(jié)點
    distroProtocol.sync(new DistroKey(key, KeyBuilder.INSTANCE_LIST_KEY_PREFIX), DataOperation.CHANGE,
            globalConfig.getTaskDispatchPeriod() / 2);
}

onPut(key, value);
key:ServiceManager#addInstance方法中的String key = KeyBuilder.buildInstanceListKey(namespaceId, serviceName, ephemeral);矮固,臨時:com.alibaba.nacos.naming.iplist.ephemeral. + namespaceId + ## + serviceName
value:ServiceManager#addInstance方法中的instances.setInstanceList(instanceList);封裝了實例列表的Instances對象譬淳。

distroProtocol.sync(...)
是通過Distro協議將數據同步給集群中的其它Nacos節(jié)點档址。

【A、更新本地實例列表】

DistroConsistencyServiceImpl#onPut方法邻梆,如下:

public void onPut(String key, Record value) {
    // 判斷是否是臨時實例
    if (KeyBuilder.matchEphemeralInstanceListKey(key)) {
        // 把實例列表封裝到 Datum
        Datum<Instances> datum = new Datum<>();
        // value 是服務中的實例列表 Instances
        datum.value = (Instances) value;
        // key 是 serviceId
        datum.key = key;
        datum.timestamp.incrementAndGet();
        // 以 serviceId 為 key守伸,Datum 為 value 緩存起來
        dataStore.put(key, datum);
    }
    //
    if (!listeners.containsKey(key)) {
        return;
    }
    // 【重點】把 serviceId 和當前操作類型存入 notifier
    notifier.addTask(key, DataOperation.CHANGE);
}

此處我們可以看到更新本地列表的操作最后交由notifier對象完成,notifier對象是DistroConsistencyServiceImpl的內部類實例浦妄,如下:

Notifier內部類.png

a尼摹、將變更事件放入阻塞隊列

該對象內部維護了一個阻塞隊列见芹,存放服務列表變更的事件,DistroConsistencyServiceImpl#Notifier#tasks屬性蠢涝,如下:

阻塞隊列屬性.png

DistroConsistencyServiceImpl#Notifier#addTask方法辆童,如下:

public void addTask(String datumKey, DataOperation action) {

    if (services.containsKey(datumKey) && action == DataOperation.CHANGE) {
        return;
    }
    if (action == DataOperation.CHANGE) {
        services.put(datumKey, StringUtils.EMPTY);
    }
    // 把 serviceId 和事件放入阻塞隊列
    tasks.offer(Pair.with(datumKey, action));
}

b、異步更新

DistroConsistencyServiceImpl#init方法惠赫,如下:

// 一個bean的初始化過程中,方法執(zhí)行先后順序為 Constructor > @Autowired > @PostConstruct
@PostConstruct // 在依賴加載后故黑,對象使用前執(zhí)行儿咱,而且只執(zhí)行一次
public void init() {
    // 利用線程池執(zhí)行 notifier
    // public class Notifier implements Runnable{...}
    GlobalExecutor.submitDistroNotifyTask(notifier);
}
單線程線程池.png

可以看到Notifier是通過一個單線程的線程池,來不斷從阻塞隊列中獲取任務场晶,執(zhí)行服務列表的更新混埠。

DistroConsistencyServiceImpl#Notifier#run方法,如下:

@Override
public void run() {
    Loggers.DISTRO.info("distro notifier started");
    // 死循環(huán)
    for (; ; ) {
        try {
            // 從阻塞隊列中獲取任務
            Pair<String, DataOperation> pair = tasks.take();
            // 執(zhí)行任務诗轻,更新服務列表
            handle(pair);
        } catch (Throwable e) {
            Loggers.DISTRO.error("[NACOS-DISTRO] Error while handling notifying task", e);
        }
    }
}

DistroConsistencyServiceImpl#Notifier#handle方法钳宪,如下:

private void handle(Pair<String, DataOperation> pair) {
    try {
        // 獲取 serviceId
        String datumKey = pair.getValue0();
        // 事件類型,是 CHANGE 類型
        DataOperation action = pair.getValue1();

        services.remove(datumKey);

        int count = 0;

        if (!listeners.containsKey(datumKey)) {
            return;
        }

        for (RecordListener listener : listeners.get(datumKey)) {

            count++;

            try {
                if (action == DataOperation.CHANGE) {
                    // 【重點】這里的 listener 就是 service扳炬,當服務變更時吏颖,自然就觸發(fā)了 onChange 事件,處理變更
                    listener.onChange(datumKey, dataStore.get(datumKey).value);
                    continue;
                }

                if (action == DataOperation.DELETE) {
                    listener.onDelete(datumKey);
                    continue;
                }
            } catch (Throwable e) {
                Loggers.DISTRO.error("[NACOS-DISTRO] error while notifying listener of key: {}", datumKey, e);
            }
        }

        if (Loggers.DISTRO.isDebugEnabled()) {
            Loggers.DISTRO
                    .debug("[NACOS-DISTRO] datum change notified, key: {}, listener count: {}, action: {}",
                            datumKey, count, action.name());
        }
    } catch (Throwable e) {
        Loggers.DISTRO.error("[NACOS-DISTRO] Error while handling notifying task", e);
    }
}

c恨樟、覆蓋實例列表listener.onChange(datumKey, dataStore.get(datumKey).value);

Service#onChange方法半醉,如下:

@Override
public void onChange(String key, Instances value) throws Exception {

    Loggers.SRV_LOG.info("[NACOS-RAFT] datum is changed, key: {}, value: {}", key, value);
    // 對權重做初始化
    for (Instance instance : value.getInstanceList()) {

        if (instance == null) {
            // Reject this abnormal instance list:
            throw new RuntimeException("got null instance " + key);
        }

        if (instance.getWeight() > 10000.0D) {
            instance.setWeight(10000.0D);
        }

        if (instance.getWeight() < 0.01D && instance.getWeight() > 0.0D) {
            instance.setWeight(0.01D);
        }
    }
    // 更新實例列表
    updateIPs(value.getInstanceList(), KeyBuilder.matchEphemeralInstanceListKey(key));

    recalculateChecksum();
}

Service#updateIPs方法,如下:

public void updateIPs(Collection<Instance> instances, boolean ephemeral) {
    // 創(chuàng)建新的 map劝术,相當于一個新的 clusterMap
    Map<String, List<Instance>> ipMap = new HashMap<>(clusterMap.size());
    for (String clusterName : clusterMap.keySet()) {
        ipMap.put(clusterName, new ArrayList<>());
    }
    // 把所有實例放入新的 clusterMap
    for (Instance instance : instances) {
        try {
            if (instance == null) {
                Loggers.SRV_LOG.error("[NACOS-DOM] received malformed ip: null");
                continue;
            }

            if (StringUtils.isEmpty(instance.getClusterName())) {
                instance.setClusterName(UtilsAndCommons.DEFAULT_CLUSTER_NAME);
            }

            if (!clusterMap.containsKey(instance.getClusterName())) {
                Loggers.SRV_LOG
                        .warn("cluster: {} not found, ip: {}, will create new cluster with default configuration.",
                                instance.getClusterName(), instance.toJson());
                Cluster cluster = new Cluster(instance.getClusterName(), this);
                cluster.init();
                getClusterMap().put(instance.getClusterName(), cluster);
            }

            List<Instance> clusterIPs = ipMap.get(instance.getClusterName());
            if (clusterIPs == null) {
                clusterIPs = new LinkedList<>();
                ipMap.put(instance.getClusterName(), clusterIPs);
            }

            clusterIPs.add(instance);
        } catch (Exception e) {
            Loggers.SRV_LOG.error("[NACOS-DOM] failed to process ip: " + instance, e);
        }
    }

    for (Map.Entry<String, List<Instance>> entry : ipMap.entrySet()) {
        //make every ip mine
        // 遍歷新的 clusterMap缩多,得到 cluster 中的實例列表
        List<Instance> entryIPs = entry.getValue();
        // 【重點】把新實例列表,更新到注冊表中的 cluster 中
        clusterMap.get(entry.getKey()).updateIps(entryIPs, ephemeral);
    }

    setLastModifiedMillis(System.currentTimeMillis());
    getPushService().serviceChanged(this);
    StringBuilder stringBuilder = new StringBuilder();

    for (Instance instance : allIPs()) {
        stringBuilder.append(instance.toIpAddr()).append("_").append(instance.isHealthy()).append(",");
    }

    Loggers.EVT_LOG.info("[IP-UPDATED] namespace: {}, service: {}, ips: {}", getNamespaceId(), getName(),
            stringBuilder.toString());

}

Cluster#updateIps方法养晋,如下:

public void updateIps(List<Instance> ips, boolean ephemeral) {
    // 先得到舊的實例列表
    Set<Instance> toUpdateInstances = ephemeral ? ephemeralInstances : persistentInstances;

    HashMap<String, Instance> oldIpMap = new HashMap<>(toUpdateInstances.size());

    for (Instance ip : toUpdateInstances) {
        oldIpMap.put(ip.getDatumKey(), ip);
    }
    // ips 中包含兩部分:新增的實例衬吆,要更新的實例
    // 新舊實例列表交集,得到要更新的部分
    List<Instance> updatedIPs = updatedIps(ips, oldIpMap.values());
    if (updatedIPs.size() > 0) {
        for (Instance ip : updatedIPs) {
            Instance oldIP = oldIpMap.get(ip.getDatumKey());

            // do not update the ip validation status of updated ips
            // because the checker has the most precise result
            // Only when ip is not marked, don't we update the health status of IP:
            if (!ip.isMarked()) {
                // 將實例的 health 保持為 oldInstance 的 health
                ip.setHealthy(oldIP.isHealthy());
            }

            if (ip.isHealthy() != oldIP.isHealthy()) {
                // ip validation status updated
                Loggers.EVT_LOG.info("{} {SYNC} IP-{} {}:{}@{}", getService().getName(),
                        (ip.isHealthy() ? "ENABLED" : "DISABLED"), ip.getIp(), ip.getPort(), getName());
            }

            if (ip.getWeight() != oldIP.getWeight()) {
                // ip validation status updated
                Loggers.EVT_LOG.info("{} {SYNC} {IP-UPDATED} {}->{}", getService().getName(), oldIP.toString(),
                        ip.toString());
            }
        }
    }
    // 新舊實例列表相減绳泉,得到待新增的實例列表
    List<Instance> newIPs = subtract(ips, oldIpMap.values());
    if (newIPs.size() > 0) {
        Loggers.EVT_LOG
                .info("{} {SYNC} {IP-NEW} cluster: {}, new ips size: {}, content: {}", getService().getName(),
                        getName(), newIPs.size(), newIPs.toString());

        for (Instance ip : newIPs) {
            HealthCheckStatus.reset(ip);
        }
    }
    // 舊新實例列表相減逊抡,得到待刪除的實例列表(即舊實例列表有,而新實例列表沒有零酪,需刪除)
    List<Instance> deadIPs = subtract(oldIpMap.values(), ips);

    if (deadIPs.size() > 0) {
        Loggers.EVT_LOG
                .info("{} {SYNC} {IP-DEAD} cluster: {}, dead ips size: {}, content: {}", getService().getName(),
                        getName(), deadIPs.size(), deadIPs.toString());

        for (Instance ip : deadIPs) {
            HealthCheckStatus.remv(ip);
        }
    }

    toUpdateInstances = new HashSet<>(ips);
    // 用新實例列表直接覆蓋了 cluster 中的舊實例列表
    if (ephemeral) {
        ephemeralInstances = toUpdateInstances;
    } else {
        persistentInstances = toUpdateInstances;
    }
}

【B秦忿、集群數據同步】

DistroConsistencyServiceImpl#sync方法,如下:

public void sync(DistroKey distroKey, DataOperation action, long delay) {
    // 遍歷蛾娶,獲取 Nacos 集群中的所有成員灯谣,除了自己
    for (Member each : memberManager.allMembersWithoutSelf()) {
        DistroKey distroKeyWithTarget = new DistroKey(distroKey.getResourceKey(), distroKey.getResourceType(),
                each.getAddress());
        // 定義一個Distro的同步任務
        DistroDelayTask distroDelayTask = new DistroDelayTask(distroKeyWithTarget, action, delay);
        // 交給線程池去執(zhí)行
        distroTaskEngineHolder.getDelayTaskExecuteEngine().addTask(distroKeyWithTarget, distroDelayTask);
        if (Loggers.DISTRO.isDebugEnabled()) {
            Loggers.DISTRO.debug("[DISTRO-SCHEDULE] {} to {}", distroKey, each.getAddress());
        }
    }
}

上述代碼中同步的任務封裝為一個DistroDelayTask對象,交給了distroTaskEngineHolder.getDelayTaskExecuteEngine()執(zhí)行蛔琅,其返回值為NacosDelayTaskExecuteEngine胎许,這個類維護了一個線程池,并且接收任務,執(zhí)行任務辜窑。

getDelayTaskExecuteEngine.png
DistroDelayTaskExecuteEngine.png
NacosDelayTaskExecuteEngine.png

NacosDelayTaskExecuteEngine#processTasks方法钩述,如下:

protected void processTasks() {
    Collection<Object> keys = getAllTaskKeys();
    for (Object taskKey : keys) {
        AbstractDelayTask task = removeTask(taskKey);
        if (null == task) {
            continue;
        }
        NacosTaskProcessor processor = getProcessor(taskKey);
        if (null == processor) {
            getEngineLog().error("processor not found for task, so discarded. " + task);
            continue;
        }
        try {
            // ReAdd task if process failed
            // 嘗試執(zhí)行同步任務,如果失敗會重試
            if (!processor.process(task)) {
                retryFailedTask(taskKey, task);
            }
        } catch (Throwable e) {
            getEngineLog().error("Nacos task execute error : " + e.toString(), e);
            retryFailedTask(taskKey, task);
        }
    }
}

可以看出來基于Distro模式的同步是異步進行的穆碎,并且失敗時會將任務重新入隊并充實牙勘,因此不保證同步結果的強一致性,屬于AP模式的一致性策略所禀。

3方面、服務端注冊的流程圖

服務端注冊的流程圖.png

四、Nacos服務注冊部分總結

1色徘、Nacos的注冊表結構

Nacos是多級存儲模型恭金,最外層通過namespace來實現環(huán)境隔離,然后是group分組褂策,分組下就是服務横腿,一個服務有可以分為不同的集群,集群中包含多個實例斤寂。

因此其注冊表結構為一個Map耿焊,類型是:

  • Map<String, Map<String, Service>>:外層key是namespace_id,內層key是group+serviceName遍搞;
  • ② Service內部維護一個Map搀别,結構是:Map<String,Cluster>,key是clusterName尾抑,值是集群信息歇父;
  • ③ Cluster內部維護一個Set集合Set<Instance> ephemeralInstancesSet<Instance> persistentInstances,元素是Instance類型再愈,代表集群中的多個實例榜苫。

2、Nacos保證并發(fā)寫的安全性

  • ① 在注冊實例時翎冲,會對service加鎖垂睬,不同service之間本身就不存在并發(fā)寫問題,互不影響抗悍;相同service時通過鎖來互斥驹饺。
  • ② 在更新實例列表時,是基于異步的線程池來完成缴渊,而線程池的線程數量為1赏壹。

3、Nacos避免并發(fā)讀寫的沖突

Nacos在更新實例列表時衔沼,會采用CopyOnWrite技術蝌借,首先將Old實例列表拷貝一份昔瞧,然后更新拷貝的實例列表,再用更新后的實例列表來覆蓋舊的實例列表菩佑。

4自晰、Nacos應對內部數十萬服務的并發(fā)寫請求

Nacos內部會將服務注冊的任務放入阻塞隊列,采用線程池異步來完成實例更新稍坯,從而提高并發(fā)寫能力酬荞。

五、結尾

以上即為Nacos源碼分析-服務注冊的全部內容瞧哟,感謝閱讀混巧。

?著作權歸作者所有,轉載或內容合作請聯系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市绢涡,隨后出現的幾起案子,更是在濱河造成了極大的恐慌遣疯,老刑警劉巖雄可,帶你破解...
    沈念sama閱讀 216,843評論 6 502
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現場離奇詭異缠犀,居然都是意外死亡数苫,警方通過查閱死者的電腦和手機,發(fā)現死者居然都...
    沈念sama閱讀 92,538評論 3 392
  • 文/潘曉璐 我一進店門辨液,熙熙樓的掌柜王于貴愁眉苦臉地迎上來虐急,“玉大人,你說我怎么就攤上這事滔迈≈褂酰” “怎么了?”我有些...
    開封第一講書人閱讀 163,187評論 0 353
  • 文/不壞的土叔 我叫張陵燎悍,是天一觀的道長敬惦。 經常有香客問我,道長谈山,這世上最難降的妖魔是什么俄删? 我笑而不...
    開封第一講書人閱讀 58,264評論 1 292
  • 正文 為了忘掉前任,我火速辦了婚禮奏路,結果婚禮上畴椰,老公的妹妹穿的比我還像新娘。我一直安慰自己鸽粉,他們只是感情好斜脂,可當我...
    茶點故事閱讀 67,289評論 6 390
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著触机,像睡著了一般秽褒。 火紅的嫁衣襯著肌膚如雪壶硅。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 51,231評論 1 299
  • 那天销斟,我揣著相機與錄音庐椒,去河邊找鬼。 笑死蚂踊,一個胖子當著我的面吹牛约谈,可吹牛的內容都是我干的。 我是一名探鬼主播犁钟,決...
    沈念sama閱讀 40,116評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼棱诱,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了涝动?” 一聲冷哼從身側響起迈勋,我...
    開封第一講書人閱讀 38,945評論 0 275
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎醋粟,沒想到半個月后靡菇,有當地人在樹林里發(fā)現了一具尸體,經...
    沈念sama閱讀 45,367評論 1 313
  • 正文 獨居荒郊野嶺守林人離奇死亡米愿,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內容為張勛視角 年9月15日...
    茶點故事閱讀 37,581評論 2 333
  • 正文 我和宋清朗相戀三年厦凤,在試婚紗的時候發(fā)現自己被綠了。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片育苟。...
    茶點故事閱讀 39,754評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡较鼓,死狀恐怖,靈堂內的尸體忽然破棺而出违柏,到底是詐尸還是另有隱情博烂,我是刑警寧澤,帶...
    沈念sama閱讀 35,458評論 5 344
  • 正文 年R本政府宣布漱竖,位于F島的核電站脖母,受9級特大地震影響,放射性物質發(fā)生泄漏闲孤。R本人自食惡果不足惜谆级,卻給世界環(huán)境...
    茶點故事閱讀 41,068評論 3 327
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望讼积。 院中可真熱鬧肥照,春花似錦、人聲如沸勤众。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,692評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽们颜。三九已至吕朵,卻和暖如春猎醇,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背努溃。 一陣腳步聲響...
    開封第一講書人閱讀 32,842評論 1 269
  • 我被黑心中介騙來泰國打工硫嘶, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留,地道東北人梧税。 一個月前我還...
    沈念sama閱讀 47,797評論 2 369
  • 正文 我出身青樓沦疾,卻偏偏與公主長得像,于是被迫代替她去往敵國和親第队。 傳聞我的和親對象是個殘疾皇子哮塞,可洞房花燭夜當晚...
    茶點故事閱讀 44,654評論 2 354

推薦閱讀更多精彩內容