nacos2.0.4配置監(jiān)聽分析

本篇文章從界面發(fā)布配置開始,分析整個配置發(fā)布到應(yīng)用客戶端變更的通用過程

主體流程

客戶端

ClientWorker陶缺,每個5秒執(zhí)行一次配置監(jiān)聽(發(fā)送ConfigBatchListenRequest)钾挟。如果時間間隔超過5分鐘,則同步所有配置

服務(wù)端

ConfigController修改配置

  1. ConfigChangePublisher發(fā)布ConfigDataChangeEvent

  2. AsyncNotifyService監(jiān)聽ConfigDataChangeEvent
    2.1 執(zhí)行AsyncRpcTask(含有各個成員的NotifySingleRpcTask的queue)
    2.2 遍歷queue
    2.2.1 如果是自己饱岸,dump
    2.2.2 如果是成員掺出,異步同步配置改變ConfigChangeClusterSyncRequest(dataId, group, isBata, lastModified, tag, tenant)
    接收節(jié)點(diǎn),dump苫费,其余流程同3

  3. 異步執(zhí)行DumpProcessor
    3.1 從config_info里查詢出配置內(nèi)容
    3.2 執(zhí)行ConfigCacheService.dump
    3.2.1 保存到data目錄
    3.2.2 更新CacheItem的md5和修改時間汤锨,發(fā)布LocalDataChangeEvent事件

  4. RpcConfigChangeNotifier監(jiān)聽到LocalDataChangeEvent,遍歷監(jiān)聽的所有客戶端連接百框,構(gòu)建 ConfigChangeNotifyRequest闲礼,異步執(zhí)行RpcPushTask,推送變更

  5. 客戶端ClientWorker接受到ConfigChangeNotifyRequest事件后铐维,將對應(yīng)的緩存置為不同步柬泽,調(diào)用配置監(jiān)聽動作

服務(wù)端處理

  1. 頁面請求,可以看到嫁蛇,調(diào)用了/v1/cs/config接口,對應(yīng)的就是com.alibaba.nacos.config.server.controller.ConfigController#publishConfig方法


    頁面發(fā)布配置
  2. ConfigController#publishConfig處理
    在沒有灰度IP睬棚,沒有標(biāo)簽的情況下第煮,會保存配置信息到config_info表肌蜻,插入歷史記錄表his_config_info
    同時發(fā)布配置變更事件:ConfigDataChangeEvent
    具體代碼如下:
@PostMapping
    @Secured(action = ActionTypes.WRITE, parser = ConfigResourceParser.class)
    public Boolean publishConfig(...) throws NacosException {
        ...
        final Timestamp time = TimeUtils.getCurrentTime();
        # 灰度IP
        String betaIps = request.getHeader("betaIps");
        ConfigInfo configInfo = new ConfigInfo(dataId, group, tenant, appName, content);
        configInfo.setType(type);
        if (StringUtils.isBlank(betaIps)) {
            if (StringUtils.isBlank(tag)) {
                # 沒有標(biāo)簽砂竖,則保存信息到表config_info,插入歷史記錄表his_config_info
                persistService.insertOrUpdate(srcIp, srcUser, configInfo, time, configAdvanceInfo, false);
                # 發(fā)布ConfigDataChangeEvent事件
                ConfigChangePublisher
                        .notifyConfigChange(new ConfigDataChangeEvent(false, dataId, group, tenant, time.getTime()));
            } else {
                persistService.insertOrUpdateTag(configInfo, tag, srcIp, srcUser, time, false);
                ConfigChangePublisher.notifyConfigChange(
                        new ConfigDataChangeEvent(false, dataId, group, tenant, tag, time.getTime()));
            }
        } else {
            // beta publish
            persistService.insertOrUpdateBeta(configInfo, betaIps, srcIp, srcUser, time, false);
            ConfigChangePublisher
                    .notifyConfigChange(new ConfigDataChangeEvent(true, dataId, group, tenant, time.getTime()));
        }
        ConfigTraceService
                .logPersistenceEvent(dataId, group, tenant, requestIpApp, time.getTime(), InetUtils.getSelfIP(),
                        ConfigTraceService.PERSISTENCE_EVENT_PUB, content);
        return true;
    }
  1. 接下來到監(jiān)聽配置變更事件的AsyncNotifyService
    首先是事先注冊好監(jiān)聽的事件
    事件處理器里匙监,可以看到遍歷成員新荤,會構(gòu)建兩個隊(duì)列揽趾,一個是http隊(duì)列,一個是rpc隊(duì)列苛骨。
    對于支持長連接的成員篱瞎,會構(gòu)建NotifySingleRpcTask任務(wù)苟呐,放入rpc隊(duì)列。
    對于rpc客戶端俐筋,異步執(zhí)行AsyncRpcTask任務(wù)
