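Eureka server supports cluster deployment. This article walks through the source code to see how cluster node discovery and data synchronization are implemented.
Package that provides the cluster functionality: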
com.netflix.eureka.cluster
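Cluster node discovery and dynamic node updates
Eureka server wraps cluster node management in a class named PeerEurekaNodes. The name translates to "a collection of peer Eureka nodes", which shows that the class is an abstraction over the nodes of the Eureka server cluster. The source below shows how Eureka manages and discovers node information.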
/**
* Collection class for Eureka server cluster nodes; helps manage and maintain the cluster nodes.
* Helper class to manage lifecycle of a collection of {@link PeerEurekaNode}s.
*
* @author Tomasz Bak
*/
@Singleton
public class PeerEurekaNodes {
/**
* Collection of cluster nodes
*/
private volatile List<PeerEurekaNode> peerEurekaNodes = Collections.emptyList();
/**
* Collection of cluster node URLs
*/
private volatile Set<String> peerEurekaNodeUrls = Collections.emptySet();
/**
* Thread pool for scheduled tasks
*/
private ScheduledExecutorService taskExecutor;
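The fields of PeerEurekaNodes show that it holds two collections and a thread pool for scheduled tasks (other configuration fields are omitted here):
- peerEurekaNodes: the collection of cluster nodes
- peerEurekaNodeUrls: the URLs of those cluster nodes
- taskExecutor: the thread pool that runs scheduled tasks
PeerEurekaNodes also provides start and shutdown methods. The implementation of start is examined next.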
/**
* Start method; it manages the communication between cluster nodes
*/
public void start() {
// Initialize the thread pool for scheduled tasks
taskExecutor = Executors.newSingleThreadScheduledExecutor(
new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r, "Eureka-PeerNodesUpdater");
thread.setDaemon(true);
return thread;
}
}
);
try {
updatePeerEurekaNodes(resolvePeerUrls());
// Task that updates the peer nodes
Runnable peersUpdateTask = new Runnable() {
@Override
public void run() {
try {
updatePeerEurekaNodes(resolvePeerUrls());
} catch (Throwable e) {
logger.error("Cannot update the replica Nodes", e);
}
}
};
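// Creates and executes a periodic action that first runs after the given initial delay, and then
// with the given delay between the end of one execution and the start of the next.
// If any execution of the task throws an exception, subsequent executions are suppressed. Otherwise,
// the task only stops through cancellation or termination of the executor.
// Parameters:
// command - the task to execute
// initialDelay - the delay before the first execution
// delay - the delay between the end of one execution and the start of the next
// unit - the time unit of the initialDelay and delay parameters
// Schedule the node update task to run periodically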
taskExecutor.scheduleWithFixedDelay(
peersUpdateTask,
serverConfig.getPeerEurekaNodesUpdateIntervalMs(),
serverConfig.getPeerEurekaNodesUpdateIntervalMs(),
TimeUnit.MILLISECONDS
);
} catch (Exception e) {
throw new IllegalStateException(e);
}
for (PeerEurekaNode node : peerEurekaNodes) {
logger.info("Replica node URL: " + node.getServiceUrl());
}
}
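The start method does the following:
- Initializes the scheduled-task thread pool
- Performs the first cluster node update through updatePeerEurekaNodes
- Creates the task that updates the cluster nodes
- Schedules that task on the thread pool so that it runs periodically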
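The steps above can be sketched in isolation. The following is a minimal sketch, not Eureka code: the class name PeriodicRefreshSketch, the method runRefreshes, and the simulated failure are all hypothetical. It shows why the task body catches Throwable: with scheduleWithFixedDelay, an exception that escapes one run would suppress every later run.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class PeriodicRefreshSketch {

    /**
     * Schedules a refresh task with a fixed delay, waits until it has run
     * wantedRuns times, and returns the number of runs observed (or -1 on timeout).
     */
    public static int runRefreshes(int wantedRuns, long intervalMs) {
        // Single daemon thread, similar in spirit to the executor built in start()
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(r -> {
            Thread thread = new Thread(r, "Sketch-PeerNodesUpdater");
            thread.setDaemon(true); // a daemon thread does not keep the JVM alive
            return thread;
        });
        AtomicInteger runs = new AtomicInteger();
        CountDownLatch done = new CountDownLatch(wantedRuns);

        Runnable refresh = () -> {
            try {
                // Hypothetical failure on the first run, standing in for a failed URL lookup
                if (runs.incrementAndGet() == 1) {
                    throw new IllegalStateException("simulated resolve failure");
                }
            } catch (Throwable e) {
                // Swallowed on purpose: an escaping exception would cancel all later runs
            } finally {
                done.countDown();
            }
        };

        executor.scheduleWithFixedDelay(refresh, intervalMs, intervalMs, TimeUnit.MILLISECONDS);
        boolean finished;
        try {
            finished = done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            finished = false;
        } finally {
            executor.shutdownNow();
        }
        return finished ? runs.get() : -1;
    }
}
```

Calling PeriodicRefreshSketch.runRefreshes(3, 10L) should report at least three runs even though the first one fails, because the failure never leaves the task body.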
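As start shows, Eureka uses a scheduled thread to refresh the cluster node information periodically, which is how it discovers and tracks cluster nodes dynamically. The update itself is done by the updatePeerEurekaNodes method, whose implementation follows.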
protected void updatePeerEurekaNodes(List<String> newPeerUrls) {
if (newPeerUrls.isEmpty()) {
logger.warn("The replica size seems to be empty. Check the route 53 DNS Registry");
return;
}
Set<String> toShutdown = new HashSet<>(peerEurekaNodeUrls);
toShutdown.removeAll(newPeerUrls);
Set<String> toAdd = new HashSet<>(newPeerUrls);
toAdd.removeAll(peerEurekaNodeUrls);
// Compare the new URL set with the old one; if they are identical, no update is needed, so return
if (toShutdown.isEmpty() && toAdd.isEmpty()) { // No change
return;
}
// Remove peers no long available
List<PeerEurekaNode> newNodeList = new ArrayList<>(peerEurekaNodes);
// Remove the nodes from the old collection that are no longer available
if (!toShutdown.isEmpty()) {
logger.info("Removing no longer available peer nodes {}", toShutdown);
int i = 0;
while (i < newNodeList.size()) {
PeerEurekaNode eurekaNode = newNodeList.get(i);
if (toShutdown.contains(eurekaNode.getServiceUrl())) {
newNodeList.remove(i);
eurekaNode.shutDown();
} else {
i++;
}
}
}
// Add the newly added nodes
// Add new peers
if (!toAdd.isEmpty()) {
logger.info("Adding new peer nodes {}", toAdd);
for (String peerUrl : toAdd) {
newNodeList.add(createPeerEurekaNode(peerUrl));
}
}
// Reassign peerEurekaNodes and peerEurekaNodeUrls
this.peerEurekaNodes = newNodeList;
this.peerEurekaNodeUrls = new HashSet<>(newPeerUrls);
}
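updatePeerEurekaNodes updates the nodes according to the new set of cluster URLs passed in:
- Checks whether the URLs passed in require an update at all
- Removes and shuts down old nodes that are absent from the new URL set
- Creates nodes, through createPeerEurekaNode, for new URLs that are absent from the old node set
- Reassigns the node collection and the URL collection to complete the update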
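The two set differences at the heart of this method can be tried on their own. This is a minimal sketch under stated assumptions: the class PeerDiffSketch and its method names are hypothetical, and plain strings stand in for real peers. A TreeSet is used only to make the result order predictable; the original code uses HashSet.

```java
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

public class PeerDiffSketch {

    /** URLs that are currently known but missing from the freshly resolved list. */
    public static Set<String> toShutdown(Set<String> currentUrls, List<String> resolvedUrls) {
        Set<String> result = new TreeSet<>(currentUrls);
        result.removeAll(resolvedUrls);
        return result;
    }

    /** URLs that appear in the freshly resolved list but are not known yet. */
    public static Set<String> toAdd(Set<String> currentUrls, List<String> resolvedUrls) {
        Set<String> result = new TreeSet<>(resolvedUrls);
        result.removeAll(currentUrls);
        return result;
    }

    /** True when neither difference contains anything, so no update is needed. */
    public static boolean unchanged(Set<String> currentUrls, List<String> resolvedUrls) {
        return toShutdown(currentUrls, resolvedUrls).isEmpty()
                && toAdd(currentUrls, resolvedUrls).isEmpty();
    }
}
```

With current URLs {a, b} and resolved URLs [b, c], toShutdown yields {a} and toAdd yields {c}; node b is left untouched, so its existing connection and queued work are not disturbed.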
The new URL set passed to updatePeerEurekaNodes comes from the resolvePeerUrls method, which parses the configuration entries with the eureka.serviceUrl prefix and picks up configuration changes dynamically. New nodes are created by createPeerEurekaNode, whose source follows.
/**
* Creates a new server node from its URL
* @param peerEurekaNodeUrl
* @return
*/
protected PeerEurekaNode createPeerEurekaNode(String peerEurekaNodeUrl) {
// Create a client that connects to the remote node
HttpReplicationClient replicationClient = JerseyReplicationClient.createReplicationClient(serverConfig, serverCodecs, peerEurekaNodeUrl);
// Get the host of the new node
String targetHost = hostFromUrl(peerEurekaNodeUrl);
if (targetHost == null) {
targetHost = "host";
}
// Create the new node
return new PeerEurekaNode(registry, targetHost, peerEurekaNodeUrl, replicationClient, serverConfig);
}
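The createPeerEurekaNode method:
- Creates the remote communication client replicationClient, which is used to communicate with that node, for data synchronization and similar work
- Gets the host of the remote node to create
- Creates a PeerEurekaNode instance that represents the remote node
A PeerEurekaNode represents a remote node that is a peer of the current node. All data synchronization between the current node and that remote node happens inside this instance.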
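The host lookup with its "host" fallback can be illustrated as follows. This is a minimal sketch: the source of hostFromUrl is not shown above, so parsing with java.net.URI is an assumption here, and the class PeerHostSketch and the method hostOrDefault are hypothetical names.

```java
import java.net.URI;
import java.net.URISyntaxException;

public class PeerHostSketch {

    /**
     * Returns the host part of a peer URL, or the literal "host" when the URL
     * cannot be parsed or carries no host, mirroring the fallback shown above.
     */
    public static String hostOrDefault(String peerUrl) {
        String host = null;
        try {
            // Assumption: URI parsing stands in for Eureka's hostFromUrl helper
            host = new URI(peerUrl).getHost();
        } catch (URISyntaxException e) {
            // Unparseable URL: fall through to the default below
        }
        return host != null ? host : "host";
    }
}
```

For example, hostOrDefault("http://peer1:8761/eureka/") returns "peer1", while a string that is not a valid URI falls back to "host". The fallback only affects the name used for the target host, not the URL the replication client talks to.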
Cluster node data synchronization
The node discovery section showed that Eureka represents a remote peer node with PeerEurekaNode and passes the remote communication client replicationClient into it. The PeerEurekaNode source below shows which data Eureka cluster nodes synchronize and what they send to each other.
/* For testing */ PeerEurekaNode(PeerAwareInstanceRegistry registry, String targetHost, String serviceUrl,
HttpReplicationClient replicationClient, EurekaServerConfig config,
int batchSize, long maxBatchingDelayMs,
long retrySleepTimeMs, long serverUnavailableSleepTimeMs) {
String batcherName = getBatcherName();
// Task processor
ReplicationTaskProcessor taskProcessor = new ReplicationTaskProcessor(targetHost, replicationClient);
// Create a task dispatcher that executes tasks in batches
this.batchingDispatcher = TaskDispatchers.createBatchingTaskDispatcher(
batcherName,
config.getMaxElementsInPeerReplicationPool(),
batchSize,
config.getMaxThreadsForPeerReplication(),
maxBatchingDelayMs,
serverUnavailableSleepTimeMs,
retrySleepTimeMs,
taskProcessor
);
// Create a task dispatcher that executes tasks one at a time
this.nonBatchingDispatcher = TaskDispatchers.createNonBatchingTaskDispatcher(
targetHost,
config.getMaxElementsInStatusReplicationPool(),
config.getMaxThreadsForStatusReplication(),
maxBatchingDelayMs,
serverUnavailableSleepTimeMs,
retrySleepTimeMs,
taskProcessor
);
}
The PeerEurekaNode constructor does the following:
- Creates the task processor for data synchronization, ReplicationTaskProcessor
- Creates the batching task dispatcher
- Creates the non-batching (single-task) dispatcher
Note: Eureka wraps the data synchronization between nodes into small ReplicationTask tasks, one per data operation, and hands each task to a TaskDispatcher for asynchronous processing.
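To make the "task plus dispatcher" idea concrete, here is a deliberately simplified, single-threaded model. It is a sketch under stated assumptions, not Eureka's TaskDispatcher: the class MiniDispatcherSketch, its drain method, and the rule that a later task with the same id replaces an earlier pending one are all assumptions of this example. It only reuses the shape of the process(id, task, expiryTime) call seen in the source.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class MiniDispatcherSketch {

    /** A queued task together with the time after which it is no longer worth running. */
    private static final class Pending {
        final Runnable task;
        final long expiryTime;

        Pending(Runnable task, long expiryTime) {
            this.task = task;
            this.expiryTime = expiryTime;
        }
    }

    // Keyed by task id; insertion order is kept so tasks run in arrival order
    private final Map<String, Pending> pending = new LinkedHashMap<>();

    /** Queues a task. Assumption of this sketch: a newer task with the same id replaces the older one. */
    public synchronized void process(String id, Runnable task, long expiryTime) {
        pending.put(id, new Pending(task, expiryTime));
    }

    /** Runs every task that has not expired at time now and returns the ids that ran. */
    public synchronized List<String> drain(long now) {
        List<String> executed = new ArrayList<>();
        for (Map.Entry<String, Pending> entry : pending.entrySet()) {
            if (entry.getValue().expiryTime > now) {
                entry.getValue().task.run();
                executed.add(entry.getKey());
            }
        }
        pending.clear(); // expired tasks are dropped without running
        return executed;
    }
}
```

The expiry time matters because replication is asynchronous: a heartbeat that waits in the queue longer than the lease renewal interval describes a state that is already stale, so dropping it is cheaper than sending it.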
The synchronization tasks that PeerEurekaNode can create are listed next.
- register
/**
* When the Eureka server registers a new service instance, a task is also created to sync the new instance to the other cluster nodes.
* Sends the registration information of {@link InstanceInfo} receiving by
* this node to the peer node represented by this class.
*
* @param info
* the instance information {@link InstanceInfo} of any instance
* that is send to this instance.
* @throws Exception
*/
public void register(final InstanceInfo info) throws Exception {
long expiryTime = System.currentTimeMillis() + getLeaseRenewalOf(info);
// Add a replication task of type register to the task dispatcher
batchingDispatcher.process(
taskId("register", info),
new InstanceReplicationTask(targetHost, Action.Register, info, null, true) {
public EurekaHttpResponse<Void> execute() {
return replicationClient.register(info);
}
},
expiryTime
);
}
Register replication task: when a service instance registers with the current node, this task replicates the instance information to the remote cluster node.
- cancel
public void cancel(final String appName, final String id) throws Exception {
long expiryTime = System.currentTimeMillis() + maxProcessingDelayMs;
// Add a replication task of type cancel to the task dispatcher
batchingDispatcher.process(
taskId("cancel", appName, id),
new InstanceReplicationTask(targetHost, Action.Cancel, appName, id) {
@Override
public EurekaHttpResponse<Void> execute() {
return replicationClient.cancel(appName, id);
}
@Override
public void handleFailure(int statusCode, Object responseEntity) throws Throwable {
super.handleFailure(statusCode, responseEntity);
if (statusCode == 404) {
logger.warn("{}: missing entry.", getTaskName());
}
}
},
expiryTime
);
}
Cancel replication task: when a service instance cancels its registration on the current node, the cancellation is replicated to the remote cluster node.
- heartbeat
public void heartbeat(final String appName, final String id,
final InstanceInfo info, final InstanceStatus overriddenStatus,
boolean primeConnection) throws Throwable {
//當(dāng)?shù)谝淮芜B接時直接發(fā)送心跳到遠(yuǎn)端
if (primeConnection) {
// We do not care about the result for priming request.
replicationClient.sendHeartBeat(appName, id, info, overriddenStatus);
return;
}
// Heartbeat replication task
ReplicationTask replicationTask = new InstanceReplicationTask(targetHost, Action.Heartbeat, info, overriddenStatus, false) {
@Override
public EurekaHttpResponse<InstanceInfo> execute() throws Throwable {
return replicationClient.sendHeartBeat(appName, id, info, overriddenStatus);
}
@Override
public void handleFailure(int statusCode, Object responseEntity) throws Throwable {
super.handleFailure(statusCode, responseEntity);
if (statusCode == 404) {
logger.warn("{}: missing entry.", getTaskName());
if (info != null) {
logger.warn("{}: cannot find instance id {} and hence replicating the instance with status {}",
getTaskName(), info.getId(), info.getStatus());
register(info);
}
} else if (config.shouldSyncWhenTimestampDiffers()) {
InstanceInfo peerInstanceInfo = (InstanceInfo) responseEntity;
if (peerInstanceInfo != null) {
syncInstancesIfTimestampDiffers(appName, id, info, peerInstanceInfo);
}
}
}
};
long expiryTime = System.currentTimeMillis() + getLeaseRenewalOf(info);
// Add a replication task of type heartbeat to the task dispatcher
batchingDispatcher.process(taskId("heartbeat", info), replicationTask, expiryTime);
}
Heartbeat replication task: when a service instance renews its lease on the current node by sending a heartbeat, the renewal is replicated to the remote cluster node.
- StatusUpdate
- DeleteStatusOverride
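Of the tasks above, heartbeat has the most involved failure handling, so its branching is worth isolating. The following minimal sketch restates the decisions made in handleFailure as a pure function. The class HeartbeatFailureSketch, the FollowUp enum, and the boolean parameters are hypothetical names introduced for illustration; only the branching mirrors the source shown above.

```java
public class HeartbeatFailureSketch {

    /** What the local node does after a failed heartbeat replication. */
    public enum FollowUp { REGISTER_INSTANCE, SYNC_IF_TIMESTAMP_DIFFERS, NONE }

    /**
     * @param statusCode               HTTP status returned by the peer
     * @param hasLocalInfo             whether the local node still holds the InstanceInfo
     * @param syncWhenTimestampDiffers value of the shouldSyncWhenTimestampDiffers setting
     * @param peerReturnedInfo         whether the peer's response carried its own InstanceInfo
     */
    public static FollowUp onHeartbeatFailure(int statusCode, boolean hasLocalInfo,
                                              boolean syncWhenTimestampDiffers,
                                              boolean peerReturnedInfo) {
        if (statusCode == 404) {
            // The peer does not know the instance: replicate a full registration instead
            return hasLocalInfo ? FollowUp.REGISTER_INSTANCE : FollowUp.NONE;
        }
        if (syncWhenTimestampDiffers && peerReturnedInfo) {
            // The peer answered with its own copy: compare timestamps and reconcile
            return FollowUp.SYNC_IF_TIMESTAMP_DIFFERS;
        }
        return FollowUp.NONE;
    }
}
```

The 404 branch is what lets a peer that missed the original registration catch up: the first heartbeat it rejects triggers a register replication for the same instance.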
Processing of cluster node data synchronization tasks
The PeerEurekaNode constructor shows that replication tasks are processed by ReplicationTaskProcessor. The source of that class follows.
/**
* Processes a single ReplicationTask
* @param task
* @return
*/
@Override
public ProcessingResult process(ReplicationTask task) {
try {
// Call the task's execute method to run the task
EurekaHttpResponse<?> httpResponse = task.execute();
int statusCode = httpResponse.getStatusCode();
// Inspect the result returned by the task
Object entity = httpResponse.getEntity();
if (logger.isDebugEnabled()) {
logger.debug("Replication task {} completed with status {}, (includes entity {})", task.getTaskName(), statusCode, entity != null);
}
if (isSuccess(statusCode)) {
task.handleSuccess();
} else if (statusCode == 503) {
logger.debug("Server busy (503) reply for task {}", task.getTaskName());
return ProcessingResult.Congestion;
} else {
task.handleFailure(statusCode, entity);
return ProcessingResult.PermanentError;
}
} catch (Throwable e) {
if (isNetworkConnectException(e)) {
logNetworkErrorSample(task, e);
return ProcessingResult.TransientError;
} else {
logger.error(peerId + ": " + task.getTaskName() + " Not re-trying this exception because it does not seem to be a network exception", e);
return ProcessingResult.PermanentError;
}
}
return ProcessingResult.Success;
}
Single-task processing:
- Calls the task's execute method to perform the remote data synchronization
- Analyzes the result returned by the remote node
/**
* Processes a batch of ReplicationTask instances
* @param tasks
* @return
*/
@Override
public ProcessingResult process(List<ReplicationTask> tasks) {
// Build a ReplicationList from the list of tasks
ReplicationList list = createReplicationListOf(tasks);
try {
// Call the batch replication endpoint to send the list to the remote node
EurekaHttpResponse<ReplicationListResponse> response = replicationClient.submitBatchUpdates(list);
// Inspect the replication response
int statusCode = response.getStatusCode();
if (!isSuccess(statusCode)) {
if (statusCode == 503) {
logger.warn("Server busy (503) HTTP status code received from the peer {}; rescheduling tasks after delay", peerId);
return ProcessingResult.Congestion;
} else {
// Unexpected error returned from the server. This should ideally never happen.
logger.error("Batch update failure with HTTP status code {}; discarding {} replication tasks", statusCode, tasks.size());
return ProcessingResult.PermanentError;
}
} else {
handleBatchResponse(tasks, response.getEntity().getResponseList());
}
} catch (Throwable e) {
if (isNetworkConnectException(e)) {
logNetworkErrorSample(null, e);
return ProcessingResult.TransientError;
} else {
logger.error("Not re-trying this exception because it does not seem to be a network exception", e);
return ProcessingResult.PermanentError;
}
}
return ProcessingResult.Success;
}
Batch processing sends a group of tasks to the remote node in one request:
- Builds a ReplicationList from the list of tasks
- Calls the batch replication endpoint to send the list to the remote node, that is, the REST API /{version}/peerreplication
- Analyzes the result returned by the remote node
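Both process methods reduce a response or an exception to one of four ProcessingResult values. The mapping can be sketched as below. This is a simplified model, not the Eureka class: ReplicationResultSketch and its method names are hypothetical, the 2xx range for success and the "any IOException in the cause chain" rule for network errors are assumptions about helpers (isSuccess, isNetworkConnectException) whose source is not shown above, and the retry rule in shouldRetry is likewise an assumption.

```java
import java.io.IOException;

public class ReplicationResultSketch {

    public enum ProcessingResult { Success, Congestion, TransientError, PermanentError }

    /** Maps an HTTP status code to a result, following the branches in process(). */
    public static ProcessingResult classifyStatus(int statusCode) {
        if (statusCode >= 200 && statusCode < 300) { // assumption: success means 2xx
            return ProcessingResult.Success;
        }
        if (statusCode == 503) {
            return ProcessingResult.Congestion; // peer is busy
        }
        return ProcessingResult.PermanentError;
    }

    /** Maps an exception to a result; network problems are treated as transient. */
    public static ProcessingResult classifyError(Throwable error) {
        // Assumption: any IOException in the cause chain counts as a network error
        for (Throwable t = error; t != null; t = t.getCause()) {
            if (t instanceof IOException) {
                return ProcessingResult.TransientError;
            }
        }
        return ProcessingResult.PermanentError;
    }

    /** Assumption of this sketch: only congestion and transient errors are worth retrying. */
    public static boolean shouldRetry(ProcessingResult result) {
        return result == ProcessingResult.Congestion || result == ProcessingResult.TransientError;
    }
}
```

Separating Congestion from TransientError lets a dispatcher pick different back-off delays for a busy peer and an unreachable one, which matches the two separate sleep parameters (serverUnavailableSleepTimeMs and retrySleepTimeMs) passed to the dispatchers in the PeerEurekaNode constructor.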
In Eureka server, cluster node discovery and data synchronization are implemented mainly by the PeerEurekaNodes and PeerEurekaNode classes. Tracing the source makes the cluster logic clear, which helps when locating problems in real deployments.