年后不久換了部門,一直在改Bug和優(yōu)化。僚害。。終于有了點(diǎn)時(shí)間繁调,把之前漏下沒(méi)記錄的點(diǎn)慢慢補(bǔ)上
gateway使用ribbon作為服務(wù)調(diào)用的負(fù)載均衡中間件萨蚕,根據(jù)配置的 IRule 對(duì)拉取到的服務(wù)列表進(jìn)行負(fù)載
而這些真正提供服務(wù)的實(shí)例是有動(dòng)態(tài)上下線的情況存在的,為了保證輪詢到的服務(wù)實(shí)例能正常訪問(wèn)蹄胰,ribbon中有一個(gè)接口
ServerListUpdater 會(huì)定期對(duì)服務(wù)列表進(jìn)行更新
在使用 Eureka 作為注冊(cè)中心的時(shí)候岳遥,ServerListUpdater有兩個(gè)實(shí)現(xiàn)類:
PollingServerListUpdater :定時(shí)從注冊(cè)中心拉取服務(wù)列表,如果沒(méi)有配置裕寨,默認(rèn)為30秒
EurekaNotificationServerListUpdater :注冊(cè)中心中的服務(wù)有變動(dòng)時(shí)浩蓉,通知客戶端派继,EurekaNotificationServerListUpdater是通過(guò)添加了一個(gè)監(jiān)聽器,當(dāng)收到注冊(cè)中心的通知后捻艳,做出相應(yīng)的動(dòng)作
PollingServerListUpdater 也是 默認(rèn)的 ServerListUpdater 配置
分別看一下它們的代碼實(shí)現(xiàn)
PollingServerListUpdater
// 從注冊(cè)中心拉取服務(wù)列表的間隔
private static int LISTOFSERVERS_CACHE_REPEAT_INTERVAL = 30 * 1000; // msecs;
@Override
public synchronized void start(final UpdateAction updateAction) {
if (isActive.compareAndSet(false, true)) {
// 這里封裝了一個(gè)進(jìn)行服務(wù)拉取的任務(wù)
final Runnable wrapperRunnable = new Runnable() {
@Override
public void run() {
if (!isActive.get()) {
if (scheduledFuture != null) {
scheduledFuture.cancel(true);
}
return;
}
try {
//這里是進(jìn)行服務(wù)列表更新的動(dòng)作
updateAction.doUpdate();
lastUpdated = System.currentTimeMillis();
} catch (Exception e) {
logger.warn("Failed one update cycle", e);
}
}
};
// 使用 scheduled 進(jìn)行定時(shí)拉取
scheduledFuture = getRefreshExecutor().scheduleWithFixedDelay(
wrapperRunnable,
initialDelayMs,
refreshIntervalMs,//這個(gè)就是剛才配置的拉取間隔
TimeUnit.MILLISECONDS
);
} else {
logger.info("Already active, no-op");
}
}
EurekaNotificationServerListUpdater
@Override
public synchronized void start(final UpdateAction updateAction) {
if (isActive.compareAndSet(false, true)) {
this.updateListener = new EurekaEventListener() {
// 監(jiān)聽 Eureka發(fā)布的事件驾窟,然后拉取最新的列表
@Override
public void onEvent(EurekaEvent event) {
if (event instanceof CacheRefreshedEvent) {
if (!updateQueued.compareAndSet(false, true)) { // if an update is already queued
logger.info("an update action is already queued, returning as no-op");
return;
}
if (!refreshExecutor.isShutdown()) {
try {
refreshExecutor.submit(new Runnable() {
@Override
public void run() {
try {
//這里是進(jìn)行服務(wù)列表更新的動(dòng)作
updateAction.doUpdate();
lastUpdated.set(System.currentTimeMillis());
} catch (Exception e) {
logger.warn("Failed to update serverList", e);
} finally {
updateQueued.set(false);
}
}
}); // fire and forget
} catch (Exception e) {
logger.warn("Error submitting update task to executor, skipping one round of updates", e);
updateQueued.set(false); // if submit fails, need to reset updateQueued to false
}
}
else {
logger.debug("stopping EurekaNotificationServerListUpdater, as refreshExecutor has been shut down");
stop();
}
}
}
};
...以下代碼省略
} else {
logger.info("Update listener already registered, no-op");
}
}
可以看到,這兩個(gè) ServerListUpdater 實(shí)現(xiàn)類在更新服務(wù)列表的時(shí)候认轨,都做了同一個(gè)動(dòng)作updateAction.doUpdate()
進(jìn)入這個(gè)方法绅络,發(fā)現(xiàn)它是一個(gè)定義在ServerListUpdater 中的一個(gè)接口
public interface ServerListUpdater {
/**
* an interface for the updateAction that actually executes a server list update
*/
public interface UpdateAction {
void doUpdate();
}
...以下代碼省略
}
而它的方法實(shí)現(xiàn)是在 負(fù)載均衡器 DynamicServerListLoadBalancer 中定義的
public class DynamicServerListLoadBalancer<T extends Server> extends BaseLoadBalancer {
private static final Logger LOGGER = LoggerFactory.getLogger(DynamicServerListLoadBalancer.class);
......代碼省略......
protected final ServerListUpdater.UpdateAction updateAction = new ServerListUpdater.UpdateAction() {
@Override
public void doUpdate() {
// UpdateAction的方法實(shí)現(xiàn),調(diào)用了另一個(gè)方法
updateListOfServers();
}
};
@VisibleForTesting
public void updateListOfServers() {
List<T> servers = new ArrayList<T>();
if (serverListImpl != null) {
//真正進(jìn)行列表更新的地方
servers = serverListImpl.getUpdatedListOfServers();
LOGGER.debug("List of Servers for {} obtained from Discovery client: {}",
getIdentifier(), servers);
if (filter != null) {
servers = filter.getFilteredListOfServers(servers);
LOGGER.debug("Filtered List of Servers for {} obtained from Discovery client: {}",
getIdentifier(), servers);
}
}
updateAllServerList(servers);
}
......代碼省略......
}
可以看到嘁字,這個(gè) serverListImpl 就是在負(fù)載均衡器的ServerList屬性昨稼,使用這個(gè)接口的getUpdatedListOfServers方法進(jìn)行列表更新,因?yàn)槲业捻?xiàng)目里使用的是自己的注冊(cè)中心拳锚,沒(méi)有用Eureka,所以也寫了一個(gè)實(shí)現(xiàn)類寻行,參照了使用Eureka的情況下的默認(rèn)類 DiscoveryEnabledNIWSServerList霍掺,讓我們看看這個(gè)類的代碼
@Override
public List<DiscoveryEnabledServer> getUpdatedListOfServers(){
//其實(shí)在更新的時(shí)候,調(diào)用的也是從注冊(cè)中心拉取列表的方法
return obtainServersViaDiscovery();
}
//從Eureka拉取服務(wù)
private List<DiscoveryEnabledServer> obtainServersViaDiscovery() {
List<DiscoveryEnabledServer> serverList = new ArrayList<DiscoveryEnabledServer>();
......代碼省略......
return serverList;
}
由此我們知道拌蜘,無(wú)論配置哪個(gè) ServerListUpdater杆烁,在更新服務(wù)列表的時(shí)候,都是調(diào)用ServerList接口進(jìn)行一次服務(wù)拉取简卧,然后更新本地的列表兔魂,只是觸發(fā)的時(shí)機(jī)不同:
- PollingServerListUpdater 30秒拉取一次(時(shí)間可以修改)
- EurekaNotificationServerListUpdater 當(dāng)服務(wù)更新時(shí),通知客戶端
那么這兩種方法分別有什么弊端呢举娩?
先說(shuō)說(shuō)PollingServerListUpdater析校,如果在拉取的間隔中,有服務(wù)下線了铜涉,極端情況下智玻,原來(lái)所有的實(shí)例都不可用,換成了新的一批實(shí)例芙代,要等到下一次拉取的時(shí)間點(diǎn)才會(huì)更新吊奢,這樣會(huì)造成最久30秒的時(shí)間服務(wù)不可用。 比如:原來(lái)5個(gè)實(shí)例 A纹烹、B页滚、C、D铺呵、E變成了 F裹驰、G、H片挂、I邦马、J,但是由于并沒(méi)有到更新的時(shí)間點(diǎn),ribbon保存的還是老的服務(wù)實(shí)例滋将,而這時(shí)它們都已下線邻悬,無(wú)法提供服務(wù)。
那如果我們換成随闽,一旦服務(wù)變更(這里是上下線都會(huì)通知)就通知客戶端的 EurekaNotificationServerListUpdater 會(huì)怎么樣呢父丰,這樣好像可以實(shí)時(shí)的替換成最新的可用實(shí)例,保證服務(wù)不會(huì)打到失效的實(shí)例上掘宪《晟龋可是這會(huì)有另一個(gè)問(wèn)題,也是在極端情況下魏滚,如果這次通知由于網(wǎng)絡(luò)問(wèn)題镀首,沒(méi)有通知到客戶端,那么這次變動(dòng)過(guò)后鼠次,如果一直沒(méi)有服務(wù)變更更哄,客戶端就再也不會(huì)進(jìn)行服務(wù)的拉取,這個(gè)時(shí)候造成不可用的時(shí)間就難以預(yù)估了腥寇。比如:在9:00的時(shí)候成翩,A、B赦役、C麻敌、D、E服務(wù)全部下線掂摔,F(xiàn)术羔、G、H乙漓、I聂示、J上線,Eureka通知客戶端簇秒,但網(wǎng)絡(luò)抖動(dòng)鱼喉,客戶端沒(méi)有收到,或者客戶端收到了趋观,拉取時(shí)候失敗扛禽,并沒(méi)有更新本地的列表,這樣只有等到下次收到通知時(shí)才會(huì)去拉取皱坛,假設(shè)接下來(lái)服務(wù)很穩(wěn)定编曼,12:00的時(shí)候才有一次更新,這樣就有3個(gè)小時(shí)的服務(wù)不可用剩辟。
那么有沒(méi)有什么辦法既可以擁有2個(gè)實(shí)現(xiàn)類的優(yōu)勢(shì)掐场,又摒棄它們的弊端呢往扔?有的!
小孩子才做選擇熊户,我們大人是全都要萍膛!那就是自己寫一個(gè)ServerListUpdater的實(shí)現(xiàn)類,然后:
- 1 啟動(dòng)一個(gè)定時(shí)任務(wù)嚷堡,定時(shí)從注冊(cè)中心拉取蝗罗,而這個(gè)拉取間隔也可以根據(jù)自己的預(yù)估進(jìn)行修改。
- 2 注冊(cè)一個(gè)監(jiān)聽器蝌戒,可以收到注冊(cè)中心的服務(wù)變更通知
- 3 在配置中串塑,把默認(rèn)的 ServerListUpdater改成自己寫的 ServerListUpdater,這樣ribbonConfig類在看到有ServerListUpdater的實(shí)現(xiàn)類的情況下北苟,就不會(huì)加載 Eureka的 ServerListUpdater了桩匪。
實(shí)現(xiàn)類的代碼太長(zhǎng),就不貼了友鼻,邏輯很清晰傻昙,可以參考PollingServerListUpdater 和 EurekaNotificationServerListUpdater,把他們的代碼照著寫就行
接下來(lái)就是配置桃移,在配置類中添加上自己的實(shí)現(xiàn)類
/**
* 服務(wù)更新通知機(jī)制
* @param notificationService
* @param clientConfig
* @return
*/
@Bean
public ServerListUpdater ribbonServerListUpdater(NotificationServiceImpl notificationService, IClientConfig clientConfig) {
return new PollingNotificationServerListUpdater(notificationService, clientConfig);
}
這樣就實(shí)現(xiàn)了輪詢和通知并存的形式。
打完收工葛碧!