@Autowired
    public AsyncNotifyService(ServerMemberManager memberManager) {
        this.memberManager = memberManager;
        
        // Register ConfigDataChangeEvent to NotifyCenter.
        NotifyCenter.registerToPublisher(ConfigDataChangeEvent.class, NotifyCenter.ringBufferSize);
        
        // Register A Subscriber to subscribe ConfigDataChangeEvent.
        NotifyCenter.registerSubscriber(new Subscriber() {
            
            @Override
            public void onEvent(Event event) {
                // Generate ConfigDataChangeEvent concurrently
                if (event instanceof ConfigDataChangeEvent) {
                    ConfigDataChangeEvent evt = (ConfigDataChangeEvent) event;
                    long dumpTs = evt.lastModifiedTs;
                    String dataId = evt.dataId;
                    String group = evt.group;
                    String tenant = evt.tenant;
                    String tag = evt.tag;
                    Collection<Member> ipList = memberManager.allMembers();
                    
                    // In fact, any type of queue here can be
                    Queue<NotifySingleTask> httpQueue = new LinkedList<NotifySingleTask>();
                    Queue<NotifySingleRpcTask> rpcQueue = new LinkedList<NotifySingleRpcTask>();
                    # 遍歷所有成員
                    for (Member member : ipList) {
                        if (!MemberUtil.isSupportedLongCon(member)) {
                            httpQueue.add(new NotifySingleTask(dataId, group, tenant, tag, dumpTs, member.getAddress(),
                                    evt.isBeta));
                        } else {
                            rpcQueue.add(
                                    new NotifySingleRpcTask(dataId, group, tenant, tag, dumpTs, evt.isBeta, member));
                        }
                    }
                    if (!httpQueue.isEmpty()) {
                        ConfigExecutor.executeAsyncNotify(new AsyncTask(nacosAsyncRestTemplate, httpQueue));
                    }
                    if (!rpcQueue.isEmpty()) {
                        # 異步執(zhí)行rpc任務(wù)
                        ConfigExecutor.executeAsyncNotify(new AsyncRpcTask(rpcQueue));
                    }
                    
                }
            }
            
            @Override
            public Class<? extends Event> subscribeType() {
                return ConfigDataChangeEvent.class;
            }
        });
    }
  1. AsyncNotifyService.AsyncRpcTask#run方法
    這里遍歷第3步構(gòu)建的隊(duì)列牵素,取出每個rpc任務(wù),構(gòu)建ConfigChangeClusterSyncRequest 請求
    4.1 如果成員是自己澄者,dump請求
    4.2 如果成員為成員列表里的其他成員笆呆,發(fā)送ConfigChangeClusterSyncRequest到對應(yīng)節(jié)點(diǎn)
public void run() {
            while (!queue.isEmpty()) {
                NotifySingleRpcTask task = queue.poll();
                
                ConfigChangeClusterSyncRequest syncRequest = new ConfigChangeClusterSyncRequest();
                syncRequest.setDataId(task.getDataId());
                syncRequest.setGroup(task.getGroup());
                syncRequest.setBeta(task.isBeta);
                syncRequest.setLastModified(task.getLastModified());
                syncRequest.setTag(task.tag);
                syncRequest.setTenant(task.getTenant());
                Member member = task.member;
                # 任務(wù)的成員為自己
                if (memberManager.getSelf().equals(member)) {
                    if (syncRequest.isBeta()) {
                        dumpService.dump(syncRequest.getDataId(), syncRequest.getGroup(), syncRequest.getTenant(),
                                syncRequest.getLastModified(), NetUtils.localIP(), true);
                    } else {
                        # 正常發(fā)布
                        dumpService.dump(syncRequest.getDataId(), syncRequest.getGroup(), syncRequest.getTenant(),
                                syncRequest.getTag(), syncRequest.getLastModified(), NetUtils.localIP());
                    }
                    continue;
                }
                # 成員列表里的其他成員
                if (memberManager.hasMember(member.getAddress())) {
                    // start the health check and there are ips that are not monitored, put them directly in the notification queue, otherwise notify
                    boolean unHealthNeedDelay = memberManager.isUnHealth(member.getAddress());
                    if (unHealthNeedDelay) {
                        // target ip is unhealthy, then put it in the notification list
                        ConfigTraceService.logNotifyEvent(task.getDataId(), task.getGroup(), task.getTenant(), null,
                                task.getLastModified(), InetUtils.getSelfIP(), ConfigTraceService.NOTIFY_EVENT_UNHEALTH,
                                0, member.getAddress());
                        // get delay time and set fail count to the task
                        asyncTaskExecute(task);
                    } else {
    
                        if (!MemberUtil.isSupportedLongCon(member)) {
                            asyncTaskExecute(
                                    new NotifySingleTask(task.getDataId(), task.getGroup(), task.getTenant(), task.tag,
                                            task.getLastModified(), member.getAddress(), task.isBeta));
                        } else {
                            try {
                                configClusterRpcClientProxy
                                        .syncConfigChange(member, syncRequest, new AsyncRpcNotifyCallBack(task));
                            } catch (Exception e) {
                                MetricsMonitor.getConfigNotifyException().increment();
                                asyncTaskExecute(task);
                            }
                        }
                      
                    }
                } else {
                    //No nothig if  member has offline.
                }
                
            }
        }

