I. Core Interfaces
- ILoadBalancer
Ribbon exposes a single entry point for selecting a server (Server) through the ILoadBalancer interface. The interface picks a suitable Server according to the configured load-balancing strategy (IRule) and returns it to the caller. Its core methods are:
public interface ILoadBalancer {
    public void addServers(List<Server> newServers);
    public Server chooseServer(Object key);
    public void markServerDown(Server server);
    public List<Server> getReachableServers();
    public List<Server> getAllServers();
}
The default implementation of this interface is ZoneAwareLoadBalancer.
[Figure: class diagram of ILoadBalancer and its implementations]
- IRule
IRule abstracts the load-balancing strategy. ILoadBalancer returns a Server by calling the choose() method of IRule. Its core methods are:
public interface IRule {
    public Server choose(Object key);
    public void setLoadBalancer(ILoadBalancer lb);
    public ILoadBalancer getLoadBalancer();
}
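Its implementations are:
- BestAvailableRule
Skips servers whose circuit breaker has tripped and, among the remaining servers, picks the one with the fewest concurrent requests.
- ClientConfigEnabledRoundRobinRule, RoundRobinRule
Round robin.
- RandomRule
Random selection.
- RetryRule
A retrying strategy that can wrap another strategy; by default it retries on top of round robin.
- WeightedResponseTimeRule
Weights servers by response time: the shorter the response time, the larger the weight.
- AvailabilityFilteringRule
Filters out servers whose circuit breaker has tripped because of consecutive connection or read failures, or whose connection count exceeds the configured limit, then round-robins over the remaining servers.
[Figure: class diagram of the IRule implementations]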
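To make the "shorter response time, larger weight" idea concrete, here is a minimal sketch of response-time weighting. It is not Ribbon's code: the class ResponseTimeWeights and both of its methods are hypothetical, and the weighting formula (sum of all averages minus the server's own average, accumulated into a cumulative range) is an assumption about how such a rule can be built.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of Ribbon: illustrates response-time weighting.
final class ResponseTimeWeights {

    // Builds cumulative weights. Each server's own weight is
    // (sum of all averages - its own average), so a faster server
    // owns a wider slice of the cumulative range.
    static List<Double> cumulativeWeights(double[] avgResponseMillis) {
        double total = 0.0;
        for (double t : avgResponseMillis) {
            total += t;
        }
        List<Double> cumulative = new ArrayList<>();
        double running = 0.0;
        for (double t : avgResponseMillis) {
            running += total - t;
            cumulative.add(running);
        }
        return cumulative;
    }

    // Maps a fraction in [0, 1) onto the cumulative range and returns the
    // index of the server whose slice contains that point.
    // Assumes the list is non-empty.
    static int pick(List<Double> cumulative, double fraction) {
        double max = cumulative.get(cumulative.size() - 1);
        double point = fraction * max;
        for (int i = 0; i < cumulative.size(); i++) {
            if (point < cumulative.get(i)) {
                return i;
            }
        }
        return cumulative.size() - 1;
    }
}
```

In a real rule the fraction would come from a random number generator; it is passed in here so that the selection stays deterministic and easy to reason about.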
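- IPing
IPing checks whether a Server is available. The ILoadBalancer implementation maintains a Timer that checks the availability of each Server every 10 s. Its core method is:
public interface IPing {
    public boolean isAlive(Server server);
}
Its implementations are:
[Figure: class diagram of the IPing implementations]
- IClientConfig
IClientConfig defines the configuration used to initialize the various clients and load balancers. Its implementation is DefaultClientConfigImpl.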
II. How Load Balancing Is Implemented
1. Selecting a Server
The main implementations of ILoadBalancer are BaseLoadBalancer and ZoneAwareLoadBalancer. ZoneAwareLoadBalancer is a subclass of BaseLoadBalancer and overrides chooseServer(). As its name suggests, it deals with zones in Spring Cloud. When zone awareness is disabled or there is at most one available zone (the default setup), it delegates directly to chooseServer() of its parent class BaseLoadBalancer:
@Override
public Server chooseServer(Object key) {
if (!ENABLED.get() || getLoadBalancerStats().getAvailableZones().size() <= 1) {
// Delegate to chooseServer() of the parent class BaseLoadBalancer
return super.chooseServer(key);
}
// omitted
}
BaseLoadBalancer's chooseServer() calls the choose() method of IRule directly:
public Server chooseServer(Object key) {
if (counter == null) {
counter = createCounter();
}
counter.increment();
if (rule == null) {
return null;
} else {
try {
return rule.choose(key);
} catch (Exception e) {
logger.warn("LoadBalancer [{}]: Error choosing server for key {}", name, key, e);
return null;
}
}
}
Here the IRule implementation is ZoneAvoidanceRule, and choose() is implemented in its parent class PredicateBasedRule:
@Override
public Server choose(Object key) {
ILoadBalancer lb = getLoadBalancer();
Optional<Server> server = getPredicate().chooseRoundRobinAfterFiltering(lb.getAllServers(), key);
if (server.isPresent()) {
return server.get();
} else {
return null;
}
}
As the code shows, it first calls getAllServers() on the ILoadBalancer to obtain the full server list. getAllServers() is implemented in BaseLoadBalancer, which keeps a List<Server> field named allServerList where every Server is cached. With the list in hand, it calls chooseRoundRobinAfterFiltering() to return a Server. That method filters the candidates by loadBalancerKey and then picks one of them by round robin:
public Optional<Server> chooseRoundRobinAfterFiltering(List<Server> servers, Object loadBalancerKey) {
List<Server> eligible = getEligibleServers(servers, loadBalancerKey);
if (eligible.size() == 0) {
return Optional.absent();
}
return Optional.of(eligible.get(incrementAndGetModulo(eligible.size())));
}
private int incrementAndGetModulo(int modulo) {
for (;;) {
int current = nextIndex.get();
int next = (current + 1) % modulo;
if (nextIndex.compareAndSet(current, next) && current < modulo)
return current;
}
}
As shown, round robin works by returning the current cursor value and advancing the cursor by one, modulo the number of eligible servers.
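The compare-and-set loop above can be tried in isolation. The following is a minimal sketch, assuming a hypothetical stand-alone class RoundRobinPicker that reuses the same loop over an AtomicInteger; the pick() helper is added purely for illustration.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-alone class mirroring the CAS loop quoted above.
final class RoundRobinPicker {
    private final AtomicInteger nextIndex = new AtomicInteger();

    // Returns the current index and atomically advances the shared cursor.
    int incrementAndGetModulo(int modulo) {
        for (;;) {
            int current = nextIndex.get();
            int next = (current + 1) % modulo;
            // Retry if another thread moved the cursor first, or if the list
            // shrank since the cursor was stored (current would be out of range).
            if (nextIndex.compareAndSet(current, next) && current < modulo) {
                return current;
            }
        }
    }

    // Illustrative helper: picks the next element, or null for an empty list.
    <T> T pick(List<T> eligible) {
        if (eligible.isEmpty()) {
            return null;
        }
        return eligible.get(incrementAndGetModulo(eligible.size()));
    }
}
```

The `current < modulo` check matters because the eligible list can change size between calls: a cursor stored for a longer list must not be used as an index into a shorter one.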
2. Checking Server Status
The allServerList collection in BaseLoadBalancer caches every Server, but a Server's status can change: it may become unavailable, for example, and Ribbon needs to notice promptly. So how does Ribbon find out whether a Server is available?
The BaseLoadBalancer constructor sets up a Timer-based scheduler that runs a PingTask every 10 s:
public BaseLoadBalancer(String name, IRule rule, LoadBalancerStats stats,
IPing ping, IPingStrategy pingStrategy) {
this.name = name;
this.ping = ping;
this.pingStrategy = pingStrategy;
setRule(rule);
setupPingTask();
lbStats = stats;
init();
}
void setupPingTask() {
if (canSkipPing()) {
return;
}
if (lbTimer != null) {
lbTimer.cancel();
}
lbTimer = new ShutdownEnabledTimer("NFLoadBalancer-PingTimer-" + name,
true);
lbTimer.schedule(new PingTask(), 0, pingIntervalSeconds * 1000);
forceQuickPing();
}
class PingTask extends TimerTask {
public void run() {
try {
new Pinger(pingStrategy).runPinger();
} catch (Exception e) {
logger.error("LoadBalancer [{}]: Error pinging", name, e);
}
}
}
Digging into Pinger and SerialPingStrategy shows that availability is ultimately decided by NIWSDiscoveryPing, an implementation of IPing. Its isAlive() method checks whether the status of the InstanceInfo associated with the Server is UP:
public boolean isAlive(Server server) {
boolean isAlive = true;
if (server!=null && server instanceof DiscoveryEnabledServer){
DiscoveryEnabledServer dServer = (DiscoveryEnabledServer)server;
InstanceInfo instanceInfo = dServer.getInstanceInfo();
if (instanceInfo!=null){
InstanceStatus status = instanceInfo.getStatus();
if (status!=null){
// Alive only if the status is UP
isAlive = status.equals(InstanceStatus.UP);
}
}
}
return isAlive;
}
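The interplay of the timer, the ping task, and the status check can be sketched without any Ribbon dependency. Everything below is hypothetical: Node, Ping, and PingingBalancer are simplified stand-ins for Server, IPing, and BaseLoadBalancer, and the registryStatus string stands in for the InstanceInfo status.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.CopyOnWriteArrayList;

// Hypothetical stand-in for Ribbon's Server.
final class Node {
    final String id;
    volatile String registryStatus; // e.g. "UP" or "DOWN", as reported by a registry
    volatile boolean alive = true;

    Node(String id, String registryStatus) {
        this.id = id;
        this.registryStatus = registryStatus;
    }
}

// Hypothetical stand-in for IPing.
interface Ping {
    boolean isAlive(Node node);
}

// Hypothetical stand-in for the ping-related part of BaseLoadBalancer.
final class PingingBalancer {
    private final List<Node> allNodes = new CopyOnWriteArrayList<>();
    private final Ping ping;
    private Timer timer;

    PingingBalancer(Ping ping) {
        this.ping = ping;
    }

    void addNode(Node node) {
        allNodes.add(node);
    }

    // One ping pass: refresh the alive flag of every cached node.
    void runPinger() {
        for (Node node : allNodes) {
            node.alive = ping.isAlive(node);
        }
    }

    // Runs a pass immediately, then repeats at a fixed interval on a daemon thread.
    void start(long intervalMillis) {
        timer = new Timer("ping-timer", true);
        timer.schedule(new TimerTask() {
            @Override
            public void run() {
                runPinger();
            }
        }, 0, intervalMillis);
    }

    void stop() {
        if (timer != null) {
            timer.cancel();
        }
    }

    // Nodes that passed the most recent ping.
    List<Node> reachableNodes() {
        List<Node> up = new ArrayList<>();
        for (Node node : allNodes) {
            if (node.alive) {
                up.add(node);
            }
        }
        return up;
    }
}
```

A status-based check comparable to the one above could be supplied as `node -> "UP".equals(node.registryStatus)`, with `start(10_000)` giving the 10 s interval.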
III. Ways to Use Ribbon
1. RestTemplate + @LoadBalanced
- Usage
Declare a RestTemplate bean annotated with @LoadBalanced, then use that bean directly to send requests:
@Configuration
public class Config {
    @Bean
    @LoadBalanced
    RestTemplate restTemplate() {
        // Provide a RestTemplate bean annotated with @LoadBalanced
        return new RestTemplate();
    }
}

@RestController
public class HelloController {
    @Resource
    private RestTemplate restTemplate;

    @GetMapping("/hi")
    public String hi() {
        // Use it directly
        return restTemplate.getForEntity("http://Eureka-Producer/hello", String.class).getBody();
    }
}
- How it works
When LoadBalancerAutoConfiguration is instantiated, it attaches a LoadBalancerInterceptor to every RestTemplate bean annotated with @LoadBalanced. The interceptor instance is stored in the List<ClientHttpRequestInterceptor> interceptors collection of InterceptingHttpAccessor, a parent class of RestTemplate.
[Figure: class diagram of RestTemplate]
The code that attaches LoadBalancerInterceptor is:
@Configuration
@ConditionalOnClass(RestTemplate.class)
@ConditionalOnBean(LoadBalancerClient.class)
@EnableConfigurationProperties(LoadBalancerRetryProperties.class)
public class LoadBalancerAutoConfiguration {

    // 1. Collect every RestTemplate annotated with @LoadBalanced
    @LoadBalanced
    @Autowired(required = false)
    private List<RestTemplate> restTemplates = Collections.emptyList();

    @Bean
    public SmartInitializingSingleton loadBalancedRestTemplateInitializerDeprecated(
            final ObjectProvider<List<RestTemplateCustomizer>> restTemplateCustomizers) {
        return () -> restTemplateCustomizers.ifAvailable(customizers -> {
            for (RestTemplate restTemplate : LoadBalancerAutoConfiguration.this.restTemplates) {
                for (RestTemplateCustomizer customizer : customizers) {
                    // 3. Run customize() on each RestTemplate
                    customizer.customize(restTemplate);
                }
            }
        });
    }

    @Bean
    @ConditionalOnMissingBean
    public LoadBalancerRequestFactory loadBalancerRequestFactory(
            LoadBalancerClient loadBalancerClient) {
        return new LoadBalancerRequestFactory(loadBalancerClient, transformers);
    }

    @Configuration
    @ConditionalOnMissingClass("org.springframework.retry.support.RetryTemplate")
    static class LoadBalancerInterceptorConfig {

        @Bean
        public LoadBalancerInterceptor ribbonInterceptor(
                LoadBalancerClient loadBalancerClient,
                LoadBalancerRequestFactory requestFactory) {
            // 2. Register the LoadBalancerInterceptor
            return new LoadBalancerInterceptor(loadBalancerClient, requestFactory);
        }

        @Bean
        @ConditionalOnMissingBean
        public RestTemplateCustomizer restTemplateCustomizer(
                final LoadBalancerInterceptor loadBalancerInterceptor) {
            return restTemplate -> {
                // 4. customize() adds the LoadBalancerInterceptor to the RestTemplate
                List<ClientHttpRequestInterceptor> list = new ArrayList<>(
                        restTemplate.getInterceptors());
                list.add(loadBalancerInterceptor);
                restTemplate.setInterceptors(list);
            };
        }
    }
    // omitted
}
The code shows that the LoadBalancerInterceptor constructor takes two arguments: a LoadBalancerClient and a LoadBalancerRequestFactory. The LoadBalancerRequestFactory instance is registered in this Configuration, but the LoadBalancerClient instance is not. So where is LoadBalancerClient instantiated? The answer is RibbonAutoConfiguration, which registers an instance of RibbonLoadBalancerClient (the LoadBalancerClient implementation) and an instance of SpringClientFactory:
@Bean
public SpringClientFactory springClientFactory() {
    SpringClientFactory factory = new SpringClientFactory();
    factory.setConfigurations(this.configurations);
    return factory;
}

@Bean
@ConditionalOnMissingBean(LoadBalancerClient.class)
public LoadBalancerClient loadBalancerClient() {
    return new RibbonLoadBalancerClient(springClientFactory());
}
At this point the LoadBalancerInterceptor has been created and stored in the RestTemplate's interceptor collection. How does RestTemplate make use of it? Any request sent through RestTemplate eventually reaches doExecute(), which creates a ClientHttpRequest and calls its execute() method to send the request:
protected <T> T doExecute(URI url, @Nullable HttpMethod method, @Nullable RequestCallback requestCallback,
        @Nullable ResponseExtractor<T> responseExtractor) throws RestClientException {
    ClientHttpResponse response = null;
    try {
        // 1. Create the ClientHttpRequest.
        ClientHttpRequest request = createRequest(url, method);
        if (requestCallback != null) {
            requestCallback.doWithRequest(request);
        }
        // 2. Call its execute() method to get the result.
        response = request.execute();
        handleResponse(url, method, response);
        return (responseExtractor != null ? responseExtractor.extractData(response) : null);
    } catch (IOException ex) {
        String resource = url.toString();
        String query = url.getRawQuery();
        resource = (query != null ? resource.substring(0, resource.indexOf('?')) : resource);
        throw new ResourceAccessException("I/O error on " + method.name() +
                " request for \"" + resource + "\": " + ex.getMessage(), ex);
    } finally {
        if (response != null) {
            response.close();
        }
    }
}

protected ClientHttpRequest createRequest(URI url, HttpMethod method) throws IOException {
    ClientHttpRequest request = getRequestFactory().createRequest(url, method);
    if (logger.isDebugEnabled()) {
        logger.debug("HTTP " + method.name() + " " + url);
    }
    return request;
}

@Override
public ClientHttpRequestFactory getRequestFactory() {
    List<ClientHttpRequestInterceptor> interceptors = getInterceptors();
    if (!CollectionUtils.isEmpty(interceptors)) {
        ClientHttpRequestFactory factory = this.interceptingRequestFactory;
        if (factory == null) {
            factory = new InterceptingClientHttpRequestFactory(super.getRequestFactory(), interceptors);
            this.interceptingRequestFactory = factory;
        }
        return factory;
    } else {
        return super.getRequestFactory();
    }
}
getRequestFactory() shows that when the interceptors collection is not empty, the ClientHttpRequest is created by createRequest() of InterceptingClientHttpRequestFactory, and the interceptors collection is passed into that factory. Here is its createRequest() method:
public class InterceptingClientHttpRequestFactory extends AbstractClientHttpRequestFactoryWrapper {

    private final List<ClientHttpRequestInterceptor> interceptors;

    public InterceptingClientHttpRequestFactory(ClientHttpRequestFactory requestFactory,
            @Nullable List<ClientHttpRequestInterceptor> interceptors) {
        super(requestFactory);
        this.interceptors = (interceptors != null ? interceptors : Collections.emptyList());
    }

    @Override
    protected ClientHttpRequest createRequest(URI uri, HttpMethod httpMethod, ClientHttpRequestFactory requestFactory) {
        // Return an InterceptingClientHttpRequest directly.
        return new InterceptingClientHttpRequest(requestFactory, this.interceptors, uri, httpMethod);
    }
}
So the interceptors end up inside InterceptingClientHttpRequest. As noted above, RestTemplate's doExecute() creates the InterceptingClientHttpRequest and calls its execute() method to obtain the response. Inside execute(), the interceptor's intercept() method is called directly, which here means the intercept() method of LoadBalancerInterceptor:
public ClientHttpResponse execute(HttpRequest request, byte[] body) throws IOException {
    if (this.iterator.hasNext()) {
        ClientHttpRequestInterceptor nextInterceptor = this.iterator.next();
        // Call intercept() on the next interceptor (here, LoadBalancerInterceptor)
        return nextInterceptor.intercept(request, body, this);
    }
    // omitted
}
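The iterator-driven hand-off between interceptors can be illustrated with a tiny model. This is a sketch under simplifying assumptions, not Spring's implementation: Execution, Interceptor, and InterceptingExecution are hypothetical types, and requests and responses are reduced to plain strings.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Hypothetical miniature of a request execution step.
interface Execution {
    String execute(String request);
}

// Hypothetical miniature of a request interceptor.
interface Interceptor {
    String intercept(String request, Execution next);
}

// Walks the interceptor list once, then falls through to the real call.
final class InterceptingExecution implements Execution {
    private final Iterator<Interceptor> iterator;
    private final Execution terminal; // performs the real call once interceptors are exhausted

    InterceptingExecution(List<Interceptor> interceptors, Execution terminal) {
        this.iterator = new ArrayList<>(interceptors).iterator();
        this.terminal = terminal;
    }

    @Override
    public String execute(String request) {
        if (iterator.hasNext()) {
            // Hand the request to the next interceptor and pass this object
            // as the continuation, so the interceptor decides when to proceed.
            return iterator.next().intercept(request, this);
        }
        return terminal.execute(request);
    }
}
```

Because each interceptor receives the continuation, it may rewrite the request before proceeding, which is the hook a load-balancing interceptor relies on to swap a service name for a concrete address.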
In other words, a RestTemplate request is ultimately delegated to InterceptingClientHttpRequest, which in turn hands it to its interceptors. So how does the interceptor use Ribbon's interfaces to handle the request? Look at the intercept() method of LoadBalancerInterceptor:
public class LoadBalancerInterceptor implements ClientHttpRequestInterceptor {

    private LoadBalancerClient loadBalancer;
    private LoadBalancerRequestFactory requestFactory;

    public LoadBalancerInterceptor(LoadBalancerClient loadBalancer, LoadBalancerRequestFactory requestFactory) {
        this.loadBalancer = loadBalancer;
        this.requestFactory = requestFactory;
    }

    public LoadBalancerInterceptor(LoadBalancerClient loadBalancer) {
        // for backwards compatibility
        this(loadBalancer, new LoadBalancerRequestFactory(loadBalancer));
    }

    @Override
    public ClientHttpResponse intercept(final HttpRequest request, final byte[] body,
            final ClientHttpRequestExecution execution) throws IOException {
        final URI originalUri = request.getURI();
        String serviceName = originalUri.getHost();
        // Call execute() on the LoadBalancerClient directly.
        return this.loadBalancer.execute(serviceName, requestFactory.createRequest(request, body, execution));
    }
}
As shown, LoadBalancerInterceptor's intercept() calls execute() on LoadBalancerClient directly. LoadBalancerClient is an interface, and its implementation here is RibbonLoadBalancerClient. As mentioned earlier, RibbonAutoConfiguration registers the RibbonLoadBalancerClient bean, and that bean is stored in LoadBalancerInterceptor through its constructor. Next comes the execute() method of RibbonLoadBalancerClient. The class is quite interesting, so first look at its class diagram:
[Figure: class diagram of RibbonLoadBalancerClient]
The execute() method first obtains an ILoadBalancer by calling getLoadBalancer() on SpringClientFactory. How does that method return an ILoadBalancer? Simply by looking it up in the Spring context. And when was the ILoadBalancer registered there? The answer is RibbonClientConfiguration, which registers the following beans in the Spring context:
- ZoneAwareLoadBalancer, the implementation of ILoadBalancer.
- ZoneAvoidanceRule, the implementation of IRule.
- DefaultClientConfigImpl, the implementation of IClientConfig.
In addition, EurekaRibbonClientConfiguration registers:
- DomainExtractingServerList and DiscoveryEnabledNIWSServerList, implementations of ServerList.
- NIWSDiscoveryPing, the implementation of IPing.
The source code is as follows:
@Bean
@ConditionalOnMissingBean
public IClientConfig ribbonClientConfig() {
    DefaultClientConfigImpl config = new DefaultClientConfigImpl();
    config.loadProperties(this.name);
    config.set(CommonClientConfigKey.ConnectTimeout, DEFAULT_CONNECT_TIMEOUT);
    config.set(CommonClientConfigKey.ReadTimeout, DEFAULT_READ_TIMEOUT);
    config.set(CommonClientConfigKey.GZipPayload, DEFAULT_GZIP_PAYLOAD);
    return config;
}

@Bean
@ConditionalOnMissingBean
public IRule ribbonRule(IClientConfig config) {
    if (this.propertiesFactory.isSet(IRule.class, name)) {
        return this.propertiesFactory.get(IRule.class, config, name);
    }
    ZoneAvoidanceRule rule = new ZoneAvoidanceRule();
    rule.initWithNiwsConfig(config);
    return rule;
}

@Bean
@ConditionalOnMissingBean
public ServerList<Server> ribbonServerList(IClientConfig config) {
    if (this.propertiesFactory.isSet(ServerList.class, name)) {
        return this.propertiesFactory.get(ServerList.class, config, name);
    }
    ConfigurationBasedServerList serverList = new ConfigurationBasedServerList();
    serverList.initWithNiwsConfig(config);
    return serverList;
}

@Bean
@ConditionalOnMissingBean
public ILoadBalancer ribbonLoadBalancer(IClientConfig config,
        ServerList<Server> serverList, ServerListFilter<Server> serverListFilter,
        IRule rule, IPing ping, ServerListUpdater serverListUpdater) {
    if (this.propertiesFactory.isSet(ILoadBalancer.class, name)) {
        return this.propertiesFactory.get(ILoadBalancer.class, config, name);
    }
    return new ZoneAwareLoadBalancer<>(config, rule, ping, serverList,
            serverListFilter, serverListUpdater);
}

@Bean
@ConditionalOnMissingBean
public IPing ribbonPing(IClientConfig config) {
    if (this.propertiesFactory.isSet(IPing.class, serviceId)) {
        return this.propertiesFactory.get(IPing.class, config, serviceId);
    }
    NIWSDiscoveryPing ping = new NIWSDiscoveryPing();
    ping.initWithNiwsConfig(config);
    return ping;
}

@Bean
@ConditionalOnMissingBean
public ServerList<?> ribbonServerList(IClientConfig config, Provider<EurekaClient> eurekaClientProvider) {
    if (this.propertiesFactory.isSet(ServerList.class, serviceId)) {
        return this.propertiesFactory.get(ServerList.class, config, serviceId);
    }
    DiscoveryEnabledNIWSServerList discoveryServerList = new DiscoveryEnabledNIWSServerList(
            config, eurekaClientProvider);
    DomainExtractingServerList serverList = new DomainExtractingServerList(
            discoveryServerList, config, this.approximateZoneFromHostname);
    return serverList;
}
The ZoneAwareLoadBalancer constructor obtains the server collection by calling getUpdatedListOfServers() on DiscoveryEnabledNIWSServerList. DiscoveryEnabledNIWSServerList keeps a field eurekaClientProvider of type Provider<EurekaClient>, which holds an instance of CloudEurekaClient (the EurekaClient implementation). getUpdatedListOfServers() calls getInstancesByVipAddress() on CloudEurekaClient to read, from the Eureka client cache, all InstanceInfo entries for the application:
// Holds an instance of CloudEurekaClient, the EurekaClient implementation
private final Provider<EurekaClient> eurekaClientProvider;

@Override
public List<DiscoveryEnabledServer> getUpdatedListOfServers() {
    return obtainServersViaDiscovery();
}

private List<DiscoveryEnabledServer> obtainServersViaDiscovery() {
    List<DiscoveryEnabledServer> serverList = new ArrayList<DiscoveryEnabledServer>();
    if (eurekaClientProvider == null || eurekaClientProvider.get() == null) {
        logger.warn("EurekaClient has not been initialized yet, returning an empty list");
        return new ArrayList<DiscoveryEnabledServer>();
    }
    EurekaClient eurekaClient = eurekaClientProvider.get();
    if (vipAddresses != null) {
        for (String vipAddress : vipAddresses.split(",")) {
            // if targetRegion is null, it will be interpreted as the same region of client
            List<InstanceInfo> listOfInstanceInfo = eurekaClient.getInstancesByVipAddress(vipAddress, isSecure, targetRegion);
            for (InstanceInfo ii : listOfInstanceInfo) {
                if (ii.getStatus().equals(InstanceStatus.UP)) {
                    if (shouldUseOverridePort) {
                        if (logger.isDebugEnabled()) {
                            logger.debug("Overriding port on client name: " + clientName + " to " + overridePort);
                        }
                        InstanceInfo copy = new InstanceInfo(ii);
                        if (isSecure) {
                            ii = new InstanceInfo.Builder(copy).setSecurePort(overridePort).build();
                        } else {
                            ii = new InstanceInfo.Builder(copy).setPort(overridePort).build();
                        }
                    }
                    DiscoveryEnabledServer des = createServer(ii, isSecure, shouldUseIpAddr);
                    serverList.add(des);
                }
            }
            if (serverList.size() > 0 && prioritizeVipAddressBasedServers) {
                break; // if the current vipAddress has servers, we dont use subsequent vipAddress based servers
            }
        }
    }
    return serverList;
}
After obtaining the ILoadBalancer through getLoadBalancer() of SpringClientFactory, the execute() method of RibbonLoadBalancerClient calls chooseServer() on it to get a Server:
public <T> T execute(String serviceId, LoadBalancerRequest<T> request, Object hint) throws IOException {
    // 1. Obtain the ILoadBalancer
    ILoadBalancer loadBalancer = getLoadBalancer(serviceId);
    // 2. Choose a Server through the ILoadBalancer
    Server server = getServer(loadBalancer, hint);
    if (server == null) {
        throw new IllegalStateException("No instances available for " + serviceId);
    }
    RibbonServer ribbonServer = new RibbonServer(serviceId, server, isSecure(server, serviceId),
            serverIntrospector(serviceId).getMetadata(server));
    // 3. Send the request to the Server
    return execute(serviceId, ribbonServer, request);
}

protected Server getServer(ILoadBalancer loadBalancer, Object hint) {
    if (loadBalancer == null) {
        return null;
    }
    // Use 'default' on a null hint, or just pass it on?
    return loadBalancer.chooseServer(hint != null ? hint : "default");
}
ZoneAwareLoadBalancer's chooseServer() returns a Server by calling choose() on ZoneAvoidanceRule. ZoneAvoidanceRule inherits from ClientConfigEnabledRoundRobinRule (through PredicateBasedRule), so it selects a Server by round robin from the list that ZoneAwareLoadBalancer obtained. Finally, the request is sent to that Server using its address.
[Figure: related class diagram]
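The last step, turning a logical service name into a concrete address, can be sketched with the JDK alone. This is only an illustration: ServiceUriRewriter is a hypothetical helper, and the host 10.0.0.1 and port 8080 used with it are made-up values rather than anything Ribbon would necessarily return.

```java
import java.net.URI;
import java.net.URISyntaxException;

// Hypothetical helper: swaps the logical service name in a URI
// for the host and port of the chosen server.
final class ServiceUriRewriter {

    static URI rewrite(URI original, String host, int port) {
        try {
            // Keep scheme, path, query, and fragment; replace only host and port.
            return new URI(original.getScheme(), original.getUserInfo(), host, port,
                    original.getPath(), original.getQuery(), original.getFragment());
        } catch (URISyntaxException e) {
            throw new IllegalArgumentException("Cannot rewrite " + original, e);
        }
    }
}
```

With the request from the usage example, `URI.create("http://Eureka-Producer/hello").getHost()` yields the service name that is handed to the load balancer, and `rewrite(...)` produces the URI that is actually called.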
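2. Feign Interfaces
Compared with RestTemplate + @LoadBalanced, Feign interfaces are used more often in Spring Cloud projects because they are simpler to work with. Here is an example of calling a service through a Feign interface:
// Define the Feign interface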
@FeignClient(value = "Eureka-Producer", fallbackFactory = HelloClientFallbackFactory.class)
public interface HelloClient {
@GetMapping("/hello")
String hello();
}
// Fail-fast fallback used when the circuit breaker trips
@Component
public class HelloClientFallbackFactory implements FallbackFactory<HelloClient>, HelloClient {
@Override
public HelloClient create(Throwable throwable) {
return this;
}
@Override
public String hello() {
return "circuit breaker open";
}
}
// Usage
@RestController
public class HelloController {
@Resource
private HelloClient helloClient;
@GetMapping("/hello")
public String hello() {
return helloClient.hello();
}
}
Just as RestTemplate obtains a Server and executes the request through RibbonLoadBalancerClient, a Feign interface does so through LoadBalancerFeignClient. DefaultFeignLoadBalancedConfiguration registers the LoadBalancerFeignClient bean:
@Configuration
class DefaultFeignLoadBalancedConfiguration {
@Bean
@ConditionalOnMissingBean
public Client feignClient(CachingSpringLoadBalancerFactory cachingFactory,
SpringClientFactory clientFactory) {
return new LoadBalancerFeignClient(new Client.Default(null, null),
cachingFactory, clientFactory);
}
}
So how does a Feign interface use LoadBalancerFeignClient to make load-balanced calls? As described in the article "Feign Source Code Analysis" (《Feign源碼解析》), the proxy implementation of a Feign interface is generated by FeignClientFactoryBean. FeignClientFactoryBean implements FactoryBean, so its getObject() method returns the proxy implementation of the Feign interface, and getObject() obtains the LoadBalancerFeignClient from the Spring context:
@Override
public Object getObject() throws Exception {
return getTarget();
}
<T> T getTarget() {
FeignContext context = applicationContext.getBean(FeignContext.class);
Feign.Builder builder = feign(context);
if (!StringUtils.hasText(this.url)) {
if (!this.name.startsWith("http")) {
url = "http://" + this.name;
}
else {
url = this.name;
}
url += cleanPath();
return (T) loadBalance(builder, context, new HardCodedTarget<>(this.type,
this.name, url));
}
if (StringUtils.hasText(this.url) && !this.url.startsWith("http")) {
this.url = "http://" + this.url;
}
String url = this.url + cleanPath();
// Obtain the LoadBalancerFeignClient from the Spring context
Client client = getOptional(context, Client.class);
if (client != null) {
if (client instanceof LoadBalancerFeignClient) {
// not load balancing because we have a url,
// but ribbon is on the classpath, so unwrap
client = ((LoadBalancerFeignClient)client).getDelegate();
}
builder.client(client);
}
Targeter targeter = get(context, Targeter.class);
return (T) targeter.target(this, builder, context, new HardCodedTarget<>(
this.type, this.name, url));
}
LoadBalancerFeignClient exposes its service through the execute() method. When does a Feign interface call it? According to "Feign Source Code Analysis", SynchronousMethodHandler, an implementation of MethodHandler, intercepts every call to the Feign interface and runs its invoke() method, and invoke() in turn calls execute() on LoadBalancerFeignClient to send the network request:
@Override
public Object invoke(Object[] argv) throws Throwable {
RequestTemplate template = buildTemplateFromArgs.create(argv);
Retryer retryer = this.retryer.clone();
while (true) {
try {
return executeAndDecode(template);
} catch (RetryableException e) {
// omitted
continue;
}
}
}
Object executeAndDecode(RequestTemplate template) throws Throwable {
Request request = targetRequest(template);
if (logLevel != Logger.Level.NONE) {
logger.logRequest(metadata.configKey(), logLevel, request);
}
Response response;
long start = System.nanoTime();
try {
// Call execute() on LoadBalancerFeignClient to get the response.
response = client.execute(request, options);
} catch (IOException e) {
if (logLevel != Logger.Level.NONE) {
logger.logIOException(metadata.configKey(), logLevel, e, elapsedTime(start));
}
throw errorExecuting(request, e);
}
// omitted
}
How does LoadBalancerFeignClient's execute() method use Ribbon for load balancing? It obtains a FeignLoadBalancer by calling create() on CachingSpringLoadBalancerFactory. The FeignLoadBalancer holds an ILoadBalancer instance, which CachingSpringLoadBalancerFactory fetches from the Spring context by calling getLoadBalancer() on SpringClientFactory:
public FeignLoadBalancer create(String clientName) {
FeignLoadBalancer client = this.cache.get(clientName);
if(client != null) {
return client;
}
IClientConfig config = this.factory.getClientConfig(clientName);
ILoadBalancer lb = this.factory.getLoadBalancer(clientName);
ServerIntrospector serverIntrospector = this.factory.getInstance(clientName, ServerIntrospector.class);
client = loadBalancedRetryFactory != null ? new RetryableFeignLoadBalancer(lb, config, serverIntrospector,
loadBalancedRetryFactory) : new FeignLoadBalancer(lb, config, serverIntrospector);
this.cache.put(clientName, client);
return client;
}
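The per-client caching in create() can be sketched generically. The class below is hypothetical (PerClientCache is not a Spring Cloud type), and it deliberately swaps the get-then-put sequence shown above for ConcurrentHashMap.computeIfAbsent, which gives the same "build once per client name" behavior while also being atomic under concurrent calls.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

// Hypothetical generic cache keyed by client name; not the Spring Cloud class itself.
final class PerClientCache<V> {
    private final Map<String, V> cache = new ConcurrentHashMap<>();
    private final Function<String, V> factory;

    PerClientCache(Function<String, V> factory) {
        this.factory = factory;
    }

    // Builds the value on first use, then returns the same instance
    // for every later call with the same client name.
    V create(String clientName) {
        return cache.computeIfAbsent(clientName, factory);
    }
}
```

Caching per client name means each target service gets its own load balancer wrapper with its own configuration, and that wrapper is reused across requests instead of being rebuilt each time.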
Once the FeignLoadBalancer has been created, its executeWithLoadBalancer() method is called right away:
@Override
public Response execute(Request request, Request.Options options) throws IOException {
URI asUri = URI.create(request.url());
String clientName = asUri.getHost();
URI uriWithoutHost = cleanUrl(request.url(), clientName);
FeignLoadBalancer.RibbonRequest ribbonRequest = new FeignLoadBalancer.RibbonRequest(
this.delegate, request, uriWithoutHost);
IClientConfig requestConfig = getClientConfig(options, clientName);
// Call executeWithLoadBalancer() on the FeignLoadBalancer.
return lbClient(clientName).executeWithLoadBalancer(ribbonRequest,
requestConfig).toResponse();
// omitted
}
// Create and return the FeignLoadBalancer
private FeignLoadBalancer lbClient(String clientName) {
return this.lbClientFactory.create(clientName);
}
executeWithLoadBalancer() is implemented in AbstractLoadBalancerAwareClient, the parent class of FeignLoadBalancer:
public T executeWithLoadBalancer(final S request, final IClientConfig requestConfig) throws ClientException {
LoadBalancerCommand<T> command = buildLoadBalancerCommand(request, requestConfig);
try {
return command.submit(
new ServerOperation<T>() {
@Override
public Observable<T> call(Server server) {
URI finalUri = reconstructURIWithServer(server, request.getUri());
S requestForServer = (S) request.replaceUri(finalUri);
try {
return Observable.just(AbstractLoadBalancerAwareClient.this.execute(requestForServer, requestConfig));
}
catch (Exception e) {
return Observable.error(e);
}
}
})
.toBlocking()
.single();
} catch (Exception e) {
// omitted
}
}
executeWithLoadBalancer() creates a LoadBalancerCommand and submits a ServerOperation to it through submit(). Tracing submit() shows that it calls selectServer() to obtain a Server, and selectServer() delegates to getServerFromLoadBalancer() of LoadBalancerContext, a parent class of FeignLoadBalancer:
public T executeWithLoadBalancer(final S request, final IClientConfig requestConfig) throws ClientException {
LoadBalancerCommand<T> command = buildLoadBalancerCommand(request, requestConfig);
try {
return command.submit(
new ServerOperation<T>() {
@Override
public Observable<T> call(Server server) {
URI finalUri = reconstructURIWithServer(server, request.getUri());
S requestForServer = (S) request.replaceUri(finalUri);
try {
return Observable.just(AbstractLoadBalancerAwareClient.this.execute(requestForServer, requestConfig));
}
catch (Exception e) {
return Observable.error(e);
}
}
})
.toBlocking()
.single();
} catch (Exception e) {
Throwable t = e.getCause();
if (t instanceof ClientException) {
throw (ClientException) t;
} else {
throw new ClientException(e);
}
}
}
public Observable<T> submit(final ServerOperation<T> operation) {
final ExecutionInfoContext context = new ExecutionInfoContext();
// omitted
// When server is null, call selectServer() to obtain a Server.
Observable<T> o =
(server == null ? selectServer() : Observable.just(server))
.concatMap(new Func1<Server, Observable<T>>() {
@Override
// Called for each server being selected
public Observable<T> call(Server server) {
context.setServer(server);
final ServerStats stats = loadBalancerContext.getServerStats(server);
// Called for each attempt and retry
Observable<T> o = Observable
.just(server)
.concatMap(new Func1<Server, Observable<T>>() {
@Override
public Observable<T> call(final Server server) {
// omitted
}
});
// omitted
}
private Observable<Server> selectServer() {
return Observable.create(new OnSubscribe<Server>() {
@Override
public void call(Subscriber<? super Server> next) {
try {
// Call getServerFromLoadBalancer() on LoadBalancerContext to obtain the Server
Server server = loadBalancerContext.getServerFromLoadBalancer(loadBalancerURI, loadBalancerKey);
next.onNext(server);
next.onCompleted();
} catch (Exception e) {
next.onError(e);
}
}
});
}
FeignLoadBalancer and LoadBalancerCommand depend on and call each other. In the end, getServerFromLoadBalancer() of LoadBalancerContext (a parent class of FeignLoadBalancer) returns the Server by calling chooseServer() on the ILoadBalancer it holds:
public Server getServerFromLoadBalancer(@Nullable URI original, @Nullable Object loadBalancerKey) throws ClientException {
String host = null;
int port = -1;
if (original != null) {
host = original.getHost();
}
if (original != null) {
Pair<String, Integer> schemeAndPort = deriveSchemeAndPortFromPartialUri(original);
port = schemeAndPort.second();
}
// Obtain the ILoadBalancer
ILoadBalancer lb = getLoadBalancer();
// Call chooseServer() on the ILoadBalancer to obtain the Server.
Server svc = lb.chooseServer(loadBalancerKey);
if (svc == null){
throw new ClientException(ClientException.ErrorType.GENERAL,
"Load balancer does not have available server for client: "
+ clientName);
}
host = svc.getHost();
if (host == null){
throw new ClientException(ClientException.ErrorType.GENERAL,
"Invalid Server for :" + svc);
}
logger.debug("{} using LB returned Server: {} for request {}", new Object[]{clientName, svc, original});
return svc;
}
At this point we finally reach the code that obtains a Server through ILoadBalancer.
[Figure: related class diagram]
IV. Summary
Ribbon provides load balancing through the ILoadBalancer interface. It works as follows:
- ILoadBalancer relies on ServerList, which obtains the server list from the Eureka client through DiscoveryClient, and caches that list.
- IPing periodically checks the servers cached by ILoadBalancer to determine whether they are available.
- IRule abstracts the load-balancing strategy; ILoadBalancer selects a Server through IRule.
When services are called through RestTemplate + @LoadBalanced, LoadBalancerInterceptor and RibbonLoadBalancerClient act as the bridge to Ribbon's load balancing.
When services are called through a Feign interface, LoadBalancerFeignClient and FeignLoadBalancer are the entry points into Ribbon that provide load balancing for the Feign interface.
Whichever approach is used, load balancing is ultimately performed through Ribbon's ILoadBalancer interface.
Finally, two bonus items:
- Ribbon-related Configuration classes and the beans they register:
    - RibbonAutoConfiguration
        - Registers RibbonLoadBalancerClient, the implementation of LoadBalancerClient.
        - Registers SpringClientFactory.
    - LoadBalancerAutoConfiguration
        - Registers LoadBalancerInterceptor.
        - Attaches LoadBalancerInterceptor to each RestTemplate.
    - RibbonClientConfiguration
        - Registers ZoneAwareLoadBalancer, the implementation of ILoadBalancer.
        - Registers ZoneAvoidanceRule, the implementation of IRule.
        - Registers DefaultClientConfigImpl, the implementation of IClientConfig.
    - EurekaRibbonClientConfiguration
        - Registers NIWSDiscoveryPing, the implementation of IPing.
        - Registers DiscoveryEnabledNIWSServerList, the implementation of ServerList.
- Ribbon class relationship diagram:
[Figure: Ribbon class relationship diagram]