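I wrote this article while reading the source myself, so please let me know if anything here is wrong.
When reading the source listings, pay attention to the comments.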
Server cluster
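Eureka Server cluster nodes are abstracted as PeerEurekaNode. As the name suggests, the nodes are peers; there is no master/slave concept. Data synchronization between nodes is near real-time and is handled by the nodes themselves, so when the application service cluster is large, the synchronization load is heavy as well.
PeerEurekaNode encapsulates the replication operations between cluster nodes, including client registration, cancellation, heartbeat and so on (see Figure 1-1 below).
When a client sends a register, cancel, or heartbeat request to a Eureka server, that node replicates the client's action to the other nodes in the cluster. It does this asynchronously and (mostly) in batches through a task executor (batchingDispatcher). Data synchronization within the cluster is therefore incremental. Take a look at the registration code (PeerAwareInstanceRegistryImpl#register).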
// Register
@Override
public void register(final InstanceInfo info, final boolean isReplication) {
int leaseDuration = Lease.DEFAULT_DURATION_IN_SECS;
if (info.getLeaseInfo() != null && info.getLeaseInfo().getDurationInSecs() > 0) {
leaseDuration = info.getLeaseInfo().getDurationInSecs();
}
// Register in the local registry
super.register(info, leaseDuration, isReplication);
// Replicate to the other cluster nodes
replicateToPeers(Action.Register, info.getAppName(), info.getId(), info, null, isReplication);
}
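For example, suppose nodes A and B in the cluster hold inconsistent data. When application X registers for the first time on A, A registers X on B, and A and B become consistent. If X sends a heartbeat to A, A replicates that heartbeat to B; if B does not have X at that point, A initiates a registration of X on B. The other actions work in a similar way.
Follow into the replication code: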
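The heartbeat case from the example can be sketched as follows. This is a minimal illustration, not Eureka's code: `Peer`, `replicateHeartbeat`, and the use of plain status codes 200/404 are assumptions made for the sketch.

```java
import java.util.HashMap;
import java.util.Map;

class HeartbeatReplicationSketch {
    // Hypothetical stand-in for node B; it only remembers which instances it knows.
    static final class Peer {
        private final Map<String, String> instances = new HashMap<>();

        // 200 if the instance is known, 404 otherwise.
        int heartbeat(String id) {
            return instances.containsKey(id) ? 200 : 404;
        }

        void register(String id, String info) {
            instances.put(id, info);
        }

        boolean has(String id) {
            return instances.containsKey(id);
        }
    }

    // Node A replicates a heartbeat to the peer; when the peer does not
    // know the instance, A falls back to registering it there.
    static String replicateHeartbeat(Peer peer, String id, String info) {
        if (peer.heartbeat(id) == 404) {
            peer.register(id, info);
            return "registered";
        }
        return "renewed";
    }

    public static void main(String[] args) {
        Peer b = new Peer();
        System.out.println(replicateHeartbeat(b, "X", "x-info"));
        System.out.println(replicateHeartbeat(b, "X", "x-info"));
    }
}
```

The first call finds X missing on the peer and registers it; later heartbeats for X are plain renewals.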
public void replicateInstanceActionsToPeers(Action action, String appName, String id, InstanceInfo info, InstanceStatus newStatus, PeerEurekaNode node) {
try {
InstanceInfo infoFromRegistry = null;
CurrentRequestVersion.set(Version.V2);
switch (action) {
case Cancel:
node.cancel(appName, id);
break;
case Heartbeat:
InstanceStatus overriddenStatus = overriddenInstanceStatusMap.get(id);
infoFromRegistry = getInstanceByAppAndId(appName, id, false);
node.heartbeat(appName, id, infoFromRegistry, overriddenStatus, false);
break;
case Register:
node.register(info);
break;
case StatusUpdate:
infoFromRegistry = getInstanceByAppAndId(appName, id, false);
node.statusUpdate(appName, id, newStatus, infoFromRegistry);
break;
case DeleteStatusOverride:
infoFromRegistry = getInstanceByAppAndId(appName, id, false);
node.deleteStatusOverride(appName, id, infoFromRegistry);
break;
}
} catch (Throwable t) {
logger.error("Cannot replicate information to {} for action {}", node.getServiceUrl(), action.name(), t);
}
}
Depending on the action, a different PeerEurekaNode method is called. Again, look at register:
public void register(final InstanceInfo info) throws Exception {
long expiryTime = System.currentTimeMillis() + getLeaseRenewalOf(info);
// Hand the task to the dispatcher
batchingDispatcher.process(
taskId("register", info),
new InstanceReplicationTask(targetHost, Action.Register, info, null, true) {
public EurekaHttpResponse<Void> execute() {
return replicationClient.register(info);
}
},
expiryTime
);
}
Here the registration is wrapped into a task and handed to batchingDispatcher, which is a task dispatcher. The PeerEurekaNode constructor shows where it is initialized: TaskDispatchers#createBatchingTaskDispatcher. Follow into it.
public static <ID, T> TaskDispatcher<ID, T> createBatchingTaskDispatcher(String id,
int maxBufferSize,
int workloadSize,
int workerCount,
long maxBatchingDelay,
long congestionRetryDelayMs,
long networkFailureRetryMs,
TaskProcessor<T> taskProcessor) {
// Task acceptor
final AcceptorExecutor<ID, T> acceptorExecutor = new AcceptorExecutor<>(
id, maxBufferSize, workloadSize, maxBatchingDelay, congestionRetryDelayMs, networkFailureRetryMs);
// Task executors, used together with acceptorExecutor
final TaskExecutors<ID, T> taskExecutor = TaskExecutors.batchExecutors(id, workerCount, taskProcessor, acceptorExecutor);
// Task dispatcher
return new TaskDispatcher<ID, T>() {
@Override
public void process(ID id, T task, long expiryTime) {
// Put the task into acceptorExecutor's acceptor queue
acceptorExecutor.process(id, task, expiryTime);
}
@Override
public void shutdown() {
// Stop the acceptor thread
acceptorExecutor.shutdown();
// Stop the worker threads
taskExecutor.shutdown();
}
};
}
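The two key pieces are AcceptorExecutor and TaskExecutors.
AcceptorExecutor has an acceptor thread that receives the submitted tasks and then hands them to worker threads (provided by TaskExecutors) for processing.
The part below is fairly detailed. Read it if you are interested; it does not add much to the overall picture, but there are some techniques worth learning.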
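First, the key fields of AcceptorExecutor:
- acceptorQueue: acceptor queue, receives incoming tasks
- reprocessQueue: retry queue, failed tasks go here
- pendingTasks: a map whose key is the task id and whose value is the task, which makes de-duplication easy
- processingOrder: processing order, holds task ids
- singleItemWorkQueue: work queue for single items
- batchWorkQueue: work queue for batches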
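How pendingTasks and processingOrder work together for de-duplication can be sketched like this. It is a simplified illustration under my own assumptions: tasks are plain strings, and `append`/`poll` are hypothetical method names.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

class PendingTasksSketch {
    // task id -> latest task for that id
    private final Map<String, String> pendingTasks = new HashMap<>();
    // task ids in arrival order; each id appears at most once
    private final Deque<String> processingOrder = new ArrayDeque<>();

    // A newer task with the same id overwrites the older one in the map.
    // The id is queued only the first time, so it keeps its position.
    void append(String id, String task) {
        if (pendingTasks.put(id, task) == null) {
            processingOrder.add(id);
        }
    }

    // Returns the next task in order, or null when nothing is pending.
    String poll() {
        String id = processingOrder.poll();
        return id == null ? null : pendingTasks.remove(id);
    }

    int size() {
        return processingOrder.size();
    }

    public static void main(String[] args) {
        PendingTasksSketch s = new PendingTasksSketch();
        s.append("X#heartbeat", "v1");
        s.append("X#heartbeat", "v2");
        System.out.println(s.size() + " pending, next = " + s.poll());
    }
}
```

Because repeated heartbeats for the same instance share a task id, only the latest one is ever sent.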
Look at the run method of AcceptorExecutor.AcceptorRunner:
@Override
public void run() {
long scheduleTime = 0;
while (!isShutdown.get()) {
try {
// Drain all tasks from the acceptor queue and the retry queue into the pending set
drainInputQueues();
int totalItems = processingOrder.size();
long now = System.currentTimeMillis();
if (scheduleTime < now) {
scheduleTime = now + trafficShaper.transmissionDelay();
}
if (scheduleTime <= now) {
// Move tasks from pendingTasks into the two work queues on demand
assignBatchWork();
assignSingleItemWork();
}
// If no worker is requesting data or there is a delay injected by the traffic shaper,
// sleep for some time to avoid tight loop.
if (totalItems == processingOrder.size()) {
Thread.sleep(10);
}
} catch (InterruptedException ex) {
// Ignore
} catch (Throwable e) {
// Safe-guard, so we never exit this loop in an uncontrolled way.
logger.warn("Discovery AcceptorThread error", e);
}
}
}
Follow into drainInputQueues():
private void drainInputQueues() throws InterruptedException {
do {
// Drain the retry queue and the acceptor queue; their tasks go into the pending task map pendingTasks
drainReprocessQueue();
drainAcceptorQueue();
if (!isShutdown.get()) {
// Everything is empty: block briefly on the acceptor queue. This helps reach the loop exit condition without a tight loop.
if (reprocessQueue.isEmpty() && acceptorQueue.isEmpty() && pendingTasks.isEmpty()) {
TaskHolder<ID, T> taskHolder = acceptorQueue.poll(10, TimeUnit.MILLISECONDS);
if (taskHolder != null) {
appendTaskHolder(taskHolder);
}
}
}
} while (!reprocessQueue.isEmpty() || !acceptorQueue.isEmpty() || pendingTasks.isEmpty()); // Loop until both input queues are drained and at least one task is pending
}
Follow into assignBatchWork():
void assignBatchWork() {
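// Is there work to hand out?
// 1. If the processing order is empty, do nothing
// 2. If the number of pending tasks has reached the maximum, hand out immediately
// 3. If the batching delay has been exceeded, hand out immediately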
if (hasEnoughTasksForNextBatch()) {
// Acquire the semaphore. Consumer threads release it, so work is assigned on demand.
if (batchWorkRequests.tryAcquire(1)) {
long now = System.currentTimeMillis();
int len = Math.min(maxBatchingSize, processingOrder.size());
// Small detail: presize the list to avoid resizing
List<TaskHolder<ID, T>> holders = new ArrayList<>(len);
while (holders.size() < len && !processingOrder.isEmpty()) {
ID id = processingOrder.poll();
TaskHolder<ID, T> holder = pendingTasks.remove(id);
if (holder.getExpiryTime() > now) {
// Not expired
holders.add(holder);
} else {
expiredTasks++;
}
}
if (holders.isEmpty()) {
// No task was taken, so give the permit back
batchWorkRequests.release();
} else {
batchSizeMetric.record(holders.size(), TimeUnit.MILLISECONDS);
// Add to the batch work queue
batchWorkQueue.add(holders);
}
}
}
}
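The on-demand handoff through a semaphore is the interesting trick in assignBatchWork(). A stripped-down sketch, assuming string tasks and hypothetical method names (`requestWork`, `assignBatch`, `pollBatch`), with no expiry handling:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Semaphore;

class DemandDrivenHandoffSketch {
    // Starts with zero permits: nothing is handed out until a worker asks.
    private final Semaphore workRequests = new Semaphore(0);
    private final Queue<List<String>> batchWorkQueue = new ConcurrentLinkedQueue<>();
    // Only touched by the single producer (acceptor) thread in this sketch.
    private final List<String> pending = new ArrayList<>();

    void submit(String task) {
        pending.add(task);
    }

    // Worker side: announce demand by releasing one permit.
    void requestWork() {
        workRequests.release();
    }

    // Producer side: build a batch only if there is work and a worker asked for it.
    boolean assignBatch(int maxBatchSize) {
        if (pending.isEmpty() || !workRequests.tryAcquire()) {
            return false;
        }
        int len = Math.min(maxBatchSize, pending.size());
        List<String> head = pending.subList(0, len);
        batchWorkQueue.add(new ArrayList<>(head)); // presized copy of the batch
        head.clear();                              // removes the batch from pending
        return true;
    }

    // Worker side: returns a batch, or null if none has been assigned yet.
    List<String> pollBatch() {
        return batchWorkQueue.poll();
    }

    public static void main(String[] args) {
        DemandDrivenHandoffSketch s = new DemandDrivenHandoffSketch();
        s.submit("a");
        s.submit("b");
        s.submit("c");
        System.out.println("assigned without demand: " + s.assignBatch(2));
        s.requestWork();
        System.out.println("assigned with demand: " + s.assignBatch(2));
        System.out.println("batch: " + s.pollBatch());
    }
}
```

Tasks stay in the pending set, where they can still be overwritten or expire, until a worker is actually ready to take them.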
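Then a set of worker threads in TaskExecutors keeps taking tasks from the work queues of AcceptorExecutor and processing them (which amounts to calling the batch endpoint: com.netflix.eureka.resources.PeerReplicationResource#batchReplication).
Let's see what a TaskExecutors worker thread does; look at TaskExecutors.BatchWorkerRunnable: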
@Override
public void run() {
try {
while (!isShutdown.get()) {
// Take work from the AcceptorExecutor work queue.
// This releases one semaphore permit, then polls the queue in a loop for a batch.
List<TaskHolder<ID, T>> holders = getWork();
metrics.registerExpiryTimes(holders);
List<T> tasks = getTasksOf(holders);
// Call the batch endpoint of the peer node
ProcessingResult result = processor.process(tasks);
switch (result) {
case Success:
break;
// 503 returned: the peer is busy, retry later
case Congestion:
// Network error, retry later
case TransientError:
// Put the tasks into the retry queue
taskDispatcher.reprocess(holders, result);
break;
// Other, non-network errors: no retry
case PermanentError:
logger.warn("Discarding {} tasks of {} due to permanent error", holders.size(), workerName);
}
metrics.registerTaskResult(result, tasks.size());
}
} catch (InterruptedException e) {
// Ignore
} catch (Throwable e) {
// Safe-guard, so we never exit this loop in an uncontrolled way.
logger.warn("Discovery WorkerThread error", e);
}
}
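The result handling in the worker loop boils down to "retry what may succeed later, drop the rest". A small sketch of that rule, with my own enum constants and a string-based retry queue standing in for the real types:

```java
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Deque;

class RetryOnResultSketch {
    // Simplified outcome of one batch call; names are chosen for this sketch.
    enum Result { SUCCESS, CONGESTION, TRANSIENT_ERROR, PERMANENT_ERROR }

    private final Deque<String> reprocessQueue = new ArrayDeque<>();
    private int discarded;

    void handle(Result result, String... batch) {
        switch (result) {
            case SUCCESS:
                break;
            case CONGESTION:       // peer is busy, try again later
            case TRANSIENT_ERROR:  // network problem, try again later
                reprocessQueue.addAll(Arrays.asList(batch));
                break;
            case PERMANENT_ERROR:  // retrying would not help
                discarded += batch.length;
                break;
        }
    }

    int retryCount() {
        return reprocessQueue.size();
    }

    int discardedCount() {
        return discarded;
    }

    public static void main(String[] args) {
        RetryOnResultSketch s = new RetryOnResultSketch();
        s.handle(Result.CONGESTION, "t1", "t2");
        s.handle(Result.PERMANENT_ERROR, "t3");
        System.out.println("to retry: " + s.retryCount() + ", discarded: " + s.discardedCount());
    }
}
```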
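That is about it for how the cluster nodes cooperate. Next, data storage.
Data storage
Eureka keeps its data in memory. The registry is abstracted as AbstractInstanceRegistry. Application instance data lives in the registry field, whose type is ConcurrentHashMap<String, Map<String, Lease<InstanceInfo>>>. The outer key is appName and the inner key is instanceId. Together the two keys uniquely identify the lease of an application instance, and lookups are efficient. The value is Lease<InstanceInfo>. Lease is a key concept; its purpose is analyzed later.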
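The two-level map can be sketched as below. The lease is reduced to a string, and I use computeIfAbsent for brevity, whereas the Eureka code shown in this article uses putIfAbsent; `put` and `get` are hypothetical helper names.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class RegistryMapSketch {
    // appName -> (instanceId -> lease); the lease is a plain string here
    private final ConcurrentHashMap<String, Map<String, String>> registry =
            new ConcurrentHashMap<>();

    void put(String appName, String instanceId, String lease) {
        // Create the inner map on first use, atomically per appName.
        registry.computeIfAbsent(appName, k -> new ConcurrentHashMap<>())
                .put(instanceId, lease);
    }

    // Two hash lookups identify exactly one lease; null if unknown.
    String get(String appName, String instanceId) {
        Map<String, String> leases = registry.get(appName);
        return leases == null ? null : leases.get(instanceId);
    }

    public static void main(String[] args) {
        RegistryMapSketch r = new RegistryMapSketch();
        r.put("ORDER-SERVICE", "host-1:8080", "lease-1");
        System.out.println(r.get("ORDER-SERVICE", "host-1:8080"));
    }
}
```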
First look at the registration code. It looks long at first glance, but don't panic, just read through it.
public void register(InstanceInfo registrant, int leaseDuration, boolean isReplication) {
try {
read.lock();
Map<String, Lease<InstanceInfo>> gMap = registry.get(registrant.getAppName());
REGISTER.increment(isReplication);
// Initialize
if (gMap == null) {
final ConcurrentHashMap<String, Lease<InstanceInfo>> gNewMap = new ConcurrentHashMap<String, Lease<InstanceInfo>>();
gMap = registry.putIfAbsent(registrant.getAppName(), gNewMap);
if (gMap == null) {
// No other thread put a map first, so use the new one
gMap = gNewMap;
}
}
// Check whether a lease for this instance already exists
Lease<InstanceInfo> existingLease = gMap.get(registrant.getId());
// It exists
if (existingLease != null && (existingLease.getHolder() != null)) {
Long existingLastDirtyTimestamp = existingLease.getHolder().getLastDirtyTimestamp();
Long registrationLastDirtyTimestamp = registrant.getLastDirtyTimestamp();
logger.debug("Existing lease found (existing={}, provided={}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp);
// The existing entry is the newer version
if (existingLastDirtyTimestamp > registrationLastDirtyTimestamp) {
// Logging code omitted here
// Use the local instanceInfo
registrant = existingLease.getHolder();
}
} else {
// The lease does not exist and hence it is a new registration
synchronized (lock) {
if (this.expectedNumberOfRenewsPerMin > 0) {
// Expected renewals per minute. A new client has registered, so add 2 (one renewal every 30s, i.e. 2 per minute)
this.expectedNumberOfRenewsPerMin = this.expectedNumberOfRenewsPerMin + 2;
// Renewals-per-minute threshold: the expected number multiplied by a percentage factor
this.numberOfRenewsPerMinThreshold =
(int) (this.expectedNumberOfRenewsPerMin * serverConfig.getRenewalPercentThreshold());
}
}
}
// Wrap the instance in a lease
Lease<InstanceInfo> lease = new Lease<InstanceInfo>(registrant, leaseDuration);
if (existingLease != null) {
lease.setServiceUpTimestamp(existingLease.getServiceUpTimestamp());
}
// Store in the registry data structure
gMap.put(registrant.getId(), lease);
// For statistics and debugging; can be ignored
synchronized (recentRegisteredQueue) {
recentRegisteredQueue.add(new Pair<Long, String>(
System.currentTimeMillis(),
registrant.getAppName() + "(" + registrant.getId() + ")"));
}
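// Overridden status set from outside, e.g. manually putting a service in or out of service. The value is cached, so it can still be read from the cache even if the client re-registers.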
if (!InstanceStatus.UNKNOWN.equals(registrant.getOverriddenStatus())) {
logger.debug("Found overridden status {} for instance {}. Checking to see if needs to be add to the "
+ "overrides", registrant.getOverriddenStatus(), registrant.getId());
if (!overriddenInstanceStatusMap.containsKey(registrant.getId())) {
logger.info("Not found overridden id {} and hence adding it", registrant.getId());
overriddenInstanceStatusMap.put(registrant.getId(), registrant.getOverriddenStatus());
}
}
InstanceStatus overriddenStatusFromMap = overriddenInstanceStatusMap.get(registrant.getId());
if (overriddenStatusFromMap != null) {
logger.info("Storing overridden status {} from map", overriddenStatusFromMap);
registrant.setOverriddenStatus(overriddenStatusFromMap);
}
// Let overriddenStatus override status
InstanceStatus overriddenInstanceStatus = getOverriddenInstanceStatus(registrant, existingLease, isReplication);
registrant.setStatusWithoutDirty(overriddenInstanceStatus);
// If the lease is registered with UP status, set lease service up timestamp
if (InstanceStatus.UP.equals(registrant.getStatus())) {
lease.serviceUp();
}
registrant.setActionType(ActionType.ADDED);
recentlyChangedQueue.add(new RecentlyChangedItem(lease));
registrant.setLastUpdatedTimestamp();
// Invalidate the cache
invalidateCache(registrant.getAppName(), registrant.getVIPAddress(), registrant.getSecureVipAddress());
logger.info("Registered instance {}/{} with status {} (replication={})",
registrant.getAppName(), registrant.getId(), registrant.getStatus(), isReplication);
} finally {
read.unlock();
}
}
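The renewal threshold arithmetic in the synchronized block above is easy to check by hand. A tiny sketch follows; the 0.85 factor is an assumed example value for getRenewalPercentThreshold(), and the method names are my own.

```java
class RenewalThresholdSketch {
    // Each new instance is expected to renew twice per minute (once every 30s).
    static int afterNewRegistration(int expectedRenewsPerMin) {
        return expectedRenewsPerMin + 2;
    }

    // Same formula as in register(): expected count times a percentage, truncated to int.
    static int threshold(int expectedRenewsPerMin, double renewalPercentThreshold) {
        return (int) (expectedRenewsPerMin * renewalPercentThreshold);
    }

    public static void main(String[] args) {
        int expected = 20;       // e.g. 10 instances already registered
        double percent = 0.85;   // assumed example value
        System.out.println("before: " + threshold(expected, percent));
        expected = afterNewRegistration(expected);
        System.out.println("after:  " + threshold(expected, percent));
    }
}
```

With 10 instances the expected count is 20 and the threshold is 17; one more registration raises the expected count to 22 and the threshold to 18, since 18.7 is truncated.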
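An instance's lease is maintained by a scheduled task together with the client's renewals. Client renewals are also replicated within the cluster, which keeps the instance from expiring so that it stays active. If the lease expires because the client failed to renew for some reason, the task takes the expired instance out of service. See the source of EvictionTask and AbstractInstanceRegistry#evict; I will not go into detail here.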
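The interplay of renewal and eviction can be sketched as follows. This is a deliberately reduced model, not Eureka's Lease or evict(): time is passed in explicitly so the behaviour is deterministic, and everything else the real eviction considers is left out.

```java
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class LeaseEvictionSketch {
    // Hypothetical minimal lease: last renewal time plus a fixed duration.
    static final class Lease {
        private final long durationMs;
        private volatile long lastRenewalMs;

        Lease(long nowMs, long durationMs) {
            this.lastRenewalMs = nowMs;
            this.durationMs = durationMs;
        }

        void renew(long nowMs) {
            lastRenewalMs = nowMs;
        }

        boolean isExpired(long nowMs) {
            return nowMs > lastRenewalMs + durationMs;
        }
    }

    private final Map<String, Lease> leases = new ConcurrentHashMap<>();

    void register(String id, long nowMs, long durationMs) {
        leases.put(id, new Lease(nowMs, durationMs));
    }

    // Returns false when the instance is unknown, like a failed renewal.
    boolean renew(String id, long nowMs) {
        Lease lease = leases.get(id);
        if (lease == null) {
            return false;
        }
        lease.renew(nowMs);
        return true;
    }

    // What a periodic eviction task would do: drop every expired lease.
    int evict(long nowMs) {
        int evicted = 0;
        Iterator<Map.Entry<String, Lease>> it = leases.entrySet().iterator();
        while (it.hasNext()) {
            if (it.next().getValue().isExpired(nowMs)) {
                it.remove();
                evicted++;
            }
        }
        return evicted;
    }

    boolean contains(String id) {
        return leases.containsKey(id);
    }

    public static void main(String[] args) {
        LeaseEvictionSketch r = new LeaseEvictionSketch();
        r.register("A", 0, 90_000);
        r.register("B", 0, 90_000);
        r.renew("A", 60_000);
        System.out.println("evicted: " + r.evict(100_000));
    }
}
```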
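One more field deserves attention: ResponseCache. This cache is used by the RESTful endpoints that Eureka exposes and, as the name implies, it caches endpoint responses. Eureka responses come in JSON and XML, and some endpoints return large payloads that need compression. This cache layer saves the cost of serialization, compression, and querying large amounts of data.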
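The idea behind such a cache can be sketched in a few lines. This is not how ResponseCache is implemented; the key format, the `loader` function, and the method names are assumptions chosen to show "build the payload once per format, drop it when the data changes".

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

class ResponseCacheSketch {
    private final Map<String, String> cache = new ConcurrentHashMap<>();
    private final Function<String, String> loader; // stands in for serialization + compression
    private final AtomicInteger loads = new AtomicInteger();

    ResponseCacheSketch(Function<String, String> loader) {
        this.loader = loader;
    }

    // One cached payload per (entity, format) pair, e.g. ("apps", "json").
    String get(String entity, String format) {
        String key = entity + "|" + format;
        return cache.computeIfAbsent(key, k -> {
            loads.incrementAndGet();
            return loader.apply(k);
        });
    }

    // Called when the registry changes: drop every format of that entity.
    void invalidate(String entity) {
        cache.keySet().removeIf(k -> k.startsWith(entity + "|"));
    }

    int loadCount() {
        return loads.get();
    }

    public static void main(String[] args) {
        ResponseCacheSketch c = new ResponseCacheSketch(k -> "payload for " + k);
        c.get("apps", "json");
        c.get("apps", "json");
        System.out.println("loads: " + c.loadCount());
    }
}
```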
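RESTful API
From these APIs you can also infer some of the interaction logic between client and server and between servers, and from that get a rough idea of what the client looks like. The code is under com.netflix.eureka.resources. It uses the Jersey framework.
Here is a summary of the commonly used APIs.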
- /{version}/apps GET: get all apps
- /{version}/apps/delta GET: get the application data delta
- /{version}/apps/{appId} GET: get the given app
- /{version}/apps/{appId}/{id} GET: get the given instance
- /{version}/apps/{appId} POST: register an instance
- /{version}/apps/{appId}/{id} PUT: renew an instance lease
- /{version}/apps/{appId}/{id}/status PUT: update the status
- /{version}/apps/{appId}/{id}/status DELETE: delete the status
- /{version}/apps/{appId}/{id}/metadata PUT: update the metadata
- /{version}/apps/{appId}/{id} DELETE: cancel the lease
- /{version}/peerreplication/batch POST: cluster data replication endpoint
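To make the list concrete, here is how a renewal request against the PUT /{version}/apps/{appId}/{id} endpoint could be built with the JDK HTTP client (Java 11+). The base URL, app name, and instance id are made-up values, and the ids are assumed to be URL-safe; the status and lastDirtyTimestamp query parameters are the ones the renewal endpoint reads.

```java
import java.net.URI;
import java.net.http.HttpRequest;

class RenewRequestSketch {
    // Builds (but does not send) a lease renewal request.
    static HttpRequest renew(String baseUrl, String appId, String id,
                             String status, long lastDirtyTimestamp) {
        URI uri = URI.create(baseUrl + "/apps/" + appId + "/" + id
                + "?status=" + status
                + "&lastDirtyTimestamp=" + lastDirtyTimestamp);
        return HttpRequest.newBuilder(uri)
                .PUT(HttpRequest.BodyPublishers.noBody())
                .build();
    }

    public static void main(String[] args) {
        // Hypothetical server address and ids, for illustration only.
        HttpRequest request = renew("http://localhost:8761/eureka/v2",
                "MY-APP", "host-1", "UP", 1700000000000L);
        System.out.println(request.method() + " " + request.uri());
    }
}
```

Sending it would take an HttpClient and a running server; as the renewal endpoint further down shows, a 404 reply tells the caller to register again.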
Pick two endpoints to get a feel for them
Look at the renewal endpoint and the delta endpoint (the ones clients use most).
First, renewal:
@PUT
public Response renewLease(
@HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication,
@QueryParam("overriddenstatus") String overriddenStatus,
@QueryParam("status") String status,
@QueryParam("lastDirtyTimestamp") String lastDirtyTimestamp) {
// Whether the request comes from another node in the cluster
boolean isFromReplicaNode = "true".equals(isReplication);
// Whether the renewal in the registry succeeded
boolean isSuccess = registry.renew(app.getName(), id, isFromReplicaNode);
// Not found in the registry, immediately ask for a register
if (!isSuccess) {
// Renewal failed: the registry does not have this instance
logger.warn("Not Found (Renew): {} - {}", app.getName(), id);
return Response.status(Status.NOT_FOUND).build();
}
Response response = null;
if (lastDirtyTimestamp != null && serverConfig.shouldSyncWhenTimestampDiffers()) {
// Worth following into this call
response = this.validateDirtyTimestamp(Long.valueOf(lastDirtyTimestamp), isFromReplicaNode);
// Store the overridden status since the validation found out the node that replicates wins
if (response.getStatus() == Response.Status.NOT_FOUND.getStatusCode()
&& (overriddenStatus != null)
&& !(InstanceStatus.UNKNOWN.name().equals(overriddenStatus))
&& isFromReplicaNode) {
registry.storeOverriddenStatusIfRequired(app.getAppName(), id, InstanceStatus.valueOf(overriddenStatus));
}
} else {
response = Response.ok().build();
}
logger.debug("Found (Renew): {} - {}; reply status={}" + app.getName(), id, response.getStatus());
return response;
}
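InstanceInfo has a concept called the dirty timestamp, held in the member variable lastDirtyTimestamp. This concept is very important: it is the timestamp of the most recent update and can be thought of as something like a version number. Follow into this.validateDirtyTimestamp: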
private Response validateDirtyTimestamp(Long lastDirtyTimestamp,
boolean isReplication) {
InstanceInfo appInfo = registry.getInstanceByAppAndId(app.getName(), id, false);
if (appInfo != null) {
if ((lastDirtyTimestamp != null) && (!lastDirtyTimestamp.equals(appInfo.getLastDirtyTimestamp()))) {
Object[] args = {id, appInfo.getLastDirtyTimestamp(), lastDirtyTimestamp, isReplication};
// If the lastDirtyTimestamp sent with the renewal is newer than the one in this registry,
// the lease in this registry is stale and a new registration should come in, so return 404.
// In other words, the instanceInfo in this registry lags behind the client's.
if (lastDirtyTimestamp > appInfo.getLastDirtyTimestamp()) {
logger.debug(
"Time to sync, since the last dirty timestamp differs -"
+ " ReplicationInstance id : {},Registry : {} Incoming: {} Replication: {}",
args);
return Response.status(Status.NOT_FOUND).build();
}
// The registry's instanceInfo is newer than the client's
else if (appInfo.getLastDirtyTimestamp() > lastDirtyTimestamp) {
// For replication between cluster nodes, return the current instanceInfo so the replicating node can sync the latest data.
// Read this together with handleFailure of the replication task in PeerEurekaNode#heartbeat.
if (isReplication) {
logger.debug(
"Time to sync, since the last dirty timestamp differs -"
+ " ReplicationInstance id : {},Registry : {} Incoming: {} Replication: {}",
args);
return Response.status(Status.CONFLICT).entity(appInfo).build();
} else {
// For a client request, still allow the renewal
return Response.ok().build();
}
}
}
}
return Response.ok().build();
}
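Stripped of logging and response building, the decision in validateDirtyTimestamp is a three-way comparison. A compact sketch that returns plain status codes (the method and parameter names are my own):

```java
class DirtyTimestampSketch {
    // incoming:   lastDirtyTimestamp sent with the renewal
    // inRegistry: lastDirtyTimestamp of the instance held by this node
    static int validate(long incoming, long inRegistry, boolean isReplication) {
        if (incoming > inRegistry) {
            return 404; // this node is stale: ask the caller to register again
        }
        if (inRegistry > incoming && isReplication) {
            return 409; // the replicating peer is stale: reply with our copy
        }
        return 200;     // equal timestamps, or an older plain client: just renew
    }

    public static void main(String[] args) {
        System.out.println(validate(200, 100, false));
        System.out.println(validate(100, 200, true));
        System.out.println(validate(100, 200, false));
    }
}
```

Whichever side holds the older timestamp is the one that gets corrected: 404 makes the caller re-register here, and 409 sends this node's newer copy back to the replicating peer.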
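Now the delta endpoint. Clients rely on it to maintain their local service list.
The idea is worth borrowing: when synchronizing large amounts of data, incremental sync reduces bandwidth use and CPU load.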
// Endpoint source omitted here, since everything is served from the cache anyway; see responseCache if you are interested
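On the client side, applying a delta amounts to replaying a list of changes onto the local copy. A minimal sketch, assuming each change carries an action type, an instance id, and a status; the `Change` class and the string-valued local view are simplifications of mine, not the client's real data model.

```java
import java.util.HashMap;
import java.util.Map;

class DeltaApplySketch {
    enum ActionType { ADDED, MODIFIED, DELETED }

    // Hypothetical delta entry: what happened to which instance.
    static final class Change {
        final ActionType action;
        final String instanceId;
        final String status;

        Change(ActionType action, String instanceId, String status) {
            this.action = action;
            this.instanceId = instanceId;
            this.status = status;
        }
    }

    // Local view of the registry: instanceId -> status.
    private final Map<String, String> local = new HashMap<>();

    // Replay the changes in order instead of refetching everything.
    void apply(Change... delta) {
        for (Change c : delta) {
            switch (c.action) {
                case ADDED:
                case MODIFIED:
                    local.put(c.instanceId, c.status);
                    break;
                case DELETED:
                    local.remove(c.instanceId);
                    break;
            }
        }
    }

    String statusOf(String instanceId) {
        return local.get(instanceId);
    }

    int size() {
        return local.size();
    }

    public static void main(String[] args) {
        DeltaApplySketch client = new DeltaApplySketch();
        client.apply(new Change(ActionType.ADDED, "host-1", "UP"),
                     new Change(ActionType.MODIFIED, "host-1", "DOWN"));
        System.out.println("host-1 is " + client.statusOf("host-1"));
    }
}
```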
End. If anything here is wrong, please get in touch with me.
If you found this helpful, please give it a like; it costs nothing = =.