This article starts from publishing a configuration in the console and walks through the general process by which that change is propagated all the way to the application clients.
Main flow
Client
ClientWorker runs a config-listen round every 5 seconds (sending a ConfigBatchListenRequest). If more than 5 minutes have passed since the last full sync, it synchronizes all configurations.
Server
1 ConfigController modifies the configuration
ConfigChangePublisher publishes a ConfigDataChangeEvent
2 AsyncNotifyService listens for the ConfigDataChangeEvent
2.1 Executes an AsyncRpcTask (holding a queue of NotifySingleRpcTask entries, one per cluster member)
2.2 Drains the queue
2.2.1 If the member is this node itself, dump
2.2.2 If it is another member, asynchronously sync the change via ConfigChangeClusterSyncRequest(dataId, group, isBeta, lastModified, tag, tenant); the receiving node dumps, and the rest of its flow is the same as step 3
3 Asynchronously executes DumpProcessor
3.1 Queries the config content from config_info
3.2 Executes ConfigCacheService.dump
3.2.1 Saves the content to the data directory
3.2.2 Updates the CacheItem's md5 and modified time, and publishes a LocalDataChangeEvent
4 RpcConfigChangeNotifier listens for the LocalDataChangeEvent, iterates over all listening client connections, builds a ConfigChangeNotifyRequest, and asynchronously runs an RpcPushTask to push the change
After the client-side ClientWorker receives the ConfigChangeNotifyRequest, it marks the corresponding cache as out of sync and triggers the config-listen action.
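For reference, the end-to-end chain can be driven with the public Nacos Java client API. The following is a minimal sketch (server address, dataId and group are placeholders, not values from this article): publishing triggers the server-side flow analyzed below, and the registered listener is what makes ClientWorker watch the config.

import java.util.Properties;
import com.alibaba.nacos.api.NacosFactory;
import com.alibaba.nacos.api.PropertyKeyConst;
import com.alibaba.nacos.api.config.ConfigService;
import com.alibaba.nacos.api.config.listener.AbstractListener;

public class ConfigPublishDemo {
    public static void main(String[] args) throws Exception {
        Properties properties = new Properties();
        properties.put(PropertyKeyConst.SERVER_ADDR, "127.0.0.1:8848");
        ConfigService configService = NacosFactory.createConfigService(properties);
        // Registering a listener makes ClientWorker include this config in the
        // ConfigBatchListenRequest it sends every 5 seconds.
        configService.addListener("demo-dataId", "DEFAULT_GROUP", new AbstractListener() {
            @Override
            public void receiveConfigInfo(String configInfo) {
                System.out.println("config changed: " + configInfo);
            }
        });
        // Publishing goes through ConfigController#publishConfig on the server,
        // which kicks off the ConfigDataChangeEvent flow described below.
        configService.publishConfig("demo-dataId", "DEFAULT_GROUP", "hello nacos");
    }
}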
Server-side handling
- Publishing from the console page calls the /v1/cs/configs endpoint, which maps to com.alibaba.nacos.config.server.controller.ConfigController#publishConfig
- ConfigController#publishConfig handling
With no beta (gray-release) IPs and no tag, the config is saved to the config_info table and a history record is inserted into his_config_info.
At the same time a config change event is published: ConfigDataChangeEvent.
The code is as follows:
@PostMapping
@Secured(action = ActionTypes.WRITE, parser = ConfigResourceParser.class)
public Boolean publishConfig(...) throws NacosException {
...
final Timestamp time = TimeUtils.getCurrentTime();
// beta (gray-release) IPs
String betaIps = request.getHeader("betaIps");
ConfigInfo configInfo = new ConfigInfo(dataId, group, tenant, appName, content);
configInfo.setType(type);
if (StringUtils.isBlank(betaIps)) {
if (StringUtils.isBlank(tag)) {
// no tag: save the config to the config_info table and insert a history record into his_config_info
persistService.insertOrUpdate(srcIp, srcUser, configInfo, time, configAdvanceInfo, false);
// publish a ConfigDataChangeEvent
ConfigChangePublisher
.notifyConfigChange(new ConfigDataChangeEvent(false, dataId, group, tenant, time.getTime()));
} else {
persistService.insertOrUpdateTag(configInfo, tag, srcIp, srcUser, time, false);
ConfigChangePublisher.notifyConfigChange(
new ConfigDataChangeEvent(false, dataId, group, tenant, tag, time.getTime()));
}
} else {
// beta publish
persistService.insertOrUpdateBeta(configInfo, betaIps, srcIp, srcUser, time, false);
ConfigChangePublisher
.notifyConfigChange(new ConfigDataChangeEvent(true, dataId, group, tenant, time.getTime()));
}
ConfigTraceService
.logPersistenceEvent(dataId, group, tenant, requestIpApp, time.getTime(), InetUtils.getSelfIP(),
ConfigTraceService.PERSISTENCE_EVENT_PUB, content);
return true;
}
- Next comes AsyncNotifyService, which subscribes to the config change event
The subscription is registered up front, in the constructor.
In the event handler you can see it iterate over the cluster members and build two queues: an HTTP queue and an RPC queue.
For members that support long connections, a NotifySingleRpcTask is built and put into the RPC queue.
The RPC queue is then processed asynchronously by an AsyncRpcTask.
@Autowired
public AsyncNotifyService(ServerMemberManager memberManager) {
this.memberManager = memberManager;
// Register ConfigDataChangeEvent to NotifyCenter.
NotifyCenter.registerToPublisher(ConfigDataChangeEvent.class, NotifyCenter.ringBufferSize);
// Register A Subscriber to subscribe ConfigDataChangeEvent.
NotifyCenter.registerSubscriber(new Subscriber() {
@Override
public void onEvent(Event event) {
// Generate ConfigDataChangeEvent concurrently
if (event instanceof ConfigDataChangeEvent) {
ConfigDataChangeEvent evt = (ConfigDataChangeEvent) event;
long dumpTs = evt.lastModifiedTs;
String dataId = evt.dataId;
String group = evt.group;
String tenant = evt.tenant;
String tag = evt.tag;
Collection<Member> ipList = memberManager.allMembers();
// In fact, any type of queue here can be
Queue<NotifySingleTask> httpQueue = new LinkedList<NotifySingleTask>();
Queue<NotifySingleRpcTask> rpcQueue = new LinkedList<NotifySingleRpcTask>();
// iterate over all cluster members
for (Member member : ipList) {
if (!MemberUtil.isSupportedLongCon(member)) {
httpQueue.add(new NotifySingleTask(dataId, group, tenant, tag, dumpTs, member.getAddress(),
evt.isBeta));
} else {
rpcQueue.add(
new NotifySingleRpcTask(dataId, group, tenant, tag, dumpTs, evt.isBeta, member));
}
}
if (!httpQueue.isEmpty()) {
ConfigExecutor.executeAsyncNotify(new AsyncTask(nacosAsyncRestTemplate, httpQueue));
}
if (!rpcQueue.isEmpty()) {
// execute the rpc tasks asynchronously
ConfigExecutor.executeAsyncNotify(new AsyncRpcTask(rpcQueue));
}
}
}
@Override
public Class<? extends Event> subscribeType() {
return ConfigDataChangeEvent.class;
}
});
}
- The AsyncNotifyService.AsyncRpcTask#run method
Here it drains the queue built in step 3, takes each rpc task, and builds a ConfigChangeClusterSyncRequest.
4.1 If the member is this node itself, perform the dump directly
4.2 If the member is another node in the member list, send the ConfigChangeClusterSyncRequest to that node
public void run() {
while (!queue.isEmpty()) {
NotifySingleRpcTask task = queue.poll();
ConfigChangeClusterSyncRequest syncRequest = new ConfigChangeClusterSyncRequest();
syncRequest.setDataId(task.getDataId());
syncRequest.setGroup(task.getGroup());
syncRequest.setBeta(task.isBeta);
syncRequest.setLastModified(task.getLastModified());
syncRequest.setTag(task.tag);
syncRequest.setTenant(task.getTenant());
Member member = task.member;
// the task's member is this node itself
if (memberManager.getSelf().equals(member)) {
if (syncRequest.isBeta()) {
dumpService.dump(syncRequest.getDataId(), syncRequest.getGroup(), syncRequest.getTenant(),
syncRequest.getLastModified(), NetUtils.localIP(), true);
} else {
// normal (non-beta) publish
dumpService.dump(syncRequest.getDataId(), syncRequest.getGroup(), syncRequest.getTenant(),
syncRequest.getTag(), syncRequest.getLastModified(), NetUtils.localIP());
}
continue;
}
// another member in the member list
if (memberManager.hasMember(member.getAddress())) {
// start the health check and there are ips that are not monitored, put them directly in the notification queue, otherwise notify
boolean unHealthNeedDelay = memberManager.isUnHealth(member.getAddress());
if (unHealthNeedDelay) {
// target ip is unhealthy, then put it in the notification list
ConfigTraceService.logNotifyEvent(task.getDataId(), task.getGroup(), task.getTenant(), null,
task.getLastModified(), InetUtils.getSelfIP(), ConfigTraceService.NOTIFY_EVENT_UNHEALTH,
0, member.getAddress());
// get delay time and set fail count to the task
asyncTaskExecute(task);
} else {
if (!MemberUtil.isSupportedLongCon(member)) {
asyncTaskExecute(
new NotifySingleTask(task.getDataId(), task.getGroup(), task.getTenant(), task.tag,
task.getLastModified(), member.getAddress(), task.isBeta));
} else {
try {
configClusterRpcClientProxy
.syncConfigChange(member, syncRequest, new AsyncRpcNotifyCallBack(task));
} catch (Exception e) {
MetricsMonitor.getConfigNotifyException().increment();
asyncTaskExecute(task);
}
}
}
} else {
// Do nothing if the member is offline.
}
}
}
4.3 On the other member nodes, ConfigChangeClusterSyncRequestHandler receives the ConfigChangeClusterSyncRequest and performs the same dump operation; a sketch of the handler follows.
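The handler on the receiving node is short; paraphrased from the 2.x source (details may differ slightly between versions), it simply forwards the request to DumpService#dump:

@Component
public class ConfigChangeClusterSyncRequestHandler
        extends RequestHandler<ConfigChangeClusterSyncRequest, ConfigChangeClusterSyncResponse> {

    private final DumpService dumpService;

    public ConfigChangeClusterSyncRequestHandler(DumpService dumpService) {
        this.dumpService = dumpService;
    }

    @Override
    public ConfigChangeClusterSyncResponse handle(ConfigChangeClusterSyncRequest request, RequestMeta meta)
            throws NacosException {
        // The receiving node just dumps locally; from here on the flow is the same as step 5 below.
        if (request.isBeta()) {
            dumpService.dump(request.getDataId(), request.getGroup(), request.getTenant(),
                    request.getLastModified(), meta.getClientIp(), true);
        } else {
            dumpService.dump(request.getDataId(), request.getGroup(), request.getTenant(),
                    request.getLastModified(), meta.getClientIp());
        }
        return new ConfigChangeClusterSyncResponse();
    }
}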
5 The dump operation
com.alibaba.nacos.config.server.service.dump.DumpService#dump
5.1 Adds a DumpTask
public void dump(String dataId, String group, String tenant, String tag, long lastModified, String handleIp,
boolean isBeta) {
String groupKey = GroupKey2.getKey(dataId, group, tenant);
String taskKey = String.join("+", dataId, group, tenant, String.valueOf(isBeta), tag);
dumpTaskMgr.addTask(taskKey, new DumpTask(groupKey, tag, lastModified, handleIp, isBeta));
DUMP_LOG.info("[dump-task] add task. groupKey={}, taskKey={}", groupKey, taskKey);
}
5.2 When DumpService is instantiated, DumpProcessor is set as the default task processor of dumpTaskMgr, roughly as sketched below
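The wiring is done in the DumpService constructor; paraphrased (unrelated fields omitted), the TaskManager that receives the DumpTask from 5.1 is created with DumpProcessor as its default processor:

// Paraphrased from the DumpService constructor: every DumpTask added to
// dumpTaskMgr ends up in DumpProcessor#process.
this.processor = new DumpProcessor(this);
dumpTaskMgr = new TaskManager("com.alibaba.nacos.server.DumpTaskManager");
dumpTaskMgr.setDefaultTaskProcessor(processor);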
6 Dump task processing
com.alibaba.nacos.config.server.service.dump.processor.DumpProcessor#process
This mainly looks up the config info using the task's parameters and then calls DumpConfigHandler#configDump.
public boolean process(NacosTask task) {
final PersistService persistService = dumpService.getPersistService();
DumpTask dumpTask = (DumpTask) task;
String[] pair = GroupKey2.parseKey(dumpTask.getGroupKey());
String dataId = pair[0];
String group = pair[1];
String tenant = pair[2];
long lastModified = dumpTask.getLastModified();
String handleIp = dumpTask.getHandleIp();
boolean isBeta = dumpTask.isBeta();
String tag = dumpTask.getTag();
ConfigDumpEvent.ConfigDumpEventBuilder build = ConfigDumpEvent.builder().namespaceId(tenant).dataId(dataId)
.group(group).isBeta(isBeta).tag(tag).lastModifiedTs(lastModified).handleIp(handleIp);
if (isBeta) {
// if publish beta, then dump config, update beta cache
ConfigInfo4Beta cf = persistService.findConfigInfo4Beta(dataId, group, tenant);
build.remove(Objects.isNull(cf));
build.betaIps(Objects.isNull(cf) ? null : cf.getBetaIps());
build.content(Objects.isNull(cf) ? null : cf.getContent());
return DumpConfigHandler.configDump(build.build());
}
if (StringUtils.isBlank(tag)) {
// look up the config info
ConfigInfo cf = persistService.findConfigInfo(dataId, group, tenant);
build.remove(Objects.isNull(cf));
build.content(Objects.isNull(cf) ? null : cf.getContent());
build.type(Objects.isNull(cf) ? null : cf.getType());
} else {
ConfigInfo4Tag cf = persistService.findConfigInfo4Tag(dataId, group, tenant, tag);
build.remove(Objects.isNull(cf));
build.content(Objects.isNull(cf) ? null : cf.getContent());
}
return DumpConfigHandler.configDump(build.build());
}
7 Config dump
com.alibaba.nacos.config.server.service.dump.DumpConfigHandler#configDump
For an update (non-remove) event, ConfigCacheService.dump is called to perform the dump.
public static boolean configDump(ConfigDumpEvent event) {
final String dataId = event.getDataId();
final String group = event.getGroup();
final String namespaceId = event.getNamespaceId();
final String content = event.getContent();
final String type = event.getType();
final long lastModified = event.getLastModifiedTs();
if (event.isBeta()) {
...
}
if (StringUtils.isBlank(event.getTag())) {
...
boolean result;
if (!event.isRemove()) {
// not a remove event: perform the dump
result = ConfigCacheService.dump(dataId, group, namespaceId, content, lastModified, type);
if (result) {
ConfigTraceService.logDumpEvent(dataId, group, namespaceId, null, lastModified, event.getHandleIp(),
ConfigTraceService.DUMP_EVENT_OK, System.currentTimeMillis() - lastModified,
content.length());
}
} else {
result = ConfigCacheService.remove(dataId, group, namespaceId);
if (result) {
ConfigTraceService.logDumpEvent(dataId, group, namespaceId, null, lastModified, event.getHandleIp(),
ConfigTraceService.DUMP_EVENT_REMOVE_OK, System.currentTimeMillis() - lastModified, 0);
}
}
return result;
} else {
...
}
}
8 The ConfigCacheService#dump operation
Saves the config file to disk, updates the md5 in the cache, and publishes a LocalDataChangeEvent.
public static boolean dump(String dataId, String group, String tenant, String content, long lastModifiedTs,
String type) {
String groupKey = GroupKey2.getKey(dataId, group, tenant);
CacheItem ci = makeSure(groupKey);
ci.setType(type);
final int lockResult = tryWriteLock(groupKey);
assert (lockResult != 0);
if (lockResult < 0) {
DUMP_LOG.warn("[dump-error] write lock failed. {}", groupKey);
return false;
}
try {
final String md5 = MD5Utils.md5Hex(content, Constants.ENCODE);
if (md5.equals(ConfigCacheService.getContentMd5(groupKey))) {
DUMP_LOG.warn("[dump-ignore] ignore to save cache file. groupKey={}, md5={}, lastModifiedOld={}, "
+ "lastModifiedNew={}", groupKey, md5, ConfigCacheService.getLastModifiedTs(groupKey),
lastModifiedTs);
} else if (!PropertyUtil.isDirectRead()) {
// save the config to the local data directory (data/tenant-config-data for a namespaced config)
DiskUtil.saveToDisk(dataId, group, tenant, content);
}
// update the md5 in the cache and publish a LocalDataChangeEvent
updateMd5(groupKey, md5, lastModifiedTs);
return true;
} catch (IOException ioe) {
DUMP_LOG.error("[dump-exception] save disk error. " + groupKey + ", " + ioe.toString(), ioe);
if (ioe.getMessage() != null) {
String errMsg = ioe.getMessage();
if (NO_SPACE_CN.equals(errMsg) || NO_SPACE_EN.equals(errMsg) || errMsg.contains(DISK_QUATA_CN) || errMsg
.contains(DISK_QUATA_EN)) {
// Protect from disk full.
FATAL_LOG.error("磁盤滿自殺退出", ioe);
System.exit(0);
}
}
return false;
} finally {
releaseWriteLock(groupKey);
}
}
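Where the file actually lands is decided by DiskUtil. Paraphrased from the 2.x source (the path layout may differ slightly by version), the target file is resolved roughly like this:

// Paraphrased sketch of DiskUtil's target-file resolution (not verbatim source).
// A config without a namespace goes to data/config-data/{group}/{dataId};
// a namespaced config goes to data/tenant-config-data/{tenant}/{group}/{dataId}.
static File targetFile(String dataId, String group, String tenant) {
    File file;
    if (StringUtils.isBlank(tenant)) {
        file = new File(EnvUtil.getNacosHome(), "data" + File.separator + "config-data");
    } else {
        file = new File(EnvUtil.getNacosHome(), "data" + File.separator + "tenant-config-data");
        file = new File(file, tenant);
    }
    file = new File(file, group);
    file = new File(file, dataId);
    return file;
}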
The updateMd5 operation: it updates the md5 and last-modified time in the cache and publishes a LocalDataChangeEvent.
public static void updateMd5(String groupKey, String md5, long lastModifiedTs) {
CacheItem cache = makeSure(groupKey);
if (cache.md5 == null || !cache.md5.equals(md5)) {
cache.md5 = md5;
cache.lastModifiedTs = lastModifiedTs;
NotifyCenter.publishEvent(new LocalDataChangeEvent(groupKey));
}
}
9 RpcConfigChangeNotifier handles the LocalDataChangeEvent
It iterates over all client connections listening on the groupKey, builds a ConfigChangeNotifyRequest, and pushes it to each client.
public void onEvent(LocalDataChangeEvent event) {
String groupKey = event.groupKey;
boolean isBeta = event.isBeta;
List<String> betaIps = event.betaIps;
String[] strings = GroupKey.parseKey(groupKey);
String dataId = strings[0];
String group = strings[1];
String tenant = strings.length > 2 ? strings[2] : "";
String tag = event.tag;
configDataChanged(groupKey, dataId, group, tenant, isBeta, betaIps, tag);
}
public void configDataChanged(String groupKey, String dataId, String group, String tenant, boolean isBeta,
List<String> betaIps, String tag) {
Set<String> listeners = configChangeListenContext.getListeners(groupKey);
if (CollectionUtils.isEmpty(listeners)) {
return;
}
int notifyClientCount = 0;
for (final String client : listeners) {
Connection connection = connectionManager.getConnection(client);
if (connection == null) {
continue;
}
//beta ips check.
String clientIp = connection.getMetaInfo().getClientIp();
String clientTag = connection.getMetaInfo().getTag();
if (isBeta && betaIps != null && !betaIps.contains(clientIp)) {
continue;
}
//tag check
if (StringUtils.isNotBlank(tag) && !tag.equals(clientTag)) {
continue;
}
ConfigChangeNotifyRequest notifyRequest = ConfigChangeNotifyRequest.build(dataId, group, tenant);
RpcPushTask rpcPushRetryTask = new RpcPushTask(notifyRequest, 50, client, clientIp,
connection.getMetaInfo().getAppName());
push(rpcPushRetryTask);
notifyClientCount++;
}
Loggers.REMOTE_PUSH.info("push [{}] clients ,groupKey=[{}]", notifyClientCount, groupKey);
}
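What push(rpcPushRetryTask) does with the task (the 50 passed above is the maximum number of retries) is roughly the following, paraphrased from the 2.x source and likely to differ in detail between versions: failed pushes are rescheduled with a growing delay, and the connection is dropped once the retry budget is exhausted.

// Paraphrased sketch of RpcConfigChangeNotifier#push (not verbatim source).
private void push(RpcPushTask retryTask) {
    if (retryTask.isOverTimes()) {
        // Retried too many times: treat the client as unreachable and unregister the
        // connection, forcing it to reconnect and resynchronize its configs.
        connectionManager.unregister(retryTask.connectionId);
    } else if (connectionManager.getConnection(retryTask.connectionId) != null) {
        // Connection still registered: schedule the next attempt with a growing delay
        // (0s, 2s, 4s, ... between tries).
        ConfigExecutor.getClientConfigNotifierServiceExecutor()
                .schedule(retryTask, retryTask.tryTimes * 2L, TimeUnit.SECONDS);
    }
    // Otherwise the client has already gone offline and the task is dropped.
}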
Client-side handling
- ClientWorker is initialized and builds a ConfigRpcTransportClient, which runs the config-listen routine every 5 seconds (see the scheduling sketch after this list)
- Config-listen logic: if 5 minutes have passed since the last full sync, a full sync is performed
2.1 Iterate over all cached data, skip caches that are already in sync, and build listenCachesMap and removeListenCachesMap depending on whether a cache has listeners
2.2 Iterate over listenCachesMap, build a ConfigBatchListenRequest and send it to the server. For each changed config in the response, build a changeKey, pull the config from the server, and compare each listener's md5 with the data's md5; if they differ, invoke the listener's callback
2.3 Iterate over removeListenCachesMap, build a ConfigBatchListenRequest to remove the listen registration on the server; if that succeeds, remove the local cache
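The 5-second cadence mentioned in the first item comes from ConfigRpcTransportClient#startInternal. Paraphrased (the exact code varies slightly across versions), it blocks on an internal "bell" queue with a 5-second timeout, so a listen round runs either when notifyListenConfig() rings the bell or when the timeout expires:

// Paraphrased sketch of ConfigRpcTransportClient#startInternal (not verbatim source).
executor.schedule(() -> {
    while (!executor.isShutdown() && !executor.isTerminated()) {
        try {
            // Woken up early by notifyListenConfig(), or at the latest after 5 seconds.
            listenExecutebell.poll(5L, TimeUnit.SECONDS);
            if (executor.isShutdown() || executor.isTerminated()) {
                continue;
            }
            executeConfigListen();
        } catch (Exception e) {
            LOGGER.error("[rpc listen execute] exception", e);
        }
    }
}, 0L, TimeUnit.MILLISECONDS);

The executeConfigListen logic outlined in 2.1-2.3 looks like this: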
public void executeConfigListen() {
Map<String, List<CacheData>> listenCachesMap = new HashMap<String, List<CacheData>>(16);
Map<String, List<CacheData>> removeListenCachesMap = new HashMap<String, List<CacheData>>(16);
long now = System.currentTimeMillis();
boolean needAllSync = now - lastAllSyncTime >= ALL_SYNC_INTERNAL;
for (CacheData cache : cacheMap.get().values()) {
synchronized (cache) {
//check local listeners consistent.
// if this cache is already in sync with the server && no full sync is needed, skip it
if (cache.isSyncWithServer()) {
cache.checkListenerMd5();
if (!needAllSync) {
continue;
}
}
if (!CollectionUtils.isEmpty(cache.getListeners())) {
//get listen config
if (!cache.isUseLocalConfigInfo()) {
List<CacheData> cacheDatas = listenCachesMap.get(String.valueOf(cache.getTaskId()));
if (cacheDatas == null) {
cacheDatas = new LinkedList<>();
listenCachesMap.put(String.valueOf(cache.getTaskId()), cacheDatas);
}
cacheDatas.add(cache);
}
} else if (CollectionUtils.isEmpty(cache.getListeners())) {
if (!cache.isUseLocalConfigInfo()) {
List<CacheData> cacheDatas = removeListenCachesMap.get(String.valueOf(cache.getTaskId()));
if (cacheDatas == null) {
cacheDatas = new LinkedList<>();
removeListenCachesMap.put(String.valueOf(cache.getTaskId()), cacheDatas);
}
cacheDatas.add(cache);
}
}
}
}
boolean hasChangedKeys = false;
// handle caches that have listeners
if (!listenCachesMap.isEmpty()) {
for (Map.Entry<String, List<CacheData>> entry : listenCachesMap.entrySet()) {
String taskId = entry.getKey();
Map<String, Long> timestampMap = new HashMap<>(listenCachesMap.size() * 2);
List<CacheData> listenCaches = entry.getValue();
for (CacheData cacheData : listenCaches) {
timestampMap.put(GroupKey.getKeyTenant(cacheData.dataId, cacheData.group, cacheData.tenant),
cacheData.getLastModifiedTs().longValue());
}
ConfigBatchListenRequest configChangeListenRequest = buildConfigRequest(listenCaches);
configChangeListenRequest.setListen(true);
try {
RpcClient rpcClient = ensureRpcClient(taskId);
ConfigChangeBatchListenResponse configChangeBatchListenResponse = (ConfigChangeBatchListenResponse) requestProxy(
rpcClient, configChangeListenRequest);
if (configChangeBatchListenResponse != null && configChangeBatchListenResponse.isSuccess()) {
Set<String> changeKeys = new HashSet<String>();
//handle changed keys,notify listener
if (!CollectionUtils.isEmpty(configChangeBatchListenResponse.getChangedConfigs())) {
hasChangedKeys = true;
for (ConfigChangeBatchListenResponse.ConfigContext changeConfig : configChangeBatchListenResponse
.getChangedConfigs()) {
String changeKey = GroupKey
.getKeyTenant(changeConfig.getDataId(), changeConfig.getGroup(),
changeConfig.getTenant());
changeKeys.add(changeKey);
boolean isInitializing = cacheMap.get().get(changeKey).isInitializing();
refreshContentAndCheck(changeKey, !isInitializing);
}
}
//handler content configs
for (CacheData cacheData : listenCaches) {
String groupKey = GroupKey
.getKeyTenant(cacheData.dataId, cacheData.group, cacheData.getTenant());
if (!changeKeys.contains(groupKey)) {
//sync:cache data md5 = server md5 && cache data md5 = all listeners md5.
synchronized (cacheData) {
if (!cacheData.getListeners().isEmpty()) {
Long previousTimesStamp = timestampMap.get(groupKey);
if (previousTimesStamp != null && !cacheData.getLastModifiedTs().compareAndSet(previousTimesStamp,
System.currentTimeMillis())) {
continue;
}
cacheData.setSyncWithServer(true);
}
}
}
cacheData.setInitializing(false);
}
}
} catch (Exception e) {
LOGGER.error("Async listen config change error ", e);
try {
Thread.sleep(50L);
} catch (InterruptedException interruptedException) {
//ignore
}
}
}
}
// handle caches that no longer have listeners
if (!removeListenCachesMap.isEmpty()) {
for (Map.Entry<String, List<CacheData>> entry : removeListenCachesMap.entrySet()) {
String taskId = entry.getKey();
List<CacheData> removeListenCaches = entry.getValue();
ConfigBatchListenRequest configChangeListenRequest = buildConfigRequest(removeListenCaches);
configChangeListenRequest.setListen(false);
try {
RpcClient rpcClient = ensureRpcClient(taskId);
boolean removeSuccess = unListenConfigChange(rpcClient, configChangeListenRequest);
if (removeSuccess) {
for (CacheData cacheData : removeListenCaches) {
synchronized (cacheData) {
if (cacheData.getListeners().isEmpty()) {
ClientWorker.this
.removeCache(cacheData.dataId, cacheData.group, cacheData.tenant);
}
}
}
}
} catch (Exception e) {
LOGGER.error("async remove listen config change error ", e);
}
try {
Thread.sleep(50L);
} catch (InterruptedException interruptedException) {
//ignore
}
}
}
if (needAllSync) {
lastAllSyncTime = now;
}
//If has changed keys,notify re sync md5.
if (hasChangedKeys) {
notifyListenConfig();
}
}
private void refreshContentAndCheck(String groupKey, boolean notify) {
if (cacheMap.get() != null && cacheMap.get().containsKey(groupKey)) {
CacheData cache = cacheMap.get().get(groupKey);
// fetch the config from the server and check whether its md5 matches the cached one; if not, invoke the listeners
refreshContentAndCheck(cache, notify);
}
}
private void refreshContentAndCheck(CacheData cacheData, boolean notify) {
try {
// fetch the config from the server
ConfigResponse response = getServerConfig(cacheData.dataId, cacheData.group, cacheData.tenant, 3000L,
notify);
cacheData.setContent(response.getContent());
cacheData.setEncryptedDataKey(response.getEncryptedDataKey());
if (null != response.getConfigType()) {
cacheData.setType(response.getConfigType());
}
if (notify) {
LOGGER.info("[{}] [data-received] dataId={}, group={}, tenant={}, md5={}, content={}, type={}",
agent.getName(), cacheData.dataId, cacheData.group, cacheData.tenant, cacheData.getMd5(),
ContentUtils.truncateContent(response.getContent()), response.getConfigType());
}
// if a listener's md5 differs from the cached data's md5, notify that listener
cacheData.checkListenerMd5();
} catch (Exception e) {
LOGGER.error("refresh content and check md5 fail ,dataId={},group={},tenant={} ", cacheData.dataId,
cacheData.group, cacheData.tenant, e);
}
}
- CacheData
Checks whether each listener's recorded md5 matches the data's md5; if not, safeNotifyListener is invoked, which calls the listener's receiveConfigInfo (and receiveConfigChange for an AbstractConfigChangeListener) to deliver the change.
void checkListenerMd5() {
for (ManagerListenerWrap wrap : listeners) {
if (!md5.equals(wrap.lastCallMd5)) {
safeNotifyListener(dataId, group, content, type, md5, encryptedDataKey, wrap);
}
}
}
private void safeNotifyListener(final String dataId, final String group, final String content, final String type,
final String md5, final String encryptedDataKey, final ManagerListenerWrap listenerWrap) {
final Listener listener = listenerWrap.listener;
if (listenerWrap.inNotifying) {
LOGGER.warn(
"[{}] [notify-currentSkip] dataId={}, group={}, md5={}, listener={}, listener is not finish yet,will try next time.",
name, dataId, group, md5, listener);
return;
}
Runnable job = () -> {
long start = System.currentTimeMillis();
ClassLoader myClassLoader = Thread.currentThread().getContextClassLoader();
ClassLoader appClassLoader = listener.getClass().getClassLoader();
try {
if (listener instanceof AbstractSharedListener) {
AbstractSharedListener adapter = (AbstractSharedListener) listener;
adapter.fillContext(dataId, group);
LOGGER.info("[{}] [notify-context] dataId={}, group={}, md5={}", name, dataId, group, md5);
}
// Before executing the callback, set the thread classloader to the classloader of
// the specific webapp to avoid exceptions or misuses when calling the spi interface in
// the callback method (this problem occurs only in multi-application deployment).
Thread.currentThread().setContextClassLoader(appClassLoader);
ConfigResponse cr = new ConfigResponse();
cr.setDataId(dataId);
cr.setGroup(group);
cr.setContent(content);
cr.setEncryptedDataKey(encryptedDataKey);
configFilterChainManager.doFilter(null, cr);
String contentTmp = cr.getContent();
listenerWrap.inNotifying = true;
// the listener executes its callback with the new content
listener.receiveConfigInfo(contentTmp);
// compare lastContent and content
if (listener instanceof AbstractConfigChangeListener) {
Map data = ConfigChangeHandler.getInstance()
.parseChangeData(listenerWrap.lastContent, content, type);
ConfigChangeEvent event = new ConfigChangeEvent(data);
((AbstractConfigChangeListener) listener).receiveConfigChange(event);
listenerWrap.lastContent = content;
}
listenerWrap.lastCallMd5 = md5;
LOGGER.info("[{}] [notify-ok] dataId={}, group={}, md5={}, listener={} ,cost={} millis.", name,
dataId, group, md5, listener, (System.currentTimeMillis() - start));
} catch (NacosException ex) {
LOGGER.error("[{}] [notify-error] dataId={}, group={}, md5={}, listener={} errCode={} errMsg={}",
name, dataId, group, md5, listener, ex.getErrCode(), ex.getErrMsg());
} catch (Throwable t) {
LOGGER.error("[{}] [notify-error] dataId={}, group={}, md5={}, listener={} tx={}", name, dataId,
group, md5, listener, t.getCause());
} finally {
listenerWrap.inNotifying = false;
Thread.currentThread().setContextClassLoader(myClassLoader);
}
};
final long startNotify = System.currentTimeMillis();
try {
if (null != listener.getExecutor()) {
listener.getExecutor().execute(job);
} else {
try {
INTERNAL_NOTIFIER.submit(job);
} catch (RejectedExecutionException rejectedExecutionException) {
LOGGER.warn(
"[{}] [notify-blocked] dataId={}, group={}, md5={}, listener={}, no available internal notifier,will sync notifier ",
name, dataId, group, md5, listener);
job.run();
} catch (Throwable throwable) {
LOGGER.error(
"[{}] [notify-blocked] dataId={}, group={}, md5={}, listener={}, submit internal async task fail,throwable= ",
name, dataId, group, md5, listener, throwable);
job.run();
}
}
} catch (Throwable t) {
LOGGER.error("[{}] [notify-error] dataId={}, group={}, md5={}, listener={} throwable={}", name, dataId,
group, md5, listener, t.getCause());
}
final long finishNotify = System.currentTimeMillis();
LOGGER.info("[{}] [notify-listener] time cost={}ms in ClientWorker, dataId={}, group={}, md5={}, listener={} ",
name, (finishNotify - startNotify), dataId, group, md5, listener);
}
- The RpcClient handles the ConfigChangeNotifyRequest pushed proactively by the server
Here the cache's last-modified time is updated, the cache is marked as out of sync, and the config-listen routine is triggered, roughly as sketched below.
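The corresponding server-request handler is registered on the RpcClient inside ClientWorker (ConfigRpcTransportClient#initRpcClientHandler); paraphrased from the 2.x source, it looks roughly like this:

// Paraphrased sketch of the ConfigChangeNotifyRequest handler (not verbatim source).
rpcClientInner.registerServerRequestHandler((request) -> {
    if (request instanceof ConfigChangeNotifyRequest) {
        ConfigChangeNotifyRequest notifyRequest = (ConfigChangeNotifyRequest) request;
        String groupKey = GroupKey.getKeyTenant(notifyRequest.getDataId(), notifyRequest.getGroup(),
                notifyRequest.getTenant());
        CacheData cacheData = cacheMap.get().get(groupKey);
        if (cacheData != null) {
            synchronized (cacheData) {
                // Bump the local timestamp and mark the cache as out of sync, then ring the
                // bell so that executeConfigListen() runs right away instead of waiting 5s.
                cacheData.getLastModifiedTs().set(System.currentTimeMillis());
                cacheData.setSyncWithServer(false);
                notifyListenConfig();
            }
        }
        return new ConfigChangeNotifyResponse();
    }
    return null;
});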