聊聊nacos ServiceManager的updateInstance

本文主要研究一下nacos ServiceManager的updateInstance

ServiceManager

nacos-1.1.3/naming/src/main/java/com/alibaba/nacos/naming/core/ServiceManager.java

@Component
@DependsOn("nacosApplicationContext")
public class ServiceManager implements RecordListener<Service> {

    /**
     * Map<namespace, Map<group::serviceName, Service>>
     */
    private Map<String, Map<String, Service>> serviceMap = new ConcurrentHashMap<>();

    private LinkedBlockingDeque<ServiceKey> toBeUpdatedServicesQueue = new LinkedBlockingDeque<>(1024 * 1024);

    private Synchronizer synchronizer = new ServiceStatusSynchronizer();

    private final Lock lock = new ReentrantLock();

    @Resource(name = "consistencyDelegate")
    private ConsistencyService consistencyService;

    @Autowired
    private SwitchDomain switchDomain;

    @Autowired
    private DistroMapper distroMapper;

    @Autowired
    private ServerListManager serverListManager;

    @Autowired
    private PushService pushService;

    private final Object putServiceLock = new Object();

    //......

    public void updateInstance(String namespaceId, String serviceName, Instance instance) throws NacosException {

        Service service = getService(namespaceId, serviceName);

        if (service == null) {
            throw new NacosException(NacosException.INVALID_PARAM,
                "service not found, namespace: " + namespaceId + ", service: " + serviceName);
        }

        if (!service.allIPs().contains(instance)) {
            throw new NacosException(NacosException.INVALID_PARAM, "instance not exist: " + instance);
        }

        addInstance(namespaceId, serviceName, instance.isEphemeral(), instance);
    }

    public void addInstance(String namespaceId, String serviceName, boolean ephemeral, Instance... ips) throws NacosException {

        String key = KeyBuilder.buildInstanceListKey(namespaceId, serviceName, ephemeral);

        Service service = getService(namespaceId, serviceName);

        List<Instance> instanceList = addIpAddresses(service, ephemeral, ips);

        Instances instances = new Instances();
        instances.setInstanceList(instanceList);

        consistencyService.put(key, instances);
    }

    public List<Instance> addIpAddresses(Service service, boolean ephemeral, Instance... ips) throws NacosException {
        return updateIpAddresses(service, UtilsAndCommons.UPDATE_INSTANCE_ACTION_ADD, ephemeral, ips);
    }

    public List<Instance> updateIpAddresses(Service service, String action, boolean ephemeral, Instance... ips) throws NacosException {

        Datum datum = consistencyService.get(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), ephemeral));

        Map<String, Instance> oldInstanceMap = new HashMap<>(16);
        List<Instance> currentIPs = service.allIPs(ephemeral);
        Map<String, Instance> map = new ConcurrentHashMap<>(currentIPs.size());

        for (Instance instance : currentIPs) {
            map.put(instance.toIPAddr(), instance);
        }
        if (datum != null) {
            oldInstanceMap = setValid(((Instances) datum.value).getInstanceList(), map);
        }

        // use HashMap for deep copy:
        HashMap<String, Instance> instanceMap = new HashMap<>(oldInstanceMap.size());
        instanceMap.putAll(oldInstanceMap);

        for (Instance instance : ips) {
            if (!service.getClusterMap().containsKey(instance.getClusterName())) {
                Cluster cluster = new Cluster(instance.getClusterName(), service);
                cluster.init();
                service.getClusterMap().put(instance.getClusterName(), cluster);
                Loggers.SRV_LOG.warn("cluster: {} not found, ip: {}, will create new cluster with default configuration.",
                    instance.getClusterName(), instance.toJSON());
            }

            if (UtilsAndCommons.UPDATE_INSTANCE_ACTION_REMOVE.equals(action)) {
                instanceMap.remove(instance.getDatumKey());
            } else {
                instanceMap.put(instance.getDatumKey(), instance);
            }

        }

        if (instanceMap.size() <= 0 && UtilsAndCommons.UPDATE_INSTANCE_ACTION_ADD.equals(action)) {
            throw new IllegalArgumentException("ip list can not be empty, service: " + service.getName() + ", ip list: "
                + JSON.toJSONString(instanceMap.values()));
        }

        return new ArrayList<>(instanceMap.values());
    }

    //......
 }
  • updateInstance會通過service.allIPs().contains(instance)校驗要更新的instance是否存在,不存在則拋出NacosException榄审,存在則執(zhí)行addInstance方法
  • addInstance方法它會獲取service,然后執(zhí)行addIpAddresses摩桶,最后執(zhí)行consistencyService.put爬骤;addIpAddresses調(diào)用的是updateIpAddresses方法,其action參數(shù)為UtilsAndCommons.UPDATE_INSTANCE_ACTION_ADD
  • updateIpAddresses方法首先從consistencyService獲取datum,然后通過service.allIPs方法獲取currentIPs改含,之后根據(jù)datum設(shè)置oldInstanceMap闻葵,對于UtilsAndCommons.UPDATE_INSTANCE_ACTION_REMOVE類型執(zhí)行刪除民泵,其余的action則將instance方法到instanceMap中

