Eureka源碼采用1.7.2版本
本人小白募判,此文為本人閱讀源碼筆記,如果您讀到本文咒唆,您需要自己甄別是否正確届垫,文中的說明只代表本人理解,不一定是正確的H汀W按Α!
注冊表的增量拉取主要依賴Eureka Client的定時(shí)緩存更新任務(wù),在進(jìn)行new DiscoveryClient()的時(shí)候進(jìn)行了緩存定時(shí)任務(wù)更新的創(chuàng)建浸船。
//初始化支持緩存刷新的線程池
cacheRefreshExecutor = new ThreadPoolExecutor(
1, clientConfig.getCacheRefreshExecutorThreadPoolSize(), 0, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(),
new ThreadFactoryBuilder()
.setNameFormat("DiscoveryClient-CacheRefreshExecutor-%d")
.setDaemon(true)
.build()
); // use direct handoff
任務(wù)的初始化在同方法的initScheduledTasks()中
com.netflix.discovery.DiscoveryClient#initScheduledTasks
//初始化拉取定時(shí)表定時(shí)調(diào)度任務(wù) 30S
if (clientConfig.shouldFetchRegistry()) {
// registry cache refresh timer
int registryFetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds();
int expBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound();
//定時(shí)抓取增量注冊表
scheduler.schedule(
new TimedSupervisorTask(
"cacheRefresh",
scheduler,
cacheRefreshExecutor,
registryFetchIntervalSeconds,
TimeUnit.SECONDS,
expBackOffBound,
//緩存刷新線程
new CacheRefreshThread()
),
registryFetchIntervalSeconds, TimeUnit.SECONDS);
}
此定時(shí)任務(wù)默認(rèn)30S執(zhí)行一次增量拉取
com.netflix.discovery.DiscoveryClient#refreshRegistry
最后還是調(diào)用的:
com.netflix.discovery.DiscoveryClient#fetchRegistry
} else {
//增量拉取注冊表
getAndUpdateDelta(applications);
}
private void getAndUpdateDelta(Applications applications) throws Throwable {
long currentUpdateGeneration = fetchRegistryGeneration.get();
Applications delta = null;
EurekaHttpResponse<Applications> httpResponse = eurekaTransport.queryClient.getDelta(remoteRegionsRef.get());
if (httpResponse.getStatusCode() == Status.OK.getStatusCode()) {
delta = httpResponse.getEntity();
}
//如果增量的注冊表為空妄迁,則獲取全量注冊表
if (delta == null) {
logger.warn("The server does not allow the delta revision to be applied because it is not safe. "
+ "Hence got the full registry.");
getAndStoreFullRegistry();
} else if (fetchRegistryGeneration.compareAndSet(currentUpdateGeneration, currentUpdateGeneration + 1)) {
logger.debug("Got delta update with apps hashcode {}", delta.getAppsHashCode());
String reconcileHashCode = "";
if (fetchRegistryUpdateLock.tryLock()) {
try {
//合并本地注冊表和拉取的增量注冊表
updateDelta(delta);
//計(jì)算合并后的本地注冊表的Hash值
reconcileHashCode = getReconcileHashCode(applications);
} finally {
fetchRegistryUpdateLock.unlock();
}
} else {
logger.warn("Cannot acquire update lock, aborting getAndUpdateDelta");
}
// There is a diff in number of instances for some reason
//如果合并后本地注冊的Hash值與Server端的全量注冊表的Hash值不相等,則進(jìn)行全量拉取
if (!reconcileHashCode.equals(delta.getAppsHashCode()) || clientConfig.shouldLogDeltaDiff()) {
reconcileAndLogDifference(delta, reconcileHashCode); // this makes a remoteCall
}
} else {
logger.warn("Not updating application delta as another thread is updating it already");
logger.debug("Ignoring delta update with apps hashcode {}, as another thread is updating it already", delta.getAppsHashCode());
}
}
先不討論服務(wù)端對(duì)增量拉取的處理邏輯李命,在客戶端登淘,主要處理邏輯如下:
- http請求獲取增量注冊表信息
- 判斷增量拉取是否為空,如果為空走一次全量拉取
- 如果增量拉取不為空封字,合并本地注冊表和拉取的增量注冊表黔州,計(jì)算本地合并后的注冊表耍鬓,將注冊中心的全量注冊表的HashCode與本地合并后的HashCode進(jìn)行比較,如果不相等流妻,走一次全量拉取
接下來我們看看服務(wù)端對(duì)增量拉取的請求處理邏輯
com.netflix.eureka.resources.ApplicationsResource#getContainerDifferential
//增量拉取的Key ALL_APPS_DELTA
Key cacheKey = new Key(Key.EntityType.Application,
ResponseCacheImpl.ALL_APPS_DELTA,
keyType, CurrentRequestVersion.get(), EurekaAccept.fromString(eurekaAccept), regions
);
if (acceptEncoding != null
&& acceptEncoding.contains(HEADER_GZIP_VALUE)) {
return Response.ok(responseCache.getGZIP(cacheKey))
.header(HEADER_CONTENT_ENCODING, HEADER_GZIP_VALUE)
.header(HEADER_CONTENT_TYPE, returnMediaType)
.build();
} else {
return Response.ok(responseCache.get(cacheKey))
.build();
}
可以看出也是走的多級(jí)緩存那一套機(jī)制
com.netflix.eureka.registry.ResponseCacheImpl#getValue
@VisibleForTesting
Value getValue(final Key key, boolean useReadOnlyCache) {
Value payload = null;
try {
if (useReadOnlyCache) {
//readOnlyCacheMap 只讀緩存
final Value currentPayload = readOnlyCacheMap.get(key);
if (currentPayload != null) {
payload = currentPayload;
} else {
//readWriteCacheMap 讀寫緩存
payload = readWriteCacheMap.get(key);
//再放入只讀緩存中
readOnlyCacheMap.put(key, payload);
}
} else {
payload = readWriteCacheMap.get(key);
}
} catch (Throwable t) {
logger.error("Cannot get value for key :" + key, t);
}
return payload;
}
但是有區(qū)別的是讀寫緩存中牲蜀,緩存重建的方法是不同的
com.netflix.eureka.registry.ResponseCacheImpl#generatePayload
} else if (ALL_APPS_DELTA.equals(key.getName())) {
if (isRemoteRegionRequested) {
tracer = serializeDeltaAppsWithRemoteRegionTimer.start();
versionDeltaWithRegions.incrementAndGet();
versionDeltaWithRegionsLegacy.incrementAndGet();
payload = getPayLoad(key,
registry.getApplicationDeltasFromMultipleRegions(key.getRegions()));
} else {
tracer = serializeDeltaAppsTimer.start();
versionDelta.incrementAndGet();
versionDeltaLegacy.incrementAndGet();
payload = getPayLoad(key, registry.getApplicationDeltas());
}
}
最終指向的是這個(gè)方法
com.netflix.eureka.registry.AbstractInstanceRegistry#getApplicationDeltasFromMultipleRegions
public Applications getApplicationDeltasFromMultipleRegions(String[] remoteRegions) {
if (null == remoteRegions) {
remoteRegions = allKnownRemoteRegions; // null means all remote regions.
}
boolean includeRemoteRegion = remoteRegions.length != 0;
if (includeRemoteRegion) {
GET_ALL_WITH_REMOTE_REGIONS_CACHE_MISS_DELTA.increment();
} else {
GET_ALL_CACHE_MISS_DELTA.increment();
}
Applications apps = new Applications();
apps.setVersion(responseCache.getVersionDeltaWithRegions().get());
Map<String, Application> applicationInstancesMap = new HashMap<String, Application>();
try {
write.lock();
//獲取到最近修改隊(duì)列中的服務(wù)實(shí)例的 3分鐘內(nèi)變化的實(shí)例
Iterator<RecentlyChangedItem> iter = this.recentlyChangedQueue.iterator();
logger.debug("The number of elements in the delta queue is :" + this.recentlyChangedQueue.size());
while (iter.hasNext()) {
Lease<InstanceInfo> lease = iter.next().getLeaseInfo();
InstanceInfo instanceInfo = lease.getHolder();
Object[] args = {instanceInfo.getId(),
instanceInfo.getStatus().name(),
instanceInfo.getActionType().name()};
logger.debug("The instance id %s is found with status %s and actiontype %s", args);
Application app = applicationInstancesMap.get(instanceInfo.getAppName());
if (app == null) {
app = new Application(instanceInfo.getAppName());
applicationInstancesMap.put(instanceInfo.getAppName(), app);
apps.addApplication(app);
}
app.addInstance(decorateInstanceInfo(lease));
}
if (includeRemoteRegion) {
for (String remoteRegion : remoteRegions) {
RemoteRegionRegistry remoteRegistry = regionNameVSRemoteRegistry.get(remoteRegion);
if (null != remoteRegistry) {
Applications remoteAppsDelta = remoteRegistry.getApplicationDeltas();
if (null != remoteAppsDelta) {
for (Application application : remoteAppsDelta.getRegisteredApplications()) {
if (shouldFetchFromRemoteRegistry(application.getName(), remoteRegion)) {
Application appInstanceTillNow =
apps.getRegisteredApplications(application.getName());
if (appInstanceTillNow == null) {
appInstanceTillNow = new Application(application.getName());
apps.addApplication(appInstanceTillNow);
}
for (InstanceInfo instanceInfo : application.getInstances()) {
appInstanceTillNow.addInstance(instanceInfo);
}
}
}
}
}
}
}
Applications allApps = getApplicationsFromMultipleRegions(remoteRegions);
//獲取全量注冊表的Hash值
apps.setAppsHashCode(allApps.getReconcileHashCode());
return apps;
} finally {
write.unlock();
}
}
方法的核心就是兩步
- 從recentlyChangedQueue獲取到變化實(shí)例,封裝成Applications
- 計(jì)算當(dāng)前注冊表的HashCode绅这,并賦值給Applications
由此可以看出涣达,客戶端獲取到的增量注冊表就是從recentlyChangedQueue中獲取的。
recentlyChangedQueue 最近修改隊(duì)列证薇,全文搜索下可以發(fā)現(xiàn)度苔,如果出現(xiàn)實(shí)例狀態(tài)變化都會(huì)加入該隊(duì)列中
- 注冊
if (InstanceStatus.UP.equals(registrant.getStatus())) {
lease.serviceUp();
}
registrant.setActionType(ActionType.ADDED);
//加入到最近修改隊(duì)列中
recentlyChangedQueue.add(new RecentlyChangedItem(lease));
registrant.setLastUpdatedTimestamp();
- 下線\故障移除
if (instanceInfo != null) {
//設(shè)置服務(wù)實(shí)例的行為 刪除
instanceInfo.setActionType(ActionType.DELETED);
//加入最近修改的隊(duì)列中
recentlyChangedQueue.add(new RecentlyChangedItem(leaseToCancel));
instanceInfo.setLastUpdatedTimestamp();
vip = instanceInfo.getVIPAddress();
svip = instanceInfo.getSecureVipAddress();
}
- 狀態(tài)變更
if (replicaDirtyTimestamp > info.getLastDirtyTimestamp()) {
info.setLastDirtyTimestamp(replicaDirtyTimestamp);
}
info.setActionType(ActionType.MODIFIED);
recentlyChangedQueue.add(new RecentlyChangedItem(lease));
info.setLastUpdatedTimestamp();
那么recentlyChangedQueue就保存著所有修改的服務(wù)實(shí)例,但是該隊(duì)列的元素也是有有效期的浑度,主要靠定時(shí)過期任務(wù)完成的林螃。
com.netflix.eureka.registry.AbstractInstanceRegistry#AbstractInstanceRegistry
//初始化心跳計(jì)數(shù)器 和自我保護(hù)機(jī)制相關(guān)
this.renewsLastMin = new MeasuredRate(1000 * 60 * 1);
//30s執(zhí)行一次recentlyChangedQueue的清理
this.deltaRetentionTimer.schedule(getDeltaRetentionTask(),
serverConfig.getDeltaRetentionTimerIntervalInMs(),
serverConfig.getDeltaRetentionTimerIntervalInMs());
//定時(shí)清理recentlyChangedQueue表中的過期變動(dòng)實(shí)例
private TimerTask getDeltaRetentionTask() {
return new TimerTask() {
@Override
public void run() {
Iterator<RecentlyChangedItem> it = recentlyChangedQueue.iterator();
while (it.hasNext()) {
//時(shí)間比對(duì) 保留最近3分鐘的數(shù)據(jù) 默認(rèn)
if (it.next().getLastUpdateTime() <
System.currentTimeMillis() - serverConfig.getRetentionTimeInMSInDeltaQueue()) {
it.remove();
} else {
break;
}
}
}
};
}
默認(rèn)只保留最近3分鐘的變動(dòng)實(shí)例
簡單流程圖