背景
筆者接觸到的一個項目里邊有很多微服務(wù)莉测,但受限于項目創(chuàng)立之初時間和團隊技術(shù)條件所限高诺,都是最基本的SpringBoot工程挨措,沒有使用Spring Cloud Netflix或alibaba這類“全家桶”來搭建减牺,所有沒有負載均衡是嗜、服務(wù)注冊解取、服務(wù)發(fā)現(xiàn)步责、服務(wù)配置、熔斷等這些微服務(wù)治理層面的東西禀苦。每個微服務(wù)采用多節(jié)點部署蔓肯,使用nginx或阿里云slb來做負載均衡,大體上是"微服務(wù)A -> nginx/slb -> 微服務(wù)B"這樣一種方式振乏,每當重要線上活動來臨蔗包、服務(wù)的擴容需要人工的方式去配置nginx來增加新節(jié)點的路由代理,開發(fā)運維人員一直這樣初步的做著每個服務(wù)的流量在多節(jié)點進行分攤昆码。除了運維方式比較原始之外气忠,另外,當多節(jié)點中的某個節(jié)點出現(xiàn)故障時赋咽,由于缺乏自動故障轉(zhuǎn)移機制仍然會有一部分請求進入到故障節(jié)點旧噪,對應(yīng)的異常錯誤為前端用戶所感知。更為糟糕的是脓匿,高并發(fā)時間由于節(jié)點故障異常導(dǎo)致的上游服務(wù)(這里上游服務(wù)指的是出現(xiàn)節(jié)點故障的服務(wù)的調(diào)用者)超時請求異常淘钟,通常微服務(wù)之間這類http調(diào)用超時設(shè)置為5-30秒之間,這就導(dǎo)致了上游服務(wù)的QPS性能驟減,請求層層積壓,出現(xiàn)了傳說中的級聯(lián)故障撩穿。
針對以上種種钉答,筆者給這個項目或者說這個公司同樣類型架構(gòu)的一眾項目做架構(gòu)升級方案,考慮到實施成本和開發(fā)團隊的接受程度摧玫,原則是盡量減少架構(gòu)升級的改造工作量,盡量減少團隊的學(xué)習(xí)成本,同時又要解決上面所講的所有痛點慧耍,方案不求高大上,但求使用為主丐谋。
本文是當時思路的一些整理芍碧,時間也有些遠,加之筆者不是對Spring Cloud或netflix Ribbon所有的細節(jié)都了解号俐,所以行文邏輯不是很有條理泌豆,或許還有錯誤的地方。如有不當吏饿,請大家指正踪危。
分析
先定義問題蔬浙,確定問題的邊界,然后再解決問題陨倡。
1敛滋、我們需要的是一個能夠有自動負載路由功能的東西,幫助服務(wù)之間rest調(diào)用進行多節(jié)點路由兴革,路由可以采用輪詢規(guī)則绎晃、這是負載均衡,同時要能及時的發(fā)現(xiàn)被調(diào)服務(wù)中的故障節(jié)點杂曲、并及時的下線節(jié)點使得調(diào)用者能夠快速失敗庶艾、當故障恢復(fù)之后及時上線節(jié)點,這屬于熔斷機制擎勘。
2咱揍、路由配置能過改變現(xiàn)狀的人工去主機配置文件,要有一個統(tǒng)一的服務(wù)路由配置中心棚饵。
3煤裙、盡可能少的引入第三方的組件減少學(xué)習(xí)成本,盡可能少的減少改造工作量噪漾,方案力求使用為主硼砰。
筆者經(jīng)過思考,和研究各種負載均衡方案(有服務(wù)端負載均衡欣硼,和客戶端負載均衡)题翰,想到了一個方案:
如果利用Ribbon做客戶端負載均衡,結(jié)合配置中心比如Apollo或者Spring Cloud Config诈胜,一定程度上也可以實現(xiàn)動態(tài)配置新節(jié)點上線豹障,健康檢查識別故障服務(wù)節(jié)點并下線等功能:
Ribbon本身是可以選擇從本地配置文件中的或者從Eureka拉取的服務(wù)列表中選擇節(jié)點進行路由的,默認的負載均衡規(guī)則是輪詢焦匈,如果選擇引入Eureka血公,那么就是Spring Cloud標準的解決方案了:服務(wù)注冊中心Eureka負責收集各個應(yīng)用啟動的時候主動上報上來的自己的服務(wù)信息,比如服務(wù)名缓熟、地址坞笙、端口等,然后調(diào)用的應(yīng)用就可以通過eureka獲得所謂遠端服務(wù)的服務(wù)列表了荚虚,調(diào)用端的Ribbon做客戶端負載均衡從拉取的服務(wù)列表中選擇可用的服務(wù)按照負載均衡策略選一個調(diào)用。
但這樣一來需要團隊進行Eureka的搭建和維護籍茧,同時需要每一個服務(wù)都引入Eureka相關(guān)依賴并進行代碼和配置版述、使之能夠進行服務(wù)注冊,需要動員人力進行各個服務(wù)的代碼進行改造寞冯。而經(jīng)過了解渴析,團隊目前服務(wù)之間調(diào)用使用的是RestTemplate(實現(xiàn)配置的是Apache HttpClient)晚伙,做的面向接口的開發(fā),代碼類似如下:
@Autowired
private RestTemplate restTemplate;
restTemplate.getForObject(serviceFullUrl, clazz, new Object[0]); //GET調(diào)用后端服務(wù)
restTemplate.postForObject(serviceFullUrl, requestBody, clazz, new Object[0]);//POST調(diào)用
托Spring RestTemplate接口與實現(xiàn)分離的福俭茧,那么如果能僅僅改造RestTemplate的實現(xiàn)層咆疗,使之具備客戶端負載均衡、服務(wù)發(fā)現(xiàn)母债、被調(diào)服務(wù)故障下線午磁、被調(diào)故障恢復(fù)上線等功能那便是極好的。而且由于少了調(diào)用服務(wù)與被調(diào)服務(wù)之間的nginx毡们,也減少了運維人員的工作量迅皇。
解決方案思路
現(xiàn)在就是想辦法要讓Ribbon本地的服務(wù)列表動態(tài)化,Ribbon可以讀取本地Spring文件的服務(wù)列表衙熔,而配置文件是可以通過Config配置中心進行統(tǒng)一管理登颓、并且搭配Spring bus機制可以做到配置發(fā)生變化時主動通知各服務(wù)來實時更新本地的配置,這樣一來就實現(xiàn)了本地服務(wù)列表的動態(tài)化红氯。應(yīng)用可以免去各個服務(wù)主動上報給注冊中心這個動作(改為使用配置中心去配置)框咙。然后節(jié)點健康原來是由Eureka告訴Ribbon的,現(xiàn)在要ribbon主動根據(jù)服務(wù)列表去心跳檢查痢甘。一圖勝千言喇嘱,邏輯架構(gòu)如下所示:
進入正題,下面結(jié)合源代碼分析一下Ribbon的原理产阱,同時說明一下需要對Ribbon源碼進行哪些改造婉称、擴展和封裝,使之滿足我們上面的需求构蹬。
使用的主要開源項目的版本:
spring boot
2.1.13.RELEASE
spring-cloud-netflix-ribbon
2.1.5.RELEASE
spring cloud
Greenwich.SR6
Ribbon主要代碼分析與改造
Ribbon的自動裝配
spring-cloud-netflix-ribbon-2.1.5.RELEASE.jar包里的spring.factories文件:
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
org.springframework.cloud.netflix.ribbon.RibbonAutoConfiguration
是spring cloud為了繼承ribbon而開發(fā)的自動裝配類王暗,Ribbon自動裝配的Bean都在這。
同時庄敛,
/**
* 所有RibbonClient的默認配置
* author: linyang
* */
@Configuration
public class DefaultRibbonConfig {
private Logger logger = LoggerFactory.getLogger(DefaultRibbonConfig.class);
//在可用服務(wù)中輪詢選擇
@Bean
public IRule ribbonRule() {
logger.info("全局IRule實現(xiàn)為AvailabilityFilteringRule俗壹,實例化...");
return new AvailabilityFilteringRule();
}
//定時健康檢查服務(wù)列表中的應(yīng)用,置可用狀態(tài)
@Bean
public IPing ribbonPing() {
logger.info("全局IPing實現(xiàn)為HealthCheckPing藻烤,實例化...");
return new HealthCheckPing();
}
}
然后
/**
* 統(tǒng)一配置所有ribbon client
* author: linyang
* */
@RibbonClients(defaultConfiguration = DefaultRibbonConfig.class)
public class DefaultRibbonClientsConfig {
public static class BazServiceList extends ConfigurationBasedServerList {
public BazServiceList(IClientConfig config) {
super.initWithNiwsConfig(config);
}
}
}
上面三個類結(jié)合Spring自動裝配绷雏,會進到org.springframework.cloud.netflix.ribbon.RibbonClientConfiguration這個配置類里邊,然后Spring對里邊的Bean進行自動注入:
@Bean
@ConditionalOnMissingBean
public ILoadBalancer ribbonLoadBalancer(IClientConfig config,
ServerList<Server> serverList, ServerListFilter<Server> serverListFilter,
IRule rule, IPing ping, ServerListUpdater serverListUpdater) {
if (this.propertiesFactory.isSet(ILoadBalancer.class, name)) {
return this.propertiesFactory.get(ILoadBalancer.class, config, name);
}
return new ZoneAwareLoadBalancer<>(config, rule, ping, serverList,
serverListFilter, serverListUpdater);
}
Ok怖亭,我們看到默認的ILoadBalancer是ZoneAwareLoadBalancer
這個實現(xiàn)涎显,它的父類是DynamicServerListLoadBalancer
,再上一級父類是BaseLoadBalancer
兴猩,同時通過源碼可以看到期吓,這三個類實例化的時候都會先super()自己父類的構(gòu)造方法∏阒ィ可以看到讨勤,我們之前提到的Ribbon的“服務(wù)列表”就緩存在BaseLoadBalancer里:
protected volatile List<Server> allServerList;
protected volatile List<Server> upServerList;
protected ReadWriteLock allServerLock;
protected ReadWriteLock upServerLock;
除了上面的3個類之外箭跳,還有兩個定時任務(wù)也很關(guān)鍵:
PollingServerListUpdater 定時更新緩存的server list
PingTask 定時執(zhí)行ping來確定server節(jié)點的狀態(tài)
下面分“Ribbon的啟動流程”和“LoadBalancerClient根據(jù)服務(wù)名自動根據(jù)Rule選擇服務(wù)”兩條線來進行分析:
Ribbon的啟動流程
前文提到ZoneAwareLoadBalancer實例化
@Bean
@ConditionalOnMissingBean
public ILoadBalancer ribbonLoadBalancer(IClientConfig config,
ServerList<Server> serverList, ServerListFilter<Server> serverListFilter,
IRule rule, IPing ping, ServerListUpdater serverListUpdater){
//...
return new ZoneAwareLoadBalancer<>(config, rule, ping, serverList,
serverListFilter, serverListUpdater);
}
構(gòu)造是傳入的參數(shù)ServerListUpdater也是在這個類配置注入的:
@Bean
@ConditionalOnMissingBean
public ServerListUpdater ribbonServerListUpdater(IClientConfig config) {
return new PollingServerListUpdater(config);
}
然后new ZoneAwareLoadBalancer會調(diào)用父類DynamicServerListLoadBalancer的構(gòu)造方法:
public DynamicServerListLoadBalancer(IClientConfig clientConfig, IRule rule, IPing ping, ServerList<T> serverList, ServerListFilter<T> filter, ServerListUpdater serverListUpdater) {
super(clientConfig, rule, ping);
this.isSecure = false;
this.useTunnel = false;
this.serverListUpdateInProgress = new AtomicBoolean(false);
this.updateAction = new NamelessClass_1();
this.serverListImpl = serverList;
this.filter = filter;
this.serverListUpdater = serverListUpdater;
if (filter instanceof AbstractServerListFilter) {
((AbstractServerListFilter)filter).setLoadBalancerStats(this.getLoadBalancerStats());
}
this.restOfInit(clientConfig);
this.setIntValue();
}
其中this.restOfInit(clientConfig)
會調(diào)用this.enableAndInitLearnNewServersFeature()
:
public void enableAndInitLearnNewServersFeature() {
this.serverListUpdater.start(this.updateAction);
}
所以這里執(zhí)行的就是PollingServerListUpdater的start方法了:
public synchronized void start(final UpdateAction updateAction) {
if (this.isActive.compareAndSet(false, true)) {
Runnable wrapperRunnable = new Runnable() {
public void run() {
if (!PollingServerListUpdater.this.isActive.get()) {
if (PollingServerListUpdater.this.scheduledFuture != null) {
PollingServerListUpdater.this.scheduledFuture.cancel(true);
}
} else {
try {
updateAction.doUpdate();
PollingServerListUpdater.this.lastUpdated = System.currentTimeMillis();
} catch (Exception var2) {
PollingServerListUpdater.logger.warn("Failed one update cycle", var2);
}
}
}
};
this.scheduledFuture = getRefreshExecutor().scheduleWithFixedDelay(wrapperRunnable, this.initialDelayMs, this.refreshIntervalMs, TimeUnit.MILLISECONDS);
//logger.info("polling定時任務(wù)啟動,首次延遲" + this.initialDelayMs + "ms執(zhí)行潭千,之后每" + this.refreshIntervalMs + "ms執(zhí)行一次");
} else {
logger.info("Already active, no-op");
}
}
其中g(shù)etRefreshExecutor()會得到一個ScheduledThreadPoolExecutor
谱姓,用來定時執(zhí)行wrapperRunnable,Runnable里邊業(yè)務(wù)邏輯主要是執(zhí)行updateAction.doUpdate()
protected final ServerListUpdater.UpdateAction updateAction = new ServerListUpdater.UpdateAction() {
@Override
public void doUpdate() {
updateListOfServers();
}
};
public void updateListOfServers() {
...
servers = serverListImpl.getUpdatedListOfServers(); //得到服務(wù)列表
...
updateAllServerList(servers); //更新服務(wù)列表
}
關(guān)于Ribbon如何從遠程獲取配置信息:
從DynamicServerListLoadBalancer里獲取服務(wù)列表配置serverListImpl.getUpdatedListOfServers()開始跟刨晴,最后找到package com.netflix.config.sources , URLConfigurationSource類:
@Override public PollResult poll(boolean initial, Object checkPoint) throws IOException { if (configUrls == null || configUrls.length == 0) { return PollResult.createFull(null); } Map<String, Object> map = new HashMap<String, Object>(); for (URL url: configUrls) { InputStream fin = url.openStream(); Properties props = ConfigurationUtils.loadPropertiesFromInputStream(fin); //對遠程url的流中加載Properties for (Entry<Object, Object> entry: props.entrySet()) { map.put((String) entry.getKey(), entry.getValue()); } } return PollResult.createFull(map); }
package com.netflix.config.util;
ConfigurationUtils:public static Properties loadPropertiesFromInputStream(InputStream fin) throws IOException { Properties props = new Properties(); InputStreamReader reader = new InputStreamReader(fin, "UTF-8"); try { props.load(reader); //加載java Properties return props; } finally { if (reader != null) { reader.close(); } if (fin != null) { fin.close(); } } }
之后屉来,把獲得的服務(wù)列表里的服務(wù)alive全置為true,調(diào)用setServersList(ls)割捅,然后做forceQuickPing:
protected void updateAllServerList(List<T> ls) {
// other threads might be doing this - in which case, we pass
if (serverListUpdateInProgress.compareAndSet(false, true)) {
try {
for (T s : ls) {
s.setAlive(true); // set so that clients can start using these
// servers right away instead
// of having to wait out the ping cycle.
}
setServersList(ls);
super.forceQuickPing();
} finally {
serverListUpdateInProgress.set(false);
}
}
}
調(diào)用父類BaseLoadBalancer的setServersList奶躯,然后setServerListForZones(serversInZones)
@Override
public void setServersList(List lsrv) {
super.setServersList(lsrv); //第一次BaseLoadBalancer.setServersList(lsrv)
//...
setServerListForZones(serversInZones);//有幾個zone執(zhí)行幾次BaseLoadBalancer.setServersList(List lsrv)
}
這里插一句,筆者之前通過jmap觀察JVM堆里發(fā)現(xiàn)居然有兩個BaseLoadBalancer實例亿驾,這讓筆者很疑惑嘹黔,因為按照之前的理解,BaseLoadBalancer應(yīng)該是只有一個實例才對莫瞬,仔細分析源碼發(fā)現(xiàn)了原因:
setServerListForZones這個方法是被override的儡蔓,所以最終會執(zhí)行回ZoneAwareLoadBalancer的setServerListForZones邏輯,在這里邊會通過getLoadBalancer方法完成對成員變量balancers的填充:
loadBalancer = new BaseLoadBalancer(this.getName() + "_" + zone, rule, this.getLoadBalancerStats()); BaseLoadBalancer prev = balancers.putIfAbsent(zone, loadBalancer);
也就是說疼邀,在父類DynamicServerListLoadBalancer的構(gòu)造方法完成的時候喂江,是會實例化一個new BaseLoadBalancer并且放到balancers里的(上面的setServerListForZones)。但是旁振,當從父類構(gòu)造方法返回获询,子類ZoneAwareLoadBalancer的構(gòu)造方法接下來會去執(zhí)行子類的成員變量的初始化,類似這樣一個關(guān)系:
public class ZoneAwareLoadBalancer<T extends Server> extends DynamicServerListLoadBalancer<T> { private ConcurrentHashMap<String, BaseLoadBalancer> balancers = new ConcurrentHashMap(); //構(gòu)造方法結(jié)束拐袜,成員變量balancers初始化吉嚣,置為new ConcurrentHashMap() public ZoneAwareLoadBalancer(IClientConfig clientConfig, IRule rule, IPing ping, ServerList<T> serverList, ServerListFilter<T> filter, ServerListUpdater serverListUpdater) { //父類DynamicServerListLoadBalancer的構(gòu)造方法,最后會通過setServerListForZones調(diào)用回子類的setServerListForZones方法蹬铺,然后填充了balancers super(clientConfig, rule, ping, serverList, filter, serverListUpdater); } }
所以盡管在父類里對balancers進行了填充尝哆,但這時候因為子類構(gòu)造方法返回前又相當于執(zhí)行了一次balancers = new ConcurrentHashMap,所以balancers又變空了甜攀。當下一次polling任務(wù)執(zhí)行的時候秋泄,會認為balancer未初始化,所以還會new一個BaseLoadBalancer填充進去规阀。 這就是兩個BaseLoadBalancer的由來恒序。
其實可以認為,第一個BaseLoadBalancer實例這時候已經(jīng)成為了垃圾對象谁撼、失去了引用奸焙,會在下一次ygc的時候被gc回收。
DynamicServerListLoadBalancer.java
//這個方法在初始化和之后的每次polling任務(wù)都會執(zhí)行
public void setServersList(List lsrv) {
super.setServersList(lsrv); //第一次BaseLoadBalancer.setServersList(List lsrv)
setServerListForZones(serversInZones); //有幾個zone執(zhí)行幾次BaseLoadBalancer.setServersList(List lsrv)
}
其次,上述setServerListForZones邏輯new出來的BaseLoadBalancer用的構(gòu)造方法最后ping都是null与帆,如下:
public BaseLoadBalancer(String lbName, IRule rule, LoadBalancerStats lbStats) {
this(lbName, rule, lbStats, (IPing)null);
}
所以執(zhí)行下一次polling任務(wù),BaseLoadBalancer.setServersList(lsrv)的時候會進到canSkipPing()墨榄、把全部server都置成alive玄糟!筆者的解決辦法是把這段邏輯注釋掉了的。
兩個BaseLoadBalancer實例其中一個成為了垃圾對象袄秩,交給gc去處理就好了阵翎。zone對應(yīng)的BaseLoadBalancer執(zhí)行setServersList時因為ping=null會默認把所有server的狀態(tài)置為alive,需要去掉這部分邏輯之剧。
最后郭卫,看下BaseLoadBalancer.setServersList(lsrv)方法,也就是設(shè)置服務(wù)列表的核心方法:
public void setServersList(List lsrv) {
Lock writeLock = allServerLock.writeLock();
logger.debug("LoadBalancer [{}]: clearing server list (SET op)", name);
ArrayList<Server> newServers = new ArrayList<Server>();
writeLock.lock();
try {
ArrayList<Server> allServers = new ArrayList<Server>();
for (Object server : lsrv) {
if (server == null) {
continue;
}
if (server instanceof String) {
server = new Server((String) server);
}
if (server instanceof Server) {
logger.debug("LoadBalancer [{}]: addServer [{}]", name, ((Server) server).getId());
/*
*
allServers里邊新加載的服務(wù)列表原來默認都是alive的背稼,
這里改為如果服務(wù)在當前的allServerList里存在贰军、先以當前的為準;如果不存在則先置為alive蟹肘,我們認為這樣更合理词疼。
這樣當pinger超時阻塞而暫未返回的時候,不會得出錯誤的服務(wù)狀態(tài)
*/
for(Server s : allServerList) {
if(((Server) server).getId().equals(s.getId())) {
((Server) server).setAlive(s.isAlive());
}
}
allServers.add((Server) server);
} else {
throw new IllegalArgumentException(
"Type String or Server expected, instead found:"
+ server.getClass());
}
}
boolean listChanged = false;
if (!allServerList.equals(allServers)) {
listChanged = true;
if (changeListeners != null && changeListeners.size() > 0) {
List<Server> oldList = ImmutableList.copyOf(allServerList);
List<Server> newList = ImmutableList.copyOf(allServers);
for (ServerListChangeListener l: changeListeners) {
try {
l.serverListChanged(oldList, newList);
} catch (Exception e) {
logger.error("LoadBalancer [{}]: Error invoking server list change listener", name, e);
}
}
}
}
if (isEnablePrimingConnections()) {
for (Server server : allServers) {
if (!allServerList.contains(server)) {
server.setReadyToServe(false);
newServers.add((Server) server);
}
}
if (primeConnections != null) {
primeConnections.primeConnectionsAsync(newServers, this);
}
}
// This will reset readyToServe flag to true on all servers
// regardless whether
// previous priming connections are success or not
allServerList = allServers; //這時候重置了allServerList的引用
if (canSkipPing()) {
/**
* 這段注釋掉的原因:
* 每個zone對應(yīng)的BaseLoadBalancer的ping=null帘腹,將會把所有server都再置為alive贰盗,
* 與筆者認為的“已經(jīng)存在的server先默認認為與原來狀態(tài)一樣、新server才默認置為alive”的原則相左
*
* */
/*
for (Server s : allServerList) {
s.setAlive(true);
}
upServerList = allServerList;
*/
} else if (listChanged) {
forceQuickPing();
}
logger.debug("服務(wù)列表已更新BaseLoadBalancer.setServersList() 阳欲,allServerList :" + JSON.toJSONString(allServerList));
} finally {
writeLock.unlock();
}
}
LoadBalancerClient根據(jù)服務(wù)名自動根據(jù)Rule選擇服務(wù)
前面花了很多時間分析Ribbon本地內(nèi)存里的服務(wù)列表是怎么更新的舵盈,有了服務(wù)列表那么就可以按照一定的Rule規(guī)則來選擇1個服務(wù)來執(zhí)行調(diào)用了,所以接下來關(guān)鍵是規(guī)則球化。筆者這里使用Ribbon的AvailabilityFilteringRule秽晚,即在當前可用服務(wù)中輪詢選擇,所以關(guān)鍵就在于如何判斷各服務(wù)的可用狀態(tài)Alive赊窥,Ribbon使用IPing
接口來判斷每個服務(wù)節(jié)點的可用狀態(tài)爆惧,其默認實現(xiàn)是使用的DummyPing
,就是每個服務(wù)isAlive都返回true锨能,因為Ribbon一般是搭配Eureka使用的扯再,由后者負責維護各服務(wù)節(jié)點的可用狀態(tài),Ribbon默認認為從Eureka獲得的節(jié)點就都是可用的址遇。所以我們這里要自己實現(xiàn)一個自定義的Ping熄阻,來判斷每個節(jié)點的狀態(tài):
@Configuration
public class DefaultRibbonConfig {
private Logger logger = LoggerFactory.getLogger(DefaultRibbonConfig.class);
public DefaultRibbonConfig() {
}
@Bean
public IRule ribbonRule() {
this.logger.info("全局IRule實現(xiàn)為AvailabilityFilteringRule,實例化...");
return new AvailabilityFilteringRule();
}
@Bean
public IPing ribbonPing() {
this.logger.info("全局IPing實現(xiàn)為HealthCheckPing倔约,實例化...");
return new HealthCheckPing();
}
}
HealthCheckPing:
/**
自定義的Ping秃殉,向各服務(wù)節(jié)點統(tǒng)一的一個健康檢查接口發(fā)送Http HEAD請求來判斷節(jié)點的可用狀態(tài)
*/
public class HealthCheckPing implements IPing {
private Logger logger = LoggerFactory.getLogger(HealthCheckPing.class);
private static CloseableHttpClient httpClient;
public HealthCheckPing() {
}
private CloseableHttpClient getHttpClient() {
if (httpClient == null) {
httpClient = HttpClientBuilder.create().build();
}
return httpClient;
}
public boolean isAlive(Server server) {
boolean isAlive = true;
String url = "http://" + server.getId() + "/" + server.getServiceName() + "/healthcheck/checkAlive";
HttpHead headRequest = new HttpHead(url);
RequestConfig requestConfig = RequestConfig.custom().setConnectTimeout(2000).setSocketTimeout(2000).setConnectionRequestTimeout(2000).build();
headRequest.setConfig(requestConfig);
try {
HttpResponse response = this.getHttpClient().execute(headRequest);
isAlive = response.getStatusLine().getStatusCode() == 200;
} catch (Exception var10) {
this.logger.error(var10.getMessage());
isAlive = false;
} finally {
this.logger.info("心跳檢測結(jié)果:節(jié)點" + url + " ,狀態(tài):[" + isAlive + "] ");
headRequest.abort();
}
return isAlive;
}
}
在BaseLoadBalancer實例化的時候會啟動定時任務(wù)PingTask,默認10秒一次:
/**
ShutdownEnabledTimer是個java Timer钾军,Runntime.shutdown的時候會調(diào)用cancel()結(jié)束定時,
而定時執(zhí)行的邏輯就是PingTask的run()方法了
*/
void setupPingTask() {
if (!this.canSkipPing()) {
if (this.lbTimer != null) {
this.lbTimer.cancel();
}
String uuid = UUID.randomUUID().toString().replaceAll("-", "");
this.lbTimer = new ShutdownEnabledTimer("NFLoadBalancer-PingTimer-" + uuid + "-" + this.name, true);
this.lbTimer.schedule(new BaseLoadBalancer.PingTask(), 0L, (long)(this.pingIntervalSeconds * 1000));
this.forceQuickPing();
}
}
定時執(zhí)行PingTask鳄袍,task里邊邏輯是new Pinger(pingStrategy).runPinger();
public void runPinger() throws Exception {
if (!pingInProgress.compareAndSet(false, true)) {
logger.debug("runPinger pingInProgress " + pingInProgress.get());
return; // Ping in progress - nothing to do
}
// we are "in" - we get to Ping
Server[] allServers = null;
boolean[] results = null;
Lock allLock = null;
Lock upLock = null;
String uuid = UUID.randomUUID().toString().replaceAll("-","");
try {
/*
* The readLock should be free unless an addServer operation is
* going on...
*/
allLock = allServerLock.readLock();
allLock.lock();
allServers = allServerList.toArray(new Server[allServerList.size()]);
allLock.unlock();
int numCandidates = allServers.length;
results = pingerStrategy.pingServers(ping, allServers);
final List<Server> newUpList = new ArrayList<Server>();
final List<Server> changedServers = new ArrayList<Server>();
final Map<String, Server> allServersMap = new HashMap<String, Server>(); //暫存allServers,里邊的Server狀態(tài)是正確的
//根據(jù)ping結(jié)果吏恭,設(shè)置allServers里Server的狀態(tài)
for (int i = 0; i < numCandidates; i++) {
boolean isAlive = results[i];
Server svr = allServers[i];
boolean oldIsAlive = svr.isAlive();
svr.setAlive(isAlive);
allServersMap.put(svr.getId(), svr);
if (oldIsAlive != isAlive) {
changedServers.add(svr); //狀態(tài)發(fā)生變化的
logger.debug("LoadBalancer [{}]: Server [{}] status changed to {}",
name, svr.getId(), (isAlive ? "ALIVE" : "DEAD"));
}
if (isAlive) {
newUpList.add(svr);
}
}
upLock = upServerLock.writeLock();
upLock.lock();
upServerList = newUpList;
upLock.unlock();
changeServerStatusInAllServerList(allServersMap); //補償邏輯
notifyServerStatusChangeListener(changedServers);
} finally {
pingInProgress.set(false);
}
}
runPinger()的關(guān)鍵代碼是results = pingerStrategy.pingServers(ping, allServers);
里邊就是對每個server都用iPing接口實現(xiàn)的ping邏輯進行驗證一遍拗小,獲得每個server的最新狀態(tài)。
記錄一下本次changedServers都有哪些樱哼。注意哀九,這里會去持有一個真正的allServerList的引用并去更新狀態(tài),upServerList也會去更新搅幅。 allServers = allServerList.toArray(new Server[allServerList.size()]); 所以定時線程PingTask的作用是用iPing里的邏輯驗證服務(wù)器的狀態(tài)阅束,然后更新服務(wù)器列表狀態(tài)。
但是茄唐,坑爹的是ping持有的這個allServerList的引用會“失效”息裸!
ping接口持有的allServerList的引用為什么會失效,原因是PollingServerListUpdater線程每次加載配置的serverlist的時候琢融,默認都是alive的界牡、且用的是allServerList = serverList
這種方式,這樣就算allServerList定義為volatile類型也無濟于事漾抬,畢竟整個引用指向了另一個對象(這時候之前的對象正在被Ping接口持有)宿亡。
而這時候Ping接口里邊的邏輯因為在阻塞超時2秒,沒有執(zhí)行完纳令,所以相當于PIng接口里邊持有的allServerList指向的還是之前的那個對象挽荠。所以這時候Ping接口根據(jù)當前檢查結(jié)果去更新服務(wù)列表也是沒用的了,因為持有的服務(wù)列表是過期的對象平绩。
簡單來說就是polling和ping兩個定時任務(wù)同時去修改服務(wù)列表圈匆、加上polling默認服務(wù)alive、且直接修改allServerList引用捏雌,導(dǎo)致的并發(fā)問題跃赚。(源碼里allServerList有個讀寫鎖,但是對這個場景來說并沒有用)
我的解決辦法是在BaseLoadBalancer里邊在runPinger最后加了補償邏輯性湿,最后再更新一遍allServerList纬傲;
/**
* 在runPinger之后,再修改一次allServerList肤频,防止runPinger的過程中allServerList已經(jīng)被修改了引用
* 這時候runPinger持有的已經(jīng)是過期引用叹括,修改的server狀態(tài)也是沒法更新到真正的allServerList了。
* */
private void changeServerStatusInAllServerList(Map<String, Server> allServersMap) {
Lock allLock = null;
allLock = allServerLock.writeLock();
allLock.lock();
try {
for(Server s : allServerList) {
Server serverWithRightStatus = allServersMap.get(s.getId());
if(null != serverWithRightStatus) {
s.setAlive(serverWithRightStatus.isAlive());
logger.debug("修改了Server {}的狀態(tài)宵荒,最新狀態(tài)為alive={}" , s.getId(), s.isAlive());
}
}
}finally {
allLock.unlock();
}
}
但這樣還不夠汁雷,如果一個節(jié)點down掉净嘀,在下一次ping循環(huán)時,超時2秒之內(nèi)pinger還沒返回的時候侠讯,如果正好這2s的時候發(fā)生了調(diào)用挖藏,本該down的節(jié)點仍然被設(shè)置的是alive,所以關(guān)鍵問題是要改一下polling線程每次“都設(shè)置為alive”的邏輯:回顧一下前文中BaseLoadBalancer的setServersList(List lsrv)
方法中的兩段注釋:
/*
*
allServers里邊新加載的服務(wù)列表原來默認都是alive的厢漩,
這里改為如果服務(wù)在當前的allServerList里存在熬苍、先以當前的為準;
如果不存在則先置為alive袁翁,我們認為這樣更合理。
這樣當pinger超時阻塞而暫未返回的時候婿脸,不會得出錯誤的服務(wù)狀態(tài)
*/
for(Server s : allServerList) {
if(((Server) server).getId().equals(s.getId())) {
((Server) server).setAlive(s.isAlive());
}
}
然后又發(fā)現(xiàn)了新的問題:down的節(jié)點在2秒的時候又被設(shè)置回alive了粱胜!
檢查代碼發(fā)現(xiàn)是第二次setServerList的時候又給改回來了,也就是zone的setServerList的時候狐树;(見上文焙压,setServerList會被掉兩次,后面也會有解釋)
if (canSkipPing()) {
/**
* 這段注釋掉的原因:
* 每個zone對應(yīng)的BaseLoadBalancer的ping=null抑钟,將會把所有server都再置為alive涯曲,
* 與筆者認為的“已經(jīng)存在的server先默認認為與原來狀態(tài)一樣、新server才默認置為alive”的原則相左
* */
/*
for (Server s : allServerList) {
s.setAlive(true);
}
upServerList = allServerList;
*/
} else if (listChanged) {
forceQuickPing();
}
上面的代碼會走到注釋的地方在塔,所以還是zone對應(yīng)的balancer有兩個的原因幻件,其中一個ping為null
暫時注釋掉這段解決了問題。
BaseLoadBalancer的setServersList方法蛔溃,執(zhí)行兩次
PollingServerListUpdater-0或者PollingServerListUpdater-1線程組成個線程池绰沥,負責執(zhí)行30秒一次的定時任務(wù)。這個定時任務(wù)執(zhí)行BaseLoadBalancer
的setServersList方法贺待,也執(zhí)行了兩次徽曲。
原因是在DynamicServerListLoadBalancer
初始化的時候會初始化PollingServerListUpdater
并調(diào)用其start()啟動定時任務(wù),定時執(zhí)行updateAction.doUpdate()麸塞,里邊也就是setServersList(List lsrv)秃臣,它接連調(diào)用了BaseLoadBalancer
的setServersList方法,和一個setServerListForZones方法哪工,而后者的實現(xiàn)是在子類ZoneAwareLoadBalancer
里奥此,getLoadBalancer(zone).setServersList(entry.getValue());就又會調(diào)用一次setServersList方法了。這就是為啥日志里看是PollingServerListUpdater線程執(zhí)行了兩次BaseLoadBalancer的setServersList的邏輯正勒。
這個執(zhí)行兩次得院,感覺是一種業(yè)務(wù)邏輯:服務(wù)列表更新,相當于是先更新當前banancer的章贞,然后更新當前所屬zone的祥绞;也就是說每30秒只執(zhí)行一次polling線程的邏輯非洲,然后setServersList執(zhí)行了兩次。
對com.netflix.loadbalancer.Server類的字段進行擴充蜕径,增加應(yīng)用名serviceName两踏,Ribbon配置服務(wù)列表的時候是IP+Port,我們這次要求加上服務(wù)名兜喻,即ip:port/serviceName
package com.netflix.loadbalancer;
import com.netflix.util.Pair;
/**
* Class that represents a typical Server (or an addressable Node) i.e. a
* Host:port identifier
*
* @author stonse
*
* 在原來的基礎(chǔ)上增加了應(yīng)用名serviceName
* 刷新服務(wù)列表的同時保存應(yīng)用名梦染,給自定義Ping做心跳檢測用
*/
public class Server {
//...
public static final String UNKNOWN_ZONE = "UNKNOWN";
private String host;
private int port = 80;
private String scheme;
private volatile String id;
private volatile boolean isAliveFlag;
private String zone = UNKNOWN_ZONE;
private volatile boolean readyToServe = true;
private String serviceName; //服務(wù)名,與serviceName.ribbon.xxx中的serviceName一樣朴皆,是服務(wù)提供者的實際應(yīng)用名稱
// ...
public void setId(String id) {
Pair<String, Integer> hostPort = getHostPort(id);
if (hostPort != null) {
this.id = hostPort.first() + ":" + hostPort.second();
this.host = hostPort.first();
this.port = hostPort.second();
this.scheme = getScheme(id);
} else {
this.id = null;
}
this.serviceName = parseServiceName(id); //add by liny
}
//獲取服務(wù)名
public String getServiceName() {
return this.serviceName;
}
//解析服務(wù)名
private String parseServiceName(String id) {
if (id != null) {
if (id.toLowerCase().startsWith("http://")) {
id = id.substring(7);
} else if (id.toLowerCase().startsWith("https://")) {
id = id.substring(8);
}
if (id.contains("/")) {
int slash_idx = id.indexOf("/");
id = id.substring(slash_idx);
if(id.length()>1)
return id.substring(1);
else
return "";
}else {
return "";
}
}else {
return null;
}
}
// ...
}
到此帕识,我們了解了定時更新服務(wù)列表與定時Ping來判斷服務(wù)節(jié)點的可用性。接下來看下組件提供的工具類的代碼:
/**
* 創(chuàng)建一個有負載均衡功能的restTemplate遂铡,最終提供出去的工具類
*
* */
public class LbRestTemplate {
private Logger logger = LoggerFactory.getLogger(LbRestTemplate.class);
// @Autowired
private RestTemplate restTemplate;
// @Autowired
private LoadBalancerClient loadBalancer;
public LbRestTemplate(RestTemplate restTemplate, LoadBalancerClient loadBalancer){
this.restTemplate = restTemplate;
this.loadBalancer = loadBalancer;
}
public <T extends Object> T getForBean(String serviceName, String interfaceUrl, Class<T> clazz){
ServiceInstance instance = loadBalancer.choose(serviceName);
String contextPath ;
if(null == instance)
throw new RuntimeException(serviceName + "當前無可用節(jié)點");
URI serviceAddress = instance.getUri();
if(instance instanceof RibbonServer) {
contextPath = ((RibbonServer) instance).getServer().getServiceName();
}else {
contextPath = serviceName;
}
String serviceFullUrl = serviceAddress + "/" + contextPath + interfaceUrl;
logger.debug("serviceFullUrl:{}", serviceFullUrl);
return restTemplate.getForObject(serviceFullUrl, clazz);
}
public <T extends Object> T postForBean(String serviceName, String interfaceUrl, Object requestBody, Class<T> clazz){
ServiceInstance instance = loadBalancer.choose(serviceName);
String contextPath ;
if(null == instance)
throw new RuntimeException(serviceName + "當前無可用節(jié)點");
URI serviceAddress = instance.getUri();
if(instance instanceof RibbonServer) {
contextPath = ((RibbonServer) instance).getServer().getServiceName();
}else {
contextPath = serviceName;
}
String serviceFullUrl = serviceAddress + "/" + contextPath + interfaceUrl;
logger.debug("serviceFullUrl:{}", serviceFullUrl);
return restTemplate.postForObject(serviceFullUrl, requestBody, clazz);
}
}
LbRestTemplate通過Spring SPI機制注入到Spring IOC容器后肮疗,LoadBalancerClient會Ribbon的RibbonAutoConfiguration也通過SPI自動注入:
@Bean
@ConditionalOnMissingBean(LoadBalancerClient.class)
public LoadBalancerClient loadBalancerClient() {
return new RibbonLoadBalancerClient(springClientFactory());
}
RibbonLoadBalancerClient.choose(serviceName)最終會走到ZoneAwareLoadBalancer以及BaseLoadBalancer的chooseServer方法里,后者掉用this.rule.choose()方法來根據(jù)Rule來選服務(wù)扒接。
總結(jié):
至此伪货,Ribbon的整個流程就串起來了:polling動態(tài)更新服務(wù)列表、服務(wù)狀態(tài)判定ping钾怔、動態(tài)路由根據(jù)rule選取服務(wù)節(jié)點碱呼。
Ribbon一般是配合Eureka使用,節(jié)點健康狀態(tài)從Eureka獲取到本地然后直接取用宗侦、不會發(fā)生阻塞愚臀,我們使用了自定義的Ping根據(jù)心跳檢查來判定對端服務(wù)節(jié)點的可用狀態(tài),當節(jié)點故障時會有超時阻塞凝垛。所以針對阻塞的情況我們對Ribbon源代碼中節(jié)點狀態(tài)邏輯做了一系列的優(yōu)化懊悯。