DistroConsistencyServiceImpl.put

nacos-1.1.3/naming/src/main/java/com/alibaba/nacos/naming/consistency/ephemeral/distro/DistroConsistencyServiceImpl.java

@org.springframework.stereotype.Service("distroConsistencyService")
public class DistroConsistencyServiceImpl implements EphemeralConsistencyService {

    private ScheduledExecutorService executor = new ScheduledThreadPoolExecutor(1, new ThreadFactory() {
        @Override
        public Thread newThread(Runnable r) {
            Thread t = new Thread(r);

            t.setDaemon(true);
            t.setName("com.alibaba.nacos.naming.distro.notifier");

            return t;
        }
    });

    @Autowired
    private DistroMapper distroMapper;

    @Autowired
    private DataStore dataStore;

    @Autowired
    private TaskDispatcher taskDispatcher;

    @Autowired
    private DataSyncer dataSyncer;

    @Autowired
    private Serializer serializer;

    @Autowired
    private ServerListManager serverListManager;

    @Autowired
    private SwitchDomain switchDomain;

    @Autowired
    private GlobalConfig globalConfig;

    private boolean initialized = false;

    public volatile Notifier notifier = new Notifier();

    private Map<String, CopyOnWriteArrayList<RecordListener>> listeners = new ConcurrentHashMap<>();

    private Map<String, String> syncChecksumTasks = new ConcurrentHashMap<>(16);

    //......

    public void put(String key, Record value) throws NacosException {
        onPut(key, value);
        taskDispatcher.addTask(key);
    }

    public void onPut(String key, Record value) {

        if (KeyBuilder.matchEphemeralInstanceListKey(key)) {
            Datum<Instances> datum = new Datum<>();
            datum.value = (Instances) value;
            datum.key = key;
            datum.timestamp.incrementAndGet();
            dataStore.put(key, datum);
        }

        if (!listeners.containsKey(key)) {
            return;
        }

        notifier.addTask(key, ApplyAction.CHANGE);
    }
    //......
}
  • DistroConsistencyServiceImpl的put方法會先執(zhí)行onPut,然后執(zhí)行taskDispatcher.addTask(key)槽畔;onPut在判斷key是ephemeralInstanceListKey時會創(chuàng)建一個Datum栈妆,遞增其timestamp,然后放到dataStore中厢钧,最后調(diào)用notifier.addTask(key, ApplyAction.CHANGE)

Notifier.addTask

nacos-1.1.3/naming/src/main/java/com/alibaba/nacos/naming/consistency/ephemeral/distro/DistroConsistencyServiceImpl.java

    public class Notifier implements Runnable {

        private ConcurrentHashMap<String, String> services = new ConcurrentHashMap<>(10 * 1024);

        private BlockingQueue<Pair> tasks = new LinkedBlockingQueue<Pair>(1024 * 1024);

        public void addTask(String datumKey, ApplyAction action) {

            if (services.containsKey(datumKey) && action == ApplyAction.CHANGE) {
                return;
            }
            if (action == ApplyAction.CHANGE) {
                services.put(datumKey, StringUtils.EMPTY);
            }
            tasks.add(Pair.with(datumKey, action));
        }

        public int getTaskSize() {
            return tasks.size();
        }

        @Override
        public void run() {
            Loggers.DISTRO.info("distro notifier started");

            while (true) {
                try {

                    Pair pair = tasks.take();

                    if (pair == null) {
                        continue;
                    }

                    String datumKey = (String) pair.getValue0();
                    ApplyAction action = (ApplyAction) pair.getValue1();

                    services.remove(datumKey);

                    int count = 0;

                    if (!listeners.containsKey(datumKey)) {
                        continue;
                    }

                    for (RecordListener listener : listeners.get(datumKey)) {

                        count++;

                        try {
                            if (action == ApplyAction.CHANGE) {
                                listener.onChange(datumKey, dataStore.get(datumKey).value);
                                continue;
                            }

                            if (action == ApplyAction.DELETE) {
                                listener.onDelete(datumKey);
                                continue;
                            }
                        } catch (Throwable e) {
                            Loggers.DISTRO.error("[NACOS-DISTRO] error while notifying listener of key: {}", datumKey, e);
                        }
                    }

                    if (Loggers.DISTRO.isDebugEnabled()) {
                        Loggers.DISTRO.debug("[NACOS-DISTRO] datum change notified, key: {}, listener count: {}, action: {}",
                            datumKey, count, action.name());
                    }
                } catch (Throwable e) {
                    Loggers.DISTRO.error("[NACOS-DISTRO] Error while handling notifying task", e);
                }
            }
        }
    }
  • Notifier的addTask方法對于action為ApplyAction.CHANGE的且不在services當(dāng)中的會放入到services當(dāng)中鳞尔,最后添加到tasks;run方法會不斷從tasks取出數(shù)據(jù)早直,執(zhí)行相應(yīng)的回調(diào)