4.3 其他成員節(jié)點(diǎn),ConfigChangeClusterSyncRequestHandler粱挡,接收到ConfigChangeClusterSyncRequest 赠幕,同樣執(zhí)行dump操作


其他節(jié)點(diǎn),同樣dump

5 dump操作
com.alibaba.nacos.config.server.service.dump.DumpService#dump
5.1 添加一個DumpTask任務(wù)

public void dump(String dataId, String group, String tenant, String tag, long lastModified, String handleIp,
            boolean isBeta) {
        String groupKey = GroupKey2.getKey(dataId, group, tenant);
        String taskKey = String.join("+", dataId, group, tenant, String.valueOf(isBeta), tag);
        dumpTaskMgr.addTask(taskKey, new DumpTask(groupKey, tag, lastModified, handleIp, isBeta));
        DUMP_LOG.info("[dump-task] add task. groupKey={}, taskKey={}", groupKey, taskKey);
    }

5.2 在DumpService實(shí)例化時询筏,設(shè)置了dupTaskMgr的默認(rèn)任務(wù)處理器DumpProcessor


dumpTaskMgr默認(rèn)任務(wù)處理器

6 Dump任務(wù)處理
com.alibaba.nacos.config.server.service.dump.processor.DumpProcessor#process
這里主要是根據(jù)任務(wù)里的參數(shù)查找配置信息榕堰,執(zhí)行DumpConfigHandler#configDump

public boolean process(NacosTask task) {
        final PersistService persistService = dumpService.getPersistService();
        DumpTask dumpTask = (DumpTask) task;
        String[] pair = GroupKey2.parseKey(dumpTask.getGroupKey());
        String dataId = pair[0];
        String group = pair[1];
        String tenant = pair[2];
        long lastModified = dumpTask.getLastModified();
        String handleIp = dumpTask.getHandleIp();
        boolean isBeta = dumpTask.isBeta();
        String tag = dumpTask.getTag();
        
        ConfigDumpEvent.ConfigDumpEventBuilder build = ConfigDumpEvent.builder().namespaceId(tenant).dataId(dataId)
                .group(group).isBeta(isBeta).tag(tag).lastModifiedTs(lastModified).handleIp(handleIp);
        
        if (isBeta) {
            // if publish beta, then dump config, update beta cache
            ConfigInfo4Beta cf = persistService.findConfigInfo4Beta(dataId, group, tenant);
            
            build.remove(Objects.isNull(cf));
            build.betaIps(Objects.isNull(cf) ? null : cf.getBetaIps());
            build.content(Objects.isNull(cf) ? null : cf.getContent());
            
            return DumpConfigHandler.configDump(build.build());
        }
        if (StringUtils.isBlank(tag)) {
            # 查找配置信息
            ConfigInfo cf = persistService.findConfigInfo(dataId, group, tenant);

            build.remove(Objects.isNull(cf));
            build.content(Objects.isNull(cf) ? null : cf.getContent());
            build.type(Objects.isNull(cf) ? null : cf.getType());
        } else {
            ConfigInfo4Tag cf = persistService.findConfigInfo4Tag(dataId, group, tenant, tag);

            build.remove(Objects.isNull(cf));
            build.content(Objects.isNull(cf) ? null : cf.getContent());

        }
        return DumpConfigHandler.configDump(build.build());
    }

7 配置dump
com.alibaba.nacos.config.server.service.dump.DumpConfigHandler#configDump
更新事件中,調(diào)用ConfigCacheService.dump執(zhí)行dump操作

public static boolean configDump(ConfigDumpEvent event) {
        final String dataId = event.getDataId();
        final String group = event.getGroup();
        final String namespaceId = event.getNamespaceId();
        final String content = event.getContent();
        final String type = event.getType();
        final long lastModified = event.getLastModifiedTs();
        if (event.isBeta()) {
            ...
        }
        if (StringUtils.isBlank(event.getTag())) {
            ...
            boolean result;
            if (!event.isRemove()) {
               # 非刪除事件嫌套,執(zhí)行dump操作
                result = ConfigCacheService.dump(dataId, group, namespaceId, content, lastModified, type);
                
                if (result) {
                    ConfigTraceService.logDumpEvent(dataId, group, namespaceId, null, lastModified, event.getHandleIp(),
                            ConfigTraceService.DUMP_EVENT_OK, System.currentTimeMillis() - lastModified,
                            content.length());
                }
            } else {
                result = ConfigCacheService.remove(dataId, group, namespaceId);
                
                if (result) {
                    ConfigTraceService.logDumpEvent(dataId, group, namespaceId, null, lastModified, event.getHandleIp(),
                            ConfigTraceService.DUMP_EVENT_REMOVE_OK, System.currentTimeMillis() - lastModified, 0);
                }
            }
            return result;
        } else {
            ...
        }
        
    }

