本文會基于Eureka源碼色罚,一步一步分析Client是如何刷新注冊表緩存信息的
前言
何為刷新緩存
Client中啟動了一個定時任務,周期性的通過Http請求去Server端拉取最新的注冊表信息涯肩,并緩存到本地
如何開啟緩存刷新
Eureka Client默認開啟緩存刷新屡江,即表明會從Eureka Server 抓取注冊表信息
可以通過配置項
eureka:
client:
fetch-registry: true # 默認開啟
刷新周期
默認每隔30s執(zhí)行一次刷新任務
eureka:
client:
registry-fetch-interval-seconds: 30
可以通過該參數(shù)自定義刷新任務間隔時間
只鐘情獲取某單個Server的注冊表信息
當Server集群化時溺职,如果想要Client從固定的某個server節(jié)點獲取注冊表信息已球,可通過以下參數(shù)配置
eureka:
client:
registry-refresh-single-vip-address: xxx # 表示只鐘情該server節(jié)點的注冊表信息
何為全量獲取
每次刷新注冊表緩存任務執(zhí)行時,都去server端獲取所有注冊表信息辅愿,這就是全量獲取智亮,全量獲取沒有參數(shù)可配置,當禁用了增量獲取以后就會總是執(zhí)行全量獲取了
何為增量更新
我們知道在Eureka中点待,注冊表實例發(fā)生改變的頻率要遠遠小于每隔客戶點獲取注冊表信息請求的頻率阔蛉,如果每次請求都是拉取Server注冊表中所有的實例信息,那么會造成一定的帶寬浪費和傳輸性能降低
Eureka提供了增量獲取的特性癞埠,可以提高獲取請求的傳輸效率和降低網(wǎng)絡堵塞状原,這就是增量更新
-
配置項
eureka: client: disable-delta: false # 默認開啟增量更新注冊表緩存,即不禁用增量
可以通過該參數(shù)調整緩存刷新策略
時序圖
源碼
DiscoveryClient
localRegionApps.set(new Applications());
@Override
public Applications getApplications() {
return localRegionApps.get();
}
@Inject
DiscoveryClient(ApplicationInfoManager applicationInfoManager, EurekaClientConfig config, AbstractDiscoveryClientOptionalArgs args,
Provider<BackupRegistry> backupRegistryProvider) {
// ...
//
localRegionApps.set(new Applications());
// 1. 若獲取注冊表信息配置參數(shù)開啟苗踪,則會在初始化DiscoverClient且啟動定時器之前颠区,主動獲取一次注冊表信息
if (clientConfig.shouldFetchRegistry() && !fetchRegistry(false)) {
// 如果fetchRegistry(false)返回false, 代表所有的Eureka Server都不可用通铲,那么會從備用的服務里面去取數(shù)據(jù)
fetchRegistryFromBackup();
}
// 2. 初始化定時器
initScheduledTasks();
// ...
}
可以看到會在DiscoveryClient初始化時毕莱,且還未啟動心跳續(xù)約,緩存刷新以及實例信息復制等定時任務前
就會立即去Eureka Server主動獲取一次注冊表信息颅夺,若獲取失敗朋截,還會調用fetchRegistryFromBackup()方法從備用服務器獲取
暫不考慮定時刷新注冊表緩存任務的初始化過程,我們直接去看第一次主動獲取注冊表的代碼fetchRegistry(false)
/**
*
* 獲取注冊表信息
*
* @param forceFullRegistryFetch 是否強制全量獲取注冊表信息
*
* @return 如果注冊表被獲取到則返回true吧黄,否則返回false
*/
private boolean fetchRegistry(boolean forceFullRegistryFetch) {
// 1. 啟動獲取注冊表信息跑表
Stopwatch tracer = FETCH_REGISTRY_TIMER.start();
try {
// 2. 獲取本地已有的注冊表信息
Applications applications = getApplications();
// 3. 如果增量獲取被禁用部服,則總是會獲取全量注冊表信息,否則會在首次獲取到注冊表實例信息后(applications.getRegisteredApplications().size() > 0)開始執(zhí)行增量獲取
if (clientConfig.shouldDisableDelta() // 根據(jù)配置決定是否禁用增量獲取
|| (!Strings.isNullOrEmpty(clientConfig.getRegistryRefreshSingleVipAddress()))
|| forceFullRegistryFetch // 根據(jù)參數(shù)forceFullRegistryFetch 決定是否執(zhí)行全量獲取
|| (applications == null) // 總是false
|| (applications.getRegisteredApplications().size() == 0) // 只要本地沒有獲取到一個注冊實例拗慨,就會一致執(zhí)行全量獲取
|| (applications.getVersion() == -1)) //Client application does not have latest library supporting delta
{
logger.info("Disable delta property : {}", clientConfig.shouldDisableDelta());
logger.info("Single vip registry refresh property : {}", clientConfig.getRegistryRefreshSingleVipAddress());
logger.info("Force full registry fetch : {}", forceFullRegistryFetch);
logger.info("Application is null : {}", (applications == null));
logger.info("Registered Applications size is zero : {}",
(applications.getRegisteredApplications().size() == 0));
logger.info("Application version is -1: {}", (applications.getVersion() == -1));
// 3.1. 全量獲取并存儲本地注冊表信息
getAndStoreFullRegistry();
} else {
// 3.2. 增量獲取注冊表信息并更新本地已有注冊表信息
getAndUpdateDelta(applications);
}
applications.setAppsHashCode(applications.getReconcileHashCode());
logTotalInstances();
} catch (Throwable e) {
logger.error(PREFIX + "{} - was unable to refresh its cache! status = {}", appPathIdentifier, e.getMessage(), e);
return false;
} finally {
if (tracer != null) {
tracer.stop();
}
}
// Notify about cache refresh before updating the instance remote status
onCacheRefreshed();
// Update remote status based on refreshed data held in the cache
updateInstanceRemoteStatus();
// registry was fetched successfully, so return true
return true;
}
這里簡單做個分析
當滿足以下任一條件時會進行全量注冊信息獲取
增量獲取被禁用即disableDelta = true
參數(shù)forceFullRegistryFetch = true
-
applications == null廓八,這里總是false奉芦,因為在調用該方法前已經進行了初始化locaRegionApps
localRegionApps.set(new Applications());
當applications.getRegisteredApplications().size() == 0,即表示還未獲取到任何注冊表信息時
applications.getVersion() == -1或clientConfig.getRegistryRefreshSingleVipAddress() 為不為空或null
全量獲取注冊表信息之 getAndStoreFullRegistry();
方法仍然定義在了DiscoveryClient類中
/**
* Gets the full registry information from the eureka server and stores it locally.
* When applying the full registry, the following flow is observed:
*
* if (update generation have not advanced (due to another thread))
* atomically set the registry to the new registry
* fi
*
* @return the full registry information.
* @throws Throwable
* on error.
*/
private void getAndStoreFullRegistry() throws Throwable {
// 1. 當前獲取注冊表年齡(次數(shù))
long currentUpdateGeneration = fetchRegistryGeneration.get();
logger.info("Getting all instance registry info from the eureka server");
Applications apps = null;
// 2. 根據(jù)是否啟用只獲取單個server節(jié)點的注冊表剧蹂,從而調用queryClient的不同方法去獲取注冊表信息
EurekaHttpResponse<Applications> httpResponse = clientConfig.getRegistryRefreshSingleVipAddress() == null
? eurekaTransport.queryClient.getApplications(remoteRegionsRef.get())
: eurekaTransport.queryClient.getVip(clientConfig.getRegistryRefreshSingleVipAddress(), remoteRegionsRef.get());
// 3. 判斷獲取響應狀態(tài)碼是否時成功声功,成功則取出獲取到的注冊表信息即Applications對象
if (httpResponse.getStatusCode() == Status.OK.getStatusCode()) {
apps = httpResponse.getEntity();
}
logger.info("The response status is {}", httpResponse.getStatusCode());
// 如果整個Applications對象為null,則表示server端出現(xiàn)某種問題国夜,打印error級別日志
if (apps == null) {
logger.error("The application is null for some reason. Not storing this information");
} else if (fetchRegistryGeneration.compareAndSet(currentUpdateGeneration, currentUpdateGeneration + 1)) {
// 3.1 CAS操作當前獲取注冊表次數(shù),設置成功短绸,表示沒有其他線程競爭
// 過濾所有UP狀態(tài)的注冊信息且將其利用洗牌算法打亂
// 再替換本地注冊表為處理后的注冊表信息
localRegionApps.set(this.filterAndShuffle(apps));
logger.debug("Got full registry with apps hashcode {}", apps.getAppsHashCode());
} else {
logger.warn("Not updating applications as another thread is updating it already");
}
}
AbstractJerseyEurekaHttpClient.getApplications() 發(fā)起全量獲取注冊表信息請求
// 該方法就是構建了Http Get請求調用server獲取注冊表信息
private EurekaHttpResponse<Applications> getApplicationsInternal(String urlPath, String[] regions) {
ClientResponse response = null;
String regionsParamValue = null;
try {
WebResource webResource = jerseyClient.resource(serviceUrl).path(urlPath);
if (regions != null && regions.length > 0) {
regionsParamValue = StringUtil.join(regions);
webResource = webResource.queryParam("regions", regionsParamValue);
}
Builder requestBuilder = webResource.getRequestBuilder();
addExtraHeaders(requestBuilder);
response = requestBuilder.accept(MediaType.APPLICATION_JSON_TYPE).get(ClientResponse.class);
Applications applications = null;
// 1. 如果響應碼為200且有響應體返回车吹,則將響應體轉為Applications對象
if (response.getStatus() == Status.OK.getStatusCode() && response.hasEntity()) {
applications = response.getEntity(Applications.class);
}
// 返回調用方
return anEurekaHttpResponse(response.getStatus(), Applications.class)
.headers(headersOf(response))
.entity(applications)
.build();
} finally {
if (logger.isDebugEnabled()) {
logger.debug("Jersey HTTP GET {}/{}?{}; statusCode={}",
serviceUrl, urlPath,
regionsParamValue == null ? "" : "regions=" + regionsParamValue,
response == null ? "N/A" : response.getStatus()
);
}
// response關閉掉
if (response != null) {
response.close();
}
}
}
Eureka Server端
ApplicationsResource
@Path("/{version}/apps")
@Produces({"application/xml", "application/json"})
public class ApplicationsResource {
// ...
@GET
public Response getContainers(@PathParam("version") String version,
@HeaderParam(HEADER_ACCEPT) String acceptHeader,
@HeaderParam(HEADER_ACCEPT_ENCODING) String acceptEncoding,
@HeaderParam(EurekaAccept.HTTP_X_EUREKA_ACCEPT) String eurekaAccept,
@Context UriInfo uriInfo,
@Nullable @QueryParam("regions") String regionsStr) {
boolean isRemoteRegionRequested = null != regionsStr && !regionsStr.isEmpty();
String[] regions = null;
if (!isRemoteRegionRequested) {
EurekaMonitors.GET_ALL.increment();
} else {
regions = regionsStr.toLowerCase().split(",");
Arrays.sort(regions); // So we don't have different caches for same regions queried in different order.
EurekaMonitors.GET_ALL_WITH_REMOTE_REGIONS.increment();
}
// Check if the server allows the access to the registry. The server can
// restrict access if it is not
// ready to serve traffic depending on various reasons.
if (!registry.shouldAllowAccess(isRemoteRegionRequested)) {
return Response.status(Status.FORBIDDEN).build();
}
CurrentRequestVersion.set(Version.toEnum(version));
// 1. keyType和returnMediaType默認為JSON
KeyType keyType = Key.KeyType.JSON;
String returnMediaType = MediaType.APPLICATION_JSON;
// 1.1. 若請求參數(shù)acceptHeader 為null或不包含JSON的話,則轉為xml形式的keyType
if (acceptHeader == null || !acceptHeader.contains(HEADER_JSON_VALUE)) {
keyType = Key.KeyType.XML;
returnMediaType = MediaType.APPLICATION_XML;
}
// 2. 生成一個新的key醋闭,用于從緩存中獲取對應的value值窄驹,生成規(guī)則:entityType + entityName + requestType + requestVersion + eurekaAccept
Key cacheKey = new Key(Key.EntityType.Application,
ResponseCacheImpl.ALL_APPS,
keyType, CurrentRequestVersion.get(), EurekaAccept.fromString(eurekaAccept), regions
);
Response response;
// 3. 根據(jù)請求參數(shù)acceptEncoding類型,返回不同類型的編碼流
if (acceptEncoding != null && acceptEncoding.contains(HEADER_GZIP_VALUE)) {
// 3.1. 返回GZIP壓縮后的注冊表信息
response = Response.ok(responseCache.getGZIP(cacheKey))
.header(HEADER_CONTENT_ENCODING, HEADER_GZIP_VALUE)
.header(HEADER_CONTENT_TYPE, returnMediaType)
.build();
} else {
// 3.2. 返回普通的注冊表信息
response = Response.ok(responseCache.get(cacheKey))
.build();
}
return response;
}
}
可見Server端就是根據(jù)請求client的參數(shù)構建了一個cacheKey证逻,并根據(jù)client請求參數(shù)acceptEncoding是否需要壓縮乐埠,而從ResponseCacheImpl中獲取該key對應的value返回client端
ResponseCacheImpl 響應緩存類
Eureka Server針對Client端的注冊表信息獲取請求進行了結果的緩存,并不會每次請求都去注冊表中獲取信息囚企,而是將信息存在了ResponseCache實例中丈咐,當?shù)谝淮握埱筮^來時,請求Key對應的緩存會被生成龙宏,之后再請求棵逊,都會從緩存中直接取,以提高請求性能
至于請求緩存的信息更新银酗,Eureka會利用一個后臺線程去定期更新
緩存Key分類規(guī)則
根據(jù)client端請求的類型辆影,緩存被分為了三種類型:
all applications 全量注冊信息
delta changes 增量信息
individual applications 個體注冊信息
針對每一種類型,緩存信息格式又分為壓縮和非壓縮兩種
壓縮格式的緩存信息尤其對于查詢全量注冊信息時黍特,能提供高效的網(wǎng)絡傳輸性能
對于客戶端接受的媒體類型不同蛙讥,緩存的payload被分為了JSON和XML兩種類型
最后一點是請求的版本號不同,緩存也會不同
緩存相關參數(shù)配置
Eureka Server 針對請求響應ResponseCache灭衷,采用了一個2級緩存:readWrite cache讀寫緩存(存在過期時間) 和 readonly cache只讀緩存(沒有過期時間)
eureka:
server:
use-read-only-response-cache: true # 是否使用只讀緩存作為請求響應的緩存次慢,默認啟用
response-cache-update-interval-ms: 30000 # 用于一級響應緩存多久更新一次,默認30秒
initial-capacity-of-response-cache: 1000 # 用于定義二級響應緩存的容量大小翔曲,默認1000
response-cache-auto-expiration-in-seconds: 180 # 二級響應緩存自動失效時間经备,默認180秒
- 一級二級緩存獲取規(guī)則
當開啟使用一級緩存時,會從一級緩存獲取信息部默,否則直接從二級緩存取
當?shù)谝淮潍@取緩存時(一級緩存為null)侵蒙,會從二級緩存獲取并將信息存入一級緩存(只讀緩存)中
源碼
// 響應緩存類
public class ResponseCacheImpl implements ResponseCache {
// 用于啟用一級緩存,定時更新一級緩存的定時器
private final java.util.Timer timer = new java.util.Timer("Eureka-CacheFillTimer", true);
// 一級緩存(只讀緩存)傅蹂,采用了一個ConcurrentHashMap實現(xiàn)
private final ConcurrentMap<Key, Value> readOnlyCacheMap = new ConcurrentHashMap<Key, Value>();
// 二級緩存(讀寫緩存)纷闺,采用了google的LoadingCache實現(xiàn)
private final LoadingCache<Key, Value> readWriteCacheMap;
// 是否啟用一級緩存作為請求的響應緩存首選算凿,通過配置參數(shù)自定義
private final boolean shouldUseReadOnlyResponseCache;
// 對等實例注冊器,PeerAwareInstanceRegistryImpl
private final AbstractInstanceRegistry registry;
// Eureka Server配置
private final EurekaServerConfig serverConfig;
// 省略其他屬性
...
// 構造方法犁功,會在PeerAwareInstanceRegistryImpl 實例初始化時觸發(fā)調用響應緩存實例的構造
ResponseCacheImpl(EurekaServerConfig serverConfig, ServerCodecs serverCodecs, AbstractInstanceRegistry registry) {
// 省略其他初始化
...
// 1. 是否啟用一級緩存
this.shouldUseReadOnlyResponseCache = serverConfig.shouldUseReadOnlyResponseCache();
// 2. 緩存持有的實例注冊器實例
this.registry = registry;
// 3. 獲取client響應一級緩存刷新時間間隔氓轰,默認30s
long responseCacheUpdateIntervalMs = serverConfig.getResponseCacheUpdateIntervalMs();
// 4. 最重要的一步是初始化這個二級緩存,因為一級緩存都是從二級緩存中獲取數(shù)據(jù)或更新數(shù)據(jù)的
this.readWriteCacheMap = initReadWriteCacheMap();
// 5. 如果啟用了一級緩存浸卦,則啟用定時任務署鸡,周期性的更新二級緩存中的數(shù)據(jù)到一級緩存中
if (shouldUseReadOnlyResponseCache) {
timer.schedule(getCacheUpdateTask(),
new Date(((System.currentTimeMillis() / responseCacheUpdateIntervalMs) * responseCacheUpdateIntervalMs)
+ responseCacheUpdateIntervalMs),
responseCacheUpdateIntervalMs);
}
try {
Monitors.registerObject(this);
} catch (Throwable e) {
logger.warn("Cannot register the JMX monitor for the InstanceRegistry", e);
}
}
// 一級緩存刷新定時任務
// 獲取一級緩存中的所有key,從二級緩存中取出key對應的value限嫌,并比較二級緩存中的value和一級緩存中的值靴庆,不相等則替換掉
private TimerTask getCacheUpdateTask() {
return new TimerTask() {
@Override
public void run() {
logger.debug("Updating the client cache from response cache");
for (Key key : readOnlyCacheMap.keySet()) {
if (logger.isDebugEnabled()) {
logger.debug("Updating the client cache from response cache for key : {} {} {} {}",
key.getEntityType(), key.getName(), key.getVersion(), key.getType());
}
try {
CurrentRequestVersion.set(key.getVersion());
Value cacheValue = readWriteCacheMap.get(key);
Value currentCacheValue = readOnlyCacheMap.get(key);
if (cacheValue != currentCacheValue) {
readOnlyCacheMap.put(key, cacheValue);
}
} catch (Throwable th) {
logger.error("Error while updating the client cache from response cache for key {}", key.toStringCompact(), th);
}
}
}
};
}
這里把最重要的初始化二級緩存的一步單獨拿出來分析
this.readWriteCacheMap =
// 利用google CacheBuilder類構造二級緩存
CacheBuilder.newBuilder()
// 1. 定義二級緩存初始容量,默認1000
.initialCapacity(serverConfig.getInitialCapacityOfResponseCache())
// 2. 定義緩存自動失效時間怒医,即寫后多久自動失效炉抒,默認180秒
.expireAfterWrite(serverConfig.getResponseCacheAutoExpirationInSeconds(), TimeUnit.SECONDS)
// 3. 定義緩存清除監(jiān)聽器,用于解決github上曾經的一個issue https://github.com/Netflix/eureka/issues/118
// 該issue的問題是稚叹,所有包含region的已緩存的的key焰薄,都不會默認在180秒內失效清除
.removalListener(new RemovalListener<Key, Value>() {
@Override
public void onRemoval(RemovalNotification<Key, Value> notification) {
Key removedKey = notification.getKey();
if (removedKey.hasRegions()) {
Key cloneWithNoRegions = removedKey.cloneWithoutRegions();
regionSpecificKeys.remove(cloneWithNoRegions, removedKey);
}
}
})
// 4. 重點是這個CacheLoader 加載器,當調用readWriteCacheMap.get(key)方法扒袖,且該緩存中沒有對應的key時塞茅,會執(zhí)行該load方法,進而調用generatePayload(key)方法
// 生成key對應的value后季率,將該value set進緩存凡桥,再返回該value值,這是guava cache的緩存機制
.build(new CacheLoader<Key, Value>() {
@Override
public Value load(Key key) throws Exception {
if (key.hasRegions()) {
Key cloneWithNoRegions = key.cloneWithoutRegions();
regionSpecificKeys.put(cloneWithNoRegions, key);
}
Value value = generatePayload(key);
return value;
}
});
生成緩存key對應value的方法 generatePayload(key)
// 這里就是根據(jù)key的entityType類型以及key的Name蚀同,調用registry不同的方法以獲取對應的Applications信息
// 再調用getPayLoad方法根據(jù)key的type缅刽,以生成JSON或XML格式的的Applications信息后返回
// 最后封裝成為Value對象返回,注意這里再new Value對象時蠢络,同時內部生成了壓縮版的格式的payLoad 即gzipped參數(shù)
private Value generatePayload(Key key) {
Stopwatch tracer = null;
try {
String payload;
switch (key.getEntityType()) {
case Application:
boolean isRemoteRegionRequested = key.hasRegions();
if (ALL_APPS.equals(key.getName())) {
if (isRemoteRegionRequested) {
tracer = serializeAllAppsWithRemoteRegionTimer.start();
payload = getPayLoad(key, registry.getApplicationsFromMultipleRegions(key.getRegions()));
} else {
tracer = serializeAllAppsTimer.start();
payload = getPayLoad(key, registry.getApplications());
}
} else if (ALL_APPS_DELTA.equals(key.getName())) {
if (isRemoteRegionRequested) {
tracer = serializeDeltaAppsWithRemoteRegionTimer.start();
versionDeltaWithRegions.incrementAndGet();
versionDeltaWithRegionsLegacy.incrementAndGet();
payload = getPayLoad(key,
registry.getApplicationDeltasFromMultipleRegions(key.getRegions()));
} else {
tracer = serializeDeltaAppsTimer.start();
versionDelta.incrementAndGet();
versionDeltaLegacy.incrementAndGet();
payload = getPayLoad(key, registry.getApplicationDeltas());
}
} else {
tracer = serializeOneApptimer.start();
payload = getPayLoad(key, registry.getApplication(key.getName()));
}
break;
case VIP:
case SVIP:
tracer = serializeViptimer.start();
payload = getPayLoad(key, getApplicationsForVip(key, registry));
break;
default:
logger.error("Unidentified entity type: {} found in the cache key.", key.getEntityType());
payload = "";
break;
}
return new Value(payload);
} finally {
if (tracer != null) {
tracer.stop();
}
}
}
緩存失效
一級緩存不存在自動失效期和手動清除
二級緩存存在默認180s自動清除以及當注冊服務下線衰猛,過期,注冊刹孔,狀態(tài)變更啡省,都會來清除里面的數(shù)據(jù)
另外當二級緩存數(shù)據(jù)被清除以后以后,只能依靠定時任務刷新一級緩存里面的數(shù)據(jù)髓霞,也就是說最快也要等默認的30s才能更新一級緩存
- 一級緩存是默認開啟的卦睹,如果不能忍受這30秒的響應緩存變更延遲,可以手動禁止使用一級緩存
至此整個定時刷新流程基本走完方库,server端關于一級/二級緩存的存取流程圖奉上
寫在最后
問幾個小問題:
client在啟動時就會去server拉取一次注冊表信息嗎结序?
client端緩存刷新機制是如何實現(xiàn)的?
server端針對client頻繁獲取注冊表信息請求做了哪些優(yōu)化纵潦?當client出現(xiàn)注冊徐鹤,下線垃环,失效,狀態(tài)變更等情況時返敬,client端會立即獲取到變更后的注冊表信息嗎遂庄?
有沒有可能出現(xiàn)某個client已經下線,但是其他client仍然認為該client服務仍舊在線劲赠?如何優(yōu)化涛目?