TaskDispatcher.addTask

nacos-1.1.3/naming/src/main/java/com/alibaba/nacos/naming/consistency/ephemeral/distro/TaskDispatcher.java

@Component
public class TaskDispatcher {

    @Autowired
    private GlobalConfig partitionConfig;

    @Autowired
    private DataSyncer dataSyncer;

    private List<TaskScheduler> taskSchedulerList = new ArrayList<>();

    private final int cpuCoreCount = Runtime.getRuntime().availableProcessors();

    @PostConstruct
    public void init() {
        for (int i = 0; i < cpuCoreCount; i++) {
            TaskScheduler taskScheduler = new TaskScheduler(i);
            taskSchedulerList.add(taskScheduler);
            GlobalExecutor.submitTaskDispatch(taskScheduler);
        }
    }

    public void addTask(String key) {
        taskSchedulerList.get(UtilsAndCommons.shakeUp(key, cpuCoreCount)).addTask(key);
    }

    public class TaskScheduler implements Runnable {

        private int index;

        private int dataSize = 0;

        private long lastDispatchTime = 0L;

        private BlockingQueue<String> queue = new LinkedBlockingQueue<>(128 * 1024);

        public TaskScheduler(int index) {
            this.index = index;
        }

        public void addTask(String key) {
            queue.offer(key);
        }

        public int getIndex() {
            return index;
        }

        @Override
        public void run() {

            List<String> keys = new ArrayList<>();
            while (true) {

                try {

                    String key = queue.poll(partitionConfig.getTaskDispatchPeriod(),
                        TimeUnit.MILLISECONDS);

                    if (Loggers.DISTRO.isDebugEnabled() && StringUtils.isNotBlank(key)) {
                        Loggers.DISTRO.debug("got key: {}", key);
                    }

                    if (dataSyncer.getServers() == null || dataSyncer.getServers().isEmpty()) {
                        continue;
                    }

                    if (StringUtils.isBlank(key)) {
                        continue;
                    }

                    if (dataSize == 0) {
                        keys = new ArrayList<>();
                    }

                    keys.add(key);
                    dataSize++;

                    if (dataSize == partitionConfig.getBatchSyncKeyCount() ||
                        (System.currentTimeMillis() - lastDispatchTime) > partitionConfig.getTaskDispatchPeriod()) {

                        for (Server member : dataSyncer.getServers()) {
                            if (NetUtils.localServer().equals(member.getKey())) {
                                continue;
                            }
                            SyncTask syncTask = new SyncTask();
                            syncTask.setKeys(keys);
                            syncTask.setTargetServer(member.getKey());

                            if (Loggers.DISTRO.isDebugEnabled() && StringUtils.isNotBlank(key)) {
                                Loggers.DISTRO.debug("add sync task: {}", JSON.toJSONString(syncTask));
                            }

                            dataSyncer.submit(syncTask, 0);
                        }
                        lastDispatchTime = System.currentTimeMillis();
                        dataSize = 0;
                    }

                } catch (Exception e) {
                    Loggers.DISTRO.error("dispatch sync task failed.", e);
                }
            }
        }
    }
}
  • TaskDispatcher的addTask方法會從taskSchedulerList獲取指定的TaskScheduler寥假,然后執(zhí)行其addTask方法;TaskScheduler的addTask方法會往queue中添加數(shù)據(jù)莽鸿,而run方法則不斷從queue取數(shù)據(jù)昧旨,然后通過dataSyncer執(zhí)行syncTask

SyncTask

nacos-1.1.3/naming/src/main/java/com/alibaba/nacos/naming/consistency/ephemeral/distro/SyncTask.java

public class SyncTask {

    private List<String> keys;

    private int retryCount;

    private long lastExecuteTime;

    private String targetServer;

    public List<String> getKeys() {
        return keys;
    }

    public void setKeys(List<String> keys) {
        this.keys = keys;
    }

    public int getRetryCount() {
        return retryCount;
    }

    public void setRetryCount(int retryCount) {
        this.retryCount = retryCount;
    }