8 ConfigCacheService#dump操作
保存配置文件逆屡,更新cache里的md5值,同時發(fā)布LocalDataChangeEvent事件

public static boolean dump(String dataId, String group, String tenant, String content, long lastModifiedTs,
            String type) {
        String groupKey = GroupKey2.getKey(dataId, group, tenant);
        CacheItem ci = makeSure(groupKey);
        ci.setType(type);
        final int lockResult = tryWriteLock(groupKey);
        assert (lockResult != 0);
        
        if (lockResult < 0) {
            DUMP_LOG.warn("[dump-error] write lock failed. {}", groupKey);
            return false;
        }
        
        try {
            final String md5 = MD5Utils.md5Hex(content, Constants.ENCODE);
            
            if (md5.equals(ConfigCacheService.getContentMd5(groupKey))) {
                DUMP_LOG.warn("[dump-ignore] ignore to save cache file. groupKey={}, md5={}, lastModifiedOld={}, "
                                + "lastModifiedNew={}", groupKey, md5, ConfigCacheService.getLastModifiedTs(groupKey),
                        lastModifiedTs);
            } else if (!PropertyUtil.isDirectRead()) {
               # 保存配置到目錄/data/tenant-config-data
                DiskUtil.saveToDisk(dataId, group, tenant, content);
            }
            # 更新緩存里的md5踱讨,發(fā)布LocalDataChangeEvent事件
            updateMd5(groupKey, md5, lastModifiedTs);
            return true;
        } catch (IOException ioe) {
            DUMP_LOG.error("[dump-exception] save disk error. " + groupKey + ", " + ioe.toString(), ioe);
            if (ioe.getMessage() != null) {
                String errMsg = ioe.getMessage();
                if (NO_SPACE_CN.equals(errMsg) || NO_SPACE_EN.equals(errMsg) || errMsg.contains(DISK_QUATA_CN) || errMsg
                        .contains(DISK_QUATA_EN)) {
                    // Protect from disk full.
                    FATAL_LOG.error("磁盤滿自殺退出", ioe);
                    System.exit(0);
                }
            }
            return false;
        } finally {
            releaseWriteLock(groupKey);
        }
    }

更新md5的操作魏蔗。更新cache里的md5和最后修改時間,發(fā)布LocalDataChangeEvent事件

public static void updateMd5(String groupKey, String md5, long lastModifiedTs) {
        CacheItem cache = makeSure(groupKey);
        if (cache.md5 == null || !cache.md5.equals(md5)) {
            cache.md5 = md5;
            cache.lastModifiedTs = lastModifiedTs;
            NotifyCenter.publishEvent(new LocalDataChangeEvent(groupKey));
        }
    }

9 RpcConfigChangeNotifier處理LocalDataChangeEvent事件
遍歷groupKey對應(yīng)的所有客戶端連接痹筛,構(gòu)造ConfigChangeNotifyRequest 請求莺治,推送給客戶端

public void onEvent(LocalDataChangeEvent event) {
        String groupKey = event.groupKey;
        boolean isBeta = event.isBeta;
        List<String> betaIps = event.betaIps;
        String[] strings = GroupKey.parseKey(groupKey);
        String dataId = strings[0];
        String group = strings[1];
        String tenant = strings.length > 2 ? strings[2] : "";
        String tag = event.tag;
        
        configDataChanged(groupKey, dataId, group, tenant, isBeta, betaIps, tag);
        
    }

public void configDataChanged(String groupKey, String dataId, String group, String tenant, boolean isBeta,
            List<String> betaIps, String tag) {
        
        Set<String> listeners = configChangeListenContext.getListeners(groupKey);
        if (CollectionUtils.isEmpty(listeners)) {
            return;
        }
        int notifyClientCount = 0;
        for (final String client : listeners) {
            Connection connection = connectionManager.getConnection(client);
            if (connection == null) {
                continue;
            }

            //beta ips check.
            String clientIp = connection.getMetaInfo().getClientIp();
            String clientTag = connection.getMetaInfo().getTag();
            if (isBeta && betaIps != null && !betaIps.contains(clientIp)) {
                continue;
            }
            //tag check
            if (StringUtils.isNotBlank(tag) && !tag.equals(clientTag)) {
                continue;
            }

            ConfigChangeNotifyRequest notifyRequest = ConfigChangeNotifyRequest.build(dataId, group, tenant);

            RpcPushTask rpcPushRetryTask = new RpcPushTask(notifyRequest, 50, client, clientIp,
                    connection.getMetaInfo().getAppName());
            push(rpcPushRetryTask);
            notifyClientCount++;
        }
        Loggers.REMOTE_PUSH.info("push [{}] clients ,groupKey=[{}]", notifyClientCount, groupKey);
    }

客戶端處理

  1. 初始化ClientWorker,構(gòu)建ConfigRpcTransportClient味混,每隔5秒執(zhí)行配置監(jiān)聽


    初始化ConfigRpcTransportClient

    定期執(zhí)行配置監(jiān)聽
  2. 配置監(jiān)聽邏輯产雹,如果距離上次全量同步時間達(dá)到5分鐘诫惭,則全量同步
    2.1 遍歷所有緩存的數(shù)據(jù)翁锡,跳過已同步的緩存,根據(jù)緩存是否存在監(jiān)聽器夕土,構(gòu)造listenCachesMap和removeListenCachesMap
    2.2 遍歷listenCachesMap馆衔,構(gòu)造ConfigBatchListenRequest 請求,發(fā)送到服務(wù)端怨绣。根據(jù)響應(yīng)構(gòu)造changeKey角溃,從配置中心拉取配置,檢查監(jiān)聽器的md5和數(shù)據(jù)的md5是否一致篮撑,不一致就調(diào)用監(jiān)聽器的監(jiān)聽方法
    2.3 遍歷removeListenCachesMap减细,構(gòu)造ConfigBatchListenRequest 請求,移除服務(wù)端的監(jiān)聽赢笨。如果成功則移除本地緩存
public void executeConfigListen() {
            Map<String, List<CacheData>> listenCachesMap = new HashMap<String, List<CacheData>>(16);
            Map<String, List<CacheData>> removeListenCachesMap = new HashMap<String, List<CacheData>>(16);
            long now = System.currentTimeMillis();
            boolean needAllSync = now - lastAllSyncTime >= ALL_SYNC_INTERNAL;
            for (CacheData cache : cacheMap.get().values()) {
                synchronized (cache) {
                    //check local listeners consistent.
                    # 如果本緩存已經(jīng)和服務(wù)端同步 && 不需要全量同步未蝌,就跳過處理
                    if (cache.isSyncWithServer()) {
                        cache.checkListenerMd5();
                        if (!needAllSync) {
                            continue;
                        }
                    }
                    
                    if (!CollectionUtils.isEmpty(cache.getListeners())) {
                        //get listen  config
                        if (!cache.isUseLocalConfigInfo()) {
                            List<CacheData> cacheDatas = listenCachesMap.get(String.valueOf(cache.getTaskId()));
                            if (cacheDatas == null) {
                                cacheDatas = new LinkedList<>();
                                listenCachesMap.put(String.valueOf(cache.getTaskId()), cacheDatas);
                            }
                            cacheDatas.add(cache);
                            
                        }
                    } else if (CollectionUtils.isEmpty(cache.getListeners())) {
                        if (!cache.isUseLocalConfigInfo()) {
                            List<CacheData> cacheDatas = removeListenCachesMap.get(String.valueOf(cache.getTaskId()));
                            if (cacheDatas == null) {
                                cacheDatas = new LinkedList<>();
                                removeListenCachesMap.put(String.valueOf(cache.getTaskId()), cacheDatas);
                            }
                            cacheDatas.add(cache);
                            
                        }
                    }
                }
                
            }
            
            boolean hasChangedKeys = false;
            # 有監(jiān)聽器的緩存處理
            if (!listenCachesMap.isEmpty()) {
                for (Map.Entry<String, List<CacheData>> entry : listenCachesMap.entrySet()) {
                    String taskId = entry.getKey();
                    Map<String, Long> timestampMap = new HashMap<>(listenCachesMap.size() * 2);
                    
                    List<CacheData> listenCaches = entry.getValue();
                    for (CacheData cacheData : listenCaches) {
                        timestampMap.put(GroupKey.getKeyTenant(cacheData.dataId, cacheData.group, cacheData.tenant),
                                cacheData.getLastModifiedTs().longValue());
                    }
                    
                    ConfigBatchListenRequest configChangeListenRequest = buildConfigRequest(listenCaches);
                    configChangeListenRequest.setListen(true);
                    try {
                        RpcClient rpcClient = ensureRpcClient(taskId);
                        ConfigChangeBatchListenResponse configChangeBatchListenResponse = (ConfigChangeBatchListenResponse) requestProxy(
                                rpcClient, configChangeListenRequest);
                        if (configChangeBatchListenResponse != null && configChangeBatchListenResponse.isSuccess()) {
                            
                            Set<String> changeKeys = new HashSet<String>();
                            //handle changed keys,notify listener
                            if (!CollectionUtils.isEmpty(configChangeBatchListenResponse.getChangedConfigs())) {
                                hasChangedKeys = true;
                                for (ConfigChangeBatchListenResponse.ConfigContext changeConfig : configChangeBatchListenResponse
                                        .getChangedConfigs()) {
                                    String changeKey = GroupKey
                                            .getKeyTenant(changeConfig.getDataId(), changeConfig.getGroup(),
                                                    changeConfig.getTenant());
                                    changeKeys.add(changeKey);
                                    boolean isInitializing = cacheMap.get().get(changeKey).isInitializing();
                                    refreshContentAndCheck(changeKey, !isInitializing);
                                }
                                
                            }
                            
                            //handler content configs
                            for (CacheData cacheData : listenCaches) {
                                String groupKey = GroupKey
                                        .getKeyTenant(cacheData.dataId, cacheData.group, cacheData.getTenant());
                                if (!changeKeys.contains(groupKey)) {
                                    //sync:cache data md5 = server md5 && cache data md5 = all listeners md5.
                                    synchronized (cacheData) {
                                        if (!cacheData.getListeners().isEmpty()) {
                                            
                                            Long previousTimesStamp = timestampMap.get(groupKey);
                                            if (previousTimesStamp != null && !cacheData.getLastModifiedTs().compareAndSet(previousTimesStamp,
                                                    System.currentTimeMillis())) {
                                                continue;
                                            }
                                            cacheData.setSyncWithServer(true);
                                        }
                                    }
                                }
                                
                                cacheData.setInitializing(false);
                            }
                            
                        }
                    } catch (Exception e) {
                        
                        LOGGER.error("Async listen config change error ", e);
                        try {
                            Thread.sleep(50L);
                        } catch (InterruptedException interruptedException) {
                            //ignore
                        }
                    }
                }
            }
            # 無監(jiān)聽器的緩存處理
            if (!removeListenCachesMap.isEmpty()) {
                for (Map.Entry<String, List<CacheData>> entry : removeListenCachesMap.entrySet()) {
                    String taskId = entry.getKey();
                    List<CacheData> removeListenCaches = entry.getValue();
                    ConfigBatchListenRequest configChangeListenRequest = buildConfigRequest(removeListenCaches);
                    configChangeListenRequest.setListen(false);
                    try {
                        RpcClient rpcClient = ensureRpcClient(taskId);
                        boolean removeSuccess = unListenConfigChange(rpcClient, configChangeListenRequest);
                        if (removeSuccess) {
                            for (CacheData cacheData : removeListenCaches) {
                                synchronized (cacheData) {
                                    if (cacheData.getListeners().isEmpty()) {
                                        ClientWorker.this
                                                .removeCache(cacheData.dataId, cacheData.group, cacheData.tenant);
                                    }
                                }
                            }
                        }
                        
                    } catch (Exception e) {
                        LOGGER.error("async remove listen config change error ", e);
                    }
                    try {
                        Thread.sleep(50L);
                    } catch (InterruptedException interruptedException) {
                        //ignore
                    }
                }
            }
            
            if (needAllSync) {
                lastAllSyncTime = now;
            }
            //If has changed keys,notify re sync md5.
            if (hasChangedKeys) {
                notifyListenConfig();
            }
        }