    public long getLastExecuteTime() {
        return lastExecuteTime;
    }

    public void setLastExecuteTime(long lastExecuteTime) {
        this.lastExecuteTime = lastExecuteTime;
    }

    public String getTargetServer() {
        return targetServer;
    }

    public void setTargetServer(String targetServer) {
        this.targetServer = targetServer;
    }
}
  • SyncTask包含了keys、targetServer屬性祥得,其中targetServer用于告訴DataSyncer該往哪個server執(zhí)行sync操作

小結(jié)

  • updateInstance會通過service.allIPs().contains(instance)校驗要更新的instance是否存在兔沃,不存在則拋出NacosException,存在則執(zhí)行addInstance方法
  • addInstance方法它會獲取service级及,然后執(zhí)行addIpAddresses乒疏,最后執(zhí)行consistencyService.put;addIpAddresses調(diào)用的是updateIpAddresses方法饮焦,其action參數(shù)為UtilsAndCommons.UPDATE_INSTANCE_ACTION_ADD
  • updateIpAddresses方法首先從consistencyService獲取datum怕吴,然后通過service.allIPs方法獲取currentIPs,之后根據(jù)datum設(shè)置oldInstanceMap县踢,對于UtilsAndCommons.UPDATE_INSTANCE_ACTION_REMOVE類型執(zhí)行刪除转绷,其余的action則將instance方法到instanceMap中

doc

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市硼啤,隨后出現(xiàn)的幾起案子议经,更是在濱河造成了極大的恐慌,老刑警劉巖,帶你破解...
    沈念sama閱讀 222,729評論 6 517
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件煞肾,死亡現(xiàn)場離奇詭異咧织,居然都是意外死亡,警方通過查閱死者的電腦和手機(jī)籍救,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 95,226評論 3 399
  • 文/潘曉璐 我一進(jìn)店門习绢,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人蝙昙,你說我怎么就攤上這事闪萄。” “怎么了奇颠?”我有些...
    開封第一講書人閱讀 169,461評論 0 362
  • 文/不壞的土叔 我叫張陵桃煎,是天一觀的道長。 經(jīng)常有香客問我大刊,道長为迈,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 60,135評論 1 300
  • 正文 為了忘掉前任缺菌,我火速辦了婚禮葫辐,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘伴郁。我一直安慰自己耿战,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 69,130評論 6 398
  • 文/花漫 我一把揭開白布焊傅。 她就那樣靜靜地躺著剂陡,像睡著了一般。 火紅的嫁衣襯著肌膚如雪狐胎。 梳的紋絲不亂的頭發(fā)上鸭栖,一...
    開封第一講書人閱讀 52,736評論 1 312
  • 那天,我揣著相機(jī)與錄音握巢,去河邊找鬼晕鹊。 笑死,一個胖子當(dāng)著我的面吹牛暴浦,可吹牛的內(nèi)容都是我干的溅话。 我是一名探鬼主播,決...
    沈念sama閱讀 41,179評論 3 422
  • 文/蒼蘭香墨 我猛地睜開眼歌焦,長吁一口氣:“原來是場噩夢啊……” “哼飞几!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起独撇,我...
    開封第一講書人閱讀 40,124評論 0 277
  • 序言:老撾萬榮一對情侶失蹤屑墨,失蹤者是張志新(化名)和其女友劉穎窟社,沒想到半個月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體绪钥,經(jīng)...
    沈念sama閱讀 46,657評論 1 320
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 38,723評論 3 342
  • 正文 我和宋清朗相戀三年关炼,在試婚紗的時候發(fā)現(xiàn)自己被綠了程腹。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 40,872評論 1 353
  • 序言:一個原本活蹦亂跳的男人離奇死亡儒拂,死狀恐怖寸潦,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情社痛,我是刑警寧澤见转,帶...
    沈念sama閱讀 36,533評論 5 351
  • 正文 年R本政府宣布,位于F島的核電站蒜哀,受9級特大地震影響斩箫,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜撵儿,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 42,213評論 3 336
  • 文/蒙蒙 一乘客、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧淀歇,春花似錦易核、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,700評論 0 25
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽漱逸。三九已至,卻和暖如春碰逸,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背阔加。 一陣腳步聲響...
    開封第一講書人閱讀 33,819評論 1 274
  • 我被黑心中介騙來泰國打工花竞, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人掸哑。 一個月前我還...
    沈念sama閱讀 49,304評論 3 379
  • 正文 我出身青樓约急,卻偏偏與公主長得像,于是被迫代替她去往敵國和親苗分。 傳聞我的和親對象是個殘疾皇子厌蔽,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 45,876評論 2 361

推薦閱讀更多精彩內(nèi)容