private void refreshContentAndCheck(String groupKey, boolean notify) {
        if (cacheMap.get() != null && cacheMap.get().containsKey(groupKey)) {
            CacheData cache = cacheMap.get().get(groupKey);
            # 獲取服務(wù)端配置驮吱,檢查md5和緩存的是否一致,不一致則執(zhí)行監(jiān)聽器方法
            refreshContentAndCheck(cache, notify);
        }
    }
    
    private void refreshContentAndCheck(CacheData cacheData, boolean notify) {
        try {
            # 獲取服務(wù)端配置
            ConfigResponse response = getServerConfig(cacheData.dataId, cacheData.group, cacheData.tenant, 3000L,
                    notify);
            cacheData.setContent(response.getContent());
            cacheData.setEncryptedDataKey(response.getEncryptedDataKey());
            if (null != response.getConfigType()) {
                cacheData.setType(response.getConfigType());
            }
            if (notify) {
                LOGGER.info("[{}] [data-received] dataId={}, group={}, tenant={}, md5={}, content={}, type={}",
                        agent.getName(), cacheData.dataId, cacheData.group, cacheData.tenant, cacheData.getMd5(),
                        ContentUtils.truncateContent(response.getContent()), response.getConfigType());
            }
            # 緩存數(shù)據(jù)的md5和監(jiān)聽器的md5不一致萧吠,
            cacheData.checkListenerMd5();
        } catch (Exception e) {
            LOGGER.error("refresh content and check md5 fail ,dataId={},group={},tenant={} ", cacheData.dataId,
                    cacheData.group, cacheData.tenant, e);
        }
    }
  1. CacheData
    檢查監(jiān)聽器緩存的md5是否和數(shù)據(jù)的md5一致左冬,不一致則觸發(fā)監(jiān)聽器的receiveConfigChange,推送變更
void checkListenerMd5() {
        for (ManagerListenerWrap wrap : listeners) {
            if (!md5.equals(wrap.lastCallMd5)) {
                safeNotifyListener(dataId, group, content, type, md5, encryptedDataKey, wrap);
            }
        }
    }

private void safeNotifyListener(final String dataId, final String group, final String content, final String type,
            final String md5, final String encryptedDataKey, final ManagerListenerWrap listenerWrap) {
        final Listener listener = listenerWrap.listener;
        if (listenerWrap.inNotifying) {
            LOGGER.warn(
                    "[{}] [notify-currentSkip] dataId={}, group={}, md5={}, listener={}, listener is not finish yet,will try next time.",
                    name, dataId, group, md5, listener);
            return;
        }
        Runnable job = () -> {
            long start = System.currentTimeMillis();
            ClassLoader myClassLoader = Thread.currentThread().getContextClassLoader();
            ClassLoader appClassLoader = listener.getClass().getClassLoader();
            try {
                if (listener instanceof AbstractSharedListener) {
                    AbstractSharedListener adapter = (AbstractSharedListener) listener;
                    adapter.fillContext(dataId, group);
                    LOGGER.info("[{}] [notify-context] dataId={}, group={}, md5={}", name, dataId, group, md5);
                }
                // Before executing the callback, set the thread classloader to the classloader of
                // the specific webapp to avoid exceptions or misuses when calling the spi interface in
                // the callback method (this problem occurs only in multi-application deployment).
                Thread.currentThread().setContextClassLoader(appClassLoader);
                
                ConfigResponse cr = new ConfigResponse();
                cr.setDataId(dataId);
                cr.setGroup(group);
                cr.setContent(content);
                cr.setEncryptedDataKey(encryptedDataKey);
                configFilterChainManager.doFilter(null, cr);
                String contentTmp = cr.getContent();
                listenerWrap.inNotifying = true;
                // 監(jiān)聽器執(zhí)行監(jiān)聽動作
                listener.receiveConfigInfo(contentTmp);
                // compare lastContent and content
                if (listener instanceof AbstractConfigChangeListener) {
                    Map data = ConfigChangeHandler.getInstance()
                            .parseChangeData(listenerWrap.lastContent, content, type);
                    ConfigChangeEvent event = new ConfigChangeEvent(data);
                    ((AbstractConfigChangeListener) listener).receiveConfigChange(event);
                    listenerWrap.lastContent = content;
                }
                
                listenerWrap.lastCallMd5 = md5;
                LOGGER.info("[{}] [notify-ok] dataId={}, group={}, md5={}, listener={} ,cost={} millis.", name,
                        dataId, group, md5, listener, (System.currentTimeMillis() - start));
            } catch (NacosException ex) {
                LOGGER.error("[{}] [notify-error] dataId={}, group={}, md5={}, listener={} errCode={} errMsg={}",
                        name, dataId, group, md5, listener, ex.getErrCode(), ex.getErrMsg());
            } catch (Throwable t) {
                LOGGER.error("[{}] [notify-error] dataId={}, group={}, md5={}, listener={} tx={}", name, dataId,
                        group, md5, listener, t.getCause());
            } finally {
                listenerWrap.inNotifying = false;
                Thread.currentThread().setContextClassLoader(myClassLoader);
            }
        };
        
        final long startNotify = System.currentTimeMillis();
        try {
            if (null != listener.getExecutor()) {
                listener.getExecutor().execute(job);
            } else {
                try {
                    INTERNAL_NOTIFIER.submit(job);
                } catch (RejectedExecutionException rejectedExecutionException) {
                    LOGGER.warn(
                            "[{}] [notify-blocked] dataId={}, group={}, md5={}, listener={}, no available internal notifier,will sync notifier ",
                            name, dataId, group, md5, listener);
                    job.run();
                } catch (Throwable throwable) {
                    LOGGER.error(
                            "[{}] [notify-blocked] dataId={}, group={}, md5={}, listener={}, submit internal async task fail,throwable= ",
                            name, dataId, group, md5, listener, throwable);
                    job.run();
                }
            }
        } catch (Throwable t) {
            LOGGER.error("[{}] [notify-error] dataId={}, group={}, md5={}, listener={} throwable={}", name, dataId,
                    group, md5, listener, t.getCause());
        }
        final long finishNotify = System.currentTimeMillis();
        LOGGER.info("[{}] [notify-listener] time cost={}ms in ClientWorker, dataId={}, group={}, md5={}, listener={} ",
                name, (finishNotify - startNotify), dataId, group, md5, listener);
    }
  1. RpcClient監(jiān)聽服務(wù)端主動推送的配置變更ConfigChangeNotifyRequest
    這里會修改緩存的最后修改時間纸型,將緩存狀態(tài)置為不同步拇砰,觸發(fā)配置監(jiān)聽動作


    RpcClient監(jiān)聽配置變更
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市狰腌,隨后出現(xiàn)的幾起案子除破,更是在濱河造成了極大的恐慌,老刑警劉巖琼腔,帶你破解...
    沈念sama閱讀 216,544評論 6 501
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件皂岔,死亡現(xiàn)場離奇詭異,居然都是意外死亡展姐,警方通過查閱死者的電腦和手機(jī)躁垛,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,430評論 3 392
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來圾笨,“玉大人教馆,你說我怎么就攤上這事±薮铮” “怎么了土铺?”我有些...
    開封第一講書人閱讀 162,764評論 0 353
  • 文/不壞的土叔 我叫張陵,是天一觀的道長板鬓。 經(jīng)常有香客問我悲敷,道長,這世上最難降的妖魔是什么俭令? 我笑而不...
    開封第一講書人閱讀 58,193評論 1 292
  • 正文 為了忘掉前任后德,我火速辦了婚禮,結(jié)果婚禮上抄腔,老公的妹妹穿的比我還像新娘瓢湃。我一直安慰自己,他們只是感情好赫蛇,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,216評論 6 388
  • 文/花漫 我一把揭開白布绵患。 她就那樣靜靜地躺著,像睡著了一般悟耘。 火紅的嫁衣襯著肌膚如雪落蝙。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 51,182評論 1 299
  • 那天,我揣著相機(jī)與錄音筏勒,去河邊找鬼赚瘦。 笑死,一個胖子當(dāng)著我的面吹牛奏寨,可吹牛的內(nèi)容都是我干的起意。 我是一名探鬼主播,決...
    沈念sama閱讀 40,063評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼病瞳,長吁一口氣:“原來是場噩夢啊……” “哼揽咕!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起套菜,我...
    開封第一講書人閱讀 38,917評論 0 274
  • 序言:老撾萬榮一對情侶失蹤亲善,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后逗柴,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體蛹头,經(jīng)...
    沈念sama閱讀 45,329評論 1 310
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,543評論 2 332
  • 正文 我和宋清朗相戀三年戏溺,在試婚紗的時候發(fā)現(xiàn)自己被綠了渣蜗。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 39,722評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡旷祸,死狀恐怖耕拷,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情托享,我是刑警寧澤骚烧,帶...
    沈念sama閱讀 35,425評論 5 343
  • 正文 年R本政府宣布,位于F島的核電站闰围,受9級特大地震影響赃绊,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜羡榴,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,019評論 3 326
  • 文/蒙蒙 一碧查、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧炕矮,春花似錦么夫、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,671評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽涉枫。三九已至邢滑,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背困后。 一陣腳步聲響...
    開封第一講書人閱讀 32,825評論 1 269
  • 我被黑心中介騙來泰國打工乐纸, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人摇予。 一個月前我還...
    沈念sama閱讀 47,729評論 2 368
  • 正文 我出身青樓汽绢,卻偏偏與公主長得像,于是被迫代替她去往敵國和親侧戴。 傳聞我的和親對象是個殘疾皇子宁昭,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,614評論 2 353

推薦閱讀更多精彩內(nèi)容

  • Nacos主要有兩大功能:配置中心和服務(wù)注冊 配置中心 我們知道客戶端會有一個長輪訓(xùn)的任務(wù)去檢查服務(wù)器端的配置是否...
    WEIJAVA閱讀 16,481評論 1 1
  • 要分析Nacos源碼,好歹我們也通過源碼啟動起來酗宋,這樣也方便我們debug代碼积仗。注:nacos1.1.3 文章篇幅...
    Visonwu閱讀 16,249評論 0 3
  • 配置中心 配置中心簡介 說到配置中心, 大家可能都不陌生蜕猫。我們攜程現(xiàn)在用的qconfig寂曹, 就是一個典型的配置中心...
    窩牛狂奔閱讀 7,963評論 0 3
  • 動態(tài)配置管理是 Nacos 的三大功能之一回右,通過動態(tài)配置服務(wù)隆圆,我們可以在所有環(huán)境中以集中和動態(tài)的方式管理所有應(yīng)用程...
    逅弈閱讀 75,831評論 9 101
  • 首先需要引入nacos config jar包 復(fù)制代碼 <dependency> <groupId>com....
    聯(lián)旺閱讀 4,470評論 0 0